From 84c494ffd7e2004813f503627627e1b8e37f0924 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 2 Feb 2024 15:19:07 +0800 Subject: [PATCH 1/3] init --- proto/telemetry.proto | 95 +++++++++++++++++++++++++++++++++++++++++++ src/prost/build.rs | 1 + 2 files changed, 96 insertions(+) create mode 100644 proto/telemetry.proto diff --git a/proto/telemetry.proto b/proto/telemetry.proto new file mode 100644 index 0000000000000..017a55d3ccfc0 --- /dev/null +++ b/proto/telemetry.proto @@ -0,0 +1,95 @@ +syntax = "proto3"; + +package telemetry; + +enum MetaBackend { + MetaBackendUnspecified = 0; + MetaBackendMemory = 1; + MetaBackendEtcd = 2; + MetaBackendRdb = 3; +} + +enum TelemetryNodeType { + TelemetryNodeTypeUnspecified = 0; + TelemetryNodeTypeMeta = 1; + TelemetryNodeTypeCompute = 2; + TelemetryNodeTypeFrontend = 3; + TelemetryNodeTypeCompactor = 4; +} + +message SystemMemory { + uint64 used = 1; + uint64 total = 2; +} + +message SystemOs { + string name = 1; + string version = 2; + string kernel_version = 3; +} + +message SystemCpu { + float available = 1; +} + +message SystemData { + SystemMemory memory = 1; + SystemOs os = 2; + SystemCpu cpu = 3; +} + +// NodeCount represents how many nodes in this cluster +message NodeCount { + uint32 meta = 1; + uint32 compute = 2; + uint32 frontend = 3; + uint32 compactor = 4; +} + +// RwVersion represents the version of RisingWave +message RwVersion { + // Version is the Cargo package version of RisingWave + string rw_version = 1; + // GitSHA is the Git commit SHA of RisingWave + string git_sha = 2; +} + +message ReportBase { + // tracking_id is persistent in meta data + string tracking_id = 1; + // session_id is reset every time node restarts + string session_id = 2; + // system_data is hardware and os info + SystemData system_data = 3; + // up_time is how long the node has been running + uint64 up_time = 4; + // report_time is when the report is created + uint64 report_time = 5; + // node_type is the node that creates the report + TelemetryNodeType node_type = 6; +} + +message MetaReport { + ReportBase base = 1; + // meta_backend is the backend of meta data + MetaBackend meta_backend = 2; + // node_count is the count of each node type + NodeCount node_count = 3; + // rw_version is the version of RisingWave + RwVersion rw_version = 4; + // This field represents the "number of running streaming jobs" + // and is used to indicate whether the cluster is active. + uint32 stream_job_count = 5; +} + +message ComputeReport { + ReportBase base = 1; +} + +message FrontendReport { + ReportBase base = 1; +} + +message CompactorReport { + ReportBase base = 1; +} diff --git a/src/prost/build.rs b/src/prost/build.rs index 3af4c9873863d..cd65b20d6f192 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -51,6 +51,7 @@ fn main() -> Result<(), Box> { "stream_plan", "stream_service", "task_service", + "telemetry", "user", ]; let protos: Vec = proto_files From db43b0abcd53660ed81b16813c8bdec31f889d15 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 2 Feb 2024 16:02:28 +0800 Subject: [PATCH 2/3] fix --- src/common/src/telemetry/mod.rs | 75 +++++++++++++++++++++++++++++++++ src/prost/src/lib.rs | 6 +++ 2 files changed, 81 insertions(+) diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index e25e312013593..dfb4ce385f9a8 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -17,6 +17,11 @@ pub mod report; use std::time::SystemTime; +use risingwave_pb::telemetry::{ + ReportBase as PbTelemetryReportBase, SystemCpu as PbSystemCpu, SystemData as PbSystemData, + SystemMemory as PbSystemMemory, SystemOs as PbSystemOs, + TelemetryNodeType as PbTelemetryNodeType, +}; use serde::{Deserialize, Serialize}; use sysinfo::System; use thiserror_ext::AsReport; @@ -65,6 +70,19 @@ pub struct TelemetryReportBase { pub node_type: TelemetryNodeType, } +impl Into for TelemetryReportBase { + fn into(self) -> PbTelemetryReportBase { + PbTelemetryReportBase { + tracking_id: self.tracking_id, + session_id: self.session_id, + system_data: Some(self.system_data.into()), + up_time: self.up_time, + report_time: self.time_stamp, + node_type: from_telemetry_node_type(self.node_type) as i32, + } + } +} + pub trait TelemetryReport: Serialize {} #[derive(Debug, Serialize, Deserialize)] @@ -155,6 +173,63 @@ pub fn current_timestamp() -> u64 { .as_secs() } +fn from_telemetry_node_type(t: TelemetryNodeType) -> PbTelemetryNodeType { + match t { + TelemetryNodeType::Meta => PbTelemetryNodeType::Meta, + TelemetryNodeType::Compute => PbTelemetryNodeType::Compute, + TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend, + TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor, + } +} + +impl Into for TelemetryNodeType { + fn into(self) -> PbTelemetryNodeType { + match self { + TelemetryNodeType::Meta => PbTelemetryNodeType::Meta, + TelemetryNodeType::Compute => PbTelemetryNodeType::Compute, + TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend, + TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor, + } + } +} + +impl Into for Cpu { + fn into(self) -> PbSystemCpu { + PbSystemCpu { + available: self.available, + } + } +} + +impl Into for Memory { + fn into(self) -> PbSystemMemory { + PbSystemMemory { + used: self.used as u64, + total: self.total as u64, + } + } +} + +impl Into for Os { + fn into(self) -> PbSystemOs { + PbSystemOs { + name: self.name, + kernel_version: self.kernel_version, + version: self.version, + } + } +} + +impl Into for SystemData { + fn into(self) -> PbSystemData { + PbSystemData { + memory: Some(self.memory.into()), + os: Some(self.os.into()), + cpu: Some(self.cpu.into()), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index dafa3a568780f..82f399d1a3de6 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -89,6 +89,9 @@ pub mod java_binding; #[cfg_attr(madsim, path = "sim/health.rs")] pub mod health; #[rustfmt::skip] +#[path = "sim/telemetry.rs"] +pub mod telemetry; +#[rustfmt::skip] #[path = "connector_service.serde.rs"] pub mod connector_service_serde; #[rustfmt::skip] @@ -151,6 +154,9 @@ pub mod backup_service_serde; #[rustfmt::skip] #[path = "java_binding.serde.rs"] pub mod java_binding_serde; +#[rustfmt::skip] +#[path = "telemetry.serde.rs"] +pub mod telemetry_serde; #[derive(Clone, PartialEq, Eq, Debug, Error)] #[error("field `{0}` not found")] From a3e9cc24d5a445c5613e735b3511a7270b63c88f Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 2 Feb 2024 17:10:00 +0800 Subject: [PATCH 3/3] fix --- proto/telemetry.proto | 18 +++++------ src/common/src/telemetry/mod.rs | 56 ++++++++++++++++----------------- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/proto/telemetry.proto b/proto/telemetry.proto index 017a55d3ccfc0..2da24cc823085 100644 --- a/proto/telemetry.proto +++ b/proto/telemetry.proto @@ -3,18 +3,18 @@ syntax = "proto3"; package telemetry; enum MetaBackend { - MetaBackendUnspecified = 0; - MetaBackendMemory = 1; - MetaBackendEtcd = 2; - MetaBackendRdb = 3; + META_BACKEND_UNSPECIFIED = 0; + META_BACKEND_MEMORY = 1; + META_BACKEND_ETCD = 2; + META_BACKEND_RDB = 3; } enum TelemetryNodeType { - TelemetryNodeTypeUnspecified = 0; - TelemetryNodeTypeMeta = 1; - TelemetryNodeTypeCompute = 2; - TelemetryNodeTypeFrontend = 3; - TelemetryNodeTypeCompactor = 4; + TELEMETRY_NODE_TYPE_UNSPECIFIED = 0; + TELEMETRY_NODE_TYPE_META = 1; + TELEMETRY_NODE_TYPE_COMPUTE = 2; + TELEMETRY_NODE_TYPE_FRONTEND = 3; + TELEMETRY_NODE_TYPE_COMPACTOR = 4; } message SystemMemory { diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index dfb4ce385f9a8..382e50d8eb927 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -70,15 +70,15 @@ pub struct TelemetryReportBase { pub node_type: TelemetryNodeType, } -impl Into for TelemetryReportBase { - fn into(self) -> PbTelemetryReportBase { +impl From for PbTelemetryReportBase { + fn from(val: TelemetryReportBase) -> Self { PbTelemetryReportBase { - tracking_id: self.tracking_id, - session_id: self.session_id, - system_data: Some(self.system_data.into()), - up_time: self.up_time, - report_time: self.time_stamp, - node_type: from_telemetry_node_type(self.node_type) as i32, + tracking_id: val.tracking_id, + session_id: val.session_id, + system_data: Some(val.system_data.into()), + up_time: val.up_time, + report_time: val.time_stamp, + node_type: from_telemetry_node_type(val.node_type) as i32, } } } @@ -182,9 +182,9 @@ fn from_telemetry_node_type(t: TelemetryNodeType) -> PbTelemetryNodeType { } } -impl Into for TelemetryNodeType { - fn into(self) -> PbTelemetryNodeType { - match self { +impl From for PbTelemetryNodeType { + fn from(val: TelemetryNodeType) -> Self { + match val { TelemetryNodeType::Meta => PbTelemetryNodeType::Meta, TelemetryNodeType::Compute => PbTelemetryNodeType::Compute, TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend, @@ -193,39 +193,39 @@ impl Into for TelemetryNodeType { } } -impl Into for Cpu { - fn into(self) -> PbSystemCpu { +impl From for PbSystemCpu { + fn from(val: Cpu) -> Self { PbSystemCpu { - available: self.available, + available: val.available, } } } -impl Into for Memory { - fn into(self) -> PbSystemMemory { +impl From for PbSystemMemory { + fn from(val: Memory) -> Self { PbSystemMemory { - used: self.used as u64, - total: self.total as u64, + used: val.used as u64, + total: val.total as u64, } } } -impl Into for Os { - fn into(self) -> PbSystemOs { +impl From for PbSystemOs { + fn from(val: Os) -> Self { PbSystemOs { - name: self.name, - kernel_version: self.kernel_version, - version: self.version, + name: val.name, + kernel_version: val.kernel_version, + version: val.version, } } } -impl Into for SystemData { - fn into(self) -> PbSystemData { +impl From for PbSystemData { + fn from(val: SystemData) -> Self { PbSystemData { - memory: Some(self.memory.into()), - os: Some(self.os.into()), - cpu: Some(self.cpu.into()), + memory: Some(val.memory.into()), + os: Some(val.os.into()), + cpu: Some(val.cpu.into()), } } }