Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Nov 18, 2024
1 parent b9410c4 commit fb4d239
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ fn create_shuffle_exchange(

fn create_temp_dir(stage_id: usize) -> Result<String> {
let uuid = Uuid::new_v4();
let temp_dir = format!("/tmp/ray-sql-{uuid}-stage-{stage_id}");
let temp_dir = format!("/tmp/ray-sql-shuffle/{uuid}/{stage_id}");
debug!("Creating temp shuffle dir: {temp_dir}");
std::fs::create_dir(&temp_dir)?;
Ok(temp_dir)
Expand Down
13 changes: 10 additions & 3 deletions src/shuffle/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::runtime::Handle;
use tokio::task::block_in_place;

Expand Down Expand Up @@ -127,20 +128,26 @@ impl ExecutionPlan for ShuffleReaderExec {
partition, self.stage_id, file
);

println!("Downloading {} from object storage", file);
let object_path = ObjectStorePath::from(file.as_str());
let mut local_file = File::create(&file)?;
let object_store = create_object_store()?;

let result: Result<Option<LocalShuffleStream>> = block_in_place(move || {
Handle::current().block_on(async move {
match object_store.get(&object_path).await {
Ok(get_result) => {
println!("Downloading {} from object storage", file);
let start = Instant::now();
let mut local_file = File::create(&file).map_err(|e| {
println!("ShuffleReaderExec failed to create file: {}", e);
e
})?;
let mut stream = get_result.into_stream();
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
local_file.write_all(&bytes)?;
}
let end = Instant::now();
println!("File download took {:?}", end.duration_since(start));
println!("Deleting {} from object storage", object_path);
object_store.delete(&object_path).await?;
Ok(Some(LocalShuffleStream::new(
Expand Down Expand Up @@ -181,7 +188,7 @@ impl ExecutionPlan for ShuffleReaderExec {
"shuffle reader"
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
fn properties(&self) -> &PlanProperties {
&self.properties
}
}
Expand Down
40 changes: 28 additions & 12 deletions src/shuffle/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

#[derive(Debug)]
pub struct ShuffleWriterExec {
Expand Down Expand Up @@ -337,23 +338,20 @@ impl IPCWriter {

let object_store_path = ObjectStorePath::from_filesystem_path(&self.path)?;

// TODO with multipart, we need to upload in chunks of at least 5 MB
let multi_part_enabled = false;
const MULTIPART_CHUNK_SIZE: usize = 5 * 1024 * 1024;

// TODO make threshold configurable
if multi_part_enabled && self.num_bytes > 10 * 1024 * 1024 {
let start = Instant::now();
if self.num_bytes > MULTIPART_CHUNK_SIZE {
// use multipart put for larger files
println!(
"Uploading shuffle file {} containing {} bytes (put_multipart)",
&self.path.display(),
self.num_bytes
);

// use multipart put for larger files
const CHUNK_SIZE: usize = 16 * 1024;
let mut buffer = [0u8; CHUNK_SIZE];
let mut buffer = Vec::with_capacity(MULTIPART_CHUNK_SIZE);
let mut writer = self.object_store.put_multipart(&object_store_path).await?;
loop {
let bytes_read = file.read(&mut buffer)?;
let bytes_read = read_full_buffer(&mut file, &mut buffer)?;
if bytes_read == 0 {
break;
}
Expand All @@ -369,13 +367,20 @@ impl IPCWriter {
);

let mut buffer = Vec::with_capacity(self.num_bytes);
file.read_to_end(&mut buffer)?;
let bytes_read = file.read_to_end(&mut buffer)?;
let bytes = Bytes::copy_from_slice(&buffer[..bytes_read]);
self.object_store
.put(&object_store_path, PutPayload::from(buffer))
.put(&object_store_path, PutPayload::from(bytes))
.await?;
}
let end = Instant::now();
println!("File upload took {:?}", end.duration_since(start));
}
std::fs::remove_file(&self.path)?;

std::fs::remove_file(&self.path).map_err(|e| {
println!("ShuffleWriterExec failed to delete file: {}", e);
e
})?;
Ok(())
}

Expand All @@ -384,3 +389,14 @@ impl IPCWriter {
&self.path
}
}

/// Reads from `file` until `buffer` is completely filled or end-of-file is reached.
///
/// A single call to `read` may return fewer bytes than requested, so this keeps
/// reading into the unfilled tail of `buffer` until there is no room left or a
/// read reports zero bytes.
///
/// The amount requested is bounded by `buffer.len()`. A zero-length slice (for
/// example one borrowed from a `Vec` that has capacity but no elements) yields
/// `Ok(0)` without touching the file.
///
/// # Returns
///
/// The number of bytes written to the front of `buffer`. A value smaller than
/// `buffer.len()` means end-of-file was hit; `0` means nothing was read.
///
/// # Errors
///
/// Returns the underlying I/O error from `read`, except for
/// `ErrorKind::Interrupted`, which is non-fatal and is retried.
fn read_full_buffer(file: &mut File, buffer: &mut [u8]) -> Result<usize> {
    let mut total_read = 0;
    while total_read < buffer.len() {
        match file.read(&mut buffer[total_read..]) {
            // Zero bytes with a non-empty destination signals end-of-file.
            Ok(0) => break,
            Ok(bytes_read) => total_read += bytes_read,
            // An interrupted read transferred no data; retry instead of failing.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e.into()),
        }
    }
    Ok(total_read)
}

0 comments on commit fb4d239

Please sign in to comment.