From 927b6d577605f51e936bcf2b1347eba77cbadbff Mon Sep 17 00:00:00 2001 From: Joe Grund Date: Mon, 29 Jun 2020 13:23:18 -0400 Subject: [PATCH] Create Rust iml-state-machine - Use `warp-drive` `Cache` as a realtime singleton to get the current system state. This acts much the same way as the `job_scheduler` `ObjectCache` does but instead gets realtime updates from the db instead of needing to be notified of changes by other processes. - Use petgraph to build a graph consisting of `State` nodes and `Edge` edges. `Edge` is an enum that is either a `Transition` or a `Dependency`. Add some methods via an Extenstion trait to find transitions / shortest transition paths. - Create a `Job` trait that can either be invoked directly via a `Command`, or indirectly via a `Transition`. - Create a `Steps` struct that holds a list of free fns (much like action plugins). These steps are run serially within a job. - Refactor service address bindings to not coopt the nginx proxy host. - Add an input type for `RecordId`s - Add a http_client to the graphql context - Add graphql query and mutation for statemachine Signed-off-by: Joe Grund --- Cargo.lock | 28 ++- Cargo.toml | 1 + chroma-manager.conf.template | 11 +- docker/docker-compose.yml | 5 +- iml-action-client/src/lib.rs | 4 +- iml-api/src/graphql/mod.rs | 21 +- iml-api/src/graphql/state_machine.rs | 150 ++++++++++++ iml-api/src/main.rs | 6 +- iml-api/src/timer.rs | 9 +- ...nx__tests__replace_template_variables.snap | 11 +- iml-manager-env/src/lib.rs | 46 +++- iml-state-machine/Cargo.toml | 18 ++ iml-state-machine/README.md | 27 +++ iml-state-machine/src/command.rs | 191 +++++++++++++++ iml-state-machine/src/graph.rs | 229 ++++++++++++++++++ iml-state-machine/src/job.rs | 53 ++++ iml-state-machine/src/lib.rs | 26 ++ iml-state-machine/src/lnet.rs | 88 +++++++ iml-state-machine/src/snapshot.rs | 41 ++++ iml-state-machine/src/step.rs | 59 +++++ iml-warp-drive/Cargo.toml | 3 + iml-warp-drive/src/error.rs | 65 +---- iml-warp-drive/src/lib.rs | 2 + iml-warp-drive/src/locks.rs | 5 +- iml-warp-drive/src/main.rs | 64 ++--- iml-warp-drive/src/messaging.rs | 41 ++++ iml-warp-drive/src/state_machine.rs | 95 ++++++++ iml-wire-types/src/graphql_duration.rs | 2 +- iml-wire-types/src/lib.rs | 1 + iml-wire-types/src/snapshot.rs | 15 +- iml-wire-types/src/state_machine.rs | 128 ++++++++++ iml-wire-types/src/warp_drive.rs | 91 +++++++ migrations/20201026195644_state_machine.sql | 34 +++ sqlx-data.json | 136 +++++++++++ 34 files changed, 1580 insertions(+), 126 deletions(-) create mode 100644 iml-api/src/graphql/state_machine.rs create mode 100644 iml-state-machine/Cargo.toml create mode 100644 iml-state-machine/README.md create mode 100644 iml-state-machine/src/command.rs create mode 100644 iml-state-machine/src/graph.rs create mode 100644 iml-state-machine/src/job.rs create mode 100644 iml-state-machine/src/lib.rs create mode 100644 iml-state-machine/src/lnet.rs create mode 100644 iml-state-machine/src/snapshot.rs create mode 100644 iml-state-machine/src/step.rs create mode 100644 iml-warp-drive/src/messaging.rs create mode 100644 iml-warp-drive/src/state_machine.rs create mode 100644 iml-wire-types/src/state_machine.rs create mode 100644 migrations/20201026195644_state_machine.sql diff --git a/Cargo.lock b/Cargo.lock index b30aef04ae..4e45cb235c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2029,6 +2029,23 @@ dependencies = [ "url", ] +[[package]] +name = "iml-state-machine" +version = "0.1.0" +dependencies = [ + "futures", + "iml-action-client", + "iml-postgres", + "iml-tracing", + "iml-wire-types", + "petgraph", + "serde", + "serde_json", + "thiserror", + "tokio", + "uuid", +] + [[package]] name = "iml-stats" version = "0.4.0" @@ -2138,14 +2155,17 @@ version = "0.4.0" dependencies = [ "futures", "im", + "iml-action-client", "iml-manager-client", "iml-manager-env", "iml-postgres", "iml-rabbit", + "iml-state-machine", "iml-tracing", "iml-wire-types", "serde", "serde_json", + "thiserror", "tokio", "tokio-runtime-shutdown", "tracing", @@ -2766,9 +2786,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ "autocfg 1.0.1", "num-traits", @@ -2776,9 +2796,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.12" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ "autocfg 1.0.1", ] diff --git a/Cargo.toml b/Cargo.toml index bde8e28997..e484846fa9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ 'iml-services/iml-snapshot', 'iml-services/iml-stats', 'iml-sfa', + 'iml-state-machine', 'iml-system-test-utils', 'iml-systemd', 'iml-task-runner', diff --git a/chroma-manager.conf.template b/chroma-manager.conf.template index 76f6e999fd..034cf3e51e 100644 --- a/chroma-manager.conf.template +++ b/chroma-manager.conf.template @@ -238,7 +238,16 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_http_version 1.1; proxy_set_header Connection ''; - proxy_pass {{WARP_DRIVE_PROXY_PASS}}; + proxy_pass {{WARP_DRIVE_PROXY_PASS}}/messaging; + } + + location /state_machine { + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Server $host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_http_version 1.1; + proxy_set_header Connection ''; + proxy_pass {{WARP_DRIVE_PROXY_PASS}}/state_machine; } location /mailbox { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 9ed115673f..5b546ad012 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -311,6 +311,8 @@ services: - "manager-config:/var/lib/chroma" environment: - PROXY_HOST=iml-warp-drive + - ACTION_RUNNER_HOST=iml-action-runner + - ACTION_RUNNER_PORT=8009 - RUST_LOG=info,sqlx::query=warn iml-action-runner: image: "imlteam/iml-action-runner:6.2.0" @@ -349,7 +351,8 @@ services: - "manager-config:/var/lib/chroma" - "report:/var/spool/iml/report" environment: - - PROXY_HOST=iml-api + - PROXY_HOST=nginx + - SERVICE_HOST=iml-api - RUST_LOG=info,sqlx::query=warn - BRANDING - USE_STRATAGEM diff --git a/iml-action-client/src/lib.rs b/iml-action-client/src/lib.rs index 5b424ebadd..bd8b1ca3f2 100644 --- a/iml-action-client/src/lib.rs +++ b/iml-action-client/src/lib.rs @@ -5,7 +5,7 @@ use bytes::buf::BufExt as _; use hyper::{client::HttpConnector, Body, Request}; use hyperlocal::{UnixClientExt as _, UnixConnector}; -use iml_manager_env::{get_action_runner_http, get_action_runner_uds, running_in_docker}; +use iml_manager_env::{get_action_runner_uds, running_in_docker, ACTION_RUNNER_URL}; use iml_wire_types::{Action, ActionId, ActionName, ActionType, Fqdn}; use std::{ops::Deref, sync::Arc}; use thiserror::Error; @@ -57,7 +57,7 @@ impl Default for Client { let (inner, uri) = if running_in_docker() { ( ClientInner::Http(hyper::Client::new()), - get_action_runner_http().parse::().unwrap(), + ACTION_RUNNER_URL.as_str().parse::().unwrap(), ) } else { ( diff --git a/iml-api/src/graphql/mod.rs b/iml-api/src/graphql/mod.rs index 4c912f76df..04a422474a 100644 --- a/iml-api/src/graphql/mod.rs +++ b/iml-api/src/graphql/mod.rs @@ -2,6 +2,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +mod state_machine; mod stratagem; mod task; @@ -12,6 +13,7 @@ use crate::{ }; use chrono::{DateTime, Utc}; use futures::{future::join_all, TryFutureExt, TryStreamExt}; +use iml_manager_client::Client; use iml_postgres::{ active_mgs_host_fqdn, fqdn_by_host_id, sqlx, sqlx::postgres::types::PgInterval, PgPool, }; @@ -139,6 +141,9 @@ impl QueryRoot { fn task(&self) -> task::TaskQuery { task::TaskQuery } + fn state_machine(&self) -> state_machine::StateMachineQuery { + state_machine::StateMachineQuery + } #[graphql(arguments( limit(description = "optional paging limit, defaults to all rows"), offset(description = "Offset into items, defaults to 0"), @@ -602,6 +607,9 @@ impl MutationRoot { fn task(&self) -> task::TaskMutation { task::TaskMutation } + fn state_machine(&self) -> state_machine::StateMachineMutation { + state_machine::StateMachineMutation + } #[graphql(arguments( fsname(description = "Filesystem to snapshot"), name(description = "Name of the snapshot"), @@ -852,8 +860,14 @@ impl MutationRoot { .map(|x| x.id); if let Some(id) = maybe_id { - configure_snapshot_timer(id, fsname, interval.0, use_barrier.unwrap_or_default()) - .await?; + configure_snapshot_timer( + context.http_client.clone(), + id, + fsname, + interval.0, + use_barrier.unwrap_or_default(), + ) + .await?; } Ok(true) @@ -866,7 +880,7 @@ impl MutationRoot { .execute(&context.pg_pool) .await?; - remove_snapshot_timer(id).await?; + remove_snapshot_timer(context.http_client.clone(), id).await?; Ok(true) } @@ -963,6 +977,7 @@ pub(crate) type Schema = RootNode<'static, QueryRoot, MutationRoot, EmptySubscri pub(crate) struct Context { pub(crate) pg_pool: PgPool, pub(crate) rabbit_pool: Pool, + pub(crate) http_client: Client, } impl juniper::Context for Context {} diff --git a/iml-api/src/graphql/state_machine.rs b/iml-api/src/graphql/state_machine.rs new file mode 100644 index 0000000000..c6b1dcba1b --- /dev/null +++ b/iml-api/src/graphql/state_machine.rs @@ -0,0 +1,150 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::graphql::Context; +use iml_manager_client::{post, Client, ImlManagerClientError}; +use iml_manager_env::get_proxy_url; +use iml_postgres::sqlx; +use iml_wire_types::{ + snapshot::{Destroy, Mount, Unmount}, + state_machine::{Command, CommandRecord, Job, Transition}, + warp_drive::{GraphqlRecordId, RecordId}, +}; + +pub(crate) struct StateMachineMutation; + +#[juniper::graphql_object(Context = Context)] +impl StateMachineMutation { + /// Run a state_machine `Transition` for a given record + async fn run_transition( + context: &Context, + record_id: GraphqlRecordId, + transition: Transition, + ) -> juniper::FieldResult { + let record_id = RecordId::from(record_id); + + let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?; + + let mut jobs = vec![]; + + for x in xs { + match (record_id, x) { + (RecordId::Snapshot(x), Transition::MountSnapshot) => { + let x = sqlx::query!( + "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + x + ) + .fetch_one(&context.pg_pool) + .await?; + + jobs.push(Job::MountSnapshotJob(Mount { + fsname: x.filesystem_name, + name: x.snapshot_name, + })); + } + (RecordId::Snapshot(x), Transition::UnmountSnapshot) => { + let x = sqlx::query!( + "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + x + ) + .fetch_one(&context.pg_pool) + .await?; + + jobs.push(Job::UnmountSnapshotJob(Unmount { + fsname: x.filesystem_name, + name: x.snapshot_name, + })) + } + (RecordId::Snapshot(x), Transition::RemoveSnapshot) => { + let x = sqlx::query!( + "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + x + ) + .fetch_one(&context.pg_pool) + .await?; + + jobs.push(Job::RemoveSnapshotJob(Destroy { + fsname: x.filesystem_name, + name: x.snapshot_name, + force: true, + })) + } + _ => {} + } + } + + let cmd = Command { + message: "Running Transition".to_string(), + jobs, + }; + + let mut url = get_proxy_url(); + + url.set_path("state_machine/run_command/"); + + let cmd = post(context.http_client.clone(), url.as_str(), cmd) + .await? + .error_for_status()? + .json() + .await?; + + Ok(cmd) + } +} + +pub(crate) struct StateMachineQuery; + +#[juniper::graphql_object(Context = Context)] +impl StateMachineQuery { + /// Given a record, figure out the possible transitions available for it + async fn get_transitions( + context: &Context, + record_id: GraphqlRecordId, + ) -> juniper::FieldResult> { + let mut url = get_proxy_url(); + + url.set_path("state_machine/get_transitions/"); + + let xs = post( + context.http_client.clone(), + url.as_str(), + RecordId::from(record_id), + ) + .await? + .error_for_status()? + .json() + .await?; + + Ok(xs) + } + /// Given a record and transition, figure out the shortest possible path for that + /// Record to reach that transition. + async fn get_transition_path( + context: &Context, + record_id: GraphqlRecordId, + transition: Transition, + ) -> juniper::FieldResult> { + let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?; + + Ok(xs) + } +} + +async fn get_transition_path( + client: Client, + record_id: impl Into, + transition: Transition, +) -> Result, ImlManagerClientError> { + let mut url = get_proxy_url(); + + url.set_path("state_machine/get_transition_path/"); + + let xs = post(client, url.as_str(), (record_id.into(), transition)) + .await? + .error_for_status()? + .json() + .await?; + + Ok(xs) +} diff --git a/iml-api/src/main.rs b/iml-api/src/main.rs index 6e4b078361..126fe9a124 100644 --- a/iml-api/src/main.rs +++ b/iml-api/src/main.rs @@ -8,6 +8,7 @@ mod error; mod graphql; mod timer; +use iml_manager_client::get_client; use iml_manager_env::get_pool_limit; use iml_postgres::get_db_pool; use iml_rabbit::{self, create_connection_filter}; @@ -22,7 +23,7 @@ const DEFAULT_POOL_LIMIT: u32 = 5; async fn main() -> Result<(), Box> { iml_tracing::init(); - let addr = iml_manager_env::get_iml_api_addr(); + let addr = iml_manager_env::get_iml_api_bind_addr(); let conf = Conf { allow_anonymous_read: iml_manager_env::get_allow_anonymous_read(), @@ -42,6 +43,8 @@ async fn main() -> Result<(), Box> { let pg_pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?; + let http_client = get_client()?; + let schema = Arc::new(graphql::Schema::new( graphql::QueryRoot, graphql::MutationRoot, @@ -52,6 +55,7 @@ async fn main() -> Result<(), Box> { let ctx = Arc::new(graphql::Context { pg_pool, rabbit_pool, + http_client, }); let ctx_filter = warp::any().map(move || Arc::clone(&ctx)); diff --git a/iml-api/src/timer.rs b/iml-api/src/timer.rs index 615e6ad48a..47a5af13d4 100644 --- a/iml-api/src/timer.rs +++ b/iml-api/src/timer.rs @@ -1,5 +1,5 @@ use crate::error::ImlApiError; -use iml_manager_client::{delete, get_client, put}; +use iml_manager_client::{delete, put, Client}; use iml_manager_env::{get_timer_addr, running_in_docker}; use std::time::Duration; @@ -12,6 +12,7 @@ pub struct TimerConfig { } pub async fn configure_snapshot_timer( + client: Client, config_id: i32, fsname: String, interval: Duration, @@ -73,8 +74,6 @@ ExecStart={} service_config, }; - let client = get_client()?; - let url = format!("http://{}/configure/", get_timer_addr()); tracing::debug!( "Sending snapshot interval config to timer service: {:?} {:?}", @@ -86,9 +85,7 @@ ExecStart={} Ok(()) } -pub async fn remove_snapshot_timer(config_id: i32) -> Result<(), ImlApiError> { - let client = get_client()?; - +pub async fn remove_snapshot_timer(client: Client, config_id: i32) -> Result<(), ImlApiError> { delete( client, format!( diff --git a/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap b/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap index 1c544b9a09..20342b1c9b 100644 --- a/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap +++ b/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap @@ -242,7 +242,16 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_http_version 1.1; proxy_set_header Connection ''; - proxy_pass http://127.0.0.1:8890; + proxy_pass http://127.0.0.1:8890/messaging; + } + + location /state_machine { + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Server $host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_http_version 1.1; + proxy_set_header Connection ''; + proxy_pass http://127.0.0.1:8890/state_machine; } location /mailbox { diff --git a/iml-manager-env/src/lib.rs b/iml-manager-env/src/lib.rs index 883883fc63..31d9b31a8d 100644 --- a/iml-manager-env/src/lib.rs +++ b/iml-manager-env/src/lib.rs @@ -16,11 +16,7 @@ lazy_static! { } lazy_static! { - static ref ACTION_RUNNER_HTTP: String = format!( - "http://{}:{}", - get_server_host(), - get_var("ACTION_RUNNER_PORT") - ); + pub static ref ACTION_RUNNER_URL: Url = get_action_runner_url(); } /// Get the environment variable or panic @@ -140,6 +136,10 @@ pub fn get_iml_api_addr() -> SocketAddr { to_socket_addr(&get_server_host(), &get_iml_api_port()) } +pub fn get_iml_api_bind_addr() -> SocketAddr { + to_socket_addr(&get_service_host(), &get_iml_api_port()) +} + /// Get the `http_agent2` port from the env or panic pub fn get_http_agent2_port() -> String { get_var("HTTP_AGENT2_PORT") @@ -149,7 +149,12 @@ pub fn get_http_agent2_addr() -> SocketAddr { to_socket_addr(&get_server_host(), &get_http_agent2_port()) } -/// Get the server host from the env or panic +/// Get the name of the host a service should bind to +pub fn get_service_host() -> String { + env::var("SERVICE_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()) +} + +/// Get the nginx host from the env or panic pub fn get_server_host() -> String { get_var("PROXY_HOST") } @@ -287,14 +292,39 @@ pub fn get_use_snapshots() -> bool { string_to_bool(env::var("USE_SNAPSHOTS").unwrap_or_else(|_| "false".to_string())) } -pub fn get_action_runner_http() -> String { - ACTION_RUNNER_HTTP.clone() +pub fn get_action_runner_host() -> String { + get_var("ACTION_RUNNER_HOST") +} + +pub fn get_action_runner_port() -> String { + get_var("ACTION_RUNNER_PORT") +} + +pub fn get_action_runner_url() -> Url { + Url::parse(&format!( + "http://{}:{}", + get_action_runner_host(), + get_action_runner_port() + )) + .expect("Could not parse action runner Url") } pub fn get_action_runner_uds() -> String { "/var/run/iml-action-runner.sock".to_string() } +/// Get the nginx proxy port or panic +pub fn get_proxy_port() -> String { + get_var("HTTPS_FRONTEND_PORT") +} + +/// Get the proxy URL or panic +pub fn get_proxy_url() -> Url { + let x = format!("https://{}:{}/", get_server_host(), get_proxy_port()); + + Url::parse(&x).expect("Could not parse proxy URL") +} + pub fn get_sfa_endpoints() -> Option>> { let xs: BTreeMap<_, _> = env::vars() .filter(|(k, _)| k.starts_with("SFA_ENDPOINTS_")) diff --git a/iml-state-machine/Cargo.toml b/iml-state-machine/Cargo.toml new file mode 100644 index 0000000000..fe9bbf023e --- /dev/null +++ b/iml-state-machine/Cargo.toml @@ -0,0 +1,18 @@ +[package] +authors = ["IML Team "] +edition = "2018" +name = "iml-state-machine" +version = "0.1.0" + +[dependencies] +futures = "0.3" +iml-action-client = {path = "../iml-action-client", version = "0.1"} +iml-postgres = {path = "../iml-postgres", version = "0.4"} +iml-tracing = {path = "../iml-tracing", version = "0.3"} +iml-wire-types = {path = "../iml-wire-types", version = "0.4", features = ["postgres-interop"]} +petgraph = "0.5" +serde = {version = "1", features = ["derive"]} +serde_json = "1" +thiserror = "1.0" +tokio = "0.2" +uuid = {version = "0.8", features = ["v4"]} diff --git a/iml-state-machine/README.md b/iml-state-machine/README.md new file mode 100644 index 0000000000..d34fe2fcfd --- /dev/null +++ b/iml-state-machine/README.md @@ -0,0 +1,27 @@ +# IML State Machine Model + +``` + ┌────────────────────────────────────────────┐ + │ │ + │ Client mount │ + │ │ + │ │ + └────────────────────────────────────────────┘ + │ + │ + │ + │ + │ + ┌───────Depends on─────┴────────Depends On─────────┐ + │ │ + │ │ + │ │ + │ │ + ▼ ▼ +┌────────────────────────────────────────────┐ ┌────────────────────────────────────────────┐ +│ │ │ │ +│ LNet │ │ Filesystem │ +│ │ │ │ +│ │ │ │ +└────────────────────────────────────────────┘ └────────────────────────────────────────────┘ +``` diff --git a/iml-state-machine/src/command.rs b/iml-state-machine/src/command.rs new file mode 100644 index 0000000000..56b90555e9 --- /dev/null +++ b/iml-state-machine/src/command.rs @@ -0,0 +1,191 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{job::Job, step::Steps, Error}; +use future::Aborted; +use futures::future::{self, abortable}; +use futures::{future::AbortHandle, lock::Mutex}; +use iml_action_client::Client; +use iml_postgres::{sqlx, PgPool}; +use iml_tracing::tracing; +use iml_wire_types::{ + state_machine, + state_machine::{Command, CommandRecord, CurrentState}, +}; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tokio::time; +use uuid::Uuid; + +/* +1. Create command +2. Create jobs +3. Add jobs to command +3. push jobs into job_queue +4. Run each job in separate task, accounting for any dependendent jobs that came before. +*/ + +pub enum JobState { + Pending, + Running(Option), +} + +impl JobState { + fn is_pending(&self) -> bool { + match self { + Self::Pending => true, + Self::Running(_) => false, + } + } + fn is_running(&self) -> bool { + !self.is_pending() + } +} + +pub type JobStates = Arc>>; + +pub async fn run_command( + pool: &PgPool, + job_states: &JobStates, + cmd: Command, +) -> Result { + let mut transaction = pool.begin().await?; + + let x = sqlx::query_as!( + CommandRecord, + r#" + INSERT INTO command (message) + VALUES ($1) + RETURNING id, start_time, end_time, state as "state: CurrentState", message, jobs + "#, + cmd.message + ) + .fetch_one(&mut transaction) + .await?; + + for job in cmd.jobs { + let locks = match job.as_record_id() { + Some(x) => vec![serde_json::to_value(x)?], + None => vec![], + }; + + let job_id = sqlx::query!( + r#" + INSERT INTO job (command_id, job, wait_for_jobs, locked_records) + VALUES ($1, $2, array[]::int[], $3) + RETURNING id + "#, + x.id, + serde_json::to_value(&job)?, + &locks + ) + .fetch_one(&mut transaction) + .await? + .id; + + job_states + .lock() + .await + .insert(job_id, (job, JobState::Pending)); + } + + transaction.commit().await?; + + Ok(x) +} + +pub async fn run_jobs(client: Client, pool: PgPool, job_states: JobStates) { + loop { + let job_states = Arc::clone(&job_states); + + let xs: HashMap = { + let mut x = job_states.lock().await; + + x.iter_mut() + .filter_map(|(k, (job, state))| { + if state.is_pending() { + Some((*k, job.get_steps())) + } else { + None + } + }) + .collect() + }; + + for (k, steps) in xs { + let client = client.clone(); + let job_states = Arc::clone(&job_states); + let pool = pool.clone(); + + tokio::spawn(async move { + let r = run_steps(client, pool.clone(), k, steps, Arc::clone(&job_states)).await; + + let mut lock = job_states.lock().await; + + lock.remove(&k); + + let end_state = match r { + Ok(_) => state_machine::CurrentState::Succeeded, + Err(Error::Aborted(_)) => state_machine::CurrentState::Cancelled, + Err(e) => state_machine::CurrentState::Failed, + }; + + sqlx::query!( + r#" + UPDATE job + SET + state = $1, + end_time = now() + "#, + end_state as state_machine::CurrentState + ) + .execute(&pool) + .await; + }); + } + + time::delay_for(Duration::from_secs(1)).await + } +} + +async fn run_steps( + client: Client, + pool: PgPool, + job_id: i32, + steps: Steps, + job_states: JobStates, +) -> Result<(), Error> { + for (f, args) in steps.0 { + let fut = f(pool.clone(), args); + let (fqdn, action, args) = fut.await?; + + let uuid = Uuid::new_v4(); + + let fut = client.invoke_rust_agent(fqdn.to_string(), action, args, &uuid); + let (fut, h) = abortable(fut); + + { + let mut lock = job_states.lock().await; + + let (job, _) = lock.remove(&job_id).unwrap(); + + lock.insert(job_id, (job, JobState::Running(Some(h)))); + } + + match fut.await { + Err(Aborted) => { + let r = client.cancel_request(fqdn, &uuid).await; + + return Err(Error::Aborted(Aborted)); + } + Ok(Err(e)) => { + tracing::error!("Step failed: {:?}", e); + + return Err(e.into()); + } + Ok(Ok(x)) => tracing::info!("Got {:?}", x), + }; + } + + Ok(()) +} diff --git a/iml-state-machine/src/graph.rs b/iml-state-machine/src/graph.rs new file mode 100644 index 0000000000..2d28d79886 --- /dev/null +++ b/iml-state-machine/src/graph.rs @@ -0,0 +1,229 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use iml_wire_types::state_machine::{snapshot, Edge, State, Transition}; +use petgraph::{ + algo::astar, + graph::{DiGraph, NodeIndex}, + prelude::*, + visit::{EdgeFiltered, IntoNeighborsDirected}, + Direction, +}; +use std::collections::HashSet; + +trait GraphExt { + fn find_node_idx(&self, x: &N) -> Option; +} + +impl GraphExt for Graph { + fn find_node_idx(&self, x: &N) -> Option { + self.node_indices().find(|i| &self[*i] == x) + } +} + +pub type StateGraph = DiGraph; + +pub trait StateGraphExt { + /// Get the node cooresponding to the current state, if one exists. + fn get_state_node(&self, state: impl Into) -> Option; + /// Get the available `Transition`s for this NodeIndex. + /// + /// A `Transition` is available iff it's cooresponding state + /// and all dependendant states can be satisfied. + fn get_available_transitions(&self, n: NodeIndex) -> HashSet; + fn get_transition_path( + &self, + start_state: impl Into, + transition: impl Into, + ) -> Option>; +} + +impl StateGraphExt for StateGraph { + fn get_state_node(&self, state: impl Into) -> Option { + self.find_node_idx(&state.into()) + } + fn get_available_transitions(&self, n: NodeIndex) -> HashSet { + let graph = EdgeFiltered::from_fn(&self, |x| x.weight().is_transition()); + + let mut transitions = HashSet::new(); + + let mut dfs = Dfs::new(&graph, n); + + let mut seen = HashSet::new(); + seen.insert(n); + + while let Some(from_node) = dfs.next(&self) { + let mut neighbors = graph + .neighbors_directed(from_node, Direction::Outgoing) + .into_iter(); + + while let Some(to_node) = neighbors.next() { + if seen.contains(&to_node) { + continue; + } + + seen.insert(to_node); + + let ix = self + .find_edge(from_node, to_node) + .expect("Could not find edge"); + + let t = match self[ix] { + Edge::Dependency(_) => { + panic!("Found a `Dependency` in a `Transition` filtered graph."); + } + Edge::Transition(t) => t, + }; + + transitions.insert(t); + } + } + + transitions + } + fn get_transition_path( + &self, + start_state: impl Into, + transition: impl Into, + ) -> Option> { + let start_state_ix = self.get_state_node(start_state)?; + let x = transition.into(); + + let xs = astar( + &self, + start_state_ix, + |finish| { + self.edges_directed(finish, Direction::Incoming) + .any(|edge| edge.weight() == &Edge::Transition(x)) + }, + |_| 1, + |_| 0, + )? + .1; + + let xs = xs.iter().zip(xs.iter().skip(1)).collect::>(); + + let mut out = vec![]; + + for (a, b) in xs { + let e = self.find_edge(*a, *b)?; + + let edge = *&self[e]; + + match edge { + Edge::Dependency(_) => return None, + Edge::Transition(x) => out.push(x), + }; + } + + Some(out) + } +} + +pub fn build_graph() -> StateGraph { + let mut deps = StateGraph::new(); + + let unknown = deps.add_node(snapshot::State::Unknown.into()); + let unmounted = deps.add_node(snapshot::State::Unmounted.into()); + let mounted = deps.add_node(snapshot::State::Mounted.into()); + let removed = deps.add_node(snapshot::State::Removed.into()); + + deps.add_edge(unknown, unmounted, Transition::CreateSnapshot.into()); + + deps.add_edge(unmounted, mounted, Transition::MountSnapshot.into()); + + deps.add_edge(mounted, unmounted, Transition::UnmountSnapshot.into()); + + deps.add_edge(unmounted, removed, Transition::RemoveSnapshot.into()); + deps.add_edge(mounted, removed, Transition::RemoveSnapshot.into()); + + deps +} + +#[cfg(test)] +pub mod test { + use super::*; + use iml_wire_types::state_machine::snapshot; + use petgraph::dot::Dot; + + #[test] + fn get_snapshot_mount_transitions() { + let graph = build_graph(); + + let ix = graph.get_state_node(snapshot::State::Mounted).unwrap(); + + let xs = graph.get_available_transitions(ix); + + assert_eq!( + xs, + vec![ + Transition::RemoveSnapshot.into(), + Transition::UnmountSnapshot.into(), + ] + .into_iter() + .collect() + ); + } + + #[test] + fn get_snapshot_unmount_transitions() { + let graph = build_graph(); + + let ix = graph.get_state_node(snapshot::State::Unmounted).unwrap(); + + let xs = graph.get_available_transitions(ix); + + assert_eq!( + xs, + vec![ + Transition::RemoveSnapshot.into(), + Transition::MountSnapshot.into(), + ] + .into_iter() + .collect() + ); + } + + #[test] + fn get_snapshot_remove_transitions() { + let graph = build_graph(); + + let ix = graph.get_state_node(snapshot::State::Removed).unwrap(); + + let xs = graph.get_available_transitions(ix); + + assert_eq!(xs, vec![].into_iter().collect()); + } + + #[test] + fn get_snapshot_mount_remove_transition() { + let graph = build_graph(); + + let xs = graph + .get_transition_path(snapshot::State::Mounted, Transition::RemoveSnapshot) + .unwrap(); + + assert_eq!(xs, vec![Transition::RemoveSnapshot.into()]); + } + + #[test] + fn get_snapshot_mount_unmount_transition() { + let graph = build_graph(); + + let xs = graph + .get_transition_path(snapshot::State::Mounted, Transition::UnmountSnapshot) + .unwrap(); + + assert_eq!(xs, vec![Transition::UnmountSnapshot.into()]); + } + + #[test] + fn show_dotviz() { + let graph = build_graph(); + + let dot = Dot::with_config(&graph, &[]); + + eprintln!("graph {:?}", dot); + } +} diff --git a/iml-state-machine/src/job.rs b/iml-state-machine/src/job.rs new file mode 100644 index 0000000000..7aafe3875d --- /dev/null +++ b/iml-state-machine/src/job.rs @@ -0,0 +1,53 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{ + snapshot::create_snapshot, snapshot::destroy_snapshot, snapshot::mount_snapshot, + snapshot::unmount_snapshot, step::Steps, +}; +use iml_wire_types::{state_machine, state_machine::Transition, warp_drive::RecordId}; + +pub trait Job { + fn as_record_id(&self) -> Option { + None + } + /// The steps that need to be run to complete this job. + /// Steps run serially and can be cancelled. + /// Cancelling a step cancels all further steps in the series, + /// and also cancels all dependendant jobs. + fn get_steps(&self) -> Steps; + fn get_transition(&self) -> Option { + None + } +} + +impl Job for state_machine::Job { + fn get_steps(&self) -> Steps { + match self { + Self::CreateSnapshotJob(x) => { + Steps::default().add_remote_step(create_snapshot, x.clone()) + } + Self::MountSnapshotJob(x) => { + Steps::default().add_remote_step(mount_snapshot, x.clone()) + } + Self::UnmountSnapshotJob(x) => { + Steps::default().add_remote_step(unmount_snapshot, x.clone()) + } + Self::RemoveSnapshotJob(x) => { + Steps::default().add_remote_step(destroy_snapshot, x.clone()) + } + } + } + fn get_transition(&self) -> Option { + match self { + Self::CreateSnapshotJob(_) => Some(Transition::CreateSnapshot.into()), + Self::MountSnapshotJob(_) => Some(Transition::MountSnapshot.into()), + Self::UnmountSnapshotJob(_) => Some(Transition::UnmountSnapshot.into()), + Self::RemoveSnapshotJob(_) => Some(Transition::RemoveSnapshot.into()), + } + } + fn as_record_id(&self) -> Option { + None + } +} diff --git a/iml-state-machine/src/lib.rs b/iml-state-machine/src/lib.rs new file mode 100644 index 0000000000..2ad0ce5ff3 --- /dev/null +++ b/iml-state-machine/src/lib.rs @@ -0,0 +1,26 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +mod command; +pub mod graph; +mod job; +mod snapshot; +mod step; + +pub use command::{run_command, run_jobs, JobStates}; +use futures::future::Aborted; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Aborted(#[from] Aborted), + #[error("State Not Found")] + NotFound, + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), + #[error(transparent)] + ImlActionClientError(#[from] iml_action_client::ImlActionClientError), + #[error(transparent)] + SqlxError(#[from] iml_postgres::sqlx::Error), +} diff --git a/iml-state-machine/src/lnet.rs b/iml-state-machine/src/lnet.rs new file mode 100644 index 0000000000..db76238b82 --- /dev/null +++ b/iml-state-machine/src/lnet.rs @@ -0,0 +1,88 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use futures::{Future, FutureExt}; +use petgraph::graph::DiGraph; +use std::{ io, pin::Pin}; + +pub enum LnetStates { + Unconfigured, + Unloaded, + Down, + Up, +} + +impl Default for LnetStates { + fn default() -> Self { + Self::Unconfigured + } +} + +impl LnetStates { + fn step(self, next: &Self) { + match (self, next) { + (Self::Unconfigured, Self::Unloaded) => {} + (Self::Unloaded, Self::Down) => {} + (Self::Down, Self::Up) => {} + (Self::Up, Self::Down) => {} + (Self::Down, Self::Unloaded) => {} + (Self::Unloaded, Self::Unconfigured) => {} + _ => {} + }; + } +} + +async fn configure() -> Result<(), io::Error> { + Ok(()) +} + +async fn load() -> Result<(), io::Error> { + Ok(()) +} + +async fn start() -> Result<(), io::Error> { + Ok(()) +} + +async fn stop() -> Result<(), io::Error> { + Ok(()) +} + +async fn unload() -> Result<(), io::Error> { + Ok(()) +} + +async fn unconfigure() -> Result<(), io::Error> { + Ok(()) +} + +type BoxedFuture = Pin> + Send>>; + +type Transition = Box BoxedFuture + Send + Sync>; + +fn mk_transition(f: fn() -> Fut) -> Transition +where + Fut: Future> + Send + 'static, +{ + Box::new(move || f().boxed()) +} + +fn build_graph() -> DiGraph:: { + let mut deps = DiGraph::::new(); + + let unconfigured = deps.add_node(LnetStates::Unconfigured); + let unloaded = deps.add_node(LnetStates::Unloaded); + let down = deps.add_node(LnetStates::Down); + let up = deps.add_node(LnetStates::Up); + + deps.add_edge(unconfigured, unloaded, mk_transition(configure)); + deps.add_edge(unloaded, down, mk_transition(load)); + deps.add_edge(down, up, mk_transition(start)); + deps.add_edge(up, down, mk_transition(stop)); + deps.add_edge(down, unloaded, mk_transition(unload)); + deps.add_edge(unloaded, unconfigured, mk_transition(unconfigure)); + + deps + +} diff --git a/iml-state-machine/src/snapshot.rs b/iml-state-machine/src/snapshot.rs new file mode 100644 index 0000000000..6fc8ad87c8 --- /dev/null +++ b/iml-state-machine/src/snapshot.rs @@ -0,0 +1,41 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::Error; +use iml_postgres::{active_mgs_host_fqdn, PgPool}; +use iml_wire_types::{ + snapshot::{Create, Destroy, Mount, Unmount}, + Fqdn, +}; + +pub async fn mount_snapshot(pool: PgPool, x: Mount) -> Result<(Fqdn, String, Mount), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_mount".to_string(), x)) +} + +pub async fn unmount_snapshot(pool: PgPool, x: Unmount) -> Result<(Fqdn, String, Unmount), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_unmount".to_string(), x)) +} + +pub async fn destroy_snapshot(pool: PgPool, x: Destroy) -> Result<(Fqdn, String, Destroy), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_destroy".to_string(), x)) +} + +pub async fn create_snapshot(pool: PgPool, x: Create) -> Result<(Fqdn, String, Create), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_create".to_string(), x)) +} + +async fn get_active_mgs_or_fail(pool: &PgPool, fsname: &str) -> Result { + match active_mgs_host_fqdn(fsname, pool).await? { + Some(x) => Ok(Fqdn(x)), + None => Err(Error::NotFound), + } +} diff --git a/iml-state-machine/src/step.rs b/iml-state-machine/src/step.rs new file mode 100644 index 0000000000..bb0da43be8 --- /dev/null +++ b/iml-state-machine/src/step.rs @@ -0,0 +1,59 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::Error; +use futures::{future, Future, TryFutureExt}; +use iml_postgres::PgPool; +use iml_wire_types::Fqdn; +use std::pin::Pin; + +type BoxedFuture = + Pin> + Send>>; + +type Step = Box) -> BoxedFuture + Send>; + +fn mk_step(f: fn(PgPool, T) -> Fut) -> Step +where + T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static, + Fut: Future> + Send + 'static, +{ + Box::new(move |p, x| { + let x = match x.and_then(|v| serde_json::from_value(v)) { + Ok(x) => x, + Err(e) => { + return Box::pin(future::err(e.into())); + } + }; + + let fut = f(p, x); + + let fut = fut.err_into().and_then(|(fqdn, action, x)| async { + let x = serde_json::to_value(x)?; + + Ok((fqdn, action, x)) + }); + + Box::pin(fut) + }) +} + +pub struct Steps(pub Vec<(Step, Result)>); + +impl Default for Steps { + fn default() -> Self { + Steps(vec![]) + } +} + +impl Steps { + pub fn add_remote_step(mut self, f: fn(PgPool, T) -> Fut, args: T) -> Self + where + T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static, + Fut: Future> + Send + 'static, + { + self.0.push((mk_step(f), serde_json::to_value(args))); + + self + } +} diff --git a/iml-warp-drive/Cargo.toml b/iml-warp-drive/Cargo.toml index 3c622af970..65571856df 100644 --- a/iml-warp-drive/Cargo.toml +++ b/iml-warp-drive/Cargo.toml @@ -7,14 +7,17 @@ version = "0.4.0" [dependencies] futures = "0.3" im = {version = "15.0", features = ["serde"]} +iml-action-client = {path = "../iml-action-client", version = "0.1"} iml-manager-client = {path = "../iml-manager-client", version = "0.4"} iml-manager-env = {path = "../iml-manager-env", version = "0.4"} iml-postgres = {path = "../iml-postgres", version = "0.4"} iml-rabbit = {path = "../iml-rabbit", version = "0.4"} +iml-state-machine = {path = "../iml-state-machine", version = "0.1"} iml-tracing = {version = "0.3", path = "../iml-tracing"} iml-wire-types = {path = "../iml-wire-types", version = "0.4", features = ["postgres-interop"]} serde = {version = "1", features = ["derive"]} serde_json = "1.0" +thiserror = "1.0" tokio = {version = "0.2", features = ["macros", "rt-threaded"]} tokio-runtime-shutdown = {path = "../tokio-runtime-shutdown", version = "0.4"} tracing = "0.1" diff --git a/iml-warp-drive/src/error.rs b/iml-warp-drive/src/error.rs index 6f987d1a04..543306990f 100644 --- a/iml-warp-drive/src/error.rs +++ b/iml-warp-drive/src/error.rs @@ -7,67 +7,26 @@ use iml_postgres::DbError; use iml_rabbit::ImlRabbitError; use warp::reject; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum ImlWarpDriveError { - ImlRabbitError(ImlRabbitError), - ImlManagerClientError(ImlManagerClientError), - TokioPostgresError(iml_postgres::Error), + #[error(transparent)] + ImlRabbitError(#[from] ImlRabbitError), + #[error(transparent)] + ImlManagerClientError(#[from] ImlManagerClientError), + #[error(transparent)] + StateMachineError(#[from] iml_state_machine::Error), + #[error(transparent)] + TokioPostgresError(#[from] iml_postgres::Error), + #[error(transparent)] DbError(Box), - SerdeJsonError(serde_json::error::Error), + #[error(transparent)] + SerdeJsonError(#[from] serde_json::error::Error), } impl reject::Reject for ImlWarpDriveError {} -impl std::fmt::Display for ImlWarpDriveError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - ImlWarpDriveError::ImlRabbitError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::ImlManagerClientError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::TokioPostgresError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::DbError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::SerdeJsonError(ref err) => write!(f, "{}", err), - } - } -} - -impl std::error::Error for ImlWarpDriveError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match *self { - ImlWarpDriveError::ImlRabbitError(ref err) => Some(err), - ImlWarpDriveError::ImlManagerClientError(ref err) => Some(err), - ImlWarpDriveError::TokioPostgresError(ref err) => Some(err), - ImlWarpDriveError::DbError(ref err) => Some(err), - ImlWarpDriveError::SerdeJsonError(ref err) => Some(err), - } - } -} - -impl From for ImlWarpDriveError { - fn from(err: ImlRabbitError) -> Self { - ImlWarpDriveError::ImlRabbitError(err) - } -} - -impl From for ImlWarpDriveError { - fn from(err: ImlManagerClientError) -> Self { - ImlWarpDriveError::ImlManagerClientError(err) - } -} - impl From for ImlWarpDriveError { fn from(err: DbError) -> Self { ImlWarpDriveError::DbError(Box::new(err)) } } - -impl From for ImlWarpDriveError { - fn from(err: iml_postgres::Error) -> Self { - ImlWarpDriveError::TokioPostgresError(err) - } -} - -impl From for ImlWarpDriveError { - fn from(err: serde_json::error::Error) -> Self { - ImlWarpDriveError::SerdeJsonError(err) - } -} diff --git a/iml-warp-drive/src/lib.rs b/iml-warp-drive/src/lib.rs index 6f281ca962..9ebedb81d9 100644 --- a/iml-warp-drive/src/lib.rs +++ b/iml-warp-drive/src/lib.rs @@ -7,7 +7,9 @@ pub mod db_record; pub mod error; pub mod listen; pub mod locks; +pub mod messaging; pub mod request; +pub mod state_machine; pub mod users; pub use db_record::*; diff --git a/iml-warp-drive/src/locks.rs b/iml-warp-drive/src/locks.rs index 92369940f7..df996e128a 100644 --- a/iml-warp-drive/src/locks.rs +++ b/iml-warp-drive/src/locks.rs @@ -3,7 +3,7 @@ // license that can be found in the LICENSE file. use crate::request::Request; -use futures::{Stream, TryStreamExt}; +use futures::{lock::Mutex, Stream, TryStreamExt}; use im::{HashMap, HashSet}; use iml_rabbit::{ basic_consume, basic_publish, bind_queue, declare_transient_exchange, declare_transient_queue, @@ -11,6 +11,9 @@ use iml_rabbit::{ Queue, }; use iml_wire_types::{LockAction, LockChange, ToCompositeId}; +use std::sync::Arc; + +pub type SharedLocks = Arc>; /// Declares the exchange for rpc comms async fn declare_rpc_exchange(c: &Channel) -> Result<(), ImlRabbitError> { diff --git a/iml-warp-drive/src/main.rs b/iml-warp-drive/src/main.rs index 019d1e1431..86e5183e33 100644 --- a/iml-warp-drive/src/main.rs +++ b/iml-warp-drive/src/main.rs @@ -8,15 +8,13 @@ use iml_postgres::get_db_pool; use iml_warp_drive::{ cache::{populate_from_api, populate_from_db, SharedCache}, error, listen, - locks::{self, create_locks_consumer, Locks}, + locks::{self, create_locks_consumer, SharedLocks}, users, }; use iml_wire_types::warp_drive::{Cache, Message}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use warp::Filter; -type SharedLocks = Arc>; - #[tokio::main] async fn main() -> Result<(), Box> { iml_tracing::init(); @@ -29,6 +27,8 @@ async fn main() -> Result<(), Box> { let api_cache_state: SharedCache = Arc::new(Mutex::new(Cache::default())); + let job_states = Arc::new(Mutex::new(HashMap::new())); + // Clone here to allow SSE route to get a ref. let user_state2 = Arc::clone(&user_state); let lock_state2 = Arc::clone(&lock_state); @@ -79,11 +79,15 @@ async fn main() -> Result<(), Box> { tracing::info!("Started listening to NOTIFY events"); - { - let pool = get_db_pool(2).await?; + let pg_pool = get_db_pool(4).await?; - populate_from_db(Arc::clone(&api_cache_state3), &pool).await?; - } + tokio::spawn(iml_state_machine::run_jobs( + iml_action_client::Client::default(), + pg_pool.clone(), + Arc::clone(&job_states), + )); + + populate_from_db(Arc::clone(&api_cache_state3), &pg_pool).await?; let pool = iml_rabbit::connect_to_rabbit(1); @@ -138,42 +142,22 @@ async fn main() -> Result<(), Box> { }), ); - // GET -> messages stream - let routes = warp::get() - .and(warp::any().map(move || Arc::clone(&user_state2))) - .and(warp::any().map(move || Arc::clone(&lock_state2))) - .and(warp::any().map(move || Arc::clone(&api_cache_state2))) - .and_then( - |users: users::SharedUsers, locks: SharedLocks, api_cache: SharedCache| { - tracing::debug!("Inside user route"); - - async move { - // reply using server-sent events - let stream = users::user_connected( - users, - locks.lock().await.clone(), - api_cache.lock().await.clone(), - ) - .await; - - Ok::<_, error::ImlWarpDriveError>(warp::sse::reply( - warp::sse::keep_alive().stream(stream), - )) - } - .map_err(warp::reject::custom) - }, - ) - .with(warp::log("iml-warp-drive::api")); - let addr = iml_manager_env::get_warp_drive_addr(); tracing::info!("Listening on {}", addr); - let (_, fut) = warp::serve(routes).bind_with_graceful_shutdown( - addr, - tokio_runtime_shutdown::when_finished(&valve) - .then(move |_| users::disconnect_all_users(user_state3)), - ); + let messaging_route = + iml_warp_drive::messaging::route(user_state2, Arc::clone(&api_cache_state2), lock_state2); + + let state_machine_routes = + iml_warp_drive::state_machine::route(api_cache_state2, job_states, pg_pool); + + let (_, fut) = warp::serve(messaging_route.or(state_machine_routes)) + .bind_with_graceful_shutdown( + addr, + tokio_runtime_shutdown::when_finished(&valve) + .then(move |_| users::disconnect_all_users(user_state3)), + ); fut.await; diff --git a/iml-warp-drive/src/messaging.rs b/iml-warp-drive/src/messaging.rs new file mode 100644 index 0000000000..12b7db81d4 --- /dev/null +++ b/iml-warp-drive/src/messaging.rs @@ -0,0 +1,41 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{cache, error, locks::SharedLocks, users}; +use futures::TryFutureExt; +use std::sync::Arc; +use warp::Filter; + +pub fn route( + user_state: users::SharedUsers, + api_cache_state: cache::SharedCache, + lock_state: SharedLocks, +) -> impl Filter + Clone { + warp::path("messaging") + .and(warp::get()) + .and(warp::any().map(move || Arc::clone(&user_state))) + .and(warp::any().map(move || Arc::clone(&lock_state))) + .and(warp::any().map(move || Arc::clone(&api_cache_state))) + .and_then( + |users: users::SharedUsers, locks: SharedLocks, api_cache: cache::SharedCache| { + tracing::debug!("Inside messaging route"); + + async move { + // reply using server-sent events + let stream = users::user_connected( + users, + locks.lock().await.clone(), + api_cache.lock().await.clone(), + ) + .await; + + Ok::<_, error::ImlWarpDriveError>(warp::sse::reply( + warp::sse::keep_alive().stream(stream), + )) + } + .map_err(warp::reject::custom) + }, + ) + .with(warp::log("iml-warp-drive::messaging")) +} diff --git a/iml-warp-drive/src/state_machine.rs b/iml-warp-drive/src/state_machine.rs new file mode 100644 index 0000000000..86e2abca51 --- /dev/null +++ b/iml-warp-drive/src/state_machine.rs @@ -0,0 +1,95 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{cache, error::ImlWarpDriveError}; +use iml_postgres::PgPool; +use iml_state_machine::{graph::StateGraphExt, run_command, JobStates}; +use iml_wire_types::{ + state_machine::{Command, Transition}, + warp_drive::RecordId, +}; +use std::sync::Arc; +use warp::Filter; + +pub fn route( + shared_cache: cache::SharedCache, + job_states: JobStates, + pg_pool: PgPool, +) -> impl Filter + Clone { + let route = warp::path("state_machine"); + + let shared_cache_filter = warp::any().map(move || Arc::clone(&shared_cache)); + + let get_transitions_route = route + .clone() + .and(shared_cache_filter.clone()) + .and(warp::path("get_transitions")) + .and(warp::path::end()) + .and(warp::post()) + .and(warp::body::json()) + .and_then( + |shared_cache: cache::SharedCache, record_id: RecordId| async move { + tracing::debug!("Inside state_machine route"); + + let cache = shared_cache.lock().await; + + let g = iml_state_machine::graph::build_graph(); + + let x = record_id + .to_state(&cache) + .and_then(|x| g.get_state_node(x)) + .map(|x| g.get_available_transitions(x)) + .unwrap_or_default(); + + Ok::<_, warp::Rejection>(warp::reply::json(&x)) + }, + ); + + let get_transition_path_route = + route + .clone() + .and(shared_cache_filter) + .and(warp::path("get_transition_path")) + .and(warp::path::end()) + .and(warp::post()) + .and(warp::body::json()) + .and_then( + |shared_cache: cache::SharedCache, + (record_id, transition): (RecordId, Transition)| async move { + let cache = shared_cache.lock().await; + + let g = iml_state_machine::graph::build_graph(); + + let xs = record_id + .to_state(&cache) + .and_then(|x| g.get_transition_path(x, transition)) + .unwrap_or_default(); + + Ok::<_, warp::Rejection>(warp::reply::json(&xs)) + }, + ); + + let run_command_route = route + .clone() + .and(warp::path("run_command")) + .and(warp::path::end()) + .and(warp::post()) + .and(warp::any().map(move || pg_pool.clone())) + .and(warp::any().map(move || Arc::clone(&job_states))) + .and(warp::body::json()) + .and_then( + |pg_pool: PgPool, job_states: JobStates, command: Command| async move { + let cmd = run_command(&pg_pool, &job_states, command) + .await + .map_err(ImlWarpDriveError::StateMachineError) + .map_err(warp::reject::custom)?; + + Ok::<_, warp::Rejection>(warp::reply::json(&cmd)) + }, + ); + + get_transitions_route + .or(get_transition_path_route) + .or(run_command_route) +} diff --git a/iml-wire-types/src/graphql_duration.rs b/iml-wire-types/src/graphql_duration.rs index a31bb8016e..ebfe2e5a36 100644 --- a/iml-wire-types/src/graphql_duration.rs +++ b/iml-wire-types/src/graphql_duration.rs @@ -25,7 +25,7 @@ where juniper::Value::scalar(humantime::format_duration(self.0).to_string()) } - fn from_input_value(value: &juniper::InputValue) -> Option { + fn from_input_value(value: &juniper::InputValue) -> Option { value.as_string_value()?.to_string().try_into().ok() } diff --git a/iml-wire-types/src/lib.rs b/iml-wire-types/src/lib.rs index cbbd9c79c6..3e0783b8cd 100644 --- a/iml-wire-types/src/lib.rs +++ b/iml-wire-types/src/lib.rs @@ -8,6 +8,7 @@ pub mod graphql_duration; pub mod high_availability; pub mod sfa; pub mod snapshot; +pub mod state_machine; pub mod stratagem; pub mod task; pub mod warp_drive; diff --git a/iml-wire-types/src/snapshot.rs b/iml-wire-types/src/snapshot.rs index 5c53cb11a9..7bbe93cf5b 100644 --- a/iml-wire-types/src/snapshot.rs +++ b/iml-wire-types/src/snapshot.rs @@ -7,6 +7,7 @@ use crate::{ db::{Id, TableName}, graphql_duration::GraphQLDuration, + warp_drive::RecordId, }; use chrono::{offset::Utc, DateTime}; use std::str::FromStr; @@ -56,6 +57,12 @@ impl Id for SnapshotRecord { } } +impl From<&SnapshotRecord> for RecordId { + fn from(x: &SnapshotRecord) -> Self { + Self::Snapshot(x.id) + } +} + pub const SNAPSHOT_TABLE_NAME: TableName = TableName("snapshot"); #[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] @@ -131,7 +138,7 @@ impl FromStr for ReserveUnit { } } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to create a snapshot pub struct Create { @@ -147,7 +154,7 @@ pub struct Create { pub comment: Option, } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to destroy the snapshot pub struct Destroy { @@ -161,7 +168,7 @@ pub struct Destroy { pub force: bool, } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to mount the snapshot pub struct Mount { @@ -171,7 +178,7 @@ pub struct Mount { pub name: String, } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to unmount the snapshot pub struct Unmount { diff --git a/iml-wire-types/src/state_machine.rs b/iml-wire-types/src/state_machine.rs new file mode 100644 index 0000000000..904bdb03a7 --- /dev/null +++ b/iml-wire-types/src/state_machine.rs @@ -0,0 +1,128 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::snapshot::{Create, Destroy, Mount, Unmount}; +use chrono::Utc; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLEnum))] +pub enum Transition { + CreateSnapshot, + MountSnapshot, + UnmountSnapshot, + RemoveSnapshot, +} + +impl Transition { + pub fn description(&self) -> &str { + match self { + Self::CreateSnapshot => "Create Snapshot", + Self::MountSnapshot => "Mount snapshot", + Self::UnmountSnapshot => "Unmount snapshot", + Self::RemoveSnapshot => "Remove snapshot", + } + } +} + +impl From for Edge { + fn from(x: Transition) -> Edge { + Edge::Transition(x) + } +} + +#[derive( + serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, +)] +pub enum State { + Snapshot(snapshot::State), +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Edge { + Transition(Transition), + Dependency(State), +} + +impl Edge { + pub fn is_transition(&self) -> bool { + match self { + Self::Transition(_) => true, + Self::Dependency(_) => false, + } + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +pub enum Job { + CreateSnapshotJob(Create), + MountSnapshotJob(Mount), + UnmountSnapshotJob(Unmount), + RemoveSnapshotJob(Destroy), +} + +#[cfg_attr(feature = "postgres-interop", derive(sqlx::Type))] +#[cfg_attr(feature = "postgres-interop", sqlx(rename = "machine_state"))] +#[cfg_attr(feature = "postgres-interop", sqlx(rename_all = "lowercase"))] +#[derive(serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Debug)] +#[serde(rename_all = "lowercase")] +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLEnum))] +pub enum CurrentState { + Pending, + Progress, + Failed, + Succeeded, + Cancelled, +} + +#[derive(serde::Serialize, serde::Deserialize)] +pub struct Command { + pub message: String, + pub jobs: Vec, +} + +#[derive(serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] +pub struct CommandRecord { + pub id: i32, + pub start_time: chrono::DateTime, + pub end_time: Option>, + pub state: CurrentState, + pub message: String, + pub jobs: Vec, +} + +pub mod snapshot { + use crate::state_machine; + + #[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Clone, + Copy, + Eq, + PartialEq, + Ord, + PartialOrd, + Hash, + )] + pub enum State { + Unknown, + Unmounted, + Mounted, + Removed, + } + + impl Default for State { + fn default() -> Self { + Self::Unknown + } + } + + impl From for state_machine::State { + fn from(x: State) -> state_machine::State { + state_machine::State::Snapshot(x) + } + } +} diff --git a/iml-wire-types/src/warp_drive.rs b/iml-wire-types/src/warp_drive.rs index 629e6e9939..e8ca328461 100644 --- a/iml-wire-types/src/warp_drive.rs +++ b/iml-wire-types/src/warp_drive.rs @@ -11,6 +11,7 @@ use crate::{ }, sfa::{SfaController, SfaDiskDrive, SfaEnclosure, SfaJob, SfaPowerSupply, SfaStorageSystem}, snapshot::{SnapshotInterval, SnapshotRecord, SnapshotRetention}, + state_machine::{self, State}, Alert, CompositeId, EndpointNameSelf, Filesystem, Host, Label, LockChange, Target, TargetConfParam, ToCompositeId, }; @@ -658,6 +659,96 @@ impl Deref for RecordId { } } +impl RecordId { + pub fn to_state(&self, cache: &Cache) -> Option { + match self { + RecordId::Snapshot(id) => { + let snap = cache.snapshot.get(&id)?; + + let snap = if snap.mounted == Some(true) { + State::Snapshot(state_machine::snapshot::State::Mounted) + } else { + State::Snapshot(state_machine::snapshot::State::Unmounted) + }; + + Some(snap) + } + _ => None, + } + } +} + +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLEnum))] +#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)] +pub enum RecordType { + ActiveAlert, + ContentType, + CorosyncConfiguration, + Filesystem, + Group, + Host, + LnetConfiguration, + ManagedTargetMount, + OstPool, + OstPoolOsts, + PacemakerConfiguration, + SfaDiskDrive, + SfaEnclosure, + SfaStorageSystem, + SfaJob, + SfaPowerSupply, + SfaController, + StratagemConfig, + Snapshot, + SnapshotInterval, + SnapshotRetention, + Target, + User, + UserGroup, + Volume, + VolumeNode, +} + +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLInputObject))] +#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)] +pub struct GraphqlRecordId { + pub r#type: RecordType, + pub id: i32, +} + +impl From for RecordId { + fn from(GraphqlRecordId { r#type, id }: GraphqlRecordId) -> Self { + match r#type { + RecordType::ActiveAlert => RecordId::ActiveAlert(id), + RecordType::ContentType => RecordId::ContentType(id), + RecordType::CorosyncConfiguration => RecordId::CorosyncConfiguration(id), + RecordType::Filesystem => RecordId::Filesystem(id), + RecordType::Group => RecordId::Group(id), + RecordType::Host => RecordId::Host(id), + RecordType::LnetConfiguration => RecordId::LnetConfiguration(id), + RecordType::ManagedTargetMount => RecordId::ManagedTargetMount(id), + RecordType::OstPool => RecordId::OstPool(id), + RecordType::OstPoolOsts => RecordId::OstPoolOsts(id), + RecordType::PacemakerConfiguration => RecordId::PacemakerConfiguration(id), + RecordType::SfaDiskDrive => RecordId::SfaDiskDrive(id), + RecordType::SfaEnclosure => RecordId::SfaEnclosure(id), + RecordType::SfaStorageSystem => RecordId::SfaStorageSystem(id), + RecordType::SfaJob => RecordId::SfaJob(id), + RecordType::SfaPowerSupply => RecordId::SfaPowerSupply(id), + RecordType::SfaController => RecordId::SfaController(id), + RecordType::StratagemConfig => RecordId::StratagemConfig(id), + RecordType::Snapshot => RecordId::Snapshot(id), + RecordType::SnapshotInterval => RecordId::SnapshotInterval(id), + RecordType::SnapshotRetention => RecordId::SnapshotRetention(id), + RecordType::Target => RecordId::Target(id), + RecordType::User => RecordId::User(id), + RecordType::UserGroup => RecordId::UserGroup(id), + RecordType::Volume => RecordId::Volume(id), + RecordType::VolumeNode => RecordId::VolumeNode(id), + } + } +} + #[allow(clippy::large_enum_variant)] #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(tag = "tag", content = "payload")] diff --git a/migrations/20201026195644_state_machine.sql b/migrations/20201026195644_state_machine.sql new file mode 100644 index 0000000000..458ddb4465 --- /dev/null +++ b/migrations/20201026195644_state_machine.sql @@ -0,0 +1,34 @@ +CREATE TYPE machine_state AS ENUM ( + 'pending', + 'progress', + 'failed', + 'succeeded', + 'cancelled' +); + +CREATE TABLE IF NOT EXISTS command ( + id serial PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + end_time TIMESTAMP WITH TIME ZONE, + state machine_state NOT NULL DEFAULT 'pending', + message TEXT NOT NULL, + jobs int[] NOT NULL DEFAULT array[]::int[] +); + +CREATE TABLE IF NOT EXISTS job ( + id serial PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + end_time TIMESTAMP WITH TIME ZONE, + state machine_state NOT NULL DEFAULT 'pending', + command_id INT NOT NULL REFERENCES command (id) ON DELETE CASCADE, + job jsonb NOT NULL, + wait_for_jobs int[] NOT NULL, + locked_records jsonb[] +); + +CREATE TABLE IF NOT EXISTS step ( + id serial PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE NOT NULL, + end_time TIMESTAMP WITH TIME ZONE, + job_id INT NOT NULL REFERENCES job (id) ON DELETE CASCADE +) \ No newline at end of file diff --git a/sqlx-data.json b/sqlx-data.json index c751d3bd41..83fc930bd2 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -333,6 +333,32 @@ ] } }, + "158e5a5a3a32a58a0487c92e8f05ef0bcdc16d6c4ae1e9d3016a66d2836a2f36": { + "query": "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "filesystem_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "snapshot_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + false + ] + } + }, "17645262c426038efcc8e22bf999c0d3cee07f52c4e276a1b607e2e00b2e62bd": { "query": "\n SELECT\n id,\n filesystem_name,\n reserve_value,\n reserve_unit as \"reserve_unit:ReserveUnit\",\n last_run,\n keep_num\n FROM snapshot_retention\n ", "describe": { @@ -2657,6 +2683,31 @@ ] } }, + "9557d6d9a74d1ad2d597c1f786dbcc7c6a3be86535a7c781d4a707ae2b88dd00": { + "query": "\n UPDATE job\n SET\n state = $1,\n end_time = now()\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "machine_state", + "kind": { + "Enum": [ + "pending", + "progress", + "failed", + "succeeded", + "cancelled" + ] + } + } + } + ] + }, + "nullable": [] + } + }, "9a4c05da9d9233e6b3fa63ca2f50cf90feb0c305b1cc05e0eb2edcf2572db4ba": { "query": "select * from chroma_core_volume where not_deleted = 't'", "describe": { @@ -3149,6 +3200,28 @@ ] } }, + "b0795a0f0b333b7bd65f5a336b62463aa30a6c1b4e23079d1057c11fe416fbde": { + "query": "\n INSERT INTO job (command_id, job, wait_for_jobs, locked_records)\n VALUES ($1, $2, array[]::int[], $3)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4", + "Jsonb", + "JsonbArray" + ] + }, + "nullable": [ + false + ] + } + }, "b0991443ae430ca73d4369f314b88f731ead796ec9ac353c3d237be9203c95bf": { "query": "UPDATE chroma_core_alertstate\n SET active = Null, \"end\" = now()\n WHERE\n active = true\n AND alert_item_id = $1\n AND record_type = ANY($2)\n ", "describe": { @@ -4193,6 +4266,69 @@ "nullable": [] } }, + "ea4c0d5efc5eb5fb0db892f9236e98688edea23676210d0b3e6a29a4c733a9ba": { + "query": "\n INSERT INTO command (message)\n VALUES ($1)\n RETURNING id, start_time, end_time, state as \"state: CurrentState\", message, jobs\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "start_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "end_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "state: CurrentState", + "type_info": { + "Custom": { + "name": "machine_state", + "kind": { + "Enum": [ + "pending", + "progress", + "failed", + "succeeded", + "cancelled" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "message", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "jobs", + "type_info": "Int4Array" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + true, + false, + false, + false + ] + } + }, "ec70b9a5caeadc31f5d1359737cc1c6da64e41db8315a81d81420d3b37b182c5": { "query": "\n UPDATE chroma_core_task\n SET running_on_id = $1\n WHERE id = $2\n AND running_on_id is Null", "describe": {