Skip to content

Commit

Permalink
move check in handler
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 6, 2024
1 parent 89897ac commit 0a434f1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
51 changes: 49 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{info, warn, Instrument};

use self::command::CommandContext;
use self::notifier::Notifier;
Expand Down Expand Up @@ -559,7 +559,54 @@ impl GlobalBarrierManager {
break;
}

_changed_worker = self.active_streaming_nodes.changed() => {
changed_worker = self.active_streaming_nodes.changed() => {
#[cfg(debug_assertions)]
{
match self
.context
.metadata_manager
.list_active_streaming_compute_nodes()
.await
{
Ok(worker_nodes) => {
let ignore_irrelevant_info = |node: &WorkerNode| {
(
node.id,
WorkerNode {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
parallel_units: node.parallel_units.clone(),
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
},
)
};
let worker_nodes: HashMap<_, _> =
worker_nodes.iter().map(ignore_irrelevant_info).collect();
let curr_worker_nodes: HashMap<_, _> = self
.active_streaming_nodes
.current()
.values()
.map(ignore_irrelevant_info)
.collect();
if worker_nodes != curr_worker_nodes {
warn!(
?worker_nodes,
?curr_worker_nodes,
"different to global snapshot"
);
}
}
Err(e) => {
warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
}
}
}

info!(?changed_worker, "worker changed");

// TODO: may apply the changed worker to state
}

Expand Down
39 changes: 1 addition & 38 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct MetadataManagerV2 {
pub catalog_controller: CatalogControllerRef,
}

#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
Add(WorkerNode),
Remove(WorkerNode),
Expand Down Expand Up @@ -174,44 +175,6 @@ impl ActiveStreamingWorkerNodes {
}
};

#[cfg(debug_assertions)]
{
if let Ok(worker_nodes) = self
._meta_manager
.list_active_streaming_compute_nodes()
.await
{
let ignore_irrelevant_info = |node: &WorkerNode| {
(
node.id,
WorkerNode {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
parallel_units: node.parallel_units.clone(),
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
},
)
};
let worker_nodes: HashMap<_, _> =
worker_nodes.iter().map(ignore_irrelevant_info).collect();
let curr_worker_nodes: HashMap<_, _> = self
.worker_nodes
.values()
.map(ignore_irrelevant_info)
.collect();
if worker_nodes != curr_worker_nodes {
warn!(
?worker_nodes,
?curr_worker_nodes,
"different to global snapshot"
);
}
}
}

ret
}
}
Expand Down

0 comments on commit 0a434f1

Please sign in to comment.