move failed jobs to the failed state (madara-alliance#159)
* move failed jobs to the failed state

* changelog

* test fix and clippy

* logx

* fix clippy
apoorvsadana authored Oct 15, 2024
1 parent 5fb2de6 commit 64b375d
Showing 10 changed files with 127 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- update_job returns the updated job item
- made create_job atomic to avoid race conditions
- handle jobs in tokio tasks
- handle workers in tokio tasks
@@ -63,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- all failed jobs should move to failed state
- Fixes all unwraps() in code to improve error logging
- Simplified Update_Job for Database.
- Simplified otel setup.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/orchestrator/Cargo.toml
@@ -86,7 +86,6 @@ tracing-core = { workspace = true, default-features = false }
tracing-opentelemetry = "0.26.0"
tracing-subscriber = { workspace = true, features = ["env-filter"] }


[features]
default = ["ethereum", "with_mongodb", "with_sqs"]
ethereum = ["ethereum-da-client"]
2 changes: 1 addition & 1 deletion crates/orchestrator/src/database/mod.rs
@@ -28,7 +28,7 @@ pub trait Database: Send + Sync {
async fn create_job(&self, job: JobItem) -> Result<JobItem, JobError>;
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>>;
async fn update_job(&self, current_job: &JobItem, updates: crate::jobs::types::JobItemUpdates) -> Result<()>;
async fn update_job(&self, current_job: &JobItem, updates: crate::jobs::types::JobItemUpdates) -> Result<JobItem>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
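For context on the trait change above, a hedged sketch of why returning the updated item helps at call sites: the caller keeps working with the document as it is stored after the update, fresh `version` included, instead of re-fetching it or hand-incrementing the version. The `lock_job` helper is hypothetical; `Database`, `JobItem`, `JobItemUpdates`, and `JobStatus` are the crate types that appear elsewhere in this diff, and the `color_eyre::Result` alias is assumed to match the trait's return type.

```rust
use color_eyre::Result;

use crate::database::Database;
use crate::jobs::types::{JobItem, JobItemUpdates, JobStatus};

// Hypothetical call site inside the orchestrator crate: `update_job` now hands
// back the post-update `JobItem`, so the caller holds the bumped version and
// can chain further updates without a second read from the database.
async fn lock_job(db: &dyn Database, job: &JobItem) -> Result<JobItem> {
    db.update_job(job, JobItemUpdates::new().update_status(JobStatus::LockedForProcessing).build()).await
}
```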
26 changes: 16 additions & 10 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -6,7 +6,10 @@ use color_eyre::eyre::eyre;
use color_eyre::Result;
use futures::TryStreamExt;
use mongodb::bson::{doc, Bson, Document};
use mongodb::options::{ClientOptions, FindOneOptions, FindOptions, ServerApi, ServerApiVersion, UpdateOptions};
use mongodb::options::{
ClientOptions, FindOneAndUpdateOptions, FindOneOptions, FindOptions, ReturnDocument, ServerApi, ServerApiVersion,
UpdateOptions,
};
use mongodb::{bson, Client, Collection};
use utils::ToDocument;
use uuid::Uuid;
@@ -109,13 +112,13 @@ impl Database for MongoDb {
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<()> {
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<JobItem> {
// Filters to search for the job
let filter = doc! {
"id": current_job.id,
"version": current_job.version,
};
let options = UpdateOptions::builder().upsert(false).build();
let options = FindOneAndUpdateOptions::builder().upsert(false).return_document(ReturnDocument::After).build();

let mut updates = updates.to_document()?;

@@ -140,14 +143,17 @@ impl Database for MongoDb {
"$set": non_null_updates
};

let result = self.get_job_collection().update_one(filter, update, options).await?;
if result.modified_count == 0 {
tracing::warn!(job_id = %current_job.id, category = "db_call", "Failed to update job. Job version is likely outdated");
return Err(eyre!("Failed to update job. Job version is likely outdated"));
let result = self.get_job_collection().find_one_and_update(filter, update, options).await?;
match result {
Some(job) => {
tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
Ok(job)
}
None => {
tracing::warn!(job_id = %current_job.id, category = "db_call", "Failed to update job. Job version is likely outdated");
return Err(eyre!("Failed to update job. Job version is likely outdated"));
}
}

tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
Ok(())
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
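Taken on its own, the pattern the new implementation relies on is worth spelling out: `find_one_and_update` with a version filter and `ReturnDocument::After` enforces optimistic locking and returns the stored document in a single round trip. Below is a minimal, self-contained sketch assuming the same mongodb driver API used above; the collection layout and the `lock_if_version_matches` helper are illustrative, not the orchestrator's actual schema.

```rust
use mongodb::bson::{doc, Document};
use mongodb::error::Result;
use mongodb::options::{FindOneAndUpdateOptions, ReturnDocument};
use mongodb::Collection;

/// Atomically set a status, but only if the stored `version` still matches the
/// caller's copy. Returns the post-update document, or `None` when another
/// writer already bumped the version (the filter then matches nothing).
async fn lock_if_version_matches(
    jobs: &Collection<Document>,
    id: &str,
    expected_version: i32,
) -> Result<Option<Document>> {
    let filter = doc! { "id": id, "version": expected_version };
    let update = doc! {
        "$set": { "status": "LockedForProcessing" },
        "$inc": { "version": 1 },
    };
    // `ReturnDocument::After` asks the server for the document as it looks
    // *after* the update, so no follow-up read is needed.
    let options = FindOneAndUpdateOptions::builder()
        .upsert(false)
        .return_document(ReturnDocument::After)
        .build();
    jobs.find_one_and_update(filter, update, options).await
}
```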
1 change: 1 addition & 0 deletions crates/orchestrator/src/jobs/constants.rs
@@ -6,3 +6,4 @@ pub const JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX: &str = "attempt_tx_hashes_";
pub const JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO: &str = "last_failed_block_no";
pub const JOB_METADATA_SNOS_BLOCK: &str = "block_number_to_run";
pub const JOB_METADATA_SNOS_FACT: &str = "snos_fact";
pub const JOB_METADATA_FAILURE_REASON: &str = "failure_reason";
77 changes: 49 additions & 28 deletions crates/orchestrator/src/jobs/mod.rs
@@ -5,6 +5,7 @@ use std::time::Duration;

use async_trait::async_trait;
use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use conversion::parse_string;
use da_job::DaError;
use mockall::automock;
@@ -185,7 +186,7 @@ pub async fn create_job(
/// DB. It then adds the job to the verification queue.
#[tracing::instrument(skip(config), fields(category = "general", job, job_type, internal_id), ret, err)]
pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let mut job = get_job(id, config.clone()).await?;
let job = get_job(id, config.clone()).await?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");

@@ -209,7 +210,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
// the same job, it would fail to update the job in the database because the version would be
// outdated
tracing::debug!(job_id = ?id, "Updating job status to LockedForProcessing");
config
let mut job = config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::LockedForProcessing).build())
.await
@@ -220,19 +221,28 @@

tracing::debug!(job_id = ?id, job_type = ?job.job_type, "Getting job handler");
let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.clone(), &mut job).await?;
let external_id = match job_handler.process_job(config.clone(), &mut job).await {
Ok(external_id) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
external_id
}
Err(e) => {
// TODO: I think most of the times the errors will not be fixed automatically
// if we just retry. But for some failures like DB issues, it might be possible
// that retrying will work. So we can add a retry logic here to improve robustness.
tracing::error!(job_id = ?id, error = ?e, "Failed to process job");
return move_job_to_failed(&job, config.clone(), format!("Processing failed: {}", e)).await;
}
};
tracing::debug!(job_id = ?id, "Incrementing process attempt count in metadata");
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

let mut job_cloned = job.clone();
job_cloned.version += 1;

// Fetching the job again because update status above will update the job version
tracing::debug!(job_id = ?id, "Updating job status to PendingVerification");
config
.database()
.update_job(
&job_cloned,
&job,
JobItemUpdates::new()
.update_status(JobStatus::PendingVerification)
.update_metadata(metadata)
@@ -322,21 +332,6 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
new_job.metadata.insert("error".to_string(), e);
new_job.status = JobStatus::VerificationFailed;

config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::VerificationFailed)
.update_metadata(new_job.metadata)
.build(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationFailed");
JobError::Other(OtherError(e))
})?;

let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)
.map_err(|e| JobError::Other(OtherError(e)))?;
if process_attempts < job_handler.max_process_attempts() {
@@ -345,10 +340,30 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
attempt = process_attempts + 1,
"Verification failed. Retrying job processing"
);
config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::VerificationFailed)
.update_metadata(new_job.metadata)
.build(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationFailed");
JobError::Other(OtherError(e))
})?;
add_job_to_process_queue(job.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;
return Ok(());
} else {
tracing::warn!(job_id = ?id, "Max process attempts reached. Job will not be retried");
return move_job_to_failed(
&job,
config.clone(),
format!("Verification rejected. Max process attempts reached: {}", process_attempts),
)
.await;
}
}
JobVerificationStatus::Pending => {
@@ -408,30 +423,36 @@ pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), Job
let job = get_job(id, config.clone()).await?.clone();
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure started for block");
let mut metadata = job.metadata.clone();

tracing::Span::current().record("job_status", format!("{:?}", job.status));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type));

tracing::debug!(job_id = ?id, job_status = ?job.status, job_type = ?job.job_type, block_no = %internal_id, "Job details for failure handling for block");
let status = job.status.clone().to_string();
move_job_to_failed(&job, config.clone(), format!("Received failure queue message for job with status: {}", status))
.await
}

async fn move_job_to_failed(job: &JobItem, config: Arc<Config>, reason: String) -> Result<(), JobError> {
if job.status == JobStatus::Completed {
tracing::error!(job_id = ?id, job_status = ?job.status, "Invalid state exists on DL queue");
tracing::error!(job_id = ?job.id, job_status = ?job.status, "Invalid state exists on DL queue");
return Ok(());
}
// We assume that a Failure status will only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
else if job.status == JobStatus::Failed {
tracing::warn!(job_id = ?id, "Job already marked as failed, skipping processing");
tracing::warn!(job_id = ?job.id, "Job already marked as failed, skipping processing");
return Ok(());
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
let mut metadata = job.metadata.clone();
let internal_id = job.internal_id.clone();
metadata.insert(JOB_METADATA_FAILURE_REASON.to_string(), reason);

tracing::debug!(job_id = ?id, "Updating job status to Failed in database");
tracing::debug!(job_id = ?job.id, "Updating job status to Failed in database");
match config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(metadata).build())
.update_job(job, JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(metadata).build())
.await
{
Ok(_) => {
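Stripped of the database and queue plumbing, the state transition that this file now funnels every failure through looks roughly like the sketch below. It is a self-contained simplification with made-up `Job`/`Status` types, not the crate's actual ones: the failure reason is recorded under `JOB_METADATA_FAILURE_REASON` and the job lands in `Failed`, while `Completed` and already-`Failed` jobs are left untouched, mirroring the guards in `move_job_to_failed` above.

```rust
use std::collections::HashMap;

const JOB_METADATA_FAILURE_REASON: &str = "failure_reason";

#[derive(Debug, PartialEq)]
enum Status {
    Created,
    Completed,
    Failed,
}

struct Job {
    status: Status,
    metadata: HashMap<String, String>,
}

/// Record why a job failed and move it to the terminal `Failed` state.
/// Completed jobs are an invalid input here and already-failed jobs are
/// skipped silently, matching the guards in the diff above.
fn move_to_failed(job: &mut Job, reason: String) {
    if job.status == Status::Completed || job.status == Status::Failed {
        return;
    }
    job.metadata.insert(JOB_METADATA_FAILURE_REASON.to_string(), reason);
    job.status = Status::Failed;
}

fn main() {
    let mut job = Job { status: Status::Created, metadata: HashMap::new() };
    move_to_failed(&mut job, "Processing failed: handler error".to_string());
    assert_eq!(job.status, Status::Failed);
    assert_eq!(job.metadata[JOB_METADATA_FAILURE_REASON], "Processing failed: handler error");
}
```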
7 changes: 0 additions & 7 deletions crates/orchestrator/src/jobs/types.rs
@@ -89,26 +89,19 @@ pub enum JobType {
pub enum JobStatus {
/// An acknowledgement that the job has been received by the
/// orchestrator and is waiting to be processed
#[strum(to_string = "Created")]
Created,
/// Some system has taken a lock over the job for processing and no
/// other system should process the job
#[strum(to_string = "Locked for Processing")]
LockedForProcessing,
/// The job has been processed and is pending verification
#[strum(to_string = "Pending Verification")]
PendingVerification,
/// The job has been processed and verified. No further action needs to be taken
#[strum(to_string = "Completed")]
Completed,
/// The job was processed but could not be verified within the given time
#[strum(to_string = "Verification Timeout")]
VerificationTimeout,
/// The job failed processing
#[strum(to_string = "Verification Failed")]
VerificationFailed,
/// The job failed completing
#[strum(to_string = "Failed")]
Failed,
}

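The `#[strum(to_string = "...")]` attributes removed above controlled how each variant prints via `Display`; without them, strum's derived `Display` falls back to the variant name itself (so `Failed` prints as "Failed" and `LockedForProcessing` as "LockedForProcessing"). A small illustrative sketch of that behaviour, assuming the `strum_macros::Display` derive that these attributes imply and using an enum invented for the example:

```rust
use strum_macros::Display;

#[derive(Display)]
enum Status {
    // No override: the derived Display prints the variant name verbatim.
    LockedForProcessing,
    // With an override, the custom string is printed instead.
    #[strum(to_string = "Pending Verification")]
    PendingVerification,
}

fn main() {
    assert_eq!(Status::LockedForProcessing.to_string(), "LockedForProcessing");
    assert_eq!(Status::PendingVerification.to_string(), "Pending Verification");
}
```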
7 changes: 6 additions & 1 deletion crates/orchestrator/src/tests/database/mod.rs
@@ -208,7 +208,7 @@ async fn database_test_update_job() {
let updated_metadata = increment_key_in_metadata(&metadata, key).unwrap();

let job_cloned = job.clone();
let _ = database_client
let updated_job = database_client
.update_job(
&job_cloned,
JobItemUpdates::new()
@@ -219,10 +219,15 @@
.await;

if let Some(job_after_updates_db) = database_client.get_job_by_id(job_id).await.unwrap() {
// check if job is updated
assert_eq!(JobType::DataSubmission, job_after_updates_db.job_type);
assert_eq!(JobStatus::LockedForProcessing, job_after_updates_db.status);
assert_eq!(1, job_after_updates_db.version);
assert_eq!(456.to_string(), job_after_updates_db.internal_id);

// check if value returned by `update_job` is the correct one
// and matches the one in database
assert_eq!(updated_job.unwrap(), job_after_updates_db);
} else {
panic!("Job not found in Database.")
}
54 changes: 50 additions & 4 deletions crates/orchestrator/src/tests/jobs/mod.rs
@@ -10,10 +10,14 @@ use tokio::time::sleep;
use uuid::Uuid;

use super::database::build_job_item;
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::constants::{
JOB_METADATA_FAILURE_REASON, JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY,
};
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, MockJob};
use crate::jobs::{
create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, JobError, MockJob,
};
use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE};
use crate::tests::common::MessagePayloadType;
use crate::tests::config::{ConfigType, TestConfigBuilder};
@@ -298,6 +302,45 @@ async fn process_job_two_workers_process_same_job_works() {
assert_eq!(final_job_in_db.status, JobStatus::PendingVerification);
}

/// Tests `process_job` function when the job handler returns an error.
/// The job should be moved to the failed status.
#[rstest]
#[tokio::test]
async fn process_job_job_handler_returns_error_works() {
let mut job_handler = MockJob::new();
// Expecting the process job function in the job handler to return an error.
let failure_reason = "Failed to process job";
job_handler
.expect_process_job()
.times(1)
.returning(move |_, _| Err(JobError::Other(failure_reason.to_string().into())));
job_handler.expect_verification_polling_delay_seconds().return_const(1u64);

// Mocking the `get_job_handler` call in create_job function.
let job_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(job_handler));
let ctx = mock_factory::get_job_handler_context();
ctx.expect().times(1).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler));

// building config
let services = TestConfigBuilder::new()
.configure_database(ConfigType::Actual)
.configure_queue_client(ConfigType::Actual)
.build()
.await;
let db_client = services.config.database();

let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string());

// Creating the job in the db
db_client.create_job(job_item.clone()).await.unwrap();

assert!(process_job(job_item.id, services.config.clone()).await.is_ok());

let final_job_in_db = db_client.get_job_by_id(job_item.id).await.unwrap().unwrap();
assert_eq!(final_job_in_db.status, JobStatus::Failed);
assert!(final_job_in_db.metadata.get(JOB_METADATA_FAILURE_REASON).unwrap().to_string().contains(failure_reason));
}

/// Tests `verify_job` function when job is having expected status
/// and returns a `Verified` verification status.
#[rstest]
@@ -427,7 +470,7 @@ async fn verify_job_with_rejected_status_works() {

// DB checks.
let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap();
assert_eq!(updated_job.status, JobStatus::VerificationFailed);
assert_eq!(updated_job.status, JobStatus::Failed);
assert_eq!(updated_job.metadata.get(JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(), "1");

// Waiting for 5 secs for message to be passed into the queue
@@ -623,7 +666,10 @@ async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobT
// creating expected output
let mut job_expected = job.clone();
let mut job_metadata = job_expected.metadata.clone();
job_metadata.insert("last_job_status".to_string(), job_status.to_string());
job_metadata.insert(
JOB_METADATA_FAILURE_REASON.to_string(),
format!("Received failure queue message for job with status: {}", job_status),
);
job_expected.metadata.clone_from(&job_metadata);
job_expected.status = JobStatus::Failed;
job_expected.version = 1;
