diff --git a/proto/telemetry.proto b/proto/telemetry.proto index 06e161506db15..162ab80648aba 100644 --- a/proto/telemetry.proto +++ b/proto/telemetry.proto @@ -7,8 +7,7 @@ option go_package = "risingwavelabs.com/risingwave/proto/telemetry"; enum MetaBackend { META_BACKEND_UNSPECIFIED = 0; META_BACKEND_MEMORY = 1; - reserved 2; - reserved "META_BACKEND_ETCD"; + META_BACKEND_ETCD = 2; META_BACKEND_RDB = 3; } @@ -167,3 +166,7 @@ message EventMessage { // mark the event is a test message bool is_test = 11; } + +message BatchEventMessage { + repeated EventMessage events = 1; +} diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index d4b1fd27335df..f4737038863b0 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -14,13 +14,18 @@ use std::sync::Arc; -use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid; +use risingwave_pb::telemetry::PbEventMessage; pub use risingwave_telemetry_event::{ - current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID, + current_timestamp, do_telemetry_event_report, post_telemetry_report_pb, + TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID, +}; +use risingwave_telemetry_event::{ + get_telemetry_risingwave_cloud_uuid, TELEMETRY_EVENT_REPORT_STASH_SIZE, + TELEMETRY_EVENT_REPORT_TX, }; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use tokio::time::{interval, Duration}; +use tokio::time::{interval as tokio_interval_fn, Duration}; use uuid::Uuid; use super::{Result, TELEMETRY_REPORT_INTERVAL}; @@ -60,9 +65,13 @@ where let begin_time = std::time::Instant::now(); let session_id = Uuid::new_v4().to_string(); - let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); + let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut event_interval = + tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL)); + event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // fetch telemetry tracking_id from the meta node only at the beginning // There is only one case tracking_id updated at the runtime ---- metastore data has been // cleaned. There is no way that metastore has been cleaned but nodes are still running @@ -91,9 +100,29 @@ where ) }); + let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::(); + TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| { + tracing::warn!( + "Telemetry failed to set event reporting tx, event reporting will be disabled" + ); + }); + let mut event_stash = Vec::new(); + loop { tokio::select! { _ = interval.tick() => {}, + event = event_rx.recv() => { + debug_assert!(event.is_some()); + event_stash.push(event.unwrap()); + if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE { + do_telemetry_event_report(&mut event_stash).await; + } + continue; + } + _ = event_interval.tick() => { + do_telemetry_event_report(&mut event_stash).await; + continue; + }, _ = &mut shutdown_rx => { tracing::info!("Telemetry exit"); return; diff --git a/src/common/telemetry_event/src/lib.rs b/src/common/telemetry_event/src/lib.rs index 0be5f40e0de1c..18874f4818627 100644 --- a/src/common/telemetry_event/src/lib.rs +++ b/src/common/telemetry_event/src/lib.rs @@ -21,9 +21,11 @@ use std::sync::OnceLock; use prost::Message; use risingwave_pb::telemetry::{ - EventMessage as PbEventMessage, PbTelemetryDatabaseObject, + EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject, TelemetryEventStage as PbTelemetryEventStage, }; +use thiserror_ext::AsReport; +use tokio::sync::mpsc::UnboundedSender; pub use util::*; pub type TelemetryResult = core::result::Result; @@ -32,6 +34,7 @@ pub type TelemetryResult = core::result::Result; pub type TelemetryError = String; pub static TELEMETRY_TRACKING_ID: OnceLock = OnceLock::new(); +pub static TELEMETRY_EVENT_REPORT_TX: OnceLock> = OnceLock::new(); pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; @@ -42,6 +45,21 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option { env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok() } +pub async fn do_telemetry_event_report(event_stash: &mut Vec) { + const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url + let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); + let batch_message = PbBatchEventMessage { + events: std::mem::take(event_stash), + }; + + post_telemetry_report_pb(&url, batch_message.encode_to_vec()) + .await + .unwrap_or_else(|e| tracing::debug!("{}", e)); +} + +pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds +pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action + pub fn report_event_common( event_stage: PbTelemetryEventStage, event_name: &str, @@ -95,15 +113,12 @@ pub fn request_to_telemetry_event( node, is_test, }; - let report_bytes = event.encode_to_vec(); - - tokio::spawn(async move { - const TELEMETRY_EVENT_REPORT_TYPE: &str = "event"; - let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); - post_telemetry_report_pb(&url, report_bytes) - .await - .unwrap_or_else(|e| tracing::info!("{}", e)) - }); + + if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() { + let _ = tx.send(event).inspect_err(|e| { + tracing::warn!("Failed to send telemetry event queue: {}", e.as_report()) + }); + } } #[cfg(test)]