Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30285 Default cloud logical files to compressed #17770

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions common/thorhelper/engineerr.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

#pragma once

#include "errorlist.h"

#define ENGINEERR_EXTEND_CLUSTER_WRITE ENGINE_ERROR_START
#define ENGINEERR_MIXED_COMPRESSED_WRITE ENGINE_ERROR_START+1
#define ENGINEERR_FILE_TYPE_MISMATCH ENGINE_ERROR_START+2
#define ENGINEERR_MISSING_OPTIONAL_FILE ENGINE_ERROR_START+3
#define ENGINEERR_FILE_UPTODATE ENGINE_ERROR_START+4
2 changes: 2 additions & 0 deletions ecl/hql/hqlatoms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ IAtom * newSetAtom;
IAtom * _nlpParse_Atom;
IAtom * noBoundCheckAtom;
IAtom * noCaseAtom;
IAtom * noCompressAtom;
IAtom * noConstAtom;
IAtom * _noDuplicate_Atom;
IAtom * nofoldAtom;
Expand Down Expand Up @@ -782,6 +783,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM)
MAKESYSATOM(nlpParse);
MAKEATOM(noBoundCheck);
MAKEATOM(noCase);
MAKEATOM(noCompress);
MAKEATOM(noConst);
MAKESYSATOM(noDuplicate);
MAKEATOM(nofold);
Expand Down
1 change: 1 addition & 0 deletions ecl/hql/hqlatoms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ extern HQL_API IAtom * newSetAtom;
extern HQL_API IAtom * _nlpParse_Atom;
extern HQL_API IAtom * noBoundCheckAtom;
extern HQL_API IAtom * noCaseAtom;
extern HQL_API IAtom * noCompressAtom;
extern HQL_API IAtom * noConstAtom;
extern HQL_API IAtom * _noDuplicate_Atom;
extern HQL_API IAtom * nofoldAtom;
Expand Down
25 changes: 17 additions & 8 deletions ecl/hql/hqlgram.y
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
TOK_TRUE
TYPE
TYPEOF
UNCOMPRESSED
UNICODEORDER
UNGROUP
UNLIKELY
Expand Down Expand Up @@ -3490,6 +3491,21 @@ outputFlags
{ $$.setExpr(createComma($1.getExpr(), $3.getExpr())); $$.setPosition($1); }
;

compressionOptions
: COMPRESSED {
$$.setExpr(createAttribute(compressedAtom));
$$.setPosition($1);
}
| UNCOMPRESSED
{
$$.setExpr(createAttribute(noCompressAtom));
$$.setPosition($1);
}
| __COMPRESSED__ {
$$.setExpr(createAttribute(__compressed__Atom));
$$.setPosition($1);
}

outputFlag
: EXTEND {
$$.setExpr(createAttribute(extendAtom));
Expand All @@ -3505,14 +3521,7 @@ outputFlag
$$.setExpr(createExprAttribute(csvAtom, args));
$$.setPosition($1);
}
| COMPRESSED {
$$.setExpr(createAttribute(compressedAtom));
$$.setPosition($1);
}
| __COMPRESSED__ {
$$.setExpr(createAttribute(__compressed__Atom));
$$.setPosition($1);
}
| compressionOptions
| __GROUPED__ {
$$.setExpr(createAttribute(groupedAtom));
$$.setPosition($1);
Expand Down
1 change: 1 addition & 0 deletions ecl/hql/hqlgram2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11763,6 +11763,7 @@ static void getTokenText(StringBuffer & msg, int token)
case TOK_TRUE: msg.append("TRUE"); break;
case TYPE: msg.append("TYPE"); break;
case TYPEOF: msg.append("TYPEOF"); break;
case UNCOMPRESSED: msg.append("UNCOMPRESSED"); break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needs to be added to reservedwords.cpp

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add

case UNGROUP: msg.append("UNGROUP"); break;
case UNICODEORDER: msg.append("UNICODEORDER"); break;
case UNLIKELY: msg.append("UNLIKELY"); break;
Expand Down
1 change: 1 addition & 0 deletions ecl/hql/hqllex.l
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ TRIM { RETURNSYM(TRIM); }
TRUNCATE { RETURNSYM(TRUNCATE); }
TYPE { RETURNSYM(TYPE); }
TYPEOF { RETURNSYM(TYPEOF); }
UNCOMPRESSED { RETURNSYM(UNCOMPRESSED); }
UNICODEORDER { RETURNSYM(UNICODEORDER); }
UNGROUP { RETURNSYM(UNGROUP); }
UNLIKELY { RETURNSYM(UNLIKELY); }
Expand Down
1 change: 1 addition & 0 deletions ecl/hql/reservedwords.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ static const char * eclReserved14[] = { //Attribute functions (some might actual
"timelimit",
"timeout",
"token",
"uncompressed",
"unstable",
"update",
"use",
Expand Down
1 change: 1 addition & 0 deletions ecl/hqlcpp/hqlhtcpp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11068,6 +11068,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
if (expr->hasAttribute(groupedAtom)) flags.append("|TDXgrouped");
if (expr->hasAttribute(compressedAtom)) flags.append("|TDWnewcompress");
if (expr->hasAttribute(__compressed__Atom)) flags.append("|TDXcompress");
if (expr->hasAttribute(noCompressAtom)) flags.append("|TDWnocompress");
if (expr->hasAttribute(extendAtom)) flags.append("|TDWextend");
if (expr->hasAttribute(overwriteAtom)) flags.append("|TDWoverwrite");
if (expr->hasAttribute(noOverwriteAtom)) flags.append("|TDWnooverwrite");
Expand Down
33 changes: 27 additions & 6 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "thorread.hpp"

#include "ws_dfsclient.hpp"
#include "hthorerr.hpp"


#define EMPTY_LOOP_LIMIT 1000
Expand Down Expand Up @@ -361,6 +362,7 @@ ClusterWriteHandler *createClusterWriteHandler(IAgentContext &agent, IHThorIndex
getDefaultStoragePlane(defaultCluster);
Owned<CHThorClusterWriteHandler> clusterHandler;
unsigned clusterIdx = 0;

while(true)
{
OwnedRoxieString helperCluster(iwHelper ? iwHelper->getCluster(clusterIdx++) : dwHelper->getCluster(clusterIdx++));
Expand All @@ -372,10 +374,10 @@ ClusterWriteHandler *createClusterWriteHandler(IAgentContext &agent, IHThorIndex
}
if (!cluster)
break;
if(!clusterHandler)
if (!clusterHandler)
{
if(extend)
throw MakeStringException(0, "Cannot combine EXTEND and CLUSTER flags on disk write of file %s", lfn);
if (extend)
throw makeStringExceptionV(ENGINEERR_EXTEND_CLUSTER_WRITE, "Cannot combine EXTEND and CLUSTER flags on disk write of file %s", lfn);
clusterHandler.setown(new CHThorClusterWriteHandler(lfn, "OUTPUT", agent));
}
clusterHandler->addCluster(cluster);
Expand Down Expand Up @@ -540,6 +542,20 @@ void CHThorDiskWriteActivity::resolve()
}

clusterHandler.setown(createClusterWriteHandler(agent, NULL, &helper, dfsLogicalName.get(), filename, extend, false));
StringBuffer planeName;
if (clusterHandler)
{
StringArray clusterNames;
clusterHandler->getClusters(clusterNames);
planeName.set(clusterNames.item(0)); // NB: only bother with 1st, if multiple createClusterWriteHandler validates if same
}
else
getDefaultStoragePlane(planeName);
bool outputCompressionDefault = agent.queryWorkUnit()->getDebugValueBool("compressAllOutputs", isContainerized());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the workunit option useful? Not convinced, but may as well leave it in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it isn't useful in cloud, but it would the current way to change the default in bare-metal since you can't configure planes there.

outputPlaneCompressed = outputCompressionDefault;
Owned<IPropertyTree> plane = getStoragePlane(planeName);
if (plane)
outputPlaneCompressed = plane->getPropBool("@compressLogicalFiles", outputCompressionDefault);
}
}
else
Expand All @@ -560,7 +576,13 @@ void CHThorDiskWriteActivity::open()
Linked<IRecordSize> groupedMeta = input->queryOutputMeta()->querySerializedDiskMeta();
if (grouped)
groupedMeta.setown(createDeltaRecordSize(groupedMeta, +1));
blockcompressed = checkWriteIsCompressed(helper.getFlags(), serializedOutputMeta.getFixedSize(), grouped);//TDWnewcompress for new compression, else check for row compression
blockcompressed=false;
if (0 == (helper.getFlags() & TDWnocompress))
{
blockcompressed = checkWriteIsCompressed(helper.getFlags(), serializedOutputMeta.getFixedSize(), grouped);//TDWnewcompress for new compression, else check for row compression
if (!blockcompressed) // if ECL doesn't specify, default to plane definition
blockcompressed = outputPlaneCompressed;
}
void *ekey;
size32_t ekeylen;
helper.getEncryptKey(ekeylen,ekey);
Expand Down Expand Up @@ -8314,7 +8336,6 @@ void CHThorDiskReadBaseActivity::stop()
CHThorActivityBase::stop();
}

#define TE_FileTypeMismatch 10138 // NB: duplicated from thorlcr/shared/thexception.hpp, but be moved to common header
void CHThorDiskReadBaseActivity::checkFileType(IDistributedFile *file)
{
if (rt_csv == readType)
Expand Down Expand Up @@ -8346,7 +8367,7 @@ void CHThorDiskReadBaseActivity::checkFileType(IDistributedFile *file)
return;
if (!strieq(kind, expectedType))
{
Owned<IException> e = makeStringExceptionV(TE_FileTypeMismatch, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind);
Owned<IException> e = makeStringExceptionV(ENGINEERR_FILE_TYPE_MISMATCH, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind);
if (!warningOnly)
throw e.getClear();
StringBuffer tmp;
Expand Down
1 change: 1 addition & 0 deletions ecl/hthor/hthor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ protected:
bool grouped;
bool blockcompressed;
bool encrypted;
bool outputPlaneCompressed = false;
CachedOutputMetaData serializedOutputMeta;
offset_t uncompressedBytesWritten;
Owned<IExtRowWriter> outSeq;
Expand Down
16 changes: 16 additions & 0 deletions ecl/hthor/hthorerr.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */

#pragma once

#include "engineerr.hpp"
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,11 @@
"type": "integer",
"default": 0
},
"compressLogicalFiles" : {
"description": "Compress all logical file outputs on this plane.",
"type": "boolean",
"default": true
},
"eclwatchVisible": {
"type": "boolean"
},
Expand Down
29 changes: 27 additions & 2 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#include "keybuild.hpp"
#include "thorstrand.hpp"
#include "rtldynfield.hpp"
#include "engineerr.hpp"

#define MAX_HTTP_HEADERSIZE 8000

Expand Down Expand Up @@ -11902,6 +11903,7 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
bool overwrite;
bool encrypted;
bool grouped;
bool outputPlaneCompressed = false;
IHThorDiskWriteArg &helper;
Owned<IRecordSize> diskmeta;
Owned<IRoxieWriteHandler> writer;
Expand Down Expand Up @@ -11959,13 +11961,22 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
assertex((helper.getFlags() & TDXtemporary) == 0);
StringArray clusters;
unsigned clusterIdx = 0;
bool outputCompressionDefault = isContainerized();
while(true)
{
OwnedRoxieString cluster(helper.getCluster(clusterIdx));
if(!cluster)
break;
clusters.append(cluster);
clusterIdx++;

if (1 == clusterIdx)
{
// establish default compression from 1st plane, but ECL compression attributes take precedence
Owned<IPropertyTree> plane = getStoragePlane(cluster);
if (plane)
outputPlaneCompressed = plane->getPropBool("@compressLogicalFiles", outputCompressionDefault);
}
}
if (clusters.length())
{
Expand All @@ -11976,7 +11987,12 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
{
StringBuffer defaultCluster;
if (getDefaultStoragePlane(defaultCluster))
{
clusters.append(defaultCluster);
Owned<IPropertyTree> plane = getStoragePlane(defaultCluster);
if (plane)
outputPlaneCompressed = plane->getPropBool("@compressLogicalFiles", outputCompressionDefault);
}
else if (roxieName.length())
clusters.append(roxieName.str());
else
Expand All @@ -11997,8 +12013,12 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
diskmeta.set(helper.queryDiskRecordSize()->querySerializedDiskMeta());
if (grouped)
diskmeta.setown(createDeltaRecordSize(diskmeta, +1));
size32_t fixedSize = diskmeta->getFixedSize();
blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && ((0 == fixedSize) || (fixedSize >= MIN_ROWCOMPRESS_RECSIZE)))); //always use new compression
blockcompressed = false;
if (0 == (helper.getFlags() & TDWnocompress))
{
size32_t fixedSize = diskmeta->getFixedSize();
blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && ((0 == fixedSize) || (fixedSize >= MIN_ROWCOMPRESS_RECSIZE)))); //always use new compression
}
encrypted = false; // set later
tallycrc = true;
uncompressedBytesWritten = 0;
Expand Down Expand Up @@ -12031,6 +12051,11 @@ class CRoxieServerDiskWriteActivity : public CRoxieServerInternalSinkActivity, i
encrypted = true;
blockcompressed = true;
}
else
{
if ((0 == (helper.getFlags() & TDWnocompress)) && !blockcompressed)
blockcompressed = outputPlaneCompressed;
}
if (blockcompressed)
io.setown(createCompressedFileWriter(writer->queryFile(), (diskmeta->isFixedSize() ? diskmeta->getFixedSize() : 0), extend, true, ecomp, COMPRESS_METHOD_LZ4));
else
Expand Down
1 change: 1 addition & 0 deletions rtl/include/eclhelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,7 @@ enum
TDWupdatecrc = 0x80000, // has format crc
TDWexpires = 0x100000,
TDWrestricted = 0x200000,
TDWnocompress = 0x400000,
};

//flags for thor index read
Expand Down
8 changes: 8 additions & 0 deletions system/include/errorlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@
#define FILEVIEW_ERROR_START 6700
#define FILEVIEW_ERROR_END 6749

//HThor
#define HTHOR_ERROR_START 6800
#define HTHOR_ERROR_END 7099

//Common to engines
#define ENGINE_ERROR_START 7100
#define ENGINE_ERROR_END 7299

#define REMOTE_ERROR_START 8000 // dafilesrv etc - see common/remote/remoteerr.hpp
#define REMOTE_ERROR_END 8099

Expand Down
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfs.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
Expand Down
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsi.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
Expand Down
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsirecordof.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
Expand Down
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsj.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
Expand Down
2 changes: 1 addition & 1 deletion testing/regress/ecl/dfsrecordof.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//class=file

#onwarning(10140, ignore);
#onwarning(7103, ignore);

import $.setup;
Files := setup.Files(false, false, false);
Expand Down
Loading