refactor(meta): commit finish catalog in barrier manager #17428

Merged · 44 commits · Jul 16, 2024

Commits
8ca25d5
commit finish catalog in barrier manager
kwannoel Jun 24, 2024
5e911ab
add table_id_to_rx
kwannoel Jun 25, 2024
6090f33
register notifier
kwannoel Jun 25, 2024
6a8e51b
add wait finish stream job finished
kwannoel Jun 25, 2024
716c9b0
use wait_streaming_job_finished to poll for notification version
kwannoel Jun 25, 2024
c2b52c9
remove call to finish stream job for recovered jobs, its always done …
kwannoel Jun 25, 2024
52e2726
remove finished channel in notifier
kwannoel Jun 25, 2024
3d0845a
remove notifiers from recovered stream job
kwannoel Jun 25, 2024
f087aa6
wait streaming job finished logic error
kwannoel Jun 25, 2024
ca6783e
split recovered stream job into v1 and v2
kwannoel Jun 25, 2024
73721a0
update replace table info, so it can be committed in barrier manager
kwannoel Jun 25, 2024
74f3a3a
handle recovered jobs
kwannoel Jun 25, 2024
a7eadaf
fmt
kwannoel Jun 26, 2024
5224da0
add table_id_to_version map
kwannoel Jun 26, 2024
060a2bb
handle tx on finish create table
kwannoel Jun 26, 2024
165ac41
handle tx on finish create streaming job
kwannoel Jun 26, 2024
f041a68
handle tx on finish create index and sink
kwannoel Jun 26, 2024
2b4bb84
handle tx on finish create table with source
kwannoel Jun 26, 2024
761f2ff
handle tx for v2
kwannoel Jun 26, 2024
10a9267
implement v2 wait streaming job
kwannoel Jun 26, 2024
25edf4e
add wait_streaming_job to stream manager instead
kwannoel Jun 27, 2024
16bb50d
handle abort_all during recovery
kwannoel Jun 27, 2024
2425e7f
dont assert
kwannoel Jul 3, 2024
06549e7
fix notify failed
kwannoel Jul 3, 2024
fca78b1
add more logs
kwannoel Jul 3, 2024
89cfac9
more logs
kwannoel Jul 3, 2024
69431bb
fix regression
kwannoel Jul 3, 2024
a7306a3
fix
kwannoel Jul 3, 2024
b429251
fix create sink into table
kwannoel Jul 4, 2024
e613f38
fmt
kwannoel Jul 4, 2024
916e51d
update sink target table
kwannoel Jul 4, 2024
5223e7b
fix warn
kwannoel Jul 4, 2024
8f286f7
notify mv
kwannoel Jul 5, 2024
ea35907
fix
kwannoel Jul 5, 2024
05f455b
fix
kwannoel Jul 5, 2024
50026d8
fix
kwannoel Jul 7, 2024
a262b6b
Merge branch 'main' into kwannoel/commit-in-barrier
wenym1 Jul 12, 2024
ca2c5b2
check finish in catalog and refactor recovery progress param
wenym1 Jul 15, 2024
f5e1422
take ownership
wenym1 Jul 16, 2024
4e3ffdf
fix comments
wenym1 Jul 16, 2024
a0b234e
refactor streaming job
wenym1 Jul 16, 2024
844b7c3
Merge branch 'main' into kwannoel/commit-in-barrier
wenym1 Jul 16, 2024
9a7fee8
add doc
wenym1 Jul 16, 2024
3598ae9
fix doc
wenym1 Jul 16, 2024
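Taken together, these commits replace the per-barrier `finished` notifier with a per-job wait: the DDL controller registers a receiver keyed by table id (the `table_id_to_rx` commits), then blocks in `wait_streaming_job_finished`, while the barrier manager commits the finish catalog and fires the matching sender. The sketch below illustrates that hand-off with tokio oneshot channels; the type and method names (`FinishedWaiters`, `register`, `notify_finished`) are illustrative stand-ins, not the code added in this PR.

```rust
use std::collections::HashMap;

use tokio::sync::{oneshot, Mutex};

// Stand-ins for the real meta-service types; both aliases are hypothetical.
type TableId = u32;
type NotificationVersion = u64;

/// Roughly the `table_id_to_rx` idea from the commits above: each creating job
/// owns a oneshot channel that is fired once its finish catalog is committed.
#[derive(Default)]
struct FinishedWaiters {
    senders: Mutex<HashMap<TableId, oneshot::Sender<NotificationVersion>>>,
}

impl FinishedWaiters {
    /// Called by the DDL controller before it starts waiting on the job.
    async fn register(&self, table_id: TableId) -> oneshot::Receiver<NotificationVersion> {
        let (tx, rx) = oneshot::channel();
        self.senders.lock().await.insert(table_id, tx);
        rx
    }

    /// Called by the barrier manager after it has committed the finish catalog.
    async fn notify_finished(&self, table_id: TableId, version: NotificationVersion) {
        if let Some(tx) = self.senders.lock().await.remove(&table_id) {
            let _ = tx.send(version);
        }
    }
}

#[tokio::main]
async fn main() {
    let waiters = FinishedWaiters::default();

    // DDL side: register interest in table 42 before waiting
    // (in the PR this is what `wait_streaming_job_finished` builds on).
    let rx = waiters.register(42).await;

    // Barrier manager side: pretend the finish catalog was just committed
    // at notification version 7.
    waiters.notify_finished(42, 7).await;

    // The waiter observes the version produced by the commit.
    assert_eq!(rx.await.ok(), Some(7));
}
```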
4 changes: 3 additions & 1 deletion ci/scripts/deterministic-recovery-test.sh
@@ -13,7 +13,9 @@ export RUST_LOG="risingwave_meta::barrier::recovery=debug,\
risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug"
risingwave_simulation=debug,\
risingwave_meta::stream::stream_manager=debug,\
risingwave_meta::barrier::progress=debug"

# Extra logs you can enable if the existing trace does not give enough info.
#risingwave_stream::executor::backfill=trace,
13 changes: 11 additions & 2 deletions src/meta/src/barrier/command.rs
@@ -24,7 +24,7 @@ use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
@@ -44,7 +44,7 @@ use tracing::warn;
use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
use crate::manager::{DdlType, MetadataManager, WorkerId};
use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;
@@ -97,6 +97,10 @@ pub struct ReplaceTablePlan {
/// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
/// `backfill_splits`.
pub init_split_assignment: SplitAssignment,
/// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
pub streaming_job: StreamingJob,
/// The temporary dummy table fragments id of new table fragment
pub dummy_id: u32,
Contributor: Curious why it's named dummy_id? What does it mean?

Contributor: And is there any reason to add this information to the ReplaceTablePlan command? I think every command that needs to track backfill status should come with a CreateStreamingJob command, where we can get the StreamingJob information. Which DDL would have only a ReplaceTablePlan command, so that we have to store the StreamingJob here?

Member: Is dummy_id the same as the id of new_table_fragments?
}

impl ReplaceTablePlan {
@@ -183,6 +187,8 @@ pub enum Command {
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
/// will be set to `Created`.
CreateStreamingJob {
streaming_job: StreamingJob,
internal_tables: Vec<Table>,
table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
@@ -551,6 +557,7 @@ impl CommandContext {
merge_updates,
dispatchers,
init_split_assignment,
..
}) = replace_table
{
// TODO: support in v2.
@@ -1019,6 +1026,7 @@ impl CommandContext {
merge_updates,
dispatchers,
init_split_assignment,
..
}) = replace_table
{
self.clean_up(old_table_fragments.actor_ids()).await?;
@@ -1104,6 +1112,7 @@
merge_updates,
dispatchers,
init_split_assignment,
..
}) => {
self.clean_up(old_table_fragments.actor_ids()).await?;

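The new `streaming_job`, `internal_tables`, and `dummy_id` fields mean the command itself carries everything needed to finish the job: when the progress tracker observes the last backfill report, the barrier manager can commit the finish catalog directly instead of signalling the DDL controller. Below is a rough sketch of that flow with stand-in types and a hypothetical `commit_finish_catalog` helper (the real code goes through `MetadataManager`), shown only to make the data flow concrete.

```rust
// Stand-in types; the real `StreamingJob`, `Table`, and `Command` live in the
// meta crate and carry many more fields.
struct StreamingJob;
struct Table;

enum Command {
    CreateStreamingJob {
        // Carried in the command so the barrier manager has everything it needs
        // to commit the finish catalog itself, with no round trip back to the
        // DDL controller through a `finished` channel.
        streaming_job: StreamingJob,
        internal_tables: Vec<Table>,
    },
}

struct CatalogStub;

impl CatalogStub {
    /// Hypothetical helper: commit the finished job plus its internal tables
    /// and return the resulting catalog notification version.
    async fn commit_finish_catalog(&self, _job: StreamingJob, _tables: Vec<Table>) -> u64 {
        1
    }
}

/// Invoked when the progress tracker sees the last backfill report for a job.
async fn on_all_backfill_done(catalog: &CatalogStub, cmd: Command) -> u64 {
    match cmd {
        Command::CreateStreamingJob {
            streaming_job,
            internal_tables,
        } => {
            // The returned version is what the waiting DDL request eventually
            // receives through `wait_streaming_job_finished`.
            catalog.commit_finish_catalog(streaming_job, internal_tables).await
        }
    }
}

#[tokio::main]
async fn main() {
    let cmd = Command::CreateStreamingJob {
        streaming_job: StreamingJob,
        internal_tables: vec![Table],
    };
    let version = on_all_backfill_done(&CatalogStub, cmd).await;
    println!("finish catalog committed at notification version {version}");
}
```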
35 changes: 15 additions & 20 deletions src/meta/src/barrier/mod.rs
@@ -63,7 +63,6 @@ use crate::manager::{
ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
MetadataManager, SystemParamsManagerImpl, WorkerId,
};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};
@@ -88,12 +87,6 @@ pub(crate) struct TableMap<T> {
inner: HashMap<TableId, T>,
}

impl<T> TableMap<T> {
pub fn remove(&mut self, table_id: &TableId) -> Option<T> {
self.inner.remove(table_id)
}
}

impl<T> From<HashMap<TableId, T>> for TableMap<T> {
fn from(inner: HashMap<TableId, T>) -> Self {
Self { inner }
@@ -106,12 +99,6 @@ impl<T> From<TableMap<T>> for HashMap<TableId, T> {
}
}

pub(crate) type TableActorMap = TableMap<HashSet<ActorId>>;
pub(crate) type TableUpstreamMvCountMap = TableMap<HashMap<TableId, usize>>;
pub(crate) type TableDefinitionMap = TableMap<String>;
pub(crate) type TableNotifierMap = TableMap<Notifier>;
pub(crate) type TableFragmentMap = TableMap<TableFragments>;

/// The reason why the cluster is recovering.
enum RecoveryReason {
/// After bootstrap.
@@ -802,7 +789,12 @@ impl GlobalBarrierManager {
}

async fn failure_recovery(&mut self, err: MetaError) {
self.context.tracker.lock().await.abort_all(&err);
self.context
.tracker
.lock()
.await
.abort_all(&err, &self.context)
.await;
self.checkpoint_control.clear_on_err(&err).await;
self.pending_non_checkpoint_barriers.clear();

@@ -830,7 +822,12 @@

async fn adhoc_recovery(&mut self) {
let err = MetaErrorInner::AdhocRecovery.into();
self.context.tracker.lock().await.abort_all(&err);
self.context
.tracker
.lock()
.await
.abort_all(&err, &self.context)
.await;
self.checkpoint_control.clear_on_err(&err).await;

if self.enable_recovery {
@@ -859,7 +856,7 @@ impl GlobalBarrierManagerContext {
async fn complete_barrier(self, node: EpochNode) -> MetaResult<BarrierCompleteOutput> {
let EpochNode {
command_ctx,
mut notifiers,
notifiers,
enqueue_time,
state,
..
@@ -877,11 +874,11 @@
}
return Err(e);
};
notifiers.iter_mut().for_each(|notifier| {
notifiers.into_iter().for_each(|notifier| {
notifier.notify_collected();
});
let has_remaining = self
.update_tracking_jobs(notifiers, command_ctx.clone(), create_mview_progress)
.update_tracking_jobs(command_ctx.clone(), create_mview_progress)
.await?;
let duration_sec = enqueue_time.stop_and_record();
self.report_complete_event(duration_sec, &command_ctx);
@@ -943,7 +940,6 @@

async fn update_tracking_jobs(
&self,
notifiers: Vec<Notifier>,
command_ctx: Arc<CommandContext>,
create_mview_progress: Vec<CreateMviewProgress>,
) -> MetaResult<bool> {
@@ -960,7 +956,6 @@
if let Some(command) = tracker.add(
TrackingCommand {
context: command_ctx.clone(),
notifiers,
},
&version_stats,
) {
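The `abort_all` call sites above gained a `&self.context` argument and an `.await` because aborting tracked jobs is no longer just dropping in-memory notifiers: during recovery the failure has to reach whatever is waiting on each job's finish status. A minimal sketch of that shape follows; the tracker and method names (in particular `notify_finish_failed`) are assumptions for illustration, not the crate's API.

```rust
// Illustrative only: the point is the shape of
// `tracker.abort_all(&err, &self.context).await`.
struct MetaError(String);
struct GlobalBarrierManagerContext;

impl GlobalBarrierManagerContext {
    /// Hypothetical async cleanup: surface the failure to whatever is waiting
    /// on the job's finish status (e.g. a `wait_streaming_job_finished` caller).
    async fn notify_finish_failed(&self, table_id: u32, err: &MetaError) {
        eprintln!("streaming job {table_id} aborted: {}", err.0);
    }
}

struct CreateMviewProgressTracker {
    /// Table ids of streaming jobs still being tracked.
    tracked_jobs: Vec<u32>,
}

impl CreateMviewProgressTracker {
    /// The old `abort_all(&err)` only dropped in-memory notifiers; once the
    /// failure must reach catalog-side waiters, the tracker needs the barrier
    /// manager context and the method becomes async.
    async fn abort_all(&mut self, err: &MetaError, context: &GlobalBarrierManagerContext) {
        for table_id in self.tracked_jobs.drain(..) {
            context.notify_finish_failed(table_id, err).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let context = GlobalBarrierManagerContext;
    let mut tracker = CreateMviewProgressTracker {
        tracked_jobs: vec![1, 2],
    };
    tracker
        .abort_all(&MetaError("adhoc recovery".into()), &context)
        .await;
}
```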
28 changes: 2 additions & 26 deletions src/meta/src/barrier/notifier.rs
@@ -36,9 +36,6 @@ pub(crate) struct Notifier {

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<MetaResult<()>>>,

/// Get notified when scheduled barrier is finished.
pub finished: Option<oneshot::Sender<MetaResult<()>>>,
}

impl Notifier {
@@ -50,8 +47,8 @@
}

/// Notify when we have collected a barrier from all actors.
pub fn notify_collected(&mut self) {
if let Some(tx) = self.collected.take() {
pub fn notify_collected(self) {
if let Some(tx) = self.collected {
tx.send(Ok(())).ok();
}
}
@@ -63,31 +60,10 @@
}
}

/// Notify when we have finished a barrier from all actors. This function consumes `self`.
Contributor: By the way, we can also change notify_collected to consume ownership of the notifier, since it is not expected to be called again afterwards.
///
/// Generally when a barrier is collected, it's also finished since it does not require further
/// report of finishing from actors.
/// However for creating MV, this is only called when all `BackfillExecutor` report it finished.
pub fn notify_finished(self) {
if let Some(tx) = self.finished {
tx.send(Ok(())).ok();
}
}

/// Notify when we failed to finish a barrier. This function consumes `self`.
pub fn notify_finish_failed(self, err: MetaError) {
if let Some(tx) = self.finished {
tx.send(Err(err)).ok();
}
}

/// Notify when we failed to collect or finish a barrier. This function consumes `self`.
pub fn notify_failed(self, err: MetaError) {
if let Some(tx) = self.collected {
tx.send(Err(err.clone())).ok();
}
if let Some(tx) = self.finished {
tx.send(Err(err)).ok();
}
}
}
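With the `finished` channel gone, the notifier is reduced to a single `collected` sender, and the notify methods consume `self` so a notifier cannot be fired twice (as suggested in the review comment above). A small, self-contained usage sketch with stubbed error types:

```rust
use tokio::sync::oneshot;

// Stubs standing in for the crate's error/result types.
type MetaError = String;
type MetaResult<T> = Result<T, MetaError>;

/// After this PR only the `collected` channel remains; notifying consumes the
/// struct, so it cannot be fired a second time.
struct Notifier {
    collected: Option<oneshot::Sender<MetaResult<()>>>,
}

impl Notifier {
    fn notify_collected(self) {
        if let Some(tx) = self.collected {
            tx.send(Ok(())).ok();
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let notifier = Notifier { collected: Some(tx) };

    // Barrier manager side: fired exactly once when the barrier is collected.
    notifier.notify_collected();

    // Scheduler side: only collection is awaited here; "finished" is now
    // observed through the catalog (`wait_streaming_job_finished`) rather than
    // a second channel on the notifier.
    assert!(rx.await.unwrap().is_ok());
}
```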