From 4b06ccf22454d2ef7ac2eda4a786974978402073 Mon Sep 17 00:00:00 2001 From: Dima Pristupa Date: Wed, 25 Sep 2024 18:39:49 +0300 Subject: [PATCH] Private Datasets: use SQLite storage by default for multi-tenant (#787) * kamu-cli: a SQLite database is used for a multi-tenant workspace * kamu-cli, prepare_run_dir(): move error variables into format string * kamu-cli, run(): use database_config * InitCommand::before_run(): absorb checks * kamu-cli, run(): apply embedded migrations * SqlitePlugin::open_sqlite_pool(): add the "create if missing" flag * kamu-cli, apply_migrations(): use warn!() not panic!() * kamu-cli, build.rs: rebuild if ./migrations/ changed * CHANGELOG.md: update * CHANGELOG.md: fix typo * kamu-cli, run(): remove an outdated TODO * CHANGELOG.md: update after rebase * kamu-cli, command_needs_transaction(): apply for "add" * kamu-cli, initialize_components(): FlowServiceImpl pre-initialization * TODO DROP: OutboxTransactionalProcessor::run_with_mode(): introduce * PropertyName::dataset_allows_anonymous_read(): parity with dataset_allows_public_read() * CHANGELOG.md: update * kamu-cli, build.rs: rebuilt if migration have changed * kamu-cli, build.rs: activate "fail on error" for vergen * kamu-cli, build.rs: add a comment about vergen * DatabaseConfig::sqlite_database() -> sqlite_database_in_workspace_dir() * SqlitePlugin::catalog_with_connected_pool(): absorb the migrator stuff * async-utils: introduce * Creating a database in a temporary directory with subsequent moving * kamu-cli: remove extra dep * WebUIServer: typo fixes * kamu-cli, initialize_components(): rename to initialize_server_components() * Fixes after rebasing * kamu-cli, configure_base_catalog(): remove unused "is_e2e_testing" argument * kamu-cli: introduce server_catalog * kamu-cli, configure_server_catalog(): move login-related deps to the base catalog * kamu-cli, get_command(): rename cli_catalog -> work_catalog * test_di_server_graph_validates(): add * test_di_cli_graph_validates(): remove extra components * test_di_graph: split st & mt test variants * test_di_graph: fix lint warnings * Fixes after rebasing * kamu-cli, List command: fix a typo * OutboxExecutor::run_while_has_tasks(): introduce * kamu-cli: restore app.rs changes * kamu-cli: get_app_database_config(): move to database.rs * Extract fix of the WebUIFeatureFlags' typo into a dedicated issue * AppDatabaseConfig: logic simplification * kamu-cli, app: initialize_base_components(): extract * kamu-cli, app: initialize_server_components(): debug logging instrumentation * kamu-cli, app: move_initial_database_to_workspace_if_needed(): extract * kamu-cli, app: initialize_server_components(): absorb APIServerRunCommand::pre_run() * Updates after rebasing * Release (minor): 0.204.0 --- CHANGELOG.md | 9 +- Cargo.lock | 148 +++++---- Cargo.toml | 134 ++++---- LICENSE.txt | 4 +- src/app/cli/Cargo.toml | 5 +- src/app/cli/build.rs | 19 +- src/app/cli/src/app.rs | 262 +++++++++++---- src/app/cli/src/cli.rs | 2 +- src/app/cli/src/cli_commands.rs | 314 +++++++++--------- src/app/cli/src/commands/init_command.rs | 25 +- .../commands/system_api_server_run_command.rs | 1 - src/app/cli/src/database.rs | 109 +++++- src/app/cli/src/explore/api_server.rs | 20 +- src/app/cli/src/explore/web_ui_server.rs | 13 +- src/app/cli/src/services/config/models.rs | 8 + .../services/workspace/workspace_layout.rs | 9 + src/app/cli/tests/tests/test_di_graph.rs | 78 ++++- .../domain/src/entities/property.rs | 13 +- .../services/tests/tests/utils/task_driver.rs | 2 +- .../services/src/task_executor_impl.rs | 2 +- src/utils/async-utils/Cargo.toml | 25 ++ src/utils/async-utils/src/lib.rs | 34 ++ .../src/plugins/sqlite_plugin.rs | 17 +- .../src/executors/outbox_executor.rs | 75 ++++- .../outbox_producer_consumption_job.rs | 12 +- 25 files changed, 907 insertions(+), 433 deletions(-) create mode 100644 src/utils/async-utils/Cargo.toml create mode 100644 src/utils/async-utils/src/lib.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d8ffa5d15..47acd23eda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ Recommendation: for ease of reading, use the following order: - Fixed --> +## [0.204.0] - 2024-09-25 +### Changed +- If not explicitly configured, a SQLite database is used for a multi-tenant workspace +- If a SQLite database is used, built-in migrations are automatically applied +- Start processing added Outbox messages after successful command execution +- DI: `ServerCatalog` added, to split dependencies + ## [0.203.1] - 2024-09-24 ### Added - Added database migration & scripting to create an application user with restricted permissions @@ -167,7 +174,7 @@ Recommendation: for ease of reading, use the following order: ### Changed - Upgraded `sqlx` crate to v0.8 - Renamed `setConfigSchedule` GQL api to `setConfigIngest`. Also extended - `setConfigIngest` with new field `fetchUncacheable` which indicates to ingone cache + `setConfigIngest` with new field `fetchUncacheable` which indicates to ignore cache during ingest step ## [0.195.1] - 2024-08-16 diff --git a/Cargo.lock b/Cargo.lock index 6692c4f495..03e1bb67c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1303,6 +1303,13 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "async-utils" +version = "0.204.0" +dependencies = [ + "async-trait", +] + [[package]] name = "async_io_stream" version = "0.3.3" @@ -2436,7 +2443,7 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" [[package]] name = "container-runtime" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "cfg-if", @@ -2866,7 +2873,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "database-common" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "aws-config", @@ -2890,7 +2897,7 @@ dependencies = [ [[package]] name = "database-common-macros" -version = "0.203.1" +version = "0.204.0" dependencies = [ "quote", "syn 2.0.77", @@ -3734,7 +3741,7 @@ dependencies = [ [[package]] name = "enum-variants" -version = "0.203.1" +version = "0.204.0" [[package]] name = "env_filter" @@ -3803,7 +3810,7 @@ dependencies = [ [[package]] name = "event-sourcing" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -3819,7 +3826,7 @@ dependencies = [ [[package]] name = "event-sourcing-macros" -version = "0.203.1" +version = "0.204.0" dependencies = [ "quote", "syn 2.0.77", @@ -4481,7 +4488,7 @@ dependencies = [ [[package]] name = "http-common" -version = "0.203.1" +version = "0.204.0" dependencies = [ "axum", "http 1.1.0", @@ -4808,7 +4815,7 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "internal-error" -version = "0.203.1" +version = "0.204.0" dependencies = [ "thiserror", ] @@ -4956,7 +4963,7 @@ dependencies = [ [[package]] name = "kamu" -version = "0.203.1" +version = "0.204.0" dependencies = [ "alloy", "async-recursion", @@ -5043,7 +5050,7 @@ dependencies = [ [[package]] name = "kamu-accounts" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "base32", @@ -5069,7 +5076,7 @@ dependencies = [ [[package]] name = "kamu-accounts-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5090,7 +5097,7 @@ dependencies = [ [[package]] name = "kamu-accounts-mysql" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5111,7 +5118,7 @@ dependencies = [ [[package]] name = "kamu-accounts-postgres" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5132,7 +5139,7 @@ dependencies = [ [[package]] name = "kamu-accounts-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "argon2", "chrono", @@ -5148,7 +5155,7 @@ dependencies = [ [[package]] name = "kamu-accounts-services" -version = "0.203.1" +version = "0.204.0" dependencies = [ "argon2", "async-trait", @@ -5174,7 +5181,7 @@ dependencies = [ [[package]] name = "kamu-accounts-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5195,7 +5202,7 @@ dependencies = [ [[package]] name = "kamu-adapter-auth-oso" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "dill", @@ -5217,7 +5224,7 @@ dependencies = [ [[package]] name = "kamu-adapter-flight-sql" -version = "0.203.1" +version = "0.204.0" dependencies = [ "arrow-flight", "async-trait", @@ -5240,7 +5247,7 @@ dependencies = [ [[package]] name = "kamu-adapter-graphql" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-graphql", "async-trait", @@ -5290,7 +5297,7 @@ dependencies = [ [[package]] name = "kamu-adapter-http" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "aws-sdk-s3", @@ -5355,7 +5362,7 @@ dependencies = [ [[package]] name = "kamu-adapter-oauth" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5374,7 +5381,7 @@ dependencies = [ [[package]] name = "kamu-adapter-odata" -version = "0.203.1" +version = "0.204.0" dependencies = [ "axum", "chrono", @@ -5410,7 +5417,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "internal-error", @@ -5422,7 +5429,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "database-common-macros", @@ -5436,7 +5443,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "dill", "kamu-auth-rebac", @@ -5445,7 +5452,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-services" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "dill", @@ -5464,7 +5471,7 @@ dependencies = [ [[package]] name = "kamu-auth-rebac-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "database-common", @@ -5481,12 +5488,13 @@ dependencies = [ [[package]] name = "kamu-cli" -version = "0.203.1" +version = "0.204.0" dependencies = [ "arrow-flight", "async-graphql", "async-graphql-axum", "async-trait", + "async-utils", "axum", "axum-extra", "cfg-if", @@ -5599,7 +5607,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-common" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "indoc 2.0.5", @@ -5619,7 +5627,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-common-macros" -version = "0.203.1" +version = "0.204.0" dependencies = [ "quote", "syn 2.0.77", @@ -5627,7 +5635,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5640,7 +5648,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-mysql" -version = "0.203.1" +version = "0.204.0" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5654,7 +5662,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-postgres" -version = "0.203.1" +version = "0.204.0" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5668,7 +5676,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "indoc 2.0.5", @@ -5684,7 +5692,7 @@ dependencies = [ [[package]] name = "kamu-cli-e2e-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "indoc 2.0.5", "kamu-cli-e2e-common", @@ -5698,7 +5706,7 @@ dependencies = [ [[package]] name = "kamu-cli-puppet" -version = "0.203.1" +version = "0.204.0" dependencies = [ "assert_cmd", "async-trait", @@ -5714,7 +5722,7 @@ dependencies = [ [[package]] name = "kamu-core" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -5744,7 +5752,7 @@ dependencies = [ [[package]] name = "kamu-data-utils" -version = "0.203.1" +version = "0.204.0" dependencies = [ "arrow", "arrow-digest", @@ -5769,7 +5777,7 @@ dependencies = [ [[package]] name = "kamu-datafusion-cli" -version = "0.203.1" +version = "0.204.0" dependencies = [ "arrow", "async-trait", @@ -5793,7 +5801,7 @@ dependencies = [ [[package]] name = "kamu-datasets" -version = "0.203.1" +version = "0.204.0" dependencies = [ "aes-gcm", "async-trait", @@ -5812,7 +5820,7 @@ dependencies = [ [[package]] name = "kamu-datasets-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5835,7 +5843,7 @@ dependencies = [ [[package]] name = "kamu-datasets-postgres" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5857,7 +5865,7 @@ dependencies = [ [[package]] name = "kamu-datasets-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "database-common", @@ -5871,7 +5879,7 @@ dependencies = [ [[package]] name = "kamu-datasets-services" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5892,7 +5900,7 @@ dependencies = [ [[package]] name = "kamu-datasets-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5915,7 +5923,7 @@ dependencies = [ [[package]] name = "kamu-flow-system" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -5944,7 +5952,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -5974,7 +5982,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-postgres" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -5999,7 +6007,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "database-common", @@ -6012,7 +6020,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-services" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6055,7 +6063,7 @@ dependencies = [ [[package]] name = "kamu-flow-system-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6080,7 +6088,7 @@ dependencies = [ [[package]] name = "kamu-ingest-datafusion" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -6116,7 +6124,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -6135,7 +6143,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-postgres" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6158,7 +6166,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "database-common", @@ -6172,7 +6180,7 @@ dependencies = [ [[package]] name = "kamu-messaging-outbox-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6194,7 +6202,7 @@ dependencies = [ [[package]] name = "kamu-repo-tools" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "clap", @@ -6209,7 +6217,7 @@ dependencies = [ [[package]] name = "kamu-task-system" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -6227,7 +6235,7 @@ dependencies = [ [[package]] name = "kamu-task-system-inmem" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -6246,7 +6254,7 @@ dependencies = [ [[package]] name = "kamu-task-system-postgres" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6269,7 +6277,7 @@ dependencies = [ [[package]] name = "kamu-task-system-repo-tests" -version = "0.203.1" +version = "0.204.0" dependencies = [ "chrono", "database-common", @@ -6281,7 +6289,7 @@ dependencies = [ [[package]] name = "kamu-task-system-services" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6308,7 +6316,7 @@ dependencies = [ [[package]] name = "kamu-task-system-sqlite" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-stream", "async-trait", @@ -6705,7 +6713,7 @@ dependencies = [ [[package]] name = "messaging-outbox" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -6833,7 +6841,7 @@ dependencies = [ [[package]] name = "multiformats" -version = "0.203.1" +version = "0.204.0" dependencies = [ "base64 0.22.1", "bs58", @@ -7158,7 +7166,7 @@ dependencies = [ [[package]] name = "observability" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "axum", @@ -7216,7 +7224,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendatafabric" -version = "0.203.1" +version = "0.204.0" dependencies = [ "arrow", "base64 0.22.1", @@ -8175,7 +8183,7 @@ dependencies = [ [[package]] name = "random-names" -version = "0.203.1" +version = "0.204.0" dependencies = [ "rand", ] @@ -9747,7 +9755,7 @@ dependencies = [ [[package]] name = "time-source" -version = "0.203.1" +version = "0.204.0" dependencies = [ "async-trait", "chrono", @@ -10161,7 +10169,7 @@ dependencies = [ [[package]] name = "tracing-perfetto" -version = "0.203.1" +version = "0.204.0" dependencies = [ "conv", "serde", diff --git a/Cargo.toml b/Cargo.toml index 846543c22a..14697eac25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ # Utils + "src/utils/async-utils", "src/utils/container-runtime", "src/utils/data-utils", "src/utils/database-common", @@ -89,92 +90,93 @@ resolver = "2" [workspace.dependencies] # Apps -kamu-cli = { version = "0.203.1", path = "src/app/cli", default-features = false } +kamu-cli = { version = "0.204.0", path = "src/app/cli", default-features = false } # Utils -container-runtime = { version = "0.203.1", path = "src/utils/container-runtime", default-features = false } -database-common = { version = "0.203.1", path = "src/utils/database-common", default-features = false } -database-common-macros = { version = "0.203.1", path = "src/utils/database-common-macros", default-features = false } -enum-variants = { version = "0.203.1", path = "src/utils/enum-variants", default-features = false } -event-sourcing = { version = "0.203.1", path = "src/utils/event-sourcing", default-features = false } -event-sourcing-macros = { version = "0.203.1", path = "src/utils/event-sourcing-macros", default-features = false } -http-common = { version = "0.203.1", path = "src/utils/http-common", default-features = false } -internal-error = { version = "0.203.1", path = "src/utils/internal-error", default-features = false } -kamu-cli-puppet = { version = "0.203.1", path = "src/utils/kamu-cli-puppet", default-features = false } -kamu-data-utils = { version = "0.203.1", path = "src/utils/data-utils", default-features = false } -kamu-datafusion-cli = { version = "0.203.1", path = "src/utils/datafusion-cli", default-features = false } -messaging-outbox = { version = "0.203.1", path = "src/utils/messaging-outbox", default-features = false } -multiformats = { version = "0.203.1", path = "src/utils/multiformats", default-features = false } -observability = { version = "0.203.1", path = "src/utils/observability", default-features = false } -random-names = { version = "0.203.1", path = "src/utils/random-names", default-features = false } -time-source = { version = "0.203.1", path = "src/utils/time-source", default-features = false } -tracing-perfetto = { version = "0.203.1", path = "src/utils/tracing-perfetto", default-features = false } +async-utils = { version = "0.204.0", path = "src/utils/async-utils", default-features = false } +container-runtime = { version = "0.204.0", path = "src/utils/container-runtime", default-features = false } +database-common = { version = "0.204.0", path = "src/utils/database-common", default-features = false } +database-common-macros = { version = "0.204.0", path = "src/utils/database-common-macros", default-features = false } +enum-variants = { version = "0.204.0", path = "src/utils/enum-variants", default-features = false } +event-sourcing = { version = "0.204.0", path = "src/utils/event-sourcing", default-features = false } +event-sourcing-macros = { version = "0.204.0", path = "src/utils/event-sourcing-macros", default-features = false } +http-common = { version = "0.204.0", path = "src/utils/http-common", default-features = false } +internal-error = { version = "0.204.0", path = "src/utils/internal-error", default-features = false } +kamu-cli-puppet = { version = "0.204.0", path = "src/utils/kamu-cli-puppet", default-features = false } +kamu-data-utils = { version = "0.204.0", path = "src/utils/data-utils", default-features = false } +kamu-datafusion-cli = { version = "0.204.0", path = "src/utils/datafusion-cli", default-features = false } +messaging-outbox = { version = "0.204.0", path = "src/utils/messaging-outbox", default-features = false } +multiformats = { version = "0.204.0", path = "src/utils/multiformats", default-features = false } +observability = { version = "0.204.0", path = "src/utils/observability", default-features = false } +random-names = { version = "0.204.0", path = "src/utils/random-names", default-features = false } +time-source = { version = "0.204.0", path = "src/utils/time-source", default-features = false } +tracing-perfetto = { version = "0.204.0", path = "src/utils/tracing-perfetto", default-features = false } # Domain -kamu-accounts = { version = "0.203.1", path = "src/domain/accounts/domain", default-features = false } -kamu-auth-rebac = { version = "0.203.1", path = "src/domain/auth-rebac/domain", default-features = false } -kamu-core = { version = "0.203.1", path = "src/domain/core", default-features = false } -kamu-datasets = { version = "0.203.1", path = "src/domain/datasets/domain", default-features = false } -kamu-flow-system = { version = "0.203.1", path = "src/domain/flow-system/domain", default-features = false } -kamu-task-system = { version = "0.203.1", path = "src/domain/task-system/domain", default-features = false } -opendatafabric = { version = "0.203.1", path = "src/domain/opendatafabric", default-features = false } +kamu-accounts = { version = "0.204.0", path = "src/domain/accounts/domain", default-features = false } +kamu-auth-rebac = { version = "0.204.0", path = "src/domain/auth-rebac/domain", default-features = false } +kamu-core = { version = "0.204.0", path = "src/domain/core", default-features = false } +kamu-datasets = { version = "0.204.0", path = "src/domain/datasets/domain", default-features = false } +kamu-flow-system = { version = "0.204.0", path = "src/domain/flow-system/domain", default-features = false } +kamu-task-system = { version = "0.204.0", path = "src/domain/task-system/domain", default-features = false } +opendatafabric = { version = "0.204.0", path = "src/domain/opendatafabric", default-features = false } # Domain service layer -kamu-accounts-services = { version = "0.203.1", path = "src/domain/accounts/services", default-features = false } -kamu-auth-rebac-services = { version = "0.203.1", path = "src/domain/auth-rebac/services", default-features = false } -kamu-datasets-services = { version = "0.203.1", path = "src/domain/datasets/services", default-features = false } -kamu-flow-system-services = { version = "0.203.1", path = "src/domain/flow-system/services", default-features = false } -kamu-task-system-services = { version = "0.203.1", path = "src/domain/task-system/services", default-features = false } +kamu-accounts-services = { version = "0.204.0", path = "src/domain/accounts/services", default-features = false } +kamu-auth-rebac-services = { version = "0.204.0", path = "src/domain/auth-rebac/services", default-features = false } +kamu-datasets-services = { version = "0.204.0", path = "src/domain/datasets/services", default-features = false } +kamu-flow-system-services = { version = "0.204.0", path = "src/domain/flow-system/services", default-features = false } +kamu-task-system-services = { version = "0.204.0", path = "src/domain/task-system/services", default-features = false } # Infra -kamu = { version = "0.203.1", path = "src/infra/core", default-features = false } -kamu-ingest-datafusion = { version = "0.203.1", path = "src/infra/ingest-datafusion", default-features = false } +kamu = { version = "0.204.0", path = "src/infra/core", default-features = false } +kamu-ingest-datafusion = { version = "0.204.0", path = "src/infra/ingest-datafusion", default-features = false } ## Flow System -kamu-flow-system-repo-tests = { version = "0.203.1", path = "src/infra/flow-system/repo-tests", default-features = false } -kamu-flow-system-inmem = { version = "0.203.1", path = "src/infra/flow-system/inmem", default-features = false } -kamu-flow-system-postgres = { version = "0.203.1", path = "src/infra/flow-system/postgres", default-features = false } -kamu-flow-system-sqlite = { version = "0.203.1", path = "src/infra/flow-system/sqlite", default-features = false } +kamu-flow-system-repo-tests = { version = "0.204.0", path = "src/infra/flow-system/repo-tests", default-features = false } +kamu-flow-system-inmem = { version = "0.204.0", path = "src/infra/flow-system/inmem", default-features = false } +kamu-flow-system-postgres = { version = "0.204.0", path = "src/infra/flow-system/postgres", default-features = false } +kamu-flow-system-sqlite = { version = "0.204.0", path = "src/infra/flow-system/sqlite", default-features = false } ## Accounts -kamu-accounts-inmem = { version = "0.203.1", path = "src/infra/accounts/inmem", default-features = false } -kamu-accounts-mysql = { version = "0.203.1", path = "src/infra/accounts/mysql", default-features = false } -kamu-accounts-postgres = { version = "0.203.1", path = "src/infra/accounts/postgres", default-features = false } -kamu-accounts-sqlite = { version = "0.203.1", path = "src/infra/accounts/sqlite", default-features = false } -kamu-accounts-repo-tests = { version = "0.203.1", path = "src/infra/accounts/repo-tests", default-features = false } +kamu-accounts-inmem = { version = "0.204.0", path = "src/infra/accounts/inmem", default-features = false } +kamu-accounts-mysql = { version = "0.204.0", path = "src/infra/accounts/mysql", default-features = false } +kamu-accounts-postgres = { version = "0.204.0", path = "src/infra/accounts/postgres", default-features = false } +kamu-accounts-sqlite = { version = "0.204.0", path = "src/infra/accounts/sqlite", default-features = false } +kamu-accounts-repo-tests = { version = "0.204.0", path = "src/infra/accounts/repo-tests", default-features = false } ## Datasets -kamu-datasets-inmem = { version = "0.203.1", path = "src/infra/datasets/inmem", default-features = false } -kamu-datasets-postgres = { version = "0.203.1", path = "src/infra/datasets/postgres", default-features = false } -kamu-datasets-sqlite = { version = "0.203.1", path = "src/infra/datasets/sqlite", default-features = false } -kamu-datasets-repo-tests = { version = "0.203.1", path = "src/infra/datasets/repo-tests", default-features = false } +kamu-datasets-inmem = { version = "0.204.0", path = "src/infra/datasets/inmem", default-features = false } +kamu-datasets-postgres = { version = "0.204.0", path = "src/infra/datasets/postgres", default-features = false } +kamu-datasets-sqlite = { version = "0.204.0", path = "src/infra/datasets/sqlite", default-features = false } +kamu-datasets-repo-tests = { version = "0.204.0", path = "src/infra/datasets/repo-tests", default-features = false } ## Task System -kamu-task-system-inmem = { version = "0.203.1", path = "src/infra/task-system/inmem", default-features = false } -kamu-task-system-postgres = { version = "0.203.1", path = "src/infra/task-system/postgres", default-features = false } -kamu-task-system-sqlite = { version = "0.203.1", path = "src/infra/task-system/sqlite", default-features = false } -kamu-task-system-repo-tests = { version = "0.203.1", path = "src/infra/task-system/repo-tests", default-features = false } +kamu-task-system-inmem = { version = "0.204.0", path = "src/infra/task-system/inmem", default-features = false } +kamu-task-system-postgres = { version = "0.204.0", path = "src/infra/task-system/postgres", default-features = false } +kamu-task-system-sqlite = { version = "0.204.0", path = "src/infra/task-system/sqlite", default-features = false } +kamu-task-system-repo-tests = { version = "0.204.0", path = "src/infra/task-system/repo-tests", default-features = false } ## ReBAC -kamu-auth-rebac-inmem = { version = "0.203.1", path = "src/infra/auth-rebac/inmem", default-features = false } -kamu-auth-rebac-repo-tests = { version = "0.203.1", path = "src/infra/auth-rebac/repo-tests", default-features = false } -kamu-auth-rebac-sqlite = { version = "0.203.1", path = "src/infra/auth-rebac/sqlite", default-features = false } +kamu-auth-rebac-inmem = { version = "0.204.0", path = "src/infra/auth-rebac/inmem", default-features = false } +kamu-auth-rebac-repo-tests = { version = "0.204.0", path = "src/infra/auth-rebac/repo-tests", default-features = false } +kamu-auth-rebac-sqlite = { version = "0.204.0", path = "src/infra/auth-rebac/sqlite", default-features = false } ## Outbox -kamu-messaging-outbox-inmem = { version = "0.203.1", path = "src/infra/messaging-outbox/inmem", default-features = false } -kamu-messaging-outbox-postgres = { version = "0.203.1", path = "src/infra/messaging-outbox/postgres", default-features = false } -kamu-messaging-outbox-sqlite = { version = "0.203.1", path = "src/infra/messaging-outbox/sqlite", default-features = false } -kamu-messaging-outbox-repo-tests = { version = "0.203.1", path = "src/infra/messaging-outbox/repo-tests", default-features = false } +kamu-messaging-outbox-inmem = { version = "0.204.0", path = "src/infra/messaging-outbox/inmem", default-features = false } +kamu-messaging-outbox-postgres = { version = "0.204.0", path = "src/infra/messaging-outbox/postgres", default-features = false } +kamu-messaging-outbox-sqlite = { version = "0.204.0", path = "src/infra/messaging-outbox/sqlite", default-features = false } +kamu-messaging-outbox-repo-tests = { version = "0.204.0", path = "src/infra/messaging-outbox/repo-tests", default-features = false } # Adapters -kamu-adapter-auth-oso = { version = "0.203.1", path = "src/adapter/auth-oso", default-features = false } -kamu-adapter-flight-sql = { version = "0.203.1", path = "src/adapter/flight-sql", default-features = false } -kamu-adapter-graphql = { version = "0.203.1", path = "src/adapter/graphql", default-features = false } -kamu-adapter-http = { version = "0.203.1", path = "src/adapter/http", default-features = false } -kamu-adapter-odata = { version = "0.203.1", path = "src/adapter/odata", default-features = false } -kamu-adapter-oauth = { version = "0.203.1", path = "src/adapter/oauth", default-features = false } +kamu-adapter-auth-oso = { version = "0.204.0", path = "src/adapter/auth-oso", default-features = false } +kamu-adapter-flight-sql = { version = "0.204.0", path = "src/adapter/flight-sql", default-features = false } +kamu-adapter-graphql = { version = "0.204.0", path = "src/adapter/graphql", default-features = false } +kamu-adapter-http = { version = "0.204.0", path = "src/adapter/http", default-features = false } +kamu-adapter-odata = { version = "0.204.0", path = "src/adapter/odata", default-features = false } +kamu-adapter-oauth = { version = "0.204.0", path = "src/adapter/oauth", default-features = false } # E2E -kamu-cli-e2e-common = { version = "0.203.1", path = "src/e2e/app/cli/common", default-features = false } -kamu-cli-e2e-common-macros = { version = "0.203.1", path = "src/e2e/app/cli/common-macros", default-features = false } -kamu-cli-e2e-repo-tests = { version = "0.203.1", path = "src/e2e/app/cli/repo-tests", default-features = false } +kamu-cli-e2e-common = { version = "0.204.0", path = "src/e2e/app/cli/common", default-features = false } +kamu-cli-e2e-common-macros = { version = "0.204.0", path = "src/e2e/app/cli/common-macros", default-features = false } +kamu-cli-e2e-repo-tests = { version = "0.204.0", path = "src/e2e/app/cli/repo-tests", default-features = false } [workspace.package] -version = "0.203.1" +version = "0.204.0" edition = "2021" homepage = "https://github.com/kamu-data/kamu-cli" repository = "https://github.com/kamu-data/kamu-cli" diff --git a/LICENSE.txt b/LICENSE.txt index 0a47e82621..3a79e4e50e 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -11,7 +11,7 @@ Business Source License 1.1 Licensor: Kamu Data, Inc. -Licensed Work: Kamu CLI Version 0.203.1 +Licensed Work: Kamu CLI Version 0.204.0 The Licensed Work is © 2023 Kamu Data, Inc. Additional Use Grant: You may use the Licensed Work for any purpose, @@ -24,7 +24,7 @@ Additional Use Grant: You may use the Licensed Work for any purpose, Licensed Work where data or transformations are controlled by such third parties. -Change Date: 2028-09-22 +Change Date: 2028-09-25 Change License: Apache License, Version 2.0 diff --git a/src/app/cli/Cargo.toml b/src/app/cli/Cargo.toml index 3da720a179..3c2c7e25c8 100644 --- a/src/app/cli/Cargo.toml +++ b/src/app/cli/Cargo.toml @@ -41,6 +41,7 @@ web-ui = ["rust-embed"] [dependencies] # Kamu +async-utils = { workspace = true } container-runtime = { workspace = true } database-common = { workspace = true } database-common-macros = { workspace = true } @@ -180,11 +181,11 @@ regex = "1" secrecy = "0.10" shlex = "1" # Parsing partial input for custom completions signal-hook = "0.3" # Signal handling +tempfile = "3" +thiserror = { version = "1", default-features = false } tokio = { version = "1", default-features = false, features = ["io-util"] } tokio-stream = { version = "0.1", default-features = false, features = ["net"] } tokio-util = { version = "0.7", default-features = false, features = ["io"] } -tempfile = "3" -thiserror = { version = "1", default-features = false } url = "2" urlencoding = "2" whoami = "1.5" diff --git a/src/app/cli/build.rs b/src/app/cli/build.rs index a3c18e9125..35ae3ebcc0 100644 --- a/src/app/cli/build.rs +++ b/src/app/cli/build.rs @@ -9,14 +9,27 @@ use std::error::Error; -use vergen::EmitBuilder; - fn main() -> Result<(), Box> { - EmitBuilder::builder() + // Preparing the build information + vergen::EmitBuilder::builder() .all_build() .all_git() .all_rustc() .all_cargo() + .fail_on_error() .emit()?; + + // sqlx will cause kamu-cli to be rebuilt if already embedded migrations have + // changed. + // + // But if the migration was previously missing, the rebuild would not be + // triggered. + // + // To solve this, we tell the compiler to perform a rebuild + // in case of the migrations folder content has changed. + // + // NB. Working path: ./src/app/cli + println!("cargo:rerun-if-changed=../../../migrations/sqlite"); + Ok(()) } diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs index a54f2930a5..55cced9785 100644 --- a/src/app/cli/src/app.rs +++ b/src/app/cli/src/app.rs @@ -11,11 +11,12 @@ use std::future::Future; use std::path::Path; use std::sync::Arc; +use async_utils::ResultAsync; use chrono::{DateTime, Duration, Utc}; use container_runtime::{ContainerRuntime, ContainerRuntimeConfig}; use database_common::DatabaseTransactionRunner; use dill::*; -use internal_error::InternalError; +use internal_error::{InternalError, ResultIntoInternal}; use kamu::domain::*; use kamu::*; use kamu_accounts::*; @@ -24,17 +25,26 @@ use kamu_adapter_http::{FileUploadLimitConfig, UploadServiceLocal}; use kamu_adapter_oauth::GithubAuthenticationConfig; use kamu_auth_rebac_services::{MultiTenantRebacDatasetLifecycleMessageConsumer, RebacServiceImpl}; use kamu_datasets::DatasetEnvVar; -use kamu_flow_system_inmem::domain::{FlowConfigurationUpdatedMessage, FlowProgressMessage}; +use kamu_flow_system_inmem::domain::{ + FlowConfigurationUpdatedMessage, + FlowExecutor, + FlowProgressMessage, +}; use kamu_flow_system_services::{ MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE, MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, }; -use kamu_task_system_inmem::domain::{TaskProgressMessage, MESSAGE_PRODUCER_KAMU_TASK_EXECUTOR}; +use kamu_task_system_inmem::domain::{ + TaskExecutor, + TaskProgressMessage, + MESSAGE_PRODUCER_KAMU_TASK_EXECUTOR, +}; use messaging_outbox::{register_message_dispatcher, Outbox, OutboxDispatchingImpl}; use time_source::{SystemTimeSource, SystemTimeSourceDefault, SystemTimeSourceStub}; use tracing::{warn, Instrument}; use crate::accounts::AccountService; +use crate::cli::Command; use crate::error::*; use crate::explore::TraceServer; use crate::output::*; @@ -45,6 +55,8 @@ use crate::{ configure_database_components, configure_in_memory_components, connect_database_initially, + get_app_database_config, + move_initial_database_to_workspace_if_needed, odf_server, spawn_password_refreshing_job, try_build_db_connection_settings, @@ -81,8 +93,11 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() // Sometimes (in the case of predefined users), we need to know whether the // workspace to be created will be multi-tenant or not right away, even before // the `kamu init` command itself is processed. - let init_multi_tenant_workspace = - matches!(&args.command, cli::Command::Init(c) if c.multi_tenant); + let maybe_init_multi_tenant_flag = match &args.command { + Command::Init(c) => Some(c.multi_tenant), + _ => None, + }; + let init_multi_tenant_workspace = maybe_init_multi_tenant_flag == Some(true); let workspace_svc = WorkspaceService::new( Arc::new(workspace_layout.clone()), init_multi_tenant_workspace, @@ -99,13 +114,20 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() prepare_run_dir(&workspace_layout.run_info_dir); - let maybe_db_connection_settings = config - .database - .clone() + let is_init_command = maybe_init_multi_tenant_flag.is_some(); + let app_database_config = get_app_database_config( + &workspace_layout, + &config, + is_multi_tenant_workspace, + is_init_command, + ); + let (database_config, maybe_temp_database_path) = app_database_config.into_inner(); + let maybe_db_connection_settings = database_config + .as_ref() .and_then(try_build_db_connection_settings); // Configure application - let (guards, base_catalog, cli_catalog, output_config) = { + let (guards, base_catalog, cli_catalog, maybe_server_catalog, output_config) = { let dependencies_graph_repository = prepare_dependencies_graph_repository( &workspace_layout, is_multi_tenant_workspace, @@ -125,7 +147,7 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() if let Some(db_connection_settings) = maybe_db_connection_settings.as_ref() { configure_database_components( &mut base_catalog_builder, - config.database.as_ref().unwrap(), + database_config.as_ref().unwrap(), db_connection_settings.clone(), ); } else { @@ -158,7 +180,7 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() let base_catalog = base_catalog_builder.build(); // Database requires extra actions: - let final_base_catalog = if let Some(db_config) = config.database { + let final_base_catalog = if let Some(db_config) = database_config { // Connect database and obtain a connection pool let catalog_with_pool = connect_database_initially(&base_catalog).await?; @@ -173,10 +195,35 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() let cli_catalog = configure_cli_catalog(&final_base_catalog, is_multi_tenant_workspace) .add_value(current_account.to_current_account_subject()) .build(); + let maybe_server_catalog = if cli_commands::command_needs_server_components(&args) { + let server_catalog = + configure_server_catalog(&final_base_catalog, is_multi_tenant_workspace) + .add_value(current_account.to_current_account_subject()) + .build(); - (guards, final_base_catalog, cli_catalog, output_config) + Some(server_catalog) + } else { + None + }; + + ( + guards, + final_base_catalog, + cli_catalog, + maybe_server_catalog, + output_config, + ) }; + if let Err(e) = initialize_base_components(&base_catalog).await { + tracing::error!( + error_dbg = ?e, + error = %e.pretty(true), + "Initialize base components failed", + ); + return Err(e); + } + // Register metrics let metrics_registry = observability::metrics::register_all(&cli_catalog); @@ -185,14 +232,12 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() cli_catalog.get_one::()?.evict_cache()?; } - // Initialize components - match initialize_components(&cli_catalog).await { - Ok(_) => {} - Err(e) => { + if let Some(server_catalog) = &maybe_server_catalog { + if let Err(e) = initialize_server_components(server_catalog).await { tracing::error!( error_dbg = ?e, error = %e.pretty(true), - "Initialize components failed", + "Initialize server components failed", ); return Err(e); } @@ -200,10 +245,11 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() let is_transactional = maybe_db_connection_settings.is_some() && cli_commands::command_needs_transaction(&args); + let work_catalog = maybe_server_catalog.unwrap_or(cli_catalog); - let command_result: Result<(), CLIError> = maybe_transactional( + let mut command_result: Result<(), CLIError> = maybe_transactional( is_transactional, - cli_catalog, + work_catalog.clone(), |catalog: Catalog| async move { let mut command = cli_commands::get_command(&base_catalog, &catalog, args)?; @@ -234,6 +280,32 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() .instrument(tracing::debug_span!("app::run_command")) .await; + command_result = command_result + // If successful, then process the Outbox messages while they are present + .and_then_async(|_| async { + let outbox_executor = work_catalog.get_one::()?; + + outbox_executor + .run_while_has_tasks() + .await + .map_err(CLIError::critical) + }) + .instrument(tracing::debug_span!( + "Consume accumulated the Outbox messages" + )) + .await + // If we had a temporary directory, we move the database from it to the expected + // location. + .and_then_async(|_| async { + move_initial_database_to_workspace_if_needed( + &workspace_layout, + maybe_temp_database_path, + ) + .await + .map_int_err(CLIError::critical) + }) + .await; + match &command_result { Ok(()) => { tracing::info!("Command successful"); @@ -274,7 +346,7 @@ pub async fn run(workspace_layout: WorkspaceLayout, args: cli::Cli) -> Result<() async fn maybe_transactional( transactional: bool, - cli_catalog: Catalog, + work_catalog: Catalog, f: F, ) -> Result where @@ -283,9 +355,9 @@ where RE: From, { if !transactional { - f(cli_catalog).await + f(work_catalog).await } else { - let transaction_runner = DatabaseTransactionRunner::new(cli_catalog); + let transaction_runner = DatabaseTransactionRunner::new(work_catalog); transaction_runner .transactional(|transactional_catalog| async move { f(transactional_catalog).await }) @@ -360,8 +432,6 @@ pub fn configure_base_catalog( b.add::(); - b.add::(); - b.add::(); b.add::(); @@ -406,9 +476,6 @@ pub fn configure_base_catalog( b.add::(); - b.add::(); - b.add::(); - b.add::(); b.add::(); b.add::(); @@ -416,14 +483,6 @@ pub fn configure_base_catalog( b.add::(); b.add::(); - kamu_task_system_services::register_dependencies(&mut b); - - b.add_value(kamu_flow_system_inmem::domain::FlowExecutorConfig::new( - chrono::Duration::seconds(1), - chrono::Duration::minutes(1), - )); - kamu_flow_system_services::register_dependencies(&mut b); - b.add::(); // No GitHub login possible for single-tenant workspace @@ -446,8 +505,6 @@ pub fn configure_base_catalog( b.add::(); b.add::(); - b.add::(); - b.add::(); b.add::(); @@ -470,15 +527,6 @@ pub fn configure_base_catalog( &mut b, MESSAGE_PRODUCER_KAMU_CORE_DATASET_SERVICE, ); - register_message_dispatcher::(&mut b, MESSAGE_PRODUCER_KAMU_TASK_EXECUTOR); - register_message_dispatcher::( - &mut b, - MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE, - ); - register_message_dispatcher::( - &mut b, - MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, - ); b } @@ -498,32 +546,105 @@ pub fn configure_cli_catalog( b } -#[tracing::instrument(level = "info", skip_all)] -async fn initialize_components(cli_catalog: &Catalog) -> Result<(), CLIError> { - // TODO: Generalize on-startup initialization into a trait - DatabaseTransactionRunner::new(cli_catalog.clone()) - .transactional(|transactional_catalog| async move { - let registrator = transactional_catalog - .get_one::() - .map_err(CLIError::critical)?; +// Public only for tests +pub fn configure_server_catalog( + base_catalog: &Catalog, + multi_tenant_workspace: bool, +) -> CatalogBuilder { + let mut b = CatalogBuilder::new_chained(base_catalog); - registrator - .ensure_predefined_accounts_are_registered() - .await - .map_err(CLIError::critical)?; + b.add_builder(WorkspaceService::builder().with_multi_tenant(multi_tenant_workspace)); - let initializer = transactional_catalog - .get_one::() - .map_err(CLIError::critical)?; + b.add::(); - initializer - .eager_initialization() - .await - .map_err(CLIError::critical) + b.add::(); + b.add::(); + + kamu_task_system_services::register_dependencies(&mut b); + + b.add_value(kamu_flow_system_inmem::domain::FlowExecutorConfig::new( + Duration::seconds(1), + Duration::minutes(1), + )); + kamu_flow_system_services::register_dependencies(&mut b); + + b.add::(); + + register_message_dispatcher::( + &mut b, + MESSAGE_PRODUCER_KAMU_FLOW_PROGRESS_SERVICE, + ); + register_message_dispatcher::( + &mut b, + MESSAGE_PRODUCER_KAMU_FLOW_CONFIGURATION_SERVICE, + ); + + register_message_dispatcher::(&mut b, MESSAGE_PRODUCER_KAMU_TASK_EXECUTOR); + + b +} + +#[tracing::instrument(level = "debug", skip_all)] +async fn initialize_server_components(server_catalog: &Catalog) -> Result<(), CLIError> { + // TODO: Generalize on-startup initialization into a trait + DatabaseTransactionRunner::new(server_catalog.clone()) + .transactional(|transactional_catalog| async move { + { + let registrator = transactional_catalog + .get_one::() + .map_err(CLIError::critical)?; + + registrator + .ensure_predefined_accounts_are_registered() + .await + .map_err(CLIError::critical)?; + } + { + let initializer = transactional_catalog + .get_one::() + .map_err(CLIError::critical)?; + + initializer + .eager_initialization() + .await + .map_err(CLIError::critical) + } }) - .instrument(tracing::debug_span!("app::initialize_components")) .await?; + // Have their own transactions + { + let task_executor = server_catalog + .get_one::() + .map_err(CLIError::critical)?; + + task_executor.pre_run().await.map_err(CLIError::critical)?; + } + { + let flow_executor = server_catalog + .get_one::() + .map_err(CLIError::critical)?; + let time_source = server_catalog + .get_one::() + .map_err(CLIError::critical)?; + + flow_executor + .pre_run(time_source.now()) + .await + .map_err(CLIError::critical) + } +} + +#[tracing::instrument(level = "debug", skip_all)] +async fn initialize_base_components(base_catalog: &Catalog) -> Result<(), CLIError> { + // TODO: Generalize on-startup initialization into a trait + let outbox_executor = base_catalog.get_one::()?; + + outbox_executor + .pre_run() + .await + .map_err(CLIError::critical)?; + Ok(()) } @@ -733,17 +854,12 @@ fn prepare_run_dir(run_dir: &Path) { if run_dir.exists() { std::fs::remove_dir_all(run_dir).unwrap_or_else(|e| { panic!( - "Unable to clean up run directory {}: {}", + "Unable to clean up run directory {}: {e}", run_dir.display(), - e ) }); std::fs::create_dir(run_dir).unwrap_or_else(|e| { - panic!( - "Unable to create run directory {}: {}", - run_dir.display(), - e - ) + panic!("Unable to create run directory {}: {e}", run_dir.display(),) }); } } diff --git a/src/app/cli/src/cli.rs b/src/app/cli/src/cli.rs index 1d789a2224..7a8c27b9f0 100644 --- a/src/app/cli/src/cli.rs +++ b/src/app/cli/src/cli.rs @@ -580,7 +580,7 @@ pub struct List { #[arg(long, hide = true)] pub target_account: Option, - /// List accesssible datasets of all accounts + /// List accessible datasets of all accounts #[arg(long, hide = true)] pub all_accounts: bool, } diff --git a/src/app/cli/src/cli_commands.rs b/src/app/cli/src/cli_commands.rs index 33b1350af4..5abd258395 100644 --- a/src/app/cli/src/cli_commands.rs +++ b/src/app/cli/src/cli_commands.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use clap::CommandFactory as _; +use dill::Catalog; use kamu_accounts::CurrentAccountSubject; use opendatafabric::*; @@ -17,89 +18,89 @@ use crate::{accounts, cli, odf_server, WorkspaceService}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub fn get_command( - base_catalog: &dill::Catalog, - cli_catalog: &dill::Catalog, + base_catalog: &Catalog, + work_catalog: &Catalog, args: cli::Cli, ) -> Result, CLIError> { let command: Box = match args.command { cli::Command::Add(c) => Box::new(AddCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.manifest, c.name, c.recursive, c.replace, c.stdin, c.visibility.into(), - cli_catalog.get_one()?, + work_catalog.get_one()?, )), cli::Command::Complete(c) => { - let workspace_svc = cli_catalog.get_one::()?; + let workspace_svc = work_catalog.get_one::()?; let in_workspace = workspace_svc.is_in_workspace() && !workspace_svc.is_upgrade_needed()?; Box::new(CompleteCommand::new( if in_workspace { - Some(cli_catalog.get_one()?) + Some(work_catalog.get_one()?) } else { None }, if in_workspace { - Some(cli_catalog.get_one()?) + Some(work_catalog.get_one()?) } else { None }, if in_workspace { - Some(cli_catalog.get_one()?) + Some(work_catalog.get_one()?) } else { None }, - cli_catalog.get_one()?, - crate::cli::Cli::command(), + work_catalog.get_one()?, + cli::Cli::command(), c.input, c.current, )) } cli::Command::Completions(c) => { - Box::new(CompletionsCommand::new(crate::cli::Cli::command(), c.shell)) + Box::new(CompletionsCommand::new(cli::Cli::command(), c.shell)) } cli::Command::Config(c) => match c.subcommand { cli::ConfigSubCommand::List(sc) => Box::new(ConfigListCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.user, sc.with_defaults, )), cli::ConfigSubCommand::Get(sc) => Box::new(ConfigGetCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.user, sc.with_defaults, sc.cfgkey, )), cli::ConfigSubCommand::Set(sc) => Box::new(ConfigSetCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.user, sc.cfgkey, sc.value, )), }, cli::Command::Delete(c) => Box::new(DeleteCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_many_dataset_patterns(cli_catalog, c.dataset)?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_many_dataset_patterns(work_catalog, c.dataset)?, + work_catalog.get_one()?, c.all, c.recursive, c.yes, )), cli::Command::Ingest(c) => Box::new(IngestCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, c.dataset)?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, c.dataset)?, c.file.unwrap_or_default(), c.source_name, c.event_time, @@ -110,15 +111,15 @@ pub fn get_command( cli::Command::Init(c) => { if c.pull_images { Box::new(PullImagesCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.list_only, )) } else { Box::new(InitCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.exists_ok, c.multi_tenant, )) @@ -126,34 +127,34 @@ pub fn get_command( } cli::Command::Inspect(c) => match c.subcommand { cli::InspectSubCommand::Lineage(sc) => Box::new(LineageCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_many_dataset_refs(cli_catalog, sc.dataset)?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_many_dataset_refs(work_catalog, sc.dataset)?, sc.browse, sc.output_format, - cli_catalog.get_one()?, + work_catalog.get_one()?, )), cli::InspectSubCommand::Query(sc) => Box::new(InspectQueryCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, sc.dataset)?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, sc.dataset)?, + work_catalog.get_one()?, )), cli::InspectSubCommand::Schema(sc) => Box::new(InspectSchemaCommand::new( - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, sc.dataset)?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, sc.dataset)?, sc.output_format, sc.from_data_file, )), }, cli::Command::List(c) => { - let workspace_svc = cli_catalog.get_one::()?; - let user_config = cli_catalog.get_one::()?; + let workspace_svc = work_catalog.get_one::()?; + let user_config = work_catalog.get_one::()?; Box::new(ListCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, accounts::AccountService::current_account_indication( args.account, workspace_svc.is_multi_tenant_workspace(), @@ -163,23 +164,23 @@ pub fn get_command( c.target_account, c.all_accounts, ), - cli_catalog.get_one()?, + work_catalog.get_one()?, c.wide, )) } cli::Command::Log(c) => Box::new(LogCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, c.dataset)?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, c.dataset)?, c.output_format, c.filter, c.limit, - cli_catalog.get_one()?, + work_catalog.get_one()?, )), cli::Command::Login(c) => match c.subcommand { Some(cli::LoginSubCommand::Oauth(sc)) => Box::new(LoginSilentCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, if sc.user { odf_server::AccessTokenStoreScope::User } else { @@ -192,8 +193,8 @@ pub fn get_command( }), )), Some(cli::LoginSubCommand::Password(sc)) => Box::new(LoginSilentCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, if sc.user { odf_server::AccessTokenStoreScope::User } else { @@ -206,9 +207,9 @@ pub fn get_command( }), )), None => Box::new(LoginCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, if c.user { odf_server::AccessTokenStoreScope::User } else { @@ -220,7 +221,7 @@ pub fn get_command( )), }, cli::Command::Logout(c) => Box::new(LogoutCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, if c.user { odf_server::AccessTokenStoreScope::User } else { @@ -236,10 +237,10 @@ pub fn get_command( None::<&str>, )), cli::Command::Notebook(c) => Box::new(NotebookCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.address, c.http_port, c.env.unwrap_or_default(), @@ -247,9 +248,9 @@ pub fn get_command( cli::Command::Pull(c) => { if let Some(set_watermark) = c.set_watermark { Box::new(SetWatermarkCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.dataset.unwrap_or_default(), c.all, c.recursive, @@ -257,12 +258,12 @@ pub fn get_command( )) } else { Box::new(PullCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.dataset.unwrap_or_default(), - cli_catalog.get_one()?, + work_catalog.get_one()?, c.all, c.recursive, c.fetch_uncacheable, @@ -274,51 +275,51 @@ pub fn get_command( } } cli::Command::Push(c) => Box::new(PushCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.dataset.unwrap_or_default(), - cli_catalog.get_one()?, + work_catalog.get_one()?, c.all, c.recursive, !c.no_alias, c.force, c.to, c.visibility.into(), - cli_catalog.get_one()?, + work_catalog.get_one()?, )), cli::Command::Rename(c) => Box::new(RenameCommand::new( - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, c.dataset)?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, c.dataset)?, c.name, )), cli::Command::Repo(c) => match c.subcommand { cli::RepoSubCommand::Add(sc) => Box::new(RepositoryAddCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.name, sc.url, )), cli::RepoSubCommand::Delete(sc) => Box::new(RepositoryDeleteCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.repository.unwrap_or_default(), sc.all, sc.yes, )), cli::RepoSubCommand::List(_) => Box::new(RepositoryListCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, )), cli::RepoSubCommand::Alias(sc) => match sc.subcommand { cli::RepoAliasSubCommand::Add(ssc) => Box::new(AliasAddCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, ssc.dataset, ssc.alias, ssc.pull, ssc.push, )), cli::RepoAliasSubCommand::Delete(ssc) => Box::new(AliasDeleteCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, ssc.dataset, ssc.alias, ssc.all, @@ -326,33 +327,33 @@ pub fn get_command( ssc.push, )), cli::RepoAliasSubCommand::List(ssc) => Box::new(AliasListCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, ssc.dataset, )), }, }, cli::Command::Reset(c) => Box::new(ResetCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, c.dataset)?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, c.dataset)?, c.hash, c.yes, )), cli::Command::Search(c) => Box::new(SearchCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.query, c.repo.unwrap_or_default(), )), cli::Command::Sql(c) => match c.subcommand { None => Box::new(SqlShellCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.command, c.url, c.engine, @@ -360,10 +361,10 @@ pub fn get_command( Some(cli::SqlSubCommand::Server(sc)) => { if sc.livy { Box::new(SqlServerLivyCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, sc.address, sc.port, )) @@ -373,7 +374,7 @@ pub fn get_command( Box::new(SqlServerFlightSqlCommand::new( sc.address, sc.port, - cli_catalog.get_one()?, + work_catalog.get_one()?, )) } else { return Err(CLIError::usage_error("Kamu was compiled without Flight SQL support")) @@ -381,10 +382,10 @@ pub fn get_command( } } else { Box::new(SqlServerCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, sc.address, sc.port, )) @@ -394,20 +395,20 @@ pub fn get_command( cli::Command::System(c) => match c.subcommand { cli::SystemSubCommand::ApiServer(sc) => match sc.subcommand { None => { - let workspace_svc = cli_catalog.get_one::()?; + let workspace_svc = work_catalog.get_one::()?; Box::new(APIServerRunCommand::new( base_catalog.clone(), - cli_catalog.clone(), + work_catalog.clone(), workspace_svc.is_multi_tenant_workspace(), - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.address, sc.http_port, sc.external_address, sc.get_token, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, args.e2e_output_data_path, )) } @@ -419,10 +420,10 @@ pub fn get_command( } }, cli::SystemSubCommand::Compact(sc) => Box::new(CompactCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, sc.dataset)?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, sc.dataset)?, sc.max_slice_size, sc.max_slice_records, sc.hard, @@ -430,53 +431,53 @@ pub fn get_command( sc.keep_metadata_only, )), cli::SystemSubCommand::Diagnose(_) => Box::new(SystemDiagnoseCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, )), cli::SystemSubCommand::DebugToken(sc) => { - Box::new(DebugTokenCommand::new(cli_catalog.get_one()?, sc.token)) + Box::new(DebugTokenCommand::new(work_catalog.get_one()?, sc.token)) } cli::SystemSubCommand::E2e(sc) => Box::new(SystemE2ECommand::new( sc.action, sc.dataset, - cli_catalog.get_one()?, + work_catalog.get_one()?, )), - cli::SystemSubCommand::Gc(_) => Box::new(GcCommand::new(cli_catalog.get_one()?)), + cli::SystemSubCommand::Gc(_) => Box::new(GcCommand::new(work_catalog.get_one()?)), cli::SystemSubCommand::GenerateToken(sc) => Box::new(GenerateTokenCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, sc.login, sc.subject, sc.expiration_time_sec, )), cli::SystemSubCommand::Info(sc) => Box::new(SystemInfoCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, sc.output_format, )), cli::SystemSubCommand::Ipfs(sc) => match sc.subcommand { cli::SystemIpfsSubCommand::Add(ssc) => Box::new(SystemIpfsAddCommand::new( - cli_catalog.get_one()?, + work_catalog.get_one()?, ssc.dataset, )), }, cli::SystemSubCommand::UpgradeWorkspace(_) => { - Box::new(UpgradeWorkspaceCommand::new(cli_catalog.get_one()?)) + Box::new(UpgradeWorkspaceCommand::new(work_catalog.get_one()?)) } }, cli::Command::Tail(c) => Box::new(TailCommand::new( - cli_catalog.get_one()?, - validate_dataset_ref(cli_catalog, c.dataset)?, + work_catalog.get_one()?, + validate_dataset_ref(work_catalog, c.dataset)?, c.skip_records, c.num_records, - cli_catalog.get_one()?, + work_catalog.get_one()?, )), cli::Command::Ui(c) => { - let workspace_svc = cli_catalog.get_one::()?; + let workspace_svc = work_catalog.get_one::()?; - let current_account_subject = cli_catalog.get_one::()?; + let current_account_subject = work_catalog.get_one::()?; let current_account_name = match current_account_subject.as_ref() { CurrentAccountSubject::Logged(l) => l.account_name.clone(), @@ -489,29 +490,30 @@ pub fn get_command( base_catalog.clone(), workspace_svc.is_multi_tenant_workspace(), current_account_name, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, c.address, c.http_port, c.get_token, )) } cli::Command::Verify(c) => Box::new(VerifyCommand::new( - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - cli_catalog.get_one()?, - validate_many_dataset_patterns(cli_catalog, c.dataset)?.into_iter(), + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + work_catalog.get_one()?, + validate_many_dataset_patterns(work_catalog, c.dataset)?.into_iter(), c.recursive, c.integrity, )), - cli::Command::Version(c) => { - Box::new(VersionCommand::new(cli_catalog.get_one()?, c.output_format)) - } + cli::Command::Version(c) => Box::new(VersionCommand::new( + work_catalog.get_one()?, + c.output_format, + )), }; Ok(command) @@ -524,7 +526,19 @@ pub fn command_needs_transaction(args: &cli::Cli) -> bool { cli::SystemSubCommand::GenerateToken(_) => true, _ => false, }, - cli::Command::Delete(_) => true, + cli::Command::Add(_) | cli::Command::Delete(_) => true, + _ => false, + } +} + +#[allow(clippy::match_like_matches_macro)] +pub fn command_needs_server_components(args: &cli::Cli) -> bool { + match &args.command { + cli::Command::System(c) => match &c.subcommand { + cli::SystemSubCommand::ApiServer(_) => true, + _ => false, + }, + cli::Command::Ui(_) => true, _ => false, } } diff --git a/src/app/cli/src/commands/init_command.rs b/src/app/cli/src/commands/init_command.rs index 34d5a88284..433e8de406 100644 --- a/src/app/cli/src/commands/init_command.rs +++ b/src/app/cli/src/commands/init_command.rs @@ -41,18 +41,23 @@ impl Command for InitCommand { false } - async fn run(&mut self) -> Result<(), CLIError> { - if self.workspace_layout.root_dir.is_dir() { - return if self.exists_ok { - if !self.output_config.quiet { - eprintln!("{}", console::style("Workspace already exists").yellow()); - } - Ok(()) - } else { - Err(CLIError::usage_error_from(AlreadyInWorkspace)) - }; + async fn validate_args(&self) -> Result<(), CLIError> { + if !self.workspace_layout.root_dir.is_dir() { + return Ok(()); + } + + if self.exists_ok { + if !self.output_config.quiet { + eprintln!("{}", console::style("Workspace already exists").yellow()); + } + + Ok(()) + } else { + Err(CLIError::usage_error_from(AlreadyInWorkspace)) } + } + async fn run(&mut self) -> Result<(), CLIError> { WorkspaceLayout::create(&self.workspace_layout.root_dir, self.multi_tenant)?; // TODO, write a workspace config diff --git a/src/app/cli/src/commands/system_api_server_run_command.rs b/src/app/cli/src/commands/system_api_server_run_command.rs index 2be8b826bc..3b7cc7ea10 100644 --- a/src/app/cli/src/commands/system_api_server_run_command.rs +++ b/src/app/cli/src/commands/system_api_server_run_command.rs @@ -171,7 +171,6 @@ impl Command for APIServerRunCommand { } } - api_server.pre_run().await.map_err(CLIError::critical)?; api_server.run().await.map_err(CLIError::critical)?; Ok(()) diff --git a/src/app/cli/src/database.rs b/src/app/cli/src/database.rs index 0dc05d901c..43fc54dfd2 100644 --- a/src/app/cli/src/database.rs +++ b/src/app/cli/src/database.rs @@ -7,14 +7,93 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::path::Path; +use std::path::{Path, PathBuf}; use database_common::*; use dill::{Catalog, CatalogBuilder, Component}; use internal_error::{InternalError, ResultIntoInternal}; use secrecy::SecretString; +use tempfile::TempDir; use crate::config::{DatabaseConfig, DatabaseCredentialSourceConfig, RemoteDatabaseConfig}; +use crate::{config, WorkspaceLayout, DEFAULT_MULTI_TENANT_SQLITE_DATABASE_NAME}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub enum AppDatabaseConfig { + /// No settings are specified + None, + /// The user has specified custom database settings + Explicit(DatabaseConfig), + /// No settings are specified, default settings will be used + DefaultMultiTenant(DatabaseConfig), + /// Since there is no workspace, we use a temporary directory to create the + /// database + DefaultMultiTenantInitCommand(DatabaseConfig, OwnedTempPath), +} + +impl AppDatabaseConfig { + pub fn into_inner(self) -> (Option, Option) { + match self { + AppDatabaseConfig::None => (None, None), + AppDatabaseConfig::Explicit(c) | AppDatabaseConfig::DefaultMultiTenant(c) => { + (Some(c), None) + } + AppDatabaseConfig::DefaultMultiTenantInitCommand(c, path) => (Some(c), Some(path)), + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub fn get_app_database_config( + workspace_layout: &WorkspaceLayout, + config: &config::CLIConfig, + multi_tenant_workspace: bool, + init_command: bool, +) -> AppDatabaseConfig { + if let Some(database_config) = config.database.clone() { + return AppDatabaseConfig::Explicit(database_config); + } + + if !multi_tenant_workspace { + // Default for multi-tenant workspace only + return AppDatabaseConfig::None; + }; + + if !init_command { + let database_path = workspace_layout.default_multi_tenant_database_path(); + + return AppDatabaseConfig::DefaultMultiTenant(DatabaseConfig::sqlite(&database_path)); + } + + let temp_dir = tempfile::tempdir().unwrap(); + let database_path = temp_dir + .as_ref() + .join(DEFAULT_MULTI_TENANT_SQLITE_DATABASE_NAME); + let config = DatabaseConfig::sqlite(&database_path); + let temp_database_path = OwnedTempPath::new(database_path, temp_dir); + + AppDatabaseConfig::DefaultMultiTenantInitCommand(config, temp_database_path) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub async fn move_initial_database_to_workspace_if_needed( + workspace_layout: &WorkspaceLayout, + maybe_temp_database_path: Option, +) -> Result<(), std::io::Error> { + if let Some(temp_database_path) = maybe_temp_database_path { + tokio::fs::copy( + temp_database_path.path(), + workspace_layout.default_multi_tenant_database_path(), + ) + .await?; + }; + + Ok(()) +} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -109,10 +188,10 @@ pub fn configure_in_memory_components(b: &mut CatalogBuilder) { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub fn try_build_db_connection_settings( - raw_db_config: DatabaseConfig, + raw_db_config: &DatabaseConfig, ) -> Option { - fn convert(c: RemoteDatabaseConfig, provider: DatabaseProvider) -> DatabaseConnectionSettings { - DatabaseConnectionSettings::new(provider, c.database_name, c.host, c.port) + fn convert(c: &RemoteDatabaseConfig, provider: DatabaseProvider) -> DatabaseConnectionSettings { + DatabaseConnectionSettings::new(provider, c.database_name.clone(), c.host.clone(), c.port) } match raw_db_config { @@ -156,6 +235,7 @@ pub async fn connect_database_initially(base_catalog: &Catalog) -> Result { SqlitePlugin::catalog_with_connected_pool(base_catalog, &db_connection_settings) + .await .int_err() } } @@ -235,3 +315,24 @@ fn init_database_password_provider(b: &mut CatalogBuilder, raw_db_config: &Datab } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub struct OwnedTempPath { + path: PathBuf, + _temp_dir: TempDir, +} + +impl OwnedTempPath { + pub fn new(path: PathBuf, temp_dir: TempDir) -> Self { + Self { + path, + _temp_dir: temp_dir, + } + } + + pub fn path(&self) -> &Path { + &self.path + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/src/explore/api_server.rs b/src/app/cli/src/explore/api_server.rs index bef0829e76..a097906b92 100644 --- a/src/app/cli/src/explore/api_server.rs +++ b/src/app/cli/src/explore/api_server.rs @@ -25,7 +25,6 @@ use kamu_adapter_http::e2e::e2e_router; use kamu_flow_system_inmem::domain::FlowExecutor; use kamu_task_system_inmem::domain::TaskExecutor; use messaging_outbox::OutboxExecutor; -use time_source::SystemTimeSource; use tokio::sync::Notify; use url::Url; @@ -36,8 +35,7 @@ pub struct APIServer { local_addr: SocketAddr, task_executor: Arc, flow_executor: Arc, - outbox_processor: Arc, - time_source: Arc, + outbox_executor: Arc, maybe_shutdown_notify: Option>, } @@ -57,9 +55,7 @@ impl APIServer { let flow_executor = cli_catalog.get_one().unwrap(); - let outbox_processor = cli_catalog.get_one().unwrap(); - - let time_source = base_catalog.get_one().unwrap(); + let outbox_executor = cli_catalog.get_one().unwrap(); let gql_schema = kamu_adapter_graphql::schema(); @@ -178,8 +174,7 @@ impl APIServer { local_addr, task_executor, flow_executor, - outbox_processor, - time_source, + outbox_executor, maybe_shutdown_notify, }) } @@ -188,13 +183,6 @@ impl APIServer { &self.local_addr } - pub async fn pre_run(&self) -> Result<(), InternalError> { - self.task_executor.pre_run().await?; - self.flow_executor.pre_run(self.time_source.now()).await?; - self.outbox_processor.pre_run().await?; - Ok(()) - } - pub async fn run(self) -> Result<(), InternalError> { let server_run_fut: Pin>> = if let Some(shutdown_notify) = self.maybe_shutdown_notify { @@ -212,7 +200,7 @@ impl APIServer { tokio::select! { res = server_run_fut => { res.int_err() }, - res = self.outbox_processor.run() => { res.int_err() }, + res = self.outbox_executor.run() => { res.int_err() }, res = self.task_executor.run() => { res.int_err() }, res = self.flow_executor.run() => { res.int_err() } } diff --git a/src/app/cli/src/explore/web_ui_server.rs b/src/app/cli/src/explore/web_ui_server.rs index 3820812de9..a7113dc15e 100644 --- a/src/app/cli/src/explore/web_ui_server.rs +++ b/src/app/cli/src/explore/web_ui_server.rs @@ -61,7 +61,9 @@ struct WebUILoginInstructions { #[serde(rename_all = "camelCase")] struct WebUIFeatureFlags { enable_logout: bool, - enable_scheduling: bool, + // TODO: Correct a typo in `WebUIFeatureFlags` + // (content of `assets/runtime-config.json`) + // https://github.com/kamu-data/kamu-cli/issues/841 enable_dataset_env_vars_managment: bool, } @@ -82,7 +84,7 @@ impl WebUIServer { current_account_name: AccountName, predefined_accounts_config: Arc, file_upload_limit_config: Arc, - enable_dataset_env_vars_managment: bool, + enable_dataset_env_vars_management: bool, address: Option, port: Option, ) -> Result { @@ -122,13 +124,16 @@ impl WebUIServer { enable_logout: false, // No way to configure scheduling of datasets enable_scheduling: false, - enable_dataset_env_vars_managment, + // TODO: Correct a typo in `WebUIFeatureFlags` + // (content of `assets/runtime-config.json`) + // https://github.com/kamu-data/kamu-cli/issues/841 + enable_dataset_env_vars_managment: enable_dataset_env_vars_management, }, }; let access_token = Self::acquire_access_token(base_catalog.clone(), &login_instructions) .await - .expect("Token not retreieved"); + .expect("Token not retrieved"); let web_ui_url = Url::parse(&web_ui_url).expect("URL failed to parse"); diff --git a/src/app/cli/src/services/config/models.rs b/src/app/cli/src/services/config/models.rs index a1b1801372..8531a00300 100644 --- a/src/app/cli/src/services/config/models.rs +++ b/src/app/cli/src/services/config/models.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::path::Path; + use container_runtime::{ContainerRuntimeType, NetworkNamespaceType}; use database_common::DatabaseProvider; use duration_string::DurationString; @@ -614,6 +616,12 @@ impl DatabaseConfig { port: Some(DatabaseProvider::Postgres.default_port()), }) } + + pub fn sqlite(database_path: &Path) -> Self { + Self::Sqlite(SqliteDatabaseConfig { + database_path: database_path.to_str().unwrap().into(), + }) + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/app/cli/src/services/workspace/workspace_layout.rs b/src/app/cli/src/services/workspace/workspace_layout.rs index 91402fad11..e0c74d9ff2 100644 --- a/src/app/cli/src/services/workspace/workspace_layout.rs +++ b/src/app/cli/src/services/workspace/workspace_layout.rs @@ -15,6 +15,10 @@ use serde::{Deserialize, Serialize}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +pub const DEFAULT_MULTI_TENANT_SQLITE_DATABASE_NAME: &str = "workspace.sqlite.db"; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TODO: Consider extracting to kamu-cli layer /// Describes the layout of the workspace on disk #[derive(Debug, Clone)] @@ -68,6 +72,11 @@ impl WorkspaceLayout { Ok(ws) } + + pub fn default_multi_tenant_database_path(&self) -> PathBuf { + self.root_dir + .join(DEFAULT_MULTI_TENANT_SQLITE_DATABASE_NAME) + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/cli/tests/tests/test_di_graph.rs b/src/app/cli/tests/tests/test_di_graph.rs index e034db2867..bfd73d1e40 100644 --- a/src/app/cli/tests/tests/test_di_graph.rs +++ b/src/app/cli/tests/tests/test_di_graph.rs @@ -7,16 +7,46 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use dill::*; use kamu::domain::ServerUrlConfig; use kamu_accounts::{CurrentAccountSubject, JwtAuthenticationConfig}; use kamu_adapter_http::AccessToken; use kamu_cli::{self, OutputConfig, WorkspaceLayout}; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_di_cli_graph_validates_st() { + test_di_cli_graph_validates(false); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_di_cli_graph_validates_mt() { + test_di_cli_graph_validates(true); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_di_server_graph_validates_st() { + test_di_server_graph_validates(false); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[test_log::test(tokio::test)] -async fn test_di_graph_validates() { - let tempdir = tempfile::tempdir().unwrap(); - let workspace_layout = WorkspaceLayout::new(tempdir.path()); +async fn test_di_server_graph_validates_mt() { + test_di_server_graph_validates(true); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +fn test_di_cli_graph_validates(multi_tenant_workspace: bool) { + let temp_dir = tempfile::tempdir().unwrap(); + let workspace_layout = WorkspaceLayout::new(temp_dir.path()); let mut base_catalog_builder = kamu_cli::configure_base_catalog(&workspace_layout, false, None, false); kamu_cli::configure_in_memory_components(&mut base_catalog_builder); @@ -25,13 +55,45 @@ async fn test_di_graph_validates() { kamu_cli::register_config_in_catalog( &kamu_cli::config::CLIConfig::default(), &mut base_catalog_builder, - false, + multi_tenant_workspace, ); let base_catalog = base_catalog_builder.build(); - let multi_tenant_workspace = true; let mut cli_catalog_builder = kamu_cli::configure_cli_catalog(&base_catalog, multi_tenant_workspace); + + cli_catalog_builder.add_value(CurrentAccountSubject::new_test()); + cli_catalog_builder.add_value(JwtAuthenticationConfig::default()); + + let validate_result = cli_catalog_builder.validate(); + + assert!( + validate_result.is_ok(), + "{}", + validate_result.err().unwrap() + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +fn test_di_server_graph_validates(multi_tenant_workspace: bool) { + let temp_dir = tempfile::tempdir().unwrap(); + let workspace_layout = WorkspaceLayout::new(temp_dir.path()); + let mut base_catalog_builder = + kamu_cli::configure_base_catalog(&workspace_layout, false, None, false); + kamu_cli::configure_in_memory_components(&mut base_catalog_builder); + base_catalog_builder.add_value(OutputConfig::default()); + + kamu_cli::register_config_in_catalog( + &kamu_cli::config::CLIConfig::default(), + &mut base_catalog_builder, + multi_tenant_workspace, + ); + let base_catalog = base_catalog_builder.build(); + + let mut cli_catalog_builder = + kamu_cli::configure_server_catalog(&base_catalog, multi_tenant_workspace); + cli_catalog_builder.add_value(CurrentAccountSubject::new_test()); cli_catalog_builder.add_value(JwtAuthenticationConfig::default()); cli_catalog_builder.add_value(ServerUrlConfig::new_test(None)); @@ -40,7 +102,7 @@ async fn test_di_graph_validates() { // TODO: We should ensure this test covers parameters requested by commands and // types needed for GQL/HTTP adapter that are currently being constructed // manually - let validate_result = cli_catalog_builder.validate().ignore::(); + let validate_result = cli_catalog_builder.validate(); assert!( validate_result.is_ok(), @@ -48,3 +110,5 @@ async fn test_di_graph_validates() { validate_result.err().unwrap() ); } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/auth-rebac/domain/src/entities/property.rs b/src/domain/auth-rebac/domain/src/entities/property.rs index 7427054c0e..5bf471b60c 100644 --- a/src/domain/auth-rebac/domain/src/entities/property.rs +++ b/src/domain/auth-rebac/domain/src/entities/property.rs @@ -32,12 +32,9 @@ pub enum PropertyName { impl PropertyName { pub fn dataset_allows_anonymous_read<'a>(allows: bool) -> (Self, PropertyValue<'a>) { - let value = if allows { "true" } else { "false" }; + let property = DatasetPropertyName::allows_anonymous_read(allows); - ( - Self::Dataset(DatasetPropertyName::AllowsAnonymousRead), - value.into(), - ) + (Self::Dataset(property.0), property.1) } pub fn dataset_allows_public_read<'a>(allows: bool) -> (Self, PropertyValue<'a>) { @@ -120,6 +117,12 @@ pub enum DatasetPropertyName { } impl DatasetPropertyName { + pub fn allows_anonymous_read<'a>(allows: bool) -> (Self, PropertyValue<'a>) { + let value = if allows { "true" } else { "false" }; + + (DatasetPropertyName::AllowsAnonymousRead, value.into()) + } + pub fn allows_public_read<'a>(allows: bool) -> (Self, PropertyValue<'a>) { let value = if allows { "true" } else { "false" }; diff --git a/src/domain/flow-system/services/tests/tests/utils/task_driver.rs b/src/domain/flow-system/services/tests/tests/utils/task_driver.rs index cf4032f8be..0a0bae2c2f 100644 --- a/src/domain/flow-system/services/tests/tests/utils/task_driver.rs +++ b/src/domain/flow-system/services/tests/tests/utils/task_driver.rs @@ -60,7 +60,7 @@ impl TaskDriver { self.ensure_task_matches_logical_plan().await; // Note: we can omit transaction, since this is a test-only abstraction - // with assummed immediate delivery + // with assumed immediate delivery self.outbox .post_message( MESSAGE_PRODUCER_KAMU_TASK_EXECUTOR, diff --git a/src/domain/task-system/services/src/task_executor_impl.rs b/src/domain/task-system/services/src/task_executor_impl.rs index 5e6a2270f0..2c546e7208 100644 --- a/src/domain/task-system/services/src/task_executor_impl.rs +++ b/src/domain/task-system/services/src/task_executor_impl.rs @@ -67,7 +67,7 @@ impl TaskExecutorImpl { // Total number of running tasks let total_running_tasks = task_event_store.get_count_running_tasks().await?; - // Processe them in pages + // Process them in pages let mut processed_running_tasks = 0; while processed_running_tasks < total_running_tasks { // Load another page diff --git a/src/utils/async-utils/Cargo.toml b/src/utils/async-utils/Cargo.toml new file mode 100644 index 0000000000..7d21bb847b --- /dev/null +++ b/src/utils/async-utils/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "async-utils" +description = "Simple utilities to streamline asynchronous work with std::core::Result" +version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +readme = { workspace = true } +license-file = { workspace = true } +keywords = { workspace = true } +include = { workspace = true } +edition = { workspace = true } +publish = { workspace = true } + + +[lints] +workspace = true + + +[lib] +doctest = false + + +[dependencies] +async-trait = "0.1" diff --git a/src/utils/async-utils/src/lib.rs b/src/utils/async-utils/src/lib.rs new file mode 100644 index 0000000000..8c3cd8aec1 --- /dev/null +++ b/src/utils/async-utils/src/lib.rs @@ -0,0 +1,34 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[async_trait::async_trait(?Send)] +pub trait ResultAsync { + async fn and_then_async(self, op: F) -> Result + where + F: FnOnce(T) -> FFut, + FFut: std::future::Future>; +} + +#[async_trait::async_trait(?Send)] +impl ResultAsync for Result { + async fn and_then_async(self, op: F) -> Result + where + F: FnOnce(T) -> FFut, + FFut: std::future::Future>, + { + match self { + Ok(val) => op(val).await, + Err(e) => Err(e), + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/utils/database-common/src/plugins/sqlite_plugin.rs b/src/utils/database-common/src/plugins/sqlite_plugin.rs index 102785efc9..f97552418f 100644 --- a/src/utils/database-common/src/plugins/sqlite_plugin.rs +++ b/src/utils/database-common/src/plugins/sqlite_plugin.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use dill::*; +use sqlx::migrate::Migrator; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::SqlitePool; @@ -15,6 +16,10 @@ use crate::*; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +static SQLITE_MIGRATOR: Migrator = sqlx::migrate!("../../../migrations/sqlite"); + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct SqlitePlugin {} #[component(pub)] @@ -28,12 +33,17 @@ impl SqlitePlugin { catalog_builder.add::(); } - pub fn catalog_with_connected_pool( + pub async fn catalog_with_connected_pool( base_catalog: &Catalog, db_connection_settings: &DatabaseConnectionSettings, ) -> Result { let sqlite_pool = Self::open_sqlite_pool(db_connection_settings); + SQLITE_MIGRATOR + .run(&sqlite_pool) + .await + .expect("Migration failed"); + Ok(CatalogBuilder::new_chained(base_catalog) .add_value(sqlite_pool) .build()) @@ -41,7 +51,10 @@ impl SqlitePlugin { #[tracing::instrument(level = "info", skip_all)] fn open_sqlite_pool(db_connection_settings: &DatabaseConnectionSettings) -> SqlitePool { - let sqlite_options = SqliteConnectOptions::new().filename(&db_connection_settings.host); + let sqlite_options = SqliteConnectOptions::new() + .filename(&db_connection_settings.host) + .create_if_missing(true); + SqlitePoolOptions::new() .max_connections(1) .connect_lazy_with(sqlite_options) diff --git a/src/utils/messaging-outbox/src/executors/outbox_executor.rs b/src/utils/messaging-outbox/src/executors/outbox_executor.rs index 23ae061af1..52f438c4ef 100644 --- a/src/utils/messaging-outbox/src/executors/outbox_executor.rs +++ b/src/utils/messaging-outbox/src/executors/outbox_executor.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use database_common_macros::{transactional_method1, transactional_method2}; @@ -19,6 +20,14 @@ use crate::*; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +enum RunMode { + SingleIterationOnly, + WhileHasTasks, + MainRelayLoop, +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct OutboxExecutor { catalog: Catalog, config: Arc, @@ -92,20 +101,49 @@ impl OutboxExecutor { pub async fn run(&self) -> Result<(), InternalError> { // Main consumption loop - loop { - self.run_consumption_iteration() - .instrument(tracing::debug_span!("OutboxExecutor::tick")) - .await?; + self.run_with_mode(RunMode::MainRelayLoop).await + } - tokio::time::sleep(self.config.awaiting_step.to_std().unwrap()).await; - } + #[tracing::instrument(level = "debug", skip_all)] + pub async fn run_while_has_tasks(&self) -> Result<(), InternalError> { + self.run_with_mode(RunMode::WhileHasTasks).await } // To be used by tests only! #[tracing::instrument(level = "debug", skip_all)] pub async fn run_single_iteration_only(&self) -> Result<(), InternalError> { // Run single iteration instead of a loop - self.run_consumption_iteration().await?; + self.run_with_mode(RunMode::SingleIterationOnly).await + } + + async fn run_with_mode(&self, mode: RunMode) -> Result<(), InternalError> { + match mode { + RunMode::SingleIterationOnly => { + self.run_consumption_iteration().await?; + } + RunMode::WhileHasTasks => loop { + let processed_consumer_tasks_count = self + .run_consumption_iteration() + .instrument(tracing::debug_span!("OutboxExecutor::tick")) + .await?; + + if processed_consumer_tasks_count == 0 { + break; + } + }, + RunMode::MainRelayLoop => { + let loop_delay = self.config.awaiting_step.to_std().unwrap(); + + loop { + self.run_consumption_iteration() + .instrument(tracing::debug_span!("OutboxExecutor::tick")) + .await?; + + tokio::time::sleep(loop_delay).await; + } + } + } + Ok(()) } @@ -150,10 +188,13 @@ impl OutboxExecutor { Ok(()) } - async fn run_consumption_iteration(&self) -> Result<(), InternalError> { + async fn run_consumption_iteration( + &self, + ) -> Result { // Read current state of producers and consumptions // Prepare consumption tasks for each progressed producer let mut consumption_tasks_by_producer = self.prepare_consumption_iteration().await?; + let processed_consumer_tasks_counter = Arc::new(AtomicUsize::new(0)); // Select jobs per consumption task, unless consumers are failing already let mut consumption_job_tasks = Vec::new(); @@ -171,7 +212,11 @@ impl OutboxExecutor { } // Plan task to run during this iteration - consumption_job_tasks.push((producer_consumption_job, producer_consumption_task)); + consumption_job_tasks.push(( + producer_consumption_job, + producer_consumption_task, + processed_consumer_tasks_counter.clone(), + )); } // Run iteration of consumption jobs of each producer concurrently @@ -180,13 +225,19 @@ impl OutboxExecutor { .map(Ok) .try_for_each_concurrent( /* limit */ None, - |(consumption_job, consumption_task)| async move { - consumption_job.run_consumption_task(consumption_task).await + |(consumption_job, consumption_task, processed_consumer_tasks_counter)| async move { + let processed = consumption_job + .run_consumption_task(consumption_task) + .await?; + + processed_consumer_tasks_counter.fetch_add(processed, Ordering::Relaxed); + + Ok(()) }, ) .await?; - Ok(()) + Ok(processed_consumer_tasks_counter.load(Ordering::Relaxed)) } #[transactional_method2( diff --git a/src/utils/messaging-outbox/src/executors/outbox_producer_consumption_job.rs b/src/utils/messaging-outbox/src/executors/outbox_producer_consumption_job.rs index a81ccbc2f6..d45ae84e9c 100644 --- a/src/utils/messaging-outbox/src/executors/outbox_producer_consumption_job.rs +++ b/src/utils/messaging-outbox/src/executors/outbox_producer_consumption_job.rs @@ -27,6 +27,10 @@ use crate::{ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +pub(crate) type ProcessedConsumerTasksCount = usize; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub(crate) struct ProducerConsumptionJob { catalog: Catalog, routes_static_info: Arc, @@ -71,13 +75,15 @@ impl ProducerConsumptionJob { pub(crate) async fn run_consumption_task( &self, consumption_task: ProducerConsumptionTask, - ) -> Result<(), InternalError> { + ) -> Result { // Clone names of failing consumers before this iteration. let mut failing_consumer_names = { let g_failed_consumer_names = self.failed_consumer_names.lock().unwrap(); g_failed_consumer_names.clone() }; + let mut processed_consumer_tasks_count = 0; + // Feed consumers if they are behind this message // We must respect the sequential order of messages, // but individual consumers may process each message concurrently @@ -129,6 +135,8 @@ impl ProducerConsumptionJob { while let Some(res) = join_set.join_next().await { match res.int_err()? { Ok(tx) => { + processed_consumer_tasks_count += 1; + self.metrics .messages_processed_total .with_label_values(&[&tx.message.producer_name, &tx.consumer_name]) @@ -161,7 +169,7 @@ impl ProducerConsumptionJob { let mut g_failed_consumer_names = self.failed_consumer_names.lock().unwrap(); *g_failed_consumer_names = failing_consumer_names; - Ok(()) + Ok(processed_consumer_tasks_count) } }