diff --git a/Cargo.lock b/Cargo.lock
index d77b4b43d69f6..85906dd0e4659 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1560,7 +1560,7 @@ dependencies = [
  "aws-smithy-types",
  "bytes",
  "http 0.2.9",
- "http 1.1.0",
+ "http 1.0.0",
  "pin-project-lite",
  "tokio",
  "tracing",
@@ -1663,7 +1663,7 @@ dependencies = [
  "axum-core 0.4.3",
  "bytes",
  "futures-util",
- "http 1.1.0",
+ "http 1.0.0",
  "http-body 1.0.0",
  "http-body-util",
  "hyper 1.1.0",
@@ -1713,7 +1713,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "futures-util",
- "http 1.1.0",
+ "http 1.0.0",
  "http-body 1.0.0",
  "http-body-util",
  "mime",
@@ -1733,7 +1733,7 @@ checksum = "077959a7f8cf438676af90b483304528eb7e16eadadb7f44e9ada4f9dceb9e62"
 dependencies = [
  "axum-core 0.4.3",
  "chrono",
- "http 1.1.0",
+ "http 1.0.0",
  "mime_guess",
  "rust-embed",
  "tower-service",
@@ -2971,9 +2971,9 @@ checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
 
 [[package]]
 name = "crc32c"
-version = "0.6.7"
+version = "0.6.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0227b9f93e535d49bc7ce914c066243424ce85ed90864cebd0874b184e9b6947"
+checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2"
 dependencies = [
  "rustc_version 0.4.0",
 ]
@@ -3890,7 +3890,7 @@ dependencies = [
  "deno_core",
  "deno_tls",
  "dyn-clone",
- "http 1.1.0",
+ "http 1.0.0",
  "pin-project",
  "reqwest 0.12.4",
  "serde",
@@ -3914,7 +3914,7 @@ dependencies = [
  "deno_net",
  "deno_websocket",
  "flate2",
- "http 1.1.0",
+ "http 1.0.0",
  "httparse",
  "hyper 0.14.27",
  "hyper 1.1.0",
@@ -4092,7 +4092,7 @@ dependencies = [
  "deno_tls",
  "fastwebsockets",
  "h2 0.4.4",
- "http 1.1.0",
+ "http 1.0.0",
  "http-body-util",
  "hyper 1.1.0",
  "hyper-util",
@@ -5884,7 +5884,7 @@ dependencies = [
  "futures-core",
  "futures-sink",
  "futures-util",
- "http 1.1.0",
+ "http 1.0.0",
  "indexmap 2.0.0",
  "slab",
  "tokio",
@@ -6076,9 +6076,9 @@ dependencies = [
 
 [[package]]
 name = "http"
-version = "1.1.0"
+version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
 dependencies = [
  "bytes",
  "fnv",
@@ -6103,7 +6103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
 dependencies = [
  "bytes",
- "http 1.1.0",
+ "http 1.0.0",
 ]
@@ -6114,7 +6114,7 @@ checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d"
 dependencies = [
  "bytes",
  "futures-core",
- "http 1.1.0",
+ "http 1.0.0",
  "http-body 1.0.0",
  "pin-project-lite",
 ]
@@ -6177,7 +6177,7 @@ dependencies = [
  "futures-channel",
  "futures-util",
  "h2 0.4.4",
- "http 1.1.0",
+ "http 1.0.0",
  "http-body 1.0.0",
  "httparse",
  "httpdate",
@@ -6211,7 +6211,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
 dependencies = [
  "futures-util",
- "http 1.1.0",
+ "http 1.0.0",
  "hyper 1.1.0",
  "hyper-util",
  "rustls 0.22.4",
@@ -6271,7 +6271,7 @@ dependencies = [
  "bytes",
  "futures-channel",
  "futures-util",
- "http 1.1.0",
+ "http 1.0.0",
  "http-body 1.0.0",
  "hyper 1.1.0",
  "pin-project-lite",
@@ -6347,7 +6347,7 @@ dependencies = [
  "log",
  "murmur3",
  "once_cell",
- "opendal 0.45.1",
+ "opendal",
  "ordered-float 3.9.1",
  "parquet 50.0.0",
  "prometheus",
@@ -7167,7 +7167,7 @@ dependencies = [
  "aws-smithy-types",
  "aws-types",
  "bytes",
- "http 1.1.0",
+ "http 1.0.0",
"madsim", "spin 0.9.8", "tracing", @@ -8050,7 +8050,7 @@ dependencies = [ "percent-encoding", "prometheus", "quick-xml 0.31.0", - "reqsign 0.14.9", + "reqsign", "reqwest 0.11.20", "serde", "serde_json", @@ -8059,37 +8059,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "opendal" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3ba698f2258bebdf7a3a38862bb6ef1f96d351627002686dacc228f805bdd6" -dependencies = [ - "anyhow", - "async-trait", - "backon", - "base64 0.22.0", - "bytes", - "chrono", - "crc32c", - "flagset", - "futures", - "getrandom", - "http 1.1.0", - "log", - "md-5", - "once_cell", - "percent-encoding", - "quick-xml 0.31.0", - "reqsign 0.15.2", - "reqwest 0.12.4", - "serde", - "serde_json", - "sha2", - "tokio", - "uuid", -] - [[package]] name = "openidconnect" version = "3.4.0" @@ -9914,12 +9883,11 @@ dependencies = [ [[package]] name = "reqsign" version = "0.14.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" +source = "git+https://github.com/wcy-fdu/reqsign.git?rev=dffdacb#dffdacb739a49a5d94039636793e0d01c97b1924" dependencies = [ "anyhow", "async-trait", - "base64 0.21.7", + "base64 0.22.0", "chrono", "form_urlencoded", "getrandom", @@ -9935,38 +9903,7 @@ dependencies = [ "rand", "reqwest 0.11.20", "rsa", - "rust-ini 0.20.0", - "serde", - "serde_json", - "sha1", - "sha2", -] - -[[package]] -name = "reqsign" -version = "0.15.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70fe66d4cd0b5ed9b1abbfe639bf6baeaaf509f7da2d51b31111ba945be59286" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.22.0", - "chrono", - "form_urlencoded", - "getrandom", - "hex", - "hmac", - "home", - "http 1.1.0", - "jsonwebtoken", - "log", - "once_cell", - "percent-encoding", - "quick-xml 0.31.0", - "rand", - "reqwest 0.12.4", - "rsa", - "rust-ini 0.21.0", + "rust-ini", "serde", "serde_json", "sha1", @@ -10032,7 +9969,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.4", - "http 1.1.0", + "http 1.0.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -10079,7 +10016,7 @@ checksum = "a45d100244a467870f6cb763c4484d010a6bed6bd610b3676e3825d93fb4cfbd" dependencies = [ "anyhow", "async-trait", - "http 1.1.0", + "http 1.0.0", "reqwest 0.12.4", "serde", "thiserror", @@ -10262,7 +10199,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", - "opendal 0.45.1", + "opendal", "parking_lot 0.12.1", "paste", "prometheus", @@ -10758,7 +10695,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal 0.45.1", + "opendal", "openssl", "parking_lot 0.12.1", "paste", @@ -11487,9 +11424,9 @@ dependencies = [ "madsim", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal 0.47.0", + "opendal", "prometheus", - "reqwest 0.12.4", + "reqwest 0.11.20", "risingwave_common", "rustls 0.23.5", "spin 0.9.8", @@ -12063,17 +12000,6 @@ dependencies = [ "ordered-multimap", ] -[[package]] -name = "rust-ini" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" -dependencies = [ - "cfg-if", - "ordered-multimap", - "trim-in-place", -] - [[package]] name = "rust_decimal" version = "1.35.0" @@ -14967,7 +14893,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 1.1.0", + "http 1.0.0", "http-body 1.0.0", "http-body-util", "http-range-header", @@ -15136,12 +15062,6 @@ dependencies = [ 
"syn 1.0.109", ] -[[package]] -name = "trim-in-place" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" - [[package]] name = "triomphe" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index 2a578df6563d3..81669968e77a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -319,6 +319,8 @@ tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" } # patch: unlimit 4MB message size for grpc client etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" } +# todo(wcy-fdu): remove this patch fork after opendal release a new version to apply azure workload identity change. +reqsign = { git = "https://github.com/wcy-fdu/reqsign.git", rev = "dffdacb" } # patch to remove preserve_order from serde_json deno_core = { git = "https://github.com/bakjos/deno_core", rev = "9b241c6" } # patch to user reqwest 0.12.2 diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 144b43cab120e..0dc6b48d2d8da 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1045,14 +1045,6 @@ pub struct ObjectStoreConfig { /// Some special configuration of S3 Backend #[serde(default)] pub s3: S3ObjectStoreConfig, - - // TODO: the following field will be deprecated after opendal is stablized - #[serde(default = "default::object_store_config::opendal_upload_concurrency")] - pub opendal_upload_concurrency: usize, - - // TODO: the following field will be deprecated after opendal is stablized - #[serde(default)] - pub opendal_writer_abort_on_err: bool, } impl ObjectStoreConfig { @@ -1111,7 +1103,6 @@ pub struct S3ObjectStoreDeveloperConfig { )] pub retryable_service_error_codes: Vec, - // TODO: the following field will be deprecated after opendal is stablized #[serde(default = "default::object_store_config::s3::developer::use_opendal")] pub use_opendal: bool, } @@ -2013,10 +2004,6 @@ pub mod default { DEFAULT_REQ_MAX_RETRY_ATTEMPTS } - pub fn opendal_upload_concurrency() -> usize { - 8 - } - pub mod s3 { const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5; diff --git a/src/config/example.toml b/src/config/example.toml index 0e33ba465c9ae..27bbea13ade15 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -191,8 +191,6 @@ recent_filter_rotate_interval_ms = 10000 [storage.object_store] set_atomic_write_dir = false -opendal_upload_concurrency = 8 -opendal_writer_abort_on_err = false [storage.object_store.retry] req_backoff_interval_ms = 1000 diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index bfd2458900294..3adaa43bb5aa6 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -18,7 +18,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Context}; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; -use object_metrics::GLOBAL_OBJECT_STORE_METRICS; use reqwest::{header, Client, RequestBuilder, StatusCode}; use risingwave_object_store::object::*; use serde::{Deserialize, Serialize}; @@ -198,13 +197,11 @@ impl SnowflakeS3Client { // FIXME: we should use the `ObjectStoreConfig` instead of default // just use default configuration here for opendal s3 engine let config = ObjectStoreConfig::default(); - let metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()); // create the s3 engine for 
diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml
index 4fee5f953df5c..43a812081a36c 100644
--- a/src/object_store/Cargo.toml
+++ b/src/object_store/Cargo.toml
@@ -31,21 +31,9 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] }
 hyper-tls = "0.5.0"
 itertools = { workspace = true }
 madsim = "0.2.27"
-opendal = { version = "0.47", features = [
-    "executors-tokio",
-    "services-azblob",
-    "services-fs",
-    "services-gcs",
-    "services-memory",
-    "services-obs",
-    "services-oss",
-    "services-s3",
-    "services-webhdfs",
-    "services-azfile",
-    # "service-hdfs",
-] }
+opendal = "0.45.1"
 prometheus = { version = "0.13", features = ["process"] }
-reqwest = "0.12.2" # required by opendal
+reqwest = "0.11" # required by opendal
 risingwave_common = { workspace = true }
 rustls = "0.23.5"
 spin = "0.9"
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index b206fdb5dd17d..5369d914a7514 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -364,14 +364,20 @@ pub struct MonitoredStreamingUploader<U> {
     object_store_metrics: Arc<ObjectStoreMetrics>,
     /// Length of data uploaded with this uploader.
     operation_size: usize,
+    media_type: &'static str,
 }
 
 impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
-    pub fn new(handle: U, object_store_metrics: Arc<ObjectStoreMetrics>) -> Self {
+    pub fn new(
+        media_type: &'static str,
+        handle: U,
+        object_store_metrics: Arc<ObjectStoreMetrics>,
+    ) -> Self {
         Self {
             inner: handle,
             object_store_metrics,
             operation_size: 0,
+            media_type,
         }
     }
 }
@@ -384,16 +390,27 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
         let operation_type_str = operation_type.as_str();
         let data_len = data.len();
 
-        let res =
+        let res = if self.media_type == "s3" {
             // TODO: we should avoid this special case after fully migrating to opeandal for s3.
             self.inner
                 .write_bytes(data)
                 .verbose_instrument_await(operation_type_str)
-                .await;
+                .await
+        } else {
+            let _timer = self
+                .object_store_metrics
+                .operation_latency
+                .with_label_values(&[self.media_type, operation_type_str])
+                .start_timer();
+
+            self.inner
+                .write_bytes(data)
+                .verbose_instrument_await(operation_type_str)
+                .await
+        };
 
         try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
 
-        // duration metrics is collected and reported inside the specific implementation of the streaming uploader.
         self.object_store_metrics
             .write_bytes
             .inc_by(data_len as u64);
@@ -410,16 +427,26 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
         let operation_type = OperationType::StreamingUploadFinish;
         let operation_type_str = operation_type.as_str();
 
-        let res =
+        let res = if self.media_type == "s3" {
             // TODO: we should avoid this special case after fully migrating to opeandal for s3.
             self.inner
                 .finish()
                 .verbose_instrument_await(operation_type_str)
-                .await;
+                .await
+        } else {
+            let _timer = self
+                .object_store_metrics
+                .operation_latency
+                .with_label_values(&[self.media_type, operation_type_str])
+                .start_timer();
 
-        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
+            self.inner
+                .finish()
+                .verbose_instrument_await(operation_type_str)
+                .await
+        };
 
-        // duration metrics is collected and reported inside the specific implementation of the streaming uploader.
+        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
 
         self.object_store_metrics
             .operation_size
             .with_label_values(&[operation_type_str])
@@ -612,6 +639,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
         try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
 
         Ok(MonitoredStreamingUploader::new(
+            media_type,
             res?,
             self.object_store_metrics.clone(),
         ))
@@ -838,13 +866,9 @@ pub async fn build_remote_object_store(
             let bucket = s3.strip_prefix("s3://").unwrap();
             tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket);
             ObjectStoreImpl::Opendal(
-                OpendalObjectStore::new_s3_engine(
-                    bucket.to_string(),
-                    config.clone(),
-                    metrics.clone(),
-                )
-                .unwrap()
-                .monitored(metrics, config),
+                OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone())
+                    .unwrap()
+                    .monitored(metrics, config),
             )
         } else {
             ObjectStoreImpl::S3(
@@ -867,7 +891,6 @@ pub async fn build_remote_object_store(
                     namenode.to_string(),
                     root.to_string(),
                     config.clone(),
-                    metrics.clone(),
                 )
                 .unwrap()
                 .monitored(metrics, config),
@@ -881,7 +904,6 @@ pub async fn build_remote_object_store(
                     bucket.to_string(),
                     root.to_string(),
                     config.clone(),
-                    metrics.clone(),
                 )
                 .unwrap()
                 .monitored(metrics, config),
@@ -895,7 +917,6 @@ pub async fn build_remote_object_store(
                    bucket.to_string(),
                    root.to_string(),
                    config.clone(),
-                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
@@ -910,7 +931,6 @@ pub async fn build_remote_object_store(
                    bucket.to_string(),
                    root.to_string(),
                    config.clone(),
-                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
@@ -924,7 +944,6 @@ pub async fn build_remote_object_store(
                    namenode.to_string(),
                    root.to_string(),
                    config.clone(),
-                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
@@ -938,7 +957,6 @@ pub async fn build_remote_object_store(
                    container_name.to_string(),
                    root.to_string(),
                    config.clone(),
-                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
@@ -947,7 +965,7 @@ pub async fn build_remote_object_store(
         fs if fs.starts_with("fs://") => {
             let fs = fs.strip_prefix("fs://").unwrap();
             ObjectStoreImpl::Opendal(
-                OpendalObjectStore::new_fs_engine(fs.to_string(), config.clone(), metrics.clone())
+                OpendalObjectStore::new_fs_engine(fs.to_string(), config.clone())
                     .unwrap()
                     .monitored(metrics, config),
             )
@@ -963,7 +981,7 @@ pub async fn build_remote_object_store(
             if config.s3.developer.use_opendal {
                 tracing::info!("Using OpenDAL to access minio.");
                 ObjectStoreImpl::Opendal(
-                    OpendalObjectStore::new_minio_engine(minio, config.clone(), metrics.clone())
+                    OpendalObjectStore::new_minio_engine(minio, config.clone())
                         .unwrap()
                         .monitored(metrics, config),
                 )
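// Illustrative sketch, not part of the patch: with per-part latency no longer
// reported inside the opendal writer, MonitoredStreamingUploader times non-s3
// backends around each call, relying on the prometheus timer observing on
// drop. A minimal sketch of that pattern, assuming `metrics.operation_latency`
// is the histogram labeled by (media type, operation) as in the code above:
//
// async fn timed_write<U: StreamingUploader>(
//     uploader: &mut U,
//     metrics: &ObjectStoreMetrics,
//     media_type: &'static str,
//     data: Bytes,
// ) -> ObjectResult<()> {
//     // `_timer` records the elapsed duration into the histogram when it is
//     // dropped, i.e. when this function returns, so it covers the whole await.
//     let _timer = metrics
//         .operation_latency
//         .with_label_values(&[media_type, OperationType::StreamingUpload.as_str()])
//         .start_timer();
//     uploader.write_bytes(data).await
// }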
diff --git a/src/object_store/src/object/opendal_engine/azblob.rs b/src/object_store/src/object/opendal_engine/azblob.rs
index e584e59aafe8b..590859eaaa706 100644
--- a/src/object_store/src/object/opendal_engine/azblob.rs
+++ b/src/object_store/src/object/opendal_engine/azblob.rs
@@ -20,7 +20,6 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::ObjectResult;
 
 const AZBLOB_ENDPOINT: &str = "AZBLOB_ENDPOINT";
@@ -30,7 +29,6 @@ impl OpendalObjectStore {
     pub fn new_azblob_engine(
         container_name: String,
         root: String,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
     ) -> ObjectResult<Self> {
         // Create azblob backend builder.
         let mut builder = Azblob::default();
@@ -49,7 +47,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Azblob,
             config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs
index 2edaaa44d6bbe..ecb1131f0def8 100644
--- a/src/object_store/src/object/opendal_engine/fs.rs
+++ b/src/object_store/src/object/opendal_engine/fs.rs
@@ -20,17 +20,12 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
 use crate::object::ObjectResult;
 
 impl OpendalObjectStore {
     /// create opendal fs engine.
-    pub fn new_fs_engine(
-        root: String,
-        config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
-    ) -> ObjectResult<Self> {
+    pub fn new_fs_engine(root: String, config: Arc<ObjectStoreConfig>) -> ObjectResult<Self> {
         // Create fs backend builder.
         let mut builder = Fs::default();
         builder.root(&root);
@@ -46,7 +41,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Fs,
             config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs
index a3876b30ef564..c55de2377202e 100644
--- a/src/object_store/src/object/opendal_engine/gcs.rs
+++ b/src/object_store/src/object/opendal_engine/gcs.rs
@@ -20,7 +20,6 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::ObjectResult;
 
 impl OpendalObjectStore {
@@ -29,7 +28,6 @@ impl OpendalObjectStore {
         bucket: String,
         root: String,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
     ) -> ObjectResult<Self> {
         // Create gcs backend builder.
         let mut builder = Gcs::default();
@@ -51,7 +49,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Gcs,
             config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/hdfs.rs b/src/object_store/src/object/opendal_engine/hdfs.rs
index 28c4cf33b51b0..8c1e16eda1f57 100644
--- a/src/object_store/src/object/opendal_engine/hdfs.rs
+++ b/src/object_store/src/object/opendal_engine/hdfs.rs
@@ -18,7 +18,6 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
 use crate::object::ObjectResult;
 
@@ -27,8 +26,7 @@ impl OpendalObjectStore {
     pub fn new_hdfs_engine(
         namenode: String,
         root: String,
-        config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
+        config: ObjectStoreConfig,
     ) -> ObjectResult<Self> {
         // Create hdfs backend builder.
         let mut builder = Hdfs::default();
@@ -45,8 +43,6 @@ impl OpendalObjectStore {
         Ok(Self {
             op,
             engine_type: EngineType::Hdfs,
-            config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/obs.rs b/src/object_store/src/object/opendal_engine/obs.rs
index 03919ec57d37c..77178ca9ae7bc 100644
--- a/src/object_store/src/object/opendal_engine/obs.rs
+++ b/src/object_store/src/object/opendal_engine/obs.rs
@@ -20,7 +20,6 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::ObjectResult;
 
 impl OpendalObjectStore {
@@ -29,7 +28,6 @@ impl OpendalObjectStore {
         bucket: String,
         root: String,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
     ) -> ObjectResult<Self> {
         // Create obs backend builder.
         let mut builder = Obs::default();
@@ -57,7 +55,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Obs,
             config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 20d498d67806f..47ca4f362702a 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -18,19 +18,17 @@ use std::time::Duration;
 
 use bytes::Bytes;
 use fail::fail_point;
-use futures::{stream, StreamExt};
+use futures::{stream, StreamExt, TryStreamExt};
 use opendal::layers::{RetryLayer, TimeoutLayer};
-use opendal::raw::BoxedStaticFuture;
 use opendal::services::Memory;
-use opendal::{Execute, Executor, Metakey, Operator, Writer};
+use opendal::{Metakey, Operator, Writer};
 use risingwave_common::config::ObjectStoreConfig;
 use risingwave_common::range::RangeBoundsExt;
 use thiserror_ext::AsReport;
 
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::{
     prefix, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
-    ObjectResult, ObjectStore, OperationType, StreamingUploader,
+    ObjectResult, ObjectStore, StreamingUploader,
 };
 
 /// Opendal object storage.
@@ -40,7 +38,6 @@ pub struct OpendalObjectStore {
     pub(crate) engine_type: EngineType,
 
     pub(crate) config: Arc<ObjectStoreConfig>,
-    pub(crate) metrics: Arc<ObjectStoreMetrics>,
 }
 
 #[derive(Clone)]
@@ -67,7 +64,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Memory,
             config: Arc::new(ObjectStoreConfig::default()),
-            metrics: Arc::new(ObjectStoreMetrics::unused()),
         })
     }
 }
@@ -101,14 +97,10 @@ impl ObjectStore for OpendalObjectStore {
     }
 
     async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
-        Ok(OpendalStreamingUploader::new(
-            self.op.clone(),
-            path.to_string(),
-            self.config.clone(),
-            self.metrics.clone(),
-            self.store_media_type(),
+        Ok(
+            OpendalStreamingUploader::new(self.op.clone(), path.to_string(), self.config.clone())
+                .await?,
         )
-        .await?)
     }
 
     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
@@ -133,7 +125,7 @@ impl ObjectStore for OpendalObjectStore {
             )));
         }
 
-        Ok(data.to_bytes())
+        Ok(Bytes::from(data))
     }
 
     /// Returns a stream reading the object specified in `path`. If given, the stream starts at the
@@ -148,17 +140,9 @@ impl ObjectStore for OpendalObjectStore {
             ObjectError::internal("opendal streaming read error")
         ));
         let range: Range<u64> = (range.start as u64)..(range.end as u64);
-
-        // The layer specified first will be executed first.
-        // `TimeoutLayer` must be specified before `RetryLayer`.
-        // Otherwise, it will lead to bad state inside OpenDAL and panic.
-        // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics
         let reader = self
             .op
             .clone()
-            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
-                self.config.retry.streaming_read_attempt_timeout_ms,
-            )))
             .layer(
                 RetryLayer::new()
                     .with_min_delay(Duration::from_millis(
                         self.config.retry.req_backoff_interval_ms,
                     ))
                     .with_max_delay(Duration::from_millis(
                         self.config.retry.req_backoff_max_delay_ms,
                     ))
                     .with_max_times(self.config.retry.streaming_read_retry_attempts)
                     .with_factor(self.config.retry.req_backoff_factor as f32)
                     .with_jitter(),
             )
+            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
+                self.config.retry.streaming_read_attempt_timeout_ms,
+            )))
             .reader_with(path)
+            .range(range)
             .await?;
-        let stream = reader.into_bytes_stream(range).await?.map(|item| {
-            item.map(|b| Bytes::copy_from_slice(b.as_ref()))
-                .map_err(|e| {
-                    ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
-                })
+        let stream = reader.into_stream().map(|item| {
+            item.map_err(|e| {
+                ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
+            })
         });
 
         Ok(Box::pin(stream))
@@ -265,75 +252,19 @@ impl ObjectStore for OpendalObjectStore {
     }
 }
 
-struct OpendalStreamingUploaderExecute {
-    /// To record metrics for uploading part.
-    metrics: Arc<ObjectStoreMetrics>,
-    media_type: &'static str,
-}
-
-impl OpendalStreamingUploaderExecute {
-    const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;
-
-    fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
-        Self {
-            metrics,
-            media_type,
-        }
-    }
-}
-
-impl Execute for OpendalStreamingUploaderExecute {
-    fn execute(&self, f: BoxedStaticFuture<()>) {
-        let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
-        let media_type = self.media_type;
-
-        let metrics = self.metrics.clone();
-        let _handle = tokio::spawn(async move {
-            let _timer = metrics
-                .operation_latency
-                .with_label_values(&[media_type, operation_type_str])
-                .start_timer();
-
-            f.await
-        });
-    }
-}
-
 /// Store multiple parts in a map, and concatenate them on finish.
 pub struct OpendalStreamingUploader {
     writer: Writer,
-    /// Buffer for data. It will store at least `UPLOAD_BUFFER_SIZE` bytes of data before wrapping itself
-    /// into a stream and upload to object store as a part.
-    buf: Vec<Bytes>,
-    /// Length of the data that have not been uploaded to object store.
-    not_uploaded_len: usize,
-    /// Whether the writer is valid. The writer is invalid after abort/close.
-    is_valid: bool,
-
-    abort_on_err: bool,
 }
 
 impl OpendalStreamingUploader {
-    const UPLOAD_BUFFER_SIZE: usize = 16 * 1024 * 1024;
-
     pub async fn new(
         op: Operator,
         path: String,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
-        media_type: &'static str,
     ) -> ObjectResult<Self> {
-        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);
-
-        // The layer specified first will be executed first.
-        // `TimeoutLayer` must be specified before `RetryLayer`.
-        // Otherwise, it will lead to bad state inside OpenDAL and panic.
-        // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics
         let writer = op
             .clone()
-            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
-                config.retry.streaming_upload_attempt_timeout_ms,
-            )))
             .layer(
                 RetryLayer::new()
                     .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                     .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                     .with_max_times(config.retry.streaming_upload_retry_attempts)
                     .with_factor(config.retry.req_backoff_factor as f32)
                     .with_jitter(),
             )
+            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
+                config.retry.streaming_upload_attempt_timeout_ms,
+            )))
             .writer_with(&path)
-            .concurrent(config.opendal_upload_concurrency)
-            .executor(Executor::with(monitored_execute))
+            .concurrent(8)
+            .buffer(OPENDAL_BUFFER_SIZE)
             .await?;
-        Ok(Self {
-            writer,
-            buf: vec![],
-            not_uploaded_len: 0,
-            is_valid: true,
-            abort_on_err: config.opendal_writer_abort_on_err,
-        })
-    }
-
-    async fn flush(&mut self) -> ObjectResult<()> {
-        let data: Vec<Bytes> = self.buf.drain(..).collect();
-        debug_assert_eq!(
-            data.iter().map(|b| b.len()).sum::<usize>(),
-            self.not_uploaded_len
-        );
-        if let Err(err) = self.writer.write(data).await {
-            self.is_valid = false;
-            if self.abort_on_err {
-                self.writer.abort().await?;
-            }
-            return Err(err.into());
-        }
-        self.not_uploaded_len = 0;
-        Ok(())
+        Ok(Self { writer })
     }
 }
 
+const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024;
+
 impl StreamingUploader for OpendalStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
-        assert!(self.is_valid);
-        self.not_uploaded_len += data.len();
-        self.buf.push(data);
-        if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE {
-            self.flush().await?;
-        }
+        self.writer.write(data).await?;
+
         Ok(())
     }
 
     async fn finish(mut self) -> ObjectResult<()> {
-        assert!(self.is_valid);
-        if self.not_uploaded_len > 0 {
-            self.flush().await?;
-        }
-
-        assert!(self.buf.is_empty());
-        assert_eq!(self.not_uploaded_len, 0);
-
-        self.is_valid = false;
         match self.writer.close().await {
             Ok(_) => (),
             Err(err) => {
-                if self.abort_on_err {
-                    self.writer.abort().await?;
-                }
+                // Due to a bug in OpenDAL, calling `abort()` here may trigger unreachable code and cause a panic.
+                // Refer to https://github.com/apache/opendal/issues/4651.
+                // This will be fixed after the next bump of the opendal version.
+                // self.writer.abort().await?;
                 return Err(err.into());
             }
         };
@@ -408,14 +309,12 @@ impl StreamingUploader for OpendalStreamingUploader {
     }
 
     fn get_memory_usage(&self) -> u64 {
-        Self::UPLOAD_BUFFER_SIZE as u64
+        OPENDAL_BUFFER_SIZE as u64
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use stream::TryStreamExt;
-
     use super::*;
 
     async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
@@ -440,23 +339,15 @@ mod tests {
         let bytes = store.read("/abc", 4..6).await.unwrap();
         assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_string());
 
+        // Overflow.
+        store.read("/abc", 4..44).await.unwrap_err();
+
         store.delete("/abc").await.unwrap();
 
         // No such object.
         store.read("/abc", 0..3).await.unwrap_err();
     }
 
-    #[tokio::test]
-    #[should_panic]
-    async fn test_memory_read_overflow() {
-        let block = Bytes::from("123456");
-        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
-        store.upload("/abc", block).await.unwrap();
-
-        // Overflow.
- store.read("/abc", 4..44).await.unwrap_err(); - } - #[tokio::test] async fn test_memory_metadata() { let block = Bytes::from("123456"); diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 5ba90ad93ccba..e86a209f4f3fa 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -22,16 +22,11 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; -use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal s3 engine. - pub fn new_s3_engine( - bucket: String, - config: Arc, - metrics: Arc, - ) -> ObjectResult { + pub fn new_s3_engine(bucket: String, config: Arc) -> ObjectResult { // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); @@ -55,16 +50,11 @@ impl OpendalObjectStore { op, engine_type: EngineType::S3, config, - metrics, }) } /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`. - pub fn new_minio_engine( - server: &str, - config: Arc, - metrics: Arc, - ) -> ObjectResult { + pub fn new_minio_engine(server: &str, config: Arc) -> ObjectResult { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); @@ -99,7 +89,6 @@ impl OpendalObjectStore { op, engine_type: EngineType::Minio, config, - metrics, }) } @@ -122,7 +111,6 @@ impl OpendalObjectStore { pub fn new_s3_engine_with_credentials( bucket: &str, config: Arc, - metrics: Arc, aws_access_key_id: &str, aws_secret_access_key: &str, aws_region: &str, @@ -147,7 +135,6 @@ impl OpendalObjectStore { op, engine_type: EngineType::S3, config, - metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs index c4fc5d500b11e..70fd6628f29b0 100644 --- a/src/object_store/src/object/opendal_engine/oss.rs +++ b/src/object_store/src/object/opendal_engine/oss.rs @@ -20,7 +20,6 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; -use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; impl OpendalObjectStore { @@ -29,7 +28,6 @@ impl OpendalObjectStore { bucket: String, root: String, config: Arc, - metrics: Arc, ) -> ObjectResult { // Create oss backend builder. let mut builder = Oss::default(); @@ -57,7 +55,6 @@ impl OpendalObjectStore { op, engine_type: EngineType::Oss, config, - metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs index f083102a3ed21..cb8a2ad1753b3 100644 --- a/src/object_store/src/object/opendal_engine/webhdfs.rs +++ b/src/object_store/src/object/opendal_engine/webhdfs.rs @@ -20,7 +20,6 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; -use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; @@ -30,7 +29,6 @@ impl OpendalObjectStore { endpoint: String, root: String, config: Arc, - metrics: Arc, ) -> ObjectResult { // Create webhdfs backend builder. 
diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs
index 5ba90ad93ccba..e86a209f4f3fa 100644
--- a/src/object_store/src/object/opendal_engine/opendal_s3.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -22,16 +22,11 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::ObjectResult;
 
 impl OpendalObjectStore {
     /// create opendal s3 engine.
-    pub fn new_s3_engine(
-        bucket: String,
-        config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
-    ) -> ObjectResult<Self> {
+    pub fn new_s3_engine(bucket: String, config: Arc<ObjectStoreConfig>) -> ObjectResult<Self> {
         // Create s3 builder.
         let mut builder = S3::default();
         builder.bucket(&bucket);
@@ -55,16 +50,11 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::S3,
             config,
-            metrics,
         })
     }
 
     /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
-    pub fn new_minio_engine(
-        server: &str,
-        config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
-    ) -> ObjectResult<Self> {
+    pub fn new_minio_engine(server: &str, config: Arc<ObjectStoreConfig>) -> ObjectResult<Self> {
         let server = server.strip_prefix("minio://").unwrap();
         let (access_key_id, rest) = server.split_once(':').unwrap();
         let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
@@ -99,7 +89,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Minio,
             config,
-            metrics,
         })
     }
 
@@ -122,7 +111,6 @@ impl OpendalObjectStore {
     pub fn new_s3_engine_with_credentials(
         bucket: &str,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
         aws_access_key_id: &str,
         aws_secret_access_key: &str,
         aws_region: &str,
@@ -147,7 +135,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::S3,
             config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs
index c4fc5d500b11e..70fd6628f29b0 100644
--- a/src/object_store/src/object/opendal_engine/oss.rs
+++ b/src/object_store/src/object/opendal_engine/oss.rs
@@ -20,7 +20,6 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::ObjectResult;
 
 impl OpendalObjectStore {
@@ -29,7 +28,6 @@ impl OpendalObjectStore {
         bucket: String,
         root: String,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
     ) -> ObjectResult<Self> {
         // Create oss backend builder.
         let mut builder = Oss::default();
@@ -57,7 +55,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Oss,
             config,
-            metrics,
         })
     }
 }
diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs
index f083102a3ed21..cb8a2ad1753b3 100644
--- a/src/object_store/src/object/opendal_engine/webhdfs.rs
+++ b/src/object_store/src/object/opendal_engine/webhdfs.rs
@@ -20,7 +20,6 @@ use opendal::Operator;
 use risingwave_common::config::ObjectStoreConfig;
 
 use super::{EngineType, OpendalObjectStore};
-use crate::object::object_metrics::ObjectStoreMetrics;
 use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
 use crate::object::ObjectResult;
 
@@ -30,7 +29,6 @@ impl OpendalObjectStore {
         endpoint: String,
         root: String,
         config: Arc<ObjectStoreConfig>,
-        metrics: Arc<ObjectStoreMetrics>,
     ) -> ObjectResult<Self> {
         // Create webhdfs backend builder.
         let mut builder = Webhdfs::default();
@@ -49,7 +47,6 @@ impl OpendalObjectStore {
             op,
             engine_type: EngineType::Webhdfs,
             config,
-            metrics,
         })
     }
 }
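// Illustrative sketch, not part of the patch: the 0.45-style streaming read
// restored above applies the byte range on the reader builder and then adapts
// the reader into a byte stream. A minimal sketch under the same assumptions
// as `streaming_read` (an `op: Operator` and object `path` in scope; the range
// below is a hypothetical value):
//
// use futures::StreamExt;
//
// async fn read_sketch(op: Operator, path: &str) -> ObjectResult<()> {
//     let reader = op.reader_with(path).range(0..1024).await?;
//     let mut stream = reader.into_stream(); // yields opendal::Result<Bytes> items
//     while let Some(chunk) = stream.next().await {
//         let bytes = chunk?; // propagate opendal read errors
//         // ...consume `bytes`...
//     }
//     Ok(())
// }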