Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): maintain snapshot of running actors instead of resolving it every time for barrier #14517

Merged
merged 11 commits into from
Jan 16, 2024
682 changes: 346 additions & 336 deletions src/meta/src/barrier/command.rs

Large diffs are not rendered by default.

118 changes: 100 additions & 18 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,139 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use risingwave_pb::common::WorkerNode;
use risingwave_pb::common::PbWorkerNode;

use crate::manager::{ActorInfos, WorkerId};
use crate::model::ActorId;

/// [`BarrierActorInfo`] resolves the actor info read from meta store for
#[derive(Debug, Clone)]
pub struct ActorDesc {
pub id: ActorId,
pub node_id: WorkerId,
pub is_injectable: bool,
}

#[derive(Debug, Clone)]
pub enum CommandActorChanges {
Actor {
to_add: Vec<ActorDesc>,
to_remove: HashSet<ActorId>,
},
None,
}

/// [`InflightActorInfo`] resolves the actor info read from meta store for
/// [`crate::barrier::GlobalBarrierManager`].
pub struct BarrierActorInfo {
#[derive(Default, Clone)]
pub struct InflightActorInfo {
/// node_id => node
pub node_map: HashMap<WorkerId, WorkerNode>,
pub node_map: HashMap<WorkerId, PbWorkerNode>,

/// node_id => actors
pub actor_map: HashMap<WorkerId, Vec<ActorId>>,
pub actor_map: HashMap<WorkerId, HashSet<ActorId>>,

/// node_id => barrier inject actors
pub actor_map_to_send: HashMap<WorkerId, Vec<ActorId>>,
pub actor_map_to_send: HashMap<WorkerId, HashSet<ActorId>>,

/// actor_id => WorkerId
pub actor_location_map: HashMap<ActorId, WorkerId>,
}

impl BarrierActorInfo {
// TODO: we may resolve this info as graph updating, instead of doing it every time we want to
// send a barrier
impl InflightActorInfo {
pub fn resolve(
all_nodes: impl IntoIterator<Item = WorkerNode>,
all_nodes: impl IntoIterator<Item = PbWorkerNode>,
actor_infos: ActorInfos,
) -> Self {
let node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();

let actor_map = actor_infos
.actor_maps
.into_iter()
.map(|(node_id, actor_ids)| (node_id, actor_ids.into_iter().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();

let actor_map_to_send = actor_infos
.barrier_inject_actor_maps
.into_iter()
.map(|(node_id, actor_ids)| (node_id, actor_ids.into_iter().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();

let actor_location_map = actor_map
.iter()
.flat_map(|(node_id, actor_ids)| actor_ids.iter().map(|actor_id| (*actor_id, *node_id)))
.collect::<HashMap<_, _>>();

Self {
node_map,
actor_map: actor_infos.actor_maps,
actor_map_to_send: actor_infos.barrier_inject_actor_maps,
actor_map,
actor_map_to_send,
actor_location_map,
}
}

pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator<Item = PbWorkerNode>) {
self.node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
}

pub fn pre_apply(&mut self, changes: &CommandActorChanges) {
if let CommandActorChanges::Actor { to_add, .. } = changes {
yezizp2012 marked this conversation as resolved.
Show resolved Hide resolved
for actor_desc in to_add {
assert!(self.node_map.contains_key(&actor_desc.node_id));
self.actor_map
.entry(actor_desc.node_id)
.or_default()
.insert(actor_desc.id);
if actor_desc.is_injectable {
self.actor_map_to_send
.entry(actor_desc.node_id)
.or_default()
.insert(actor_desc.id);
}
self.actor_location_map
.insert(actor_desc.id, actor_desc.node_id);
}
};
}

pub fn post_apply(&mut self, changes: &CommandActorChanges) {
if let CommandActorChanges::Actor { to_remove, .. } = changes {
for actor_id in to_remove {
if let Some(node_id) = self.actor_location_map.remove(actor_id) {
self.actor_map
.get_mut(&node_id)
.map(|actor_ids| actor_ids.remove(actor_id));
self.actor_map_to_send
.get_mut(&node_id)
.map(|actor_ids| actor_ids.remove(actor_id));
}
}
self.actor_map.retain(|_, actor_ids| !actor_ids.is_empty());
self.actor_map_to_send
.retain(|_, actor_ids| !actor_ids.is_empty());
}
}

// TODO: should only collect from reachable actors, for mv on mv
pub fn actor_ids_to_collect(&self, node_id: &WorkerId) -> impl Iterator<Item = ActorId> {
self.actor_map
.get(node_id)
.map(|actor_ids| actor_ids.clone().into_iter())
.unwrap_or_else(|| vec![].into_iter())
.cloned()
.unwrap_or_default()
.into_iter()
}

pub fn actor_ids_to_send(&self, node_id: &WorkerId) -> impl Iterator<Item = ActorId> {
self.actor_map_to_send
.get(node_id)
.map(|actor_ids| actor_ids.clone().into_iter())
.unwrap_or_else(|| vec![].into_iter())
.cloned()
.unwrap_or_default()
.into_iter()
}
}
Loading
Loading