Skip to content

Commit

Permalink
remove commit_no_data_expected
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 30, 2024
1 parent e3e109a commit 96871cb
Show file tree
Hide file tree
Showing 9 changed files with 14 additions and 80 deletions.
18 changes: 0 additions & 18 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,24 +1127,6 @@ where
Ok(())
}

// TODO(st1page): maybe we should extract a pub struct to do it
/// just specially used by those state table read-only and after the call the data
/// in the epoch will be visible
pub fn commit_no_data_expected(&mut self, new_epoch: EpochPair) {
assert_eq!(self.epoch(), new_epoch.prev);
assert!(!self.is_dirty());
// Tick the watermark buffer here because state table is expected to be committed once
// per epoch.
self.watermark_buffer_strategy.tick();
self.local_store.seal_current_epoch(
new_epoch.curr,
SealCurrentEpochOptions {
table_watermarks: None,
switch_op_consistency_level: None,
},
);
}

/// Write to state store.
async fn seal_current_epoch(
&mut self,
Expand Down
7 changes: 1 addition & 6 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ where
}

// consume upstream buffer chunk
let upstream_chunk_buffer_is_empty = upstream_chunk_buffer.is_empty();
for chunk in upstream_chunk_buffer.drain(..) {
cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
// FIXME: Replace with `snapshot_is_processed`
Expand All @@ -395,11 +394,7 @@ where
upstream_table.write_chunk(chunk);
}

if upstream_chunk_buffer_is_empty {
upstream_table.commit_no_data_expected(barrier.epoch)
} else {
upstream_table.commit(barrier.epoch).await?;
}
upstream_table.commit(barrier.epoch).await?;

self.metrics
.arrangement_backfill_snapshot_read_row_count
Expand Down
17 changes: 4 additions & 13 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,7 @@ pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
) -> StreamExecutorResult<()> {
let vnodes = table.vnodes().clone();
if let Some(old_state) = old_state {
if old_state[1..] == current_partial_state[1..] {
table.commit_no_data_expected(epoch);
return Ok(());
} else {
if old_state[1..] != current_partial_state[1..] {
vnodes.iter_vnodes_scalar().for_each(|vnode| {
let datum = Some(vnode.into());
current_partial_state[0] = datum.clone();
Expand Down Expand Up @@ -632,7 +629,6 @@ pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED:
#[cfg(debug_assertions)] state_len: usize,
vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()> {
let mut has_progress = false;
for vnode in vnodes {
if !backfill_state.need_commit(&vnode) {
continue;
Expand Down Expand Up @@ -667,7 +663,6 @@ pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED:
old_row: &encoded_prev_state[..],
new_row: &encoded_current_state[..],
});
has_progress = true;
} else {
// No existing state, create a new entry.
#[cfg(debug_assertions)]
Expand All @@ -680,15 +675,11 @@ pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED:
table.write_record(Record::Insert {
new_row: &encoded_current_state[..],
});
has_progress = true;
}
backfill_state.mark_committed(vnode);
}
if has_progress {
table.commit(epoch).await?;
} else {
table.commit_no_data_expected(epoch);
}

table.commit(epoch).await?;
Ok(())
}

Expand All @@ -712,7 +703,7 @@ pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
flush_data(table, epoch, old_state, current_state).await?;
*old_state = Some(current_state.into());
} else {
table.commit_no_data_expected(epoch);
table.commit(epoch).await?;
}
Ok(())
}
Expand Down
12 changes: 1 addition & 11 deletions src/stream/src/executor/dedup/append_only_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {
// The first barrier message should be propagated.
yield Message::Barrier(barrier);

let mut commit_data = false;

#[for_await]
for msg in input {
self.cache.evict();
Expand Down Expand Up @@ -127,21 +125,13 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {
let chunk = StreamChunk::with_visibility(ops, columns, vis);
self.state_table.write_chunk(chunk.clone());

commit_data = true;

yield Message::Chunk(chunk);
}
self.state_table.try_flush().await?;
}

Message::Barrier(barrier) => {
if commit_data {
// Only commit when we have new data in this epoch.
self.state_table.commit(barrier.epoch).await?;
commit_data = false;
} else {
self.state_table.commit_no_data_expected(barrier.epoch);
}
self.state_table.commit(barrier.epoch).await?;

if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
let (_prev_vnode_bitmap, cache_may_stale) =
Expand Down
6 changes: 1 addition & 5 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,17 +462,13 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
if let Some(row) = &current_epoch_row {
self.right_table.insert(row);
}
self.right_table.commit(barrier.epoch).await?;
} else {
self.right_table.commit_no_data_expected(barrier.epoch);
}
// Update the last committed row since it has changed
last_committed_epoch_row = current_epoch_row.clone();
} else {
self.right_table.commit_no_data_expected(barrier.epoch);
}

self.left_table.commit(barrier.epoch).await?;
self.right_table.commit(barrier.epoch).await?;

prev_epoch_value = Some(curr);

Expand Down
3 changes: 0 additions & 3 deletions src/stream/src/executor/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ impl<S: StateStore> NowExecutor<S> {
last_timestamp = state_row.and_then(|row| row[0].clone());
paused = barrier.is_pause_on_startup();
initialized = true;
} else if paused {
// Assert that no data is updated.
state_table.commit_no_data_expected(barrier.epoch);
} else {
state_table.commit(barrier.epoch).await?;
}
Expand Down
15 changes: 4 additions & 11 deletions src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,34 +214,27 @@ impl<S: StateStore> SimpleAggExecutor<S> {
this.intermediate_state_table
.update_without_old_value(encoded_states);

// Commit all state tables.
futures::future::try_join_all(
this.all_state_tables_mut().map(|table| table.commit(epoch)),
)
.await?;

// Retrieve modified states and put the changes into the builders.
vars.agg_group
.build_change(&this.storages, &this.agg_funcs)
.await?
.map(|change| change.to_stream_chunk(&this.info.schema.data_types()))
} else {
// No state is changed.
// Call commit on state table to increment the epoch.
this.all_state_tables_mut().for_each(|table| {
table.commit_no_data_expected(epoch);
});
None
};

// Commit all state tables.
futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.commit(epoch)))
.await?;

vars.state_changed = false;
Ok(chunk)
}

async fn try_flush_data(this: &mut ExecutorInner<S>) -> StreamExecutorResult<()> {
futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.try_flush()))
.await?;

Ok(())
}

Expand Down
11 changes: 1 addition & 10 deletions src/stream/src/executor/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct ExecutorInner<S: StateStore> {

struct ExecutionVars<S: StateStore> {
buffer: SortBuffer<S>,
buffer_changed: bool,
}

impl<S: StateStore> Executor for SortExecutor<S> {
Expand Down Expand Up @@ -103,7 +102,6 @@ impl<S: StateStore> SortExecutor<S> {

let mut vars = ExecutionVars {
buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table),
buffer_changed: false,
};

// Populate the sort buffer cache on initialization.
Expand Down Expand Up @@ -131,7 +129,6 @@ impl<S: StateStore> SortExecutor<S> {
if let Some(chunk) = chunk_builder.take() {
yield Message::Chunk(chunk);
}
vars.buffer_changed = true;

yield Message::Watermark(watermark);
}
Expand All @@ -141,16 +138,10 @@ impl<S: StateStore> SortExecutor<S> {
}
Message::Chunk(chunk) => {
vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
vars.buffer_changed = true;
this.buffer_table.try_flush().await?;
}
Message::Barrier(barrier) => {
if vars.buffer_changed {
this.buffer_table.commit(barrier.epoch).await?;
} else {
this.buffer_table.commit_no_data_expected(barrier.epoch);
}
vars.buffer_changed = false;
this.buffer_table.commit(barrier.epoch).await?;

// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) {
Expand Down
5 changes: 2 additions & 3 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,10 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
table.insert(row);
}
}
table.commit(barrier.epoch).await?;
} else {
table.commit_no_data_expected(barrier.epoch);
}

table.commit(barrier.epoch).await?;

if barrier.kind.is_checkpoint() {
if idle_input {
// Align watermark
Expand Down

0 comments on commit 96871cb

Please sign in to comment.