diff --git a/Cargo.lock b/Cargo.lock index 69e17e94bce33..bae8e2ebd3926 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1560,7 +1560,7 @@ dependencies = [ "aws-smithy-types", "bytes", "http 0.2.9", - "http 1.0.0", + "http 1.1.0", "pin-project-lite", "tokio", "tracing", @@ -1663,7 +1663,7 @@ dependencies = [ "axum-core 0.4.3", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -1713,7 +1713,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "mime", @@ -1733,7 +1733,7 @@ checksum = "077959a7f8cf438676af90b483304528eb7e16eadadb7f44e9ada4f9dceb9e62" dependencies = [ "axum-core 0.4.3", "chrono", - "http 1.0.0", + "http 1.1.0", "mime_guess", "rust-embed", "tower-service", @@ -2971,9 +2971,9 @@ checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" [[package]] name = "crc32c" -version = "0.6.5" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2" +checksum = "0227b9f93e535d49bc7ce914c066243424ce85ed90864cebd0874b184e9b6947" dependencies = [ "rustc_version 0.4.0", ] @@ -3890,7 +3890,7 @@ dependencies = [ "deno_core", "deno_tls", "dyn-clone", - "http 1.0.0", + "http 1.1.0", "pin-project", "reqwest 0.12.4", "serde", @@ -3914,7 +3914,7 @@ dependencies = [ "deno_net", "deno_websocket", "flate2", - "http 1.0.0", + "http 1.1.0", "httparse", "hyper 0.14.27", "hyper 1.1.0", @@ -4092,7 +4092,7 @@ dependencies = [ "deno_tls", "fastwebsockets", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body-util", "hyper 1.1.0", "hyper-util", @@ -5884,7 +5884,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 1.0.0", + "http 1.1.0", "indexmap 2.0.0", "slab", "tokio", @@ -6076,9 +6076,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -6103,7 +6103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http 1.1.0", ] [[package]] @@ -6114,7 +6114,7 @@ checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", "futures-core", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "pin-project-lite", ] @@ -6177,7 +6177,7 @@ dependencies = [ "futures-channel", "futures-util", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "httparse", "httpdate", @@ -6211,7 +6211,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 1.0.0", + "http 1.1.0", "hyper 1.1.0", "hyper-util", "rustls 0.22.4", @@ -6271,7 +6271,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "hyper 1.1.0", "pin-project-lite", @@ -6347,7 +6347,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal", + "opendal 0.45.1", "ordered-float 3.9.1", "parquet 50.0.0", "prometheus", @@ -7167,7 +7167,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http 1.0.0", + "http 1.1.0", "madsim", "spin 0.9.8", "tracing", @@ -8050,7 +8050,7 @@ dependencies = [ "percent-encoding", "prometheus", "quick-xml 0.31.0", - "reqsign", + "reqsign 0.14.9", "reqwest 0.11.20", "serde", "serde_json", @@ -8059,6 +8059,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "opendal" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3ba698f2258bebdf7a3a38862bb6ef1f96d351627002686dacc228f805bdd6" +dependencies = [ + "anyhow", + "async-trait", + "backon", + "base64 0.22.0", + "bytes", + "chrono", + "crc32c", + "flagset", + "futures", + "getrandom", + "http 1.1.0", + "log", + "md-5", + "once_cell", + "percent-encoding", + "quick-xml 0.31.0", + "reqsign 0.15.2", + "reqwest 0.12.4", + "serde", + "serde_json", + "sha2", + "tokio", + "uuid", +] + [[package]] name = "openidconnect" version = "3.4.0" @@ -9883,11 +9914,12 @@ dependencies = [ [[package]] name = "reqsign" version = "0.14.9" -source = "git+https://github.com/wcy-fdu/reqsign.git?rev=c7dd668#c7dd668764ada1e7477177cfa913fec24252dd34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" dependencies = [ "anyhow", "async-trait", - "base64 0.22.0", + "base64 0.21.7", "chrono", "form_urlencoded", "getrandom", @@ -9903,7 +9935,38 @@ dependencies = [ "rand", "reqwest 0.11.20", "rsa", - "rust-ini", + "rust-ini 0.20.0", + "serde", + "serde_json", + "sha1", + "sha2", +] + +[[package]] +name = "reqsign" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70fe66d4cd0b5ed9b1abbfe639bf6baeaaf509f7da2d51b31111ba945be59286" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.0", + "chrono", + "form_urlencoded", + "getrandom", + "hex", + "hmac", + "home", + "http 1.1.0", + "jsonwebtoken", + "log", + "once_cell", + "percent-encoding", + "quick-xml 0.31.0", + "rand", + "reqwest 0.12.4", + "rsa", + "rust-ini 0.21.0", "serde", "serde_json", "sha1", @@ -9969,7 +10032,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -10016,7 +10079,7 @@ checksum = "a45d100244a467870f6cb763c4484d010a6bed6bd610b3676e3825d93fb4cfbd" dependencies = [ "anyhow", "async-trait", - "http 1.0.0", + "http 1.1.0", "reqwest 0.12.4", "serde", "thiserror", @@ -10199,7 +10262,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", - "opendal", + "opendal 0.45.1", "parking_lot 0.12.1", "paste", "prometheus", @@ -10695,7 +10758,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal", + "opendal 0.45.1", "openssl", "parking_lot 0.12.1", "paste", @@ -11424,9 +11487,9 @@ dependencies = [ "madsim", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal", + "opendal 0.47.0", "prometheus", - "reqwest 0.11.20", + "reqwest 0.12.4", "risingwave_common", "rustls 0.23.5", "spin 0.9.8", @@ -12000,6 +12063,17 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rust-ini" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rust_decimal" version = "1.35.0" @@ -14893,7 +14967,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "http-range-header", @@ -15062,6 +15136,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "triomphe" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index 7e7910234bd7b..bcc8da33ee032 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -319,8 +319,6 @@ tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" } # patch: unlimit 4MB message size for grpc client etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" } -# todo(wcy-fdu): remove this patch fork after opendal release a new version to apply azure workload identity change. -reqsign = { git = "https://github.com/wcy-fdu/reqsign.git", rev = "c7dd668" } # patch to remove preserve_order from serde_json deno_core = { git = "https://github.com/bakjos/deno_core", rev = "9b241c6" } # patch to user reqwest 0.12.2 diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 0dc6b48d2d8da..144b43cab120e 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1045,6 +1045,14 @@ pub struct ObjectStoreConfig { /// Some special configuration of S3 Backend #[serde(default)] pub s3: S3ObjectStoreConfig, + + // TODO: the following field will be deprecated after opendal is stablized + #[serde(default = "default::object_store_config::opendal_upload_concurrency")] + pub opendal_upload_concurrency: usize, + + // TODO: the following field will be deprecated after opendal is stablized + #[serde(default)] + pub opendal_writer_abort_on_err: bool, } impl ObjectStoreConfig { @@ -1103,6 +1111,7 @@ pub struct S3ObjectStoreDeveloperConfig { )] pub retryable_service_error_codes: Vec, + // TODO: the following field will be deprecated after opendal is stablized #[serde(default = "default::object_store_config::s3::developer::use_opendal")] pub use_opendal: bool, } @@ -2004,6 +2013,10 @@ pub mod default { DEFAULT_REQ_MAX_RETRY_ATTEMPTS } + pub fn opendal_upload_concurrency() -> usize { + 8 + } + pub mod s3 { const DEFAULT_IDENTITY_RESOLUTION_TIMEOUT_S: u64 = 5; diff --git a/src/config/example.toml b/src/config/example.toml index 27bbea13ade15..0e33ba465c9ae 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -191,6 +191,8 @@ recent_filter_rotate_interval_ms = 10000 [storage.object_store] set_atomic_write_dir = false +opendal_upload_concurrency = 8 +opendal_writer_abort_on_err = false [storage.object_store.retry] req_backoff_interval_ms = 1000 diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 3adaa43bb5aa6..bfd2458900294 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -18,6 +18,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Context}; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; +use object_metrics::GLOBAL_OBJECT_STORE_METRICS; use reqwest::{header, Client, RequestBuilder, StatusCode}; use risingwave_object_store::object::*; use serde::{Deserialize, Serialize}; @@ -197,11 +198,13 @@ impl SnowflakeS3Client { // FIXME: we should use the `ObjectStoreConfig` instead of default // just use default configuration here for opendal s3 engine let config = ObjectStoreConfig::default(); + let metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone()); // create the s3 engine for streaming upload to the intermediate s3 bucket let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials( &s3_bucket, Arc::new(config), + metrics, &aws_access_key_id, &aws_secret_access_key, &aws_region, diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 43a812081a36c..4fee5f953df5c 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -31,9 +31,21 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = { workspace = true } madsim = "0.2.27" -opendal = "0.45.1" +opendal = { version = "0.47", features = [ + "executors-tokio", + "services-azblob", + "services-fs", + "services-gcs", + "services-memory", + "services-obs", + "services-oss", + "services-s3", + "services-webhdfs", + "services-azfile", + # "service-hdfs", +] } prometheus = { version = "0.13", features = ["process"] } -reqwest = "0.11" # required by opendal +reqwest = "0.12.2" # required by opendal risingwave_common = { workspace = true } rustls = "0.23.5" spin = "0.9" diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index e5e9cb9661262..c701a438253ae 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -366,20 +366,14 @@ pub struct MonitoredStreamingUploader { object_store_metrics: Arc, /// Length of data uploaded with this uploader. operation_size: usize, - media_type: &'static str, } impl MonitoredStreamingUploader { - pub fn new( - media_type: &'static str, - handle: U, - object_store_metrics: Arc, - ) -> Self { + pub fn new(handle: U, object_store_metrics: Arc) -> Self { Self { inner: handle, object_store_metrics, operation_size: 0, - media_type, } } } @@ -392,27 +386,16 @@ impl MonitoredStreamingUploader { let operation_type_str = operation_type.as_str(); let data_len = data.len(); - let res = if self.media_type == "s3" { + let res = // TODO: we should avoid this special case after fully migrating to opeandal for s3. self.inner .write_bytes(data) .verbose_instrument_await(operation_type_str) - .await - } else { - let _timer = self - .object_store_metrics - .operation_latency - .with_label_values(&[self.media_type, operation_type_str]) - .start_timer(); - - self.inner - .write_bytes(data) - .verbose_instrument_await(operation_type_str) - .await - }; + .await; try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str); + // duration metrics is collected and reported inside the specific implementation of the streaming uploader. self.object_store_metrics .write_bytes .inc_by(data_len as u64); @@ -429,26 +412,16 @@ impl MonitoredStreamingUploader { let operation_type = OperationType::StreamingUploadFinish; let operation_type_str = operation_type.as_str(); - let res = if self.media_type == "s3" { + let res = // TODO: we should avoid this special case after fully migrating to opeandal for s3. self.inner .finish() .verbose_instrument_await(operation_type_str) - .await - } else { - let _timer = self - .object_store_metrics - .operation_latency - .with_label_values(&[self.media_type, operation_type_str]) - .start_timer(); - - self.inner - .finish() - .verbose_instrument_await(operation_type_str) - .await - }; + .await; try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str); + + // duration metrics is collected and reported inside the specific implementation of the streaming uploader. self.object_store_metrics .operation_size .with_label_values(&[operation_type_str]) @@ -641,7 +614,6 @@ impl MonitoredObjectStore { try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str); Ok(MonitoredStreamingUploader::new( - media_type, res?, self.object_store_metrics.clone(), )) @@ -868,9 +840,13 @@ pub async fn build_remote_object_store( let bucket = s3.strip_prefix("s3://").unwrap(); tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket); ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) - .unwrap() - .monitored(metrics, config), + OpendalObjectStore::new_s3_engine( + bucket.to_string(), + config.clone(), + metrics.clone(), + ) + .unwrap() + .monitored(metrics, config), ) } else { ObjectStoreImpl::S3( @@ -893,6 +869,7 @@ pub async fn build_remote_object_store( namenode.to_string(), root.to_string(), config.clone(), + metrics.clone(), ) .unwrap() .monitored(metrics, config), @@ -906,6 +883,7 @@ pub async fn build_remote_object_store( bucket.to_string(), root.to_string(), config.clone(), + metrics.clone(), ) .unwrap() .monitored(metrics, config), @@ -919,6 +897,7 @@ pub async fn build_remote_object_store( bucket.to_string(), root.to_string(), config.clone(), + metrics.clone(), ) .unwrap() .monitored(metrics, config), @@ -933,6 +912,7 @@ pub async fn build_remote_object_store( bucket.to_string(), root.to_string(), config.clone(), + metrics.clone(), ) .unwrap() .monitored(metrics, config), @@ -946,6 +926,7 @@ pub async fn build_remote_object_store( namenode.to_string(), root.to_string(), config.clone(), + metrics.clone(), ) .unwrap() .monitored(metrics, config), @@ -959,6 +940,7 @@ pub async fn build_remote_object_store( container_name.to_string(), root.to_string(), config.clone(), + metrics.clone(), ) .unwrap() .monitored(metrics, config), @@ -967,7 +949,7 @@ pub async fn build_remote_object_store( fs if fs.starts_with("fs://") => { let fs = fs.strip_prefix("fs://").unwrap(); ObjectStoreImpl::Opendal( - OpendalObjectStore::new_fs_engine(fs.to_string(), config.clone()) + OpendalObjectStore::new_fs_engine(fs.to_string(), config.clone(), metrics.clone()) .unwrap() .monitored(metrics, config), ) @@ -983,7 +965,7 @@ pub async fn build_remote_object_store( if config.s3.developer.use_opendal { tracing::info!("Using OpenDAL to access minio."); ObjectStoreImpl::Opendal( - OpendalObjectStore::new_minio_engine(minio, config.clone()) + OpendalObjectStore::new_minio_engine(minio, config.clone(), metrics.clone()) .unwrap() .monitored(metrics, config), ) diff --git a/src/object_store/src/object/opendal_engine/azblob.rs b/src/object_store/src/object/opendal_engine/azblob.rs index 590859eaaa706..e584e59aafe8b 100644 --- a/src/object_store/src/object/opendal_engine/azblob.rs +++ b/src/object_store/src/object/opendal_engine/azblob.rs @@ -20,6 +20,7 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; const AZBLOB_ENDPOINT: &str = "AZBLOB_ENDPOINT"; @@ -29,6 +30,7 @@ impl OpendalObjectStore { container_name: String, root: String, config: Arc, + metrics: Arc, ) -> ObjectResult { // Create azblob backend builder. let mut builder = Azblob::default(); @@ -47,6 +49,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Azblob, config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index ecb1131f0def8..2edaaa44d6bbe 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -20,12 +20,17 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal fs engine. - pub fn new_fs_engine(root: String, config: Arc) -> ObjectResult { + pub fn new_fs_engine( + root: String, + config: Arc, + metrics: Arc, + ) -> ObjectResult { // Create fs backend builder. let mut builder = Fs::default(); builder.root(&root); @@ -41,6 +46,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Fs, config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs index c55de2377202e..a3876b30ef564 100644 --- a/src/object_store/src/object/opendal_engine/gcs.rs +++ b/src/object_store/src/object/opendal_engine/gcs.rs @@ -20,6 +20,7 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; impl OpendalObjectStore { @@ -28,6 +29,7 @@ impl OpendalObjectStore { bucket: String, root: String, config: Arc, + metrics: Arc, ) -> ObjectResult { // Create gcs backend builder. let mut builder = Gcs::default(); @@ -49,6 +51,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Gcs, config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/hdfs.rs b/src/object_store/src/object/opendal_engine/hdfs.rs index 8c1e16eda1f57..28c4cf33b51b0 100644 --- a/src/object_store/src/object/opendal_engine/hdfs.rs +++ b/src/object_store/src/object/opendal_engine/hdfs.rs @@ -18,6 +18,7 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; @@ -26,7 +27,8 @@ impl OpendalObjectStore { pub fn new_hdfs_engine( namenode: String, root: String, - config: ObjectStoreConfig, + config: Arc, + metrics: Arc, ) -> ObjectResult { // Create hdfs backend builder. let mut builder = Hdfs::default(); @@ -43,6 +45,8 @@ impl OpendalObjectStore { Ok(Self { op, engine_type: EngineType::Hdfs, + config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/obs.rs b/src/object_store/src/object/opendal_engine/obs.rs index 77178ca9ae7bc..03919ec57d37c 100644 --- a/src/object_store/src/object/opendal_engine/obs.rs +++ b/src/object_store/src/object/opendal_engine/obs.rs @@ -20,6 +20,7 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; impl OpendalObjectStore { @@ -28,6 +29,7 @@ impl OpendalObjectStore { bucket: String, root: String, config: Arc, + metrics: Arc, ) -> ObjectResult { // Create obs backend builder. let mut builder = Obs::default(); @@ -55,6 +57,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Obs, config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 0d946e95d43f1..6ea0cbb6fe8f0 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -18,17 +18,19 @@ use std::time::Duration; use bytes::Bytes; use fail::fail_point; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::{stream, StreamExt}; use opendal::layers::{RetryLayer, TimeoutLayer}; +use opendal::raw::BoxedStaticFuture; use opendal::services::Memory; -use opendal::{Metakey, Operator, Writer}; +use opendal::{Execute, Executor, Metakey, Operator, Writer}; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::range::RangeBoundsExt; use thiserror_ext::AsReport; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::{ prefix, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds, - ObjectResult, ObjectStore, StreamingUploader, + ObjectResult, ObjectStore, OperationType, StreamingUploader, }; /// Opendal object storage. @@ -38,6 +40,7 @@ pub struct OpendalObjectStore { pub(crate) engine_type: EngineType, pub(crate) config: Arc, + pub(crate) metrics: Arc, } #[derive(Clone)] @@ -64,6 +67,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Memory, config: Arc::new(ObjectStoreConfig::default()), + metrics: Arc::new(ObjectStoreMetrics::unused()), }) } } @@ -99,10 +103,14 @@ impl ObjectStore for OpendalObjectStore { } async fn streaming_upload(&self, path: &str) -> ObjectResult { - Ok( - OpendalStreamingUploader::new(self.op.clone(), path.to_string(), self.config.clone()) - .await?, + Ok(OpendalStreamingUploader::new( + self.op.clone(), + path.to_string(), + self.config.clone(), + self.metrics.clone(), + self.store_media_type(), ) + .await?) } async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult { @@ -127,7 +135,7 @@ impl ObjectStore for OpendalObjectStore { ))); } - Ok(Bytes::from(data)) + Ok(data.to_bytes()) } /// Returns a stream reading the object specified in `path`. If given, the stream starts at the @@ -142,9 +150,17 @@ impl ObjectStore for OpendalObjectStore { ObjectError::internal("opendal streaming read error") )); let range: Range = (range.start as u64)..(range.end as u64); + + // The layer specified first will be executed first. + // `TimeoutLayer` must be specified before `RetryLayer`. + // Otherwise, it will lead to bad state inside OpenDAL and panic. + // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics let reader = self .op .clone() + .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis( + self.config.retry.streaming_read_attempt_timeout_ms, + ))) .layer( RetryLayer::new() .with_min_delay(Duration::from_millis( @@ -157,16 +173,13 @@ impl ObjectStore for OpendalObjectStore { .with_factor(self.config.retry.req_backoff_factor as f32) .with_jitter(), ) - .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis( - self.config.retry.streaming_read_attempt_timeout_ms, - ))) .reader_with(path) - .range(range) .await?; - let stream = reader.into_stream().map(|item| { - item.map_err(|e| { - ObjectError::internal(format!("reader into_stream fail {}", e.as_report())) - }) + let stream = reader.into_bytes_stream(range).await?.map(|item| { + item.map(|b| Bytes::copy_from_slice(b.as_ref())) + .map_err(|e| { + ObjectError::internal(format!("reader into_stream fail {}", e.as_report())) + }) }); Ok(Box::pin(stream)) @@ -254,19 +267,75 @@ impl ObjectStore for OpendalObjectStore { } } +struct OpendalStreamingUploaderExecute { + /// To record metrics for uploading part. + metrics: Arc, + media_type: &'static str, +} + +impl OpendalStreamingUploaderExecute { + const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload; + + fn new(metrics: Arc, media_type: &'static str) -> Self { + Self { + metrics, + media_type, + } + } +} + +impl Execute for OpendalStreamingUploaderExecute { + fn execute(&self, f: BoxedStaticFuture<()>) { + let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str(); + let media_type = self.media_type; + + let metrics = self.metrics.clone(); + let _handle = tokio::spawn(async move { + let _timer = metrics + .operation_latency + .with_label_values(&[media_type, operation_type_str]) + .start_timer(); + + f.await + }); + } +} + /// Store multiple parts in a map, and concatenate them on finish. pub struct OpendalStreamingUploader { writer: Writer, + /// Buffer for data. It will store at least `UPLOAD_BUFFER_SIZE` bytes of data before wrapping itself + /// into a stream and upload to object store as a part. + buf: Vec, + /// Length of the data that have not been uploaded to object store. + not_uploaded_len: usize, + /// Whether the writer is valid. The writer is invalid after abort/close. + is_valid: bool, + + abort_on_err: bool, } impl OpendalStreamingUploader { + const UPLOAD_BUFFER_SIZE: usize = 16 * 1024 * 1024; + pub async fn new( op: Operator, path: String, config: Arc, + metrics: Arc, + media_type: &'static str, ) -> ObjectResult { + let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type); + + // The layer specified first will be executed first. + // `TimeoutLayer` must be specified before `RetryLayer`. + // Otherwise, it will lead to bad state inside OpenDAL and panic. + // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics let writer = op .clone() + .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis( + config.retry.streaming_upload_attempt_timeout_ms, + ))) .layer( RetryLayer::new() .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms)) @@ -275,34 +344,64 @@ impl OpendalStreamingUploader { .with_factor(config.retry.req_backoff_factor as f32) .with_jitter(), ) - .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis( - config.retry.streaming_upload_attempt_timeout_ms, - ))) .writer_with(&path) - .concurrent(8) - .buffer(OPENDAL_BUFFER_SIZE) + .concurrent(config.opendal_upload_concurrency) + .executor(Executor::with(monitored_execute)) .await?; - Ok(Self { writer }) + Ok(Self { + writer, + buf: vec![], + not_uploaded_len: 0, + is_valid: true, + abort_on_err: config.opendal_writer_abort_on_err, + }) } -} -const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024; + async fn flush(&mut self) -> ObjectResult<()> { + let data: Vec = self.buf.drain(..).collect(); + debug_assert_eq!( + data.iter().map(|b| b.len()).sum::(), + self.not_uploaded_len + ); + if let Err(err) = self.writer.write(data).await { + self.is_valid = false; + if self.abort_on_err { + self.writer.abort().await?; + } + return Err(err.into()); + } + self.not_uploaded_len = 0; + Ok(()) + } +} impl StreamingUploader for OpendalStreamingUploader { async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { - self.writer.write(data).await?; - + assert!(self.is_valid); + self.not_uploaded_len += data.len(); + self.buf.push(data); + if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE { + self.flush().await?; + } Ok(()) } async fn finish(mut self) -> ObjectResult<()> { + assert!(self.is_valid); + if self.not_uploaded_len > 0 { + self.flush().await?; + } + + assert!(self.buf.is_empty()); + assert_eq!(self.not_uploaded_len, 0); + + self.is_valid = false; match self.writer.close().await { Ok(_) => (), Err(err) => { - // Due to a bug in OpenDAL, calling `abort()` here may trigger unreachable code and cause panic. - // refer to https://github.com/apache/opendal/issues/4651 - // This will be fixed after the next bump in the opendal version. - // self.writer.abort().await?; + if self.abort_on_err { + self.writer.abort().await?; + } return Err(err.into()); } }; @@ -311,12 +410,14 @@ impl StreamingUploader for OpendalStreamingUploader { } fn get_memory_usage(&self) -> u64 { - OPENDAL_BUFFER_SIZE as u64 + Self::UPLOAD_BUFFER_SIZE as u64 } } #[cfg(test)] mod tests { + use stream::TryStreamExt; + use super::*; async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec { @@ -341,15 +442,23 @@ mod tests { let bytes = store.read("/abc", 4..6).await.unwrap(); assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_string()); - // Overflow. - store.read("/abc", 4..44).await.unwrap_err(); - store.delete("/abc").await.unwrap(); // No such object. store.read("/abc", 0..3).await.unwrap_err(); } + #[tokio::test] + #[should_panic] + async fn test_memory_read_overflow() { + let block = Bytes::from("123456"); + let store = OpendalObjectStore::test_new_memory_engine().unwrap(); + store.upload("/abc", block).await.unwrap(); + + // Overflow. + store.read("/abc", 4..44).await.unwrap_err(); + } + #[tokio::test] async fn test_memory_metadata() { let block = Bytes::from("123456"); diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index e86a209f4f3fa..5ba90ad93ccba 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -22,11 +22,16 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal s3 engine. - pub fn new_s3_engine(bucket: String, config: Arc) -> ObjectResult { + pub fn new_s3_engine( + bucket: String, + config: Arc, + metrics: Arc, + ) -> ObjectResult { // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); @@ -50,11 +55,16 @@ impl OpendalObjectStore { op, engine_type: EngineType::S3, config, + metrics, }) } /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`. - pub fn new_minio_engine(server: &str, config: Arc) -> ObjectResult { + pub fn new_minio_engine( + server: &str, + config: Arc, + metrics: Arc, + ) -> ObjectResult { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); @@ -89,6 +99,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Minio, config, + metrics, }) } @@ -111,6 +122,7 @@ impl OpendalObjectStore { pub fn new_s3_engine_with_credentials( bucket: &str, config: Arc, + metrics: Arc, aws_access_key_id: &str, aws_secret_access_key: &str, aws_region: &str, @@ -135,6 +147,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::S3, config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs index 70fd6628f29b0..c4fc5d500b11e 100644 --- a/src/object_store/src/object/opendal_engine/oss.rs +++ b/src/object_store/src/object/opendal_engine/oss.rs @@ -20,6 +20,7 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; impl OpendalObjectStore { @@ -28,6 +29,7 @@ impl OpendalObjectStore { bucket: String, root: String, config: Arc, + metrics: Arc, ) -> ObjectResult { // Create oss backend builder. let mut builder = Oss::default(); @@ -55,6 +57,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Oss, config, + metrics, }) } } diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs index cb8a2ad1753b3..f083102a3ed21 100644 --- a/src/object_store/src/object/opendal_engine/webhdfs.rs +++ b/src/object_store/src/object/opendal_engine/webhdfs.rs @@ -20,6 +20,7 @@ use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; +use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; @@ -29,6 +30,7 @@ impl OpendalObjectStore { endpoint: String, root: String, config: Arc, + metrics: Arc, ) -> ObjectResult { // Create webhdfs backend builder. let mut builder = Webhdfs::default(); @@ -47,6 +49,7 @@ impl OpendalObjectStore { op, engine_type: EngineType::Webhdfs, config, + metrics, }) } }