From 3a63f6775a1624c906d096d04eb147f5f7f9e912 Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 17 Jun 2015 10:47:26 -0400 Subject: [PATCH] pass I2NP message to transport session as shared_ptr --- NTCPSession.cpp | 42 +++++++++++++----------------------------- NTCPSession.h | 18 +++++++++--------- SSUData.cpp | 4 +--- SSUData.h | 2 +- SSUSession.cpp | 26 ++++++++------------------ SSUSession.h | 8 ++++---- TransportSession.h | 4 ++-- Transports.cpp | 10 ++++++++-- Transports.h | 8 +------- 9 files changed, 47 insertions(+), 75 deletions(-) diff --git a/NTCPSession.cpp b/NTCPSession.cpp index c5dac0a2..b7596997 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -83,8 +83,6 @@ namespace transport m_Socket.close (); transports.PeerDisconnected (shared_from_this ()); m_Server.RemoveNTCPSession (shared_from_this ()); - for (auto it: m_SendQueue) - DeleteI2NPMessage (it); m_SendQueue.clear (); if (m_NextMessage) { @@ -107,7 +105,7 @@ namespace transport m_DHKeysPair = nullptr; SendTimeSyncMessage (); - PostI2NPMessage (CreateDatabaseStoreMsg ()); // we tell immediately who we are + PostI2NPMessage (ToSharedI2NPMessage(CreateDatabaseStoreMsg ())); // we tell immediately who we are transports.PeerConnected (shared_from_this ()); } @@ -600,14 +598,14 @@ namespace transport return true; } - void NTCPSession::Send (i2p::I2NPMessage * msg) + void NTCPSession::Send (std::shared_ptr msg) { m_IsSending = true; boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector{ msg })); + std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector >{ msg })); } - boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (I2NPMessage * msg) + boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (std::shared_ptr msg) { uint8_t * sendBuffer; int len; @@ -616,10 +614,7 @@ namespace transport { // regular I2NP if (msg->offset < 2) - { - LogPrint (eLogError, "Malformed I2NP message"); - i2p::DeleteI2NPMessage (msg); - } + LogPrint (eLogError, "Malformed I2NP message"); // TODO: sendBuffer = msg->GetBuffer () - 2; len = msg->GetLength (); htobe16buf (sendBuffer, len); @@ -644,7 +639,7 @@ namespace transport } - void NTCPSession::Send (const std::vector& msgs) + void NTCPSession::Send (const std::vector >& msgs) { m_IsSending = true; std::vector bufs; @@ -654,11 +649,9 @@ namespace transport std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msgs)); } - void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector msgs) + void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs) { m_IsSending = false; - for (auto it: msgs) - if (it) i2p::DeleteI2NPMessage (it); if (ecode) { LogPrint (eLogWarning, "Couldn't send msgs: ", ecode.message ()); @@ -686,20 +679,16 @@ namespace transport Send (nullptr); } - void NTCPSession::SendI2NPMessage (I2NPMessage * msg) + void NTCPSession::SendI2NPMessage (std::shared_ptr msg) { m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessage, shared_from_this (), msg)); } - void NTCPSession::PostI2NPMessage (I2NPMessage * msg) + void NTCPSession::PostI2NPMessage (std::shared_ptr msg) { if (msg) { - if (m_IsTerminated) - { - DeleteI2NPMessage (msg); - return; - } + if (m_IsTerminated) return; if (m_IsSending) m_SendQueue.push_back (msg); else @@ -707,19 +696,14 @@ namespace transport } } - void NTCPSession::SendI2NPMessages (const std::vector& msgs) + void NTCPSession::SendI2NPMessages (const std::vector >& msgs) { m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs)); } - void NTCPSession::PostI2NPMessages (std::vector msgs) + void NTCPSession::PostI2NPMessages (std::vector > msgs) { - if (m_IsTerminated) - { - for (auto it: msgs) - DeleteI2NPMessage (it); - return; - } + if (m_IsTerminated) return; if (m_IsSending) { for (auto it: msgs) diff --git a/NTCPSession.h b/NTCPSession.h index 8e3e9073..b09713e4 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -61,13 +61,13 @@ namespace transport void ClientLogin (); void ServerLogin (); - void SendI2NPMessage (I2NPMessage * msg); - void SendI2NPMessages (const std::vector& msgs); + void SendI2NPMessage (std::shared_ptr msg); + void SendI2NPMessages (const std::vector >& msgs); private: - void PostI2NPMessage (I2NPMessage * msg); - void PostI2NPMessages (std::vector msgs); + void PostI2NPMessage (std::shared_ptr msg); + void PostI2NPMessages (std::vector > msgs); void Connected (); void SendTimeSyncMessage (); void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; } @@ -96,10 +96,10 @@ namespace transport void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); bool DecryptNextBlock (const uint8_t * encrypted); - void Send (i2p::I2NPMessage * msg); - boost::asio::const_buffers_1 CreateMsgBuffer (I2NPMessage * msg); - void Send (const std::vector& msgs); - void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector msgs); + void Send (std::shared_ptr msg); + boost::asio::const_buffers_1 CreateMsgBuffer (std::shared_ptr msg); + void Send (const std::vector >& msgs); + void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector > msgs); // timer @@ -131,7 +131,7 @@ namespace transport i2p::I2NPMessagesHandler m_Handler; bool m_IsSending; - std::vector m_SendQueue; + std::vector > m_SendQueue; boost::asio::ip::address m_ConnectedFrom; // for ban }; diff --git a/SSUData.cpp b/SSUData.cpp index 4cacfd68..83b82aab 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -294,13 +294,12 @@ namespace transport ProcessFragments (buf); } - void SSUData::Send (i2p::I2NPMessage * msg) + void SSUData::Send (std::shared_ptr msg) { uint32_t msgID = msg->ToSSU (); if (m_SentMessages.count (msgID) > 0) { LogPrint (eLogWarning, "SSU message ", msgID, " already sent"); - DeleteI2NPMessage (msg); return; } if (m_SentMessages.empty ()) // schedule resend at first message only @@ -368,7 +367,6 @@ namespace transport len = 0; fragmentNum++; } - DeleteI2NPMessage (msg); } void SSUData::SendMsgAck (uint32_t msgID) diff --git a/SSUData.h b/SSUData.h index ff7bb96c..39d14cc4 100644 --- a/SSUData.h +++ b/SSUData.h @@ -89,7 +89,7 @@ namespace transport void ProcessMessage (uint8_t * buf, size_t len); void FlushReceivedMessage (); - void Send (i2p::I2NPMessage * msg); + void Send (std::shared_ptr msg); void UpdatePacketSize (const i2p::data::IdentHash& remoteIdent); diff --git a/SSUSession.cpp b/SSUSession.cpp index 1e4f0fe7..39ef9df0 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -262,7 +262,7 @@ namespace transport if (paddingSize > 0) paddingSize = 16 - paddingSize; payload += paddingSize; // TODO: verify signature (need data from session request), payload points to signature - m_Data.Send (CreateDeliveryStatusMsg (0)); + m_Data.Send (ToSharedI2NPMessage(CreateDeliveryStatusMsg (0))); Established (); } @@ -783,7 +783,7 @@ namespace transport m_DHKeysPair = nullptr; } m_Data.Start (); - m_Data.Send (CreateDatabaseStoreMsg ()); + m_Data.Send (ToSharedI2NPMessage(CreateDatabaseStoreMsg ())); transports.PeerConnected (shared_from_this ()); if (m_PeerTest && (m_RemoteRouter && m_RemoteRouter->IsPeerTesting ())) SendPeerTest (); @@ -832,39 +832,29 @@ namespace transport } } - void SSUSession::SendI2NPMessage (I2NPMessage * msg) + void SSUSession::SendI2NPMessage (std::shared_ptr msg) { GetService ().post (std::bind (&SSUSession::PostI2NPMessage, shared_from_this (), msg)); } - void SSUSession::PostI2NPMessage (I2NPMessage * msg) + void SSUSession::PostI2NPMessage (std::shared_ptr msg) { - if (msg) - { - if (m_State == eSessionStateEstablished) - m_Data.Send (msg); - else - DeleteI2NPMessage (msg); - } + if (msg &&m_State == eSessionStateEstablished) + m_Data.Send (msg); } - void SSUSession::SendI2NPMessages (const std::vector& msgs) + void SSUSession::SendI2NPMessages (const std::vector >& msgs) { GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs)); } - void SSUSession::PostI2NPMessages (std::vector msgs) + void SSUSession::PostI2NPMessages (std::vector > msgs) { if (m_State == eSessionStateEstablished) { for (auto it: msgs) if (it) m_Data.Send (it); } - else - { - for (auto it: msgs) - DeleteI2NPMessage (it); - } } void SSUSession::ProcessData (uint8_t * buf, size_t len) diff --git a/SSUSession.h b/SSUSession.h index 5f980784..fd1ba39e 100644 --- a/SSUSession.h +++ b/SSUSession.h @@ -76,8 +76,8 @@ namespace transport void Done (); boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); }; - void SendI2NPMessage (I2NPMessage * msg); - void SendI2NPMessages (const std::vector& msgs); + void SendI2NPMessage (std::shared_ptr msg); + void SendI2NPMessages (const std::vector >& msgs); void SendPeerTest (); // Alice SessionState GetState () const { return m_State; }; @@ -95,8 +95,8 @@ namespace transport boost::asio::io_service& GetService (); void CreateAESandMacKey (const uint8_t * pubKey); - void PostI2NPMessage (I2NPMessage * msg); - void PostI2NPMessages (std::vector msgs); + void PostI2NPMessage (std::shared_ptr msg); + void PostI2NPMessages (std::vector > msgs); void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void SendSessionRequest (); diff --git a/TransportSession.h b/TransportSession.h index 95267452..dcd9f66c 100644 --- a/TransportSession.h +++ b/TransportSession.h @@ -71,8 +71,8 @@ namespace transport size_t GetNumSentBytes () const { return m_NumSentBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; - virtual void SendI2NPMessage (I2NPMessage * msg) = 0; - virtual void SendI2NPMessages (const std::vector& msgs) = 0; + virtual void SendI2NPMessage (std::shared_ptr msg) = 0; + virtual void SendI2NPMessages (const std::vector >& msgs) = 0; protected: diff --git a/Transports.cpp b/Transports.cpp index f288d113..c24bd508 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -255,11 +255,17 @@ namespace transport } } if (!it->second.sessions.empty ()) - it->second.sessions.front ()->SendI2NPMessages (msgs); + { + // TODO: remove this copy operation later + std::vector > msgs1; + for (auto it1: msgs) + msgs1.push_back (ToSharedI2NPMessage(it1)); + it->second.sessions.front ()->SendI2NPMessages (msgs1); + } else { for (auto it1: msgs) - it->second.delayedMessages.push_back (it1); + it->second.delayedMessages.push_back (ToSharedI2NPMessage(it1)); } } diff --git a/Transports.h b/Transports.h index ec04f6c7..d660a36c 100644 --- a/Transports.h +++ b/Transports.h @@ -62,19 +62,13 @@ namespace transport std::shared_ptr router; std::list > sessions; uint64_t creationTime; - std::vector delayedMessages; + std::vector > delayedMessages; void Done () { for (auto it: sessions) it->Done (); } - - ~Peer () - { - for (auto it :delayedMessages) - i2p::DeleteI2NPMessage (it); - } }; const size_t SESSION_CREATION_TIMEOUT = 10; // in seconds