diff --git a/proto/telemetry.proto b/proto/telemetry.proto index 3fcf3911ea361..563ff3cb965a3 100644 --- a/proto/telemetry.proto +++ b/proto/telemetry.proto @@ -69,6 +69,9 @@ message ReportBase { uint64 report_time = 5; // node_type is the node that creates the report TelemetryNodeType node_type = 6; + // mark the report is a test message + // if so, the backend do validations but not store it + bool is_test = 7; } message MetaReport { diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index 576b01f3e1bc0..29a34b9a86422 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -64,6 +64,9 @@ pub struct TelemetryReportBase { pub time_stamp: u64, /// node_type is the node that creates the report pub node_type: TelemetryNodeType, + /// is_test is whether the report is from a test environment, default to be false + /// needed in CI for compatible tests with telemetry backend + pub is_test: bool, } pub trait TelemetryReport: Serialize {} @@ -123,7 +126,7 @@ impl Default for SystemData { } /// Sends a `POST` request of the telemetry reporting to a URL. -async fn post_telemetry_report_pb(url: &str, report_body: Vec) -> Result<()> { +pub async fn post_telemetry_report_pb(url: &str, report_body: Vec) -> Result<()> { let client = reqwest::Client::new(); let res = client .post(url) diff --git a/src/common/src/telemetry/pb_compatible.rs b/src/common/src/telemetry/pb_compatible.rs index 56f31632734c0..36772872a186e 100644 --- a/src/common/src/telemetry/pb_compatible.rs +++ b/src/common/src/telemetry/pb_compatible.rs @@ -33,6 +33,7 @@ impl From for PbTelemetryReportBase { up_time: val.up_time, report_time: val.time_stamp, node_type: from_telemetry_node_type(val.node_type) as i32, + is_test: val.is_test, } } } diff --git a/src/compute/src/telemetry.rs b/src/compute/src/telemetry.rs index eb236818510a2..143ec51c5bc39 100644 --- a/src/compute/src/telemetry.rs +++ b/src/compute/src/telemetry.rs @@ -20,6 +20,8 @@ use risingwave_common::telemetry::{ }; use serde::{Deserialize, Serialize}; +const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute"; + #[derive(Clone, Copy)] pub(crate) struct ComputeTelemetryCreator {} @@ -45,7 +47,7 @@ impl TelemetryReportCreator for ComputeTelemetryCreator { } fn report_type(&self) -> &str { - "compute" + TELEMETRY_COMPUTE_REPORT_TYPE } } @@ -74,7 +76,34 @@ impl ComputeTelemetryReport { system_data: SystemData::new(), time_stamp: current_timestamp(), node_type: TelemetryNodeType::Compute, + is_test: false, }, } } } + +#[cfg(test)] +mod test { + use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; + use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL}; + + use crate::telemetry::{ComputeTelemetryReport, TELEMETRY_COMPUTE_REPORT_TYPE}; + + // It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database. + #[cfg(not(madsim))] + #[tokio::test] + async fn test_compute_telemetry_report() { + let mut report = ComputeTelemetryReport::new( + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + 100, + ); + report.base.is_test = true; + + let pb_report = report.to_pb_bytes(); + let url = + (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_COMPUTE_REPORT_TYPE).to_owned(); + let post_res = post_telemetry_report_pb(&url, pb_report).await; + assert!(post_res.is_ok()); + } +} diff --git a/src/frontend/src/telemetry.rs b/src/frontend/src/telemetry.rs index 528f5ca2c6c4c..631914cee7a4f 100644 --- a/src/frontend/src/telemetry.rs +++ b/src/frontend/src/telemetry.rs @@ -20,6 +20,8 @@ use risingwave_common::telemetry::{ }; use serde::{Deserialize, Serialize}; +const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend"; + #[derive(Clone, Copy)] pub(crate) struct FrontendTelemetryCreator {} @@ -45,11 +47,11 @@ impl TelemetryReportCreator for FrontendTelemetryCreator { } fn report_type(&self) -> &str { - "frontend" + TELEMETRY_FRONTEND_REPORT_TYPE } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub(crate) struct FrontendTelemetryReport { #[serde(flatten)] base: TelemetryReportBase, @@ -74,6 +76,7 @@ impl FrontendTelemetryReport { up_time, time_stamp: current_timestamp(), node_type: TelemetryNodeType::Frontend, + is_test: false, }, } } @@ -93,4 +96,26 @@ mod tests { assert_eq!(report.base.up_time, 0); assert_eq!(report.base.node_type, TelemetryNodeType::Frontend); } + + use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; + use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL}; + + // It is ok to + // use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database. + #[cfg(not(madsim))] + #[tokio::test] + async fn test_frontend_telemetry_report() { + let mut report = super::FrontendTelemetryReport::new( + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + 100, + ); + report.base.is_test = true; + + let pb_report = report.to_pb_bytes(); + let url = + (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_FRONTEND_REPORT_TYPE).to_owned(); + let post_res = post_telemetry_report_pb(&url, pb_report).await; + assert!(post_res.is_ok()); + } } diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs index 0f4b19bf13d12..19e7d46f3c048 100644 --- a/src/meta/src/telemetry.rs +++ b/src/meta/src/telemetry.rs @@ -27,6 +27,8 @@ use thiserror_ext::AsReport; use crate::manager::MetadataManager; use crate::model::ClusterId; +const TELEMETRY_META_REPORT_TYPE: &str = "meta"; + #[derive(Debug, Serialize, Deserialize)] struct NodeCount { meta_count: u64, @@ -142,6 +144,7 @@ impl TelemetryReportCreator for MetaReportCreator { up_time, time_stamp: current_timestamp(), node_type: TelemetryNodeType::Meta, + is_test: false, }, node_count: NodeCount { meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0), @@ -155,6 +158,55 @@ impl TelemetryReportCreator for MetaReportCreator { } fn report_type(&self) -> &str { - "meta" + TELEMETRY_META_REPORT_TYPE + } +} + +#[cfg(test)] +mod test { + use risingwave_common::config::MetaBackend; + use risingwave_common::telemetry::{ + current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, + }; + + use crate::telemetry::{MetaTelemetryReport, NodeCount, RwVersion}; + + #[cfg(not(madsim))] + #[tokio::test] + async fn test_meta_telemetry_report() { + use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; + use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL}; + + use crate::telemetry::TELEMETRY_META_REPORT_TYPE; + + // we don't call `create_report` here because it rely on the metadata manager + let report = MetaTelemetryReport { + base: TelemetryReportBase { + tracking_id: "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_owned(), + session_id: "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_owned(), + system_data: SystemData::new(), + up_time: 100, + time_stamp: current_timestamp(), + node_type: TelemetryNodeType::Meta, + is_test: true, + }, + node_count: NodeCount { + meta_count: 1, + compute_count: 2, + frontend_count: 3, + compactor_count: 4, + }, + streaming_job_count: 5, + meta_backend: MetaBackend::Etcd, + rw_version: RwVersion { + version: "version".to_owned(), + git_sha: "git_sha".to_owned(), + }, + }; + + let pb_bytes = report.to_pb_bytes(); + let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_META_REPORT_TYPE).to_owned(); + let result = post_telemetry_report_pb(&url, pb_bytes).await; + assert!(result.is_ok()); } } diff --git a/src/storage/compactor/src/telemetry.rs b/src/storage/compactor/src/telemetry.rs index 2793f8ca7ea01..2bf7aaaa3f508 100644 --- a/src/storage/compactor/src/telemetry.rs +++ b/src/storage/compactor/src/telemetry.rs @@ -16,11 +16,12 @@ use prost::Message; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; use risingwave_common::telemetry::report::TelemetryReportCreator; use risingwave_common::telemetry::{ - current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase, - TelemetryResult, + current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, }; use serde::{Deserialize, Serialize}; +const TELEMETRY_COMPACTOR_REPORT_TYPE: &str = "compactor"; + #[derive(Clone, Copy)] pub(crate) struct CompactorTelemetryCreator {} @@ -46,7 +47,7 @@ impl TelemetryReportCreator for CompactorTelemetryCreator { } fn report_type(&self) -> &str { - "compactor" + TELEMETRY_COMPACTOR_REPORT_TYPE } } @@ -65,7 +66,6 @@ pub(crate) struct CompactorTelemetryReport { base: TelemetryReportBase, } -impl TelemetryReport for CompactorTelemetryReport {} impl CompactorTelemetryReport { pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self { Self { @@ -76,7 +76,34 @@ impl CompactorTelemetryReport { up_time, time_stamp: current_timestamp(), node_type: TelemetryNodeType::Compactor, + is_test: false, }, } } } + +#[cfg(test)] +mod test { + use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; + use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL}; + + use crate::telemetry::{CompactorTelemetryReport, TELEMETRY_COMPACTOR_REPORT_TYPE}; + + // It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database. + #[cfg(not(madsim))] + #[tokio::test] + async fn test_compactor_telemetry_report() { + let mut report = CompactorTelemetryReport::new( + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + 100, + ); + report.base.is_test = true; + + let pb_report = report.to_pb_bytes(); + let url = + (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_COMPACTOR_REPORT_TYPE).to_owned(); + let post_res = post_telemetry_report_pb(&url, pb_report).await; + assert!(post_res.is_ok()); + } +}