Skip to content

Commit

Permalink
remove state_table::get_schema_len
Browse files Browse the repository at this point in the history
- Instead, we dynamically compare each row's length (`row.len()`) against the expected state length (`state_len`)
  • Loading branch information
kwannoel committed Sep 20, 2023
1 parent dce7f58 commit 1416f60
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 38 deletions.
16 changes: 0 additions & 16 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,22 +1293,6 @@ where
}
}

// Schema-length helper for `StateTableInner`, available for any state store
// `S` and value-row serde `SD`.
impl<
S,
SD,
const IS_REPLICATED: bool,
W: WatermarkBufferStrategy,
const USE_WATERMARK_CACHE: bool,
> StateTableInner<S, SD, IS_REPLICATED, W, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
/// Returns the number of primary-key indices plus the number of value
/// indices held by this table.
///
/// When `value_indices` is `None`, it contributes `0` to the total.
///
/// NOTE(review): a column listed in both `pk_indices` and `value_indices`
/// would be counted twice here — confirm whether the two sets can overlap.
pub fn get_schema_len(&self) -> usize {
// `value_indices` is optional, so fall back to 0 when it is absent.
self.pk_indices.len() + self.value_indices.as_ref().map_or(0, |v| v.len())
}
}

/// Opaque stream type, valid for lifetime `'a`, whose items are
/// `StreamExecutorResult<KeyedRow<Bytes>>` values.
pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> =
impl Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;

Expand Down
14 changes: 3 additions & 11 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ where
// +1 for vnode, +1 for backfill_finished, +1 for row_count.
let state_len = pk_in_output_indices.len() + 3;

// Handle backwards compatibility of old schema:
// | vnode | pk ... | backfill_finished |
let row_count_is_persisted = if let Some(state_table) = self.state_table.as_ref() {
state_table.get_schema_len() == state_len
} else {
false
};

let pk_order = self.upstream_table.pk_serializer().get_order_types();

let upstream_table_id = self.upstream_table.table_id().table_id;
Expand All @@ -153,7 +145,7 @@ where
}

let is_finished = if let Some(state_table) = self.state_table.as_ref() {
let is_finished = check_all_vnode_finished(state_table, row_count_is_persisted).await?;
let is_finished = check_all_vnode_finished(state_table, state_len).await?;
if is_finished {
assert!(!first_barrier.is_newly_added(self.actor_id));
}
Expand Down Expand Up @@ -228,8 +220,8 @@ where

// Keep track of rows from the snapshot.
let mut total_snapshot_processed_rows: u64 =
if let Some(state_table) = self.state_table.as_ref() && row_count_is_persisted {
get_row_count_state(state_table).await?
if let Some(state_table) = self.state_table.as_ref() {
get_row_count_state(state_table, state_len).await?
} else {
// Maintain backwards compatibility with no state_table.
0
Expand Down
25 changes: 14 additions & 11 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,19 +308,21 @@ pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: b
Ok(result)
}

pub(crate) fn get_backfill_finished(row: OwnedRow, row_count_is_persisted: bool) -> Option<bool> {
let datum = if row_count_is_persisted {
// Row count is the last datum in the schema.
// backfill is second last.
row.datum_at(row.len() - 2)
} else {
// handle backwards compat case where there isn't a row count persisted.
pub(crate) fn get_backfill_finished(row: OwnedRow, state_len: usize) -> Option<bool> {
let datum = if row.len() < state_len {
// Handle backwards compatibility case where
// we did not have row count.
row.last()
} else {
row.datum_at(row.len() - 2)
};
datum.map(|d| d.into_bool())
}

pub(crate) fn get_row_count(row: OwnedRow) -> u64 {
pub(crate) fn get_row_count(row: OwnedRow, state_len: usize) -> u64 {
if row.len() < state_len {
return 0;
}
match row.last() {
None => 0,
Some(d) => d.into_int64() as u64,
Expand All @@ -329,22 +331,23 @@ pub(crate) fn get_row_count(row: OwnedRow) -> u64 {

pub(crate) async fn get_row_count_state<S: StateStore, const IS_REPLICATED: bool>(
state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
state_len: usize,
) -> StreamExecutorResult<u64> {
let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
let vnode = vnodes.next().unwrap();
let key: &[Datum] = &[Some(vnode.into())];
let row = state_table.get_row(key).await?;
let row_count = match row {
None => 0,
Some(row) => get_row_count(row),
Some(row) => get_row_count(row, state_len),
};
Ok(row_count)
}

/// All vnodes should be persisted with status finished.
pub(crate) async fn check_all_vnode_finished<S: StateStore, const IS_REPLICATED: bool>(
state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
row_count_is_persisted: bool,
state_len: usize,
) -> StreamExecutorResult<bool> {
debug_assert!(!state_table.vnode_bitmap().is_empty());
let vnodes = state_table.vnodes().iter_vnodes_scalar();
Expand All @@ -354,7 +357,7 @@ pub(crate) async fn check_all_vnode_finished<S: StateStore, const IS_REPLICATED:
let row = state_table.get_row(key).await?;

let vnode_is_finished = if let Some(row) = row
&& let Some(vnode_is_finished) = get_backfill_finished(row, row_count_is_persisted)
&& let Some(vnode_is_finished) = get_backfill_finished(row, state_len)
{
vnode_is_finished
} else {
Expand Down

0 comments on commit 1416f60

Please sign in to comment.