Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32741 Refactor code to work towards a Thor generic disk activity #19165

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions common/thorhelper/thorcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,16 +1074,18 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
}
return false;
}
const byte *getNextPrefetchRow()
const byte *getNextPrefetchRow(size32_t & size)
{
while (true)
{
++progress;
if (checkEmptyRow())
return nullptr;
break;

currentRowOffset = prefetchBuffer.tell();
prefetcher->readAhead(prefetchBuffer);
bool matched = fieldFilterMatch(prefetchBuffer.queryRow());
size = prefetchBuffer.tell() - currentRowOffset;
checkEog();
if (matched) // NB: prefetchDone() call must be paired with a row returned from prefetchRow()
{
Expand All @@ -1095,6 +1097,7 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
if (checkExitConditions())
break;
}
size = 0;
return nullptr;
}
const void *getNextRow()
Expand Down Expand Up @@ -1194,7 +1197,8 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
{
if (translator)
{
const byte *row = getNextPrefetchRow();
size32_t prefetchSize;
const byte *row = getNextPrefetchRow(prefetchSize);
if (row)
{
RtlDynamicRowBuilder rowBuilder(*allocator);
Expand All @@ -1210,7 +1214,7 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
return nullptr;
}

virtual const byte *prefetchRow() override
virtual const void *prefetchRow(size32_t & size) override
{
// NB: prefetchDone() call must be paired with a row returned from prefetchRow()
if (eog)
Expand All @@ -1224,20 +1228,21 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
eos = true;
else
{
const byte *row = getNextPrefetchRow();
const byte *row = getNextPrefetchRow(size);
if (row)
{
if (translator)
{
translateBuf.setLength(0);
MemoryBufferBuilder rowBuilder(translateBuf, 0);
translator->translate(rowBuilder, *fieldCallback, row);
size = translator->translate(rowBuilder, *fieldCallback, row);
row = rowBuilder.getSelf();
}
return row;
}
}
}
size = 0;
return nullptr;
}

Expand All @@ -1248,7 +1253,11 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>

virtual void stop() override
{
stop(NULL);
if (!eos)
{
eos = true;
clear();
}
}

void clear()
Expand All @@ -1257,15 +1266,10 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
fileio.clear();
}

virtual void stop(CRC32 *crcout) override
virtual CRC32 queryCRC() const override
{
if (!eos) {
eos = true;
clear();
}
// NB CRC will only be right if stopped at eos
if (crcout)
*crcout = crccb.crc;
return crccb.crc;
}

virtual offset_t getOffset() const override
Expand Down
15 changes: 7 additions & 8 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,15 @@ inline unsigned getCompMethod(const char *compStr)

interface IExtRowStream: extends IRowStream
{
virtual offset_t getOffset() const = 0;
virtual offset_t getLastRowOffset() const = 0;
virtual unsigned __int64 queryProgress() const = 0;
using IRowStream::stop;
virtual void stop(CRC32 *crcout) = 0;
virtual const byte *prefetchRow() = 0;
virtual offset_t getOffset() const = 0; // Used by merge to limit the size read from disk in CMergeSlave::getRows()
virtual offset_t getLastRowOffset() const = 0; // Used by disk read to deal with virtual file positions.
virtual unsigned __int64 queryProgress() const = 0; // Should probably be getStatistic(StNumRowsRead)
virtual const void *prefetchRow(size32_t & size) = 0; // Used when row does not need to be cloned - e.g. when it will be transformed
virtual void prefetchDone() = 0;
virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0; // Not used anywhere - should be deleted
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
virtual void setFilters(IConstArrayOf<IFieldFilter> &filters) = 0;
virtual void setFilters(IConstArrayOf<IFieldFilter> &filters) = 0; // Possibly cleaner as a parameter to createRowStream()
virtual CRC32 queryCRC() const = 0;
};

interface IExtRowWriter: extends IRowWriter
Expand Down
28 changes: 14 additions & 14 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class BinaryDiskRowReader : public LocalDiskRowReader
BinaryDiskRowReader(IDiskReadMapping * _mapping);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void * nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override;
virtual void setCursor(MemoryBuffer & cursor) override;
Expand Down Expand Up @@ -681,7 +681,7 @@ const void *BinaryDiskRowReader::nextRow()


//Similar to above, except the code at the end will translate to a local buffer or return the pointer
const void *BinaryDiskRowReader::nextRow(size32_t & resultSize)
const void *BinaryDiskRowReader::prefetchRow(size32_t & resultSize)
{
return inlineNextRow(
[this,&resultSize](size32_t sizeRead, const byte * next)
Expand Down Expand Up @@ -885,7 +885,7 @@ class CsvDiskRowReader : public ExternalFormatDiskRowReader
CsvDiskRowReader(IDiskReadMapping * _mapping);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void *nextRow(MemoryBufferBuilder & builder) override;

virtual void stop() override;
Expand Down Expand Up @@ -1006,7 +1006,7 @@ const void *CsvDiskRowReader::nextRow()


//Implementation of IRawRowStream
const void *CsvDiskRowReader::nextRow(size32_t & resultSize)
const void *CsvDiskRowReader::prefetchRow(size32_t & resultSize)
{
for (;;)
{
Expand Down Expand Up @@ -1232,7 +1232,7 @@ class MarkupDiskRowReader : public ExternalFormatDiskRowReader, implements IXMLS
MarkupDiskRowReader(IDiskReadMapping * _mapping, ThorActivityKind _kind);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void *nextRow(MemoryBufferBuilder & builder) override;

virtual void stop() override;
Expand Down Expand Up @@ -1302,7 +1302,7 @@ const void *MarkupDiskRowReader::nextRow()
}

//Implementation of IRawRowStream
const void *MarkupDiskRowReader::nextRow(size32_t & resultSize)
const void *MarkupDiskRowReader::prefetchRow(size32_t & resultSize)
{
tempOutputBuffer.clear();
const void * next = nextRow(bufferBuilder);
Expand Down Expand Up @@ -1494,10 +1494,10 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
virtual void setCursor(MemoryBuffer & cursor) override { rawInputStream->setCursor(cursor); }
virtual void stop() override { rawInputStream->stop(); }

virtual const void *nextRow(size32_t & resultSize) override
virtual const void *prefetchRow(size32_t & resultSize) override
{
size32_t rawInputSize;
const void * next = rawInputStream->nextRow(rawInputSize);
const void * next = rawInputStream->prefetchRow(rawInputSize);
if (isSpecialRow(next))
return next;

Expand All @@ -1512,7 +1512,7 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
virtual const void *nextRow() override
{
size32_t rawInputSize;
const void * next = rawInputStream->nextRow(rawInputSize);
const void * next = rawInputStream->prefetchRow(rawInputSize);
if (isSpecialRow(next))
return next;

Expand All @@ -1523,7 +1523,7 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
virtual const void *nextRow(MemoryBufferBuilder & builder) override
{
size32_t rawInputSize;
const void * next = rawInputStream->nextRow(rawInputSize);
const void * next = rawInputStream->prefetchRow(rawInputSize);
if (isSpecialRow(next))
return next;

Expand Down Expand Up @@ -1651,7 +1651,7 @@ class ParquetDiskRowReader : public ExternalFormatDiskRowReader
virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override;

virtual const void * nextRow() override;
virtual const void * nextRow(size32_t & resultSize) override;
virtual const void * prefetchRow(size32_t & resultSize) override;
virtual const void * nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override { return parquetFileReader->getCursor(cursor); }
virtual void setCursor(MemoryBuffer & cursor) override { parquetFileReader->setCursor(cursor); }
Expand Down Expand Up @@ -1720,7 +1720,7 @@ const void * ParquetDiskRowReader::nextRow()

// Returns temporary rows for filtering/counting etc.
// Row is built in temporary buffer and reused.
const void * ParquetDiskRowReader::nextRow(size32_t & resultSize)
const void * ParquetDiskRowReader::prefetchRow(size32_t & resultSize)
{
tempOutputBuffer.clear();
const void * next = nextRow(bufferBuilder);
Expand Down Expand Up @@ -1812,7 +1812,7 @@ class RemoteDiskRowReader : public DiskRowReader
RemoteDiskRowReader(const char * _format, IDiskReadMapping * _mapping);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void *nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override;
virtual void setCursor(MemoryBuffer & cursor) override;
Expand Down Expand Up @@ -1988,7 +1988,7 @@ const void *RemoteDiskRowReader::nextRow()


//Similar to above, except the code at the end will translate to a local buffer or return the pointer
const void *RemoteDiskRowReader::nextRow(size32_t & resultSize)
const void *RemoteDiskRowReader::prefetchRow(size32_t & resultSize)
{
return inlineNextRow(
[this,&resultSize](size32_t sizeRead, const byte * next)
Expand Down
4 changes: 2 additions & 2 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11350,7 +11350,7 @@ const void *CHThorNewDiskReadActivity::nextRow()
{
//Returns a row in the serialized form of the projected format
size32_t nextSize;
const byte * next = (const byte *)inputRowStream->nextRow(nextSize);
const byte * next = (const byte *)inputRowStream->prefetchRow(nextSize);
if (!isSpecialRow(next))
{
if (likely(!hasMatchFilter || helper.canMatch(next)))
Expand Down Expand Up @@ -11910,7 +11910,7 @@ const void *CHThorGenericDiskReadActivity::nextRow()
{
//Returns a row in the serialized form of the projected format
size32_t nextSize;
const byte * next = (const byte *)inputRowStream->nextRow(nextSize);
const byte * next = (const byte *)inputRowStream->prefetchRow(nextSize);
if (!isSpecialRow(next))
{
if (likely(!hasMatchFilter || helper.canMatch(next)))
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jrowstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class NullDiskRowStream : public CInterfaceOf<IDiskRowStream>
virtual void stop()
{
}
virtual const void *nextRow(size32_t & size) override
virtual const void *prefetchRow(size32_t & size) override
{
size = 0;
return eofRow;
Expand Down
6 changes: 3 additions & 3 deletions system/jlib/jrowstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ interface IDiskRowStream : extends IRowStream
virtual void setCursor(MemoryBuffer & cursor) = 0;

// rows returned are only valid until next call. Size is the number of bytes in the row.
virtual const void *nextRow(size32_t & size)=0;
virtual const void * prefetchRow(size32_t & size)=0;

inline const void *ungroupedNextRow(size32_t & size) // size will not include the size of the eog
inline const void *ungroupedPrefetchRow(size32_t & size) // size will not include the size of the eog
{
for (;;)
{
const void *ret = nextRow(size);
const void *ret = prefetchRow(size);
if (likely(!isEndOfGroup(ret)))
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ class CDiskRecordPartHandler : public CDiskPartHandlerBase
{
return in->nextRow();
}
inline const byte *prefetchRow()
inline const void *prefetchRow()
{
return in->prefetchRow();
size32_t size;
return in->prefetchRow(size);
}
inline void prefetchDone()
{
Expand Down Expand Up @@ -397,7 +398,8 @@ void CDiskRecordPartHandler::close(CRC32 &fileCRC)
{
closedPartFileStats.mergeStatistic(StNumDiskRowsRead, partStream->queryProgress());
activity.mergeFileStats(partDesc, partStream);
partStream->stop(&fileCRC);
partStream->stop();
fileCRC = partStream->queryCRC();
}
}

Expand Down Expand Up @@ -425,7 +427,7 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord
{
for (;;)
{
const byte *row = CDiskRecordPartHandler::prefetchRow();
const void *row = CDiskRecordPartHandler::prefetchRow();
if (!row)
{
if (!activity.grouped)
Expand Down
9 changes: 4 additions & 5 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,13 @@ class CFileSizeTracker : public CInterface
};

// simple class which takes ownership of the underlying file and deletes it on destruction
class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
class graph_decl CFileOwner : public CSimpleInterfaceOf<IInterface>
{
Linked<IFile> iFile;
Linked<CFileSizeTracker> fileSizeTracker;
offset_t fileSize = 0;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CFileOwner(IFile *_iFile, CFileSizeTracker *_fileSizeTracker = nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
{
}
Expand Down Expand Up @@ -429,12 +428,11 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
}
// IExtRowStream
virtual const void *nextRow() override { return stream->nextRow(); }
virtual void stop() override { stream->stop(NULL); }
virtual void stop() override { stream->stop(); }
virtual offset_t getOffset() const override { return stream->getOffset(); }
virtual offset_t getLastRowOffset() const override { return stream->getLastRowOffset(); }
virtual unsigned __int64 queryProgress() const override { return stream->queryProgress(); }
virtual void stop(CRC32 *crcout) override { stream->stop(crcout); }
virtual const byte *prefetchRow() override { return stream->prefetchRow(); }
virtual const void *prefetchRow(size32_t & size) override { return stream->prefetchRow(size); }
virtual void prefetchDone() override { stream->prefetchDone(); }
virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows) override
{
Expand All @@ -448,6 +446,7 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
{
return stream->setFilters(filters);
}
virtual CRC32 queryCRC() const override { return stream->queryCRC(); }
};

#define DEFAULT_THORMASTERPORT 20000
Expand Down
Loading