Skip to content

Commit

Permalink
GQL, DatasetMetadata: be prepared for not accessed datasets (#1011)
Browse files Browse the repository at this point in the history
  • Loading branch information
s373r authored Dec 24, 2024
1 parent 301b5c1 commit e1b8804
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Recommendation: for ease of reading, use the following order:
- E2E: Using the correct account in multi-tenant mode
- And also the possibility of setting it up
- `DatasetOwnershipService`: moved to the `kamu-dataset` crate area & implemented via `DatasetEntryServiceImpl`
- GQL, `DatasetMetadata.currentUpstreamDependencies`: indicates when upstream datasets are not found or cannot be accessed
- GQL, `DatasetMetadata.currentDownstreamDependencies`: exclude datasets that cannot be accessed

## [0.213.1] - 2024-12-18
### Fixed
Expand Down
23 changes: 19 additions & 4 deletions resources/schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ type DatasetMetadata {
"""
Current upstream dependencies of a dataset
"""
currentUpstreamDependencies: [Dataset!]!
currentUpstreamDependencies: [UpstreamDatasetResult!]!
"""
Current downstream dependencies of a dataset
"""
Expand Down Expand Up @@ -725,7 +725,7 @@ type DatasetMut {
"""
Set visibility for the dataset
"""
setVisibility(visibility: DatasetVisibilityInput!): SetDatasetPropertyResult!
setVisibility(visibility: DatasetVisibilityInput!): SetDatasetVisibilityResult!
}

scalar DatasetName
Expand Down Expand Up @@ -1758,11 +1758,11 @@ type SetDataSchema {
schema: DataSchema!
}

interface SetDatasetPropertyResult {
interface SetDatasetVisibilityResult {
message: String!
}

type SetDatasetPropertyResultSuccess implements SetDatasetPropertyResult {
type SetDatasetVisibilityResultSuccess implements SetDatasetVisibilityResult {
dummy: String
message: String!
}
Expand Down Expand Up @@ -1996,6 +1996,21 @@ interface UpdateReadmeResult {
message: String!
}

# Result of resolving a single upstream dependency of a dataset.
# `message` carries a short human-readable status ("Found" / "Not found").
interface UpstreamDatasetResult {
message: String!
}

# Upstream dependency that was resolved and is readable by the caller;
# exposes the full dataset.
type UpstreamDatasetResultFound implements UpstreamDatasetResult {
dataset: Dataset!
message: String!
}

# Upstream dependency that could not be returned; only the id and alias
# of the missing dataset are exposed.
# NOTE(review): this variant is also produced for datasets the caller is not
# authorized to read (see the Rust resolver) — confirm that exposing the
# alias of an inaccessible dataset is acceptable.
type UpstreamDatasetResultNotFound implements UpstreamDatasetResult {
datasetId: DatasetID!
datasetAlias: DatasetAlias!
message: String!
}

type ViewAccessToken {
"""
Unique identifier of the access token
Expand Down
14 changes: 7 additions & 7 deletions src/adapter/graphql/src/mutations/dataset_mut/dataset_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl DatasetMut {
&self,
ctx: &Context<'_>,
visibility: DatasetVisibilityInput,
) -> Result<SetDatasetPropertyResult> {
) -> Result<SetDatasetVisibilityResult> {
ensure_account_owns_dataset(ctx, &self.dataset_handle).await?;

let rebac_svc = from_catalog_n!(ctx, dyn kamu_auth_rebac::RebacService);
Expand All @@ -186,7 +186,7 @@ impl DatasetMut {
.int_err()?;
}

Ok(SetDatasetPropertyResultSuccess::default().into())
Ok(SetDatasetVisibilityResultSuccess::default().into())
}
}

Expand Down Expand Up @@ -292,20 +292,20 @@ pub enum DatasetVisibilityInput {

#[derive(Interface, Debug)]
#[graphql(field(name = "message", ty = "String"))]
pub enum SetDatasetPropertyResult {
Success(SetDatasetPropertyResultSuccess),
pub enum SetDatasetVisibilityResult {
Success(SetDatasetVisibilityResultSuccess),
}

#[derive(SimpleObject, Debug, Default)]
#[graphql(complex)]
pub struct SetDatasetPropertyResultSuccess {
pub struct SetDatasetVisibilityResultSuccess {
_dummy: Option<String>,
}

#[ComplexObject]
impl SetDatasetPropertyResultSuccess {
impl SetDatasetVisibilityResultSuccess {
async fn message(&self) -> String {
"Updated".to_string()
"Success".to_string()
}
}

Expand Down
142 changes: 116 additions & 26 deletions src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use chrono::prelude::*;
use kamu_core::auth::{ClassifyByAllowanceResponse, DatasetAction};
use kamu_core::{
self as domain,
MetadataChainExt,
Expand All @@ -25,8 +26,6 @@ use crate::utils::get_dataset;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct DatasetMetadata {
dataset_handle: odf::DatasetHandle,
}
Expand Down Expand Up @@ -81,31 +80,50 @@ impl DatasetMetadata {
}
}

// TODO: Private Datasets: tests
/// Current upstream dependencies of a dataset
async fn current_upstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let (dependency_graph_service, dataset_registry) = from_catalog_n!(
async fn current_upstream_dependencies(
&self,
ctx: &Context<'_>,
) -> Result<Vec<UpstreamDatasetResult>> {
let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!(
ctx,
dyn domain::DependencyGraphService,
dyn domain::DatasetRegistry
dyn domain::DatasetRegistry,
dyn kamu_core::auth::DatasetActionAuthorizer
);

use tokio_stream::StreamExt;
let upstream_dataset_ids: Vec<_> = dependency_graph_service
use futures::{StreamExt, TryStreamExt};

let upstream_dataset_handles = dependency_graph_service
.get_upstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?
.collect()
.await;

let mut upstream = Vec::with_capacity(upstream_dataset_ids.len());
for upstream_dataset_id in upstream_dataset_ids {
let hdl = dataset_registry
.resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()?;
.then(|upstream_dataset_id| {
let dataset_registry = dataset_registry.clone();
async move {
dataset_registry
.resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()
}
})
.try_collect::<Vec<_>>()
.await?;

let upstream_dataset_handles_len = upstream_dataset_handles.len();
let ClassifyByAllowanceResponse {
authorized_handles,
unauthorized_handles_with_errors,
} = dataset_action_authorizer
.classify_datasets_by_allowance(upstream_dataset_handles, DatasetAction::Read)
.await?;

let mut upstream = Vec::with_capacity(upstream_dataset_handles_len);
for hdl in authorized_handles {
let maybe_account = Account::from_dataset_alias(ctx, &hdl.alias).await?;
if let Some(account) = maybe_account {
upstream.push(Dataset::new(account, hdl));
upstream.push(UpstreamDatasetResult::found(Dataset::new(account, hdl)));
} else {
tracing::warn!(
"Skipped upstream dataset '{}' with unresolved account",
Expand All @@ -114,28 +132,51 @@ impl DatasetMetadata {
}
}

upstream.extend(
unauthorized_handles_with_errors
.into_iter()
.map(|(hdl, _)| UpstreamDatasetResult::not_found(hdl)),
);

Ok(upstream)
}

// TODO: Convert to collection
// TODO: Private Datasets: tests
/// Current downstream dependencies of a dataset
async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let (dependency_graph_service, dataset_registry) = from_catalog_n!(
let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!(
ctx,
dyn domain::DependencyGraphService,
dyn domain::DatasetRegistry
dyn domain::DatasetRegistry,
dyn kamu_core::auth::DatasetActionAuthorizer
);

use tokio_stream::StreamExt;
let downstream_dataset_ids: Vec<_> = dependency_graph_service
use futures::{StreamExt, TryStreamExt};

let downstream_dataset_handles = dependency_graph_service
.get_downstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?
.collect()
.await;

let mut downstream = Vec::with_capacity(downstream_dataset_ids.len());
for downstream_dataset_id in downstream_dataset_ids {
.then(|upstream_dataset_id| {
let dataset_registry = dataset_registry.clone();
async move {
dataset_registry
.resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()
}
})
.try_collect::<Vec<_>>()
.await?;

let authorized_downstream_dataset_ids = dataset_action_authorizer
.classify_datasets_by_allowance(downstream_dataset_handles, DatasetAction::Read)
.await?
.authorized_handles;

let mut downstream = Vec::with_capacity(authorized_downstream_dataset_ids.len());
for downstream_dataset_id in authorized_downstream_dataset_ids {
let hdl = dataset_registry
.resolve_dataset_handle_by_ref(&downstream_dataset_id.as_local_ref())
.await
Expand Down Expand Up @@ -284,3 +325,52 @@ impl DatasetMetadata {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// GraphQL interface over the two outcomes of resolving one upstream
/// dependency: the readable dataset itself, or an id/alias stub when the
/// dataset cannot be returned. The shared `message` field is provided by
/// each variant's `ComplexObject` impl.
#[derive(Interface, Debug, Clone)]
#[graphql(field(name = "message", ty = "String"))]
enum UpstreamDatasetResult {
Found(UpstreamDatasetResultFound),
NotFound(UpstreamDatasetResultNotFound),
}

impl UpstreamDatasetResult {
/// Wraps a successfully resolved, readable dataset.
pub fn found(dataset: Dataset) -> Self {
Self::Found(UpstreamDatasetResultFound { dataset })
}

/// Builds the not-found variant from a handle, keeping only its id and
/// alias.
/// NOTE(review): the caller feeds this with handles that failed the
/// `DatasetAction::Read` authorization check, not only genuinely missing
/// datasets — the "not found" naming conflates the two, and exposing the
/// alias of an unauthorized dataset may leak its existence; confirm this
/// is intended.
pub fn not_found(dataset_handle: odf::DatasetHandle) -> Self {
Self::NotFound(UpstreamDatasetResultNotFound {
dataset_id: dataset_handle.id.into(),
dataset_alias: dataset_handle.alias.into(),
})
}
}

/// Payload for an upstream dependency the caller may read: the resolved
/// dataset. `message` is added separately via the `ComplexObject` impl.
#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct UpstreamDatasetResultFound {
pub dataset: Dataset,
}

#[ComplexObject]
impl UpstreamDatasetResultFound {
/// Fixed status string for the interface's `message` field.
async fn message(&self) -> String {
"Found".to_string()
}
}

/// Payload for an upstream dependency that cannot be returned: only the
/// dataset's id and alias are exposed, not the dataset itself.
#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct UpstreamDatasetResultNotFound {
pub dataset_id: DatasetID,
pub dataset_alias: DatasetAlias,
}

#[ComplexObject]
impl UpstreamDatasetResultNotFound {
/// Fixed status string for the interface's `message` field.
async fn message(&self) -> String {
"Not found".to_string()
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

0 comments on commit e1b8804

Please sign in to comment.