Skip to content

Commit

Permalink
Merge pull request #19171 from richardkchapman/checkAbortCrit
Browse files Browse the repository at this point in the history
HPCC-32781 RoxieSocketQueueManager::run may be blocked by actCrit 

Reviewed-by: Mark Kelly [email protected]
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 15, 2024
2 parents 0275911 + 32402f5 commit 2c4c9ca
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 93 deletions.
5 changes: 3 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class RoxiePacketHeader
unsigned activityId = 0; // identifies the helper factory to be used (activityId in graph)
hash64_t queryHash = 0; // identifies the query

ruid_t uid = 0; // unique id
std::atomic<ruid_t> uid = 0; // unique id
ServerIdentifier serverId;
#ifdef SUBCHANNELS_IN_HEADER
ServerIdentifier subChannels[MAX_SUBCHANNEL];
Expand All @@ -173,6 +173,7 @@ class RoxiePacketHeader

static unsigned getSubChannelMask(unsigned subChannel);
unsigned priorityHash() const;
void clear();
void copy(const RoxiePacketHeader &oh);
bool matchPacket(const RoxiePacketHeader &oh) const;
void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
Expand Down Expand Up @@ -295,7 +296,6 @@ extern unsigned callbackTimeout;
extern unsigned lowTimeout;
extern unsigned highTimeout;
extern unsigned slaTimeout;
extern unsigned headRegionSize;
extern unsigned ccdMulticastPort;
extern IPropertyTree *topology;
extern MapStringTo<int> *preferredClusters;
Expand Down Expand Up @@ -389,6 +389,7 @@ extern bool ignoreFileDateMismatches;
extern bool ignoreFileSizeMismatches;
extern int fileTimeFuzzySeconds;
extern SinkMode defaultSinkMode;
extern bool limitWaitingWorkers;

#if defined(_CONTAINERIZED) || defined(SUBCHANNELS_IN_HEADER)
static constexpr bool roxieMulticastEnabled = false;
Expand Down
5 changes: 2 additions & 3 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
unsigned headRegionSize;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
unsigned parallelLoopFlowLimit = 100;
Expand Down Expand Up @@ -203,6 +202,7 @@ unsigned maxGraphLoopIterations;
bool steppingEnabled = true;
bool simpleLocalKeyedJoins = true;
bool adhocRoxie = false;
bool limitWaitingWorkers = false;

unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
unsigned socketCheckInterval = 5000;
Expand Down Expand Up @@ -1005,7 +1005,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize);
blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent);
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
Expand Down Expand Up @@ -1285,6 +1284,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
const char *sinkModeText = topology->queryProp("@sinkMode");
if (sinkModeText)
defaultSinkMode = getSinkMode(sinkModeText);
limitWaitingWorkers = topology->getPropBool("@limitWaitingWorkers", limitWaitingWorkers);

cacheReportPeriodSeconds = topology->getPropInt("@cacheReportPeriodSeconds", 5*60);
setLegacyAES(topology->getPropBool("expert/@useLegacyAES", false));
Expand Down Expand Up @@ -1454,7 +1454,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs());

ROQ = createOutputQueueManager(numAgentThreads, encryptInTransit);
ROQ->setHeadRegionSize(headRegionSize);
ROQ->start();
Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
#if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
Expand Down
127 changes: 43 additions & 84 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _
{
// Used to create the header to send a callback to originating server or an IBYTI to a buddy
activityId = _activityId;
uid = source.uid;
uid.store(source.uid);
queryHash = source.queryHash;
channel = source.channel;
overflowSequence = source.overflowSequence;
Expand Down Expand Up @@ -88,15 +88,21 @@ unsigned RoxiePacketHeader::priorityHash() const

void RoxiePacketHeader::copy(const RoxiePacketHeader &oh)
{
// used for saving away kill packets for later matching by match
uid = oh.uid;
// used for saving away info for later matching by match, without having to lock
overflowSequence = oh.overflowSequence;
continueSequence = oh.continueSequence;
serverId = oh.serverId;
channel = oh.channel;
uid.store(oh.uid);
// MORE - would it be safer, maybe even faster to copy the rest too?
}

void RoxiePacketHeader::clear()
{
// used for saving away kill packets for later matching by match
uid = RUID_NONE; // Will never match a queued packet
}

bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
{
// used when matching up a kill packet against a pending one...
Expand Down Expand Up @@ -156,7 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
ret.appendf(" (fetch part)");
break;
}
ret.appendf(" uid=" RUIDF " pri=", uid);
ret.appendf(" uid=" RUIDF " pri=", uid.load());
switch(activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
Expand Down Expand Up @@ -1151,8 +1157,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory
Owned <IThreadPool> workers;
QueueOf<ISerializedRoxieQueryPacket, true> waiting;
Semaphore available;
CriticalSection availCrit; // Semaphore post may be slow with a lot of waiters - this crit may be used to limit to a single waiter
CriticalSection qcrit;
unsigned headRegionSize;
unsigned numWorkers;
RelaxedAtomic<unsigned> started;
std::atomic<unsigned> idle;
Expand All @@ -1174,9 +1180,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _numWorkers)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
started = 0;
Expand All @@ -1192,7 +1197,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory


virtual IPooledThread *createNew();
void abortChannel(unsigned channel);

void start()
{
Expand Down Expand Up @@ -1319,7 +1323,10 @@ class RoxieQueue : public CInterface, implements IThreadFactory
void wait()
{
idle++;
available.wait();
{
CLeavableCriticalBlock b(availCrit, limitWaitingWorkers);
available.wait();
}
idle--;
}

Expand All @@ -1331,31 +1338,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory
ISerializedRoxieQueryPacket *dequeue()
{
CriticalBlock qc(qcrit);
unsigned lim = waiting.ordinality();
if (lim)
{
if (headRegionSize)
{
if (lim > headRegionSize)
lim = headRegionSize;
return waiting.dequeue(fastRand() % lim);
}
return waiting.dequeue();
}
else
return NULL;
}

unsigned getHeadRegionSize() const
{
return headRegionSize;
}

unsigned setHeadRegionSize(unsigned newsize)
{
unsigned ret = headRegionSize;
headRegionSize = newsize;
return ret;
return waiting.dequeue();
}

void noteOrphanIBYTI(const RoxiePacketHeader &hdr)
Expand Down Expand Up @@ -1389,6 +1372,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
Owned<const ITopologyServer> topology;
#endif
AgentContextLogger logctx;
RoxiePacketHeader packetHeader;

public:
IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -1417,41 +1401,41 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
inline void setActivity(IRoxieAgentActivity *act)
{
//Ensure that the activity is released outside of the critical section
Owned<IRoxieAgentActivity> temp(act);
{
CriticalBlock b(actCrit);
activity.swap(temp);
}
}
inline bool match(RoxiePacketHeader &h)
{
// There is a window between getting packet from queue and being able to match it.
// This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
CriticalBlock b(actCrit);
return packet && packet->queryHeader().matchPacket(h);
}

void abortChannel(unsigned channel)
inline void setPacket(IRoxieQueryPacket *p)
{
Owned<IRoxieQueryPacket> temp(p);
CriticalBlock b(actCrit);
if (packet && packet->queryHeader().channel==channel)
if (p)
{
abortLaunch = true;
#ifndef NEW_IBYTI
if (doIbytiDelay)
ibytiSem.signal();
#endif
if (activity)
activity->abort();
packet.swap(temp);
packetHeader.copy(p->queryHeader());
}
else
{
packetHeader.clear();
packet.swap(temp);
}
}
inline bool match(RoxiePacketHeader &h)
{
// There is a window between getting packet from queue and being able to match it.
// This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
return packetHeader.matchPacket(h);
}

bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
{
CriticalBlock b(actCrit);
if (packet && packet->queryHeader().matchPacket(h))
if (packetHeader.matchPacket(h))
{
CriticalBlock b(actCrit);
if (!packetHeader.matchPacket(h))
return false;
queryFound = true;
abortLaunch = true;
#ifndef NEW_IBYTI
Expand Down Expand Up @@ -1772,7 +1756,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
#ifdef NEW_IBYTI
logctx.setStatistic(StTimeIBYTIDelay, next->queryIBYTIDelayTime());
#endif
packet.setown(next->deserialize());
setPacket(next->deserialize());
next.clear();
RoxiePacketHeader &header = packet->queryHeader();
#ifndef SUBCHANNELS_IN_HEADER
Expand Down Expand Up @@ -1804,8 +1788,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
workerThreadBusy = false;
{
CriticalBlock b(actCrit);
packet.clear();
setPacket(nullptr);
#ifndef SUBCHANNELS_IN_HEADER
topology.clear();
#endif
Expand All @@ -1815,12 +1798,11 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
catch(IException *E)
{
CriticalBlock b(actCrit);
EXCLOG(E);
if (packet)
{
throwRemoteException(E, NULL, packet, false);
packet.clear();
setPacket(nullptr);
}
else
E->Release();
Expand All @@ -1830,13 +1812,12 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
catch(...)
{
CriticalBlock b(actCrit);
Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
EXCLOG(E);
if (packet)
{
throwRemoteException(E.getClear(), NULL, packet, false);
packet.clear();
setPacket(nullptr);
}
#ifndef SUBCHANNELS_IN_HEADER
topology.clear();
Expand All @@ -1851,16 +1832,6 @@ IPooledThread *RoxieQueue::createNew()
return new CRoxieWorker;
}

void RoxieQueue::abortChannel(unsigned channel)
{
Owned<IPooledThreadIterator> wi = workers->running();
ForEach(*wi)
{
CRoxieWorker &w = (CRoxieWorker &) wi->query();
w.abortChannel(channel);
}
}

//=================================================================================

class CallbackEntry : implements IPendingCallback, public CInterface
Expand Down Expand Up @@ -1917,20 +1888,8 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
{
}

virtual unsigned getHeadRegionSize() const
{
return loQueue.getHeadRegionSize();
}

virtual void setHeadRegionSize(unsigned newSize)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers), hiQueue(_numWorkers), loQueue(_numWorkers), numWorkers(_numWorkers)
{
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
}

virtual void start()
Expand Down
6 changes: 2 additions & 4 deletions roxie/udplib/udplib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ typedef unsigned ruid_t; // at 1000/sec recycle every 49 days
#define RUIDF "0x%.8x"
#define RUID_PING 0
#define RUID_DISCARD 1
#define RUID_FIRST 2
#define RUID_NONE 2
#define RUID_FIRST 3

typedef unsigned RecordLengthType;
#define MAX_RECORD_LENGTH 0xffffffff
Expand Down Expand Up @@ -176,9 +177,6 @@ interface IRoxieOutputQueueManager : public IInterface
virtual bool replyPending(RoxiePacketHeader &x) = 0;
virtual bool abortCompleted(RoxiePacketHeader &x) = 0;

virtual unsigned getHeadRegionSize() const = 0;
virtual void setHeadRegionSize(unsigned newsize) = 0;

virtual void start() = 0;
virtual void stop() = 0;
virtual void join() = 0;
Expand Down

0 comments on commit 2c4c9ca

Please sign in to comment.