diff --git a/system/jlib/jqueue.tpp b/system/jlib/jqueue.tpp index 4f4ed610985..f334469c4dc 100644 --- a/system/jlib/jqueue.tpp +++ b/system/jlib/jqueue.tpp @@ -271,6 +271,8 @@ public: using PARENT::ensure; }; +//Sending signals once the critical section has been released generally gives better performance. +#define SIGNAL_OUTSIDE template class SimpleInterThreadQueueOf : protected SafeQueueOf @@ -358,18 +360,53 @@ public: bool enqueue(BASE *e,unsigned timeout=INFINITE) { - CriticalBlock b(SELF::crit); - if (limit) { - unsigned start=0; - while (limit<=SafeQueueOf::unsafeordinality()) - if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start)) - return false; + unsigned numToSignal = 0; + { + CriticalBlock b(SELF::crit); + if (limit) { + unsigned start=0; + while (limit<=SafeQueueOf::unsafeordinality()) + if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start)) + return false; + } + SafeQueueOf::unsafeenqueue(e); +#ifdef SIGNAL_OUTSIDE + numToSignal = enqwaiting; + enqwaiting = 0; +#else + if (enqwaiting) { + enqwaitsem.signal(enqwaiting); + enqwaiting = 0; + } +#endif } - SafeQueueOf::unsafeenqueue(e); - if (enqwaiting) { - enqwaitsem.signal(enqwaiting); + // Signal when critical section no longer held so the reader can actually remove the item + if (numToSignal) + enqwaitsem.signal(numToSignal); + return true; + } + + bool enqueueMany(unsigned num, BASE * *e,unsigned timeout=INFINITE) + { + assertex(!limit); + unsigned numToSignal = 0; + { + CriticalBlock b(SELF::crit); + for (unsigned i=0; i < num; i++) + SafeQueueOf::unsafeenqueue(e[i]); + +#ifdef SIGNAL_OUTSIDE + numToSignal = enqwaiting; enqwaiting = 0; +#else + if (enqwaiting) { + enqwaitsem.signal(enqwaiting); + enqwaiting = 0; + } +#endif } + if (numToSignal) + enqwaitsem.signal(numToSignal); return true; } @@ -409,21 +446,31 @@ public: BASE *dequeue(unsigned timeout=INFINITE) { - CriticalBlock b(SELF::crit); - unsigned start=0; - while (!stopped) { - BASE *ret; - if (get(ret,false)) { - if (deqwaiting) { - deqwaitsem.signal(deqwaiting); + BASE *ret = nullptr; + unsigned numToSignal = 0; + { + CriticalBlock b(SELF::crit); + unsigned start=0; + while (!stopped) { + if (get(ret,false)) { +#ifdef SIGNAL_OUTSIDE + numToSignal = deqwaiting; deqwaiting = 0; +#else + if (deqwaiting) { + deqwaitsem.signal(deqwaiting); + deqwaiting = 0; + } +#endif + break; } - return ret; + if (!qwait(enqwaitsem,enqwaiting,timeout,start)) + break; } - if (!qwait(enqwaitsem,enqwaiting,timeout,start)) - break; } - return NULL; + if (numToSignal) + deqwaitsem.signal(numToSignal); + return ret; } BASE *dequeueTail(unsigned timeout=INFINITE) diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index 91deee8d18f..32962951c52 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -41,7 +41,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface StringAttr idStr; unsigned inputIndex; rowcount_t readThisInput; // purely for tracing - bool stopping; + std::atomic stopping{false}; public: CInputHandler(CParallelFunnel &_funnel, unsigned _inputIndex) : threaded("CInputHandler", this), funnel(_funnel), inputIndex(_inputIndex) @@ -63,8 +63,6 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface } void stop() { - CriticalBlock b(stopCrit); - if (stopping) return; stopping = true; } void join() @@ -77,6 +75,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface { bool started = false; IEngineRowStream *inputStream = nullptr; + constexpr unsigned chunkSize = 32; + const void * rows[chunkSize]; + unsigned numRows = 0; try { funnel.activity.startInput(inputIndex); @@ -84,20 +85,26 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface inputStream = funnel.activity.queryInputStream(inputIndex); while (!stopping) { - OwnedConstThorRow row = inputStream->ungroupedNextRow(); - if (!row) break; - + numRows = 0; + for (;numRows < chunkSize; numRows++) { - CriticalBlock b(stopCrit); - if (stopping) break; + const void * row = inputStream->ungroupedNextRow(); + if (!row) + break; + rows[numRows] = row; } - CriticalBlock b(funnel.crit); // will mean first 'push' could block on fullSem, others on this crit. - funnel.push(row.getClear()); - ++readThisInput; + + if (numRows == 0) break; + + funnel.pushMulti(numRows, rows); + readThisInput += numRows; + if (numRows != chunkSize) + break; } } catch (IException *e) { + roxiemem::ReleaseRoxieRowArray(numRows, rows); funnel.fireException(e); e->Release(); } @@ -124,27 +131,73 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface unsigned eoss; StringAttr idStr; - CriticalSection fullCrit, crit; + CriticalSection crit; + CriticalSection writerCrit; SimpleInterThreadQueueOf rows; Semaphore fullSem; size32_t totSize; - bool full, stopped; + unsigned waiting = 0; + bool stopped; Linked serializer; void push(const void *row) { - CriticalBlock b2(fullCrit); // exclusivity for totSize / full - if (stopped) + size32_t rowSize = thorRowMemoryFootprint(serializer, row); + + bool waitForSpace = false; + // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { - ReleaseThorRow(row); - return; + + CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + if (stopped) + { + ReleaseThorRow(row); + return; + } + rows.enqueue(row); + totSize += rowSize; + if (totSize > FUNNEL_MIN_BUFF_SIZE) + { + waiting++; + waitForSpace = true; + } } - rows.enqueue(row); - totSize += thorRowMemoryFootprint(serializer, row); - while (totSize > FUNNEL_MIN_BUFF_SIZE) + + if (waitForSpace) { - full = true; - CriticalUnblock b(fullCrit); + CriticalBlock b(writerCrit); + fullSem.wait(); // block pushers on crit + } + } + + void pushMulti(unsigned numRows, const void * * newRows) + { + size32_t rowSizes = 0; + for (unsigned i=0; i < numRows; i++) + rowSizes += thorRowMemoryFootprint(serializer, newRows[i]); + + bool waitForSpace = false; + // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time + { + CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + if (stopped) + { + for (unsigned i=0; i < numRows; i++) + ReleaseThorRow(newRows[i]); + return; + } + rows.enqueueMany(numRows, newRows); + totSize += rowSizes; + if (totSize > FUNNEL_MIN_BUFF_SIZE) + { + waiting++; + waitForSpace = true; + } + } + + if (waitForSpace) + { + CriticalBlock b(writerCrit); fullSem.wait(); // block pushers on crit } } @@ -168,7 +221,8 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface { idStr.set(activityKindStr(activity.queryContainer().getKind())); - stopped = full = false; + stopped = false; + waiting = 0; totSize = 0; eoss = 0; serializer.set(activity.queryRowSerializer()); @@ -210,10 +264,11 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface CInputHandler &handler = inputHandlers.item(h); handler.stop(); } + { - CriticalBlock b(fullCrit); + CriticalBlock b(crit); stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again. - if (full) + if (waiting) { for (;;) { @@ -222,7 +277,8 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface } rows.stop(); // I don't think really needed totSize = 0; - fullSem.signal(); + fullSem.signal(waiting); + waiting = 0; } } ForEachItemIn(h2, inputHandlers) @@ -243,16 +299,19 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface return NULL; } size32_t sz = thorRowMemoryFootprint(serializer, row.get()); + unsigned numToSignal = 0; { - CriticalBlock b(fullCrit); + CriticalBlock b(crit); assertex(totSize>=sz); totSize -= sz; - if (full) + if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE)) { - full = false; - fullSem.signal(); + numToSignal = 1; + waiting--; } } + if (numToSignal) + fullSem.signal(numToSignal); return row.getClear(); }