Skip to content

Commit

Permalink
cleanup state table changes
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Apr 20, 2024
1 parent c3c04e0 commit 8e3a558
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 24 deletions.
6 changes: 0 additions & 6 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,12 @@ pub struct Reschedule {
pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

/// Replacing an old table with a new one. Used for `ALTER TABLE` and sink into table. All actors in the table job will be rebuilt.
#[derive(Debug, Clone)]
pub struct ReplaceTablePlan {
pub old_table_fragments: TableFragments,
pub new_table_fragments: TableFragments,
pub merge_updates: Vec<MergeUpdate>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
/// We need to reassign splits for it.
///
/// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
/// backfill_splits.
pub init_split_assignment: SplitAssignment,
}

Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,6 @@ where
W: WatermarkBufferStrategy,
{
/// Create state table from table catalog and store.
///
/// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
pub async fn from_table_catalog(
table_catalog: &Table,
store: S,
Expand Down Expand Up @@ -1358,6 +1356,9 @@ where
/// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
/// `vnode`.
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub struct BackfillStateTableHandler<S: StateStore> {
}

impl<S: StateStore> BackfillStateTableHandler<S> {
/// See also [`super::SourceStateTableHandler::from_table_catalog`] for how the state table looks like.
pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
Self {
state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
Expand Down
9 changes: 0 additions & 9 deletions src/stream/src/executor/source/state_table_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,12 @@ pub struct SourceStateTableHandler<S: StateStore> {
}

impl<S: StateStore> SourceStateTableHandler<S> {
/// Creates a state table with singleton distribution (only one vnode 0).
///
/// `SourceExecutor` is distributed, but its state table is singleton.
/// This is because the `SourceExecutor`s states are not sharded by consistent hash.
/// The state table is `partition_id -> offset_info`, but the partitions (a.k.a split) each actor accesses
/// are assigned by `SourceManager` in meta, instead of `vnode` computed from the `partition_id`.
///
/// See also `infer_internal_table_catalog` in `src/frontend/src/optimizer/plan_node/generic/source.rs`.
pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
Self {
state_table: StateTable::from_table_catalog(table_catalog, store, None).await,
}
}

/// For [`super::FsFetchExecutor`], each actor accesses splits according to the `vnode` computed from `partition_id`.
pub async fn from_table_catalog_with_vnodes(
table_catalog: &PbTable,
store: S,
Expand Down
11 changes: 5 additions & 6 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.map(|column| column.column_id)
.collect();

let mut state_table = source.state_table.clone().unwrap();
// To make it possible to scan the whole state table.
// This is quite wild, can we do this?
state_table.read_prefix_len_hint = 0;
let state_table_handler =
SourceStateTableHandler::from_table_catalog(&state_table, store.clone()).await;
let state_table_handler = SourceStateTableHandler::from_table_catalog(
source.state_table.as_ref.unwrap(),
store.clone(),
)
.await;
let stream_source_core = StreamSourceCore::new(
source_id,
source_name,
Expand Down

0 comments on commit 8e3a558

Please sign in to comment.