From 3246c5512743d22ec4c9c244b09f6bbc9a036f18 Mon Sep 17 00:00:00 2001 From: Joe Grund Date: Mon, 29 Jun 2020 13:23:18 -0400 Subject: [PATCH 1/2] Create Rust iml-state-machine - Use `warp-drive` `Cache` as a realtime singleton to get the current system state. This acts much the same way as the `job_scheduler` `ObjectCache` does but instead gets realtime updates from the db instead of needing to be notified of changes by other processes. - Use petgraph to build a graph consisting of `State` nodes and `Edge` edges. `Edge` is an enum that is either a `Transition` or a `Dependency`. Add some methods via an Extenstion trait to find transitions / shortest transition paths. - Create a `Job` trait that can either be invoked directly via a `Command`, or indirectly via a `Transition`. - Create a `Steps` struct that holds a list of free fns (much like action plugins). These steps are run serially within a job. - Refactor service address bindings to not coopt the nginx proxy host. - Add an input type for `RecordId`s - Add a http_client to the graphql context - Add graphql query and mutation for statemachine Signed-off-by: Joe Grund --- Cargo.lock | 212 ++++++++------- Cargo.toml | 1 + chroma-manager.conf.template | 11 +- docker/docker-compose.yml | 6 +- iml-action-client/src/lib.rs | 4 +- iml-api/src/graphql/mod.rs | 21 +- iml-api/src/graphql/state_machine.rs | 150 +++++++++++ iml-api/src/main.rs | 6 +- iml-api/src/timer.rs | 9 +- ...nx__tests__replace_template_variables.snap | 11 +- iml-manager-env/src/lib.rs | 46 +++- iml-state-machine/Cargo.toml | 19 ++ iml-state-machine/README.md | 27 ++ iml-state-machine/src/command.rs | 199 ++++++++++++++ iml-state-machine/src/graph.rs | 250 ++++++++++++++++++ iml-state-machine/src/job.rs | 79 ++++++ iml-state-machine/src/lib.rs | 26 ++ iml-state-machine/src/lnet.rs | 88 ++++++ iml-state-machine/src/snapshot.rs | 41 +++ iml-state-machine/src/step.rs | 61 +++++ iml-warp-drive/Cargo.toml | 3 + iml-warp-drive/src/error.rs | 65 +---- iml-warp-drive/src/lib.rs | 2 + iml-warp-drive/src/locks.rs | 5 +- iml-warp-drive/src/main.rs | 64 ++--- iml-warp-drive/src/messaging.rs | 41 +++ iml-warp-drive/src/state_machine.rs | 95 +++++++ iml-wire-types/src/graphql_duration.rs | 2 +- iml-wire-types/src/lib.rs | 1 + iml-wire-types/src/snapshot.rs | 15 +- iml-wire-types/src/state_machine.rs | 128 +++++++++ iml-wire-types/src/warp_drive.rs | 91 +++++++ migrations/20201026195644_state_machine.sql | 43 +++ sqlx-data.json | 157 +++++++++++ 34 files changed, 1763 insertions(+), 216 deletions(-) create mode 100644 iml-api/src/graphql/state_machine.rs create mode 100644 iml-state-machine/Cargo.toml create mode 100644 iml-state-machine/README.md create mode 100644 iml-state-machine/src/command.rs create mode 100644 iml-state-machine/src/graph.rs create mode 100644 iml-state-machine/src/job.rs create mode 100644 iml-state-machine/src/lib.rs create mode 100644 iml-state-machine/src/lnet.rs create mode 100644 iml-state-machine/src/snapshot.rs create mode 100644 iml-state-machine/src/step.rs create mode 100644 iml-warp-drive/src/messaging.rs create mode 100644 iml-warp-drive/src/state_machine.rs create mode 100644 iml-wire-types/src/state_machine.rs create mode 100644 migrations/20201026195644_state_machine.sql diff --git a/Cargo.lock b/Cargo.lock index 47061f75e3..9b9174f1fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,9 +2,9 @@ # It is not intended for manual editing. [[package]] name = "addr2line" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6a2d3371669ab3ca9797670853d61402b03d0b4b9ebf33d677dfa720203072" +checksum = "7c0929d69e78dd9bf5408269919fcbcaeb2e35e5d43e5815517cdc6a8e11a423" dependencies = [ "gimli", ] @@ -100,12 +100,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "arc-swap" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" - [[package]] name = "arrayref" version = "0.3.6" @@ -186,9 +180,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "backtrace" -version = "0.3.53" +version = "0.3.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707b586e0e2f247cbde68cdd2c3ce69ea7b7be43e1c5b426e37c9319c4b9838e" +checksum = "2baad346b2d4e94a24347adeee9c7a93f412ee94b9cc26e5b59dea23848e9f28" dependencies = [ "addr2line", "cfg-if 1.0.0", @@ -256,9 +250,9 @@ dependencies = [ [[package]] name = "blake2b_simd" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a" +checksum = "afa748e348ad3be8263be728124b24a24f268266f6f5d58af9d75f6a40b5c587" dependencies = [ "arrayref", "arrayvec", @@ -620,9 +614,9 @@ dependencies = [ [[package]] name = "const_fn" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2" +checksum = "c478836e029dcef17fb47c89023448c64f781a046e0300e257ad8225ae59afab" [[package]] name = "constant_time_eq" @@ -750,7 +744,7 @@ dependencies = [ "async-trait", "crossbeam-queue", "num_cpus", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -930,11 +924,11 @@ checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" [[package]] name = "encoding_rs" -version = "0.8.24" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a51b8cf747471cb9499b6d59e59b0444f4c90eba8968c4e44874e92b5b64ace2" +checksum = "801bbab217d7f79c0062f4f7205b5d4427c6d1a7bd7aafdd1475f7c59d62b283" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", ] [[package]] @@ -1236,9 +1230,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724" +checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce" [[package]] name = "glob" @@ -1270,7 +1264,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", + "tokio 0.2.22", "tokio-util", "tracing", "tracing-futures", @@ -1421,9 +1415,9 @@ checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a" [[package]] name = "hyper" -version = "0.13.8" +version = "0.13.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f3afcfae8af5ad0576a31e768415edb627824129e8e5a29b8bfccb2f234e835" +checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" dependencies = [ "bytes", "futures-channel", @@ -1435,9 +1429,9 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 0.4.27", + "pin-project 1.0.1", "socket2", - "tokio", + "tokio 0.2.22", "tower-service", "tracing", "want", @@ -1454,7 +1448,7 @@ dependencies = [ "hyper", "log", "rustls", - "tokio", + "tokio 0.2.22", "tokio-rustls", "webpki", ] @@ -1468,7 +1462,7 @@ dependencies = [ "bytes", "hyper", "native-tls", - "tokio", + "tokio 0.2.22", "tokio-tls", ] @@ -1482,7 +1476,7 @@ dependencies = [ "hex", "hyper", "pin-project 0.4.27", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -1547,7 +1541,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tokio-runtime-shutdown", "tokio-test", "tracing", @@ -1597,7 +1591,7 @@ dependencies = [ "structopt", "tempfile", "thiserror", - "tokio", + "tokio 0.2.22", "tokio-util", "tracing", "url", @@ -1617,7 +1611,7 @@ dependencies = [ "iml-wire-types", "serde", "serde_json", - "tokio", + "tokio 0.2.22", "tracing", "uuid", "warp", @@ -1642,7 +1636,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tracing", "uuid", "warp", @@ -1665,7 +1659,7 @@ name = "iml-cmd" version = "0.4.0" dependencies = [ "futures", - "tokio", + "tokio 0.2.22", "tracing", "warp", ] @@ -1679,7 +1673,7 @@ dependencies = [ "iml-tracing", "iml-wire-types", "thiserror", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -1697,7 +1691,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tokio-test", ] @@ -1721,7 +1715,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tokio-test", "url", "warp", @@ -1735,7 +1729,7 @@ dependencies = [ "futures", "tempdir", "tempfile", - "tokio", + "tokio 0.2.22", "tokio-util", ] @@ -1769,7 +1763,7 @@ dependencies = [ "iml-wire-types", "serde", "serde_json", - "tokio", + "tokio 0.2.22", "tracing", "uuid", ] @@ -1792,7 +1786,7 @@ dependencies = [ "serde_json", "thiserror", "time 0.2.22", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -1811,7 +1805,7 @@ dependencies = [ "serde_json", "tempdir", "thiserror", - "tokio", + "tokio 0.2.22", "warp", ] @@ -1847,7 +1841,7 @@ dependencies = [ "serde_yaml", "structopt", "thiserror", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -1884,7 +1878,7 @@ dependencies = [ "iml-service-queue", "iml-tracing", "iml-wire-types", - "tokio", + "tokio 0.2.22", "tracing", "url", ] @@ -1900,7 +1894,7 @@ dependencies = [ "iml-service-queue", "iml-tracing", "iml-wire-types", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -1914,7 +1908,7 @@ dependencies = [ "iml-service-queue", "iml-tracing", "iml-wire-types", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -1940,7 +1934,7 @@ dependencies = [ "iml-service-queue", "iml-tracing", "iml-wire-types", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -1955,7 +1949,7 @@ dependencies = [ "lapin", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tokio-amqp", "tracing", "warp", @@ -1971,7 +1965,7 @@ dependencies = [ "iml-manager-env", "iml-tracing", "tempdir", - "tokio", + "tokio 0.2.22", "tracing", "warp", ] @@ -1987,7 +1981,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "tokio", + "tokio 0.2.22", "tokio-test", "tracing", "url", @@ -2021,7 +2015,7 @@ dependencies = [ "iml-wire-types", "serde", "thiserror", - "tokio", + "tokio 0.2.22", "unzip-n", "url", "wbem-client", @@ -2045,10 +2039,28 @@ dependencies = [ "iml-wire-types", "serde", "thiserror", - "tokio", + "tokio 0.2.22", "url", ] +[[package]] +name = "iml-state-machine" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "iml-action-client", + "iml-postgres", + "iml-tracing", + "iml-wire-types", + "petgraph", + "serde", + "serde_json", + "thiserror", + "tokio 0.3.3", + "uuid", +] + [[package]] name = "iml-stats" version = "0.4.0" @@ -2064,7 +2076,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tracing", "url", ] @@ -2084,7 +2096,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -2097,7 +2109,7 @@ dependencies = [ "iml-wire-types", "serde", "serde_json", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -2114,7 +2126,7 @@ dependencies = [ "lazy_static", "serde_json", "thiserror", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -2127,7 +2139,7 @@ dependencies = [ "iml-tracing", "serde", "serde_json", - "tokio", + "tokio 0.2.22", "warp", ] @@ -2135,7 +2147,7 @@ dependencies = [ name = "iml-tracing" version = "0.3.0" dependencies = [ - "tokio", + "tokio 0.2.22", "tracing", "tracing-subscriber", ] @@ -2148,7 +2160,7 @@ dependencies = [ "iml-wire-types", "serde", "serde_json", - "tokio", + "tokio 0.2.22", "tracing", ] @@ -2158,15 +2170,18 @@ version = "0.4.0" dependencies = [ "futures", "im", + "iml-action-client", "iml-manager-client", "iml-manager-env", "iml-postgres", "iml-rabbit", + "iml-state-machine", "iml-tracing", "iml-wire-types", "serde", "serde_json", - "tokio", + "thiserror", + "tokio 0.2.22", "tokio-runtime-shutdown", "tracing", "uuid", @@ -2241,7 +2256,7 @@ dependencies = [ "inotify-sys", "libc", "mio 0.6.22", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -2401,7 +2416,7 @@ dependencies = [ "crossbeam-channel", "futures-core", "log", - "mio 0.7.4", + "mio 0.7.5", "parking_lot", "pinky-swear", ] @@ -2637,9 +2652,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8f1c83949125de4a582aa2da15ae6324d91cf6a58a70ea407643941ff98f558" +checksum = "8962c171f57fcfffa53f4df1bb15ec4c8cf26a7569459c9ceb62d94aab0d9584" dependencies = [ "libc", "log", @@ -2796,9 +2811,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ "autocfg 1.0.1", "num-traits", @@ -2806,9 +2821,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.12" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ "autocfg 1.0.1", ] @@ -2835,9 +2850,9 @@ checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" [[package]] name = "object" -version = "0.21.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37fd5004feb2ce328a52b0b3d01dbf4ffff72583493900ed15f22d4111c51693" +checksum = "8d3b63360ec3cb337817c2dbd47ab4a0f170d285d8e5a2064600f3def1402397" [[package]] name = "once_cell" @@ -3115,9 +3130,9 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "precomputed-hash" @@ -3459,9 +3474,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8963b85b8ce3074fecffde43b4b0dded83ce2f367dc8d363afc56679f3ee820b" +checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" dependencies = [ "aho-corasick", "memchr", @@ -3481,9 +3496,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.20" +version = "0.6.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cab7a364d15cde1e505267766a2d3c4e22a843e1a601f0fa7564c0f82ced11c" +checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" [[package]] name = "remove_dir_all" @@ -3523,7 +3538,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio", + "tokio 0.2.22", "tokio-rustls", "tokio-tls", "url", @@ -3776,9 +3791,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.13" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae3e2dd40a7cdc18ca80db804b7f461a39bb721160a85c9a1fa30134bf3c02a5" +checksum = "f7baae0a99f1a324984bcdc5f0718384c1f69775f1c7eec8b859b71b443e3fd7" dependencies = [ "dtoa", "linked-hash-map", @@ -3848,11 +3863,10 @@ checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" [[package]] name = "signal-hook-registry" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e12110bc539e657a646068aaf5eb5b63af9d0c1f7b29c97113fad80e15f035" +checksum = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab" dependencies = [ - "arc-swap", "libc", ] @@ -4020,7 +4034,7 @@ source = "git+https://github.com/jgrund/sqlx?branch=support-offline-workspaces#3 dependencies = [ "native-tls", "once_cell", - "tokio", + "tokio 0.2.22", "tokio-native-tls", ] @@ -4097,7 +4111,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project 0.4.27", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4109,7 +4123,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project 0.4.27", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4231,7 +4245,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f34bbc13eef7fd9522e3f08538480f7762d6dd72b111f62b2bc8b185a5d982bd" dependencies = [ "cfg-if 1.0.0", - "mio 0.7.4", + "mio 0.7.5", "p12", "rustls-connector", ] @@ -4427,6 +4441,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tokio" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ca08accbcb46f11fd8d2d1c6158c348b7888009a1f39260bcad66f6a454250" +dependencies = [ + "autocfg 1.0.1", + "pin-project-lite", +] + [[package]] name = "tokio-amqp" version = "0.1.3" @@ -4434,7 +4458,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2938fb5b638e6d8992c304e2426db2de4502f19084b91ec4562d9b45bf896fc4" dependencies = [ "lapin", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4455,7 +4479,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd608593a919a8e05a7d1fc6df885e40f6a88d3a70a3a7eff23ff27964eda069" dependencies = [ "native-tls", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4476,7 +4500,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", - "tokio", + "tokio 0.2.22", "tokio-util", ] @@ -4486,7 +4510,7 @@ version = "0.4.0" dependencies = [ "futures", "stream-cancel 0.5.2", - "tokio", + "tokio 0.2.22", "tokio-test", ] @@ -4498,7 +4522,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a" dependencies = [ "futures-core", "rustls", - "tokio", + "tokio 0.2.22", "webpki", ] @@ -4510,7 +4534,7 @@ checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ "bytes", "futures-core", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4520,7 +4544,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" dependencies = [ "native-tls", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4532,7 +4556,7 @@ dependencies = [ "futures-util", "log", "pin-project 0.4.27", - "tokio", + "tokio 0.2.22", "tungstenite", ] @@ -4547,7 +4571,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "tokio", + "tokio 0.2.22", ] [[package]] @@ -4877,7 +4901,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio", + "tokio 0.2.22", "tokio-tungstenite", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 1e0bebf256..81b1ee50fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ 'iml-services/iml-snapshot', 'iml-services/iml-stats', 'iml-sfa', + 'iml-state-machine', 'iml-system-test-utils', 'iml-systemd', 'iml-task-runner', diff --git a/chroma-manager.conf.template b/chroma-manager.conf.template index 76f6e999fd..034cf3e51e 100644 --- a/chroma-manager.conf.template +++ b/chroma-manager.conf.template @@ -238,7 +238,16 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_http_version 1.1; proxy_set_header Connection ''; - proxy_pass {{WARP_DRIVE_PROXY_PASS}}; + proxy_pass {{WARP_DRIVE_PROXY_PASS}}/messaging; + } + + location /state_machine { + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Server $host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_http_version 1.1; + proxy_set_header Connection ''; + proxy_pass {{WARP_DRIVE_PROXY_PASS}}/state_machine; } location /mailbox { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index fa58eae164..a70c36f942 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -322,6 +322,8 @@ services: - "manager-config:/var/lib/chroma" environment: - PROXY_HOST=iml-warp-drive + - ACTION_RUNNER_HOST=iml-action-runner + - ACTION_RUNNER_PORT=8009 - RUST_LOG=info,sqlx::query=warn iml-action-runner: image: "imlteam/iml-action-runner:6.2.0" @@ -348,6 +350,7 @@ services: environment: - RUST_LOG=info,sqlx::query=warn - PROXY_HOST=iml-action-runner + - ACTION_RUNNER_HOST=iml-action-runner - ACTION_RUNNER_PORT=8009 iml-api: image: "imlteam/iml-api:6.2.0" @@ -360,7 +363,8 @@ services: - "manager-config:/var/lib/chroma" - "report:/var/spool/iml/report" environment: - - PROXY_HOST=iml-api + - PROXY_HOST=nginx + - SERVICE_HOST=iml-api - RUST_LOG=info,sqlx::query=warn - BRANDING - USE_STRATAGEM diff --git a/iml-action-client/src/lib.rs b/iml-action-client/src/lib.rs index 65c8d32854..ea13c2ce69 100644 --- a/iml-action-client/src/lib.rs +++ b/iml-action-client/src/lib.rs @@ -5,7 +5,7 @@ use bytes::buf::BufExt as _; use hyper::{client::HttpConnector, Body, Request}; use hyperlocal::{UnixClientExt as _, UnixConnector}; -use iml_manager_env::{get_action_runner_http, get_action_runner_uds, running_in_docker}; +use iml_manager_env::{get_action_runner_uds, running_in_docker, ACTION_RUNNER_URL}; use iml_wire_types::{Action, ActionId, ActionName, ActionType, AgentResult, Fqdn}; use std::{ops::Deref, sync::Arc}; use thiserror::Error; @@ -57,7 +57,7 @@ impl Default for Client { let (inner, uri) = if running_in_docker() { ( ClientInner::Http(hyper::Client::new()), - get_action_runner_http().parse::().unwrap(), + ACTION_RUNNER_URL.as_str().parse::().unwrap(), ) } else { ( diff --git a/iml-api/src/graphql/mod.rs b/iml-api/src/graphql/mod.rs index 6f379fee5f..c796dc78d0 100644 --- a/iml-api/src/graphql/mod.rs +++ b/iml-api/src/graphql/mod.rs @@ -2,6 +2,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +mod state_machine; mod stratagem; mod task; @@ -12,6 +13,7 @@ use crate::{ }; use chrono::{DateTime, Utc}; use futures::{future::join_all, TryFutureExt, TryStreamExt}; +use iml_manager_client::Client; use iml_postgres::{ active_mgs_host_fqdn, fqdn_by_host_id, sqlx, sqlx::postgres::types::PgInterval, PgPool, }; @@ -115,6 +117,9 @@ impl QueryRoot { fn task(&self) -> task::TaskQuery { task::TaskQuery } + fn state_machine(&self) -> state_machine::StateMachineQuery { + state_machine::StateMachineQuery + } #[graphql(arguments( limit(description = "optional paging limit, defaults to all rows"), offset(description = "Offset into items, defaults to 0"), @@ -620,6 +625,9 @@ impl MutationRoot { fn task(&self) -> task::TaskMutation { task::TaskMutation } + fn state_machine(&self) -> state_machine::StateMachineMutation { + state_machine::StateMachineMutation + } #[graphql(arguments( fsname(description = "Filesystem to snapshot"), name(description = "Name of the snapshot"), @@ -870,8 +878,14 @@ impl MutationRoot { .map(|x| x.id); if let Some(id) = maybe_id { - configure_snapshot_timer(id, fsname, interval.0, use_barrier.unwrap_or_default()) - .await?; + configure_snapshot_timer( + context.http_client.clone(), + id, + fsname, + interval.0, + use_barrier.unwrap_or_default(), + ) + .await?; } Ok(true) @@ -884,7 +898,7 @@ impl MutationRoot { .execute(&context.pg_pool) .await?; - remove_snapshot_timer(id).await?; + remove_snapshot_timer(context.http_client.clone(), id).await?; Ok(true) } @@ -1117,6 +1131,7 @@ pub(crate) type Schema = RootNode<'static, QueryRoot, MutationRoot, EmptySubscri pub(crate) struct Context { pub(crate) pg_pool: PgPool, pub(crate) rabbit_pool: Pool, + pub(crate) http_client: Client, } impl juniper::Context for Context {} diff --git a/iml-api/src/graphql/state_machine.rs b/iml-api/src/graphql/state_machine.rs new file mode 100644 index 0000000000..c6b1dcba1b --- /dev/null +++ b/iml-api/src/graphql/state_machine.rs @@ -0,0 +1,150 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::graphql::Context; +use iml_manager_client::{post, Client, ImlManagerClientError}; +use iml_manager_env::get_proxy_url; +use iml_postgres::sqlx; +use iml_wire_types::{ + snapshot::{Destroy, Mount, Unmount}, + state_machine::{Command, CommandRecord, Job, Transition}, + warp_drive::{GraphqlRecordId, RecordId}, +}; + +pub(crate) struct StateMachineMutation; + +#[juniper::graphql_object(Context = Context)] +impl StateMachineMutation { + /// Run a state_machine `Transition` for a given record + async fn run_transition( + context: &Context, + record_id: GraphqlRecordId, + transition: Transition, + ) -> juniper::FieldResult { + let record_id = RecordId::from(record_id); + + let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?; + + let mut jobs = vec![]; + + for x in xs { + match (record_id, x) { + (RecordId::Snapshot(x), Transition::MountSnapshot) => { + let x = sqlx::query!( + "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + x + ) + .fetch_one(&context.pg_pool) + .await?; + + jobs.push(Job::MountSnapshotJob(Mount { + fsname: x.filesystem_name, + name: x.snapshot_name, + })); + } + (RecordId::Snapshot(x), Transition::UnmountSnapshot) => { + let x = sqlx::query!( + "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + x + ) + .fetch_one(&context.pg_pool) + .await?; + + jobs.push(Job::UnmountSnapshotJob(Unmount { + fsname: x.filesystem_name, + name: x.snapshot_name, + })) + } + (RecordId::Snapshot(x), Transition::RemoveSnapshot) => { + let x = sqlx::query!( + "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + x + ) + .fetch_one(&context.pg_pool) + .await?; + + jobs.push(Job::RemoveSnapshotJob(Destroy { + fsname: x.filesystem_name, + name: x.snapshot_name, + force: true, + })) + } + _ => {} + } + } + + let cmd = Command { + message: "Running Transition".to_string(), + jobs, + }; + + let mut url = get_proxy_url(); + + url.set_path("state_machine/run_command/"); + + let cmd = post(context.http_client.clone(), url.as_str(), cmd) + .await? + .error_for_status()? + .json() + .await?; + + Ok(cmd) + } +} + +pub(crate) struct StateMachineQuery; + +#[juniper::graphql_object(Context = Context)] +impl StateMachineQuery { + /// Given a record, figure out the possible transitions available for it + async fn get_transitions( + context: &Context, + record_id: GraphqlRecordId, + ) -> juniper::FieldResult> { + let mut url = get_proxy_url(); + + url.set_path("state_machine/get_transitions/"); + + let xs = post( + context.http_client.clone(), + url.as_str(), + RecordId::from(record_id), + ) + .await? + .error_for_status()? + .json() + .await?; + + Ok(xs) + } + /// Given a record and transition, figure out the shortest possible path for that + /// Record to reach that transition. + async fn get_transition_path( + context: &Context, + record_id: GraphqlRecordId, + transition: Transition, + ) -> juniper::FieldResult> { + let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?; + + Ok(xs) + } +} + +async fn get_transition_path( + client: Client, + record_id: impl Into, + transition: Transition, +) -> Result, ImlManagerClientError> { + let mut url = get_proxy_url(); + + url.set_path("state_machine/get_transition_path/"); + + let xs = post(client, url.as_str(), (record_id.into(), transition)) + .await? + .error_for_status()? + .json() + .await?; + + Ok(xs) +} diff --git a/iml-api/src/main.rs b/iml-api/src/main.rs index 6e4b078361..126fe9a124 100644 --- a/iml-api/src/main.rs +++ b/iml-api/src/main.rs @@ -8,6 +8,7 @@ mod error; mod graphql; mod timer; +use iml_manager_client::get_client; use iml_manager_env::get_pool_limit; use iml_postgres::get_db_pool; use iml_rabbit::{self, create_connection_filter}; @@ -22,7 +23,7 @@ const DEFAULT_POOL_LIMIT: u32 = 5; async fn main() -> Result<(), Box> { iml_tracing::init(); - let addr = iml_manager_env::get_iml_api_addr(); + let addr = iml_manager_env::get_iml_api_bind_addr(); let conf = Conf { allow_anonymous_read: iml_manager_env::get_allow_anonymous_read(), @@ -42,6 +43,8 @@ async fn main() -> Result<(), Box> { let pg_pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?; + let http_client = get_client()?; + let schema = Arc::new(graphql::Schema::new( graphql::QueryRoot, graphql::MutationRoot, @@ -52,6 +55,7 @@ async fn main() -> Result<(), Box> { let ctx = Arc::new(graphql::Context { pg_pool, rabbit_pool, + http_client, }); let ctx_filter = warp::any().map(move || Arc::clone(&ctx)); diff --git a/iml-api/src/timer.rs b/iml-api/src/timer.rs index 615e6ad48a..47a5af13d4 100644 --- a/iml-api/src/timer.rs +++ b/iml-api/src/timer.rs @@ -1,5 +1,5 @@ use crate::error::ImlApiError; -use iml_manager_client::{delete, get_client, put}; +use iml_manager_client::{delete, put, Client}; use iml_manager_env::{get_timer_addr, running_in_docker}; use std::time::Duration; @@ -12,6 +12,7 @@ pub struct TimerConfig { } pub async fn configure_snapshot_timer( + client: Client, config_id: i32, fsname: String, interval: Duration, @@ -73,8 +74,6 @@ ExecStart={} service_config, }; - let client = get_client()?; - let url = format!("http://{}/configure/", get_timer_addr()); tracing::debug!( "Sending snapshot interval config to timer service: {:?} {:?}", @@ -86,9 +85,7 @@ ExecStart={} Ok(()) } -pub async fn remove_snapshot_timer(config_id: i32) -> Result<(), ImlApiError> { - let client = get_client()?; - +pub async fn remove_snapshot_timer(client: Client, config_id: i32) -> Result<(), ImlApiError> { delete( client, format!( diff --git a/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap b/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap index 1c544b9a09..20342b1c9b 100644 --- a/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap +++ b/iml-manager-cli/src/snapshots/iml_manager_cli__nginx__tests__replace_template_variables.snap @@ -242,7 +242,16 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_http_version 1.1; proxy_set_header Connection ''; - proxy_pass http://127.0.0.1:8890; + proxy_pass http://127.0.0.1:8890/messaging; + } + + location /state_machine { + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Server $host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_http_version 1.1; + proxy_set_header Connection ''; + proxy_pass http://127.0.0.1:8890/state_machine; } location /mailbox { diff --git a/iml-manager-env/src/lib.rs b/iml-manager-env/src/lib.rs index 883883fc63..31d9b31a8d 100644 --- a/iml-manager-env/src/lib.rs +++ b/iml-manager-env/src/lib.rs @@ -16,11 +16,7 @@ lazy_static! { } lazy_static! { - static ref ACTION_RUNNER_HTTP: String = format!( - "http://{}:{}", - get_server_host(), - get_var("ACTION_RUNNER_PORT") - ); + pub static ref ACTION_RUNNER_URL: Url = get_action_runner_url(); } /// Get the environment variable or panic @@ -140,6 +136,10 @@ pub fn get_iml_api_addr() -> SocketAddr { to_socket_addr(&get_server_host(), &get_iml_api_port()) } +pub fn get_iml_api_bind_addr() -> SocketAddr { + to_socket_addr(&get_service_host(), &get_iml_api_port()) +} + /// Get the `http_agent2` port from the env or panic pub fn get_http_agent2_port() -> String { get_var("HTTP_AGENT2_PORT") @@ -149,7 +149,12 @@ pub fn get_http_agent2_addr() -> SocketAddr { to_socket_addr(&get_server_host(), &get_http_agent2_port()) } -/// Get the server host from the env or panic +/// Get the name of the host a service should bind to +pub fn get_service_host() -> String { + env::var("SERVICE_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()) +} + +/// Get the nginx host from the env or panic pub fn get_server_host() -> String { get_var("PROXY_HOST") } @@ -287,14 +292,39 @@ pub fn get_use_snapshots() -> bool { string_to_bool(env::var("USE_SNAPSHOTS").unwrap_or_else(|_| "false".to_string())) } -pub fn get_action_runner_http() -> String { - ACTION_RUNNER_HTTP.clone() +pub fn get_action_runner_host() -> String { + get_var("ACTION_RUNNER_HOST") +} + +pub fn get_action_runner_port() -> String { + get_var("ACTION_RUNNER_PORT") +} + +pub fn get_action_runner_url() -> Url { + Url::parse(&format!( + "http://{}:{}", + get_action_runner_host(), + get_action_runner_port() + )) + .expect("Could not parse action runner Url") } pub fn get_action_runner_uds() -> String { "/var/run/iml-action-runner.sock".to_string() } +/// Get the nginx proxy port or panic +pub fn get_proxy_port() -> String { + get_var("HTTPS_FRONTEND_PORT") +} + +/// Get the proxy URL or panic +pub fn get_proxy_url() -> Url { + let x = format!("https://{}:{}/", get_server_host(), get_proxy_port()); + + Url::parse(&x).expect("Could not parse proxy URL") +} + pub fn get_sfa_endpoints() -> Option>> { let xs: BTreeMap<_, _> = env::vars() .filter(|(k, _)| k.starts_with("SFA_ENDPOINTS_")) diff --git a/iml-state-machine/Cargo.toml b/iml-state-machine/Cargo.toml new file mode 100644 index 0000000000..eba03b619d --- /dev/null +++ b/iml-state-machine/Cargo.toml @@ -0,0 +1,19 @@ +[package] +authors = ["IML Team "] +edition = "2018" +name = "iml-state-machine" +version = "0.1.0" + +[dependencies] +async-trait = "0.1" +futures = "0.3" +iml-action-client = {path = "../iml-action-client", version = "0.1"} +iml-postgres = {path = "../iml-postgres", version = "0.4"} +iml-tracing = {path = "../iml-tracing", version = "0.3"} +iml-wire-types = {path = "../iml-wire-types", version = "0.4", features = ["postgres-interop"]} +petgraph = "0.5" +serde = {version = "1", features = ["derive"]} +serde_json = "1" +thiserror = "1.0" +tokio = "0.3" +uuid = {version = "0.8", features = ["v4"]} diff --git a/iml-state-machine/README.md b/iml-state-machine/README.md new file mode 100644 index 0000000000..d34fe2fcfd --- /dev/null +++ b/iml-state-machine/README.md @@ -0,0 +1,27 @@ +# IML State Machine Model + +``` + ┌────────────────────────────────────────────┐ + │ │ + │ Client mount │ + │ │ + │ │ + └────────────────────────────────────────────┘ + │ + │ + │ + │ + │ + ┌───────Depends on─────┴────────Depends On─────────┐ + │ │ + │ │ + │ │ + │ │ + ▼ ▼ +┌────────────────────────────────────────────┐ ┌────────────────────────────────────────────┐ +│ │ │ │ +│ LNet │ │ Filesystem │ +│ │ │ │ +│ │ │ │ +└────────────────────────────────────────────┘ └────────────────────────────────────────────┘ +``` diff --git a/iml-state-machine/src/command.rs b/iml-state-machine/src/command.rs new file mode 100644 index 0000000000..480901ab9f --- /dev/null +++ b/iml-state-machine/src/command.rs @@ -0,0 +1,199 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{job::Job, step::Steps, Error}; +use future::Aborted; +use futures::future::{self, abortable}; +use futures::{future::AbortHandle, lock::Mutex}; +use iml_action_client::Client; +use iml_postgres::{sqlx, PgPool}; +use iml_tracing::tracing; +use iml_wire_types::{ + state_machine, + state_machine::{Command, CommandRecord, CurrentState}, +}; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tokio::time; +use uuid::Uuid; + +/* +1. Create command +2. Create jobs +3. Add jobs to command +3. push jobs into job_queue +4. Run each job in separate task, accounting for any dependendent jobs that came before. +*/ + +pub enum JobState { + Pending, + Running(Option), +} + +impl JobState { + fn is_pending(&self) -> bool { + match self { + Self::Pending => true, + Self::Running(_) => false, + } + } + fn is_running(&self) -> bool { + !self.is_pending() + } +} + +pub type JobStates = Arc>>; + +pub async fn run_command( + pool: &PgPool, + job_states: &JobStates, + cmd: Command, +) -> Result { + let mut transaction = pool.begin().await?; + + let x = sqlx::query_as!( + CommandRecord, + r#" + INSERT INTO command (message) + VALUES ($1) + RETURNING id, start_time, end_time, state as "state: CurrentState", message, jobs + "#, + cmd.message + ) + .fetch_one(&mut transaction) + .await?; + + let mut jobs = vec![]; + + for job in cmd.jobs { + let locks = job.get_locks(&pool).await?; + let locks = locks + .into_iter() + .map(|x| serde_json::to_value(x)) + .collect::, _>>()?; + + let job_id = sqlx::query!( + r#" + INSERT INTO job (command_id, job, wait_for_jobs, locked_records) + VALUES ($1, $2, array[]::int[], $3) + RETURNING id + "#, + x.id, + serde_json::to_value(&job)?, + &locks + ) + .fetch_one(&mut transaction) + .await? + .id; + + jobs.push((job_id, (job, JobState::Pending))); + } + + let ids: Vec = jobs.iter().map(|x| x.0).collect(); + + sqlx::query!("UPDATE command SET jobs = $1", &ids) + .execute(&mut transaction) + .await?; + + job_states.lock().await.extend(jobs); + + transaction.commit().await?; + + Ok(x) +} + +pub async fn run_jobs(client: Client, pool: PgPool, job_states: JobStates) { + loop { + let job_states = Arc::clone(&job_states); + + let xs: HashMap = { + let mut x = job_states.lock().await; + + x.iter_mut() + .filter_map(|(k, (job, state))| { + if state.is_pending() { + Some((*k, job.get_steps())) + } else { + None + } + }) + .collect() + }; + + for (k, steps) in xs { + let client = client.clone(); + let job_states = Arc::clone(&job_states); + let pool = pool.clone(); + + tokio::spawn(async move { + let r = run_steps(client, pool.clone(), k, steps, Arc::clone(&job_states)).await; + + let mut lock = job_states.lock().await; + + lock.remove(&k); + + let end_state = match r { + Ok(_) => state_machine::CurrentState::Succeeded, + Err(Error::Aborted(_)) => state_machine::CurrentState::Cancelled, + Err(e) => state_machine::CurrentState::Failed, + }; + + sqlx::query!( + r#" + UPDATE job + SET + state = $1, + end_time = now() + "#, + end_state as state_machine::CurrentState + ) + .execute(&pool) + .await; + }); + } + + time::sleep(Duration::from_secs(1)).await + } +} + +async fn run_steps( + client: Client, + pool: PgPool, + job_id: i32, + steps: Steps, + job_states: JobStates, +) -> Result<(), Error> { + for (f, args) in steps.0 { + let fut = f(pool.clone(), args); + let (fqdn, action, args) = fut.await?; + + let uuid = Uuid::new_v4(); + + let fut = client.invoke_rust_agent_expect_result(fqdn.to_string(), action, args, &uuid); + let (fut, h) = abortable(fut); + + { + let mut lock = job_states.lock().await; + + let (job, _) = lock.remove(&job_id).unwrap(); + + lock.insert(job_id, (job, JobState::Running(Some(h)))); + } + + match fut.await { + Err(Aborted) => { + let r = client.cancel_request(fqdn, &uuid).await; + + return Err(Error::Aborted(Aborted)); + } + Ok(Err(e)) => { + tracing::error!("Step failed: {:?}", e); + + return Err(e.into()); + } + Ok(Ok(x)) => tracing::info!("Got {:?}", x), + }; + } + + Ok(()) +} diff --git a/iml-state-machine/src/graph.rs b/iml-state-machine/src/graph.rs new file mode 100644 index 0000000000..44df929c9a --- /dev/null +++ b/iml-state-machine/src/graph.rs @@ -0,0 +1,250 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use iml_wire_types::state_machine::{snapshot, Edge, State, Transition}; +use petgraph::{ + algo::astar, + graph::{DiGraph, NodeIndex}, + prelude::*, + visit::{EdgeFiltered, IntoNeighborsDirected}, + Direction, +}; +use std::collections::HashSet; + +trait GraphExt { + fn find_node_idx(&self, x: &N) -> Option; +} + +impl GraphExt for Graph { + fn find_node_idx(&self, x: &N) -> Option { + self.node_indices().find(|i| &self[*i] == x) + } +} + +pub type StateGraph = DiGraph; + +pub trait StateGraphExt { + /// Get the node cooresponding to the current state, if one exists. + fn get_state_node(&self, state: impl Into) -> Option; + /// Get the available `Transition`s for this NodeIndex. + /// + /// A `Transition` is available iff it's cooresponding state + /// and all dependendant states can be satisfied. + fn get_available_transitions(&self, n: NodeIndex) -> HashSet; + fn get_transition_path( + &self, + start_state: impl Into, + transition: impl Into, + ) -> Option>; +} + +impl StateGraphExt for StateGraph { + fn get_state_node(&self, state: impl Into) -> Option { + self.find_node_idx(&state.into()) + } + fn get_available_transitions(&self, n: NodeIndex) -> HashSet { + let graph = EdgeFiltered::from_fn(&self, |x| x.weight().is_transition()); + + let mut transitions = HashSet::new(); + + let mut dfs = Dfs::new(&graph, n); + + let mut seen = HashSet::new(); + seen.insert(n); + + while let Some(from_node) = dfs.next(&self) { + let mut neighbors = graph + .neighbors_directed(from_node, Direction::Outgoing) + .into_iter(); + + while let Some(to_node) = neighbors.next() { + if seen.contains(&to_node) { + continue; + } + + seen.insert(to_node); + + let ix = self + .find_edge(from_node, to_node) + .expect("Could not find edge"); + + let t = match self[ix] { + Edge::Dependency(_) => { + panic!("Found a `Dependency` in a `Transition` filtered graph."); + } + Edge::Transition(t) => t, + }; + + transitions.insert(t); + } + } + + transitions + } + fn get_transition_path( + &self, + start_state: impl Into, + transition: impl Into, + ) -> Option> { + let start_state_ix = self.get_state_node(start_state)?; + let x = transition.into(); + + let xs = astar( + &self, + start_state_ix, + |finish| { + self.edges_directed(finish, Direction::Incoming) + .any(|edge| edge.weight() == &Edge::Transition(x)) + }, + |_| 1, + |_| 0, + )? + .1; + + let xs = xs.iter().zip(xs.iter().skip(1)).collect::>(); + + let mut out = vec![]; + + for (a, b) in xs { + let e = self.find_edge(*a, *b)?; + + let edge = *&self[e]; + + match edge { + Edge::Dependency(_) => return None, + Edge::Transition(x) => out.push(x), + }; + } + + Some(out) + } +} + +pub fn build_graph() -> StateGraph { + let mut deps = StateGraph::new(); + + let unknown_snapshot = deps.add_node(snapshot::State::Unknown.into()); + let unmounted_snapshot = deps.add_node(snapshot::State::Unmounted.into()); + let mounted_snapshot = deps.add_node(snapshot::State::Mounted.into()); + let removed_snapshot = deps.add_node(snapshot::State::Removed.into()); + + deps.add_edge( + unknown_snapshot, + unmounted_snapshot, + Transition::CreateSnapshot.into(), + ); + + deps.add_edge( + unmounted_snapshot, + mounted_snapshot, + Transition::MountSnapshot.into(), + ); + + deps.add_edge( + mounted_snapshot, + unmounted_snapshot, + Transition::UnmountSnapshot.into(), + ); + + deps.add_edge( + unmounted_snapshot, + removed_snapshot, + Transition::RemoveSnapshot.into(), + ); + + deps.add_edge( + mounted_snapshot, + removed_snapshot, + Transition::RemoveSnapshot.into(), + ); + + deps +} + +#[cfg(test)] +pub mod test { + use super::*; + use iml_wire_types::state_machine::snapshot; + use petgraph::dot::Dot; + + #[test] + fn get_snapshot_mount_transitions() { + let graph = build_graph(); + + let ix = graph.get_state_node(snapshot::State::Mounted).unwrap(); + + let xs = graph.get_available_transitions(ix); + + assert_eq!( + xs, + vec![ + Transition::RemoveSnapshot.into(), + Transition::UnmountSnapshot.into(), + ] + .into_iter() + .collect() + ); + } + + #[test] + fn get_snapshot_unmount_transitions() { + let graph = build_graph(); + + let ix = graph.get_state_node(snapshot::State::Unmounted).unwrap(); + + let xs = graph.get_available_transitions(ix); + + assert_eq!( + xs, + vec![ + Transition::RemoveSnapshot.into(), + Transition::MountSnapshot.into(), + ] + .into_iter() + .collect() + ); + } + + #[test] + fn get_snapshot_remove_transitions() { + let graph = build_graph(); + + let ix = graph.get_state_node(snapshot::State::Removed).unwrap(); + + let xs = graph.get_available_transitions(ix); + + assert_eq!(xs, vec![].into_iter().collect()); + } + + #[test] + fn get_snapshot_mount_remove_transition() { + let graph = build_graph(); + + let xs = graph + .get_transition_path(snapshot::State::Mounted, Transition::RemoveSnapshot) + .unwrap(); + + assert_eq!(xs, vec![Transition::RemoveSnapshot.into()]); + } + + #[test] + fn get_snapshot_mount_unmount_transition() { + let graph = build_graph(); + + let xs = graph + .get_transition_path(snapshot::State::Mounted, Transition::UnmountSnapshot) + .unwrap(); + + assert_eq!(xs, vec![Transition::UnmountSnapshot.into()]); + } + + #[test] + fn show_dotviz() { + let graph = build_graph(); + + let dot = Dot::with_config(&graph, &[]); + + eprintln!("graph {:?}", dot); + } +} diff --git a/iml-state-machine/src/job.rs b/iml-state-machine/src/job.rs new file mode 100644 index 0000000000..5879909bac --- /dev/null +++ b/iml-state-machine/src/job.rs @@ -0,0 +1,79 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{ + snapshot::create_snapshot, snapshot::destroy_snapshot, snapshot::mount_snapshot, + snapshot::unmount_snapshot, step::Steps, Error, +}; +use async_trait::async_trait; +use iml_postgres::{sqlx, PgPool}; +use iml_wire_types::{state_machine, state_machine::Transition, warp_drive::RecordId}; + +#[async_trait] +pub trait Job { + async fn get_locks(&self, _pool: &PgPool) -> Result, Error> { + Ok(vec![]) + } + /// The steps that need to be run to complete this job. + /// Steps run serially and can be cancelled. + /// Cancelling a step cancels all further steps in the series, + /// and also cancels all dependendant jobs. + fn get_steps(&self) -> Steps; + fn get_transition(&self) -> Option { + None + } +} + +#[async_trait] +impl Job for state_machine::Job { + fn get_steps(&self) -> Steps { + match self.clone() { + Self::CreateSnapshotJob(x) => Steps::default().add_remote_step(create_snapshot, x), + Self::MountSnapshotJob(x) => Steps::default().add_remote_step(mount_snapshot, x), + Self::UnmountSnapshotJob(x) => Steps::default().add_remote_step(unmount_snapshot, x), + Self::RemoveSnapshotJob(x) => Steps::default().add_remote_step(destroy_snapshot, x), + } + } + fn get_transition(&self) -> Option { + match self { + Self::CreateSnapshotJob(_) => Some(Transition::CreateSnapshot.into()), + Self::MountSnapshotJob(_) => Some(Transition::MountSnapshot.into()), + Self::UnmountSnapshotJob(_) => Some(Transition::UnmountSnapshot.into()), + Self::RemoveSnapshotJob(_) => Some(Transition::RemoveSnapshot.into()), + } + } + async fn get_locks(&self, pool: &PgPool) -> Result, Error> { + match self { + Self::CreateSnapshotJob(_) => Ok(vec![]), + Self::MountSnapshotJob(x) => { + let id = get_snapshot_id(pool, &x.name, &x.fsname).await?; + + Ok(vec![RecordId::Snapshot(id)]) + } + Self::UnmountSnapshotJob(x) => { + let id = get_snapshot_id(pool, &x.name, &x.fsname).await?; + + Ok(vec![RecordId::Snapshot(id)]) + } + Self::RemoveSnapshotJob(x) => { + let id = get_snapshot_id(pool, &x.name, &x.fsname).await?; + + Ok(vec![RecordId::Snapshot(id)]) + } + } + } +} + +async fn get_snapshot_id(pool: &PgPool, name: &str, fsname: &str) -> Result { + let id = sqlx::query!( + "SELECT id FROM snapshot WHERE snapshot_name = $1 AND filesystem_name = $2", + name, + fsname + ) + .fetch_one(pool) + .await? + .id; + + Ok(id) +} diff --git a/iml-state-machine/src/lib.rs b/iml-state-machine/src/lib.rs new file mode 100644 index 0000000000..2ad0ce5ff3 --- /dev/null +++ b/iml-state-machine/src/lib.rs @@ -0,0 +1,26 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +mod command; +pub mod graph; +mod job; +mod snapshot; +mod step; + +pub use command::{run_command, run_jobs, JobStates}; +use futures::future::Aborted; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Aborted(#[from] Aborted), + #[error("State Not Found")] + NotFound, + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), + #[error(transparent)] + ImlActionClientError(#[from] iml_action_client::ImlActionClientError), + #[error(transparent)] + SqlxError(#[from] iml_postgres::sqlx::Error), +} diff --git a/iml-state-machine/src/lnet.rs b/iml-state-machine/src/lnet.rs new file mode 100644 index 0000000000..db76238b82 --- /dev/null +++ b/iml-state-machine/src/lnet.rs @@ -0,0 +1,88 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use futures::{Future, FutureExt}; +use petgraph::graph::DiGraph; +use std::{ io, pin::Pin}; + +pub enum LnetStates { + Unconfigured, + Unloaded, + Down, + Up, +} + +impl Default for LnetStates { + fn default() -> Self { + Self::Unconfigured + } +} + +impl LnetStates { + fn step(self, next: &Self) { + match (self, next) { + (Self::Unconfigured, Self::Unloaded) => {} + (Self::Unloaded, Self::Down) => {} + (Self::Down, Self::Up) => {} + (Self::Up, Self::Down) => {} + (Self::Down, Self::Unloaded) => {} + (Self::Unloaded, Self::Unconfigured) => {} + _ => {} + }; + } +} + +async fn configure() -> Result<(), io::Error> { + Ok(()) +} + +async fn load() -> Result<(), io::Error> { + Ok(()) +} + +async fn start() -> Result<(), io::Error> { + Ok(()) +} + +async fn stop() -> Result<(), io::Error> { + Ok(()) +} + +async fn unload() -> Result<(), io::Error> { + Ok(()) +} + +async fn unconfigure() -> Result<(), io::Error> { + Ok(()) +} + +type BoxedFuture = Pin> + Send>>; + +type Transition = Box BoxedFuture + Send + Sync>; + +fn mk_transition(f: fn() -> Fut) -> Transition +where + Fut: Future> + Send + 'static, +{ + Box::new(move || f().boxed()) +} + +fn build_graph() -> DiGraph:: { + let mut deps = DiGraph::::new(); + + let unconfigured = deps.add_node(LnetStates::Unconfigured); + let unloaded = deps.add_node(LnetStates::Unloaded); + let down = deps.add_node(LnetStates::Down); + let up = deps.add_node(LnetStates::Up); + + deps.add_edge(unconfigured, unloaded, mk_transition(configure)); + deps.add_edge(unloaded, down, mk_transition(load)); + deps.add_edge(down, up, mk_transition(start)); + deps.add_edge(up, down, mk_transition(stop)); + deps.add_edge(down, unloaded, mk_transition(unload)); + deps.add_edge(unloaded, unconfigured, mk_transition(unconfigure)); + + deps + +} diff --git a/iml-state-machine/src/snapshot.rs b/iml-state-machine/src/snapshot.rs new file mode 100644 index 0000000000..6fc8ad87c8 --- /dev/null +++ b/iml-state-machine/src/snapshot.rs @@ -0,0 +1,41 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::Error; +use iml_postgres::{active_mgs_host_fqdn, PgPool}; +use iml_wire_types::{ + snapshot::{Create, Destroy, Mount, Unmount}, + Fqdn, +}; + +pub async fn mount_snapshot(pool: PgPool, x: Mount) -> Result<(Fqdn, String, Mount), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_mount".to_string(), x)) +} + +pub async fn unmount_snapshot(pool: PgPool, x: Unmount) -> Result<(Fqdn, String, Unmount), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_unmount".to_string(), x)) +} + +pub async fn destroy_snapshot(pool: PgPool, x: Destroy) -> Result<(Fqdn, String, Destroy), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_destroy".to_string(), x)) +} + +pub async fn create_snapshot(pool: PgPool, x: Create) -> Result<(Fqdn, String, Create), Error> { + let fqdn = get_active_mgs_or_fail(&pool, &x.fsname).await?; + + Ok((fqdn, "snapshot_create".to_string(), x)) +} + +async fn get_active_mgs_or_fail(pool: &PgPool, fsname: &str) -> Result { + match active_mgs_host_fqdn(fsname, pool).await? { + Some(x) => Ok(Fqdn(x)), + None => Err(Error::NotFound), + } +} diff --git a/iml-state-machine/src/step.rs b/iml-state-machine/src/step.rs new file mode 100644 index 0000000000..9995873d04 --- /dev/null +++ b/iml-state-machine/src/step.rs @@ -0,0 +1,61 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::Error; +use futures::{future, Future, TryFutureExt}; +use iml_postgres::PgPool; +use iml_wire_types::Fqdn; +use std::pin::Pin; + +type BoxedFuture = + Pin> + Send>>; + +type Step = Box) -> BoxedFuture + Send>; + +fn mk_step(f: fn(PgPool, T) -> Fut) -> Step +where + T: serde::de::DeserializeOwned + serde::Serialize + Send + 'static, + R: serde::Serialize + Send + 'static, + Fut: Future> + Send + 'static, +{ + Box::new(move |p, x| { + let x = match x.and_then(|v| serde_json::from_value(v)) { + Ok(x) => x, + Err(e) => { + return Box::pin(future::err(e.into())); + } + }; + + let fut = f(p, x); + + let fut = fut.err_into().and_then(|(fqdn, action, x)| async { + let x = serde_json::to_value(x)?; + + Ok((fqdn, action, x)) + }); + + Box::pin(fut) + }) +} + +pub struct Steps(pub Vec<(Step, Result)>); + +impl Default for Steps { + fn default() -> Self { + Steps(vec![]) + } +} + +impl Steps { + pub fn add_remote_step(mut self, f: fn(PgPool, T) -> Fut, args: T) -> Self + where + T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static, + R: serde::Serialize + Send + 'static, + Fut: Future> + Send + 'static, + { + self.0.push((mk_step(f), serde_json::to_value(args))); + + self + } +} diff --git a/iml-warp-drive/Cargo.toml b/iml-warp-drive/Cargo.toml index 3c622af970..65571856df 100644 --- a/iml-warp-drive/Cargo.toml +++ b/iml-warp-drive/Cargo.toml @@ -7,14 +7,17 @@ version = "0.4.0" [dependencies] futures = "0.3" im = {version = "15.0", features = ["serde"]} +iml-action-client = {path = "../iml-action-client", version = "0.1"} iml-manager-client = {path = "../iml-manager-client", version = "0.4"} iml-manager-env = {path = "../iml-manager-env", version = "0.4"} iml-postgres = {path = "../iml-postgres", version = "0.4"} iml-rabbit = {path = "../iml-rabbit", version = "0.4"} +iml-state-machine = {path = "../iml-state-machine", version = "0.1"} iml-tracing = {version = "0.3", path = "../iml-tracing"} iml-wire-types = {path = "../iml-wire-types", version = "0.4", features = ["postgres-interop"]} serde = {version = "1", features = ["derive"]} serde_json = "1.0" +thiserror = "1.0" tokio = {version = "0.2", features = ["macros", "rt-threaded"]} tokio-runtime-shutdown = {path = "../tokio-runtime-shutdown", version = "0.4"} tracing = "0.1" diff --git a/iml-warp-drive/src/error.rs b/iml-warp-drive/src/error.rs index 6f987d1a04..543306990f 100644 --- a/iml-warp-drive/src/error.rs +++ b/iml-warp-drive/src/error.rs @@ -7,67 +7,26 @@ use iml_postgres::DbError; use iml_rabbit::ImlRabbitError; use warp::reject; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum ImlWarpDriveError { - ImlRabbitError(ImlRabbitError), - ImlManagerClientError(ImlManagerClientError), - TokioPostgresError(iml_postgres::Error), + #[error(transparent)] + ImlRabbitError(#[from] ImlRabbitError), + #[error(transparent)] + ImlManagerClientError(#[from] ImlManagerClientError), + #[error(transparent)] + StateMachineError(#[from] iml_state_machine::Error), + #[error(transparent)] + TokioPostgresError(#[from] iml_postgres::Error), + #[error(transparent)] DbError(Box), - SerdeJsonError(serde_json::error::Error), + #[error(transparent)] + SerdeJsonError(#[from] serde_json::error::Error), } impl reject::Reject for ImlWarpDriveError {} -impl std::fmt::Display for ImlWarpDriveError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - ImlWarpDriveError::ImlRabbitError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::ImlManagerClientError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::TokioPostgresError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::DbError(ref err) => write!(f, "{}", err), - ImlWarpDriveError::SerdeJsonError(ref err) => write!(f, "{}", err), - } - } -} - -impl std::error::Error for ImlWarpDriveError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match *self { - ImlWarpDriveError::ImlRabbitError(ref err) => Some(err), - ImlWarpDriveError::ImlManagerClientError(ref err) => Some(err), - ImlWarpDriveError::TokioPostgresError(ref err) => Some(err), - ImlWarpDriveError::DbError(ref err) => Some(err), - ImlWarpDriveError::SerdeJsonError(ref err) => Some(err), - } - } -} - -impl From for ImlWarpDriveError { - fn from(err: ImlRabbitError) -> Self { - ImlWarpDriveError::ImlRabbitError(err) - } -} - -impl From for ImlWarpDriveError { - fn from(err: ImlManagerClientError) -> Self { - ImlWarpDriveError::ImlManagerClientError(err) - } -} - impl From for ImlWarpDriveError { fn from(err: DbError) -> Self { ImlWarpDriveError::DbError(Box::new(err)) } } - -impl From for ImlWarpDriveError { - fn from(err: iml_postgres::Error) -> Self { - ImlWarpDriveError::TokioPostgresError(err) - } -} - -impl From for ImlWarpDriveError { - fn from(err: serde_json::error::Error) -> Self { - ImlWarpDriveError::SerdeJsonError(err) - } -} diff --git a/iml-warp-drive/src/lib.rs b/iml-warp-drive/src/lib.rs index 6f281ca962..9ebedb81d9 100644 --- a/iml-warp-drive/src/lib.rs +++ b/iml-warp-drive/src/lib.rs @@ -7,7 +7,9 @@ pub mod db_record; pub mod error; pub mod listen; pub mod locks; +pub mod messaging; pub mod request; +pub mod state_machine; pub mod users; pub use db_record::*; diff --git a/iml-warp-drive/src/locks.rs b/iml-warp-drive/src/locks.rs index 92369940f7..df996e128a 100644 --- a/iml-warp-drive/src/locks.rs +++ b/iml-warp-drive/src/locks.rs @@ -3,7 +3,7 @@ // license that can be found in the LICENSE file. use crate::request::Request; -use futures::{Stream, TryStreamExt}; +use futures::{lock::Mutex, Stream, TryStreamExt}; use im::{HashMap, HashSet}; use iml_rabbit::{ basic_consume, basic_publish, bind_queue, declare_transient_exchange, declare_transient_queue, @@ -11,6 +11,9 @@ use iml_rabbit::{ Queue, }; use iml_wire_types::{LockAction, LockChange, ToCompositeId}; +use std::sync::Arc; + +pub type SharedLocks = Arc>; /// Declares the exchange for rpc comms async fn declare_rpc_exchange(c: &Channel) -> Result<(), ImlRabbitError> { diff --git a/iml-warp-drive/src/main.rs b/iml-warp-drive/src/main.rs index 019d1e1431..86e5183e33 100644 --- a/iml-warp-drive/src/main.rs +++ b/iml-warp-drive/src/main.rs @@ -8,15 +8,13 @@ use iml_postgres::get_db_pool; use iml_warp_drive::{ cache::{populate_from_api, populate_from_db, SharedCache}, error, listen, - locks::{self, create_locks_consumer, Locks}, + locks::{self, create_locks_consumer, SharedLocks}, users, }; use iml_wire_types::warp_drive::{Cache, Message}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use warp::Filter; -type SharedLocks = Arc>; - #[tokio::main] async fn main() -> Result<(), Box> { iml_tracing::init(); @@ -29,6 +27,8 @@ async fn main() -> Result<(), Box> { let api_cache_state: SharedCache = Arc::new(Mutex::new(Cache::default())); + let job_states = Arc::new(Mutex::new(HashMap::new())); + // Clone here to allow SSE route to get a ref. let user_state2 = Arc::clone(&user_state); let lock_state2 = Arc::clone(&lock_state); @@ -79,11 +79,15 @@ async fn main() -> Result<(), Box> { tracing::info!("Started listening to NOTIFY events"); - { - let pool = get_db_pool(2).await?; + let pg_pool = get_db_pool(4).await?; - populate_from_db(Arc::clone(&api_cache_state3), &pool).await?; - } + tokio::spawn(iml_state_machine::run_jobs( + iml_action_client::Client::default(), + pg_pool.clone(), + Arc::clone(&job_states), + )); + + populate_from_db(Arc::clone(&api_cache_state3), &pg_pool).await?; let pool = iml_rabbit::connect_to_rabbit(1); @@ -138,42 +142,22 @@ async fn main() -> Result<(), Box> { }), ); - // GET -> messages stream - let routes = warp::get() - .and(warp::any().map(move || Arc::clone(&user_state2))) - .and(warp::any().map(move || Arc::clone(&lock_state2))) - .and(warp::any().map(move || Arc::clone(&api_cache_state2))) - .and_then( - |users: users::SharedUsers, locks: SharedLocks, api_cache: SharedCache| { - tracing::debug!("Inside user route"); - - async move { - // reply using server-sent events - let stream = users::user_connected( - users, - locks.lock().await.clone(), - api_cache.lock().await.clone(), - ) - .await; - - Ok::<_, error::ImlWarpDriveError>(warp::sse::reply( - warp::sse::keep_alive().stream(stream), - )) - } - .map_err(warp::reject::custom) - }, - ) - .with(warp::log("iml-warp-drive::api")); - let addr = iml_manager_env::get_warp_drive_addr(); tracing::info!("Listening on {}", addr); - let (_, fut) = warp::serve(routes).bind_with_graceful_shutdown( - addr, - tokio_runtime_shutdown::when_finished(&valve) - .then(move |_| users::disconnect_all_users(user_state3)), - ); + let messaging_route = + iml_warp_drive::messaging::route(user_state2, Arc::clone(&api_cache_state2), lock_state2); + + let state_machine_routes = + iml_warp_drive::state_machine::route(api_cache_state2, job_states, pg_pool); + + let (_, fut) = warp::serve(messaging_route.or(state_machine_routes)) + .bind_with_graceful_shutdown( + addr, + tokio_runtime_shutdown::when_finished(&valve) + .then(move |_| users::disconnect_all_users(user_state3)), + ); fut.await; diff --git a/iml-warp-drive/src/messaging.rs b/iml-warp-drive/src/messaging.rs new file mode 100644 index 0000000000..12b7db81d4 --- /dev/null +++ b/iml-warp-drive/src/messaging.rs @@ -0,0 +1,41 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{cache, error, locks::SharedLocks, users}; +use futures::TryFutureExt; +use std::sync::Arc; +use warp::Filter; + +pub fn route( + user_state: users::SharedUsers, + api_cache_state: cache::SharedCache, + lock_state: SharedLocks, +) -> impl Filter + Clone { + warp::path("messaging") + .and(warp::get()) + .and(warp::any().map(move || Arc::clone(&user_state))) + .and(warp::any().map(move || Arc::clone(&lock_state))) + .and(warp::any().map(move || Arc::clone(&api_cache_state))) + .and_then( + |users: users::SharedUsers, locks: SharedLocks, api_cache: cache::SharedCache| { + tracing::debug!("Inside messaging route"); + + async move { + // reply using server-sent events + let stream = users::user_connected( + users, + locks.lock().await.clone(), + api_cache.lock().await.clone(), + ) + .await; + + Ok::<_, error::ImlWarpDriveError>(warp::sse::reply( + warp::sse::keep_alive().stream(stream), + )) + } + .map_err(warp::reject::custom) + }, + ) + .with(warp::log("iml-warp-drive::messaging")) +} diff --git a/iml-warp-drive/src/state_machine.rs b/iml-warp-drive/src/state_machine.rs new file mode 100644 index 0000000000..86e2abca51 --- /dev/null +++ b/iml-warp-drive/src/state_machine.rs @@ -0,0 +1,95 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{cache, error::ImlWarpDriveError}; +use iml_postgres::PgPool; +use iml_state_machine::{graph::StateGraphExt, run_command, JobStates}; +use iml_wire_types::{ + state_machine::{Command, Transition}, + warp_drive::RecordId, +}; +use std::sync::Arc; +use warp::Filter; + +pub fn route( + shared_cache: cache::SharedCache, + job_states: JobStates, + pg_pool: PgPool, +) -> impl Filter + Clone { + let route = warp::path("state_machine"); + + let shared_cache_filter = warp::any().map(move || Arc::clone(&shared_cache)); + + let get_transitions_route = route + .clone() + .and(shared_cache_filter.clone()) + .and(warp::path("get_transitions")) + .and(warp::path::end()) + .and(warp::post()) + .and(warp::body::json()) + .and_then( + |shared_cache: cache::SharedCache, record_id: RecordId| async move { + tracing::debug!("Inside state_machine route"); + + let cache = shared_cache.lock().await; + + let g = iml_state_machine::graph::build_graph(); + + let x = record_id + .to_state(&cache) + .and_then(|x| g.get_state_node(x)) + .map(|x| g.get_available_transitions(x)) + .unwrap_or_default(); + + Ok::<_, warp::Rejection>(warp::reply::json(&x)) + }, + ); + + let get_transition_path_route = + route + .clone() + .and(shared_cache_filter) + .and(warp::path("get_transition_path")) + .and(warp::path::end()) + .and(warp::post()) + .and(warp::body::json()) + .and_then( + |shared_cache: cache::SharedCache, + (record_id, transition): (RecordId, Transition)| async move { + let cache = shared_cache.lock().await; + + let g = iml_state_machine::graph::build_graph(); + + let xs = record_id + .to_state(&cache) + .and_then(|x| g.get_transition_path(x, transition)) + .unwrap_or_default(); + + Ok::<_, warp::Rejection>(warp::reply::json(&xs)) + }, + ); + + let run_command_route = route + .clone() + .and(warp::path("run_command")) + .and(warp::path::end()) + .and(warp::post()) + .and(warp::any().map(move || pg_pool.clone())) + .and(warp::any().map(move || Arc::clone(&job_states))) + .and(warp::body::json()) + .and_then( + |pg_pool: PgPool, job_states: JobStates, command: Command| async move { + let cmd = run_command(&pg_pool, &job_states, command) + .await + .map_err(ImlWarpDriveError::StateMachineError) + .map_err(warp::reject::custom)?; + + Ok::<_, warp::Rejection>(warp::reply::json(&cmd)) + }, + ); + + get_transitions_route + .or(get_transition_path_route) + .or(run_command_route) +} diff --git a/iml-wire-types/src/graphql_duration.rs b/iml-wire-types/src/graphql_duration.rs index a31bb8016e..ebfe2e5a36 100644 --- a/iml-wire-types/src/graphql_duration.rs +++ b/iml-wire-types/src/graphql_duration.rs @@ -25,7 +25,7 @@ where juniper::Value::scalar(humantime::format_duration(self.0).to_string()) } - fn from_input_value(value: &juniper::InputValue) -> Option { + fn from_input_value(value: &juniper::InputValue) -> Option { value.as_string_value()?.to_string().try_into().ok() } diff --git a/iml-wire-types/src/lib.rs b/iml-wire-types/src/lib.rs index 6f179e6b38..c08d83550c 100644 --- a/iml-wire-types/src/lib.rs +++ b/iml-wire-types/src/lib.rs @@ -8,6 +8,7 @@ pub mod graphql_duration; pub mod high_availability; pub mod sfa; pub mod snapshot; +pub mod state_machine; pub mod stratagem; pub mod task; pub mod warp_drive; diff --git a/iml-wire-types/src/snapshot.rs b/iml-wire-types/src/snapshot.rs index 5c53cb11a9..7bbe93cf5b 100644 --- a/iml-wire-types/src/snapshot.rs +++ b/iml-wire-types/src/snapshot.rs @@ -7,6 +7,7 @@ use crate::{ db::{Id, TableName}, graphql_duration::GraphQLDuration, + warp_drive::RecordId, }; use chrono::{offset::Utc, DateTime}; use std::str::FromStr; @@ -56,6 +57,12 @@ impl Id for SnapshotRecord { } } +impl From<&SnapshotRecord> for RecordId { + fn from(x: &SnapshotRecord) -> Self { + Self::Snapshot(x.id) + } +} + pub const SNAPSHOT_TABLE_NAME: TableName = TableName("snapshot"); #[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] @@ -131,7 +138,7 @@ impl FromStr for ReserveUnit { } } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to create a snapshot pub struct Create { @@ -147,7 +154,7 @@ pub struct Create { pub comment: Option, } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to destroy the snapshot pub struct Destroy { @@ -161,7 +168,7 @@ pub struct Destroy { pub force: bool, } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to mount the snapshot pub struct Mount { @@ -171,7 +178,7 @@ pub struct Mount { pub name: String, } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to unmount the snapshot pub struct Unmount { diff --git a/iml-wire-types/src/state_machine.rs b/iml-wire-types/src/state_machine.rs new file mode 100644 index 0000000000..cccd5e0bc8 --- /dev/null +++ b/iml-wire-types/src/state_machine.rs @@ -0,0 +1,128 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::snapshot::{Create, Destroy, Mount, Unmount}; +use chrono::Utc; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLEnum))] +pub enum Transition { + CreateSnapshot, + MountSnapshot, + UnmountSnapshot, + RemoveSnapshot, +} + +impl Transition { + pub fn description(&self) -> &str { + match self { + Self::CreateSnapshot => "Create Snapshot", + Self::MountSnapshot => "Mount snapshot", + Self::UnmountSnapshot => "Unmount snapshot", + Self::RemoveSnapshot => "Remove snapshot", + } + } +} + +impl From for Edge { + fn from(x: Transition) -> Edge { + Edge::Transition(x) + } +} + +#[derive( + serde::Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, +)] +pub enum State { + Snapshot(snapshot::State), +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Edge { + Transition(Transition), + Dependency(State), +} + +impl Edge { + pub fn is_transition(&self) -> bool { + match self { + Self::Transition(_) => true, + Self::Dependency(_) => false, + } + } +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub enum Job { + CreateSnapshotJob(Create), + MountSnapshotJob(Mount), + UnmountSnapshotJob(Unmount), + RemoveSnapshotJob(Destroy), +} + +#[cfg_attr(feature = "postgres-interop", derive(sqlx::Type))] +#[cfg_attr(feature = "postgres-interop", sqlx(rename = "machine_state"))] +#[cfg_attr(feature = "postgres-interop", sqlx(rename_all = "lowercase"))] +#[derive(serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Debug)] +#[serde(rename_all = "lowercase")] +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLEnum))] +pub enum CurrentState { + Pending, + Progress, + Failed, + Succeeded, + Cancelled, +} + +#[derive(serde::Serialize, serde::Deserialize)] +pub struct Command { + pub message: String, + pub jobs: Vec, +} + +#[derive(serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] +pub struct CommandRecord { + pub id: i32, + pub start_time: chrono::DateTime, + pub end_time: Option>, + pub state: CurrentState, + pub message: String, + pub jobs: Vec, +} + +pub mod snapshot { + use crate::state_machine; + + #[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Clone, + Copy, + Eq, + PartialEq, + Ord, + PartialOrd, + Hash, + )] + pub enum State { + Unknown, + Unmounted, + Mounted, + Removed, + } + + impl Default for State { + fn default() -> Self { + Self::Unknown + } + } + + impl From for state_machine::State { + fn from(x: State) -> state_machine::State { + state_machine::State::Snapshot(x) + } + } +} diff --git a/iml-wire-types/src/warp_drive.rs b/iml-wire-types/src/warp_drive.rs index 1b74c1e19e..1a6dffa6ba 100644 --- a/iml-wire-types/src/warp_drive.rs +++ b/iml-wire-types/src/warp_drive.rs @@ -11,6 +11,7 @@ use crate::{ }, sfa::{SfaController, SfaDiskDrive, SfaEnclosure, SfaJob, SfaPowerSupply, SfaStorageSystem}, snapshot::{SnapshotInterval, SnapshotRecord, SnapshotRetention}, + state_machine::{self, State}, Alert, CompositeId, EndpointNameSelf, Filesystem, Host, Label, LockChange, Target, TargetConfParam, ToCompositeId, }; @@ -675,6 +676,96 @@ impl Deref for RecordId { } } +impl RecordId { + pub fn to_state(&self, cache: &Cache) -> Option { + match self { + RecordId::Snapshot(id) => { + let snap = cache.snapshot.get(&id)?; + + let snap = if snap.mounted == Some(true) { + State::Snapshot(state_machine::snapshot::State::Mounted) + } else { + State::Snapshot(state_machine::snapshot::State::Unmounted) + }; + + Some(snap) + } + _ => None, + } + } +} + +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLEnum))] +#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)] +pub enum RecordType { + ActiveAlert, + ContentType, + CorosyncConfiguration, + Filesystem, + Group, + Host, + LnetConfiguration, + ManagedTargetMount, + OstPool, + OstPoolOsts, + PacemakerConfiguration, + SfaDiskDrive, + SfaEnclosure, + SfaStorageSystem, + SfaJob, + SfaPowerSupply, + SfaController, + StratagemConfig, + Snapshot, + SnapshotInterval, + SnapshotRetention, + Target, + User, + UserGroup, + Volume, + VolumeNode, +} + +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLInputObject))] +#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)] +pub struct GraphqlRecordId { + pub r#type: RecordType, + pub id: i32, +} + +impl From for RecordId { + fn from(GraphqlRecordId { r#type, id }: GraphqlRecordId) -> Self { + match r#type { + RecordType::ActiveAlert => RecordId::ActiveAlert(id), + RecordType::ContentType => RecordId::ContentType(id), + RecordType::CorosyncConfiguration => RecordId::CorosyncConfiguration(id), + RecordType::Filesystem => RecordId::Filesystem(id), + RecordType::Group => RecordId::Group(id), + RecordType::Host => RecordId::Host(id), + RecordType::LnetConfiguration => RecordId::LnetConfiguration(id), + RecordType::ManagedTargetMount => RecordId::ManagedTargetMount(id), + RecordType::OstPool => RecordId::OstPool(id), + RecordType::OstPoolOsts => RecordId::OstPoolOsts(id), + RecordType::PacemakerConfiguration => RecordId::PacemakerConfiguration(id), + RecordType::SfaDiskDrive => RecordId::SfaDiskDrive(id), + RecordType::SfaEnclosure => RecordId::SfaEnclosure(id), + RecordType::SfaStorageSystem => RecordId::SfaStorageSystem(id), + RecordType::SfaJob => RecordId::SfaJob(id), + RecordType::SfaPowerSupply => RecordId::SfaPowerSupply(id), + RecordType::SfaController => RecordId::SfaController(id), + RecordType::StratagemConfig => RecordId::StratagemConfig(id), + RecordType::Snapshot => RecordId::Snapshot(id), + RecordType::SnapshotInterval => RecordId::SnapshotInterval(id), + RecordType::SnapshotRetention => RecordId::SnapshotRetention(id), + RecordType::Target => RecordId::Target(id), + RecordType::User => RecordId::User(id), + RecordType::UserGroup => RecordId::UserGroup(id), + RecordType::Volume => RecordId::Volume(id), + RecordType::VolumeNode => RecordId::VolumeNode(id), + } + } +} + #[allow(clippy::large_enum_variant)] #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(tag = "tag", content = "payload")] diff --git a/migrations/20201026195644_state_machine.sql b/migrations/20201026195644_state_machine.sql new file mode 100644 index 0000000000..244882f30b --- /dev/null +++ b/migrations/20201026195644_state_machine.sql @@ -0,0 +1,43 @@ +CREATE TYPE machine_state AS ENUM ( + 'pending', + 'progress', + 'failed', + 'succeeded', + 'cancelled' +); + +CREATE TYPE step_state as ENUM ( + 'progress', + 'failed', + 'cancelled' +); + +CREATE TABLE IF NOT EXISTS command ( + id serial PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + end_time TIMESTAMP WITH TIME ZONE, + state machine_state NOT NULL DEFAULT 'pending', + message TEXT NOT NULL, + jobs int[] NOT NULL DEFAULT array[]::int[] +); + +CREATE TABLE IF NOT EXISTS job ( + id serial PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + end_time TIMESTAMP WITH TIME ZONE, + state machine_state NOT NULL DEFAULT 'pending', + command_id INT NOT NULL REFERENCES command (id) ON DELETE CASCADE, + job jsonb NOT NULL, + wait_for_jobs int[] NOT NULL, + locked_records jsonb[] +); + +CREATE TABLE IF NOT EXISTS step ( + id serial PRIMARY KEY, + start_time TIMESTAMP WITH TIME ZONE NOT NULL, + end_time TIMESTAMP WITH TIME ZONE, + job_id INT NOT NULL REFERENCES job (id) ON DELETE CASCADE, + result jsonb, + logs text NOT NULL DEFAULT '', + state step_state NOT NULL DEFAULT 'progress' +) \ No newline at end of file diff --git a/sqlx-data.json b/sqlx-data.json index e40b57e953..bf190aecf4 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -195,6 +195,27 @@ "nullable": [] } }, + "0b1ae42a49266622456c9c8b5fa836054b0bc0258d3bab66cdb4e130665547ef": { + "query": "SELECT id FROM snapshot WHERE snapshot_name = $1 AND filesystem_name = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + } + }, "1167f9862155b35e2bb59ba77286ccfc35c7f6113227d8dce116b3def418e2f9": { "query": "\n INSERT INTO chroma_core_sfastoragesystem\n (\n uuid,\n platform,\n health_state_reason,\n health_state,\n child_health_state\n )\n VALUES ($1, $2, $3, $4, $5)\n ON CONFLICT (uuid) DO UPDATE\n SET\n platform = excluded.platform,\n health_state_reason = excluded.health_state_reason,\n health_state = excluded.health_state,\n child_health_state = excluded.child_health_state\n ", "describe": { @@ -346,6 +367,32 @@ ] } }, + "158e5a5a3a32a58a0487c92e8f05ef0bcdc16d6c4ae1e9d3016a66d2836a2f36": { + "query": "SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "filesystem_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "snapshot_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + false + ] + } + }, "17645262c426038efcc8e22bf999c0d3cee07f52c4e276a1b607e2e00b2e62bd": { "query": "\n SELECT\n id,\n filesystem_name,\n reserve_value,\n reserve_unit as \"reserve_unit:ReserveUnit\",\n last_run,\n keep_num\n FROM snapshot_retention\n ", "describe": { @@ -2836,6 +2883,31 @@ ] } }, + "9557d6d9a74d1ad2d597c1f786dbcc7c6a3be86535a7c781d4a707ae2b88dd00": { + "query": "\n UPDATE job\n SET\n state = $1,\n end_time = now()\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "machine_state", + "kind": { + "Enum": [ + "pending", + "progress", + "failed", + "succeeded", + "cancelled" + ] + } + } + } + ] + }, + "nullable": [] + } + }, "9a4c05da9d9233e6b3fa63ca2f50cf90feb0c305b1cc05e0eb2edcf2572db4ba": { "query": "select * from chroma_core_volume where not_deleted = 't'", "describe": { @@ -3311,6 +3383,28 @@ ] } }, + "b0795a0f0b333b7bd65f5a336b62463aa30a6c1b4e23079d1057c11fe416fbde": { + "query": "\n INSERT INTO job (command_id, job, wait_for_jobs, locked_records)\n VALUES ($1, $2, array[]::int[], $3)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4", + "Jsonb", + "JsonbArray" + ] + }, + "nullable": [ + false + ] + } + }, "b0991443ae430ca73d4369f314b88f731ead796ec9ac353c3d237be9203c95bf": { "query": "UPDATE chroma_core_alertstate\n SET active = Null, \"end\" = now()\n WHERE\n active = true\n AND alert_item_id = $1\n AND record_type = ANY($2)\n ", "describe": { @@ -4389,6 +4483,69 @@ "nullable": [] } }, + "ea4c0d5efc5eb5fb0db892f9236e98688edea23676210d0b3e6a29a4c733a9ba": { + "query": "\n INSERT INTO command (message)\n VALUES ($1)\n RETURNING id, start_time, end_time, state as \"state: CurrentState\", message, jobs\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "start_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "end_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "state: CurrentState", + "type_info": { + "Custom": { + "name": "machine_state", + "kind": { + "Enum": [ + "pending", + "progress", + "failed", + "succeeded", + "cancelled" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "message", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "jobs", + "type_info": "Int4Array" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + true, + false, + false, + false + ] + } + }, "ec70b9a5caeadc31f5d1359737cc1c6da64e41db8315a81d81420d3b37b182c5": { "query": "\n UPDATE chroma_core_task\n SET running_on_id = $1\n WHERE id = $2\n AND running_on_id is Null", "describe": { From 5aa70064c136b96b3c00615984c206bab03419aa Mon Sep 17 00:00:00 2001 From: Joe Grund Date: Fri, 13 Nov 2020 15:33:30 -0500 Subject: [PATCH 2/2] Signed-off-by: Joe Grund --- Cargo.lock | 85 ++++++++++++++++++++++++------------ iml-state-machine/Cargo.toml | 2 +- 2 files changed, 58 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b9174f1fa..aa33af54f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,6 +392,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +[[package]] +name = "bytes" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" + [[package]] name = "cargo_metadata" version = "0.10.0" @@ -542,7 +548,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d79eb8c0bfd05a68f8b6195b27c4f2e042f86d52da032817f89a3847e0495ed" dependencies = [ - "bytes", + "bytes 0.5.6", "memchr", ] @@ -1256,7 +1262,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "futures-core", "futures-sink", @@ -1294,7 +1300,7 @@ checksum = "ed18eb2459bf1a09ad2d6b1547840c3e5e62882fa09b9a6a20b1de8e3228848f" dependencies = [ "base64 0.12.3", "bitflags", - "bytes", + "bytes 0.5.6", "headers-core", "http", "mime", @@ -1371,7 +1377,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "itoa", ] @@ -1382,7 +1388,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes", + "bytes 0.5.6", "http", ] @@ -1419,7 +1425,7 @@ version = "0.13.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-channel", "futures-core", "futures-util", @@ -1443,7 +1449,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37743cc83e8ee85eacfce90f2f4102030d9ff0a95244098d781e9bee4a90abb6" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-util", "hyper", "log", @@ -1459,7 +1465,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" dependencies = [ - "bytes", + "bytes 0.5.6", "hyper", "native-tls", "tokio 0.2.22", @@ -1509,7 +1515,7 @@ dependencies = [ name = "iml-action-client" version = "0.1.0" dependencies = [ - "bytes", + "bytes 0.5.6", "futures", "hyper", "hyperlocal", @@ -1554,7 +1560,7 @@ version = "0.4.0" dependencies = [ "async-trait", "byte-unit", - "bytes", + "bytes 0.5.6", "chrono", "combine 4.1.0", "console 0.12.0", @@ -1725,7 +1731,7 @@ dependencies = [ name = "iml-fs" version = "0.4.0" dependencies = [ - "bytes", + "bytes 0.5.6", "futures", "tempdir", "tempfile", @@ -1793,7 +1799,7 @@ dependencies = [ name = "iml-mailbox" version = "0.4.0" dependencies = [ - "bytes", + "bytes 0.5.6", "futures", "iml-fs", "iml-manager-env", @@ -1959,7 +1965,7 @@ dependencies = [ name = "iml-report" version = "0.1.0" dependencies = [ - "bytes", + "bytes 0.5.6", "futures", "iml-fs", "iml-manager-env", @@ -2192,7 +2198,7 @@ dependencies = [ name = "iml-wire-types" version = "0.4.0" dependencies = [ - "bytes", + "bytes 0.5.6", "chrono", "humantime 2.0.1", "im", @@ -2238,7 +2244,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20f7ba194095b44d05d7d2907fb3a04e7ea7ae1591cdd41fe800e555e51903c4" dependencies = [ - "bytes", + "bytes 0.5.6", "futures", "reqwest", "serde", @@ -2274,7 +2280,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" dependencies = [ - "bytes", + "bytes 0.5.6", ] [[package]] @@ -3107,7 +3113,7 @@ checksum = "4888a0e36637ab38d76cace88c1476937d617ad015f07f6b669cec11beacc019" dependencies = [ "base64 0.13.0", "byteorder", - "bytes", + "bytes 0.5.6", "fallible-iterator", "hmac 0.9.0", "md5", @@ -3123,7 +3129,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfc08a7d94a80665de4a83942fa8db2fdeaf2f123fc0535e384dc4fff251efae" dependencies = [ - "bytes", + "bytes 0.5.6", "fallible-iterator", "postgres-protocol", ] @@ -3516,7 +3522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e" dependencies = [ "base64 0.12.3", - "bytes", + "bytes 0.5.6", "encoding_rs", "futures-core", "futures-util", @@ -3967,7 +3973,7 @@ dependencies = [ "base64 0.12.3", "bitflags", "byteorder", - "bytes", + "bytes 0.5.6", "chrono", "crc", "crossbeam-channel", @@ -4423,7 +4429,7 @@ version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "futures-core", "iovec", @@ -4437,7 +4443,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "tokio-macros", + "tokio-macros 0.2.5", "winapi 0.3.9", ] @@ -4448,7 +4454,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ca08accbcb46f11fd8d2d1c6158c348b7888009a1f39260bcad66f6a454250" dependencies = [ "autocfg 1.0.1", + "bytes 0.6.0", + "futures-core", + "lazy_static", + "libc", + "memchr", + "mio 0.7.5", + "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", + "slab", + "tokio-macros 0.3.1", + "winapi 0.3.9", ] [[package]] @@ -4472,6 +4490,17 @@ dependencies = [ "syn 1.0.48", ] +[[package]] +name = "tokio-macros" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21d30fdbb5dc2d8f91049691aa1a9d4d4ae422a21c334ce8936e5886d30c5c45" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.48", +] + [[package]] name = "tokio-native-tls" version = "0.1.0" @@ -4490,7 +4519,7 @@ checksum = "55a2482c9fe4dd481723cf5c0616f34afc710e55dcda0944e12e7b3316117892" dependencies = [ "async-trait", "byteorder", - "bytes", + "bytes 0.5.6", "fallible-iterator", "futures", "log", @@ -4532,7 +4561,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-core", "tokio 0.2.22", ] @@ -4566,7 +4595,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-core", "futures-sink", "log", @@ -4680,7 +4709,7 @@ checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23" dependencies = [ "base64 0.12.3", "byteorder", - "bytes", + "bytes 0.5.6", "http", "httparse", "input_buffer", @@ -4887,7 +4916,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f41be6df54c97904af01aa23e613d4521eed7ab23537cede692d4058f6449407" dependencies = [ - "bytes", + "bytes 0.5.6", "futures", "headers", "http", @@ -4995,7 +5024,7 @@ version = "0.2.0" dependencies = [ "async-trait", "base64 0.12.3", - "bytes", + "bytes 0.5.6", "futures", "insta 1.1.0", "quick-xml 0.19.0", diff --git a/iml-state-machine/Cargo.toml b/iml-state-machine/Cargo.toml index eba03b619d..2579744198 100644 --- a/iml-state-machine/Cargo.toml +++ b/iml-state-machine/Cargo.toml @@ -15,5 +15,5 @@ petgraph = "0.5" serde = {version = "1", features = ["derive"]} serde_json = "1" thiserror = "1.0" -tokio = "0.3" +tokio = {version = "0.3", features = ["full"]} uuid = {version = "0.8", features = ["v4"]}