feat(sql-backend): support table replace for sql-backend (#14415)
yezizp2012 authored and Little-Wallace committed Jan 20, 2024
1 parent f45a80d · commit f617c88
Showing 16 changed files with 597 additions and 109 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 9 additions & 7 deletions src/common/src/catalog/mod.rs
@@ -65,6 +65,8 @@ pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;
pub const NON_RESERVED_USER_ID: i32 = 11;
pub const NON_RESERVED_SYS_CATALOG_ID: i32 = 1001;

pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;

pub const SYSTEM_SCHEMAS: [&str; 3] = [
PG_CATALOG_SCHEMA_NAME,
INFORMATION_SCHEMA_SCHEMA_NAME,
@@ -164,7 +166,7 @@ impl DatabaseId {

pub fn placeholder() -> Self {
DatabaseId {
database_id: u32::MAX - 1,
database_id: OBJECT_ID_PLACEHOLDER,
}
}
}
@@ -200,7 +202,7 @@ impl SchemaId {

pub fn placeholder() -> Self {
SchemaId {
schema_id: u32::MAX - 1,
schema_id: OBJECT_ID_PLACEHOLDER,
}
}
}
@@ -237,7 +239,7 @@ impl TableId {
/// Sometimes the id field is filled later, we use this value for better debugging.
pub const fn placeholder() -> Self {
TableId {
table_id: u32::MAX - 1,
table_id: OBJECT_ID_PLACEHOLDER,
}
}

@@ -328,7 +330,7 @@ impl IndexId {
/// Sometimes the id field is filled later, we use this value for better debugging.
pub const fn placeholder() -> Self {
IndexId {
index_id: u32::MAX - 1,
index_id: OBJECT_ID_PLACEHOLDER,
}
}

@@ -357,7 +359,7 @@ impl FunctionId {
}

pub const fn placeholder() -> Self {
FunctionId(u32::MAX - 1)
FunctionId(OBJECT_ID_PLACEHOLDER)
}

pub fn function_id(&self) -> u32 {
@@ -396,7 +398,7 @@ impl UserId {

pub const fn placeholder() -> Self {
UserId {
user_id: u32::MAX - 1,
user_id: OBJECT_ID_PLACEHOLDER,
}
}
}
@@ -428,7 +430,7 @@ impl ConnectionId {
}

pub const fn placeholder() -> Self {
ConnectionId(u32::MAX - 1)
ConnectionId(OBJECT_ID_PLACEHOLDER)
}

pub fn connection_id(&self) -> u32 {
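Illustrative sketch, not part of the commit: the point of centralizing the sentinel is that the new SQL backend compares incoming ids against the same constant (see the src/meta/model_v2/src/table.rs hunk further down), so the magic number cannot drift between crates. The TableId below is a simplified stand-in and the is_placeholder helper is hypothetical, added only to show the pattern:

    const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct TableId {
        table_id: u32,
    }

    impl TableId {
        /// The id is sometimes filled in later; the sentinel makes that state
        /// easy to spot while debugging.
        const fn placeholder() -> Self {
            TableId {
                table_id: OBJECT_ID_PLACEHOLDER,
            }
        }

        /// Hypothetical helper: check whether a real id has been assigned yet.
        fn is_placeholder(self) -> bool {
            self.table_id == OBJECT_ID_PLACEHOLDER
        }
    }

    fn main() {
        let id = TableId::placeholder();
        assert!(id.is_placeholder());
    }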
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
@@ -20,6 +20,7 @@ use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
@@ -43,7 +44,7 @@ impl SinkId {
/// Sometimes the id field is filled later, we use this value for better debugging.
pub const fn placeholder() -> Self {
SinkId {
sink_id: u32::MAX - 1,
sink_id: OBJECT_ID_PLACEHOLDER,
}
}

6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -17,7 +17,7 @@ use std::assert_matches::assert_matches;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId};
use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId, OBJECT_ID_PLACEHOLDER};
use risingwave_common::error::Result;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
@@ -29,7 +29,6 @@ use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion};
use crate::catalog::FragmentId;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanRef;
@@ -235,8 +234,7 @@ impl StreamMaterialize {
append_only,
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
properties,
// TODO(zehua): replace it with FragmentId::placeholder()
fragment_id: FragmentId::MAX - 1,
fragment_id: OBJECT_ID_PLACEHOLDER,
dml_fragment_id: None,
vnode_col_index: None,
row_id_index,
7 changes: 3 additions & 4 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -20,12 +20,12 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ConflictBehavior, Field, FieldDisplay, Schema,
ColumnCatalog, ColumnDesc, ConflictBehavior, Field, FieldDisplay, Schema, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::{CreateType, TableType};
use crate::catalog::{ColumnId, FragmentId, TableCatalog, TableId};
use crate::catalog::{ColumnId, TableCatalog, TableId};
use crate::optimizer::property::Cardinality;
use crate::utils::WithOptions;

@@ -160,8 +160,7 @@ impl TableCatalogBuilder {
append_only: false,
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
properties: self.properties,
// TODO(zehua): replace it with FragmentId::placeholder()
fragment_id: FragmentId::MAX - 1,
fragment_id: OBJECT_ID_PLACEHOLDER,
dml_fragment_id: None,
vnode_col_index: self.vnode_col_idx,
row_id_index: None,
1 change: 1 addition & 0 deletions src/meta/model_v2/Cargo.toml
@@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
1 change: 0 additions & 1 deletion src/meta/model_v2/src/lib.rs
@@ -75,7 +75,6 @@ pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

pub type FragmentId = i32;

pub type ActorId = i32;

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
27 changes: 23 additions & 4 deletions src/meta/model_v2/src/table.rs
@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::table::PbTableType;
use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType};
use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
@@ -187,10 +188,28 @@ impl From<PbTable> for ActiveModel {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();

let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
NotSet
} else {
Set(Some(pb_table.fragment_id as FragmentId))
};
let dml_fragment_id = pb_table
.dml_fragment_id
.map(|x| Set(Some(x as FragmentId)))
.unwrap_or_default();
let optional_associated_source_id =
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
pb_table.optional_associated_source_id
{
Set(Some(src_id as SourceId))
} else {
NotSet
};

Self {
table_id: Set(pb_table.id as _),
name: Set(pb_table.name),
optional_associated_source_id: NotSet,
optional_associated_source_id,
table_type: Set(table_type.into()),
belongs_to_job_id: Set(None),
columns: Set(pb_table.columns.into()),
@@ -199,7 +218,7 @@ impl From<PbTable> for ActiveModel {
stream_key: Set(pb_table.stream_key.into()),
append_only: Set(pb_table.append_only),
properties: Set(pb_table.properties.into()),
fragment_id: NotSet,
fragment_id,
vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
value_indices: Set(pb_table.value_indices.into()),
@@ -208,7 +227,7 @@ impl From<PbTable> for ActiveModel {
read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
watermark_indices: Set(pb_table.watermark_indices.into()),
dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
dml_fragment_id: NotSet,
dml_fragment_id,
cardinality: Set(pb_table.cardinality.map(|x| x.into())),
cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
description: Set(pb_table.description),
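Illustrative sketch, not part of the commit: in sea-orm, an ActiveModel field left as NotSet is simply omitted from the generated INSERT/UPDATE, while Set(Some(..)) writes the value. That is why a placeholder fragment_id maps to NotSet above: the column stays unset until the meta node builds the fragments and assigns a real id. The function name fragment_id_value and the type alias are stand-ins, not code from this PR:

    use sea_orm::ActiveValue::{self, NotSet, Set};

    const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
    type FragmentId = i32;

    // A frontend-produced table may still carry the placeholder in `fragment_id`.
    fn fragment_id_value(pb_fragment_id: u32) -> ActiveValue<Option<FragmentId>> {
        if pb_fragment_id == OBJECT_ID_PLACEHOLDER {
            // Omit the column from the generated statement entirely.
            NotSet
        } else {
            Set(Some(pb_fragment_id as FragmentId))
        }
    }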
53 changes: 32 additions & 21 deletions src/meta/src/barrier/command.rs
@@ -777,11 +777,8 @@ impl CommandContext {
Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
else {
unimplemented!("implement config change funcs in v2");
};
mgr.fragment_manager
self.barrier_manager_context
.metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
self.barrier_manager_context
@@ -981,26 +978,40 @@ impl CommandContext {
dispatchers,
init_split_assignment,
}) => {
let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
else {
unimplemented!("implement replace funcs in v2");
};
let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));

// Tell compute nodes to drop actors.
let node_actors = mgr.fragment_manager.table_node_actors(&table_ids).await?;
let node_actors = self
.barrier_manager_context
.metadata_manager
.get_worker_actor_ids(table_ids)
.await?;
self.clean_up(node_actors).await?;

// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment.clone(),
)
.await?;
match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment.clone(),
)
.await?;
}
MetadataManager::V2(mgr) => {
// Update actors and actor_dispatchers for new table fragments.
mgr.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}
}

// Apply the split changes in source manager.
self.barrier_manager_context
47 changes: 45 additions & 2 deletions src/meta/src/controller/fragment.rs
@@ -56,7 +56,7 @@ use crate::controller::utils::{
};
use crate::manager::{ActorInfos, LocalNotification};
use crate::stream::SplitAssignment;
use crate::MetaResult;
use crate::{MetaError, MetaResult};

impl CatalogControllerInner {
/// List all fragment vnode mapping info for all CREATED streaming jobs.
@@ -985,15 +985,58 @@ impl CatalogController {
Ok(node_actors)
}

pub async fn get_worker_actor_ids(
&self,
job_ids: Vec<ObjectId>,
) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
let inner = self.inner.read().await;
let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
let actor_pu: Vec<(ActorId, i32)> = Actor::find()
.select_only()
.columns([actor::Column::ActorId, actor::Column::ParallelUnitId])
.join(JoinType::InnerJoin, actor::Relation::Fragment.def())
.filter(fragment::Column::JobId.is_in(job_ids))
.into_tuple()
.all(&inner.db)
.await?;

let mut worker_actors = BTreeMap::new();
for (actor_id, pu_id) in actor_pu {
let worker_id = parallel_units_map
.get(&(pu_id as _))
.unwrap()
.worker_node_id as WorkerId;
worker_actors
.entry(worker_id)
.or_insert_with(Vec::new)
.push(actor_id);
}

Ok(worker_actors)
}

pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;
for assignments in split_assignment.values() {
for (actor_id, splits) in assignments {
let actor_splits: Option<ConnectorSplits> = Actor::find_by_id(*actor_id as ActorId)
.select_only()
.column(actor::Column::Splits)
.into_tuple()
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?;

let mut actor_splits = actor_splits
.map(|splits| splits.0.splits)
.unwrap_or_default();
actor_splits.extend(splits.iter().map(Into::into));

Actor::update(actor::ActiveModel {
actor_id: Set(*actor_id as _),
splits: Set(Some(ConnectorSplits(PbConnectorSplits {
splits: splits.iter().map(Into::into).collect(),
splits: actor_splits,
}))),
..Default::default()
})
(Diffs for the remaining 6 changed files are not shown.)
