Skip to content

Commit

Permalink
fix(sql-backend): fix some corner cases of rename, alter owner, drop …
Browse files Browse the repository at this point in the history
…and scale in sql backend (#14933)
  • Loading branch information
yezizp2012 authored Feb 1, 2024
1 parent 8481ea7 commit 4500a08
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 72 deletions.
1 change: 1 addition & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub async fn rpc_serve(
Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
}
};
election_client.init().await?;

rpc_serve_with_store(
None,
Expand Down
14 changes: 11 additions & 3 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use std::collections::{HashMap, HashSet};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::ThrottleConfig;
use risingwave_meta_model_v2::SourceId;
use risingwave_meta_model_v2::{SourceId, StreamingParallelism};
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
Expand Down Expand Up @@ -280,11 +281,18 @@ impl StreamManagerService for StreamServiceImpl {
let job_states = mgr.catalog_controller.list_streaming_job_states().await?;
job_states
.into_iter()
.map(|(table_id, state)| {
.map(|(table_id, state, parallelism)| {
let parallelism = match parallelism {
StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
StreamingParallelism::Fixed(n) => {
model::TableParallelism::Fixed(n as _)
}
};

list_table_fragment_states_response::TableFragmentState {
table_id: table_id as _,
state: PbState::from(state) as _,
parallelism: None, // TODO: support parallelism.
parallelism: Some(parallelism.into()),
}
})
.collect_vec()
Expand Down
134 changes: 75 additions & 59 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ use crate::controller::utils::{
check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
get_fragment_mappings, get_referring_objects, get_referring_objects_cascade,
get_user_privilege, list_user_info_by_ids, resolve_source_register_info_for_jobs,
PartialObject,
get_fragment_mappings, get_fragment_mappings_by_jobs, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
Expand Down Expand Up @@ -267,6 +267,7 @@ impl CatalogController {
.into_tuple()
.all(&txn)
.await?;
let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs).await?;

// The schema and objects in the database will be delete cascade.
let res = Object::delete_by_id(database_id).exec(&txn).await?;
Expand All @@ -287,6 +288,8 @@ impl CatalogController {
}),
)
.await;
self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
.await;
Ok((
ReleaseContext {
state_table_ids,
Expand Down Expand Up @@ -1014,18 +1017,6 @@ impl CatalogController {
.await?;
}

if !index_ids.is_empty() {
let index_objs = Index::find()
.find_also_related(Object)
.filter(index::Column::IndexId.is_in(index_ids))
.all(&txn)
.await?;
for (index, index_obj) in index_objs {
relations.push(PbRelationInfo::Index(
ObjectModel(index, index_obj.unwrap()).into(),
));
}
}
if !table_ids.is_empty() {
let table_objs = Table::find()
.find_also_related(Object)
Expand All @@ -1038,6 +1029,19 @@ impl CatalogController {
));
}
}
// FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
if !index_ids.is_empty() {
let index_objs = Index::find()
.find_also_related(Object)
.filter(index::Column::IndexId.is_in(index_ids))
.all(&txn)
.await?;
for (index, index_obj) in index_objs {
relations.push(PbRelationInfo::Index(
ObjectModel(index, index_obj.unwrap()).into(),
));
}
}
}
ObjectType::Source => {
let source = Source::find_by_id(object_id)
Expand Down Expand Up @@ -1426,7 +1430,16 @@ impl CatalogController {
DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
DropMode::Restrict => {
ensure_object_not_refer(object_type, object_id, &txn).await?;
vec![]
if obj.obj_type == ObjectType::Table {
let indexes = get_referring_objects(object_id, &txn).await?;
assert!(
indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
"only index could be dropped in restrict mode"
);
indexes
} else {
vec![]
}
}
};
assert!(
Expand All @@ -1440,11 +1453,15 @@ impl CatalogController {

let to_drop_table_ids = to_drop_objects
.iter()
.filter(|obj| obj.obj_type == ObjectType::Table)
.filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
.map(|obj| obj.oid);
let mut to_drop_streaming_jobs = to_drop_objects
.iter()
.filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
.filter(|obj| {
obj.obj_type == ObjectType::Table
|| obj.obj_type == ObjectType::Sink
|| obj.obj_type == ObjectType::Index
})
.map(|obj| obj.oid)
.collect_vec();

Expand Down Expand Up @@ -1479,11 +1496,6 @@ impl CatalogController {
}

let mut to_drop_state_table_ids = to_drop_table_ids.clone().collect_vec();
let to_drop_index_ids = to_drop_objects
.iter()
.filter(|obj| obj.obj_type == ObjectType::Index)
.map(|obj| obj.oid)
.collect_vec();

// Add associated sources.
let mut to_drop_source_ids: Vec<SourceId> = Table::find()
Expand All @@ -1507,16 +1519,6 @@ impl CatalogController {
to_drop_source_ids.push(object_id);
}

// add internal tables.
let index_table_ids: Vec<TableId> = Index::find()
.select_only()
.column(index::Column::IndexTableId)
.filter(index::Column::IndexId.is_in(to_drop_index_ids))
.into_tuple()
.all(&txn)
.await?;
to_drop_streaming_jobs.extend(index_table_ids);

if !to_drop_streaming_jobs.is_empty() {
let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
.select_only()
Expand All @@ -1537,7 +1539,8 @@ impl CatalogController {
}

let (source_fragments, removed_actors) =
resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs).await?;
resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
let fragment_mappings = get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs).await?;

// Find affect users with privileges on all this objects.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
Expand Down Expand Up @@ -1566,58 +1569,70 @@ impl CatalogController {

// notify about them.
self.notify_users_update(user_infos).await;
let relations = to_drop_objects
.into_iter()
.map(|obj| match obj.obj_type {
ObjectType::Table => PbRelation {
let mut relations = vec![];
for obj in to_drop_objects {
match obj.obj_type {
ObjectType::Table => relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Table(PbTable {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
},
ObjectType::Source => PbRelation {
}),
ObjectType::Source => relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Source(PbSource {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
},
ObjectType::Sink => PbRelation {
}),
ObjectType::Sink => relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Sink(PbSink {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
},
ObjectType::View => PbRelation {
}),
ObjectType::View => relations.push(PbRelation {
relation_info: Some(PbRelationInfo::View(PbView {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
},
ObjectType::Index => PbRelation {
relation_info: Some(PbRelationInfo::Index(PbIndex {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
},
}),
ObjectType::Index => {
relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Index(PbIndex {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
});
relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Table(PbTable {
id: obj.oid as _,
schema_id: obj.schema_id.unwrap() as _,
database_id: obj.database_id.unwrap() as _,
..Default::default()
})),
});
}
_ => unreachable!("only relations will be dropped."),
})
.collect_vec();
}
}
let version = self
.notify_frontend(
NotificationOperation::Delete,
NotificationInfo::RelationGroup(PbRelationGroup { relations }),
)
.await;
self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
.await;

Ok((
ReleaseContext {
Expand Down Expand Up @@ -1730,9 +1745,12 @@ impl CatalogController {
.one(&txn)
.await?
.unwrap();
let obj = obj.unwrap();
let old_name = relation.name.clone();
relation.name = object_name.into();
relation.definition = alter_relation_rename(&relation.definition, object_name);
if obj.obj_type != ObjectType::Index {
relation.definition = alter_relation_rename(&relation.definition, object_name);
}
let active_model = $table::ActiveModel {
$identity: Set(relation.$identity),
name: Set(object_name.into()),
Expand All @@ -1741,9 +1759,7 @@ impl CatalogController {
};
active_model.update(&txn).await?;
to_update_relations.push(PbRelation {
relation_info: Some(PbRelationInfo::$entity(
ObjectModel(relation, obj.unwrap()).into(),
)),
relation_info: Some(PbRelationInfo::$entity(ObjectModel(relation, obj).into())),
});
old_name
}};
Expand Down
9 changes: 6 additions & 3 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink,
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
StreamNode, TableId, VnodeBitmap, WorkerId,
StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::meta::subscribe_response::{
Expand Down Expand Up @@ -634,13 +634,16 @@ impl CatalogController {
)
}

pub async fn list_streaming_job_states(&self) -> MetaResult<Vec<(ObjectId, JobStatus)>> {
pub async fn list_streaming_job_states(
&self,
) -> MetaResult<Vec<(ObjectId, JobStatus, StreamingParallelism)>> {
let inner = self.inner.read().await;
let job_states: Vec<(ObjectId, JobStatus)> = StreamingJob::find()
let job_states: Vec<(ObjectId, JobStatus, StreamingParallelism)> = StreamingJob::find()
.select_only()
.columns([
streaming_job::Column::JobId,
streaming_job::Column::JobStatus,
streaming_job::Column::Parallelism,
])
.into_tuple()
.all(&inner.db)
Expand Down
Loading

0 comments on commit 4500a08

Please sign in to comment.