From 1c3bdc3b72f473d1979d29a54d3a7a488715f407 Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 3 Jul 2024 17:21:34 +0800 Subject: [PATCH] feat(batch): bump opendal for batch spill out (#17550) --- Cargo.lock | 86 +++------------------------------ src/batch/Cargo.toml | 2 +- src/batch/src/spill/spill_op.rs | 7 +-- 3 files changed, 12 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30e187572dcc..9197e73c2708 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6609,7 +6609,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal 0.47.0", + "opendal", "ordered-float 4.1.1", "parquet 52.0.0", "reqwest 0.12.4", @@ -6675,7 +6675,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal 0.47.0", + "opendal", "ordered-float 3.9.1", "parquet 52.0.0", "prometheus", @@ -8438,36 +8438,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "opendal" -version = "0.45.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c17c077f23fa2d2c25d9d22af98baa43b8bbe2ef0de80cf66339aa70401467" -dependencies = [ - "anyhow", - "async-trait", - "backon", - "base64 0.21.7", - "bytes", - "chrono", - "flagset", - "futures", - "getrandom", - "http 0.2.9", - "log", - "md-5", - "once_cell", - "percent-encoding", - "quick-xml 0.31.0", - "reqsign 0.14.9", - "reqwest 0.11.20", - "serde", - "serde_json", - "sha2", - "tokio", - "uuid", -] - [[package]] name = "opendal" version = "0.47.0" @@ -8491,7 +8461,7 @@ dependencies = [ "percent-encoding", "prometheus", "quick-xml 0.31.0", - "reqsign 0.15.2", + "reqsign", "reqwest 0.12.4", "serde", "serde_json", @@ -10363,37 +10333,6 @@ dependencies = [ "bytecheck", ] -[[package]] -name = "reqsign" -version = "0.14.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.21.7", - "chrono", - "form_urlencoded", - "getrandom", - "hex", - "hmac", - "home", - "http 0.2.9", - "jsonwebtoken", - "log", - "once_cell", - "percent-encoding", - "quick-xml 0.31.0", - "rand", - "reqwest 0.11.20", - "rsa", - "rust-ini 0.20.0", - "serde", - "serde_json", - "sha1", - "sha2", -] - [[package]] name = "reqsign" version = "0.15.2" @@ -10418,7 +10357,7 @@ dependencies = [ "rand", "reqwest 0.12.4", "rsa", - "rust-ini 0.21.0", + "rust-ini", "serde", "serde_json", "sha1", @@ -10451,7 +10390,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.11", - "rustls-native-certs 0.6.3", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -10715,7 +10653,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", - "opendal 0.45.1", + "opendal", "parking_lot 0.12.1", "paste", "prometheus", @@ -11220,7 +11158,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal 0.47.0", + "opendal", "openssl", "parking_lot 0.12.1", "parquet 50.0.0", @@ -11968,7 +11906,7 @@ dependencies = [ "madsim", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal 0.47.0", + "opendal", "prometheus", "reqwest 0.12.4", "risingwave_common", @@ -12543,16 +12481,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "rust-ini" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a" -dependencies = [ - "cfg-if", - "ordered-multimap", -] - [[package]] name = "rust-ini" version = "0.21.0" diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 2821dd851c7a..cf94b2ec838a 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -32,7 +32,7 @@ hytra = "0.1.2" iceberg = { workspace = true } itertools = { workspace = true } memcomparable = "0.2" -opendal = "0.45.1" +opendal = "0.47" parking_lot = { workspace = true } paste = "1" prometheus = { version = "0.13", features = ["process"] } diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 4086db46aa53..237ee3baf009 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -100,8 +100,8 @@ impl SpillOp { Ok(self .op .writer_with(name) - .buffer(DEFAULT_IO_BUFFER_SIZE) .concurrent(DEFAULT_IO_CONCURRENT_TASK) + .chunk(DEFAULT_IO_BUFFER_SIZE) .await?) } @@ -109,7 +109,7 @@ impl SpillOp { Ok(self .op .reader_with(name) - .buffer(DEFAULT_IO_BUFFER_SIZE) + .chunk(DEFAULT_IO_BUFFER_SIZE) .await?) } @@ -123,7 +123,8 @@ impl SpillOp { /// [proto_bytes] /// ``` #[try_stream(boxed, ok = DataChunk, error = BatchError)] - pub async fn read_stream(mut reader: opendal::Reader, spill_metrics: Arc) { + pub async fn read_stream(reader: opendal::Reader, spill_metrics: Arc) { + let mut reader = reader.into_futures_async_read(..).await?; let mut buf = [0u8; 4]; loop { if let Err(err) = reader.read_exact(&mut buf).await {