refactor(meta): maintain snapshot of running actors instead of resolving it every time for barrier #14517

Merged
merged 11 commits on Jan 16, 2024
clippy
yezizp2012 committed Jan 16, 2024
commit d5befa58fa1e9ae82841ea2782b1ea641ac60d6a
5 changes: 2 additions & 3 deletions src/meta/src/barrier/info.rs
@@ -134,9 +134,8 @@ impl InflightActorInfo {
             .actor_location_map
             .remove(&actor_id)
             .expect("actor not found");
-        self.actor_map
-            .get_mut(&node_id)
-            .map(|actor_ids| assert!(actor_ids.remove(&actor_id), "actor not found"));
+        let actor_ids = self.actor_map.get_mut(&node_id).expect("node not found");
+        assert!(actor_ids.remove(&actor_id), "actor not found");
         self.actor_map_to_send
             .get_mut(&node_id)
             .map(|actor_ids| actor_ids.remove(&actor_id));
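
For context, this rewrite is the usual fix for clippy's `option_map_unit_fn`-style warning (this commit is titled "clippy"): `Option::map` should not be used purely for a side effect, and a node id missing from the map is a bug that `expect` now surfaces instead of silently skipping. A minimal standalone sketch of the before/after pattern, with toy maps rather than the real `InflightActorInfo`:

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    let mut actor_map: HashMap<u32, HashSet<u32>> = HashMap::new();
    actor_map.entry(1).or_default().insert(42);
    let (node_id, actor_id) = (1u32, 42u32);

    // Before (flagged by clippy): `map` used purely for its side effect,
    // and a node id missing from the map would be silently ignored.
    // actor_map
    //     .get_mut(&node_id)
    //     .map(|ids| assert!(ids.remove(&actor_id), "actor not found"));

    // After: a missing node id is a bug, so fail loudly with `expect`,
    // then run the assertion as an ordinary statement.
    let ids = actor_map.get_mut(&node_id).expect("node not found");
    assert!(ids.remove(&actor_id), "actor not found");
}
```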
3 changes: 1 addition & 2 deletions src/meta/src/barrier/recovery.rs
@@ -373,8 +373,7 @@ impl GlobalBarrierManagerContext {
                     warn!(err = ?err, "reset compute nodes failed");
                 })?;

-                let to_remove_actors = scheduled_barriers.pre_apply_drop_scheduled().await;
-                if !to_remove_actors.is_empty() {
+                if scheduled_barriers.pre_apply_drop_scheduled().await {
                     info = self.resolve_actor_info().await;
                 }

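The call site now only needs a boolean: if any buffered drop/cancel commands were applied during recovery, the previously resolved actor info is stale and must be refreshed. A simplified, compile-ready sketch of that control flow; `ActorInfo`, its `revision` field, and `resolve_actor_info` are stand-ins for the real meta-service types, not the actual API:

```rust
#[derive(Debug, PartialEq)]
struct ActorInfo {
    revision: u64,
}

// Stand-in for `GlobalBarrierManagerContext::resolve_actor_info`, which in the
// real code rebuilds the running-actor snapshot from cluster metadata.
fn resolve_actor_info() -> ActorInfo {
    ActorInfo { revision: 2 }
}

fn main() {
    let mut info = ActorInfo { revision: 1 };

    // Stand-in for `scheduled_barriers.pre_apply_drop_scheduled().await`.
    let dropped_any = true;

    // Re-resolve only when a buffered drop/cancel command actually changed the
    // set of running actors; otherwise the existing snapshot stays valid.
    if dropped_any {
        info = resolve_actor_info();
    }
    assert_eq!(info.revision, 2);
}
```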
28 changes: 10 additions & 18 deletions src/meta/src/barrier/schedule.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::{HashSet, VecDeque};
+use std::collections::VecDeque;
 use std::iter::once;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -28,7 +28,6 @@ use tokio::sync::{oneshot, watch, RwLock};
 use super::notifier::{BarrierInfo, Notifier};
 use super::{Command, Scheduled};
 use crate::hummock::HummockManagerRef;
-use crate::model::ActorId;
 use crate::rpc::metrics::MetaMetrics;
 use crate::{MetaError, MetaResult};

@@ -396,35 +395,28 @@ impl ScheduledBarriers {
         queue.mark_ready();
     }

-    /// Try to pre apply drop scheduled command and return the table ids of dropped streaming jobs.
+    /// Try to pre-apply the scheduled drop commands and return `true` if there were any.
     /// It should only be called in recovery.
-    pub(super) async fn pre_apply_drop_scheduled(&self) -> HashSet<ActorId> {
-        let mut to_remove_actors = HashSet::new();
+    pub(super) async fn pre_apply_drop_scheduled(&self) -> bool {
         let mut queue = self.inner.queue.write().await;
         assert_matches!(queue.status, QueueStatus::Blocked(_));
+        let mut found = false;

         while let Some(Scheduled {
             notifiers, command, ..
         }) = queue.queue.pop_front()
         {
-            match command {
-                Command::DropStreamingJobs(node_actors) => {
-                    to_remove_actors.extend(node_actors.values().flatten());
-                }
-                Command::CancelStreamingJob(table_fragments) => {
-                    let actor_ids = table_fragments.actor_ids();
-                    to_remove_actors.extend(actor_ids);
-                }
-                _ => {
-                    unreachable!("only drop streaming jobs should be buffered");
-                }
-            }
+            assert_matches!(
+                command,
+                Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_)
+            );
             notifiers.into_iter().for_each(|mut notify| {
                 notify.notify_collected();
                 notify.notify_finished();
             });
+            found = true;
         }
-        to_remove_actors
+        found
     }

     /// Whether the barrier(checkpoint = true) should be injected.
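
Since the running-actor snapshot is now maintained incrementally elsewhere, the drained commands no longer need to be destructured for their actor ids; asserting their shape is enough. A small self-contained sketch of the `assert_matches!` pattern used above (the macro is from the `assert_matches` crate; the enum here is a toy stand-in for the real `Command`):

```rust
use assert_matches::assert_matches; // assert_matches = "1" in Cargo.toml

#[allow(dead_code)]
enum Command {
    DropStreamingJobs(Vec<u32>),
    CancelStreamingJob(String),
    Plain,
}

fn main() {
    let command = Command::DropStreamingJobs(vec![1, 2, 3]);

    // Panics unless the value matches one of the listed patterns; while the
    // queue is blocked for recovery, only drop/cancel commands may be buffered.
    assert_matches!(
        command,
        Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_)
    );
}
```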