Skip to content

Commit

Permalink
feat(telemetry): support report event to telemetry (#17486) (#17835)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabversion <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Jul 29, 2024
1 parent 0875640 commit 9f3af5a
Show file tree
Hide file tree
Showing 18 changed files with 421 additions and 53 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,44 @@ message FrontendReport {
message CompactorReport {
ReportBase base = 1;
}

enum TelemetryEventStage {
TELEMETRY_EVENT_STAGE_UNSPECIFIED = 0;
TELEMETRY_EVENT_STAGE_CREATE_STREAM_JOB = 1;
TELEMETRY_EVENT_STAGE_UPDATE_STREAM_JOB = 2;
TELEMETRY_EVENT_STAGE_DROP_STREAM_JOB = 3;
TELEMETRY_EVENT_STAGE_QUERY = 4;
TELEMETRY_EVENT_STAGE_RECOVERY = 5;
}

enum TelemetryDatabaseObject {
TELEMETRY_DATABASE_OBJECT_UNSPECIFIED = 0;
TELEMETRY_DATABASE_OBJECT_SOURCE = 1;
TELEMETRY_DATABASE_OBJECT_MV = 2;
TELEMETRY_DATABASE_OBJECT_TABLE = 3;
TELEMETRY_DATABASE_OBJECT_SINK = 4;
TELEMETRY_DATABASE_OBJECT_INDEX = 5;
}

message EventMessage {
// tracking_id is persistent in meta data
string tracking_id = 1;
// event_time is when the event is created
uint64 event_time_sec = 3;
// event_stage describes in which process the event happens
TelemetryEventStage event_stage = 4;
// feature_name is the name of the feature triggered the event
string event_name = 5;
// connector_name is the name of the connector involves
optional string connector_name = 6;
// connector_direction is the direction of data flow, can be source or sink
optional TelemetryDatabaseObject object = 10;
// catalog_id is the id of the catalog involves (table_id/source_id/...)
int64 catalog_id = 7;
// attributes is the additional information of the event: json format ser to string
optional string attributes = 8;
// node is the node that creates the event
string node = 9;
// mark the event is a test message
bool is_test = 11;
}
114 changes: 112 additions & 2 deletions src/common/src/telemetry/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use uuid::Uuid;

use super::{Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
use crate::telemetry::post_telemetry_report_pb;

Expand All @@ -42,6 +47,8 @@ pub trait TelemetryReportCreator {
fn report_type(&self) -> &str;
}

static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();

pub async fn start_telemetry_reporting<F, I>(
info_fetcher: Arc<I>,
report_creator: Arc<F>,
Expand Down Expand Up @@ -74,6 +81,13 @@ where
return;
}
};
TELEMETRY_TRACKING_ID
.set(tracking_id.clone())
.unwrap_or_else(|_| {
tracing::warn!(
"Telemetry failed to set tracking_id, event reporting will be disabled"
)
});

loop {
tokio::select! {
Expand Down Expand Up @@ -112,3 +126,99 @@ where
});
(join_handle, shutdown_tx)
}

pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
node: String,
) {
let event_tracking_id: String;
if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
event_tracking_id = tracking_id.to_string();
} else {
tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
return;
}

request_to_telemetry_event(
event_tracking_id,
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
false,
);
}

fn request_to_telemetry_event(
tracking_id: String,
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
node: String,
is_test: bool,
) {
let event = PbEventMessage {
tracking_id,
event_time_sec: current_timestamp(),
event_stage: event_stage as i32,
event_name: event_name.to_string(),
connector_name,
object: object.map(|c| c as i32),
catalog_id,
attributes: attributes.map(|a| a.to_string()),
node,
is_test,
};
let report_bytes = event.encode_to_vec();

tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});
}

#[cfg(test)]
mod test {

use super::*;

#[ignore]
#[tokio::test]
async fn test_telemetry_report_event() {
let event_stage = PbTelemetryEventStage::CreateStreamJob;
let event_name = "test_feature";
let catalog_id = 1;
let connector_name = Some("test_connector".to_string());
let object = Some(PbTelemetryDatabaseObject::Source);
let attributes = None;
let node = "test_node".to_string();

request_to_telemetry_event(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
true,
);

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tokio-stream = "0.1"
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tracing = "0.1"
uuid = { version = "1.8.0", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
Expand Down
18 changes: 16 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
Expand All @@ -28,6 +29,7 @@ use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde_derive::Serialize;

use super::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
Expand Down Expand Up @@ -125,15 +127,21 @@ pub struct SinkFormatDesc {
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
AppendOnly,
Upsert,
Debezium,
}

impl Display for SinkFormat {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
Json,
Protobuf,
Expand All @@ -142,6 +150,12 @@ pub enum SinkEncode {
Text,
}

impl Display for SinkEncode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl SinkFormatDesc {
pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
use crate::sink::kafka::KafkaSink;
Expand Down
23 changes: 22 additions & 1 deletion src/frontend/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,35 @@

use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator};
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use serde::{Deserialize, Serialize};

const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend";

#[allow(dead_code)] // please remove when used
pub(crate) fn report_event(
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
component: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
) {
report_event_common(
event_stage,
event_name,
catalog_id,
connector_name,
component,
attributes,
TELEMETRY_FRONTEND_REPORT_TYPE.to_string(),
);
}

#[derive(Clone, Copy)]
pub(crate) struct FrontendTelemetryCreator {}

Expand Down
21 changes: 11 additions & 10 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
hyper = "0.14" # required by tonic
itertools = { workspace = true }
jsonbb = { workspace = true }
maplit = "1.0.2"
memcomparable = { version = "0.2" }
mime_guess = "2"
Expand Down Expand Up @@ -71,12 +72,12 @@ sync-point = { path = "../utils/sync-point" }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
] }
tokio-retry = "0.3"
tokio-stream = { version = "0.1", features = ["net"] }
Expand All @@ -89,10 +90,10 @@ uuid = { version = "1", features = ["v4"] }
[target.'cfg(not(madsim))'.dependencies]
axum = { workspace = true }
tower-http = { version = "0.5", features = [
"add-extension",
"cors",
"fs",
"compression-gzip",
"add-extension",
"cors",
"fs",
"compression-gzip",
] }
workspace-hack = { path = "../workspace-hack" }

Expand Down
Loading

0 comments on commit 9f3af5a

Please sign in to comment.