Skip to content

Commit

Permalink
Use lz4 compression for shuffle files and streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Nov 27, 2023
1 parent b8bd8fc commit 92e9a3c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
9 changes: 8 additions & 1 deletion ballista/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::serde::scheduler::PartitionStats;

use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
use datafusion::error::DataFusionError;
Expand Down Expand Up @@ -82,7 +84,12 @@ pub async fn write_stream_to_disk(
let mut num_rows = 0;
let mut num_batches = 0;
let mut num_bytes = 0;
let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;

let options = IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::LZ4_FRAME))?;

let mut writer =
FileWriter::try_new_with_options(file, stream.schema().as_ref(), options)?;

while let Some(result) = stream.next().await {
let batch = result?;
Expand Down
5 changes: 4 additions & 1 deletion ballista/executor/src/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::convert::TryFrom;
use std::fs::File;
use std::pin::Pin;

use arrow::ipc::CompressionType;
use arrow_flight::SchemaAsIpc;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
Expand Down Expand Up @@ -231,7 +232,9 @@ async fn stream_flight_data<T>(
where
T: Read + Seek,
{
let options = arrow::ipc::writer::IpcWriteOptions::default();
let options = arrow::ipc::writer::IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::LZ4_FRAME))
.map_err(|x| from_arrow_err(&x))?;
let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
send_response(&tx, Ok(schema_flight_data)).await?;

Expand Down

0 comments on commit 92e9a3c

Please sign in to comment.