diff --git a/devdoc/roxie.md b/devdoc/roxie.md index 60a808be4c8..042e8d245e4 100644 --- a/devdoc/roxie.md +++ b/devdoc/roxie.md @@ -295,3 +295,41 @@ We do pack into dataBuffers rather than MemoryBuffers, which avoids a need to co What is the lifespan issue? In-flight queries may be abandoned when a server-side query fails, times out, or no longer needs the data. Using DataBuffer does not have this issue as they are attached to the query's memory manager/allocation once read. Or we could bypass the agent queue altogether, but rather more refactoring needed for that (might almost be easier to extent the "local optimization" mode to use multiple threads at that point) abortPending, replyPending, and abortPendingData methods are unimplemented, which may lead to some inefficiencies? + +Some notes on UDP packet sending mechanism +========================================== + +Requests from server to agents are send via UDP (and have a size limit of 64k as a result). Historically they were sent using multicast to go to +all agents on a channel at the same time, but since most cloud providers do not support multicast, there has long been an option to +avoid multicast and send explicitly to the agent IPs. In bare metal systems these IPs are known via the topology file, and do not change. In cloud +systems the topology server provides the IPs of all agents for a channel. + +In cloud systems, the list of IPs that a message was sent to is included in the message header, so that the IBYTI messages can be sent without requiring +that all agents/servers have the same topology information at any given moment (they will stay in sync because of topology server, but may be +temporarily out of sync when nodes are added/removed, until next time topology info is retrieved). This is controled by the SUBCHANNELS_IN_HEADER define. + +Packets back from agents to server go via the udplib message-passing code. This can best be described by looking at the sending and receiving sides +separately. + +When sending, results are split into individual packets (DataBuffers), each designed to be under 1 MTU in size. Traditionally this meant they were 1k, +but they can be set larger (8k is good). They do have to be a power of 2 because of how they are allocated from the roxiemem heap. The sender maintains +a set of UdpReceiverEntry objects, one for each server that it is conversing with. Each UdpReceiverEntry maintains multiple queues of data packets +waiting to be sent, one queue for each priority. The UdpReceiverEntry maintains a count of how many packets are contained across all its queues in +packetsQueued, so that it knows if there is data to send. + +The priority levels are: + 0: Out Of Band + 1: Fast lane + 2: Standard + +This is designed to allow control information to be sent without getting blocked by data, and high priority queries to avoid being blocked by data going +to lower priority ones. The mechanism for deciding what packet to send next is a little odd though - rather than sending all higher-priorty packets +before any lower-priority ones, it round robins across the queues sending up to N^2 from queue 0 then up to N from queue 1 then 1 from queue 2, where N +is set by the UdpOutQsPriority option, or 1 if not set. This may be a mistake - probably any from queue 0 should be sent first, before round-robining +the other queues in this fashion. + +UdpReceiverEntry objects are also responsible for maintaining a list of packets that have been sent but receiver has not yet indicated that they have +arrived. + +If an agent has data ready for a given receiver, it will send a requestToSend to that receiver, and wait for a permitToSend response. Sequence numbers +are used to handle situations where these messages get lost. A permitToSend that does not contain the expected sequence number is ignored. diff --git a/roxie/udplib/udpsha.cpp b/roxie/udplib/udpsha.cpp index b1a0bd0bbfa..de33bc6243e 100644 --- a/roxie/udplib/udpsha.cpp +++ b/roxie/udplib/udpsha.cpp @@ -398,9 +398,10 @@ bool PacketTracker::noteSeen(UdpPacketHeader &hdr) else if (!resent) packetsOOO++; } - else if (resent) + else if (resent && base) // Don't treat a resend that goes out of range as indicative of a restart - it probably just means - // that the resend was not needed and the original moved things on when it arrived + // that the resend was not needed and the original moved things on when it arrived. Unless base is 0 + // in which case it probably means I restarted duplicate = true; else { diff --git a/roxie/udplib/udpsim.cpp b/roxie/udplib/udpsim.cpp index 9015e7b2811..a4f20351936 100644 --- a/roxie/udplib/udpsim.cpp +++ b/roxie/udplib/udpsim.cpp @@ -254,8 +254,11 @@ void simulateTraffic() { if (restartReceiver) { - Sleep(100); + Sleep(1000); rm.clear(); + DBGLOG("Killed receiver"); + Sleep(500); + DBGLOG("Restarting receiver"); rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, false)); } } diff --git a/roxie/udplib/udptrs.cpp b/roxie/udplib/udptrs.cpp index e9dd155b351..e9aad21ce78 100644 --- a/roxie/udplib/udptrs.cpp +++ b/roxie/udplib/udptrs.cpp @@ -252,7 +252,6 @@ class UdpReceiverEntry : public IUdpReceiverEntry const IpAddress ip; std::atomic timeouts{0}; // Number of consecutive timeouts std::atomic requestExpiryTime{0}; // Updated by send_flow thread, read by send_resend thread and send_data thread - static bool comparePacket(const void *pkData, const void *key) { UdpPacketHeader *dataHdr = (UdpPacketHeader*) ((DataBuffer*)pkData)->data;