Skip to content

Commit

Permalink
feat(batch): bump opendal for batch spill out (#17550)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jul 3, 2024
1 parent 1ac9583 commit 1c3bdc3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 83 deletions.
86 changes: 7 additions & 79 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ hytra = "0.1.2"
iceberg = { workspace = true }
itertools = { workspace = true }
memcomparable = "0.2"
opendal = "0.45.1"
opendal = "0.47"
parking_lot = { workspace = true }
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
Expand Down
7 changes: 4 additions & 3 deletions src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ impl SpillOp {
Ok(self
.op
.writer_with(name)
.buffer(DEFAULT_IO_BUFFER_SIZE)
.concurrent(DEFAULT_IO_CONCURRENT_TASK)
.chunk(DEFAULT_IO_BUFFER_SIZE)
.await?)
}

pub async fn reader_with(&self, name: &str) -> Result<opendal::Reader> {
Ok(self
.op
.reader_with(name)
.buffer(DEFAULT_IO_BUFFER_SIZE)
.chunk(DEFAULT_IO_BUFFER_SIZE)
.await?)
}

Expand All @@ -123,7 +123,8 @@ impl SpillOp {
/// [proto_bytes]
/// ```
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
pub async fn read_stream(mut reader: opendal::Reader, spill_metrics: Arc<BatchSpillMetrics>) {
pub async fn read_stream(reader: opendal::Reader, spill_metrics: Arc<BatchSpillMetrics>) {
let mut reader = reader.into_futures_async_read(..).await?;
let mut buf = [0u8; 4];
loop {
if let Err(err) = reader.read_exact(&mut buf).await {
Expand Down

0 comments on commit 1c3bdc3

Please sign in to comment.