Skip to content

Commit

Permalink
feat(batch): storage table supports read vnode column as vnode (#14570)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jan 15, 2024
1 parent 1d0fd6b commit 18f1943
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 13 deletions.
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ message StorageTableDesc {
// be used to be compatible with schema changes.
bool versioned = 8;
repeated uint32 stream_key = 9;
optional uint32 vnode_col_idx_in_pk = 10;
}

// Represents a table in external database for CDC scenario
Expand Down
40 changes: 28 additions & 12 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct TableDesc {
/// Column indices for primary keys.
pub stream_key: Vec<usize>,

pub vnode_col_index: Option<usize>,

/// Whether the table source is append-only
pub append_only: bool,

Expand Down Expand Up @@ -84,21 +86,34 @@ impl TableDesc {
.iter()
.map(|v| v.to_protobuf().column_index)
.collect();
let dist_key_in_pk_indices = dist_key_indices
.iter()
.map(|&di| {
let vnode_col_idx_in_pk = self
.vnode_col_index
.and_then(|vnode_col_index| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution key {:?} must be a subset of primary key {:?}",
dist_key_indices, pk_indices
)
})
.position(|&pk_index| pk_index == vnode_col_index as u32)
})
.map(|d| d as u32)
.collect_vec();
.map(|i| i as u32);

let dist_key_in_pk_indices = if vnode_col_idx_in_pk.is_none() {
dist_key_indices
.iter()
.map(|&di| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution key {:?} must be a subset of primary key {:?}",
dist_key_indices, pk_indices
)
})
})
.map(|d| d as u32)
.collect_vec()
} else {
Vec::new()
};
StorageTableDesc {
table_id: self.table_id.into(),
columns: self.columns.iter().map(Into::into).collect(),
Expand All @@ -109,6 +124,7 @@ impl TableDesc {
read_prefix_len_hint: self.read_prefix_len_hint as u32,
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl TableCatalog {
read_prefix_len_hint: self.read_prefix_len_hint,
watermark_columns: self.watermark_columns.clone(),
versioned: self.version.is_some(),
vnode_col_index: self.vnode_col_index,
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, None);
let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
let distribution =
TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);

Self::new_inner(
store,
Expand Down

0 comments on commit 18f1943

Please sign in to comment.