diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index d0af342b917..dd82bf9c11f 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -284,7 +284,11 @@ void init_signals() { // signal(SIGTERM, caughtSIGTERM); #ifndef _WIN32 +#ifdef __APPLE__ + signal(SIGPIPE, SIG_IGN); // Otherwise it seems semaphore gets interrupted +#else signal(SIGPIPE, caughtSIGPIPE); +#endif signal(SIGHUP, caughtSIGHUP); signal(SIGALRM, caughtSIGALRM); @@ -484,18 +488,23 @@ void readStaticTopology() unsigned numNodes = topology->getCount("./RoxieServerProcess"); if (!numNodes && oneShotRoxie) { - if (topology->getPropBool("expert/@addDummyNode", false)) + unsigned dummyNodes = topology->getPropInt("expert/@addDummyNode", 0); + if (dummyNodes) { // Special config for testing some multinode things on a single node + numNodes = dummyNodes + 1; topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "."); - topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "192.0.2.0"); // A non-existent machine (this address is reserved for documentation) - numNodes = 2; + for (unsigned dummyNo = 0; dummyNo < dummyNodes; dummyNo++) + { + VStringBuffer dummyIP("192.0.2.%u", dummyNo); // A non-existent machine (this address is reserved for documentation) + topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", dummyIP.str()); + } localAgent = false; - topology->setPropInt("@numChannels", 2); - numChannels = 2; - topology->setPropInt("@numDataCopies", 2); - topology->setPropInt("@channelsPerNode", 2); - topology->setProp("@agentConfig", "cyclic"); + topology->setPropInt("@numChannels", 1); + numChannels = 1; + topology->setPropInt("@numDataCopies", numNodes); + topology->setPropInt("@channelsPerNode", 1); + topology->setProp("@agentConfig", "simple"); } else if (oneShotRoxie) { @@ -506,6 +515,8 @@ void readStaticTopology() Owned roxieServers = topology->getElements("./RoxieServerProcess"); bool myNodeSet = false; + StringBuffer forceIP; + topology->getProp("expert/@forceIP", forceIP); unsigned calcNumChannels = 0; ForEach(*roxieServers) { @@ -514,11 +525,23 @@ void readStaticTopology() IpAddress ip(iptext); if (ip.isNull()) throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", iptext); - if (ip.isLocal() && !myNodeSet) + if (forceIP.length()) { - myNodeSet = true; - myNode.setIp(ip); - myAgentEP.set(ccdMulticastPort, myNode.getIpAddress()); + if (streq(iptext, forceIP) && !myNodeSet) + { + myNodeSet = true; + myNode.setIp(ip); + myAgentEP.set(ccdMulticastPort, myNode.getIpAddress()); + } + } + else + { + if (ip.isLocal() && !myNodeSet) + { + myNodeSet = true; + myNode.setIp(ip); + myAgentEP.set(ccdMulticastPort, myNode.getIpAddress()); + } } ForEachItemIn(idx, nodeTable) { diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 331444f1d64..1b00584e29f 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -2199,7 +2199,10 @@ class DelayedPacketQueue void append(ISerializedRoxieQueryPacket *packet, unsigned expires) { - // Goes on the end. But percolate the expiry time backwards + // Insert in the list at the appropriate point that the list stays in expiry order + // With a single buddy this is usually on the end, but with multiple buddies we will be inserting at various points. + // Should we consider a queue per buddy? + assert(GetCurrentThreadId()==roxiePacketReaderThread); packet->noteQueued(0); DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires); @@ -2208,6 +2211,8 @@ class DelayedPacketQueue StringBuffer s; DBGLOG("Adding delayed packet %s expires in %u ms", packet->queryHeader().toString(s).str(), expires - msTick()); } +#if 0 +// old code assumed only one buddy newEntry->prev = tail; if (tail) { @@ -2223,6 +2228,41 @@ class DelayedPacketQueue else head = newEntry; tail = newEntry; +#else + if (tail) + { + DelayedPacketEntry *last = nullptr; + DelayedPacketEntry *finger = tail; + while (finger != nullptr) + { + if ((int) (finger->waitExpires - expires) <= 0) + break; + last = finger; + finger = finger->prev; + } + if (last) + last->prev = newEntry; + else + tail = newEntry; + newEntry->next = last; + newEntry->prev = finger; + if (finger) + finger->next = newEntry; + else + head = newEntry; + } + else + { + head = newEntry; + tail = newEntry; + } +#endif + numEntries++; + if (numEntries > maxNumEntries) + { + maxNumEntries = numEntries; + DBGLOG("WorkerUdpReader: Max IBYTI queue length is now %u", numEntries); + } } // Move any that we are done waiting for our buddy onto the active queue @@ -2288,11 +2328,13 @@ class DelayedPacketQueue if (goer==tail) tail = goer->prev; delete goer; + numEntries--; } DelayedPacketEntry *head = nullptr; DelayedPacketEntry *tail = nullptr; - + unsigned numEntries = 0; + unsigned maxNumEntries = 0; }; //------------------------------------------------------------------------------------------------------------ @@ -2398,7 +2440,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase DelayedPacketQueueManager delayed; #endif - class WorkerUdpTracker : public TimeDivisionTracker<12, false> + class WorkerUdpTracker : public TimeDivisionTracker<13, false> { public: enum @@ -2408,6 +2450,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase allocating, processing, pushing, + deferring, checkingRunning, checkingExpired, decoding, @@ -2417,13 +2460,14 @@ class RoxieSocketQueueManager : public RoxieReceiverBase getTimeout }; - WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<12, false>(name, reportIntervalSeconds) + WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<13, false>(name, reportIntervalSeconds) { stateNames[other] = "other"; stateNames[waiting] = "waiting"; stateNames[allocating] = "allocating"; stateNames[processing] = "processing"; stateNames[pushing] = "pushing"; + stateNames[deferring] = "deferring"; stateNames[checkingRunning] = "checking running"; stateNames[checkingExpired] = "checking expiry"; stateNames[decoding] = "decoding"; @@ -2454,7 +2498,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } readThread; public: - RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 60), readThread(*this) + RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 5), readThread(*this) { maxPacketSize = multicastSocket->get_max_send_size(); if ((maxPacketSize==0)||(maxPacketSize>65535)) @@ -2818,17 +2862,22 @@ class RoxieSocketQueueManager : public RoxieReceiverBase delay += getIbytiDelay(header.subChannels[subChannel]); } if (delay) + { + division.switchState(WorkerUdpTracker::deferring); delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay); + } else #endif + { + division.switchState(WorkerUdpTracker::pushing); queue.enqueueUnique(packet.getClear(), mySubchannel, 0); + } } } else // first time (not a retry). { division.switchState(WorkerUdpTracker::creatingPacket); Owned packet = createSerializedRoxiePacket(mb); - division.switchState(WorkerUdpTracker::pushing); #ifdef NEW_IBYTI unsigned delay = 0; if (mySubchannel != 0 && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) // i.e. I am not the primary here, and never delay special @@ -2837,10 +2886,16 @@ class RoxieSocketQueueManager : public RoxieReceiverBase delay += getIbytiDelay(header.subChannels[subChannel]); } if (delay) + { + division.switchState(WorkerUdpTracker::deferring); delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay); + } else #endif + { + division.switchState(WorkerUdpTracker::pushing); queue.enqueue(packet.getClear(), 0); + } } } }