Skip to content

Commit

Permalink
feat(telemetry): mark a test message (#15548)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored and jetjinser committed Mar 14, 2024
1 parent 4f8a0d6 commit c502b96
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 9 deletions.
3 changes: 3 additions & 0 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ message ReportBase {
uint64 report_time = 5;
// node_type is the node that creates the report
TelemetryNodeType node_type = 6;
// mark the report is a test message
// if so, the backend do validations but not store it
bool is_test = 7;
}

message MetaReport {
Expand Down
5 changes: 4 additions & 1 deletion src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub struct TelemetryReportBase {
pub time_stamp: u64,
/// node_type is the node that creates the report
pub node_type: TelemetryNodeType,
/// is_test is whether the report is from a test environment, default to be false
/// needed in CI for compatible tests with telemetry backend
pub is_test: bool,
}

pub trait TelemetryReport: Serialize {}
Expand Down Expand Up @@ -123,7 +126,7 @@ impl Default for SystemData {
}

/// Sends a `POST` request of the telemetry reporting to a URL.
async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
pub async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
let client = reqwest::Client::new();
let res = client
.post(url)
Expand Down
1 change: 1 addition & 0 deletions src/common/src/telemetry/pb_compatible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl From<TelemetryReportBase> for PbTelemetryReportBase {
up_time: val.up_time,
report_time: val.time_stamp,
node_type: from_telemetry_node_type(val.node_type) as i32,
is_test: val.is_test,
}
}
}
Expand Down
31 changes: 30 additions & 1 deletion src/compute/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use risingwave_common::telemetry::{
};
use serde::{Deserialize, Serialize};

const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";

#[derive(Clone, Copy)]
pub(crate) struct ComputeTelemetryCreator {}

Expand All @@ -45,7 +47,7 @@ impl TelemetryReportCreator for ComputeTelemetryCreator {
}

fn report_type(&self) -> &str {
"compute"
TELEMETRY_COMPUTE_REPORT_TYPE
}
}

Expand Down Expand Up @@ -74,7 +76,34 @@ impl ComputeTelemetryReport {
system_data: SystemData::new(),
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Compute,
is_test: false,
},
}
}
}

#[cfg(test)]
mod test {
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

use crate::telemetry::{ComputeTelemetryReport, TELEMETRY_COMPUTE_REPORT_TYPE};

// It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database.
#[cfg(not(madsim))]
#[tokio::test]
async fn test_compute_telemetry_report() {
let mut report = ComputeTelemetryReport::new(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
100,
);
report.base.is_test = true;

let pb_report = report.to_pb_bytes();
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_COMPUTE_REPORT_TYPE).to_owned();
let post_res = post_telemetry_report_pb(&url, pb_report).await;
assert!(post_res.is_ok());
}
}
29 changes: 27 additions & 2 deletions src/frontend/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use risingwave_common::telemetry::{
};
use serde::{Deserialize, Serialize};

const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend";

#[derive(Clone, Copy)]
pub(crate) struct FrontendTelemetryCreator {}

Expand All @@ -45,11 +47,11 @@ impl TelemetryReportCreator for FrontendTelemetryCreator {
}

fn report_type(&self) -> &str {
"frontend"
TELEMETRY_FRONTEND_REPORT_TYPE
}
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct FrontendTelemetryReport {
#[serde(flatten)]
base: TelemetryReportBase,
Expand All @@ -74,6 +76,7 @@ impl FrontendTelemetryReport {
up_time,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Frontend,
is_test: false,
},
}
}
Expand All @@ -93,4 +96,26 @@ mod tests {
assert_eq!(report.base.up_time, 0);
assert_eq!(report.base.node_type, TelemetryNodeType::Frontend);
}

use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

// It is ok to
// use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database.
#[cfg(not(madsim))]
#[tokio::test]
async fn test_frontend_telemetry_report() {
let mut report = super::FrontendTelemetryReport::new(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
100,
);
report.base.is_test = true;

let pb_report = report.to_pb_bytes();
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_FRONTEND_REPORT_TYPE).to_owned();
let post_res = post_telemetry_report_pb(&url, pb_report).await;
assert!(post_res.is_ok());
}
}
54 changes: 53 additions & 1 deletion src/meta/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use thiserror_ext::AsReport;
use crate::manager::MetadataManager;
use crate::model::ClusterId;

const TELEMETRY_META_REPORT_TYPE: &str = "meta";

#[derive(Debug, Serialize, Deserialize)]
struct NodeCount {
meta_count: u64,
Expand Down Expand Up @@ -142,6 +144,7 @@ impl TelemetryReportCreator for MetaReportCreator {
up_time,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Meta,
is_test: false,
},
node_count: NodeCount {
meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
Expand All @@ -155,6 +158,55 @@ impl TelemetryReportCreator for MetaReportCreator {
}

fn report_type(&self) -> &str {
"meta"
TELEMETRY_META_REPORT_TYPE
}
}

#[cfg(test)]
mod test {
use risingwave_common::config::MetaBackend;
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase,
};

use crate::telemetry::{MetaTelemetryReport, NodeCount, RwVersion};

#[cfg(not(madsim))]
#[tokio::test]
async fn test_meta_telemetry_report() {
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

use crate::telemetry::TELEMETRY_META_REPORT_TYPE;

// we don't call `create_report` here because it rely on the metadata manager
let report = MetaTelemetryReport {
base: TelemetryReportBase {
tracking_id: "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_owned(),
session_id: "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_owned(),
system_data: SystemData::new(),
up_time: 100,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Meta,
is_test: true,
},
node_count: NodeCount {
meta_count: 1,
compute_count: 2,
frontend_count: 3,
compactor_count: 4,
},
streaming_job_count: 5,
meta_backend: MetaBackend::Etcd,
rw_version: RwVersion {
version: "version".to_owned(),
git_sha: "git_sha".to_owned(),
},
};

let pb_bytes = report.to_pb_bytes();
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_META_REPORT_TYPE).to_owned();
let result = post_telemetry_report_pb(&url, pb_bytes).await;
assert!(result.is_ok());
}
}
35 changes: 31 additions & 4 deletions src/storage/compactor/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase,
TelemetryResult,
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
use serde::{Deserialize, Serialize};

const TELEMETRY_COMPACTOR_REPORT_TYPE: &str = "compactor";

#[derive(Clone, Copy)]
pub(crate) struct CompactorTelemetryCreator {}

Expand All @@ -46,7 +47,7 @@ impl TelemetryReportCreator for CompactorTelemetryCreator {
}

fn report_type(&self) -> &str {
"compactor"
TELEMETRY_COMPACTOR_REPORT_TYPE
}
}

Expand All @@ -65,7 +66,6 @@ pub(crate) struct CompactorTelemetryReport {
base: TelemetryReportBase,
}

impl TelemetryReport for CompactorTelemetryReport {}
impl CompactorTelemetryReport {
pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
Self {
Expand All @@ -76,7 +76,34 @@ impl CompactorTelemetryReport {
up_time,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Compactor,
is_test: false,
},
}
}
}

#[cfg(test)]
mod test {
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

use crate::telemetry::{CompactorTelemetryReport, TELEMETRY_COMPACTOR_REPORT_TYPE};

// It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database.
#[cfg(not(madsim))]
#[tokio::test]
async fn test_compactor_telemetry_report() {
let mut report = CompactorTelemetryReport::new(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
100,
);
report.base.is_test = true;

let pb_report = report.to_pb_bytes();
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_COMPACTOR_REPORT_TYPE).to_owned();
let post_res = post_telemetry_report_pb(&url, pb_report).await;
assert!(post_res.is_ok());
}
}

0 comments on commit c502b96

Please sign in to comment.