Skip to content

Commit

Permalink
Add compact in-memory representation of RecordRef
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Sep 20, 2023
1 parent 1d835a9 commit bd1f16d
Show file tree
Hide file tree
Showing 67 changed files with 1,074 additions and 244 deletions.
183 changes: 134 additions & 49 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"dozer-log-js",
"dozer-log-python",
"dozer-utils",
"dozer-recordstore",
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions dozer-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dozer-sql = { path = "../dozer-sql" }
dozer-types = { path = "../dozer-types" }
dozer-tracing = { path = "../dozer-tracing" }
dozer-storage = { path = "../dozer-storage" }
dozer-recordstore = { path = "../dozer-recordstore" }

uuid = { version = "1.3.0", features = ["v4", "serde"] }
tokio = { version = "1", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use dozer_core::{
epoch::Epoch,
executor_operation::ProcessorOperation,
node::{PortHandle, Sink, SinkFactory},
processor_record::ProcessorRecordStore,
DEFAULT_PORT_HANDLE,
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::{errors::internal::BoxedError, types::Schema};

#[derive(Debug)]
Expand Down
6 changes: 2 additions & 4 deletions dozer-cli/src/pipeline/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use dozer_core::{
epoch::Epoch,
executor_operation::ProcessorOperation,
node::{PortHandle, Sink, SinkFactory},
processor_record::ProcessorRecordStore,
DEFAULT_PORT_HANDLE,
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;
use dozer_types::indicatif::ProgressBar;
use dozer_types::types::Schema;
Expand Down Expand Up @@ -105,9 +105,7 @@ impl Sink for LogSink {
self.log
.lock()
.write(dozer_cache::dozer_log::replication::LogOperation::Op {
op: record_store
.load_operation(&op)
.map_err(Into::<BoxedError>::into)?,
op: op.load(record_store)?,
});
self.update_counter();
Ok(())
Expand Down
1 change: 1 addition & 0 deletions dozer-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dozer-log = { path = "../dozer-log/" }
dozer-storage = {path = "../dozer-storage/"}
dozer-types = {path = "../dozer-types/"}
dozer-tracing = {path = "../dozer-tracing/"}
dozer-recordstore = {path = "../dozer-recordstore/"}

uuid = {version = "1.3.0", features = ["v1", "v4", "fast-rng"]}
crossbeam = "0.8.2"
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use dozer_log::{
storage::{self, Object, Queue, Storage},
tokio::task::JoinHandle,
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::{
bincode,
log::{error, info},
Expand All @@ -18,7 +19,7 @@ use dozer_types::{
};
use tempdir::TempDir;

use crate::{errors::ExecutionError, processor_record::ProcessorRecordStore};
use crate::errors::ExecutionError;

#[derive(Debug)]
pub struct CheckpointFactory {
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/epoch/manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::node::{NodeHandle, SourceStates, TableState};
use dozer_types::parking_lot::Mutex;
use std::collections::HashMap;
Expand All @@ -7,7 +8,6 @@ use std::thread::sleep;
use std::time::{Duration, SystemTime};

use crate::checkpoint::{CheckpointFactory, CheckpointWriter};
use crate::processor_record::ProcessorRecordStore;

use super::EpochCommonInfo;

Expand Down
6 changes: 3 additions & 3 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::PathBuf;

use crate::node::PortHandle;
use dozer_storage::errors::StorageError;
use dozer_recordstore::RecordStoreError;
use dozer_types::errors::internal::BoxedError;
use dozer_types::node::NodeHandle;
use dozer_types::thiserror::Error;
Expand Down Expand Up @@ -35,8 +35,8 @@ pub enum ExecutionError {
Source(#[source] BoxedError),
#[error("File system error {0:?}: {1}")]
FileSystemError(PathBuf, #[source] std::io::Error),
#[error("Storage error: {0}")]
Storage(#[from] StorageError),
#[error("Recordstore error: {0}")]
RecordStore(#[from] RecordStoreError),
#[error("Object storage error: {0}")]
ObjectStorage(#[from] dozer_log::storage::Error),
#[error("Checkpoint writer thread panicked")]
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/execution_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use crate::{
executor_operation::ExecutorOperation,
hash_map_to_vec::insert_vec_element,
node::PortHandle,
processor_record::ProcessorRecordStore,
record_store::{create_record_writer, RecordWriter},
};
use crossbeam::channel::{bounded, Receiver, Sender};
use daggy::petgraph::{
visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers},
Direction,
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;

pub type SharedRecordWriter = Rc<RefCell<Option<Box<dyn RecordWriter>>>>;
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/processor_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use dozer_types::node::NodeHandle;
use crate::epoch::Epoch;
use crate::error_manager::ErrorManager;
use crate::executor_operation::{ExecutorOperation, ProcessorOperation};
use crate::processor_record::ProcessorRecordStore;
use crate::{
builder_dag::NodeKind,
errors::ExecutionError,
forwarder::ProcessorChannelManager,
node::{PortHandle, Processor},
};
use dozer_recordstore::ProcessorRecordStore;

use super::{execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop};

Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/receiver_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ mod tests {
types::{Field, Record},
};

use crate::processor_record::{ProcessorRecord, ProcessorRecordStore};
use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};

use super::*;

Expand Down
40 changes: 39 additions & 1 deletion dozer-core/src/executor_operation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{epoch::Epoch, processor_record::ProcessorRecord};
use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore};
use dozer_types::types::Operation;

use crate::{epoch::Epoch, errors::ExecutionError};

#[derive(Clone, Debug, PartialEq, Eq)]
/// A CDC event.
Expand All @@ -15,6 +18,41 @@ pub enum ProcessorOperation {
},
}

impl ProcessorOperation {
pub fn new(
op: &Operation,
record_store: &ProcessorRecordStore,
) -> Result<Self, ExecutionError> {
Ok(match op {
Operation::Delete { old } => ProcessorOperation::Delete {
old: record_store.create_record(old)?,
},
Operation::Insert { new } => ProcessorOperation::Insert {
new: record_store.create_record(new)?,
},
Operation::Update { old, new } => ProcessorOperation::Update {
old: record_store.create_record(old)?,
new: record_store.create_record(new)?,
},
})
}

pub fn load(&self, record_store: &ProcessorRecordStore) -> Result<Operation, ExecutionError> {
Ok(match self {
ProcessorOperation::Delete { old } => Operation::Delete {
old: record_store.load_record(old)?,
},
ProcessorOperation::Insert { new } => Operation::Insert {
new: record_store.load_record(new)?,
},
ProcessorOperation::Update { old, new } => Operation::Update {
old: record_store.load_record(old)?,
new: record_store.load_record(new)?,
},
})
}
}

#[derive(Clone, Debug)]
pub enum ExecutorOperation {
Op { op: ProcessorOperation },
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl SourceChannelManager {
);

self.manager.send_op(
self.epoch_manager.record_store().create_operation(&op)?,
ProcessorOperation::new(&op, self.epoch_manager.record_store())?,
port,
)?;
self.num_uncommitted_ops += 1;
Expand Down
1 change: 0 additions & 1 deletion dozer-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub mod executor_operation;
pub mod forwarder;
mod hash_map_to_vec;
pub mod node;
pub mod processor_record;
pub mod record_store;

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
use crate::epoch::Epoch;
use crate::executor_operation::ProcessorOperation;
use crate::processor_record::ProcessorRecordStore;
use dozer_recordstore::ProcessorRecordStore;

use dozer_log::storage::{Object, Queue};
use dozer_types::errors::internal::BoxedError;
Expand Down
51 changes: 0 additions & 51 deletions dozer-core/src/processor_record/mod.rs

This file was deleted.

7 changes: 3 additions & 4 deletions dozer-core/src/record_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::executor_operation::ProcessorOperation;
use crate::node::OutputPortType;
use crate::processor_record::{ProcessorRecord, ProcessorRecordStore};
use dozer_storage::errors::StorageError;
use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore, RecordStoreError};
use dozer_types::thiserror::{self, Error};
use dozer_types::types::Schema;
use std::collections::HashMap;
Expand All @@ -14,8 +13,8 @@ use super::node::PortHandle;
pub enum RecordWriterError {
#[error("Record not found")]
RecordNotFound,
#[error("Storage error: {0}")]
Storage(#[from] StorageError),
#[error("Recordstore error: {0}")]
RecordStore(#[from] RecordStoreError),
}

pub trait RecordWriter: Send + Sync {
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/dag_base_create_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::executor::{DagExecutor, ExecutorOptions};
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
};
use crate::processor_record::ProcessorRecordStore;
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_recordstore::ProcessorRecordStore;

use crate::tests::dag_base_run::NoopProcessorFactory;
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
Source, SourceFactory, SourceState,
};
use crate::processor_record::ProcessorRecordStore;
use crate::tests::dag_base_run::NoopProcessorFactory;
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_log::storage::{Object, Queue};
use dozer_log::tokio;
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::errors::internal::BoxedError;
use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::node::{NodeHandle, OpIdentifier};
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/dag_base_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::epoch::Epoch;
use crate::executor::{DagExecutor, ExecutorOptions};
use crate::executor_operation::ProcessorOperation;
use crate::node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory};
use crate::processor_record::ProcessorRecordStore;
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
use crate::tests::sources::{
DualPortGeneratorSourceFactory, GeneratorSourceFactory,
Expand All @@ -14,6 +13,7 @@ use crate::tests::sources::{
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_log::storage::Object;
use dozer_log::tokio;
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::errors::internal::BoxedError;
use dozer_types::node::NodeHandle;
use dozer_types::types::Schema;
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/dag_ports.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
};
use crate::processor_record::ProcessorRecordStore;
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::errors::internal::BoxedError;
use dozer_types::{node::NodeHandle, types::Schema};
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/dag_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::node::{
OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source,
SourceFactory,
};
use crate::processor_record::ProcessorRecordStore;
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_recordstore::ProcessorRecordStore;

use dozer_types::errors::internal::BoxedError;
use dozer_types::node::NodeHandle;
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/processors.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;

use dozer_recordstore::ProcessorRecordStore;
use dozer_types::{errors::internal::BoxedError, types::Schema};

use crate::{
node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
processor_record::ProcessorRecordStore,
DEFAULT_PORT_HANDLE,
};

Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/tests/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::epoch::Epoch;
use crate::executor_operation::ProcessorOperation;
use crate::node::{PortHandle, Sink, SinkFactory};
use crate::processor_record::ProcessorRecordStore;
use crate::DEFAULT_PORT_HANDLE;
use dozer_log::storage::Queue;
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::errors::internal::BoxedError;
use dozer_types::types::Schema;

Expand Down
13 changes: 13 additions & 0 deletions dozer-recordstore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "dozer-recordstore"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
fuzz = ["dozer-types/arbitrary"]

[dependencies]
dozer-types = { path = "../dozer-types" }
triomphe = "0.1.9"
Loading

0 comments on commit bd1f16d

Please sign in to comment.