From a379cdb658a188a1db1dc1eab646084a02233647 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 3 Jan 2024 11:17:05 +0800 Subject: [PATCH 01/10] save work --- .../src/object/opendal_engine/mod.rs | 2 +- .../src/object/opendal_engine/opendal_s3.rs | 75 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 src/object_store/src/object/opendal_engine/opendal_s3.rs diff --git a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index f2dbc24d4bba..c11f4fd2a25e 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -27,7 +27,7 @@ pub mod gcs; pub mod obs; pub mod oss; - +pub mod opendal_s3; pub mod azblob; pub mod fs; diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs new file mode 100644 index 000000000000..c6b493fc1213 --- /dev/null +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -0,0 +1,75 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use opendal::{layers::{LoggingLayer, RetryLayer}, services::S3}; +use opendal::services::Gcs; +use opendal::Operator; + +use super::{EngineType, OpendalObjectStore}; +use crate::object::ObjectResult; + +impl OpendalObjectStore { + /// create opendal gcs engine. + pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { + // Create gcs backend builder. + let mut builder = Gcs::default(); + + builder.bucket(&bucket); + + builder.root(&root); + + // Create s3 builder. + let mut builder = S3::default(); + builder.bucket(&bucket); + builder.region(&s3_properties.region_name); + + if let Some(endpoint_url) = s3_properties.endpoint_url { + builder.endpoint(&endpoint_url); + } + + if let Some(access) = s3_properties.access { + builder.access_key_id(&access); + } else { + tracing::error!( + "access key id of aws s3 is not set, bucket {}", + s3_properties.bucket_name + ); + } + + if let Some(secret) = s3_properties.secret { + builder.secret_access_key(&secret); + } else { + tracing::error!( + "secret access key of aws s3 is not set, bucket {}", + s3_properties.bucket_name + ); + } + + builder.enable_virtual_host_style(); + + if let Some(assume_role) = assume_role { + builder.role_arn(&assume_role); + } + + builder.disable_config_load(); + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(Self { + op, + engine_type: EngineType::Gcs, + }) + } +} From 4e524193c814f90f27db0bb1174ef34844a8f395 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 8 Jan 2024 14:26:46 +0800 Subject: [PATCH 02/10] implement new s3 via OpenDAL --- risedev.yml | 13 +++++++ src/object_store/src/object/mod.rs | 9 +++++ .../src/object/opendal_engine/mod.rs | 4 +- .../opendal_engine/opendal_object_store.rs | 2 + .../src/object/opendal_engine/opendal_s3.rs | 37 ++++++------------- src/risedevtool/src/task/utils.rs | 4 ++ 6 files changed, 41 insertions(+), 28 deletions(-) diff --git a/risedev.yml b/risedev.yml index 17076e4f2da8..97a8d0bd48b0 100644 --- a/risedev.yml +++ b/risedev.yml @@ -191,6 +191,19 @@ profile: - use: compactor # - use: prometheus # - use: grafana + opendal_s3: + steps: + # - use: etcd + - use: meta-node + - use: compute-node + - use: frontend + # If you want to use google cloud storage as storage backend, configure bucket name: + - use: opendal + engine: opendal_s3 + bucket: bucket-name + - use: compactor + # - use: prometheus + # - use: grafana obs: steps: # - use: etcd diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index d2010a676a48..8b7dac6d9037 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -795,6 +795,15 @@ pub async fn build_remote_object_store( .await .monitored(metrics), ), + opendal_s3 if opendal_s3.starts_with("opendal_s3://") => { + let opendal_s3 = opendal_s3.strip_prefix("opendal_s3://").unwrap(); + let (bucket, root) = opendal_s3.split_once('@').unwrap_or((opendal_s3, "")); + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string()) + .unwrap() + .monitored(metrics), + ) + } #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); diff --git a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index c11f4fd2a25e..9744a578582a 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -26,8 +26,8 @@ pub mod gcs; pub mod obs; -pub mod oss; -pub mod opendal_s3; pub mod azblob; +pub mod opendal_s3; +pub mod oss; pub mod fs; diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 9005f06981b2..3489603026a8 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -37,6 +37,7 @@ pub enum EngineType { Memory, Hdfs, Gcs, + OpendalS3, Obs, Oss, Webhdfs, @@ -189,6 +190,7 @@ impl ObjectStore for OpendalObjectStore { match self.engine_type { EngineType::Memory => "Memory", EngineType::Hdfs => "Hdfs", + EngineType::OpendalS3 => "OpendalS3", EngineType::Gcs => "Gcs", EngineType::Obs => "Obs", EngineType::Oss => "Oss", diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index c6b493fc1213..93ff0ce2d0eb 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use opendal::{layers::{LoggingLayer, RetryLayer}, services::S3}; -use opendal::services::Gcs; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; @@ -22,44 +22,29 @@ use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal gcs engine. pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { - // Create gcs backend builder. - let mut builder = Gcs::default(); - - builder.bucket(&bucket); - - builder.root(&root); - // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); - builder.region(&s3_properties.region_name); + builder.root(&root); - if let Some(endpoint_url) = s3_properties.endpoint_url { + if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { builder.endpoint(&endpoint_url); } - if let Some(access) = s3_properties.access { + if let Ok(access) = std::env::var("AWS_ACCESS_KEY_ID") { builder.access_key_id(&access); } else { - tracing::error!( - "access key id of aws s3 is not set, bucket {}", - s3_properties.bucket_name - ); + tracing::error!("access key id of aws s3 is not set, bucket {}", bucket); } - if let Some(secret) = s3_properties.secret { + if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") { builder.secret_access_key(&secret); } else { - tracing::error!( - "secret access key of aws s3 is not set, bucket {}", - s3_properties.bucket_name - ); + tracing::error!("secret access key of aws s3 is not set, bucket {}", bucket); } - builder.enable_virtual_host_style(); - - if let Some(assume_role) = assume_role { - builder.role_arn(&assume_role); + if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() { + builder.enable_virtual_host_style(); } builder.disable_config_load(); @@ -69,7 +54,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Gcs, + engine_type: EngineType::OpendalS3, }) } } diff --git a/src/risedevtool/src/task/utils.rs b/src/risedevtool/src/task/utils.rs index e4a6322c2858..0e563774e577 100644 --- a/src/risedevtool/src/task/utils.rs +++ b/src/risedevtool/src/task/utils.rs @@ -118,6 +118,10 @@ pub fn add_hummock_backend( cmd.arg("--state-store") .arg(format!("hummock+hdfs://{}", opendal.namenode)); } + else if opendal.engine == "opendal_s3"{ + cmd.arg("--state-store") + .arg(format!("hummock+opendal_s3://{}", opendal.bucket)); + } else if opendal.engine == "gcs"{ cmd.arg("--state-store") .arg(format!("hummock+gcs://{}", opendal.bucket)); From 95ae8460bba4c0d7fee8c28511724440d4efc39d Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 8 Jan 2024 15:15:06 +0800 Subject: [PATCH 03/10] update Copyright --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 93ff0ce2d0eb..af940bedd923 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RisingWave Labs +// Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From c2d8280388aeaec9b36599d37d675e270d125e41 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 8 Jan 2024 15:21:20 +0800 Subject: [PATCH 04/10] typo --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index af940bedd923..6b805bf9217d 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -20,7 +20,7 @@ use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; impl OpendalObjectStore { - /// create opendal gcs engine. + /// create opendal s3 engine. pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { // Create s3 builder. let mut builder = S3::default(); From c89867829644bd5531ed5b7da47bb40c2f013fb0 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 19 Jan 2024 13:49:27 +0800 Subject: [PATCH 05/10] add region --- src/object_store/src/object/opendal_engine/opendal_s3.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6b805bf9217d..6257560c1c4c 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -27,10 +27,17 @@ impl OpendalObjectStore { builder.bucket(&bucket); builder.root(&root); + // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { builder.endpoint(&endpoint_url); } + if let Ok(region) = std::env::var("AWS_REGION") { + builder.region(®ion); + } else { + tracing::error!("aws s3 region is not set, bucket {}", bucket); + } + if let Ok(access) = std::env::var("AWS_ACCESS_KEY_ID") { builder.access_key_id(&access); } else { From b990bad7939d2f078f1f476f0671f86b1ed4cd23 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 23 Jan 2024 18:57:27 +0800 Subject: [PATCH 06/10] use env to decide whether using opendal s3 --- risedev.yml | 13 ----------- src/object_store/src/object/mod.rs | 35 ++++++++++++++++-------------- src/risedevtool/src/task/utils.rs | 4 ---- 3 files changed, 19 insertions(+), 33 deletions(-) diff --git a/risedev.yml b/risedev.yml index 97a8d0bd48b0..17076e4f2da8 100644 --- a/risedev.yml +++ b/risedev.yml @@ -191,19 +191,6 @@ profile: - use: compactor # - use: prometheus # - use: grafana - opendal_s3: - steps: - # - use: etcd - - use: meta-node - - use: compute-node - - use: frontend - # If you want to use google cloud storage as storage backend, configure bucket name: - - use: opendal - engine: opendal_s3 - bucket: bucket-name - - use: compactor - # - use: prometheus - # - use: grafana obs: steps: # - use: etcd diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 8b7dac6d9037..19f5a5a70c82 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -786,23 +786,26 @@ pub async fn build_remote_object_store( config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { - s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( - S3ObjectStore::new_with_config( - s3.strip_prefix("s3://").unwrap().to_string(), - metrics.clone(), - config, - ) - .await - .monitored(metrics), - ), - opendal_s3 if opendal_s3.starts_with("opendal_s3://") => { - let opendal_s3 = opendal_s3.strip_prefix("opendal_s3://").unwrap(); - let (bucket, root) = opendal_s3.split_once('@').unwrap_or((opendal_s3, "")); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string()) - .unwrap() + s3 if s3.starts_with("s3://") => { + if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { + let s3 = s3.strip_prefix("s3://").unwrap(); + let (bucket, root) = s3.split_once('@').unwrap_or((s3, "")); + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string()) + .unwrap() + .monitored(metrics), + ) + } else { + ObjectStoreImpl::S3( + S3ObjectStore::new_with_config( + s3.strip_prefix("s3://").unwrap().to_string(), + metrics.clone(), + config, + ) + .await .monitored(metrics), - ) + ) + } } #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { diff --git a/src/risedevtool/src/task/utils.rs b/src/risedevtool/src/task/utils.rs index 0e563774e577..e4a6322c2858 100644 --- a/src/risedevtool/src/task/utils.rs +++ b/src/risedevtool/src/task/utils.rs @@ -118,10 +118,6 @@ pub fn add_hummock_backend( cmd.arg("--state-store") .arg(format!("hummock+hdfs://{}", opendal.namenode)); } - else if opendal.engine == "opendal_s3"{ - cmd.arg("--state-store") - .arg(format!("hummock+opendal_s3://{}", opendal.bucket)); - } else if opendal.engine == "gcs"{ cmd.arg("--state-store") .arg(format!("hummock+gcs://{}", opendal.bucket)); From 129593199bfa70d0a10f08dbad9347c9c1d74f68 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 1 Feb 2024 17:16:15 +0800 Subject: [PATCH 07/10] enhance opendal s3 --- Cargo.lock | 20 ++----------- src/object_store/Cargo.toml | 2 +- src/object_store/src/object/mod.rs | 30 ++++++------------- .../opendal_engine/opendal_object_store.rs | 6 +++- .../src/object/opendal_engine/opendal_s3.rs | 22 ++++++++++++-- 5 files changed, 37 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b73909cc979..5f82db1e78ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -683,19 +683,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-compat" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d" -dependencies = [ - "futures-core", - "futures-io", - "once_cell", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-compression" version = "0.4.5" @@ -6646,12 +6633,11 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.44.0" +version = "0.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66" +checksum = "4af824652d4d2ffabf606d337a071677ae621b05622adf35df9562f69d9b4498" dependencies = [ "anyhow", - "async-compat", "async-trait", "backon", "base64 0.21.4", @@ -6664,9 +6650,7 @@ dependencies = [ "log", "md-5", "once_cell", - "parking_lot 0.12.1", "percent-encoding", - "pin-project", "prometheus", "quick-xml 0.30.0", "reqsign", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 61f526511c3a..d1db63c5fdb3 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -27,7 +27,7 @@ hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = "0.12" madsim = "0.2.22" -opendal = "0.44" +opendal = "0.44.2" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } rustls = "0.21.8" diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 7ccee1cb5068..5399b6d253b2 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -818,27 +818,15 @@ pub async fn build_remote_object_store( config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { - s3 if s3.starts_with("s3://") => { - if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { - let s3 = s3.strip_prefix("s3://").unwrap(); - let (bucket, root) = s3.split_once('@').unwrap_or((s3, "")); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string()) - .unwrap() - .monitored(metrics), - ) - } else { - ObjectStoreImpl::S3( - S3ObjectStore::new_with_config( - s3.strip_prefix("s3://").unwrap().to_string(), - metrics.clone(), - config, - ) - .await - .monitored(metrics), - ) - } - } + s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( + S3ObjectStore::new_with_config( + s3.strip_prefix("s3://").unwrap().to_string(), + metrics.clone(), + config, + ) + .await + .monitored(metrics), + ), #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 32c63ca3d214..d70bbb6c7bb9 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -208,7 +208,11 @@ pub struct OpendalStreamingUploader { } impl OpendalStreamingUploader { pub async fn new(op: Operator, path: String) -> ObjectResult { - let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?; + let writer = op + .writer_with(&path) + .concurrent(8) + .buffer(OPENDAL_BUFFER_SIZE) + .await?; Ok(Self { writer }) } } diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 6257560c1c4c..acbbf2eb6420 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -12,16 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::services::S3; use opendal::Operator; +use risingwave_common::config::ObjectStoreConfig; use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; impl OpendalObjectStore { /// create opendal s3 engine. - pub fn new_s3_engine(bucket: String, root: String) -> ObjectResult { + pub fn new_s3_engine( + bucket: String, + root: String, + object_store_config: ObjectStoreConfig, + ) -> ObjectResult { // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); @@ -57,7 +64,18 @@ impl OpendalObjectStore { builder.disable_config_load(); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) - .layer(RetryLayer::default()) + .layer( + RetryLayer::new() + .with_min_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_interval_ms, + )) + .with_max_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_max_delay_ms, + )) + .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts) + .with_factor(1.0) + .with_jitter(), + ) .finish(); Ok(Self { op, From a270d7de2fd0b0199b0ebe50e9533eb725ae9062 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Thu, 1 Feb 2024 19:38:37 +0800 Subject: [PATCH 08/10] refactor(meta): handle all stream client request in stream rpc manager (#14844) --- src/meta/node/src/server.rs | 5 + src/meta/service/src/scale_service.rs | 1 + src/meta/src/barrier/command.rs | 34 +-- src/meta/src/barrier/mod.rs | 6 + src/meta/src/barrier/recovery.rs | 87 +++---- src/meta/src/barrier/rpc.rs | 312 +++++++++++++++++++------- src/meta/src/stream/scale.rs | 79 +++---- src/meta/src/stream/stream_manager.rs | 123 ++++------ 8 files changed, 354 insertions(+), 293 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 8f9b2f3d313a..ce7fa7985007 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -29,6 +29,7 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_common_service::tracing::TracingExtractLayer; +use risingwave_meta::barrier::StreamRpcManager; use risingwave_meta::controller::catalog::CatalogController; use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::MetadataManager; @@ -525,6 +526,8 @@ pub async fn start_service_as_election_leader( let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(); let mut sub_tasks = vec![shutdown_handle]; + let stream_rpc_manager = StreamRpcManager::new(env.clone()); + let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -533,6 +536,7 @@ pub async fn start_service_as_election_leader( source_manager.clone(), sink_manager.clone(), meta_metrics.clone(), + stream_rpc_manager.clone(), ); { @@ -549,6 +553,7 @@ pub async fn start_service_as_election_leader( barrier_scheduler.clone(), source_manager.clone(), hummock_manager.clone(), + stream_rpc_manager, ) .unwrap(), ); diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 4c9e3ba2b5f7..33270fc2204f 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -53,6 +53,7 @@ impl ScaleServiceImpl { let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_manager.stream_rpc_manager.clone(), stream_manager.env.clone(), )); diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 19a9c8b48a80..6b1b73d6ca69 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -35,9 +35,8 @@ use risingwave_pb::stream_plan::{ PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, ThrottleMutation, UpdateMutation, }; -use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; +use risingwave_pb::stream_service::WaitEpochCommitRequest; use thiserror_ext::AsReport; -use uuid::Uuid; use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo}; use super::trace::TracedEpoch; @@ -739,27 +738,16 @@ impl CommandContext { /// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands. async fn clean_up(&self, actors: Vec) -> MetaResult<()> { - let futures = self.info.node_map.values().map(|node| { - let request_id = Uuid::new_v4().to_string(); - let actor_ids = actors.clone(); - - async move { - let client = self - .barrier_manager_context - .env - .stream_client_pool() - .get(node) - .await?; - let request = DropActorsRequest { - request_id, - actor_ids, - }; - client.drop_actors(request).await - } - }); - - try_join_all(futures).await?; - Ok(()) + self.barrier_manager_context + .stream_rpc_manager + .drop_actors( + &self.info.node_map, + self.info + .node_map + .keys() + .map(|worker_id| (*worker_id, actors.clone())), + ) + .await } pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c6d57b01dca5..8536a611aafa 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -71,6 +71,7 @@ mod state; mod trace; pub use self::command::{Command, ReplaceTablePlan, Reschedule}; +pub use self::rpc::StreamRpcManager; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; @@ -149,6 +150,8 @@ pub struct GlobalBarrierManagerContext { metrics: Arc, + stream_rpc_manager: StreamRpcManager, + env: MetaSrvEnv, } @@ -381,6 +384,7 @@ impl GlobalBarrierManager { source_manager: SourceManagerRef, sink_manager: SinkCoordinatorManager, metrics: Arc, + stream_rpc_manager: StreamRpcManager, ) -> Self { let enable_recovery = env.opts.enable_recovery; let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; @@ -397,6 +401,7 @@ impl GlobalBarrierManager { let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_rpc_manager.clone(), env.clone(), )); @@ -409,6 +414,7 @@ impl GlobalBarrierManager { sink_manager, metrics, tracker: Arc::new(Mutex::new(tracker)), + stream_rpc_manager, env: env.clone(), }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 37c270da30c7..0f8ecd027dcd 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -17,9 +17,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::{anyhow, Context}; -use futures::future::try_join_all; -use futures::stream::FuturesUnordered; -use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; @@ -29,14 +26,10 @@ use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; -use risingwave_pb::stream_service::{ - BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest, -}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, warn, Instrument}; -use uuid::Uuid; use super::TracedEpoch; use crate::barrier::command::CommandContext; @@ -979,31 +972,19 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - info.actor_map.iter().map(|(node_id, actors)| { - let node_actors = all_node_actors.remove(node_id).unwrap_or_default(); - let node = info.node_map.get(node_id).unwrap(); - let actor_infos = actor_infos.clone(); - - async move { - let client = self.env.stream_client_pool().get(node).await?; - client - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: actor_infos, - }) - .await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "update actors"); - client - .update_actors(UpdateActorsRequest { - request_id, - actors: node_actors, - }) - .await?; - - Ok(()) as MetaResult<()> - } - }).collect::>().try_collect::<()>().await?; + self.stream_rpc_manager + .broadcast_update_actor_info( + &info.node_map, + info.actor_map.keys().cloned(), + actor_infos.into_iter(), + info.actor_map.keys().map(|node_id| { + ( + *node_id, + all_node_actors.remove(node_id).unwrap_or_default(), + ) + }), + ) + .await?; Ok(()) } @@ -1015,26 +996,14 @@ impl GlobalBarrierManagerContext { return Ok(()); } - info.actor_map - .iter() - .map(|(node_id, actors)| async move { - let actors = actors.iter().cloned().collect(); - let node = info.node_map.get(node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); - client - .build_actors(BuildActorsRequest { - request_id, - actor_id: actors, - }) - .await?; - - Ok(()) as MetaResult<_> - }) - .collect::>() - .try_collect::<()>() + self.stream_rpc_manager + .build_actors( + &info.node_map, + info.actor_map.iter().map(|(node_id, actors)| { + let actors = actors.iter().cloned().collect(); + (*node_id, actors) + }), + ) .await?; Ok(()) @@ -1042,17 +1011,11 @@ impl GlobalBarrierManagerContext { /// Reset all compute nodes by calling `force_stop_actors`. async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { - let futures = info.node_map.values().map(|worker_node| async move { - let client = self.env.stream_client_pool().get(worker_node).await?; - debug!(worker = ?worker_node.id, "force stop actors"); - client - .force_stop_actors(ForceStopActorsRequest { - request_id: Uuid::new_v4().to_string(), - }) - .await - }); + debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); + self.stream_rpc_manager + .force_stop_actors(info.node_map.values()) + .await?; - try_join_all(futures).await?; debug!("all compute nodes have been reset."); Ok(()) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 55c9fce4c408..670ee7cf1092 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::future::Future; -use std::ops::Deref; use std::sync::Arc; use anyhow::anyhow; @@ -24,10 +23,16 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::hash::ActorId; use risingwave_common::util::tracing::TracingContext; -use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; -use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest}; -use risingwave_rpc_client::StreamClientPoolRef; +use risingwave_pb::common::{ActorInfo, WorkerNode}; +use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor}; +use risingwave_pb::stream_service::{ + BarrierCompleteRequest, BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, + ForceStopActorsRequest, InjectBarrierRequest, UpdateActorsRequest, +}; +use risingwave_rpc_client::error::RpcError; +use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; use tokio::sync::oneshot; use uuid::Uuid; @@ -77,17 +82,21 @@ impl GlobalBarrierManagerContext { ) -> BarrierCompletionFuture { let (tx, rx) = oneshot::channel(); let prev_epoch = command_context.prev_epoch.value().0; - let result = self.inject_barrier_inner(command_context.clone()).await; + let result = self + .stream_rpc_manager + .inject_barrier(command_context.clone()) + .await; match result { Ok(node_need_collect) => { // todo: the collect handler should be abort when recovery. - tokio::spawn(Self::collect_barrier( - self.env.clone(), - node_need_collect, - self.env.stream_client_pool_ref(), - command_context, - tx, - )); + tokio::spawn({ + let stream_rpc_manager = self.stream_rpc_manager.clone(); + async move { + stream_rpc_manager + .collect_barrier(node_need_collect, command_context, tx) + .await + } + }); } Err(e) => { let _ = tx.send(BarrierCompletion { @@ -104,9 +113,11 @@ impl GlobalBarrierManagerContext { }, }) } +} +impl StreamRpcManager { /// Send inject-barrier-rpc to stream service and wait for its response before returns. - async fn inject_barrier_inner( + async fn inject_barrier( &self, command_context: Arc, ) -> MetaResult> { @@ -114,38 +125,44 @@ impl GlobalBarrierManagerContext { let mutation = command_context.to_mutation().await?; let info = command_context.info.clone(); let mut node_need_collect = HashMap::new(); - let inject_futures = info.node_map.iter().filter_map(|(node_id, node)| { - let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); - let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); - if actor_ids_to_collect.is_empty() { - // No need to send or collect barrier for this node. - assert!(actor_ids_to_send.is_empty()); - node_need_collect.insert(*node_id, false); - None - } else { - node_need_collect.insert(*node_id, true); - let mutation = mutation.clone(); - let request_id = Uuid::new_v4().to_string(); - let barrier = Barrier { - epoch: Some(risingwave_pb::data::Epoch { - curr: command_context.curr_epoch.value().0, - prev: command_context.prev_epoch.value().0, - }), - mutation: mutation.clone().map(|_| BarrierMutation { mutation }), - tracing_context: TracingContext::from_span(command_context.curr_epoch.span()) + self.make_request( + info.node_map.iter().filter_map(|(node_id, node)| { + let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec(); + let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec(); + if actor_ids_to_collect.is_empty() { + // No need to send or collect barrier for this node. + assert!(actor_ids_to_send.is_empty()); + node_need_collect.insert(*node_id, false); + None + } else { + node_need_collect.insert(*node_id, true); + let mutation = mutation.clone(); + let barrier = Barrier { + epoch: Some(risingwave_pb::data::Epoch { + curr: command_context.curr_epoch.value().0, + prev: command_context.prev_epoch.value().0, + }), + mutation: mutation.clone().map(|_| BarrierMutation { mutation }), + tracing_context: TracingContext::from_span( + command_context.curr_epoch.span(), + ) .to_protobuf(), - kind: command_context.kind as i32, - passed_actors: vec![], - }; - async move { - let client = self.env.stream_client_pool().get(node).await?; - - let request = InjectBarrierRequest { - request_id, - barrier: Some(barrier), - actor_ids_to_send, - actor_ids_to_collect, + kind: command_context.kind as i32, + passed_actors: vec![], }; + Some(( + node, + InjectBarrierRequest { + request_id: Self::new_request_id(), + barrier: Some(barrier), + actor_ids_to_send, + actor_ids_to_collect, + }, + )) + } + }), + |client, request| { + async move { tracing::debug!( target: "events::meta::barrier::inject_barrier", ?request, "inject barrier request" @@ -154,10 +171,10 @@ impl GlobalBarrierManagerContext { // This RPC returns only if this worker node has injected this barrier. client.inject_barrier(request).await } - .into() - } - }); - try_join_all(inject_futures).await.inspect_err(|e| { + }, + ) + .await + .inspect_err(|e| { // Record failure in event log. use risingwave_pb::meta::event_log; use thiserror_ext::AsReport; @@ -175,9 +192,8 @@ impl GlobalBarrierManagerContext { /// Send barrier-complete-rpc and wait for responses from all CNs async fn collect_barrier( - env: MetaSrvEnv, + &self, node_need_collect: HashMap, - client_pool_ref: StreamClientPoolRef, command_context: Arc, barrier_complete_tx: oneshot::Sender, ) { @@ -186,34 +202,34 @@ impl GlobalBarrierManagerContext { TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf(); let info = command_context.info.clone(); - let client_pool = client_pool_ref.deref(); - let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| { - if !*node_need_collect.get(node_id).unwrap() { - // No need to send or collect barrier for this node. - None - } else { - let request_id = Uuid::new_v4().to_string(); - let tracing_context = tracing_context.clone(); - async move { - let client = client_pool.get(node).await?; - let request = BarrierCompleteRequest { - request_id, - prev_epoch, - tracing_context, - }; - tracing::debug!( - target: "events::meta::barrier::barrier_complete", - ?request, "barrier complete" - ); + let result = self + .broadcast( + info.node_map.iter().filter_map(|(node_id, node)| { + if !*node_need_collect.get(node_id).unwrap() { + // No need to send or collect barrier for this node. + None + } else { + Some(node) + } + }), + |client| { + let tracing_context = tracing_context.clone(); + async move { + let request = BarrierCompleteRequest { + request_id: Self::new_request_id(), + prev_epoch, + tracing_context, + }; + tracing::debug!( + target: "events::meta::barrier::barrier_complete", + ?request, "barrier complete" + ); - // This RPC returns only if this worker node has collected this barrier. - client.barrier_complete(request).await - } - .into() - } - }); - - let result = try_join_all(collect_futures) + // This RPC returns only if this worker node has collected this barrier. + client.barrier_complete(request).await + } + }, + ) .await .inspect_err(|e| { // Record failure in event log. @@ -224,7 +240,8 @@ impl GlobalBarrierManagerContext { cur_epoch: command_context.curr_epoch.value().0, error: e.to_report_string(), }; - env.event_log_manager_ref() + self.env + .event_log_manager_ref() .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); }) .map_err(Into::into); @@ -233,3 +250,144 @@ impl GlobalBarrierManagerContext { .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion")); } } + +#[derive(Clone)] +pub struct StreamRpcManager { + env: MetaSrvEnv, +} + +impl StreamRpcManager { + pub fn new(env: MetaSrvEnv) -> Self { + Self { env } + } + + async fn make_request> + 'static>( + &self, + request: impl Iterator, + f: impl Fn(StreamClient, REQ) -> Fut, + ) -> MetaResult> { + let pool = self.env.stream_client_pool(); + let f = &f; + Ok(try_join_all(request.map(|(node, input)| async move { + let client = pool.get(node).await?; + f(client, input).await + })) + .await?) + } + + async fn broadcast> + 'static>( + &self, + nodes: impl Iterator, + f: impl Fn(StreamClient) -> Fut, + ) -> MetaResult> { + self.make_request(nodes.map(|node| (node, ())), |client, ()| f(client)) + .await + } + + fn new_request_id() -> String { + Uuid::new_v4().to_string() + } + + pub async fn build_actors( + &self, + node_map: &HashMap, + node_actors: impl Iterator)>, + ) -> MetaResult<()> { + self.make_request( + node_actors.map(|(worker_id, actors)| (node_map.get(&worker_id).unwrap(), actors)), + |client, actors| async move { + let request_id = Self::new_request_id(); + tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); + client + .build_actors(BuildActorsRequest { + request_id, + actor_id: actors, + }) + .await + }, + ) + .await?; + Ok(()) + } + + /// Broadcast and update actor info in CN. + /// `node_actors_to_create` must be a subset of `broadcast_worker_ids`. + pub async fn broadcast_update_actor_info( + &self, + worker_nodes: &HashMap, + broadcast_worker_ids: impl Iterator, + actor_infos_to_broadcast: impl Iterator, + node_actors_to_create: impl Iterator)>, + ) -> MetaResult<()> { + let actor_infos = actor_infos_to_broadcast.collect_vec(); + let mut node_actors_to_create = node_actors_to_create.collect::>(); + self.make_request( + broadcast_worker_ids + .map(|worker_id| { + let node = worker_nodes.get(&worker_id).unwrap(); + let actors = node_actors_to_create.remove(&worker_id); + (node, actors) + }), + |client, actors| { + let info = actor_infos.clone(); + async move { + client + .broadcast_actor_info_table(BroadcastActorInfoTableRequest { info }) + .await?; + if let Some(actors) = actors { + let request_id = Self::new_request_id(); + let actor_ids = actors.iter().map(|actor| actor.actor_id).collect_vec(); + tracing::debug!(request_id = request_id.as_str(), actors = ?actor_ids, "update actors"); + client + .update_actors(UpdateActorsRequest { request_id, actors }) + .await?; + } + Ok(()) + } + }, + ) + .await?; + assert!( + node_actors_to_create.is_empty(), + "remaining uncreated actors: {:?}", + node_actors_to_create + ); + Ok(()) + } + + pub async fn drop_actors( + &self, + node_map: &HashMap, + node_actors: impl Iterator)>, + ) -> MetaResult<()> { + self.make_request( + node_actors + .map(|(worker_id, actor_ids)| (node_map.get(&worker_id).unwrap(), actor_ids)), + |client, actor_ids| async move { + client + .drop_actors(DropActorsRequest { + request_id: Self::new_request_id(), + actor_ids, + }) + .await + }, + ) + .await?; + Ok(()) + } + + pub async fn force_stop_actors( + &self, + nodes: impl Iterator, + ) -> MetaResult<()> { + self.broadcast(nodes, |client| async move { + client + .force_stop_actors(ForceStopActorsRequest { + request_id: Self::new_request_id(), + }) + .await + }) + .await?; + Ok(()) + } +} diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 3a21c812086f..2bd711b0a9d3 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -39,17 +39,13 @@ use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment}; use risingwave_pb::meta::FragmentParallelUnitMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode}; -use risingwave_pb::stream_service::{ - BroadcastActorInfoTableRequest, BuildActorsRequest, UpdateActorsRequest, -}; use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; -use uuid::Uuid; -use crate::barrier::{Command, Reschedule}; +use crate::barrier::{Command, Reschedule, StreamRpcManager}; use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::serving::{ @@ -375,6 +371,8 @@ pub struct ScaleController { pub source_manager: SourceManagerRef, + pub stream_rpc_manager: StreamRpcManager, + pub env: MetaSrvEnv, } @@ -382,9 +380,11 @@ impl ScaleController { pub fn new( metadata_manager: &MetadataManager, source_manager: SourceManagerRef, + stream_rpc_manager: StreamRpcManager, env: MetaSrvEnv, ) -> Self { Self { + stream_rpc_manager, metadata_manager: metadata_manager.clone(), source_manager, env, @@ -693,52 +693,35 @@ impl ScaleController { async fn create_actors_on_compute_node( &self, worker_nodes: &HashMap, - actor_infos_to_broadcast: BTreeMap, + actor_infos_to_broadcast: BTreeMap, node_actors_to_create: HashMap>, - broadcast_worker_ids: HashSet, + broadcast_worker_ids: HashSet, ) -> MetaResult<()> { - for worker_id in &broadcast_worker_ids { - let node = worker_nodes.get(worker_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - - let actor_infos_to_broadcast = actor_infos_to_broadcast.values().cloned().collect(); - - client - .to_owned() - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: actor_infos_to_broadcast, - }) - .await?; - } - - for (node_id, stream_actors) in &node_actors_to_create { - let node = worker_nodes.get(node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - let request_id = Uuid::new_v4().to_string(); - let request = UpdateActorsRequest { - request_id, - actors: stream_actors.clone(), - }; - - client.to_owned().update_actors(request).await?; - } - - for (node_id, stream_actors) in node_actors_to_create { - let node = worker_nodes.get(&node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - let request_id = Uuid::new_v4().to_string(); + self.stream_rpc_manager + .broadcast_update_actor_info( + worker_nodes, + broadcast_worker_ids.into_iter(), + actor_infos_to_broadcast.values().cloned(), + node_actors_to_create.clone().into_iter(), + ) + .await?; - client - .to_owned() - .build_actors(BuildActorsRequest { - request_id, - actor_id: stream_actors - .iter() - .map(|stream_actor| stream_actor.actor_id) - .collect(), - }) - .await?; - } + self.stream_rpc_manager + .build_actors( + worker_nodes, + node_actors_to_create + .iter() + .map(|(node_id, stream_actors)| { + ( + *node_id, + stream_actors + .iter() + .map(|stream_actor| stream_actor.actor_id) + .collect_vec(), + ) + }), + ) + .await?; Ok(()) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 4d14f0caa282..b098800168fd 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -15,26 +15,20 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; -use futures::future::{join_all, try_join_all, BoxFuture}; -use futures::stream::FuturesUnordered; -use futures::TryStreamExt; +use futures::future::{join_all, BoxFuture}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::Dispatcher; -use risingwave_pb::stream_service::{ - BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, UpdateActorsRequest, -}; use thiserror_ext::AsReport; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Mutex, RwLock}; use tracing::Instrument; -use uuid::Uuid; use super::{Locations, RescheduleOptions, ScaleController, ScaleControllerRef, TableResizePolicy}; -use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan}; +use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager}; use crate::hummock::HummockManagerRef; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob}; use crate::model::{ActorId, TableFragments, TableParallelism}; @@ -202,6 +196,8 @@ pub struct GlobalStreamManager { pub reschedule_lock: RwLock<()>, pub(crate) scale_controller: ScaleControllerRef, + + pub stream_rpc_manager: StreamRpcManager, } impl GlobalStreamManager { @@ -211,10 +207,12 @@ impl GlobalStreamManager { barrier_scheduler: BarrierScheduler, source_manager: SourceManagerRef, hummock_manager: HummockManagerRef, + stream_rpc_manager: StreamRpcManager, ) -> MetaResult { let scale_controller = Arc::new(ScaleController::new( &metadata_manager, source_manager.clone(), + stream_rpc_manager.clone(), env.clone(), )); @@ -227,6 +225,7 @@ impl GlobalStreamManager { creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), reschedule_lock: RwLock::new(()), scale_controller, + stream_rpc_manager, }) } @@ -306,28 +305,12 @@ impl GlobalStreamManager { let node_actors = table_fragments.worker_actor_ids(); let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let node_actors = node_actors - .into_iter() - .map(|(id, actor_ids)| { - ( - cluster_info.worker_nodes.get(&id).cloned().unwrap(), - actor_ids, - ) - }) - .collect_vec(); - let futures = node_actors.into_iter().map(|(node, actor_ids)| { - let request_id = Uuid::new_v4().to_string(); - async move { - let client = - self.env.stream_client_pool().get(&node).await?; - let request = DropActorsRequest { - request_id, - actor_ids, - }; - client.drop_actors(request).await - } - }); - try_join_all(futures).await?; + self.stream_rpc_manager + .drop_actors( + &cluster_info.worker_nodes, + node_actors.into_iter(), + ) + .await?; if let MetadataManager::V1(mgr) = &self.metadata_manager { mgr.fragment_manager @@ -381,8 +364,7 @@ impl GlobalStreamManager { // 2. all upstream actors. let actor_infos_to_broadcast = building_locations .actor_infos() - .chain(existing_locations.actor_infos()) - .collect_vec(); + .chain(existing_locations.actor_infos()); let building_worker_actors = building_locations.worker_actors(); @@ -390,57 +372,28 @@ impl GlobalStreamManager { // The first stage does 2 things: broadcast actor info, and send local actor ids to // different WorkerNodes. Such that each WorkerNode knows the overall actor // allocation, but not actually builds it. We initialize all channels in this stage. - building_worker_actors.iter().map(|(worker_id, actors)| { - let stream_actors = actors - .iter() - .map(|actor_id| actor_map[actor_id].clone()) - .collect::>(); - let worker_node = building_locations.worker_locations.get(worker_id).unwrap(); - let actor_infos_to_broadcast = &actor_infos_to_broadcast; - async move { - let client = self.env.stream_client_pool().get(worker_node).await?; - - client - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: actor_infos_to_broadcast.clone(), - }) - .await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "update actors"); - client - .update_actors(UpdateActorsRequest { - request_id, - actors: stream_actors.clone(), - }) - .await?; - - Ok(()) as MetaResult<_> - } - }).collect::>().try_collect::<()>().await?; + self.stream_rpc_manager + .broadcast_update_actor_info( + &building_locations.worker_locations, + building_worker_actors.keys().cloned(), + actor_infos_to_broadcast, + building_worker_actors.iter().map(|(worker_id, actors)| { + let stream_actors = actors + .iter() + .map(|actor_id| actor_map[actor_id].clone()) + .collect::>(); + (*worker_id, stream_actors) + }), + ) + .await?; // In the second stage, each [`WorkerNode`] builds local actors and connect them with // channels. - building_worker_actors - .iter() - .map(|(worker_id, actors)| async move { - let worker_node = building_locations.worker_locations.get(worker_id).unwrap(); - - let client = self.env.stream_client_pool().get(worker_node).await?; - - let request_id = Uuid::new_v4().to_string(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); - client - .build_actors(BuildActorsRequest { - request_id, - actor_id: actors.clone(), - }) - .await?; - - Ok(()) as MetaResult<()> - }) - .collect::>() - .try_collect::<()>() + self.stream_rpc_manager + .build_actors( + &building_locations.worker_locations, + building_worker_actors.into_iter(), + ) .await?; Ok(()) @@ -825,7 +778,7 @@ mod tests { use tonic::{Request, Response, Status}; use super::*; - use crate::barrier::GlobalBarrierManager; + use crate::barrier::{GlobalBarrierManager, StreamRpcManager}; use crate::hummock::{CompactorManager, HummockManager}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ @@ -852,7 +805,7 @@ mod tests { impl StreamService for FakeStreamService { async fn update_actors( &self, - request: Request, + request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); let mut guard = self.inner.actor_streams.lock().unwrap(); @@ -881,7 +834,7 @@ mod tests { async fn broadcast_actor_info_table( &self, - request: Request, + request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); let mut guard = self.inner.actor_infos.lock().unwrap(); @@ -1033,6 +986,8 @@ mod tests { let (sink_manager, _) = SinkCoordinatorManager::start_worker(); + let stream_rpc_manager = StreamRpcManager::new(env.clone()); + let barrier_manager = GlobalBarrierManager::new( scheduled_barriers, env.clone(), @@ -1041,6 +996,7 @@ mod tests { source_manager.clone(), sink_manager, meta_metrics.clone(), + stream_rpc_manager.clone(), ); let stream_manager = GlobalStreamManager::new( @@ -1049,6 +1005,7 @@ mod tests { barrier_scheduler.clone(), source_manager.clone(), hummock_manager, + stream_rpc_manager, )?; let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager); From 4438b1f03fdf48492e9b47faf991899fb5a021eb Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 1 Feb 2024 23:53:26 +0800 Subject: [PATCH 09/10] rollback --- src/object_store/src/object/mod.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 5399b6d253b2..5a2601651c13 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -818,15 +818,27 @@ pub async fn build_remote_object_store( config: ObjectStoreConfig, ) -> ObjectStoreImpl { match url { - s3 if s3.starts_with("s3://") => ObjectStoreImpl::S3( - S3ObjectStore::new_with_config( - s3.strip_prefix("s3://").unwrap().to_string(), - metrics.clone(), - config, - ) - .await - .monitored(metrics), - ), + s3 if s3.starts_with("s3://") => { + if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { + let s3 = s3.strip_prefix("s3://").unwrap(); + let (bucket, root) = s3.split_once('@').unwrap_or((s3, "")); + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string(), config) + .unwrap() + .monitored(metrics), + ) + } else { + ObjectStoreImpl::S3( + S3ObjectStore::new_with_config( + s3.strip_prefix("s3://").unwrap().to_string(), + metrics.clone(), + config, + ) + .await + .monitored(metrics), + ) + } + } #[cfg(feature = "hdfs-backend")] hdfs if hdfs.starts_with("hdfs://") => { let hdfs = hdfs.strip_prefix("hdfs://").unwrap(); From f1e88fddffea8bf1298c99b62856fec4854902d4 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 2 Feb 2024 00:40:50 +0800 Subject: [PATCH 10/10] minor --- src/object_store/src/object/mod.rs | 6 +++--- .../src/object/opendal_engine/opendal_object_store.rs | 4 ++-- src/object_store/src/object/opendal_engine/opendal_s3.rs | 5 +---- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 5a2601651c13..d9ae0bc37b86 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -820,10 +820,10 @@ pub async fn build_remote_object_store( match url { s3 if s3.starts_with("s3://") => { if std::env::var("RW_USE_OPENDAL_FOR_S3").is_ok() { - let s3 = s3.strip_prefix("s3://").unwrap(); - let (bucket, root) = s3.split_once('@').unwrap_or((s3, "")); + let bucket = s3.strip_prefix("s3://").unwrap(); + ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), root.to_string(), config) + OpendalObjectStore::new_s3_engine(bucket.to_string(), config) .unwrap() .monitored(metrics), ) diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index d70bbb6c7bb9..f3fe072daea6 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -38,7 +38,7 @@ pub enum EngineType { Memory, Hdfs, Gcs, - OpendalS3, + S3, Obs, Oss, Webhdfs, @@ -191,7 +191,7 @@ impl ObjectStore for OpendalObjectStore { match self.engine_type { EngineType::Memory => "Memory", EngineType::Hdfs => "Hdfs", - EngineType::OpendalS3 => "OpendalS3", + EngineType::S3 => "S3", EngineType::Gcs => "Gcs", EngineType::Obs => "Obs", EngineType::Oss => "Oss", diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index acbbf2eb6420..425d0a757669 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -26,13 +26,11 @@ impl OpendalObjectStore { /// create opendal s3 engine. pub fn new_s3_engine( bucket: String, - root: String, object_store_config: ObjectStoreConfig, ) -> ObjectResult { // Create s3 builder. let mut builder = S3::default(); builder.bucket(&bucket); - builder.root(&root); // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field. if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") { @@ -61,7 +59,6 @@ impl OpendalObjectStore { builder.enable_virtual_host_style(); } - builder.disable_config_load(); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer( @@ -79,7 +76,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::OpendalS3, + engine_type: EngineType::S3, }) } }