HPCC-30285 Default cloud logical files to compressed
Configurable via plane attribute 'compressLogicalFiles'

Signed-off-by: Jake Smith <[email protected]>
jakesmith committed Sep 29, 2023
1 parent 9a9066c commit 60885c5
Showing 42 changed files with 326 additions and 193 deletions.
22 changes: 22 additions & 0 deletions common/thorhelper/engineerr.hpp
@@ -0,0 +1,22 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

#pragma once

#include "errorlist.h"

#define ENGINEERR_EXTEND_CLUSTER_WRITE ENGINE_ERROR_START
#define ENGINEERR_MIXED_COMPRESSED_WRITE ENGINE_ERROR_START+1
#define ENGINEERR_FILE_TYPE_MISMATCH ENGINE_ERROR_START+2
#define ENGINEERR_MISSING_OPTIONAL_FILE ENGINE_ERROR_START+3
#define ENGINEERR_FILE_UPTODATE ENGINE_ERROR_START+4
2 changes: 2 additions & 0 deletions ecl/hql/hqlatoms.cpp
@@ -302,6 +302,7 @@ IAtom * newSetAtom;
IAtom * _nlpParse_Atom;
IAtom * noBoundCheckAtom;
IAtom * noCaseAtom;
IAtom * noCompressAtom;
IAtom * noConstAtom;
IAtom * _noDuplicate_Atom;
IAtom * nofoldAtom;
@@ -782,6 +783,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
MAKESYSATOM(nlpParse);
MAKEATOM(noBoundCheck);
MAKEATOM(noCase);
MAKEATOM(noCompress);
MAKEATOM(noConst);
MAKESYSATOM(noDuplicate);
MAKEATOM(nofold);
1 change: 1 addition & 0 deletions ecl/hql/hqlatoms.hpp
@@ -306,6 +306,7 @@ extern HQL_API IAtom * newSetAtom;
extern HQL_API IAtom * _nlpParse_Atom;
extern HQL_API IAtom * noBoundCheckAtom;
extern HQL_API IAtom * noCaseAtom;
extern HQL_API IAtom * noCompressAtom;
extern HQL_API IAtom * noConstAtom;
extern HQL_API IAtom * _noDuplicate_Atom;
extern HQL_API IAtom * nofoldAtom;
25 changes: 17 additions & 8 deletions ecl/hql/hqlgram.y
@@ -472,6 +472,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
TOK_TRUE
TYPE
TYPEOF
UNCOMPRESSED
UNICODEORDER
UNGROUP
UNLIKELY
@@ -3490,6 +3491,21 @@ outputFlags
{ $$.setExpr(createComma($1.getExpr(), $3.getExpr())); $$.setPosition($1); }
;

compressionOptions
: COMPRESSED {
$$.setExpr(createAttribute(compressedAtom));
$$.setPosition($1);
}
| UNCOMPRESSED
{
$$.setExpr(createAttribute(noCompressAtom));
$$.setPosition($1);
}
| __COMPRESSED__ {
$$.setExpr(createAttribute(__compressed__Atom));
$$.setPosition($1);
}

outputFlag
: EXTEND {
$$.setExpr(createAttribute(extendAtom));
@@ -3505,14 +3521,7 @@ outputFlag
$$.setExpr(createExprAttribute(csvAtom, args));
$$.setPosition($1);
}
| COMPRESSED {
$$.setExpr(createAttribute(compressedAtom));
$$.setPosition($1);
}
| __COMPRESSED__ {
$$.setExpr(createAttribute(__compressed__Atom));
$$.setPosition($1);
}
| compressionOptions
| __GROUPED__ {
$$.setExpr(createAttribute(groupedAtom));
$$.setPosition($1);
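The grammar change above lets ECL state the choice explicitly: OUTPUT now accepts UNCOMPRESSED alongside the existing COMPRESSED flag, and either one overrides the plane default this commit introduces. A minimal sketch of the intended usage (the dataset and logical file names are illustrative only, not part of this commit):

    ds := DATASET([{1, 'a'}, {2, 'b'}], {UNSIGNED id, STRING1 tag});

    // No flag: the plane default decides (compressed on containerized planes
    // unless the plane sets compressLogicalFiles to false).
    OUTPUT(ds, , '~demo::out::planedefault');

    // COMPRESSED forces compression regardless of the plane setting.
    OUTPUT(ds, , '~demo::out::compressed', COMPRESSED);

    // UNCOMPRESSED (new) suppresses compression regardless of the plane setting.
    OUTPUT(ds, , '~demo::out::uncompressed', UNCOMPRESSED);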
1 change: 1 addition & 0 deletions ecl/hql/hqlgram2.cpp
@@ -11763,6 +11763,7 @@ static void getTokenText(StringBuffer & msg, int token)
case TOK_TRUE: msg.append("TRUE"); break;
case TYPE: msg.append("TYPE"); break;
case TYPEOF: msg.append("TYPEOF"); break;
case UNCOMPRESSED: msg.append("UNCOMPRESSED"); break;
case UNGROUP: msg.append("UNGROUP"); break;
case UNICODEORDER: msg.append("UNICODEORDER"); break;
case UNLIKELY: msg.append("UNLIKELY"); break;
1 change: 1 addition & 0 deletions ecl/hql/hqllex.l
@@ -992,6 +992,7 @@ TRIM { RETURNSYM(TRIM); }
TRUNCATE { RETURNSYM(TRUNCATE); }
TYPE { RETURNSYM(TYPE); }
TYPEOF { RETURNSYM(TYPEOF); }
UNCOMPRESSED { RETURNSYM(UNCOMPRESSED); }
UNICODEORDER { RETURNSYM(UNICODEORDER); }
UNGROUP { RETURNSYM(UNGROUP); }
UNLIKELY { RETURNSYM(UNLIKELY); }
1 change: 1 addition & 0 deletions ecl/hql/reservedwords.cpp
@@ -476,6 +476,7 @@ static const char * eclReserved14[] = { //Attribute functions (some might actual
"timelimit",
"timeout",
"token",
"uncompressed",
"unstable",
"update",
"use",
1 change: 1 addition & 0 deletions ecl/hqlcpp/hqlhtcpp.cpp
@@ -11068,6 +11068,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
if (expr->hasAttribute(groupedAtom)) flags.append("|TDXgrouped");
if (expr->hasAttribute(compressedAtom)) flags.append("|TDWnewcompress");
if (expr->hasAttribute(__compressed__Atom)) flags.append("|TDXcompress");
if (expr->hasAttribute(noCompressAtom)) flags.append("|TDWnocompress");
if (expr->hasAttribute(extendAtom)) flags.append("|TDWextend");
if (expr->hasAttribute(overwriteAtom)) flags.append("|TDWoverwrite");
if (expr->hasAttribute(noOverwriteAtom)) flags.append("|TDWnooverwrite");
33 changes: 27 additions & 6 deletions ecl/hthor/hthor.cpp
@@ -59,6 +59,7 @@
#include "thorread.hpp"

#include "ws_dfsclient.hpp"
#include "hthorerr.hpp"


#define EMPTY_LOOP_LIMIT 1000
@@ -361,6 +362,7 @@ ClusterWriteHandler *createClusterWriteHandler(IAgentContext &agent, IHThorIndex
getDefaultStoragePlane(defaultCluster);
Owned<CHThorClusterWriteHandler> clusterHandler;
unsigned clusterIdx = 0;

while(true)
{
OwnedRoxieString helperCluster(iwHelper ? iwHelper->getCluster(clusterIdx++) : dwHelper->getCluster(clusterIdx++));
@@ -372,10 +374,10 @@
}
if (!cluster)
break;
if(!clusterHandler)
if (!clusterHandler)
{
if(extend)
throw MakeStringException(0, "Cannot combine EXTEND and CLUSTER flags on disk write of file %s", lfn);
if (extend)
throw makeStringExceptionV(ENGINEERR_EXTEND_CLUSTER_WRITE, "Cannot combine EXTEND and CLUSTER flags on disk write of file %s", lfn);
clusterHandler.setown(new CHThorClusterWriteHandler(lfn, "OUTPUT", agent));
}
clusterHandler->addCluster(cluster);
@@ -540,6 +542,20 @@ void CHThorDiskWriteActivity::resolve()
}

clusterHandler.setown(createClusterWriteHandler(agent, NULL, &helper, dfsLogicalName.get(), filename, extend, false));
StringBuffer planeName;
if (clusterHandler)
{
StringArray clusterNames;
clusterHandler->getClusters(clusterNames);
planeName.set(clusterNames.item(0)); // NB: only bother with 1st, if multiple createClusterWriteHandler validates if same
}
else
getDefaultStoragePlane(planeName);
bool outputCompressionDefault = agent.queryWorkUnit()->getDebugValueBool("compressAllOutputs", isContainerized());
outputPlaneCompressed = outputCompressionDefault;
Owned<IPropertyTree> plane = getStoragePlane(planeName);
if (plane)
outputPlaneCompressed = plane->getPropBool("@compressLogicalFiles", outputCompressionDefault);
}
}
else
@@ -560,7 +576,13 @@ void CHThorDiskWriteActivity::open()
Linked<IRecordSize> groupedMeta = input->queryOutputMeta()->querySerializedDiskMeta();
if (grouped)
groupedMeta.setown(createDeltaRecordSize(groupedMeta, +1));
blockcompressed = checkWriteIsCompressed(helper.getFlags(), serializedOutputMeta.getFixedSize(), grouped);//TDWnewcompress for new compression, else check for row compression
blockcompressed=false;
if (0 == (helper.getFlags() & TDWnocompress))
{
blockcompressed = checkWriteIsCompressed(helper.getFlags(), serializedOutputMeta.getFixedSize(), grouped);//TDWnewcompress for new compression, else check for row compression
if (!blockcompressed) // if ECL doesn't specify, default to plane definition
blockcompressed = outputPlaneCompressed;
}
void *ekey;
size32_t ekeylen;
helper.getEncryptKey(ekeylen,ekey);
@@ -8314,7 +8336,6 @@ void CHThorDiskReadBaseActivity::stop()
CHThorActivityBase::stop();
}

#define TE_FileTypeMismatch 10138 // NB: duplicated from thorlcr/shared/thexception.hpp, but be moved to common header
void CHThorDiskReadBaseActivity::checkFileType(IDistributedFile *file)
{
if (rt_csv == readType)
@@ -8346,7 +8367,7 @@ void CHThorDiskReadBaseActivity::checkFileType(IDistributedFile *file)
return;
if (!strieq(kind, expectedType))
{
Owned<IException> e = makeStringExceptionV(TE_FileTypeMismatch, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind);
Owned<IException> e = makeStringExceptionV(ENGINEERR_FILE_TYPE_MISMATCH, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind);
if (!warningOnly)
throw e.getClear();
StringBuffer tmp;
1 change: 1 addition & 0 deletions ecl/hthor/hthor.ipp
@@ -292,6 +292,7 @@ protected:
bool grouped;
bool blockcompressed;
bool encrypted;
bool outputPlaneCompressed = false;
CachedOutputMetaData serializedOutputMeta;
offset_t uncompressedBytesWritten;
Owned<IExtRowWriter> outSeq;
16 changes: 16 additions & 0 deletions ecl/hthor/hthorerr.hpp
@@ -0,0 +1,16 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

#pragma once

#include "engineerr.hpp"
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
@@ -560,6 +560,11 @@
"type": "integer",
"default": 0
},
"compressLogicalFiles" : {
"description": "Compress all logical file outputs on this plane.",
"type": "boolean",
"default": true
},
"eclwatchVisible": {
"type": "boolean"
},
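The schema addition above surfaces as a per-plane boolean in the helm values. A hedged sketch of a values.yaml fragment that opts one plane out of the new compressed-by-default behaviour (the plane name and the other fields are illustrative, not taken from this commit):

    storage:
      planes:
      - name: data
        prefix: /var/lib/HPCCSystems/hpcc-data
        category: data
        compressLogicalFiles: false   # schema default is true in containerized deployments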
29 changes: 27 additions & 2 deletions roxie/ccd/ccdserver.cpp
@@ -75,6 +75,7 @@
#include "keybuild.hpp"
#include "thorstrand.hpp"
#include "rtldynfield.hpp"
#include "engineerr.hpp"

#define MAX_HTTP_HEADERSIZE 8000

@@ -11902,6 +11903,7 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
bool overwrite;
bool encrypted;
bool grouped;
bool outputPlaneCompressed = false;
IHThorDiskWriteArg &helper;
Owned<IRecordSize> diskmeta;
Owned<IRoxieWriteHandler> writer;
@@ -11959,13 +11961,22 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
assertex((helper.getFlags() & TDXtemporary) == 0);
StringArray clusters;
unsigned clusterIdx = 0;
bool outputCompressionDefault = isContainerized();
while(true)
{
OwnedRoxieString cluster(helper.getCluster(clusterIdx));
if(!cluster)
break;
clusters.append(cluster);
clusterIdx++;

if (1 == clusterIdx)
{
// establish default compression from 1st plane, but ECL compression attributes take precedence
Owned<IPropertyTree> plane = getStoragePlane(cluster);
if (plane)
outputPlaneCompressed = plane->getPropBool("@compressLogicalFiles", outputCompressionDefault);
}
}
if (clusters.length())
{
@@ -11976,7 +11987,12 @@
{
StringBuffer defaultCluster;
if (getDefaultStoragePlane(defaultCluster))
{
clusters.append(defaultCluster);
Owned<IPropertyTree> plane = getStoragePlane(defaultCluster);
if (plane)
outputPlaneCompressed = plane->getPropBool("@compressLogicalFiles", outputCompressionDefault);
}
else if (roxieName.length())
clusters.append(roxieName.str());
else
@@ -11997,8 +12013,12 @@
diskmeta.set(helper.queryDiskRecordSize()->querySerializedDiskMeta());
if (grouped)
diskmeta.setown(createDeltaRecordSize(diskmeta, +1));
size32_t fixedSize = diskmeta->getFixedSize();
blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && ((0 == fixedSize) || (fixedSize >= MIN_ROWCOMPRESS_RECSIZE)))); //always use new compression
blockcompressed = false;
if (0 == (helper.getFlags() & TDWnocompress))
{
size32_t fixedSize = diskmeta->getFixedSize();
blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && ((0 == fixedSize) || (fixedSize >= MIN_ROWCOMPRESS_RECSIZE)))); //always use new compression
}
encrypted = false; // set later
tallycrc = true;
uncompressedBytesWritten = 0;
@@ -12031,6 +12051,11 @@
encrypted = true;
blockcompressed = true;
}
else
{
if ((0 == (helper.getFlags() & TDWnocompress)) && !blockcompressed)
blockcompressed = outputPlaneCompressed;
}
if (blockcompressed)
io.setown(createCompressedFileWriter(writer->queryFile(), (diskmeta->isFixedSize() ? diskmeta->getFixedSize() : 0), extend, true, ecomp, COMPRESS_METHOD_LZ4));
else
1 change: 1 addition & 0 deletions rtl/include/eclhelper.hpp
@@ -1155,6 +1155,7 @@ enum
TDWupdatecrc = 0x80000, // has format crc
TDWexpires = 0x100000,
TDWrestricted = 0x200000,
TDWnocompress = 0x400000,
};

//flags for thor index read
8 changes: 8 additions & 0 deletions system/include/errorlist.h
@@ -88,6 +88,14 @@
#define FILEVIEW_ERROR_START 6700
#define FILEVIEW_ERROR_END 6749

//HThor
#define HTHOR_ERROR_START 6800
#define HTHOR_ERROR_END 7099

//Common to engines
#define ENGINE_ERROR_START 7100
#define ENGINE_ERROR_END 7299

#define REMOTE_ERROR_START 8000 // dafilesrv etc - see common/remote/remoteerr.hpp
#define REMOTE_ERROR_END 8099

2 changes: 1 addition & 1 deletion testing/regress/ecl/dfs.ecl
@@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsi.ecl
@@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsirecordof.ecl
@@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsj.ecl
@@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsrecordof.ecl
@@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);