diff --git a/CHANGELOG.md b/CHANGELOG.md index b602870d55..6ee9fa8cc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,10 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 -## [Unreleased] +## [0.199.2] - 2024-09-09 ### Added - REST API: The `/query` endpoint now supports response proofs via reproducibility and signing (#816) - REST API: New `/{dataset}/metadata` endpoint for retrieving schema, description, attachments etc. (#816) +### Fixed +- Fixed unguaranteed ordering of events when restoring event sourcing aggregates +- Enqueuing and cancelling future flows should be done with transactions taken into account (via Outbox) ## [0.199.1] - 2024-09-06 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 4d2ed1aed1..aa483f01da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,19 +14,13 @@ dependencies = [ [[package]] name = "addr2line" -version = "0.22.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" dependencies = [ "gimli", ] -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "adler2" version = "2.0.0" @@ -662,9 +656,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8" [[package]] name = "approx" @@ -1789,17 +1783,17 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.73" +version = "0.3.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" dependencies = [ "addr2line", - "cc", "cfg-if", "libc", - "miniz_oxide 0.7.4", + "miniz_oxide", "object", "rustc-demangle", + "windows-targets 0.52.6", ] [[package]] @@ -2109,9 +2103,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.16" +version = "1.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9d013ecb737093c0e86b151a7b837993cf9ec6c502946cfb44bedc392421e0b" +checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" dependencies = [ "jobserver", "libc", @@ -2381,7 +2375,7 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" [[package]] name = "container-runtime" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "cfg-if", @@ -2820,7 +2814,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "database-common" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "aws-config", @@ -2844,7 +2838,7 @@ dependencies = [ [[package]] name = "database-common-macros" -version = "0.199.1" +version = "0.199.2" dependencies = [ "quote", "syn 2.0.77", @@ -3599,7 +3593,7 @@ dependencies = [ [[package]] name = "enum-variants" -version = "0.199.1" +version = "0.199.2" [[package]] name = "env_filter" @@ -3672,7 +3666,7 @@ dependencies = [ [[package]] name = "event-sourcing" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -3688,7 +3682,7 @@ dependencies = [ [[package]] name = "event-sourcing-macros" -version = "0.199.1" +version = "0.199.2" dependencies = [ "quote", "syn 2.0.77", @@ -3804,7 +3798,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" dependencies = [ "crc32fast", - "miniz_oxide 0.8.0", + "miniz_oxide", ] [[package]] @@ -4036,9 +4030,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" [[package]] name = "glob" @@ -4057,9 +4051,9 @@ dependencies = [ [[package]] name = "grep-regex" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f748bb135ca835da5cbc67ca0e6955f968db9c5df74ca4f56b18e1ddbc68230d" +checksum = "9edd147c7e3296e7a26bd3a81345ce849557d5a8e48ed88f736074e760f91f7e" dependencies = [ "bstr", "grep-matcher", @@ -4070,9 +4064,9 @@ dependencies = [ [[package]] name = "grep-searcher" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba536ae4f69bec62d8839584dd3153d3028ef31bb229f04e09fb5a9e5a193c54" +checksum = "b9b6c14b3fc2e0a107d6604d3231dec0509e691e62447104bc385a46a7892cda" dependencies = [ "bstr", "encoding_rs", @@ -4352,7 +4346,7 @@ dependencies = [ [[package]] name = "http-common" -version = "0.199.1" +version = "0.199.2" dependencies = [ "axum", "http 0.2.12", @@ -4676,7 +4670,7 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "internal-error" -version = "0.199.1" +version = "0.199.2" dependencies = [ "thiserror", ] @@ -4695,9 +4689,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" [[package]] name = "is-terminal" @@ -4833,7 +4827,7 @@ dependencies = [ [[package]] name = "kamu" -version = "0.199.1" +version = "0.199.2" dependencies = [ "alloy", "async-recursion", @@ -4920,7 +4914,7 @@ dependencies = [ [[package]] name = "kamu-accounts" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "base32", @@ -4946,7 +4940,7 @@ dependencies = [ [[package]] name = "kamu-accounts-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -4967,7 +4961,7 @@ dependencies = [ [[package]] name = "kamu-accounts-mysql" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -4988,7 +4982,7 @@ dependencies = [ [[package]] name = "kamu-accounts-postgres" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5009,7 +5003,7 @@ dependencies = [ [[package]] name = "kamu-accounts-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "argon2", "chrono", @@ -5025,7 +5019,7 @@ dependencies = [ [[package]] name = "kamu-accounts-services" -version = "0.199.1" +version = "0.199.2" dependencies = [ "argon2", "async-trait", @@ -5051,7 +5045,7 @@ dependencies = [ [[package]] name = "kamu-accounts-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5072,7 +5066,7 @@ dependencies = [ [[package]] name = "kamu-adapter-auth-oso" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "dill", @@ -5094,7 +5088,7 @@ dependencies = [ [[package]] name = "kamu-adapter-flight-sql" -version = "0.199.1" +version = "0.199.2" dependencies = [ "arrow-flight", "async-trait", @@ -5117,7 +5111,7 @@ dependencies = [ [[package]] name = "kamu-adapter-graphql" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-graphql", "async-trait", @@ -5167,7 +5161,7 @@ dependencies = [ [[package]] name = "kamu-adapter-http" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "aws-sdk-s3", @@ -5231,7 +5225,7 @@ dependencies = [ [[package]] name = "kamu-adapter-oauth" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5250,7 +5244,7 @@ dependencies = [ [[package]] name = "kamu-adapter-odata" -version = "0.199.1" +version = "0.199.2" dependencies = [ "axum", "chrono", @@ -5285,7 +5279,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "internal-error", @@ -5297,7 +5291,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "database-common-macros", @@ -5311,7 +5305,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "dill", "kamu-auth-rebac", @@ -5320,7 +5314,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-services" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "dill", @@ -5339,7 +5333,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "database-common", @@ -5356,7 +5350,7 @@ dependencies = [ [[package]] name = "kamu-cli" -version = "0.199.1" +version = "0.199.2" dependencies = [ "arrow-flight", "async-graphql", @@ -5472,7 +5466,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-common" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "indoc 2.0.5", @@ -5492,7 +5486,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-common-macros" -version = "0.199.1" +version = "0.199.2" dependencies = [ "quote", "syn 2.0.77", @@ -5500,7 +5494,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5513,7 +5507,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-mysql" -version = "0.199.1" +version = "0.199.2" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5527,7 +5521,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-postgres" -version = "0.199.1" +version = "0.199.2" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5541,7 +5535,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "indoc 2.0.5", @@ -5557,7 +5551,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5571,7 +5565,7 @@ dependencies = [ [[package]] name = "kamu-cli-puppet" -version = "0.199.1" +version = "0.199.2" dependencies = [ "assert_cmd", "async-trait", @@ -5587,7 +5581,7 @@ dependencies = [ [[package]] name = "kamu-core" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -5617,7 +5611,7 @@ dependencies = [ [[package]] name = "kamu-data-utils" -version = "0.199.1" +version = "0.199.2" dependencies = [ "arrow", "arrow-digest", @@ -5642,7 +5636,7 @@ dependencies = [ [[package]] name = "kamu-datafusion-cli" -version = "0.199.1" +version = "0.199.2" dependencies = [ "arrow", "async-trait", @@ -5664,7 +5658,7 @@ dependencies = [ [[package]] name = "kamu-datasets" -version = "0.199.1" +version = "0.199.2" dependencies = [ "aes-gcm", "async-trait", @@ -5683,7 +5677,7 @@ dependencies = [ [[package]] name = "kamu-datasets-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5706,7 +5700,7 @@ dependencies = [ [[package]] name = "kamu-datasets-postgres" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5728,7 +5722,7 @@ dependencies = [ [[package]] name = "kamu-datasets-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "database-common", @@ -5742,7 +5736,7 @@ dependencies = [ [[package]] name = "kamu-datasets-services" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5763,7 +5757,7 @@ dependencies = [ [[package]] name = "kamu-datasets-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5786,7 +5780,7 @@ dependencies = [ [[package]] name = "kamu-flow-system" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5814,7 +5808,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -5844,7 +5838,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-postgres" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -5868,7 +5862,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "database-common", @@ -5881,7 +5875,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-services" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -5923,7 +5917,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -5947,7 +5941,7 @@ dependencies = [ [[package]] name = "kamu-ingest-datafusion" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -5984,7 +5978,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -6003,7 +5997,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-postgres" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -6026,7 +6020,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "database-common", @@ -6040,7 +6034,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -6062,7 +6056,7 @@ dependencies = [ [[package]] name = "kamu-repo-tools" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "clap", @@ -6077,7 +6071,7 @@ dependencies = [ [[package]] name = "kamu-task-system" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -6095,7 +6089,7 @@ dependencies = [ [[package]] name = "kamu-task-system-inmem" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -6114,7 +6108,7 @@ dependencies = [ [[package]] name = "kamu-task-system-postgres" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -6137,7 +6131,7 @@ dependencies = [ [[package]] name = "kamu-task-system-repo-tests" -version = "0.199.1" +version = "0.199.2" dependencies = [ "chrono", "database-common", @@ -6149,7 +6143,7 @@ dependencies = [ [[package]] name = "kamu-task-system-services" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -6174,7 +6168,7 @@ dependencies = [ [[package]] name = "kamu-task-system-sqlite" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-stream", "async-trait", @@ -6564,7 +6558,7 @@ dependencies = [ [[package]] name = "messaging-outbox" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -6607,15 +6601,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" -dependencies = [ - "adler", -] - [[package]] name = "miniz_oxide" version = "0.8.0" @@ -6711,7 +6696,7 @@ dependencies = [ [[package]] name = "multiformats" -version = "0.199.1" +version = "0.199.2" dependencies = [ "base64 0.22.1", "bs58", @@ -7037,7 +7022,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendatafabric" -version = "0.199.1" +version = "0.199.2" dependencies = [ "arrow", "base64 0.22.1", @@ -7184,9 +7169,9 @@ dependencies = [ [[package]] name = "parking" -version = "2.2.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" [[package]] name = "parking_lot" @@ -7329,9 +7314,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" +checksum = "9c73c26c01b8c87956cea613c907c9d6ecffd8d18a2a5908e5de0adfaa185cea" dependencies = [ "memchr", "thiserror", @@ -7340,9 +7325,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" +checksum = "664d22978e2815783adbdd2c588b455b1bd625299ce36b2a99881ac9627e6d8d" dependencies = [ "pest", "pest_generator", @@ -7350,9 +7335,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" +checksum = "a2d5487022d5d33f4c30d91c22afa240ce2a644e87fe08caad974d4eab6badbe" dependencies = [ "pest", "pest_meta", @@ -7363,9 +7348,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" +checksum = "0091754bbd0ea592c4deb3a122ce8ecbb0753b738aa82bc055fcc2eccc8d8174" dependencies = [ "once_cell", "pest", @@ -7510,9 +7495,9 @@ checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "plotters" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15b6eccb8484002195a3e44fe65a4ce8e93a625797a063735536fd59cb01cf3" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" dependencies = [ "num-traits", "plotters-backend", @@ -7523,15 +7508,15 @@ dependencies = [ [[package]] name = "plotters-backend" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414cec62c6634ae900ea1c56128dfe87cf63e7caece0852ec76aba307cebadb7" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" [[package]] name = "plotters-svg" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81b30686a7d9c3e010b84284bdd26a29f2138574f52f5eb6f794fc0ad924e705" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" dependencies = [ "plotters-backend", ] @@ -7901,7 +7886,7 @@ dependencies = [ [[package]] name = "random-names" -version = "0.199.1" +version = "0.199.2" dependencies = [ "rand", ] @@ -8495,11 +8480,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +checksum = "e9aaafd5a2b6e3d657ff009d82fbd630b6bd54dd4eb06f21693925cdf80f9b8b" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8619,18 +8604,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.209" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.209" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", @@ -8955,9 +8940,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f895e3734318cc55f1fe66258926c9b910c124d47520339efecbb6c59cec7c1f" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" dependencies = [ "nom", "unicode_categories", @@ -9540,7 +9525,7 @@ dependencies = [ [[package]] name = "time-source" -version = "0.199.1" +version = "0.199.2" dependencies = [ "async-trait", "chrono", @@ -9927,7 +9912,7 @@ dependencies = [ [[package]] name = "tracing-perfetto" -version = "0.199.1" +version = "0.199.2" dependencies = [ "conv", "serde", diff --git a/Cargo.toml b/Cargo.toml index c41aebc04e..210f26ab83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,91 +88,91 @@ resolver = "2" [workspace.dependencies] # Apps -kamu-cli = { version = "0.199.1", path = "src/app/cli", default-features = false } +kamu-cli = { version = "0.199.2", path = "src/app/cli", default-features = false } # Utils -container-runtime = { version = "0.199.1", path = "src/utils/container-runtime", default-features = false } -database-common = { version = "0.199.1", path = "src/utils/database-common", default-features = false } -database-common-macros = { version = "0.199.1", path = "src/utils/database-common-macros", default-features = false } -enum-variants = { version = "0.199.1", path = "src/utils/enum-variants", default-features = false } -event-sourcing = { version = "0.199.1", path = "src/utils/event-sourcing", default-features = false } -event-sourcing-macros = { version = "0.199.1", path = "src/utils/event-sourcing-macros", default-features = false } -http-common = { version = "0.199.1", path = "src/utils/http-common", default-features = false } -internal-error = { version = "0.199.1", path = "src/utils/internal-error", default-features = false } -kamu-cli-puppet = { version = "0.199.1", path = "src/utils/kamu-cli-puppet", default-features = false } -kamu-data-utils = { version = "0.199.1", path = "src/utils/data-utils", default-features = false } -kamu-datafusion-cli = { version = "0.199.1", path = "src/utils/datafusion-cli", default-features = false } -messaging-outbox = { version = "0.199.1", path = "src/utils/messaging-outbox", default-features = false } -multiformats = { version = "0.199.1", path = "src/utils/multiformats", default-features = false } -random-names = { version = "0.199.1", path = "src/utils/random-names", default-features = false } -time-source = { version = "0.199.1", path = "src/utils/time-source", default-features = false } -tracing-perfetto = { version = "0.199.1", path = "src/utils/tracing-perfetto", default-features = false } +container-runtime = { version = "0.199.2", path = "src/utils/container-runtime", default-features = false } +database-common = { version = "0.199.2", path = "src/utils/database-common", default-features = false } +database-common-macros = { version = "0.199.2", path = "src/utils/database-common-macros", default-features = false } +enum-variants = { version = "0.199.2", path = "src/utils/enum-variants", default-features = false } +event-sourcing = { version = "0.199.2", path = "src/utils/event-sourcing", default-features = false } +event-sourcing-macros = { version = "0.199.2", path = "src/utils/event-sourcing-macros", default-features = false } +http-common = { version = "0.199.2", path = "src/utils/http-common", default-features = false } +internal-error = { version = "0.199.2", path = "src/utils/internal-error", default-features = false } +kamu-cli-puppet = { version = "0.199.2", path = "src/utils/kamu-cli-puppet", default-features = false } +kamu-data-utils = { version = "0.199.2", path = "src/utils/data-utils", default-features = false } +kamu-datafusion-cli = { version = "0.199.2", path = "src/utils/datafusion-cli", default-features = false } +messaging-outbox = { version = "0.199.2", path = "src/utils/messaging-outbox", default-features = false } +multiformats = { version = "0.199.2", path = "src/utils/multiformats", default-features = false } +random-names = { version = "0.199.2", path = "src/utils/random-names", default-features = false } +time-source = { version = "0.199.2", path = "src/utils/time-source", default-features = false } +tracing-perfetto = { version = "0.199.2", path = "src/utils/tracing-perfetto", default-features = false } # Domain -kamu-accounts = { version = "0.199.1", path = "src/domain/accounts/domain", default-features = false } -kamu-auth-rebac = { version = "0.199.1", path = "src/domain/auth-rebac/domain", default-features = false } -kamu-core = { version = "0.199.1", path = "src/domain/core", default-features = false } -kamu-datasets = { version = "0.199.1", path = "src/domain/datasets/domain", default-features = false } -kamu-flow-system = { version = "0.199.1", path = "src/domain/flow-system/domain", default-features = false } -kamu-task-system = { version = "0.199.1", path = "src/domain/task-system/domain", default-features = false } -opendatafabric = { version = "0.199.1", path = "src/domain/opendatafabric", default-features = false } +kamu-accounts = { version = "0.199.2", path = "src/domain/accounts/domain", default-features = false } +kamu-auth-rebac = { version = "0.199.2", path = "src/domain/auth-rebac/domain", default-features = false } +kamu-core = { version = "0.199.2", path = "src/domain/core", default-features = false } +kamu-datasets = { version = "0.199.2", path = "src/domain/datasets/domain", default-features = false } +kamu-flow-system = { version = "0.199.2", path = "src/domain/flow-system/domain", default-features = false } +kamu-task-system = { version = "0.199.2", path = "src/domain/task-system/domain", default-features = false } +opendatafabric = { version = "0.199.2", path = "src/domain/opendatafabric", default-features = false } # Domain service layer -kamu-accounts-services = { version = "0.199.1", path = "src/domain/accounts/services", default-features = false } -kamu-auth-rebac-services = { version = "0.199.1", path = "src/domain/auth-rebac/services", default-features = false } -kamu-datasets-services = { version = "0.199.1", path = "src/domain/datasets/services", default-features = false } -kamu-flow-system-services = { version = "0.199.1", path = "src/domain/flow-system/services", default-features = false } -kamu-task-system-services = { version = "0.199.1", path = "src/domain/task-system/services", default-features = false } +kamu-accounts-services = { version = "0.199.2", path = "src/domain/accounts/services", default-features = false } +kamu-auth-rebac-services = { version = "0.199.2", path = "src/domain/auth-rebac/services", default-features = false } +kamu-datasets-services = { version = "0.199.2", path = "src/domain/datasets/services", default-features = false } +kamu-flow-system-services = { version = "0.199.2", path = "src/domain/flow-system/services", default-features = false } +kamu-task-system-services = { version = "0.199.2", path = "src/domain/task-system/services", default-features = false } # Infra -kamu = { version = "0.199.1", path = "src/infra/core", default-features = false } -kamu-ingest-datafusion = { version = "0.199.1", path = "src/infra/ingest-datafusion", default-features = false } +kamu = { version = "0.199.2", path = "src/infra/core", default-features = false } +kamu-ingest-datafusion = { version = "0.199.2", path = "src/infra/ingest-datafusion", default-features = false } ## Flow System -kamu-flow-system-repo-tests = { version = "0.199.1", path = "src/infra/flow-system/repo-tests", default-features = false } -kamu-flow-system-inmem = { version = "0.199.1", path = "src/infra/flow-system/inmem", default-features = false } -kamu-flow-system-postgres = { version = "0.199.1", path = "src/infra/flow-system/postgres", default-features = false } -kamu-flow-system-sqlite = { version = "0.199.1", path = "src/infra/flow-system/sqlite", default-features = false } +kamu-flow-system-repo-tests = { version = "0.199.2", path = "src/infra/flow-system/repo-tests", default-features = false } +kamu-flow-system-inmem = { version = "0.199.2", path = "src/infra/flow-system/inmem", default-features = false } +kamu-flow-system-postgres = { version = "0.199.2", path = "src/infra/flow-system/postgres", default-features = false } +kamu-flow-system-sqlite = { version = "0.199.2", path = "src/infra/flow-system/sqlite", default-features = false } ## Accounts -kamu-accounts-inmem = { version = "0.199.1", path = "src/infra/accounts/inmem", default-features = false } -kamu-accounts-mysql = { version = "0.199.1", path = "src/infra/accounts/mysql", default-features = false } -kamu-accounts-postgres = { version = "0.199.1", path = "src/infra/accounts/postgres", default-features = false } -kamu-accounts-sqlite = { version = "0.199.1", path = "src/infra/accounts/sqlite", default-features = false } -kamu-accounts-repo-tests = { version = "0.199.1", path = "src/infra/accounts/repo-tests", default-features = false } +kamu-accounts-inmem = { version = "0.199.2", path = "src/infra/accounts/inmem", default-features = false } +kamu-accounts-mysql = { version = "0.199.2", path = "src/infra/accounts/mysql", default-features = false } +kamu-accounts-postgres = { version = "0.199.2", path = "src/infra/accounts/postgres", default-features = false } +kamu-accounts-sqlite = { version = "0.199.2", path = "src/infra/accounts/sqlite", default-features = false } +kamu-accounts-repo-tests = { version = "0.199.2", path = "src/infra/accounts/repo-tests", default-features = false } ## Datasets -kamu-datasets-inmem = { version = "0.199.1", path = "src/infra/datasets/inmem", default-features = false } -kamu-datasets-postgres = { version = "0.199.1", path = "src/infra/datasets/postgres", default-features = false } -kamu-datasets-sqlite = { version = "0.199.1", path = "src/infra/datasets/sqlite", default-features = false } -kamu-datasets-repo-tests = { version = "0.199.1", path = "src/infra/datasets/repo-tests", default-features = false } +kamu-datasets-inmem = { version = "0.199.2", path = "src/infra/datasets/inmem", default-features = false } +kamu-datasets-postgres = { version = "0.199.2", path = "src/infra/datasets/postgres", default-features = false } +kamu-datasets-sqlite = { version = "0.199.2", path = "src/infra/datasets/sqlite", default-features = false } +kamu-datasets-repo-tests = { version = "0.199.2", path = "src/infra/datasets/repo-tests", default-features = false } ## Task System -kamu-task-system-inmem = { version = "0.199.1", path = "src/infra/task-system/inmem", default-features = false } -kamu-task-system-postgres = { version = "0.199.1", path = "src/infra/task-system/postgres", default-features = false } -kamu-task-system-sqlite = { version = "0.199.1", path = "src/infra/task-system/sqlite", default-features = false } -kamu-task-system-repo-tests = { version = "0.199.1", path = "src/infra/task-system/repo-tests", default-features = false } +kamu-task-system-inmem = { version = "0.199.2", path = "src/infra/task-system/inmem", default-features = false } +kamu-task-system-postgres = { version = "0.199.2", path = "src/infra/task-system/postgres", default-features = false } +kamu-task-system-sqlite = { version = "0.199.2", path = "src/infra/task-system/sqlite", default-features = false } +kamu-task-system-repo-tests = { version = "0.199.2", path = "src/infra/task-system/repo-tests", default-features = false } ## ReBAC -kamu-auth-rebac-inmem = { version = "0.199.1", path = "src/infra/auth-rebac/inmem", default-features = false } -kamu-auth-rebac-repo-tests = { version = "0.199.1", path = "src/infra/auth-rebac/repo-tests", default-features = false } -kamu-auth-rebac-sqlite = { version = "0.199.1", path = "src/infra/auth-rebac/sqlite", default-features = false } +kamu-auth-rebac-inmem = { version = "0.199.2", path = "src/infra/auth-rebac/inmem", default-features = false } +kamu-auth-rebac-repo-tests = { version = "0.199.2", path = "src/infra/auth-rebac/repo-tests", default-features = false } +kamu-auth-rebac-sqlite = { version = "0.199.2", path = "src/infra/auth-rebac/sqlite", default-features = false } ## Outbox -kamu-messaging-outbox-inmem = { version = "0.199.1", path = "src/infra/messaging-outbox/inmem", default-features = false } -kamu-messaging-outbox-postgres = { version = "0.199.1", path = "src/infra/messaging-outbox/postgres", default-features = false } -kamu-messaging-outbox-sqlite = { version = "0.199.1", path = "src/infra/messaging-outbox/sqlite", default-features = false } -kamu-messaging-outbox-repo-tests = { version = "0.199.1", path = "src/infra/messaging-outbox/repo-tests", default-features = false } +kamu-messaging-outbox-inmem = { version = "0.199.2", path = "src/infra/messaging-outbox/inmem", default-features = false } +kamu-messaging-outbox-postgres = { version = "0.199.2", path = "src/infra/messaging-outbox/postgres", default-features = false } +kamu-messaging-outbox-sqlite = { version = "0.199.2", path = "src/infra/messaging-outbox/sqlite", default-features = false } +kamu-messaging-outbox-repo-tests = { version = "0.199.2", path = "src/infra/messaging-outbox/repo-tests", default-features = false } # Adapters -kamu-adapter-auth-oso = { version = "0.199.1", path = "src/adapter/auth-oso", default-features = false } -kamu-adapter-flight-sql = { version = "0.199.1", path = "src/adapter/flight-sql", default-features = false } -kamu-adapter-graphql = { version = "0.199.1", path = "src/adapter/graphql", default-features = false } -kamu-adapter-http = { version = "0.199.1", path = "src/adapter/http", default-features = false } -kamu-adapter-odata = { version = "0.199.1", path = "src/adapter/odata", default-features = false } -kamu-adapter-oauth = { version = "0.199.1", path = "src/adapter/oauth", default-features = false } +kamu-adapter-auth-oso = { version = "0.199.2", path = "src/adapter/auth-oso", default-features = false } +kamu-adapter-flight-sql = { version = "0.199.2", path = "src/adapter/flight-sql", default-features = false } +kamu-adapter-graphql = { version = "0.199.2", path = "src/adapter/graphql", default-features = false } +kamu-adapter-http = { version = "0.199.2", path = "src/adapter/http", default-features = false } +kamu-adapter-odata = { version = "0.199.2", path = "src/adapter/odata", default-features = false } +kamu-adapter-oauth = { version = "0.199.2", path = "src/adapter/oauth", default-features = false } # E2E -kamu-cli-e2e-common = { version = "0.199.1", path = "src/e2e/app/cli/common", default-features = false } -kamu-cli-e2e-common-macros = { version = "0.199.1", path = "src/e2e/app/cli/common-macros", default-features = false } -kamu-cli-e2e-repo-tests = { version = "0.199.1", path = "src/e2e/app/cli/repo-tests", default-features = false } +kamu-cli-e2e-common = { version = "0.199.2", path = "src/e2e/app/cli/common", default-features = false } +kamu-cli-e2e-common-macros = { version = "0.199.2", path = "src/e2e/app/cli/common-macros", default-features = false } +kamu-cli-e2e-repo-tests = { version = "0.199.2", path = "src/e2e/app/cli/repo-tests", default-features = false } [workspace.package] -version = "0.199.1" +version = "0.199.2" edition = "2021" homepage = "https://github.com/kamu-data/kamu-cli" repository = "https://github.com/kamu-data/kamu-cli" diff --git a/LICENSE.txt b/LICENSE.txt index 266a126120..a09cbdb85c 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -11,7 +11,7 @@ Business Source License 1.1 Licensor: Kamu Data, Inc. -Licensed Work: Kamu CLI Version 0.199.1 +Licensed Work: Kamu CLI Version 0.199.2 The Licensed Work is © 2023 Kamu Data, Inc. Additional Use Grant: You may use the Licensed Work for any purpose, diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs index 575003f503..bc39a3adf4 100644 --- a/src/app/cli/src/app.rs +++ b/src/app/cli/src/app.rs @@ -24,8 +24,11 @@ use kamu_adapter_http::{FileUploadLimitConfig, UploadServiceLocal}; use kamu_adapter_oauth::GithubAuthenticationConfig; use kamu_auth_rebac_services::{MultiTenantRebacDatasetLifecycleMessageConsumer, RebacServiceImpl}; use kamu_datasets::DatasetEnvVar; -use kamu_flow_system_inmem::domain::FlowConfigurationUpdatedMessage; -use kamu_flow_system_services::MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE; +use kamu_flow_system_inmem::domain::{FlowConfigurationUpdatedMessage, FlowProgressMessage}; +use kamu_flow_system_services::{ + MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, +}; use kamu_task_system_inmem::domain::{TaskProgressMessage, MESSAGE_PRODUCER_KAMU_TASK_EXECUTOR}; use messaging_outbox::{register_message_dispatcher, Outbox, OutboxDispatchingImpl}; use time_source::{SystemTimeSource, SystemTimeSourceDefault, SystemTimeSourceStub}; @@ -446,6 +449,10 @@ pub fn configure_base_catalog( &mut b, MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE, ); + register_message_dispatcher::( + &mut b, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + ); b } diff --git a/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs b/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs index 599cb66474..5e669618c5 100644 --- a/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs +++ b/src/domain/flow-system/domain/src/entities/flow/flow_outcome.rs @@ -15,7 +15,7 @@ use ts::TaskError; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum FlowOutcome { /// Flow succeeded Success(FlowResult), @@ -55,14 +55,14 @@ impl FlowResult { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum FlowError { Failed, RootDatasetCompacted(FlowRootDatasetCompactedError), ResetHeadNotFound, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct FlowRootDatasetCompactedError { pub dataset_id: DatasetID, } diff --git a/src/domain/flow-system/domain/src/flow_messages_types.rs b/src/domain/flow-system/domain/src/flow_messages_types.rs index 0a6aabb5ec..79dfa10148 100644 --- a/src/domain/flow-system/domain/src/flow_messages_types.rs +++ b/src/domain/flow-system/domain/src/flow_messages_types.rs @@ -11,7 +11,7 @@ use chrono::{DateTime, Utc}; use messaging_outbox::Message; use serde::{Deserialize, Serialize}; -use crate::{FlowConfigurationRule, FlowKey}; +use crate::{FlowConfigurationRule, FlowID, FlowKey, FlowOutcome}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -41,8 +41,80 @@ impl Message for FlowExecutorUpdatedMessage {} pub enum FlowExecutorUpdateDetails { Loaded, ExecutedTimeslot, - FlowRunning, - FlowFinished, +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum FlowProgressMessage { + Enqueued(FlowProgressMessageEnqueued), + Running(FlowProgressMessageRunning), + Finished(FlowProgressMessageFinished), + Cancelled(FlowProgressMessageCancelled), +} + +impl Message for FlowProgressMessage {} + +impl FlowProgressMessage { + pub fn enqueued( + event_time: DateTime, + flow_id: FlowID, + activate_at: DateTime, + ) -> Self { + Self::Enqueued(FlowProgressMessageEnqueued { + event_time, + flow_id, + activate_at, + }) + } + + pub fn running(event_time: DateTime, flow_id: FlowID) -> Self { + Self::Running(FlowProgressMessageRunning { + event_time, + flow_id, + }) + } + + pub fn finished(event_time: DateTime, flow_id: FlowID, outcome: FlowOutcome) -> Self { + Self::Finished(FlowProgressMessageFinished { + event_time, + flow_id, + outcome, + }) + } + + pub fn cancelled(event_time: DateTime, flow_id: FlowID) -> Self { + Self::Cancelled(FlowProgressMessageCancelled { + event_time, + flow_id, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlowProgressMessageEnqueued { + pub event_time: DateTime, + pub flow_id: FlowID, + pub activate_at: DateTime, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlowProgressMessageRunning { + pub event_time: DateTime, + pub flow_id: FlowID, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlowProgressMessageFinished { + pub event_time: DateTime, + pub flow_id: FlowID, + pub outcome: FlowOutcome, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlowProgressMessageCancelled { + pub event_time: DateTime, + pub flow_id: FlowID, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/flow-system/domain/src/services/flow/flow_time_wheel_service.rs b/src/domain/flow-system/domain/src/services/flow/flow_time_wheel_service.rs index 1fcddd9dc7..8dda3067b0 100644 --- a/src/domain/flow-system/domain/src/services/flow/flow_time_wheel_service.rs +++ b/src/domain/flow-system/domain/src/services/flow/flow_time_wheel_service.rs @@ -19,12 +19,7 @@ pub trait FlowTimeWheelService: Send + Sync { fn take_nearest_planned_flows(&self) -> Vec; - fn activate_at(&self, activation_time: DateTime, flow_id: FlowID); - fn get_planned_flow_activation_time(&self, flow_id: FlowID) -> Option>; - - fn cancel_flow_activation(&self, flow_id: FlowID) - -> Result<(), TimeWheelCancelActivationError>; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/flow-system/services/src/flow/flow_enqueue_helper.rs b/src/domain/flow-system/services/src/flow/flow_enqueue_helper.rs index a1a42fe655..2979a0ff2c 100644 --- a/src/domain/flow-system/services/src/flow/flow_enqueue_helper.rs +++ b/src/domain/flow-system/services/src/flow/flow_enqueue_helper.rs @@ -14,10 +14,11 @@ use dill::component; use internal_error::InternalError; use kamu_core::{DatasetChangesService, DatasetOwnershipService, DependencyGraphService}; use kamu_flow_system::*; +use messaging_outbox::{Outbox, OutboxExt}; use time_source::SystemTimeSource; use super::{DownstreamDependencyFlowPlan, FlowTriggerContext}; -use crate::DownstreamDependencyTriggerType; +use crate::{DownstreamDependencyTriggerType, MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -25,6 +26,7 @@ pub(crate) struct FlowEnqueueHelper { flow_timewheel_service: Arc, flow_event_store: Arc, flow_configuration_service: Arc, + outbox: Arc, dataset_changes_service: Arc, dependency_graph_service: Arc, dataset_ownership_service: Arc, @@ -38,6 +40,7 @@ impl FlowEnqueueHelper { flow_timewheel_service: Arc, flow_event_store: Arc, flow_configuration_service: Arc, + outbox: Arc, dataset_changes_service: Arc, dependency_graph_service: Arc, dataset_ownership_service: Arc, @@ -48,6 +51,7 @@ impl FlowEnqueueHelper { flow_timewheel_service, flow_event_store, flow_configuration_service, + outbox, dataset_changes_service, dependency_graph_service, dataset_ownership_service, @@ -378,7 +382,8 @@ impl FlowEnqueueHelper { .is_some_and(|planned_time| throttling_boundary_time < planned_time) { // If so, enqueue the flow earlier - self.enqueue_flow(flow.flow_id, throttling_boundary_time); + self.enqueue_flow(flow.flow_id, throttling_boundary_time) + .await?; // Indicate throttling, if applied if throttling_boundary_time > trigger_time { @@ -437,7 +442,8 @@ impl FlowEnqueueHelper { // Apply throttling boundary let next_activation_time = std::cmp::max(throttling_boundary_time, naive_next_activation_time); - self.enqueue_flow(flow.flow_id, next_activation_time); + self.enqueue_flow(flow.flow_id, next_activation_time) + .await?; // Set throttling activity as start condition if throttling_boundary_time > naive_next_activation_time { @@ -461,7 +467,8 @@ impl FlowEnqueueHelper { // Apply throttling boundary let next_activation_time = std::cmp::max(throttling_boundary_time, trigger_time); - self.enqueue_flow(flow.flow_id, next_activation_time); + self.enqueue_flow(flow.flow_id, next_activation_time) + .await?; // Set throttling activity as start condition if throttling_boundary_time > trigger_time { @@ -580,7 +587,8 @@ impl FlowEnqueueHelper { None => true, }; if should_activate { - self.enqueue_flow(flow.flow_id, corrected_finish_time); + self.enqueue_flow(flow.flow_id, corrected_finish_time) + .await?; } // If batching is over, it's start condition is no longer valid. @@ -659,10 +667,18 @@ impl FlowEnqueueHelper { } } - #[tracing::instrument(level = "trace", skip_all, fields(%flow_id, %activation_time))] - fn enqueue_flow(&self, flow_id: FlowID, activation_time: DateTime) { - self.flow_timewheel_service - .activate_at(activation_time, flow_id); + #[tracing::instrument(level = "trace", skip_all, fields(%flow_id, %activate_at))] + async fn enqueue_flow( + &self, + flow_id: FlowID, + activate_at: DateTime, + ) -> Result<(), InternalError> { + self.outbox + .post_message( + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::enqueued(self.time_source.now(), flow_id, activate_at), + ) + .await } } diff --git a/src/domain/flow-system/services/src/flow/flow_executor_impl.rs b/src/domain/flow-system/services/src/flow/flow_executor_impl.rs index ddd69c3403..15a84e988b 100644 --- a/src/domain/flow-system/services/src/flow/flow_executor_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_executor_impl.rs @@ -35,6 +35,7 @@ use crate::{ MESSAGE_CONSUMER_KAMU_FLOW_EXECUTOR, MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE, MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -124,10 +125,8 @@ impl FlowExecutorImpl { ) -> Result<(), InternalError> { // Extract necessary dependencies let flow_event_store = target_catalog.get_one::().unwrap(); - let flow_timewheel_service = target_catalog - .get_one::() - .unwrap(); let enqueue_helper = target_catalog.get_one::().unwrap(); + let outbox = target_catalog.get_one::().unwrap(); // How many waiting flows do we have? let waiting_filters = AllFlowFilters { @@ -170,7 +169,16 @@ impl FlowExecutorImpl { if activation_time < start_time { activation_time = start_time; } - flow_timewheel_service.activate_at(activation_time, *waiting_flow_id); + outbox + .post_message( + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::enqueued( + start_time, + *waiting_flow_id, + activation_time, + ), + ) + .await?; } // and we also need to re-evaluate the batching condition else if let FlowStartCondition::Batching(b) = start_condition { @@ -463,11 +471,16 @@ impl FlowExecutorTestDriver for FlowExecutorImpl { flow_id: FlowID, schedule_time: DateTime, ) -> Result { - self.flow_time_wheel_service - .cancel_flow_activation(flow_id) - .int_err()?; - let flow_event_store = target_catalog.get_one::().unwrap(); + let outbox = target_catalog.get_one::().unwrap(); + + outbox + .post_message( + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::cancelled(self.time_source.now(), flow_id), + ) + .await?; + let mut flow = Flow::load(flow_id, flow_event_store.as_ref()) .await .int_err()?; @@ -511,11 +524,8 @@ impl MessageConsumerT for FlowExecutorImpl { let outbox = target_catalog.get_one::().unwrap(); outbox .post_message( - MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR, - FlowExecutorUpdatedMessage { - update_time: message.event_time, - update_details: FlowExecutorUpdateDetails::FlowRunning, - }, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::running(message.event_time, flow_id), ) .await?; } @@ -568,11 +578,15 @@ impl MessageConsumerT for FlowExecutorImpl { let outbox = target_catalog.get_one::().unwrap(); outbox .post_message( - MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR, - FlowExecutorUpdatedMessage { - update_time: message.event_time, - update_details: FlowExecutorUpdateDetails::FlowFinished, - }, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::finished( + message.event_time, + flow_id, + flow.outcome + .as_ref() + .expect("Outcome must be attached by now") + .clone(), + ), ) .await?; @@ -598,15 +612,19 @@ impl MessageConsumerT for FlowExecutorImpl { if message.paused { let maybe_pending_flow_id = { let flow_event_store = target_catalog.get_one::().unwrap(); + let outbox = target_catalog.get_one::().unwrap(); let maybe_pending_flow_id = flow_event_store .try_get_pending_flow(&message.flow_key) .await?; if let Some(flow_id) = &maybe_pending_flow_id { - self.flow_time_wheel_service - .cancel_flow_activation(*flow_id) - .int_err()?; + outbox + .post_message( + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::cancelled(self.time_source.now(), *flow_id), + ) + .await?; } maybe_pending_flow_id }; @@ -659,23 +677,30 @@ impl MessageConsumerT for FlowExecutorImpl { .await? { flow_ids_2_abort.push(flow_id); - self.flow_time_wheel_service - .cancel_flow_activation(flow_id) - .int_err()?; } } flow_ids_2_abort }; let flow_event_store = target_catalog.get_one::().unwrap(); + let outbox = target_catalog.get_one::().unwrap(); // Abort matched flows for flow_id in flow_ids_2_abort { + let now = self.time_source.now(); + let mut flow = Flow::load(flow_id, flow_event_store.as_ref()) .await .int_err()?; - flow.abort(self.time_source.now()).int_err()?; + flow.abort(now).int_err()?; flow.save(flow_event_store.as_ref()).await.int_err()?; + + outbox + .post_message( + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + FlowProgressMessage::cancelled(now, flow_id), + ) + .await?; } // Not deleting task->update association, it should be safe. diff --git a/src/domain/flow-system/services/src/flow/flow_time_wheel_service_impl.rs b/src/domain/flow-system/services/src/flow/flow_time_wheel_service_impl.rs index ba8b313aff..acdc00b057 100644 --- a/src/domain/flow-system/services/src/flow/flow_time_wheel_service_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_time_wheel_service_impl.rs @@ -12,13 +12,26 @@ use std::collections::{BinaryHeap, HashMap}; use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; -use dill::{component, interface, scope, Singleton}; +use dill::{component, interface, meta, scope, Catalog, Singleton}; +use internal_error::{InternalError, ResultIntoInternal}; use kamu_flow_system::{ FlowID, + FlowProgressMessage, FlowTimeWheelService, TimeWheelCancelActivationError, TimeWheelFlowNotPlannedError, }; +use messaging_outbox::{ + MessageConsumer, + MessageConsumerMeta, + MessageConsumerT, + MessageConsumptionDurability, +}; + +use crate::{ + MESSAGE_CONSUMER_KAMU_FLOW_TIME_WHEEL_SERVICE, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, +}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -91,6 +104,15 @@ impl FlowRecord { #[component(pub)] #[interface(dyn FlowTimeWheelService)] +#[interface(dyn MessageConsumer)] +#[interface(dyn MessageConsumerT)] +#[meta(MessageConsumerMeta { + consumer_name: MESSAGE_CONSUMER_KAMU_FLOW_TIME_WHEEL_SERVICE, + feeding_producers: &[ + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE + ], + durability: MessageConsumptionDurability::Durable, +})] #[scope(Singleton)] impl FlowTimeWheelServiceImpl { pub fn new() -> Self { @@ -98,6 +120,38 @@ impl FlowTimeWheelServiceImpl { state: Arc::new(Mutex::new(State::default())), } } + + fn activate_at(&self, activation_time: DateTime, flow_id: FlowID) { + let mut guard = self.state.lock().unwrap(); + + match guard.flow_activation_times_by_id.get(&flow_id) { + Some(earlier_activation_time) => { + if activation_time < *earlier_activation_time { + guard.unplan_flow(flow_id); + guard.plan_flow(FlowRecord::new(activation_time, flow_id)); + } + } + None => { + guard.plan_flow(FlowRecord::new(activation_time, flow_id)); + } + } + } + + fn cancel_flow_activation( + &self, + flow_id: FlowID, + ) -> Result<(), TimeWheelCancelActivationError> { + let mut guard = self.state.lock().unwrap(); + + if guard.flow_activation_times_by_id.contains_key(&flow_id) { + guard.unplan_flow(flow_id); + Ok(()) + } else { + Err(TimeWheelCancelActivationError::FlowNotPlanned( + TimeWheelFlowNotPlannedError { flow_id }, + )) + } + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -135,40 +189,35 @@ impl FlowTimeWheelService for FlowTimeWheelServiceImpl { } } - fn activate_at(&self, activation_time: DateTime, flow_id: FlowID) { - let mut guard = self.state.lock().unwrap(); - - match guard.flow_activation_times_by_id.get(&flow_id) { - Some(earlier_activation_time) => { - if activation_time < *earlier_activation_time { - guard.unplan_flow(flow_id); - guard.plan_flow(FlowRecord::new(activation_time, flow_id)); - } - } - None => { - guard.plan_flow(FlowRecord::new(activation_time, flow_id)); - } - } - } - fn get_planned_flow_activation_time(&self, flow_id: FlowID) -> Option> { let guard = self.state.lock().unwrap(); guard.flow_activation_times_by_id.get(&flow_id).copied() } +} - fn cancel_flow_activation( - &self, - flow_id: FlowID, - ) -> Result<(), TimeWheelCancelActivationError> { - let mut guard = self.state.lock().unwrap(); +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - if guard.flow_activation_times_by_id.contains_key(&flow_id) { - guard.unplan_flow(flow_id); - Ok(()) - } else { - Err(TimeWheelCancelActivationError::FlowNotPlanned( - TimeWheelFlowNotPlannedError { flow_id }, - )) +impl MessageConsumer for FlowTimeWheelServiceImpl {} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[async_trait::async_trait] +impl MessageConsumerT for FlowTimeWheelServiceImpl { + async fn consume_message( + &self, + _: &Catalog, + message: &FlowProgressMessage, + ) -> Result<(), InternalError> { + match message { + FlowProgressMessage::Enqueued(e) => { + self.activate_at(e.activate_at, e.flow_id); + Ok(()) + } + FlowProgressMessage::Cancelled(e) => { + self.cancel_flow_activation(e.flow_id).int_err()?; + Ok(()) + } + FlowProgressMessage::Running(_) | FlowProgressMessage::Finished(_) => Ok(()), } } } @@ -260,7 +309,7 @@ mod tests { assert!(timewheel.nearest_activation_moment().is_none()); } - fn schedule_flow(timewheel: &dyn FlowTimeWheelService, moment: DateTime, flow_id: u64) { + fn schedule_flow(timewheel: &FlowTimeWheelServiceImpl, moment: DateTime, flow_id: u64) { timewheel.activate_at(moment, FlowID::new(flow_id)); } diff --git a/src/domain/flow-system/services/src/messages/flow_message_consumers.rs b/src/domain/flow-system/services/src/messages/flow_message_consumers.rs index 64eb11480d..f8930c2300 100644 --- a/src/domain/flow-system/services/src/messages/flow_message_consumers.rs +++ b/src/domain/flow-system/services/src/messages/flow_message_consumers.rs @@ -14,4 +14,7 @@ pub const MESSAGE_CONSUMER_KAMU_FLOW_CONFIGURATION_SERVICE: &str = pub const MESSAGE_CONSUMER_KAMU_FLOW_EXECUTOR: &str = "dev.kamu.domain.flow-system.FlowExecutor"; +pub const MESSAGE_CONSUMER_KAMU_FLOW_TIME_WHEEL_SERVICE: &str = + "dev.kamu.domain.flow-system.FlowTimeWheelService"; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/flow-system/services/src/messages/flow_message_producers.rs b/src/domain/flow-system/services/src/messages/flow_message_producers.rs index b1dcecd92e..d816233a2b 100644 --- a/src/domain/flow-system/services/src/messages/flow_message_producers.rs +++ b/src/domain/flow-system/services/src/messages/flow_message_producers.rs @@ -14,4 +14,7 @@ pub const MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE: &str = pub const MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR: &str = "dev.kamu.domain.flow-system.FlowExecutor"; +pub const MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE: &str = + "dev.kamu.domain.flow-system.FlowProgressService"; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs b/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs index 0c308c0f9c..16f9f5f5ba 100644 --- a/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs +++ b/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs @@ -176,6 +176,10 @@ impl FlowHarness { &mut b, MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR, ); + register_message_dispatcher::( + &mut b, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + ); b.build() }; diff --git a/src/domain/flow-system/services/tests/tests/utils/flow_system_test_listener.rs b/src/domain/flow-system/services/tests/tests/utils/flow_system_test_listener.rs index 71d92c8ee7..e813e60432 100644 --- a/src/domain/flow-system/services/tests/tests/utils/flow_system_test_listener.rs +++ b/src/domain/flow-system/services/tests/tests/utils/flow_system_test_listener.rs @@ -15,7 +15,10 @@ use database_common::PaginationOpts; use dill::*; use internal_error::InternalError; use kamu_flow_system::*; -use kamu_flow_system_services::MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR; +use kamu_flow_system_services::{ + MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, +}; use messaging_outbox::{ MessageConsumer, MessageConsumerMeta, @@ -45,9 +48,10 @@ struct FlowSystemTestListenerState { #[scope(Singleton)] #[interface(dyn MessageConsumer)] #[interface(dyn MessageConsumerT)] +#[interface(dyn MessageConsumerT)] #[meta(MessageConsumerMeta { consumer_name: "FlowSystemTestListener", - feeding_producers: &[MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR], + feeding_producers: &[MESSAGE_PRODUCER_KAMU_FLOW_EXECUTOR, MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE], durability: MessageConsumptionDurability::BestEffort, })] impl FlowSystemTestListener { @@ -247,4 +251,20 @@ impl MessageConsumerT for FlowSystemTestListener { } } +#[async_trait::async_trait] +impl MessageConsumerT for FlowSystemTestListener { + async fn consume_message( + &self, + _: &Catalog, + message: &FlowProgressMessage, + ) -> Result<(), InternalError> { + match message { + FlowProgressMessage::Running(e) => self.make_a_snapshot(e.event_time).await, + FlowProgressMessage::Finished(e) => self.make_a_snapshot(e.event_time).await, + FlowProgressMessage::Enqueued(_) | FlowProgressMessage::Cancelled(_) => {} + } + Ok(()) + } +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/flow-system/postgres/.sqlx/query-81589f523449579cd2055cc5e0fe72949f8fa6bc064fcf8076e13eec01e24cbf.json b/src/infra/flow-system/postgres/.sqlx/query-4fe6140fe57e34250cd77e7d28a6f1a071848bba40901c834747b68592ff2fb8.json similarity index 81% rename from src/infra/flow-system/postgres/.sqlx/query-81589f523449579cd2055cc5e0fe72949f8fa6bc064fcf8076e13eec01e24cbf.json rename to src/infra/flow-system/postgres/.sqlx/query-4fe6140fe57e34250cd77e7d28a6f1a071848bba40901c834747b68592ff2fb8.json index 4bebe54085..6c54f40761 100644 --- a/src/infra/flow-system/postgres/.sqlx/query-81589f523449579cd2055cc5e0fe72949f8fa6bc064fcf8076e13eec01e24cbf.json +++ b/src/infra/flow-system/postgres/.sqlx/query-4fe6140fe57e34250cd77e7d28a6f1a071848bba40901c834747b68592ff2fb8.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT event_id, event_payload\n FROM flow_events\n WHERE flow_id = $1\n AND (cast($2 as INT8) IS NULL OR event_id > $2)\n AND (cast($3 as INT8) IS NULL OR event_id <= $3)\n ", + "query": "\n SELECT event_id, event_payload\n FROM flow_events\n WHERE flow_id = $1\n AND (cast($2 as INT8) IS NULL OR event_id > $2)\n AND (cast($3 as INT8) IS NULL OR event_id <= $3)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -26,5 +26,5 @@ false ] }, - "hash": "81589f523449579cd2055cc5e0fe72949f8fa6bc064fcf8076e13eec01e24cbf" + "hash": "4fe6140fe57e34250cd77e7d28a6f1a071848bba40901c834747b68592ff2fb8" } diff --git a/src/infra/flow-system/postgres/.sqlx/query-d6e99f6892fe26dad007d61005080963094a4a1e892184b6a7f3a09fb86b64ac.json b/src/infra/flow-system/postgres/.sqlx/query-9f698badbce866410c85e871adf1824d11b0c37267e6c373aaa0d893f61c08d2.json similarity index 84% rename from src/infra/flow-system/postgres/.sqlx/query-d6e99f6892fe26dad007d61005080963094a4a1e892184b6a7f3a09fb86b64ac.json rename to src/infra/flow-system/postgres/.sqlx/query-9f698badbce866410c85e871adf1824d11b0c37267e6c373aaa0d893f61c08d2.json index 33a8ba03fb..0bf6195cb1 100644 --- a/src/infra/flow-system/postgres/.sqlx/query-d6e99f6892fe26dad007d61005080963094a4a1e892184b6a7f3a09fb86b64ac.json +++ b/src/infra/flow-system/postgres/.sqlx/query-9f698badbce866410c85e871adf1824d11b0c37267e6c373aaa0d893f61c08d2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT event_id, event_payload\n FROM flow_configuration_events\n WHERE system_flow_type = ($1::text)::system_flow_type\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ", + "query": "\n SELECT event_id, event_payload\n FROM flow_configuration_events\n WHERE system_flow_type = ($1::text)::system_flow_type\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -26,5 +26,5 @@ false ] }, - "hash": "d6e99f6892fe26dad007d61005080963094a4a1e892184b6a7f3a09fb86b64ac" + "hash": "9f698badbce866410c85e871adf1824d11b0c37267e6c373aaa0d893f61c08d2" } diff --git a/src/infra/flow-system/postgres/.sqlx/query-270be1ac6c19dda611c6a90ac3e4fa387a38f2aa9bf98152049d991feb3fe53e.json b/src/infra/flow-system/postgres/.sqlx/query-fcf5b3b9651827987f624ed1c814503f120453dd1c16894a5d6a43b44d5da76a.json similarity index 80% rename from src/infra/flow-system/postgres/.sqlx/query-270be1ac6c19dda611c6a90ac3e4fa387a38f2aa9bf98152049d991feb3fe53e.json rename to src/infra/flow-system/postgres/.sqlx/query-fcf5b3b9651827987f624ed1c814503f120453dd1c16894a5d6a43b44d5da76a.json index f60d4de23e..e075b29ab1 100644 --- a/src/infra/flow-system/postgres/.sqlx/query-270be1ac6c19dda611c6a90ac3e4fa387a38f2aa9bf98152049d991feb3fe53e.json +++ b/src/infra/flow-system/postgres/.sqlx/query-fcf5b3b9651827987f624ed1c814503f120453dd1c16894a5d6a43b44d5da76a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT event_id, event_payload\n FROM flow_configuration_events\n WHERE dataset_id = $1\n AND dataset_flow_type = ($2::text)::dataset_flow_type\n AND (cast($3 as INT8) IS NULL or event_id > $3)\n AND (cast($4 as INT8) IS NULL or event_id <= $4)\n ", + "query": "\n SELECT event_id, event_payload\n FROM flow_configuration_events\n WHERE dataset_id = $1\n AND dataset_flow_type = ($2::text)::dataset_flow_type\n AND (cast($3 as INT8) IS NULL or event_id > $3)\n AND (cast($4 as INT8) IS NULL or event_id <= $4)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -27,5 +27,5 @@ false ] }, - "hash": "270be1ac6c19dda611c6a90ac3e4fa387a38f2aa9bf98152049d991feb3fe53e" + "hash": "fcf5b3b9651827987f624ed1c814503f120453dd1c16894a5d6a43b44d5da76a" } diff --git a/src/infra/flow-system/postgres/src/postgres_flow_configuration_event_store.rs b/src/infra/flow-system/postgres/src/postgres_flow_configuration_event_store.rs index f9d20a385f..6fb3141f85 100644 --- a/src/infra/flow-system/postgres/src/postgres_flow_configuration_event_store.rs +++ b/src/infra/flow-system/postgres/src/postgres_flow_configuration_event_store.rs @@ -48,6 +48,7 @@ impl PostgresFlowConfigurationEventStore { WHERE system_flow_type = ($1::text)::system_flow_type AND (cast($2 as INT8) IS NULL or event_id > $2) AND (cast($3 as INT8) IS NULL or event_id <= $3) + ORDER BY event_id ASC "#, fk_system.flow_type as SystemFlowType, maybe_from_id, @@ -87,6 +88,7 @@ impl PostgresFlowConfigurationEventStore { AND dataset_flow_type = ($2::text)::dataset_flow_type AND (cast($3 as INT8) IS NULL or event_id > $3) AND (cast($4 as INT8) IS NULL or event_id <= $4) + ORDER BY event_id ASC "#, fk_dataset.dataset_id.to_string(), fk_dataset.flow_type as DatasetFlowType, diff --git a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs index 08066eea3c..fefcf79143 100644 --- a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs +++ b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs @@ -173,6 +173,7 @@ impl EventStore for PostgresFlowEventStore { WHERE flow_id = $1 AND (cast($2 as INT8) IS NULL OR event_id > $2) AND (cast($3 as INT8) IS NULL OR event_id <= $3) + ORDER BY event_id ASC "#, flow_id, maybe_from_id, diff --git a/src/infra/flow-system/sqlite/.sqlx/query-25979568c6cba50fb06efaf49c13d398aeb1aa65be675c00df8881f6bae5bbb5.json b/src/infra/flow-system/sqlite/.sqlx/query-25979568c6cba50fb06efaf49c13d398aeb1aa65be675c00df8881f6bae5bbb5.json deleted file mode 100644 index 2ec2802a89..0000000000 --- a/src/infra/flow-system/sqlite/.sqlx/query-25979568c6cba50fb06efaf49c13d398aeb1aa65be675c00df8881f6bae5bbb5.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT event_id, event_payload as \"event_payload: _\"\n FROM flow_configuration_events\n WHERE dataset_id = $1\n AND dataset_flow_type = $2\n AND (cast($3 as INT8) IS NULL or event_id > $3)\n AND (cast($4 as INT8) IS NULL or event_id <= $4)\n ", - "describe": { - "columns": [ - { - "name": "event_id", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "event_payload: _", - "ordinal": 1, - "type_info": "Null" - } - ], - "parameters": { - "Right": 4 - }, - "nullable": [ - false, - false - ] - }, - "hash": "25979568c6cba50fb06efaf49c13d398aeb1aa65be675c00df8881f6bae5bbb5" -} diff --git a/src/infra/flow-system/sqlite/.sqlx/query-0e171da9db328875a9dab79e608b27390c63727c238c2f277ec59814202a5de3.json b/src/infra/flow-system/sqlite/.sqlx/query-516f3e70630f33046c2c08e7c32e570e304b40d358a6c3fe4be3d90c9bcf00eb.json similarity index 52% rename from src/infra/flow-system/sqlite/.sqlx/query-0e171da9db328875a9dab79e608b27390c63727c238c2f277ec59814202a5de3.json rename to src/infra/flow-system/sqlite/.sqlx/query-516f3e70630f33046c2c08e7c32e570e304b40d358a6c3fe4be3d90c9bcf00eb.json index 340da7d1ec..3ad1db02be 100644 --- a/src/infra/flow-system/sqlite/.sqlx/query-0e171da9db328875a9dab79e608b27390c63727c238c2f277ec59814202a5de3.json +++ b/src/infra/flow-system/sqlite/.sqlx/query-516f3e70630f33046c2c08e7c32e570e304b40d358a6c3fe4be3d90c9bcf00eb.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT event_id, event_payload as \"event_payload: _\"\n FROM flow_configuration_events\n WHERE system_flow_type = $1\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ", + "query": "\n SELECT event_id, event_payload as \"event_payload: _\"\n FROM flow_configuration_events\n WHERE system_flow_type = $1\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "0e171da9db328875a9dab79e608b27390c63727c238c2f277ec59814202a5de3" + "hash": "516f3e70630f33046c2c08e7c32e570e304b40d358a6c3fe4be3d90c9bcf00eb" } diff --git a/src/infra/flow-system/sqlite/.sqlx/query-75a9c9e34c11dd13537fe4434ee4670d79ea52c14cb3df8802852dca993506a8.json b/src/infra/flow-system/sqlite/.sqlx/query-b24482785336a84d30b739321626acc18fb2d6e943516f11260742d4ae19d579.json similarity index 78% rename from src/infra/flow-system/sqlite/.sqlx/query-75a9c9e34c11dd13537fe4434ee4670d79ea52c14cb3df8802852dca993506a8.json rename to src/infra/flow-system/sqlite/.sqlx/query-b24482785336a84d30b739321626acc18fb2d6e943516f11260742d4ae19d579.json index cbf389f98c..81800f9a6a 100644 --- a/src/infra/flow-system/sqlite/.sqlx/query-75a9c9e34c11dd13537fe4434ee4670d79ea52c14cb3df8802852dca993506a8.json +++ b/src/infra/flow-system/sqlite/.sqlx/query-b24482785336a84d30b739321626acc18fb2d6e943516f11260742d4ae19d579.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT event_id, event_payload as \"event_payload: _\"\n FROM flow_events\n WHERE flow_id = $1\n AND (cast($2 as INT8) IS NULL OR event_id > $2)\n AND (cast($3 as INT8) IS NULL OR event_id <= $3)\n ", + "query": "\n SELECT event_id, event_payload as \"event_payload: _\"\n FROM flow_events\n WHERE flow_id = $1\n AND (cast($2 as INT8) IS NULL OR event_id > $2)\n AND (cast($3 as INT8) IS NULL OR event_id <= $3)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "75a9c9e34c11dd13537fe4434ee4670d79ea52c14cb3df8802852dca993506a8" + "hash": "b24482785336a84d30b739321626acc18fb2d6e943516f11260742d4ae19d579" } diff --git a/src/infra/flow-system/sqlite/.sqlx/query-b3acc2c14f77583384199b0a99dd8092c0f3a995c1a54fae2152096db9775e98.json b/src/infra/flow-system/sqlite/.sqlx/query-b3acc2c14f77583384199b0a99dd8092c0f3a995c1a54fae2152096db9775e98.json new file mode 100644 index 0000000000..504c15cf6a --- /dev/null +++ b/src/infra/flow-system/sqlite/.sqlx/query-b3acc2c14f77583384199b0a99dd8092c0f3a995c1a54fae2152096db9775e98.json @@ -0,0 +1,26 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT event_id, event_payload as \"event_payload: _\"\n FROM flow_configuration_events\n WHERE dataset_id = $1\n AND dataset_flow_type = $2\n AND (cast($3 as INT8) IS NULL or event_id > $3)\n AND (cast($4 as INT8) IS NULL or event_id <= $4)\n ORDER BY event_id ASC\n ", + "describe": { + "columns": [ + { + "name": "event_id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "event_payload: _", + "ordinal": 1, + "type_info": "Null" + } + ], + "parameters": { + "Right": 4 + }, + "nullable": [ + false, + false + ] + }, + "hash": "b3acc2c14f77583384199b0a99dd8092c0f3a995c1a54fae2152096db9775e98" +} diff --git a/src/infra/flow-system/sqlite/src/sqlite_flow_configuration_event_store.rs b/src/infra/flow-system/sqlite/src/sqlite_flow_configuration_event_store.rs index 467800e728..37bbd977c9 100644 --- a/src/infra/flow-system/sqlite/src/sqlite_flow_configuration_event_store.rs +++ b/src/infra/flow-system/sqlite/src/sqlite_flow_configuration_event_store.rs @@ -59,10 +59,11 @@ impl SqliteFlowConfigurationEventStore { EventModel, r#" SELECT event_id, event_payload as "event_payload: _" - FROM flow_configuration_events - WHERE system_flow_type = $1 - AND (cast($2 as INT8) IS NULL or event_id > $2) - AND (cast($3 as INT8) IS NULL or event_id <= $3) + FROM flow_configuration_events + WHERE system_flow_type = $1 + AND (cast($2 as INT8) IS NULL or event_id > $2) + AND (cast($3 as INT8) IS NULL or event_id <= $3) + ORDER BY event_id ASC "#, fk_system.flow_type, maybe_from_id, @@ -101,11 +102,12 @@ impl SqliteFlowConfigurationEventStore { EventModel, r#" SELECT event_id, event_payload as "event_payload: _" - FROM flow_configuration_events - WHERE dataset_id = $1 - AND dataset_flow_type = $2 - AND (cast($3 as INT8) IS NULL or event_id > $3) - AND (cast($4 as INT8) IS NULL or event_id <= $4) + FROM flow_configuration_events + WHERE dataset_id = $1 + AND dataset_flow_type = $2 + AND (cast($3 as INT8) IS NULL or event_id > $3) + AND (cast($4 as INT8) IS NULL or event_id <= $4) + ORDER BY event_id ASC "#, dataset_id, fk_dataset.flow_type, diff --git a/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs b/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs index 23a525418a..9dde933929 100644 --- a/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs +++ b/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs @@ -194,6 +194,7 @@ impl EventStore for SqliteFlowEventStore { WHERE flow_id = $1 AND (cast($2 as INT8) IS NULL OR event_id > $2) AND (cast($3 as INT8) IS NULL OR event_id <= $3) + ORDER BY event_id ASC "#, flow_id, maybe_from_id, diff --git a/src/infra/task-system/postgres/.sqlx/query-af9b86147a70cd6d55c5fa3bef0c02b49ea23cc6214a8c78f562a450d4032756.json b/src/infra/task-system/postgres/.sqlx/query-4728e19ed95adfd6b51ed469d781f248ce8263a55c4a8d1fe264ebac53bc4cc0.json similarity index 81% rename from src/infra/task-system/postgres/.sqlx/query-af9b86147a70cd6d55c5fa3bef0c02b49ea23cc6214a8c78f562a450d4032756.json rename to src/infra/task-system/postgres/.sqlx/query-4728e19ed95adfd6b51ed469d781f248ce8263a55c4a8d1fe264ebac53bc4cc0.json index ff54492c5e..9ffc571ae0 100644 --- a/src/infra/task-system/postgres/.sqlx/query-af9b86147a70cd6d55c5fa3bef0c02b49ea23cc6214a8c78f562a450d4032756.json +++ b/src/infra/task-system/postgres/.sqlx/query-4728e19ed95adfd6b51ed469d781f248ce8263a55c4a8d1fe264ebac53bc4cc0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT event_id, event_payload FROM task_events\n WHERE task_id = $1\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ", + "query": "\n SELECT event_id, event_payload FROM task_events\n WHERE task_id = $1\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -26,5 +26,5 @@ false ] }, - "hash": "af9b86147a70cd6d55c5fa3bef0c02b49ea23cc6214a8c78f562a450d4032756" + "hash": "4728e19ed95adfd6b51ed469d781f248ce8263a55c4a8d1fe264ebac53bc4cc0" } diff --git a/src/infra/task-system/postgres/src/postgres_task_event_store.rs b/src/infra/task-system/postgres/src/postgres_task_event_store.rs index d251494024..cff4d18a2d 100644 --- a/src/infra/task-system/postgres/src/postgres_task_event_store.rs +++ b/src/infra/task-system/postgres/src/postgres_task_event_store.rs @@ -136,6 +136,7 @@ impl EventStore for PostgresTaskEventStore { WHERE task_id = $1 AND (cast($2 as INT8) IS NULL or event_id > $2) AND (cast($3 as INT8) IS NULL or event_id <= $3) + ORDER BY event_id ASC "#, task_id, maybe_from_id, diff --git a/src/infra/task-system/sqlite/.sqlx/query-3de78083e9e1b68c576ce420795cb8834ad3f370aa6cd103d5e1399137b9370b.json b/src/infra/task-system/sqlite/.sqlx/query-a435c6e8608fb3cf53a9245c52cf5ccd1f86d0de062112dee8d9f27e8da38afb.json similarity index 84% rename from src/infra/task-system/sqlite/.sqlx/query-3de78083e9e1b68c576ce420795cb8834ad3f370aa6cd103d5e1399137b9370b.json rename to src/infra/task-system/sqlite/.sqlx/query-a435c6e8608fb3cf53a9245c52cf5ccd1f86d0de062112dee8d9f27e8da38afb.json index 6f0f671dc2..04e70a6ad9 100644 --- a/src/infra/task-system/sqlite/.sqlx/query-3de78083e9e1b68c576ce420795cb8834ad3f370aa6cd103d5e1399137b9370b.json +++ b/src/infra/task-system/sqlite/.sqlx/query-a435c6e8608fb3cf53a9245c52cf5ccd1f86d0de062112dee8d9f27e8da38afb.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT event_id as \"event_id: _\", event_payload as \"event_payload: _\" FROM task_events\n WHERE task_id = $1\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ", + "query": "\n SELECT event_id as \"event_id: _\", event_payload as \"event_payload: _\" FROM task_events\n WHERE task_id = $1\n AND (cast($2 as INT8) IS NULL or event_id > $2)\n AND (cast($3 as INT8) IS NULL or event_id <= $3)\n ORDER BY event_id ASC\n ", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "3de78083e9e1b68c576ce420795cb8834ad3f370aa6cd103d5e1399137b9370b" + "hash": "a435c6e8608fb3cf53a9245c52cf5ccd1f86d0de062112dee8d9f27e8da38afb" } diff --git a/src/infra/task-system/sqlite/src/sqlite_task_event_store.rs b/src/infra/task-system/sqlite/src/sqlite_task_event_store.rs index d2b0be4dec..e749866374 100644 --- a/src/infra/task-system/sqlite/src/sqlite_task_event_store.rs +++ b/src/infra/task-system/sqlite/src/sqlite_task_event_store.rs @@ -144,6 +144,7 @@ impl EventStore for SqliteTaskSystemEventStore { WHERE task_id = $1 AND (cast($2 as INT8) IS NULL or event_id > $2) AND (cast($3 as INT8) IS NULL or event_id <= $3) + ORDER BY event_id ASC "#, task_id, maybe_from_id, diff --git a/src/utils/database-common/src/transactions/db_transaction_manager.rs b/src/utils/database-common/src/transactions/db_transaction_manager.rs index 539fc7366b..4b16036654 100644 --- a/src/utils/database-common/src/transactions/db_transaction_manager.rs +++ b/src/utils/database-common/src/transactions/db_transaction_manager.rs @@ -44,6 +44,7 @@ impl DatabaseTransactionRunner { Self { catalog } } + #[tracing::instrument(level = "info", skip_all)] pub async fn transactional( &self, callback: H, @@ -64,6 +65,9 @@ impl DatabaseTransactionRunner { // A catalog with a transaction must live for a limited time let result = { + let transaction_body_span = tracing::info_span!("Transaction Body"); + let _ = transaction_body_span.enter(); + // Create a chained catalog for transaction-aware components, // but keep a local copy of a transaction pointer let catalog_with_transaction = CatalogBuilder::new_chained(&self.catalog) @@ -77,14 +81,21 @@ impl DatabaseTransactionRunner { match result { // In case everything succeeded, commit the transaction Ok(res) => { + let transaction_commit_span = tracing::info_span!("Transaction COMMIT"); + let _ = transaction_commit_span.enter(); + db_transaction_manager .commit_transaction(transaction_ref) .await?; + Ok(res) } // Otherwise, do an explicit rollback Err(e) => { + let transaction_rollback_span = tracing::error_span!("Transaction ROLLBACK"); + let _ = transaction_rollback_span.enter(); + db_transaction_manager .rollback_transaction(transaction_ref) .await?;