Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: report stream error for create streaming jobs #13108

Merged
merged 16 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ impl GlobalBarrierManager {
err: MetaError,
fail_nodes: impl IntoIterator<Item = EpochNode>,
) {
self.context.tracker.lock().await.abort_all(&err);
self.rpc_manager.clear();

for node in fail_nodes {
Expand All @@ -707,9 +708,10 @@ impl GlobalBarrierManager {
if let Some(wait_commit_timer) = node.wait_commit_timer {
wait_commit_timer.observe_duration();
}
node.notifiers
.into_iter()
.for_each(|notifier| notifier.notify_collection_failed(err.clone()));
node.notifiers.into_iter().for_each(|notifier|
// some of the fail nodes may be notified as collected before, we should notify them
// as failed using the specified error.
notifier.notify_failed(err.clone()));
}

if self.enable_recovery {
Expand Down
21 changes: 19 additions & 2 deletions src/meta/src/barrier/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub(crate) struct Notifier {
pub collected: Option<oneshot::Sender<MetaResult<()>>>,

/// Get notified when scheduled barrier is finished.
pub finished: Option<oneshot::Sender<()>>,
pub finished: Option<oneshot::Sender<MetaResult<()>>>,
}

impl Notifier {
Expand Down Expand Up @@ -70,7 +70,24 @@ impl Notifier {
/// However for creating MV, this is only called when all `BackfillExecutor` report it finished.
pub fn notify_finished(self) {
if let Some(tx) = self.finished {
tx.send(()).ok();
tx.send(Ok(())).ok();
}
}

/// Notify when we failed to finish a barrier. This function consumes `self`.
pub fn notify_finish_failed(self, err: MetaError) {
if let Some(tx) = self.finished {
tx.send(Err(err)).ok();
}
}

/// Notify when we failed to collect or finish a barrier. This function consumes `self`.
pub fn notify_failed(self, err: MetaError) {
if let Some(tx) = self.collected {
tx.send(Err(err.clone())).ok();
}
if let Some(tx) = self.finished {
tx.send(Err(err)).ok();
}
}
}
27 changes: 26 additions & 1 deletion src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::barrier::{
};
use crate::manager::{DdlType, MetadataManager};
use crate::model::{ActorId, TableFragments};
use crate::MetaResult;
use crate::{MetaError, MetaResult};

type ConsumedRows = u64;

Expand Down Expand Up @@ -216,6 +216,20 @@ impl TrackingJob {
}
}

pub(crate) fn notify_finish_failed(self, err: MetaError) {
match self {
TrackingJob::New(command) => {
command
.notifiers
.into_iter()
.for_each(|n| n.notify_finish_failed(err.clone()));
}
TrackingJob::Recovered(recovered) => {
recovered.finished.notify_finish_failed(err);
}
}
}

pub(crate) fn table_to_create(&self) -> Option<TableId> {
match self {
TrackingJob::New(command) => command.context.table_to_create(),
Expand Down Expand Up @@ -371,6 +385,17 @@ impl CreateMviewProgressTracker {
self.actor_map.retain(|_, table_id| *table_id != id);
}

/// Notify all tracked commands that error encountered and clear them.
pub fn abort_all(&mut self, err: &MetaError) {
yezizp2012 marked this conversation as resolved.
Show resolved Hide resolved
self.actor_map.clear();
self.finished_jobs.drain(..).for_each(|job| {
job.notify_finish_failed(err.clone());
});
self.progress_map
.drain()
.for_each(|(_, (_, job))| job.notify_finish_failed(err.clone()));
}

/// Add a new create-mview DDL command to track.
///
/// If the actors to track is empty, return the given command as it can be finished immediately.
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl GlobalBarrierManagerContext {
tokio::spawn(async move {
let res: MetaResult<()> = try {
tracing::debug!("recovering stream job {}", table.id);
finished.await.context("failed to finish command")?;
finished.await.ok().context("failed to finish command")??;

tracing::debug!("finished stream job {}", table.id);
// Once notified that job is finished we need to notify frontend.
Expand Down Expand Up @@ -275,7 +275,7 @@ impl GlobalBarrierManagerContext {
tokio::spawn(async move {
let res: MetaResult<()> = try {
tracing::debug!("recovering stream job {}", id);
finished.await.ok().context("failed to finish command")?;
finished.await.ok().context("failed to finish command")??;
tracing::debug!(id, "finished stream job");
catalog_controller.finish_streaming_job(id).await?;
};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl BarrierScheduler {
.context("failed to collect barrier")??;

// Wait for this command to be finished.
finish_rx.await.ok().context("failed to finish command")?;
finish_rx.await.ok().context("failed to finish command")??;
}

Ok(infos)
Expand Down
16 changes: 9 additions & 7 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,13 +797,15 @@ impl CatalogManager {
Ok(())
}

fn assert_table_creating(tables: &BTreeMap<TableId, Table>, table: &Table) {
if let Some(t) = tables.get(&table.id)
&& let Ok(StreamJobStatus::Creating) = t.get_stream_job_status()
{
fn check_table_creating(tables: &BTreeMap<TableId, Table>, table: &Table) -> MetaResult<()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cc @kwannoel , Before the table in Foreground is marked as created, if recovery occurs, it will be cleaned up by clean_dirty_tables. Changed this function to check and return an error if it's cleaned.
The corner case was found in this run: https://buildkite.com/risingwavelabs/pull-request/builds/43296#018df426-a3da-48cc-af24-f09eb061a320

return if let Some(t) = tables.get(&table.id) {
assert_eq!(t.get_stream_job_status(), Ok(StreamJobStatus::Creating));
Ok(())
} else {
panic!("Table must be in creating procedure: {table:#?}")
}
// If the table does not exist, it should be created in Foreground and cleaned during recovery in some cases.
assert_eq!(table.create_type(), CreateType::Foreground);
Err(anyhow!("failed to create table during recovery").into())
};
}

pub async fn assert_tables_deleted(&self, table_ids: Vec<TableId>) {
Expand Down Expand Up @@ -1007,7 +1009,7 @@ impl CatalogManager {
let database_core = &mut core.database;
let tables = &mut database_core.tables;
if cfg!(not(test)) {
Self::assert_table_creating(tables, &table);
Self::check_table_creating(tables, &table)?;
}
let mut tables = BTreeMapTransaction::new(tables);

Expand Down
Loading