From fb4d239a6ff7db849833bf750f9aa33edf1b1315 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 Nov 2024 07:22:08 -0700 Subject: [PATCH] save progress --- src/planner.rs | 2 +- src/shuffle/reader.rs | 13 ++++++++++--- src/shuffle/writer.rs | 40 ++++++++++++++++++++++++++++------------ 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/src/planner.rs b/src/planner.rs index 7d9fdf0..fca783b 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -229,7 +229,7 @@ fn create_shuffle_exchange( fn create_temp_dir(stage_id: usize) -> Result { let uuid = Uuid::new_v4(); - let temp_dir = format!("/tmp/ray-sql-{uuid}-stage-{stage_id}"); + let temp_dir = format!("/tmp/ray-sql-shuffle/{uuid}/{stage_id}"); debug!("Creating temp shuffle dir: {temp_dir}"); std::fs::create_dir(&temp_dir)?; Ok(temp_dir) diff --git a/src/shuffle/reader.rs b/src/shuffle/reader.rs index 60eac26..36c37c1 100644 --- a/src/shuffle/reader.rs +++ b/src/shuffle/reader.rs @@ -39,6 +39,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Instant; use tokio::runtime::Handle; use tokio::task::block_in_place; @@ -127,20 +128,26 @@ impl ExecutionPlan for ShuffleReaderExec { partition, self.stage_id, file ); - println!("Downloading {} from object storage", file); let object_path = ObjectStorePath::from(file.as_str()); - let mut local_file = File::create(&file)?; let object_store = create_object_store()?; let result: Result> = block_in_place(move || { Handle::current().block_on(async move { match object_store.get(&object_path).await { Ok(get_result) => { + println!("Downloading {} from object storage", file); + let start = Instant::now(); + let mut local_file = File::create(&file).map_err(|e| { + println!("ShuffleReaderExec failed to create file: {}", e); + e + })?; let mut stream = get_result.into_stream(); while let Some(chunk) = stream.next().await { let bytes = chunk?; local_file.write_all(&bytes)?; } + let end = Instant::now(); + println!("File download took {:?}", end.duration_since(start)); println!("Deleting {} from object storage", object_path); object_store.delete(&object_path).await?; Ok(Some(LocalShuffleStream::new( @@ -181,7 +188,7 @@ impl ExecutionPlan for ShuffleReaderExec { "shuffle reader" } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &PlanProperties { &self.properties } } diff --git a/src/shuffle/writer.rs b/src/shuffle/writer.rs index 0f7ce6d..1ef8300 100644 --- a/src/shuffle/writer.rs +++ b/src/shuffle/writer.rs @@ -44,6 +44,7 @@ use std::fs::File; use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Instant; #[derive(Debug)] pub struct ShuffleWriterExec { @@ -337,23 +338,20 @@ impl IPCWriter { let object_store_path = ObjectStorePath::from_filesystem_path(&self.path)?; - // TODO with multipart, we need to upload in chunks of at least 5 MB - let multi_part_enabled = false; + const MULTIPART_CHUNK_SIZE: usize = 5 * 1024 * 1024; - // TODO make threshold configurable - if multi_part_enabled && self.num_bytes > 10 * 1024 * 1024 { + let start = Instant::now(); + if self.num_bytes > MULTIPART_CHUNK_SIZE { + // use multipart put for larger files println!( "Uploading shuffle file {} containing {} bytes (put_multipart)", &self.path.display(), self.num_bytes ); - - // use multipart put for larger files - const CHUNK_SIZE: usize = 16 * 1024; - let mut buffer = [0u8; CHUNK_SIZE]; + let mut buffer = Vec::with_capacity(MULTIPART_CHUNK_SIZE); let mut writer = self.object_store.put_multipart(&object_store_path).await?; loop { - let bytes_read = file.read(&mut buffer)?; + let bytes_read = read_full_buffer(&mut file, &mut buffer)?; if bytes_read == 0 { break; } @@ -369,13 +367,20 @@ impl IPCWriter { ); let mut buffer = Vec::with_capacity(self.num_bytes); - file.read_to_end(&mut buffer)?; + let bytes_read = file.read_to_end(&mut buffer)?; + let bytes = Bytes::copy_from_slice(&buffer[..bytes_read]); self.object_store - .put(&object_store_path, PutPayload::from(buffer)) + .put(&object_store_path, PutPayload::from(bytes)) .await?; } + let end = Instant::now(); + println!("File upload took {:?}", end.duration_since(start)); } - std::fs::remove_file(&self.path)?; + + std::fs::remove_file(&self.path).map_err(|e| { + println!("ShuffleWriterExec failed to delete file: {}", e); + e + })?; Ok(()) } @@ -384,3 +389,14 @@ impl IPCWriter { &self.path } } + +fn read_full_buffer(file: &mut File, buffer: &mut [u8]) -> Result { + let mut total_read = 0; + while total_read < buffer.len() { + match file.read(&mut buffer[total_read..])? { + 0 => break, + bytes_read => total_read += bytes_read, + } + } + Ok(total_read) +}