diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index c0ee1330805bf..e9e9eda31fb7e 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -48,6 +48,9 @@ pub struct SinkExecutor { sink_writer_param: SinkWriterParam, chunk_size: usize, input_data_types: Vec, + need_advance_delete: bool, + re_construct_with_sink_pk: bool, + compact_chunk: bool, } // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT. @@ -107,46 +110,12 @@ impl SinkExecutor { assert_eq!(sink_input_schema.data_types(), info.schema.data_types()); } - Ok(Self { - actor_context, - info, - input, - sink, - input_columns: columns, - sink_param, - log_store_factory, - sink_writer_param, - chunk_size, - input_data_types, - }) - } - - fn execute_inner(self) -> BoxedMessageStream { - let sink_id = self.sink_param.sink_id; - let actor_id = self.actor_context.id; - let fragment_id = self.actor_context.fragment_id; - - let stream_key = self.info.pk_indices.clone(); - let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics( - sink_id, - actor_id, - fragment_id, - ); - + let stream_key = info.pk_indices.clone(); let stream_key_sink_pk_mismatch = { stream_key .iter() - .any(|i| !self.sink_param.downstream_pk.contains(i)) + .any(|i| !sink_param.downstream_pk.contains(i)) }; - - let input = self.input.execute(); - - let input = input.inspect_ok(move |msg| { - if let Message::Chunk(c) = msg { - metrics.sink_input_row_count.inc_by(c.capacity() as u64); - } - }); - // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order // stream key: a,b // sink pk: a @@ -178,27 +147,71 @@ impl SinkExecutor { // after compacting with the stream key, the two event with the same user defined sink pk must have different stream key. // So the delete event is not to delete the inserted record in our internal streaming SQL semantic. let need_advance_delete = - stream_key_sink_pk_mismatch && self.sink_param.sink_type != SinkType::AppendOnly; + stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly; // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case. let re_construct_with_sink_pk = need_advance_delete - && self.sink_param.sink_type == SinkType::Upsert - && !self.sink_param.downstream_pk.is_empty(); + && sink_param.sink_type == SinkType::Upsert + && !sink_param.downstream_pk.is_empty(); // Don't compact chunk for blackhole sink for better benchmark performance. - let compact_chunk = !self.sink.is_blackhole(); - tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}", - sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk); + let compact_chunk = !sink.is_blackhole(); + + tracing::info!( + sink_id = sink_param.sink_id.sink_id, + actor_id = actor_context.id, + need_advance_delete, + re_construct_with_sink_pk, + compact_chunk, + "Sink executor info" + ); + + Ok(Self { + actor_context, + info, + input, + sink, + input_columns: columns, + sink_param, + log_store_factory, + sink_writer_param, + chunk_size, + input_data_types, + need_advance_delete, + re_construct_with_sink_pk, + compact_chunk, + }) + } + + fn execute_inner(self) -> BoxedMessageStream { + let sink_id = self.sink_param.sink_id; + let actor_id = self.actor_context.id; + let fragment_id = self.actor_context.fragment_id; + + let stream_key = self.info.pk_indices.clone(); + let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics( + sink_id, + actor_id, + fragment_id, + ); + + let input = self.input.execute(); + + let input = input.inspect_ok(move |msg| { + if let Message::Chunk(c) = msg { + metrics.sink_input_row_count.inc_by(c.capacity() as u64); + } + }); let processed_input = Self::process_msg( input, self.sink_param.sink_type, stream_key, - need_advance_delete, - re_construct_with_sink_pk, + self.need_advance_delete, + self.re_construct_with_sink_pk, self.chunk_size, self.input_data_types, self.sink_param.downstream_pk.clone(), metrics.sink_chunk_buffer_size, - compact_chunk, + self.compact_chunk, ); if self.sink.is_sink_into_table() {