Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
HAWQ-1834. add options for native orc table creation
Browse files Browse the repository at this point in the history
  • Loading branch information
ztao1987 committed Mar 16, 2022
1 parent e0a3899 commit 054b407
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 124 deletions.
278 changes: 177 additions & 101 deletions src/backend/access/common/reloptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,175 @@ parseRelOptions(Datum options, int numkeywords, const char *const * keywords,
}
}

/*
* Parse reloptions for native orc format
*/
void
checkOrcOptions(Datum reloptions, bool validate, StdRdOptions *result)
{
/*
* 1. Needn't check option 'appendonly' and 'orientation' because we already
* check them in default_reloptions.
* 2. 'compresslevel' is a default option in reloptions, but we actually don't
* use it in native orc format.
* 3. Everytime we add an option into orc_keywords, we should also add one
* into default_keywords because there will perform a first check.
*/
const char *const orc_keywords[] = {
"appendonly",
"orientation",
"compresstype",
"compresslevel",
"dicthreshold",
"compressblocksize",
"rowindexstride",
"stripesize",
"bloomfilter",
"bucketnum",
};

bool appendonly = true;
char columnstore = RELSTORAGE_ORC;
char* compresstype = NULL;
int32 compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
int32 rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
int32 stripesize = DEFAULT_ORC_STRIPE_SIZE;
char* bloomfilter = NULL;
int32 bucket_num = 0;
int j = 0;

char *orcOptionValues[ARRAY_SIZE(orc_keywords)];
parseRelOptions(reloptions, ARRAY_SIZE(orc_keywords), orc_keywords, orcOptionValues, validate);

/* orc compresstype */
if (orcOptionValues[2] != NULL)
{
compresstype = orcOptionValues[2];

if ((strcmp(compresstype, "snappy") != 0) && (strcmp(compresstype, "lz4") != 0)
// XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
// and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
&& (strcmp(compresstype, "zlib") != 0)
&& (strcmp(compresstype, "zstd") != 0)
&& (strcmp(compresstype, "none") != 0))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("orc table doesn't support compress type: \'%s\'", compresstype),
errOmitLocation(true)));
}

if (compresstype) {
StringInfoData option;
initStringInfo(&option);
appendStringInfo(&option, "\"compresstype\":\"%s\"",
compresstype);
compresstype = pstrdup(option.data);
}
}

/* orc dicthreshold */
if (orcOptionValues[4] != NULL)
{
char *end;
double threshold = strtod(orcOptionValues[4], &end);
if (end == orcOptionValues[4] || *end != '\0' || threshold < 0 || threshold > 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("\'dicthreshold\' must be within [0-1]"),
errOmitLocation(true)));
StringInfoData option;
initStringInfo(&option);
if (compresstype != NULL)
appendStringInfo(&option, "%s,",compresstype);
appendStringInfo(&option, "\"dicthreshold\": \"%s\"",
orcOptionValues[1]);
compresstype = pstrdup(option.data);
}

/* orc compressblocksize */
if (orcOptionValues[5] != NULL)
{
compressblocksize = pg_atoi(orcOptionValues[5], sizeof(int32), 0);
if ((compressblocksize < MIN_ORC_COMPRESS_BLOCK_SIZE) || (compressblocksize > MAX_ORC_COMPRESS_BLOCK_SIZE))
{
if (validate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("compressblock size for orc table should between 1B and 1GB and should be specified in Bytes. "
"Got %d Bytes", compressblocksize), errOmitLocation(true)));

compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
}
}

/* orc rowgroupsize */
if (orcOptionValues[6] != NULL)
{
rowindexstride = pg_atoi(orcOptionValues[6], sizeof(int32), 0);

if ((rowindexstride < MIN_ORC_ROW_GROUP_SIZE) || (rowindexstride > MAX_ORC_ROW_GROUP_SIZE))
{
if (validate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("row group size for orc table should between 1000 and 1024*1024*1024. "
"Got %d", rowindexstride), errOmitLocation(true)));

rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
}
}

/* orc stripesize */
if (orcOptionValues[7] != NULL)
{
stripesize = pg_atoi(orcOptionValues[7], sizeof(int32), 0);

if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE))
{
if (validate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
"Got %d MB", stripesize), errOmitLocation(true)));

stripesize = DEFAULT_ORC_STRIPE_SIZE;
}
}

/* orc bloomfilter */
if (orcOptionValues[8] != NULL)
{
StringInfoData option;
initStringInfo(&option);
appendStringInfo(&option, orcOptionValues[8]);
bloomfilter = pstrdup(option.data);
}

/* orc bucket_num */
if (orcOptionValues[9] != NULL)
{
bucket_num= pg_atoi(orcOptionValues[9], sizeof(int32), 0);
if(bucket_num <= 0)
{
if (validate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bucket number should be greater than 0. "
"Got %d", bucket_num), errOmitLocation(true)));

bucket_num = 0;
}
}
result->compressblocksize = compressblocksize;
result->stripesize = stripesize;
result->rowindexstride = rowindexstride;
if (compresstype != NULL)
for (j = 0;j < strlen(compresstype); j++)
compresstype[j] = pg_tolower(compresstype[j]);
result->compresstype = compresstype;
result->bloomfilter = bloomfilter;
}

/*
* Parse reloptions for anything using StdRdOptions
Expand All @@ -323,14 +492,15 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
"dicthreshold",
"bloomfilter",
"stripesize",
"rowindexstride",
"compressblocksize",
};

char *values[ARRAY_SIZE(default_keywords)];
int32 fillfactor = defaultFillfactor;
int32 blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE;
int32 pagesize = DEFAULT_PARQUET_PAGE_SIZE;
int32 rowgroupsize = DEFAULT_PARQUET_ROWGROUP_SIZE;
int32 stripesize = DEFAULT_ORC_STRIPE_SIZE;
bool appendonly = false;
bool checksum = false;
char* compresstype = NULL;
Expand Down Expand Up @@ -542,28 +712,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
errmsg("non-parquet table doesn't support compress type: \'%s\'", compresstype),
errOmitLocation(true)));
}

if ((columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "snappy") != 0)
&& (strcmp(compresstype, "lz4") != 0)
// XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
// and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
&& (strcmp(compresstype, "zlib") != 0)
&& (strcmp(compresstype, "zstd") != 0)
&& (strcmp(compresstype, "none") != 0))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("orc table doesn't support compress type: \'%s\'", compresstype),
errOmitLocation(true)));
}

if (!(columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "lz4") == 0))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("non-orc table doesn't support compress type: \'%s\'", compresstype),
errOmitLocation(true)));
}
}

/* compression level */
Expand Down Expand Up @@ -644,14 +792,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
compresslevel = setDefaultCompressionLevel(compresstype);
}

if (columnstore == RELSTORAGE_ORC && compresstype) {
StringInfoData option;
initStringInfo(&option);
appendStringInfo(&option, "\"compresstype\":\"%s\"",
compresstype);
compresstype = pstrdup(option.data);
}

/* checksum */
if (values[7] != NULL)
{
Expand Down Expand Up @@ -811,75 +951,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
errOmitLocation(true)));
}

/* stripesize */
if (values[13] != NULL)
{
if(!(columnstore == RELSTORAGE_ORC)){
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid option \'stripesize\' for non-orc table"),
errOmitLocation(true)));
}

stripesize = pg_atoi(values[13], sizeof(int32), 0);

if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE))
{
if (validate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
"Got %d MB", stripesize), errOmitLocation(true)));

stripesize = DEFAULT_ORC_STRIPE_SIZE;
}
}

// dicthreshold
if (values[11] != NULL) {
if(!(columnstore == RELSTORAGE_ORC)){
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid option \'dicthreshold\' for non-orc table"),
errOmitLocation(true)));
}
char *end;
double threshold = strtod(values[11], &end);
if (end == values[11] || *end != '\0' || threshold < 0 || threshold > 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("\'dicthreshold\' must be within [0-1]"),
errOmitLocation(true)));
StringInfoData option;
initStringInfo(&option);
if (compresstype != NULL)
appendStringInfo(&option, "%s,",compresstype);
appendStringInfo(&option, "\"dicthreshold\": \"%s\"",
values[11]);
compresstype = pstrdup(option.data);
}

// bloomfilter
if (values[12] != NULL) {
if(!(columnstore == RELSTORAGE_ORC)){
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid option \'bloomfilter\' for non-orc table"),
errOmitLocation(true)));
}
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("option \'bloomfilter\' for orc table not supported yet"),
errOmitLocation(true)));
StringInfoData option;
initStringInfo(&option);
if (compresstype != NULL)
appendStringInfo(&option, "%s",compresstype);
appendStringInfo(&option, ",\"bloomfilter\": \"%s\"",
values[12]);
compresstype = pstrdup(option.data);
}

result = (StdRdOptions *) palloc(sizeof(StdRdOptions));
SET_VARSIZE(result, sizeof(StdRdOptions));

Expand All @@ -888,7 +959,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
result->blocksize = blocksize;
result->pagesize = pagesize;
result->rowgroupsize = rowgroupsize;
result->stripesize = stripesize;
result->compresslevel = compresslevel;
if (compresstype != NULL)
for (j = 0;j < strlen(compresstype); j++)
Expand All @@ -900,6 +970,12 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
result->errorTable = errorTable;
result->bucket_num = bucket_num;

// extra parse and check for ORC format
if (columnstore == RELSTORAGE_ORC)
{
checkOrcOptions(reloptions, validate, result);
}

return (bytea *) result;
}

Expand Down
25 changes: 3 additions & 22 deletions src/backend/access/orc/orcam.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ static int32 GetSplitCount(List *fileSplits, Oid idxId) {
return ret;
}

>>>>>>> 7910d663d... step2
OrcInsertDescData *orcBeginInsert(Relation rel,
ResultRelSegFileInfo *segfileinfo) {
OrcInsertDescData *insertDesc =
Expand All @@ -299,17 +298,7 @@ OrcInsertDescData *orcBeginInsert(Relation rel,
AppendOnlyEntry *aoentry =
GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
StringInfoData option;
initStringInfo(&option);
appendStringInfoChar(&option, '{');
appendStringInfo(&option, "\"logicEof\": %" PRId64, segfileinfo->eof[0]);
appendStringInfo(&option, ", \"uncompressedEof\": %" PRId64,
segfileinfo->uncompressed_eof[0]);
appendStringInfo(
&option, ", \"stripeSize\": %" PRId64,
((StdRdOptions *)(rel->rd_options))->stripesize * 1024 * 1024);
if (aoentry->compresstype)
appendStringInfo(&option, ", %s", aoentry->compresstype);
appendStringInfoChar(&option, '}');
constructOrcFormatOptionString(&option, rel, segfileinfo, aoentry);

insertDesc->orcFormatData = palloc0(sizeof(OrcFormatData));
insertDesc->orcFormatData->fmt =
Expand Down Expand Up @@ -929,11 +918,7 @@ OrcDeleteDescData *orcBeginDelete(Relation rel, List *fileSplits,
AppendOnlyEntry *aoentry =
GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
StringInfoData option;
initStringInfo(&option);
appendStringInfoChar(&option, '{');
if (aoentry->compresstype)
appendStringInfo(&option, "%s", aoentry->compresstype);
appendStringInfoChar(&option, '}');
constructOrcFormatOptionString(&option, rel, NULL, aoentry);

int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
Expand Down Expand Up @@ -1047,11 +1032,7 @@ OrcUpdateDescData *orcBeginUpdate(Relation rel, List *fileSplits,
AppendOnlyEntry *aoentry =
GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
StringInfoData option;
initStringInfo(&option);
appendStringInfoChar(&option, '{');
if (aoentry->compresstype)
appendStringInfo(&option, "%s", aoentry->compresstype);
appendStringInfoChar(&option, '}');
constructOrcFormatOptionString(&option, rel, NULL, aoentry);

int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
Expand Down
Loading

0 comments on commit 054b407

Please sign in to comment.