Skip to content

Commit

Permalink
refactor(meta): decouple global barrier worker from context with trait (
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 28, 2024
1 parent 0d013ea commit 0935cf2
Show file tree
Hide file tree
Showing 10 changed files with 577 additions and 380 deletions.
8 changes: 2 additions & 6 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,8 @@ pub async fn start_service_as_election_leader(
None
};

let (barrier_scheduler, scheduled_barriers) = BarrierScheduler::new_pair(
hummock_manager.clone(),
meta_metrics.clone(),
system_params_reader.checkpoint_frequency() as usize,
);
let (barrier_scheduler, scheduled_barriers) =
BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());

// Initialize services.
let backup_manager = BackupManager::new(
Expand Down Expand Up @@ -498,7 +495,6 @@ pub async fn start_service_as_election_leader(
hummock_manager.clone(),
source_manager.clone(),
sink_manager.clone(),
meta_metrics.clone(),
scale_controller.clone(),
)
.await;
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightGraphInfo};
use crate::barrier::{BarrierInfo, GlobalBarrierWorkerContext, InflightSubscriptionInfo};
use crate::barrier::info::BarrierInfo;
use crate::barrier::{GlobalBarrierWorkerContextImpl, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::{DdlType, StreamingJob};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
Expand Down Expand Up @@ -926,7 +927,7 @@ impl Command {
impl CommandContext {
pub async fn wait_epoch_commit(
&self,
barrier_manager_context: &GlobalBarrierWorkerContext,
barrier_manager_context: &GlobalBarrierWorkerContextImpl,
) -> MetaResult<()> {
let table_id = self.table_ids_to_commit.iter().next().cloned();
// try wait epoch on an existing random table id
Expand Down Expand Up @@ -956,7 +957,7 @@ impl CommandContext {
/// the given command.
pub async fn post_collect(
&self,
barrier_manager_context: &GlobalBarrierWorkerContext,
barrier_manager_context: &GlobalBarrierWorkerContextImpl,
) -> MetaResult<()> {
match &self.command {
Command::Plain(_) => {}
Expand Down
12 changes: 6 additions & 6 deletions src/meta/src/barrier/creating_job/barrier_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_meta_model::WorkerId;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::debug;

use crate::rpc::metrics::MetaMetrics;
use crate::rpc::metrics::GLOBAL_META_METRICS;

#[derive(Debug)]
struct CreatingStreamingJobEpochState {
Expand Down Expand Up @@ -57,7 +57,7 @@ pub(super) struct CreatingStreamingJobBarrierControl {
}

impl CreatingStreamingJobBarrierControl {
pub(super) fn new(table_id: TableId, backfill_epoch: u64, metrics: &MetaMetrics) -> Self {
pub(super) fn new(table_id: TableId, backfill_epoch: u64) -> Self {
let table_id_str = format!("{}", table_id.table_id);
Self {
table_id,
Expand All @@ -68,16 +68,16 @@ impl CreatingStreamingJobBarrierControl {
pending_barriers_to_complete: Default::default(),
completing_barrier: None,

consuming_snapshot_barrier_latency: metrics
consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
.snapshot_backfill_barrier_latency
.with_guarded_label_values(&[&table_id_str, "consuming_snapshot"]),
consuming_log_store_barrier_latency: metrics
consuming_log_store_barrier_latency: GLOBAL_META_METRICS
.snapshot_backfill_barrier_latency
.with_guarded_label_values(&[&table_id_str, "consuming_log_store"]),
wait_commit_latency: metrics
wait_commit_latency: GLOBAL_META_METRICS
.snapshot_backfill_wait_commit_latency
.with_guarded_label_values(&[&table_id_str]),
inflight_barrier_num: metrics
inflight_barrier_num: GLOBAL_META_METRICS
.snapshot_backfill_inflight_barrier_num
.with_guarded_label_values(&[&table_id_str]),
}
Expand Down
15 changes: 5 additions & 10 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierCo
use crate::barrier::creating_job::status::{
CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus,
};
use crate::barrier::info::InflightGraphInfo;
use crate::barrier::info::{BarrierInfo, InflightGraphInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierInfo, Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
use crate::rpc::metrics::MetaMetrics;
use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::MetaResult;

#[derive(Debug)]
Expand All @@ -60,7 +60,6 @@ impl CreatingStreamingJobControl {
snapshot_backfill_info: SnapshotBackfillInfo,
backfill_epoch: u64,
version_stat: &HummockVersionStats,
metrics: &MetaMetrics,
initial_mutation: Mutation,
) -> Self {
info!(
Expand All @@ -81,11 +80,7 @@ impl CreatingStreamingJobControl {
Self {
info,
snapshot_backfill_info,
barrier_control: CreatingStreamingJobBarrierControl::new(
table_id,
backfill_epoch,
metrics,
),
barrier_control: CreatingStreamingJobBarrierControl::new(table_id, backfill_epoch),
backfill_epoch,
graph_info: InflightGraphInfo::new(fragment_info),
status: CreatingStreamingJobStatus::ConsumingSnapshot {
Expand All @@ -98,7 +93,7 @@ impl CreatingStreamingJobControl {
pending_non_checkpoint_barriers: vec![],
initial_barrier_info: Some((actors_to_create, initial_mutation)),
},
upstream_lag: metrics
upstream_lag: GLOBAL_META_METRICS
.snapshot_backfill_lag
.with_guarded_label_values(&[&table_id_str]),
}
Expand Down
Loading

0 comments on commit 0935cf2

Please sign in to comment.