From 9f3af5a8b196be82030608a126e0251edbf9e36b Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Mon, 29 Jul 2024 11:54:28 +0800
Subject: [PATCH] feat(telemetry): support report event to telemetry (#17486)
 (#17835)

Signed-off-by: tabVersion
Signed-off-by: tabversion
Co-authored-by: tabversion
---
 Cargo.lock                                    |   4 +
 proto/telemetry.proto                         |  41 +++++++
 src/common/src/telemetry/report.rs            | 114 +++++++++++++++++-
 src/compute/Cargo.toml                        |   1 +
 src/connector/src/sink/catalog/mod.rs         |  18 ++-
 src/frontend/src/telemetry.rs                 |  23 +++-
 src/meta/Cargo.toml                           |  21 ++--
 src/meta/src/barrier/mod.rs                   |  28 ++++-
 src/meta/src/telemetry.rs                     |  24 +++-
 src/storage/compactor/Cargo.toml              |  15 +--
 src/storage/compactor/src/telemetry.rs        |  23 +++-
 src/stream/Cargo.toml                         |  23 ++--
 src/stream/src/from_proto/sink.rs             |  32 ++++-
 src/stream/src/from_proto/source/mod.rs       |  42 +++++++
 .../src/from_proto/source/trad_source.rs      |  25 ++--
 src/stream/src/from_proto/source_backfill.rs  |   2 +
 src/stream/src/lib.rs                         |   1 +
 src/stream/src/telemetry.rs                   |  37 ++++++
 18 files changed, 421 insertions(+), 53 deletions(-)
 create mode 100644 src/stream/src/telemetry.rs

diff --git a/Cargo.lock b/Cargo.lock
index b5811e323fa39..da9f88c5576e5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10716,6 +10716,7 @@ dependencies = [
  "async-trait",
  "await-tree",
  "clap",
+ "jsonbb",
  "madsim-tokio",
  "madsim-tonic",
  "parking_lot 0.12.1",
@@ -10774,6 +10775,7 @@ dependencies = [
  "tokio-stream",
  "tower",
  "tracing",
+ "uuid",
  "workspace-hack",
 ]
 
@@ -11398,6 +11400,7 @@ dependencies = [
  "hex",
  "hyper 0.14.27",
  "itertools 0.12.1",
+ "jsonbb",
  "madsim-etcd-client",
  "madsim-tokio",
  "madsim-tonic",
@@ -11923,6 +11926,7 @@ dependencies = [
  "governor",
  "hytra",
  "itertools 0.12.1",
+ "jsonbb",
  "local_stats_alloc",
  "lru 0.7.6",
  "madsim-tokio",
diff --git a/proto/telemetry.proto b/proto/telemetry.proto
index 9890f48250538..c7ac6bb387bb1 100644
--- a/proto/telemetry.proto
+++ b/proto/telemetry.proto
@@ -111,3 +111,44 @@ message FrontendReport {
 message CompactorReport {
   ReportBase base = 1;
 }
+
+enum TelemetryEventStage {
+  TELEMETRY_EVENT_STAGE_UNSPECIFIED = 0;
+  TELEMETRY_EVENT_STAGE_CREATE_STREAM_JOB = 1;
+  TELEMETRY_EVENT_STAGE_UPDATE_STREAM_JOB = 2;
+  TELEMETRY_EVENT_STAGE_DROP_STREAM_JOB = 3;
+  TELEMETRY_EVENT_STAGE_QUERY = 4;
+  TELEMETRY_EVENT_STAGE_RECOVERY = 5;
+}
+
+enum TelemetryDatabaseObject {
+  TELEMETRY_DATABASE_OBJECT_UNSPECIFIED = 0;
+  TELEMETRY_DATABASE_OBJECT_SOURCE = 1;
+  TELEMETRY_DATABASE_OBJECT_MV = 2;
+  TELEMETRY_DATABASE_OBJECT_TABLE = 3;
+  TELEMETRY_DATABASE_OBJECT_SINK = 4;
+  TELEMETRY_DATABASE_OBJECT_INDEX = 5;
+}
+
+message EventMessage {
+  // tracking_id is persisted in the meta store
+  string tracking_id = 1;
+  // event_time_sec is the time when the event is created, in seconds since the UNIX epoch
+  uint64 event_time_sec = 3;
+  // event_stage describes in which stage of the process the event happens
+  TelemetryEventStage event_stage = 4;
+  // event_name is the name of the feature that triggered the event
+  string event_name = 5;
+  // connector_name is the name of the connector involved, if any
+  optional string connector_name = 6;
+  // object is the type of the database object involved (source/mv/table/sink/index), if any
+  optional TelemetryDatabaseObject object = 10;
+  // catalog_id is the id of the catalog object involved (table_id/source_id/...)
+  int64 catalog_id = 7;
+  // attributes carries additional information about the event, as a JSON object serialized to a string
+  optional string attributes = 8;
+  // node is the type of node that reports the event (meta/frontend/compute/compactor)
+  string node = 9;
+  // is_test marks the event as a test message
+  bool is_test = 11;
+}
diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs
index 80c5cfa14d114..38d1c48065635 100644
--- a/src/common/src/telemetry/report.rs
+++ b/src/common/src/telemetry/report.rs
@@ -12,14 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
+use prost::Message;
+use risingwave_pb::telemetry::{
+    EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
+    TelemetryEventStage as PbTelemetryEventStage,
+};
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
 use tokio::time::{interval, Duration};
 use uuid::Uuid;
 
-use super::{Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
+use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
 use crate::telemetry::pb_compatible::TelemetryToProtobuf;
 use crate::telemetry::post_telemetry_report_pb;
 
@@ -42,6 +47,8 @@ pub trait TelemetryReportCreator {
     fn report_type(&self) -> &str;
 }
 
+static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
+
 pub async fn start_telemetry_reporting<F, I>(
     info_fetcher: Arc<I>,
     report_creator: Arc<F>,
@@ -74,6 +81,13 @@ where
             return;
         }
     };
+    TELEMETRY_TRACKING_ID
+        .set(tracking_id.clone())
+        .unwrap_or_else(|_| {
+            tracing::warn!(
+                "Telemetry failed to set tracking_id, event reporting will be disabled"
+            )
+        });
 
     loop {
         tokio::select! {
@@ -112,3 +126,99 @@ where
     });
     (join_handle, shutdown_tx)
 }
+
+pub fn report_event_common(
+    event_stage: PbTelemetryEventStage,
+    event_name: &str,
+    catalog_id: i64,
+    connector_name: Option<String>,
+    object: Option<PbTelemetryDatabaseObject>,
+    attributes: Option<jsonbb::Value>, // any json string
+    node: String,
+) {
+    let event_tracking_id: String;
+    if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
+        event_tracking_id = tracking_id.to_string();
+    } else {
+        tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
+        return;
+    }
+
+    request_to_telemetry_event(
+        event_tracking_id,
+        event_stage,
+        event_name,
+        catalog_id,
+        connector_name,
+        object,
+        attributes,
+        node,
+        false,
+    );
+}
+
+fn request_to_telemetry_event(
+    tracking_id: String,
+    event_stage: PbTelemetryEventStage,
+    event_name: &str,
+    catalog_id: i64,
+    connector_name: Option<String>,
+    object: Option<PbTelemetryDatabaseObject>,
+    attributes: Option<jsonbb::Value>, // any json string
+    node: String,
+    is_test: bool,
+) {
+    let event = PbEventMessage {
+        tracking_id,
+        event_time_sec: current_timestamp(),
+        event_stage: event_stage as i32,
+        event_name: event_name.to_string(),
+        connector_name,
+        object: object.map(|c| c as i32),
+        catalog_id,
+        attributes: attributes.map(|a| a.to_string()),
+        node,
+        is_test,
+    };
+    let report_bytes = event.encode_to_vec();
+
+    tokio::spawn(async move {
+        const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
+        let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
+        post_telemetry_report_pb(&url, report_bytes)
+            .await
+            .unwrap_or_else(|e| tracing::info!("{}", e))
+    });
+}
+
+#[cfg(test)]
+mod test {
+
+    use super::*;
+
+    #[ignore]
+    #[tokio::test]
+    async fn test_telemetry_report_event() {
+        let event_stage = PbTelemetryEventStage::CreateStreamJob;
+        let event_name = "test_feature";
+        let catalog_id = 1;
+        let connector_name = Some("test_connector".to_string());
+        let 
object = Some(PbTelemetryDatabaseObject::Source); + let attributes = None; + let node = "test_node".to_string(); + + request_to_telemetry_event( + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + event_stage, + event_name, + catalog_id, + connector_name, + object, + attributes, + node, + true, + ); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } +} diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 817578db32631..a3f74792982f2 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -58,6 +58,7 @@ tokio-stream = "0.1" tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } tracing = "0.1" +uuid = { version = "1.8.0", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 5adda694703c6..2a7244e5800ab 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -15,6 +15,7 @@ pub mod desc; use std::collections::BTreeMap; +use std::fmt::{Display, Formatter}; use anyhow::anyhow; use itertools::Itertools; @@ -28,6 +29,7 @@ use risingwave_pb::catalog::{ PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus, }; use risingwave_pb::secret::PbSecretRef; +use serde_derive::Serialize; use super::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, @@ -125,15 +127,21 @@ pub struct SinkFormatDesc { } /// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub enum SinkFormat { AppendOnly, Upsert, Debezium, } +impl Display for SinkFormat { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + /// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`]. 
-#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub enum SinkEncode { Json, Protobuf, @@ -142,6 +150,12 @@ pub enum SinkEncode { Text, } +impl Display for SinkEncode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + impl SinkFormatDesc { pub fn from_legacy_type(connector: &str, r#type: &str) -> Result, SinkError> { use crate::sink::kafka::KafkaSink; diff --git a/src/frontend/src/telemetry.rs b/src/frontend/src/telemetry.rs index d50b9999b946f..19777a28ca901 100644 --- a/src/frontend/src/telemetry.rs +++ b/src/frontend/src/telemetry.rs @@ -14,14 +14,35 @@ use prost::Message; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; -use risingwave_common::telemetry::report::TelemetryReportCreator; +use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator}; use risingwave_common::telemetry::{ current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, }; +use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use serde::{Deserialize, Serialize}; const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend"; +#[allow(dead_code)] // please remove when used +pub(crate) fn report_event( + event_stage: PbTelemetryEventStage, + event_name: &str, + catalog_id: i64, + connector_name: Option, + component: Option, + attributes: Option, // any json string +) { + report_event_common( + event_stage, + event_name, + catalog_id, + connector_name, + component, + attributes, + TELEMETRY_FRONTEND_REPORT_TYPE.to_string(), + ); +} + #[derive(Clone, Copy)] pub(crate) struct FrontendTelemetryCreator {} diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 6252d845788af..80bd394f40d20 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -38,6 +38,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = "0.4" hyper = "0.14" # required by tonic itertools = { workspace = true } +jsonbb = { workspace = true } maplit = "1.0.2" memcomparable = { version = "0.2" } mime_guess = "2" @@ -71,12 +72,12 @@ sync-point = { path = "../utils/sync-point" } thiserror = "1" thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", ] } tokio-retry = "0.3" tokio-stream = { version = "0.1", features = ["net"] } @@ -89,10 +90,10 @@ uuid = { version = "1", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] axum = { workspace = true } tower-http = { version = "0.5", features = [ - "add-extension", - "cors", - "fs", - "compression-gzip", + "add-extension", + "cors", + "fs", + "compression-gzip", ] } workspace-hack = { path = "../workspace-hack" } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 6d5657a158e48..b7997e6b74940 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -574,6 +574,14 @@ impl GlobalBarrierManager { self.context .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)); let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0); + crate::telemetry::report_event( + risingwave_pb::telemetry::TelemetryEventStage::Recovery, + "normal_recovery", + 0, + None, + None, + None, + ); let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = 
paused.then_some(PausedReason::Manual); @@ -805,6 +813,15 @@ impl GlobalBarrierManager { prev_epoch = prev_epoch.value().0 ); + crate::telemetry::report_event( + risingwave_pb::telemetry::TelemetryEventStage::Recovery, + "failure_recovery", + 0, + None, + None, + None, + ); + // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. self.recovery(None).instrument(span).await; @@ -830,8 +847,15 @@ impl GlobalBarrierManager { prev_epoch = prev_epoch.value().0 ); - // No need to clean dirty tables for barrier recovery, - // The foreground stream job should cleanup their own tables. + crate::telemetry::report_event( + risingwave_pb::telemetry::TelemetryEventStage::Recovery, + "adhoc_recovery", + 0, + None, + None, + None, + ); // No need to clean dirty tables for barrier recovery, + // The foreground stream job should cleanup their own tables. self.recovery(None).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } else { diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs index 98c7d997fc89b..f84feff6af0a5 100644 --- a/src/meta/src/telemetry.rs +++ b/src/meta/src/telemetry.rs @@ -15,12 +15,15 @@ use prost::Message; use risingwave_common::config::MetaBackend; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; -use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator}; +use risingwave_common::telemetry::report::{ + report_event_common, TelemetryInfoFetcher, TelemetryReportCreator, +}; use risingwave_common::telemetry::{ current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, }; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_pb::common::WorkerType; +use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; @@ -29,6 +32,25 @@ use crate::model::ClusterId; const TELEMETRY_META_REPORT_TYPE: &str = "meta"; +pub(crate) fn report_event( + event_stage: PbTelemetryEventStage, + event_name: &str, + catalog_id: i64, + connector_name: Option, + component: Option, + attributes: Option, // any json string +) { + report_event_common( + event_stage, + event_name, + catalog_id, + connector_name, + component, + attributes, + TELEMETRY_META_REPORT_TYPE.to_string(), + ); +} + #[derive(Debug, Serialize, Deserialize)] struct NodeCount { meta_count: u64, diff --git a/src/storage/compactor/Cargo.toml b/src/storage/compactor/Cargo.toml index f9d943be27964..ed8f421ee6384 100644 --- a/src/storage/compactor/Cargo.toml +++ b/src/storage/compactor/Cargo.toml @@ -18,6 +18,7 @@ normal = ["workspace-hack"] async-trait = "0.1" await-tree = { workspace = true } clap = { workspace = true } +jsonbb = { workspace = true } parking_lot = { workspace = true } prost = { workspace = true } risingwave_common = { workspace = true } @@ -29,13 +30,13 @@ risingwave_rpc_client = { workspace = true } risingwave_storage = { workspace = true } serde = { version = "1", features = ["derive"] } tokio = { version = "0.2", package = "madsim-tokio", features = [ - "fs", - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", + "fs", + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", ] } tonic = { workspace = true } tracing = "0.1" diff --git a/src/storage/compactor/src/telemetry.rs b/src/storage/compactor/src/telemetry.rs index b01fdd1e258f3..815b485f9e898 100644 --- a/src/storage/compactor/src/telemetry.rs +++ 
b/src/storage/compactor/src/telemetry.rs @@ -14,14 +14,35 @@ use prost::Message; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; -use risingwave_common::telemetry::report::TelemetryReportCreator; +use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator}; use risingwave_common::telemetry::{ current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, }; +use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use serde::{Deserialize, Serialize}; const TELEMETRY_COMPACTOR_REPORT_TYPE: &str = "compactor"; +#[allow(dead_code)] // please remove when used +pub(crate) fn report_event( + event_stage: PbTelemetryEventStage, + event_name: &str, + catalog_id: i64, + connector_name: Option, + object: Option, + attributes: Option, // json object +) { + report_event_common( + event_stage, + event_name, + catalog_id, + connector_name, + object, + attributes, + TELEMETRY_COMPACTOR_REPORT_TYPE.to_string(), + ); +} + #[derive(Clone, Copy)] pub(crate) struct CompactorTelemetryCreator {} diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 89f2b61fd3449..3c85092a4d677 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -32,12 +32,13 @@ foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } governor = { version = "0.6", default-features = false, features = [ - "std", - "dashmap", - "jitter", + "std", + "dashmap", + "jitter", ] } hytra = "0.1.2" itertools = { workspace = true } +jsonbb = { workspace = true } local_stats_alloc = { path = "../utils/local_stats_alloc" } lru = { workspace = true } maplit = "1.0.2" @@ -68,13 +69,13 @@ strum_macros = "0.26" thiserror = "1" thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", - "fs", + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", + "fs", ] } tokio-metrics = "0.3.0" tokio-retry = "0.3" @@ -92,7 +93,7 @@ expect-test = "1" risingwave_expr_impl = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_hummock_test = { path = "../storage/hummock_test", features = [ - "test", + "test", ] } serde_yaml = "0.9" tracing-test = "0.2" diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 5e77be7beb7a0..81aa00e93588c 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -18,13 +18,14 @@ use anyhow::anyhow; use risingwave_common::catalog::{ColumnCatalog, Schema}; use risingwave_common::types::DataType; use risingwave_connector::match_sink_name_str; -use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType}; +use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; use risingwave_connector::sink::{ SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, }; use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; +use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use super::*; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; @@ -32,9 +33,36 @@ use crate::common::log_store_impl::kv_log_store::{ KvLogStoreFactory, KvLogStoreMetrics, KvLogStorePkInfo, KV_LOG_STORE_V2_INFO, }; use crate::executor::SinkExecutor; +use 
crate::telemetry::report_event;
 
 pub struct SinkExecutorBuilder;
 
+fn telemetry_sink_build(
+    sink_id: &SinkId,
+    connector_name: &str,
+    sink_format_desc: &Option<SinkFormatDesc>,
+) {
+    let attr = sink_format_desc.as_ref().map(|f| {
+        let mut builder = jsonbb::Builder::<Vec<u8>>::new();
+        builder.begin_object();
+        builder.add_string("format");
+        builder.add_value(jsonbb::ValueRef::String(f.format.to_string().as_str()));
+        builder.add_string("encode");
+        builder.add_value(jsonbb::ValueRef::String(f.encode.to_string().as_str()));
+        builder.end_object();
+        builder.finish()
+    });
+
+    report_event(
+        PbTelemetryEventStage::CreateStreamJob,
+        "sink",
+        sink_id.sink_id() as i64,
+        Some(connector_name.to_string()),
+        Some(PbTelemetryDatabaseObject::Sink),
+        attr,
+    )
+}
+
 fn resolve_pk_info(
     input_schema: &Schema,
     log_store_table: &Table,
@@ -192,6 +220,8 @@ impl ExecutorBuilder for SinkExecutorBuilder {
             connector, sink_id.sink_id, params.executor_id
         );
 
+        telemetry_sink_build(&sink_id, connector, &sink_param.format_desc);
+
         let exec = match node.log_store_type() {
             // Default value is the normal in memory log store to be backward compatible with the
             // previously unset value
diff --git a/src/stream/src/from_proto/source/mod.rs b/src/stream/src/from_proto/source/mod.rs
index 5383c8a768fd0..63a81ee961eb0 100644
--- a/src/stream/src/from_proto/source/mod.rs
+++ b/src/stream/src/from_proto/source/mod.rs
@@ -13,8 +13,50 @@
 // limitations under the License.
 
 mod trad_source;
+
+use std::collections::BTreeMap;
+
 pub use trad_source::{create_source_desc_builder, SourceExecutorBuilder};
 mod fs_fetch;
 pub use fs_fetch::FsFetchExecutorBuilder;
+use risingwave_common::catalog::TableId;
+use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
+use risingwave_pb::catalog::PbStreamSourceInfo;
+use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
 
 use super::*;
+use crate::telemetry::report_event;
+
+fn get_connector_name(with_props: &BTreeMap<String, String>) -> String {
+    with_props
+        .get(UPSTREAM_SOURCE_KEY)
+        .map(|s| s.to_lowercase())
+        .unwrap_or_default()
+}
+
+fn telemetry_source_build(
+    source_type: &str, // "source" or "source backfill"
+    source_id: &TableId,
+    source_info: &PbStreamSourceInfo,
+    with_props: &BTreeMap<String, String>,
+) {
+    let mut builder = jsonbb::Builder::<Vec<u8>>::new();
+    builder.begin_object();
+    builder.add_string("format");
+    builder.add_value(jsonbb::ValueRef::String(source_info.format().as_str_name()));
+    builder.add_string("encode");
+    builder.add_value(jsonbb::ValueRef::String(
+        source_info.row_encode().as_str_name(),
+    ));
+    builder.end_object();
+    let value = builder.finish();
+
+    report_event(
+        PbTelemetryEventStage::CreateStreamJob,
+        source_type,
+        source_id.table_id as i64,
+        Some(get_connector_name(with_props)),
+        Some(PbTelemetryDatabaseObject::Source),
+        Some(value),
+    )
+}
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 667f2c5d49bc9..c27b3e44d9ad1 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -12,15 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap; - use risingwave_common::catalog::{ - default_key_column_name_version_mapping, TableId, KAFKA_TIMESTAMP_COLUMN_NAME, + default_key_column_name_version_mapping, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_connector::source::reader::desc::SourceDescBuilder; -use risingwave_connector::source::{should_copy_to_format_encode_options, UPSTREAM_SOURCE_KEY}; +use risingwave_connector::source::should_copy_to_format_encode_options; use risingwave_connector::WithPropertiesExt; -use risingwave_pb::catalog::PbStreamSourceInfo; use risingwave_pb::data::data_type::TypeName as PbTypeName; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{ @@ -42,6 +39,8 @@ const FS_CONNECTORS: &[&str] = &["s3"]; pub struct SourceExecutorBuilder; pub fn create_source_desc_builder( + source_type: &str, // "source" or "source backfill" + source_id: &TableId, mut source_columns: Vec, params: &ExecutorParams, source_info: PbStreamSourceInfo, @@ -112,6 +111,8 @@ pub fn create_source_desc_builder( }); } + telemetry_source_build(source_type, source_id, &source_info, &with_properties); + SourceDescBuilder::new( source_columns.clone(), params.env.source_metrics(), @@ -155,11 +156,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { if source_info.format_encode_options.is_empty() { // compatible code: quick fix for , // will move the logic to FragmentManager::init in release 1.7. - let connector = source - .with_properties - .get(UPSTREAM_SOURCE_KEY) - .unwrap_or(&String::default()) - .to_owned(); + let connector = get_connector_name(&source.with_properties); source_info.format_encode_options.extend( source.with_properties.iter().filter_map(|(k, v)| { should_copy_to_format_encode_options(k, &connector) @@ -169,6 +166,8 @@ impl ExecutorBuilder for SourceExecutorBuilder { } let source_desc_builder = create_source_desc_builder( + "source", + &source_id, source.columns.clone(), ¶ms, source_info, @@ -195,11 +194,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { state_table_handler, ); - let connector = source - .with_properties - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); + let connector = get_connector_name(&source.with_properties); let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str()); let is_fs_v2_connector = source.with_properties.is_new_fs_connector(); diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs index 84f4bf7adab86..a9af482afe2c2 100644 --- a/src/stream/src/from_proto/source_backfill.rs +++ b/src/stream/src/from_proto/source_backfill.rs @@ -36,6 +36,8 @@ impl ExecutorBuilder for SourceBackfillExecutorBuilder { let source_info = node.get_info()?; let source_desc_builder = super::source::create_source_desc_builder( + "source backfill", + &source_id, node.columns.clone(), ¶ms, source_info.clone(), diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a7..01f4ca8957ef1 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -54,6 +54,7 @@ pub mod error; pub mod executor; mod from_proto; pub mod task; +pub mod telemetry; #[cfg(test)] risingwave_expr_impl::enable!(); diff --git a/src/stream/src/telemetry.rs b/src/stream/src/telemetry.rs new file mode 100644 index 0000000000000..e64f71fa94e18 --- /dev/null +++ b/src/stream/src/telemetry.rs @@ -0,0 +1,37 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file 
except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_common::telemetry::report::report_event_common;
+use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
+
+const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";
+
+pub fn report_event(
+    event_stage: PbTelemetryEventStage,
+    event_name: &str,
+    catalog_id: i64,
+    connector_name: Option<String>,
+    component: Option<PbTelemetryDatabaseObject>,
+    attributes: Option<jsonbb::Value>, // any json string
+) {
+    report_event_common(
+        event_stage,
+        event_name,
+        catalog_id,
+        connector_name,
+        component,
+        attributes,
+        TELEMETRY_COMPUTE_REPORT_TYPE.to_string(),
+    );
+}
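Usage sketch (not part of the patch above): the snippet below shows how code inside the stream crate could emit an event through the `report_event` wrapper added in src/stream/src/telemetry.rs, building the optional `attributes` payload with `jsonbb` the same way `telemetry_sink_build` does in the sink builder. The catalog id, connector name, and attribute values here are hypothetical placeholders, not values taken from the patch.

use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};

// Hypothetical call site inside the stream crate: report that a source with
// catalog id 42 backed by the "kafka" connector was created.
fn example_report_source_created() {
    // Build the optional JSON attributes payload with jsonbb, mirroring the
    // "format"/"encode" object assembled by telemetry_sink_build.
    let mut builder = jsonbb::Builder::<Vec<u8>>::new();
    builder.begin_object();
    builder.add_string("format");
    builder.add_value(jsonbb::ValueRef::String("PLAIN"));
    builder.add_string("encode");
    builder.add_value(jsonbb::ValueRef::String("JSON"));
    builder.end_object();
    let attributes = builder.finish();

    // The wrapper forwards to report_event_common with node = "compute"; the
    // event is silently dropped if start_telemetry_reporting has not set the
    // tracking id yet, so callers do not need to check telemetry state first.
    crate::telemetry::report_event(
        PbTelemetryEventStage::CreateStreamJob,
        "source",                                // event_name
        42,                                      // catalog_id (placeholder)
        Some("kafka".to_owned()),                // connector_name
        Some(PbTelemetryDatabaseObject::Source), // object being created
        Some(attributes),                        // jsonbb::Value payload
    );
}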