From c9a6a1fdde4b78ccea71d27a8432753ed697df55 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 9 Jul 2024 20:15:01 +0800 Subject: [PATCH] refactor(common): remove dead code and simplify (#17585) Signed-off-by: xxchan --- src/common/common_service/src/lib.rs | 12 +- .../common_service/src/observer_manager.rs | 49 +--- src/common/metrics/src/monitor/connection.rs | 22 -- src/common/metrics/src/monitor/mod.rs | 71 +----- src/common/metrics/src/monitor/my_stats.rs | 221 ------------------ src/compute/src/observer/observer_manager.rs | 6 +- src/compute/src/server.rs | 6 +- src/frontend/src/observer/observer_manager.rs | 6 +- src/frontend/src/session.rs | 3 +- src/meta/node/src/server.rs | 5 +- src/meta/src/hummock/manager/mod.rs | 2 +- src/object_store/src/object/s3.rs | 2 +- src/rpc_client/src/compute_client.rs | 2 +- src/rpc_client/src/connector_client.rs | 2 +- src/rpc_client/src/stream_client.rs | 2 +- .../compactor_observer/observer_manager.rs | 6 +- src/storage/compactor/src/server.rs | 5 +- .../src/bin/replay/replay_impl.rs | 2 +- .../hummock_test/src/compactor_tests.rs | 2 +- .../src/mock_notification_client.rs | 2 +- src/storage/hummock_test/src/test_utils.rs | 2 +- src/storage/src/hummock/observer_manager.rs | 6 +- .../src/hummock/store/hummock_storage.rs | 2 +- src/storage/src/store_impl.rs | 2 +- 24 files changed, 53 insertions(+), 387 deletions(-) delete mode 100644 src/common/metrics/src/monitor/my_stats.rs diff --git a/src/common/common_service/src/lib.rs b/src/common/common_service/src/lib.rs index c09c84012819b..2cf9a56e076f3 100644 --- a/src/common/common_service/src/lib.rs +++ b/src/common/common_service/src/lib.rs @@ -18,9 +18,13 @@ #![feature(impl_trait_in_assoc_type)] #![feature(error_generic_member_access)] -pub mod metrics_manager; -pub mod observer_manager; +mod metrics_manager; +mod observer_manager; +mod tracing; pub use metrics_manager::MetricsManager; - -pub mod tracing; +pub use observer_manager::{ + Channel, NotificationClient, ObserverError, ObserverManager, ObserverState, + RpcNotificationClient, +}; +pub use tracing::TracingExtractLayer; diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index e760a0e16866c..bf7e457be8b1c 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -22,42 +22,6 @@ use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tonic::{Status, Streaming}; -pub trait SubscribeTypeEnum { - fn subscribe_type() -> SubscribeType; -} - -pub struct SubscribeFrontend {} - -impl SubscribeTypeEnum for SubscribeFrontend { - fn subscribe_type() -> SubscribeType { - SubscribeType::Frontend - } -} - -pub struct SubscribeHummock {} - -impl SubscribeTypeEnum for SubscribeHummock { - fn subscribe_type() -> SubscribeType { - SubscribeType::Hummock - } -} - -pub struct SubscribeCompactor {} - -impl SubscribeTypeEnum for SubscribeCompactor { - fn subscribe_type() -> SubscribeType { - SubscribeType::Compactor - } -} - -pub struct SubscribeCompute {} - -impl SubscribeTypeEnum for SubscribeCompute { - fn subscribe_type() -> SubscribeType { - SubscribeType::Compute - } -} - /// `ObserverManager` is used to update data based on notification from meta. /// Call `start` to spawn a new asynchronous task /// We can write the notification logic by implementing `ObserverNodeImpl`. @@ -68,7 +32,7 @@ pub struct ObserverManager { } pub trait ObserverState: Send + 'static { - type SubscribeType: SubscribeTypeEnum; + fn subscribe_type() -> SubscribeType; /// modify data after receiving notification from meta fn handle_notification(&mut self, resp: SubscribeResponse); @@ -109,10 +73,7 @@ where S: ObserverState, { pub async fn new(client: T, observer_states: S) -> Self { - let rx = client - .subscribe(S::SubscribeType::subscribe_type()) - .await - .unwrap(); + let rx = client.subscribe(S::subscribe_type()).await.unwrap(); Self { rx, client, @@ -214,11 +175,7 @@ where /// `re_subscribe` is used to re-subscribe to the meta's notification. async fn re_subscribe(&mut self) { loop { - match self - .client - .subscribe(S::SubscribeType::subscribe_type()) - .await - { + match self.client.subscribe(S::subscribe_type()).await { Ok(rx) => { tracing::debug!("re-subscribe success"); self.rx = rx; diff --git a/src/common/metrics/src/monitor/connection.rs b/src/common/metrics/src/monitor/connection.rs index 295fb6399ba4b..e5774a3f16d7d 100644 --- a/src/common/metrics/src/monitor/connection.rs +++ b/src/common/metrics/src/monitor/connection.rs @@ -587,28 +587,6 @@ impl tonic::transport::server::Router { } } -#[cfg(not(madsim))] -pub fn monitored_tcp_incoming( - listen_addr: std::net::SocketAddr, - connection_type: impl Into, - config: TcpConfig, -) -> Result< - MonitoredConnection, - Box, -> { - let incoming = tonic::transport::server::TcpIncoming::new( - listen_addr, - config.tcp_nodelay, - config.keepalive_duration, - )?; - Ok(MonitoredConnection::new( - incoming, - MonitorNewConnectionImpl { - connection_type: connection_type.into(), - }, - )) -} - #[derive(Clone)] pub struct MonitorNewConnectionImpl { connection_type: String, diff --git a/src/common/metrics/src/monitor/mod.rs b/src/common/metrics/src/monitor/mod.rs index 10b5c966e636a..316cac9ea907c 100644 --- a/src/common/metrics/src/monitor/mod.rs +++ b/src/common/metrics/src/monitor/mod.rs @@ -12,20 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod connection; -pub mod my_stats; -pub mod process; -pub mod rwlock; +pub use connection::{monitor_connector, EndpointExt, RouterExt, TcpConfig}; +pub use rwlock::MonitoredRwLock; -use std::sync::LazyLock; +mod connection; +mod process; +mod rwlock; -use prometheus::core::{ - AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, Metric, -}; -use prometheus::{Histogram, HistogramVec, Registry}; +use std::sync::LazyLock; -use crate::monitor::my_stats::MyHistogram; -use crate::monitor::process::monitor_process; +use prometheus::Registry; #[cfg(target_os = "linux")] static PAGESIZE: std::sync::LazyLock = @@ -35,59 +31,8 @@ static PAGESIZE: std::sync::LazyLock = pub static CLOCK_TICK: std::sync::LazyLock = std::sync::LazyLock::new(|| unsafe { libc::sysconf(libc::_SC_CLK_TCK) as u64 }); -/// Define extension method `print` used in `print_statistics`. -pub trait Print { - fn print(&self); -} - -impl Print for GenericCounter { - fn print(&self) { - let desc = &self.desc()[0].fq_name; - let counter = self.metric().get_counter().get_value() as u64; - println!("{desc} COUNT : {counter}"); - } -} - -impl Print for GenericGauge { - fn print(&self) { - let desc = &self.desc()[0].fq_name; - let counter = self.get(); - println!("{desc} COUNT : {counter}"); - } -} - -impl Print for Histogram { - fn print(&self) { - let desc = &self.desc()[0].fq_name; - - let histogram = MyHistogram::from_prom_hist(self.metric().get_histogram()); - let p50 = histogram.get_percentile(50.0); - let p95 = histogram.get_percentile(95.0); - let p99 = histogram.get_percentile(99.0); - let p100 = histogram.get_percentile(100.0); - - let sample_count = self.get_sample_count(); - let sample_sum = self.get_sample_sum(); - println!("{desc} P50 : {p50} P95 : {p95} P99 : {p99} P100 : {p100} COUNT : {sample_count} SUM : {sample_sum}"); - } -} - -impl Print for HistogramVec { - fn print(&self) { - let desc = &self.desc()[0].fq_name; - println!("{desc} {:?}", self); - } -} - -impl Print for GenericCounterVec { - fn print(&self) { - let desc = &self.desc()[0].fq_name; - println!("{desc} {:?}", self); - } -} - pub static GLOBAL_METRICS_REGISTRY: LazyLock = LazyLock::new(|| { let registry = Registry::new(); - monitor_process(®istry); + process::monitor_process(®istry); registry }); diff --git a/src/common/metrics/src/monitor/my_stats.rs b/src/common/metrics/src/monitor/my_stats.rs deleted file mode 100644 index 52c71167f2f97..0000000000000 --- a/src/common/metrics/src/monitor/my_stats.rs +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::fmt::{Display, Formatter}; - -use itertools::Itertools; -use prometheus::proto::Histogram; -use rw_iter_util::ZipEqFast; - -#[derive(Clone, Default, Debug)] -pub struct MyHistogram { - pub upper_bound_list: Vec, - pub count_list: Vec, - pub total_count: u64, - pub total_sum: f64, -} - -impl MyHistogram { - pub fn from_prom_hist(histogram: &Histogram) -> MyHistogram { - let mut upper_bound_list = Vec::new(); - let mut count_list = Vec::new(); - - let total_count = histogram.get_sample_count(); - let total_sum = histogram.get_sample_sum(); - - let buckets = histogram.get_bucket(); - for bucket in buckets { - let upper_bound = bucket.get_upper_bound(); - let count = bucket.get_cumulative_count(); - upper_bound_list.push(upper_bound); - count_list.push(count); - } - - MyHistogram { - upper_bound_list, - count_list, - total_count, - total_sum, - } - } - - pub fn from_diff(prev: &MyHistogram, cur: &MyHistogram) -> MyHistogram { - MyHistogram { - upper_bound_list: cur.upper_bound_list.clone(), - count_list: match prev.count_list.is_empty() { - true => cur.count_list.clone(), - false => prev - .count_list - .iter() - .zip_eq_fast(cur.count_list.iter()) - .map(|(&pb, &cb)| cb - pb) - .collect_vec(), - }, - total_sum: cur.total_sum - prev.total_sum, - total_count: cur.total_count - prev.total_count, - } - } - - pub fn get_percentile(&self, p: f64) -> f64 { - let sample_count = self.total_count; - - // empty bucket may appear - if sample_count == 0 { - return 0.0; - } - let threshold = (sample_count as f64 * (p / 100.0_f64)).ceil() as u64; - let mut last_upper_bound = 0.0; - let mut last_count = 0; - for (&upper_bound, &count) in self - .upper_bound_list - .iter() - .zip_eq_fast(self.count_list.iter()) - { - if count >= threshold { - // assume scale linearly within this bucket, - // return a value between last_upper_bound and upper_bound - let right_left_diff = upper_bound - last_upper_bound; - return last_upper_bound - + right_left_diff * (threshold - last_count) as f64 - / (count - last_count) as f64; - } - last_upper_bound = upper_bound; - last_count = count; - } - - 0.0 - } -} - -impl Display for MyHistogram { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - // calculate latencies statistics - let mean = self.total_sum / self.total_count as f64; - let p50 = self.get_percentile(50.0); - let p90 = self.get_percentile(90.0); - let p99 = self.get_percentile(99.0); - let p100 = self.get_percentile(100.0); - - write!( - f, - "latency: - mean: {}, - p50: {}, - p90: {}, - p99: {}, - p100: {};", - mean, p50, p90, p99, p100 - ) - } -} - -#[cfg(test)] -mod tests { - use prometheus::core::Metric; - use prometheus::{histogram_opts, register_histogram_with_registry, Registry}; - - use super::*; - - #[test] - fn test_proc_histogram_basic() { - fn new_simple_histogram(upper_bound: u64) -> MyHistogram { - let registry = Registry::new(); - let buckets = (1..=upper_bound).map(|x| x as f64).collect::>(); - let opts = histogram_opts!("test_histogram", "test_histogram", buckets); - - let histogram = register_histogram_with_registry!(opts, registry).unwrap(); - - for value in 1..=upper_bound { - histogram.observe(value as f64); - } - - MyHistogram::from_prom_hist(histogram.metric().get_histogram()) - } - - let histogram = new_simple_histogram(999); - assert_eq!(histogram.get_percentile(50.0) as u64, 500); - assert_eq!(histogram.get_percentile(90.0) as u64, 900); - assert_eq!(histogram.get_percentile(99.0) as u64, 990); - assert_eq!(histogram.get_percentile(99.9) as u64, 999); - assert_eq!(histogram.get_percentile(100.0) as u64, 999); - - let histogram = new_simple_histogram(1000); - assert_eq!(histogram.get_percentile(50.0) as u64, 500); - assert_eq!(histogram.get_percentile(90.0) as u64, 900); - assert_eq!(histogram.get_percentile(99.0) as u64, 990); - assert_eq!(histogram.get_percentile(99.9) as u64, 1000); - assert_eq!(histogram.get_percentile(100.0) as u64, 1000); - - let histogram = new_simple_histogram(9999); - assert_eq!(histogram.get_percentile(50.0) as u64, 5000); - assert_eq!(histogram.get_percentile(90.0) as u64, 9000); - assert_eq!(histogram.get_percentile(99.0) as u64, 9900); - assert_eq!(histogram.get_percentile(99.9) as u64, 9990); - assert_eq!(histogram.get_percentile(100.0) as u64, 9999); - } - - #[test] - fn test_proc_histogram_uneven_distributed() { - let registry = Registry::new(); - let buckets = vec![ - 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, - ]; - let opts = histogram_opts!("test_histogram", "test_histogram", buckets); - let histogram = register_histogram_with_registry!(opts, registry).unwrap(); - - let mut i = 0.005; - while i < 10.0 { - histogram.observe(i); - i += 0.005; - } - - let histogram = MyHistogram::from_prom_hist(histogram.metric().get_histogram()); - assert_eq!(histogram.get_percentile(50.0), 5.0); - assert_eq!(histogram.get_percentile(90.0), 9.004004004004004); - assert_eq!(histogram.get_percentile(99.0), 9.904904904904905); - assert_eq!(histogram.get_percentile(99.9), 9.994994994994995); - assert_eq!(histogram.get_percentile(100.0), 10.0); - } - - #[test] - fn test_proc_histogram_realistic() { - let registry = Registry::new(); - let buckets = vec![ - 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, - ]; - let opts = histogram_opts!("test_histogram", "test_histogram", buckets); - let histogram = register_histogram_with_registry!(opts, registry).unwrap(); - - histogram.observe(0.0012); - histogram.observe(0.0013); - histogram.observe(0.003); - - histogram.observe(0.0132); - histogram.observe(0.0143); - histogram.observe(0.0146); - histogram.observe(0.0249); - - histogram.observe(0.99); - - histogram.observe(6.11); - histogram.observe(7.833); - - let histogram = MyHistogram::from_prom_hist(histogram.metric().get_histogram()); - assert_eq!(histogram.get_percentile(50.0), 0.0175); - assert_eq!(histogram.get_percentile(90.0), 7.5); - assert_eq!(histogram.get_percentile(99.0), 10.00); - assert_eq!(histogram.get_percentile(99.9), 10.00); - assert_eq!(histogram.get_percentile(100.0), 10.00); - } -} diff --git a/src/compute/src/observer/observer_manager.rs b/src/compute/src/observer/observer_manager.rs index e4192fb0b6f9c..62e2d699668f4 100644 --- a/src/compute/src/observer/observer_manager.rs +++ b/src/compute/src/observer/observer_manager.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; -use risingwave_common_service::observer_manager::{ObserverState, SubscribeCompute}; +use risingwave_common_service::ObserverState; use risingwave_pb::meta::subscribe_response::Info; use risingwave_pb::meta::SubscribeResponse; @@ -22,7 +22,9 @@ pub struct ComputeObserverNode { } impl ObserverState for ComputeObserverNode { - type SubscribeType = SubscribeCompute; + fn subscribe_type() -> risingwave_pb::meta::SubscribeType { + risingwave_pb::meta::SubscribeType::Compute + } fn handle_notification(&mut self, resp: SubscribeResponse) { let Some(info) = resp.info.as_ref() else { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 3b0466b6cc0d1..d7dcbd5146c31 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -28,7 +28,7 @@ use risingwave_common::config::{ MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE, }; use risingwave_common::lru::init_global_sequencer_args; -use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; +use risingwave_common::monitor::{RouterExt, TcpConfig}; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; @@ -38,9 +38,7 @@ use risingwave_common::util::pretty_bytes::convert; use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; -use risingwave_common_service::metrics_manager::MetricsManager; -use risingwave_common_service::observer_manager::ObserverManager; -use risingwave_common_service::tracing::TracingExtractLayer; +use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer}; use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS; use risingwave_dml::dml_manager::DmlManager; use risingwave_pb::common::WorkerType; diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 38e84d213bf49..169c6bff1b950 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -22,7 +22,7 @@ use risingwave_common::catalog::CatalogVersion; use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; -use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend}; +use risingwave_common_service::ObserverState; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::relation::RelationInfo; @@ -50,7 +50,9 @@ pub struct FrontendObserverNode { } impl ObserverState for FrontendObserverNode { - type SubscribeType = SubscribeFrontend; + fn subscribe_type() -> risingwave_pb::meta::SubscribeType { + risingwave_pb::meta::SubscribeType::Frontend + } fn handle_notification(&mut self, resp: SubscribeResponse) { let Some(info) = resp.info.as_ref() else { diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 2148656bee00b..5f0ad6d62b751 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -64,8 +64,7 @@ use risingwave_common::util::resource_util; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; -use risingwave_common_service::observer_manager::ObserverManager; -use risingwave_common_service::MetricsManager; +use risingwave_common_service::{MetricsManager, ObserverManager}; use risingwave_connector::source::monitor::{SourceMetrics, GLOBAL_SOURCE_METRICS}; use risingwave_pb::common::WorkerType; use risingwave_pb::health::health_server::HealthServer; diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 57ccad6c9b5e6..9098ee1429c82 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -21,13 +21,12 @@ use etcd_client::ConnectOptions; use futures::future::join_all; use otlp_embedded::TraceServiceServer; use regex::Regex; -use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; +use risingwave_common::monitor::{RouterExt, TcpConfig}; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled}; -use risingwave_common_service::metrics_manager::MetricsManager; -use risingwave_common_service::tracing::TracingExtractLayer; +use risingwave_common_service::{MetricsManager, TracingExtractLayer}; use risingwave_meta::barrier::StreamRpcManager; use risingwave_meta::controller::catalog::CatalogController; use risingwave_meta::controller::cluster::ClusterController; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 65eede718c5c4..51d3c6c397ea3 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use bytes::Bytes; use itertools::Itertools; -use risingwave_common::monitor::rwlock::MonitoredRwLock; +use risingwave_common::monitor::MonitoredRwLock; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 3ed5fc01ba40c..f382a97293fa9 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -53,7 +53,7 @@ use futures::{stream, Stream, StreamExt, TryStreamExt}; use hyper::Body; use itertools::Itertools; use risingwave_common::config::ObjectStoreConfig; -use risingwave_common::monitor::connection::monitor_connector; +use risingwave_common::monitor::monitor_connector; use risingwave_common::range::RangeBoundsExt; use thiserror_ext::AsReport; use tokio::task::JoinHandle; diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index f908bb21aa3a2..4d959ec5209a6 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -18,7 +18,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::StreamExt; use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; -use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; +use risingwave_common::monitor::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::batch_plan::{PlanFragment, TaskId, TaskOutputId}; diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 30d78290b6d98..c81a74d2fa709 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -19,7 +19,7 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use futures::TryStreamExt; use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; -use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; +use risingwave_common::monitor::{EndpointExt, TcpConfig}; use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient; use risingwave_pb::connector_service::sink_coordinator_stream_request::{ CommitMetadata, StartCoordinator, diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 4710be7085ef6..988931cb207b6 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -19,7 +19,7 @@ use anyhow::anyhow; use async_trait::async_trait; use futures::TryStreamExt; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; -use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; +use risingwave_common::monitor::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_pb::stream_service::stream_service_client::StreamServiceClient; use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; diff --git a/src/storage/compactor/src/compactor_observer/observer_manager.rs b/src/storage/compactor/src/compactor_observer/observer_manager.rs index 3a37fd6060595..97d78f6c2849f 100644 --- a/src/storage/compactor/src/compactor_observer/observer_manager.rs +++ b/src/storage/compactor/src/compactor_observer/observer_manager.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; -use risingwave_common_service::observer_manager::{ObserverState, SubscribeCompactor}; +use risingwave_common_service::ObserverState; use risingwave_pb::catalog::Table; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -32,7 +32,9 @@ pub struct CompactorObserverNode { } impl ObserverState for CompactorObserverNode { - type SubscribeType = SubscribeCompactor; + fn subscribe_type() -> risingwave_pb::meta::SubscribeType { + risingwave_pb::meta::SubscribeType::Compactor + } fn handle_notification(&mut self, resp: SubscribeResponse) { let Some(info) = resp.info.as_ref() else { diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 8d246016858ef..e139bc201cd48 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -19,7 +19,7 @@ use std::time::Duration; use risingwave_common::config::{ extract_storage_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig, }; -use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; +use risingwave_common::monitor::{RouterExt, TcpConfig}; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader}; use risingwave_common::telemetry::manager::TelemetryManager; @@ -29,8 +29,7 @@ use risingwave_common::util::resource_util::memory::system_memory_available_byte use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; -use risingwave_common_service::metrics_manager::MetricsManager; -use risingwave_common_service::observer_manager::ObserverManager; +use risingwave_common_service::{MetricsManager, ObserverManager}; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::GLOBAL_OBJECT_STORE_METRICS; use risingwave_pb::common::WorkerType; diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index d8a2a2f0c24bd..fe9e5874e328e 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -19,7 +19,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::catalog::TableId; use risingwave_common::util::addr::HostAddr; -use risingwave_common_service::observer_manager::{Channel, NotificationClient, ObserverError}; +use risingwave_common_service::{Channel, NotificationClient, ObserverError}; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult}; use risingwave_hummock_trace::{ diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index e3c19f54e4340..96f237704abf5 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -28,7 +28,7 @@ pub(crate) mod tests { use risingwave_common::constants::hummock::CompactionFilterFlag; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, Epoch, EpochExt}; - use risingwave_common_service::observer_manager::NotificationClient; + use risingwave_common_service::NotificationClient; use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ diff --git a/src/storage/hummock_test/src/mock_notification_client.rs b/src/storage/hummock_test/src/mock_notification_client.rs index 68c75d57937a2..906781a7725b2 100644 --- a/src/storage/hummock_test/src/mock_notification_client.rs +++ b/src/storage/hummock_test/src/mock_notification_client.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use risingwave_common::util::addr::HostAddr; -use risingwave_common_service::observer_manager::{Channel, NotificationClient, ObserverError}; +use risingwave_common_service::{Channel, NotificationClient, ObserverError}; use risingwave_meta::hummock::{HummockManager, HummockManagerRef}; use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey}; use risingwave_pb::backup_service::MetaBackupManifestId; diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 5a2034facdbfa..bf5c4a8dd8d8c 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common_service::observer_manager::ObserverManager; +use risingwave_common_service::ObserverManager; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::TableKey; pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str}; diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 0725424aaca76..a9171005aeaa9 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use risingwave_common_service::observer_manager::{ObserverState, SubscribeHummock}; +use risingwave_common_service::ObserverState; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_trace::TraceSpan; use risingwave_pb::catalog::Table; @@ -38,7 +38,9 @@ pub struct HummockObserverNode { } impl ObserverState for HummockObserverNode { - type SubscribeType = SubscribeHummock; + fn subscribe_type() -> risingwave_pb::meta::SubscribeType { + risingwave_pb::meta::SubscribeType::Hummock + } fn handle_notification(&mut self, resp: SubscribeResponse) { let Some(info) = resp.info.as_ref() else { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 7b28588c5054a..f7794604e5a8b 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use more_asserts::assert_gt; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; -use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; +use risingwave_common_service::{NotificationClient, ObserverManager}; use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 8dd718324d9a5..20a146ce04f10 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -22,7 +22,7 @@ use foyer::{ DirectFsDeviceOptionsBuilder, HybridCacheBuilder, RateLimitPicker, RuntimeConfigBuilder, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; -use risingwave_common_service::observer_manager::RpcNotificationClient; +use risingwave_common_service::RpcNotificationClient; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_object_store::object::build_remote_object_store;