Skip to content

Commit

Permalink
fix compilation with new Executor trait
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Feb 28, 2024
1 parent 5820bad commit 2d9c787
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 17 deletions.
18 changes: 3 additions & 15 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl BackfillState {
pub struct KafkaBackfillExecutor<S: StateStore> {
pub inner: KafkaBackfillExecutorInner<S>,
/// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
pub input: Box<dyn Executor>,
pub input: Executor,
}

pub struct KafkaBackfillExecutorInner<S: StateStore> {
Expand Down Expand Up @@ -222,7 +222,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute(mut self, input: BoxedExecutor) {
async fn execute(mut self, input: Executor) {
let mut input = input.execute();

// Poll the upstream to get the first barrier.
Expand Down Expand Up @@ -805,22 +805,10 @@ fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
a.cmp(&b)
}

impl<S: StateStore> Executor for KafkaBackfillExecutor<S> {
impl<S: StateStore> Execute for KafkaBackfillExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.inner.execute(self.input).boxed()
}

fn schema(&self) -> &Schema {
&self.inner.info.schema
}

fn pk_indices(&self) -> PkIndicesRef<'_> {
&self.inner.info.pk_indices
}

fn identity(&self) -> &str {
&self.inner.info.identity
}
}

impl<S: StateStore> Debug for KafkaBackfillExecutorInner<S> {
Expand Down
9 changes: 7 additions & 2 deletions src/stream/src/from_proto/source_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ExecutorBuilder for KafkaBackfillExecutorBuilder {
params: ExecutorParams,
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<BoxedExecutor> {
) -> StreamResult<Executor> {
let [input]: [_; 1] = params.input.try_into().unwrap();

// let (sender, barrier_receiver) = unbounded_channel();
Expand Down Expand Up @@ -103,6 +103,7 @@ impl ExecutorBuilder for KafkaBackfillExecutorBuilder {

let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: None,
};

let source_column_ids: Vec<_> = source_columns
Expand Down Expand Up @@ -140,6 +141,10 @@ impl ExecutorBuilder for KafkaBackfillExecutorBuilder {
params.env.connector_params(),
backfill_state_table,
);
Ok(KafkaBackfillExecutor { inner: exec, input }.boxed())
Ok((
params.info,
KafkaBackfillExecutor { inner: exec, input }.boxed(),
)
.into())
}
}

0 comments on commit 2d9c787

Please sign in to comment.