Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Oct 16, 2024
1 parent 5daed52 commit 81a44c7
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 18 deletions.
47 changes: 35 additions & 12 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ void init_signals()
{
// signal(SIGTERM, caughtSIGTERM);
#ifndef _WIN32
#ifdef __APPLE__
signal(SIGPIPE, SIG_IGN); // Otherwise it seems semaphore gets interrupted
#else
signal(SIGPIPE, caughtSIGPIPE);
#endif
signal(SIGHUP, caughtSIGHUP);
signal(SIGALRM, caughtSIGALRM);

Expand Down Expand Up @@ -484,18 +488,23 @@ void readStaticTopology()
unsigned numNodes = topology->getCount("./RoxieServerProcess");
if (!numNodes && oneShotRoxie)
{
if (topology->getPropBool("expert/@addDummyNode", false))
unsigned dummyNodes = topology->getPropInt("expert/@addDummyNode", 0);
if (dummyNodes)
{
// Special config for testing some multinode things on a single node
numNodes = dummyNodes + 1;
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", ".");
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "192.0.2.0"); // A non-existent machine (this address is reserved for documentation)
numNodes = 2;
for (unsigned dummyNo = 0; dummyNo < dummyNodes; dummyNo++)
{
VStringBuffer dummyIP("192.0.2.%u", dummyNo); // A non-existent machine (this address is reserved for documentation)
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", dummyIP.str());
}
localAgent = false;
topology->setPropInt("@numChannels", 2);
numChannels = 2;
topology->setPropInt("@numDataCopies", 2);
topology->setPropInt("@channelsPerNode", 2);
topology->setProp("@agentConfig", "cyclic");
topology->setPropInt("@numChannels", 1);
numChannels = 1;
topology->setPropInt("@numDataCopies", numNodes);
topology->setPropInt("@channelsPerNode", 1);
topology->setProp("@agentConfig", "simple");
}
else if (oneShotRoxie)
{
Expand All @@ -506,6 +515,8 @@ void readStaticTopology()
Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");

bool myNodeSet = false;
StringBuffer forceIP;
topology->getProp("expert/@forceIP", forceIP);
unsigned calcNumChannels = 0;
ForEach(*roxieServers)
{
Expand All @@ -514,11 +525,23 @@ void readStaticTopology()
IpAddress ip(iptext);
if (ip.isNull())
throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", iptext);
if (ip.isLocal() && !myNodeSet)
if (forceIP.length())
{
myNodeSet = true;
myNode.setIp(ip);
myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
if (streq(iptext, forceIP) && !myNodeSet)
{
myNodeSet = true;
myNode.setIp(ip);
myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
}
}
else
{
if (ip.isLocal() && !myNodeSet)
{
myNodeSet = true;
myNode.setIp(ip);
myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
}
}
ForEachItemIn(idx, nodeTable)
{
Expand Down
67 changes: 61 additions & 6 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2199,7 +2199,10 @@ class DelayedPacketQueue

void append(ISerializedRoxieQueryPacket *packet, unsigned expires)
{
// Goes on the end. But percolate the expiry time backwards
// Insert in the list at the appropriate point that the list stays in expiry order
// With a single buddy this is usually on the end, but with multiple buddies we will be inserting at various points.
// Should we consider a queue per buddy?

assert(GetCurrentThreadId()==roxiePacketReaderThread);
packet->noteQueued(0);
DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires);
Expand All @@ -2208,6 +2211,8 @@ class DelayedPacketQueue
StringBuffer s;
DBGLOG("Adding delayed packet %s expires in %u ms", packet->queryHeader().toString(s).str(), expires - msTick());
}
#if 0
// old code assumed only one buddy
newEntry->prev = tail;
if (tail)
{
Expand All @@ -2223,6 +2228,41 @@ class DelayedPacketQueue
else
head = newEntry;
tail = newEntry;
#else
if (tail)
{
DelayedPacketEntry *last = nullptr;
DelayedPacketEntry *finger = tail;
while (finger != nullptr)
{
if ((int) (finger->waitExpires - expires) <= 0)
break;
last = finger;
finger = finger->prev;
}
if (last)
last->prev = newEntry;
else
tail = newEntry;
newEntry->next = last;
newEntry->prev = finger;
if (finger)
finger->next = newEntry;
else
head = newEntry;
}
else
{
head = newEntry;
tail = newEntry;
}
#endif
numEntries++;
if (numEntries > maxNumEntries)
{
maxNumEntries = numEntries;
DBGLOG("WorkerUdpReader: Max IBYTI queue length is now %u", numEntries);
}
}

// Move any that we are done waiting for our buddy onto the active queue
Expand Down Expand Up @@ -2288,11 +2328,13 @@ class DelayedPacketQueue
if (goer==tail)
tail = goer->prev;
delete goer;
numEntries--;
}

DelayedPacketEntry *head = nullptr;
DelayedPacketEntry *tail = nullptr;

unsigned numEntries = 0;
unsigned maxNumEntries = 0;
};

//------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -2398,7 +2440,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
DelayedPacketQueueManager delayed;
#endif

class WorkerUdpTracker : public TimeDivisionTracker<12, false>
class WorkerUdpTracker : public TimeDivisionTracker<13, false>
{
public:
enum
Expand All @@ -2408,6 +2450,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
allocating,
processing,
pushing,
deferring,
checkingRunning,
checkingExpired,
decoding,
Expand All @@ -2417,13 +2460,14 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
getTimeout
};

WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<12, false>(name, reportIntervalSeconds)
WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<13, false>(name, reportIntervalSeconds)
{
stateNames[other] = "other";
stateNames[waiting] = "waiting";
stateNames[allocating] = "allocating";
stateNames[processing] = "processing";
stateNames[pushing] = "pushing";
stateNames[deferring] = "deferring";
stateNames[checkingRunning] = "checking running";
stateNames[checkingExpired] = "checking expiry";
stateNames[decoding] = "decoding";
Expand Down Expand Up @@ -2454,7 +2498,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
} readThread;

public:
RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 60), readThread(*this)
RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 5), readThread(*this)
{
maxPacketSize = multicastSocket->get_max_send_size();
if ((maxPacketSize==0)||(maxPacketSize>65535))
Expand Down Expand Up @@ -2818,17 +2862,22 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
delay += getIbytiDelay(header.subChannels[subChannel]);
}
if (delay)
{
division.switchState(WorkerUdpTracker::deferring);
delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
}
else
#endif
{
division.switchState(WorkerUdpTracker::pushing);
queue.enqueueUnique(packet.getClear(), mySubchannel, 0);
}
}
}
else // first time (not a retry).
{
division.switchState(WorkerUdpTracker::creatingPacket);
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
division.switchState(WorkerUdpTracker::pushing);
#ifdef NEW_IBYTI
unsigned delay = 0;
if (mySubchannel != 0 && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) // i.e. I am not the primary here, and never delay special
Expand All @@ -2837,10 +2886,16 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
delay += getIbytiDelay(header.subChannels[subChannel]);
}
if (delay)
{
division.switchState(WorkerUdpTracker::deferring);
delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
}
else
#endif
{
division.switchState(WorkerUdpTracker::pushing);
queue.enqueue(packet.getClear(), 0);
}
}
}
}
Expand Down

0 comments on commit 81a44c7

Please sign in to comment.