Skip to content

Commit

Permalink
Merge pull request #2010 from wekoq/udp-tunnels-dest
Browse files Browse the repository at this point in the history
Add support for multiple udp server tunnels on one destionation
  • Loading branch information
orignal authored Jan 25, 2024
2 parents 70639f1 + 4b167fd commit 25e8210
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 59 deletions.
70 changes: 65 additions & 5 deletions libi2pd/Datagram.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2023, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
Expand All @@ -19,7 +19,7 @@ namespace i2p
namespace datagram
{
DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner, bool gzip):
m_Owner (owner), m_Receiver (nullptr), m_RawReceiver (nullptr), m_Gzip (gzip)
m_Owner (owner), m_DefaultReceiver (nullptr), m_DefaultRawReceiver (nullptr), m_Gzip (gzip)
{
if (m_Gzip)
m_Deflator.reset (new i2p::data::GzipDeflator);
Expand Down Expand Up @@ -119,19 +119,79 @@ namespace datagram

void DatagramDestination::HandleRawDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
if (m_RawReceiver)
m_RawReceiver (fromPort, toPort, buf, len);
auto r = FindRawReceiver(toPort);

if (r)
r (fromPort, toPort, buf, len);
else
LogPrint (eLogWarning, "DatagramDestination: no receiver for raw datagram");
}

void DatagramDestination::SetReceiver (const Receiver& receiver, uint16_t port)
{
std::lock_guard<std::mutex> lock(m_ReceiversMutex);
m_ReceiversByPorts[port] = receiver;
if (!m_DefaultReceiver) {
m_DefaultReceiver = receiver;
m_DefaultReceiverPort = port;
}
}

void DatagramDestination::ResetReceiver (uint16_t port)
{
std::lock_guard<std::mutex> lock(m_ReceiversMutex);
m_ReceiversByPorts.erase (port);
if (m_DefaultReceiverPort == port) {
m_DefaultReceiver = nullptr;
m_DefaultReceiverPort = 0;
}
}


void DatagramDestination::SetRawReceiver (const RawReceiver& receiver, uint16_t port)
{
std::lock_guard<std::mutex> lock(m_RawReceiversMutex);
m_RawReceiversByPorts[port] = receiver;
if (!m_DefaultRawReceiver) {
m_DefaultRawReceiver = receiver;
m_DefaultRawReceiverPort = port;
}
};

void DatagramDestination::ResetRawReceiver (uint16_t port)
{
std::lock_guard<std::mutex> lock(m_RawReceiversMutex);
m_RawReceiversByPorts.erase (port);
if (m_DefaultRawReceiverPort == port) {
m_DefaultRawReceiver = nullptr;
m_DefaultRawReceiverPort = 0;
}
}


DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port)
{
std::lock_guard<std::mutex> lock(m_ReceiversMutex);
Receiver r = m_Receiver;
Receiver r = nullptr;
auto itr = m_ReceiversByPorts.find(port);
if (itr != m_ReceiversByPorts.end())
r = itr->second;
else {
r = m_DefaultReceiver;
}
return r;
}

DatagramDestination::RawReceiver DatagramDestination::FindRawReceiver(uint16_t port)
{
std::lock_guard<std::mutex> lock(m_RawReceiversMutex);
RawReceiver r = nullptr;
auto itr = m_RawReceiversByPorts.find(port);
if (itr != m_RawReceiversByPorts.end())
r = itr->second;
else {
r = m_DefaultRawReceiver;
}
return r;
}

Expand Down
28 changes: 16 additions & 12 deletions libi2pd/Datagram.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
Expand Down Expand Up @@ -126,14 +126,12 @@ namespace datagram

void HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, bool isRaw = false);

void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; };
void ResetReceiver () { m_Receiver = nullptr; };

void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; };
void ResetReceiver (uint16_t port) { std::lock_guard<std::mutex> lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); };
void SetReceiver (const Receiver& receiver, uint16_t port);
void ResetReceiver (uint16_t port);

void SetRawReceiver (const RawReceiver& receiver) { m_RawReceiver = receiver; };
void ResetRawReceiver () { m_RawReceiver = nullptr; };
void SetRawReceiver (const RawReceiver& receiver, uint16_t port);
void ResetRawReceiver (uint16_t port);

std::shared_ptr<DatagramSession::Info> GetInfoForRemote(const i2p::data::IdentHash & remote);

Expand All @@ -150,20 +148,26 @@ namespace datagram
void HandleDatagram (uint16_t fromPort, uint16_t toPort, uint8_t *const& buf, size_t len);
void HandleRawDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);

/** find a receiver by port, if none by port is found try default receiver, otherwise returns nullptr */
Receiver FindReceiver(uint16_t port);
RawReceiver FindRawReceiver(uint16_t port);

private:

std::shared_ptr<i2p::client::ClientDestination> m_Owner;
Receiver m_Receiver; // default
RawReceiver m_RawReceiver; // default
bool m_Gzip; // gzip compression of data messages

std::mutex m_SessionsMutex;
std::map<i2p::data::IdentHash, DatagramSession_ptr > m_Sessions;

Receiver m_DefaultReceiver;
RawReceiver m_DefaultRawReceiver;
uint16_t m_DefaultReceiverPort;
uint16_t m_DefaultRawReceiverPort;
std::mutex m_ReceiversMutex;
std::map<uint16_t, Receiver> m_ReceiversByPorts;
std::mutex m_RawReceiversMutex;
std::unordered_map<uint16_t, Receiver> m_ReceiversByPorts;
std::unordered_map<uint16_t, RawReceiver> m_RawReceiversByPorts;

bool m_Gzip; // gzip compression of data messages
i2p::data::GzipInflator m_Inflator;
std::unique_ptr<i2p::data::GzipDeflator> m_Deflator;
std::vector<uint8_t> m_From, m_Signature;
Expand Down
4 changes: 2 additions & 2 deletions libi2pd_client/ClientContext.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2023, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
Expand Down Expand Up @@ -775,7 +775,7 @@ namespace client
address = "127.0.0.1";
}
auto localAddress = boost::asio::ip::address::from_string(address);
auto serverTunnel = std::make_shared<I2PUDPServerTunnel>(name, localDestination, localAddress, endpoint, port, gzip);
auto serverTunnel = std::make_shared<I2PUDPServerTunnel>(name, localDestination, localAddress, endpoint, inPort, gzip);
if(!isUniqueLocal)
{
LogPrint(eLogInfo, "Clients: Disabling loopback address mapping");
Expand Down
53 changes: 29 additions & 24 deletions libi2pd_client/SAM.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,17 @@ namespace client
{
session->UDPEndpoint = forward;
auto dest = session->GetLocalDestination ()->CreateDatagramDestination ();
auto port = std::stoi(params[SAM_PARAM_PORT]);
if (type == eSAMSessionTypeDatagram)
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5),
port
);
else // raw
dest->SetRawReceiver (std::bind (&SAMSocket::HandleI2PRawDatagramReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
port
);
}

if (session->GetLocalDestination ()->IsReady ())
Expand Down Expand Up @@ -524,7 +529,7 @@ namespace client
if (addr->IsIdentHash ())
{
if (session->GetLocalDestination ()->GetIdentHash () != addr->identHash)
{
{
auto leaseSet = session->GetLocalDestination ()->FindLeaseSet(addr->identHash);
if (leaseSet)
Connect(leaseSet, session);
Expand Down Expand Up @@ -556,7 +561,7 @@ namespace client
if (session)
{
if (session->GetLocalDestination ()->SupportsEncryptionType (remote->GetEncryptionType ()))
{
{
m_SocketType = eSAMSocketTypeStream;
m_Stream = session->GetLocalDestination ()->CreateStream (remote);
if (m_Stream)
Expand All @@ -570,7 +575,7 @@ namespace client
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
}
else
SendStreamCantReachPeer ("Incompatible crypto");
SendStreamCantReachPeer ("Incompatible crypto");
}
else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
Expand All @@ -583,7 +588,7 @@ namespace client
else
{
LogPrint (eLogError, "SAM: Destination to connect not found");
SendStreamCantReachPeer ("LeaseSet not found");
SendStreamCantReachPeer ("LeaseSet not found");
}
}

Expand Down Expand Up @@ -612,27 +617,27 @@ namespace client
session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
}
else
{
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts)
{
{
auto socket = session->acceptQueue.front ().first;
session->acceptQueue.pop_front ();
if (socket)
m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket));
}
}
if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE)
{
// already accepting, queue up
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
session->acceptQueue.push_back (std::make_pair(shared_from_this(), ts));
}
else
{
}
else
{
LogPrint (eLogInfo, "SAM: Session ", m_ID, " accept queue is full ", session->acceptQueue.size ());
SendStreamI2PError ("Already accepting");
}
}
}
}
}
else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
Expand Down Expand Up @@ -875,8 +880,8 @@ namespace client
size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, reply, msg.c_str());
#endif
SendMessageReply (m_Buffer, len, true);
}
}

void SAMSocket::SendSessionI2PError(const std::string & msg)
{
LogPrint (eLogError, "SAM: Session I2P error: ", msg);
Expand All @@ -886,14 +891,14 @@ namespace client
void SAMSocket::SendStreamI2PError(const std::string & msg)
{
LogPrint (eLogError, "SAM: Stream I2P error: ", msg);
SendReplyWithMessage (SAM_STREAM_STATUS_I2P_ERROR, msg);
SendReplyWithMessage (SAM_STREAM_STATUS_I2P_ERROR, msg);
}

void SAMSocket::SendStreamCantReachPeer(const std::string & msg)
{
SendReplyWithMessage (SAM_STREAM_STATUS_CANT_REACH_PEER, msg);
}
SendReplyWithMessage (SAM_STREAM_STATUS_CANT_REACH_PEER, msg);
}

void SAMSocket::HandleNamingLookupLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet, std::string name)
{
if (leaseSet)
Expand Down Expand Up @@ -1093,22 +1098,22 @@ namespace client
// pending acceptors
auto ts = i2p::util::GetSecondsSinceEpoch ();
while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts)
{
{
auto socket = session->acceptQueue.front ().first;
session->acceptQueue.pop_front ();
if (socket)
m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket));
}
}
if (!session->acceptQueue.empty ())
{
{
auto socket = session->acceptQueue.front ().first;
session->acceptQueue.pop_front ();
if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor)
{
socket->m_IsAccepting = true;
session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1));
}
}
}
}
if (!m_IsSilent)
{
Expand Down
Loading

0 comments on commit 25e8210

Please sign in to comment.