
Using odf meta-crate in domain/core
zaychenko-sergei committed Dec 26, 2024
1 parent f636395 commit 5d1b16f
Showing 54 changed files with 500 additions and 575 deletions.
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions src/domain/core/Cargo.toml
@@ -35,9 +35,7 @@ file-utils = { workspace = true }
 internal-error = { workspace = true }
 kamu-datasets = { workspace = true }
 messaging-outbox = { workspace = true }
-odf-metadata = { workspace = true }
-odf-dataset = { workspace = true }
-odf-storage = { workspace = true }
+odf = { workspace = true }
 
 async-trait = { version = "0.1", default-features = false }
 bytes = { version = "1", default-features = false }
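The dependency swap above is the whole point of this commit: three `odf-*` workspace crates collapse into a single `odf` meta-crate. Judging from the paths used throughout this diff (`odf::metadata::Transform`, `odf::dataset::SearchSeedVisitor`, plus root-level `odf::DatasetHandle`), the facade appears to re-export the sub-crates as modules and the most common types at the root. A hypothetical sketch of such a facade's lib.rs, inferred from usage and not part of this diff:

// Hypothetical `odf` facade (sketch only): sub-crates become modules...
pub use odf_dataset as dataset;
pub use odf_metadata as metadata;
pub use odf_storage as storage;

// ...while identity and chain types that this diff writes as `odf::X`
// appear to be re-exported at the root:
pub use odf_dataset::{BlockRef, CreateDatasetResult, Dataset};
pub use odf_metadata::{DatasetHandle, DatasetID, MetadataEvent, Multihash};

This keeps call sites short for ubiquitous types while rarer ones carry an explicit `metadata::` or `dataset::` qualifier, which is exactly the pattern the files below follow.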
1 change: 0 additions & 1 deletion src/domain/core/src/auth/dataset_action_authorizer.rs
@@ -12,7 +12,6 @@ use std::str::FromStr;
 
 use dill::*;
 use internal_error::{ErrorIntoInternal, InternalError};
-use odf_metadata as odf;
 use thiserror::Error;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
18 changes: 8 additions & 10 deletions src/domain/core/src/entities/engine.rs
@@ -15,8 +15,6 @@ use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::prelude::{DataFrame, SessionContext};
 use file_utils::OwnedFile;
 use internal_error::*;
-use odf_dataset::BlockRef;
-use odf_metadata as odf;
 use thiserror::Error;
 
 use crate::ResolvedDatasetsMap;
@@ -52,7 +50,7 @@ pub struct RawQueryRequestExt {
     /// Data to be used in the query
     pub input_data: DataFrame,
     /// Defines the query to be performed
-    pub transform: odf::Transform,
+    pub transform: odf::metadata::Transform,
 }
 
 #[derive(Debug, Clone)]
@@ -77,11 +75,11 @@ pub struct TransformRequestExt {
     /// Identifies the output dataset
     pub dataset_handle: odf::DatasetHandle,
     /// Block reference to advance upon commit
-    pub block_ref: BlockRef,
+    pub block_ref: odf::BlockRef,
     /// Current head (for concurrency control)
     pub head: odf::Multihash,
     /// Transformation that will be applied to produce new data
-    pub transform: odf::Transform,
+    pub transform: odf::metadata::Transform,
     /// System time to use for new records
     pub system_time: DateTime<Utc>,
     /// Expected data schema (if already defined)
@@ -91,7 +89,7 @@ pub struct TransformRequestExt {
     /// Defines the input data
     pub inputs: Vec<TransformRequestInputExt>,
     /// Output dataset's vocabulary
-    pub vocab: odf::DatasetVocabulary,
+    pub vocab: odf::metadata::DatasetVocabulary,
     /// Previous checkpoint, if any
     pub prev_checkpoint: Option<odf::Multihash>,
 }
@@ -103,7 +101,7 @@ pub struct TransformRequestInputExt {
     /// An alias of this input to be used in queries
     pub alias: String,
     /// Input dataset's vocabulary
-    pub vocab: odf::DatasetVocabulary,
+    pub vocab: odf::metadata::DatasetVocabulary,
     /// Last block of the input dataset that was previously incorporated into
     /// the derivative transformation, if any. Must be equal to the last
     /// non-empty `newBlockHash`. Together with `newBlockHash` defines a
@@ -131,13 +129,13 @@ pub struct TransformRequestInputExt {
     /// List of data files that will be read
     pub data_slices: Vec<odf::Multihash>,
     /// TODO: remove?
-    pub explicit_watermarks: Vec<odf::Watermark>,
+    pub explicit_watermarks: Vec<odf::metadata::Watermark>,
 }
 
 #[derive(Debug)]
 pub struct TransformResponseExt {
     /// Data slice produced by the transaction, if any
-    pub new_offset_interval: Option<odf::OffsetInterval>,
+    pub new_offset_interval: Option<odf::metadata::OffsetInterval>,
     /// Watermark advanced by the transaction, if any
     pub new_watermark: Option<DateTime<Utc>>,
     /// Schema of the output
@@ -150,7 +148,7 @@ pub struct TransformResponseExt {
     pub new_data: Option<OwnedFile>,
 }
 
-impl From<TransformRequestInputExt> for odf::ExecuteTransformInput {
+impl From<TransformRequestInputExt> for odf::metadata::ExecuteTransformInput {
     fn from(val: TransformRequestInputExt) -> Self {
         Self {
             dataset_id: val.dataset_handle.id,
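Note the split this file illustrates: identity types such as `odf::DatasetHandle` and `odf::Multihash` stay at the facade root, while ODF schema types (`Transform`, `DatasetVocabulary`, `Watermark`, `OffsetInterval`) move under `odf::metadata`. A minimal usage sketch of the `From` impl above, assuming the crate at src/domain/core is named `kamu-core`:

use kamu_core::TransformRequestInputExt;

// Sketch: convert engine-side inputs into their ODF metadata
// counterparts; `Into::into` dispatches to the From impl shown above.
fn to_odf_inputs(
    inputs: Vec<TransformRequestInputExt>,
) -> Vec<odf::metadata::ExecuteTransformInput> {
    inputs.into_iter().map(Into::into).collect()
}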
11 changes: 4 additions & 7 deletions src/domain/core/src/entities/resolved_dataset.rs
@@ -9,23 +9,20 @@
 
 use std::sync::Arc;
 
-use odf_dataset::{CreateDatasetResult, Dataset};
-use odf_metadata::{self as odf};
-
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 #[derive(Clone)]
 pub struct ResolvedDataset {
-    dataset: Arc<dyn Dataset>,
+    dataset: Arc<dyn odf::Dataset>,
     handle: odf::DatasetHandle,
 }
 
 impl ResolvedDataset {
-    pub fn new(dataset: Arc<dyn Dataset>, handle: odf::DatasetHandle) -> Self {
+    pub fn new(dataset: Arc<dyn odf::Dataset>, handle: odf::DatasetHandle) -> Self {
         Self { dataset, handle }
     }
 
-    pub fn from(create_dataset_result: &CreateDatasetResult) -> Self {
+    pub fn from(create_dataset_result: &odf::CreateDatasetResult) -> Self {
         Self {
             dataset: create_dataset_result.dataset.clone(),
             handle: create_dataset_result.dataset_handle.clone(),
@@ -56,7 +53,7 @@ impl ResolvedDataset {
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 impl std::ops::Deref for ResolvedDataset {
-    type Target = Arc<dyn Dataset>;
+    type Target = Arc<dyn odf::Dataset>;
     fn deref(&self) -> &Self::Target {
         &self.dataset
     }
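The `Deref` impl is the ergonomic payoff of this wrapper: any `odf::Dataset` method can be called directly on a `ResolvedDataset`. A brief sketch, assuming `kamu-core` as the crate name, `as_metadata_chain` on the `Dataset` trait, and `DatasetHandle: Display` (all consistent with this diff, none restated in it):

use kamu_core::ResolvedDataset;

// Sketch: the deref to Arc<dyn odf::Dataset> lets trait methods resolve
// with no explicit accessor, while get_handle() stays on the wrapper.
fn describe(resolved: &ResolvedDataset) {
    let handle = resolved.get_handle(); // inherent method on the wrapper
    let _chain = resolved.as_metadata_chain(); // Dataset method via Deref
    println!("resolved dataset: {handle}");
}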
14 changes: 6 additions & 8 deletions src/domain/core/src/entities/resolved_datasets_map.rs
@@ -9,30 +9,28 @@
 
 use std::collections::HashMap;
 
-use odf_metadata::{DatasetHandle, DatasetID};
-
 use crate::ResolvedDataset;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 #[derive(Default)]
 pub struct ResolvedDatasetsMap {
-    resolved_datasets_by_id: HashMap<DatasetID, ResolvedDataset>,
+    resolved_datasets_by_id: HashMap<odf::DatasetID, ResolvedDataset>,
 }
 
 impl ResolvedDatasetsMap {
-    pub fn get_by_id(&self, id: &DatasetID) -> &ResolvedDataset {
+    pub fn get_by_id(&self, id: &odf::DatasetID) -> &ResolvedDataset {
         self.resolved_datasets_by_id
             .get(id)
             .expect("Dataset must be present")
     }
 
     #[inline]
-    pub fn get_by_handle(&self, handle: &DatasetHandle) -> &ResolvedDataset {
+    pub fn get_by_handle(&self, handle: &odf::DatasetHandle) -> &ResolvedDataset {
         self.get_by_id(&handle.id)
     }
 
-    pub fn iterate_all_handles(&self) -> impl Iterator<Item = &DatasetHandle> {
+    pub fn iterate_all_handles(&self) -> impl Iterator<Item = &odf::DatasetHandle> {
         self.resolved_datasets_by_id
             .values()
             .map(ResolvedDataset::get_handle)
@@ -50,8 +48,8 @@ impl ResolvedDatasetsMap {
 
     pub fn register_with(
         &mut self,
-        handle: &DatasetHandle,
-        dataset_fn: impl Fn(&DatasetHandle) -> ResolvedDataset,
+        handle: &odf::DatasetHandle,
+        dataset_fn: impl Fn(&odf::DatasetHandle) -> ResolvedDataset,
     ) {
         if !self.resolved_datasets_by_id.contains_key(&handle.id) {
             let resolved_dataset = dataset_fn(handle);
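`register_with` gives the map memoization semantics: the factory closure runs only when the `DatasetID` is not already present, so registering the same handle twice is a cheap no-op. A small caller sketch under the same `kamu-core` naming assumption:

use std::sync::Arc;

use kamu_core::{ResolvedDataset, ResolvedDatasetsMap};

// Sketch: the closure is invoked only for unseen handles, keyed on
// handle.id exactly as register_with does above.
fn ensure_registered(
    map: &mut ResolvedDatasetsMap,
    handle: &odf::DatasetHandle,
    dataset: &Arc<dyn odf::Dataset>,
) {
    map.register_with(handle, |h| ResolvedDataset::new(dataset.clone(), h.clone()));
}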
59 changes: 27 additions & 32 deletions src/domain/core/src/entities/writer_metadata_state.rs
@@ -9,20 +9,6 @@
 
 use chrono::{DateTime, Utc};
 use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};
-use odf_dataset::{
-    AcceptVisitorError,
-    BlockRef,
-    GenericCallbackVisitor,
-    MetadataChainExt,
-    MetadataChainVisitorExtInfallible,
-    MetadataVisitorDecision,
-    SearchAddDataVisitor,
-    SearchSeedVisitor,
-    SearchSetDataSchemaVisitor,
-    SearchSetVocabVisitor,
-    SearchSourceStateVisitor,
-};
-use odf_metadata as odf;
 
 use crate::{PushSourceNotFoundError, ResolvedDataset, WriterSourceEventVisitor};
 
@@ -31,17 +17,17 @@ use crate::{PushSourceNotFoundError, ResolvedDataset, WriterSourceEventVisitor};
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 /// Contains a projection of the metadata needed for [`DataWriter`] to function
 #[derive(Debug, Clone)]
 pub struct DataWriterMetadataState {
-    pub block_ref: BlockRef,
+    pub block_ref: odf::BlockRef,
     pub head: odf::Multihash,
-    pub schema: Option<odf::SetDataSchema>,
+    pub schema: Option<odf::metadata::SetDataSchema>,
     pub source_event: Option<odf::MetadataEvent>,
-    pub merge_strategy: odf::MergeStrategy,
-    pub vocab: odf::DatasetVocabulary,
+    pub merge_strategy: odf::metadata::MergeStrategy,
+    pub vocab: odf::metadata::DatasetVocabulary,
     pub data_slices: Vec<odf::Multihash>,
     pub prev_offset: Option<u64>,
     pub prev_checkpoint: Option<odf::Multihash>,
     pub prev_watermark: Option<DateTime<Utc>>,
-    pub prev_source_state: Option<odf::SourceState>,
+    pub prev_source_state: Option<odf::metadata::SourceState>,
 }
@@ -60,25 +46,31 @@ impl DataWriterMetadataState {
     )]
     pub async fn build(
         target: ResolvedDataset,
-        block_ref: &BlockRef,
+        block_ref: &odf::BlockRef,
         source_name: Option<&str>,
     ) -> Result<Self, ScanMetadataError> {
         // TODO: PERF: Full metadata scan below - this is expensive and should be
         // improved using skip lists.
 
+        use odf::dataset::MetadataChainVisitorExtInfallible;
+
         let head = target
             .as_metadata_chain()
             .resolve_ref(block_ref)
             .await
             .int_err()?;
-        let mut seed_visitor = SearchSeedVisitor::new().adapt_err();
-        let mut set_vocab_visitor = SearchSetVocabVisitor::new().adapt_err();
-        let mut set_data_schema_visitor = SearchSetDataSchemaVisitor::new().adapt_err();
-        let mut prev_source_state_visitor = SearchSourceStateVisitor::new(source_name).adapt_err();
-        let mut add_data_visitor = SearchAddDataVisitor::new().adapt_err();
-        let mut add_data_collection_visitor = GenericCallbackVisitor::new(
+        let mut seed_visitor = odf::dataset::SearchSeedVisitor::new().adapt_err();
+        let mut set_vocab_visitor = odf::dataset::SearchSetVocabVisitor::new().adapt_err();
+        let mut set_data_schema_visitor =
+            odf::dataset::SearchSetDataSchemaVisitor::new().adapt_err();
+        let mut prev_source_state_visitor =
+            odf::dataset::SearchSourceStateVisitor::new(source_name).adapt_err();
+        let mut add_data_visitor = odf::dataset::SearchAddDataVisitor::new().adapt_err();
+        let mut add_data_collection_visitor = odf::dataset::GenericCallbackVisitor::new(
             Vec::new(),
-            MetadataVisitorDecision::NextOfType(odf::MetadataEventTypeFlags::ADD_DATA),
+            odf::dataset::MetadataVisitorDecision::NextOfType(
+                odf::metadata::MetadataEventTypeFlags::ADD_DATA,
+            ),
             |state, _, block| {
                 let odf::MetadataEvent::AddData(e) = &block.event else {
                     unreachable!()
@@ -88,12 +80,15 @@
                     state.push(output_data.physical_hash.clone());
                 }
 
-                MetadataVisitorDecision::NextOfType(odf::MetadataEventTypeFlags::ADD_DATA)
+                odf::dataset::MetadataVisitorDecision::NextOfType(
+                    odf::metadata::MetadataEventTypeFlags::ADD_DATA,
+                )
             },
         )
         .adapt_err();
         let mut source_event_visitor = WriterSourceEventVisitor::new(source_name);
 
+        use odf::dataset::MetadataChainExt;
         target
             .as_metadata_chain()
             .accept_by_hash(
@@ -169,11 +164,11 @@ pub enum ScanMetadataError {
     ),
 }
 
-impl From<AcceptVisitorError<ScanMetadataError>> for ScanMetadataError {
-    fn from(v: AcceptVisitorError<ScanMetadataError>) -> Self {
+impl From<odf::dataset::AcceptVisitorError<ScanMetadataError>> for ScanMetadataError {
+    fn from(v: odf::dataset::AcceptVisitorError<ScanMetadataError>) -> Self {
         match v {
-            AcceptVisitorError::Visitor(err) => err,
-            AcceptVisitorError::Traversal(err) => Self::Internal(err.int_err()),
+            odf::dataset::AcceptVisitorError::Visitor(err) => err,
+            odf::dataset::AcceptVisitorError::Traversal(err) => Self::Internal(err.int_err()),
         }
     }
 }
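The two inline `use` statements added in `build` are the visible cost of the facade: `adapt_err` and `accept_by_hash` are extension-trait methods, so `MetadataChainVisitorExtInfallible` and `MetadataChainExt` must be in scope even when every type is spelled with its full `odf::dataset::` path. To illustrate the same traversal pattern in isolation, here is a hedged sketch that counts `AddData` blocks with a `GenericCallbackVisitor`; error plumbing is simplified, and `odf::BlockRef::Head` plus the `into_state` accessor are assumptions not shown in this diff:

use internal_error::{InternalError, ResultIntoInternal};
use kamu_core::ResolvedDataset;

// Sketch only: mirrors the collection visitor used in
// DataWriterMetadataState::build, but with a plain counter as the state.
async fn count_add_data_blocks(target: &ResolvedDataset) -> Result<usize, InternalError> {
    use odf::dataset::MetadataChainExt;

    let mut counter = odf::dataset::GenericCallbackVisitor::new(
        0usize,
        odf::dataset::MetadataVisitorDecision::NextOfType(
            odf::metadata::MetadataEventTypeFlags::ADD_DATA,
        ),
        |count, _hash, _block| {
            *count += 1; // bump the counter for every AddData block visited
            odf::dataset::MetadataVisitorDecision::NextOfType(
                odf::metadata::MetadataEventTypeFlags::ADD_DATA,
            )
        },
    );

    let head = target
        .as_metadata_chain()
        .resolve_ref(&odf::BlockRef::Head) // assumed variant name
        .await
        .int_err()?;
    target
        .as_metadata_chain()
        .accept_by_hash(&mut [&mut counter], &head)
        .await
        .int_err()?;

    Ok(counter.into_state()) // assumed accessor for the visitor state
}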
(Diffs for the remaining changed files are not rendered here.)
