Skip to content

Commit

Permalink
fix(connector): add connector sink metric back (#12812)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Oct 15, 2023
1 parent b441341 commit ec60ee2
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::encoder::TimestampHandlingMode;
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkParam,
SinkWriterParam,
DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkMetrics,
SinkParam, SinkWriterParam,
};
use crate::ConnectorParams;

Expand Down Expand Up @@ -106,11 +106,13 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
const SINK_NAME: &'static str = R::SINK_NAME;

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(
RemoteSinkWriter::new(self.param.clone(), writer_param.connector_params)
.await?
.into_log_sinker(writer_param.sink_metrics),
Ok(RemoteSinkWriter::new(
self.param.clone(),
writer_param.connector_params,
writer_param.sink_metrics.clone(),
)
.await?
.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self) -> Result<()> {
Expand Down Expand Up @@ -224,8 +226,12 @@ impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
"sink needs coordination should not have singleton input"
))
})?,
CoordinatedRemoteSinkWriter::new(self.0.param.clone(), writer_param.connector_params)
.await?,
CoordinatedRemoteSinkWriter::new(
self.0.param.clone(),
writer_param.connector_params,
writer_param.sink_metrics.clone(),
)
.await?,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
Expand Down Expand Up @@ -355,11 +361,16 @@ pub struct RemoteSinkWriterInner<SM, R: RemoteSinkTrait> {
payload_format: SinkPayloadFormat,
stream_handle: SinkWriterStreamJniHandle,
json_encoder: JsonEncoder,
sink_metrics: SinkMetrics,
_phantom: PhantomData<(SM, R)>,
}

impl<SM, R: RemoteSinkTrait> RemoteSinkWriterInner<SM, R> {
pub async fn new(param: SinkParam, connector_params: ConnectorParams) -> Result<Self> {
pub async fn new(
param: SinkParam,
connector_params: ConnectorParams,
sink_metrics: SinkMetrics,
) -> Result<Self> {
let (request_tx, request_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let (response_tx, response_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

Expand Down Expand Up @@ -440,6 +451,7 @@ impl<SM, R: RemoteSinkTrait> RemoteSinkWriterInner<SM, R> {
stream_handle,
payload_format: connector_params.sink_payload_format,
json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String),
sink_metrics,
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -479,6 +491,7 @@ impl<SM, R: RemoteSinkTrait> RemoteSinkWriterInner<SM, R> {
json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String),
stream_handle,
payload_format: SinkPayloadFormat::Json,
sink_metrics: SinkMetrics::for_test(),
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -529,9 +542,13 @@ where
type CommitMetadata = SM;

async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
let cardinality = chunk.cardinality();
self.sink_metrics
.connector_sink_rows_received
.inc_by(cardinality as _);
let payload = match self.payload_format {
SinkPayloadFormat::Json => {
let mut row_ops = Vec::with_capacity(chunk.cardinality());
let mut row_ops = Vec::with_capacity(cardinality);
for (op, row_ref) in chunk.rows() {
let map = self.json_encoder.encode(row_ref)?;
let row_op = RowOp {
Expand Down

0 comments on commit ec60ee2

Please sign in to comment.