diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 7bac891cc3e..53a6c53e338 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -265,6 +265,7 @@ interface IRoxieQueryPacket : extends IInterface virtual void noteTimeSent() = 0; virtual void setAcknowledged() = 0; + virtual void clearAcknowledged() = 0; virtual bool isAcknowledged() const = 0; virtual bool resendNeeded(unsigned now) = 0; }; diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index f74004bfeb8..b70242587cf 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -646,6 +646,11 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa acknowledged = true; } + virtual void clearAcknowledged() override + { + acknowledged = false; + } + virtual bool isAcknowledged() const override { return acknowledged; @@ -2723,6 +2728,8 @@ class RoxieSocketQueueManager : public RoxieReceiverBase #endif Owned packet = createSerializedRoxiePacket(mb); unsigned retries = header.thisChannelRetries(mySubchannel); + if (retries >= SUBCHANNEL_MASK) + return; // I already failed unrecoverably on this request - ignore it if (acknowledgeAllRequests && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) { #ifdef DEBUG @@ -2742,9 +2749,6 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread assertex(header.channel); // should never see a retry on channel 0 - if (retries >= SUBCHANNEL_MASK) - return; // someone sent a failure or something - ignore it - // Send back an out-of-band immediately, to let Roxie server know that channel is still active if (!(testAgentFailure & 0x800) && !acknowledgeAllRequests) { diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 0a9ac8869dc..e81829cd280 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4164,11 +4164,11 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie unsigned now = 0; if (acknowledgeAllRequests) { - if (doTrace(traceRoxiePackets)) - DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); now = msTick(); if (now-lastRetryCheck < packetAcknowledgeTimeout/4) return; + if (doTrace(traceRoxiePackets)) + DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); lastRetryCheck = now; } CriticalBlock b(pendingCrit); @@ -5235,7 +5235,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie Owned exceptionData = mr->getCursor(rowManager); throwRemoteException(exceptionData); } - // Leave it on pending queue in original location + // One channel has failed, but should be recoverable + // Leave it on pending queue in original location, but clear acknowledged flag + op->clearAcknowledged(); break; case ROXIE_ALIVE: