Skip to content

Commit

Permalink
Merge pull request hpcc-systems#19023 from jakesmith/HPCC-32486-hd-co…
Browse files Browse the repository at this point in the history
…mpressed-spilling

HPCC-32486 Use new compressing lookahead classes for HD spilling

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Aug 23, 2024
2 parents 2cdf2a1 + fb4f76d commit 9db5a41
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
15 changes: 14 additions & 1 deletion thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,8 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
StringAttr id; // for tracing
ICompressHandler *compressHandler;
StringBuffer compressOptions;
LookAheadOptions options;
bool newLookAhead = false;
public:
IMPLEMENT_IINTERFACE_USING(CInterface);

Expand Down Expand Up @@ -1126,6 +1128,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
::ActPrintLog(activity, thorDetailedLogLevel, "inputBufferSize : %d, bucketSendSize = %d, pullBufferSize=%d", inputBufferSize, bucketSendSize, pullBufferSize);
targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT);
::ActPrintLog(activity, thorDetailedLogLevel, "targetWriterLimit : %d", targetWriterLimit);

newLookAhead = activity->getOptBool("newlookahead", false);
if (newLookAhead)
populateLookAheadOptions(*activity, options);
}

virtual void beforeDispose()
Expand Down Expand Up @@ -1187,7 +1193,14 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
{
StringBuffer temp;
GetTempFilePath(temp,"hddrecvbuff");
piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
if (newLookAhead)
{
options.totalCompressionBufferSize = pullBufferSize; // hd option overrides defaults
ICompressHandler *compressHandler = pullBufferSize ? queryDefaultCompressHandler() : nullptr;
piperd.setown(createCompressedSpillingRowStream(activity, temp.str(), false, rowIf, options, compressHandler));
}
else
piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf));
}
else
piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize));
Expand Down
16 changes: 1 addition & 15 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,7 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
allowspill = true;
}

// for "newlookahead" only
if (isContainerized())
{
// JCSMORE - add CJobBase::getTempBlockSize() to calc. once.
StringBuffer planeName;
if (!getDefaultPlane(planeName, "@tempPlane", "temp"))
getDefaultPlane(planeName, "@spillPlane", "spill");
size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1);
if ((size32_t)-1 != blockedSequentialIOSize)
options.storageBlockSize = blockedSequentialIOSize;
}
options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024;
options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024;
options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000;
populateLookAheadOptions(activity, options);
}
~CRowStreamLookAhead()
{
Expand Down
16 changes: 16 additions & 0 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2957,3 +2957,19 @@ class CRCFileStream: public CSimpleInterface, implements IFileIOStream
}
};

void populateLookAheadOptions(CActivityBase &activity, LookAheadOptions &options)
{
if (isContainerized())
{
StringBuffer planeName;
if (!getDefaultPlane(planeName, "@tempPlane", "temp"))
getDefaultPlane(planeName, "@spillPlane", "spill");
size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1);
if ((size32_t)-1 != blockedSequentialIOSize)
options.storageBlockSize = blockedSequentialIOSize;
}
options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024;
options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024;
options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024;
options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000;
}
1 change: 1 addition & 0 deletions thorlcr/thorutil/thbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,6 @@ interface IRowMultiWriterReader : extends IRowStream
#define DEFAULT_WR_WRITE_GRANULARITY 1000 // Amount writers buffer up before committing to output
extern graph_decl IRowMultiWriterReader *createSharedWriteBuffer(CActivityBase *activity, IThorRowInterfaces *rowif, unsigned limit, unsigned readGranularity=DEFAULT_WR_READ_GRANULARITY, unsigned writeGranularity=DEFAULT_WR_WRITE_GRANULARITY);

extern graph_decl void populateLookAheadOptions(CActivityBase &activity, LookAheadOptions &options);

#endif

0 comments on commit 9db5a41

Please sign in to comment.