doc: add comments for source executor's state table (#16499)
Signed-off-by: xxchan <[email protected]>
xxchan authored Apr 27, 2024
1 parent 4ba2e66 commit 9996e75
Showing 9 changed files with 41 additions and 24 deletions.
30 changes: 22 additions & 8 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -137,13 +137,27 @@ impl Source {
(self, original_row_id_index)
}

pub fn infer_internal_table_catalog(require_dist_key: bool) -> TableCatalog {
// note that source's internal table is to store partition_id -> offset mapping and its
// schema is irrelevant to input schema
// On the premise of ensuring that the materialized_source data can be cleaned up, keep the
// state in source.
// Source state doesn't maintain retention_seconds, internal_table_subset function only
// returns retention_seconds so default is used here
/// Source's state table is `partition_id -> offset_info`.
/// Its schema is irrelevant to the data's schema.
///
/// ## Notes on the distribution of the state table (`is_distributed`)
///
/// Source executors are always distributed, but their state tables are special.
///
/// ### `StreamSourceExecutor`: singleton (only one vnode)
///
/// Its states are not sharded by consistent hash.
///
/// Each actor accesses (point get) some partitions (a.k.a. splits).
/// They are assigned by `SourceManager` in meta,
/// rather than by the `vnode` computed from the `partition_id`.
///
/// ### `StreamFsFetch`: distributed by `partition_id`
///
/// Each actor accesses (range scan) splits according to the `vnode`
/// computed from `partition_id`.
/// This is a normal distributed table.
pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
let mut builder = TableCatalogBuilder::default();

let key = Field {
@@ -164,7 +178,7 @@ impl Source {
builder.add_order_column(ordered_col_idx, OrderType::ascending());

builder.build(
if require_dist_key {
if is_distributed {
vec![ordered_col_idx]
} else {
vec![]
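For illustration, a rough sketch (not part of this commit) of how the flag is expected to map to the built catalog, assuming `TableCatalog` exposes `distribution_key` directly and that the key column (`partition_id`) is added first, at index 0:

// `StreamFsFetch`: distributed by `partition_id`; each actor range-scans the splits
// whose vnode it owns.
let distributed = generic::Source::infer_internal_table_catalog(true);
assert_eq!(distributed.distribution_key, vec![0]);

// `StreamSourceExecutor`: singleton state; each actor point-gets the splits assigned
// to it by `SourceManager`, so no distribution key is needed.
let singleton = generic::Source::infer_internal_table_catalog(false);
assert!(singleton.distribution_key.is_empty());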
2 changes: 0 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -101,8 +101,6 @@ impl StreamNode for StreamFsFetch {
source_id: source_catalog.id,
source_name: source_catalog.name.clone(),
state_table: Some(
// `StreamFsSource` will do range scan according to assigned vnodes, so we need to set
// the key for distributing data to different vnodes.
generic::Source::infer_internal_table_catalog(true)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
2 changes: 0 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -93,8 +93,6 @@ impl StreamNode for StreamSource {
source_id: source_catalog.id,
source_name: source_catalog.name.clone(),
state_table: Some(
// `StreamSource` can write all data to the same vnode
// but it is ok because we only do point get on each key rather than range scan.
generic::Source::infer_internal_table_catalog(false)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
10 changes: 3 additions & 7 deletions src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -94,13 +94,9 @@ impl StreamSourceScan {
.expect("source scan should have source cataglog")
}

/// The state is different from but similar to `StreamSource`.
/// Refer to [`generic::Source::infer_internal_table_catalog`] for more details.
pub fn infer_internal_table_catalog() -> TableCatalog {
// note that source's internal table is to store partition_id -> offset mapping and its
// schema is irrelevant to input schema
// On the premise of ensuring that the materialized_source data can be cleaned up, keep the
// state in source.
// Source state doesn't maintain retention_seconds, internal_table_subset function only
// returns retention_seconds so default is used here
let mut builder = TableCatalogBuilder::default();

let key = Field {
@@ -119,7 +115,7 @@
let ordered_col_idx = builder.add_column(&key);
builder.add_column(&value);
builder.add_order_column(ordered_col_idx, OrderType::ascending());
// read prefix hint is 0. We need to scan all data in the state table.
// Hacky: read prefix hint is 0, because we need to scan all data in the state table.
builder.build(vec![], 0)
}

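To make the scan-all behavior concrete, a small sketch (illustrative only; it assumes `TableCatalog` exposes `distribution_key` and `read_prefix_len_hint` as plain fields):

// The backfill state table keeps the same `partition_id -> offset_info` layout, but
// with no distribution key and a read prefix hint of 0, so recovery iterates every
// row rather than prefix-seeking.
let catalog = StreamSourceScan::infer_internal_table_catalog();
assert!(catalog.distribution_key.is_empty());
assert_eq!(catalog.read_prefix_len_hint, 0);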
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -41,7 +41,6 @@ pub struct TableCatalogBuilder {
value_indices: Option<Vec<usize>>,
vnode_col_idx: Option<usize>,
column_names: HashMap<String, i32>,
read_prefix_len_hint: usize,
watermark_columns: Option<FixedBitSet>,
dist_key_in_pk: Option<Vec<usize>>,
}
@@ -136,7 +135,7 @@ impl TableCatalogBuilder {
/// anticipated read prefix pattern (number of fields) for the table, which can be utilized for
/// implementing the table's bloom filter or other storage optimization techniques.
pub fn build(self, distribution_key: Vec<usize>, read_prefix_len_hint: usize) -> TableCatalog {
assert!(self.read_prefix_len_hint <= self.pk.len());
assert!(read_prefix_len_hint <= self.pk.len());
let watermark_columns = match self.watermark_columns {
Some(w) => w,
None => FixedBitSet::with_capacity(self.columns.len()),
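A minimal sketch of the invariant the corrected assertion enforces (the two `Field` values are hypothetical placeholders, not from this commit):

// `read_prefix_len_hint` counts how many leading pk columns a typical read fixes,
// so it can never exceed the number of pk columns.
let mut builder = TableCatalogBuilder::default();
let key_idx = builder.add_column(&partition_id_field); // hypothetical key Field
builder.add_column(&offset_info_field); // hypothetical value Field
builder.add_order_column(key_idx, OrderType::ascending());
let _catalog = builder.build(vec![], 1); // ok: the pk has exactly 1 column
// `builder.build(vec![], 2)` would now panic, since 2 > pk.len().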
8 changes: 8 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -81,12 +81,19 @@ pub struct Reschedule {
pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

/// Replacing an old table with a new one. All actors in the table job will be rebuilt.
/// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]).
#[derive(Debug, Clone)]
pub struct ReplaceTablePlan {
pub old_table_fragments: TableFragments,
pub new_table_fragments: TableFragments,
pub merge_updates: Vec<MergeUpdate>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
/// We need to reassign splits for it.
///
/// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
/// backfill_splits.
pub init_split_assignment: SplitAssignment,
}

@@ -166,6 +173,7 @@ pub enum Command {
definition: String,
ddl_type: DdlType,
create_type: CreateType,
/// This is for `CREATE SINK` into a table.
replace_table: Option<ReplaceTablePlan>,
},
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
5 changes: 2 additions & 3 deletions src/stream/src/common/table/state_table.rs
@@ -277,6 +277,8 @@ where
W: WatermarkBufferStrategy,
{
/// Create state table from table catalog and store.
///
/// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
pub async fn from_table_catalog(
table_catalog: &Table,
store: S,
@@ -1434,9 +1436,6 @@ where
/// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
/// `vnode`.
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
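For reference, a sketch of the two ways a caller opens such a state table (the `Some(vnodes)` form is an assumption about how `from_table_catalog_with_vnodes` below forwards the actor's vnode bitmap):

// `None`: the table uses `TableDistribution::singleton()`, i.e. all state lives in a
// single vnode. This is what the source state table handlers below do.
let state_table = StateTable::from_table_catalog(table_catalog, store.clone(), None).await;

// `Some(vnodes)`: the table only covers the vnodes owned by this actor, as needed for
// the distributed `StreamFsFetch` state.
let state_table = StateTable::from_table_catalog(table_catalog, store, Some(vnodes)).await;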
@@ -33,6 +33,7 @@ pub struct BackfillStateTableHandler<S: StateStore> {
}

impl<S: StateStore> BackfillStateTableHandler<S> {
/// See also [`super::SourceStateTableHandler::from_table_catalog`] for what the state table looks like.
pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
Self {
state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
4 changes: 4 additions & 0 deletions src/stream/src/executor/source/state_table_handler.rs
@@ -51,12 +51,16 @@ pub struct SourceStateTableHandler<S: StateStore> {
}

impl<S: StateStore> SourceStateTableHandler<S> {
/// Creates a state table with singleton distribution (only one vnode, 0).
///
/// Refer to `infer_internal_table_catalog` in `src/frontend/src/optimizer/plan_node/generic/source.rs` for more details.
pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
Self {
state_table: StateTable::from_table_catalog(table_catalog, store, None).await,
}
}

/// For [`super::FsFetchExecutor`], each actor accesses splits according to the `vnode` computed from `partition_id`.
pub async fn from_table_catalog_with_vnodes(
table_catalog: &PbTable,
store: S,
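Finally, a usage sketch tying the two constructors to the executors they serve (the argument list of `from_table_catalog_with_vnodes` is truncated in this diff, so the trailing `vnodes` parameter here is an assumption):

// `StreamSourceExecutor`: singleton state table (vnode 0 only).
let handler = SourceStateTableHandler::from_table_catalog(&table_catalog, store.clone()).await;

// `FsFetchExecutor`: state distributed by `partition_id`; restrict the table to the
// vnodes this actor owns.
let handler = SourceStateTableHandler::from_table_catalog_with_vnodes(&table_catalog, store, vnodes).await;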