diff --git a/resources/schema.gql b/resources/schema.gql
index 2cdce5126e..d1c8f134d1 100644
--- a/resources/schema.gql
+++ b/resources/schema.gql
@@ -646,11 +646,11 @@ type DatasetMetadata {
     """
     Current upstream dependencies of a dataset
     """
-    currentUpstreamDependencies: [UpstreamDatasetResult!]!
+    currentUpstreamDependencies: [DependencyDatasetResult!]!
     """
     Current downstream dependencies of a dataset
     """
-    currentDownstreamDependencies: [Dataset!]!
+    currentDownstreamDependencies: [DependencyDatasetResult!]!
     """
     Current polling source used by the root dataset
     """
@@ -849,6 +849,20 @@ type DeleteResultSuccess implements DeleteResult {
     message: String!
 }
 
+interface DependencyDatasetResult {
+    message: String!
+}
+
+type DependencyDatasetResultFound implements DependencyDatasetResult {
+    dataset: Dataset!
+    message: String!
+}
+
+type DependencyDatasetResultNotFound implements DependencyDatasetResult {
+    datasetId: DatasetID!
+    message: String!
+}
+
 type DisablePollingSource {
     dummy: String
 }
@@ -1996,21 +2010,6 @@ interface UpdateReadmeResult {
     message: String!
 }
 
-interface UpstreamDatasetResult {
-    message: String!
-}
-
-type UpstreamDatasetResultFound implements UpstreamDatasetResult {
-    dataset: Dataset!
-    message: String!
-}
-
-type UpstreamDatasetResultNotFound implements UpstreamDatasetResult {
-    datasetId: DatasetID!
-    datasetAlias: DatasetAlias!
-    message: String!
-}
-
 type ViewAccessToken {
     """
     Unique identifier of the access token
diff --git a/src/adapter/auth-oso-rebac/src/oso_dataset_authorizer.rs b/src/adapter/auth-oso-rebac/src/oso_dataset_authorizer.rs
index 113549ed52..4ef045e30b 100644
--- a/src/adapter/auth-oso-rebac/src/oso_dataset_authorizer.rs
+++ b/src/adapter/auth-oso-rebac/src/oso_dataset_authorizer.rs
@@ -235,6 +235,50 @@ impl DatasetActionAuthorizer for OsoDatasetAuthorizer {
             unauthorized_handles_with_errors: unmatched_results,
         })
     }
+
+    async fn classify_dataset_ids_by_allowance(
+        &self,
+        dataset_ids: Vec<odf::DatasetID>,
+        action: DatasetAction,
+    ) -> Result<ClassifyByAllowanceIdsResponse, InternalError> {
+        let user_actor = self.user_actor().await?;
+        let mut authorized_ids = Vec::with_capacity(dataset_ids.len());
+        let mut unauthorized_ids_with_errors = Vec::new();
+
+        let dataset_resources_resolution = self
+            .oso_resource_service
+            .get_multiple_dataset_resources(&dataset_ids)
+            .await
+            .int_err()?;
+
+        for (dataset_id, dataset_resource) in dataset_resources_resolution.resolved_resources {
+            let is_allowed = self
+                .kamu_auth_oso
+                .is_allowed(user_actor.clone(), action, dataset_resource)
+                .int_err()?;
+
+            if is_allowed {
+                authorized_ids.push(dataset_id);
+            } else {
+                let dataset_ref = dataset_id.as_local_ref();
+                unauthorized_ids_with_errors.push((
+                    dataset_id,
+                    DatasetActionUnauthorizedError::Access(AccessError::Forbidden(
+                        DatasetActionNotEnoughPermissionsError {
+                            action,
+                            dataset_ref,
+                        }
+                        .into(),
+                    )),
+                ));
+            }
+        }
+
+        Ok(ClassifyByAllowanceIdsResponse {
+            authorized_ids,
+            unauthorized_ids_with_errors,
+        })
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
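The ID-based `classify_dataset_ids_by_allowance` mirrors the existing handle-based `classify_datasets_by_allowance`, so callers can authorize dependency IDs without resolving handles first. A minimal caller-side sketch, assuming an authorizer reference is in scope (the helper name is illustrative, not part of this change):

```rust
use kamu_core::auth::{ClassifyByAllowanceIdsResponse, DatasetAction, DatasetActionAuthorizer};
use internal_error::InternalError;
use opendatafabric as odf;

/// Keeps only the IDs the current actor may read; rejected IDs come back
/// paired with the rejection reason, which is merely logged here.
async fn filter_readable_ids(
    authorizer: &dyn DatasetActionAuthorizer,
    ids: Vec<odf::DatasetID>,
) -> Result<Vec<odf::DatasetID>, InternalError> {
    let ClassifyByAllowanceIdsResponse {
        authorized_ids,
        unauthorized_ids_with_errors,
    } = authorizer
        .classify_dataset_ids_by_allowance(ids, DatasetAction::Read)
        .await?;

    for (dataset_id, error) in unauthorized_ids_with_errors {
        tracing::debug!(%dataset_id, ?error, "Dataset filtered out by authorizer");
    }

    Ok(authorized_ids)
}
```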
= [ + "alloc" +] } secrecy = "0.10" serde = { version = "1", default-features = false } serde_json = "1" diff --git a/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs b/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs index 427dc000dd..fe0c4723a5 100644 --- a/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs +++ b/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs @@ -7,8 +7,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashSet; + use chrono::prelude::*; -use kamu_core::auth::{ClassifyByAllowanceResponse, DatasetAction}; +use kamu_accounts::AccountService; +use kamu_core::auth::{ClassifyByAllowanceIdsResponse, DatasetAction}; use kamu_core::{ self as domain, MetadataChainExt, @@ -17,6 +20,7 @@ use kamu_core::{ SearchSetLicenseVisitor, SearchSetVocabVisitor, }; +use kamu_datasets::DatasetEntriesResolution; use opendatafabric as odf; use crate::prelude::*; @@ -85,114 +89,181 @@ impl DatasetMetadata { async fn current_upstream_dependencies( &self, ctx: &Context<'_>, - ) -> Result> { - let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!( + ) -> Result> { + let ( + dependency_graph_service, + dataset_action_authorizer, + dataset_entry_repository, + account_service, + ) = from_catalog_n!( ctx, dyn domain::DependencyGraphService, - dyn domain::DatasetRegistry, - dyn kamu_core::auth::DatasetActionAuthorizer + dyn kamu_core::auth::DatasetActionAuthorizer, + dyn kamu_datasets::DatasetEntryRepository, + dyn AccountService ); - use futures::{StreamExt, TryStreamExt}; + use tokio_stream::StreamExt; - let upstream_dataset_handles = dependency_graph_service + // TODO: PERF: chunk the stream + let upstream_dependency_ids = dependency_graph_service .get_upstream_dependencies(&self.dataset_handle.id) .await .int_err()? 
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs b/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
index 427dc000dd..fe0c4723a5 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
@@ -7,8 +7,11 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::collections::HashSet;
+
 use chrono::prelude::*;
-use kamu_core::auth::{ClassifyByAllowanceResponse, DatasetAction};
+use kamu_accounts::AccountService;
+use kamu_core::auth::{ClassifyByAllowanceIdsResponse, DatasetAction};
 use kamu_core::{
     self as domain,
     MetadataChainExt,
@@ -17,6 +20,7 @@ use kamu_core::{
     SearchSetLicenseVisitor,
     SearchSetVocabVisitor,
 };
+use kamu_datasets::DatasetEntriesResolution;
 use opendatafabric as odf;
 
 use crate::prelude::*;
@@ -85,114 +89,181 @@ impl DatasetMetadata {
     async fn current_upstream_dependencies(
         &self,
         ctx: &Context<'_>,
-    ) -> Result<Vec<UpstreamDatasetResult>> {
-        let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!(
+    ) -> Result<Vec<DependencyDatasetResult>> {
+        let (
+            dependency_graph_service,
+            dataset_action_authorizer,
+            dataset_entry_repository,
+            account_service,
+        ) = from_catalog_n!(
             ctx,
             dyn domain::DependencyGraphService,
-            dyn domain::DatasetRegistry,
-            dyn kamu_core::auth::DatasetActionAuthorizer
+            dyn kamu_core::auth::DatasetActionAuthorizer,
+            dyn kamu_datasets::DatasetEntryRepository,
+            dyn AccountService
         );
 
-        use futures::{StreamExt, TryStreamExt};
+        use tokio_stream::StreamExt;
 
-        let upstream_dataset_handles = dependency_graph_service
+        // TODO: PERF: chunk the stream
+        let upstream_dependency_ids = dependency_graph_service
             .get_upstream_dependencies(&self.dataset_handle.id)
             .await
             .int_err()?
-            .then(|upstream_dataset_id| {
-                let dataset_registry = dataset_registry.clone();
-                async move {
-                    dataset_registry
-                        .resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
-                        .await
-                        .int_err()
-                }
-            })
-            .try_collect::<Vec<_>>()
-            .await?;
+            .collect::<Vec<_>>()
+            .await;
 
-        let upstream_dataset_handles_len = upstream_dataset_handles.len();
-        let ClassifyByAllowanceResponse {
-            authorized_handles,
-            unauthorized_handles_with_errors,
+        let mut upstream_dependencies = Vec::with_capacity(upstream_dependency_ids.len());
+
+        let ClassifyByAllowanceIdsResponse {
+            authorized_ids,
+            unauthorized_ids_with_errors,
         } = dataset_action_authorizer
-            .classify_datasets_by_allowance(upstream_dataset_handles, DatasetAction::Read)
+            .classify_dataset_ids_by_allowance(upstream_dependency_ids, DatasetAction::Read)
             .await?;
 
-        let mut upstream = Vec::with_capacity(upstream_dataset_handles_len);
-        for hdl in authorized_handles {
-            let maybe_account = Account::from_dataset_alias(ctx, &hdl.alias).await?;
+        upstream_dependencies.extend(unauthorized_ids_with_errors.into_iter().map(
+            |(not_found_dataset_id, _)| DependencyDatasetResult::not_found(not_found_dataset_id),
+        ));
+
+        let DatasetEntriesResolution {
+            resolved_entries,
+            unresolved_entries,
+        } = dataset_entry_repository
+            .get_multiple_dataset_entries(&authorized_ids)
+            .await
+            .int_err()?;
+
+        upstream_dependencies.extend(
+            unresolved_entries
+                .into_iter()
+                .map(DependencyDatasetResult::not_found),
+        );
+
+        let owner_ids = resolved_entries
+            .iter()
+            .fold(HashSet::new(), |mut acc, entry| {
+                acc.insert(entry.owner_id.clone());
+                acc
+            });
+        let account_map = account_service
+            .get_account_map(owner_ids.into_iter().collect())
+            .await
+            .int_err()?;
+
+        for dataset_entry in resolved_entries {
+            let maybe_account = account_map.get(&dataset_entry.owner_id);
             if let Some(account) = maybe_account {
-                upstream.push(UpstreamDatasetResult::found(Dataset::new(account, hdl)));
+                let dataset_handle = odf::DatasetHandle {
+                    id: dataset_entry.id,
+                    alias: odf::DatasetAlias::new(
+                        Some(account.account_name.clone()),
+                        dataset_entry.name,
+                    ),
+                };
+                let dataset = Dataset::new(Account::from_account(account.clone()), dataset_handle);
+
+                upstream_dependencies.push(DependencyDatasetResult::found(dataset));
             } else {
                 tracing::warn!(
-                    "Skipped upstream dataset '{}' with unresolved account",
-                    hdl.alias
+                    "Upstream owner's account not found for dataset: {:?}",
+                    &dataset_entry
                 );
+                upstream_dependencies.push(DependencyDatasetResult::not_found(dataset_entry.id));
             }
         }
 
-        upstream.extend(
-            unauthorized_handles_with_errors
-                .into_iter()
-                .map(|(hdl, _)| UpstreamDatasetResult::not_found(hdl)),
-        );
-
-        Ok(upstream)
+        Ok(upstream_dependencies)
     }
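After this change, `current_upstream_dependencies` and the `current_downstream_dependencies` method below follow the same pipeline: collect dependency IDs, authorize them, batch-resolve dataset entries, then batch-resolve owner accounts. The per-entry mapping is identical in both, so it could arguably be factored out. A hypothetical helper, assuming `DatasetEntry` exposes the `id`, `name`, and `owner_id` fields used above (names are illustrative, not part of this change):

```rust
use std::collections::HashMap;

use opendatafabric as odf;

// Hypothetical extraction of the shared entry -> GraphQL-result mapping.
fn entries_to_dependency_results(
    resolved_entries: Vec<kamu_datasets::DatasetEntry>,
    account_map: &HashMap<odf::AccountID, kamu_accounts::Account>,
) -> Vec<DependencyDatasetResult> {
    let mut results = Vec::with_capacity(resolved_entries.len());
    for entry in resolved_entries {
        match account_map.get(&entry.owner_id) {
            Some(account) => {
                let dataset_handle = odf::DatasetHandle {
                    id: entry.id,
                    alias: odf::DatasetAlias::new(
                        Some(account.account_name.clone()),
                        entry.name,
                    ),
                };
                results.push(DependencyDatasetResult::found(Dataset::new(
                    Account::from_account(account.clone()),
                    dataset_handle,
                )));
            }
            None => {
                // Mirrors the warn-and-degrade behavior of the resolvers above.
                tracing::warn!("Owner's account not found for dataset: {:?}", &entry);
                results.push(DependencyDatasetResult::not_found(entry.id));
            }
        }
    }
    results
}
```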
 
     // TODO: Convert to collection
     // TODO: Private Datasets: tests
     /// Current downstream dependencies of a dataset
-    async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
-        let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!(
+    async fn current_downstream_dependencies(
+        &self,
+        ctx: &Context<'_>,
+    ) -> Result<Vec<DependencyDatasetResult>> {
+        let (
+            dependency_graph_service,
+            dataset_action_authorizer,
+            dataset_entry_repository,
+            account_service,
+        ) = from_catalog_n!(
             ctx,
             dyn domain::DependencyGraphService,
-            dyn domain::DatasetRegistry,
-            dyn kamu_core::auth::DatasetActionAuthorizer
+            dyn kamu_core::auth::DatasetActionAuthorizer,
+            dyn kamu_datasets::DatasetEntryRepository,
+            dyn AccountService
         );
 
-        use futures::{StreamExt, TryStreamExt};
+        use tokio_stream::StreamExt;
 
-        let downstream_dataset_handles = dependency_graph_service
+        // TODO: PERF: chunk the stream
+        let downstream_dependency_ids = dependency_graph_service
             .get_downstream_dependencies(&self.dataset_handle.id)
             .await
             .int_err()?
-            .then(|upstream_dataset_id| {
-                let dataset_registry = dataset_registry.clone();
-                async move {
-                    dataset_registry
-                        .resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
-                        .await
-                        .int_err()
-                }
-            })
-            .try_collect::<Vec<_>>()
-            .await?;
+            .collect::<Vec<_>>()
+            .await;
 
-        let authorized_downstream_dataset_ids = dataset_action_authorizer
-            .classify_datasets_by_allowance(downstream_dataset_handles, DatasetAction::Read)
+        let mut downstream_dependencies = Vec::with_capacity(downstream_dependency_ids.len());
+
+        // Cut off datasets that we don't have access to
+        let authorized_ids = dataset_action_authorizer
+            .classify_dataset_ids_by_allowance(downstream_dependency_ids, DatasetAction::Read)
             .await?
-            .authorized_handles;
-
-        let mut downstream = Vec::with_capacity(authorized_downstream_dataset_ids.len());
-        for downstream_dataset_id in authorized_downstream_dataset_ids {
-            let hdl = dataset_registry
-                .resolve_dataset_handle_by_ref(&downstream_dataset_id.as_local_ref())
-                .await
-                .int_err()?;
-            let maybe_account = Account::from_dataset_alias(ctx, &hdl.alias).await?;
+            .authorized_ids;
+
+        let DatasetEntriesResolution {
+            resolved_entries,
+            unresolved_entries,
+        } = dataset_entry_repository
+            .get_multiple_dataset_entries(&authorized_ids)
+            .await
+            .int_err()?;
+
+        downstream_dependencies.extend(
+            unresolved_entries
+                .into_iter()
+                .map(DependencyDatasetResult::not_found),
+        );
+
+        let owner_ids = resolved_entries
+            .iter()
+            .fold(HashSet::new(), |mut acc, entry| {
+                acc.insert(entry.owner_id.clone());
+                acc
+            });
+        let account_map = account_service
+            .get_account_map(owner_ids.into_iter().collect())
+            .await
+            .int_err()?;
+
+        for dataset_entry in resolved_entries {
+            let maybe_account = account_map.get(&dataset_entry.owner_id);
             if let Some(account) = maybe_account {
-                downstream.push(Dataset::new(account, hdl));
+                let dataset_handle = odf::DatasetHandle {
+                    id: dataset_entry.id,
+                    alias: odf::DatasetAlias::new(
+                        Some(account.account_name.clone()),
+                        dataset_entry.name,
+                    ),
+                };
+                let dataset = Dataset::new(Account::from_account(account.clone()), dataset_handle);
+
+                downstream_dependencies.push(DependencyDatasetResult::found(dataset));
             } else {
                 tracing::warn!(
-                    "Skipped downstream dataset '{}' with unresolved account",
-                    hdl.alias
+                    "Downstream owner's account not found for dataset: {:?}",
+                    &dataset_entry
                 );
+                downstream_dependencies.push(DependencyDatasetResult::not_found(dataset_entry.id));
             }
         }
 
-        Ok(downstream)
+        Ok(downstream_dependencies)
     }
 
     /// Current polling source used by the root dataset
@@ -328,32 +399,31 @@ impl DatasetMetadata {
 
 #[derive(Interface, Debug, Clone)]
 #[graphql(field(name = "message", ty = "String"))]
-enum UpstreamDatasetResult {
-    Found(UpstreamDatasetResultFound),
-    NotFound(UpstreamDatasetResultNotFound),
+enum DependencyDatasetResult {
+    Found(DependencyDatasetResultFound),
+    NotFound(DependencyDatasetResultNotFound),
 }
 
-impl UpstreamDatasetResult {
+impl DependencyDatasetResult {
     pub fn found(dataset: Dataset) -> Self {
-        Self::Found(UpstreamDatasetResultFound { dataset })
+        Self::Found(DependencyDatasetResultFound { dataset })
     }
 
-    pub fn not_found(dataset_handle: odf::DatasetHandle) -> Self {
-        Self::NotFound(UpstreamDatasetResultNotFound {
-            dataset_id: dataset_handle.id.into(),
-            dataset_alias: dataset_handle.alias.into(),
+    pub fn not_found(dataset_id: odf::DatasetID) -> Self {
+        Self::NotFound(DependencyDatasetResultNotFound {
+            dataset_id: dataset_id.into(),
         })
     }
 }
 
 #[derive(SimpleObject, Debug, Clone)]
 #[graphql(complex)]
-pub struct UpstreamDatasetResultFound {
+pub struct DependencyDatasetResultFound {
     pub dataset: Dataset,
 }
 
 #[ComplexObject]
-impl UpstreamDatasetResultFound {
+impl DependencyDatasetResultFound {
     async fn message(&self) -> String {
         "Found".to_string()
     }
@@ -361,13 +431,12 @@ impl UpstreamDatasetResultFound {
 }
 
 #[derive(SimpleObject, Debug, Clone)]
 #[graphql(complex)]
-pub struct UpstreamDatasetResultNotFound {
+pub struct DependencyDatasetResultNotFound {
     pub dataset_id: DatasetID,
-    pub dataset_alias: DatasetAlias,
 }
 
 #[ComplexObject]
-impl UpstreamDatasetResultNotFound {
+impl DependencyDatasetResultNotFound {
     async fn message(&self) -> String {
         "Not found".to_string()
     }
diff --git a/src/domain/accounts/domain/src/services/account_service.rs b/src/domain/accounts/domain/src/services/account_service.rs
index 8bd6800272..fedd5b13cc 100644
--- a/src/domain/accounts/domain/src/services/account_service.rs
+++ b/src/domain/accounts/domain/src/services/account_service.rs
@@ -7,14 +7,18 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::collections::HashMap;
+
 use database_common::{EntityPageListing, PaginationOpts};
 use internal_error::InternalError;
+use opendatafabric as odf;
 use thiserror::Error;
 
 use crate::{Account, AccountPageStream};
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
+// TODO: Private Datasets: merge with AuthenticationService?
 // TODO: Private Datasets: tests
 #[async_trait::async_trait]
 pub trait AccountService: Sync + Send {
@@ -25,6 +29,11 @@ pub trait AccountService: Sync + Send {
         &self,
         pagination: PaginationOpts,
     ) -> Result<EntityPageListing<Account>, ListAccountError>;
+
+    async fn get_account_map(
+        &self,
+        account_ids: Vec<odf::AccountID>,
+    ) -> Result<HashMap<odf::AccountID, Account>, GetAccountMapError>;
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -38,3 +47,11 @@ pub enum ListAccountError {
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, Error)]
+pub enum GetAccountMapError {
+    #[error(transparent)]
+    Internal(#[from] InternalError),
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
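`get_account_map` gives the resolvers a single batched account lookup keyed by `AccountID` in place of one query per dataset. A caller-side sketch (the `display_names` helper is hypothetical):

```rust
use std::collections::HashMap;

use internal_error::{InternalError, ResultIntoInternal};
use kamu_accounts::{Account, AccountService};
use opendatafabric as odf;

async fn display_names(
    account_service: &dyn AccountService,
    owner_ids: Vec<odf::AccountID>,
) -> Result<Vec<String>, InternalError> {
    let account_map: HashMap<odf::AccountID, Account> = account_service
        .get_account_map(owner_ids.clone())
        .await
        .int_err()?;

    // The implementation below maps a repository NotFound to an empty map
    // rather than an error, so callers must tolerate missing keys.
    Ok(owner_ids
        .iter()
        .map(|id| match account_map.get(id) {
            Some(account) => account.account_name.to_string(),
            None => format!("<unknown account: {id}>"),
        })
        .collect())
}
```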
diff --git a/src/domain/accounts/services/src/account_service_impl.rs b/src/domain/accounts/services/src/account_service_impl.rs
index c1bd8b1682..f4ef33913c 100644
--- a/src/domain/accounts/services/src/account_service_impl.rs
+++ b/src/domain/accounts/services/src/account_service_impl.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use database_common::{EntityPageListing, EntityPageStreamer, PaginationOpts};
@@ -17,8 +18,11 @@ use kamu_accounts::{
     AccountPageStream,
     AccountRepository,
     AccountService,
+    GetAccountByIdError,
+    GetAccountMapError,
     ListAccountError,
 };
+use opendatafabric as odf;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -67,6 +71,30 @@ impl AccountService for AccountServiceImpl {
             total_count,
         })
     }
+
+    async fn get_account_map(
+        &self,
+        account_ids: Vec<odf::AccountID>,
+    ) -> Result<HashMap<odf::AccountID, Account>, GetAccountMapError> {
+        let account_map = match self.account_repo.get_accounts_by_ids(account_ids).await {
+            Ok(accounts) => {
+                let map = accounts
+                    .into_iter()
+                    .fold(HashMap::new(), |mut acc, account| {
+                        acc.insert(account.id.clone(), account);
+                        acc
+                    });
+                Ok(map)
+            }
+            Err(err) => match err {
+                GetAccountByIdError::NotFound(_) => Ok(HashMap::new()),
+                e => Err(e),
+            },
+        }
+        .int_err()?;
+
+        Ok(account_map)
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/core/src/auth/dataset_action_authorizer.rs b/src/domain/core/src/auth/dataset_action_authorizer.rs
index cc452af02e..bbcfbe1b22 100644
--- a/src/domain/core/src/auth/dataset_action_authorizer.rs
+++ b/src/domain/core/src/auth/dataset_action_authorizer.rs
@@ -23,7 +23,7 @@ use crate::AccessError;
 pub trait DatasetActionAuthorizer: Sync + Send {
     async fn check_action_allowed(
         &self,
-        // TODO: Private Datasets: use odf::DatasetID, here and below
+        // TODO: Private Datasets: migrate to use odf::DatasetID, here and below
         dataset_handle: &odf::DatasetHandle,
         action: DatasetAction,
     ) -> Result<(), DatasetActionUnauthorizedError>;
@@ -60,6 +60,15 @@ pub trait DatasetActionAuthorizer: Sync + Send {
         dataset_handles: Vec<odf::DatasetHandle>,
         action: DatasetAction,
     ) -> Result<ClassifyByAllowanceResponse, InternalError>;
+
+    // TODO: Private Datasets: tests
+    // TODO: Private Datasets: use classify_datasets_by_allowance() name
+    //       after migration
+    async fn classify_dataset_ids_by_allowance(
+        &self,
+        dataset_ids: Vec<odf::DatasetID>,
+        action: DatasetAction,
+    ) -> Result<ClassifyByAllowanceIdsResponse, InternalError>;
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -144,6 +153,16 @@ pub struct ClassifyByAllowanceResponse {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
+// TODO: Private Datasets: use classify_datasets_by_allowance() name
+//       after migration
+#[derive(Debug)]
+pub struct ClassifyByAllowanceIdsResponse {
+    pub authorized_ids: Vec<odf::DatasetID>,
+    pub unauthorized_ids_with_errors: Vec<(odf::DatasetID, DatasetActionUnauthorizedError)>,
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
 #[component(pub)]
 #[interface(dyn DatasetActionAuthorizer)]
 pub struct AlwaysHappyDatasetActionAuthorizer {}
@@ -190,6 +209,17 @@ impl DatasetActionAuthorizer for AlwaysHappyDatasetActionAuthorizer {
             unauthorized_handles_with_errors: vec![],
         })
     }
+
+    async fn classify_dataset_ids_by_allowance(
+        &self,
+        dataset_ids: Vec<odf::DatasetID>,
+        _action: DatasetAction,
+    ) -> Result<ClassifyByAllowanceIdsResponse, InternalError> {
+        Ok(ClassifyByAllowanceIdsResponse {
+            authorized_ids: dataset_ids,
+            unauthorized_ids_with_errors: vec![],
+        })
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
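`AlwaysHappyDatasetActionAuthorizer` extends its pass-through contract to the ID-based method, which single-tenant and test wiring rely on. An illustrative check (the seeded-ID constructor follows the pattern used elsewhere in kamu tests; treat it as an assumption):

```rust
#[tokio::test]
async fn always_happy_authorizes_every_id() {
    use kamu_core::auth::{
        AlwaysHappyDatasetActionAuthorizer,
        DatasetAction,
        DatasetActionAuthorizer,
    };
    use opendatafabric as odf;

    let authorizer = AlwaysHappyDatasetActionAuthorizer {};
    let ids = vec![odf::DatasetID::new_seeded_ed25519(b"foo")];

    let response = authorizer
        .classify_dataset_ids_by_allowance(ids.clone(), DatasetAction::Write)
        .await
        .unwrap();

    // Everything is authorized, nothing is rejected.
    assert_eq!(response.authorized_ids, ids);
    assert!(response.unauthorized_ids_with_errors.is_empty());
}
```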
diff --git a/src/infra/core/src/testing/mock_dataset_action_authorizer.rs b/src/infra/core/src/testing/mock_dataset_action_authorizer.rs
index e7baae4226..5713c1250a 100644
--- a/src/infra/core/src/testing/mock_dataset_action_authorizer.rs
+++ b/src/infra/core/src/testing/mock_dataset_action_authorizer.rs
@@ -11,7 +11,7 @@ use std::collections::HashSet;
 
 use internal_error::InternalError;
 use kamu_core::auth::{
-    self,
+    ClassifyByAllowanceIdsResponse,
     ClassifyByAllowanceResponse,
     DatasetAction,
     DatasetActionAuthorizer,
@@ -21,7 +21,7 @@ use kamu_core::auth::{
 use kamu_core::AccessError;
 use mockall::predicate::{always, eq, function};
 use mockall::Predicate;
-use opendatafabric::{DatasetAlias, DatasetHandle};
+use opendatafabric as odf;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -32,32 +32,38 @@ mockall::mock! {
     impl DatasetActionAuthorizer for DatasetActionAuthorizer {
         async fn check_action_allowed(
             &self,
-            dataset_handle: &DatasetHandle,
+            dataset_handle: &odf::DatasetHandle,
             action: DatasetAction,
         ) -> Result<(), DatasetActionUnauthorizedError>;
 
         async fn get_allowed_actions(
             &self,
-            dataset_handle: &DatasetHandle,
+            dataset_handle: &odf::DatasetHandle,
         ) -> Result<HashSet<DatasetAction>, InternalError>;
 
         async fn filter_datasets_allowing(
             &self,
-            dataset_handles: Vec<DatasetHandle>,
+            dataset_handles: Vec<odf::DatasetHandle>,
             action: DatasetAction,
-        ) -> Result<Vec<DatasetHandle>, InternalError>;
+        ) -> Result<Vec<odf::DatasetHandle>, InternalError>;
 
         async fn classify_datasets_by_allowance(
             &self,
-            dataset_handles: Vec<DatasetHandle>,
+            dataset_handles: Vec<odf::DatasetHandle>,
             action: DatasetAction,
         ) -> Result<ClassifyByAllowanceResponse, InternalError>;
+
+        async fn classify_dataset_ids_by_allowance(
+            &self,
+            dataset_ids: Vec<odf::DatasetID>,
+            action: DatasetAction,
+        ) -> Result<ClassifyByAllowanceIdsResponse, InternalError>;
     }
 }
 
 impl MockDatasetActionAuthorizer {
     pub fn denying_error(
-        dataset_handle: &DatasetHandle,
+        dataset_handle: &odf::DatasetHandle,
         action: DatasetAction,
     ) -> DatasetActionUnauthorizedError {
         DatasetActionUnauthorizedError::Access(AccessError::Forbidden(
@@ -87,13 +93,13 @@ impl MockDatasetActionAuthorizer {
 
     pub fn expect_check_read_dataset(
         self,
-        dataset_alias: &DatasetAlias,
+        dataset_alias: &odf::DatasetAlias,
         times: usize,
         success: bool,
     ) -> Self {
         let dataset_alias = dataset_alias.clone();
         self.expect_check_action_allowed_internal(
-            function(move |dh: &DatasetHandle| dh.alias == dataset_alias),
+            function(move |dh: &odf::DatasetHandle| dh.alias == dataset_alias),
             DatasetAction::Read,
             times,
             success,
@@ -102,13 +108,13 @@ impl MockDatasetActionAuthorizer {
 
     pub fn expect_check_write_dataset(
         self,
-        dataset_alias: &DatasetAlias,
+        dataset_alias: &odf::DatasetAlias,
         times: usize,
         success: bool,
     ) -> Self {
         let dataset_alias = dataset_alias.clone();
         self.expect_check_action_allowed_internal(
-            function(move |dh: &DatasetHandle| dh.alias == dataset_alias),
+            function(move |dh: &odf::DatasetHandle| dh.alias == dataset_alias),
             DatasetAction::Write,
             times,
             success,
@@ -126,12 +132,12 @@ impl MockDatasetActionAuthorizer {
     fn expect_check_action_allowed_internal<P>(
         mut self,
         dataset_handle_predicate: P,
-        action: auth::DatasetAction,
+        action: DatasetAction,
         times: usize,
         success: bool,
     ) -> Self
     where
-        P: Predicate<DatasetHandle> + Sync + Send + 'static,
+        P: Predicate<odf::DatasetHandle> + Sync + Send + 'static,
     {
         if times > 0 {
             self.expect_check_action_allowed()
@@ -155,9 +161,9 @@ impl MockDatasetActionAuthorizer {
 
     pub fn make_expect_classify_datasets_by_allowance(
         mut self,
-        action: auth::DatasetAction,
+        action: DatasetAction,
         times: usize,
-        authorized: HashSet<DatasetAlias>,
+        authorized: HashSet<odf::DatasetAlias>,
     ) -> Self {
         self.expect_classify_datasets_by_allowance()
             .with(always(), eq(action))
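The mocked `classify_dataset_ids_by_allowance` can be stubbed the same way as the handle-based expectations above. A sketch of an all-authorizing expectation (the helper name is illustrative):

```rust
use kamu_core::auth::{ClassifyByAllowanceIdsResponse, DatasetAction};
use mockall::predicate::{always, eq};

// Expect exactly one Read-classification call and authorize every ID passed in.
fn expect_classify_ids_all_authorized(
    mut mock: MockDatasetActionAuthorizer,
) -> MockDatasetActionAuthorizer {
    mock.expect_classify_dataset_ids_by_allowance()
        .with(always(), eq(DatasetAction::Read))
        .times(1)
        .returning(|dataset_ids, _action| {
            Ok(ClassifyByAllowanceIdsResponse {
                authorized_ids: dataset_ids,
                unauthorized_ids_with_errors: vec![],
            })
        });
    mock
}
```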