From 2c00dcebf469d9bbb11ba44ae05c4a16be4b4614 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Dec 2023 23:13:01 +0800 Subject: [PATCH] refactor --- src/stream/src/executor/sink.rs | 61 ++++++++++++++++----------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 5dcaa65679ea0..505e0e47e62d5 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -108,8 +108,7 @@ impl SinkExecutor { }) } - #[try_stream(boxed, ok = Message, error = StreamExecutorError)] - async fn execute_inner(self) { + fn execute_inner(self) -> BoxedMessageStream { let sink_id = self.sink_param.sink_id; let actor_id = self.actor_context.id; @@ -139,7 +138,7 @@ impl SinkExecutor { } }); - let processed_input = Self::process_input( + let processed_input = Self::process_msg( input, self.sink_param.sink_type, stream_key, @@ -147,35 +146,33 @@ impl SinkExecutor { ); if self.sink.is_sink_into_table() { - #[for_await] - for msg in processed_input { - yield msg?; - } + processed_input.boxed() } else { - let (log_reader, log_writer) = self.log_store_factory.build().await; - - let write_log_stream = Self::execute_write_log( - processed_input, - log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), - actor_id, - ); - - let output = dispatch_sink!(self.sink, sink, { - let consume_log_stream = Self::execute_consume_log( - sink, - log_reader, - self.input_columns, - self.sink_writer_param, - self.actor_context, - self.info, - ); - // TODO: may try to remove the boxed - select(consume_log_stream.into_stream(), write_log_stream).boxed() - }); - #[for_await] - for msg in output { - yield msg?; - } + self.log_store_factory + .build() + .map(move |(log_reader, log_writer)| { + let write_log_stream = Self::execute_write_log( + processed_input, + log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), + actor_id, + ); + + dispatch_sink!(self.sink, sink, { + let consume_log_stream = Self::execute_consume_log( + sink, + log_reader, + self.input_columns, + self.sink_writer_param, + self.actor_context, + self.info, + ); + // TODO: may try to remove the boxed + select(consume_log_stream.into_stream(), write_log_stream).boxed() + }) + }) + .into_stream() + .flatten() + .boxed() } } @@ -227,7 +224,7 @@ impl SinkExecutor { } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn process_input( + async fn process_msg( input: impl MessageStream, sink_type: SinkType, stream_key: PkIndices,