Skip to content

Commit

Permalink
Get rid of unmaintained atomic-option
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhoo committed Jan 3, 2024
1 parent a4e72fd commit 47b946e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 32 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ libc = "0.2"
rand = "0.8"
chrono = { version = "0.4", features = ["serde", "clock"], default-features = false }
url = "2"
atomic-option = "0.1"
fnv = "1.0.5"
native-tls = { version = "0.2", optional = true }
clap = { version = "3.1.0", optional = true }
Expand Down
54 changes: 30 additions & 24 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::error::Error;
use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
use atomic_option::AtomicOption;
use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
use std::net::TcpStream;
use std::sync::{atomic, Arc};
use std::sync::{atomic, Arc, Mutex};

use crate::proto::{Ack, Fail, Job};

Expand Down Expand Up @@ -113,12 +112,17 @@ where
S: Read + Write,
{
c: Client<S>,
last_job_results: Arc<Vec<AtomicOption<Result<String, Fail>>>>,
running_jobs: Arc<Vec<AtomicOption<String>>>,
worker_states: Arc<Vec<Mutex<WorkerState>>>,
callbacks: Arc<FnvHashMap<String, BoxedJobRunner<E>>>,
terminated: bool,
}

#[derive(Default)]
struct WorkerState {
last_job_result: Option<Result<String, Fail>>,
running_job: Option<String>,
}

/// Convenience wrapper for building a Faktory worker.
///
/// See the [`Consumer`](struct.Consumer.html) documentation for details.
Expand Down Expand Up @@ -242,8 +246,7 @@ impl<E, S: Read + Write> Consumer<S, E> {
Consumer {
c,
callbacks: Arc::new(callbacks),
running_jobs: Arc::new((0..workers).map(|_| AtomicOption::empty()).collect()),
last_job_results: Arc::new((0..workers).map(|_| AtomicOption::empty()).collect()),
worker_states: Arc::new((0..workers).map(|_| Default::default()).collect()),
terminated: false,
}
}
Expand Down Expand Up @@ -285,7 +288,7 @@ where
let jid = job.jid.clone();

// keep track of running job in case we're terminated during it
self.running_jobs[worker].swap(Box::new(jid.clone()), atomic::Ordering::SeqCst);
self.worker_states[worker].lock().unwrap().running_job = Some(jid.clone());

// process the job
let r = self.run_job(job);
Expand All @@ -295,8 +298,7 @@ where
Ok(_) => {
// job done -- acknowledge
// remember it in case we fail to notify the server (e.g., broken connection)
self.last_job_results[worker]
.swap(Box::new(Ok(jid.clone())), atomic::Ordering::SeqCst);
self.worker_states[worker].lock().unwrap().last_job_result = Some(Ok(jid.clone()));
self.c.issue(&Ack::new(jid))?.await_ok()?;
}
Err(e) => {
Expand All @@ -320,14 +322,17 @@ where
};

let fail2 = fail.clone();
self.last_job_results[worker].swap(Box::new(Err(fail)), atomic::Ordering::SeqCst);
self.worker_states[worker].lock().unwrap().last_job_result = Some(Err(fail));
self.c.issue(&fail2)?.await_ok()?;
}
}

// we won't have to tell the server again
self.last_job_results[worker].take(atomic::Ordering::SeqCst);
self.running_jobs[worker].take(atomic::Ordering::SeqCst);
{
let mut state = self.worker_states[worker].lock().unwrap();
state.last_job_result = None;
state.running_job = None;
}
Ok(true)
}

Expand All @@ -352,8 +357,7 @@ where
Ok(Consumer {
c: self.c.connect_again()?,
callbacks: Arc::clone(&self.callbacks),
running_jobs: Arc::clone(&self.running_jobs),
last_job_results: Arc::clone(&self.last_job_results),
worker_states: Arc::clone(&self.worker_states),
terminated: self.terminated,
})
}
Expand All @@ -373,21 +377,23 @@ where
Q: AsRef<str>,
{
assert!(!self.terminated, "do not re-run a terminated worker");
assert_eq!(Arc::strong_count(&self.last_job_results), 1);
let worker_states = Arc::get_mut(&mut self.worker_states)
.expect("all workers are scoped to &mut of the user-code-visible Consumer");

// retry delivering notification about our last job result.
// we know there's no leftover thread at this point, so there's no race on the option.
for last_job_result in self.last_job_results.iter() {
if let Some(res) = last_job_result.take(atomic::Ordering::SeqCst) {
let r = match *res {
for wstate in worker_states.iter_mut() {
let wstate = wstate.get_mut().unwrap();
if let Some(res) = wstate.last_job_result.take() {
let r = match res {
Ok(ref jid) => self.c.issue(&Ack::new(&**jid)),
Err(ref fail) => self.c.issue(fail),
};

let r = match r {
Ok(r) => r,
Err(e) => {
last_job_result.swap(res, atomic::Ordering::SeqCst);
wstate.last_job_result = Some(res);
return Err(e);
}
};
Expand All @@ -398,15 +404,15 @@ where
// when re-sending the job response. this should not count as critical. other
// errors, however, should!
if let Error::IO(_) = e {
last_job_result.swap(res, atomic::Ordering::SeqCst);
wstate.last_job_result = Some(res);
return Err(e);
}
}
}
}

// keep track of the current status of each worker
let status: Vec<_> = (0..self.running_jobs.len())
let status: Vec<_> = (0..self.worker_states.len())
.map(|_| Arc::new(atomic::AtomicUsize::new(STATUS_RUNNING)))
.collect();

Expand Down Expand Up @@ -503,9 +509,9 @@ where
if let Ok(true) = exit {
// FAIL currently running jobs even though they're still running
let mut running = 0;
for running_job in self.running_jobs.iter() {
if let Some(jid) = running_job.take(atomic::Ordering::SeqCst) {
let f = Fail::new(&**jid, "unknown", "terminated");
for wstate in self.worker_states.iter() {
if let Some(jid) = wstate.lock().unwrap().running_job.take() {
let f = Fail::new(&*jid, "unknown", "terminated");

// if this fails, we don't want to exit with Err(),
// because we *were* still terminated!
Expand Down

0 comments on commit 47b946e

Please sign in to comment.