From 760aef9072eaaea8e726ee41e1adf7150b2f8807 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 8 Nov 2024 10:24:06 +0000 Subject: [PATCH] HPCC-32922 Changes following review Signed-off-by: Shamser Ahmed --- .../lookupjoin/thlookupjoinslave.cpp | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 1b2d53db7d4..cad21912165 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -1445,12 +1445,18 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } HTHELPER *queryTable() { return table; } IBitSet *queryRhsChannelStopSet() { dbgassertex(0 == queryJobChannelNumber()); return rhsChannelStop; } - void startLeftInput() + void startLeftInput(bool async=false) + { { - LookAheadTimer t(slaveTimerStats, timeActivities); try { - startInput(0); + if (async) + { + LookAheadTimer t(slaveTimerStats, timeActivities); + startInput(0); + } + else + startInput(0); if (ensureStartFTLookAhead(0)) setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, ::canStall(input), grouped, RCUNBOUND, this), false); left.set(inputStream); // can be replaced by loader stream @@ -1460,6 +1466,10 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, leftexception.setown(e); } } + void startLeftInput() + { + startLeftInput(true); + } virtual bool isRhsConstant() const { return rhsConstant; } // IThorSlaveActivity overloaded methods @@ -1531,6 +1541,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } virtual void start() override { + ActivityTimer s(slaveTimerStats, timeActivities); joined = 0; joinCounter = 0; candidateCounter = 0; @@ -1563,10 +1574,9 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper, } else { - CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInput, this)); + CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInputAsync, this)); try { - ActivityTimer s(slaveTimerStats, timeActivities); startInput(1); rhsStartedBefore = true; }