HPCC-30320 Add file compression to parquet plugin #17951

Merged (1 commit, Oct 26, 2023)
7 changes: 6 additions & 1 deletion plugins/parquet/README.md
@@ -40,8 +40,13 @@ dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');

The Write function empowers ECL programmers to write ECL datasets to Parquet files. By leveraging the Parquet format's columnar storage capabilities, this function provides efficient compression and optimized storage for data. There is an optional argument that sets the overwrite behavior of the plugin. The default value is false, meaning the plugin will throw an error if the target file already exists.

The Parquet Plugin supports all available Arrow compression types. Specifying the compression when writing is optional and defaults to Uncompressed. The options for compressing your files are Snappy, GZip, Brotli, LZ4, LZ4Frame, LZ4Hadoop, ZSTD, and Uncompressed.

```
-ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption);
+overwriteOption := TRUE;
+compressionOption := 'Snappy';
+
+ParquetIO.Write(inDataset, '/output/directory/data.parquet', overwriteOption, compressionOption);
```
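For example, a minimal sketch of writing the same dataset with ZSTD compression and then reading it back (the output path and the `inDataset`/`layout` definitions are placeholders reused from the examples above):

```
compressionOption := 'ZSTD';
ParquetIO.Write(inDataset, '/output/directory/data_zstd.parquet', TRUE, compressionOption);

// Reading does not take a compression option; the codec is recorded in the file metadata.
readBack := ParquetIO.Read(layout, '/output/directory/data_zstd.parquet');
OUTPUT(readBack);
```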

### Partitioned Files (Tabular Datasets)
4 changes: 2 additions & 2 deletions plugins/parquet/parquet.ecllib
@@ -33,8 +33,8 @@ EXPORT ParquetIO := MODULE
RETURN _DoParquetReadPartition();
ENDMACRO;

-EXPORT Write(outDS, filePath, overwriteOption = false) := FUNCTIONMACRO
-LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption))
+EXPORT Write(outDS, filePath, overwriteOption = false, compressionOption = '\'UNCOMPRESSED\'') := FUNCTIONMACRO
+LOCAL _DoParquetWrite(STREAMED DATASET(RECORDOF(outDS)) _ds) := EMBED(parquet : activity, option('write'), destination(filePath), overwriteOpt(overwriteOption), compression(compressionOption))
ENDEMBED;
RETURN _doParquetWrite(outDS);
ENDMACRO;
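Because FUNCTIONMACRO parameters are expanded textually, the default value `'\'UNCOMPRESSED\''` carries its own embedded quotes so that `compression(compressionOption)` still receives a string literal when the caller omits the argument. A minimal sketch of the two call forms (the dataset and path are placeholders):

```
// Uses the defaults: overwriteOption = false, compressionOption = 'UNCOMPRESSED'
ParquetIO.Write(ds, '/output/directory/data.parquet');

// Explicit arguments override both defaults
ParquetIO.Write(ds, '/output/directory/data.parquet', TRUE, 'GZip');
```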
32 changes: 27 additions & 5 deletions plugins/parquet/parquetembed.cpp
@@ -109,13 +109,14 @@ extern void fail(const char *message)
*
* @param _batchSize The size of the batches when converting parquet columns to rows.
*/
-ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination,
-int _rowSize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx)
+ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize,
+bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx)
: partOption(option), location(_location), destination(destination)
{
rowSize = _rowSize;
batchSize = _batchSize;
overwrite = _overwrite;
compressionOption = _compressionOption;
activityCtx = _activityCtx;

pool = arrow::default_memory_pool();
Expand Down Expand Up @@ -200,8 +201,7 @@ arrow::Status ParquetHelper::openWriteFile()
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination));

// Choose compression
-// TO DO let the user choose a compression
-std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(arrow::Compression::UNCOMPRESSED)->build();
+std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(compressionOption)->build();

// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<parquet::ArrowWriterProperties> arrow_props = parquet::ArrowWriterProperties::Builder().store_schema()->build();
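Note that not every Arrow build includes every codec (Brotli and the LZ4 variants, for instance, are compile-time options), so a write can still fail after the option string parses. A minimal standalone sketch, assuming Arrow's `arrow::util::Codec::IsAvailable` helper, of guarding the property construction:

```
#include <arrow/util/compression.h>   // arrow::Compression, arrow::util::Codec
#include <parquet/properties.h>       // parquet::WriterProperties

#include <memory>
#include <stdexcept>

// Build writer properties for the requested codec, failing early if this
// Arrow build was compiled without support for it.
std::shared_ptr<parquet::WriterProperties> buildWriterProps(arrow::Compression::type codec)
{
    if (!arrow::util::Codec::IsAvailable(codec))
        throw std::runtime_error("requested compression codec is not available in this Arrow build");
    return parquet::WriterProperties::Builder().compression(codec)->build();
}
```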
Expand Down Expand Up @@ -1734,6 +1734,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
__int64 rowsize = 40000; // Size of the row groups when writing to parquet files
__int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows
bool overwrite = false; // If true overwrite file with no error. The default is false and will throw an error if the file already exists.
arrow::Compression::type compressionOption = arrow::Compression::UNCOMPRESSED;

// Iterate through user options and save them
StringArray inputOptions;
@@ -1758,6 +1759,27 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
batchSize = atoi(val);
else if (stricmp(optName, "overwriteOpt") == 0)
overwrite = clipStrToBool(val);
else if (stricmp(optName, "compression") == 0)
{
if (strieq(val, "snappy"))
compressionOption = arrow::Compression::SNAPPY;
else if (strieq(val, "gzip"))
compressionOption = arrow::Compression::GZIP;
else if (strieq(val, "brotli"))
compressionOption = arrow::Compression::BROTLI;
else if (strieq(val, "lz4"))
compressionOption = arrow::Compression::LZ4;
else if (strieq(val, "lz4frame"))
compressionOption = arrow::Compression::LZ4_FRAME;
else if (strieq(val, "lz4hadoop"))
compressionOption = arrow::Compression::LZ4_HADOOP;
else if (strieq(val, "zstd"))
compressionOption = arrow::Compression::ZSTD;
else if (strieq(val, "uncompressed"))
compressionOption = arrow::Compression::UNCOMPRESSED;
else
failx("Unsupported compression type: %s", val);
}
else
failx("Unknown option %s", optName.str());
}
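For reference, the accepted option strings and the codecs they select, restated as a self-contained helper outside the plugin (this is not part of the PR; it mirrors the mapping above and could be used to exercise it in isolation):

```
#include <arrow/util/compression.h>

#include <algorithm>
#include <cctype>
#include <optional>
#include <string>

// Mirror of the plugin's compression-option mapping: snappy, gzip, brotli,
// lz4, lz4frame, lz4hadoop, zstd, uncompressed (matched case-insensitively).
std::optional<arrow::Compression::type> parseCompression(std::string name)
{
    std::transform(name.begin(), name.end(), name.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    if (name == "snappy")       return arrow::Compression::SNAPPY;
    if (name == "gzip")         return arrow::Compression::GZIP;
    if (name == "brotli")       return arrow::Compression::BROTLI;
    if (name == "lz4")          return arrow::Compression::LZ4;
    if (name == "lz4frame")     return arrow::Compression::LZ4_FRAME;
    if (name == "lz4hadoop")    return arrow::Compression::LZ4_HADOOP;
    if (name == "zstd")         return arrow::Compression::ZSTD;
    if (name == "uncompressed") return arrow::Compression::UNCOMPRESSED;
    return std::nullopt;  // the plugin reports unknown names via failx
}
```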
@@ -1768,7 +1790,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
}
else
{
-m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, activityCtx);
+m_parquet = std::make_shared<ParquetHelper>(option, location, destination, rowsize, batchSize, overwrite, compressionOption, activityCtx);
}
}

3 changes: 2 additions & 1 deletion plugins/parquet/parquetembed.hpp
@@ -785,7 +785,7 @@ class JsonValueConverter
class ParquetHelper
{
public:
-ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, const IThorActivityContext *_activityCtx);
+ParquetHelper(const char *option, const char *_location, const char *destination, int rowsize, int _batchSize, bool _overwrite, arrow::Compression::type _compressionOption, const IThorActivityContext *_activityCtx);
~ParquetHelper();
std::shared_ptr<arrow::Schema> getSchema();
arrow::Status openWriteFile();
Expand Down Expand Up @@ -835,6 +835,7 @@ class ParquetHelper
std::vector<rapidjson::Value> rowStack; // Stack for keeping track of the context when building a nested row.
std::shared_ptr<arrow::dataset::Scanner> scanner = nullptr; // Scanner for reading through partitioned files. PARTITION
arrow::dataset::FileSystemDatasetWriteOptions writeOptions; // Write options for writing partitioned files. PARTITION
arrow::Compression::type compressionOption = arrow::Compression::type::UNCOMPRESSED;
std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr;
arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr;
std::vector<__int64> fileTableCounts;