Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: define the telemetry protocol in protobuf #14952

Merged
merged 4 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
syntax = "proto3";

package telemetry;

enum MetaBackend {
META_BACKEND_UNSPECIFIED = 0;
META_BACKEND_MEMORY = 1;
META_BACKEND_ETCD = 2;
META_BACKEND_RDB = 3;
}

enum TelemetryNodeType {
TELEMETRY_NODE_TYPE_UNSPECIFIED = 0;
TELEMETRY_NODE_TYPE_META = 1;
TELEMETRY_NODE_TYPE_COMPUTE = 2;
TELEMETRY_NODE_TYPE_FRONTEND = 3;
TELEMETRY_NODE_TYPE_COMPACTOR = 4;
}

message SystemMemory {
uint64 used = 1;
uint64 total = 2;
}

message SystemOs {
string name = 1;
string version = 2;
string kernel_version = 3;
}

message SystemCpu {
float available = 1;
}

message SystemData {
SystemMemory memory = 1;
SystemOs os = 2;
SystemCpu cpu = 3;
}

// NodeCount represents how many nodes in this cluster
message NodeCount {
uint32 meta = 1;
uint32 compute = 2;
uint32 frontend = 3;
uint32 compactor = 4;
}

// RwVersion represents the version of RisingWave
message RwVersion {
// Version is the Cargo package version of RisingWave
string rw_version = 1;
// GitSHA is the Git commit SHA of RisingWave
string git_sha = 2;
}

message ReportBase {
// tracking_id is persistent in meta data
string tracking_id = 1;
// session_id is reset every time node restarts
string session_id = 2;
// system_data is hardware and os info
SystemData system_data = 3;
// up_time is how long the node has been running
uint64 up_time = 4;
// report_time is when the report is created
uint64 report_time = 5;
// node_type is the node that creates the report
TelemetryNodeType node_type = 6;
}

message MetaReport {
ReportBase base = 1;
// meta_backend is the backend of meta data
MetaBackend meta_backend = 2;
// node_count is the count of each node type
NodeCount node_count = 3;
// rw_version is the version of RisingWave
RwVersion rw_version = 4;
// This field represents the "number of running streaming jobs"
// and is used to indicate whether the cluster is active.
uint32 stream_job_count = 5;
}

message ComputeReport {
ReportBase base = 1;
}

message FrontendReport {
ReportBase base = 1;
}

message CompactorReport {
ReportBase base = 1;
}
75 changes: 75 additions & 0 deletions src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub mod report;

use std::time::SystemTime;

use risingwave_pb::telemetry::{
ReportBase as PbTelemetryReportBase, SystemCpu as PbSystemCpu, SystemData as PbSystemData,
SystemMemory as PbSystemMemory, SystemOs as PbSystemOs,
TelemetryNodeType as PbTelemetryNodeType,
};
use serde::{Deserialize, Serialize};
use sysinfo::System;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -65,6 +70,19 @@ pub struct TelemetryReportBase {
pub node_type: TelemetryNodeType,
}

impl From<TelemetryReportBase> for PbTelemetryReportBase {
fn from(val: TelemetryReportBase) -> Self {
PbTelemetryReportBase {
tracking_id: val.tracking_id,
session_id: val.session_id,
system_data: Some(val.system_data.into()),
up_time: val.up_time,
report_time: val.time_stamp,
node_type: from_telemetry_node_type(val.node_type) as i32,
}
}
}

pub trait TelemetryReport: Serialize {}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -155,6 +173,63 @@ pub fn current_timestamp() -> u64 {
.as_secs()
}

fn from_telemetry_node_type(t: TelemetryNodeType) -> PbTelemetryNodeType {
match t {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}

impl From<TelemetryNodeType> for PbTelemetryNodeType {
fn from(val: TelemetryNodeType) -> Self {
match val {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}
}

impl From<Cpu> for PbSystemCpu {
fn from(val: Cpu) -> Self {
PbSystemCpu {
available: val.available,
}
}
}

impl From<Memory> for PbSystemMemory {
fn from(val: Memory) -> Self {
PbSystemMemory {
used: val.used as u64,
total: val.total as u64,
}
}
}

impl From<Os> for PbSystemOs {
fn from(val: Os) -> Self {
PbSystemOs {
name: val.name,
kernel_version: val.kernel_version,
version: val.version,
}
}
}

impl From<SystemData> for PbSystemData {
fn from(val: SystemData) -> Self {
PbSystemData {
memory: Some(val.memory.into()),
os: Some(val.os.into()),
cpu: Some(val.cpu.into()),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"stream_plan",
"stream_service",
"task_service",
"telemetry",
"user",
];
let protos: Vec<String> = proto_files
Expand Down
6 changes: 6 additions & 0 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub mod java_binding;
#[cfg_attr(madsim, path = "sim/health.rs")]
pub mod health;
#[rustfmt::skip]
#[path = "sim/telemetry.rs"]
pub mod telemetry;
#[rustfmt::skip]
#[path = "connector_service.serde.rs"]
pub mod connector_service_serde;
#[rustfmt::skip]
Expand Down Expand Up @@ -151,6 +154,9 @@ pub mod backup_service_serde;
#[rustfmt::skip]
#[path = "java_binding.serde.rs"]
pub mod java_binding_serde;
#[rustfmt::skip]
#[path = "telemetry.serde.rs"]
pub mod telemetry_serde;

#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
Expand Down
Loading