Skip to content

Commit

Permalink
feat: Store error in procedure state (#1062)
Browse files Browse the repository at this point in the history
* docs: Change comment position

* refactor(procedure): Store error in ProcedureState

* test: Mock instance with procedure enabled

* feat: Add wait method to wait for procedure

* test(datanode): Test create table by procedure

* chore: Fix clippy
  • Loading branch information
evenyag authored Mar 1, 2023
1 parent 75e48c5 commit 3fd9c2f
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 79 deletions.
21 changes: 20 additions & 1 deletion src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use common_error::prelude::*;

Expand Down Expand Up @@ -81,6 +82,21 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},

#[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
ProcedurePanic { procedure_id: ProcedureId },

#[snafu(display("Failed to wait watcher, source: {}", source))]
WaitWatcher {
source: tokio::sync::watch::error::RecvError,
backtrace: Backtrace,
},

#[snafu(display("Failed to execute procedure, source: {}", source))]
ProcedureExec {
source: Arc<Error>,
backtrace: Backtrace,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -95,10 +111,13 @@ impl ErrorExt for Error {
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. }
| Error::RetryLater { .. } => StatusCode::Internal,
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Error::ProcedurePanic { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ pub mod error;
pub mod local;
mod procedure;
mod store;
pub mod watcher;

pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, Watcher,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status,
};
pub use crate::watcher::Watcher;
10 changes: 5 additions & 5 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl LocalManager {

common_runtime::spawn_bg(async move {
// Run the root procedure.
let _ = runner.run().await;
runner.run().await;
});

Ok(watcher)
Expand Down Expand Up @@ -447,9 +447,9 @@ mod tests {
assert!(ctx.try_insert_procedure(meta.clone()));
assert!(ctx.contains_procedure(meta.id));

assert_eq!(ProcedureState::Running, ctx.state(meta.id).unwrap());
assert!(ctx.state(meta.id).unwrap().is_running());
meta.set_state(ProcedureState::Done);
assert_eq!(ProcedureState::Done, ctx.state(meta.id).unwrap());
assert!(ctx.state(meta.id).unwrap().is_done());
}

#[test]
Expand Down Expand Up @@ -634,7 +634,7 @@ mod tests {
// Wait for the procedure done.
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
assert_eq!(ProcedureState::Done, *watcher.borrow());
assert!(watcher.borrow().is_done());

// Try to submit procedure with same id again.
let err = manager
Expand Down Expand Up @@ -697,7 +697,7 @@ mod tests {
.unwrap();
// Wait for the notification.
watcher.changed().await.unwrap();
assert_eq!(ProcedureState::Failed, *watcher.borrow());
assert!(watcher.borrow().is_failed());
}
};

Expand Down
61 changes: 33 additions & 28 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;
use common_telemetry::logging;
use tokio::time;

use crate::error::{Error, Result};
use crate::error::{ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};
Expand All @@ -30,7 +30,7 @@ enum ExecResult {
Continue,
Done,
RetryLater,
Failed(Error),
Failed,
}

#[cfg(test)]
Expand All @@ -48,7 +48,7 @@ impl ExecResult {
}

fn is_failed(&self) -> bool {
matches!(self, ExecResult::Failed(_))
matches!(self, ExecResult::Failed)
}
}

Expand Down Expand Up @@ -83,7 +83,11 @@ impl Drop for ProcedureGuard {
// Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
// See https://github.com/tokio-rs/tokio/issues/2002 .
// We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
self.meta.set_state(ProcedureState::Failed);
let err = ProcedurePanicSnafu {
procedure_id: self.meta.id,
}
.build();
self.meta.set_state(ProcedureState::failed(Arc::new(err)));
}

// Notify parent procedure.
Expand All @@ -109,7 +113,7 @@ pub(crate) struct Runner {

impl Runner {
/// Run the procedure.
pub(crate) async fn run(mut self) -> Result<()> {
pub(crate) async fn run(mut self) {
// Ensure we can update the procedure state.
let guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

Expand All @@ -129,12 +133,9 @@ impl Runner {
.await;
}

let mut result = Ok(());
// Execute the procedure. We need to release the lock whenever the the execution
// is successful or fail.
if let Err(e) = self.execute_procedure_in_loop().await {
result = Err(e);
}
self.execute_procedure_in_loop().await;

// We can't remove the metadata of the procedure now as users and its parent might
// need to query its state.
Expand All @@ -155,11 +156,9 @@ impl Runner {
self.procedure.type_name(),
self.meta.id
);

result
}

async fn execute_procedure_in_loop(&mut self) -> Result<()> {
async fn execute_procedure_in_loop(&mut self) {
let ctx = Context {
procedure_id: self.meta.id,
provider: self.manager_ctx.clone(),
Expand All @@ -168,11 +167,10 @@ impl Runner {
loop {
match self.execute_once(&ctx).await {
ExecResult::Continue => (),
ExecResult::Done => return Ok(()),
ExecResult::Done | ExecResult::Failed => return,
ExecResult::RetryLater => {
self.wait_on_err().await;
}
ExecResult::Failed(e) => return Err(e),
}
}
}
Expand Down Expand Up @@ -222,14 +220,14 @@ impl Runner {
return ExecResult::RetryLater;
}

self.meta.set_state(ProcedureState::Failed);
self.meta.set_state(ProcedureState::failed(Arc::new(e)));

// Write rollback key so we can skip this procedure while recovering procedures.
if self.rollback_procedure().await.is_err() {
return ExecResult::RetryLater;
}

ExecResult::Failed(e)
ExecResult::Failed
}
}
}
Expand Down Expand Up @@ -404,7 +402,7 @@ mod tests {

use super::*;
use crate::local::test_util;
use crate::{ContextProvider, LockKey, Procedure};
use crate::{ContextProvider, Error, LockKey, Procedure};

const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";

Expand Down Expand Up @@ -630,9 +628,14 @@ mod tests {
// Wait for subprocedures.
let mut all_child_done = true;
for id in children_ids {
if ctx.provider.procedure_state(id).await.unwrap()
!= Some(ProcedureState::Done)
{
let is_not_done = ctx
.provider
.procedure_state(id)
.await
.unwrap()
.map(|s| !s.is_done())
.unwrap_or(true);
if is_not_done {
all_child_done = false;
}
}
Expand Down Expand Up @@ -668,7 +671,7 @@ mod tests {
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

runner.run().await.unwrap();
runner.run().await;

// Check files on store.
for child_id in children_ids {
Expand Down Expand Up @@ -706,7 +709,7 @@ mod tests {

let res = runner.execute_once(&ctx).await;
assert!(res.is_failed(), "{res:?}");
assert_eq!(ProcedureState::Failed, meta.state());
assert!(meta.state().is_failed());
check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await;
}

Expand Down Expand Up @@ -741,11 +744,11 @@ mod tests {

let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
assert_eq!(ProcedureState::Running, meta.state());
assert!(meta.state().is_running());

let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
assert_eq!(ProcedureState::Done, meta.state());
assert!(meta.state().is_done());
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}

Expand Down Expand Up @@ -779,7 +782,8 @@ mod tests {
} else {
// Wait for subprocedures.
let state = ctx.provider.procedure_state(child_id).await.unwrap();
if state == Some(ProcedureState::Failed) {
let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
if is_failed {
// The parent procedure to abort itself if child procedure is failed.
Err(Error::from_error_ext(PlainError::new(
"subprocedure failed".to_string(),
Expand Down Expand Up @@ -811,12 +815,13 @@ mod tests {

let manager_ctx = Arc::new(ManagerContext::new());
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
assert!(manager_ctx.try_insert_procedure(meta.clone()));
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

// Run the runer and execute the procedure.
let err = runner.run().await.unwrap_err();
assert!(err.to_string().contains("subprocedure failed"), "{err}");
runner.run().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("subprocedure failed"), "{err}");
}
}
57 changes: 50 additions & 7 deletions src/common/procedure/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
use tokio::sync::watch::Receiver;
use uuid::Uuid;

use crate::error::Result;
use crate::error::{Error, Result};
use crate::watcher::Watcher;

/// Procedure execution status.
#[derive(Debug)]
Expand Down Expand Up @@ -198,20 +198,47 @@ impl FromStr for ProcedureId {
/// Loader to recover the [Procedure] instance from serialized data.
pub type BoxedProcedureLoader = Box<dyn Fn(&str) -> Result<BoxedProcedure> + Send>;

// TODO(yingwen): Find a way to return the error message if the procedure is failed.
/// State of a submitted procedure.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Default, Clone)]
pub enum ProcedureState {
/// The procedure is running.
#[default]
Running,
/// The procedure is finished.
Done,
/// The procedure is failed and cannot proceed anymore.
Failed,
Failed { error: Arc<Error> },
}

/// Watcher to watch procedure state.
pub type Watcher = Receiver<ProcedureState>;
impl ProcedureState {
/// Returns a [ProcedureState] with failed state.
pub fn failed(error: Arc<Error>) -> ProcedureState {
ProcedureState::Failed { error }
}

/// Returns true if the procedure state is running.
pub fn is_running(&self) -> bool {
matches!(self, ProcedureState::Running)
}

/// Returns true if the procedure state is done.
pub fn is_done(&self) -> bool {
matches!(self, ProcedureState::Done)
}

/// Returns true if the procedure state failed.
pub fn is_failed(&self) -> bool {
matches!(self, ProcedureState::Failed { .. })
}

/// Returns the error.
pub fn error(&self) -> Option<&Arc<Error>> {
match self {
ProcedureState::Failed { error } => Some(error),
_ => None,
}
}
}

// TODO(yingwen): Shutdown
/// `ProcedureManager` executes [Procedure] submitted to it.
Expand Down Expand Up @@ -244,6 +271,9 @@ pub type ProcedureManagerRef = Arc<dyn ProcedureManager>;

#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use common_error::prelude::StatusCode;

use super::*;

#[test]
Expand Down Expand Up @@ -311,4 +341,17 @@ mod tests {
let parsed = serde_json::from_str(&json).unwrap();
assert_eq!(id, parsed);
}

#[test]
fn test_procedure_state() {
assert!(ProcedureState::Running.is_running());
assert!(ProcedureState::Running.error().is_none());
assert!(ProcedureState::Done.is_done());

let state = ProcedureState::failed(Arc::new(Error::external(MockError::new(
StatusCode::Unexpected,
))));
assert!(state.is_failed());
assert!(state.error().is_some());
}
}
Loading

0 comments on commit 3fd9c2f

Please sign in to comment.