Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): only store CreateStreamingJob command in tracker #17742

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 74 additions & 86 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
use tracing::warn;

use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo};
use super::info::{
CommandActorChanges, CommandFragmentChanges, CommandNewFragmentInfo, InflightActorInfo,
};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId};
Expand Down Expand Up @@ -107,7 +109,7 @@ impl ReplaceTablePlan {
fn actor_changes(&self) -> CommandActorChanges {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment {
let fragment_change = CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
new_actors: fragment
.actors
.iter()
Expand All @@ -130,7 +132,7 @@ impl ReplaceTablePlan {
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
};
});
assert!(fragment_changes
.insert(fragment.fragment_id, fragment_change)
.is_none());
Expand All @@ -144,6 +146,54 @@ impl ReplaceTablePlan {
}
}

/// All the information carried by a `Command::CreateStreamingJob` barrier command,
/// grouped into one struct so the progress tracker can store the command as a unit.
#[derive(Debug, Clone)]
pub struct CreateStreamingJobCommandInfo {
/// Fragments (and their actors/state tables) of the streaming job being created.
pub table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
/// New dispatchers to be added, keyed by the (upstream) actor they attach to.
/// NOTE(review): assumed from the type — confirm against the command builder.
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
/// Initial source-split assignment for the new actors.
pub init_split_assignment: SplitAssignment,
/// Definition of the streaming job — presumably the original SQL text; verify against callers.
pub definition: String,
pub ddl_type: DdlType,
pub create_type: CreateType,
pub streaming_job: StreamingJob,
/// Internal (state) tables created along with the job.
pub internal_tables: Vec<Table>,
}

impl CreateStreamingJobCommandInfo {
    /// Iterate over the fragments of this streaming job, yielding each fragment id
    /// paired with the [`CommandNewFragmentInfo`] that registers it as a newly
    /// added in-flight fragment.
    fn new_fragment_info(&self) -> impl Iterator<Item = (FragmentId, CommandNewFragmentInfo)> + '_ {
        self.table_fragments.fragments.values().map(|fragment| {
            // Map each actor of this fragment to the worker node it is scheduled on,
            // resolved through the recorded actor status.
            let new_actors = fragment
                .actors
                .iter()
                .map(|actor| {
                    let worker_node_id = self
                        .table_fragments
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .get_parallel_unit()
                        .expect("should set")
                        .worker_node_id;
                    (actor.actor_id, worker_node_id)
                })
                .collect();

            // State tables owned by this fragment.
            let table_ids = fragment
                .state_table_ids
                .iter()
                .copied()
                .map(TableId::new)
                .collect();

            let info = CommandNewFragmentInfo {
                new_actors,
                table_ids,
                is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
            };
            (fragment.fragment_id, info)
        })
    }
}

/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
Expand Down Expand Up @@ -187,16 +237,7 @@ pub enum Command {
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
/// will be set to `Created`.
CreateStreamingJob {
streaming_job: StreamingJob,
internal_tables: Vec<Table>,
table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: SplitAssignment,
definition: String,
ddl_type: DdlType,
create_type: CreateType,
info: CreateStreamingJobCommandInfo,
/// This is for create SINK into table.
replace_table: Option<ReplaceTablePlan>,
},
Expand Down Expand Up @@ -279,43 +320,13 @@ impl Command {
.collect(),
}),
Command::CreateStreamingJob {
table_fragments,
info,
replace_table,
..
} => {
let fragment_changes = table_fragments
.fragments
.values()
.map(|fragment| {
(
fragment.fragment_id,
CommandFragmentChanges::NewFragment {
new_actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id,
table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.get_parallel_unit()
.expect("should set")
.worker_node_id,
)
})
.collect(),
table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(
fragment.fragment_type_mask,
),
},
)
let fragment_changes = info
.new_fragment_info()
.map(|(fragment_id, info)| {
(fragment_id, CommandFragmentChanges::NewFragment(info))
})
.collect();
let mut changes = CommandActorChanges { fragment_changes };
Expand Down Expand Up @@ -460,10 +471,6 @@ impl CommandContext {
_span: span,
}
}

pub fn metadata_manager(&self) -> &MetadataManager {
&self.barrier_manager_context.metadata_manager
}
}

impl CommandContext {
Expand Down Expand Up @@ -521,11 +528,14 @@ impl CommandContext {
})),

Command::CreateStreamingJob {
table_fragments,
dispatchers,
init_split_assignment: split_assignment,
info:
CreateStreamingJobCommandInfo {
table_fragments,
dispatchers,
init_split_assignment: split_assignment,
..
},
replace_table,
..
} => {
let actor_dispatchers = dispatchers
.iter()
Expand Down Expand Up @@ -818,20 +828,6 @@ impl CommandContext {
}
}

/// For `CreateStreamingJob`, returns the actors of the `StreamScan`, and `StreamValue` nodes. For other commands,
/// returns an empty set.
pub fn actors_to_track(&self) -> HashSet<ActorId> {
match &self.command {
Command::CreateStreamingJob {
table_fragments, ..
} => table_fragments
.tracking_progress_actor_ids()
.into_iter()
.collect(),
_ => Default::default(),
}
}

/// For `CancelStreamingJob`, returns the table id of the target table.
pub fn table_to_cancel(&self) -> Option<TableId> {
match &self.command {
Expand All @@ -840,16 +836,6 @@ impl CommandContext {
}
}

/// For `CreateStreamingJob`, returns the table id of the target table.
pub fn table_to_create(&self) -> Option<TableId> {
match &self.command {
Command::CreateStreamingJob {
table_fragments, ..
} => Some(table_fragments.table_id()),
_ => None,
}
}

/// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
async fn clean_up(&self, actors: Vec<ActorId>) -> MetaResult<()> {
self.barrier_manager_context
Expand Down Expand Up @@ -992,14 +978,16 @@ impl CommandContext {
}

Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_root_actors,
init_split_assignment,
definition: _,
info,
replace_table,
..
} => {
let CreateStreamingJobCommandInfo {
table_fragments,
dispatchers,
upstream_root_actors,
init_split_assignment,
..
} = info;
match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
let mut dependent_table_actors =
Expand Down
20 changes: 12 additions & 8 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ use crate::barrier::Command;
use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, InflightFragmentInfo, WorkerId};
use crate::model::{ActorId, FragmentId};

#[derive(Debug, Clone)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some docs for this? Is it for any new fragments associated with the barrier command?

And what is the is_injectable field for, when is it true/false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_injectable indicates whether the barrier needs to be injected directly into the actors of the fragment. The struct is simply extracted from CommandFragmentChanges::NewFragment; no logic is changed in this PR.

Currently, these fragments are the most upstream fragments, such as source fragments.

/// Info of a fragment newly added by a barrier command (e.g. creating a streaming
/// job or replacing a table plan), used to update the in-flight actor info.
pub(crate) struct CommandNewFragmentInfo {
/// Actors of the new fragment, mapped to the worker node each is scheduled on.
pub new_actors: HashMap<ActorId, WorkerId>,
/// Ids of the state tables owned by this fragment.
pub table_ids: HashSet<TableId>,
/// Whether the barrier needs to be injected directly into this fragment's actors;
/// true for the most upstream fragments (e.g. sources), per the author's note in this PR.
pub is_injectable: bool,
}

#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
NewFragment {
new_actors: HashMap<ActorId, WorkerId>,
table_ids: HashSet<TableId>,
is_injectable: bool,
},
NewFragment(CommandNewFragmentInfo),
Reschedule {
new_actors: HashMap<ActorId, WorkerId>,
to_remove: HashSet<ActorId>,
Expand Down Expand Up @@ -149,11 +152,12 @@ impl InflightActorInfo {
let mut to_add = HashMap::new();
for (fragment_id, change) in fragment_changes {
match change {
CommandFragmentChanges::NewFragment {
CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
new_actors,
table_ids,
is_injectable,
} => {
..
}) => {
for (actor_id, node_id) in &new_actors {
assert!(to_add
.insert(*actor_id, (*node_id, is_injectable))
Expand Down Expand Up @@ -232,7 +236,7 @@ impl InflightActorInfo {
let mut all_to_remove = HashSet::new();
for (fragment_id, changes) in fragment_changes.fragment_changes {
match changes {
CommandFragmentChanges::NewFragment { .. } => {}
CommandFragmentChanges::NewFragment(_) => {}
CommandFragmentChanges::Reschedule { to_remove, .. } => {
let info = self
.fragment_infos
Expand Down
Loading
Loading