Skip to content

Commit

Permalink
HPCC-32540 Don't retry indefinitely when a server request is not ackn…
Browse files Browse the repository at this point in the history
…owledged

Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Aug 28, 2024
1 parent bf120af commit ae93d7a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
4 changes: 2 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class RoxiePacketHeader
void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
StringBuffer &toString(StringBuffer &ret) const;
bool allChannelsFailed();
bool retry(bool ack);
bool retry();
void setException(unsigned subChannel);
unsigned thisChannelRetries(unsigned subChannel);

Expand Down Expand Up @@ -266,7 +266,7 @@ interface IRoxieQueryPacket : extends IInterface
virtual void noteTimeSent() = 0;
virtual void setAcknowledged() = 0;
virtual bool isAcknowledged() const = 0;
virtual bool resendNeeded(unsigned timeout, unsigned now) const = 0;
virtual bool resendNeeded(unsigned now) = 0;
};

interface IQueryDll;
Expand Down
13 changes: 8 additions & 5 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ bool RoxiePacketHeader::allChannelsFailed()
return (retries & mask) == mask;
}

bool RoxiePacketHeader::retry(bool ack)
bool RoxiePacketHeader::retry()
{
bool worthRetrying = false;
unsigned mask = SUBCHANNEL_MASK;
Expand All @@ -213,8 +213,7 @@ bool RoxiePacketHeader::retry(bool ack)
{
unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS);
if (subRetries != SUBCHANNEL_MASK)
if (!subRetries || !ack)
subRetries++;
subRetries++;
if (subRetries != SUBCHANNEL_MASK)
worthRetrying = true;
retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS));
Expand Down Expand Up @@ -498,6 +497,7 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa
unsigned contextLength = 0;
std::atomic<unsigned> timeFirstSent = 0;
std::atomic<bool> acknowledged = false;
unsigned resends = 0;

public:
IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -651,9 +651,12 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa
return acknowledged;
}

virtual bool resendNeeded(unsigned timeout, unsigned now) const override
virtual bool resendNeeded(unsigned now) override
{
return timeFirstSent && !acknowledged && now-timeFirstSent > timeout;
bool ret = timeFirstSent && !acknowledged && now-timeFirstSent > packetAcknowledgeTimeout*(resends+1);
if (ret)
resends++;
return ret;
}
};

Expand Down
8 changes: 4 additions & 4 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4182,14 +4182,14 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
{
if (acknowledgeAllRequests)
{
if (!i->resendNeeded(packetAcknowledgeTimeout, now))
if (!i->resendNeeded(now))
continue;
if (doTrace(traceAcknowledge) || doTrace(traceRoxiePackets))
activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", packetAcknowledgeTimeout);
activity.noteStatistic(StNumAckRetries, 1);

}
if (!i->queryHeader().retry(acknowledgeAllRequests))
if (!i->queryHeader().retry())
{
StringBuffer s;
IException *E = MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to get response from agent(s) for %s in activity %d", i->queryHeader().toString(s).str(), activity.queryId());
Expand Down Expand Up @@ -5315,10 +5315,10 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
else if (!anyActivity && !localAgent && !acknowledgeAllRequests)
{
unsigned timeNow = msTick();
if (timeNow-lastActivity >= checkInterval)
if (timeNow-lastActivity >= timeout)
{
lastActivity = timeNow;
activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", checkInterval);
activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", timeout);
retryPending();
}
}
Expand Down

0 comments on commit ae93d7a

Please sign in to comment.