From cd7ce5bbce32233f72e6669f2aedc9a180a2faf1 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 2 Oct 2024 13:43:45 +0100 Subject: [PATCH] HPCC-XXXXX RoxieSocketQueueManager::run may be blocked by actCrit Avoid need to obtain critsec just to check if a worker thread's packet matches Signed-off-by: Richard Chapman --- roxie/ccd/ccd.hpp | 3 +- roxie/ccd/ccdqueue.cpp | 64 +++++++++++++++++++---------------------- roxie/udplib/udplib.hpp | 3 +- 3 files changed, 33 insertions(+), 37 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 77b2dd66d3a..bd5ca6dccab 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -159,7 +159,7 @@ class RoxiePacketHeader unsigned activityId = 0; // identifies the helper factory to be used (activityId in graph) hash64_t queryHash = 0; // identifies the query - ruid_t uid = 0; // unique id + std::atomic uid = 0; // unique id ServerIdentifier serverId; #ifdef SUBCHANNELS_IN_HEADER ServerIdentifier subChannels[MAX_SUBCHANNEL]; @@ -173,6 +173,7 @@ class RoxiePacketHeader static unsigned getSubChannelMask(unsigned subChannel); unsigned priorityHash() const; + void clear(); void copy(const RoxiePacketHeader &oh); bool matchPacket(const RoxiePacketHeader &oh) const; void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5e41da7fbbf..48819e058b5 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -56,7 +56,7 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _ { // Used to create the header to send a callback to originating server or an IBYTI to a buddy activityId = _activityId; - uid = source.uid; + uid.store(source.uid); queryHash = source.queryHash; channel = source.channel; overflowSequence = source.overflowSequence; @@ -88,15 +88,21 @@ unsigned RoxiePacketHeader::priorityHash() const void RoxiePacketHeader::copy(const RoxiePacketHeader &oh) { - // used for saving away kill packets for later matching by match - uid = oh.uid; + // used for saving away info for later matching by match, without having to lock overflowSequence = oh.overflowSequence; continueSequence = oh.continueSequence; serverId = oh.serverId; channel = oh.channel; + uid.store(oh.uid); // MORE - would it be safer, maybe even faster to copy the rest too? } +void RoxiePacketHeader::clear() +{ + // used for saving away kill packets for later matching by match + uid = RUID_NONE; // Will never match a queued packet +} + bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const { // used when matching up a kill packet against a pending one... @@ -156,7 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const ret.appendf(" (fetch part)"); break; } - ret.appendf(" uid=" RUIDF " pri=", uid); + ret.appendf(" uid=" RUIDF " pri=", uid.load()); switch(activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: ret.append("SLA"); break; @@ -1184,7 +1190,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory virtual IPooledThread *createNew(); - void abortChannel(unsigned channel); void start() { @@ -1381,6 +1386,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread Owned topology; #endif AgentContextLogger logctx; + RoxiePacketHeader packetHeader; public: IMPLEMENT_IINTERFACE; @@ -1412,34 +1418,32 @@ class CRoxieWorker : public CInterface, implements IPooledThread CriticalBlock b(actCrit); activity.setown(act); } - inline bool match(RoxiePacketHeader &h) - { - // There is a window between getting packet from queue and being able to match it. - // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!) - CriticalBlock b(actCrit); - return packet && packet->queryHeader().matchPacket(h); - } - - void abortChannel(unsigned channel) + inline void setPacket(IRoxieQueryPacket *p) { CriticalBlock b(actCrit); - if (packet && packet->queryHeader().channel==channel) + if (p) { - abortLaunch = true; -#ifndef NEW_IBYTI - if (doIbytiDelay) - ibytiSem.signal(); -#endif - if (activity) - activity->abort(); + packet.setown(p); + packetHeader.copy(p->queryHeader()); + } + else + { + packetHeader.clear(); + packet.setown(p); } } + inline bool match(RoxiePacketHeader &h) + { + // There is a window between getting packet from queue and being able to match it. + // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!) + return packetHeader.matchPacket(h); + } bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity) { - CriticalBlock b(actCrit); - if (packet && packet->queryHeader().matchPacket(h)) + if (packetHeader.matchPacket(h)) { + CriticalBlock b(actCrit); queryFound = true; abortLaunch = true; #ifndef NEW_IBYTI @@ -1760,7 +1764,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread #ifdef NEW_IBYTI logctx.setStatistic(StTimeIBYTIDelay, next->queryIBYTIDelayTime()); #endif - packet.setown(next->deserialize()); + setPacket(next->deserialize()); next.clear(); RoxiePacketHeader &header = packet->queryHeader(); #ifndef SUBCHANNELS_IN_HEADER @@ -1839,16 +1843,6 @@ IPooledThread *RoxieQueue::createNew() return new CRoxieWorker; } -void RoxieQueue::abortChannel(unsigned channel) -{ - Owned wi = workers->running(); - ForEach(*wi) - { - CRoxieWorker &w = (CRoxieWorker &) wi->query(); - w.abortChannel(channel); - } -} - //================================================================================= class CallbackEntry : implements IPendingCallback, public CInterface diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index 89190424dd2..ebd674376df 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -33,7 +33,8 @@ typedef unsigned ruid_t; // at 1000/sec recycle every 49 days #define RUIDF "0x%.8x" #define RUID_PING 0 #define RUID_DISCARD 1 -#define RUID_FIRST 2 +#define RUID_NONE 2 +#define RUID_FIRST 3 typedef unsigned RecordLengthType; #define MAX_RECORD_LENGTH 0xffffffff