Skip to content

Commit

Permalink
HPCC-32117 Optimize unordered append dataset in Thor
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jul 24, 2024
1 parent 1cd728c commit 06d60fe
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 50 deletions.
87 changes: 67 additions & 20 deletions system/jlib/jqueue.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ public:
using PARENT::ensure;
};

//Sending signals once the critical section has been released generally gives better performance.
#define SIGNAL_OUTSIDE

template <class BASE, bool ALLOWNULLS>
class SimpleInterThreadQueueOf : protected SafeQueueOf<BASE, ALLOWNULLS>
Expand Down Expand Up @@ -358,18 +360,53 @@ public:

bool enqueue(BASE *e,unsigned timeout=INFINITE)
{
CriticalBlock b(SELF::crit);
if (limit) {
unsigned start=0;
while (limit<=SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality())
if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start))
return false;
unsigned numToSignal = 0;
{
CriticalBlock b(SELF::crit);
if (limit) {
unsigned start=0;
while (limit<=SafeQueueOf<BASE, ALLOWNULLS>::unsafeordinality())
if (stopped||!qwait(deqwaitsem,deqwaiting,timeout,start))
return false;
}
SafeQueueOf<BASE, ALLOWNULLS>::unsafeenqueue(e);
#ifdef SIGNAL_OUTSIDE
numToSignal = enqwaiting;
enqwaiting = 0;
#else
if (enqwaiting) {
enqwaitsem.signal(enqwaiting);
enqwaiting = 0;
}
#endif
}
SafeQueueOf<BASE, ALLOWNULLS>::unsafeenqueue(e);
if (enqwaiting) {
enqwaitsem.signal(enqwaiting);
// Signal when critical section no longer held so the reader can actually remove the item
if (numToSignal)
enqwaitsem.signal(numToSignal);
return true;
}

bool enqueueMany(unsigned num, BASE * *e,unsigned timeout=INFINITE)
{
assertex(!limit);
unsigned numToSignal = 0;
{
CriticalBlock b(SELF::crit);
for (unsigned i=0; i < num; i++)
SafeQueueOf<BASE, ALLOWNULLS>::unsafeenqueue(e[i]);

#ifdef SIGNAL_OUTSIDE
numToSignal = enqwaiting;
enqwaiting = 0;
#else
if (enqwaiting) {
enqwaitsem.signal(enqwaiting);
enqwaiting = 0;
}
#endif
}
if (numToSignal)
enqwaitsem.signal(numToSignal);
return true;
}

Expand Down Expand Up @@ -409,21 +446,31 @@ public:

BASE *dequeue(unsigned timeout=INFINITE)
{
CriticalBlock b(SELF::crit);
unsigned start=0;
while (!stopped) {
BASE *ret;
if (get(ret,false)) {
if (deqwaiting) {
deqwaitsem.signal(deqwaiting);
BASE *ret = nullptr;
unsigned numToSignal = 0;
{
CriticalBlock b(SELF::crit);
unsigned start=0;
while (!stopped) {
if (get(ret,false)) {
#ifdef SIGNAL_OUTSIDE
numToSignal = deqwaiting;
deqwaiting = 0;
#else
if (deqwaiting) {
deqwaitsem.signal(deqwaiting);
deqwaiting = 0;
}
#endif
break;
}
return ret;
if (!qwait(enqwaitsem,enqwaiting,timeout,start))
break;
}
if (!qwait(enqwaitsem,enqwaiting,timeout,start))
break;
}
return NULL;
if (numToSignal)
deqwaitsem.signal(numToSignal);
return ret;
}

BASE *dequeueTail(unsigned timeout=INFINITE)
Expand Down
119 changes: 89 additions & 30 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
StringAttr idStr;
unsigned inputIndex;
rowcount_t readThisInput; // purely for tracing
bool stopping;
std::atomic<bool> stopping{false};
public:
CInputHandler(CParallelFunnel &_funnel, unsigned _inputIndex)
: threaded("CInputHandler", this), funnel(_funnel), inputIndex(_inputIndex)
Expand All @@ -63,8 +63,6 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
}
void stop()
{
CriticalBlock b(stopCrit);
if (stopping) return;
stopping = true;
}
void join()
Expand All @@ -77,27 +75,36 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
{
bool started = false;
IEngineRowStream *inputStream = nullptr;
constexpr unsigned chunkSize = 32;
const void * rows[chunkSize];
unsigned numRows = 0;
try
{
funnel.activity.startInput(inputIndex);
started = true;
inputStream = funnel.activity.queryInputStream(inputIndex);
while (!stopping)
{
OwnedConstThorRow row = inputStream->ungroupedNextRow();
if (!row) break;

numRows = 0;
for (;numRows < chunkSize; numRows++)
{
CriticalBlock b(stopCrit);
if (stopping) break;
const void * row = inputStream->ungroupedNextRow();
if (!row)
break;
rows[numRows] = row;
}
CriticalBlock b(funnel.crit); // will mean first 'push' could block on fullSem, others on this crit.
funnel.push(row.getClear());
++readThisInput;

if (numRows == 0) break;

funnel.pushMulti(numRows, rows);
readThisInput += numRows;
if (numRows != chunkSize)
break;
}
}
catch (IException *e)
{
roxiemem::ReleaseRoxieRowArray(numRows, rows);
funnel.fireException(e);
e->Release();
}
Expand All @@ -124,27 +131,73 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
unsigned eoss;
StringAttr idStr;

CriticalSection fullCrit, crit;
CriticalSection crit;
CriticalSection writerCrit;
SimpleInterThreadQueueOf<const void, true> rows;
Semaphore fullSem;
size32_t totSize;
bool full, stopped;
unsigned waiting = 0;
bool stopped;
Linked<IOutputRowSerializer> serializer;

void push(const void *row)
{
CriticalBlock b2(fullCrit); // exclusivity for totSize / full
if (stopped)
size32_t rowSize = thorRowMemoryFootprint(serializer, row);

bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{
ReleaseThorRow(row);
return;

CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
if (stopped)
{
ReleaseThorRow(row);
return;
}
rows.enqueue(row);
totSize += rowSize;
if (totSize > FUNNEL_MIN_BUFF_SIZE)
{
waiting++;
waitForSpace = true;
}
}
rows.enqueue(row);
totSize += thorRowMemoryFootprint(serializer, row);
while (totSize > FUNNEL_MIN_BUFF_SIZE)

if (waitForSpace)
{
full = true;
CriticalUnblock b(fullCrit);
CriticalBlock b(writerCrit);
fullSem.wait(); // block pushers on crit
}
}

void pushMulti(unsigned numRows, const void * * newRows)
{
size32_t rowSizes = 0;
for (unsigned i=0; i < numRows; i++)
rowSizes += thorRowMemoryFootprint(serializer, newRows[i]);

bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{
CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
if (stopped)
{
for (unsigned i=0; i < numRows; i++)
ReleaseThorRow(newRows[i]);
return;
}
rows.enqueueMany(numRows, newRows);
totSize += rowSizes;
if (totSize > FUNNEL_MIN_BUFF_SIZE)
{
waiting++;
waitForSpace = true;
}
}

if (waitForSpace)
{
CriticalBlock b(writerCrit);
fullSem.wait(); // block pushers on crit
}
}
Expand All @@ -168,7 +221,8 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
{
idStr.set(activityKindStr(activity.queryContainer().getKind()));

stopped = full = false;
stopped = false;
waiting = 0;
totSize = 0;
eoss = 0;
serializer.set(activity.queryRowSerializer());
Expand Down Expand Up @@ -210,10 +264,11 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
CInputHandler &handler = inputHandlers.item(h);
handler.stop();
}

{
CriticalBlock b(fullCrit);
CriticalBlock b(crit);
stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again.
if (full)
if (waiting)
{
for (;;)
{
Expand All @@ -222,7 +277,8 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
}
rows.stop(); // I don't think really needed
totSize = 0;
fullSem.signal();
fullSem.signal(waiting);
waiting = 0;
}
}
ForEachItemIn(h2, inputHandlers)
Expand All @@ -243,16 +299,19 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
return NULL;
}
size32_t sz = thorRowMemoryFootprint(serializer, row.get());
unsigned numToSignal = 0;
{
CriticalBlock b(fullCrit);
CriticalBlock b(crit);
assertex(totSize>=sz);
totSize -= sz;
if (full)
if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE))
{
full = false;
fullSem.signal();
numToSignal = 1;
waiting--;
}
}
if (numToSignal)
fullSem.signal(numToSignal);
return row.getClear();
}

Expand Down

0 comments on commit 06d60fe

Please sign in to comment.