diff --git a/Cargo.lock b/Cargo.lock index bcff98cebf..b96e81270b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -453,6 +453,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "arbitrary" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d098ff73c1ca148721f37baad5ea6a465a13f9573aba8641fbbbae8164a54e" +dependencies = [ + "derive_arbitrary", +] + [[package]] name = "arc-swap" version = "1.6.0" @@ -1595,6 +1604,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ "android-tzdata", + "arbitrary", "iana-time-zone", "js-sys", "num-traits", @@ -2507,6 +2517,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_arbitrary" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -2642,6 +2663,7 @@ dependencies = [ "dozer-cache", "dozer-core", "dozer-ingestion", + "dozer-recordstore", "dozer-sql", "dozer-storage", "dozer-tracing", @@ -2676,6 +2698,7 @@ dependencies = [ "crossbeam", "daggy", "dozer-log", + "dozer-recordstore", "dozer-storage", "dozer-tracing", "dozer-types", @@ -2775,12 +2798,21 @@ dependencies = [ "pyo3-asyncio", ] +[[package]] +name = "dozer-recordstore" +version = "0.1.0" +dependencies = [ + "dozer-types", + "triomphe", +] + [[package]] name = "dozer-sql" version = "0.1.39" dependencies = [ "ahash 0.8.3", "dozer-core", + "dozer-recordstore", "dozer-sql-expression", "dozer-storage", "dozer-tracing", @@ -2838,6 +2870,7 @@ dependencies = [ "dozer-cache", "dozer-cli", "dozer-core", + "dozer-recordstore", "dozer-sql", "dozer-tracing", "dozer-types", @@ -2879,6 +2912,7 @@ name = "dozer-types" version = "0.1.39" dependencies = [ "ahash 0.8.3", + "arbitrary", 
"arrow", "arrow-schema", "bincode", @@ -6929,6 +6963,7 @@ version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4c4216490d5a413bc6d10fa4742bd7d4955941d062c0ef873141d6b0e7b30fd" dependencies = [ + "arbitrary", "arrayvec", "borsh", "bytes", @@ -8496,6 +8531,16 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" +dependencies = [ + "serde", + "stable_deref_trait", +] + [[package]] name = "trust-dns-proto" version = "0.21.2" diff --git a/Cargo.toml b/Cargo.toml index f90ae5752f..f4475ce534 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "dozer-log-js", "dozer-log-python", "dozer-utils", + "dozer-recordstore", ] resolver = "2" diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index f57a297196..3215ad6ea5 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -18,6 +18,7 @@ dozer-sql = { path = "../dozer-sql" } dozer-types = { path = "../dozer-types" } dozer-tracing = { path = "../dozer-tracing" } dozer-storage = { path = "../dozer-storage" } +dozer-recordstore = { path = "../dozer-recordstore" } uuid = { version = "1.3.0", features = ["v4", "serde"] } tokio = { version = "1", features = ["full"] } diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs index e9ebbec45b..9517719cab 100644 --- a/dozer-cli/src/pipeline/dummy_sink.rs +++ b/dozer-cli/src/pipeline/dummy_sink.rs @@ -5,9 +5,9 @@ use dozer_core::{ epoch::Epoch, executor_operation::ProcessorOperation, node::{PortHandle, Sink, SinkFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::{errors::internal::BoxedError, types::Schema}; #[derive(Debug)] diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs index 
f6e5234da3..f04bf00952 100644 --- a/dozer-cli/src/pipeline/log_sink.rs +++ b/dozer-cli/src/pipeline/log_sink.rs @@ -8,9 +8,9 @@ use dozer_core::{ epoch::Epoch, executor_operation::ProcessorOperation, node::{PortHandle, Sink, SinkFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_tracing::LabelsAndProgress; use dozer_types::indicatif::ProgressBar; use dozer_types::types::Schema; @@ -105,9 +105,7 @@ impl Sink for LogSink { self.log .lock() .write(dozer_cache::dozer_log::replication::LogOperation::Op { - op: record_store - .load_operation(&op) - .map_err(Into::::into)?, + op: op.load(record_store)?, }); self.update_counter(); Ok(()) diff --git a/dozer-core/Cargo.toml b/dozer-core/Cargo.toml index 0f090fd84e..9da25e20b1 100644 --- a/dozer-core/Cargo.toml +++ b/dozer-core/Cargo.toml @@ -11,6 +11,7 @@ dozer-log = { path = "../dozer-log/" } dozer-storage = {path = "../dozer-storage/"} dozer-types = {path = "../dozer-types/"} dozer-tracing = {path = "../dozer-tracing/"} +dozer-recordstore = {path = "../dozer-recordstore/"} uuid = {version = "1.3.0", features = ["v1", "v4", "fast-rng"]} crossbeam = "0.8.2" diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs index 4278c5d2c4..8afda79b92 100644 --- a/dozer-core/src/checkpoint/mod.rs +++ b/dozer-core/src/checkpoint/mod.rs @@ -7,6 +7,7 @@ use dozer_log::{ storage::{self, Object, Queue, Storage}, tokio::task::JoinHandle, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::{ bincode, log::{error, info}, @@ -18,7 +19,7 @@ use dozer_types::{ }; use tempdir::TempDir; -use crate::{errors::ExecutionError, processor_record::ProcessorRecordStore}; +use crate::errors::ExecutionError; #[derive(Debug)] pub struct CheckpointFactory { diff --git a/dozer-core/src/epoch/manager.rs b/dozer-core/src/epoch/manager.rs index 897285d133..3b6ebfb129 100644 --- a/dozer-core/src/epoch/manager.rs +++ 
b/dozer-core/src/epoch/manager.rs @@ -1,3 +1,4 @@ +use dozer_recordstore::ProcessorRecordStore; use dozer_types::node::{NodeHandle, SourceStates, TableState}; use dozer_types::parking_lot::Mutex; use std::collections::HashMap; @@ -7,7 +8,6 @@ use std::thread::sleep; use std::time::{Duration, SystemTime}; use crate::checkpoint::{CheckpointFactory, CheckpointWriter}; -use crate::processor_record::ProcessorRecordStore; use super::EpochCommonInfo; diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs index 6c2a1bb7db..b709c4c791 100644 --- a/dozer-core/src/errors.rs +++ b/dozer-core/src/errors.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use crate::node::PortHandle; -use dozer_storage::errors::StorageError; +use dozer_recordstore::RecordStoreError; use dozer_types::errors::internal::BoxedError; use dozer_types::node::NodeHandle; use dozer_types::thiserror::Error; @@ -35,8 +35,8 @@ pub enum ExecutionError { Source(#[source] BoxedError), #[error("File system error {0:?}: {1}")] FileSystemError(PathBuf, #[source] std::io::Error), - #[error("Storage error: {0}")] - Storage(#[from] StorageError), + #[error("Recordstore error: {0}")] + RecordStore(#[from] RecordStoreError), #[error("Object storage error: {0}")] ObjectStorage(#[from] dozer_log::storage::Error), #[error("Checkpoint writer thread panicked")] diff --git a/dozer-core/src/executor/execution_dag.rs b/dozer-core/src/executor/execution_dag.rs index 68d87138d7..a22e269a15 100644 --- a/dozer-core/src/executor/execution_dag.rs +++ b/dozer-core/src/executor/execution_dag.rs @@ -15,7 +15,6 @@ use crate::{ executor_operation::ExecutorOperation, hash_map_to_vec::insert_vec_element, node::PortHandle, - processor_record::ProcessorRecordStore, record_store::{create_record_writer, RecordWriter}, }; use crossbeam::channel::{bounded, Receiver, Sender}; @@ -23,6 +22,7 @@ use daggy::petgraph::{ visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers}, Direction, }; +use dozer_recordstore::ProcessorRecordStore; 
use dozer_tracing::LabelsAndProgress; pub type SharedRecordWriter = Rc>>>; diff --git a/dozer-core/src/executor/processor_node.rs b/dozer-core/src/executor/processor_node.rs index 799a351de4..c8ce1597e0 100644 --- a/dozer-core/src/executor/processor_node.rs +++ b/dozer-core/src/executor/processor_node.rs @@ -8,13 +8,13 @@ use dozer_types::node::NodeHandle; use crate::epoch::Epoch; use crate::error_manager::ErrorManager; use crate::executor_operation::{ExecutorOperation, ProcessorOperation}; -use crate::processor_record::ProcessorRecordStore; use crate::{ builder_dag::NodeKind, errors::ExecutionError, forwarder::ProcessorChannelManager, node::{PortHandle, Processor}, }; +use dozer_recordstore::ProcessorRecordStore; use super::{execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop}; diff --git a/dozer-core/src/executor/receiver_loop.rs b/dozer-core/src/executor/receiver_loop.rs index a31a9feea0..bab8004795 100644 --- a/dozer-core/src/executor/receiver_loop.rs +++ b/dozer-core/src/executor/receiver_loop.rs @@ -105,7 +105,7 @@ mod tests { types::{Field, Record}, }; - use crate::processor_record::{ProcessorRecord, ProcessorRecordStore}; + use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use super::*; diff --git a/dozer-core/src/executor_operation.rs b/dozer-core/src/executor_operation.rs index 5f734cc818..c5f229935c 100644 --- a/dozer-core/src/executor_operation.rs +++ b/dozer-core/src/executor_operation.rs @@ -1,4 +1,7 @@ -use crate::{epoch::Epoch, processor_record::ProcessorRecord}; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; +use dozer_types::types::Operation; + +use crate::{epoch::Epoch, errors::ExecutionError}; #[derive(Clone, Debug, PartialEq, Eq)] /// A CDC event. 
@@ -15,6 +18,41 @@ pub enum ProcessorOperation { }, } +impl ProcessorOperation { + pub fn new( + op: &Operation, + record_store: &ProcessorRecordStore, + ) -> Result { + Ok(match op { + Operation::Delete { old } => ProcessorOperation::Delete { + old: record_store.create_record(old)?, + }, + Operation::Insert { new } => ProcessorOperation::Insert { + new: record_store.create_record(new)?, + }, + Operation::Update { old, new } => ProcessorOperation::Update { + old: record_store.create_record(old)?, + new: record_store.create_record(new)?, + }, + }) + } + + pub fn load(&self, record_store: &ProcessorRecordStore) -> Result { + Ok(match self { + ProcessorOperation::Delete { old } => Operation::Delete { + old: record_store.load_record(old)?, + }, + ProcessorOperation::Insert { new } => Operation::Insert { + new: record_store.load_record(new)?, + }, + ProcessorOperation::Update { old, new } => Operation::Update { + old: record_store.load_record(old)?, + new: record_store.load_record(new)?, + }, + }) + } +} + #[derive(Clone, Debug)] pub enum ExecutorOperation { Op { op: ProcessorOperation }, diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs index 8eb9df0dfd..d976c15c5f 100644 --- a/dozer-core/src/forwarder.rs +++ b/dozer-core/src/forwarder.rs @@ -234,7 +234,7 @@ impl SourceChannelManager { ); self.manager.send_op( - self.epoch_manager.record_store().create_operation(&op)?, + ProcessorOperation::new(&op, self.epoch_manager.record_store())?, port, )?; self.num_uncommitted_ops += 1; diff --git a/dozer-core/src/lib.rs b/dozer-core/src/lib.rs index 407d8d189b..a691e450a6 100644 --- a/dozer-core/src/lib.rs +++ b/dozer-core/src/lib.rs @@ -14,7 +14,6 @@ pub mod executor_operation; pub mod forwarder; mod hash_map_to_vec; pub mod node; -pub mod processor_record; pub mod record_store; #[cfg(test)] diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs index c3af258697..fd9765c4ca 100644 --- a/dozer-core/src/node.rs +++ b/dozer-core/src/node.rs @@ -1,7 +1,7 
@@ use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder}; use crate::epoch::Epoch; use crate::executor_operation::ProcessorOperation; -use crate::processor_record::ProcessorRecordStore; +use dozer_recordstore::ProcessorRecordStore; use dozer_log::storage::{Object, Queue}; use dozer_types::errors::internal::BoxedError; diff --git a/dozer-core/src/processor_record/mod.rs b/dozer-core/src/processor_record/mod.rs deleted file mode 100644 index 61fc47a803..0000000000 --- a/dozer-core/src/processor_record/mod.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::hash::Hash; -use std::sync::Arc; - -use dozer_types::{ - serde::{Deserialize, Serialize}, - types::{Field, Lifetime}, -}; - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(crate = "dozer_types::serde")] -pub struct RecordRef(Arc<[Field]>); - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] -pub struct ProcessorRecord { - /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage. - /// This is a Box<[]> instead of a Vec to save space on storing the vec's capacity - values: Box<[RecordRef]>, - - /// Time To Live for this record. If the value is None, the record will never expire. 
- lifetime: Option>, -} - -impl ProcessorRecord { - pub fn new(values: Box<[RecordRef]>) -> Self { - Self { - values, - ..Default::default() - } - } - - pub fn get_lifetime(&self) -> Option { - self.lifetime.as_ref().map(|lifetime| *lifetime.clone()) - } - pub fn set_lifetime(&mut self, lifetime: Option) { - self.lifetime = lifetime.map(Box::new); - } - - pub fn values(&self) -> &[RecordRef] { - &self.values - } - - pub fn appended(existing: &ProcessorRecord, additional: RecordRef) -> Self { - let mut values = Vec::with_capacity(existing.values().len() + 1); - values.extend_from_slice(existing.values()); - values.push(additional); - Self::new(values.into_boxed_slice()) - } -} - -mod store; -pub use store::ProcessorRecordStore; diff --git a/dozer-core/src/record_store.rs b/dozer-core/src/record_store.rs index 7c26041061..4ad29bad0d 100644 --- a/dozer-core/src/record_store.rs +++ b/dozer-core/src/record_store.rs @@ -1,7 +1,6 @@ use crate::executor_operation::ProcessorOperation; use crate::node::OutputPortType; -use crate::processor_record::{ProcessorRecord, ProcessorRecordStore}; -use dozer_storage::errors::StorageError; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore, RecordStoreError}; use dozer_types::thiserror::{self, Error}; use dozer_types::types::Schema; use std::collections::HashMap; @@ -14,8 +13,8 @@ use super::node::PortHandle; pub enum RecordWriterError { #[error("Record not found")] RecordNotFound, - #[error("Storage error: {0}")] - Storage(#[from] StorageError), + #[error("Recordstore error: {0}")] + RecordStore(#[from] RecordStoreError), } pub trait RecordWriter: Send + Sync { diff --git a/dozer-core/src/tests/dag_base_create_errors.rs b/dozer-core/src/tests/dag_base_create_errors.rs index b08c2ef2b4..3b6347161f 100644 --- a/dozer-core/src/tests/dag_base_create_errors.rs +++ b/dozer-core/src/tests/dag_base_create_errors.rs @@ -3,8 +3,8 @@ use crate::executor::{DagExecutor, ExecutorOptions}; use crate::node::{ OutputPortDef, 
OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory, }; -use crate::processor_record::ProcessorRecordStore; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; +use dozer_recordstore::ProcessorRecordStore; use crate::tests::dag_base_run::NoopProcessorFactory; use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT}; diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs index aa6244fd42..785ed5496b 100644 --- a/dozer-core/src/tests/dag_base_errors.rs +++ b/dozer-core/src/tests/dag_base_errors.rs @@ -7,13 +7,13 @@ use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory, Source, SourceFactory, SourceState, }; -use crate::processor_record::ProcessorRecordStore; use crate::tests::dag_base_run::NoopProcessorFactory; use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT}; use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT}; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; use dozer_log::storage::{Object, Queue}; use dozer_log::tokio; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::ingestion_types::IngestionMessage; use dozer_types::node::{NodeHandle, OpIdentifier}; diff --git a/dozer-core/src/tests/dag_base_run.rs b/dozer-core/src/tests/dag_base_run.rs index 3a2338bc3d..2433ed9984 100644 --- a/dozer-core/src/tests/dag_base_run.rs +++ b/dozer-core/src/tests/dag_base_run.rs @@ -4,7 +4,6 @@ use crate::epoch::Epoch; use crate::executor::{DagExecutor, ExecutorOptions}; use crate::executor_operation::ProcessorOperation; use crate::node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}; -use crate::processor_record::ProcessorRecordStore; use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT}; use crate::tests::sources::{ DualPortGeneratorSourceFactory, GeneratorSourceFactory, @@ -14,6 +13,7 @@ use 
crate::tests::sources::{ use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; use dozer_log::storage::Object; use dozer_log::tokio; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::node::NodeHandle; use dozer_types::types::Schema; diff --git a/dozer-core/src/tests/dag_ports.rs b/dozer-core/src/tests/dag_ports.rs index bf74602c6a..14d52bf26e 100644 --- a/dozer-core/src/tests/dag_ports.rs +++ b/dozer-core/src/tests/dag_ports.rs @@ -1,8 +1,8 @@ use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory, }; -use crate::processor_record::ProcessorRecordStore; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::{node::NodeHandle, types::Schema}; use std::collections::HashMap; diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs index d6f74e8ecd..4a9e578611 100644 --- a/dozer-core/src/tests/dag_schemas.rs +++ b/dozer-core/src/tests/dag_schemas.rs @@ -3,8 +3,8 @@ use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source, SourceFactory, }; -use crate::processor_record::ProcessorRecordStore; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::node::NodeHandle; diff --git a/dozer-core/src/tests/processors.rs b/dozer-core/src/tests/processors.rs index 8edc74072a..703cf6cc40 100644 --- a/dozer-core/src/tests/processors.rs +++ b/dozer-core/src/tests/processors.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::{errors::internal::BoxedError, types::Schema}; use crate::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; 
diff --git a/dozer-core/src/tests/sinks.rs b/dozer-core/src/tests/sinks.rs index 8cf3f7ff3a..1310c4fcfc 100644 --- a/dozer-core/src/tests/sinks.rs +++ b/dozer-core/src/tests/sinks.rs @@ -1,9 +1,9 @@ use crate::epoch::Epoch; use crate::executor_operation::ProcessorOperation; use crate::node::{PortHandle, Sink, SinkFactory}; -use crate::processor_record::ProcessorRecordStore; use crate::DEFAULT_PORT_HANDLE; use dozer_log::storage::Queue; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::types::Schema; diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs index dfc6538557..100c2a03b7 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs @@ -20,11 +20,9 @@ use odbc::odbc_safe::{AutocommitOn, Odbc3}; use odbc::{ColumnDescriptor, Cursor, DiagnosticRecord, Environment, Executed, HasResult}; use rand::distributions::Alphanumeric; use rand::Rng; -use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Write; use std::ops::Deref; -use std::rc::Rc; use super::helpers::is_network_failure; use super::pool::{Conn, Pool}; @@ -252,7 +250,7 @@ impl<'env> Client<'env> { exec_first_exists(&self.pool, &query).map_or_else(Self::parse_not_exist_error, Ok) } - pub fn fetch(&self, query: String) -> ExecIter<'env> { + pub fn fetch(&self, query: String) -> Result, SnowflakeError> { exec_iter(self.pool.clone(), query) } @@ -283,7 +281,7 @@ impl<'env> Client<'env> { ORDER BY TABLE_NAME, ORDINAL_POSITION" ); - let results = exec_iter(self.pool.clone(), query); + let results = exec_iter(self.pool.clone(), query)?; let mut schemas: IndexMap)> = IndexMap::new(); for (idx, result) in results.enumerate() { @@ -388,7 +386,7 @@ impl<'env> Client<'env> { pub fn fetch_keys(&self) -> Result>, SnowflakeError> { 'retry: loop { let query = "SHOW PRIMARY KEYS IN 
SCHEMA".to_string(); - let results = exec_iter(self.pool.clone(), query); + let results = exec_iter(self.pool.clone(), query)?; let mut keys: HashMap> = HashMap::new(); for result in results { let row_data = match result { @@ -477,16 +475,14 @@ fn exec_first_exists(pool: &Pool, query: &str) -> Result ExecIter { +fn exec_iter(pool: Pool, query: String) -> Result { use genawaiter::{ rc::{gen, Gen}, yield_, }; + use ExecIterResult::*; - let schema = Rc::new(RefCell::new(None::>)); - let schema_ref = schema.clone(); - - let mut generator: Gen, (), _> = gen!({ + let mut generator: Gen = gen!({ let mut cursor_position = 0u64; 'retry: loop { let conn = pool.get_conn().map_err(QueryError)?; @@ -498,23 +494,22 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { None => break, }; let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?; - let mut vec = Vec::new(); + let mut schema = Vec::new(); for i in 1..(cols + 1) { let value = i.try_into(); let column_descriptor = match value { Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?, Err(e) => Err(SchemaConversionError(e))?, }; - vec.push(column_descriptor) + schema.push(column_descriptor) } - schema.borrow_mut().replace(vec); + yield_!(Schema(schema.clone())); while let Some(cursor) = retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))? 
{ - let fields = - get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?; - yield_!(fields); + let fields = get_fields_from_cursor(cursor, cols, &schema)?; + yield_!(Row(fields)); cursor_position += 1; } } @@ -524,7 +519,7 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { Ok::<(), SnowflakeError>(()) }); - let iterator = std::iter::from_fn(move || { + let mut iterator = std::iter::from_fn(move || { use genawaiter::GeneratorState::*; match generator.resume() { Yielded(fields) => Some(Ok(fields)), @@ -533,20 +528,32 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { } }); - ExecIter { + let schema = match iterator.next() { + Some(Ok(Schema(schema))) => Some(schema), + Some(Err(err)) => Err(err)?, + None => None, + _ => unreachable!(), + }; + + Ok(ExecIter { iterator: Box::new(iterator), - schema: schema_ref, - } + schema, + }) +} + +enum ExecIterResult { + Schema(Vec), + Row(Vec), } pub struct ExecIter<'env> { - iterator: Box, SnowflakeError>> + 'env>, - schema: Rc>>>, + iterator: Box> + 'env>, + schema: Option>, } impl<'env> ExecIter<'env> { - pub fn schema(&self) -> Option> { - self.schema.borrow().deref().clone() + pub fn schema(&self) -> Option<&Vec> { + self.schema.as_ref() } } @@ -554,7 +561,18 @@ impl<'env> Iterator for ExecIter<'env> { type Item = Result, SnowflakeError>; fn next(&mut self) -> Option { - self.iterator.next() + use ExecIterResult::*; + loop { + let result = match self.iterator.next()? 
{ + Ok(Schema(schema)) => { + self.schema = Some(schema); + continue; + } + Ok(Row(row)) => Ok(row), + Err(err) => Err(err), + }; + return Some(result); + } } } diff --git a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs index 8cb5e8e8ef..4ef1c6148c 100644 --- a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs +++ b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs @@ -113,10 +113,10 @@ impl StreamConsumer { client.exec(&query)?; } - let rows = client.fetch(format!("SELECT * FROM {temp_table_name};")); + let rows = client.fetch(format!("SELECT * FROM {temp_table_name};"))?; if let Some(schema) = rows.schema() { let schema_len = schema.len(); - let mut truncated_schema = schema; + let mut truncated_schema = schema.clone(); truncated_schema.truncate(schema_len - 3); let columns_length = schema_len; diff --git a/dozer-recordstore/Cargo.toml b/dozer-recordstore/Cargo.toml new file mode 100644 index 0000000000..f47d595131 --- /dev/null +++ b/dozer-recordstore/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dozer-recordstore" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +fuzz = ["dozer-types/arbitrary"] + +[dependencies] +dozer-types = { path = "../dozer-types" } +triomphe = "0.1.9" diff --git a/dozer-recordstore/fuzz/Cargo.toml b/dozer-recordstore/fuzz/Cargo.toml new file mode 100644 index 0000000000..d904f85951 --- /dev/null +++ b/dozer-recordstore/fuzz/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "dozer-recordstore-fuzz" +version = "0.0.0" +publish = false +edition = "2021" + +[package.metadata] +cargo-fuzz = true + +[dependencies] +libfuzzer-sys = "0.4" + +# Prevent this from interfering with workspaces +[workspace] +members = ["."] + +[dependencies.dozer-recordstore] +path = ".." 
+ +[dependencies.dozer-types] +path = "../../dozer-types" +features = ["arbitrary"] + +[profile.release] +debug = 1 + +[[bin]] +name = "fuzz_recordref" +path = "fuzz_targets/fuzz_recordref.rs" +test = false +doc = false + +[patch.crates-io] +postgres = { git = "https://github.com/getdozer/rust-postgres" } +postgres-protocol = { git = "https://github.com/getdozer/rust-postgres" } +postgres-types = { git = "https://github.com/getdozer/rust-postgres" } +tokio-postgres = { git = "https://github.com/getdozer/rust-postgres" } diff --git a/dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs b/dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs new file mode 100644 index 0000000000..78d4ac3778 --- /dev/null +++ b/dozer-recordstore/fuzz/fuzz_targets/fuzz_recordref.rs @@ -0,0 +1,31 @@ +#![no_main] + +use dozer_recordstore::{ProcessorRecordStore, RecordRef}; +use dozer_types::types::Field; +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: Vec>| { + let store = ProcessorRecordStore::new().unwrap(); + for record in data { + let record_ref = store.create_ref(&record).unwrap(); + let fields: Vec<_> = store + .load_ref(&record_ref) + .unwrap() + .into_iter() + .map(|field| field.cloned()) + .collect(); + assert_eq!(fields, record); + + let cloned = record_ref.clone(); + + drop(record_ref); + + // Check that dropping is sound (e.g. no double-free when a clone is dropped) + let fields: Vec<_> = cloned + .load() + .into_iter() + .map(|field| field.cloned()) + .collect(); + assert_eq!(fields, record); + } +}); diff --git a/dozer-recordstore/src/lib.rs b/dozer-recordstore/src/lib.rs new file mode 100644 index 0000000000..9e2e8f132d --- /dev/null +++ b/dozer-recordstore/src/lib.rs @@ -0,0 +1,532 @@ +//! [`RecordRef`] is a compact representation of a collection of [dozer_types::types::Field]s +//! There are two principles that make this representation more compact than `[Field]`: +//! 1. The fields and their types are stored as a Struct of Arrays instead of +//! 
and Array of Structs. This makes it possible to pack the discriminants +//! for the field types as a byte per field, instead of taking up a full word, +//! which is the case in [Field] (because the variant value must be aligned) +//! 2. The field values are stored packed. In a `[Field]` representation, each +//! field takes as much space as the largest enum variant in [Field] (plus its discriminant, +//! see (1.)). Instead, for the compact representation, we pack the values into +//! align_of::() sized slots. This way, a u64 takes only 8 bytes, whereas +//! a u128 can still use its 16 bytes. +use std::alloc::{dealloc, handle_alloc_error, Layout}; +use std::{hash::Hash, ptr::NonNull}; + +use triomphe::{Arc, HeaderSlice}; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate}; +use dozer_types::json_types::JsonValue; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::types::{DozerDuration, DozerPoint}; +use dozer_types::{ + serde::{Deserialize, Serialize}, + types::{Field, FieldType, Lifetime}, +}; + +// The alignment of an enum is necessarily the maximum alignment of its variants +// (otherwise it would be unsound to read from it). +// So, by using the alignment of `Field` as the alignment of the values in our +// packed `RecordRef`, we ensure that all accesses are aligned. +// This wastes a little bit of memory for subsequent fields that have +// smaller minimum alignment and size (such as `bool`, which has size=1, align=1), +// but in practice this should be negligible compared to the added effort of +// packing these fields while keeping everything aligned. 
+const MAX_ALIGN: usize = std::mem::align_of::(); + +#[repr(transparent)] +#[derive(Debug)] +/// `repr(transparent)` inner struct so we can implement drop logic on it +/// This is a `triomphe` HeaderSlice so we can make a fat Arc, saving a level +/// of indirection and a pointer which would otherwise be needed for the field types +struct RecordRefInner(HeaderSlice, [Option]>); + +#[derive(Debug, Clone)] +pub struct RecordRef(Arc); + +impl PartialEq for RecordRef { + fn eq(&self, other: &Self) -> bool { + self.load() == other.load() + } +} + +impl Eq for RecordRef {} + +unsafe impl Send for RecordRef {} +unsafe impl Sync for RecordRef {} + +impl Hash for RecordRef { + fn hash(&self, state: &mut H) { + self.load().hash(state) + } +} + +impl<'de> Deserialize<'de> for RecordRef { + fn deserialize(deserializer: D) -> Result + where + D: dozer_types::serde::Deserializer<'de>, + { + let fields = Vec::::deserialize(deserializer)?; + let owned_fields: Vec<_> = fields.iter().map(FieldRef::cloned).collect(); + Ok(Self::new(owned_fields)) + } +} +impl Serialize for RecordRef { + fn serialize(&self, serializer: S) -> Result + where + S: dozer_types::serde::Serializer, + { + self.load().serialize(serializer) + } +} + +#[inline(always)] +unsafe fn adjust_alignment(ptr: *mut u8) -> *mut u8 { + ptr.add(ptr.align_offset(std::mem::align_of::())) +} +/// # Safety +/// ptr should be valid for writing a `T`, +/// that is, ptr..ptr + size_of:: should be inside a single live allocation +unsafe fn write(ptr: *mut u8, value: T) -> *mut u8 { + let ptr = adjust_alignment::(ptr) as *mut T; + ptr.write(value); + ptr.add(1) as *mut u8 +} + +/// # Safety +/// ptr should be valid for reading a `T`, +/// that is, ptr..ptr + size_of:: should be inside a single live allocation +/// and the memory read should be initialized. +/// The returned reference is only valid as long as pointed to memory is valid +/// for reading. 
+unsafe fn read_ref<'a, T>(ptr: *mut u8) -> (*mut u8, &'a T) { + let ptr = adjust_alignment::(ptr) as *mut T; + let result = &*ptr; + (ptr.add(1) as *mut u8, result) +} + +/// # Safety +/// ptr should be valid for reading a `T`, +/// that is, ptr..ptr + size_of:: should be inside a single live allocation +/// and the memory read should be initialized. +/// This takes ownership of the memory returned as `T`, which means dropping `T` +/// may make future reads from `ptr` undefined behavior +unsafe fn read(ptr: *mut u8) -> (*mut u8, T) { + let ptr = adjust_alignment::(ptr) as *mut T; + let result = ptr.read(); + (ptr.add(1) as *mut u8, result) +} + +/// # Safety +/// `ptr` should be valid for reading the contents of a `Field` with the type +/// corresponding to `field_type`. +/// See `read_ref` +unsafe fn read_field_ref<'a>(ptr: *mut u8, field_type: FieldType) -> (*mut u8, FieldRef<'a>) { + match field_type { + FieldType::UInt => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::UInt(*value)) + } + FieldType::U128 => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::U128(*value)) + } + + FieldType::Int => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Int(*value)) + } + + FieldType::I128 => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::I128(*value)) + } + + FieldType::Float => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Float(*value)) + } + + FieldType::Boolean => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Boolean(*value)) + } + + FieldType::String => { + let (ptr, value): (_, &String) = read_ref(ptr); + (ptr, FieldRef::String(value)) + } + FieldType::Text => { + let (ptr, value): (_, &String) = read_ref(ptr); + (ptr, FieldRef::Text(value)) + } + FieldType::Binary => { + let (ptr, value): (_, &Vec) = read_ref(ptr); + (ptr, FieldRef::Binary(value)) + } + FieldType::Decimal => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Decimal(*value)) + } + FieldType::Timestamp => { + let (ptr, value) = read_ref(ptr); 
+            (ptr, FieldRef::Timestamp(*value))
+        }
+        FieldType::Date => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Date(*value))
+        }
+        FieldType::Json => {
+            let (ptr, value) = read_ref::<JsonValue>(ptr);
+            (ptr, FieldRef::Json(value.to_owned()))
+        }
+        FieldType::Point => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Point(*value))
+        }
+        FieldType::Duration => {
+            let (ptr, value) = read_ref(ptr);
+            (ptr, FieldRef::Duration(*value))
+        }
+    }
+}
+unsafe fn read_field(ptr: *mut u8, field_type: FieldType) -> (*mut u8, Field) {
+    match field_type {
+        FieldType::UInt => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::UInt(value))
+        }
+        FieldType::U128 => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::U128(value))
+        }
+
+        FieldType::Int => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Int(value))
+        }
+
+        FieldType::I128 => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::I128(value))
+        }
+
+        FieldType::Float => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Float(value))
+        }
+
+        FieldType::Boolean => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Boolean(value))
+        }
+
+        FieldType::String => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::String(value))
+        }
+        FieldType::Text => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::String(value))
+        }
+        FieldType::Binary => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Binary(value))
+        }
+        FieldType::Decimal => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Decimal(value))
+        }
+        FieldType::Timestamp => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Timestamp(value))
+        }
+        FieldType::Date => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Date(value))
+        }
+        FieldType::Json => {
+            let (ptr, value) = read::<JsonValue>(ptr);
+            (ptr, Field::Json(value))
+        }
+        FieldType::Point => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Point(value))
+        }
+        FieldType::Duration => {
+            let (ptr, value) = read(ptr);
+            (ptr, Field::Duration(value))
+        }
+    }
+}
+
+#[inline(always)]
+fn add_field_size<T>(size: &mut usize) {
+    let align =
std::mem::align_of::<T>();
+    // Align the start of the field
+    *size = (*size + (align - 1)) & !(align - 1);
+    *size += std::mem::size_of::<T>();
+}
+fn size(fields: &[Option<FieldType>]) -> usize {
+    let mut size = 0;
+    for field in fields.iter().flatten() {
+        match field {
+            FieldType::UInt => add_field_size::<u64>(&mut size),
+            FieldType::U128 => add_field_size::<u128>(&mut size),
+            FieldType::Int => add_field_size::<i64>(&mut size),
+            FieldType::I128 => add_field_size::<i128>(&mut size),
+            FieldType::Float => add_field_size::<OrderedFloat<f64>>(&mut size),
+            FieldType::Boolean => add_field_size::<bool>(&mut size),
+            FieldType::String => add_field_size::<String>(&mut size),
+            FieldType::Text => add_field_size::<String>(&mut size),
+            FieldType::Binary => add_field_size::<Vec<u8>>(&mut size),
+            FieldType::Decimal => add_field_size::<Decimal>(&mut size),
+            FieldType::Timestamp => add_field_size::<DateTime<FixedOffset>>(&mut size),
+            FieldType::Date => add_field_size::<NaiveDate>(&mut size),
+            FieldType::Json => add_field_size::<JsonValue>(&mut size),
+            FieldType::Point => add_field_size::<DozerPoint>(&mut size),
+            FieldType::Duration => add_field_size::<DozerDuration>(&mut size),
+        }
+    }
+    size
+}
+
+#[derive(Hash, Serialize, Deserialize, Debug, PartialEq, Eq)]
+#[serde(crate = "dozer_types::serde")]
+pub enum FieldRef<'a> {
+    UInt(u64),
+    U128(u128),
+    Int(i64),
+    I128(i128),
+    Float(OrderedFloat<f64>),
+    Boolean(bool),
+    String(&'a str),
+    Text(&'a str),
+    Binary(&'a [u8]),
+    Decimal(Decimal),
+    Timestamp(DateTime<FixedOffset>),
+    Date(NaiveDate),
+    Json(JsonValue),
+    Point(DozerPoint),
+    Duration(DozerDuration),
+    Null,
+}
+
+impl FieldRef<'_> {
+    pub fn cloned(&self) -> Field {
+        match self {
+            FieldRef::UInt(v) => Field::UInt(*v),
+            FieldRef::U128(v) => Field::U128(*v),
+            FieldRef::Int(v) => Field::Int(*v),
+            FieldRef::I128(v) => Field::I128(*v),
+            FieldRef::Float(v) => Field::Float(*v),
+            FieldRef::Boolean(v) => Field::Boolean(*v),
+            FieldRef::String(v) => Field::String((*v).to_owned()),
+            FieldRef::Text(v) => Field::Text((*v).to_owned()),
+            FieldRef::Binary(v) => Field::Binary((*v).to_vec()),
+            FieldRef::Decimal(v) => Field::Decimal(*v),
+
FieldRef::Timestamp(v) => Field::Timestamp(*v),
+            FieldRef::Date(v) => Field::Date(*v),
+            FieldRef::Json(v) => Field::Json(v.clone()),
+            FieldRef::Point(v) => Field::Point(*v),
+            FieldRef::Duration(v) => Field::Duration(*v),
+            FieldRef::Null => Field::Null,
+        }
+    }
+}
+
+impl RecordRef {
+    pub fn new(fields: Vec<Field>) -> Self {
+        let field_types = fields
+            .iter()
+            .map(|field| field.ty())
+            .collect::<Box<[Option<FieldType>]>>();
+        let size = size(&field_types);
+
+        let layout = Layout::from_size_align(size, MAX_ALIGN).unwrap();
+        // SAFETY: Everything is `ALIGN` byte aligned
+        let data = unsafe {
+            let data = std::alloc::alloc(layout);
+            if data.is_null() {
+                handle_alloc_error(layout);
+            }
+            data
+        };
+        // SAFETY: We checked for null above
+        let data = unsafe { NonNull::new_unchecked(data) };
+        let mut ptr = data.as_ptr();
+
+        // SAFETY:
+        // - ptr is non-null (we got it from a NonNull)
+        // - ptr is dereferencable (its memory range is large enough and not de-allocated)
+        //
+        unsafe {
+            for field in fields {
+                match field {
+                    Field::UInt(v) => ptr = write(ptr, v),
+                    Field::U128(v) => ptr = write(ptr, v),
+                    Field::Int(v) => ptr = write(ptr, v),
+                    Field::I128(v) => ptr = write(ptr, v),
+                    Field::Float(v) => ptr = write(ptr, v),
+                    Field::Boolean(v) => ptr = write(ptr, v),
+                    Field::String(v) => ptr = write(ptr, v),
+                    Field::Text(v) => ptr = write(ptr, v),
+                    Field::Binary(v) => ptr = write(ptr, v),
+                    Field::Decimal(v) => ptr = write(ptr, v),
+                    Field::Timestamp(v) => ptr = write(ptr, v),
+                    Field::Date(v) => ptr = write(ptr, v),
+                    Field::Json(v) => ptr = write(ptr, v),
+                    Field::Point(v) => ptr = write(ptr, v),
+                    Field::Duration(v) => ptr = write(ptr, v),
+                    Field::Null => (),
+                }
+            }
+        }
+        // SAFETY: This is valid, because inner is `repr(transparent)`
+        let arc = unsafe {
+            let arc = Arc::from_header_and_slice(data, &field_types);
+            std::mem::transmute(arc)
+        };
+        Self(arc)
+    }
+
+    pub fn load(&self) -> Vec<FieldRef<'_>> {
+        self.0
+            .field_types()
+            .iter()
+            .scan(self.0.data().as_ptr(), |ptr, field_type| {
+                let Some(field_type) = field_type else {
+                    return Some(FieldRef::Null);
+                };
+
+                unsafe {
+                    let (new_ptr, value) = read_field_ref(*ptr, *field_type);
+                    *ptr = new_ptr;
+                    Some(value)
+                }
+            })
+            .collect()
+    }
+
+    #[inline(always)]
+    pub fn id(&self) -> usize {
+        self.0.as_ptr() as *const () as usize
+    }
+}
+
+impl RecordRefInner {
+    #[inline(always)]
+    fn field_types(&self) -> &[Option<FieldType>] {
+        &self.0.slice
+    }
+
+    #[inline(always)]
+    fn data(&self) -> NonNull<u8> {
+        self.0.header
+    }
+}
+
+impl Drop for RecordRefInner {
+    fn drop(&mut self) {
+        let mut ptr = self.data().as_ptr();
+        for field in self.field_types().iter().flatten() {
+            unsafe {
+                // Read owned so all field destructors run
+                ptr = read_field(ptr, *field).0;
+            }
+        }
+        // Then deallocate the field storage
+        unsafe {
+            dealloc(
+                self.data().as_ptr(),
+                Layout::from_size_align(size(self.field_types()), MAX_ALIGN).unwrap(),
+            );
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
+pub struct ProcessorRecord {
+    /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage.
+    /// This is a Box<[]> instead of a Vec to save space on storing the vec's capacity
+    values: Box<[RecordRef]>,
+
+    /// Time To Live for this record. If the value is None, the record will never expire.
+    lifetime: Option<Box<Lifetime>>,
+}
+
+impl ProcessorRecord {
+    pub fn new(values: Box<[RecordRef]>) -> Self {
+        Self {
+            values,
+            ..Default::default()
+        }
+    }
+
+    pub fn get_lifetime(&self) -> Option<Lifetime> {
+        self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
+    }
+    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
+        self.lifetime = lifetime.map(Box::new);
+    }
+
+    pub fn values(&self) -> &[RecordRef] {
+        &self.values
+    }
+
+    pub fn appended(existing: &ProcessorRecord, additional: RecordRef) -> Self {
+        let mut values = Vec::with_capacity(existing.values().len() + 1);
+        values.extend_from_slice(existing.values());
+        values.push(additional);
+        Self::new(values.into_boxed_slice())
+    }
+}
+
+mod store;
+pub use store::{ProcessorRecordStore, RecordStoreError};
+
+#[cfg(test)]
+mod tests {
+    use dozer_types::types::Field;
+
+    use crate::RecordRef;
+
+    #[test]
+    fn test_store_load() {
+        let fields = vec![
+            Field::String("asdf".to_owned()),
+            Field::Int(23),
+            Field::Null,
+            Field::U128(234),
+        ];
+
+        let record = RecordRef::new(fields.clone());
+        let loaded_fields: Vec<_> = record
+            .load()
+            .into_iter()
+            .map(|field| field.cloned())
+            .collect();
+        assert_eq!(&fields, &loaded_fields);
+    }
+
+    #[test]
+    fn test_ser_de() {
+        let fields = vec![
+            Field::String("asdf".to_owned()),
+            Field::Int(23),
+            Field::Null,
+            Field::U128(234),
+        ];
+
+        let record = RecordRef::new(fields.clone());
+
+        let bytes = dozer_types::bincode::serialize(&record).unwrap();
+        let deserialized: RecordRef = dozer_types::bincode::deserialize(&bytes).unwrap();
+        let loaded_fields: Vec<_> = deserialized
+            .load()
+            .into_iter()
+            .map(|field| field.cloned())
+            .collect();
+        assert_eq!(&fields, &loaded_fields);
+    }
+}
diff --git a/dozer-core/src/processor_record/store.rs b/dozer-recordstore/src/store.rs
similarity index 71%
rename from dozer-core/src/processor_record/store.rs
rename to dozer-recordstore/src/store.rs
index de9d383fc4..29160a6b1a 100644
--- a/dozer-core/src/processor_record/store.rs
+++
b/dozer-recordstore/src/store.rs @@ -1,16 +1,30 @@ use std::collections::HashMap; -use dozer_storage::errors::StorageError; use dozer_types::{ bincode, + errors::internal::BoxedError, parking_lot::RwLock, serde::{Deserialize, Serialize}, - types::{Field, Lifetime, Operation, Record}, + thiserror::Error, + types::{Field, Lifetime, Record}, }; -use crate::{errors::ExecutionError, executor_operation::ProcessorOperation}; - -use super::{ProcessorRecord, RecordRef}; +use super::{FieldRef, ProcessorRecord, RecordRef}; + +#[derive(Error, Debug)] +pub enum RecordStoreError { + #[error("Unable to deserialize type: {} - Reason: {}", typ, reason.to_string())] + DeserializationError { + typ: &'static str, + reason: BoxedError, + }, + + #[error("Unable to serialize type: {} - Reason: {}", typ, reason.to_string())] + SerializationError { + typ: &'static str, + reason: BoxedError, + }, +} #[derive(Debug)] pub struct ProcessorRecordStore { @@ -24,7 +38,7 @@ struct ProcessorRecordStoreInner { } impl ProcessorRecordStore { - pub fn new() -> Result { + pub fn new() -> Result { Ok(Self { inner: RwLock::new(Default::default()), }) @@ -34,19 +48,19 @@ impl ProcessorRecordStore { self.inner.read().records.len() } - pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), StorageError> { + pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { let records = &self.inner.read().records; let slice = &records[start..]; - let data = bincode::serialize(slice).map_err(|e| StorageError::SerializationError { + let data = bincode::serialize(slice).map_err(|e| RecordStoreError::SerializationError { typ: "[RecordRef]", reason: Box::new(e), })?; Ok((data, slice.len())) } - pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), StorageError> { + pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> { let slice: Vec = - bincode::deserialize(data).map_err(|e| StorageError::DeserializationError { + 
bincode::deserialize(data).map_err(|e| RecordStoreError::DeserializationError { typ: "[RecordRef]", reason: Box::new(e), })?; @@ -64,8 +78,8 @@ impl ProcessorRecordStore { Ok(()) } - pub fn create_ref(&self, values: &[Field]) -> Result { - let record = RecordRef(values.to_vec().into()); + pub fn create_ref(&self, values: &[Field]) -> Result { + let record = RecordRef::new(values.to_vec()); let mut inner = self.inner.write(); @@ -76,8 +90,11 @@ impl ProcessorRecordStore { Ok(record) } - pub fn load_ref(&self, record_ref: &RecordRef) -> Result, StorageError> { - Ok(record_ref.0.to_vec()) + pub fn load_ref<'a>( + &self, + record_ref: &'a RecordRef, + ) -> Result>, RecordStoreError> { + Ok(record_ref.load()) } pub fn serialize_ref(&self, record_ref: &RecordRef) -> u64 { @@ -85,7 +102,7 @@ impl ProcessorRecordStore { .inner .read() .record_pointer_to_index - .get(&(record_ref.0.as_ptr() as usize)) + .get(&record_ref.id()) .expect("RecordRef not found in ProcessorRecordStore") as u64 } @@ -93,18 +110,23 @@ impl ProcessorRecordStore { self.inner.read().records[index as usize].clone() } - pub fn create_record(&self, record: &Record) -> Result { + pub fn create_record(&self, record: &Record) -> Result { let record_ref = self.create_ref(&record.values)?; let mut processor_record = ProcessorRecord::new(Box::new([record_ref])); processor_record.set_lifetime(record.lifetime.clone()); Ok(processor_record) } - pub fn load_record(&self, processor_record: &ProcessorRecord) -> Result { + pub fn load_record( + &self, + processor_record: &ProcessorRecord, + ) -> Result { let mut record = Record::default(); for record_ref in processor_record.values.iter() { let fields = self.load_ref(record_ref)?; - record.values.extend(fields.iter().cloned()); + record + .values + .extend(fields.iter().map(|field| field.cloned())); } record.set_lifetime(processor_record.get_lifetime()); Ok(record) @@ -131,48 +153,6 @@ impl ProcessorRecordStore { .collect(); Ok(ProcessorRecord { values, lifetime }) } - 
- pub fn create_operation( - &self, - operation: &Operation, - ) -> Result { - match operation { - Operation::Delete { old } => { - let old = self.create_record(old)?; - Ok(ProcessorOperation::Delete { old }) - } - Operation::Insert { new } => { - let new = self.create_record(new)?; - Ok(ProcessorOperation::Insert { new }) - } - Operation::Update { old, new } => { - let old = self.create_record(old)?; - let new = self.create_record(new)?; - Ok(ProcessorOperation::Update { old, new }) - } - } - } - - pub fn load_operation( - &self, - operation: &ProcessorOperation, - ) -> Result { - match operation { - ProcessorOperation::Delete { old } => { - let old = self.load_record(old)?; - Ok(Operation::Delete { old }) - } - ProcessorOperation::Insert { new } => { - let new = self.load_record(new)?; - Ok(Operation::Insert { new }) - } - ProcessorOperation::Update { old, new } => { - let old = self.load_record(old)?; - let new = self.load_record(new)?; - Ok(Operation::Update { old, new }) - } - } - } } fn insert_record_pointer_to_index( @@ -180,7 +160,7 @@ fn insert_record_pointer_to_index( record: &RecordRef, index: usize, ) { - let previous_index = record_pointer_to_index.insert(record.0.as_ptr() as usize, index); + let previous_index = record_pointer_to_index.insert(record.id(), index); debug_assert!(previous_index.is_none()); } diff --git a/dozer-sql/Cargo.toml b/dozer-sql/Cargo.toml index a7a0aa0f07..caf7c51bc9 100644 --- a/dozer-sql/Cargo.toml +++ b/dozer-sql/Cargo.toml @@ -12,6 +12,7 @@ dozer-storage = { path = "../dozer-storage" } dozer-core = { path = "../dozer-core" } dozer-tracing = { path = "../dozer-tracing" } dozer-sql-expression = { path = "expression" } +dozer-recordstore = {path = "../dozer-recordstore"} ahash = "0.8.3" enum_dispatch = "0.3.11" diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs index 2499d1a0af..54e069a743 100644 --- a/dozer-sql/expression/src/execution.rs +++ b/dozer-sql/expression/src/execution.rs @@ 
-367,7 +367,7 @@ impl Expression { pub fn get_type(&self, schema: &Schema) -> Result { match self { Expression::Literal(field) => { - let field_type = get_field_type(field); + let field_type = field.ty(); match field_type { Some(f) => Ok(ExpressionType::new( f, @@ -472,27 +472,6 @@ impl Expression { } } -pub fn get_field_type(field: &Field) -> Option { - match field { - Field::UInt(_) => Some(FieldType::UInt), - Field::U128(_) => Some(FieldType::U128), - Field::Int(_) => Some(FieldType::Int), - Field::I128(_) => Some(FieldType::I128), - Field::Float(_) => Some(FieldType::Float), - Field::Boolean(_) => Some(FieldType::Boolean), - Field::String(_) => Some(FieldType::String), - Field::Binary(_) => Some(FieldType::Binary), - Field::Decimal(_) => Some(FieldType::Decimal), - Field::Timestamp(_) => Some(FieldType::Timestamp), - Field::Json(_) => Some(FieldType::Json), - Field::Text(_) => Some(FieldType::Text), - Field::Date(_) => Some(FieldType::Date), - Field::Point(_) => Some(FieldType::Point), - Field::Duration(_) => Some(FieldType::Duration), - Field::Null => None, - } -} - fn get_unary_operator_type( operator: &UnaryOperatorType, expression: &Expression, diff --git a/dozer-sql/src/aggregation/factory.rs b/dozer-sql/src/aggregation/factory.rs index 1f09bf804e..76463b6141 100644 --- a/dozer-sql/src/aggregation/factory.rs +++ b/dozer-sql/src/aggregation/factory.rs @@ -1,11 +1,11 @@ use crate::planner::projection::CommonPlanner; use crate::projection::processor::ProjectionProcessor; use crate::{aggregation::processor::AggregationProcessor, errors::PipelineError}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::sqlparser::ast::Select; use dozer_types::errors::internal::BoxedError; use dozer_types::models::udf_config::UdfConfig; diff --git 
a/dozer-sql/src/aggregation/processor.rs b/dozer-sql/src/aggregation/processor.rs index 4bd8772e0b..70cd1773b6 100644 --- a/dozer-sql/src/aggregation/processor.rs +++ b/dozer-sql/src/aggregation/processor.rs @@ -7,8 +7,8 @@ use dozer_core::channels::ProcessorChannelForwarder; use dozer_core::dozer_log::storage::Object; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::execution::Expression; use dozer_types::bincode; use dozer_types::errors::internal::BoxedError; @@ -585,10 +585,10 @@ impl Processor for AggregationProcessor { op: ProcessorOperation, fw: &mut dyn ProcessorChannelForwarder, ) -> Result<(), BoxedError> { - let op = record_store.load_operation(&op)?; + let op = op.load(record_store)?; let ops = self.aggregate(op)?; for output_op in ops { - let output_op = record_store.create_operation(&output_op)?; + let output_op = ProcessorOperation::new(&output_op, record_store)?; fw.send(output_op, DEFAULT_PORT_HANDLE); } Ok(()) diff --git a/dozer-sql/src/errors.rs b/dozer-sql/src/errors.rs index 2f56c676b7..98c6d94c63 100644 --- a/dozer-sql/src/errors.rs +++ b/dozer-sql/src/errors.rs @@ -1,7 +1,7 @@ #![allow(clippy::enum_variant_names)] use dozer_core::node::PortHandle; -use dozer_storage::errors::StorageError; +use dozer_recordstore::RecordStoreError; use dozer_types::chrono::RoundingError; use dozer_types::errors::internal::BoxedError; use dozer_types::errors::types::TypeError; @@ -193,8 +193,8 @@ pub enum JoinError { #[error("Field type error computing the eviction time in the TTL reference field")] EvictionTypeOverflow, - #[error("Storage error: {0}")] - Storage(#[from] StorageError), + #[error("Recordstore error: {0}")] + RecordStore(#[from] RecordStoreError), #[error("Deserialization error: {0}")] Deserialization(#[from] DeserializationError), 
@@ -294,8 +294,8 @@ pub enum WindowError { #[error("WINDOW functions require alias")] NoAlias, - #[error("Storage error")] - Storage(#[from] StorageError), + #[error("RecordStore error")] + RecordStore(#[from] RecordStoreError), } #[derive(Error, Debug)] @@ -324,6 +324,6 @@ pub enum TableOperatorError { #[error("TTL input must evaluate to timestamp, but it evaluates to {0}")] InvalidTtlInputType(Field), - #[error("Storage error")] - Storage(#[from] StorageError), + #[error("Recordstore error")] + RecordStore(#[from] RecordStoreError), } diff --git a/dozer-sql/src/expression/tests/test_common.rs b/dozer-sql/src/expression/tests/test_common.rs index c456c7949b..5ec2a24731 100644 --- a/dozer-sql/src/expression/tests/test_common.rs +++ b/dozer-sql/src/expression/tests/test_common.rs @@ -2,8 +2,8 @@ use crate::{projection::factory::ProjectionProcessorFactory, tests::utils::get_s use dozer_core::channels::ProcessorChannelForwarder; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::ProcessorFactory; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::types::Record; use dozer_types::types::{Field, Schema}; use std::collections::HashMap; diff --git a/dozer-sql/src/product/join/factory.rs b/dozer-sql/src/product/join/factory.rs index b5b6f367dc..c910f977fd 100644 --- a/dozer-sql/src/product/join/factory.rs +++ b/dozer-sql/src/product/join/factory.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; use dozer_sql_expression::{ @@ -12,6 +11,8 @@ use dozer_sql_expression::{ JoinOperator as SqlJoinOperator, }, }; + +use dozer_recordstore::ProcessorRecordStore; use dozer_types::{ errors::internal::BoxedError, types::{FieldDefinition, Schema}, diff --git 
a/dozer-sql/src/product/join/operator/mod.rs b/dozer-sql/src/product/join/operator/mod.rs index 2bd0b1f6cd..93db8a0314 100644 --- a/dozer-sql/src/product/join/operator/mod.rs +++ b/dozer-sql/src/product/join/operator/mod.rs @@ -1,7 +1,5 @@ -use dozer_core::{ - dozer_log::storage::Object, - processor_record::{ProcessorRecord, ProcessorRecordStore}, -}; +use dozer_core::dozer_log::storage::Object; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::types::{Record, Schema, Timestamp}; use crate::{ diff --git a/dozer-sql/src/product/join/operator/table.rs b/dozer-sql/src/product/join/operator/table.rs index cbd2863b19..74605de7a4 100644 --- a/dozer-sql/src/product/join/operator/table.rs +++ b/dozer-sql/src/product/join/operator/table.rs @@ -6,10 +6,8 @@ use std::{ iter::{once, Flatten, Once}, }; -use dozer_core::{ - dozer_log::storage::Object, - processor_record::{ProcessorRecord, ProcessorRecordStore}, -}; +use dozer_core::dozer_log::storage::Object; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::{ chrono, types::{Field, Record, Schema, Timestamp}, diff --git a/dozer-sql/src/product/join/processor.rs b/dozer-sql/src/product/join/processor.rs index 23553f5213..4a98609f9c 100644 --- a/dozer-sql/src/product/join/processor.rs +++ b/dozer-sql/src/product/join/processor.rs @@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_tracing::Labels; use dozer_types::errors::internal::BoxedError; use dozer_types::types::Lifetime; diff --git a/dozer-sql/src/product/set/operator.rs b/dozer-sql/src/product/set/operator.rs index 0eef1111b4..f0ade01fd9 100644 --- a/dozer-sql/src/product/set/operator.rs +++ 
b/dozer-sql/src/product/set/operator.rs @@ -1,6 +1,6 @@ use super::record_map::{CountingRecordMap, CountingRecordMapEnum}; use crate::errors::PipelineError; -use dozer_core::processor_record::ProcessorRecord; +use dozer_recordstore::ProcessorRecord; use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier}; #[derive(Clone, Debug, PartialEq, Eq, Copy)] diff --git a/dozer-sql/src/product/set/record_map/mod.rs b/dozer-sql/src/product/set/record_map/mod.rs index eeab3e9ddb..6ad0acfbb4 100644 --- a/dozer-sql/src/product/set/record_map/mod.rs +++ b/dozer-sql/src/product/set/record_map/mod.rs @@ -1,7 +1,5 @@ -use dozer_core::{ - dozer_log::storage::Object, - processor_record::{ProcessorRecord, ProcessorRecordStore}, -}; +use dozer_core::dozer_log::storage::Object; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::serde::{Deserialize, Serialize}; use enum_dispatch::enum_dispatch; use std::collections::HashMap; @@ -163,7 +161,7 @@ mod bloom; #[cfg(test)] mod tests { - use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; + use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::types::{Field, Record}; use super::{ diff --git a/dozer-sql/src/product/set/set_factory.rs b/dozer-sql/src/product/set/set_factory.rs index 04910aa5b6..c5421f6097 100644 --- a/dozer-sql/src/product/set/set_factory.rs +++ b/dozer-sql/src/product/set/set_factory.rs @@ -3,11 +3,11 @@ use std::collections::HashMap; use crate::errors::PipelineError; use crate::errors::SetError; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier}; use dozer_types::errors::internal::BoxedError; use dozer_types::types::{FieldDefinition, Schema, SourceDefinition}; diff --git 
a/dozer-sql/src/product/set/set_processor.rs b/dozer-sql/src/product/set/set_processor.rs index a9f74034fc..9a6e2e205b 100644 --- a/dozer-sql/src/product/set/set_processor.rs +++ b/dozer-sql/src/product/set/set_processor.rs @@ -10,8 +10,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::errors::internal::BoxedError; use std::fmt::{Debug, Formatter}; diff --git a/dozer-sql/src/product/table/factory.rs b/dozer-sql/src/product/table/factory.rs index a016f368f9..0f5ab6b8de 100644 --- a/dozer-sql/src/product/table/factory.rs +++ b/dozer-sql/src/product/table/factory.rs @@ -2,9 +2,9 @@ use std::collections::HashMap; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::{ builder::{extend_schema_source_def, NameOrAlias}, sqlparser::ast::TableFactor, diff --git a/dozer-sql/src/product/table/processor.rs b/dozer-sql/src/product/table/processor.rs index e6dfe8edc0..1368b0698b 100644 --- a/dozer-sql/src/product/table/processor.rs +++ b/dozer-sql/src/product/table/processor.rs @@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; #[derive(Debug)] diff --git a/dozer-sql/src/projection/factory.rs b/dozer-sql/src/projection/factory.rs index 
54e7df80f0..b9b150ef0f 100644 --- a/dozer-sql/src/projection/factory.rs +++ b/dozer-sql/src/projection/factory.rs @@ -2,9 +2,9 @@ use std::collections::HashMap; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::{ builder::ExpressionBuilder, execution::Expression, diff --git a/dozer-sql/src/projection/processor.rs b/dozer-sql/src/projection/processor.rs index ec8a03ec43..b715663582 100644 --- a/dozer-sql/src/projection/processor.rs +++ b/dozer-sql/src/projection/processor.rs @@ -6,8 +6,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::types::{Operation, Record, Schema}; @@ -82,13 +82,13 @@ impl Processor for ProjectionProcessor { op: ProcessorOperation, fw: &mut dyn ProcessorChannelForwarder, ) -> Result<(), BoxedError> { - let op = record_store.load_operation(&op)?; + let op = op.load(record_store)?; let output_op = match op { Operation::Delete { ref old } => self.delete(old)?, Operation::Insert { ref new } => self.insert(new)?, Operation::Update { ref old, ref new } => self.update(old, new)?, }; - let output_op = record_store.create_operation(&output_op)?; + let output_op = ProcessorOperation::new(&output_op, record_store)?; fw.send(output_op, DEFAULT_PORT_HANDLE); Ok(()) } diff --git a/dozer-sql/src/selection/factory.rs b/dozer-sql/src/selection/factory.rs index 6bcf6de27f..679994ac41 100644 --- a/dozer-sql/src/selection/factory.rs +++ b/dozer-sql/src/selection/factory.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; use 
crate::errors::PipelineError; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::builder::ExpressionBuilder; use dozer_sql_expression::sqlparser::ast::Expr as SqlExpr; use dozer_types::models::udf_config::UdfConfig; diff --git a/dozer-sql/src/selection/processor.rs b/dozer-sql/src/selection/processor.rs index e2677a0cc8..c1ae3901ad 100644 --- a/dozer-sql/src/selection/processor.rs +++ b/dozer-sql/src/selection/processor.rs @@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::execution::Expression; use dozer_types::errors::internal::BoxedError; use dozer_types::types::{Field, Schema}; diff --git a/dozer-sql/src/table_operator/factory.rs b/dozer-sql/src/table_operator/factory.rs index ca3bcd1189..1c1fa30e81 100644 --- a/dozer-sql/src/table_operator/factory.rs +++ b/dozer-sql/src/table_operator/factory.rs @@ -2,9 +2,9 @@ use std::{collections::HashMap, time::Duration}; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::{ builder::ExpressionBuilder, execution::Expression, diff --git a/dozer-sql/src/table_operator/lifetime.rs b/dozer-sql/src/table_operator/lifetime.rs index 975f200b55..4379db1158 100644 --- a/dozer-sql/src/table_operator/lifetime.rs +++ b/dozer-sql/src/table_operator/lifetime.rs @@ -1,4 +1,4 @@ -use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; +use 
dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_sql_expression::execution::Expression; use dozer_types::types::{Field, Lifetime, Schema}; diff --git a/dozer-sql/src/table_operator/operator.rs b/dozer-sql/src/table_operator/operator.rs index 387b2a9a60..da95c03833 100644 --- a/dozer-sql/src/table_operator/operator.rs +++ b/dozer-sql/src/table_operator/operator.rs @@ -1,5 +1,5 @@ use crate::table_operator::lifetime::LifetimeTableOperator; -use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::types::Schema; use enum_dispatch::enum_dispatch; diff --git a/dozer-sql/src/table_operator/processor.rs b/dozer-sql/src/table_operator/processor.rs index af334f7a42..12f2ae87e6 100644 --- a/dozer-sql/src/table_operator/processor.rs +++ b/dozer-sql/src/table_operator/processor.rs @@ -3,8 +3,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::types::Schema; diff --git a/dozer-sql/src/table_operator/tests/operator_test.rs b/dozer-sql/src/table_operator/tests/operator_test.rs index 072e448818..ec9a6f709d 100644 --- a/dozer-sql/src/table_operator/tests/operator_test.rs +++ b/dozer-sql/src/table_operator/tests/operator_test.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use dozer_core::processor_record::ProcessorRecordStore; +use dozer_recordstore::ProcessorRecordStore; use dozer_sql_expression::execution::Expression; use dozer_types::{ chrono::DateTime, diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index 6df1d62af3..be42d53225 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ 
b/dozer-sql/src/tests/builder_test.rs @@ -10,8 +10,8 @@ use dozer_core::node::{ OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory, SourceState, }; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use dozer_types::ingestion_types::IngestionMessage; use dozer_types::log::debug; diff --git a/dozer-sql/src/utils/serialize.rs b/dozer-sql/src/utils/serialize.rs index 6192ce7c60..44cceca6a3 100644 --- a/dozer-sql/src/utils/serialize.rs +++ b/dozer-sql/src/utils/serialize.rs @@ -1,7 +1,5 @@ -use dozer_core::{ - dozer_log::{storage::Object, tokio::sync::mpsc::error::SendError}, - processor_record::{ProcessorRecord, ProcessorRecordStore}, -}; +use dozer_core::dozer_log::{storage::Object, tokio::sync::mpsc::error::SendError}; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::{ bincode, serde::{de::DeserializeOwned, Serialize}, diff --git a/dozer-sql/src/window/factory.rs b/dozer-sql/src/window/factory.rs index 3fbef96600..9885afb689 100644 --- a/dozer-sql/src/window/factory.rs +++ b/dozer-sql/src/window/factory.rs @@ -2,9 +2,9 @@ use std::collections::HashMap; use dozer_core::{ node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory}, - processor_record::ProcessorRecordStore, DEFAULT_PORT_HANDLE, }; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::{errors::internal::BoxedError, types::Schema}; use crate::{ diff --git a/dozer-sql/src/window/operator.rs b/dozer-sql/src/window/operator.rs index 9172f473f8..e17492b046 100644 --- a/dozer-sql/src/window/operator.rs +++ b/dozer-sql/src/window/operator.rs @@ -1,4 +1,4 @@ -use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::{ chrono::{Duration, DurationRound}, types::{Field, FieldDefinition, 
FieldType, Record, Schema, SourceDefinition}, diff --git a/dozer-sql/src/window/processor.rs b/dozer-sql/src/window/processor.rs index 4439fb88bf..78e5cc9d0f 100644 --- a/dozer-sql/src/window/processor.rs +++ b/dozer-sql/src/window/processor.rs @@ -4,8 +4,8 @@ use dozer_core::dozer_log::storage::Object; use dozer_core::epoch::Epoch; use dozer_core::executor_operation::ProcessorOperation; use dozer_core::node::{PortHandle, Processor}; -use dozer_core::processor_record::ProcessorRecordStore; use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; use super::operator::WindowType; diff --git a/dozer-sql/src/window/tests/operator_test.rs b/dozer-sql/src/window/tests/operator_test.rs index 21d8dd9e4d..4b5fe3b510 100644 --- a/dozer-sql/src/window/tests/operator_test.rs +++ b/dozer-sql/src/window/tests/operator_test.rs @@ -1,4 +1,4 @@ -use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore}; +use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore}; use dozer_types::types::Record; use dozer_types::{ chrono::{DateTime, Duration}, diff --git a/dozer-tests/Cargo.toml b/dozer-tests/Cargo.toml index 8a8632b607..468427412b 100644 --- a/dozer-tests/Cargo.toml +++ b/dozer-tests/Cargo.toml @@ -25,6 +25,7 @@ dozer-api = { path = "../dozer-api" } dozer-utils = { path = "../dozer-utils" } dozer-cli = { path = "../dozer-cli" } dozer-tracing = { path = "../dozer-tracing" } +dozer-recordstore = { path = "../dozer-recordstore" } reqwest = { version = "0.11.20", features = ["json", "rustls-tls"], default-features = false } tokio = { version = "1.25.0", features = ["full", "rt"] } diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index 2eb7352d08..1a82b7d5c8 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -12,8 +12,8 @@ use dozer_core::node::{ SourceState, }; -use 
dozer_core::processor_record::ProcessorRecordStore; use dozer_core::{Dag, DEFAULT_PORT_HANDLE}; +use dozer_recordstore::ProcessorRecordStore; use dozer_core::executor::{DagExecutor, ExecutorOptions}; diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index 5a8af1020d..223b4e7a81 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -36,6 +36,7 @@ tokio-postgres = { version = "0.7.7", features = [ "with-uuid-1", ] } serde_bytes = "0.11.12" +arbitrary = { version = "1", features = ["derive"], optional = true } [build-dependencies] tonic-build = "0.10.0" @@ -43,3 +44,4 @@ tonic-build = "0.10.0" [features] python-extension-module = ["dep:pyo3", "pyo3?/extension-module"] python-auto-initialize = ["dep:pyo3", "pyo3?/auto-initialize"] +arbitrary = ["dep:arbitrary", "chrono/arbitrary", "rust_decimal/rust-fuzz"] diff --git a/dozer-types/src/json_types.rs b/dozer-types/src/json_types.rs index 4c42d229d1..fbb60ae46e 100644 --- a/dozer-types/src/json_types.rs +++ b/dozer-types/src/json_types.rs @@ -13,10 +13,14 @@ use std::fmt::{Display, Formatter}; use std::str::FromStr; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord, Hash)] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub enum JsonValue { Null, Bool(bool), - Number(OrderedFloat), + Number( + #[cfg_attr(feature="arbitrary", arbitrary(with = crate::types::field::arbitrary_float))] + OrderedFloat, + ), String(String), Array(Vec), Object(BTreeMap), diff --git a/dozer-types/src/types/field.rs b/dozer-types/src/types/field.rs index f67c16d2fc..2c5caae647 100644 --- a/dozer-types/src/types/field.rs +++ b/dozer-types/src/types/field.rs @@ -16,12 +16,13 @@ use std::time::Duration; pub const DATE_FORMAT: &str = "%Y-%m-%d"; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord, Hash)] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub enum Field { UInt(u64), U128(u128), Int(i64), I128(i128), - Float(OrderedFloat), + 
Float(#[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] OrderedFloat), Boolean(bool), String(String), Text(String), @@ -35,6 +36,13 @@ pub enum Field { Null, } +#[cfg(feature = "arbitrary")] +pub(crate) fn arbitrary_float( + arbitrary: &mut arbitrary::Unstructured, +) -> arbitrary::Result> { + Ok(OrderedFloat(arbitrary.arbitrary()?)) +} + impl Field { pub(crate) fn data_encoding_len(&self) -> usize { match self { @@ -183,6 +191,27 @@ impl Field { } } + pub fn ty(&self) -> Option { + match self { + Field::UInt(_) => Some(FieldType::UInt), + Field::U128(_) => Some(FieldType::U128), + Field::Int(_) => Some(FieldType::Int), + Field::I128(_) => Some(FieldType::I128), + Field::Float(_) => Some(FieldType::Float), + Field::Boolean(_) => Some(FieldType::Boolean), + Field::String(_) => Some(FieldType::String), + Field::Text(_) => Some(FieldType::Text), + Field::Binary(_) => Some(FieldType::Binary), + Field::Decimal(_) => Some(FieldType::Decimal), + Field::Timestamp(_) => Some(FieldType::Timestamp), + Field::Date(_) => Some(FieldType::Date), + Field::Json(_) => Some(FieldType::Json), + Field::Point(_) => Some(FieldType::Point), + Field::Duration(_) => Some(FieldType::Duration), + Field::Null => None, + } + } + pub fn as_uint(&self) -> Option { match self { Field::UInt(i) => Some(*i), diff --git a/dozer-types/src/types/mod.rs b/dozer-types/src/types/mod.rs index 63d27a8d08..a495c8a979 100644 --- a/dozer-types/src/types/mod.rs +++ b/dozer-types/src/types/mod.rs @@ -260,6 +260,7 @@ pub enum Operation { // Helpful in interacting with external systems during ingestion and querying // For example, nanoseconds can overflow. 
#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub enum TimeUnit { Seconds, Milliseconds, @@ -320,6 +321,7 @@ impl TimeUnit { } #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct DozerDuration(pub std::time::Duration, pub TimeUnit); impl Ord for DozerDuration { @@ -371,6 +373,16 @@ impl DozerDuration { #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] pub struct DozerPoint(pub Point>); +#[cfg(feature = "arbitrary")] +impl<'a> arbitrary::Arbitrary<'a> for DozerPoint { + fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { + let x = self::field::arbitrary_float(u)?; + let y = self::field::arbitrary_float(u)?; + + Ok(Self(geo::Point::new(x, y))) + } +} + impl GeodesicDistance> for DozerPoint { fn geodesic_distance(&self, rhs: &Self) -> OrderedFloat { let f = point! { x: self.0.x().0, y: self.0.y().0 };