diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 0c42bbe3a3663..b694696842746 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -50,8 +50,8 @@ use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::encoder::TimestampHandlingMode; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, - SinkWriterParam, + DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkMetrics, + SinkParam, SinkWriterParam, }; use crate::ConnectorParams; @@ -106,11 +106,13 @@ impl Sink for RemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - Ok( - RemoteSinkWriter::new(self.param.clone(), writer_param.connector_params) - .await? - .into_log_sinker(writer_param.sink_metrics), + Ok(RemoteSinkWriter::new( + self.param.clone(), + writer_param.connector_params, + writer_param.sink_metrics.clone(), ) + .await? + .into_log_sinker(writer_param.sink_metrics)) } async fn validate(&self) -> Result<()> { @@ -224,8 +226,12 @@ impl Sink for CoordinatedRemoteSink { "sink needs coordination should not have singleton input" )) })?, - CoordinatedRemoteSinkWriter::new(self.0.param.clone(), writer_param.connector_params) - .await?, + CoordinatedRemoteSinkWriter::new( + self.0.param.clone(), + writer_param.connector_params, + writer_param.sink_metrics.clone(), + ) + .await?, ) .await? .into_log_sinker(writer_param.sink_metrics)) @@ -355,11 +361,16 @@ pub struct RemoteSinkWriterInner { payload_format: SinkPayloadFormat, stream_handle: SinkWriterStreamJniHandle, json_encoder: JsonEncoder, + sink_metrics: SinkMetrics, _phantom: PhantomData<(SM, R)>, } impl RemoteSinkWriterInner { - pub async fn new(param: SinkParam, connector_params: ConnectorParams) -> Result { + pub async fn new( + param: SinkParam, + connector_params: ConnectorParams, + sink_metrics: SinkMetrics, + ) -> Result { let (request_tx, request_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let (response_tx, response_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); @@ -440,6 +451,7 @@ impl RemoteSinkWriterInner { stream_handle, payload_format: connector_params.sink_payload_format, json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String), + sink_metrics, _phantom: PhantomData, }) } @@ -479,6 +491,7 @@ impl RemoteSinkWriterInner { json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String), stream_handle, payload_format: SinkPayloadFormat::Json, + sink_metrics: SinkMetrics::for_test(), _phantom: PhantomData, } } @@ -529,9 +542,13 @@ where type CommitMetadata = SM; async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + let cardinality = chunk.cardinality(); + self.sink_metrics + .connector_sink_rows_received + .inc_by(cardinality as _); let payload = match self.payload_format { SinkPayloadFormat::Json => { - let mut row_ops = Vec::with_capacity(chunk.cardinality()); + let mut row_ops = Vec::with_capacity(cardinality); for (op, row_ref) in chunk.rows() { let map = self.json_encoder.encode(row_ref)?; let row_op = RowOp {