Skip to content

Commit

Permalink
sink report
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Jun 28, 2024
1 parent fb3aa40 commit 21c029e
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use risingwave_pb::catalog::Table;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
use risingwave_pb::telemetry::{PbTelemetryConnectorDirection, PbTelemetryEventStage};
use serde_json::json;

use super::*;
use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
Expand All @@ -37,14 +38,21 @@ use crate::telemetry::report_event;

pub struct SinkExecutorBuilder;

fn telemetry_sink_build(sink_id: &SinkId) {
fn telemetry_sink_build(
sink_id: &SinkId,
connector_name: &str,
sink_format_desc: &Option<SinkFormatDesc>,
) {
let attr = sink_format_desc
.as_ref()
.map(|f| json!({"format": f.format, "encode": f.encode}).to_string());
report_event(
PbTelemetryEventStage::CreateStreamJob,
"sink".to_string(),
sink_id.sink_id() as i64,
None,
Some(connector_name.to_string()),
Some(PbTelemetryConnectorDirection::Sink),
None,
attr,
)
}

Expand Down

0 comments on commit 21c029e

Please sign in to comment.