Skip to content

Commit

Permalink
HPCC-32111 Use compression in global merge activity
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Oct 25, 2024
1 parent ed05bcf commit dac0255
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
17 changes: 14 additions & 3 deletions thorlcr/activities/merge/thmergeslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity
offset_t *partitionpos;
size32_t chunkmaxsize;
unsigned width;
unsigned rwFlags = 0x0; // flags for streams (e.g. compression flags)

class cRemoteStream : implements IRowStream, public CSimpleInterface
{
Expand Down Expand Up @@ -220,7 +221,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity

CThorKeyArray partition(*this, queryRowInterfaces(this),helper->querySerialize(),helper->queryCompare(),helper->queryCompareKey(),helper->queryCompareRowKey());
partition.deserialize(mb,false);
partition.calcPositions(tmpfile,sample);
partition.calcPositions(tmpfile, sample, rwFlags);
partitionpos = new offset_t[width];
unsigned i;
for (i=0;i<width;i++) {
Expand Down Expand Up @@ -270,6 +271,16 @@ class GlobalMergeSlaveActivity : public CSlaveActivity
void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
{
masterMpTag = container.queryJobChannel().deserializeMPTag(data);
rwFlags = DEFAULT_RWFLAGS;
if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
{
StringBuffer compType;
getOpt(THOROPT_COMPRESS_SPILL_TYPE, compType);
unsigned spillCompInfo;
setCompFlag(compType, spillCompInfo);
if (spillCompInfo)
rwFlags |= spillCompInfo;
}
}

void abort()
Expand Down Expand Up @@ -322,7 +333,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity
StringBuffer tmpname;
GetTempFilePath(tmpname,"merge");
tmpfile.setown(createIFile(tmpname.str()));
Owned<IRowWriter> writer = createRowWriter(tmpfile, this);
Owned<IRowWriter> writer = createRowWriter(tmpfile, this, rwFlags);
CThorKeyArray sample(*this, this, helper->querySerialize(), helper->queryCompare(), helper->queryCompareKey(), helper->queryCompareRowKey());
sample.setSampling(MERGE_TRANSFER_BUFFER_SIZE);
ActPrintLog("MERGE: start gather");
Expand Down Expand Up @@ -366,7 +377,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity
offset_t end = partitionpos[idx];
if (pos>=end)
return 0;
Owned<IExtRowStream> rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end); // this is not good
Owned<IExtRowStream> rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end, (unsigned __int64)-1, rwFlags); // this is not good
offset_t so = rs->getOffset();
size32_t len = 0;
size32_t chunksize = chunkmaxsize;
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/msort/tsorta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ offset_t CThorKeyArray::findLessRowPos(const void * row)
return getFixedFilePos(p);
}

void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample)
void CThorKeyArray::calcPositions(IFile *file, CThorKeyArray &sample, unsigned rwFlags)
{
// calculates positions based on sample
// not fast!
Expand All @@ -459,7 +459,7 @@ void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample)
if (pos==(offset_t)-1)
pos = 0;
// should do bin-chop for fixed length but initially do sequential search
Owned<IRowStream> s = createRowStreamEx(file, rowif, pos);
Owned<IRowStream> s = createRowStreamEx(file, rowif, pos, 0, (offset_t)-1, rwFlags);
for (;;)
{
OwnedConstThorRow rowcmp = s->nextRow();
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/msort/tsorta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class THORSORT_API CThorKeyArray
void deserialize(MemoryBuffer &mb,bool append);
void sort();
void createSortedPartition(unsigned pn);
void calcPositions(IFile *file,CThorKeyArray &sample);
void calcPositions(IFile *file, CThorKeyArray &sample, unsigned rwFlags);
void setSampling(size32_t _maxsamplesize, unsigned _divisor=0);
int keyCompare(unsigned a,unsigned b);
offset_t getFixedFilePos(unsigned i);
Expand Down

0 comments on commit dac0255

Please sign in to comment.