From da0c8402619134d455242bd5c16e16a3b3955994 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Wed, 10 Jul 2024 15:52:30 +0800
Subject: [PATCH] feat: customize copy to parquet parameter (#4328)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat/copy-to-parquet-parameter: Enhance Parquet writer with column-wise configuration

  - Introduced a `column_wise_config` function to customize per-column properties in the Parquet writer.

* feat/copy-to-parquet-parameter: Enhance Parquet file format handling for specific data types

  - Added a `ConcreteDataType` import to support handling of specific data types.

* feat/copy-to-parquet-parameter: Refactor Parquet file format configuration

* feat/copy-to-parquet-parameter: Enhance Parquet file format handling for timestamp columns

  - Added logic to disable dictionary encoding and set `DELTA_BINARY_PACKED` encoding for timestamp columns in the Parquet file format configuration.

* feat/copy-to-parquet-parameter: Disable dictionary encoding for timestamp columns in the Parquet writer and update the default `max_active_window_runs` in `TwcsOptions`

  - Modified the Parquet writer to disable dictionary encoding for timestamp columns, since dictionaries add overhead rather than savings for steadily increasing timestamp data.

* feat/copy-to-parquet-parameter: Update compaction settings in tests

  - Modified `test_compaction_region` to include the new compaction options `compaction.type`, `compaction.twcs.max_active_window_runs`, and `compaction.twcs.max_inactive_window_runs`.
  - Updated `test_merge_mode_compaction` to use `compaction.twcs.max_active_window_runs` and `compaction.twcs.max_inactive_window_runs` instead of `max_active_window_files` and `max_inactive_window_files`.
---
 .../datasource/src/file_format/parquet.rs   | 36 +++++++++++++++----
 src/mito2/src/engine/compaction_test.rs     |  6 +++-
 src/mito2/src/engine/merge_mode_test.rs     |  4 +--
 src/mito2/src/region/options.rs             |  2 +-
 src/operator/src/statement/copy_table_to.rs | 20 ++++++-----
 5 files changed, 49 insertions(+), 19 deletions(-)
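The core of this change is per-column tuning of the parquet crate's
WriterProperties. A minimal standalone sketch of the builder calls the patch
relies on is below; the column name "ts" is hypothetical, everything else uses
the same parquet crate APIs that appear in the diff:

    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::properties::WriterProperties;
    use parquet::schema::types::ColumnPath;

    fn example_props() -> WriterProperties {
        // Hypothetical timestamp column named "ts".
        let ts_col = ColumnPath::new(vec!["ts".to_string()]);
        WriterProperties::builder()
            // ZSTD stays the file-wide compression, unchanged by this patch.
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            // Nearly every value of an increasing timestamp column is unique,
            // so a dictionary page grows as large as the data page it is
            // supposed to shrink; disable dictionary encoding for the column.
            .set_column_dictionary_enabled(ts_col.clone(), false)
            // DELTA_BINARY_PACKED stores the small deltas between consecutive
            // values instead of raw 64-bit timestamps, which compresses well
            // for near-regular intervals.
            .set_column_encoding(ts_col, Encoding::DELTA_BINARY_PACKED)
            .build()
    }
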
diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs
index 9988a311f51c..88f21ce9dbf8 100644
--- a/src/common/datasource/src/file_format/parquet.rs
+++ b/src/common/datasource/src/file_format/parquet.rs
@@ -27,12 +27,14 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
 use datafusion::parquet::format::FileMetaData;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::SendableRecordBatchStream;
+use datatypes::schema::SchemaRef;
 use futures::future::BoxFuture;
 use futures::StreamExt;
 use object_store::{FuturesAsyncReader, ObjectStore};
 use parquet::arrow::AsyncArrowWriter;
-use parquet::basic::{Compression, ZstdLevel};
-use parquet::file::properties::WriterProperties;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
+use parquet::schema::types::ColumnPath;
 use snafu::ResultExt;
 use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
 
@@ -184,14 +186,16 @@ impl ArrowWriterCloser for ArrowWriter {
 /// Returns number of rows written.
 pub async fn stream_to_parquet(
     mut stream: SendableRecordBatchStream,
+    schema: datatypes::schema::SchemaRef,
     store: ObjectStore,
     path: &str,
     concurrency: usize,
 ) -> Result<usize> {
-    let write_props = WriterProperties::builder()
-        .set_compression(Compression::ZSTD(ZstdLevel::default()))
-        .build();
-    let schema = stream.schema();
+    let write_props = column_wise_config(
+        WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
+        schema,
+    )
+    .build();
     let inner_writer = store
         .writer_with(path)
         .concurrent(concurrency)
@@ -200,7 +204,7 @@ pub async fn stream_to_parquet(
         .map(|w| w.into_futures_async_write().compat_write())
         .context(WriteObjectSnafu { path })?;
 
-    let mut writer = AsyncArrowWriter::try_new(inner_writer, schema, Some(write_props))
+    let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
         .context(WriteParquetSnafu { path })?;
 
     let mut rows_written = 0;
@@ -216,6 +220,24 @@ pub async fn stream_to_parquet(
     Ok(rows_written)
 }
 
+/// Customizes per-column properties.
+fn column_wise_config(
+    mut props: WriterPropertiesBuilder,
+    schema: SchemaRef,
+) -> WriterPropertiesBuilder {
+    // Disable dictionary encoding for timestamp columns: for an increasing
+    // timestamp column, the dictionary pages will be larger than data pages.
+    for col in schema.column_schemas() {
+        if col.data_type.is_timestamp() {
+            let path = ColumnPath::new(vec![col.name.clone()]);
+            props = props
+                .set_column_dictionary_enabled(path.clone(), false)
+                .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED);
+        }
+    }
+    props
+}
+
 #[cfg(test)]
 mod tests {
     use common_test_util::find_workspace_path;
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index f71b665a252f..9de4a0ddf572 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -112,7 +112,11 @@ async fn test_compaction_region() {
     let engine = env.create_engine(MitoConfig::default()).await;
 
     let region_id = RegionId::new(1, 1);
-    let request = CreateRequestBuilder::new().build();
+    let request = CreateRequestBuilder::new()
+        .insert_option("compaction.type", "twcs")
+        .insert_option("compaction.twcs.max_active_window_runs", "1")
+        .insert_option("compaction.twcs.max_inactive_window_runs", "1")
+        .build();
 
     let column_schemas = request
         .column_metadatas
diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs
index 1adf51d12f41..0f0be6b8f12b 100644
--- a/src/mito2/src/engine/merge_mode_test.rs
+++ b/src/mito2/src/engine/merge_mode_test.rs
@@ -101,8 +101,8 @@ async fn test_merge_mode_compaction() {
     let request = CreateRequestBuilder::new()
         .field_num(2)
         .insert_option("compaction.type", "twcs")
-        .insert_option("compaction.twcs.max_active_window_files", "2")
-        .insert_option("compaction.twcs.max_inactive_window_files", "2")
+        .insert_option("compaction.twcs.max_active_window_runs", "1")
+        .insert_option("compaction.twcs.max_inactive_window_runs", "1")
         .insert_option("merge_mode", "last_non_null")
         .build();
     let region_dir = request.region_dir.clone();
diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs
index 9e740cff86b0..7a28cee977d6 100644
--- a/src/mito2/src/region/options.rs
+++ b/src/mito2/src/region/options.rs
@@ -216,7 +216,7 @@ impl TwcsOptions {
 impl Default for TwcsOptions {
     fn default() -> Self {
         Self {
-            max_active_window_runs: 1,
+            max_active_window_runs: 4,
             max_inactive_window_runs: 1,
             time_window: None,
             remote_compaction: false,
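The default `max_active_window_runs` rises from 1 to 4, so regions created
without explicit options tolerate more sorted runs in the active time window
before TWCS compaction triggers; the test updates above pin the value back to
1 so compaction still fires deterministically. A minimal sketch of the new
default, assuming it lives in the existing test module of
src/mito2/src/region/options.rs where `TwcsOptions` is in scope:

    #[test]
    fn test_twcs_default_runs() {
        let opts = TwcsOptions::default();
        // New default: up to 4 runs may accumulate in the active window
        // before compaction kicks in (previously 1).
        assert_eq!(4, opts.max_active_window_runs);
        assert_eq!(1, opts.max_inactive_window_runs);
    }
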
diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs
index 0fb6f1137e9c..8a90d1095569 100644
--- a/src/operator/src/statement/copy_table_to.rs
+++ b/src/operator/src/statement/copy_table_to.rs
@@ -75,14 +75,18 @@ impl StatementExecutor {
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
-            Format::Parquet(_) => stream_to_parquet(
-                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
-                object_store,
-                path,
-                WRITE_CONCURRENCY,
-            )
-            .await
-            .context(error::WriteStreamToFileSnafu { path }),
+            Format::Parquet(_) => {
+                let schema = stream.schema();
+                stream_to_parquet(
+                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
+                    schema,
+                    object_store,
+                    path,
+                    WRITE_CONCURRENCY,
+                )
+                .await
+                .context(error::WriteStreamToFileSnafu { path })
+            }
             _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
         }
     }
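Call-site shape after this patch: `stream_to_parquet` takes the GreptimeDB
`SchemaRef` alongside the stream, because `column_wise_config` inspects
GreptimeDB column schemas (`ConcreteDataType::is_timestamp`) and the stream
itself is moved into the DataFusion adapter. A hedged sketch of the new call,
assuming `stream`, `object_store`, and an async context returning a compatible
Result are set up by the caller, and the output path is illustrative:

    // Capture the table schema before the stream is moved into the adapter.
    let schema = stream.schema();
    let rows_written = stream_to_parquet(
        Box::pin(DfRecordBatchStreamAdapter::new(stream)),
        schema,
        object_store,
        "path/to/output.parquet",
        WRITE_CONCURRENCY,
    )
    .await?;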