Skip to content

Commit

Permalink
Merge pull request #18977 from ghalliday/drcleanup
Browse files Browse the repository at this point in the history
HPCC-32708 Various minor improvements to Thor disk read code

Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Sep 25, 2024
2 parents 58b0e60 + 43582f0 commit c4ea787
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 132 deletions.
2 changes: 1 addition & 1 deletion thorlcr/activities/csvread/thcsvrslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase

IHThorCsvReadArg *helper;
StringAttr csvQuote, csvSeparate, csvTerminate, csvEscape;
Owned<IRowStream> out;
Owned<CSeqPartHandler> out;
rowcount_t limit;
rowcount_t stopAfter;
unsigned headerLines;
Expand Down
75 changes: 17 additions & 58 deletions thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,6 @@ class CDiskRecordPartHandler : public CDiskPartHandlerBase
if (in)
merged.mergeStatistic(StNumDiskRowsRead, in->queryProgress());
}
virtual unsigned __int64 queryProgress() override
{
CriticalBlock block(inputCs);
if (in)
return in->queryProgress();
else
return 0;
}
};

/////////////////////////////////////////////////
Expand Down Expand Up @@ -516,34 +508,9 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord
}
};

class PgRecordSize : implements IRecordSize, public CSimpleInterface
{
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

PgRecordSize(IRecordSize *_rcSz)
{
rcSz = LINK(_rcSz);
}
~PgRecordSize()
{
rcSz->Release();
}
virtual size32_t getRecordSize(const void *rec)
{
return rcSz->getRecordSize(rec) + 1;
}
virtual size32_t getFixedSize() const
{
return rcSz->getFixedSize()?(rcSz->getFixedSize()+1):0;
}
private:
IRecordSize *rcSz;
};

public:
bool unsorted = false, countSent = false;
IRowStream *out = nullptr;
Owned<CSeqPartHandler> partSequencer;

IHThorDiskReadArg *helper;

Expand All @@ -558,7 +525,6 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord
}
~CDiskReadSlaveActivity()
{
::Release(out);
}

// IThorSlaveActivity
Expand Down Expand Up @@ -588,11 +554,7 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord
}
virtual void kill()
{
if (out)
{
out->Release();
out = NULL;
}
partSequencer.clear();
CDiskReadSlaveActivityRecord::kill();
}

Expand Down Expand Up @@ -627,28 +589,27 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord
if (limit && (limit < remoteLimit))
remoteLimit = limit+1; // 1 more to ensure triggered when received back. // JCSMORE remote side could handle skip too..
}
out = createSequentialPartHandler(partHandler, partDescs, grouped); // **
partSequencer.setown(createSequentialPartHandler(partHandler, partDescs, grouped));
}
virtual bool isGrouped() const override { return grouped; }

// IRowStream
virtual void stop()
{
if (out)
if (partSequencer)
{
out->stop();
out->Release();
out = NULL;
partSequencer->stop();
partSequencer.clear();
}
PARENT::stop();
}
CATCH_NEXTROW()
{
ActivityTimer t(slaveTimerStats, timeActivities);
if (NULL == out) // guard against, but shouldn't happen
if (unlikely(NULL == partSequencer)) // guard against, but shouldn't happen
return NULL;
OwnedConstThorRow ret = out->nextRow();
if (!ret)
OwnedConstThorRow ret = partSequencer->nextRow();
if (unlikely(!ret))
return NULL;
rowcount_t c = getDataLinkCount();
if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity
Expand Down Expand Up @@ -739,7 +700,7 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord
};

IHThorDiskNormalizeArg *helper;
IRowStream *out = nullptr;
Owned<CSeqPartHandler> partSequencer;

public:
CDiskNormalizeSlave(CGraphElementBase *_container)
Expand All @@ -750,7 +711,6 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord
}
~CDiskNormalizeSlave()
{
::Release(out);
}

// IThorSlaveActivity
Expand Down Expand Up @@ -782,28 +742,27 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord
else
limit = (rowcount_t)helper->getRowLimit();
stopAfter = (rowcount_t)helper->getChooseNLimit();
out = createSequentialPartHandler(partHandler, partDescs, false);
partSequencer.setown(createSequentialPartHandler(partHandler, partDescs, false));
}
virtual bool isGrouped() const override { return false; }

// IRowStream
virtual void stop()
{
if (out)
if (partSequencer)
{
out->stop();
out->Release();
out = NULL;
partSequencer->stop();
partSequencer.clear();
}
PARENT::stop();
}
CATCH_NEXTROW()
{
ActivityTimer t(slaveTimerStats, timeActivities);
if (!out)
if (unlikely(!partSequencer))
return NULL;
OwnedConstThorRow ret = out->nextRow();
if (!ret)
OwnedConstThorRow ret = partSequencer->nextRow();
if (unlikely(!ret))
return NULL;
rowcount_t c = getDataLinkCount();
if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity
Expand Down
105 changes: 48 additions & 57 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -866,73 +866,64 @@ StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFil
return filePath;
}

CSeqPartHandler::CSeqPartHandler(CPartHandler *_partHandler, IArrayOf<IPartDescriptor> &_partDescs, bool _grouped)
: partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped)
{
part = 0;
someInGroup = false;
if (0==numParts())
{
eof = true;
}
else
{
eof = false;
partHandler->setPart(&partDescs.item(0));
}
}

IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped)
void CSeqPartHandler::stop()
{
class CSeqPartHandler : implements IRowStream, public CSimpleInterface
if (partHandler)
{
IArrayOf<IPartDescriptor> &partDescs;
int part, parts;
bool eof, grouped, someInGroup;
Linked<CPartHandler> partHandler;
partHandler->stop();
partHandler.clear();
}
}

IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
public:
CSeqPartHandler(CPartHandler *_partHandler, IArrayOf<IPartDescriptor> &_partDescs, bool _grouped)
: partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped)
const void * CSeqPartHandler::nextRow()
{
if (unlikely(eof))
{
return NULL;
}
for (;;)
{
OwnedConstThorRow row = partHandler->nextRow();
if (likely(row))
{
part = 0;
parts = partDescs.ordinality();
someInGroup = false;
if (0==parts)
{
eof = true;
}
else
{
eof = false;
partHandler->setPart(&partDescs.item(0));
}
someInGroup = true;
return row.getClear();
}
virtual void stop()
if (grouped && someInGroup)
{
if (partHandler)
{
partHandler->stop();
partHandler.clear();
}
someInGroup = false;
return NULL;
}
const void *nextRow()
++part;
if (part >= numParts())
{
if (eof)
{
return NULL;
}
for (;;)
{
OwnedConstThorRow row = partHandler->nextRow();
if (row)
{
someInGroup = true;
return row.getClear();
}
if (grouped && someInGroup)
{
someInGroup = false;
return NULL;
}
++part;
if (part >= parts)
{
partHandler->stop();
partHandler.clear();
eof = true;
return NULL;
}
partHandler->setPart(&partDescs.item(part));
}
partHandler->stop();
partHandler.clear();
eof = true;
return NULL;
}
};
partHandler->setPart(&partDescs.item(part));
}
}

CSeqPartHandler *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped)
{
return new CSeqPartHandler(partHandler, partDescs, grouped);
}

18 changes: 17 additions & 1 deletion thorlcr/activities/thactivityutil.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,23 @@ public:
virtual void stop() = 0;
};

IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped);
class CSeqPartHandler : public CInterface
{
IArrayOf<IPartDescriptor> &partDescs;
Linked<CPartHandler> partHandler;
unsigned part;
bool eof, grouped, someInGroup;

public:
CSeqPartHandler(CPartHandler *_partHandler, IArrayOf<IPartDescriptor> &_partDescs, bool _grouped);

void stop();
const void *nextRow();

inline unsigned numParts() const { return partDescs.ordinality(); }
};

CSeqPartHandler * createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped);

#define CATCH_NEXTROWX_CATCH \
catch (IException *_e) \
Expand Down
1 change: 0 additions & 1 deletion thorlcr/activities/thdiskbaseslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public:
{
merged.merge(closedPartFileStats);
}
virtual unsigned __int64 queryProgress() { return 0; }

// IThorDiskCallback
virtual offset_t getFilePosition(const void * row);
Expand Down
22 changes: 8 additions & 14 deletions thorlcr/activities/xmlread/thxmlreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
typedef CDiskReadSlaveActivityBase PARENT;

IHThorXmlReadArg *helper;
IRowStream *out;
Owned<CSeqPartHandler> partSequencer;
rowcount_t limit;
rowcount_t stopAfter;

Expand Down Expand Up @@ -214,7 +214,6 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
public:
CXmlReadSlaveActivity(CGraphElementBase *_container) : CDiskReadSlaveActivityBase(_container, nullptr)
{
out = NULL;
helper = (IHThorXmlReadArg *)queryHelper();
stopAfter = (rowcount_t)helper->getChooseNLimit();
if (helper->getFlags() & TDRlimitskips)
Expand All @@ -225,7 +224,6 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
}
~CXmlReadSlaveActivity()
{
::Release(out);
}
virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
{
Expand All @@ -234,11 +232,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
}
virtual void kill()
{
if (out)
{
out->Release();
out = NULL;
}
partSequencer.clear();
CDiskReadSlaveActivityBase::kill();
}

Expand All @@ -247,23 +241,23 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
{
ActivityTimer s(slaveTimerStats, timeActivities);
CDiskReadSlaveActivityBase::start();
out = createSequentialPartHandler(partHandler, partDescs, false);
partSequencer.setown(createSequentialPartHandler(partHandler, partDescs, false));
}
virtual void stop() override
{
if (out)
if (partSequencer)
{
out->Release();
out = NULL;
partSequencer->stop();
partSequencer.clear();
}
PARENT::stop();
}

CATCH_NEXTROW()
{
ActivityTimer t(slaveTimerStats, timeActivities);
OwnedConstThorRow row = out->nextRow();
if (!row)
OwnedConstThorRow row = partSequencer->nextRow();
if (unlikely(!row))
return NULL;
rowcount_t c = getDataLinkCount();
if (stopAfter && (c >= stopAfter)) // NB: only slave limiter, global performed in chained choosen activity
Expand Down

0 comments on commit c4ea787

Please sign in to comment.