Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): minor refactors #19990

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ c583e2c6c054764249acf484438c7bf7197765f4

# chore: cleanup v2 naming for sql metastore (#18941)
9a6a7f9052d5679165ff57cc01417c742c95351c

# refactor: split catalog to smaller files (#19870)
d6341b74be3f1913cc93993a95c147999df1ff74
24 changes: 14 additions & 10 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ pub struct Reschedule {
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
/// Used for `ALTER TABLE` ([`Command::ReplaceStreamJob`]) and sink into table ([`Command::CreateStreamingJob`]).
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
/// will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
pub old_fragments: StreamJobFragments,
Expand All @@ -102,7 +106,7 @@ pub struct ReplaceStreamJobPlan {
pub init_split_assignment: SplitAssignment,
/// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
pub streaming_job: StreamingJob,
/// The temporary dummy table fragments id of new table fragment
/// The temporary dummy id of the new job fragments
pub tmp_id: u32,
}

Expand Down Expand Up @@ -264,7 +268,7 @@ pub enum Command {
///
/// Barriers from the actors to be dropped will STILL be collected.
/// After the barrier is collected, it notifies the local stream manager of compute nodes to
/// drop actors, and then delete the table fragments info from meta store.
/// drop actors, and then delete the job fragments info from meta store.
DropStreamingJobs {
table_fragments_ids: HashSet<TableId>,
actors: Vec<ActorId>,
Expand All @@ -278,7 +282,7 @@ pub enum Command {
/// be collected since the barrier should be passthrough.
///
/// After the barrier is collected, these newly created actors will be marked as `Running`. And
/// it adds the table fragments info to meta store. However, the creating progress will **last
/// it adds the job fragments info to meta store. However, the creating progress will **last
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
/// will be set to `Created`.
CreateStreamingJob {
Expand All @@ -302,16 +306,16 @@ pub enum Command {
},

/// `ReplaceStreamJob` command generates a `Update` barrier with the given `merge_updates`. This is
/// essentially switching the downstream of the old table fragments to the new ones, and
/// dropping the old table fragments. Used for table schema change.
/// essentially switching the downstream of the old job fragments to the new ones, and
/// dropping the old job fragments. Used for table schema change.
///
/// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment
/// of the Merge executors are changed additionally.
ReplaceStreamJob(ReplaceStreamJobPlan),

/// `SourceSplitAssignment` generates a `Splits` barrier for pushing initialized splits or
/// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
/// changed splits.
SourceSplitAssignment(SplitAssignment),
SourceChangeSplit(SplitAssignment),

/// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
/// the `rate_limit` of `FlowControl` Executor after `StreamScan` or Source.
Expand Down Expand Up @@ -416,7 +420,7 @@ impl Command {
),
Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
Command::MergeSnapshotBackfillStreamingJobs(_) => None,
Command::SourceSplitAssignment(_) => None,
Command::SourceChangeSplit(_) => None,
Command::Throttle(_) => None,
Command::CreateSubscription { .. } => None,
Command::DropSubscription { .. } => None,
Expand Down Expand Up @@ -640,7 +644,7 @@ impl Command {
}
}

Command::SourceSplitAssignment(change) => {
Command::SourceChangeSplit(change) => {
let mut diff = HashMap::new();

for actor_splits in change.values() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/context/context_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl CommandContext {

Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
Command::SourceChangeSplit(split_assignment) => {
barrier_manager_context
.metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
Expand Down
43 changes: 15 additions & 28 deletions src/meta/src/controller/catalog/drop_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ impl CatalogController {
}
});

let (source_fragments, removed_actors, removed_fragments) =
resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;

let fragment_ids = get_fragment_ids_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
let (removed_source_fragments, removed_actors, removed_fragments) =
get_fragments_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;

// Find affect users with privileges on all this objects.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
Expand Down Expand Up @@ -245,10 +243,10 @@ impl CatalogController {
.notify_frontend(NotificationOperation::Delete, relation_group)
.await;

let fragment_mappings = fragment_ids
.into_iter()
let fragment_mappings = removed_fragments
.iter()
.map(|fragment_id| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
fragment_id: *fragment_id as _,
mapping: None,
})
.collect();
Expand All @@ -259,11 +257,10 @@ impl CatalogController {
Ok((
ReleaseContext {
database_id,
streaming_job_ids: to_drop_streaming_jobs,
state_table_ids: to_drop_state_table_ids,
source_ids: to_drop_source_ids,
connections: vec![],
source_fragments,
removed_streaming_job_ids: to_drop_streaming_jobs,
removed_state_table_ids: to_drop_state_table_ids,
removed_source_ids: to_drop_source_ids,
removed_source_fragments,
removed_actors,
removed_fragments,
},
Expand Down Expand Up @@ -418,8 +415,8 @@ impl CatalogController {
.all(&txn)
.await?;

let (source_fragments, removed_actors, removed_fragments) =
resolve_source_register_info_for_jobs(&txn, streaming_jobs.clone()).await?;
let (removed_source_fragments, removed_actors, removed_fragments) =
get_fragments_for_jobs(&txn, streaming_jobs.clone()).await?;

let state_table_ids: Vec<TableId> = Table::find()
.select_only()
Expand All @@ -445,15 +442,6 @@ impl CatalogController {
.all(&txn)
.await?;

let connections = Connection::find()
.inner_join(Object)
.filter(object::Column::DatabaseId.eq(Some(database_id)))
.all(&txn)
.await?
.into_iter()
.map(|conn| conn.connection_id)
.collect_vec();

// Find affect users with privileges on the database and the objects in the database.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
Expand Down Expand Up @@ -503,11 +491,10 @@ impl CatalogController {
Ok((
ReleaseContext {
database_id,
streaming_job_ids: streaming_jobs,
state_table_ids,
source_ids,
connections,
source_fragments,
removed_streaming_job_ids: streaming_jobs,
removed_state_table_ids: state_table_ids,
removed_source_ids: source_ids,
removed_source_fragments,
removed_actors,
removed_fragments,
},
Expand Down
31 changes: 10 additions & 21 deletions src/meta/src/controller/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,7 @@ use super::utils::{
rename_relation, rename_relation_refer,
};
use crate::controller::catalog::util::update_internal_tables;
use crate::controller::utils::{
build_relation_group_for_delete, check_connection_name_duplicate,
check_database_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
extract_external_table_name_from_definition, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::utils::*;
use crate::controller::ObjectModel;
use crate::manager::{
get_referred_connection_ids_from_source, get_referred_secret_ids_from_source, MetaSrvEnv,
Expand Down Expand Up @@ -121,20 +113,17 @@ pub struct CatalogController {
#[derive(Clone, Default)]
pub struct ReleaseContext {
pub(crate) database_id: DatabaseId,
pub(crate) streaming_job_ids: Vec<ObjectId>,
pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
/// Dropped state table list, need to unregister from hummock.
pub(crate) state_table_ids: Vec<TableId>,
/// Dropped source list, need to unregister from source manager.
pub(crate) source_ids: Vec<SourceId>,
/// Dropped connection list, need to delete from vpc endpoints.
#[allow(dead_code)]
pub(crate) connections: Vec<ConnectionId>,

/// Dropped fragments that are fetching data from the target source.
pub(crate) source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
/// Dropped actors.
pub(crate) removed_actors: HashSet<ActorId>,
pub(crate) removed_state_table_ids: Vec<TableId>,

/// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
pub(crate) removed_source_ids: Vec<SourceId>,
/// Dropped source fragments (e.g., when a `DROP MATERIALIZED VIEW` drops a job that reads from sources),
/// need to unregister from source manager.
pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

pub(crate) removed_actors: HashSet<ActorId>,
pub(crate) removed_fragments: HashSet<FragmentId>,
}

Expand Down
39 changes: 12 additions & 27 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
use risingwave_pb::meta::{
FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource};
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};
use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
use risingwave_sqlparser::ast::Statement as SqlStatement;
Expand Down Expand Up @@ -1059,25 +1058,11 @@ where
Ok(fragment_actors.into_iter().into_group_map())
}

/// Find the external stream source info inside the stream node, if any.
pub fn find_stream_source(stream_node: &PbStreamNode) -> Option<&StreamSource> {
if let Some(NodeBody::Source(source)) = &stream_node.node_body {
if let Some(inner) = &source.source_inner {
return Some(inner);
}
}

for child in &stream_node.input {
if let Some(source) = find_stream_source(child) {
return Some(source);
}
}

None
}

/// Resolve fragment list that are subscribing to sources and actor lists.
pub async fn resolve_source_register_info_for_jobs<C>(
/// For the given streaming jobs, returns
/// - All source fragments
/// - All actors
/// - All fragments
pub async fn get_fragments_for_jobs<C>(
db: &C,
streaming_jobs: Vec<ObjectId>,
) -> MetaResult<(
Expand Down Expand Up @@ -1113,28 +1098,28 @@ where
.all(db)
.await?;

let removed_fragments = fragments
let fragment_ids = fragments
.iter()
.map(|(fragment_id, _, _)| *fragment_id)
.collect();

let mut source_fragment_ids = HashMap::new();
let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
for (fragment_id, mask, stream_node) in fragments {
if mask & PbFragmentTypeFlag::Source as i32 == 0 {
continue;
}
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
.entry(source_id as _)
.or_default()
.insert(fragment_id);
}
}

Ok((
source_fragment_ids,
actors.into_iter().collect(),
removed_fragments,
fragment_ids,
))
}

Expand Down
16 changes: 8 additions & 8 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,24 +1252,24 @@ impl DdlController {

let ReleaseContext {
database_id,
streaming_job_ids,
state_table_ids,
source_ids,
source_fragments,
removed_streaming_job_ids,
removed_state_table_ids,
removed_source_ids,
removed_source_fragments,
removed_actors,
removed_fragments,
..
} = release_ctx;

// unregister sources.
self.source_manager
.unregister_sources(source_ids.into_iter().map(|id| id as _).collect())
.unregister_sources(removed_source_ids.into_iter().map(|id| id as _).collect())
.await;

// unregister fragments and actors from source manager.
self.source_manager
.drop_source_fragments(
source_fragments
removed_source_fragments
.into_iter()
.map(|(source_id, fragments)| {
(
Expand All @@ -1287,8 +1287,8 @@ impl DdlController {
.drop_streaming_jobs(
risingwave_common::catalog::DatabaseId::new(database_id as _),
removed_actors.into_iter().map(|id| id as _).collect(),
streaming_job_ids,
state_table_ids,
removed_streaming_job_ids,
removed_state_table_ids,
removed_fragments.iter().map(|id| *id as _).collect(),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,7 @@ impl SourceManager {

for (database_id, split_assignment) in split_assignment {
if !split_assignment.is_empty() {
let command = Command::SourceSplitAssignment(split_assignment);
let command = Command::SourceChangeSplit(split_assignment);
tracing::info!(command = ?command, "pushing down split assignment command");
self.barrier_scheduler
.run_command(database_id, command)
Expand Down
Loading