Skip to content

Commit

Permalink
Merge pull request #18127 from ghalliday/issue30992
Browse files Browse the repository at this point in the history
HPCC-30992 Fix intermittent deadlock in roxie worker->server communication

Reviewed-by: Mark Kelly [email protected]
Reviewed-By: Richard Chapman <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Dec 13, 2023
2 parents f223818 + 960ebbe commit d83fce2
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,20 @@ class UdpReceiverEntry : public IUdpReceiverEntry
activeFlowSequence = seq;
return seq;
}

// Reports whether this receiver entry still has outstanding work: either
// packets are counted as queued, or a resend list exists and reports
// active entries.
bool hasDataToSend() const
{
    // Relaxed load: only the current value of the counter is examined here.
    if (packetsQueued.load(std::memory_order_relaxed))
        return true;

    // No queued packets - fall back to checking the resend list, if any.
    if (resendList && resendList->numActive())
        return true;

    return false;
}

// Records the time at which the current request should be considered expired.
//
// A stored value of 0 is reserved to mean "no data to send", so a real expiry
// time must never be stored as 0. Callers compute the value by adding a
// timeout to a tick count; as the arithmetic is unsigned, that sum can come
// out as exactly 0. Such a value is nudged to 1 before being stored.
//
// @param newExpiryTime  the expiry time to record; 0 is replaced by 1.
void setRequestExpiryTime(unsigned newExpiryTime)
{
    //requestExpiryTime 0 should only be used if there is no data to send. Ensure it is non zero otherwise.
    if (newExpiryTime == 0)
        newExpiryTime = 1;   // assignment (was a no-op '==' comparison that left the value at 0)
    requestExpiryTime.store(newExpiryTime);
}

void sendStart(unsigned packets)
{
UdpRequestToSendMsg msg;
Expand Down Expand Up @@ -326,7 +335,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry
msg.cmd = flowType::request_to_send;
msg.packets = 0;
msg.flowSeq = nextFlowSequence();
requestExpiryTime = msTick() + udpFlowAckTimeout;
setRequestExpiryTime(msTick() + udpFlowAckTimeout);
block.leave();
sendRequest(msg, false);
}
Expand All @@ -339,7 +348,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry

//The flow event is sent on the data socket, so it needs to wait for all the data to be sent before being received
//therefore use the updDataSendTimeout instead of udpFlowAckTimeout
requestExpiryTime = msTick() + updDataSendTimeout;
setRequestExpiryTime(msTick() + updDataSendTimeout);
block.leave();
sendRequest(msg, true);
}
Expand Down Expand Up @@ -367,7 +376,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry
msg.sendSeq = nextSendSequence;
msg.flowSeq = nextFlowSequence();
msg.sourceNode = sourceIP;
requestExpiryTime = msTick() + udpFlowAckTimeout;
setRequestExpiryTime(msTick() + udpFlowAckTimeout);
block.leave();
sendRequest(msg, false);
}
Expand All @@ -394,6 +403,11 @@ class UdpReceiverEntry : public IUdpReceiverEntry
CLeavableCriticalBlock block(activeCrit);
if (maxRequestDeadTimeouts && (timeouts >= maxRequestDeadTimeouts))
{
int timeExpired = msTick()-requestExpiryTime;
StringBuffer s;
EXCLOG(MCoperatorError,"ERROR: UdpSender: too many timeouts - aborting sends. Timed out %i times (flow=%u, max=%i, timeout=%u, expiryTime=%u[%u] ack(%u)) waiting ok_to_send for %u packets from node=%s",
timeouts.load(), activeFlowSequence.load(), maxRequestDeadTimeouts, udpFlowAckTimeout, requestExpiryTime.load(), timeExpired, (int)hadAcknowledgement, packetsQueued.load(), ip.getIpText(s).str());

abort();
return;
}
Expand All @@ -405,7 +419,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry
msg.sendSeq = nextSendSequence;
msg.flowSeq = activeFlowSequence;
msg.sourceNode = sourceIP;
requestExpiryTime = msTick() + udpFlowAckTimeout;
setRequestExpiryTime(msTick() + udpFlowAckTimeout);
block.leave();
sendRequest(msg, false);
}
Expand All @@ -430,7 +444,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry
hadAcknowledgement = true;
CriticalBlock b(activeCrit);
if (requestExpiryTime)
requestExpiryTime = msTick() + udpRequestTimeout; // set a timeout in case an ok_to_send message goes missing
setRequestExpiryTime(msTick() + udpRequestTimeout); // set a timeout in case an ok_to_send message goes missing
}

#ifdef TEST_DROPPED_PACKETS
Expand Down Expand Up @@ -655,8 +669,11 @@ class UdpReceiverEntry : public IUdpReceiverEntry
DBGLOG("UdpSender: abort sending queued data to node=%s", ip.getIpText(s).str());
}
timeouts = 0;
requestExpiryTime = 0;
removeData(nullptr, nullptr);

CriticalBlock block(activeCrit);
if (packetsQueued == 0)
requestExpiryTime = 0;
}

inline void pushData(unsigned queue, DataBuffer *buffer)
Expand Down

0 comments on commit d83fce2

Please sign in to comment.