diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 3252ad72f..e16c1b4c2 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -25,6 +25,8 @@ use crate::serde::scheduler::PartitionStats; use async_trait::async_trait; use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::ipc::writer::IpcWriteOptions; +use datafusion::arrow::ipc::CompressionType; use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch}; use datafusion::datasource::physical_plan::{CsvExec, ParquetExec}; use datafusion::error::DataFusionError; @@ -82,7 +84,12 @@ pub async fn write_stream_to_disk( let mut num_rows = 0; let mut num_batches = 0; let mut num_bytes = 0; - let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?; + + let options = IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::LZ4_FRAME))?; + + let mut writer = + FileWriter::try_new_with_options(file, stream.schema().as_ref(), options)?; while let Some(result) = stream.next().await { let batch = result?; diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs index 4a62bc5fd..7237ea0fc 100644 --- a/ballista/executor/src/flight_service.rs +++ b/ballista/executor/src/flight_service.rs @@ -21,6 +21,7 @@ use std::convert::TryFrom; use std::fs::File; use std::pin::Pin; +use arrow::ipc::CompressionType; use arrow_flight::SchemaAsIpc; use ballista_core::error::BallistaError; use ballista_core::serde::decode_protobuf; @@ -231,7 +232,9 @@ async fn stream_flight_data( where T: Read + Seek, { - let options = arrow::ipc::writer::IpcWriteOptions::default(); + let options = arrow::ipc::writer::IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::LZ4_FRAME)) + .map_err(|x| from_arrow_err(&x))?; let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into(); send_response(&tx, Ok(schema_flight_data)).await?;