diff --git a/Cargo.lock b/Cargo.lock
index 5d621dbc..6c44ebb6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,12 +17,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "atomic-option"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593"
-
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -250,7 +244,6 @@ dependencies = [
 name = "faktory"
 version = "0.12.1"
 dependencies = [
- "atomic-option",
  "bufstream",
  "chrono",
  "clap",
diff --git a/Cargo.toml b/Cargo.toml
index e9d98361..9b42d389 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,6 @@ libc = "0.2"
 rand = "0.8"
 chrono = { version = "0.4", features = ["serde", "clock"], default-features = false }
 url = "2"
-atomic-option = "0.1"
 fnv = "1.0.5"
 native-tls = { version = "0.2", optional = true }
 clap = { version = "3.1.0", optional = true }
diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index fb465cd5..3908932f 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -1,11 +1,10 @@
 use crate::error::Error;
 use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
-use atomic_option::AtomicOption;
 use fnv::FnvHashMap;
 use std::error::Error as StdError;
 use std::io::prelude::*;
 use std::net::TcpStream;
-use std::sync::{atomic, Arc};
+use std::sync::{atomic, Arc, Mutex};
 
 use crate::proto::{Ack, Fail, Job};
 
@@ -113,12 +112,17 @@ where
     S: Read + Write,
 {
     c: Client<S>,
-    last_job_results: Arc<Vec<AtomicOption<Result<String, Fail>>>>,
-    running_jobs: Arc<Vec<AtomicOption<String>>>,
+    worker_states: Arc<Vec<Mutex<WorkerState>>>,
     callbacks: Arc<FnvHashMap<String, F>>,
     terminated: bool,
 }
 
+#[derive(Default)]
+struct WorkerState {
+    last_job_result: Option<Result<String, Fail>>,
+    running_job: Option<String>,
+}
+
 /// Convenience wrapper for building a Faktory worker.
 ///
 /// See the [`Consumer`](struct.Consumer.html) documentation for details.
@@ -242,8 +246,7 @@ impl<S: Read + Write, F> Consumer<S, F> {
         Consumer {
             c,
             callbacks: Arc::new(callbacks),
-            running_jobs: Arc::new((0..workers).map(|_| AtomicOption::empty()).collect()),
-            last_job_results: Arc::new((0..workers).map(|_| AtomicOption::empty()).collect()),
+            worker_states: Arc::new((0..workers).map(|_| Default::default()).collect()),
             terminated: false,
         }
     }
@@ -285,7 +288,7 @@ where
         let jid = job.jid.clone();
 
         // keep track of running job in case we're terminated during it
-        self.running_jobs[worker].swap(Box::new(jid.clone()), atomic::Ordering::SeqCst);
+        self.worker_states[worker].lock().unwrap().running_job = Some(jid.clone());
 
         // process the job
         let r = self.run_job(job);
@@ -295,8 +298,7 @@ where
             Ok(_) => {
                 // job done -- acknowledge
                 // remember it in case we fail to notify the server (e.g., broken connection)
-                self.last_job_results[worker]
-                    .swap(Box::new(Ok(jid.clone())), atomic::Ordering::SeqCst);
+                self.worker_states[worker].lock().unwrap().last_job_result = Some(Ok(jid.clone()));
                 self.c.issue(&Ack::new(jid))?.await_ok()?;
             }
             Err(e) => {
@@ -320,14 +322,17 @@ where
                 };
 
                 let fail2 = fail.clone();
-                self.last_job_results[worker].swap(Box::new(Err(fail)), atomic::Ordering::SeqCst);
+                self.worker_states[worker].lock().unwrap().last_job_result = Some(Err(fail));
                 self.c.issue(&fail2)?.await_ok()?;
             }
         }
 
         // we won't have to tell the server again
-        self.last_job_results[worker].take(atomic::Ordering::SeqCst);
-        self.running_jobs[worker].take(atomic::Ordering::SeqCst);
+        {
+            let mut state = self.worker_states[worker].lock().unwrap();
+            state.last_job_result = None;
+            state.running_job = None;
+        }
 
         Ok(true)
     }
@@ -352,8 +357,7 @@ where
         Ok(Consumer {
             c: self.c.connect_again()?,
             callbacks: Arc::clone(&self.callbacks),
-            running_jobs: Arc::clone(&self.running_jobs),
-            last_job_results: Arc::clone(&self.last_job_results),
+            worker_states: Arc::clone(&self.worker_states),
             terminated: self.terminated,
         })
     }
@@ -373,13 +377,15 @@ where
         Q: AsRef<str>,
     {
         assert!(!self.terminated, "do not re-run a terminated worker");
-        assert_eq!(Arc::strong_count(&self.last_job_results), 1);
+        let worker_states = Arc::get_mut(&mut self.worker_states)
+            .expect("all workers are scoped to &mut of the user-code-visible Consumer");
 
         // retry delivering notification about our last job result.
         // we know there's no leftover thread at this point, so there's no race on the option.
-        for last_job_result in self.last_job_results.iter() {
-            if let Some(res) = last_job_result.take(atomic::Ordering::SeqCst) {
-                let r = match *res {
+        for wstate in worker_states.iter_mut() {
+            let wstate = wstate.get_mut().unwrap();
+            if let Some(res) = wstate.last_job_result.take() {
+                let r = match res {
                     Ok(ref jid) => self.c.issue(&Ack::new(&**jid)),
                     Err(ref fail) => self.c.issue(fail),
                 };
@@ -387,7 +393,7 @@ where
                 let r = match r {
                     Ok(r) => r,
                     Err(e) => {
-                        last_job_result.swap(res, atomic::Ordering::SeqCst);
+                        wstate.last_job_result = Some(res);
                         return Err(e);
                     }
                 };
@@ -398,7 +404,7 @@ where
                     // when re-sending the job response. this should not count as critical. other
                     // errors, however, should!
                     if let Error::IO(_) = e {
-                        last_job_result.swap(res, atomic::Ordering::SeqCst);
+                        wstate.last_job_result = Some(res);
                         return Err(e);
                     }
                 }
@@ -406,7 +412,7 @@ where
         }
 
         // keep track of the current status of each worker
-        let status: Vec<_> = (0..self.running_jobs.len())
+        let status: Vec<_> = (0..self.worker_states.len())
             .map(|_| Arc::new(atomic::AtomicUsize::new(STATUS_RUNNING)))
             .collect();
 
@@ -503,9 +509,9 @@ where
         if let Ok(true) = exit {
            // FAIL currently running jobs even though they're still running
             let mut running = 0;
-            for running_job in self.running_jobs.iter() {
-                if let Some(jid) = running_job.take(atomic::Ordering::SeqCst) {
-                    let f = Fail::new(&**jid, "unknown", "terminated");
+            for wstate in self.worker_states.iter() {
+                if let Some(jid) = wstate.lock().unwrap().running_job.take() {
+                    let f = Fail::new(&*jid, "unknown", "terminated");
 
                     // if this fails, we don't want to exit with Err(),
                     // because we *were* still terminated!