diff --git a/thorlcr/activities/csvread/thcsvrslave.cpp b/thorlcr/activities/csvread/thcsvrslave.cpp index c682dc8230d..4ee76eb7622 100644 --- a/thorlcr/activities/csvread/thcsvrslave.cpp +++ b/thorlcr/activities/csvread/thcsvrslave.cpp @@ -39,7 +39,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase IHThorCsvReadArg *helper; StringAttr csvQuote, csvSeparate, csvTerminate, csvEscape; - Owned out; + Owned out; rowcount_t limit; rowcount_t stopAfter; unsigned headerLines; diff --git a/thorlcr/activities/diskread/thdiskreadslave.cpp b/thorlcr/activities/diskread/thdiskreadslave.cpp index 59d001238a0..839b805fe52 100644 --- a/thorlcr/activities/diskread/thdiskreadslave.cpp +++ b/thorlcr/activities/diskread/thdiskreadslave.cpp @@ -185,14 +185,6 @@ class CDiskRecordPartHandler : public CDiskPartHandlerBase if (in) merged.mergeStatistic(StNumDiskRowsRead, in->queryProgress()); } - virtual unsigned __int64 queryProgress() override - { - CriticalBlock block(inputCs); - if (in) - return in->queryProgress(); - else - return 0; - } }; ///////////////////////////////////////////////// @@ -516,34 +508,9 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord } }; - class PgRecordSize : implements IRecordSize, public CSimpleInterface - { - public: - IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - - PgRecordSize(IRecordSize *_rcSz) - { - rcSz = LINK(_rcSz); - } - ~PgRecordSize() - { - rcSz->Release(); - } - virtual size32_t getRecordSize(const void *rec) - { - return rcSz->getRecordSize(rec) + 1; - } - virtual size32_t getFixedSize() const - { - return rcSz->getFixedSize()?(rcSz->getFixedSize()+1):0; - } - private: - IRecordSize *rcSz; - }; - public: bool unsorted = false, countSent = false; - IRowStream *out = nullptr; + Owned partSequencer; IHThorDiskReadArg *helper; @@ -558,7 +525,6 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord } ~CDiskReadSlaveActivity() { - ::Release(out); } // IThorSlaveActivity @@ -588,11 +554,7 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord } virtual void kill() { - if (out) - { - out->Release(); - out = NULL; - } + partSequencer.clear(); CDiskReadSlaveActivityRecord::kill(); } @@ -627,28 +589,27 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord if (limit && (limit < remoteLimit)) remoteLimit = limit+1; // 1 more to ensure triggered when received back. // JCSMORE remote side could handle skip too.. } - out = createSequentialPartHandler(partHandler, partDescs, grouped); // ** + partSequencer.setown(createSequentialPartHandler(partHandler, partDescs, grouped)); } virtual bool isGrouped() const override { return grouped; } // IRowStream virtual void stop() { - if (out) + if (partSequencer) { - out->stop(); - out->Release(); - out = NULL; + partSequencer->stop(); + partSequencer.clear(); } PARENT::stop(); } CATCH_NEXTROW() { ActivityTimer t(slaveTimerStats, timeActivities); - if (NULL == out) // guard against, but shouldn't happen + if (unlikely(NULL == partSequencer)) // guard against, but shouldn't happen return NULL; - OwnedConstThorRow ret = out->nextRow(); - if (!ret) + OwnedConstThorRow ret = partSequencer->nextRow(); + if (unlikely(!ret)) return NULL; rowcount_t c = getDataLinkCount(); if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity @@ -739,7 +700,7 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord }; IHThorDiskNormalizeArg *helper; - IRowStream *out = nullptr; + Owned partSequencer; public: CDiskNormalizeSlave(CGraphElementBase *_container) @@ -750,7 +711,6 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord } ~CDiskNormalizeSlave() { - ::Release(out); } // IThorSlaveActivity @@ -782,28 +742,27 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord else limit = (rowcount_t)helper->getRowLimit(); stopAfter = (rowcount_t)helper->getChooseNLimit(); - out = createSequentialPartHandler(partHandler, partDescs, false); + partSequencer.setown(createSequentialPartHandler(partHandler, partDescs, false)); } virtual bool isGrouped() const override { return false; } // IRowStream virtual void stop() { - if (out) + if (partSequencer) { - out->stop(); - out->Release(); - out = NULL; + partSequencer->stop(); + partSequencer.clear(); } PARENT::stop(); } CATCH_NEXTROW() { ActivityTimer t(slaveTimerStats, timeActivities); - if (!out) + if (unlikely(!partSequencer)) return NULL; - OwnedConstThorRow ret = out->nextRow(); - if (!ret) + OwnedConstThorRow ret = partSequencer->nextRow(); + if (unlikely(!ret)) return NULL; rowcount_t c = getDataLinkCount(); if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index afa10f9b282..581302c4ee9 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -866,73 +866,64 @@ StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFil return filePath; } +CSeqPartHandler::CSeqPartHandler(CPartHandler *_partHandler, IArrayOf &_partDescs, bool _grouped) + : partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped) +{ + part = 0; + someInGroup = false; + if (0==numParts()) + { + eof = true; + } + else + { + eof = false; + partHandler->setPart(&partDescs.item(0)); + } +} -IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf &partDescs, bool grouped) +void CSeqPartHandler::stop() { - class CSeqPartHandler : implements IRowStream, public CSimpleInterface + if (partHandler) { - IArrayOf &partDescs; - int part, parts; - bool eof, grouped, someInGroup; - Linked partHandler; + partHandler->stop(); + partHandler.clear(); + } +} - IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - public: - CSeqPartHandler(CPartHandler *_partHandler, IArrayOf &_partDescs, bool _grouped) - : partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped) +const void * CSeqPartHandler::nextRow() +{ + if (unlikely(eof)) + { + return NULL; + } + for (;;) + { + OwnedConstThorRow row = partHandler->nextRow(); + if (likely(row)) { - part = 0; - parts = partDescs.ordinality(); - someInGroup = false; - if (0==parts) - { - eof = true; - } - else - { - eof = false; - partHandler->setPart(&partDescs.item(0)); - } + someInGroup = true; + return row.getClear(); } - virtual void stop() + if (grouped && someInGroup) { - if (partHandler) - { - partHandler->stop(); - partHandler.clear(); - } + someInGroup = false; + return NULL; } - const void *nextRow() + ++part; + if (part >= numParts()) { - if (eof) - { - return NULL; - } - for (;;) - { - OwnedConstThorRow row = partHandler->nextRow(); - if (row) - { - someInGroup = true; - return row.getClear(); - } - if (grouped && someInGroup) - { - someInGroup = false; - return NULL; - } - ++part; - if (part >= parts) - { - partHandler->stop(); - partHandler.clear(); - eof = true; - return NULL; - } - partHandler->setPart(&partDescs.item(part)); - } + partHandler->stop(); + partHandler.clear(); + eof = true; + return NULL; } - }; + partHandler->setPart(&partDescs.item(part)); + } +} + +CSeqPartHandler *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf &partDescs, bool grouped) +{ return new CSeqPartHandler(partHandler, partDescs, grouped); } diff --git a/thorlcr/activities/thactivityutil.ipp b/thorlcr/activities/thactivityutil.ipp index 3e90d92d6aa..ceac73941b3 100644 --- a/thorlcr/activities/thactivityutil.ipp +++ b/thorlcr/activities/thactivityutil.ipp @@ -49,7 +49,23 @@ public: virtual void stop() = 0; }; -IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf &partDescs, bool grouped); +class CSeqPartHandler : public CInterface +{ + IArrayOf &partDescs; + Linked partHandler; + unsigned part; + bool eof, grouped, someInGroup; + +public: + CSeqPartHandler(CPartHandler *_partHandler, IArrayOf &_partDescs, bool _grouped); + + void stop(); + const void *nextRow(); + + inline unsigned numParts() const { return partDescs.ordinality(); } +}; + +CSeqPartHandler * createSequentialPartHandler(CPartHandler *partHandler, IArrayOf &partDescs, bool grouped); #define CATCH_NEXTROWX_CATCH \ catch (IException *_e) \ diff --git a/thorlcr/activities/thdiskbaseslave.ipp b/thorlcr/activities/thdiskbaseslave.ipp index 33fdf423130..c7a4ef28dc4 100644 --- a/thorlcr/activities/thdiskbaseslave.ipp +++ b/thorlcr/activities/thdiskbaseslave.ipp @@ -56,7 +56,6 @@ public: { merged.merge(closedPartFileStats); } - virtual unsigned __int64 queryProgress() { return 0; } // IThorDiskCallback virtual offset_t getFilePosition(const void * row); diff --git a/thorlcr/activities/xmlread/thxmlreadslave.cpp b/thorlcr/activities/xmlread/thxmlreadslave.cpp index 6cb5a9dc499..98ac9fd93a9 100644 --- a/thorlcr/activities/xmlread/thxmlreadslave.cpp +++ b/thorlcr/activities/xmlread/thxmlreadslave.cpp @@ -37,7 +37,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase typedef CDiskReadSlaveActivityBase PARENT; IHThorXmlReadArg *helper; - IRowStream *out; + Owned partSequencer; rowcount_t limit; rowcount_t stopAfter; @@ -214,7 +214,6 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase public: CXmlReadSlaveActivity(CGraphElementBase *_container) : CDiskReadSlaveActivityBase(_container, nullptr) { - out = NULL; helper = (IHThorXmlReadArg *)queryHelper(); stopAfter = (rowcount_t)helper->getChooseNLimit(); if (helper->getFlags() & TDRlimitskips) @@ -225,7 +224,6 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase } ~CXmlReadSlaveActivity() { - ::Release(out); } virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override { @@ -234,11 +232,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase } virtual void kill() { - if (out) - { - out->Release(); - out = NULL; - } + partSequencer.clear(); CDiskReadSlaveActivityBase::kill(); } @@ -247,14 +241,14 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase { ActivityTimer s(slaveTimerStats, timeActivities); CDiskReadSlaveActivityBase::start(); - out = createSequentialPartHandler(partHandler, partDescs, false); + partSequencer.setown(createSequentialPartHandler(partHandler, partDescs, false)); } virtual void stop() override { - if (out) + if (partSequencer) { - out->Release(); - out = NULL; + partSequencer->stop(); + partSequencer.clear(); } PARENT::stop(); } @@ -262,8 +256,8 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase CATCH_NEXTROW() { ActivityTimer t(slaveTimerStats, timeActivities); - OwnedConstThorRow row = out->nextRow(); - if (!row) + OwnedConstThorRow row = partSequencer->nextRow(); + if (unlikely(!row)) return NULL; rowcount_t c = getDataLinkCount(); if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity