From 21c029efa3f507ca157065cde9b463b1aed609af Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 28 Jun 2024 15:37:32 +0800 Subject: [PATCH] sink report Signed-off-by: tabVersion --- src/stream/src/from_proto/sink.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 41a842151640f..d11c5ddd82dc6 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -26,6 +26,7 @@ use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; use risingwave_pb::telemetry::{PbTelemetryConnectorDirection, PbTelemetryEventStage}; +use serde_json::json; use super::*; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; @@ -37,14 +38,21 @@ use crate::telemetry::report_event; pub struct SinkExecutorBuilder; -fn telemetry_sink_build(sink_id: &SinkId) { +fn telemetry_sink_build( + sink_id: &SinkId, + connector_name: &str, + sink_format_desc: &Option, +) { + let attr = sink_format_desc + .as_ref() + .map(|f| json!({"format": f.format, "encode": f.encode}).to_string()); report_event( PbTelemetryEventStage::CreateStreamJob, "sink".to_string(), sink_id.sink_id() as i64, - None, + Some(connector_name.to_string()), Some(PbTelemetryConnectorDirection::Sink), - None, + attr, ) }