Skip to content

Commit

Permalink
feat: support inject and collect barrier from partial graph (#17758)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jul 30, 2024
1 parent b8e08c7 commit 2fb78f0
Show file tree
Hide file tree
Showing 19 changed files with 621 additions and 401 deletions.
9 changes: 9 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message InjectBarrierRequest {
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
}

message BarrierCompleteResponse {
Expand All @@ -80,6 +81,9 @@ message BarrierCompleteResponse {
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
uint32 partial_graph_id = 8;
// prev_epoch of barrier
uint64 epoch = 9;
}

// Before starting streaming, the leader node broadcasts the actor-host table to the workers that need it.
Expand All @@ -100,9 +104,14 @@ message StreamingControlStreamRequest {
uint64 prev_epoch = 2;
}

// Control-stream request that carries a list of partial graph ids.
// NOTE(review): judging by the name, the receiver is presumably expected to
// drop the listed partial graphs — confirm against the request handler.
message RemovePartialGraphRequest {
// Ids of the partial graphs this request refers to.
repeated uint32 partial_graph_ids = 1;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
}
}

Expand Down
20 changes: 9 additions & 11 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
use tracing::warn;

use super::info::{
CommandActorChanges, CommandFragmentChanges, CommandNewFragmentInfo, InflightActorInfo,
};
use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId};
use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;
Expand Down Expand Up @@ -109,8 +107,8 @@ impl ReplaceTablePlan {
fn actor_changes(&self) -> CommandActorChanges {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
new_actors: fragment
let fragment_change = CommandFragmentChanges::NewFragment(InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
Expand All @@ -124,7 +122,7 @@ impl ReplaceTablePlan {
)
})
.collect(),
table_ids: fragment
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
Expand Down Expand Up @@ -159,12 +157,12 @@ pub struct CreateStreamingJobCommandInfo {
}

impl CreateStreamingJobCommandInfo {
fn new_fragment_info(&self) -> impl Iterator<Item = (FragmentId, CommandNewFragmentInfo)> + '_ {
fn new_fragment_info(&self) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
self.table_fragments.fragments.values().map(|fragment| {
(
fragment.fragment_id,
CommandNewFragmentInfo {
new_actors: fragment
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
Expand All @@ -178,7 +176,7 @@ impl CreateStreamingJobCommandInfo {
)
})
.collect(),
table_ids: fragment
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
Expand Down
124 changes: 44 additions & 80 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,9 @@ use crate::barrier::Command;
use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, InflightFragmentInfo, WorkerId};
use crate::model::{ActorId, FragmentId};

/// Payload of `CommandFragmentChanges::NewFragment`: the information needed to
/// register a fragment introduced by a command.
#[derive(Debug, Clone)]
pub(crate) struct CommandNewFragmentInfo {
/// `actor_id` => `WorkerId` for every actor of the new fragment.
pub new_actors: HashMap<ActorId, WorkerId>,
/// Table ids of the fragment; stored as `state_table_ids` of the resulting
/// `InflightFragmentInfo`.
pub table_ids: HashSet<TableId>,
/// When `true`, the fragment's actors are also added to the per-worker
/// `actor_map_to_send` (barrier inject actors) map.
pub is_injectable: bool,
}

#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
NewFragment(CommandNewFragmentInfo),
NewFragment(InflightFragmentInfo),
Reschedule {
new_actors: HashMap<ActorId, WorkerId>,
to_remove: HashSet<ActorId>,
Expand Down Expand Up @@ -65,9 +58,6 @@ pub struct InflightActorInfo {
/// `node_id` => actors
pub actor_map: HashMap<WorkerId, HashSet<ActorId>>,

/// `node_id` => barrier inject actors
pub actor_map_to_send: HashMap<WorkerId, HashSet<ActorId>>,

/// `actor_id` => `WorkerId`
pub actor_location_map: HashMap<ActorId, WorkerId>,

Expand Down Expand Up @@ -96,20 +86,6 @@ impl InflightActorInfo {
map
};

let actor_map_to_send = {
let mut map: HashMap<_, HashSet<_>> = HashMap::new();
for info in actor_infos
.fragment_infos
.values()
.filter(|info| info.is_injectable)
{
for (actor_id, worker_id) in &info.actors {
map.entry(*worker_id).or_default().insert(*actor_id);
}
}
map
};

let actor_location_map = actor_infos
.fragment_infos
.values()
Expand All @@ -124,7 +100,6 @@ impl InflightActorInfo {
Self {
node_map,
actor_map,
actor_map_to_send,
actor_location_map,
mv_depended_subscriptions,
fragment_infos: actor_infos.fragment_infos,
Expand Down Expand Up @@ -167,28 +142,11 @@ impl InflightActorInfo {
let mut to_add = HashMap::new();
for (fragment_id, change) in fragment_changes {
match change {
CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
new_actors,
table_ids,
is_injectable,
..
}) => {
for (actor_id, node_id) in &new_actors {
assert!(to_add
.insert(*actor_id, (*node_id, is_injectable))
.is_none());
CommandFragmentChanges::NewFragment(info) => {
for (actor_id, node_id) in &info.actors {
assert!(to_add.insert(*actor_id, *node_id).is_none());
}
assert!(self
.fragment_infos
.insert(
fragment_id,
InflightFragmentInfo {
actors: new_actors,
state_table_ids: table_ids,
is_injectable,
}
)
.is_none());
assert!(self.fragment_infos.insert(fragment_id, info).is_none());
}
CommandFragmentChanges::Reschedule { new_actors, .. } => {
let info = self
Expand All @@ -197,30 +155,19 @@ impl InflightActorInfo {
.expect("should exist");
let actors = &mut info.actors;
for (actor_id, node_id) in new_actors {
assert!(to_add
.insert(actor_id, (node_id, info.is_injectable))
.is_none());
assert!(to_add.insert(actor_id, node_id).is_none());
assert!(actors.insert(actor_id, node_id).is_none());
}
}
CommandFragmentChanges::RemoveFragment => {}
}
}
for (actor_id, (node_id, is_injectable)) in to_add {
for (actor_id, node_id) in to_add {
assert!(self.node_map.contains_key(&node_id));
assert!(
self.actor_map.entry(node_id).or_default().insert(actor_id),
"duplicate actor in command changes"
);
if is_injectable {
assert!(
self.actor_map_to_send
.entry(node_id)
.or_default()
.insert(actor_id),
"duplicate actor in command changes"
);
}
assert!(
self.actor_location_map.insert(actor_id, node_id).is_none(),
"duplicate actor in command changes"
Expand Down Expand Up @@ -280,13 +227,8 @@ impl InflightActorInfo {
.expect("actor not found");
let actor_ids = self.actor_map.get_mut(&node_id).expect("node not found");
assert!(actor_ids.remove(&actor_id), "actor not found");
self.actor_map_to_send
.get_mut(&node_id)
.map(|actor_ids| actor_ids.remove(&actor_id));
}
self.actor_map.retain(|_, actor_ids| !actor_ids.is_empty());
self.actor_map_to_send
.retain(|_, actor_ids| !actor_ids.is_empty());
}
if let Command::DropSubscription {
subscription_id,
Expand All @@ -310,27 +252,49 @@ impl InflightActorInfo {
}

/// Returns actor list to collect in the target worker node.
pub fn actor_ids_to_collect(&self, node_id: &WorkerId) -> impl Iterator<Item = ActorId> {
self.actor_map
.get(node_id)
.cloned()
.unwrap_or_default()
.into_iter()
/// Yields the ids of all actors in `fragment_infos` that are located on the
/// worker `node_id`.
///
/// Every fragment's `actors` map (`actor_id` => worker id) is scanned and only
/// the entries whose worker id equals `node_id` are kept. The returned iterator
/// is lazy and borrows `fragment_infos`.
pub fn actor_ids_to_collect(
fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
node_id: WorkerId,
) -> impl Iterator<Item = ActorId> + '_ {
fragment_infos.values().flat_map(move |info| {
info.actors
.iter()
// Keep only the actors placed on the requested worker.
.filter_map(move |(actor_id, actor_node_id)| {
if *actor_node_id == node_id {
Some(*actor_id)
} else {
None
}
})
})
}

/// Returns actor list to send in the target worker node.
pub fn actor_ids_to_send(&self, node_id: &WorkerId) -> impl Iterator<Item = ActorId> {
self.actor_map_to_send
.get(node_id)
.cloned()
.unwrap_or_default()
.into_iter()
/// Yields the ids of all actors on the worker `node_id` that belong to an
/// injectable fragment.
///
/// Fragments whose `is_injectable` flag is `false` are skipped entirely; for the
/// remaining fragments, the `actors` map (`actor_id` => worker id) is filtered
/// down to the entries whose worker id equals `node_id`. The returned iterator
/// is lazy and borrows `fragment_infos`.
pub fn actor_ids_to_send(
fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
node_id: WorkerId,
) -> impl Iterator<Item = ActorId> + '_ {
fragment_infos
.values()
// Only injectable fragments contribute actors.
.filter(|info| info.is_injectable)
.flat_map(move |info| {
info.actors
.iter()
// Keep only the actors placed on the requested worker.
.filter_map(move |(actor_id, actor_node_id)| {
if *actor_node_id == node_id {
Some(*actor_id)
} else {
None
}
})
})
}

pub fn existing_table_ids(&self) -> HashSet<TableId> {
self.fragment_infos
pub fn existing_table_ids(
fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> impl Iterator<Item = TableId> + '_ {
fragment_infos
.values()
.flat_map(|info| info.state_table_ids.iter().cloned())
.collect()
}
}
Loading

0 comments on commit 2fb78f0

Please sign in to comment.