Skip to content

Commit

Permalink
Chore/private datasets address comments - 1 (#1037)
Browse files Browse the repository at this point in the history
* DatasetEntryServiceExt: absorb DatasetOwnershipService::get_owned_datasets()

* DatasetEntryServiceExt: absorb the rest of DatasetOwnershipService

* kamu-adapter-auth-oso-rebac: remove duplicate dep

* DatasetActionAuthorizer: classify_datasets_by_allowance() -> classify_dataset_handles_by_allowance()

* DatasetRegistry: remove a TODO

* DatasetEntryRepository::get_dataset_entries(): use dataset_name column for sorting in implementations (as it was)

* OsoResourceServiceImpl: state extraction to singleton component

* DatasetActionAuthorizer::check_action_allowed(): use DatasetID instead of DatasetHandle

* DatasetActionAuthorizer::is_action_allowed(): use DatasetID instead of DatasetHandle

* DatasetActionAuthorizer::get_allowed_actions(): use DatasetID instead of DatasetHandle

* DatasetActionAuthorizer: finalization

* ODataServiceContext::list_collections(): use DatasetActionAuthorizer::filtered_datasets_stream()

* Datasets::by_account_impl(): use DatasetActionAuthorizer::filtered_datasets_stream()

* Search::query(): use DatasetActionAuthorizer::filtered_datasets_stream()

* GetDatasetDownstreamDependenciesUseCase: extract

* GetDatasetUpstreamDependenciesUseCase: extract

* AccountServiceImpl::all_accounts(): absorb list_all_accounts() method

* ExpensiveAccountRepository: extract trait

* RebacService::properties_count(): implement

* DatasetEntryService: move list-* operations within an implementation
  • Loading branch information
s373r authored Jan 16, 2025
1 parent f330215 commit a01068f
Show file tree
Hide file tree
Showing 126 changed files with 1,465 additions and 876 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/adapter/auth-oso-rebac/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ tracing = { version = "0.1", default-features = false }
kamu-accounts-inmem = { workspace = true }
kamu-accounts-services = { workspace = true }
kamu-auth-rebac-inmem = { workspace = true }
kamu-auth-rebac-services = { workspace = true }
kamu-core = { workspace = true, default-features = false, features = ["oso", "testing"] }
kamu-datasets-inmem = { workspace = true }
kamu-datasets-services = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions src/adapter/auth-oso-rebac/src/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub fn register_dependencies(catalog_builder: &mut CatalogBuilder) {
catalog_builder.add::<KamuAuthOso>();
catalog_builder.add::<OsoDatasetAuthorizer>();
catalog_builder.add::<OsoResourceServiceImpl>();
catalog_builder.add::<OsoResourceServiceImplStateHolder>();
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
20 changes: 9 additions & 11 deletions src/adapter/auth-oso-rebac/src/oso_dataset_authorizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ impl OsoDatasetAuthorizer {

async fn dataset_resource(
&self,
dataset_handle: &odf::DatasetHandle,
dataset_id: &odf::DatasetID,
) -> Result<DatasetResource, InternalError> {
let dataset_id = &dataset_handle.id;

let dataset_resource = self
.oso_resource_service
.dataset_resource(dataset_id)
Expand All @@ -86,14 +84,14 @@ impl OsoDatasetAuthorizer {

#[async_trait::async_trait]
impl DatasetActionAuthorizer for OsoDatasetAuthorizer {
#[tracing::instrument(level = "debug", skip_all, fields(%dataset_handle, ?action))]
#[tracing::instrument(level = "debug", skip_all, fields(%dataset_id, ?action))]
async fn check_action_allowed(
&self,
dataset_handle: &odf::DatasetHandle,
dataset_id: &odf::DatasetID,
action: DatasetAction,
) -> Result<(), DatasetActionUnauthorizedError> {
let (user_actor, dataset_resource) =
try_join!(self.user_actor(), self.dataset_resource(dataset_handle))?;
try_join!(self.user_actor(), self.dataset_resource(dataset_id))?;

match self
.kamu_auth_oso
Expand All @@ -104,7 +102,7 @@ impl DatasetActionAuthorizer for OsoDatasetAuthorizer {
AccessError::Forbidden(
DatasetActionNotEnoughPermissionsError {
action,
dataset_ref: dataset_handle.as_local_ref(),
dataset_ref: dataset_id.as_local_ref(),
}
.into(),
),
Expand All @@ -113,13 +111,13 @@ impl DatasetActionAuthorizer for OsoDatasetAuthorizer {
}
}

#[tracing::instrument(level = "debug", skip_all, fields(%dataset_handle))]
#[tracing::instrument(level = "debug", skip_all, fields(%dataset_id))]
async fn get_allowed_actions(
&self,
dataset_handle: &odf::DatasetHandle,
dataset_id: &odf::DatasetID,
) -> Result<HashSet<DatasetAction>, InternalError> {
let (user_actor, dataset_resource) =
try_join!(self.user_actor(), self.dataset_resource(dataset_handle))?;
try_join!(self.user_actor(), self.dataset_resource(dataset_id))?;

self.kamu_auth_oso
.get_allowed_actions(user_actor, dataset_resource)
Expand Down Expand Up @@ -174,7 +172,7 @@ impl DatasetActionAuthorizer for OsoDatasetAuthorizer {
}

#[tracing::instrument(level = "debug", skip_all, fields(dataset_handles=?dataset_handles, action=%action))]
async fn classify_datasets_by_allowance(
async fn classify_dataset_handles_by_allowance(
&self,
dataset_handles: Vec<odf::DatasetHandle>,
action: DatasetAction,
Expand Down
30 changes: 22 additions & 8 deletions src/adapter/auth-oso-rebac/src/oso_resource_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,46 @@ use crate::{DatasetResource, UserActor};
type EntityId = String;

#[derive(Debug, Default)]
struct State {
pub struct State {
user_actor_cache_map: HashMap<EntityId, UserActor>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct OsoResourceServiceImplStateHolder {
pub state: RwLock<State>,
}

#[component(pub)]
#[scope(Singleton)]
impl OsoResourceServiceImplStateHolder {
pub fn new() -> Self {
Self {
state: RwLock::new(State::default()),
}
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// TODO: Private Datasets: add Service trait?
pub struct OsoResourceServiceImpl {
state: RwLock<State>,
state_holder: Arc<OsoResourceServiceImplStateHolder>,
dataset_entry_repo: Arc<dyn DatasetEntryRepository>,
rebac_service: Arc<dyn RebacService>,
account_repo: Arc<dyn AccountRepository>,
}

#[component(pub)]
// TODO: Private Datasets: This service should be a singleton
// Alternative: put the state into a separate component
// #[scope(Singleton)]
impl OsoResourceServiceImpl {
pub fn new(
state_holder: Arc<OsoResourceServiceImplStateHolder>,
dataset_entry_repo: Arc<dyn DatasetEntryRepository>,
rebac_service: Arc<dyn RebacService>,
account_repo: Arc<dyn AccountRepository>,
) -> Self {
Self {
state: RwLock::new(State::default()),
state_holder,
dataset_entry_repo,
rebac_service,
account_repo,
Expand All @@ -74,7 +88,7 @@ impl OsoResourceServiceImpl {

// First, an attempt to get from the cache
{
let readable_state = self.state.read().await;
let readable_state = self.state_holder.state.read().await;

let account_id_stack = account_id.as_did_str().to_stack_string();
let maybe_cached_user_actor = readable_state
Expand Down Expand Up @@ -107,7 +121,7 @@ impl OsoResourceServiceImpl {
};

// Lastly, caching
let mut writable_state = self.state.write().await;
let mut writable_state = self.state_holder.state.write().await;

writable_state
.user_actor_cache_map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ use time_source::SystemTimeSourceDefault;
#[test_log::test(tokio::test)]
async fn test_owner_can_read_and_write_private_dataset() {
let harness = DatasetAuthorizerHarness::new(logged("john")).await;
let dataset_handle = harness
let dataset_id = harness
.create_private_dataset(dataset_alias("john/foo"))
.await;

let read_result = harness
.dataset_authorizer
.check_action_allowed(&dataset_handle, DatasetAction::Read)
.check_action_allowed(&dataset_id, DatasetAction::Read)
.await;

let write_result = harness
.dataset_authorizer
.check_action_allowed(&dataset_handle, DatasetAction::Write)
.check_action_allowed(&dataset_id, DatasetAction::Write)
.await;

let allowed_actions = harness
.dataset_authorizer
.get_allowed_actions(&dataset_handle)
.get_allowed_actions(&dataset_id)
.await;

assert_matches!(read_result, Ok(()));
Expand All @@ -81,23 +81,23 @@ async fn test_owner_can_read_and_write_private_dataset() {
#[test_log::test(tokio::test)]
async fn test_guest_can_read_but_not_write_public_dataset() {
let harness = DatasetAuthorizerHarness::new(anonymous()).await;
let dataset_handle = harness
let dataset_id = harness
.create_public_dataset(dataset_alias("john/foo"))
.await;

let read_result = harness
.dataset_authorizer
.check_action_allowed(&dataset_handle, DatasetAction::Read)
.check_action_allowed(&dataset_id, DatasetAction::Read)
.await;

let write_result = harness
.dataset_authorizer
.check_action_allowed(&dataset_handle, DatasetAction::Write)
.check_action_allowed(&dataset_id, DatasetAction::Write)
.await;

let allowed_actions = harness
.dataset_authorizer
.get_allowed_actions(&dataset_handle)
.get_allowed_actions(&dataset_id)
.await;

assert_matches!(read_result, Ok(()));
Expand Down Expand Up @@ -185,19 +185,19 @@ impl DatasetAuthorizerHarness {
}
}

async fn create_public_dataset(&self, alias: odf::DatasetAlias) -> odf::DatasetHandle {
async fn create_public_dataset(&self, alias: odf::DatasetAlias) -> odf::DatasetID {
self.create_dataset(alias, DatasetVisibility::Public).await
}

async fn create_private_dataset(&self, alias: odf::DatasetAlias) -> odf::DatasetHandle {
async fn create_private_dataset(&self, alias: odf::DatasetAlias) -> odf::DatasetID {
self.create_dataset(alias, DatasetVisibility::Private).await
}

async fn create_dataset(
&self,
alias: odf::DatasetAlias,
visibility: DatasetVisibility,
) -> odf::DatasetHandle {
) -> odf::DatasetID {
let dataset_id = dataset_id(&alias);

self.outbox
Expand All @@ -213,7 +213,7 @@ impl DatasetAuthorizerHarness {
.await
.unwrap();

odf::DatasetHandle::new(dataset_id, alias)
dataset_id
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use kamu_datasets::DatasetOwnershipService;
use kamu_datasets::{DatasetEntryService, DatasetEntryServiceExt};
use opendatafabric as odf;

use crate::prelude::*;
Expand All @@ -19,17 +19,18 @@ pub(crate) async fn ensure_account_owns_dataset(
ctx: &Context<'_>,
dataset_handle: &odf::DatasetHandle,
) -> Result<()> {
let dataset_ownership_service = from_catalog_n!(ctx, dyn DatasetOwnershipService);
let dataset_entry_service = from_catalog_n!(ctx, dyn DatasetEntryService);
let logged_account = utils::get_logged_account(ctx)?;

if logged_account.is_admin {
// Technically, the admin isn't the owner, but that's not a barrier in this case
return Ok(());
}

let not_owner = !dataset_ownership_service
let not_owner = !dataset_entry_service
.is_dataset_owned_by(&dataset_handle.id, &logged_account.account_id)
.await?;
.await
.int_err()?;

if not_owner {
return Err(Error::new("Only the dataset owner can perform this action").into());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use chrono::Utc;
use kamu_accounts::Account;
use kamu_datasets::DatasetOwnershipService;
use kamu_datasets::{DatasetEntryService, DatasetEntryServiceExt};
use kamu_flow_system::FlowTriggerService;
use opendatafabric::DatasetID;

Expand All @@ -30,10 +30,11 @@ impl AccountFlowTriggersMut {

#[graphql(skip)]
async fn get_account_dataset_ids(&self, ctx: &Context<'_>) -> Result<Vec<DatasetID>> {
let dataset_ownership_service = from_catalog_n!(ctx, dyn DatasetOwnershipService);
let dataset_ids: Vec<_> = dataset_ownership_service
.get_owned_datasets(&self.account.id)
.await?;
let dataset_entry_service = from_catalog_n!(ctx, dyn DatasetEntryService);
let dataset_ids: Vec<_> = dataset_entry_service
.get_owned_dataset_ids(&self.account.id)
.await
.int_err()?;

Ok(dataset_ids)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use futures::StreamExt;
use kamu_accounts::Account as AccountEntity;
use kamu_datasets::DatasetOwnershipService;
use kamu_datasets::{DatasetEntryService, DatasetEntryServiceExt};
use kamu_flow_system::FlowTriggerService;

use crate::prelude::*;
Expand All @@ -29,12 +29,13 @@ impl AccountFlowTriggers {

/// Checks if all triggers of all datasets in account are disabled
async fn all_paused(&self, ctx: &Context<'_>) -> Result<bool> {
let (dataset_ownership_service, flow_trigger_service) =
from_catalog_n!(ctx, dyn DatasetOwnershipService, dyn FlowTriggerService);
let (dataset_entry_service, flow_trigger_service) =
from_catalog_n!(ctx, dyn DatasetEntryService, dyn FlowTriggerService);

let owned_dataset_ids: Vec<_> = dataset_ownership_service
.get_owned_datasets(&self.account.id)
.await?;
let owned_dataset_ids: Vec<_> = dataset_entry_service
.get_owned_dataset_ids(&self.account.id)
.await
.int_err()?;

let mut all_triggers = flow_trigger_service
.find_triggers_by_datasets(owned_dataset_ids)
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/graphql/src/queries/datasets/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Dataset {
let dataset_action_authorizer = from_catalog_n!(ctx, dyn auth::DatasetActionAuthorizer);

let allowed_actions = dataset_action_authorizer
.get_allowed_actions(&self.dataset_handle)
.get_allowed_actions(&self.dataset_handle.id)
.await?;
let can_read = allowed_actions.contains(&auth::DatasetAction::Read);
let can_write = allowed_actions.contains(&auth::DatasetAction::Write);
Expand Down
Loading

0 comments on commit a01068f

Please sign in to comment.