From e1b8804dd9ebb2ba58f98cf9c1f053fdd4417773 Mon Sep 17 00:00:00 2001 From: Dima Pristupa Date: Tue, 24 Dec 2024 18:00:22 +0200 Subject: [PATCH] GQL, DatasetMetadata: be prepared for not accessed datasets (#1011) --- CHANGELOG.md | 2 + resources/schema.gql | 23 ++- .../src/mutations/dataset_mut/dataset_mut.rs | 14 +- .../src/queries/datasets/dataset_metadata.rs | 142 ++++++++++++++---- 4 files changed, 144 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6481f3e57a..847386e62a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ Recommendation: for ease of reading, use the following order: - E2E: Using the correct account in multi-tenant mode - And also the possibility of set it up - `DatasetOwnershipService`: moved to the `kamu-dataset` crate area & implemented via `DatasetEntryServiceImpl` + - GQL, `DatasetMetadata.currentUpstreamDependencies`: indication if datasets not found/not accessed + - GQL, `DatasetMetadata.currentDownstreamDependencies`: exclude datasets that cannot be accessed ## [0.213.1] - 2024-12-18 ### Fixed diff --git a/resources/schema.gql b/resources/schema.gql index f97ea47e02..2cdce5126e 100644 --- a/resources/schema.gql +++ b/resources/schema.gql @@ -646,7 +646,7 @@ type DatasetMetadata { """ Current upstream dependencies of a dataset """ - currentUpstreamDependencies: [Dataset!]! + currentUpstreamDependencies: [UpstreamDatasetResult!]! """ Current downstream dependencies of a dataset """ @@ -725,7 +725,7 @@ type DatasetMut { """ Set visibility for the dataset """ - setVisibility(visibility: DatasetVisibilityInput!): SetDatasetPropertyResult! + setVisibility(visibility: DatasetVisibilityInput!): SetDatasetVisibilityResult! } scalar DatasetName @@ -1758,11 +1758,11 @@ type SetDataSchema { schema: DataSchema! } -interface SetDatasetPropertyResult { +interface SetDatasetVisibilityResult { message: String! } -type SetDatasetPropertyResultSuccess implements SetDatasetPropertyResult { +type SetDatasetVisibilityResultSuccess implements SetDatasetVisibilityResult { dummy: String message: String! } @@ -1996,6 +1996,21 @@ interface UpdateReadmeResult { message: String! } +interface UpstreamDatasetResult { + message: String! +} + +type UpstreamDatasetResultFound implements UpstreamDatasetResult { + dataset: Dataset! + message: String! +} + +type UpstreamDatasetResultNotFound implements UpstreamDatasetResult { + datasetId: DatasetID! + datasetAlias: DatasetAlias! + message: String! +} + type ViewAccessToken { """ Unique identifier of the access token diff --git a/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut.rs b/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut.rs index 89e378abaf..c27b950e6a 100644 --- a/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut.rs +++ b/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut.rs @@ -162,7 +162,7 @@ impl DatasetMut { &self, ctx: &Context<'_>, visibility: DatasetVisibilityInput, - ) -> Result { + ) -> Result { ensure_account_owns_dataset(ctx, &self.dataset_handle).await?; let rebac_svc = from_catalog_n!(ctx, dyn kamu_auth_rebac::RebacService); @@ -186,7 +186,7 @@ impl DatasetMut { .int_err()?; } - Ok(SetDatasetPropertyResultSuccess::default().into()) + Ok(SetDatasetVisibilityResultSuccess::default().into()) } } @@ -292,20 +292,20 @@ pub enum DatasetVisibilityInput { #[derive(Interface, Debug)] #[graphql(field(name = "message", ty = "String"))] -pub enum SetDatasetPropertyResult { - Success(SetDatasetPropertyResultSuccess), +pub enum SetDatasetVisibilityResult { + Success(SetDatasetVisibilityResultSuccess), } #[derive(SimpleObject, Debug, Default)] #[graphql(complex)] -pub struct SetDatasetPropertyResultSuccess { +pub struct SetDatasetVisibilityResultSuccess { _dummy: Option, } #[ComplexObject] -impl SetDatasetPropertyResultSuccess { +impl SetDatasetVisibilityResultSuccess { async fn message(&self) -> String { - "Updated".to_string() + "Success".to_string() } } diff --git a/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs b/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs index 1d89e5fbe4..427dc000dd 100644 --- a/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs +++ b/src/adapter/graphql/src/queries/datasets/dataset_metadata.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use chrono::prelude::*; +use kamu_core::auth::{ClassifyByAllowanceResponse, DatasetAction}; use kamu_core::{ self as domain, MetadataChainExt, @@ -25,8 +26,6 @@ use crate::utils::get_dataset; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - pub struct DatasetMetadata { dataset_handle: odf::DatasetHandle, } @@ -81,31 +80,50 @@ impl DatasetMetadata { } } + // TODO: Private Datasets: tests /// Current upstream dependencies of a dataset - async fn current_upstream_dependencies(&self, ctx: &Context<'_>) -> Result> { - let (dependency_graph_service, dataset_registry) = from_catalog_n!( + async fn current_upstream_dependencies( + &self, + ctx: &Context<'_>, + ) -> Result> { + let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!( ctx, dyn domain::DependencyGraphService, - dyn domain::DatasetRegistry + dyn domain::DatasetRegistry, + dyn kamu_core::auth::DatasetActionAuthorizer ); - use tokio_stream::StreamExt; - let upstream_dataset_ids: Vec<_> = dependency_graph_service + use futures::{StreamExt, TryStreamExt}; + + let upstream_dataset_handles = dependency_graph_service .get_upstream_dependencies(&self.dataset_handle.id) .await .int_err()? - .collect() - .await; - - let mut upstream = Vec::with_capacity(upstream_dataset_ids.len()); - for upstream_dataset_id in upstream_dataset_ids { - let hdl = dataset_registry - .resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref()) - .await - .int_err()?; + .then(|upstream_dataset_id| { + let dataset_registry = dataset_registry.clone(); + async move { + dataset_registry + .resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref()) + .await + .int_err() + } + }) + .try_collect::>() + .await?; + + let upstream_dataset_handles_len = upstream_dataset_handles.len(); + let ClassifyByAllowanceResponse { + authorized_handles, + unauthorized_handles_with_errors, + } = dataset_action_authorizer + .classify_datasets_by_allowance(upstream_dataset_handles, DatasetAction::Read) + .await?; + + let mut upstream = Vec::with_capacity(upstream_dataset_handles_len); + for hdl in authorized_handles { let maybe_account = Account::from_dataset_alias(ctx, &hdl.alias).await?; if let Some(account) = maybe_account { - upstream.push(Dataset::new(account, hdl)); + upstream.push(UpstreamDatasetResult::found(Dataset::new(account, hdl))); } else { tracing::warn!( "Skipped upstream dataset '{}' with unresolved account", @@ -114,28 +132,51 @@ impl DatasetMetadata { } } + upstream.extend( + unauthorized_handles_with_errors + .into_iter() + .map(|(hdl, _)| UpstreamDatasetResult::not_found(hdl)), + ); + Ok(upstream) } // TODO: Convert to collection + // TODO: Private Datasets: tests /// Current downstream dependencies of a dataset async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result> { - let (dependency_graph_service, dataset_registry) = from_catalog_n!( + let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!( ctx, dyn domain::DependencyGraphService, - dyn domain::DatasetRegistry + dyn domain::DatasetRegistry, + dyn kamu_core::auth::DatasetActionAuthorizer ); - use tokio_stream::StreamExt; - let downstream_dataset_ids: Vec<_> = dependency_graph_service + use futures::{StreamExt, TryStreamExt}; + + let downstream_dataset_handles = dependency_graph_service .get_downstream_dependencies(&self.dataset_handle.id) .await .int_err()? - .collect() - .await; - - let mut downstream = Vec::with_capacity(downstream_dataset_ids.len()); - for downstream_dataset_id in downstream_dataset_ids { + .then(|upstream_dataset_id| { + let dataset_registry = dataset_registry.clone(); + async move { + dataset_registry + .resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref()) + .await + .int_err() + } + }) + .try_collect::>() + .await?; + + let authorized_downstream_dataset_ids = dataset_action_authorizer + .classify_datasets_by_allowance(downstream_dataset_handles, DatasetAction::Read) + .await? + .authorized_handles; + + let mut downstream = Vec::with_capacity(authorized_downstream_dataset_ids.len()); + for downstream_dataset_id in authorized_downstream_dataset_ids { let hdl = dataset_registry .resolve_dataset_handle_by_ref(&downstream_dataset_id.as_local_ref()) .await @@ -284,3 +325,52 @@ impl DatasetMetadata { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Interface, Debug, Clone)] +#[graphql(field(name = "message", ty = "String"))] +enum UpstreamDatasetResult { + Found(UpstreamDatasetResultFound), + NotFound(UpstreamDatasetResultNotFound), +} + +impl UpstreamDatasetResult { + pub fn found(dataset: Dataset) -> Self { + Self::Found(UpstreamDatasetResultFound { dataset }) + } + + pub fn not_found(dataset_handle: odf::DatasetHandle) -> Self { + Self::NotFound(UpstreamDatasetResultNotFound { + dataset_id: dataset_handle.id.into(), + dataset_alias: dataset_handle.alias.into(), + }) + } +} + +#[derive(SimpleObject, Debug, Clone)] +#[graphql(complex)] +pub struct UpstreamDatasetResultFound { + pub dataset: Dataset, +} + +#[ComplexObject] +impl UpstreamDatasetResultFound { + async fn message(&self) -> String { + "Found".to_string() + } +} + +#[derive(SimpleObject, Debug, Clone)] +#[graphql(complex)] +pub struct UpstreamDatasetResultNotFound { + pub dataset_id: DatasetID, + pub dataset_alias: DatasetAlias, +} + +#[ComplexObject] +impl UpstreamDatasetResultNotFound { + async fn message(&self) -> String { + "Not found".to_string() + } +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////