Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Feb 29, 2024
1 parent ccdaba9 commit 0deefbb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
6 changes: 3 additions & 3 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
unreachable!("Partition and offset columns must be set.");
};

let mut boot_state = Vec::default();
let mut owned_splits = Vec::default();
if let Some(mutation) = barrier.mutation.as_ref() {
match mutation.as_ref() {
Mutation::Add(AddMutation { splits, .. })
Expand All @@ -262,7 +262,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
..
}) => {
if let Some(splits) = splits.get(&self.actor_ctx.id) {
boot_state = splits.clone();
owned_splits = splits.clone();
}
}
_ => {}
Expand All @@ -272,7 +272,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {

let mut backfill_states: BackfillStates = HashMap::new();
let mut unfinished_splits = Vec::new();
for mut split in boot_state {
for mut split in owned_splits {
let split_id = split.id();
let backfill_state = self
.backfill_state_store
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/source/kafka_backfill_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::ops::Bound;
use futures::{pin_mut, StreamExt};
use risingwave_common::row;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl};
use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::SplitId;
use risingwave_pb::catalog::PbTable;
Expand Down Expand Up @@ -79,10 +79,10 @@ impl<S: StateStore> BackfillStateTableHandler<S> {
Ok(ret)
}

pub async fn set(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> {
async fn set(&mut self, key: SplitId, state: BackfillState) -> StreamExecutorResult<()> {
let row = [
Some(Self::string_to_scalar(key.as_ref())),
Some(ScalarImpl::Jsonb(value)),
Some(ScalarImpl::Jsonb(state.encode_to_json())),
];
match self.get(&key).await? {
Some(prev_row) => {
Expand All @@ -105,7 +105,7 @@ impl<S: StateStore> BackfillStateTableHandler<S> {

pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> {
for (split_id, state) in states {
self.set(split_id, state.encode_to_json()).await?;
self.set(split_id, state).await?;
}
Ok(())
}
Expand Down

0 comments on commit 0deefbb

Please sign in to comment.