From 499b833510fbb45349ff477014ba1f71bce39f44 Mon Sep 17 00:00:00 2001 From: Felipe Lalanne Date: Thu, 31 Oct 2024 18:04:40 -0300 Subject: [PATCH] Add first implementation of Domain The domain defines what jobs are applicable for different routes in the state and to which operations. Change-type: minor --- Cargo.toml | 1 + README.md | 12 +-- src/path.rs | 17 ++++ src/task/boxed.rs | 8 +- src/task/domain.rs | 227 +++++++++++++++++++++++++++++++++++++++++++++ src/task/intent.rs | 91 ++++++++++++++++++ src/task/job.rs | 49 +++++++++- src/task/mod.rs | 14 ++- 8 files changed, 399 insertions(+), 20 deletions(-) create mode 100644 src/task/domain.rs create mode 100644 src/task/intent.rs diff --git a/Cargo.toml b/Cargo.toml index 30ed1f9..23ed845 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ description = "An automated job orchestration library to build and execute dynam [dependencies] json-patch = "2.0.0" jsonptr = "0.6.0" +matchit = "0.8.4" serde = { version = "1.0.197" } serde_json = "1.0.120" thiserror = "1.0.63" diff --git a/README.md b/README.md index a02e25a..ec09e81 100644 --- a/README.md +++ b/README.md @@ -166,7 +166,7 @@ async fn main() { let agent = Worker::new() // The plus_one job is applicable to an UPDATE operation // on any given counter - .job("/counters/:name", update(plus_one)) + .job("/counters/{name}", update(plus_one)) // Initialize two counters "a" and "b" to 0 .with_state(State {counters: HashMap::from([("a".to_string(), 0), ("b".to_string(), 0)])}) @@ -194,7 +194,7 @@ As programmers, we want to be able to build code by composing simpler behaviors ```rust use gustav::system::Context; -fn plus_two(counter: Update, tgt: Target) -> Vec> { +fn plus_two(counter: Update, tgt: Target, Path(name): Path) -> Vec> { if *tgt - *counter < 2 { // Returning an empty result tells the planner // the task is not applicable to reach the target @@ -204,8 +204,8 @@ fn plus_two(counter: Update, tgt: Target) -> Vec, tgt: Target) -> Vec for Path { } } +// Structure to store path arguments when matching +// against a lens +#[derive(Clone)] +pub(crate) struct PathArgs(pub Vec<(Arc, String)>); + +impl PathArgs { + pub fn new(params: matchit::Params) -> Self { + let params: Vec<(Arc, String)> = params + .iter() + .map(|(k, v)| (Arc::from(k), String::from(v))) + .collect(); + + PathArgs(params) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/task/boxed.rs b/src/task/boxed.rs index e9999cc..0fa9b3c 100644 --- a/src/task/boxed.rs +++ b/src/task/boxed.rs @@ -29,7 +29,7 @@ impl BoxedIntoTask { })) } - pub fn into_task(self, id: String, context: Context) -> Task { + pub fn into_task(self, id: &str, context: Context) -> Task { self.0.into_task(id, context) } } @@ -43,12 +43,12 @@ impl Clone for BoxedIntoTask { trait ErasedIntoTask { fn clone_box(&self) -> Box>; - fn into_task(self: Box, id: String, context: Context) -> Task; + fn into_task(self: Box, id: &str, context: Context) -> Task; } struct MakeIntoTask { pub(crate) handler: H, - pub(crate) into_task: fn(String, H, Context) -> Task, + pub(crate) into_task: fn(&str, H, Context) -> Task, } impl ErasedIntoTask for MakeIntoTask @@ -63,7 +63,7 @@ where }) } - fn into_task(self: Box, id: String, context: Context) -> Task { + fn into_task(self: Box, id: &str, context: Context) -> Task { (self.into_task)(id, self.handler, context) } } diff --git a/src/task/domain.rs b/src/task/domain.rs new file mode 100644 index 0000000..4094c26 --- /dev/null +++ b/src/task/domain.rs @@ -0,0 +1,227 @@ +use matchit::Router; +use std::collections::{BTreeSet, HashMap}; + +use super::intent::{Intent, Operation}; +use crate::path::PathArgs; + +#[derive(Default)] +pub struct Domain { + // The router stores a list of intents matching a route + router: Router>>, + // The index stores the reverse relation of job id to a route + index: HashMap, +} + +// Placeholder string to replace escaped parameters +// in a route +const PLACEHOLDER: &str = "__gustav_placeholder__"; + +impl Domain { + pub fn new() -> Self { + Self { + router: Router::new(), + index: HashMap::new(), + } + } + + pub fn job(self, route: &str, intent: Intent) -> Self { + let Self { + mut router, + mut index, + } = self; + + let job_id = intent.job.id().clone(); + let operation = intent.operation.clone(); + + let mut queue = BTreeSet::new(); + + // Try to remove the route, if removing succeeds, then + // add the job to the returned set + if let Some(mut oldqueue) = router.remove(route) { + // Do not allow the same job to be assigned to + // multiple operations. This could cause problems at + // runtime + if oldqueue.iter().any(|i| i.job.id() == &job_id) { + panic!( + "cannot assign job '{}' to operation '{:?}', a previous assignment exists", + job_id, operation + ) + } + + // Update the queue with the new job + oldqueue.insert(intent); + queue = oldqueue; + } else { + queue.insert(intent); + } + + // (re)insert the queue to the router, we should not have + // conflicts here + router.insert(route, queue).expect("route should be valid"); + + // Only allow one assignment of a job to a route + if let Some(oldroute) = index.insert(job_id.clone(), String::from(route)) { + panic!( + "cannot assign job '{}' to route '{}', a previous assignment exists to '{}'", + job_id, route, oldroute + ) + } + + Self { router, index } + } + + // This allows to find the path that a task relates to from the + // job it belongs to and the arguments given by the user as part + // of the context. + // + // This implementation is still missing a ton of edge cases but should + // work as a proof of concept + // + // This will no longer be dead code when the planner + // is implemented + #[allow(dead_code)] + pub(crate) fn get_path(&self, job_id: &String, args: PathArgs) -> Option { + if let Some(route) = self.index.get(job_id) { + let mut route = route.clone(); + let placeholder = PLACEHOLDER.to_string(); + + // for each key in path args look for a parameter + // in the route and replace it by the value + for (k, v) in args.0.iter() { + // look for double bracket versions first and replace + // by a placeholder + let escaped = format!("{{{{{}}}}}", k); + route = route.replace(&escaped, &placeholder); + + let param = format!("{{{}}}", k); + route = route.replace(¶m, v); + + // Replace placeholder for its unescaped version + route = route.replace(&placeholder, &escaped); + } + + // TODO: for each escaped value `{{param}}` we should replace it + // with `{param}` + // TODO: what about wildcards? `{*param}` + + // QUESTION: Should be fail if there are still parameters? + return Some(route); + } + + None + } + + /// Find matches for the given path in the domain + /// the matches are sorted in order that they should be + /// tested + /// + // This will no longer be dead code when the planner + // is implemented + #[allow(dead_code)] + pub(crate) fn at(&self, path: &str) -> Option<(PathArgs, impl Iterator>)> { + self.router + .at(path) + .map(|matched| { + ( + PathArgs::new(matched.params), + matched + .value + .iter() + .filter(|i| i.operation != Operation::None), + ) + }) + .ok() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::extract::{Target, Update}; + use crate::path::PathArgs; + use crate::system::Context; + use crate::task::*; + + fn plus_one(mut counter: Update, tgt: Target) -> Update { + if *counter < *tgt { + *counter += 1; + } + + // Update implements IntoResult + counter + } + + fn plus_two(counter: Update, tgt: Target) -> Vec> { + if *tgt - *counter < 2 { + // Returning an empty result tells the planner + // the task is not applicable to reach the target + return vec![]; + } + + vec![ + plus_one.into_task(Context::from_target(*tgt)), + plus_one.into_task(Context::from_target(*tgt)), + ] + } + + #[test] + fn it_finds_jobs_ordered_by_degree() { + let domain = Domain::new() + .job("/counters/{counter}", update(plus_one)) + .job("/counters/{counter}", update(plus_two)); + + let jobs: Vec<&String> = domain + .at("/counters/{counter}") + .map(|(_, iter)| iter.map(|i| i.job.id()).collect()) + .unwrap(); + + // It should return compound jobs first + assert_eq!( + jobs, + vec![plus_two.into_job().id(), plus_one.into_job().id()] + ); + } + + #[test] + fn it_ignores_none_jobs() { + let domain = Domain::new() + .job("/counters/{counter}", none(plus_one)) + .job("/counters/{counter}", update(plus_two)); + + let jobs: Vec<&String> = domain + .at("/counters/{counter}") + .map(|(_, iter)| iter.map(|i| i.job.id()).collect()) + .unwrap(); + + // It should not return jobs for None operations + assert_eq!(jobs, vec![plus_two.into_job().id()]); + } + + #[test] + fn it_constructs_a_path_given_arguments() { + let domain = Domain::new() + .job("/counters/{counter}", none(plus_one)) + .job("/counters/{counter}", update(plus_two)); + + let args = PathArgs(vec![(Arc::from("counter"), String::from("one"))]); + let path = domain.get_path(plus_one.into_job().id(), args).unwrap(); + assert_eq!(path, String::from("/counters/one")) + } + + #[test] + #[should_panic] + fn it_fails_if_assigning_the_same_job_to_multiple_ops() { + Domain::new() + .job("/counters/{counter}", update(plus_one)) + .job("/counters/{counter}", update(plus_one)); + } + + #[test] + #[should_panic] + fn it_fails_if_assigning_the_same_job_to_multiple_routes() { + Domain::new() + .job("/counters/{counter}", update(plus_one)) + .job("/numbers/{counter}", create(plus_one)); + } +} diff --git a/src/task/intent.rs b/src/task/intent.rs new file mode 100644 index 0000000..e25ad7f --- /dev/null +++ b/src/task/intent.rs @@ -0,0 +1,91 @@ +use super::handler::Handler; +use super::job::Job; +use std::cmp::Ordering; + +#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Clone)] +pub(crate) enum Operation { + Create, + Update, + Delete, + Any, + None, +} + +pub struct Intent { + pub(crate) operation: Operation, + pub(crate) job: Job, + priority: u8, +} + +impl Intent { + pub(crate) fn new(job: Job) -> Self { + Intent { + operation: Operation::Update, + job, + priority: u8::MAX, + } + } + + /// Set intent priority. + /// + /// This defines search priority when looking for jobs + /// the lower the value, the higher the priority + pub fn with_priority(self, priority: u8) -> Self { + let Intent { operation, job, .. } = self; + Intent { + operation, + job, + priority, + } + } + + pub(crate) fn with_operation(self, operation: Operation) -> Self { + let Intent { priority, job, .. } = self; + Intent { + operation, + job, + priority, + } + } +} + +macro_rules! define_intent { + ($func_name:ident, $operation:expr) => { + pub fn $func_name(handler: H) -> Intent + where + H: Handler, + S: 'static, + I: 'static, + { + Intent::new(handler.into_job()).with_operation($operation) + } + }; +} + +define_intent!(create, Operation::Create); +define_intent!(update, Operation::Update); +define_intent!(delete, Operation::Delete); +define_intent!(any, Operation::Any); +define_intent!(none, Operation::None); + +impl PartialEq for Intent { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} +impl Eq for Intent {} + +impl PartialOrd for Intent { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Intent { + fn cmp(&self, other: &Self) -> Ordering { + self.job + .cmp(&other.job) + .then(self.operation.cmp(&other.operation)) + .then(self.priority.cmp(&other.priority)) + } +} diff --git a/src/task/job.rs b/src/task/job.rs index caef258..8de2541 100644 --- a/src/task/job.rs +++ b/src/task/job.rs @@ -1,18 +1,55 @@ use json_patch::Patch; +use std::cmp::Ordering; use super::boxed::*; use super::{Handler, Task}; use crate::system::Context; +/// The Job degree denotes its cardinality or its position in a search tree +/// +/// - Atom jobs are the leafs in the search tree, they define the work to be +/// executed and cannot be expanded +/// - List jobs define work in terms of other tasks, they are expanded recursively +/// in order to get to a list of atoms +#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] +enum Degree { + List, + Atom, +} + /// Jobs are generic work definitions. They can be converted to tasks /// by calling into_task with a specific context. /// /// Jobs are re-usable pub struct Job { id: String, + degree: Degree, builder: BoxedIntoTask, } +impl PartialEq for Job { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl PartialOrd for Job { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Eq for Job {} + +impl Ord for Job { + fn cmp(&self, other: &Self) -> Ordering { + // We order jobs by degree. When searching for applicable + // jobs, we want to give List jobs priority over atomic jobs + // as these can be used to direct the search + self.degree.cmp(&other.degree) + } +} + impl Job { pub(crate) fn from_action(action: A) -> Self where @@ -20,11 +57,12 @@ impl Job { S: 'static, I: 'static, { - let id = std::any::type_name::().to_string(); + let id = String::from(std::any::type_name::()); Self { id, builder: BoxedIntoTask::from_action(action), + degree: Degree::Atom, } } @@ -33,18 +71,19 @@ impl Job { M: Handler>>, S: 'static, { - let id = std::any::type_name::().to_string(); + let id = String::from(std::any::type_name::()); Self { id, + degree: Degree::List, builder: BoxedIntoTask::from_method(method), } } - pub fn id(&self) -> String { - self.id.clone() + pub fn id(&self) -> &String { + &self.id } pub fn into_task(&self, context: Context) -> Task { - self.builder.clone().into_task(self.id.clone(), context) + self.builder.clone().into_task(self.id.as_str(), context) } } diff --git a/src/task/mod.rs b/src/task/mod.rs index d5d3f8b..981634c 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -1,6 +1,8 @@ mod boxed; +mod domain; mod effect; mod handler; +mod intent; mod job; mod result; @@ -11,8 +13,10 @@ use std::pin::Pin; use crate::error::Error; use crate::system::{Context, System}; +pub use domain::*; pub use effect::*; pub use handler::*; +pub use intent::*; pub use job::*; pub(crate) use result::*; @@ -38,7 +42,7 @@ pub enum Task { } impl Task { - pub(crate) fn atom(id: String, handler: H, context: Context) -> Self + pub(crate) fn atom(id: &str, handler: H, context: Context) -> Self where H: Handler, S: 'static, @@ -46,7 +50,7 @@ impl Task { { let hc = handler.clone(); Self::Atom { - id, + id: String::from(id), context, dry_run: Box::new(|system: &System, context: Context| { let effect = hc.call(system, context); @@ -65,13 +69,13 @@ impl Task { } } - pub(crate) fn list(id: String, handler: H, context: Context) -> Self + pub(crate) fn list(id: &str, handler: H, context: Context) -> Self where H: Handler>>, S: 'static, { Self::List { - id, + id: String::from(id), context, expand: Box::new(|system: &System, context: Context| { // List tasks cannot perform changes to the system @@ -196,7 +200,7 @@ mod tests { fn it_gets_metadata_from_function() { let job = plus_one.into_job(); - assert_eq!(job.id(), "gustav::task::tests::plus_one".to_string()); + assert_eq!(job.id().as_str(), "gustav::task::tests::plus_one"); } #[test]