diff --git a/common/thorhelper/thorcommon.cpp b/common/thorhelper/thorcommon.cpp index bfbea0d9407..86f91ce0d43 100644 --- a/common/thorhelper/thorcommon.cpp +++ b/common/thorhelper/thorcommon.cpp @@ -1074,16 +1074,18 @@ class CRowStreamReader : public CSimpleInterfaceOf } return false; } - const byte *getNextPrefetchRow() + const byte *getNextPrefetchRow(size32_t & size) { while (true) { ++progress; if (checkEmptyRow()) - return nullptr; + break; + currentRowOffset = prefetchBuffer.tell(); prefetcher->readAhead(prefetchBuffer); bool matched = fieldFilterMatch(prefetchBuffer.queryRow()); + size = prefetchBuffer.tell() - currentRowOffset; checkEog(); if (matched) // NB: prefetchDone() call must be paired with a row returned from prefetchRow() { @@ -1095,6 +1097,7 @@ class CRowStreamReader : public CSimpleInterfaceOf if (checkExitConditions()) break; } + size = 0; return nullptr; } const void *getNextRow() @@ -1194,7 +1197,8 @@ class CRowStreamReader : public CSimpleInterfaceOf { if (translator) { - const byte *row = getNextPrefetchRow(); + size32_t prefetchSize; + const byte *row = getNextPrefetchRow(prefetchSize); if (row) { RtlDynamicRowBuilder rowBuilder(*allocator); @@ -1210,7 +1214,7 @@ class CRowStreamReader : public CSimpleInterfaceOf return nullptr; } - virtual const byte *prefetchRow() override + virtual const void *prefetchRow(size32_t & size) override { // NB: prefetchDone() call must be paired with a row returned from prefetchRow() if (eog) @@ -1224,20 +1228,21 @@ class CRowStreamReader : public CSimpleInterfaceOf eos = true; else { - const byte *row = getNextPrefetchRow(); + const byte *row = getNextPrefetchRow(size); if (row) { if (translator) { translateBuf.setLength(0); MemoryBufferBuilder rowBuilder(translateBuf, 0); - translator->translate(rowBuilder, *fieldCallback, row); + size = translator->translate(rowBuilder, *fieldCallback, row); row = rowBuilder.getSelf(); } return row; } } } + size = 0; return nullptr; } @@ -1248,7 +1253,11 @@ class CRowStreamReader : public CSimpleInterfaceOf virtual void stop() override { - stop(NULL); + if (!eos) + { + eos = true; + clear(); + } } void clear() @@ -1257,15 +1266,10 @@ class CRowStreamReader : public CSimpleInterfaceOf fileio.clear(); } - virtual void stop(CRC32 *crcout) override + virtual CRC32 queryCRC() const override { - if (!eos) { - eos = true; - clear(); - } // NB CRC will only be right if stopped at eos - if (crcout) - *crcout = crccb.crc; + return crccb.crc; } virtual offset_t getOffset() const override diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index baab17d66be..c36f57583d6 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -155,16 +155,15 @@ inline unsigned getCompMethod(const char *compStr) interface IExtRowStream: extends IRowStream { - virtual offset_t getOffset() const = 0; - virtual offset_t getLastRowOffset() const = 0; - virtual unsigned __int64 queryProgress() const = 0; - using IRowStream::stop; - virtual void stop(CRC32 *crcout) = 0; - virtual const byte *prefetchRow() = 0; + virtual offset_t getOffset() const = 0; // Used by merge to limit the size read from disk in CMergeSlave::getRows() + virtual offset_t getLastRowOffset() const = 0; // Used by disk read to deal with virtual file positions. + virtual unsigned __int64 queryProgress() const = 0; // Should probably be getStatistic(StNumRowsRead) + virtual const void *prefetchRow(size32_t & size) = 0; // Used when row does not need to be cloned - e.g. when it will be transformed virtual void prefetchDone() = 0; - virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0; + virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0; // Not used anywhere - should be deleted virtual unsigned __int64 getStatistic(StatisticKind kind) = 0; - virtual void setFilters(IConstArrayOf &filters) = 0; + virtual void setFilters(IConstArrayOf &filters) = 0; // Possibly cleaner as a parameter to createRowStream() + virtual CRC32 queryCRC() const = 0; }; interface IExtRowWriter: extends IRowWriter diff --git a/common/thorhelper/thorread.cpp b/common/thorhelper/thorread.cpp index ecbdca35b43..665fc2c9a3d 100644 --- a/common/thorhelper/thorread.cpp +++ b/common/thorhelper/thorread.cpp @@ -526,7 +526,7 @@ class BinaryDiskRowReader : public LocalDiskRowReader BinaryDiskRowReader(IDiskReadMapping * _mapping); virtual const void *nextRow() override; - virtual const void *nextRow(size32_t & resultSize) override; + virtual const void *prefetchRow(size32_t & resultSize) override; virtual const void * nextRow(MemoryBufferBuilder & builder) override; virtual bool getCursor(MemoryBuffer & cursor) override; virtual void setCursor(MemoryBuffer & cursor) override; @@ -681,7 +681,7 @@ const void *BinaryDiskRowReader::nextRow() //Similar to above, except the code at the end will translate to a local buffer or return the pointer -const void *BinaryDiskRowReader::nextRow(size32_t & resultSize) +const void *BinaryDiskRowReader::prefetchRow(size32_t & resultSize) { return inlineNextRow( [this,&resultSize](size32_t sizeRead, const byte * next) @@ -885,7 +885,7 @@ class CsvDiskRowReader : public ExternalFormatDiskRowReader CsvDiskRowReader(IDiskReadMapping * _mapping); virtual const void *nextRow() override; - virtual const void *nextRow(size32_t & resultSize) override; + virtual const void *prefetchRow(size32_t & resultSize) override; virtual const void *nextRow(MemoryBufferBuilder & builder) override; virtual void stop() override; @@ -1006,7 +1006,7 @@ const void *CsvDiskRowReader::nextRow() //Implementation of IRawRowStream -const void *CsvDiskRowReader::nextRow(size32_t & resultSize) +const void *CsvDiskRowReader::prefetchRow(size32_t & resultSize) { for (;;) { @@ -1232,7 +1232,7 @@ class MarkupDiskRowReader : public ExternalFormatDiskRowReader, implements IXMLS MarkupDiskRowReader(IDiskReadMapping * _mapping, ThorActivityKind _kind); virtual const void *nextRow() override; - virtual const void *nextRow(size32_t & resultSize) override; + virtual const void *prefetchRow(size32_t & resultSize) override; virtual const void *nextRow(MemoryBufferBuilder & builder) override; virtual void stop() override; @@ -1302,7 +1302,7 @@ const void *MarkupDiskRowReader::nextRow() } //Implementation of IRawRowStream -const void *MarkupDiskRowReader::nextRow(size32_t & resultSize) +const void *MarkupDiskRowReader::prefetchRow(size32_t & resultSize) { tempOutputBuffer.clear(); const void * next = nextRow(bufferBuilder); @@ -1494,10 +1494,10 @@ class CompoundProjectRowReader : extends CInterfaceOf, implement virtual void setCursor(MemoryBuffer & cursor) override { rawInputStream->setCursor(cursor); } virtual void stop() override { rawInputStream->stop(); } - virtual const void *nextRow(size32_t & resultSize) override + virtual const void *prefetchRow(size32_t & resultSize) override { size32_t rawInputSize; - const void * next = rawInputStream->nextRow(rawInputSize); + const void * next = rawInputStream->prefetchRow(rawInputSize); if (isSpecialRow(next)) return next; @@ -1512,7 +1512,7 @@ class CompoundProjectRowReader : extends CInterfaceOf, implement virtual const void *nextRow() override { size32_t rawInputSize; - const void * next = rawInputStream->nextRow(rawInputSize); + const void * next = rawInputStream->prefetchRow(rawInputSize); if (isSpecialRow(next)) return next; @@ -1523,7 +1523,7 @@ class CompoundProjectRowReader : extends CInterfaceOf, implement virtual const void *nextRow(MemoryBufferBuilder & builder) override { size32_t rawInputSize; - const void * next = rawInputStream->nextRow(rawInputSize); + const void * next = rawInputStream->prefetchRow(rawInputSize); if (isSpecialRow(next)) return next; @@ -1651,7 +1651,7 @@ class ParquetDiskRowReader : public ExternalFormatDiskRowReader virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override; virtual const void * nextRow() override; - virtual const void * nextRow(size32_t & resultSize) override; + virtual const void * prefetchRow(size32_t & resultSize) override; virtual const void * nextRow(MemoryBufferBuilder & builder) override; virtual bool getCursor(MemoryBuffer & cursor) override { return parquetFileReader->getCursor(cursor); } virtual void setCursor(MemoryBuffer & cursor) override { parquetFileReader->setCursor(cursor); } @@ -1720,7 +1720,7 @@ const void * ParquetDiskRowReader::nextRow() // Returns temporary rows for filtering/counting etc. // Row is built in temporary buffer and reused. -const void * ParquetDiskRowReader::nextRow(size32_t & resultSize) +const void * ParquetDiskRowReader::prefetchRow(size32_t & resultSize) { tempOutputBuffer.clear(); const void * next = nextRow(bufferBuilder); @@ -1812,7 +1812,7 @@ class RemoteDiskRowReader : public DiskRowReader RemoteDiskRowReader(const char * _format, IDiskReadMapping * _mapping); virtual const void *nextRow() override; - virtual const void *nextRow(size32_t & resultSize) override; + virtual const void *prefetchRow(size32_t & resultSize) override; virtual const void *nextRow(MemoryBufferBuilder & builder) override; virtual bool getCursor(MemoryBuffer & cursor) override; virtual void setCursor(MemoryBuffer & cursor) override; @@ -1988,7 +1988,7 @@ const void *RemoteDiskRowReader::nextRow() //Similar to above, except the code at the end will translate to a local buffer or return the pointer -const void *RemoteDiskRowReader::nextRow(size32_t & resultSize) +const void *RemoteDiskRowReader::prefetchRow(size32_t & resultSize) { return inlineNextRow( [this,&resultSize](size32_t sizeRead, const byte * next) diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index e00fb39bcb7..ea9f998bad3 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -11350,7 +11350,7 @@ const void *CHThorNewDiskReadActivity::nextRow() { //Returns a row in the serialized form of the projected format size32_t nextSize; - const byte * next = (const byte *)inputRowStream->nextRow(nextSize); + const byte * next = (const byte *)inputRowStream->prefetchRow(nextSize); if (!isSpecialRow(next)) { if (likely(!hasMatchFilter || helper.canMatch(next))) @@ -11910,7 +11910,7 @@ const void *CHThorGenericDiskReadActivity::nextRow() { //Returns a row in the serialized form of the projected format size32_t nextSize; - const byte * next = (const byte *)inputRowStream->nextRow(nextSize); + const byte * next = (const byte *)inputRowStream->prefetchRow(nextSize); if (!isSpecialRow(next)) { if (likely(!hasMatchFilter || helper.canMatch(next))) diff --git a/system/jlib/jrowstream.cpp b/system/jlib/jrowstream.cpp index 52868fb4442..d023229e6b2 100644 --- a/system/jlib/jrowstream.cpp +++ b/system/jlib/jrowstream.cpp @@ -34,7 +34,7 @@ class NullDiskRowStream : public CInterfaceOf virtual void stop() { } - virtual const void *nextRow(size32_t & size) override + virtual const void *prefetchRow(size32_t & size) override { size = 0; return eofRow; diff --git a/system/jlib/jrowstream.hpp b/system/jlib/jrowstream.hpp index 8c60f44cb45..6f83ed4e848 100644 --- a/system/jlib/jrowstream.hpp +++ b/system/jlib/jrowstream.hpp @@ -59,13 +59,13 @@ interface IDiskRowStream : extends IRowStream virtual void setCursor(MemoryBuffer & cursor) = 0; // rows returned are only valid until next call. Size is the number of bytes in the row. - virtual const void *nextRow(size32_t & size)=0; + virtual const void * prefetchRow(size32_t & size)=0; - inline const void *ungroupedNextRow(size32_t & size) // size will not include the size of the eog + inline const void *ungroupedPrefetchRow(size32_t & size) // size will not include the size of the eog { for (;;) { - const void *ret = nextRow(size); + const void *ret = prefetchRow(size); if (likely(!isEndOfGroup(ret))) return ret; } diff --git a/thorlcr/activities/diskread/thdiskreadslave.cpp b/thorlcr/activities/diskread/thdiskreadslave.cpp index 839b805fe52..1cb6f200479 100644 --- a/thorlcr/activities/diskread/thdiskreadslave.cpp +++ b/thorlcr/activities/diskread/thdiskreadslave.cpp @@ -169,9 +169,10 @@ class CDiskRecordPartHandler : public CDiskPartHandlerBase { return in->nextRow(); } - inline const byte *prefetchRow() + inline const void *prefetchRow() { - return in->prefetchRow(); + size32_t size; + return in->prefetchRow(size); } inline void prefetchDone() { @@ -397,7 +398,8 @@ void CDiskRecordPartHandler::close(CRC32 &fileCRC) { closedPartFileStats.mergeStatistic(StNumDiskRowsRead, partStream->queryProgress()); activity.mergeFileStats(partDesc, partStream); - partStream->stop(&fileCRC); + partStream->stop(); + fileCRC = partStream->queryCRC(); } } @@ -425,7 +427,7 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord { for (;;) { - const byte *row = CDiskRecordPartHandler::prefetchRow(); + const void *row = CDiskRecordPartHandler::prefetchRow(); if (!row) { if (!activity.grouped) diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 260b58a5b55..72ba5da6186 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -381,14 +381,13 @@ class CFileSizeTracker : public CInterface }; // simple class which takes ownership of the underlying file and deletes it on destruction -class graph_decl CFileOwner : public CSimpleInterface, implements IInterface +class graph_decl CFileOwner : public CSimpleInterfaceOf { Linked iFile; Linked fileSizeTracker; offset_t fileSize = 0; public: - IMPLEMENT_IINTERFACE_USING(CSimpleInterface); CFileOwner(IFile *_iFile, CFileSizeTracker *_fileSizeTracker = nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker) { } @@ -429,12 +428,11 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf } // IExtRowStream virtual const void *nextRow() override { return stream->nextRow(); } - virtual void stop() override { stream->stop(NULL); } + virtual void stop() override { stream->stop(); } virtual offset_t getOffset() const override { return stream->getOffset(); } virtual offset_t getLastRowOffset() const override { return stream->getLastRowOffset(); } virtual unsigned __int64 queryProgress() const override { return stream->queryProgress(); } - virtual void stop(CRC32 *crcout) override { stream->stop(crcout); } - virtual const byte *prefetchRow() override { return stream->prefetchRow(); } + virtual const void *prefetchRow(size32_t & size) override { return stream->prefetchRow(size); } virtual void prefetchDone() override { stream->prefetchDone(); } virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows) override { @@ -448,6 +446,7 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf { return stream->setFilters(filters); } + virtual CRC32 queryCRC() const override { return stream->queryCRC(); } }; #define DEFAULT_THORMASTERPORT 20000