Skip to content

Commit

Permalink
Merge pull request #18108 from richardkchapman/udpresend
Browse files Browse the repository at this point in the history
HPCC-30960 Roxie packet resend logic may not handle receiver restart

Reviewed-by: Mark Kelly [email protected]
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Dec 7, 2023
2 parents 2daeb00 + 00cfe80 commit 7578701
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 4 deletions.
38 changes: 38 additions & 0 deletions devdoc/roxie.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,41 @@ We do pack into dataBuffers rather than MemoryBuffers, which avoids a need to co
What is the lifespan issue? In-flight queries may be abandoned when a server-side query fails, times out, or no longer needs the data. Using DataBuffer does not have this issue as they are attached to the query's memory manager/allocation once read. Or we could bypass the agent queue altogether, but rather more refactoring needed for that (might almost be easier to extent the "local optimization" mode to use multiple threads at that point)

abortPending, replyPending, and abortPendingData methods are unimplemented, which may lead to some inefficiencies?

Some notes on UDP packet sending mechanism
==========================================

Requests from server to agents are send via UDP (and have a size limit of 64k as a result). Historically they were sent using multicast to go to
all agents on a channel at the same time, but since most cloud providers do not support multicast, there has long been an option to
avoid multicast and send explicitly to the agent IPs. In bare metal systems these IPs are known via the topology file, and do not change. In cloud
systems the topology server provides the IPs of all agents for a channel.

In cloud systems, the list of IPs that a message was sent to is included in the message header, so that the IBYTI messages can be sent without requiring
that all agents/servers have the same topology information at any given moment (they will stay in sync because of topology server, but may be
temporarily out of sync when nodes are added/removed, until next time topology info is retrieved). This is controled by the SUBCHANNELS_IN_HEADER define.

Packets back from agents to server go via the udplib message-passing code. This can best be described by looking at the sending and receiving sides
separately.

When sending, results are split into individual packets (DataBuffers), each designed to be under 1 MTU in size. Traditionally this meant they were 1k,
but they can be set larger (8k is good). They do have to be a power of 2 because of how they are allocated from the roxiemem heap. The sender maintains
a set of UdpReceiverEntry objects, one for each server that it is conversing with. Each UdpReceiverEntry maintains multiple queues of data packets
waiting to be sent, one queue for each priority. The UdpReceiverEntry maintains a count of how many packets are contained across all its queues in
packetsQueued, so that it knows if there is data to send.

The priority levels are:
0: Out Of Band
1: Fast lane
2: Standard

This is designed to allow control information to be sent without getting blocked by data, and high priority queries to avoid being blocked by data going
to lower priority ones. The mechanism for deciding what packet to send next is a little odd though - rather than sending all higher-priorty packets
before any lower-priority ones, it round robins across the queues sending up to N^2 from queue 0 then up to N from queue 1 then 1 from queue 2, where N
is set by the UdpOutQsPriority option, or 1 if not set. This may be a mistake - probably any from queue 0 should be sent first, before round-robining
the other queues in this fashion.

UdpReceiverEntry objects are also responsible for maintaining a list of packets that have been sent but receiver has not yet indicated that they have
arrived.

If an agent has data ready for a given receiver, it will send a requestToSend to that receiver, and wait for a permitToSend response. Sequence numbers
are used to handle situations where these messages get lost. A permitToSend that does not contain the expected sequence number is ignored.
5 changes: 3 additions & 2 deletions roxie/udplib/udpsha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,10 @@ bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
else if (!resent)
packetsOOO++;
}
else if (resent)
else if (resent && base)
// Don't treat a resend that goes out of range as indicative of a restart - it probably just means
// that the resend was not needed and the original moved things on when it arrived
// that the resend was not needed and the original moved things on when it arrived. Unless base is 0
// in which case it probably means I restarted
duplicate = true;
else
{
Expand Down
5 changes: 4 additions & 1 deletion roxie/udplib/udpsim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ void simulateTraffic()
{
if (restartReceiver)
{
Sleep(100);
Sleep(1000);
rm.clear();
DBGLOG("Killed receiver");
Sleep(500);
DBGLOG("Restarting receiver");
rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, false));
}
}
Expand Down
1 change: 0 additions & 1 deletion roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ class UdpReceiverEntry : public IUdpReceiverEntry
const IpAddress ip;
std::atomic<unsigned> timeouts{0}; // Number of consecutive timeouts
std::atomic<unsigned> requestExpiryTime{0}; // Updated by send_flow thread, read by send_resend thread and send_data thread

static bool comparePacket(const void *pkData, const void *key)
{
UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;
Expand Down

0 comments on commit 7578701

Please sign in to comment.