From 91f03829fba96eff8c9246a40872c8a13e5f93e6 Mon Sep 17 00:00:00 2001
From: Jack Del Vecchio
Date: Tue, 24 Oct 2023 18:12:22 +0000
Subject: [PATCH] HPCC-30320 Add file compression to parquet plugin

---
 plugins/parquet/README.md        |  7 ++++++-
 plugins/parquet/parquet.ecllib   |  4 ++--
 plugins/parquet/parquetembed.cpp | 32 +++++++++++++++++++++++++++-----
 plugins/parquet/parquetembed.hpp |  3 ++-
 4 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/plugins/parquet/README.md b/plugins/parquet/README.md
index 39e4358ab69..095499a212f 100644
--- a/plugins/parquet/README.md
+++ b/plugins/parquet/README.md
@@ -40,8 +40,13 @@ dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');
 
 The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. There is an optional argument that sets the overwrite behavior of the plugin. The default value is false meaning it will throw an error if the target file already exists.
 
+The Parquet plugin supports all available Arrow compression types. Specifying the compression when writing is optional and defaults to Uncompressed. The options for compressing your files are Snappy, GZip, Brotli, LZ4, LZ4Frame, LZ4Hadoop, ZSTD, and Uncompressed.
+
 ```
-ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption);
+overwriteOption := TRUE;
+compressionOption := 'Snappy';
+
+ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption, compressionOption);
 ```
 
 ### Partitioned Files (Tabular Datasets)
diff --git a/plugins/parquet/parquet.ecllib b/plugins/parquet/parquet.ecllib
index 5056b4f2ced..2338ec28245 100644
--- a/plugins/parquet/parquet.ecllib
+++ b/plugins/parquet/parquet.ecllib
@@ -33,8 +33,8 @@ EXPORT ParquetIO := MODULE
         RETURN _DoParquetReadPartition();
     ENDMACRO;
 
-    EXPORT Write(outDS, filePath, overwriteOption = false) := FUNCTIONMACRO
-        LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption))
+    EXPORT Write(outDS, filePath, overwriteOption = false, compressionOption = '\'UNCOMPRESSED\'') := FUNCTIONMACRO
+        LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption), compression(compressionOption))
         ENDEMBED;
         RETURN _doParquetWrite(outDS);
     ENDMACRO;
diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp
index ba2ba80dd9c..c605bb522f2 100644
--- a/plugins/parquet/parquetembed.cpp
+++ b/plugins/parquet/parquetembed.cpp
@@ -109,13 +109,14 @@ extern void fail(const char *message)
  *
  * @param _batchSize The size of the batches when converting parquet columns to rows.
  */
-ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination,
-                             int _rowSize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx)
+ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize,
+                             bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx)
     : partOption(option), location(_location), destination(destination)
 {
     rowSize = _rowSize;
     batchSize = _batchSize;
     overwrite = _overwrite;
+    compressionOption = _compressionOption;
     activityCtx = _activityCtx;
 
     pool = arrow::default_memory_pool();
@@ -200,8 +201,7 @@ arrow::Status ParquetHelper::openWriteFile()
         PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination));
 
         // Choose compression
-        // TO DO let the user choose a compression
-        std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(arrow::Compression::UNCOMPRESSED)->build();
+        std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(compressionOption)->build();
 
         // Opt to store Arrow schema for easier reads back into Arrow
         std::shared_ptr<parquet::ArrowWriterProperties> arrow_props = parquet::ArrowWriterProperties::Builder().store_schema()->build();
@@ -1734,6 +1734,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
     __int64 rowsize = 40000;   // Size of the row groups when writing to parquet files
     __int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows
     bool overwrite = false;    // If true overwrite file with no error. The default is false and will throw an error if the file already exists.
+    arrow::Compression::type compressionOption = arrow::Compression::UNCOMPRESSED;
 
     // Iterate through user options and save them
     StringArray inputOptions;
@@ -1758,6 +1759,27 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
             batchSize = atoi(val);
         else if (stricmp(optName, "overwriteOpt") == 0)
             overwrite = clipStrToBool(val);
+        else if (stricmp(optName, "compression") == 0)
+        {
+            if (strieq(val, "snappy"))
+                compressionOption = arrow::Compression::SNAPPY;
+            else if (strieq(val, "gzip"))
+                compressionOption = arrow::Compression::GZIP;
+            else if (strieq(val, "brotli"))
+                compressionOption = arrow::Compression::BROTLI;
+            else if (strieq(val, "lz4"))
+                compressionOption = arrow::Compression::LZ4;
+            else if (strieq(val, "lz4frame"))
+                compressionOption = arrow::Compression::LZ4_FRAME;
+            else if (strieq(val, "lz4hadoop"))
+                compressionOption = arrow::Compression::LZ4_HADOOP;
+            else if (strieq(val, "zstd"))
+                compressionOption = arrow::Compression::ZSTD;
+            else if (strieq(val, "uncompressed"))
+                compressionOption = arrow::Compression::UNCOMPRESSED;
+            else
+                failx("Unsupported compression type: %s", val);
+        }
         else
             failx("Unknown option %s", optName.str());
     }
@@ -1768,7 +1790,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
     }
     else
     {
-        m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, activityCtx);
+        m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, compressionOption, activityCtx);
     }
 }
diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp
index f2944efa7d3..9027c2e1891 100644
--- a/plugins/parquet/parquetembed.hpp
+++ b/plugins/parquet/parquetembed.hpp
@@ -785,7 +785,7 @@ class JsonValueConverter
 class ParquetHelper
 {
 public:
-    ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx);
+    ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx);
     ~ParquetHelper();
     std::shared_ptr<arrow::Schema> getSchema();
     arrow::Status openWriteFile();
@@ -835,6 +835,7 @@ class ParquetHelper
     std::vector<rapidjson::Value> rowStack;                      // Stack for keeping track of the context when building a nested row.
     std::shared_ptr<arrow::dataset::Scanner> scanner = nullptr;  // Scanner for reading through partitioned files. PARTITION
     arrow::dataset::FileSystemDatasetWriteOptions writeOptions;  // Write options for writing partitioned files. PARTITION
+    arrow::Compression::type compressionOption = arrow::Compression::type::UNCOMPRESSED;
     std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr;
     arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr;
     std::vector<__int64> fileTableCounts;
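
For reference, a minimal end-to-end sketch of the compression option introduced by this patch, following the Write example added to the README above. The record layout, sample rows, and output path are illustrative only, and the snippet assumes the Parquet plugin module (ParquetIO) is available to the workunit as in the README's other examples:

```
// Hypothetical layout and data, for illustration only.
layout := RECORD
    STRING name;
    INTEGER age;
END;

inDataset := DATASET([{'Alice', 30}, {'Bob', 25}], layout);

overwriteOption := TRUE;
compressionOption := 'ZSTD'; // Any of the supported names listed in the README; matching is case-insensitive.

ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption, compressionOption);
```

An unrecognised compression name is rejected at runtime with the "Unsupported compression type" error added to the option handling in parquetembed.cpp; omitting the argument keeps the Uncompressed default.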