From a0574b7466f5f4a28f8bcbcb660b2060a922094a Mon Sep 17 00:00:00 2001 From: Kevin Axel Date: Tue, 30 Jan 2024 19:13:50 +0800 Subject: [PATCH 1/2] feat(test): add ObjectStore trait simulator support (#14545) Signed-off-by: Kevin Axel --- Cargo.lock | 2 + src/object_store/Cargo.toml | 1 + src/object_store/src/object/error.rs | 14 + src/object_store/src/object/mod.rs | 27 ++ src/object_store/src/object/sim/client.rs | 48 ++++ src/object_store/src/object/sim/error.rs | 39 +++ src/object_store/src/object/sim/mod.rs | 254 ++++++++++++++++++ src/object_store/src/object/sim/rpc_server.rs | 57 ++++ src/object_store/src/object/sim/service.rs | 151 +++++++++++ src/tests/simulation/Cargo.toml | 1 + src/tests/simulation/src/cluster.rs | 35 +-- 11 files changed, 612 insertions(+), 17 deletions(-) create mode 100644 src/object_store/src/object/sim/client.rs create mode 100644 src/object_store/src/object/sim/error.rs create mode 100644 src/object_store/src/object/sim/mod.rs create mode 100644 src/object_store/src/object/sim/rpc_server.rs create mode 100644 src/object_store/src/object/sim/service.rs diff --git a/Cargo.lock b/Cargo.lock index fffd5b98e4807..5b73909cc9797 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9610,6 +9610,7 @@ dependencies = [ "hyper-rustls", "hyper-tls", "itertools 0.12.0", + "madsim", "madsim-aws-sdk-s3", "madsim-tokio", "opendal", @@ -9775,6 +9776,7 @@ dependencies = [ "risingwave_frontend", "risingwave_hummock_sdk", "risingwave_meta_node", + "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_sqlparser", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 95354557dd943..61f526511c3ab 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -26,6 +26,7 @@ hyper = { version = "0.14", features = ["tcp", "client"] } hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] } hyper-tls = "0.5.0" itertools = "0.12" +madsim = "0.2.22" opendal = "0.44" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index f1c18bb391188..d4927a1595fca 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -43,6 +43,9 @@ pub enum ObjectErrorInner { #[error("Internal error: {0}")] #[construct(skip)] Internal(String), + #[cfg(madsim)] + #[error(transparent)] + Sim(#[from] crate::object::sim::SimError), } impl ObjectError { @@ -80,6 +83,10 @@ impl ObjectError { ObjectErrorInner::Mem(e) => { return e.is_object_not_found_error(); } + #[cfg(madsim)] + ObjectErrorInner::Sim(e) => { + return e.is_object_not_found_error(); + } _ => {} }; false @@ -108,4 +115,11 @@ impl From for ObjectError { } } +#[cfg(madsim)] +impl From for ObjectError { + fn from(e: std::io::Error) -> Self { + ObjectErrorInner::Internal(e.to_string()).into() + } +} + pub type ObjectResult = std::result::Result; diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index ab5abe0d07478..5399b6d253b2f 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(madsim)] +pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; use std::time::Duration; @@ -39,6 +41,9 @@ pub use error::*; use object_metrics::ObjectStoreMetrics; use thiserror_ext::AsReport; +#[cfg(madsim)] +use self::sim::SimObjectStore; + pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; @@ -139,6 +144,8 @@ pub enum ObjectStoreImpl { InMem(MonitoredObjectStore), Opendal(MonitoredObjectStore), S3(MonitoredObjectStore), + #[cfg(madsim)] + Sim(MonitoredObjectStore), } macro_rules! dispatch_async { @@ -165,7 +172,12 @@ macro_rules! object_store_impl_method_body { ObjectStoreImpl::S3(s3) => { $dispatch_macro!(s3, $method_name, path $(, $args)*) }, + #[cfg(madsim)] + ObjectStoreImpl::Sim(in_mem) => { + $dispatch_macro!(in_mem, $method_name, path $(, $args)*) + }, } + } }; } @@ -179,6 +191,7 @@ macro_rules! object_store_impl_method_body_slice { ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => { { let paths_rem = partition_object_store_paths($paths); + match $object_store { ObjectStoreImpl::InMem(in_mem) => { $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) @@ -189,6 +202,10 @@ macro_rules! object_store_impl_method_body_slice { ObjectStoreImpl::S3(s3) => { $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*) }, + #[cfg(madsim)] + ObjectStoreImpl::Sim(in_mem) => { + $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*) + }, } } }; @@ -247,6 +264,8 @@ impl ObjectStoreImpl { ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id), ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id), + #[cfg(madsim)] + ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id), } } @@ -257,6 +276,8 @@ impl ObjectStoreImpl { store.inner.op.info().native_capability().write_can_multi } ObjectStoreImpl::S3(_) => true, + #[cfg(madsim)] + ObjectStoreImpl::Sim(_) => true, } } @@ -265,6 +286,8 @@ impl ObjectStoreImpl { ObjectStoreImpl::InMem(store) => store.recv_buffer_size(), ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(), ObjectStoreImpl::S3(store) => store.recv_buffer_size(), + #[cfg(madsim)] + ObjectStoreImpl::Sim(store) => store.recv_buffer_size(), } } } @@ -896,6 +919,10 @@ pub async fn build_remote_object_store( } ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics)) } + #[cfg(madsim)] + sim if sim.starts_with("sim://") => { + ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics)) + } other => { unimplemented!( "{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.", diff --git a/src/object_store/src/object/sim/client.rs b/src/object_store/src/object/sim/client.rs new file mode 100644 index 0000000000000..4e7aa67834a59 --- /dev/null +++ b/src/object_store/src/object/sim/client.rs @@ -0,0 +1,48 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::net::SocketAddr; + +use madsim::net::{Endpoint, Payload}; + +use super::service::{Request, Response}; +use crate::object::error::ObjectResult as Result; + +#[derive(Debug, Clone)] +pub struct Client { + addr: SocketAddr, +} + +impl Client { + pub fn new(addr: SocketAddr) -> Self { + Self { addr } + } + + pub(crate) async fn send_request(&self, req: Request) -> Result { + let resp = self.send_request_io(req).await?; + let resp = *resp + .downcast::>() + .expect("failed to downcast"); + resp + } + + async fn send_request_io(&self, req: Request) -> std::io::Result { + let addr = self.addr; + let ep = Endpoint::connect(addr).await?; + let (tx, mut rx) = ep.connect1(addr).await?; + tx.send(Box::new(req)).await?; + rx.recv().await + } +} diff --git a/src/object_store/src/object/sim/error.rs b/src/object_store/src/object/sim/error.rs new file mode 100644 index 0000000000000..29ca09f6825b7 --- /dev/null +++ b/src/object_store/src/object/sim/error.rs @@ -0,0 +1,39 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SimError { + #[error("NotFound error: {0}")] + NotFound(String), + #[error("Other error: {0}")] + Other(String), +} + +impl SimError { + pub fn is_object_not_found_error(&self) -> bool { + matches!(self, SimError::NotFound(_)) + } +} + +impl SimError { + pub(crate) fn not_found(msg: impl ToString) -> Self { + SimError::NotFound(msg.to_string()) + } + + pub(crate) fn other(msg: impl ToString) -> Self { + SimError::Other(msg.to_string()) + } +} diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs new file mode 100644 index 0000000000000..31f34cbcd3267 --- /dev/null +++ b/src/object_store/src/object/sim/mod.rs @@ -0,0 +1,254 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod client; +mod error; +use bytes::{BufMut, BytesMut}; +pub use error::SimError; +use futures::Stream; +mod rpc_server; +pub use rpc_server::SimServer; +mod service; + +use std::net::SocketAddr; +use std::ops::{Range, RangeBounds}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use madsim::rand::{thread_rng, RngCore}; +use madsim::time::{sleep, Duration}; +use risingwave_common::range::RangeBoundsExt; + +use self::client::Client; +use self::service::Response; +use super::{ + BoxedStreamingUploader, Bytes, ObjectDataStream, ObjectError, ObjectMetadata, + ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader, +}; + +pub struct SimStreamingUploader { + client: Client, + key: String, + buf: BytesMut, +} + +impl SimStreamingUploader { + fn new(client: Client, key: String) -> Self { + Self { + client, + key, + buf: Default::default(), + } + } +} + +#[async_trait::async_trait] +impl StreamingUploader for SimStreamingUploader { + async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { + self.buf.put(data); + Ok(()) + } + + async fn finish(mut self: Box) -> ObjectResult<()> { + if self.buf.is_empty() { + Err(ObjectError::internal("upload empty object")) + } else { + let resp = self + .client + .send_request(service::Request::Upload { + path: self.key, + obj: self.buf.freeze(), + }) + .await?; + let Response::Upload = resp else { + panic!("expect Response::Upload"); + }; + Ok(()) + } + } + + fn get_memory_usage(&self) -> u64 { + self.buf.len() as u64 + } +} + +pub struct SimDataIterator { + data: Bytes, + offset: usize, +} + +impl SimDataIterator { + pub fn new(data: Bytes) -> Self { + Self { data, offset: 0 } + } +} + +impl Stream for SimDataIterator { + type Item = ObjectResult; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + const MAX_PACKET_SIZE: usize = 128 * 1024; + if self.offset >= self.data.len() { + return Poll::Ready(None); + } + let read_len = std::cmp::min(self.data.len() - self.offset, MAX_PACKET_SIZE); + let data = self.data.slice(self.offset..(self.offset + read_len)); + self.offset += read_len; + Poll::Ready(Some(Ok(data))) + } +} + +pub struct SimObjectStore { + client: Client, +} + +#[async_trait::async_trait] +impl ObjectStore for SimObjectStore { + fn get_object_prefix(&self, _obj_id: u64) -> String { + String::default() + } + + async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> { + if obj.is_empty() { + Err(ObjectError::internal("upload empty object")) + } else { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Upload { path, obj }) + .await?; + if let Response::Upload = resp { + Ok(()) + } else { + panic!("expect Response::Upload"); + } + } + } + + async fn streaming_upload(&self, path: &str) -> ObjectResult { + Ok(Box::new(SimStreamingUploader::new( + self.client.clone(), + path.to_string(), + ))) + } + + async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Read { path }) + .await?; + if let Response::Read(obj) = resp { + if let Some(end) = range.end() + && end > obj.len() + { + return Err(SimError::other("bad block offset and size").into()); + } + Ok(obj.slice(range)) + } else { + panic!("expect Response::Read"); + } + } + + async fn metadata(&self, path: &str) -> ObjectResult { + let path = path.to_string(); + if let Response::Metadata(m) = self + .client + .send_request(service::Request::Metadata { path }) + .await? + { + Ok(m) + } else { + panic!("expect Response::Metadata"); + } + } + + async fn streaming_read( + &self, + path: &str, + range: Range, + ) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Read { path }) + .await?; + let Response::Read(body) = resp else { + panic!("expect Response::Read"); + }; + + Ok(Box::pin(SimDataIterator::new(body.slice(range)))) + } + + async fn delete(&self, path: &str) -> ObjectResult<()> { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::Delete { path }) + .await?; + if let Response::Delete = resp { + Ok(()) + } else { + panic!("expect Response::Delete"); + } + } + + async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { + let resp = self + .client + .send_request(service::Request::DeleteObjects { + paths: paths.to_vec(), + }) + .await?; + if let Response::DeleteObjects = resp { + Ok(()) + } else { + panic!("expect Response::DeleteObjects"); + } + } + + async fn list(&self, path: &str) -> ObjectResult { + let path = path.to_string(); + let resp = self + .client + .send_request(service::Request::List { path }) + .await?; + if let Response::List(o) = resp { + Ok(Box::pin(o)) + } else { + panic!("expect Response::List"); + } + } + + fn store_media_type(&self) -> &'static str { + "sim" + } +} + +impl SimObjectStore { + pub fn new(addr: &str) -> Self { + let addr = addr.strip_prefix("sim://").unwrap(); + let (_access_key_id, rest) = addr.split_once(':').unwrap(); + let (_secret_access_key, rest) = rest.split_once('@').unwrap(); + let (address, _bucket) = rest.split_once('/').unwrap(); + + Self { + client: Client::new( + address + .parse::() + .expect(&format!("parse SockAddr failed: {}", address)), + ), + } + } +} diff --git a/src/object_store/src/object/sim/rpc_server.rs b/src/object_store/src/object/sim/rpc_server.rs new file mode 100644 index 0000000000000..068dbed5a27a4 --- /dev/null +++ b/src/object_store/src/object/sim/rpc_server.rs @@ -0,0 +1,57 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Result; +use std::net::SocketAddr; +use std::sync::Arc; + +use madsim::net::{Endpoint, Payload}; + +use super::service::{Request, SimService}; + +/// A simulated ObjectStore server. +#[derive(Default, Clone)] +pub struct SimServer {} + +impl SimServer { + pub fn builder() -> Self { + SimServer::default() + } + + pub async fn serve(self, addr: SocketAddr) -> Result<()> { + let ep = Endpoint::bind(addr).await?; + let service = SimService::new(); + let service = Arc::new(service); + + loop { + let (tx, mut rx, _) = ep.accept1().await?; + let service = service.clone(); + madsim::task::spawn(async move { + let request = *rx.recv().await?.downcast::().unwrap(); + use super::service::Request::*; + + let response: Payload = match request { + Upload { path, obj } => Box::new(service.upload(path, obj).await), + Read { path } => Box::new(service.read(path).await), + Delete { path } => Box::new(service.delete(path).await), + DeleteObjects { paths } => Box::new(service.delete_objects(paths).await), + List { path } => Box::new(service.list(path).await), + Metadata { path } => Box::new(service.metadata(path).await), + }; + tx.send(response).await?; + Ok(()) as Result<()> + }); + } + } +} diff --git a/src/object_store/src/object/sim/service.rs b/src/object_store/src/object/sim/service.rs new file mode 100644 index 0000000000000..fb03a1b13c173 --- /dev/null +++ b/src/object_store/src/object/sim/service.rs @@ -0,0 +1,151 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, VecDeque}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; +use futures::Stream; +use itertools::Itertools; +use spin::Mutex; + +use crate::object::error::ObjectResult as Result; +use crate::object::sim::SimError; +use crate::object::{ObjectError, ObjectMetadata}; + +#[derive(Debug)] +pub(crate) enum Request { + Upload { path: String, obj: Bytes }, + Read { path: String }, + Delete { path: String }, + DeleteObjects { paths: Vec }, + List { path: String }, + Metadata { path: String }, +} + +#[derive(Debug)] +pub(crate) enum Response { + Upload, + Read(Bytes), + Delete, + DeleteObjects, + List(SimObjectIter), + Metadata(ObjectMetadata), +} + +#[derive(Debug)] +pub(crate) struct SimService { + storage: Mutex>, +} + +fn get_obj_meta(path: &str, obj: &Bytes) -> Result { + Ok(ObjectMetadata { + key: path.to_string(), + last_modified: SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(ObjectError::internal)? + .as_secs_f64(), + total_size: obj.len(), + }) +} + +impl SimService { + pub fn new() -> Self { + SimService { + storage: Mutex::new(BTreeMap::new()), + } + } + + pub async fn upload(&self, path: String, obj: Bytes) -> Result { + let metadata = get_obj_meta(&path, &obj)?; + self.storage.lock().insert(path.into(), (metadata, obj)); + Ok(Response::Upload) + } + + pub async fn read(&self, path: String) -> Result { + let storage = self.storage.lock(); + let obj = storage + .get(&path) + .map(|(_, o)| o) + .ok_or_else(|| SimError::NotFound(format!("no object at path '{}'", path)))?; + Ok(Response::Read(obj.clone())) + } + + pub async fn delete(&self, path: String) -> Result { + self.storage.lock().remove(&path); + Ok(Response::Delete) + } + + pub async fn delete_objects(&self, paths: Vec) -> Result { + for path in paths { + self.storage.lock().remove(&path); + } + Ok(Response::DeleteObjects) + } + + pub async fn list(&self, prefix: String) -> Result { + if prefix.is_empty() { + return Ok(Response::List(SimObjectIter::new(vec![]))); + } + + let mut scan_end = prefix.chars().collect_vec(); + let next = *scan_end.last().unwrap() as u8 + 1; + *scan_end.last_mut().unwrap() = next as char; + let scan_end = scan_end.into_iter().collect::(); + + let list_result = self + .storage + .lock() + .range(prefix..scan_end) + .map(|(_, (o, _))| o.clone()) + .collect_vec(); + Ok(Response::List(SimObjectIter::new(list_result))) + } + + pub async fn metadata(&self, path: String) -> Result { + self.storage + .lock() + .get(&path) + .map(|(metadata, _)| metadata) + .cloned() + .ok_or_else(|| SimError::not_found(format!("no object at path '{}'", path)).into()) + .map(|meta| Response::Metadata(meta)) + } +} + +#[derive(Debug)] +pub(crate) struct SimObjectIter { + list_result: VecDeque, +} + +impl SimObjectIter { + fn new(list_result: Vec) -> Self { + Self { + list_result: list_result.into(), + } + } +} + +impl Stream for SimObjectIter { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Some(i) = self.list_result.pop_front() { + return Poll::Ready(Some(Ok(i))); + } + Poll::Ready(None) + } +} diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 6ce0c7ed05232..dbde0d5d46650 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -42,6 +42,7 @@ risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_meta_node = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index a81a770bf0bd9..dd591954d945a 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -32,6 +32,8 @@ use itertools::Itertools; use madsim::runtime::{Handle, NodeHandle}; use rand::seq::IteratorRandom; use rand::Rng; +#[cfg(madsim)] +use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer; use risingwave_pb::common::WorkerNode; use sqllogictest::AsyncDB; #[cfg(not(madsim))] @@ -297,18 +299,18 @@ metrics_level = "Disabled" /// /// # Nodes /// -/// | Name | IP | -/// | -------------- | ------------- | -/// | meta-x | 192.168.1.x | -/// | frontend-x | 192.168.2.x | -/// | compute-x | 192.168.3.x | -/// | compactor-x | 192.168.4.x | -/// | etcd | 192.168.10.1 | -/// | kafka-broker | 192.168.11.1 | -/// | kafka-producer | 192.168.11.2 | -/// | s3 | 192.168.12.1 | -/// | client | 192.168.100.1 | -/// | ctl | 192.168.101.1 | +/// | Name | IP | +/// | ---------------- | ------------- | +/// | meta-x | 192.168.1.x | +/// | frontend-x | 192.168.2.x | +/// | compute-x | 192.168.3.x | +/// | compactor-x | 192.168.4.x | +/// | etcd | 192.168.10.1 | +/// | kafka-broker | 192.168.11.1 | +/// | kafka-producer | 192.168.11.2 | +/// | object_store_sim | 192.168.12.1 | +/// | client | 192.168.100.1 | +/// | ctl | 192.168.101.1 | pub struct Cluster { config: Configuration, handle: Handle, @@ -385,14 +387,13 @@ impl Cluster { }) .build(); - // s3 + // object_store_sim handle .create_node() - .name("s3") + .name("object_store_sim") .ip("192.168.12.1".parse().unwrap()) .init(move || async move { - aws_sdk_s3::server::SimServer::default() - .with_bucket("hummock001") + ObjectStoreSimServer::builder() .serve("0.0.0.0:9301".parse().unwrap()) .await }) @@ -422,7 +423,7 @@ impl Cluster { "--etcd-endpoints", "etcd:2388", "--state-store", - "hummock+minio://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", + "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001", "--data-directory", "hummock_001", ]); From 5db21ecaa399365f0a90a20ecd80edfc3338d97f Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 30 Jan 2024 19:51:54 +0800 Subject: [PATCH 2/2] feat: handle streaming control rpc in worker loop (#14737) --- src/compute/src/rpc/service/config_service.rs | 4 +- .../src/rpc/service/exchange_service.rs | 4 +- .../src/rpc/service/monitor_service.rs | 5 +- src/compute/src/rpc/service/stream_service.rs | 42 +-- src/compute/src/server.rs | 21 +- src/meta/src/stream/stream_manager.rs | 1 - src/stream/src/task/barrier_manager.rs | 276 ++++++++++++--- .../src/task/barrier_manager/progress.rs | 2 +- src/stream/src/task/stream_manager.rs | 317 ++++++++---------- src/utils/futures_util/src/misc.rs | 43 +++ 10 files changed, 443 insertions(+), 272 deletions(-) diff --git a/src/compute/src/rpc/service/config_service.rs b/src/compute/src/rpc/service/config_service.rs index da11949230306..c1df5e40aeb94 100644 --- a/src/compute/src/rpc/service/config_service.rs +++ b/src/compute/src/rpc/service/config_service.rs @@ -23,7 +23,7 @@ use tonic::{Code, Request, Response, Status}; pub struct ConfigServiceImpl { batch_mgr: Arc, - stream_mgr: Arc, + stream_mgr: LocalStreamManager, } #[async_trait::async_trait] @@ -46,7 +46,7 @@ impl ConfigService for ConfigServiceImpl { } impl ConfigServiceImpl { - pub fn new(batch_mgr: Arc, stream_mgr: Arc) -> Self { + pub fn new(batch_mgr: Arc, stream_mgr: LocalStreamManager) -> Self { Self { batch_mgr, stream_mgr, diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs index 58a39fd5b56db..792ef0bfb2149 100644 --- a/src/compute/src/rpc/service/exchange_service.rs +++ b/src/compute/src/rpc/service/exchange_service.rs @@ -38,7 +38,7 @@ const BATCH_EXCHANGE_BUFFER_SIZE: usize = 1024; #[derive(Clone)] pub struct ExchangeServiceImpl { batch_mgr: Arc, - stream_mgr: Arc, + stream_mgr: LocalStreamManager, metrics: Arc, } @@ -128,7 +128,7 @@ impl ExchangeService for ExchangeServiceImpl { impl ExchangeServiceImpl { pub fn new( mgr: Arc, - stream_mgr: Arc, + stream_mgr: LocalStreamManager, metrics: Arc, ) -> Self { ExchangeServiceImpl { diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index b832b7827adda..0d822fd3597f6 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -15,7 +15,6 @@ use std::ffi::CString; use std::fs; use std::path::Path; -use std::sync::Arc; use std::time::Duration; use itertools::Itertools; @@ -37,14 +36,14 @@ use tonic::{Code, Request, Response, Status}; #[derive(Clone)] pub struct MonitorServiceImpl { - stream_mgr: Arc, + stream_mgr: LocalStreamManager, grpc_await_tree_reg: Option, server_config: ServerConfig, } impl MonitorServiceImpl { pub fn new( - stream_mgr: Arc, + stream_mgr: LocalStreamManager, grpc_await_tree_reg: Option, server_config: ServerConfig, ) -> Self { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 2640b505b7873..def9a534586bb 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use await_tree::InstrumentAwait; use itertools::Itertools; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; @@ -26,16 +24,16 @@ use risingwave_stream::error::StreamError; use risingwave_stream::executor::Barrier; use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment}; use thiserror_ext::AsReport; -use tonic::{Code, Request, Response, Status}; +use tonic::{Request, Response, Status}; #[derive(Clone)] pub struct StreamServiceImpl { - mgr: Arc, + mgr: LocalStreamManager, env: StreamEnvironment, } impl StreamServiceImpl { - pub fn new(mgr: Arc, env: StreamEnvironment) -> Self { + pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self { StreamServiceImpl { mgr, env } } } @@ -48,7 +46,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - let res = self.mgr.update_actors(&req.actors); + let res = self.mgr.update_actors(req.actors).await; match res { Err(e) => { error!(error = %e.as_report(), "failed to update stream actor"); @@ -66,10 +64,7 @@ impl StreamService for StreamServiceImpl { let req = request.into_inner(); let actor_id = req.actor_id; - let res = self - .mgr - .build_actors(actor_id.as_slice(), self.env.clone()) - .await; + let res = self.mgr.build_actors(actor_id).await; match res { Err(e) => { error!(error = %e.as_report(), "failed to build actors"); @@ -108,7 +103,7 @@ impl StreamService for StreamServiceImpl { ) -> std::result::Result, Status> { let req = request.into_inner(); let actors = req.actor_ids; - self.mgr.drop_actors(&actors)?; + self.mgr.drop_actors(actors).await?; Ok(Response::new(DropActorsResponse { request_id: req.request_id, status: None, @@ -121,8 +116,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.stop_all_actors().await?; - self.env.dml_manager_ref().clear(); + self.mgr.reset().await; Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, @@ -138,28 +132,6 @@ impl StreamService for StreamServiceImpl { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?; - // The barrier might be outdated and been injected after recovery in some certain extreme - // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during - // recovery. Check it here and return an error here if some actors are not found to - // avoid collection hang. We need some refine in meta side to remove this workaround since - // it will cause another round of unnecessary recovery. - let actor_ids = self.mgr.all_actor_ids(); - let missing_actor_ids = req - .actor_ids_to_collect - .iter() - .filter(|id| !actor_ids.contains(id)) - .collect_vec(); - if !missing_actor_ids.is_empty() { - tracing::warn!( - "to collect actors not found, they should be cleaned when recovering: {:?}", - missing_actor_ids - ); - return Err(Status::new( - Code::InvalidArgument, - "to collect actors not found", - )); - } - self.mgr .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect) .await?; diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index f26ebc10c0a41..fa15dcc3a38fe 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -298,17 +298,6 @@ pub async fn compute_node_serve( // Run a background heap profiler heap_profiler.start(); - let stream_mgr = Arc::new(LocalStreamManager::new( - advertise_addr.clone(), - state_store.clone(), - streaming_metrics.clone(), - config.streaming.clone(), - await_tree_config.clone(), - memory_mgr.get_watermark_epoch(), - )); - - let grpc_await_tree_reg = await_tree_config - .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); let dml_mgr = Arc::new(DmlManager::new( worker_id, config.streaming.developer.dml_channel_initial_permits, @@ -363,6 +352,16 @@ pub async fn compute_node_serve( meta_client.clone(), ); + let stream_mgr = LocalStreamManager::new( + stream_env.clone(), + streaming_metrics.clone(), + await_tree_config.clone(), + memory_mgr.get_watermark_epoch(), + ); + + let grpc_await_tree_reg = await_tree_config + .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); + // Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if // this is not the case, we can use the following command to get it printed into the logs // periodically. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 098325f4ad4d6..330e6e3e72dd0 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -369,7 +369,6 @@ impl GlobalStreamManager { res } - /// First broadcasts the actor info to `WorkerNodes`, and then let them build actors and channels. async fn build_actors( &self, table_fragments: &TableFragments, diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 4d0ef92add497..b838314729ad3 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -16,15 +16,20 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::anyhow; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use parking_lot::Mutex; use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; +use rw_futures_util::{pending_on_none, AttachedFuture}; use thiserror_ext::AsReport; use tokio::select; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; +use tokio::task::JoinHandle; use self::managed_state::ManagedBarrierState; use crate::error::{IntoUnexpectedExit, StreamError, StreamResult}; -use crate::task::ActorId; +use crate::task::{ActorHandle, ActorId, AtomicU64Ref, SharedContext, StreamEnvironment}; mod managed_state; mod progress; @@ -32,11 +37,13 @@ mod progress; mod tests; pub use progress::CreateMviewProgress; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; +use risingwave_pb::stream_plan; +use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_storage::store::SyncResult; -use risingwave_storage::StateStoreImpl; use crate::executor::monitor::StreamingMetrics; -use crate::executor::Barrier; +use crate::executor::{Actor, Barrier, DispatchExecutor}; use crate::task::barrier_manager::progress::BackfillState; use crate::task::barrier_manager::LocalBarrierEvent::{ReportActorCollected, ReportActorFailure}; @@ -54,7 +61,7 @@ pub struct BarrierCompleteResult { pub create_mview_progress: Vec, } -enum LocalBarrierEvent { +pub(super) enum LocalBarrierEvent { RegisterSender { actor_id: ActorId, sender: UnboundedSender, @@ -65,7 +72,7 @@ enum LocalBarrierEvent { actor_ids_to_collect: HashSet, result_sender: oneshot::Sender>, }, - Reset, + Reset(oneshot::Sender<()>), ReportActorCollected { actor_id: ActorId, barrier: Barrier, @@ -83,14 +90,96 @@ enum LocalBarrierEvent { actor: ActorId, state: BackfillState, }, + DropActors { + actors: Vec, + result_sender: oneshot::Sender<()>, + }, + UpdateActors { + actors: Vec, + result_sender: oneshot::Sender>, + }, + BuildActors { + actors: Vec, + result_sender: oneshot::Sender>, + }, #[cfg(test)] Flush(oneshot::Sender<()>), } +pub(crate) struct StreamActorManagerState { + /// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit. + /// `handles` store join handles of these futures, and therefore we could wait their + /// termination. + pub(super) handles: HashMap, + + /// Stores all actor information, taken after actor built. + pub(super) actors: HashMap, + + /// Stores all actor tokio runtime monitoring tasks. + pub(super) actor_monitor_tasks: HashMap, + + #[expect(clippy::type_complexity)] + pub(super) creating_actors: FuturesUnordered< + AttachedFuture< + JoinHandle>>>, + oneshot::Sender>, + >, + >, +} + +impl StreamActorManagerState { + fn new() -> Self { + Self { + handles: HashMap::new(), + actors: HashMap::new(), + actor_monitor_tasks: HashMap::new(), + creating_actors: FuturesUnordered::new(), + } + } + + async fn next_created_actors( + &mut self, + ) -> ( + oneshot::Sender>, + StreamResult>>, + ) { + let (join_result, sender) = pending_on_none(self.creating_actors.next()).await; + ( + sender, + join_result + .map_err(|join_error| { + anyhow!( + "failed to join creating actors futures: {:?}", + join_error.as_report() + ) + .into() + }) + .and_then(|result| result), + ) + } +} + +pub(crate) struct StreamActorManager { + pub(super) env: StreamEnvironment, + pub(super) context: Arc, + pub(super) streaming_metrics: Arc, + + /// Watermark epoch number. + pub(super) watermark_epoch: AtomicU64Ref, + + pub(super) local_barrier_manager: LocalBarrierManager, + + /// Manages the await-trees of all actors. + pub(super) await_tree_reg: Option>>>, + + /// Runtime for the streaming actors. + pub(super) runtime: BackgroundShutdownRuntime, +} + /// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager. /// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the /// barriers to and collect them from all actors, and finally report the progress. -struct LocalBarrierWorker { +pub(super) struct LocalBarrierWorker { /// Stores all streaming job source sender. barrier_senders: HashMap>>, @@ -101,27 +190,47 @@ struct LocalBarrierWorker { failure_actors: HashMap, epoch_result_sender: HashMap>>, + + pub(super) actor_manager: Arc, + + pub(super) actor_manager_state: StreamActorManagerState, } impl LocalBarrierWorker { - fn new(state_store: StateStoreImpl, streaming_metrics: Arc) -> Self { + pub(super) fn new(actor_manager: Arc) -> Self { Self { barrier_senders: HashMap::new(), failure_actors: HashMap::default(), - state: ManagedBarrierState::new(state_store, streaming_metrics), + state: ManagedBarrierState::new( + actor_manager.env.state_store(), + actor_manager.streaming_metrics.clone(), + ), epoch_result_sender: HashMap::default(), + actor_manager, + actor_manager_state: StreamActorManagerState::new(), } } async fn run(mut self, mut event_rx: UnboundedReceiver) { loop { select! { + (sender, create_actors_result) = self.actor_manager_state.next_created_actors() => { + self.handle_actor_created(sender, create_actors_result); + } completed_epoch = self.state.next_completed_epoch() => { self.on_epoch_completed(completed_epoch); }, event = event_rx.recv() => { if let Some(event) = event { - self.handle_event(event); + match event { + LocalBarrierEvent::Reset(finish_sender) => { + self.reset().await; + let _ = finish_sender.send(()); + } + event => { + self.handle_event(event); + } + } } else { break; @@ -131,6 +240,18 @@ impl LocalBarrierWorker { } } + fn handle_actor_created( + &mut self, + sender: oneshot::Sender>, + create_actor_result: StreamResult>>, + ) { + let result = create_actor_result.map(|actors| { + self.spawn_actors(actors); + }); + + let _ = sender.send(result); + } + fn handle_event(&mut self, event: LocalBarrierEvent) { match event { LocalBarrierEvent::RegisterSender { actor_id, sender } => { @@ -147,8 +268,8 @@ impl LocalBarrierWorker { warn!(err=?e, "fail to send inject barrier result"); }); } - LocalBarrierEvent::Reset => { - self.reset(); + LocalBarrierEvent::Reset(_) => { + unreachable!("Reset event should be handled separately in async context") } ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier), ReportActorFailure { actor_id, err } => { @@ -169,6 +290,24 @@ impl LocalBarrierWorker { } #[cfg(test)] LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(), + LocalBarrierEvent::DropActors { + actors, + result_sender, + } => { + self.drop_actors(&actors); + let _ = result_sender.send(()); + } + LocalBarrierEvent::UpdateActors { + actors, + result_sender, + } => { + let result = self.update_actors(actors); + let _ = result_sender.send(result); + } + LocalBarrierEvent::BuildActors { + actors, + result_sender, + } => self.start_create_actors(&actors, result_sender), } } } @@ -209,6 +348,32 @@ impl LocalBarrierWorker { to_send: HashSet, to_collect: HashSet, ) -> StreamResult<()> { + #[cfg(not(test))] + { + use itertools::Itertools; + // The barrier might be outdated and been injected after recovery in some certain extreme + // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during + // recovery. Check it here and return an error here if some actors are not found to + // avoid collection hang. We need some refine in meta side to remove this workaround since + // it will cause another round of unnecessary recovery. + let missing_actor_ids = to_collect + .iter() + .filter(|id| !self.actor_manager_state.handles.contains_key(id)) + .collect_vec(); + if !missing_actor_ids.is_empty() { + tracing::warn!( + "to collect actors not found, they should be cleaned when recovering: {:?}", + missing_actor_ids + ); + return Err(anyhow!("to collect actors not found: {:?}", to_collect).into()); + } + } + + if barrier.kind == BarrierKind::Initial { + self.actor_manager + .watermark_epoch + .store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst); + } debug!( target: "events::stream::barrier::manager::send", "send barrier {:?}, senders = {:?}, actor_ids_to_collect = {:?}", @@ -300,11 +465,8 @@ impl LocalBarrierWorker { } /// Reset all internal states. - fn reset(&mut self) { - *self = Self::new( - self.state.state_store.clone(), - self.state.streaming_metrics.clone(), - ); + pub(super) fn reset_state(&mut self) { + *self = Self::new(self.actor_manager.clone()); } /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report @@ -341,20 +503,59 @@ pub struct LocalBarrierManager { impl LocalBarrierManager { /// Create a [`LocalBarrierWorker`] with managed mode. - pub fn new(state_store: StateStoreImpl, streaming_metrics: Arc) -> Self { + pub fn new( + context: Arc, + env: StreamEnvironment, + streaming_metrics: Arc, + await_tree_reg: Option>>>, + watermark_epoch: AtomicU64Ref, + ) -> Self { + let runtime = { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + if let Some(worker_threads_num) = context.config.actor_runtime_worker_threads_num { + builder.worker_threads(worker_threads_num); + } + builder + .thread_name("rw-streaming") + .enable_all() + .build() + .unwrap() + }; + let (tx, rx) = unbounded_channel(); - let worker = LocalBarrierWorker::new(state_store, streaming_metrics); - let _join_handle = tokio::spawn(worker.run(rx)); - Self { + let local_barrier_manager = Self { barrier_event_sender: tx, - } + }; + let actor_manager = Arc::new(StreamActorManager { + context: context.clone(), + env: env.clone(), + streaming_metrics, + watermark_epoch, + local_barrier_manager: local_barrier_manager.clone(), + await_tree_reg, + runtime: runtime.into(), + }); + let worker = LocalBarrierWorker::new(actor_manager); + let _join_handle = tokio::spawn(worker.run(rx)); + local_barrier_manager } - fn send_event(&self, event: LocalBarrierEvent) { + pub(super) fn send_event(&self, event: LocalBarrierEvent) { self.barrier_event_sender .send(event) .expect("should be able to send event") } + + pub(super) async fn send_and_await( + &self, + make_event: impl FnOnce(oneshot::Sender) -> LocalBarrierEvent, + ) -> StreamResult { + let (tx, rx) = oneshot::channel(); + let event = make_event(tx); + self.send_event(event); + rx.await + .map_err(|_| anyhow!("barrier manager maybe reset").into()) + } } impl LocalBarrierManager { @@ -371,15 +572,13 @@ impl LocalBarrierManager { actor_ids_to_send: impl IntoIterator, actor_ids_to_collect: impl IntoIterator, ) -> StreamResult<()> { - let (tx, rx) = oneshot::channel(); - self.send_event(LocalBarrierEvent::InjectBarrier { + self.send_and_await(move |result_sender| LocalBarrierEvent::InjectBarrier { barrier, actor_ids_to_send: actor_ids_to_send.into_iter().collect(), actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(), - result_sender: tx, - }); - rx.await - .map_err(|_| anyhow!("barrier manager maybe reset"))? + result_sender, + }) + .await? } /// Use `prev_epoch` to remove collect rx and return rx. @@ -387,18 +586,11 @@ impl LocalBarrierManager { &self, prev_epoch: u64, ) -> StreamResult { - let (tx, rx) = oneshot::channel(); - self.send_event(LocalBarrierEvent::AwaitEpochCompleted { + self.send_and_await(|result_sender| LocalBarrierEvent::AwaitEpochCompleted { epoch: prev_epoch, - result_sender: tx, - }); - rx.await - .map_err(|_| anyhow!("barrier manager maybe reset"))? - } - - /// Reset all internal states. - pub fn reset(&self) { - self.send_event(LocalBarrierEvent::Reset) + result_sender, + }) + .await? } /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report @@ -420,9 +612,13 @@ impl LocalBarrierManager { #[cfg(test)] impl LocalBarrierManager { pub fn for_test() -> Self { + use std::sync::atomic::AtomicU64; Self::new( - StateStoreImpl::for_test(), + Arc::new(SharedContext::for_test()), + StreamEnvironment::for_test(), Arc::new(StreamingMetrics::unused()), + None, + Arc::new(AtomicU64::new(0)), ) } diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 2a84a0d01d510..476534967072b 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -21,7 +21,7 @@ type ConsumedEpoch = u64; type ConsumedRows = u64; #[derive(Debug, Clone, Copy)] -pub(super) enum BackfillState { +pub(crate) enum BackfillState { ConsumingUpstream(ConsumedEpoch, ConsumedRows), Done(ConsumedRows), } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 80f2623407fe0..5a2bde99da491 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -13,32 +13,31 @@ // limitations under the License. use core::time::Duration; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::Debug; use std::io::Write; +use std::mem::take; use std::sync::atomic::AtomicU64; use std::sync::Arc; use anyhow::anyhow; use async_recursion::async_recursion; use futures::FutureExt; -use hytra::TrAdder; use itertools::Itertools; use parking_lot::Mutex; use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::config::{MetricLevel, StreamingConfig}; -use risingwave_common::util::addr::HostAddr; -use risingwave_common::util::runtime::BackgroundShutdownRuntime; +use risingwave_common::config::MetricLevel; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; -use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{StreamActor, StreamNode}; use risingwave_storage::monitor::HummockTraceFutureExt; -use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl}; +use risingwave_storage::{dispatch_state_store, StateStore}; +use rw_futures_util::AttachedFuture; use thiserror_ext::AsReport; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use super::{unique_executor_id, unique_operator_id, BarrierCompleteResult}; @@ -47,49 +46,28 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::*; use crate::from_proto::create_executor; -use crate::task::{ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamEnvironment}; +use crate::task::barrier_manager::{LocalBarrierEvent, LocalBarrierWorker}; +use crate::task::{ + ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, + StreamActorManagerState, StreamEnvironment, +}; #[cfg(test)] -pub static LOCAL_TEST_ADDR: std::sync::LazyLock = +pub static LOCAL_TEST_ADDR: std::sync::LazyLock = std::sync::LazyLock::new(|| "127.0.0.1:2333".parse().unwrap()); pub type ActorHandle = JoinHandle<()>; pub type AtomicU64Ref = Arc; -pub struct LocalStreamManagerCore { - /// Runtime for the streaming actors. - runtime: BackgroundShutdownRuntime, - - /// Each processor runs in a future. Upon receiving a `Terminate` message, they will exit. - /// `handles` store join handles of these futures, and therefore we could wait their - /// termination. - handles: HashMap, - - /// Stores all actor information, taken after actor built. - actors: HashMap, - - /// Stores all actor tokio runtime monitoring tasks. - actor_monitor_tasks: HashMap, -} - /// `LocalStreamManager` manages all stream executors in this project. +#[derive(Clone)] pub struct LocalStreamManager { - core: Mutex, + await_tree_reg: Option>>>, - state_store: StateStoreImpl, context: Arc, - streaming_metrics: Arc, - - total_mem_val: Arc>, - - /// Watermark epoch number. - watermark_epoch: AtomicU64Ref, local_barrier_manager: LocalBarrierManager, - - /// Manages the await-trees of all actors. - await_tree_reg: Option>>, } /// Report expression evaluation errors to the actor context. @@ -164,27 +142,28 @@ impl Debug for ExecutorParams { impl LocalStreamManager { pub fn new( - addr: HostAddr, - state_store: StateStoreImpl, + env: StreamEnvironment, streaming_metrics: Arc, - config: StreamingConfig, await_tree_config: Option, watermark_epoch: AtomicU64Ref, ) -> Self { - let local_barrier_manager = - LocalBarrierManager::new(state_store.clone(), streaming_metrics.clone()); - let context = Arc::new(SharedContext::new(addr, &config)); - let core = LocalStreamManagerCore::new(context.config.actor_runtime_worker_threads_num); - Self { - state_store, - context, - total_mem_val: Arc::new(TrAdder::new()), - core: Mutex::new(core), + let context = Arc::new(SharedContext::new( + env.server_address().clone(), + env.config(), + )); + let await_tree_reg = + await_tree_config.map(|config| Arc::new(Mutex::new(await_tree::Registry::new(config)))); + let local_barrier_manager = LocalBarrierManager::new( + context.clone(), + env, streaming_metrics, + await_tree_reg.clone(), watermark_epoch, + ); + Self { + await_tree_reg, + context, local_barrier_manager, - await_tree_reg: await_tree_config - .map(|config| Mutex::new(await_tree::Registry::new(config))), } } @@ -216,11 +195,6 @@ impl LocalStreamManager { } } - /// Get all existing actor ids. - pub fn all_actor_ids(&self) -> HashSet { - self.core.lock().handles.keys().cloned().collect() - } - /// Broadcast a barrier to all senders. Save a receiver in barrier manager pub async fn send_barrier( &self, @@ -228,21 +202,12 @@ impl LocalStreamManager { actor_ids_to_send: impl IntoIterator, actor_ids_to_collect: impl IntoIterator, ) -> StreamResult<()> { - if barrier.kind == BarrierKind::Initial { - self.watermark_epoch - .store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst); - } self.local_barrier_manager .send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect) .await?; Ok(()) } - /// Reset the state of the barrier manager. - pub fn reset_barrier_manager(&self) { - self.local_barrier_manager.reset(); - } - /// Use `epoch` to find collect rx. And wait for all actor to be collected before /// returning. pub async fn collect_barrier(&self, epoch: u64) -> StreamResult { @@ -251,26 +216,60 @@ impl LocalStreamManager { .await } - pub async fn clear_storage_buffer(&self) { - dispatch_state_store!(self.state_store.clone(), store, { - store.clear_shared_buffer().await.unwrap(); - }); + pub fn context(&self) -> &Arc { + &self.context + } + + /// Drop the resources of the given actors. + pub async fn drop_actors(&self, actors: Vec) -> StreamResult<()> { + self.local_barrier_manager + .send_and_await(|result_sender| LocalBarrierEvent::DropActors { + actors, + result_sender, + }) + .await + } + + /// Force stop all actors on this worker, and then drop their resources. + pub async fn reset(&self) { + self.local_barrier_manager + .send_and_await(LocalBarrierEvent::Reset) + .await + .expect("should receive reset") + } + + pub async fn update_actors(&self, actors: Vec) -> StreamResult<()> { + self.local_barrier_manager + .send_and_await(|result_sender| LocalBarrierEvent::UpdateActors { + actors, + result_sender, + }) + .await? } + pub async fn build_actors(&self, actors: Vec) -> StreamResult<()> { + self.local_barrier_manager + .send_and_await(|result_sender| LocalBarrierEvent::BuildActors { + actors, + result_sender, + }) + .await? + } +} + +impl LocalBarrierWorker { /// Drop the resources of the given actors. - pub fn drop_actors(&self, actors: &[ActorId]) -> StreamResult<()> { - self.context.drop_actors(actors); - let mut core = self.core.lock(); + pub(super) fn drop_actors(&mut self, actors: &[ActorId]) { + self.actor_manager.context.drop_actors(actors); for &id in actors { - core.drop_actor(id); + self.actor_manager_state.drop_actor(id); } tracing::debug!(actors = ?actors, "drop actors"); - Ok(()) } /// Force stop all actors on this worker, and then drop their resources. - pub async fn stop_all_actors(&self) -> StreamResult<()> { - let actor_handles = self.core.lock().drain_actor_handles(); + pub(super) async fn reset(&mut self) { + let actor_handles = self.actor_manager_state.drain_actor_handles(); for (actor_id, handle) in &actor_handles { tracing::debug!("force stopping actor {}", actor_id); handle.abort(); @@ -280,85 +279,72 @@ impl LocalStreamManager { let result = handle.await; assert!(result.is_ok() || result.unwrap_err().is_cancelled()); } - self.context.clear_channels(); - self.context.actor_infos.write().clear(); - self.core.lock().clear_state(); - if let Some(m) = self.await_tree_reg.as_ref() { + // Clear the join handle of creating actors + for handle in take(&mut self.actor_manager_state.creating_actors) + .into_iter() + .map(|attached_future| attached_future.into_inner().0) + { + handle.abort(); + let result = handle.await; + assert!(result.is_ok() || result.err().unwrap().is_cancelled()); + } + self.actor_manager.context.clear_channels(); + self.actor_manager.context.actor_infos.write().clear(); + self.actor_manager_state.clear_state(); + if let Some(m) = self.actor_manager.await_tree_reg.as_ref() { m.lock().clear(); } - self.reset_barrier_manager(); - // Clear shared buffer in storage to release memory - self.clear_storage_buffer().await; - - Ok(()) + dispatch_state_store!(&self.actor_manager.env.state_store(), store, { + store.clear_shared_buffer().await.unwrap(); + }); + self.reset_state(); + self.actor_manager.env.dml_manager_ref().clear(); } - pub fn update_actors(&self, actors: &[stream_plan::StreamActor]) -> StreamResult<()> { - let mut core = self.core.lock(); - core.update_actors(actors) + pub(super) fn update_actors( + &mut self, + actors: Vec, + ) -> StreamResult<()> { + self.actor_manager_state.update_actors(actors) } /// This function could only be called once during the lifecycle of `LocalStreamManager` for /// now. - pub async fn build_actors( - &self, + pub(super) fn start_create_actors( + &mut self, actors: &[ActorId], - env: StreamEnvironment, - ) -> StreamResult<()> { + result_sender: oneshot::Sender>, + ) { let actors = { - let mut core = self.core.lock(); - actors + let actor_result = actors .iter() .map(|actor_id| { - core.actors + self.actor_manager_state + .actors .remove(actor_id) .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id)) }) - .try_collect()? - }; - let actors = self.create_actors(actors, env).await?; - self.core.lock().spawn_actors( - actors, - &self.streaming_metrics, - &self.local_barrier_manager, - self.await_tree_reg.as_ref(), - ); - Ok(()) - } - - pub fn context(&self) -> &Arc { - &self.context - } - - pub fn total_mem_usage(&self) -> usize { - self.total_mem_val.get() as usize - } -} - -impl LocalStreamManagerCore { - fn new(actor_runtime_worker_threads_num: Option) -> Self { - let runtime = { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - if let Some(worker_threads_num) = actor_runtime_worker_threads_num { - builder.worker_threads(worker_threads_num); + .try_collect(); + match actor_result { + Ok(actors) => actors, + Err(e) => { + let _ = result_sender.send(Err(e.into())); + return; + } } - builder - .thread_name("rw-streaming") - .enable_all() - .build() - .unwrap() }; - - Self { - runtime: runtime.into(), - handles: HashMap::new(), - actors: HashMap::new(), - actor_monitor_tasks: HashMap::new(), - } + let actor_manager = self.actor_manager.clone(); + let join_handle = self + .actor_manager + .runtime + .spawn(actor_manager.create_actors(actors)); + self.actor_manager_state + .creating_actors + .push(AttachedFuture::new(join_handle, result_sender)); } } -impl LocalStreamManager { +impl StreamActorManager { /// Create dispatchers with downstream information registered before fn create_dispatcher( &self, @@ -542,18 +528,17 @@ impl LocalStreamManager { } async fn create_actors( - &self, + self: Arc, actors: Vec, - env: StreamEnvironment, ) -> StreamResult>> { let mut ret = Vec::with_capacity(actors.len()); for actor in actors { let actor_id = actor.actor_id; let actor_context = ActorContext::create( &actor, - self.total_mem_val.clone(), + self.env.total_mem_usage(), self.streaming_metrics.clone(), - env.config().unique_user_stream_errors, + self.env.config().unique_user_stream_errors, actor.dispatcher.len(), ); let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into()); @@ -563,7 +548,7 @@ impl LocalStreamManager { .create_nodes( actor.fragment_id, actor.get_nodes()?, - env.clone(), + self.env.clone(), &actor_context, vnode_bitmap, ) @@ -588,14 +573,8 @@ impl LocalStreamManager { } } -impl LocalStreamManagerCore { - fn spawn_actors( - &mut self, - actors: Vec>, - streaming_metrics: &Arc, - barrier_manager: &LocalBarrierManager, - await_tree_reg: Option<&Mutex>>, - ) { +impl LocalBarrierWorker { + pub(super) fn spawn_actors(&mut self, actors: Vec>) { for actor in actors { let monitor = tokio_metrics::TaskMonitor::new(); let actor_context = actor.actor_context.clone(); @@ -603,7 +582,7 @@ impl LocalStreamManagerCore { let handle = { let trace_span = format!("Actor {actor_id}: `{}`", actor_context.mview_definition); - let barrier_manager = barrier_manager.clone(); + let barrier_manager = self.actor_manager.local_barrier_manager.clone(); let actor = actor.run().map(move |result| { if let Err(err) = result { // TODO: check error type and panic if it's unexpected. @@ -612,7 +591,7 @@ impl LocalStreamManagerCore { barrier_manager.notify_failure(actor_id, err); } }); - let traced = match await_tree_reg { + let traced = match &self.actor_manager.await_tree_reg { Some(m) => m .lock() .register(actor_id, trace_span) @@ -642,16 +621,16 @@ impl LocalStreamManagerCore { } #[cfg(not(enable_task_local_alloc))] { - self.runtime.spawn(instrumented) + self.actor_manager.runtime.spawn(instrumented) } }; - self.handles.insert(actor_id, handle); + self.actor_manager_state.handles.insert(actor_id, handle); - if streaming_metrics.level >= MetricLevel::Debug { + if self.actor_manager.streaming_metrics.level >= MetricLevel::Debug { tracing::info!("Tokio metrics are enabled because metrics_level >= Debug"); let actor_id_str = actor_id.to_string(); - let metrics = streaming_metrics.clone(); - let actor_monitor_task = self.runtime.spawn(async move { + let metrics = self.actor_manager.streaming_metrics.clone(); + let actor_monitor_task = self.actor_manager.runtime.spawn(async move { loop { let task_metrics = monitor.cumulative(); metrics @@ -701,29 +680,12 @@ impl LocalStreamManagerCore { tokio::time::sleep(Duration::from_secs(1)).await; } }); - self.actor_monitor_tasks + self.actor_manager_state + .actor_monitor_tasks .insert(actor_id, actor_monitor_task); } } } - - pub fn take_all_handles(&mut self) -> StreamResult> { - Ok(std::mem::take(&mut self.handles)) - } - - pub fn remove_actor_handles( - &mut self, - actor_ids: &[ActorId], - ) -> StreamResult> { - actor_ids - .iter() - .map(|actor_id| { - self.handles - .remove(actor_id) - .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id).into()) - }) - .try_collect() - } } impl LocalStreamManager { @@ -746,7 +708,7 @@ impl LocalStreamManager { } } -impl LocalStreamManagerCore { +impl StreamActorManagerState { /// `drop_actor` is invoked by meta node via RPC once the stop barrier arrives at the /// sink. All the actors in the actors should stop themselves before this method is invoked. fn drop_actor(&mut self, actor_id: ActorId) { @@ -772,11 +734,12 @@ impl LocalStreamManagerCore { self.actor_monitor_tasks.clear(); } - fn update_actors(&mut self, actors: &[stream_plan::StreamActor]) -> StreamResult<()> { + fn update_actors(&mut self, actors: Vec) -> StreamResult<()> { for actor in actors { + let actor_id = actor.get_actor_id(); self.actors - .try_insert(actor.get_actor_id(), actor.clone()) - .map_err(|_| anyhow!("duplicated actor {}", actor.get_actor_id()))?; + .try_insert(actor_id, actor) + .map_err(|_| anyhow!("duplicated actor {}", actor_id))?; } Ok(()) diff --git a/src/utils/futures_util/src/misc.rs b/src/utils/futures_util/src/misc.rs index 01317b3e85f24..f224b58214ddf 100644 --- a/src/utils/futures_util/src/misc.rs +++ b/src/utils/futures_util/src/misc.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::pin::{pin, Pin}; +use std::task::{ready, Context, Poll}; use futures::future::{pending, select, Either}; use futures::stream::Peekable; @@ -76,3 +77,45 @@ pub async fn await_future_with_monitor_error_stream( Either::Right((output, _)) => Ok(output), } } + +/// Attach an item of type `T` to the future `F`. When the future is polled with ready, +/// the item will be attached to the output of future as `(F::Output, item)`. +/// +/// The generated future will be similar to `future.map(|output| (output, item))`. The +/// only difference is that the `Map` future does not provide method `into_inner` to +/// get the original inner future. +pub struct AttachedFuture { + inner: F, + item: Option, +} + +impl AttachedFuture { + pub fn new(inner: F, item: T) -> Self { + Self { + inner, + item: Some(item), + } + } + + pub fn into_inner(self) -> (F, T) { + ( + self.inner, + self.item.expect("should not be called after polled ready"), + ) + } +} + +impl Future for AttachedFuture { + type Output = (F::Output, T); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let output = ready!(this.inner.poll_unpin(cx)); + Poll::Ready(( + output, + this.item + .take() + .expect("should not be polled ready for twice"), + )) + } +}