diff --git a/Cargo.lock b/Cargo.lock index e315fe76c85d9..fcb2e7a20db6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -959,19 +959,20 @@ dependencies = [ [[package]] name = "await-tree" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "325bcfc4b87d4aa36f1319b806bacc40fcefcaf43a12bd85a5a2f44fc14ce9de" +checksum = "0c2d7aec54383fa38ac2f9c28435a02f7312f7174e470c7d5566d2b7e17f9a8d" dependencies = [ "coarsetime", - "derive_builder", + "derive_builder 0.20.0", "flexstr", "indextree", - "itertools 0.10.5", + "itertools 0.12.1", "parking_lot 0.12.1", "pin-project", "tokio", "tracing", + "weak-table", ] [[package]] @@ -3462,7 +3463,16 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" dependencies = [ - "derive_builder_macro", + "derive_builder_macro 0.12.0", +] + +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro 0.20.0", ] [[package]] @@ -3477,16 +3487,38 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling 0.20.8", + "proc-macro2", + "quote", + "syn 2.0.57", +] + [[package]] name = "derive_builder_macro" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" dependencies = [ - "derive_builder_core", + "derive_builder_core 0.12.0", "syn 1.0.109", ] +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core 0.20.0", + "syn 2.0.57", +] + [[package]] name = "derive_utils" version = "0.14.1" @@ -5390,7 +5422,7 @@ dependencies = [ "bytes", "chrono", "csv", - "derive_builder", + "derive_builder 0.12.0", "enum-display", "faster-hex", "futures", @@ -14136,6 +14168,12 @@ dependencies = [ "wast 201.0.0", ] +[[package]] +name = "weak-table" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "323f4da9523e9a669e1eaf9c6e763892769b1d38c623913647bfdc1532fe4549" + [[package]] name = "web-sys" version = "0.3.64" diff --git a/Cargo.toml b/Cargo.toml index 33683e6ee27ed..f931f3524a8f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] foyer = "0.6" -await-tree = "0.1.1" +await-tree = "0.2.1" aws-config = { version = "1", default-features = false, features = [ "behavior-version-latest", "rt-tokio", diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 95755ada1d862..d2542ca9bd085 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -29,7 +29,9 @@ use risingwave_pb::monitor_service::{ StackTraceResponse, }; use risingwave_rpc_client::error::ToTonicStatus; +use risingwave_storage::hummock::compactor::await_tree_key::Compaction; use risingwave_stream::executor::monitor::global_streaming_metrics; +use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait}; use risingwave_stream::task::LocalStreamManager; use thiserror_ext::AsReport; use tonic::{Code, Request, Response, Status}; @@ -37,19 +39,13 @@ use tonic::{Code, Request, Response, Status}; #[derive(Clone)] pub struct MonitorServiceImpl { stream_mgr: LocalStreamManager, - grpc_await_tree_reg: Option, server_config: ServerConfig, } impl MonitorServiceImpl { - pub fn new( - stream_mgr: LocalStreamManager, - grpc_await_tree_reg: Option, - server_config: ServerConfig, - ) -> Self { + pub fn new(stream_mgr: LocalStreamManager, server_config: ServerConfig) -> Self { Self { stream_mgr, - grpc_await_tree_reg, server_config, } } @@ -64,25 +60,28 @@ impl MonitorService for MonitorServiceImpl { ) -> Result, Status> { let _req = request.into_inner(); - let actor_traces = self - .stream_mgr - .get_actor_traces() - .into_iter() - .map(|(k, v)| (k, v.to_string())) - .collect(); + let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { + reg.collect::() + .into_iter() + .map(|(k, v)| (k.0, v.to_string())) + .collect() + } else { + Default::default() + }; - let barrier_traces = self - .stream_mgr - .get_barrier_traces() - .into_iter() - .map(|(k, v)| (k, v.to_string())) - .collect(); + let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { + reg.collect::() + .into_iter() + .map(|(k, v)| (k.prev_epoch, v.to_string())) + .collect() + } else { + Default::default() + }; - let rpc_traces = if let Some(m) = &self.grpc_await_tree_reg { - m.lock() - .await - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) + let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() { + reg.collect::() + .into_iter() + .map(|(k, v)| (k.desc, v.to_string())) .collect() } else { Default::default() @@ -92,9 +91,9 @@ impl MonitorService for MonitorServiceImpl { self.stream_mgr.env.state_store().as_hummock() && let Some(m) = hummock.compaction_await_tree_reg() { - m.read() - .iter() - .map(|(k, v)| (k.clone(), v.to_string())) + m.collect::() + .into_iter() + .map(|(k, v)| (format!("{k:?}"), v.to_string())) .collect() } else { Default::default() @@ -296,12 +295,17 @@ pub mod grpc_middleware { use either::Either; use futures::Future; use hyper::Body; - use tokio::sync::Mutex; use tonic::transport::NamedService; use tower::{Layer, Service}; /// Manages the await-trees of `gRPC` requests that are currently served by the compute node. - pub type AwaitTreeRegistryRef = Arc>>; + pub type AwaitTreeRegistryRef = await_tree::Registry; + + /// Await-tree key type for gRPC calls. + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + pub struct GrpcCall { + pub desc: String, + } #[derive(Clone)] pub struct AwaitTreeMiddlewareLayer { @@ -365,14 +369,15 @@ pub mod grpc_middleware { let mut inner = std::mem::replace(&mut self.inner, clone); let id = self.next_id.fetch_add(1, Ordering::SeqCst); - let key = if let Some(authority) = req.uri().authority() { + let desc = if let Some(authority) = req.uri().authority() { format!("{authority} - {id}") } else { format!("?? - {id}") }; + let key = GrpcCall { desc }; Either::Right(async move { - let root = registry.lock().await.register(key, req.uri().path()); + let root = registry.register(key, req.uri().path()); root.instrument(inner.call(req)).await }) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 18b77ff1804bc..2222f09e45f3d 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -26,8 +26,8 @@ use tonic::{Request, Response, Status, Streaming}; #[derive(Clone)] pub struct StreamServiceImpl { - mgr: LocalStreamManager, - env: StreamEnvironment, + pub mgr: LocalStreamManager, + pub env: StreamEnvironment, } impl StreamServiceImpl { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 7119df4926b9e..51be6dc7c80f1 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -75,9 +75,7 @@ use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS; use crate::rpc::service::exchange_service::ExchangeServiceImpl; use crate::rpc::service::health_service::HealthServiceImpl; -use crate::rpc::service::monitor_service::{ - AwaitTreeMiddlewareLayer, AwaitTreeRegistryRef, MonitorServiceImpl, -}; +use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl}; use crate::rpc::service::stream_service::StreamServiceImpl; use crate::telemetry::ComputeTelemetryCreator; use crate::ComputeNodeOpts; @@ -372,28 +370,12 @@ pub async fn compute_node_serve( memory_mgr.get_watermark_epoch(), ); - let grpc_await_tree_reg = await_tree_config - .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); - - // Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if - // this is not the case, we can use the following command to get it printed into the logs - // periodically. - // - // Comment out the following line to enable. - // TODO: may optionally enable based on the features - #[cfg(any())] - stream_mgr.clone().spawn_print_trace(); - // Boot the runtime gRPC services. let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env); let exchange_srv = ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics); let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone()); - let monitor_srv = MonitorServiceImpl::new( - stream_mgr.clone(), - grpc_await_tree_reg.clone(), - config.server.clone(), - ); + let monitor_srv = MonitorServiceImpl::new(stream_mgr.clone(), config.server.clone()); let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr); let health_srv = HealthServiceImpl::new(); @@ -425,6 +407,7 @@ pub async fn compute_node_serve( ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX), ) .add_service({ + let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned(); let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX); #[cfg(madsim)] @@ -433,7 +416,7 @@ pub async fn compute_node_serve( } #[cfg(not(madsim))] { - AwaitTreeMiddlewareLayer::new_optional(grpc_await_tree_reg).layer(srv) + AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv) } }) .add_service(MonitorServiceServer::new(monitor_srv)) diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs index a748d96190a17..d74f8f977f5b1 100644 --- a/src/storage/compactor/src/rpc.rs +++ b/src/storage/compactor/src/rpc.rs @@ -23,6 +23,7 @@ use risingwave_pb::monitor_service::{ ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse, }; +use risingwave_storage::hummock::compactor::await_tree_key::Compaction; use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; @@ -85,9 +86,9 @@ impl MonitorService for MonitorServiceImpl { let compaction_task_traces = match &self.await_tree_reg { None => Default::default(), Some(await_tree_reg) => await_tree_reg - .read() - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) + .collect::() + .into_iter() + .map(|(k, v)| (format!("{k:?}"), v.to_string())) .collect(), }; Ok(Response::new(StackTraceResponse { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 775e1ef8cf86d..46ccaf0ebb94d 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -45,7 +45,8 @@ use crate::hummock::compactor::compaction_utils::{ use crate::hummock::compactor::iterator::ConcatSstableIterator; use crate::hummock::compactor::task_progress::TaskProgressGuard; use crate::hummock::compactor::{ - fast_compactor_runner, CompactOutput, CompactionFilter, Compactor, CompactorContext, + await_tree_key, fast_compactor_runner, CompactOutput, CompactionFilter, Compactor, + CompactorContext, }; use crate::hummock::iterator::{ Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, ValueMeta, @@ -627,9 +628,11 @@ pub async fn compact( let traced = match context.await_tree_reg.as_ref() { None => runner.right_future(), Some(await_tree_reg) => await_tree_reg - .write() .register( - format!("compact_runner/{}-{}", compact_task.task_id, split_index), + await_tree_key::CompactRunner { + task_id: compact_task.task_id, + split_index, + }, format!( "Compaction Task {} Split {} ", compact_task.task_id, split_index diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index fb748b67e8a55..4525dcdc773fb 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use more_asserts::assert_ge; -use parking_lot::RwLock; use super::task_progress::TaskProgressManagerRef; use crate::hummock::compactor::CompactionExecutor; @@ -25,10 +24,23 @@ use crate::hummock::MemoryLimiter; use crate::monitor::CompactorMetrics; use crate::opts::StorageOpts; -pub type CompactionAwaitTreeRegRef = Arc>>; +pub type CompactionAwaitTreeRegRef = await_tree::Registry; pub fn new_compaction_await_tree_reg_ref(config: await_tree::Config) -> CompactionAwaitTreeRegRef { - Arc::new(RwLock::new(await_tree::Registry::new(config))) + await_tree::Registry::new(config) +} + +pub mod await_tree_key { + /// Await-tree key type for compaction tasks. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub enum Compaction { + CompactRunner { task_id: u64, split_index: usize }, + CompactSharedBuffer { id: usize }, + SpawnUploadTask { id: usize }, + MergingTask { id: usize }, + } + + pub use Compaction::*; } /// A `CompactorContext` describes the context of a compactor. diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 4540af2ea0749..c18828bc93e72 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -45,7 +45,9 @@ pub use compaction_filter::{ CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter, TtlCompactionFilter, }; -pub use context::{new_compaction_await_tree_reg_ref, CompactionAwaitTreeRegRef, CompactorContext}; +pub use context::{ + await_tree_key, new_compaction_await_tree_reg_ref, CompactionAwaitTreeRegRef, CompactorContext, +}; use futures::{pin_mut, StreamExt}; pub use iterator::{ConcatSstableIterator, SstableStreamIterator}; use more_asserts::assert_ge; diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index a94c66f56f4a0..9c25eee366bda 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -36,7 +36,7 @@ use tracing::{error, warn}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; use crate::hummock::compactor::compaction_filter::DummyCompactionFilter; -use crate::hummock::compactor::context::CompactorContext; +use crate::hummock::compactor::context::{await_tree_key, CompactorContext}; use crate::hummock::compactor::{check_flush_result, CompactOutput, Compactor}; use crate::hummock::event_handler::uploader::UploadTaskPayload; use crate::hummock::event_handler::LocalInstanceId; @@ -186,8 +186,8 @@ async fn compact_shared_buffer( LazyLock::new(|| AtomicUsize::new(0)); let tree_root = context.await_tree_reg.as_ref().map(|reg| { let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed); - reg.write().register( - format!("compact_shared_buffer/{}", id), + reg.register( + await_tree_key::CompactSharedBuffer { id }, format!( "Compact Shared Buffer: {:?}", payload diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index cbbc3f656e8e0..fcfcac53a478e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -39,7 +39,7 @@ use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; -use crate::hummock::compactor::{compact, CompactorContext}; +use crate::hummock::compactor::{await_tree_key, compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ @@ -245,8 +245,8 @@ impl HummockEventHandler { LazyLock::new(|| AtomicUsize::new(0)); let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| { let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed); - reg.write().register( - format!("spawn_upload_task/{}", upload_task_id), + reg.register( + await_tree_key::SpawnUploadTask { id: upload_task_id }, format!("Spawn Upload Task: {}", task_info), ) }); diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 9e23af91781a5..42a0a48c43ba3 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -42,7 +42,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info}; use crate::hummock::compactor::{ - merge_imms_in_memory, CompactionAwaitTreeRegRef, CompactionExecutor, + await_tree_key, merge_imms_in_memory, CompactionAwaitTreeRegRef, CompactionExecutor, }; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::LocalInstanceId; @@ -89,8 +89,10 @@ pub(crate) fn default_spawn_merging_task( LazyLock::new(|| AtomicUsize::new(0)); let tree_root = await_tree_reg.as_ref().map(|reg| { let merging_task_id = NEXT_MERGING_TASK_ID.fetch_add(1, Relaxed); - reg.write().register( - format!("merging_task/{}", merging_task_id), + reg.register( + await_tree_key::MergingTask { + id: merging_task_id, + }, format!( "Merging Imm {:?} {:?} {:?}", table_id, diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 18249340bafdf..29b95c314ef55 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -21,7 +21,6 @@ use arc_swap::ArcSwap; use bytes::Bytes; use itertools::Itertools; use more_asserts::assert_gt; -use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; @@ -453,8 +452,8 @@ impl HummockStorage { self.backup_reader.clone() } - pub fn compaction_await_tree_reg(&self) -> Option<&RwLock>> { - self.compact_await_tree_reg.as_ref().map(AsRef::as_ref) + pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> { + self.compact_await_tree_reg.as_ref() } } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 65fd141ac376c..5cd76eda6215d 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -287,7 +287,7 @@ pub(crate) struct StreamActorManager { pub(super) watermark_epoch: AtomicU64Ref, /// Manages the await-trees of all actors. - pub(super) await_tree_reg: Option>>>, + pub(super) await_tree_reg: Option, /// Runtime for the streaming actors. pub(super) runtime: BackgroundShutdownRuntime, @@ -322,10 +322,7 @@ pub(super) struct LocalBarrierWorker { } impl LocalBarrierWorker { - pub(super) fn new( - actor_manager: Arc, - barrier_await_tree_reg: Option>>>, - ) -> Self { + pub(super) fn new(actor_manager: Arc) -> Self { let (event_tx, event_rx) = unbounded_channel(); let (failure_tx, failure_rx) = unbounded_channel(); let shared_context = Arc::new(SharedContext::new( @@ -342,7 +339,7 @@ impl LocalBarrierWorker { state: ManagedBarrierState::new( actor_manager.env.state_store(), actor_manager.streaming_metrics.clone(), - barrier_await_tree_reg, + actor_manager.await_tree_reg.clone(), ), control_stream_handle: ControlStreamHandle::empty(), actor_manager, @@ -678,10 +675,7 @@ impl LocalBarrierWorker { /// Reset all internal states. pub(super) fn reset_state(&mut self) { - *self = Self::new( - self.actor_manager.clone(), - self.state.reset_and_take_barrier_await_tree_reg(), - ); + *self = Self::new(self.actor_manager.clone()); } /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report @@ -744,8 +738,7 @@ impl LocalBarrierWorker { pub fn spawn( env: StreamEnvironment, streaming_metrics: Arc, - await_tree_reg: Option>>>, - barrier_await_tree_reg: Option>>>, + await_tree_reg: Option, watermark_epoch: AtomicU64Ref, actor_op_rx: UnboundedReceiver, ) -> JoinHandle<()> { @@ -768,7 +761,7 @@ impl LocalBarrierWorker { await_tree_reg, runtime: runtime.into(), }); - let worker = LocalBarrierWorker::new(actor_manager, barrier_await_tree_reg); + let worker = LocalBarrierWorker::new(actor_manager); tokio::spawn(worker.run(actor_op_rx)) } } @@ -870,7 +863,6 @@ impl LocalBarrierManager { StreamEnvironment::for_test(), Arc::new(StreamingMetrics::unused()), None, - None, Arc::new(AtomicU64::new(0)), rx, ); diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 9a17afc08a039..ed192b39dfe5f 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -23,7 +23,6 @@ use anyhow::anyhow; use await_tree::InstrumentAwait; use futures::stream::FuturesOrdered; use futures::{FutureExt, StreamExt}; -use parking_lot::Mutex; use prometheus::HistogramTimer; use risingwave_common::must_match; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -39,7 +38,7 @@ use super::BarrierCompleteResult; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; -use crate::task::ActorId; +use crate::task::{await_tree_key, ActorId}; /// The state machine of local barrier manager. #[derive(Debug)] @@ -142,7 +141,7 @@ pub(super) struct ManagedBarrierState { await_epoch_completed_futures: FuturesOrdered, /// Manages the await-trees of all barriers. - barrier_await_tree_reg: Option>>>, + barrier_await_tree_reg: Option, } impl ManagedBarrierState { @@ -159,7 +158,7 @@ impl ManagedBarrierState { pub(super) fn new( state_store: StateStoreImpl, streaming_metrics: Arc, - barrier_await_tree_reg: Option>>>, + barrier_await_tree_reg: Option, ) -> Self { Self { epoch_barrier_state_map: BTreeMap::default(), @@ -171,15 +170,6 @@ impl ManagedBarrierState { } } - pub(super) fn reset_and_take_barrier_await_tree_reg( - &mut self, - ) -> Option>>> { - if let Some(reg) = &self.barrier_await_tree_reg { - reg.lock().clear(); - } - self.barrier_await_tree_reg.take() - } - pub fn read_barrier_mutation( &mut self, barrier: &Barrier, @@ -300,10 +290,12 @@ impl ManagedBarrierState { }, ); if let Some(reg) = &self.barrier_await_tree_reg { - reg.lock() - .register(prev_epoch, format!("SyncEpoch({})", prev_epoch)) - .instrument(future) - .left_future() + reg.register( + await_tree_key::BarrierAwait { prev_epoch }, + format!("SyncEpoch({})", prev_epoch), + ) + .instrument(future) + .left_future() } else { future.right_future() } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 28bd0a9e220ab..a275a6eaeb0f4 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -13,9 +13,7 @@ // limitations under the License. use core::time::Duration; -use std::collections::HashMap; use std::fmt::Debug; -use std::io::Write; use std::mem::take; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -25,7 +23,6 @@ use async_recursion::async_recursion; use futures::stream::BoxStream; use futures::FutureExt; use itertools::Itertools; -use parking_lot::Mutex; use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; @@ -73,11 +70,22 @@ pub type ActorHandle = JoinHandle<()>; pub type AtomicU64Ref = Arc; +pub mod await_tree_key { + /// Await-tree key type for actors. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct Actor(pub crate::task::ActorId); + + /// Await-tree key type for barriers. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct BarrierAwait { + pub prev_epoch: u64, + } +} + /// `LocalStreamManager` manages all stream executors in this project. #[derive(Clone)] pub struct LocalStreamManager { - await_tree_reg: Option>>>, - barrier_await_tree_reg: Option>>>, + await_tree_reg: Option, pub env: StreamEnvironment, @@ -163,11 +171,7 @@ impl LocalStreamManager { await_tree_config: Option, watermark_epoch: AtomicU64Ref, ) -> Self { - let await_tree_reg = await_tree_config - .clone() - .map(|config| Arc::new(Mutex::new(await_tree::Registry::new(config)))); - let barrier_await_tree_reg = - await_tree_config.map(|config| Arc::new(Mutex::new(await_tree::Registry::new(config)))); + let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new); let (actor_op_tx, actor_op_rx) = unbounded_channel(); @@ -175,62 +179,19 @@ impl LocalStreamManager { env.clone(), streaming_metrics, await_tree_reg.clone(), - barrier_await_tree_reg.clone(), watermark_epoch, actor_op_rx, ); Self { await_tree_reg, - barrier_await_tree_reg, env, actor_op_tx: EventSender(actor_op_tx), } } - /// Print the traces of all actors periodically, used for debugging only. - pub fn spawn_print_trace(self: Arc) -> JoinHandle { - tokio::spawn(async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis(5000)).await; - let mut o = std::io::stdout().lock(); - - for (k, trace) in self - .await_tree_reg - .as_ref() - .expect("async stack trace not enabled") - .lock() - .iter() - { - writeln!(o, ">> Actor {}\n\n{}", k, trace).ok(); - } - - for (e, trace) in self - .barrier_await_tree_reg - .as_ref() - .expect("async stack trace not enabled") - .lock() - .iter() - { - writeln!(o, ">> Barrier {}\n\n{}", e, trace).ok(); - } - } - }) - } - - /// Get await-tree contexts for all actors. - pub fn get_actor_traces(&self) -> HashMap { - match &self.await_tree_reg.as_ref() { - Some(mgr) => mgr.lock().iter().map(|(k, v)| (*k, v)).collect(), - None => Default::default(), - } - } - - /// Get await-tree contexts for all barrier. - pub fn get_barrier_traces(&self) -> HashMap { - match &self.barrier_await_tree_reg.as_ref() { - Some(mgr) => mgr.lock().iter().map(|(k, v)| (*k, v)).collect(), - None => Default::default(), - } + /// Get the registry of await-trees. + pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> { + self.await_tree_reg.as_ref() } /// Receive a new control stream request from meta. Notify the barrier worker to reset the CN and use the new control stream @@ -328,7 +289,7 @@ impl LocalBarrierWorker { } self.actor_manager_state.clear_state(); if let Some(m) = self.actor_manager.await_tree_reg.as_ref() { - m.lock().clear(); + m.clear(); } dispatch_state_store!(&self.actor_manager.env.state_store(), store, { store.clear_shared_buffer(prev_epoch).await; @@ -644,8 +605,7 @@ impl LocalBarrierWorker { }); let traced = match &self.actor_manager.await_tree_reg { Some(m) => m - .lock() - .register(actor_id, trace_span) + .register(await_tree_key::Actor(actor_id), trace_span) .instrument(actor) .left_future(), None => actor.right_future(),