Skip to content

Commit

Permalink
im cryign
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Apr 28, 2024
1 parent a6222d4 commit e56ebc4
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 158 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
[profile.dev.package]
backtrace = { opt-level = 3 }
num-bigint-dig = { opt-level = 3 }
taplo = { debug-assertions = false } # A debug assertion will make the xtask panic with too long trailing comments

# The profile that 'cargo dist' will build with
Expand Down
15 changes: 7 additions & 8 deletions kitsune-job-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ pub async fn run_dispatcher(
},
});

let job_queue = Arc::new(job_queue);
let job_tracker = TaskTracker::new();
job_tracker.close();

Expand All @@ -99,13 +98,13 @@ pub async fn run_dispatcher(
let job_tracker = job_tracker.clone();

async move {
job_queue
.spawn_jobs(
num_job_workers - job_tracker.len(),
Arc::clone(&ctx),
&job_tracker,
)
.await
athena::spawn_jobs(
&job_queue,
num_job_workers - job_tracker.len(),
Arc::clone(&ctx),
&job_tracker,
)
.await
}
})
.retry(just_retry::backoff_policy())
Expand Down
10 changes: 4 additions & 6 deletions lib/athena/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ name = "basic_queue"
required-features = ["redis"]

[dependencies]
ahash = { version = "0.8.11", optional = true }
ahash = "0.8.11"
async-trait = "0.1.80"
either = { version = "1.11.0", default-features = false, optional = true }
futures-util = { version = "0.3.30", default-features = false }
iso8601-timestamp = "0.2.17"
just-retry = { path = "../just-retry", optional = true }
just-retry = { path = "../just-retry" }
multiplex-pool = { path = "../multiplex-pool", optional = true }
once_cell = { version = "1.19.0", optional = true }
rand = { version = "0.8.5", optional = true }
Expand All @@ -26,7 +26,7 @@ redis = { version = "0.25.3", default-features = false, features = [
"streams",
"tokio-comp",
], optional = true }
serde = { version = "1.0.199", features = ["derive"], optional = true }
serde = { version = "1.0.199", features = ["derive"] }
simd-json = { version = "0.13.10", optional = true }
smol_str = "0.2.1"
speedy-uuid = { path = "../speedy-uuid", features = ["redis", "serde"] }
Expand All @@ -35,17 +35,15 @@ tokio = { version = "1.37.0", features = ["macros", "rt", "sync"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
tracing = "0.1.40"
typed-builder = "0.18.2"
typetag = "0.2.16"

[features]
redis = [
"dep:ahash",
"dep:either",
"dep:just-retry",
"dep:multiplex-pool",
"dep:once_cell",
"dep:rand",
"dep:redis",
"dep:serde",
"dep:simd-json",
]

Expand Down
2 changes: 1 addition & 1 deletion lib/athena/examples/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn main() {
loop {
if tokio::time::timeout(
Duration::from_secs(5),
queue.spawn_jobs(20, Arc::new(()), &jobs),
athena::spawn_jobs(&queue, 20, Arc::new(()), &jobs),
)
.await
.is_err()
Expand Down
36 changes: 20 additions & 16 deletions lib/athena/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use crate::{
consts::MIN_IDLE_TIME,
error::{Error, Result},
JobContextRepository, JobQueue, Runnable,
JobContextRepository, JobData, JobQueue, JobResult, Outcome, Runnable,
};
use ahash::AHashMap;
use futures_util::TryStreamExt;
use just_retry::RetryExt;
use speedy_uuid::Uuid;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_util::task::TaskTracker;

const BLOCK_TIME: Duration = Duration::from_secs(2);
const MAX_RETRIES: u32 = 10;
const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60);

type ContextFor<Queue: JobQueue> =
<<Queue::ContextRepository as JobContextRepository>::JobContext as Runnable>::Context;
type ContextFor<Queue> =
<<<Queue as JobQueue>::ContextRepository as JobContextRepository>::JobContext as Runnable>::Context;

pub async fn spawn_jobs<Q>(
queue: &Q,
Expand All @@ -25,9 +24,11 @@ where
Q: JobQueue + Clone,
{
let job_data = queue.fetch_job_data(max_jobs).await?;
let job_ids: Vec<Uuid> = job_data.iter().map(|data| data.job_id).collect();

let context_stream = queue
.context_repository()
.fetch_context(job_data.clone().map(|data| data.meta.job_id))
.fetch_context(job_ids.into_iter())
.await
.map_err(|err| Error::ContextRepository(err.into()))?;

Expand All @@ -36,14 +37,14 @@ where
// Collect all the job data into a hashmap indexed by the job ID
// This is because we don't enforce an ordering with the batch fetching
let job_data = job_data
.map(|data| (data.meta.job_id, data))
.into_iter()
.map(|data| (data.job_id, data))
.collect::<AHashMap<Uuid, JobData>>();
let job_data = Arc::new(job_data);

while let Some((job_id, job_ctx)) = context_stream
.next()
.try_next()
.await
.transpose()
.map_err(|err| Error::ContextRepository(err.into()))?
{
let queue = queue.clone();
Expand Down Expand Up @@ -73,15 +74,18 @@ where

let job_state = if let Err(error) = result {
error!(error = ?error.into(), "Failed run job");
JobState::Failed {
fail_count: job_data.meta.fail_count,
JobResult {
outcome: Outcome::Fail {
fail_count: job_data.fail_count,
},
job_id,
stream_id: &job_data.stream_id,
ctx: &job_data.ctx,
}
} else {
JobState::Succeeded {
JobResult {
outcome: Outcome::Success,
job_id,
stream_id: &job_data.stream_id,
ctx: &job_data.ctx,
}
};

Expand Down
5 changes: 5 additions & 0 deletions lib/athena/src/consts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use std::time::Duration;

pub const BLOCK_TIME: Duration = Duration::from_secs(2);
pub const MAX_RETRIES: u32 = 10;
pub const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60);
Loading

0 comments on commit e56ebc4

Please sign in to comment.