diff --git a/proto/meta.proto b/proto/meta.proto index 53cf83a28fb2d..3f1e5cdbbdf65 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -508,7 +508,8 @@ message SystemParams { optional string data_directory = 7; optional string backup_storage_url = 8; optional string backup_storage_directory = 9; - optional bool telemetry_enabled = 10; + // Deprecated. Use config file instead. + optional bool telemetry_enabled = 10 [deprecated = true]; optional uint32 parallel_compact_size_mb = 11; optional uint32 max_concurrent_creating_streaming_jobs = 12; optional bool pause_on_next_bootstrap = 13; diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 042a7f1708d4c..6381fc7ac4342 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -736,8 +736,9 @@ pub struct BatchDeveloperConfig { pub chunk_size: usize, } -/// The section `[system]` in `risingwave.toml`. This section is only for testing purpose and should -/// not be documented. +/// The section `[system]` in `risingwave.toml`. All these fields are used to initialize the system +/// parameters persisted in Meta store. Most fields are for testing purpose only and should not be +/// documented. #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] pub struct SystemConfig { /// The interval of periodic barrier. @@ -778,9 +779,6 @@ pub struct SystemConfig { #[serde(default = "default::system::backup_storage_directory")] pub backup_storage_directory: Option<String>, - #[serde(default = "default::system::telemetry_enabled")] - pub telemetry_enabled: Option<bool>, - /// Max number of concurrent creating streaming jobs. #[serde(default = "default::system::max_concurrent_creating_streaming_jobs")] pub max_concurrent_creating_streaming_jobs: Option<u32>, @@ -791,6 +789,7 @@ pub struct SystemConfig { } impl SystemConfig { + #![allow(deprecated)] pub fn into_init_system_params(self) -> SystemParams { SystemParams { barrier_interval_ms: self.barrier_interval_ms, @@ -803,9 +802,9 @@ impl SystemConfig { data_directory: self.data_directory, backup_storage_url: self.backup_storage_url, backup_storage_directory: self.backup_storage_directory, - telemetry_enabled: self.telemetry_enabled, max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs, pause_on_next_bootstrap: self.pause_on_next_bootstrap, + telemetry_enabled: None, // deprecated } } } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index ccb666367d729..fac23532c9b79 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -54,7 +54,6 @@ macro_rules! for_all_undeprecated_params { { data_directory, String, None, false }, { backup_storage_url, String, Some("memory".to_string()), false }, { backup_storage_directory, String, Some("backup".to_string()), false }, - { telemetry_enabled, bool, Some(true), true }, { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true }, { pause_on_next_bootstrap, bool, Some(false), true }, $({ $field, $type, $default, $is_mutable },)* @@ -369,7 +368,6 @@ mod tests { (DATA_DIRECTORY_KEY, "a"), (BACKUP_STORAGE_URL_KEY, "a"), (BACKUP_STORAGE_DIRECTORY_KEY, "a"), - (TELEMETRY_ENABLED_KEY, "false"), (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"), (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"), ]; diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 91cc893823b66..643905c89f919 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -76,10 +76,6 @@ impl SystemParamsReader { self.prost.max_concurrent_creating_streaming_jobs.unwrap() } - pub fn telemetry_enabled(&self) -> bool { - self.prost.telemetry_enabled.unwrap() - } - pub fn pause_on_next_bootstrap(&self) -> bool { self.prost.pause_on_next_bootstrap.unwrap_or(false) } diff --git a/src/common/src/telemetry/manager.rs b/src/common/src/telemetry/manager.rs index 5fbf15fd1d1ac..a03f598ce613c 100644 --- a/src/common/src/telemetry/manager.rs +++ b/src/common/src/telemetry/manager.rs @@ -12,153 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use parking_lot::Mutex; -use tokio::select; -use tokio::sync::oneshot::{self, Sender}; -use tokio::sync::watch::Receiver; +use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; use super::report::{start_telemetry_reporting, TelemetryInfoFetcher, TelemetryReportCreator}; -use crate::system_param::local_manager::SystemParamsReaderRef; -use crate::telemetry::telemetry_env_enabled; pub struct TelemetryManager<F, I> where F: TelemetryReportCreator + Send + Sync + 'static, I: TelemetryInfoFetcher + Send + Sync + 'static, { - core: Arc<TelemetryManagerCore<F, I>>, - sys_params_change_rx: Receiver<SystemParamsReaderRef>, -} - -impl<F, I> TelemetryManager<F, I> -where - F: TelemetryReportCreator + Send + Sync + 'static, - I: TelemetryInfoFetcher + Send + Sync + 'static, -{ - pub fn new( - sys_params_change_rx: Receiver<SystemParamsReaderRef>, - info_fetcher: Arc<I>, - report_creator: Arc<F>, - ) -> Self { - Self { - core: Arc::new(TelemetryManagerCore::new(info_fetcher, report_creator)), - sys_params_change_rx, - } - } - - pub async fn start_telemetry_reporting(&self) { - self.core.start().await; - } - - pub fn watch_params_change(self) -> (JoinHandle<()>, Sender<()>) { - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let core = self.core.clone(); - let mut sys_params_change_rx = self.sys_params_change_rx; - let watch_fn = async move { - loop { - select! { - Ok(_) = sys_params_change_rx.changed() => { - let telemetry_enabled = { - let params = sys_params_change_rx.borrow().load(); - // check both environment variable and system params - // if either is false, then stop telemetry - params.telemetry_enabled() && telemetry_env_enabled() - }; - - let telemetry_running = { - core.telemetry_running() - }; - - match (telemetry_running, telemetry_enabled) { - (false, true) => { - tracing::info!("telemetry config changed to true, start reporting"); - } - (true, false) => { - tracing::info!("telemetry config changed to false, stop reporting"); - core.stop(); - } - _ => {} - }; - } - , - _ = &mut shutdown_rx =>{ - tracing::info!("Telemetry exit"); - return; - } - } - } - }; - - let handle = tokio::spawn(watch_fn); - (handle, shutdown_tx) - } -} - -struct TelemetryManagerCore<F, I> -where - F: TelemetryReportCreator + Send + Sync + 'static, - I: TelemetryInfoFetcher + Send + Sync + 'static, -{ - telemetry_handle: Mutex<Option<JoinHandle<()>>>, - telemetry_shutdown_tx: Mutex<Option<Sender<()>>>, - telemetry_running: Arc<AtomicBool>, info_fetcher: Arc<I>, report_creator: Arc<F>, } -impl<F, I> TelemetryManagerCore<F, I> +impl<F, I> TelemetryManager<F, I> where F: TelemetryReportCreator + Send + Sync + 'static, I: TelemetryInfoFetcher + Send + Sync + 'static, { - fn new(info_fetcher: Arc<I>, report_creator: Arc<F>) -> Self { + pub fn new(info_fetcher: Arc<I>, report_creator: Arc<F>) -> Self { Self { - telemetry_handle: Mutex::new(None), - telemetry_shutdown_tx: Mutex::new(None), - telemetry_running: Arc::new(AtomicBool::new(false)), info_fetcher, report_creator, } } - fn telemetry_running(&self) -> bool { - self.telemetry_running.load(Ordering::Relaxed) - } - - async fn start(&self) { - if self.telemetry_running() { - return; - } - - let (handle, tx) = - start_telemetry_reporting(self.info_fetcher.clone(), self.report_creator.clone()).await; - let mut handle_guard = self.telemetry_handle.lock(); - *handle_guard = Some(handle); - let mut shutdown_tx_gurad = self.telemetry_shutdown_tx.lock(); - *shutdown_tx_gurad = Some(tx); - self.telemetry_running.store(true, Ordering::Relaxed); - } - - fn stop(&self) { - match ( - self.telemetry_running.load(Ordering::Relaxed), - self.telemetry_shutdown_tx.lock().take(), - self.telemetry_handle.lock().take(), - ) { - (true, Some(shutdown_rx), Some(_)) => { - if let Err(()) = shutdown_rx.send(()) { - tracing::error!("telemetry mgr failed to send stop signal"); - } else { - self.telemetry_running.store(false, Ordering::Relaxed) - } - } - // do nothing if telemetry is not running - (false, None, None) => {} - _ => unreachable!("impossible telemetry handler"), - } + pub async fn start(&self) -> (JoinHandle<()>, Sender<()>) { + start_telemetry_reporting(self.info_fetcher.clone(), self.report_creator.clone()).await } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 46340f9f4c75d..aef6895ad6b5b 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -302,8 +302,6 @@ pub async fn compute_node_serve( // of lru manager. stream_mgr.set_watermark_epoch(watermark_epoch).await; - let telemetry_enabled = system_params.telemetry_enabled(); - let grpc_await_tree_reg = await_tree_config .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); let dml_mgr = Arc::new(DmlManager::new( @@ -379,7 +377,6 @@ pub async fn compute_node_serve( let health_srv = HealthServiceImpl::new(); let telemetry_manager = TelemetryManager::new( - system_params_manager.watch_params(), Arc::new(meta_client.clone()), Arc::new(ComputeTelemetryCreator::new()), ); @@ -387,12 +384,7 @@ pub async fn compute_node_serve( // if the toml config file or env variable disables telemetry, do not watch system params change // because if any of configs disable telemetry, we should never start it if config.server.telemetry_enabled && telemetry_env_enabled() { - // if all configs are true, start reporting - if telemetry_enabled { - telemetry_manager.start_telemetry_reporting().await; - } - // if config and env are true, starting watching - sub_tasks.push(telemetry_manager.watch_params_change()); + sub_tasks.push(telemetry_manager.start().await); } else { tracing::info!("Telemetry didn't start due to config"); } diff --git a/src/config/ci-delete-range-test.toml b/src/config/ci-delete-range-test.toml index 4408633fd09f3..703390b658541 100644 --- a/src/config/ci-delete-range-test.toml +++ b/src/config/ci-delete-range-test.toml @@ -1,6 +1,8 @@ [meta] vacuum_interval_sec = 10 -[system] +[server] telemetry_enabled = false + +[system] max_concurrent_creating_streaming_jobs = 0 diff --git a/src/config/ci-sim.toml b/src/config/ci-sim.toml index 94f28684418c8..9535ff83696cb 100644 --- a/src/config/ci-sim.toml +++ b/src/config/ci-sim.toml @@ -3,7 +3,6 @@ telemetry_enabled = false metrics_level = "Disabled" [system] -telemetry_enabled = false max_concurrent_creating_streaming_jobs = 0 [meta] diff --git a/src/config/example.toml b/src/config/example.toml index 4789f03901649..0e6d50e8f3f1e 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -151,6 +151,5 @@ block_size_kb = 64 bloom_false_positive = 0.001 backup_storage_url = "memory" backup_storage_directory = "backup" -telemetry_enabled = true max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false diff --git a/src/config/full-iceberg-bench.toml b/src/config/full-iceberg-bench.toml index 6b0ea54eef72b..581bcf84644e2 100644 --- a/src/config/full-iceberg-bench.toml +++ b/src/config/full-iceberg-bench.toml @@ -146,5 +146,4 @@ block_size_kb = 64 bloom_false_positive = 0.001 backup_storage_url = "memory" backup_storage_directory = "backup" -telemetry_enabled = true max_concurrent_creating_streaming_jobs = 1 diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index f4783f0f5a22a..9f9390cb629c2 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -251,8 +251,6 @@ impl FrontendEnv { user_info_updated_rx, )); - let telemetry_enabled = system_params_reader.telemetry_enabled(); - let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader.clone())); let frontend_observer_node = FrontendObserverNode::new( @@ -285,7 +283,6 @@ impl FrontendEnv { let host = opts.health_check_listener_addr.clone(); let telemetry_manager = TelemetryManager::new( - system_params_manager.watch_params(), Arc::new(meta_client.clone()), Arc::new(FrontendTelemetryCreator::new()), ); @@ -293,14 +290,9 @@ impl FrontendEnv { // if the toml config file or env variable disables telemetry, do not watch system params // change because if any of configs disable telemetry, we should never start it if config.server.telemetry_enabled && telemetry_env_enabled() { - if telemetry_enabled { - telemetry_manager.start_telemetry_reporting().await; - } - let (telemetry_join_handle, telemetry_shutdown_sender) = - telemetry_manager.watch_params_change(); - - join_handles.push(telemetry_join_handle); - shutdown_senders.push(telemetry_shutdown_sender); + let (join_handle, shutdown_sender) = telemetry_manager.start().await; + join_handles.push(join_handle); + shutdown_senders.push(shutdown_sender); } else { tracing::info!("Telemetry didn't start due to config"); } diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index e0e0472109286..5f85a494c0624 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -22,7 +22,6 @@ use futures::future::join_all; use itertools::Itertools; use regex::Regex; use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; -use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common_service::metrics_manager::MetricsManager; @@ -648,10 +647,7 @@ pub async fn start_service_as_election_leader( }); sub_tasks.push((stream_abort_handler, abort_sender)); - let local_system_params_manager = LocalSystemParamsManager::new(system_params_reader.clone()); - - let mgr = TelemetryManager::new( - local_system_params_manager.watch_params(), + let telemetry_manager = TelemetryManager::new( Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())), Arc::new(MetaReportCreator::new( cluster_manager, @@ -661,10 +657,7 @@ pub async fn start_service_as_election_leader( // May start telemetry reporting if env.opts.telemetry_enabled && telemetry_env_enabled() { - if system_params_reader.telemetry_enabled() { - mgr.start_telemetry_reporting().await; - } - sub_tasks.push(mgr.watch_params_change()); + sub_tasks.push(telemetry_manager.start().await); } else { tracing::info!("Telemetry didn't start due to meta backend or config"); } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 029705ef3c204..dd953b87c7af9 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -156,8 +156,6 @@ pub async fn compactor_serve( meta_cache_capacity_bytes, )); - let telemetry_enabled = system_params_reader.telemetry_enabled(); - let filter_key_extractor_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new( RemoteTableAccessor::new(meta_client.clone()), ))); @@ -225,17 +223,13 @@ pub async fn compactor_serve( ]; let telemetry_manager = TelemetryManager::new( - system_params_manager.watch_params(), Arc::new(meta_client.clone()), Arc::new(CompactorTelemetryCreator::new()), ); // if the toml config file or env variable disables telemetry, do not watch system params change // because if any of configs disable telemetry, we should never start it if config.server.telemetry_enabled && telemetry_env_enabled() { - if telemetry_enabled { - telemetry_manager.start_telemetry_reporting().await; - } - sub_tasks.push(telemetry_manager.watch_params_change()); + sub_tasks.push(telemetry_manager.start().await); } else { tracing::info!("Telemetry didn't start due to config"); } diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 8ca23d2ba0327..7a000c914e3a9 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -85,8 +85,8 @@ async fn run_replay(args: Args) -> Result<()> { async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay> { let config = load_config(&args.config, NoOverride); let storage_memory_config = extract_storage_memory_config(&config); - let system = config.system.clone(); - let system_params_reader = SystemParamsReader::from(system.into_init_system_params()); + let system_params_reader = + SystemParamsReader::from(config.system.clone().into_init_system_params()); let storage_opts = Arc::new(StorageOpts::from(( &config,