Skip to content

Commit

Permalink
feat: Implement custom RecordBatch serde for shuffle for improved performance (#1190)
Browse files Browse the repository at this point in the history

* Implement faster encoder for shuffle blocks

* make code more concise

* enable fast encoding for columnar shuffle

* update benches

* test all int types

* test float

* remaining types

* add Snappy and Zstd(6) back to benchmark

* fix regression

* Update native/core/src/execution/shuffle/codec.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* address feedback

* support nullable flag

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
  • Loading branch information
andygrove and viirya authored Jan 13, 2025
1 parent e8261fb commit d7a7812
Show file tree
Hide file tree
Showing 15 changed files with 1,090 additions and 187 deletions.
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ object CometConf extends ShimCometConf {
.intConf
.createWithDefault(1)

// Internal (not user-documented) switch: when true, shuffle blocks are written
// with Comet's proprietary fast encoding rather than Arrow IPC. Enabled by
// default; kept as an escape hatch so Arrow IPC can be restored if the fast
// encoder misbehaves. NOTE(review): the set of types supported by the fast
// encoder is not visible here — confirm fallback behavior for unsupported types.
val COMET_SHUFFLE_ENABLE_FAST_ENCODING: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enableFastEncoding")
.doc("Whether to enable Comet's faster proprietary encoding for shuffle blocks " +
"rather than using Arrow IPC.")
.internal()
.booleanConf
.createWithDefault(true)

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
.doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
Expand Down
1 change: 1 addition & 0 deletions native/core/benches/row_columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ fn benchmark(c: &mut Criterion) {
0,
None,
&CompressionCodec::Zstd(1),
true,
)
.unwrap();
});
Expand Down
130 changes: 67 additions & 63 deletions native/core/benches/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::Int32Builder;
use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::shuffle::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
use comet::execution::shuffle::{CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::physical_plan::metrics::Time;
use datafusion::{
Expand All @@ -31,67 +31,56 @@ use std::sync::Arc;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_batch(8192, true);
let mut group = c.benchmark_group("shuffle_writer");
group.bench_function("shuffle_writer: encode (no compression))", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (snappy)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Snappy, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (lz4)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)
});
});
group.bench_function("shuffle_writer: end to end", |b| {
let ctx = SessionContext::new();
let exec = create_shuffle_writer_exec(CompressionCodec::Zstd(1));
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
criterion::black_box(rt.block_on(collect(stream)).unwrap());
});
});
for compression_codec in &[
CompressionCodec::None,
CompressionCodec::Lz4Frame,
CompressionCodec::Snappy,
CompressionCodec::Zstd(1),
CompressionCodec::Zstd(6),
] {
for enable_fast_encoding in [true, false] {
let name = format!("shuffle_writer: write encoded (enable_fast_encoding={enable_fast_encoding}, compression={compression_codec:?})");
group.bench_function(name, |b| {
let mut buffer = vec![];
let ipc_time = Time::default();
let w = ShuffleBlockWriter::try_new(
&batch.schema(),
enable_fast_encoding,
compression_codec.clone(),
)
.unwrap();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
w.write_batch(&batch, &mut cursor, &ipc_time).unwrap();
});
});
}
}

for compression_codec in [
CompressionCodec::None,
CompressionCodec::Lz4Frame,
CompressionCodec::Snappy,
CompressionCodec::Zstd(1),
CompressionCodec::Zstd(6),
] {
group.bench_function(
format!("shuffle_writer: end to end (compression = {compression_codec:?}"),
|b| {
let ctx = SessionContext::new();
let exec = create_shuffle_writer_exec(compression_codec.clone());
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(collect(stream)).unwrap();
});
},
);
}
}

fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWriterExec {
Expand All @@ -104,6 +93,7 @@ fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWri
compression_codec,
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
true,
)
.unwrap()
}
Expand All @@ -121,11 +111,19 @@ fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, true),
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Date32, true),
Field::new("c3", DataType::Decimal128(11, 2), true),
]));
let mut a = Int32Builder::new();
let mut b = StringBuilder::new();
let mut c = Date32Builder::new();
let mut d = Decimal128Builder::new()
.with_precision_and_scale(11, 2)
.unwrap();
for i in 0..num_rows {
a.append_value(i as i32);
c.append_value(i as i32);
d.append_value((i * 1000000) as i128);
if allow_nulls && i % 10 == 0 {
b.append_null();
} else {
Expand All @@ -134,7 +132,13 @@ fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
}
let a = a.finish();
let b = b.finish();
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap()
let c = c.finish();
let d = d.finish();
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)],
)
.unwrap()
}

fn config() -> Criterion {
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
current_checksum: jlong,
compression_codec: jstring,
compression_level: jint,
enable_fast_encoding: jboolean,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
Expand Down Expand Up @@ -686,6 +687,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_algo,
current_checksum,
&compression_codec,
enable_fast_encoding != JNI_FALSE,
)?;

let checksum = if let Some(checksum) = checksum {
Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ impl PhysicalPlanner {
codec,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
writer.enable_fast_encoding,
)?);

Ok((
Expand Down
Loading

0 comments on commit d7a7812

Please sign in to comment.