Skip to content

Commit

Permalink
854 persistent storage of dataset dependencies graph (#973)
Browse files Browse the repository at this point in the history
Dependency graph service moved to 'datasets' domain.
Defined dataset dependency repository interface and created 3 implementations.
No more postponed initialization, organized initial setup in the form of an indexer.
Added telemetry extensions on the way.
Tests for repositories, stabilized other tests.
Cascading effect on delete within the dataset entry domain.
  • Loading branch information
zaychenko-sergei authored Dec 2, 2024
1 parent 9bbce44 commit 5f42df4
Show file tree
Hide file tree
Showing 94 changed files with 2,273 additions and 528 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Changed
- Dataset dependency graph is now backed by a database, removing the need for dependency scanning at startup.

## [0.210.0] - 2024-11-28
### Added
- Console warning when deleting datasets which are out of sync with their push remotes
Expand Down
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions migrations/postgres/20241125193114_dataset_dependencies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* ------------------------------ */

/* Dataset dependency edges: each row pairs one upstream dataset id with one
   downstream dataset id. Both columns reference dataset_entries(dataset_id),
   and ON DELETE CASCADE removes a row automatically when either referenced
   dataset entry is deleted. */
CREATE TABLE dataset_dependencies
(
upstream_dataset_id VARCHAR(100) NOT NULL REFERENCES dataset_entries(dataset_id) ON DELETE CASCADE,
downstream_dataset_id VARCHAR(100) NOT NULL REFERENCES dataset_entries(dataset_id) ON DELETE CASCADE
);

/* Each (upstream, downstream) pair may be stored at most once. */
CREATE UNIQUE INDEX idx_dataset_dependencies
ON dataset_dependencies (upstream_dataset_id, downstream_dataset_id);

/* Index on the upstream column alone.
   NOTE(review): upstream_dataset_id is also the leading column of the unique
   index above, so this index may be redundant - confirm with query plans
   before keeping or dropping it. */
CREATE INDEX idx_dataset_dependencies_upstream_dataset_id
ON dataset_dependencies(upstream_dataset_id);

/* Index on the downstream column alone; this column is not a leading column
   of the unique index above. */
CREATE INDEX idx_dataset_dependencies_downstream_dataset_id
ON dataset_dependencies (downstream_dataset_id);

/* ------------------------------ */
18 changes: 18 additions & 0 deletions migrations/sqlite/20241125192943_dataset_dependencies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* ------------------------------ */

/* Dataset dependency edges: each row pairs one upstream dataset id with one
   downstream dataset id. Both columns reference dataset_entries(dataset_id),
   and ON DELETE CASCADE removes a row automatically when either referenced
   dataset entry is deleted.
   NOTE(review): SQLite enforces foreign keys (and therefore the cascade) only
   when PRAGMA foreign_keys is enabled on the connection - confirm the
   application's connection settings. */
CREATE TABLE dataset_dependencies
(
upstream_dataset_id VARCHAR(100) NOT NULL REFERENCES dataset_entries(dataset_id) ON DELETE CASCADE,
downstream_dataset_id VARCHAR(100) NOT NULL REFERENCES dataset_entries(dataset_id) ON DELETE CASCADE
);

/* Each (upstream, downstream) pair may be stored at most once. */
CREATE UNIQUE INDEX idx_dataset_dependencies
ON dataset_dependencies (upstream_dataset_id, downstream_dataset_id);

/* Index on the upstream column alone.
   NOTE(review): upstream_dataset_id is also the leading column of the unique
   index above, so this index may be redundant - confirm with query plans
   before keeping or dropping it. */
CREATE INDEX idx_dataset_dependencies_upstream_dataset_id
ON dataset_dependencies(upstream_dataset_id);

/* Index on the downstream column alone; this column is not a leading column
   of the unique index above. */
CREATE INDEX idx_dataset_dependencies_downstream_dataset_id
ON dataset_dependencies (downstream_dataset_id);

/* ------------------------------ */
11 changes: 4 additions & 7 deletions src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use kamu::testing::{
MetadataFactory,
MockDatasetActionAuthorizer,
MockDatasetChangesService,
MockDependencyGraphRepository,
MockPollingIngestService,
MockTransformRequestPlanner,
};
Expand All @@ -29,12 +28,13 @@ use kamu::{
DatasetRegistryRepoBridge,
DatasetRepositoryLocalFs,
DatasetRepositoryWriter,
DependencyGraphServiceInMemory,
};
use kamu_accounts::{JwtAuthenticationConfig, DEFAULT_ACCOUNT_NAME, DEFAULT_ACCOUNT_NAME_STR};
use kamu_accounts_inmem::InMemoryAccessTokenRepository;
use kamu_accounts_services::{AccessTokenServiceImpl, AuthenticationServiceImpl};
use kamu_core::*;
use kamu_datasets_inmem::InMemoryDatasetDependencyRepository;
use kamu_datasets_services::DependencyGraphServiceImpl;
use kamu_flow_system::FlowExecutorConfig;
use kamu_flow_system_inmem::{InMemoryFlowConfigurationEventStore, InMemoryFlowEventStore};
use kamu_task_system_inmem::InMemoryTaskEventStore;
Expand Down Expand Up @@ -630,7 +630,6 @@ struct FlowConfigHarness {

#[derive(Default)]
struct FlowRunsHarnessOverrides {
dependency_graph_mock: Option<MockDependencyGraphRepository>,
dataset_changes_mock: Option<MockDatasetChangesService>,
transform_planner_mock: Option<MockTransformRequestPlanner>,
polling_service_mock: Option<MockPollingIngestService>,
Expand All @@ -644,7 +643,6 @@ impl FlowConfigHarness {
std::fs::create_dir(&datasets_dir).unwrap();

let dataset_changes_mock = overrides.dataset_changes_mock.unwrap_or_default();
let dependency_graph_mock = overrides.dependency_graph_mock.unwrap_or_default();
let transform_planner_mock = overrides.transform_planner_mock.unwrap_or_default();
let polling_service_mock = overrides.polling_service_mock.unwrap_or_default();
let mock_dataset_action_authorizer =
Expand Down Expand Up @@ -673,9 +671,8 @@ impl FlowConfigHarness {
.add::<InMemoryAccessTokenRepository>()
.add_value(JwtAuthenticationConfig::default())
.bind::<dyn kamu::domain::auth::DatasetActionAuthorizer, MockDatasetActionAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add_value(dependency_graph_mock)
.bind::<dyn DependencyGraphRepository, MockDependencyGraphRepository>()
.add::<DependencyGraphServiceImpl>()
.add::<InMemoryDatasetDependencyRepository>()
.add::<InMemoryFlowConfigurationEventStore>()
.add::<InMemoryFlowEventStore>()
.add_value(FlowExecutorConfig::new(
Expand Down
5 changes: 4 additions & 1 deletion src/adapter/graphql/tests/tests/test_gql_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use kamu_accounts_services::{
PredefinedAccountsRegistrator,
};
use kamu_core::*;
use kamu_datasets_inmem::InMemoryDatasetDependencyRepository;
use kamu_datasets_services::DependencyGraphServiceImpl;
use messaging_outbox::DummyOutboxImpl;
use opendatafabric::*;
use serde_json::json;
Expand Down Expand Up @@ -56,7 +58,8 @@ async fn create_catalog_with_local_workspace(
let catalog = {
let mut b = dill::CatalogBuilder::new();

b.add::<DependencyGraphServiceInMemory>()
b.add::<DependencyGraphServiceImpl>()
.add::<InMemoryDatasetDependencyRepository>()
.add_value(current_account_subject)
.add_value(predefined_accounts_config)
.add_value(tenancy_config)
Expand Down
8 changes: 4 additions & 4 deletions src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use kamu::{
DatasetRegistryRepoBridge,
DatasetRepositoryLocalFs,
DatasetRepositoryWriter,
DependencyGraphServiceInMemory,
};
use kamu_core::{
auth,
Expand All @@ -27,8 +26,8 @@ use kamu_core::{
TenancyConfig,
};
use kamu_datasets::DatasetEnvVarsConfig;
use kamu_datasets_inmem::InMemoryDatasetEnvVarRepository;
use kamu_datasets_services::DatasetEnvVarServiceImpl;
use kamu_datasets_inmem::{InMemoryDatasetDependencyRepository, InMemoryDatasetEnvVarRepository};
use kamu_datasets_services::{DatasetEnvVarServiceImpl, DependencyGraphServiceImpl};
use messaging_outbox::DummyOutboxImpl;
use opendatafabric::DatasetKind;
use time_source::SystemTimeSourceDefault;
Expand Down Expand Up @@ -360,7 +359,8 @@ impl DatasetEnvVarsHarness {
.add::<CreateDatasetFromSnapshotUseCaseImpl>()
.add::<SystemTimeSourceDefault>()
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add::<DependencyGraphServiceImpl>()
.add::<InMemoryDatasetDependencyRepository>()
.add::<DatabaseTransactionRunner>()
.add::<DatasetEnvVarServiceImpl>()
.add::<InMemoryDatasetEnvVarRepository>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use kamu::{
DatasetRegistryRepoBridge,
DatasetRepositoryLocalFs,
DatasetRepositoryWriter,
DependencyGraphServiceInMemory,
};
use kamu_core::{
auth,
Expand All @@ -28,6 +27,8 @@ use kamu_core::{
TenancyConfig,
TransformRequestPlanner,
};
use kamu_datasets_inmem::InMemoryDatasetDependencyRepository;
use kamu_datasets_services::DependencyGraphServiceImpl;
use kamu_flow_system_inmem::InMemoryFlowConfigurationEventStore;
use kamu_flow_system_services::FlowConfigurationServiceImpl;
use messaging_outbox::DummyOutboxImpl;
Expand Down Expand Up @@ -1637,7 +1638,8 @@ impl FlowConfigHarness {
.add_value(transform_planner_mock)
.bind::<dyn TransformRequestPlanner, MockTransformRequestPlanner>()
.add::<auth::AlwaysHappyDatasetActionAuthorizer>()
.add::<DependencyGraphServiceInMemory>()
.add::<DependencyGraphServiceImpl>()
.add::<InMemoryDatasetDependencyRepository>()
.add::<FlowConfigurationServiceImpl>()
.add::<InMemoryFlowConfigurationEventStore>()
.add::<DatabaseTransactionRunner>();
Expand Down
Loading

0 comments on commit 5f42df4

Please sign in to comment.