Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): maintain snapshot of running actors instead of resolving it every time for barrier #14517

Merged
merged 11 commits into from
Jan 16, 2024
679 changes: 343 additions & 336 deletions src/meta/src/barrier/command.rs

Large diffs are not rendered by default.

134 changes: 116 additions & 18 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,155 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use risingwave_pb::common::WorkerNode;
use risingwave_pb::common::PbWorkerNode;

use crate::manager::{ActorInfos, WorkerId};
use crate::model::ActorId;

/// [`BarrierActorInfo`] resolves the actor info read from meta store for
#[derive(Debug, Clone)]
pub struct ActorDesc {
pub id: ActorId,
pub node_id: WorkerId,
pub is_injectable: bool,
}

#[derive(Debug, Clone)]
pub struct CommandActorChanges {
pub(crate) to_add: Vec<ActorDesc>,
pub(crate) to_remove: HashSet<ActorId>,
}

/// [`InflightActorInfo`] resolves the actor info read from meta store for
/// [`crate::barrier::GlobalBarrierManager`].
pub struct BarrierActorInfo {
#[derive(Default, Clone)]
pub struct InflightActorInfo {
/// node_id => node
pub node_map: HashMap<WorkerId, WorkerNode>,
pub node_map: HashMap<WorkerId, PbWorkerNode>,

/// node_id => actors
pub actor_map: HashMap<WorkerId, Vec<ActorId>>,
pub actor_map: HashMap<WorkerId, HashSet<ActorId>>,

/// node_id => barrier inject actors
pub actor_map_to_send: HashMap<WorkerId, Vec<ActorId>>,
pub actor_map_to_send: HashMap<WorkerId, HashSet<ActorId>>,

/// actor_id => WorkerId
pub actor_location_map: HashMap<ActorId, WorkerId>,
}

impl BarrierActorInfo {
// TODO: we may resolve this info as graph updating, instead of doing it every time we want to
// send a barrier
impl InflightActorInfo {
/// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
pub fn resolve(
all_nodes: impl IntoIterator<Item = WorkerNode>,
all_nodes: impl IntoIterator<Item = PbWorkerNode>,
actor_infos: ActorInfos,
) -> Self {
let node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();

let actor_map = actor_infos
.actor_maps
.into_iter()
.map(|(node_id, actor_ids)| (node_id, actor_ids.into_iter().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();

let actor_map_to_send = actor_infos
.barrier_inject_actor_maps
.into_iter()
.map(|(node_id, actor_ids)| (node_id, actor_ids.into_iter().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();

let actor_location_map = actor_map
.iter()
.flat_map(|(node_id, actor_ids)| actor_ids.iter().map(|actor_id| (*actor_id, *node_id)))
.collect::<HashMap<_, _>>();

Self {
node_map,
actor_map: actor_infos.actor_maps,
actor_map_to_send: actor_infos.barrier_inject_actor_maps,
actor_map,
actor_map_to_send,
actor_location_map,
}
}

// TODO: should only collect from reachable actors, for mv on mv
/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator<Item = PbWorkerNode>) {
self.node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
}

/// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
/// the info correspondingly.
pub fn pre_apply(&mut self, changes: Option<CommandActorChanges>) {
if let Some(CommandActorChanges { to_add, .. }) = changes {
for actor_desc in to_add {
assert!(self.node_map.contains_key(&actor_desc.node_id));
assert!(
self.actor_map
.entry(actor_desc.node_id)
.or_default()
.insert(actor_desc.id),
"duplicate actor in command changes"
);
if actor_desc.is_injectable {
assert!(
self.actor_map_to_send
.entry(actor_desc.node_id)
.or_default()
.insert(actor_desc.id),
"duplicate actor in command changes"
);
}
assert!(
self.actor_location_map
.insert(actor_desc.id, actor_desc.node_id)
.is_none(),
"duplicate actor in command changes"
);
}
};
}

/// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
/// remove that from the snapshot correspondingly.
pub fn post_apply(&mut self, changes: Option<CommandActorChanges>) {
if let Some(CommandActorChanges { to_remove, .. }) = changes {
for actor_id in to_remove {
let node_id = self
.actor_location_map
.remove(&actor_id)
.expect("actor not found");
let actor_ids = self.actor_map.get_mut(&node_id).expect("node not found");
assert!(actor_ids.remove(&actor_id), "actor not found");
self.actor_map_to_send
.get_mut(&node_id)
.map(|actor_ids| actor_ids.remove(&actor_id));
}
self.actor_map.retain(|_, actor_ids| !actor_ids.is_empty());
self.actor_map_to_send
.retain(|_, actor_ids| !actor_ids.is_empty());
}
}

/// Returns actor list to collect in the target worker node.
pub fn actor_ids_to_collect(&self, node_id: &WorkerId) -> impl Iterator<Item = ActorId> {
self.actor_map
.get(node_id)
.map(|actor_ids| actor_ids.clone().into_iter())
.unwrap_or_else(|| vec![].into_iter())
.cloned()
.unwrap_or_default()
.into_iter()
}

/// Returns actor list to send in the target worker node.
pub fn actor_ids_to_send(&self, node_id: &WorkerId) -> impl Iterator<Item = ActorId> {
self.actor_map_to_send
.get(node_id)
.map(|actor_ids| actor_ids.clone().into_iter())
.unwrap_or_else(|| vec![].into_iter())
.cloned()
.unwrap_or_default()
.into_iter()
}
}
Loading
Loading