Skip to content

Commit

Permalink
close stream if SYACK is not acked
Browse files Browse the repository at this point in the history
  • Loading branch information
orignal committed Nov 18, 2024
1 parent 3c4926f commit 5b2d0c5
Showing 1 changed file with 84 additions and 83 deletions.
167 changes: 84 additions & 83 deletions libi2pd/Streaming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1337,104 +1337,105 @@ namespace stream

void Stream::ResendPacket ()
{
// check for resend attempts
if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS)
{
LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
m_Status = eStreamStatusReset;
Close ();
return;
}
// check for resend attempts
if (m_SequenceNumber == 1 && m_NumResendAttempts > 0)
{
LogPrint (eLogWarning, "Streaming: SYNACK packet was not ACKed after ", m_NumResendAttempts, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
m_Status = eStreamStatusReset;
Close ();
return;
}
if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS)
{
LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate, rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
m_Status = eStreamStatusReset;
Close ();
return;
}

// collect packets to resend
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::vector<Packet *> packets;
if (m_IsNAcked)
// collect packets to resend
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::vector<Packet *> packets;
if (m_IsNAcked)
{
for (auto it : m_NACKedPackets)
{
for (auto it : m_NACKedPackets)
if (ts >= it->sendTime + m_RTO)
{
if (ts >= it->sendTime + m_RTO)
{
if (ts < it->sendTime + m_RTO*2)
it->resent = true;
else
it->resent = false;
it->sendTime = ts;
packets.push_back (it);
if ((int)packets.size () >= m_NumPacketsToSend) break;
}
if (ts < it->sendTime + m_RTO*2)
it->resent = true;
else
it->resent = false;
it->sendTime = ts;
packets.push_back (it);
if ((int)packets.size () >= m_NumPacketsToSend) break;
}
}
else
}
else
{
for (auto it : m_SentPackets)
{
for (auto it : m_SentPackets)
if (ts >= it->sendTime + m_RTO)
{
if (ts >= it->sendTime + m_RTO)
{
if (ts < it->sendTime + m_RTO*2)
it->resent = true;
else
it->resent = false;
it->sendTime = ts;
packets.push_back (it);
if ((int)packets.size () >= m_NumPacketsToSend) break;
}
if (ts < it->sendTime + m_RTO*2)
it->resent = true;
else
it->resent = false;
it->sendTime = ts;
packets.push_back (it);
if ((int)packets.size () >= m_NumPacketsToSend) break;
}
}

// select tunnels if necessary and send
if (packets.size () > 0 && m_IsSendTime)
}

// select tunnels if necessary and send
if (packets.size () > 0 && m_IsSendTime)
{
if (m_IsNAcked) m_NumResendAttempts = 1;
else if (m_IsTimeOutResend) m_NumResendAttempts++;
if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO)
{
// loss-based CC
if (!m_IsWinDropped && LOSS_BASED_CONTROL_ENABLED)
ProcessWindowDrop ();
}
else if (m_IsTimeOutResend)
{
if (m_IsNAcked) m_NumResendAttempts = 1;
else if (m_IsTimeOutResend) m_NumResendAttempts++;
if (m_NumResendAttempts == 1 && m_RTO != INITIAL_RTO)
m_IsTimeOutResend = false;
m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change
m_WindowDropTargetSize = INITIAL_WINDOW_SIZE;
m_LastWindowDropSize = 0;
m_WindowIncCounter = 0;
m_IsWinDropped = true;
m_IsFirstRttSample = true;
m_DropWindowDelaySequenceNumber = 0;
m_IsFirstACK = true;
UpdatePacingTime ();
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
if (m_NumResendAttempts & 1)
{
// loss-based CC
if (!m_IsWinDropped && LOSS_BASED_CONTROL_ENABLED)
ProcessWindowDrop ();
// pick another outbound tunnel
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
", another outbound tunnel has been selected for stream with sSID=", m_SendStreamID);
}
else if (m_IsTimeOutResend)
else
{
m_IsTimeOutResend = false;
m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change
m_WindowDropTargetSize = INITIAL_WINDOW_SIZE;
m_LastWindowDropSize = 0;
m_WindowIncCounter = 0;
m_IsWinDropped = true;
m_IsFirstRttSample = true;
m_DropWindowDelaySequenceNumber = 0;
m_IsFirstACK = true;
UpdatePacingTime ();
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
if (m_NumResendAttempts & 1)
{
// pick another outbound tunnel
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
", another outbound tunnel has been selected for stream with sSID=", m_SendStreamID);
}
else
{
CancelRemoteLeaseChange ();
UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
}
CancelRemoteLeaseChange ();
UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Streaming: Resend #", m_NumResendAttempts,
", another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID);
}
SendPackets (packets);
m_LastSendTime = ts;
m_IsSendTime = false;
if (m_IsNAcked || m_IsResendNeeded)
{
if (m_SequenceNumber > 1) // doesn't resend agressively very first packet
ScheduleSend ();
else
ScheduleResend ();
}
}
else
SendBuffer ();
if (!m_IsNAcked && !m_IsResendNeeded) ScheduleResend ();
SendPackets (packets);
m_LastSendTime = ts;
m_IsSendTime = false;
if (m_IsNAcked || m_IsResendNeeded) ScheduleSend ();
}
else
SendBuffer ();
if (!m_IsNAcked && !m_IsResendNeeded) ScheduleResend ();
}

void Stream::ScheduleAck (int timeout)
Expand Down

0 comments on commit 5b2d0c5

Please sign in to comment.