Skip to content

Commit

Permalink
refactor: bump await-tree to 0.2 (#16035)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Apr 4, 2024
1 parent 713c7b5 commit dfe5f31
Show file tree
Hide file tree
Showing 16 changed files with 163 additions and 174 deletions.
52 changes: 45 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = "0.6"
await-tree = "0.1.1"
await-tree = "0.2.1"
aws-config = { version = "1", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
Expand Down
67 changes: 36 additions & 31 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,23 @@ use risingwave_pb::monitor_service::{
StackTraceResponse,
};
use risingwave_rpc_client::error::ToTonicStatus;
use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

#[derive(Clone)]
pub struct MonitorServiceImpl {
stream_mgr: LocalStreamManager,
grpc_await_tree_reg: Option<AwaitTreeRegistryRef>,
server_config: ServerConfig,
}

impl MonitorServiceImpl {
pub fn new(
stream_mgr: LocalStreamManager,
grpc_await_tree_reg: Option<AwaitTreeRegistryRef>,
server_config: ServerConfig,
) -> Self {
pub fn new(stream_mgr: LocalStreamManager, server_config: ServerConfig) -> Self {
Self {
stream_mgr,
grpc_await_tree_reg,
server_config,
}
}
Expand All @@ -64,25 +60,28 @@ impl MonitorService for MonitorServiceImpl {
) -> Result<Response<StackTraceResponse>, Status> {
let _req = request.into_inner();

let actor_traces = self
.stream_mgr
.get_actor_traces()
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect();
let actor_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
reg.collect::<Actor>()
.into_iter()
.map(|(k, v)| (k.0, v.to_string()))
.collect()
} else {
Default::default()
};

let barrier_traces = self
.stream_mgr
.get_barrier_traces()
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect();
let barrier_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
reg.collect::<BarrierAwait>()
.into_iter()
.map(|(k, v)| (k.prev_epoch, v.to_string()))
.collect()
} else {
Default::default()
};

let rpc_traces = if let Some(m) = &self.grpc_await_tree_reg {
m.lock()
.await
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
let rpc_traces = if let Some(reg) = self.stream_mgr.await_tree_reg() {
reg.collect::<GrpcCall>()
.into_iter()
.map(|(k, v)| (k.desc, v.to_string()))
.collect()
} else {
Default::default()
Expand All @@ -92,9 +91,9 @@ impl MonitorService for MonitorServiceImpl {
self.stream_mgr.env.state_store().as_hummock()
&& let Some(m) = hummock.compaction_await_tree_reg()
{
m.read()
.iter()
.map(|(k, v)| (k.clone(), v.to_string()))
m.collect::<Compaction>()
.into_iter()
.map(|(k, v)| (format!("{k:?}"), v.to_string()))
.collect()
} else {
Default::default()
Expand Down Expand Up @@ -296,12 +295,17 @@ pub mod grpc_middleware {
use either::Either;
use futures::Future;
use hyper::Body;
use tokio::sync::Mutex;
use tonic::transport::NamedService;
use tower::{Layer, Service};

/// Manages the await-trees of `gRPC` requests that are currently served by the compute node.
pub type AwaitTreeRegistryRef = Arc<Mutex<await_tree::Registry<String>>>;
pub type AwaitTreeRegistryRef = await_tree::Registry;

/// Await-tree key type for gRPC calls.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GrpcCall {
pub desc: String,
}

#[derive(Clone)]
pub struct AwaitTreeMiddlewareLayer {
Expand Down Expand Up @@ -365,14 +369,15 @@ pub mod grpc_middleware {
let mut inner = std::mem::replace(&mut self.inner, clone);

let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let key = if let Some(authority) = req.uri().authority() {
let desc = if let Some(authority) = req.uri().authority() {
format!("{authority} - {id}")
} else {
format!("?? - {id}")
};
let key = GrpcCall { desc };

Either::Right(async move {
let root = registry.lock().await.register(key, req.uri().path());
let root = registry.register(key, req.uri().path());

root.instrument(inner.call(req)).await
})
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tonic::{Request, Response, Status, Streaming};

#[derive(Clone)]
pub struct StreamServiceImpl {
mgr: LocalStreamManager,
env: StreamEnvironment,
pub mgr: LocalStreamManager,
pub env: StreamEnvironment,
}

impl StreamServiceImpl {
Expand Down
25 changes: 4 additions & 21 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS;
use crate::rpc::service::exchange_service::ExchangeServiceImpl;
use crate::rpc::service::health_service::HealthServiceImpl;
use crate::rpc::service::monitor_service::{
AwaitTreeMiddlewareLayer, AwaitTreeRegistryRef, MonitorServiceImpl,
};
use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::telemetry::ComputeTelemetryCreator;
use crate::ComputeNodeOpts;
Expand Down Expand Up @@ -372,28 +370,12 @@ pub async fn compute_node_serve(
memory_mgr.get_watermark_epoch(),
);

let grpc_await_tree_reg = await_tree_config
.map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));

// Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if
// this is not the case, we can use the following command to get it printed into the logs
// periodically.
//
// Comment out the following line to enable.
// TODO: may optionally enable based on the features
#[cfg(any())]
stream_mgr.clone().spawn_print_trace();

// Boot the runtime gRPC services.
let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
let exchange_srv =
ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics);
let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
let monitor_srv = MonitorServiceImpl::new(
stream_mgr.clone(),
grpc_await_tree_reg.clone(),
config.server.clone(),
);
let monitor_srv = MonitorServiceImpl::new(stream_mgr.clone(), config.server.clone());
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr);
let health_srv = HealthServiceImpl::new();

Expand Down Expand Up @@ -425,6 +407,7 @@ pub async fn compute_node_serve(
ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX),
)
.add_service({
let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
let srv =
StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
#[cfg(madsim)]
Expand All @@ -433,7 +416,7 @@ pub async fn compute_node_serve(
}
#[cfg(not(madsim))]
{
AwaitTreeMiddlewareLayer::new_optional(grpc_await_tree_reg).layer(srv)
AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
}
})
.add_service(MonitorServiceServer::new(monitor_srv))
Expand Down
7 changes: 4 additions & 3 deletions src/storage/compactor/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_pb::monitor_service::{
ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest,
StackTraceResponse,
};
use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
use risingwave_storage::hummock::compactor::CompactionAwaitTreeRegRef;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -85,9 +86,9 @@ impl MonitorService for MonitorServiceImpl {
let compaction_task_traces = match &self.await_tree_reg {
None => Default::default(),
Some(await_tree_reg) => await_tree_reg
.read()
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<Compaction>()
.into_iter()
.map(|(k, v)| (format!("{k:?}"), v.to_string()))
.collect(),
};
Ok(Response::new(StackTraceResponse {
Expand Down
9 changes: 6 additions & 3 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ use crate::hummock::compactor::compaction_utils::{
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
fast_compactor_runner, CompactOutput, CompactionFilter, Compactor, CompactorContext,
await_tree_key, fast_compactor_runner, CompactOutput, CompactionFilter, Compactor,
CompactorContext,
};
use crate::hummock::iterator::{
Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, ValueMeta,
Expand Down Expand Up @@ -627,9 +628,11 @@ pub async fn compact(
let traced = match context.await_tree_reg.as_ref() {
None => runner.right_future(),
Some(await_tree_reg) => await_tree_reg
.write()
.register(
format!("compact_runner/{}-{}", compact_task.task_id, split_index),
await_tree_key::CompactRunner {
task_id: compact_task.task_id,
split_index,
},
format!(
"Compaction Task {} Split {} ",
compact_task.task_id, split_index
Expand Down
18 changes: 15 additions & 3 deletions src/storage/src/hummock/compactor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use more_asserts::assert_ge;
use parking_lot::RwLock;

use super::task_progress::TaskProgressManagerRef;
use crate::hummock::compactor::CompactionExecutor;
Expand All @@ -25,10 +24,23 @@ use crate::hummock::MemoryLimiter;
use crate::monitor::CompactorMetrics;
use crate::opts::StorageOpts;

pub type CompactionAwaitTreeRegRef = Arc<RwLock<await_tree::Registry<String>>>;
pub type CompactionAwaitTreeRegRef = await_tree::Registry;

pub fn new_compaction_await_tree_reg_ref(config: await_tree::Config) -> CompactionAwaitTreeRegRef {
Arc::new(RwLock::new(await_tree::Registry::new(config)))
await_tree::Registry::new(config)
}

pub mod await_tree_key {
/// Await-tree key type for compaction tasks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Compaction {
CompactRunner { task_id: u64, split_index: usize },
CompactSharedBuffer { id: usize },
SpawnUploadTask { id: usize },
MergingTask { id: usize },
}

pub use Compaction::*;
}

/// A `CompactorContext` describes the context of a compactor.
Expand Down
Loading

0 comments on commit dfe5f31

Please sign in to comment.