From 5d1b16f462776d25ad6dec2b62c0e04d8c606e22 Mon Sep 17 00:00:00 2001 From: Sergei Zaychenko Date: Fri, 27 Dec 2024 01:05:08 +0200 Subject: [PATCH] Using odf meta-crate in domain/core --- Cargo.lock | 4 +- src/domain/core/Cargo.toml | 4 +- .../src/auth/dataset_action_authorizer.rs | 1 - src/domain/core/src/entities/engine.rs | 18 ++- .../core/src/entities/resolved_dataset.rs | 11 +- .../src/entities/resolved_datasets_map.rs | 14 +- .../src/entities/writer_metadata_state.rs | 59 ++++----- .../src/entities/writer_source_visitor.rs | 69 +++++----- .../core/src/messages/core_message_types.rs | 47 +++---- .../compaction/compaction_executor.rs | 10 +- .../compaction/compaction_listener.rs | 2 - .../services/compaction/compaction_planner.rs | 23 ++-- .../src/services/dataset_changes_service.rs | 7 +- .../src/services/dataset_ownership_service.rs | 13 +- .../core/src/services/dataset_registry.rs | 41 +++--- .../src/services/dependency_graph_service.rs | 17 ++- .../services/ingest/data_format_registry.rs | 11 +- .../core/src/services/ingest/data_writer.rs | 19 +-- .../services/ingest/polling_ingest_service.rs | 13 +- .../services/ingest/push_ingest_executor.rs | 10 +- .../services/ingest/push_ingest_planner.rs | 6 +- .../src/services/metadata_query_service.rs | 19 ++- .../core/src/services/provenance_service.rs | 12 +- .../core/src/services/pull_request_planner.rs | 68 +++++----- .../core/src/services/push_request_planner.rs | 26 ++-- src/domain/core/src/services/query_service.rs | 40 +++--- .../core/src/services/remote_aliases.rs | 9 +- .../src/services/remote_aliases_registry.rs | 19 ++- .../services/remote_repository_registry.rs | 19 +-- .../src/services/remote_status_service.rs | 1 - .../core/src/services/reset/reset_executor.rs | 4 +- .../core/src/services/reset/reset_planner.rs | 1 - .../core/src/services/resource_loader.rs | 7 +- .../core/src/services/search_service.rs | 14 +- src/domain/core/src/services/sync_service.rs | 82 ++++++------ 
.../services/transform/transform_executor.rs | 3 +- .../services/transform/transform_listener.rs | 4 +- .../transform/transform_request_planner.rs | 65 +++++----- .../src/services/transform/transform_types.rs | 13 +- .../core/src/services/verification_service.rs | 76 +++++------ .../watermark/set_watermark_executor.rs | 1 - .../append_dataset_metadata_batch_use_case.rs | 8 +- .../commit_dataset_event_use_case.rs | 11 +- .../src/use_cases/compact_dataset_use_case.rs | 1 - .../create_dataset_from_snapshot_use_case.rs | 7 +- .../src/use_cases/create_dataset_use_case.rs | 11 +- .../src/use_cases/delete_dataset_use_case.rs | 12 +- .../src/use_cases/push_dataset_use_case.rs | 3 +- .../src/use_cases/rename_dataset_use_case.rs | 9 +- .../src/use_cases/reset_dataset_use_case.rs | 1 - .../src/use_cases/set_watermark_use_case.rs | 1 - .../src/use_cases/verify_dataset_use_case.rs | 6 +- .../src/utils/metadata_chain_comparator.rs | 122 +++++++++--------- src/domain/odf/odf/src/lib.rs | 1 + 54 files changed, 500 insertions(+), 575 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4131300c9..2b54e805d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6075,9 +6075,7 @@ dependencies = [ "messaging-outbox", "mockall", "object_store", - "odf-dataset", - "odf-metadata", - "odf-storage", + "odf", "pathdiff", "pin-project", "serde", diff --git a/src/domain/core/Cargo.toml b/src/domain/core/Cargo.toml index b5ec7726c..a51bd2367 100644 --- a/src/domain/core/Cargo.toml +++ b/src/domain/core/Cargo.toml @@ -35,9 +35,7 @@ file-utils = { workspace = true } internal-error = { workspace = true } kamu-datasets = { workspace = true } messaging-outbox = { workspace = true } -odf-metadata = { workspace = true } -odf-dataset = { workspace = true } -odf-storage = { workspace = true } +odf = { workspace = true } async-trait = { version = "0.1", default-features = false } bytes = { version = "1", default-features = false } diff --git a/src/domain/core/src/auth/dataset_action_authorizer.rs 
b/src/domain/core/src/auth/dataset_action_authorizer.rs index 3b2c183a2..a6a7956af 100644 --- a/src/domain/core/src/auth/dataset_action_authorizer.rs +++ b/src/domain/core/src/auth/dataset_action_authorizer.rs @@ -12,7 +12,6 @@ use std::str::FromStr; use dill::*; use internal_error::{ErrorIntoInternal, InternalError}; -use odf_metadata as odf; use thiserror::Error; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/entities/engine.rs b/src/domain/core/src/entities/engine.rs index 426992f98..9f6d5b87c 100644 --- a/src/domain/core/src/entities/engine.rs +++ b/src/domain/core/src/entities/engine.rs @@ -15,8 +15,6 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::prelude::{DataFrame, SessionContext}; use file_utils::OwnedFile; use internal_error::*; -use odf_dataset::BlockRef; -use odf_metadata as odf; use thiserror::Error; use crate::ResolvedDatasetsMap; @@ -52,7 +50,7 @@ pub struct RawQueryRequestExt { /// Data to be used in the query pub input_data: DataFrame, /// Defines the query to be performed - pub transform: odf::Transform, + pub transform: odf::metadata::Transform, } #[derive(Debug, Clone)] @@ -77,11 +75,11 @@ pub struct TransformRequestExt { /// Identifies the output dataset pub dataset_handle: odf::DatasetHandle, /// Block reference to advance upon commit - pub block_ref: BlockRef, + pub block_ref: odf::BlockRef, /// Current head (for concurrency control) pub head: odf::Multihash, /// Transformation that will be applied to produce new data - pub transform: odf::Transform, + pub transform: odf::metadata::Transform, /// System time to use for new records pub system_time: DateTime, /// Expected data schema (if already defined) @@ -91,7 +89,7 @@ pub struct TransformRequestExt { /// Defines the input data pub inputs: Vec, /// Output dataset's vocabulary - pub vocab: odf::DatasetVocabulary, + pub vocab: odf::metadata::DatasetVocabulary, /// 
Previous checkpoint, if any pub prev_checkpoint: Option, } @@ -103,7 +101,7 @@ pub struct TransformRequestInputExt { /// An alias of this input to be used in queries pub alias: String, /// Input dataset's vocabulary - pub vocab: odf::DatasetVocabulary, + pub vocab: odf::metadata::DatasetVocabulary, /// Last block of the input dataset that was previously incorporated into /// the derivative transformation, if any. Must be equal to the last /// non-empty `newBlockHash`. Together with `newBlockHash` defines a @@ -131,13 +129,13 @@ pub struct TransformRequestInputExt { /// List of data files that will be read pub data_slices: Vec, /// TODO: remove? - pub explicit_watermarks: Vec, + pub explicit_watermarks: Vec, } #[derive(Debug)] pub struct TransformResponseExt { /// Data slice produced by the transaction, if any - pub new_offset_interval: Option, + pub new_offset_interval: Option, /// Watermark advanced by the transaction, if any pub new_watermark: Option>, /// Schema of the output @@ -150,7 +148,7 @@ pub struct TransformResponseExt { pub new_data: Option, } -impl From for odf::ExecuteTransformInput { +impl From for odf::metadata::ExecuteTransformInput { fn from(val: TransformRequestInputExt) -> Self { Self { dataset_id: val.dataset_handle.id, diff --git a/src/domain/core/src/entities/resolved_dataset.rs b/src/domain/core/src/entities/resolved_dataset.rs index b31fda4bc..f90f45ece 100644 --- a/src/domain/core/src/entities/resolved_dataset.rs +++ b/src/domain/core/src/entities/resolved_dataset.rs @@ -9,23 +9,20 @@ use std::sync::Arc; -use odf_dataset::{CreateDatasetResult, Dataset}; -use odf_metadata::{self as odf}; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Clone)] pub struct ResolvedDataset { - dataset: Arc, + dataset: Arc, handle: odf::DatasetHandle, } impl ResolvedDataset { - pub fn new(dataset: Arc, handle: odf::DatasetHandle) -> Self { + pub fn new(dataset: Arc, handle: 
odf::DatasetHandle) -> Self { Self { dataset, handle } } - pub fn from(create_dataset_result: &CreateDatasetResult) -> Self { + pub fn from(create_dataset_result: &odf::CreateDatasetResult) -> Self { Self { dataset: create_dataset_result.dataset.clone(), handle: create_dataset_result.dataset_handle.clone(), @@ -56,7 +53,7 @@ impl ResolvedDataset { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// impl std::ops::Deref for ResolvedDataset { - type Target = Arc; + type Target = Arc; fn deref(&self) -> &Self::Target { &self.dataset } diff --git a/src/domain/core/src/entities/resolved_datasets_map.rs b/src/domain/core/src/entities/resolved_datasets_map.rs index 9f8b86245..8bccaeac9 100644 --- a/src/domain/core/src/entities/resolved_datasets_map.rs +++ b/src/domain/core/src/entities/resolved_datasets_map.rs @@ -9,30 +9,28 @@ use std::collections::HashMap; -use odf_metadata::{DatasetHandle, DatasetID}; - use crate::ResolvedDataset; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Default)] pub struct ResolvedDatasetsMap { - resolved_datasets_by_id: HashMap, + resolved_datasets_by_id: HashMap, } impl ResolvedDatasetsMap { - pub fn get_by_id(&self, id: &DatasetID) -> &ResolvedDataset { + pub fn get_by_id(&self, id: &odf::DatasetID) -> &ResolvedDataset { self.resolved_datasets_by_id .get(id) .expect("Dataset must be present") } #[inline] - pub fn get_by_handle(&self, handle: &DatasetHandle) -> &ResolvedDataset { + pub fn get_by_handle(&self, handle: &odf::DatasetHandle) -> &ResolvedDataset { self.get_by_id(&handle.id) } - pub fn iterate_all_handles(&self) -> impl Iterator { + pub fn iterate_all_handles(&self) -> impl Iterator { self.resolved_datasets_by_id .values() .map(ResolvedDataset::get_handle) @@ -50,8 +48,8 @@ impl ResolvedDatasetsMap { pub fn register_with( &mut self, - handle: &DatasetHandle, - dataset_fn: impl 
Fn(&DatasetHandle) -> ResolvedDataset, + handle: &odf::DatasetHandle, + dataset_fn: impl Fn(&odf::DatasetHandle) -> ResolvedDataset, ) { if !self.resolved_datasets_by_id.contains_key(&handle.id) { let resolved_dataset = dataset_fn(handle); diff --git a/src/domain/core/src/entities/writer_metadata_state.rs b/src/domain/core/src/entities/writer_metadata_state.rs index 375f7cb65..647729cfe 100644 --- a/src/domain/core/src/entities/writer_metadata_state.rs +++ b/src/domain/core/src/entities/writer_metadata_state.rs @@ -9,20 +9,6 @@ use chrono::{DateTime, Utc}; use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal}; -use odf_dataset::{ - AcceptVisitorError, - BlockRef, - GenericCallbackVisitor, - MetadataChainExt, - MetadataChainVisitorExtInfallible, - MetadataVisitorDecision, - SearchAddDataVisitor, - SearchSeedVisitor, - SearchSetDataSchemaVisitor, - SearchSetVocabVisitor, - SearchSourceStateVisitor, -}; -use odf_metadata as odf; use crate::{PushSourceNotFoundError, ResolvedDataset, WriterSourceEventVisitor}; @@ -31,17 +17,17 @@ use crate::{PushSourceNotFoundError, ResolvedDataset, WriterSourceEventVisitor}; /// Contains a projection of the metadata needed for [`DataWriter`] to function #[derive(Debug, Clone)] pub struct DataWriterMetadataState { - pub block_ref: BlockRef, + pub block_ref: odf::BlockRef, pub head: odf::Multihash, - pub schema: Option, + pub schema: Option, pub source_event: Option, - pub merge_strategy: odf::MergeStrategy, - pub vocab: odf::DatasetVocabulary, + pub merge_strategy: odf::metadata::MergeStrategy, + pub vocab: odf::metadata::DatasetVocabulary, pub data_slices: Vec, pub prev_offset: Option, pub prev_checkpoint: Option, pub prev_watermark: Option>, - pub prev_source_state: Option, + pub prev_source_state: Option, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -60,25 +46,31 @@ impl DataWriterMetadataState { )] pub async fn build( target: 
ResolvedDataset, - block_ref: &BlockRef, + block_ref: &odf::BlockRef, source_name: Option<&str>, ) -> Result { // TODO: PERF: Full metadata scan below - this is expensive and should be // improved using skip lists. + use odf::dataset::MetadataChainVisitorExtInfallible; + let head = target .as_metadata_chain() .resolve_ref(block_ref) .await .int_err()?; - let mut seed_visitor = SearchSeedVisitor::new().adapt_err(); - let mut set_vocab_visitor = SearchSetVocabVisitor::new().adapt_err(); - let mut set_data_schema_visitor = SearchSetDataSchemaVisitor::new().adapt_err(); - let mut prev_source_state_visitor = SearchSourceStateVisitor::new(source_name).adapt_err(); - let mut add_data_visitor = SearchAddDataVisitor::new().adapt_err(); - let mut add_data_collection_visitor = GenericCallbackVisitor::new( + let mut seed_visitor = odf::dataset::SearchSeedVisitor::new().adapt_err(); + let mut set_vocab_visitor = odf::dataset::SearchSetVocabVisitor::new().adapt_err(); + let mut set_data_schema_visitor = + odf::dataset::SearchSetDataSchemaVisitor::new().adapt_err(); + let mut prev_source_state_visitor = + odf::dataset::SearchSourceStateVisitor::new(source_name).adapt_err(); + let mut add_data_visitor = odf::dataset::SearchAddDataVisitor::new().adapt_err(); + let mut add_data_collection_visitor = odf::dataset::GenericCallbackVisitor::new( Vec::new(), - MetadataVisitorDecision::NextOfType(odf::MetadataEventTypeFlags::ADD_DATA), + odf::dataset::MetadataVisitorDecision::NextOfType( + odf::metadata::MetadataEventTypeFlags::ADD_DATA, + ), |state, _, block| { let odf::MetadataEvent::AddData(e) = &block.event else { unreachable!() @@ -88,12 +80,15 @@ impl DataWriterMetadataState { state.push(output_data.physical_hash.clone()); } - MetadataVisitorDecision::NextOfType(odf::MetadataEventTypeFlags::ADD_DATA) + odf::dataset::MetadataVisitorDecision::NextOfType( + odf::metadata::MetadataEventTypeFlags::ADD_DATA, + ) }, ) .adapt_err(); let mut source_event_visitor = 
WriterSourceEventVisitor::new(source_name); + use odf::dataset::MetadataChainExt; target .as_metadata_chain() .accept_by_hash( @@ -169,11 +164,11 @@ pub enum ScanMetadataError { ), } -impl From> for ScanMetadataError { - fn from(v: AcceptVisitorError) -> Self { +impl From> for ScanMetadataError { + fn from(v: odf::dataset::AcceptVisitorError) -> Self { match v { - AcceptVisitorError::Visitor(err) => err, - AcceptVisitorError::Traversal(err) => Self::Internal(err.int_err()), + odf::dataset::AcceptVisitorError::Visitor(err) => err, + odf::dataset::AcceptVisitorError::Traversal(err) => Self::Internal(err.int_err()), } } } diff --git a/src/domain/core/src/entities/writer_source_visitor.rs b/src/domain/core/src/entities/writer_source_visitor.rs index be09d3ea8..c3dc43aa4 100644 --- a/src/domain/core/src/entities/writer_source_visitor.rs +++ b/src/domain/core/src/entities/writer_source_visitor.rs @@ -7,19 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use odf_dataset::{ - HashedMetadataBlockRef, - MetadataChainVisitor, - MetadataVisitorDecision as Decision, -}; -use odf_metadata::{ - AddPushSource, - MergeStrategy, - MergeStrategyAppend, - MetadataEvent, - MetadataEventTypeFlags as Flag, - SetPollingSource, -}; +use odf::dataset::MetadataVisitorDecision as Decision; +use odf::metadata::MetadataEventTypeFlags as Flag; use crate::{ScanMetadataError, SourceNotFoundError}; @@ -28,7 +17,7 @@ use crate::{ScanMetadataError, SourceNotFoundError}; pub struct WriterSourceEventVisitor<'a> { maybe_source_name: Option<&'a str>, next_block_flags: Flag, - maybe_source_event: Option, + maybe_source_event: Option, } impl<'a> WriterSourceEventVisitor<'a> { @@ -49,16 +38,18 @@ impl<'a> WriterSourceEventVisitor<'a> { pub fn get_source_event_and_merge_strategy( self, - ) -> Result<(Option, MergeStrategy), ScanMetadataError> { + ) -> Result<(Option, odf::metadata::MergeStrategy), ScanMetadataError> { let merge_strategy = match (&self.maybe_source_event, self.maybe_source_name) { // Source found (Some(e), _) => match e { - MetadataEvent::SetPollingSource(e) => Ok(e.merge.clone()), - MetadataEvent::AddPushSource(e) => Ok(e.merge.clone()), + odf::MetadataEvent::SetPollingSource(e) => Ok(e.merge.clone()), + odf::MetadataEvent::AddPushSource(e) => Ok(e.merge.clone()), _ => unreachable!(), }, // No source defined - assuming append strategy - (None, None) => Ok(MergeStrategy::Append(MergeStrategyAppend {})), + (None, None) => Ok(odf::metadata::MergeStrategy::Append( + odf::metadata::MergeStrategyAppend {}, + )), // Source expected but not found (None, Some(source)) => Err(SourceNotFoundError::new( Some(source), @@ -69,7 +60,10 @@ impl<'a> WriterSourceEventVisitor<'a> { Ok((self.maybe_source_event, merge_strategy)) } - fn handle_set_polling_source(&mut self, e: &SetPollingSource) -> Result<(), ScanMetadataError> { + fn handle_set_polling_source( + &mut self, + e: &odf::metadata::SetPollingSource, + ) -> Result<(), ScanMetadataError> { if 
self.maybe_source_name.is_some() { return Err(SourceNotFoundError::new( self.maybe_source_name, @@ -83,7 +77,10 @@ impl<'a> WriterSourceEventVisitor<'a> { Ok(()) } - fn handle_add_push_source(&mut self, e: &AddPushSource) -> Result<(), ScanMetadataError> { + fn handle_add_push_source( + &mut self, + e: &odf::metadata::AddPushSource, + ) -> Result<(), ScanMetadataError> { if self.maybe_source_event.is_none() { if self.maybe_source_name.is_none() || self.maybe_source_name == Some(e.source_name.as_str()) @@ -106,41 +103,45 @@ impl<'a> WriterSourceEventVisitor<'a> { } } -impl<'a> MetadataChainVisitor for WriterSourceEventVisitor<'a> { +impl<'a> odf::dataset::MetadataChainVisitor for WriterSourceEventVisitor<'a> { type Error = ScanMetadataError; fn initial_decision(&self) -> Decision { Decision::NextOfType(self.next_block_flags) } - fn visit(&mut self, (_, block): HashedMetadataBlockRef) -> Result { + fn visit( + &mut self, + (_, block): odf::dataset::HashedMetadataBlockRef, + ) -> Result { match &block.event { - MetadataEvent::SetPollingSource(e) => { + odf::MetadataEvent::SetPollingSource(e) => { self.handle_set_polling_source(e)?; if self.maybe_source_name.is_none() { self.next_block_flags -= Flag::SET_POLLING_SOURCE; } } - MetadataEvent::AddPushSource(e) => { + odf::MetadataEvent::AddPushSource(e) => { self.handle_add_push_source(e)?; if self.maybe_source_name.is_some() && self.maybe_source_event.is_some() { self.next_block_flags -= Flag::ADD_PUSH_SOURCE; } } - MetadataEvent::DisablePollingSource(_) | MetadataEvent::DisablePushSource(_) => { + odf::MetadataEvent::DisablePollingSource(_) + | odf::MetadataEvent::DisablePushSource(_) => { unimplemented!("Disabling sources is not yet fully supported") } - MetadataEvent::Seed(_) - | MetadataEvent::AddData(_) - | MetadataEvent::ExecuteTransform(_) - | MetadataEvent::SetVocab(_) - | MetadataEvent::SetDataSchema(_) - | MetadataEvent::SetTransform(_) - | MetadataEvent::SetAttachments(_) - | MetadataEvent::SetInfo(_) - | 
MetadataEvent::SetLicense(_) => { + odf::MetadataEvent::Seed(_) + | odf::MetadataEvent::AddData(_) + | odf::MetadataEvent::ExecuteTransform(_) + | odf::MetadataEvent::SetVocab(_) + | odf::MetadataEvent::SetDataSchema(_) + | odf::MetadataEvent::SetTransform(_) + | odf::MetadataEvent::SetAttachments(_) + | odf::MetadataEvent::SetInfo(_) + | odf::MetadataEvent::SetLicense(_) => { unreachable!() } } diff --git a/src/domain/core/src/messages/core_message_types.rs b/src/domain/core/src/messages/core_message_types.rs index 191278777..9b72da53d 100644 --- a/src/domain/core/src/messages/core_message_types.rs +++ b/src/domain/core/src/messages/core_message_types.rs @@ -8,8 +8,6 @@ // by the Apache License, Version 2.0. use messaging_outbox::Message; -use odf_dataset::DatasetVisibility; -use odf_metadata::{AccountID, DatasetID, DatasetName}; use serde::{Deserialize, Serialize}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -28,10 +26,10 @@ pub enum DatasetLifecycleMessage { impl DatasetLifecycleMessage { pub fn created( - dataset_id: DatasetID, - owner_account_id: AccountID, - dataset_visibility: DatasetVisibility, - dataset_name: DatasetName, + dataset_id: odf::DatasetID, + owner_account_id: odf::AccountID, + dataset_visibility: odf::DatasetVisibility, + dataset_name: odf::DatasetName, ) -> Self { Self::Created(DatasetLifecycleMessageCreated { dataset_id, @@ -41,22 +39,25 @@ impl DatasetLifecycleMessage { }) } - pub fn dependencies_updated(dataset_id: DatasetID, new_upstream_ids: Vec) -> Self { + pub fn dependencies_updated( + dataset_id: odf::DatasetID, + new_upstream_ids: Vec, + ) -> Self { Self::DependenciesUpdated(DatasetLifecycleMessageDependenciesUpdated { dataset_id, new_upstream_ids, }) } - pub fn deleted(dataset_id: DatasetID) -> Self { + pub fn deleted(dataset_id: odf::DatasetID) -> Self { Self::Deleted(DatasetLifecycleMessageDeleted { dataset_id }) } pub fn renamed( - dataset_id: 
DatasetID, - owner_account_id: AccountID, - old_dataset_name: DatasetName, - new_dataset_name: DatasetName, + dataset_id: odf::DatasetID, + owner_account_id: odf::AccountID, + old_dataset_name: odf::DatasetName, + new_dataset_name: odf::DatasetName, ) -> Self { Self::Renamed(DatasetLifecycleMessageRenamed { dataset_id, @@ -77,36 +78,36 @@ impl Message for DatasetLifecycleMessage { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DatasetLifecycleMessageCreated { - pub dataset_id: DatasetID, - pub owner_account_id: AccountID, + pub dataset_id: odf::DatasetID, + pub owner_account_id: odf::AccountID, #[serde(default)] - pub dataset_visibility: DatasetVisibility, - pub dataset_name: DatasetName, + pub dataset_visibility: odf::DatasetVisibility, + pub dataset_name: odf::DatasetName, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DatasetLifecycleMessageDependenciesUpdated { - pub dataset_id: DatasetID, - pub new_upstream_ids: Vec, + pub dataset_id: odf::DatasetID, + pub new_upstream_ids: Vec, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DatasetLifecycleMessageDeleted { - pub dataset_id: DatasetID, + pub dataset_id: odf::DatasetID, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DatasetLifecycleMessageRenamed { - pub dataset_id: DatasetID, - pub owner_account_id: AccountID, - pub old_dataset_name: DatasetName, - pub new_dataset_name: DatasetName, + pub dataset_id: odf::DatasetID, + pub owner_account_id: odf::AccountID, + pub old_dataset_name: odf::DatasetName, + pub new_dataset_name: odf::DatasetName, } 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/compaction/compaction_executor.rs b/src/domain/core/src/services/compaction/compaction_executor.rs index 34e9e898b..2f4f10c4a 100644 --- a/src/domain/core/src/services/compaction/compaction_executor.rs +++ b/src/domain/core/src/services/compaction/compaction_executor.rs @@ -10,8 +10,6 @@ use std::sync::Arc; use internal_error::{ErrorIntoInternal, InternalError}; -use odf_dataset::SetChainRefError; -use odf_metadata as odf; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -59,11 +57,11 @@ pub enum CompactionExecutionError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for CompactionExecutionError { - fn from(v: SetChainRefError) -> Self { +impl From for CompactionExecutionError { + fn from(v: odf::dataset::SetChainRefError) -> Self { match v { - SetChainRefError::Access(e) => Self::Access(e), - SetChainRefError::Internal(e) => Self::Internal(e), + odf::dataset::SetChainRefError::Access(e) => Self::Access(e), + odf::dataset::SetChainRefError::Internal(e) => Self::Internal(e), _ => Self::Internal(v.int_err()), } } diff --git a/src/domain/core/src/services/compaction/compaction_listener.rs b/src/domain/core/src/services/compaction/compaction_listener.rs index c44b35a16..955b69ebb 100644 --- a/src/domain/core/src/services/compaction/compaction_listener.rs +++ b/src/domain/core/src/services/compaction/compaction_listener.rs @@ -9,8 +9,6 @@ use std::sync::Arc; -use odf_metadata as odf; - use super::{CompactionExecutionError, CompactionPlan, CompactionPlanningError}; use crate::CompactionResult; diff --git a/src/domain/core/src/services/compaction/compaction_planner.rs b/src/domain/core/src/services/compaction/compaction_planner.rs index 9969e2662..034854526 100644 --- 
a/src/domain/core/src/services/compaction/compaction_planner.rs +++ b/src/domain/core/src/services/compaction/compaction_planner.rs @@ -11,9 +11,6 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use internal_error::{ErrorIntoInternal, InternalError}; -use odf_dataset::IterBlocksError; -use odf_metadata as odf; -use odf_storage::GetRefError; use serde::{Deserialize, Serialize}; use thiserror::Error; use url::Url; @@ -93,7 +90,7 @@ pub struct CompactionDataSliceBatchInfo { #[derive(Debug, Default, Clone)] pub struct CompactionDataSliceBatchUpperBound { - pub new_source_state: Option, + pub new_source_state: Option, pub new_watermark: Option>, pub new_checkpoint: Option, pub end_offset: u64, @@ -138,23 +135,23 @@ pub struct InvalidDatasetKindError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for CompactionPlanningError { - fn from(v: GetRefError) -> Self { +impl From for CompactionPlanningError { + fn from(v: odf::storage::GetRefError) -> Self { match v { - GetRefError::NotFound(e) => Self::Internal(e.int_err()), - GetRefError::Access(e) => Self::Access(e), - GetRefError::Internal(e) => Self::Internal(e), + odf::storage::GetRefError::NotFound(e) => Self::Internal(e.int_err()), + odf::storage::GetRefError::Access(e) => Self::Access(e), + odf::storage::GetRefError::Internal(e) => Self::Internal(e), } } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for CompactionPlanningError { - fn from(v: IterBlocksError) -> Self { +impl From for CompactionPlanningError { + fn from(v: odf::dataset::IterBlocksError) -> Self { match v { - IterBlocksError::Access(e) => Self::Access(e), - IterBlocksError::Internal(e) => Self::Internal(e), + odf::dataset::IterBlocksError::Access(e) => Self::Access(e), + odf::dataset::IterBlocksError::Internal(e) => Self::Internal(e), _ => 
CompactionPlanningError::Internal(v.int_err()), } } diff --git a/src/domain/core/src/services/dataset_changes_service.rs b/src/domain/core/src/services/dataset_changes_service.rs index 552c388f0..14ebe2e75 100644 --- a/src/domain/core/src/services/dataset_changes_service.rs +++ b/src/domain/core/src/services/dataset_changes_service.rs @@ -9,9 +9,6 @@ use chrono::{DateTime, Utc}; use internal_error::InternalError; -use odf_dataset::DatasetNotFoundError; -use odf_metadata as odf; -use odf_storage::RefNotFoundError; use thiserror::Error; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -63,10 +60,10 @@ impl std::ops::AddAssign for DatasetIntervalIncrement { #[derive(Error, Debug)] pub enum GetIncrementError { #[error(transparent)] - DatasetNotFound(DatasetNotFoundError), + DatasetNotFound(odf::dataset::DatasetNotFoundError), #[error(transparent)] - RefNotFound(RefNotFoundError), + RefNotFound(odf::storage::RefNotFoundError), #[error(transparent)] Access(odf::AccessError), diff --git a/src/domain/core/src/services/dataset_ownership_service.rs b/src/domain/core/src/services/dataset_ownership_service.rs index c07a007a4..a149d54db 100644 --- a/src/domain/core/src/services/dataset_ownership_service.rs +++ b/src/domain/core/src/services/dataset_ownership_service.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use odf_metadata::{AccountID, DatasetID}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -16,18 +15,18 @@ use odf_metadata::{AccountID, DatasetID}; pub trait DatasetOwnershipService: Sync + Send { async fn get_dataset_owners( &self, - dataset_id: &DatasetID, - ) -> Result, InternalError>; + dataset_id: &odf::DatasetID, + ) -> Result, InternalError>; async fn get_owned_datasets( &self, - account_id: &AccountID, - ) -> Result, InternalError>; + account_id: &odf::AccountID, + ) -> Result, InternalError>; async fn is_dataset_owned_by( &self, - dataset_id: &DatasetID, - account_id: &AccountID, + dataset_id: &odf::DatasetID, + account_id: &odf::AccountID, ) -> Result; } diff --git a/src/domain/core/src/services/dataset_registry.rs b/src/domain/core/src/services/dataset_registry.rs index bf80ca884..34dec5069 100644 --- a/src/domain/core/src/services/dataset_registry.rs +++ b/src/domain/core/src/services/dataset_registry.rs @@ -8,8 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use odf_dataset::{DatasetHandleStream, GetDatasetError}; -use odf_metadata::{AccountName, DatasetHandle, DatasetID, DatasetRef}; use thiserror::Error; use crate::ResolvedDataset; @@ -18,21 +16,24 @@ use crate::ResolvedDataset; #[async_trait::async_trait] pub trait DatasetRegistry: Send + Sync { - fn all_dataset_handles(&self) -> DatasetHandleStream<'_>; + fn all_dataset_handles(&self) -> odf::dataset::DatasetHandleStream<'_>; - fn all_dataset_handles_by_owner(&self, owner_name: &AccountName) -> DatasetHandleStream<'_>; + fn all_dataset_handles_by_owner( + &self, + owner_name: &odf::AccountName, + ) -> odf::dataset::DatasetHandleStream<'_>; async fn resolve_dataset_handle_by_ref( &self, - dataset_ref: &DatasetRef, - ) -> Result; + dataset_ref: &odf::DatasetRef, + ) -> Result; async fn resolve_multiple_dataset_handles_by_ids( &self, - dataset_ids: Vec, + dataset_ids: Vec, ) -> Result; - fn get_dataset_by_handle(&self, dataset_handle: &DatasetHandle) -> ResolvedDataset; + fn get_dataset_by_handle(&self, dataset_handle: &odf::DatasetHandle) -> ResolvedDataset; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -43,13 +44,13 @@ pub trait DatasetRegistry: Send + Sync { pub trait DatasetRegistryExt: DatasetRegistry { async fn try_resolve_dataset_handle_by_ref( &self, - dataset_ref: &DatasetRef, - ) -> Result, InternalError>; + dataset_ref: &odf::DatasetRef, + ) -> Result, InternalError>; async fn get_dataset_by_ref( &self, - dataset_ref: &DatasetRef, - ) -> Result; + dataset_ref: &odf::DatasetRef, + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -62,19 +63,19 @@ where { async fn try_resolve_dataset_handle_by_ref( &self, - dataset_ref: &DatasetRef, - ) -> Result, InternalError> { + dataset_ref: &odf::DatasetRef, + ) -> Result, InternalError> { match 
self.resolve_dataset_handle_by_ref(dataset_ref).await { Ok(hdl) => Ok(Some(hdl)), - Err(GetDatasetError::NotFound(_)) => Ok(None), - Err(GetDatasetError::Internal(e)) => Err(e), + Err(odf::dataset::GetDatasetError::NotFound(_)) => Ok(None), + Err(odf::dataset::GetDatasetError::Internal(e)) => Err(e), } } async fn get_dataset_by_ref( &self, - dataset_ref: &DatasetRef, - ) -> Result { + dataset_ref: &odf::DatasetRef, + ) -> Result { let dataset_handle = self.resolve_dataset_handle_by_ref(dataset_ref).await?; let dataset = self.get_dataset_by_handle(&dataset_handle); Ok(dataset) @@ -85,8 +86,8 @@ where #[derive(Default)] pub struct DatasetHandlesResolution { - pub resolved_handles: Vec, - pub unresolved_datasets: Vec<(DatasetID, GetDatasetError)>, + pub resolved_handles: Vec, + pub unresolved_datasets: Vec<(odf::DatasetID, odf::dataset::GetDatasetError)>, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/dependency_graph_service.rs b/src/domain/core/src/services/dependency_graph_service.rs index a3b2643b8..32e52d195 100644 --- a/src/domain/core/src/services/dependency_graph_service.rs +++ b/src/domain/core/src/services/dependency_graph_service.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use odf_metadata::DatasetID; use thiserror::Error; use tokio_stream::Stream; @@ -19,27 +18,27 @@ pub trait DependencyGraphService: Sync + Send { /// Iterates over 1st level of dataset's downstream dependencies async fn get_downstream_dependencies( &self, - dataset_id: &DatasetID, + dataset_id: &odf::DatasetID, ) -> Result; /// Iterates over 1st level of dataset's upstream dependencies async fn get_upstream_dependencies( &self, - dataset_id: &DatasetID, + dataset_id: &odf::DatasetID, ) -> Result; /// Iterates over all levels of dataset's upstream dependencies /// and return reversed result including passed parameters async fn get_recursive_upstream_dependencies( &self, - dataset_ids: Vec, + dataset_ids: Vec, ) -> Result; /// Iterates over all levels of dataset's downstream dependencies /// and return result including passed parameters async fn get_recursive_downstream_dependencies( &self, - dataset_ids: Vec, + dataset_ids: Vec, ) -> Result; /// Given a set of dataset IDs this will sort them in depth-first or @@ -48,14 +47,14 @@ pub trait DependencyGraphService: Sync + Send { /// versa async fn in_dependency_order( &self, - dataset_ids: Vec, + dataset_ids: Vec, order: DependencyOrder, - ) -> Result, GetDependenciesError>; + ) -> Result, GetDependenciesError>; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -pub type DatasetIDStream<'a> = std::pin::Pin + Send + 'a>>; +pub type DatasetIDStream<'a> = std::pin::Pin + Send + 'a>>; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -81,7 +80,7 @@ pub enum GetDependenciesError { #[derive(Error, Debug)] #[error("Dataset {dataset_id} not found")] pub struct DatasetNodeNotFoundError { - pub dataset_id: DatasetID, + pub dataset_id: odf::DatasetID, } 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/ingest/data_format_registry.rs b/src/domain/core/src/services/ingest/data_format_registry.rs index fedd8474b..4bbb872b1 100644 --- a/src/domain/core/src/services/ingest/data_format_registry.rs +++ b/src/domain/core/src/services/ingest/data_format_registry.rs @@ -11,7 +11,6 @@ use std::path::PathBuf; use std::sync::Arc; use datafusion::prelude::SessionContext; -use odf_metadata as odf; use super::{ReadError, Reader, UnsupportedMediaTypeError}; @@ -23,14 +22,14 @@ pub trait DataFormatRegistry: Send + Sync { fn format_by_file_extension(&self, ext: &str) -> Option; - fn format_of(&self, conf: &odf::ReadStep) -> DataFormatDesc; + fn format_of(&self, conf: &odf::metadata::ReadStep) -> DataFormatDesc; // TODO: Avoid `async` poisoning by datafusion // TODO: Avoid passing `temp_path` here async fn get_reader( &self, ctx: SessionContext, - conf: odf::ReadStep, + conf: odf::metadata::ReadStep, temp_path: PathBuf, ) -> Result, ReadError>; @@ -39,15 +38,15 @@ pub trait DataFormatRegistry: Send + Sync { /// actual data fn get_compatible_read_config( &self, - base_conf: odf::ReadStep, + base_conf: odf::metadata::ReadStep, actual_media_type: &MediaType, - ) -> Result; + ) -> Result; fn get_best_effort_config( &self, schema: Option>, media_type: &MediaType, - ) -> Result; + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/ingest/data_writer.rs b/src/domain/core/src/services/ingest/data_writer.rs index e88a60850..f667bf2c6 100644 --- a/src/domain/core/src/services/ingest/data_writer.rs +++ b/src/domain/core/src/services/ingest/data_writer.rs @@ -15,8 +15,6 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::prelude::*; use file_utils::OwnedFile; use internal_error::*; -use 
odf_dataset::{AddDataParams, CommitError}; -use odf_metadata as odf; use super::MergeError; @@ -51,7 +49,10 @@ pub trait DataWriter { ) -> Result; /// Commit previously staged data and advance writer state - async fn commit(&mut self, staged: StageDataResult) -> Result; + async fn commit( + &mut self, + staged: StageDataResult, + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -60,7 +61,7 @@ pub struct WriteWatermarkOpts { /// Will be used for system time data column and metadata block timestamp pub system_time: DateTime, /// Data source state to store in the commit - pub new_source_state: Option, + pub new_source_state: Option, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -74,7 +75,7 @@ pub struct WriteDataOpts { /// Explicit watermark to use in the commit pub new_watermark: Option>, /// Data source state to store in the commit - pub new_source_state: Option, + pub new_source_state: Option, // TODO: Find a better way to deal with temporary files /// Local FS path to which data slice will be written before committing it /// into the data object store of a dataset @@ -87,7 +88,7 @@ pub struct WriteDataOpts { pub struct WriteDataResult { pub old_head: odf::Multihash, pub new_head: odf::Multihash, - pub add_data_block: Option>, + pub add_data_block: Option>, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -99,7 +100,7 @@ pub struct StageDataResult { /// Set when `SetDataSchema` event needs to be committed pub new_schema: Option, /// Set when `AddData` event needs to be committed - pub add_data: Option, + pub add_data: Option, /// Set when commmit will contains some data pub data_file: Option, } @@ -112,7 +113,7 @@ pub enum WriteWatermarkError { EmptyCommit(#[from] EmptyCommitError), #[error(transparent)] - 
CommitError(#[from] CommitError), + CommitError(#[from] odf::dataset::CommitError), #[error(transparent)] Internal(#[from] InternalError), @@ -135,7 +136,7 @@ pub enum WriteDataError { EmptyCommit(#[from] EmptyCommitError), #[error(transparent)] - CommitError(#[from] CommitError), + CommitError(#[from] odf::dataset::CommitError), #[error(transparent)] Internal(#[from] InternalError), diff --git a/src/domain/core/src/services/ingest/polling_ingest_service.rs b/src/domain/core/src/services/ingest/polling_ingest_service.rs index 105ca6cf4..15b6813bc 100644 --- a/src/domain/core/src/services/ingest/polling_ingest_service.rs +++ b/src/domain/core/src/services/ingest/polling_ingest_service.rs @@ -16,8 +16,6 @@ use chrono::{DateTime, Utc}; use container_runtime::ImagePullError; use internal_error::{BoxedError, InternalError}; use kamu_datasets::{DatasetEnvVar, FindDatasetEnvVarError}; -use odf_dataset::CommitError; -use odf_metadata::*; use thiserror::Error; use crate::engine::{normalize_logs, EngineError, ProcessError}; @@ -85,8 +83,8 @@ pub enum PollingIngestResult { uncacheable: bool, }, Updated { - old_head: Multihash, - new_head: Multihash, + old_head: odf::Multihash, + new_head: odf::Multihash, has_more: bool, uncacheable: bool, }, @@ -134,7 +132,10 @@ pub struct NullPollingIngestListener; impl PollingIngestListener for NullPollingIngestListener {} pub trait PollingIngestMultiListener: Send + Sync { - fn begin_ingest(&self, _dataset: &DatasetHandle) -> Option> { + fn begin_ingest( + &self, + _dataset: &odf::DatasetHandle, + ) -> Option> { None } } @@ -302,7 +303,7 @@ pub enum PollingIngestError { CommitError( #[from] #[backtrace] - CommitError, + odf::dataset::CommitError, ), #[error(transparent)] diff --git a/src/domain/core/src/services/ingest/push_ingest_executor.rs b/src/domain/core/src/services/ingest/push_ingest_executor.rs index 72f07c5cf..ab226fbac 100644 --- a/src/domain/core/src/services/ingest/push_ingest_executor.rs +++ 
b/src/domain/core/src/services/ingest/push_ingest_executor.rs @@ -10,8 +10,6 @@ use std::sync::Arc; use internal_error::InternalError; -use odf_dataset::CommitError; -use odf_metadata::*; use thiserror::Error; use tokio::io::AsyncRead; @@ -54,8 +52,8 @@ pub trait PushIngestExecutor: Send + Sync { pub enum PushIngestResult { UpToDate, Updated { - old_head: Multihash, - new_head: Multihash, + old_head: odf::Multihash, + new_head: odf::Multihash, num_blocks: usize, }, } @@ -145,14 +143,14 @@ pub enum PushIngestError { CommitError( #[from] #[backtrace] - CommitError, + odf::dataset::CommitError, ), #[error(transparent)] Access( #[from] #[backtrace] - AccessError, + odf::metadata::AccessError, ), #[error(transparent)] diff --git a/src/domain/core/src/services/ingest/push_ingest_planner.rs b/src/domain/core/src/services/ingest/push_ingest_planner.rs index a679f5875..54e3ad1ce 100644 --- a/src/domain/core/src/services/ingest/push_ingest_planner.rs +++ b/src/domain/core/src/services/ingest/push_ingest_planner.rs @@ -11,8 +11,6 @@ use std::path::PathBuf; use chrono::{DateTime, Utc}; use internal_error::InternalError; -use odf_dataset::CommitError; -use odf_metadata as odf; use thiserror::Error; use crate::{DataWriterMetadataState, MediaType, ResolvedDataset, SchemaInferenceOpts}; @@ -59,7 +57,7 @@ pub struct PushIngestArgs { pub operation_dir: PathBuf, pub system_time: DateTime, pub opts: PushIngestOpts, - pub push_source: odf::AddPushSource, + pub push_source: odf::metadata::AddPushSource, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -84,7 +82,7 @@ pub enum PushIngestPlanningError { CommitError( #[from] #[backtrace] - CommitError, + odf::dataset::CommitError, ), #[error(transparent)] diff --git a/src/domain/core/src/services/metadata_query_service.rs b/src/domain/core/src/services/metadata_query_service.rs index 788f40886..9edf63100 100644 --- 
a/src/domain/core/src/services/metadata_query_service.rs +++ b/src/domain/core/src/services/metadata_query_service.rs @@ -9,7 +9,6 @@ use chrono::{DateTime, Utc}; use internal_error::InternalError; -use odf_metadata as odf; use crate::ResolvedDataset; @@ -24,7 +23,7 @@ pub trait MetadataQueryService: Send + Sync { ) -> Result< Option<( odf::Multihash, - odf::MetadataBlockTyped, + odf::MetadataBlockTyped, )>, InternalError, >; @@ -33,13 +32,25 @@ pub trait MetadataQueryService: Send + Sync { async fn get_active_push_sources( &self, target: ResolvedDataset, - ) -> Result)>, InternalError>; + ) -> Result< + Vec<( + odf::Multihash, + odf::MetadataBlockTyped, + )>, + InternalError, + >; /// Returns an active transform, if any async fn get_active_transform( &self, target: ResolvedDataset, - ) -> Result)>, InternalError>; + ) -> Result< + Option<( + odf::Multihash, + odf::MetadataBlockTyped, + )>, + InternalError, + >; /// Attempt reading watermark that is currently associated with a dataset async fn try_get_current_watermark( diff --git a/src/domain/core/src/services/provenance_service.rs b/src/domain/core/src/services/provenance_service.rs index 1ca84bbe2..9b8df066e 100644 --- a/src/domain/core/src/services/provenance_service.rs +++ b/src/domain/core/src/services/provenance_service.rs @@ -8,8 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use odf_dataset::{DatasetNotFoundError, GetDatasetError}; -use odf_metadata as odf; use thiserror::Error; use crate::auth; @@ -82,7 +80,7 @@ pub struct LineageOptions {} #[derive(Debug, Error)] pub enum GetLineageError { #[error(transparent)] - NotFound(#[from] DatasetNotFoundError), + NotFound(#[from] odf::dataset::DatasetNotFoundError), #[error(transparent)] Access( #[from] @@ -97,11 +95,11 @@ pub enum GetLineageError { ), } -impl From for GetLineageError { - fn from(v: GetDatasetError) -> Self { +impl From for GetLineageError { + fn from(v: odf::dataset::GetDatasetError) -> Self { match v { - GetDatasetError::NotFound(e) => Self::NotFound(e), - GetDatasetError::Internal(e) => Self::Internal(e), + odf::dataset::GetDatasetError::NotFound(e) => Self::NotFound(e), + odf::dataset::GetDatasetError::Internal(e) => Self::Internal(e), } } } diff --git a/src/domain/core/src/services/pull_request_planner.rs b/src/domain/core/src/services/pull_request_planner.rs index 09d336b28..00a760f2b 100644 --- a/src/domain/core/src/services/pull_request_planner.rs +++ b/src/domain/core/src/services/pull_request_planner.rs @@ -12,8 +12,6 @@ use std::sync::Arc; use ::serde::{Deserialize, Serialize}; use internal_error::InternalError; -use odf_dataset::DatasetNotFoundError; -use odf_metadata::*; use thiserror::Error; use crate::*; @@ -109,7 +107,7 @@ pub struct PullTransformItem { pub struct PullSyncItem { pub depth: i32, pub local_target: PullLocalTarget, - pub remote_ref: DatasetRefRemote, + pub remote_ref: odf::DatasetRefRemote, pub maybe_original_request: Option, pub sync_request: Box, } @@ -118,34 +116,34 @@ pub struct PullSyncItem { #[derive(Debug, Clone, PartialEq, Eq)] pub enum PullLocalTarget { - Existing(DatasetHandle), - ToCreate(DatasetAlias), + Existing(odf::DatasetHandle), + ToCreate(odf::DatasetAlias), } impl PullLocalTarget { - pub fn existing(hdl: DatasetHandle) -> Self { + pub fn existing(hdl: odf::DatasetHandle) -> Self { 
Self::Existing(hdl) } - pub fn to_create(alias: DatasetAlias) -> Self { + pub fn to_create(alias: odf::DatasetAlias) -> Self { Self::ToCreate(alias) } - pub fn alias(&self) -> &DatasetAlias { + pub fn alias(&self) -> &odf::DatasetAlias { match self { Self::Existing(hdl) => &hdl.alias, Self::ToCreate(alias) => alias, } } - pub fn as_local_ref(&self) -> DatasetRef { + pub fn as_local_ref(&self) -> odf::DatasetRef { match self { Self::Existing(hdl) => hdl.as_local_ref(), Self::ToCreate(alias) => alias.as_local_ref(), } } - pub fn as_any_ref(&self) -> DatasetRefAny { + pub fn as_any_ref(&self) -> odf::DatasetRefAny { match self { Self::Existing(hdl) => hdl.as_any_ref(), Self::ToCreate(alias) => alias.as_any_ref(), @@ -157,29 +155,35 @@ impl PullLocalTarget { #[derive(Debug, Clone, PartialEq, Eq)] pub enum PullRequest { - Local(DatasetRef), + Local(odf::DatasetRef), Remote(PullRequestRemote), } #[derive(Debug, Clone, PartialEq, Eq)] pub struct PullRequestRemote { - pub remote_ref: DatasetRefRemote, - pub maybe_local_alias: Option, + pub remote_ref: odf::DatasetRefRemote, + pub maybe_local_alias: Option, } impl PullRequest { - pub fn local(dataset_ref: DatasetRef) -> Self { + pub fn local(dataset_ref: odf::DatasetRef) -> Self { Self::Local(dataset_ref) } - pub fn remote(remote_ref: DatasetRefRemote, maybe_local_alias: Option) -> Self { + pub fn remote( + remote_ref: odf::DatasetRefRemote, + maybe_local_alias: Option, + ) -> Self { Self::Remote(PullRequestRemote { remote_ref, maybe_local_alias, }) } - pub fn from_any_ref(dataset_ref: &DatasetRefAny, is_repo: impl Fn(&RepoName) -> bool) -> Self { + pub fn from_any_ref( + dataset_ref: &odf::DatasetRefAny, + is_repo: impl Fn(&odf::RepoName) -> bool, + ) -> Self { // Single-tenant workspace => treat all repo-like references as repos. 
// Multi-tenant workspace => treat all repo-like references as accounts, use // repo:// for repos @@ -189,7 +193,7 @@ impl PullRequest { } } - pub fn local_ref(&self) -> Option> { + pub fn local_ref(&self) -> Option> { match self { PullRequest::Local(local_ref) => Some(Cow::Borrowed(local_ref)), PullRequest::Remote(remote) => remote @@ -199,7 +203,7 @@ impl PullRequest { } } - pub fn remote_ref(&self) -> Option<&DatasetRefRemote> { + pub fn remote_ref(&self) -> Option<&odf::DatasetRefRemote> { match self { PullRequest::Local(_) => None, PullRequest::Remote(remote) => Some(&remote.remote_ref), @@ -210,26 +214,26 @@ impl PullRequest { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub trait PullItemCommon { - fn try_get_written_handle(&self) -> Option<&DatasetHandle>; - fn get_read_handles(&self) -> Vec<&DatasetHandle>; + fn try_get_written_handle(&self) -> Option<&odf::DatasetHandle>; + fn get_read_handles(&self) -> Vec<&odf::DatasetHandle>; } impl PullItemCommon for PullIngestItem { - fn try_get_written_handle(&self) -> Option<&DatasetHandle> { + fn try_get_written_handle(&self) -> Option<&odf::DatasetHandle> { Some(self.target.get_handle()) } - fn get_read_handles(&self) -> Vec<&DatasetHandle> { + fn get_read_handles(&self) -> Vec<&odf::DatasetHandle> { vec![] } } impl PullItemCommon for PullTransformItem { - fn try_get_written_handle(&self) -> Option<&DatasetHandle> { + fn try_get_written_handle(&self) -> Option<&odf::DatasetHandle> { Some(self.target.get_handle()) } - fn get_read_handles(&self) -> Vec<&DatasetHandle> { + fn get_read_handles(&self) -> Vec<&odf::DatasetHandle> { let mut read_handles = Vec::new(); for hdl in self.plan.datasets_map.iterate_all_handles() { if hdl != self.target.get_handle() { @@ -241,14 +245,14 @@ impl PullItemCommon for PullTransformItem { } impl PullItemCommon for PullSyncItem { - fn try_get_written_handle(&self) -> Option<&DatasetHandle> { + fn 
try_get_written_handle(&self) -> Option<&odf::DatasetHandle> { match &self.local_target { PullLocalTarget::Existing(hdl) => Some(hdl), PullLocalTarget::ToCreate(_) => None, } } - fn get_read_handles(&self) -> Vec<&DatasetHandle> { + fn get_read_handles(&self) -> Vec<&odf::DatasetHandle> { vec![] } } @@ -261,9 +265,9 @@ pub struct PullResponse { /// recursive dependencies. pub maybe_original_request: Option, /// Local dataset handle, if resolved - pub maybe_local_ref: Option, + pub maybe_local_ref: Option, /// Destination reference, if resolved - pub maybe_remote_ref: Option, + pub maybe_remote_ref: Option, /// Result of the push operation pub result: Result, } @@ -317,8 +321,8 @@ pub trait PullMultiListener: Send + Sync { pub enum PullResult { UpToDate(PullResultUpToDate), Updated { - old_head: Option, - new_head: Multihash, + old_head: Option, + new_head: odf::Multihash, }, } @@ -389,7 +393,7 @@ pub enum PullError { NotFound( #[from] #[backtrace] - DatasetNotFoundError, + odf::dataset::DatasetNotFoundError, ), #[error("Cannot choose between multiple pull aliases")] @@ -430,7 +434,7 @@ pub enum PullError { Access( #[from] #[backtrace] - AccessError, + odf::metadata::AccessError, ), #[error(transparent)] diff --git a/src/domain/core/src/services/push_request_planner.rs b/src/domain/core/src/services/push_request_planner.rs index 6af197771..026d9da47 100644 --- a/src/domain/core/src/services/push_request_planner.rs +++ b/src/domain/core/src/services/push_request_planner.rs @@ -8,8 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use odf_dataset::{DatasetNotFoundError, GetDatasetError}; -use odf_metadata::*; use thiserror::Error; use super::sync_service::*; @@ -23,8 +21,8 @@ use super::{RemoteTarget, RepositoryNotFoundError}; pub trait PushRequestPlanner: Send + Sync { async fn collect_plan( &self, - dataset_handles: &[DatasetHandle], - push_target: Option<&DatasetPushTarget>, + dataset_handles: &[odf::DatasetHandle], + push_target: Option<&odf::DatasetPushTarget>, ) -> (Vec, Vec); } @@ -32,9 +30,9 @@ pub trait PushRequestPlanner: Send + Sync { #[derive(Debug, Eq, PartialEq)] pub struct PushItem { - pub local_handle: DatasetHandle, + pub local_handle: odf::DatasetHandle, pub remote_target: RemoteTarget, - pub push_target: Option, + pub push_target: Option, } impl PushItem { @@ -51,9 +49,9 @@ impl PushItem { pub struct PushResponse { /// Local dataset handle, if resolved - pub local_handle: Option, + pub local_handle: Option, /// Destination reference, if resolved - pub target: Option, + pub target: Option, /// Result of the push operation pub result: Result, } @@ -96,7 +94,7 @@ pub struct PushMultiOptions { /// Sync options pub sync_options: SyncOptions, /// Destination reference, if resolved - pub remote_target: Option, + pub remote_target: Option, } impl Default for PushMultiOptions { @@ -121,7 +119,7 @@ pub enum PushError { SourceNotFound( #[from] #[backtrace] - DatasetNotFoundError, + odf::dataset::DatasetNotFoundError, ), #[error("Destination is not specified and there is no associated push alias")] NoTarget, @@ -147,11 +145,11 @@ pub enum PushError { ), } -impl From for PushError { - fn from(v: GetDatasetError) -> Self { +impl From for PushError { + fn from(v: odf::dataset::GetDatasetError) -> Self { match v { - GetDatasetError::NotFound(e) => e.into(), - GetDatasetError::Internal(e) => e.into(), + odf::dataset::GetDatasetError::NotFound(e) => e.into(), + odf::dataset::GetDatasetError::Internal(e) => e.into(), } } } diff --git 
a/src/domain/core/src/services/query_service.rs b/src/domain/core/src/services/query_service.rs index cc402156d..8aff8469e 100644 --- a/src/domain/core/src/services/query_service.rs +++ b/src/domain/core/src/services/query_service.rs @@ -14,8 +14,6 @@ use datafusion::arrow; use datafusion::parquet::schema::types::Type; use datafusion::prelude::{DataFrame, SessionContext}; use internal_error::InternalError; -use odf_dataset::{DatasetNotFoundError, GetDatasetError}; -use odf_metadata::*; use thiserror::Error; use crate::auth::DatasetActionUnauthorizedError; @@ -43,7 +41,7 @@ pub trait QueryService: Send + Sync { /// ``` async fn tail( &self, - dataset_ref: &DatasetRef, + dataset_ref: &odf::DatasetRef, skip: u64, limit: u64, ) -> Result; @@ -61,21 +59,21 @@ pub trait QueryService: Send + Sync { /// already defined by this moment, `None` otherwise async fn get_schema( &self, - dataset_ref: &DatasetRef, + dataset_ref: &odf::DatasetRef, ) -> Result, QueryError>; /// Returns parquet schema of the last data file in a given dataset, if any /// files were written, `None` otherwise async fn get_schema_parquet_file( &self, - dataset_ref: &DatasetRef, + dataset_ref: &odf::DatasetRef, ) -> Result, QueryError>; // TODO: Introduce additional options that could be used to narrow down the // number of files we collect to construct the dataframe. // /// Returns a [DataFrame] representing the contents of an entire dataset - async fn get_data(&self, dataset_ref: &DatasetRef) -> Result; + async fn get_data(&self, dataset_ref: &odf::DatasetRef) -> Result; /// Lists engines known to the system and recommended for use async fn get_known_engines(&self) -> Result, InternalError>; @@ -91,7 +89,7 @@ pub struct QueryOptions { /// provided the table names in the query will be treated as dataset /// references and resolved as normally in the context of the calling /// user. 
- pub input_datasets: BTreeMap, + pub input_datasets: BTreeMap, } #[derive(Debug, Clone, Default)] @@ -103,7 +101,7 @@ pub struct QueryOptionsDataset { /// execution. This is used to achieve full reproducibility of queries /// as no matter what updates happen in the datasets - the query will /// only consider a specific subset of the data ledger. - pub block_hash: Option, + pub block_hash: Option, /// Hints that can help the system to minimize metadata scanning. Be extra /// careful that your hints don't influence the actual result of the /// query, as they are not inlcuded in the [`QueryState`] and thus can @@ -135,7 +133,7 @@ pub struct QueryResponse { #[derive(Debug, Clone)] pub struct QueryState { /// State of the input datasets used in the query - pub input_datasets: BTreeMap, + pub input_datasets: BTreeMap, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -144,7 +142,7 @@ pub struct QueryStateDataset { pub alias: String, /// Last block hash that was considered during the /// query planning - pub block_hash: Multihash, + pub block_hash: odf::Multihash, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -181,7 +179,7 @@ pub enum CreateSessionError { Access( #[from] #[backtrace] - AccessError, + odf::metadata::AccessError, ), #[error(transparent)] Internal( @@ -197,7 +195,7 @@ pub enum QueryError { DatasetNotFound( #[from] #[backtrace] - DatasetNotFoundError, + odf::dataset::DatasetNotFoundError, ), #[error(transparent)] DatasetBlockNotFound( @@ -221,7 +219,7 @@ pub enum QueryError { Access( #[from] #[backtrace] - AccessError, + odf::metadata::AccessError, ), #[error(transparent)] Internal( @@ -238,12 +236,12 @@ pub enum QueryError { #[derive(Error, Clone, PartialEq, Eq, Debug)] #[error("Dataset {dataset_id} does not have a block {block_hash}")] pub struct DatasetBlockNotFoundError { - pub dataset_id: DatasetID, - pub block_hash: Multihash, + pub dataset_id: odf::DatasetID, + pub block_hash: 
odf::Multihash, } impl DatasetBlockNotFoundError { - pub fn new(dataset_id: DatasetID, block_hash: Multihash) -> Self { + pub fn new(dataset_id: odf::DatasetID, block_hash: odf::Multihash) -> Self { Self { dataset_id, block_hash, @@ -277,16 +275,16 @@ impl From for QueryError { #[derive(Error, Clone, PartialEq, Eq, Debug)] #[error("Dataset schema is not yet available: {dataset_ref}")] pub struct DatasetSchemaNotAvailableError { - pub dataset_ref: DatasetRef, + pub dataset_ref: odf::DatasetRef, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for QueryError { - fn from(v: GetDatasetError) -> Self { +impl From for QueryError { + fn from(v: odf::dataset::GetDatasetError) -> Self { match v { - GetDatasetError::NotFound(e) => Self::DatasetNotFound(e), - GetDatasetError::Internal(e) => Self::Internal(e), + odf::dataset::GetDatasetError::NotFound(e) => Self::DatasetNotFound(e), + odf::dataset::GetDatasetError::Internal(e) => Self::Internal(e), } } } diff --git a/src/domain/core/src/services/remote_aliases.rs b/src/domain/core/src/services/remote_aliases.rs index 0c1114494..029e4b01c 100644 --- a/src/domain/core/src/services/remote_aliases.rs +++ b/src/domain/core/src/services/remote_aliases.rs @@ -9,7 +9,6 @@ use async_trait::async_trait; use internal_error::InternalError; -use odf_metadata::DatasetRefRemote; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RemoteAliasKind { @@ -22,21 +21,21 @@ pub trait RemoteAliases: Send + Sync { fn get_by_kind<'a>( &'a self, kind: RemoteAliasKind, - ) -> Box + 'a>; + ) -> Box + 'a>; - fn contains(&self, remote_ref: &DatasetRefRemote, kind: RemoteAliasKind) -> bool; + fn contains(&self, remote_ref: &odf::DatasetRefRemote, kind: RemoteAliasKind) -> bool; fn is_empty(&self, kind: RemoteAliasKind) -> bool; async fn add( &mut self, - remote_ref: &DatasetRefRemote, + remote_ref: &odf::DatasetRefRemote, kind: RemoteAliasKind, ) -> Result; async fn 
delete( &mut self, - remote_ref: &DatasetRefRemote, + remote_ref: &odf::DatasetRefRemote, kind: RemoteAliasKind, ) -> Result; diff --git a/src/domain/core/src/services/remote_aliases_registry.rs b/src/domain/core/src/services/remote_aliases_registry.rs index 48d301007..ec5c9f41d 100644 --- a/src/domain/core/src/services/remote_aliases_registry.rs +++ b/src/domain/core/src/services/remote_aliases_registry.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. use internal_error::InternalError; -use odf_metadata::{AccountName, DatasetHandle, DatasetName, DatasetPushTarget, RepoName}; use thiserror::Error; use crate::*; @@ -20,7 +19,7 @@ use crate::*; pub trait RemoteAliasesRegistry: Send + Sync { async fn get_remote_aliases( &self, - dataset_handle: &DatasetHandle, + dataset_handle: &odf::DatasetHandle, ) -> Result, GetAliasesError>; } @@ -47,8 +46,8 @@ pub trait RemoteAliasResolver: Send + Sync { // try to resolve via repository registry async fn resolve_push_target( &self, - dataset_handle: &DatasetHandle, - dataset_push_target_maybe: Option, + dataset_handle: &odf::DatasetHandle, + dataset_push_target_maybe: Option, ) -> Result; } @@ -57,17 +56,17 @@ pub trait RemoteAliasResolver: Send + Sync { #[derive(Debug, Clone, Eq, PartialEq)] pub struct RemoteTarget { pub url: url::Url, - pub repo_name: Option, - pub dataset_name: Option, - pub account_name: Option, + pub repo_name: Option, + pub dataset_name: Option, + pub account_name: Option, } impl RemoteTarget { pub fn new( url: url::Url, - repo_name: Option, - dataset_name: Option, - account_name: Option, + repo_name: Option, + dataset_name: Option, + account_name: Option, ) -> Self { Self { url, diff --git a/src/domain/core/src/services/remote_repository_registry.rs b/src/domain/core/src/services/remote_repository_registry.rs index dd57dc84a..62c7a376c 100644 --- a/src/domain/core/src/services/remote_repository_registry.rs +++ b/src/domain/core/src/services/remote_repository_registry.rs @@ -11,8 +11,6 @@ use 
::serde::{Deserialize, Serialize}; use ::serde_with::skip_serializing_none; use async_trait::async_trait; use internal_error::InternalError; -use odf_dataset::UnsupportedProtocolError; -use odf_metadata::*; use thiserror::Error; use url::Url; @@ -29,13 +27,16 @@ pub struct RepositoryAccessInfo { #[async_trait] pub trait RemoteRepositoryRegistry: Send + Sync { - fn get_all_repositories<'s>(&'s self) -> Box + 's>; + fn get_all_repositories<'s>(&'s self) -> Box + 's>; - fn get_repository(&self, repo_name: &RepoName) -> Result; + fn get_repository( + &self, + repo_name: &odf::RepoName, + ) -> Result; - fn add_repository(&self, repo_name: &RepoName, url: Url) -> Result<(), AddRepoError>; + fn add_repository(&self, repo_name: &odf::RepoName, url: Url) -> Result<(), AddRepoError>; - fn delete_repository(&self, repo_name: &RepoName) -> Result<(), DeleteRepoError>; + fn delete_repository(&self, repo_name: &odf::RepoName) -> Result<(), DeleteRepoError>; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -64,7 +65,7 @@ pub enum AddRepoError { UnsupportedProtocol( #[from] #[backtrace] - UnsupportedProtocolError, + odf::dataset::UnsupportedProtocolError, ), #[error(transparent)] AlreadyExists( @@ -103,7 +104,7 @@ pub enum DeleteRepoError { #[derive(Error, Clone, Eq, PartialEq, Debug)] #[error("Repository {repo_name} does not exist")] pub struct RepositoryNotFoundError { - pub repo_name: RepoName, + pub repo_name: odf::RepoName, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -111,5 +112,5 @@ pub struct RepositoryNotFoundError { #[derive(Error, Clone, Eq, PartialEq, Debug)] #[error("Repository {repo_name} already exists")] pub struct RepositoryAlreadyExistsError { - pub repo_name: RepoName, + pub repo_name: odf::RepoName, } diff --git a/src/domain/core/src/services/remote_status_service.rs 
b/src/domain/core/src/services/remote_status_service.rs index bf4efa4cb..dc6929512 100644 --- a/src/domain/core/src/services/remote_status_service.rs +++ b/src/domain/core/src/services/remote_status_service.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. use internal_error::InternalError; -use odf_metadata as odf; use thiserror::Error; use crate::utils::metadata_chain_comparator::CompareChainsResult; diff --git a/src/domain/core/src/services/reset/reset_executor.rs b/src/domain/core/src/services/reset/reset_executor.rs index 939c40801..99db8f502 100644 --- a/src/domain/core/src/services/reset/reset_executor.rs +++ b/src/domain/core/src/services/reset/reset_executor.rs @@ -8,8 +8,6 @@ // by the Apache License, Version 2.0. use internal_error::InternalError; -use odf_dataset::SetChainRefError; -use odf_metadata as odf; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -38,7 +36,7 @@ pub struct ResetResult { #[derive(Debug, Error)] pub enum ResetExecutionError { #[error(transparent)] - SetReferenceFailed(#[from] SetChainRefError), + SetReferenceFailed(#[from] odf::dataset::SetChainRefError), #[error(transparent)] Internal(#[from] InternalError), diff --git a/src/domain/core/src/services/reset/reset_planner.rs b/src/domain/core/src/services/reset/reset_planner.rs index 32034fd34..c3c3ad2aa 100644 --- a/src/domain/core/src/services/reset/reset_planner.rs +++ b/src/domain/core/src/services/reset/reset_planner.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use odf_metadata as odf; use thiserror::Error; use crate::ResolvedDataset; diff --git a/src/domain/core/src/services/resource_loader.rs b/src/domain/core/src/services/resource_loader.rs index 7a822c8dd..67b2cd9ac 100644 --- a/src/domain/core/src/services/resource_loader.rs +++ b/src/domain/core/src/services/resource_loader.rs @@ -10,7 +10,6 @@ use std::backtrace::Backtrace; use internal_error::{BoxedError, InternalError}; -use odf_metadata::DatasetSnapshot; use thiserror::Error; #[async_trait::async_trait] @@ -18,17 +17,17 @@ pub trait ResourceLoader: Send + Sync { async fn load_dataset_snapshot_from_path( &self, path: &std::path::Path, - ) -> Result; + ) -> Result; async fn load_dataset_snapshot_from_url( &self, url: &url::Url, - ) -> Result; + ) -> Result; async fn load_dataset_snapshot_from_ref( &self, sref: &str, - ) -> Result; + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/search_service.rs b/src/domain/core/src/services/search_service.rs index e16cd64da..5f67f0f25 100644 --- a/src/domain/core/src/services/search_service.rs +++ b/src/domain/core/src/services/search_service.rs @@ -8,8 +8,6 @@ // by the Apache License, Version 2.0. 
use internal_error::{BoxedError, InternalError}; -use odf_dataset::UnsupportedProtocolError; -use odf_metadata::*; use thiserror::Error; use crate::*; @@ -29,7 +27,7 @@ pub trait SearchService: Send + Sync { #[derive(Debug, Clone, Default)] pub struct SearchOptions { - pub repository_names: Vec, + pub repository_names: Vec, } #[derive(Debug, Clone, Default, PartialEq, Eq)] @@ -39,9 +37,9 @@ pub struct SearchResult { #[derive(Debug, Clone, PartialEq, Eq)] pub struct SearchResultDataset { - pub id: Option, - pub alias: DatasetAliasRemote, - pub kind: Option, + pub id: Option, + pub alias: odf::DatasetAliasRemote, + pub kind: Option, pub num_blocks: Option, pub num_records: Option, pub estimated_size: Option, @@ -69,13 +67,13 @@ pub enum SearchError { UnsupportedProtocol( #[from] #[backtrace] - UnsupportedProtocolError, + odf::dataset::UnsupportedProtocolError, ), #[error(transparent)] Access( #[from] #[backtrace] - AccessError, + odf::metadata::AccessError, ), #[error(transparent)] Internal( diff --git a/src/domain/core/src/services/sync_service.rs b/src/domain/core/src/services/sync_service.rs index 75976776c..9291b467b 100644 --- a/src/domain/core/src/services/sync_service.rs +++ b/src/domain/core/src/services/sync_service.rs @@ -10,16 +10,6 @@ use std::sync::Arc; use internal_error::{BoxedError, InternalError}; -use odf_dataset::{ - BuildDatasetError, - CreateDatasetError, - Dataset, - DatasetVisibility, - GetDatasetError, - RefCollisionError, - UnsupportedProtocolError, -}; -use odf_metadata::*; use thiserror::Error; use url::Url; @@ -55,7 +45,7 @@ pub struct SyncRequest { #[derive(Debug, Clone)] pub enum SyncRef { Local(ResolvedDataset), - LocalNew(DatasetAlias), + LocalNew(odf::DatasetAlias), Remote(SyncRefRemote), } @@ -68,16 +58,16 @@ impl SyncRef { } // If remote, refers to resolved repository URL - pub fn as_internal_any_ref(&self) -> DatasetRefAny { + pub fn as_internal_any_ref(&self) -> odf::DatasetRefAny { match self { Self::Local(local_ref) => 
local_ref.get_handle().as_any_ref(), Self::LocalNew(alias) => alias.as_any_ref(), - Self::Remote(remote_ref) => DatasetRefAny::Url(remote_ref.url.clone()), + Self::Remote(remote_ref) => odf::DatasetRefAny::Url(remote_ref.url.clone()), } } // If remote, returns the original unresolved ref - pub fn as_user_friendly_any_ref(&self) -> DatasetRefAny { + pub fn as_user_friendly_any_ref(&self) -> odf::DatasetRefAny { match self { Self::Local(local_ref) => local_ref.get_handle().as_any_ref(), Self::LocalNew(alias) => alias.as_any_ref(), @@ -89,8 +79,8 @@ impl SyncRef { #[derive(Clone)] pub struct SyncRefRemote { pub url: Arc, - pub dataset: Arc, - pub original_remote_ref: DatasetRefRemote, + pub dataset: Arc, + pub original_remote_ref: odf::DatasetRefRemote, } impl std::fmt::Debug for SyncRefRemote { @@ -118,7 +108,7 @@ pub struct SyncOptions { pub force: bool, /// Dataset visibility, in case of initial pushing - pub dataset_visibility: DatasetVisibility, + pub dataset_visibility: odf::DatasetVisibility, } impl Default for SyncOptions { @@ -127,7 +117,7 @@ impl Default for SyncOptions { trust_source: None, create_if_not_exists: true, force: false, - dataset_visibility: DatasetVisibility::Private, + dataset_visibility: odf::DatasetVisibility::Private, } } } @@ -138,8 +128,8 @@ impl Default for SyncOptions { pub enum SyncResult { UpToDate, Updated { - old_head: Option, - new_head: Multihash, + old_head: Option, + new_head: odf::Multihash, num_blocks: u64, }, } @@ -204,8 +194,8 @@ impl SyncListener for NullSyncListener {} pub trait SyncMultiListener: Send + Sync { fn begin_sync( &self, - _src: &DatasetRefAny, - _dst: &DatasetRefAny, + _src: &odf::DatasetRefAny, + _dst: &odf::DatasetRefAny, ) -> Option> { None } @@ -223,11 +213,11 @@ pub enum SyncError { #[error(transparent)] DatasetNotFound(#[from] DatasetAnyRefUnresolvedError), #[error(transparent)] - RefCollision(#[from] RefCollisionError), + RefCollision(#[from] odf::dataset::RefCollisionError), #[error(transparent)] - 
CreateDatasetFailed(#[from] CreateDatasetError), + CreateDatasetFailed(#[from] odf::dataset::CreateDatasetError), #[error(transparent)] - UnsupportedProtocol(#[from] UnsupportedProtocolError), + UnsupportedProtocol(#[from] odf::dataset::UnsupportedProtocolError), #[error(transparent)] UnsupportedIpfsStorageType(#[from] UnsupportedIpfsStorageTypeError), #[error(transparent)] @@ -245,13 +235,13 @@ pub enum SyncError { DestinationAhead(#[from] DestinationAheadError), #[error(transparent)] Corrupted(#[from] CorruptedSourceError), - #[error("Dataset was updated concurrently")] + #[error("Dataset was updated concurrently")] UpdatedConcurrently(#[source] BoxedError), #[error(transparent)] Access( #[from] #[backtrace] - AccessError, + odf::metadata::AccessError, ), #[error(transparent)] Internal( @@ -279,13 +269,13 @@ pub enum IpfsAddError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Error, Clone, Eq, PartialEq, Debug)] -#[error("Dataset {dataset_ref} not found")] +#[error("Dataset {dataset_ref} not found")] pub struct DatasetAnyRefUnresolvedError { - pub dataset_ref: DatasetRefAny, + pub dataset_ref: odf::DatasetRefAny, } impl DatasetAnyRefUnresolvedError { - pub fn new(r: impl Into) -> Self { + pub fn new(r: impl Into) -> Self { Self { dataset_ref: r.into(), } @@ -296,8 +286,8 @@ impl DatasetAnyRefUnresolvedError { #[derive(Error, Clone, Eq, PartialEq, Debug)] pub struct DatasetsDivergedError { - pub src_head: Multihash, - pub dst_head: Multihash, + pub src_head: odf::Multihash, + pub dst_head: odf::Multihash, pub detail: Option, } @@ -337,8 +327,8 @@ pub struct DatasetsDivergedErrorDetail { "Destination head {dst_head} is ahead of source head {src_head} by {dst_ahead_size} blocks" )] pub struct DestinationAheadError { - pub src_head: Multihash, - pub dst_head: Multihash, + pub src_head: odf::Multihash, + pub dst_head: odf::Multihash, pub dst_ahead_size: usize, } @@ -354,13
+344,15 @@ pub struct CorruptedSourceError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for SyncError { - fn from(v: GetDatasetError) -> Self { +impl From for SyncError { + fn from(v: odf::dataset::GetDatasetError) -> Self { match v { - GetDatasetError::NotFound(e) => Self::DatasetNotFound(DatasetAnyRefUnresolvedError { - dataset_ref: e.dataset_ref.into(), - }), - GetDatasetError::Internal(e) => Self::Internal(e), + odf::dataset::GetDatasetError::NotFound(e) => { + Self::DatasetNotFound(DatasetAnyRefUnresolvedError { + dataset_ref: e.dataset_ref.into(), + }) + } + odf::dataset::GetDatasetError::Internal(e) => Self::Internal(e), } } } @@ -374,11 +366,11 @@ impl From for SyncError { } } -impl From for SyncError { - fn from(v: BuildDatasetError) -> Self { +impl From for SyncError { + fn from(v: odf::dataset::BuildDatasetError) -> Self { match v { - BuildDatasetError::UnsupportedProtocol(e) => Self::UnsupportedProtocol(e), - BuildDatasetError::Internal(e) => Self::Internal(e), + odf::dataset::BuildDatasetError::UnsupportedProtocol(e) => Self::UnsupportedProtocol(e), + odf::dataset::BuildDatasetError::Internal(e) => Self::Internal(e), } } } @@ -405,7 +397,7 @@ impl From for SyncError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Error, Debug)] -#[error("Dataset storage type '{}' is unsupported for IPFS operations", url.scheme())] +#[error("Dataset storage type '{}' is unsupported for IPFS operations", url.scheme())] pub struct UnsupportedIpfsStorageTypeError { pub url: Url, } diff --git a/src/domain/core/src/services/transform/transform_executor.rs b/src/domain/core/src/services/transform/transform_executor.rs index 81afd9b5c..22f214cdf 100644 --- a/src/domain/core/src/services/transform/transform_executor.rs +++ b/src/domain/core/src/services/transform/transform_executor.rs @@ -10,7 +10,6 @@
use std::sync::Arc; use internal_error::InternalError; -use odf_dataset::CommitError; use thiserror::Error; use super::TransformPlan; @@ -67,7 +66,7 @@ pub enum TransformExecuteError { CommitError( #[from] #[backtrace] - CommitError, + odf::dataset::CommitError, ), #[error(transparent)] Internal( diff --git a/src/domain/core/src/services/transform/transform_listener.rs b/src/domain/core/src/services/transform/transform_listener.rs index 12596b6a2..f270d4b86 100644 --- a/src/domain/core/src/services/transform/transform_listener.rs +++ b/src/domain/core/src/services/transform/transform_listener.rs @@ -9,8 +9,6 @@ use std::sync::Arc; -use odf_metadata::DatasetHandle; - use super::{TransformElaborateError, TransformExecuteError, TransformResult}; use crate::EngineProvisioningListener; @@ -35,7 +33,7 @@ impl TransformListener for NullTransformListener {} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub trait TransformMultiListener: Send + Sync { - fn begin_transform(&self, _dataset: &DatasetHandle) -> Option> { + fn begin_transform(&self, _dataset: &odf::DatasetHandle) -> Option> { None } } diff --git a/src/domain/core/src/services/transform/transform_request_planner.rs b/src/domain/core/src/services/transform/transform_request_planner.rs index 501ebaa44..b86e6e2ce 100644 --- a/src/domain/core/src/services/transform/transform_request_planner.rs +++ b/src/domain/core/src/services/transform/transform_request_planner.rs @@ -10,16 +10,6 @@ use chrono::{DateTime, Utc}; use datafusion::arrow::datatypes::SchemaRef; use internal_error::InternalError; -use odf_dataset::{BlockRef, DatasetNotFoundError, GetDatasetError, InvalidIntervalError}; -use odf_metadata as odf; -use odf_storage::{ - BlockMalformedError, - BlockNotFoundError, - BlockVersionError, - GetBlockError, - GetRefError, - RefNotFoundError, -}; use thiserror::Error; use crate::engine::TransformRequestExt; @@ -61,11 +51,11 @@ pub struct 
TransformPreliminaryRequestExt { /// Identifies the output dataset pub dataset_handle: odf::DatasetHandle, /// Block reference to advance upon commit - pub block_ref: BlockRef, + pub block_ref: odf::BlockRef, /// Current head (for concurrency control) pub head: odf::Multihash, /// Transformation that will be applied to produce new data - pub transform: odf::Transform, + pub transform: odf::metadata::Transform, /// System time to use for new records pub system_time: DateTime, /// Expected data schema (if already defined) @@ -73,9 +63,12 @@ pub struct TransformPreliminaryRequestExt { /// Preceding record offset, if any pub prev_offset: Option, /// State of inputs - pub input_states: Vec<(odf::TransformInput, Option)>, + pub input_states: Vec<( + odf::metadata::TransformInput, + Option, + )>, /// Output dataset's vocabulary - pub vocab: odf::DatasetVocabulary, + pub vocab: odf::metadata::DatasetVocabulary, /// Previous checkpoint, if any pub prev_checkpoint: Option, } @@ -134,37 +127,37 @@ pub enum VerifyTransformPlanError { DatasetNotFound( #[from] #[backtrace] - DatasetNotFoundError, + odf::dataset::DatasetNotFoundError, ), #[error(transparent)] RefNotFound( #[from] #[backtrace] - RefNotFoundError, + odf::storage::RefNotFoundError, ), #[error(transparent)] BlockNotFound( #[from] #[backtrace] - BlockNotFoundError, + odf::storage::BlockNotFoundError, ), #[error(transparent)] BlockVersion( #[from] #[backtrace] - BlockVersionError, + odf::storage::BlockVersionError, ), #[error(transparent)] BlockMalformed( #[from] #[backtrace] - BlockMalformedError, + odf::storage::BlockMalformedError, ), #[error(transparent)] InvalidInterval( #[from] #[backtrace] - InvalidIntervalError, + odf::dataset::InvalidIntervalError, ), #[error(transparent)] InputSchemaNotDefined( @@ -194,37 +187,37 @@ pub enum VerifyTransformPlanError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for VerifyTransformPlanError 
{ - fn from(v: GetDatasetError) -> Self { +impl From for VerifyTransformPlanError { + fn from(v: odf::dataset::GetDatasetError) -> Self { match v { - GetDatasetError::NotFound(e) => Self::DatasetNotFound(e), - GetDatasetError::Internal(e) => Self::Internal(e), + odf::dataset::GetDatasetError::NotFound(e) => Self::DatasetNotFound(e), + odf::dataset::GetDatasetError::Internal(e) => Self::Internal(e), } } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for VerifyTransformPlanError { - fn from(v: GetRefError) -> Self { +impl From for VerifyTransformPlanError { + fn from(v: odf::storage::GetRefError) -> Self { match v { - GetRefError::NotFound(e) => Self::RefNotFound(e), - GetRefError::Access(e) => Self::Access(e), - GetRefError::Internal(e) => Self::Internal(e), + odf::storage::GetRefError::NotFound(e) => Self::RefNotFound(e), + odf::storage::GetRefError::Access(e) => Self::Access(e), + odf::storage::GetRefError::Internal(e) => Self::Internal(e), } } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -impl From for VerifyTransformPlanError { - fn from(v: GetBlockError) -> Self { +impl From for VerifyTransformPlanError { + fn from(v: odf::storage::GetBlockError) -> Self { match v { - GetBlockError::NotFound(e) => Self::BlockNotFound(e), - GetBlockError::BlockVersion(e) => Self::BlockVersion(e), - GetBlockError::BlockMalformed(e) => Self::BlockMalformed(e), - GetBlockError::Access(e) => Self::Access(e), - GetBlockError::Internal(e) => Self::Internal(e), + odf::storage::GetBlockError::NotFound(e) => Self::BlockNotFound(e), + odf::storage::GetBlockError::BlockVersion(e) => Self::BlockVersion(e), + odf::storage::GetBlockError::BlockMalformed(e) => Self::BlockMalformed(e), + odf::storage::GetBlockError::Access(e) => Self::Access(e), + odf::storage::GetBlockError::Internal(e) => Self::Internal(e), } } } diff --git 
a/src/domain/core/src/services/transform/transform_types.rs b/src/domain/core/src/services/transform/transform_types.rs index 76ba0b885..732e262e8 100644 --- a/src/domain/core/src/services/transform/transform_types.rs +++ b/src/domain/core/src/services/transform/transform_types.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. use internal_error::InternalError; -use odf_metadata::{DatasetHandle, DatasetID, Multihash}; use thiserror::Error; use super::{ @@ -27,8 +26,8 @@ use super::{ pub enum TransformResult { UpToDate, Updated { - old_head: Multihash, - new_head: Multihash, + old_head: odf::Multihash, + new_head: odf::Multihash, }, } @@ -94,7 +93,7 @@ pub enum VerifyTransformError { #[derive(Debug, Error)] #[error("Dataset {dataset_handle} has not defined a schema yet")] pub struct InputSchemaNotDefinedError { - pub dataset_handle: DatasetHandle, + pub dataset_handle: odf::DatasetHandle, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -102,9 +101,9 @@ pub struct InputSchemaNotDefinedError { #[derive(Error, Debug)] #[error("Invalid block interval [{head}, {tail}) in input dataset '{input_dataset_id}'")] pub struct InvalidInputIntervalError { - pub input_dataset_id: DatasetID, - pub head: Multihash, - pub tail: Multihash, + pub input_dataset_id: odf::DatasetID, + pub head: odf::Multihash, + pub tail: odf::Multihash, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/services/verification_service.rs b/src/domain/core/src/services/verification_service.rs index dd1d014f2..8306f572a 100644 --- a/src/domain/core/src/services/verification_service.rs +++ b/src/domain/core/src/services/verification_service.rs @@ -11,16 +11,6 @@ use std::fmt::Display; use std::sync::Arc; use internal_error::{ErrorIntoInternal, InternalError}; -use odf_dataset::{DatasetNotFoundError, GetDatasetError, 
InvalidIntervalError, IterBlocksError}; -use odf_metadata as odf; -use odf_storage::{ - BlockMalformedError, - BlockNotFoundError, - BlockVersionError, - GetBlockError, - GetRefError, - RefNotFoundError, -}; use thiserror::Error; use crate::*; @@ -181,37 +171,37 @@ pub enum VerificationError { DatasetNotFound( #[from] #[backtrace] - DatasetNotFoundError, + odf::dataset::DatasetNotFoundError, ), #[error(transparent)] RefNotFound( #[from] #[backtrace] - RefNotFoundError, + odf::storage::RefNotFoundError, ), #[error(transparent)] BlockNotFound( #[from] #[backtrace] - BlockNotFoundError, + odf::storage::BlockNotFoundError, ), #[error(transparent)] BlockVersion( #[from] #[backtrace] - BlockVersionError, + odf::storage::BlockVersionError, ), #[error(transparent)] BlockMalformed( #[from] #[backtrace] - BlockMalformedError, + odf::storage::BlockMalformedError, ), #[error(transparent)] InvalidInterval( #[from] #[backtrace] - InvalidIntervalError, + odf::dataset::InvalidIntervalError, ), #[error("Data doesn't match metadata")] DataDoesNotMatchMetadata( @@ -251,35 +241,39 @@ pub enum VerificationError { ), } -impl From for VerificationError { - fn from(v: GetDatasetError) -> Self { +impl From for VerificationError { + fn from(v: odf::dataset::GetDatasetError) -> Self { match v { - GetDatasetError::NotFound(e) => VerificationError::DatasetNotFound(e), - GetDatasetError::Internal(e) => VerificationError::Internal(e), + odf::dataset::GetDatasetError::NotFound(e) => VerificationError::DatasetNotFound(e), + odf::dataset::GetDatasetError::Internal(e) => VerificationError::Internal(e), } } } -impl From for VerificationError { - fn from(v: GetRefError) -> Self { +impl From for VerificationError { + fn from(v: odf::storage::GetRefError) -> Self { match v { - GetRefError::NotFound(e) => VerificationError::RefNotFound(e), - GetRefError::Access(e) => VerificationError::Internal(e.int_err()), - GetRefError::Internal(e) => VerificationError::Internal(e), + 
odf::storage::GetRefError::NotFound(e) => VerificationError::RefNotFound(e), + odf::storage::GetRefError::Access(e) => VerificationError::Internal(e.int_err()), + odf::storage::GetRefError::Internal(e) => VerificationError::Internal(e), } } } -impl From for VerificationError { - fn from(v: IterBlocksError) -> Self { +impl From for VerificationError { + fn from(v: odf::dataset::IterBlocksError) -> Self { match v { - IterBlocksError::RefNotFound(e) => VerificationError::RefNotFound(e), - IterBlocksError::BlockNotFound(e) => VerificationError::BlockNotFound(e), - IterBlocksError::BlockVersion(e) => VerificationError::BlockVersion(e), - IterBlocksError::BlockMalformed(e) => VerificationError::BlockMalformed(e), - IterBlocksError::InvalidInterval(e) => VerificationError::InvalidInterval(e), - IterBlocksError::Access(e) => VerificationError::Internal(e.int_err()), - IterBlocksError::Internal(e) => VerificationError::Internal(e), + odf::dataset::IterBlocksError::RefNotFound(e) => VerificationError::RefNotFound(e), + odf::dataset::IterBlocksError::BlockNotFound(e) => VerificationError::BlockNotFound(e), + odf::dataset::IterBlocksError::BlockVersion(e) => VerificationError::BlockVersion(e), + odf::dataset::IterBlocksError::BlockMalformed(e) => { + VerificationError::BlockMalformed(e) + } + odf::dataset::IterBlocksError::InvalidInterval(e) => { + VerificationError::InvalidInterval(e) + } + odf::dataset::IterBlocksError::Access(e) => VerificationError::Internal(e.int_err()), + odf::dataset::IterBlocksError::Internal(e) => VerificationError::Internal(e), } } } @@ -293,14 +287,14 @@ impl From for VerificationError { } } -impl From for VerificationError { - fn from(v: GetBlockError) -> Self { +impl From for VerificationError { + fn from(v: odf::storage::GetBlockError) -> Self { match v { - GetBlockError::NotFound(e) => Self::BlockNotFound(e), - GetBlockError::BlockVersion(e) => Self::BlockVersion(e), - GetBlockError::BlockMalformed(e) => Self::BlockMalformed(e), - 
GetBlockError::Access(e) => Self::Internal(e.int_err()), - GetBlockError::Internal(e) => Self::Internal(e), + odf::storage::GetBlockError::NotFound(e) => Self::BlockNotFound(e), + odf::storage::GetBlockError::BlockVersion(e) => Self::BlockVersion(e), + odf::storage::GetBlockError::BlockMalformed(e) => Self::BlockMalformed(e), + odf::storage::GetBlockError::Access(e) => Self::Internal(e.int_err()), + odf::storage::GetBlockError::Internal(e) => Self::Internal(e), } } } diff --git a/src/domain/core/src/services/watermark/set_watermark_executor.rs b/src/domain/core/src/services/watermark/set_watermark_executor.rs index 4b65ad080..1093e5f18 100644 --- a/src/domain/core/src/services/watermark/set_watermark_executor.rs +++ b/src/domain/core/src/services/watermark/set_watermark_executor.rs @@ -16,7 +16,6 @@ // by the Apache License, Version 2.0. use internal_error::InternalError; -use odf_metadata as odf; use thiserror::Error; use crate::{ResolvedDataset, SetWatermarkPlan}; diff --git a/src/domain/core/src/use_cases/append_dataset_metadata_batch_use_case.rs b/src/domain/core/src/use_cases/append_dataset_metadata_batch_use_case.rs index 086ab7f82..2f2588bc8 100644 --- a/src/domain/core/src/use_cases/append_dataset_metadata_batch_use_case.rs +++ b/src/domain/core/src/use_cases/append_dataset_metadata_batch_use_case.rs @@ -9,18 +9,16 @@ use std::collections::VecDeque; -use odf_dataset::{AppendError, Dataset, HashedMetadataBlock}; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[async_trait::async_trait] pub trait AppendDatasetMetadataBatchUseCase: Send + Sync { async fn execute( &self, - dataset: &dyn Dataset, - new_blocks: VecDeque, + dataset: &dyn odf::Dataset, + new_blocks: VecDeque, force_update_if_diverged: bool, - ) -> Result<(), AppendError>; + ) -> Result<(), odf::dataset::AppendError>; } 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/use_cases/commit_dataset_event_use_case.rs b/src/domain/core/src/use_cases/commit_dataset_event_use_case.rs index c2a6034b1..f9abffbfa 100644 --- a/src/domain/core/src/use_cases/commit_dataset_event_use_case.rs +++ b/src/domain/core/src/use_cases/commit_dataset_event_use_case.rs @@ -7,19 +7,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use odf_dataset::{CommitError, CommitOpts, CommitResult}; -use odf_metadata::{DatasetHandle, MetadataEvent}; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[async_trait::async_trait] pub trait CommitDatasetEventUseCase: Send + Sync { async fn execute( &self, - dataset_handle: &DatasetHandle, - event: MetadataEvent, - opts: CommitOpts<'_>, - ) -> Result; + dataset_handle: &odf::DatasetHandle, + event: odf::MetadataEvent, + opts: odf::dataset::CommitOpts<'_>, + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/use_cases/compact_dataset_use_case.rs b/src/domain/core/src/use_cases/compact_dataset_use_case.rs index caa834eb0..e6dd50c99 100644 --- a/src/domain/core/src/use_cases/compact_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/compact_dataset_use_case.rs @@ -10,7 +10,6 @@ use std::sync::Arc; use internal_error::InternalError; -use odf_metadata as odf; use thiserror::Error; use crate::auth::DatasetActionUnauthorizedError; diff --git a/src/domain/core/src/use_cases/create_dataset_from_snapshot_use_case.rs b/src/domain/core/src/use_cases/create_dataset_from_snapshot_use_case.rs index 64d4a4b02..ff7999568 100644 --- a/src/domain/core/src/use_cases/create_dataset_from_snapshot_use_case.rs +++ 
b/src/domain/core/src/use_cases/create_dataset_from_snapshot_use_case.rs @@ -7,9 +7,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use odf_dataset::{CreateDatasetFromSnapshotError, CreateDatasetResult}; -use odf_metadata::DatasetSnapshot; - use crate::CreateDatasetUseCaseOptions; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -18,9 +15,9 @@ use crate::CreateDatasetUseCaseOptions; pub trait CreateDatasetFromSnapshotUseCase: Send + Sync { async fn execute( &self, - snapshot: DatasetSnapshot, + snapshot: odf::DatasetSnapshot, options: CreateDatasetUseCaseOptions, - ) -> Result; + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/use_cases/create_dataset_use_case.rs b/src/domain/core/src/use_cases/create_dataset_use_case.rs index ee258fed1..2b61bc4bb 100644 --- a/src/domain/core/src/use_cases/create_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/create_dataset_use_case.rs @@ -7,26 +7,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use odf_dataset::{CreateDatasetError, CreateDatasetResult, DatasetVisibility}; -use odf_metadata::{DatasetAlias, MetadataBlockTyped, Seed}; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[async_trait::async_trait] pub trait CreateDatasetUseCase: Send + Sync { async fn execute( &self, - dataset_alias: &DatasetAlias, - seed_block: MetadataBlockTyped, + dataset_alias: &odf::DatasetAlias, + seed_block: odf::MetadataBlockTyped, options: CreateDatasetUseCaseOptions, - ) -> Result; + ) -> Result; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Debug, Copy, Clone, Default)] pub struct CreateDatasetUseCaseOptions { - pub dataset_visibility: DatasetVisibility, + pub dataset_visibility: odf::DatasetVisibility, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/use_cases/delete_dataset_use_case.rs b/src/domain/core/src/use_cases/delete_dataset_use_case.rs index 374f03a56..ef9727a65 100644 --- a/src/domain/core/src/use_cases/delete_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/delete_dataset_use_case.rs @@ -7,19 +7,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use odf_dataset::DeleteDatasetError; -use odf_metadata::{DatasetHandle, DatasetRef}; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[async_trait::async_trait] pub trait DeleteDatasetUseCase: Send + Sync { - async fn execute_via_ref(&self, dataset_ref: &DatasetRef) -> Result<(), DeleteDatasetError>; + async fn execute_via_ref( + &self, + dataset_ref: &odf::DatasetRef, + ) -> Result<(), odf::dataset::DeleteDatasetError>; async fn execute_via_handle( &self, - dataset_handle: &DatasetHandle, - ) -> Result<(), DeleteDatasetError>; + dataset_handle: &odf::DatasetHandle, + ) -> Result<(), odf::dataset::DeleteDatasetError>; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/use_cases/push_dataset_use_case.rs b/src/domain/core/src/use_cases/push_dataset_use_case.rs index 5585872a8..cf34ab9fe 100644 --- a/src/domain/core/src/use_cases/push_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/push_dataset_use_case.rs @@ -10,7 +10,6 @@ use std::sync::Arc; use internal_error::InternalError; -use odf_metadata::DatasetHandle; use crate::{PushMultiOptions, PushResponse, SyncMultiListener}; @@ -20,7 +19,7 @@ use crate::{PushMultiOptions, PushResponse, SyncMultiListener}; pub trait PushDatasetUseCase: Send + Sync { async fn execute_multi( &self, - dataset_handles: Vec, + dataset_handles: Vec, options: PushMultiOptions, sync_listener: Option>, ) -> Result, InternalError>; diff --git a/src/domain/core/src/use_cases/rename_dataset_use_case.rs b/src/domain/core/src/use_cases/rename_dataset_use_case.rs index 2bf44a446..8c4637778 100644 --- a/src/domain/core/src/use_cases/rename_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/rename_dataset_use_case.rs @@ -7,18 +7,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use odf_dataset::RenameDatasetError; -use odf_metadata::{DatasetName, DatasetRef}; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[async_trait::async_trait] pub trait RenameDatasetUseCase: Send + Sync { async fn execute( &self, - dataset_ref: &DatasetRef, - new_name: &DatasetName, - ) -> Result<(), RenameDatasetError>; + dataset_ref: &odf::DatasetRef, + new_name: &odf::DatasetName, + ) -> Result<(), odf::dataset::RenameDatasetError>; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/core/src/use_cases/reset_dataset_use_case.rs b/src/domain/core/src/use_cases/reset_dataset_use_case.rs index 4281f6ae1..f3127fe8d 100644 --- a/src/domain/core/src/use_cases/reset_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/reset_dataset_use_case.rs @@ -8,7 +8,6 @@ // by the Apache License, Version 2.0. use internal_error::InternalError; -use odf_metadata as odf; use thiserror::Error; use crate::auth::DatasetActionUnauthorizedError; diff --git a/src/domain/core/src/use_cases/set_watermark_use_case.rs b/src/domain/core/src/use_cases/set_watermark_use_case.rs index 0deed678c..8dc750800 100644 --- a/src/domain/core/src/use_cases/set_watermark_use_case.rs +++ b/src/domain/core/src/use_cases/set_watermark_use_case.rs @@ -9,7 +9,6 @@ use chrono::{DateTime, Utc}; use internal_error::InternalError; -use odf_metadata as odf; use thiserror::Error; use crate::auth::DatasetActionUnauthorizedError; diff --git a/src/domain/core/src/use_cases/verify_dataset_use_case.rs b/src/domain/core/src/use_cases/verify_dataset_use_case.rs index 460a794e8..3ba9ca9b9 100644 --- a/src/domain/core/src/use_cases/verify_dataset_use_case.rs +++ b/src/domain/core/src/use_cases/verify_dataset_use_case.rs @@ -9,8 +9,6 @@ use std::sync::Arc; -use odf_metadata::DatasetHandle; - use crate::{ VerificationListener, VerificationMultiListener, @@ 
-24,13 +22,13 @@ use crate::{ pub trait VerifyDatasetUseCase: Send + Sync { async fn execute( &self, - request: VerificationRequest, + request: VerificationRequest, maybe_listener: Option>, ) -> VerificationResult; async fn execute_multi( &self, - requests: Vec>, + requests: Vec>, maybe_multi_listener: Option>, ) -> Vec; } diff --git a/src/domain/core/src/utils/metadata_chain_comparator.rs b/src/domain/core/src/utils/metadata_chain_comparator.rs index f9f7dcc9d..72b0889f4 100644 --- a/src/domain/core/src/utils/metadata_chain_comparator.rs +++ b/src/domain/core/src/utils/metadata_chain_comparator.rs @@ -12,25 +12,6 @@ use std::convert::TryFrom; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use internal_error::*; -use odf_dataset::{ - AppendError, - AppendOpts, - BlockRef, - DynMetadataStream, - HashedMetadataBlock, - IterBlocksError, - MetadataChain, - SetChainRefError, - SetRefOpts, -}; -use odf_metadata as odf; -use odf_storage::{ - ContainsBlockError, - GetBlockError, - GetRefError, - MetadataBlockRepository, - ReferenceRepository, -}; use thiserror::Error; use crate::*; @@ -46,12 +27,14 @@ pub struct MetadataChainComparator {} // skip through long chains faster impl MetadataChainComparator { pub async fn compare_chains( - lhs_chain: &dyn MetadataChain, + lhs_chain: &dyn odf::MetadataChain, lhs_head: &odf::Multihash, - rhs_chain: &dyn MetadataChain, + rhs_chain: &dyn odf::MetadataChain, rhs_head: Option<&odf::Multihash>, listener: &dyn CompareChainsListener, ) -> Result { + use odf::MetadataChain; + // When source and destination point to the same block, chains are equal, no // further scanning required if Some(&lhs_head) == rhs_head.as_ref() { @@ -185,7 +168,8 @@ impl MetadataChainComparator { let ahead_size = ahead_sequence_number - expected_common_sequence_number; ahead_chain.expecting_to_read_blocks(ahead_size); - let ahead_blocks: Vec = ahead_chain + use odf::MetadataChain; + let ahead_blocks: Vec = ahead_chain 
.iter_blocks_interval(ahead_head, None, false) .take(usize::try_from(ahead_size).unwrap()) .try_collect() @@ -251,6 +235,7 @@ impl MetadataChainComparator { ); } + use odf::MetadataChain; let mut lhs_stream = lhs_chain.iter_blocks_interval(lhs_head, None, false); let mut rhs_stream = rhs_chain.iter_blocks_interval(rhs_head, None, false); @@ -296,10 +281,10 @@ impl MetadataChainComparator { pub enum CompareChainsResult { Equal, LhsAhead { - lhs_ahead_blocks: Vec, + lhs_ahead_blocks: Vec, }, LhsBehind { - rhs_ahead_blocks: Vec, + rhs_ahead_blocks: Vec, }, Divergence { uncommon_blocks_in_lhs: u64, @@ -312,7 +297,7 @@ pub enum CompareChainsResult { #[derive(Debug)] enum CommonAncestorCheck { Success { - ahead_blocks: Vec, + ahead_blocks: Vec, }, Failure { common_ancestor_sequence_number: Option, @@ -339,52 +324,56 @@ pub enum CompareChainsError { ), } -impl From for CompareChainsError { - fn from(v: GetBlockError) -> Self { +impl From for CompareChainsError { + fn from(v: odf::storage::GetBlockError) -> Self { match v { - GetBlockError::NotFound(e) => Self::Corrupted(CorruptedSourceError { - message: "Metadata chain is broken".to_owned(), - source: Some(e.into()), - }), - GetBlockError::BlockVersion(e) => Self::Corrupted(CorruptedSourceError { + odf::storage::GetBlockError::NotFound(e) => Self::Corrupted(CorruptedSourceError { message: "Metadata chain is broken".to_owned(), source: Some(e.into()), }), - GetBlockError::BlockMalformed(e) => Self::Corrupted(CorruptedSourceError { + odf::storage::GetBlockError::BlockVersion(e) => Self::Corrupted(CorruptedSourceError { message: "Metadata chain is broken".to_owned(), source: Some(e.into()), }), - GetBlockError::Access(e) => Self::Access(e), - GetBlockError::Internal(e) => Self::Internal(e), + odf::storage::GetBlockError::BlockMalformed(e) => { + Self::Corrupted(CorruptedSourceError { + message: "Metadata chain is broken".to_owned(), + source: Some(e.into()), + }) + } + odf::storage::GetBlockError::Access(e) => 
Self::Access(e), + odf::storage::GetBlockError::Internal(e) => Self::Internal(e), } } } -impl From for CompareChainsError { - fn from(v: IterBlocksError) -> Self { +impl From for CompareChainsError { + fn from(v: odf::dataset::IterBlocksError) -> Self { match v { - IterBlocksError::RefNotFound(e) => CompareChainsError::Internal(e.int_err()), - IterBlocksError::BlockNotFound(e) => { + odf::dataset::IterBlocksError::RefNotFound(e) => { + CompareChainsError::Internal(e.int_err()) + } + odf::dataset::IterBlocksError::BlockNotFound(e) => { CompareChainsError::Corrupted(CorruptedSourceError { message: "Metadata chain is broken".to_owned(), source: Some(e.into()), }) } - IterBlocksError::BlockVersion(e) => { + odf::dataset::IterBlocksError::BlockVersion(e) => { CompareChainsError::Corrupted(CorruptedSourceError { message: "Metadata chain is broken".to_owned(), source: Some(e.into()), }) } - IterBlocksError::BlockMalformed(e) => { + odf::dataset::IterBlocksError::BlockMalformed(e) => { CompareChainsError::Corrupted(CorruptedSourceError { message: "Metadata chain is broken".to_owned(), source: Some(e.into()), }) } - IterBlocksError::InvalidInterval(_) => unreachable!(), - IterBlocksError::Access(e) => CompareChainsError::Access(e), - IterBlocksError::Internal(e) => CompareChainsError::Internal(e), + odf::dataset::IterBlocksError::InvalidInterval(_) => unreachable!(), + odf::dataset::IterBlocksError::Access(e) => CompareChainsError::Access(e), + odf::dataset::IterBlocksError::Internal(e) => CompareChainsError::Internal(e), } } } @@ -392,14 +381,14 @@ impl From for CompareChainsError { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct MetadataChainWithStats<'a> { - chain: &'a dyn MetadataChain, + chain: &'a dyn odf::MetadataChain, on_expected: Box, on_read: Box, } impl<'a> MetadataChainWithStats<'a> { fn new( - chain: &'a dyn MetadataChain, + chain: &'a dyn odf::MetadataChain, on_expected: impl 
Fn(u64) + Send + Sync + 'a, on_read: impl Fn(u64) + Send + Sync + 'a, ) -> Self { @@ -416,17 +405,26 @@ impl<'a> MetadataChainWithStats<'a> { } #[async_trait] -impl<'a> MetadataChain for MetadataChainWithStats<'a> { - async fn resolve_ref(&self, r: &BlockRef) -> Result { +impl<'a> odf::MetadataChain for MetadataChainWithStats<'a> { + async fn resolve_ref( + &self, + r: &odf::BlockRef, + ) -> Result { self.chain.resolve_ref(r).await } - async fn get_block(&self, hash: &odf::Multihash) -> Result { + async fn get_block( + &self, + hash: &odf::Multihash, + ) -> Result { (self.on_read)(1); self.chain.get_block(hash).await } - async fn contains_block(&self, hash: &odf::Multihash) -> Result { + async fn contains_block( + &self, + hash: &odf::Multihash, + ) -> Result { (self.on_read)(1); self.chain.contains_block(hash).await } @@ -436,7 +434,7 @@ impl<'a> MetadataChain for MetadataChainWithStats<'a> { head: &'b odf::Multihash, tail: Option<&'b odf::Multihash>, ignore_missing_tail: bool, - ) -> DynMetadataStream<'b> { + ) -> odf::dataset::DynMetadataStream<'b> { Box::pin( self.chain .iter_blocks_interval(head, tail, ignore_missing_tail) @@ -452,7 +450,7 @@ impl<'a> MetadataChain for MetadataChainWithStats<'a> { head: &'b odf::Multihash, tail: &'b odf::Multihash, ignore_missing_tail: bool, - ) -> DynMetadataStream<'b> { + ) -> odf::dataset::DynMetadataStream<'b> { Box::pin( self.chain .iter_blocks_interval_inclusive(head, tail, ignore_missing_tail) @@ -465,9 +463,9 @@ impl<'a> MetadataChain for MetadataChainWithStats<'a> { fn iter_blocks_interval_ref<'b>( &'b self, - head: &'b BlockRef, - tail: Option<&'b BlockRef>, - ) -> DynMetadataStream<'b> { + head: &'b odf::BlockRef, + tail: Option<&'b odf::BlockRef>, + ) -> odf::dataset::DynMetadataStream<'b> { Box::pin(self.chain.iter_blocks_interval_ref(head, tail).map(|v| { (self.on_read)(1); v @@ -476,26 +474,26 @@ impl<'a> MetadataChain for MetadataChainWithStats<'a> { async fn set_ref<'b>( &'b self, - r: &BlockRef, + r: 
&odf::BlockRef, hash: &odf::Multihash, - opts: SetRefOpts<'b>, - ) -> Result<(), SetChainRefError> { + opts: odf::dataset::SetRefOpts<'b>, + ) -> Result<(), odf::dataset::SetChainRefError> { self.chain.set_ref(r, hash, opts).await } async fn append<'b>( &'b self, block: odf::MetadataBlock, - opts: AppendOpts<'b>, - ) -> Result { + opts: odf::dataset::AppendOpts<'b>, + ) -> Result { self.chain.append(block, opts).await } - fn as_reference_repo(&self) -> &dyn ReferenceRepository { + fn as_reference_repo(&self) -> &dyn odf::storage::ReferenceRepository { self.chain.as_reference_repo() } - fn as_metadata_block_repository(&self) -> &dyn MetadataBlockRepository { + fn as_metadata_block_repository(&self) -> &dyn odf::storage::MetadataBlockRepository { self.chain.as_metadata_block_repository() } } diff --git a/src/domain/odf/odf/src/lib.rs b/src/domain/odf/odf/src/lib.rs index a70cf4efd..bc3b0f5b2 100644 --- a/src/domain/odf/odf/src/lib.rs +++ b/src/domain/odf/odf/src/lib.rs @@ -23,6 +23,7 @@ pub use odf_metadata::{ Checkpoint, DataSlice, DatasetAlias, + DatasetAliasRemote, DatasetHandle, DatasetID, DatasetKind,