From 3a13fb31059c69c85e8cf0123712e61ce6991e17 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 29 Feb 2024 16:51:24 +0800 Subject: [PATCH] resolve comments --- src/stream/src/executor/source/kafka_backfill_executor.rs | 6 +++--- .../src/executor/source/kafka_backfill_state_table.rs | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs index 7bc02620b38f2..f319d276b7a06 100644 --- a/src/stream/src/executor/source/kafka_backfill_executor.rs +++ b/src/stream/src/executor/source/kafka_backfill_executor.rs @@ -253,7 +253,7 @@ impl KafkaBackfillExecutorInner { unreachable!("Partition and offset columns must be set."); }; - let mut boot_state = Vec::default(); + let mut owned_splits = Vec::default(); if let Some(mutation) = barrier.mutation.as_ref() { match mutation.as_ref() { Mutation::Add(AddMutation { splits, .. }) @@ -262,7 +262,7 @@ impl KafkaBackfillExecutorInner { .. }) => { if let Some(splits) = splits.get(&self.actor_ctx.id) { - boot_state = splits.clone(); + owned_splits = splits.clone(); } } _ => {} @@ -272,7 +272,7 @@ impl KafkaBackfillExecutorInner { let mut backfill_states: BackfillStates = HashMap::new(); let mut unfinished_splits = Vec::new(); - for mut split in boot_state { + for mut split in owned_splits { let split_id = split.id(); let backfill_state = self .backfill_state_store diff --git a/src/stream/src/executor/source/kafka_backfill_state_table.rs b/src/stream/src/executor/source/kafka_backfill_state_table.rs index 2fe4c66f5e269..cd6ee38ba4e63 100644 --- a/src/stream/src/executor/source/kafka_backfill_state_table.rs +++ b/src/stream/src/executor/source/kafka_backfill_state_table.rs @@ -17,7 +17,7 @@ use std::ops::Bound; use futures::{pin_mut, StreamExt}; use risingwave_common::row; use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl}; +use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl}; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::source::SplitId; use risingwave_pb::catalog::PbTable; @@ -79,10 +79,10 @@ impl BackfillStateTableHandler { Ok(ret) } - pub async fn set(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> { + async fn set(&mut self, key: SplitId, state: BackfillState) -> StreamExecutorResult<()> { let row = [ Some(Self::string_to_scalar(key.as_ref())), - Some(ScalarImpl::Jsonb(value)), + Some(ScalarImpl::Jsonb(state.encode_to_json())), ]; match self.get(&key).await? { Some(prev_row) => { @@ -105,7 +105,7 @@ impl BackfillStateTableHandler { pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> { for (split_id, state) in states { - self.set(split_id, state.encode_to_json()).await?; + self.set(split_id, state).await?; } Ok(()) }