Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): mark a test message #15548

Merged
merged 16 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ message ReportBase {
uint64 report_time = 5;
// node_type is the node that creates the report
TelemetryNodeType node_type = 6;
// mark the report is a test message
// if so, the backend do validations but not store it
bool is_test = 7;
}

message MetaReport {
Expand Down
5 changes: 4 additions & 1 deletion src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub struct TelemetryReportBase {
pub time_stamp: u64,
/// node_type is the node that creates the report
pub node_type: TelemetryNodeType,
/// is_test is whether the report is from a test environment, default to be false
/// needed in CI for compatible tests with telemetry backend
pub is_test: bool,
}

pub trait TelemetryReport: Serialize {}
Expand Down Expand Up @@ -123,7 +126,7 @@ impl Default for SystemData {
}

/// Sends a `POST` request of the telemetry reporting to a URL.
async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
pub async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
let client = reqwest::Client::new();
let res = client
.post(url)
Expand Down
1 change: 1 addition & 0 deletions src/common/src/telemetry/pb_compatible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl From<TelemetryReportBase> for PbTelemetryReportBase {
up_time: val.up_time,
report_time: val.time_stamp,
node_type: from_telemetry_node_type(val.node_type) as i32,
is_test: val.is_test,
}
}
}
Expand Down
30 changes: 29 additions & 1 deletion src/compute/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use risingwave_common::telemetry::{
};
use serde::{Deserialize, Serialize};

const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";

#[derive(Clone, Copy)]
pub(crate) struct ComputeTelemetryCreator {}

Expand All @@ -45,7 +47,7 @@ impl TelemetryReportCreator for ComputeTelemetryCreator {
}

fn report_type(&self) -> &str {
"compute"
TELEMETRY_COMPUTE_REPORT_TYPE
}
}

Expand Down Expand Up @@ -74,7 +76,33 @@ impl ComputeTelemetryReport {
system_data: SystemData::new(),
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Compute,
is_test: false,
},
}
}
}

#[cfg(test)]
mod test {
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

use crate::telemetry::{ComputeTelemetryReport, TELEMETRY_COMPUTE_REPORT_TYPE};

// It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database.
#[tokio::test]
async fn test_compute_telemetry_report() {
let mut report = ComputeTelemetryReport::new(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
100,
);
report.base.is_test = true;

let pb_report = report.to_pb_bytes();
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_COMPUTE_REPORT_TYPE).to_owned();
let post_res = post_telemetry_report_pb(&url, pb_report).await;
assert!(post_res.is_ok());
}
}
27 changes: 25 additions & 2 deletions src/frontend/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use risingwave_common::telemetry::{
};
use serde::{Deserialize, Serialize};

const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend";

#[derive(Clone, Copy)]
pub(crate) struct FrontendTelemetryCreator {}

Expand All @@ -45,11 +47,11 @@ impl TelemetryReportCreator for FrontendTelemetryCreator {
}

fn report_type(&self) -> &str {
"frontend"
TELEMETRY_FRONTEND_REPORT_TYPE
}
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct FrontendTelemetryReport {
#[serde(flatten)]
base: TelemetryReportBase,
Expand All @@ -74,6 +76,7 @@ impl FrontendTelemetryReport {
up_time,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Frontend,
is_test: false,
},
}
}
Expand All @@ -93,4 +96,24 @@ mod tests {
assert_eq!(report.base.up_time, 0);
assert_eq!(report.base.node_type, TelemetryNodeType::Frontend);
}

use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

// It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database.
#[tokio::test]
async fn test_frontend_telemetry_report() {
let mut report = super::FrontendTelemetryReport::new(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
100,
);
report.base.is_test = true;

let pb_report = report.to_pb_bytes();
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_FRONTEND_REPORT_TYPE).to_owned();
let post_res = post_telemetry_report_pb(&url, pb_report).await;
assert!(post_res.is_ok());
}
}
53 changes: 52 additions & 1 deletion src/meta/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use thiserror_ext::AsReport;
use crate::manager::MetadataManager;
use crate::model::ClusterId;

const TELEMETRY_META_REPORT_TYPE: &str = "meta";

#[derive(Debug, Serialize, Deserialize)]
struct NodeCount {
meta_count: u64,
Expand Down Expand Up @@ -142,6 +144,7 @@ impl TelemetryReportCreator for MetaReportCreator {
up_time,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Meta,
is_test: false,
},
node_count: NodeCount {
meta_count: *node_map.get(&WorkerType::Meta).unwrap_or(&0),
Expand All @@ -155,6 +158,54 @@ impl TelemetryReportCreator for MetaReportCreator {
}

fn report_type(&self) -> &str {
"meta"
TELEMETRY_META_REPORT_TYPE
}
}

#[cfg(test)]
mod test {
use risingwave_common::config::MetaBackend;
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase,
};

use crate::telemetry::{MetaTelemetryReport, NodeCount, RwVersion};

#[tokio::test]
async fn test_meta_telemetry_report() {
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

use crate::telemetry::TELEMETRY_META_REPORT_TYPE;

// we don't call `create_report` here because it rely on the metadata manager
let report = MetaTelemetryReport {
base: TelemetryReportBase {
tracking_id: "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_owned(),
session_id: "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_owned(),
system_data: SystemData::new(),
up_time: 100,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Meta,
is_test: true,
},
node_count: NodeCount {
meta_count: 1,
compute_count: 2,
frontend_count: 3,
compactor_count: 4,
},
streaming_job_count: 5,
meta_backend: MetaBackend::Etcd,
rw_version: RwVersion {
version: "version".to_owned(),
git_sha: "git_sha".to_owned(),
},
};

let pb_bytes = report.to_pb_bytes();
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_META_REPORT_TYPE).to_owned();
let result = post_telemetry_report_pb(&url, pb_bytes).await;
assert!(result.is_ok());
}
}
34 changes: 30 additions & 4 deletions src/storage/compactor/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase,
TelemetryResult,
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
use serde::{Deserialize, Serialize};

const TELEMETRY_COMPACTOR_REPORT_TYPE: &str = "compactor";

#[derive(Clone, Copy)]
pub(crate) struct CompactorTelemetryCreator {}

Expand All @@ -46,7 +47,7 @@ impl TelemetryReportCreator for CompactorTelemetryCreator {
}

fn report_type(&self) -> &str {
"compactor"
TELEMETRY_COMPACTOR_REPORT_TYPE
}
}

Expand All @@ -65,7 +66,6 @@ pub(crate) struct CompactorTelemetryReport {
base: TelemetryReportBase,
}

impl TelemetryReport for CompactorTelemetryReport {}
impl CompactorTelemetryReport {
pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
Self {
Expand All @@ -76,7 +76,33 @@ impl CompactorTelemetryReport {
up_time,
time_stamp: current_timestamp(),
node_type: TelemetryNodeType::Compactor,
is_test: false,
},
}
}
}

#[cfg(test)]
mod test {
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::{post_telemetry_report_pb, TELEMETRY_REPORT_URL};

use crate::telemetry::{CompactorTelemetryReport, TELEMETRY_COMPACTOR_REPORT_TYPE};

// It is ok to use `TELEMETRY_REPORT_URL` here because we mark it as test and will not write to the database.
#[tokio::test]
async fn test_compactor_telemetry_report() {
let mut report = CompactorTelemetryReport::new(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
100,
);
report.base.is_test = true;

let pb_report = report.to_pb_bytes();
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_COMPACTOR_REPORT_TYPE).to_owned();
let post_res = post_telemetry_report_pb(&url, pb_report).await;
assert!(post_res.is_ok());
}
}
Loading