Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 28, 2023
1 parent 7d23d03 commit 2c00dce
Showing 1 changed file with 29 additions and 32 deletions.
61 changes: 29 additions & 32 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
})
}

#[try_stream(boxed, ok = Message, error = StreamExecutorError)]
async fn execute_inner(self) {
fn execute_inner(self) -> BoxedMessageStream {
let sink_id = self.sink_param.sink_id;
let actor_id = self.actor_context.id;

Expand Down Expand Up @@ -139,43 +138,41 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
}
});

let processed_input = Self::process_input(
let processed_input = Self::process_msg(
input,
self.sink_param.sink_type,
stream_key,
stream_key_sink_pk_mismatch,
);

if self.sink.is_sink_into_table() {
#[for_await]
for msg in processed_input {
yield msg?;
}
processed_input.boxed()
} else {
let (log_reader, log_writer) = self.log_store_factory.build().await;

let write_log_stream = Self::execute_write_log(
processed_input,
log_writer.monitored(self.sink_writer_param.sink_metrics.clone()),
actor_id,
);

let output = dispatch_sink!(self.sink, sink, {
let consume_log_stream = Self::execute_consume_log(
sink,
log_reader,
self.input_columns,
self.sink_writer_param,
self.actor_context,
self.info,
);
// TODO: may try to remove the boxed
select(consume_log_stream.into_stream(), write_log_stream).boxed()
});
#[for_await]
for msg in output {
yield msg?;
}
self.log_store_factory
.build()
.map(move |(log_reader, log_writer)| {
let write_log_stream = Self::execute_write_log(
processed_input,
log_writer.monitored(self.sink_writer_param.sink_metrics.clone()),
actor_id,
);

dispatch_sink!(self.sink, sink, {
let consume_log_stream = Self::execute_consume_log(
sink,
log_reader,
self.input_columns,
self.sink_writer_param,
self.actor_context,
self.info,
);
// TODO: may try to remove the boxed
select(consume_log_stream.into_stream(), write_log_stream).boxed()
})
})
.into_stream()
.flatten()
.boxed()
}
}

Expand Down Expand Up @@ -227,7 +224,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn process_input(
async fn process_msg(
input: impl MessageStream,
sink_type: SinkType,
stream_key: PkIndices,
Expand Down

0 comments on commit 2c00dce

Please sign in to comment.