Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-8.12.x' into candidate…
Browse files Browse the repository at this point in the history
…-9.0.x

Signed-off-by: Gavin Halliday <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
ghalliday committed Jan 19, 2024
2 parents 6e62b80 + c985142 commit 7a2d946
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
36 changes: 30 additions & 6 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
Thread::start();
started.wait();
}

~receive_data()
{
DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load());
Expand Down Expand Up @@ -1338,20 +1338,44 @@ class CReceiveManager : implements IReceiveManager, public CInterface
b = bufferManager->allocate();

unsigned int res;
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
while (true)
{
receive_socket->readtms(b->data, 1, DATA_PAYLOAD, res, timeout);
if (res!=sizeof(UdpRequestToSendMsg))
break;
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
// a UdpPacketHeader which has a 2 byte length. This length must be > sizeof(UdpPacketHeader).
//Since max_flow_cmd < sizeof(UdpPacketHeader) this can be used to distinguish a true data packet(!)
static_assert(flowType::max_flow_cmd < sizeof(UdpPacketHeader)); // assert to check the above comment is correct

if (hdr.length >= sizeof(UdpPacketHeader))
{
if (res == hdr.length)
break;

//Very rare situation - log it so that there is some evidence that it is occurring
OWARNLOG("Received partial network packet - %u bytes out of %u received", res, hdr.length);

//Because we are reading UDP datgrams rather than tcp packets, if we failed to read the whole datagram
//the rest of the datgram is lost - you cannot call readtms to read the rest of the datagram.
//Therefore throw this incomplete datagram away and allow the resend mechanism to retransmit it.
continue;
}

//Sanity check
assertex(res == sizeof(UdpRequestToSendMsg));

//Sending flow packets (eg send_completed) to the data thread ensures they do not overtake the data
//Redirect them to the flow thread to process them.
selfFlowSocket->write(b->data, res);
}

dataPacketsReceived++;
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
assert(hdr.length == res && hdr.length > sizeof(hdr));
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
Expand Down
7 changes: 4 additions & 3 deletions roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,10 @@ class UdpReceiverEntry : public IUdpReceiverEntry
const char *data = buffer->data + sizeof(UdpPacketHeader);
const MemoryAttr &udpkey = getSecretUdpKey(true);
aesEncrypt(udpkey.get(), udpkey.length(), data, length, encryptBuffer);
header->length = encryptBuffer.length();
encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header); // Only really need length updating
assert(length <= DATA_PAYLOAD);
UdpPacketHeader newHeader;
newHeader.length = encryptBuffer.length();
encryptBuffer.writeDirect(offsetof(UdpPacketHeader, length), sizeof(newHeader.length), &newHeader.length); // Only need to update the length - rest is the same
assertex(encryptBuffer.length() <= DATA_PAYLOAD);
if (udpTraceLevel > 5)
DBGLOG("ENCRYPT: Writing %u bytes to data socket", encryptBuffer.length());
data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
Expand Down

0 comments on commit 7a2d946

Please sign in to comment.