refactor(meta): reorganize code of global barrier manager (part 1) #19334

Merged
merged 4 commits on Nov 11, 2024
src/meta/src/barrier/checkpoint/control.rs: 820 additions & 0 deletions

Large diffs are not rendered by default.

@@ -19,19 +19,17 @@ use std::cmp::max;
 use std::collections::HashMap;
 use std::ops::Bound::{Excluded, Unbounded};
 
+use barrier_control::CreatingStreamingJobBarrierControl;
 use risingwave_common::catalog::{DatabaseId, TableId};
 use risingwave_common::metrics::LabelGuardedIntGauge;
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_service::BarrierCompleteResponse;
+use status::{CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus};
 use tracing::info;
 
-use crate::barrier::creating_job::barrier_control::CreatingStreamingJobBarrierControl;
-use crate::barrier::creating_job::status::{
-    CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus,
-};
 use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
 use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::barrier::rpc::ControlStreamManager;
@@ -41,8 +39,8 @@ use crate::rpc::metrics::GLOBAL_META_METRICS;
 use crate::MetaResult;
 
 #[derive(Debug)]
-pub(super) struct CreatingStreamingJobControl {
-    pub(super) info: CreateStreamingJobCommandInfo,
+pub(crate) struct CreatingStreamingJobControl {
+    pub(crate) info: CreateStreamingJobCommandInfo,
     pub(super) snapshot_backfill_info: SnapshotBackfillInfo,
     backfill_epoch: u64,
 
@@ -103,7 +101,7 @@ impl CreatingStreamingJobControl {
         }
     }
 
-    pub(super) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool {
+    pub(crate) fn is_wait_on_worker(&self, worker_id: WorkerId) -> bool {
         self.barrier_control.is_wait_on_worker(worker_id)
             || (self.status.is_finishing()
                 && InflightFragmentInfo::contains_worker(
@@ -112,7 +110,7 @@
                 ))
     }
 
-    pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
+    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
         let progress = match &self.status {
             CreatingStreamingJobStatus::ConsumingSnapshot {
                 create_mview_tracker,
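A note on the recurring visibility change in these hunks: the module now sits one level deeper (under `barrier::checkpoint`), so `pub(super)` would expose items only to the new parent module, while other parts of the barrier code still need them; widening to `pub(crate)` keeps those callers compiling. A minimal sketch of the distinction (module and type names are illustrative, not taken from this PR):

```rust
mod barrier {
    pub(crate) mod checkpoint {
        pub(crate) mod creating_job {
            // Visible only within `barrier::checkpoint` (the parent module).
            pub(super) struct OnlyForCheckpoint;
            // Visible from anywhere inside the crate.
            pub(crate) struct ForWholeCrate;
        }
    }

    pub(crate) fn worker() {
        // OK: `pub(crate)` is reachable across module subtrees.
        let _ok = checkpoint::creating_job::ForWholeCrate;
        // Does not compile if uncommented: `pub(super)` stops at `checkpoint`.
        // let _err = checkpoint::creating_job::OnlyForCheckpoint;
    }
}

fn main() {
    barrier::worker();
}
```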
src/meta/src/barrier/checkpoint/mod.rs: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod control;
+mod creating_job;
+mod state;
+
+pub(super) use control::{CheckpointControl, DatabaseCheckpointControl, EpochNode};
+pub(super) use state::BarrierWorkerState;
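The new mod.rs is a small facade: the submodules stay private, and only the types the rest of the barrier code needs are re-exported with `pub(super)`. A hypothetical sibling module under `barrier` (not part of this diff) would consume them roughly like this:

```rust
// e.g. in another file under src/meta/src/barrier/; `pub(super)` makes the
// re-exports visible exactly within the `barrier` subtree and nowhere else.
use super::checkpoint::{BarrierWorkerState, CheckpointControl};
```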
@@ -24,7 +24,7 @@ use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
 use crate::controller::fragment::InflightFragmentInfo;
 
 /// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
-pub(super) struct BarrierWorkerState {
+pub(crate) struct BarrierWorkerState {
     /// The last sent `prev_epoch`
     ///
     /// There's no need to persist this field. On recovery, we will restore this from the latest
src/meta/src/barrier/command.rs: 17 additions & 230 deletions

@@ -15,7 +15,6 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt::Formatter;
 
-use futures::future::try_join_all;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::ActorMapping;
@@ -37,17 +36,15 @@ use risingwave_pb::stream_plan::{
     DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
     StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
 };
-use risingwave_pb::stream_service::WaitEpochCommitRequest;
 use tracing::warn;
 
 use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
 use crate::barrier::info::BarrierInfo;
-use crate::barrier::{GlobalBarrierWorkerContextImpl, InflightSubscriptionInfo};
+use crate::barrier::InflightSubscriptionInfo;
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::manager::{DdlType, StreamingJob};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
-use crate::MetaResult;
 
 /// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
 /// in some fragment, like scaling or migrating.
@@ -203,7 +200,7 @@ pub enum CreateStreamingJobType {
     SnapshotBackfill(SnapshotBackfillInfo),
 }
 
-/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
+/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
 /// it will build different barriers to send, and may do different stuffs after the barrier is
 /// collected.
 #[derive(Debug, strum::Display)]
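The doc-link update aside, this comment states the core contract: each `Command` variant determines the mutation carried by the barrier it injects and the follow-up work once that barrier is collected. A toy model of that shape (none of these types or strings are RisingWave's):

```rust
// Toy model of the command-driven barrier flow described above.
enum Command {
    Flush,
    Pause,
}

impl Command {
    /// What the injected barrier should carry.
    fn mutation(&self) -> Option<&'static str> {
        match self {
            Command::Flush => None,
            Command::Pause => Some("pause-mutation"),
        }
    }

    /// What to do after the barrier has been collected from all workers.
    fn post_collect(&self) {
        match self {
            Command::Flush => {}
            Command::Pause => println!("ensure storage version is synced"),
        }
    }
}

fn main() {
    let cmd = Command::Pause;
    println!("inject barrier with {:?}", cmd.mutation());
    cmd.post_collect();
}
```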
@@ -479,6 +476,21 @@ impl CommandContext {
             _span: span,
         }
     }
+
+    pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
+        let Some(truncate_timestamptz) = Timestamptz::from_secs(
+            self.barrier_info
+                .prev_epoch
+                .value()
+                .as_timestamptz()
+                .timestamp()
+                - retention_second as i64,
+        ) else {
+            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
+            return self.barrier_info.prev_epoch.value();
+        };
+        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
+    }
 }
 
 impl Command {
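`get_truncate_epoch`, hoisted here from the impl block deleted at the bottom of this file, walks `prev_epoch` back by the retention window: epoch to wall-clock seconds, subtract `retention_second`, back to an epoch, falling back to `prev_epoch` itself when the result is not a representable timestamp. The underlying arithmetic on plain Unix milliseconds looks like this (a standalone sketch; note the saturating fallback here differs from the real code, which returns `prev_epoch`):

```rust
/// Illustrative only: prev-epoch-minus-retention on Unix milliseconds.
fn truncate_horizon_millis(prev_epoch_millis: u64, retention_second: u64) -> u64 {
    // The real code falls back to `prev_epoch` when the subtraction leaves the
    // valid timestamp range; this sketch just saturates at zero instead.
    prev_epoch_millis.saturating_sub(retention_second * 1000)
}

fn main() {
    // Retaining 24 hours (86_400 s) behind the previous barrier epoch:
    assert_eq!(
        truncate_horizon_millis(1_700_000_000_000, 86_400),
        1_699_913_600_000
    );
}
```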
@@ -922,228 +934,3 @@
         }
     }
 }
-
-impl CommandContext {
-    pub async fn wait_epoch_commit(
-        &self,
-        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
-    ) -> MetaResult<()> {
-        let table_id = self.table_ids_to_commit.iter().next().cloned();
-        // try wait epoch on an existing random table id
-        let Some(table_id) = table_id else {
-            // no need to wait epoch when there is no table id
-            return Ok(());
-        };
-        let futures = self.node_map.values().map(|worker_node| async {
-            let client = barrier_manager_context
-                .env
-                .stream_client_pool()
-                .get(worker_node)
-                .await?;
-            let request = WaitEpochCommitRequest {
-                epoch: self.barrier_info.prev_epoch(),
-                table_id: table_id.table_id,
-            };
-            client.wait_epoch_commit(request).await
-        });
-
-        try_join_all(futures).await?;
-
-        Ok(())
-    }
-
-    /// Do some stuffs after barriers are collected and the new storage version is committed, for
-    /// the given command.
-    pub async fn post_collect(
-        &self,
-        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
-    ) -> MetaResult<()> {
-        let Some(command) = &self.command else {
-            return Ok(());
-        };
-        match command {
-            Command::Flush => {}
-
-            Command::Throttle(_) => {}
-
-            Command::Pause(reason) => {
-                if let PausedReason::ConfigChange = reason {
-                    // After the `Pause` barrier is collected and committed, we must ensure that the
-                    // storage version with this epoch is synced to all compute nodes before the
-                    // execution of the next command of `Update`, as some newly created operators
-                    // may immediately initialize their states on that barrier.
-                    self.wait_epoch_commit(barrier_manager_context).await?;
-                }
-            }
-
-            Command::Resume(_) => {}
-
-            Command::SourceSplitAssignment(split_assignment) => {
-                barrier_manager_context
-                    .metadata_manager
-                    .update_actor_splits_by_split_assignment(split_assignment)
-                    .await?;
-                barrier_manager_context
-                    .source_manager
-                    .apply_source_change(None, None, Some(split_assignment.clone()), None)
-                    .await;
-            }
-
-            Command::DropStreamingJobs {
-                unregistered_state_table_ids,
-                ..
-            } => {
-                barrier_manager_context
-                    .hummock_manager
-                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
-                    .await?;
-            }
-
-            Command::CancelStreamingJob(table_fragments) => {
-                tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
-
-                // NOTE(kwannoel): At this point, meta has already registered the table ids.
-                // We should unregister them.
-                // This is required for background ddl, for foreground ddl this is a no-op.
-                // Foreground ddl is handled entirely by stream manager, so it will unregister
-                // the table ids on failure.
-                // On the other hand background ddl could be handled by barrier manager.
-                // It won't clean the tables on failure,
-                // since the failure could be recoverable.
-                // As such it needs to be handled here.
-                barrier_manager_context
-                    .hummock_manager
-                    .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
-                    .await?;
-            }
-
-            Command::CreateStreamingJob { info, job_type } => {
-                let CreateStreamingJobCommandInfo {
-                    table_fragments,
-                    dispatchers,
-                    init_split_assignment,
-                    ..
-                } = info;
-                barrier_manager_context
-                    .metadata_manager
-                    .catalog_controller
-                    .post_collect_table_fragments(
-                        table_fragments.table_id().table_id as _,
-                        table_fragments.actor_ids(),
-                        dispatchers.clone(),
-                        init_split_assignment,
-                    )
-                    .await?;
-
-                if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan {
-                    new_table_fragments,
-                    dispatchers,
-                    init_split_assignment,
-                    ..
-                }) = job_type
-                {
-                    barrier_manager_context
-                        .metadata_manager
-                        .catalog_controller
-                        .post_collect_table_fragments(
-                            new_table_fragments.table_id().table_id as _,
-                            new_table_fragments.actor_ids(),
-                            dispatchers.clone(),
-                            init_split_assignment,
-                        )
-                        .await?;
-                }
-
-                // Extract the fragments that include source operators.
-                let source_fragments = table_fragments.stream_source_fragments();
-                let backfill_fragments = table_fragments.source_backfill_fragments()?;
-                barrier_manager_context
-                    .source_manager
-                    .apply_source_change(
-                        Some(source_fragments),
-                        Some(backfill_fragments),
-                        Some(init_split_assignment.clone()),
-                        None,
-                    )
-                    .await;
-            }
-            Command::RescheduleFragment {
-                reschedules,
-                table_parallelism,
-                ..
-            } => {
-                barrier_manager_context
-                    .scale_controller
-                    .post_apply_reschedule(reschedules, table_parallelism)
-                    .await?;
-            }
-
-            Command::ReplaceTable(ReplaceTablePlan {
-                old_table_fragments,
-                new_table_fragments,
-                dispatchers,
-                init_split_assignment,
-                ..
-            }) => {
-                // Update actors and actor_dispatchers for new table fragments.
-                barrier_manager_context
-                    .metadata_manager
-                    .catalog_controller
-                    .post_collect_table_fragments(
-                        new_table_fragments.table_id().table_id as _,
-                        new_table_fragments.actor_ids(),
-                        dispatchers.clone(),
-                        init_split_assignment,
-                    )
-                    .await?;
-
-                // Apply the split changes in source manager.
-                barrier_manager_context
-                    .source_manager
-                    .drop_source_fragments_vec(std::slice::from_ref(old_table_fragments))
-                    .await;
-                let source_fragments = new_table_fragments.stream_source_fragments();
-                // XXX: is it possible to have backfill fragments here?
-                let backfill_fragments = new_table_fragments.source_backfill_fragments()?;
-                barrier_manager_context
-                    .source_manager
-                    .apply_source_change(
-                        Some(source_fragments),
-                        Some(backfill_fragments),
-                        Some(init_split_assignment.clone()),
-                        None,
-                    )
-                    .await;
-            }
-
-            Command::CreateSubscription {
-                subscription_id, ..
-            } => {
-                barrier_manager_context
-                    .metadata_manager
-                    .catalog_controller
-                    .finish_create_subscription_catalog(*subscription_id)
-                    .await?
-            }
-            Command::DropSubscription { .. } => {}
-            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
-        }
-
-        Ok(())
-    }
-
-    pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
-        let Some(truncate_timestamptz) = Timestamptz::from_secs(
-            self.barrier_info
-                .prev_epoch
-                .value()
-                .as_timestamptz()
-                .timestamp()
-                - retention_second as i64,
-        ) else {
-            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
-            return self.barrier_info.prev_epoch.value();
-        };
-        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
-    }
-}
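Among the methods removed above, `wait_epoch_commit` illustrates a pattern worth noting: build one RPC future per worker node and await them all with `futures::future::try_join_all`, which yields `Ok(Vec<_>)` only if every call succeeds and short-circuits on the first error. A self-contained sketch of the same pattern (the tokio runtime and the mock RPC are assumptions, not RisingWave code):

```rust
use futures::future::try_join_all;

// Stand-in for a per-worker RPC such as `wait_epoch_commit`.
async fn wait_on_worker(worker_id: u32) -> Result<u32, String> {
    if worker_id == 0 {
        Err("worker 0 unreachable".to_string())
    } else {
        Ok(worker_id)
    }
}

#[tokio::main]
async fn main() {
    // One future per worker, awaited collectively: all must succeed.
    let all_ok = try_join_all([1u32, 2, 3].map(wait_on_worker)).await;
    assert_eq!(all_ok, Ok(vec![1, 2, 3]));

    // A single failure fails the whole wait.
    let one_bad = try_join_all([0u32, 1].map(wait_on_worker)).await;
    assert!(one_bad.is_err());
}
```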