From 1fba5cceca1a3fde7e844231bb004d1f949a89cf Mon Sep 17 00:00:00 2001
From: Dima Pristupa
Date: Thu, 26 Dec 2024 20:13:32 +0200
Subject: [PATCH] GQL: Datasets: auth checks

---
 resources/schema.gql                          |   4 +-
 .../mutations/flows_mut/flows_mut_utils.rs    |  12 +-
 .../src/queries/access_tokens/access_token.rs |   2 +-
 .../graphql/src/queries/accounts/account.rs   |   5 +-
 .../src/queries/accounts/account_flow_runs.rs |   4 +-
 .../graphql/src/queries/admin/admin.rs        |   4 +
 .../graphql/src/queries/datasets/dataset.rs   |  15 +-
 .../src/queries/datasets/dataset_endpoints.rs |  22 +--
 .../src/queries/datasets/dataset_env_var.rs   |   2 +-
 .../src/queries/datasets/dataset_env_vars.rs  |   5 -
 .../queries/datasets/dataset_flow_configs.rs  |   5 -
 .../src/queries/datasets/dataset_flow_runs.rs |   7 -
 .../graphql/src/queries/datasets/datasets.rs  | 130 +++++++++++-------
 src/domain/datasets/services/src/lib.rs       |   1 +
 14 files changed, 122 insertions(+), 96 deletions(-)

diff --git a/resources/schema.gql b/resources/schema.gql
index 39875c9ad4..91c112933c 100644
--- a/resources/schema.gql
+++ b/resources/schema.gql
@@ -2024,7 +2024,7 @@ type ViewAccessToken {
   """
   createdAt: DateTime!
   """
-  Date of token revokation
+  Date of token revocation
   """
   revokedAt: DateTime
   """
@@ -2043,7 +2043,7 @@ type ViewDatasetEnvVar {
   """
   key: String!
   """
-  Non sercret value of dataset environment variable
+  Non secret value of dataset environment variable
   """
   value: String
   """
diff --git a/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs b/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs
index bc9fa6933b..1cbe6d2e4b 100644
--- a/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs
+++ b/src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs
@@ -49,12 +49,14 @@ pub(crate) async fn check_if_flow_belongs_to_dataset(
                 return Ok(Some(FlowInDatasetError::NotFound(FlowNotFound { flow_id })))
             }
         },
-        Err(e) => match e {
-            fs::GetFlowError::NotFound(_) => {
-                return Ok(Some(FlowInDatasetError::NotFound(FlowNotFound { flow_id })))
+        Err(e) => {
+            return match e {
+                fs::GetFlowError::NotFound(_) => {
+                    Ok(Some(FlowInDatasetError::NotFound(FlowNotFound { flow_id })))
+                }
+                fs::GetFlowError::Internal(e) => Err(GqlError::Internal(e)),
             }
-            fs::GetFlowError::Internal(e) => return Err(GqlError::Internal(e)),
-        },
+        }
     }
 
     Ok(None)
diff --git a/src/adapter/graphql/src/queries/access_tokens/access_token.rs b/src/adapter/graphql/src/queries/access_tokens/access_token.rs
index 8e71d7d774..ceb0bdfef2 100644
--- a/src/adapter/graphql/src/queries/access_tokens/access_token.rs
+++ b/src/adapter/graphql/src/queries/access_tokens/access_token.rs
@@ -42,7 +42,7 @@ impl ViewAccessToken {
         self.token.created_at
     }
 
-    /// Date of token revokation
+    /// Date of token revocation
    async fn revoked_at(&self) -> Option<DateTime<Utc>> {
         self.token.revoked_at
     }
diff --git a/src/adapter/graphql/src/queries/accounts/account.rs b/src/adapter/graphql/src/queries/accounts/account.rs
index 2b004625a9..0225549a83 100644
--- a/src/adapter/graphql/src/queries/accounts/account.rs
+++ b/src/adapter/graphql/src/queries/accounts/account.rs
@@ -90,7 +90,10 @@ impl Account {
         alias: &odf::DatasetAlias,
     ) -> Result<Option<Self>, InternalError> {
         if alias.is_multi_tenant() {
-            Ok(Self::from_account_name(ctx, alias.account_name.as_ref().unwrap().clone()).await?)
+            // Safety: In multi-tenant mode, the account name is always present.
+            let account_name = alias.account_name.as_ref().unwrap().clone();
+
+            Ok(Self::from_account_name(ctx, account_name).await?)
         } else {
             let current_account_subject = from_catalog_n!(ctx, CurrentAccountSubject);
 
diff --git a/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs b/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
index 7fb5416e85..bcf07820fa 100644
--- a/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
+++ b/src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
@@ -54,8 +54,8 @@ impl AccountFlowRuns {
                 by_flow_status: filters.by_status.map(Into::into),
                 by_dataset_ids: filters
                     .by_dataset_ids
-                    .iter()
-                    .map(|dataset_id| dataset_id.clone().into())
+                    .into_iter()
+                    .map(Into::into)
                     .collect::<HashSet<_>>(),
                 by_initiator: match filters.by_initiator {
                     Some(initiator_filter) => match initiator_filter {
diff --git a/src/adapter/graphql/src/queries/admin/admin.rs b/src/adapter/graphql/src/queries/admin/admin.rs
index d72d79ef2a..c846cd33b2 100644
--- a/src/adapter/graphql/src/queries/admin/admin.rs
+++ b/src/adapter/graphql/src/queries/admin/admin.rs
@@ -10,6 +10,8 @@
 use crate::prelude::*;
 use crate::AdminGuard;
 
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
 pub struct Admin;
 
 #[Object]
@@ -20,3 +22,5 @@ impl Admin {
         Ok("OK".to_string())
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/graphql/src/queries/datasets/dataset.rs b/src/adapter/graphql/src/queries/datasets/dataset.rs
index 988912ce32..f1a1cae68f 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset.rs
@@ -13,7 +13,7 @@ use opendatafabric as odf;
 
 use crate::prelude::*;
 use crate::queries::*;
-use crate::utils::{ensure_dataset_env_vars_enabled, get_dataset};
+use crate::utils::{check_dataset_read_access, ensure_dataset_env_vars_enabled, get_dataset};
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -38,15 +38,18 @@ impl Dataset {
         let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry);
 
         // TODO: Should we resolve reference at this point or allow unresolved and fail
-        // later?
-        let hdl = dataset_registry
+        //       later?
+        let handle = dataset_registry
             .resolve_dataset_handle_by_ref(dataset_ref)
             .await
             .int_err()?;
-        let account = Account::from_dataset_alias(ctx, &hdl.alias)
+
+        check_dataset_read_access(ctx, &handle).await?;
+
+        let account = Account::from_dataset_alias(ctx, &handle.alias)
             .await?
.expect("Account must exist"); - Ok(Dataset::new(account, hdl)) + Ok(Dataset::new(account, handle)) } /// Unique identifier of the dataset @@ -112,7 +115,7 @@ impl Dataset { } /// Access to the environment variable of this dataset - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn env_vars(&self, ctx: &Context<'_>) -> Result { ensure_dataset_env_vars_enabled(ctx)?; diff --git a/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs b/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs index 463f6fd53d..43fc8696a1 100644 --- a/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs +++ b/src/adapter/graphql/src/queries/datasets/dataset_endpoints.rs @@ -15,6 +15,8 @@ use opendatafabric as odf; use crate::prelude::*; use crate::queries::*; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + pub struct DatasetEndpoints<'a> { owner: &'a Account, dataset_handle: odf::DatasetHandle, @@ -48,7 +50,7 @@ impl<'a> DatasetEndpoints<'a> { self.dataset_handle.alias.dataset_name.as_str() } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn web_link(&self) -> Result { let url = format!( "{}{}/{}", @@ -60,7 +62,7 @@ impl<'a> DatasetEndpoints<'a> { Ok(LinkProtocolDesc { url }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn cli(&self) -> Result { let url = format!( "odf+{}{}", @@ -78,7 +80,7 @@ impl<'a> DatasetEndpoints<'a> { }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn rest(&self) -> Result { let dataset_base_url = format!( "{}{}", @@ -101,14 +103,14 @@ impl<'a> DatasetEndpoints<'a> { }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn flightsql(&self) -> Result { Ok(FlightSqlDesc { url: self.config.protocols.base_url_flightsql.to_string(), }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn jdbc(&self) -> Result { let mut url = self.config.protocols.base_url_flightsql.clone(); @@ -119,28 +121,28 @@ impl<'a> DatasetEndpoints<'a> { }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn postgresql(&self) -> Result { Ok(PostgreSqlDesl { url: "- coming soon -".to_string(), }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn kafka(&self) -> Result { Ok(KafkaProtocolDesc { url: "- coming soon -".to_string(), }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn websocket(&self) -> Result { Ok(WebSocketProtocolDesc { url: "- coming soon -".to_string(), }) } - #[allow(clippy::unused_async)] + #[expect(clippy::unused_async)] async fn odata(&self) -> Result { let mut url = format!("{}odata", self.config.protocols.base_url_rest); // to respect both kinds of workspaces: single-tenant & multi-tenant @@ -168,3 +170,5 @@ impl<'a> DatasetEndpoints<'a> { }) } } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs b/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs index 1bc95d867e..fe9d559b86 100644 --- a/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs +++ b/src/adapter/graphql/src/queries/datasets/dataset_env_var.rs @@ -36,7 +36,7 @@ impl ViewDatasetEnvVar { self.env_var.key.clone() } - /// Non sercret value of dataset environment variable + /// Non secret value of dataset environment variable 
     #[allow(clippy::unused_async)]
     async fn value(&self) -> Option<String> {
         self.env_var.get_non_secret_value()
     }
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_env_vars.rs b/src/adapter/graphql/src/queries/datasets/dataset_env_vars.rs
index bcedbbf72d..fdf534429c 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_env_vars.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_env_vars.rs
@@ -13,7 +13,6 @@ use opendatafabric as odf;
 
 use super::ViewDatasetEnvVar;
 use crate::prelude::*;
-use crate::utils;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -35,8 +34,6 @@ impl DatasetEnvVars {
         ctx: &Context<'_>,
         dataset_env_var_id: DatasetEnvVarID,
     ) -> Result<ViewDatasetEnvVar> {
-        utils::check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
         let dataset_env_var = dataset_env_var_service
             .get_dataset_env_var_by_id(&dataset_env_var_id)
@@ -57,8 +54,6 @@ impl DatasetEnvVars {
         page: Option<usize>,
         per_page: Option<usize>,
     ) -> Result<ViewDatasetEnvVarConnection> {
-        utils::check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
 
         let page = page.unwrap_or(0);
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs b/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
index a094fea269..863b35d9da 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
@@ -11,7 +11,6 @@ use kamu_flow_system::{FlowConfigurationService, FlowKeyDataset};
 use opendatafabric as odf;
 
 use crate::prelude::*;
-use crate::utils::check_dataset_read_access;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -32,8 +31,6 @@ impl DatasetFlowConfigs {
         ctx: &Context<'_>,
         dataset_flow_type: DatasetFlowType,
     ) -> Result<Option<FlowConfiguration>> {
-        check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         let flow_config_service = from_catalog_n!(ctx, dyn FlowConfigurationService);
         let maybe_flow_config = flow_config_service
             .find_configuration(
@@ -48,8 +45,6 @@ impl DatasetFlowConfigs {
 
     /// Checks if all configs of this dataset are disabled
     async fn all_paused(&self, ctx: &Context<'_>) -> Result<bool> {
-        check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         let flow_config_service = from_catalog_n!(ctx, dyn FlowConfigurationService);
         for dataset_flow_type in kamu_flow_system::DatasetFlowType::all() {
             let maybe_flow_config = flow_config_service
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs b/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
index 9aad65a504..ce80620266 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
@@ -17,7 +17,6 @@ use {kamu_flow_system as fs, opendatafabric as odf};
 
 use crate::mutations::{check_if_flow_belongs_to_dataset, FlowInDatasetError, FlowNotFound};
 use crate::prelude::*;
 use crate::queries::{Account, Flow};
-use crate::utils;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -35,8 +34,6 @@ impl DatasetFlowRuns {
     }
 
     async fn get_flow(&self, ctx: &Context<'_>, flow_id: FlowID) -> Result<GetFlowResult> {
-        utils::check_dataset_read_access(ctx, &self.dataset_handle).await?;
-
         if let Some(error) =
             check_if_flow_belongs_to_dataset(ctx, flow_id, &self.dataset_handle).await?
&self.dataset_handle).await? { @@ -64,8 +61,6 @@ impl DatasetFlowRuns { per_page: Option, filters: Option, ) -> Result { - utils::check_dataset_read_access(ctx, &self.dataset_handle).await?; - let flow_query_service = from_catalog_n!(ctx, dyn fs::FlowQueryService); let page = page.unwrap_or(0); @@ -122,8 +117,6 @@ impl DatasetFlowRuns { } async fn list_flow_initiators(&self, ctx: &Context<'_>) -> Result { - utils::check_dataset_read_access(ctx, &self.dataset_handle).await?; - let flow_query_service = from_catalog_n!(ctx, dyn fs::FlowQueryService); let flow_initiator_ids: Vec<_> = flow_query_service diff --git a/src/adapter/graphql/src/queries/datasets/datasets.rs b/src/adapter/graphql/src/queries/datasets/datasets.rs index 546ccfa85d..43509f1fd5 100644 --- a/src/adapter/graphql/src/queries/datasets/datasets.rs +++ b/src/adapter/graphql/src/queries/datasets/datasets.rs @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use futures::TryStreamExt; +use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer}; use kamu_core::{ DatasetRegistryExt, {self as domain}, @@ -16,6 +16,7 @@ use opendatafabric as odf; use crate::prelude::*; use crate::queries::*; +use crate::utils::check_dataset_read_access; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -24,22 +25,40 @@ pub struct Datasets; #[Object] impl Datasets { const DEFAULT_PER_PAGE: usize = 15; + const DATASETS_CHUNK_SIZE: usize = 100; - /// Returns dataset by its ID - async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result> { + #[graphql(skip)] + async fn by_dataset_ref( + &self, + ctx: &Context<'_>, + dataset_ref: &odf::DatasetRef, + ) -> Result> { let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry); - let hdl = dataset_registry - .try_resolve_dataset_handle_by_ref(&dataset_id.as_local_ref()) + // TODO: Extract into ViewDatasetUseCase + let maybe_handle = dataset_registry + .try_resolve_dataset_handle_by_ref(dataset_ref) .await?; - Ok(match hdl { - Some(h) => { - let account = Account::from_dataset_alias(ctx, &h.alias) - .await? - .expect("Account must exist"); - Some(Dataset::new(account, h)) - } - None => None, - }) + + let Some(handle) = maybe_handle else { + return Ok(None); + }; + + if check_dataset_read_access(ctx, &handle).await.is_err() { + return Ok(None); + } + + let account = Account::from_dataset_alias(ctx, &handle.alias) + .await? + .expect("Account must exist"); + + Ok(Some(Dataset::new(account, handle))) + } + + /// Returns dataset by its ID + async fn by_id(&self, ctx: &Context<'_>, dataset_id: DatasetID) -> Result> { + let dataset_id: odf::DatasetID = dataset_id.into(); + + self.by_dataset_ref(ctx, &dataset_id.into_local_ref()).await } /// Returns dataset by its owner and name @@ -49,22 +68,10 @@ impl Datasets { account_name: AccountName, dataset_name: DatasetName, ) -> Result> { - let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry); let dataset_alias = odf::DatasetAlias::new(Some(account_name.into()), dataset_name.into()); - let hdl = dataset_registry - .try_resolve_dataset_handle_by_ref(&dataset_alias.into_local_ref()) - .await?; - - Ok(match hdl { - Some(h) => { - let account = Account::from_dataset_alias(ctx, &h.alias) - .await? 
- .expect("Account must exist"); - Some(Dataset::new(account, h)) - } - None => None, - }) + self.by_dataset_ref(ctx, &dataset_alias.into_local_ref()) + .await } #[graphql(skip)] @@ -75,25 +82,47 @@ impl Datasets { page: Option, per_page: Option, ) -> Result { - let dataset_registry = from_catalog_n!(ctx, dyn domain::DatasetRegistry); + let (dataset_registry, dataset_action_authorizer) = from_catalog_n!( + ctx, + dyn domain::DatasetRegistry, + dyn DatasetActionAuthorizer + ); let page = page.unwrap_or(0); let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE); let account_name = account_ref.account_name_internal(); - let mut all_datasets: Vec<_> = dataset_registry + use futures::TryStreamExt; + + let mut account_owned_datasets_stream = dataset_registry .all_dataset_handles_by_owner(&account_name.clone().into()) - .try_collect() - .await?; - let total_count = all_datasets.len(); - all_datasets.sort_by(|a, b| a.alias.cmp(&b.alias)); + .try_chunks(Self::DATASETS_CHUNK_SIZE); + let mut accessible_datasets_handles = Vec::new(); + + while let Some(account_owned_dataset_handles_chunk) = + account_owned_datasets_stream.try_next().await.int_err()? + { + let authorized_handles = dataset_action_authorizer + .classify_datasets_by_allowance( + account_owned_dataset_handles_chunk, + DatasetAction::Read, + ) + .await? + .authorized_handles; + + accessible_datasets_handles.extend(authorized_handles); + } - let nodes = all_datasets + let total_count = accessible_datasets_handles.len(); + + accessible_datasets_handles.sort_by(|a, b| a.alias.cmp(&b.alias)); + + let nodes = accessible_datasets_handles .into_iter() .skip(page * per_page) .take(per_page) - .map(|hdl| Dataset::new(account_ref.clone(), hdl)) + .map(|handle| Dataset::new(account_ref.clone(), handle)) .collect(); Ok(DatasetConnection::new(nodes, page, per_page, total_count)) @@ -114,22 +143,19 @@ impl Datasets { .find_account_name_by_id(&account_id) .await?; - match maybe_account_name { - Some(account_name) => { - self.by_account_impl( - ctx, - Account::new(account_id.into(), account_name.into()), - page, - per_page, - ) - .await - } - None => { - let page = page.unwrap_or(0); - let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE); - - Ok(DatasetConnection::new(vec![], page, per_page, 0)) - } + if let Some(account_name) = maybe_account_name { + self.by_account_impl( + ctx, + Account::new(account_id.into(), account_name.into()), + page, + per_page, + ) + .await + } else { + let page = page.unwrap_or(0); + let per_page = per_page.unwrap_or(Self::DEFAULT_PER_PAGE); + + Ok(DatasetConnection::new(vec![], page, per_page, 0)) } } diff --git a/src/domain/datasets/services/src/lib.rs b/src/domain/datasets/services/src/lib.rs index ba39e55a7a..fe9c04233b 100644 --- a/src/domain/datasets/services/src/lib.rs +++ b/src/domain/datasets/services/src/lib.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. #![feature(let_chains)] +#![feature(lint_reasons)] // Re-exports pub use kamu_datasets as domain;