diff --git a/dozer-cli/src/pipeline/connector_source.rs b/dozer-cli/src/pipeline/connector_source.rs
index 6b4d550fc8..570d383eda 100644
--- a/dozer-cli/src/pipeline/connector_source.rs
+++ b/dozer-cli/src/pipeline/connector_source.rs
@@ -1,5 +1,7 @@
 use dozer_core::event::EventHub;
-use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
+use dozer_core::node::{
+    OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceMessage,
+};
 use dozer_core::shutdown::ShutdownReceiver;
 use dozer_ingestion::{
     get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo,
@@ -13,7 +15,7 @@ use dozer_tracing::{emit_event, DozerMonitorContext};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::connection::Connection;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::SourceState;
 use dozer_types::thiserror::{self, Error};
 use dozer_types::tracing::info;
 use dozer_types::types::{Operation, Schema, SourceDefinition};
@@ -225,8 +227,8 @@ impl Source for ConnectorSource {
     async fn start(
         &mut self,
-        sender: Sender<(PortHandle, IngestionMessage)>,
-        last_checkpoint: Option<OpIdentifier>,
+        sender: Sender<SourceMessage>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         let (ingestor, iterator) = Ingestor::initialize_channel(self.ingestion_config.clone());
         let connection_name = self.connection_name.clone();
@@ -305,7 +307,7 @@ impl Source for ConnectorSource {
 async fn forward_message_to_pipeline(
     mut iterator: IngestionIterator,
-    sender: Sender<(PortHandle, IngestionMessage)>,
+    sender: Sender<SourceMessage>,
     connection_name: String,
     tables: Vec<TableInfo>,
     ports: Vec<PortHandle>,
@@ -325,7 +327,7 @@ async fn forward_message_to_pipeline(
         .init();
     let mut counter = vec![(0u64, 0u64); tables.len()];
-    while let Some(message) = iterator.receiver.recv().await {
+    while let Some((idx, message)) = iterator.receiver.recv().await {
         match &message {
             IngestionMessage::OperationEvent {
                 table_index, op, ..
@@ -374,13 +376,29 @@ async fn forward_message_to_pipeline(
                 }
                 // Send message to the pipeline
-                if sender.send((port, message)).await.is_err() {
+                if sender
+                    .send(SourceMessage {
+                        id: idx,
+                        port,
+                        message,
+                    })
+                    .await
+                    .is_err()
+                {
                     break;
                 }
             }
             IngestionMessage::TransactionInfo(_) => {
                 // For transaction level messages, we can send to any port.
-                if sender.send((ports[0], message)).await.is_err() {
+                if sender
+                    .send(SourceMessage {
+                        id: idx,
+                        port: ports[0],
+                        message,
+                    })
+                    .await
+                    .is_err()
+                {
                     break;
                 }
             }
         }
     }
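Note: the bare `(PortHandle, IngestionMessage)` tuple becomes a named `SourceMessage` that also carries the per-source index assigned by the ingestion channel, so a flushed epoch can later be traced back to the exact message that produced it. A minimal, self-contained sketch of the shape (simplified stand-in types, `tokio` assumed; not the real dozer-core definitions):

```rust
use tokio::sync::mpsc::{channel, Sender};

type PortHandle = u16;

// Simplified stand-ins for the real dozer types.
#[derive(Debug)]
struct SourceMessage {
    id: usize,       // position of this message in the source's stream
    port: PortHandle,
    message: String, // stands in for IngestionMessage
}

async fn forward(sender: Sender<SourceMessage>) {
    // The ingestion side numbers messages; the forwarder passes `idx` through.
    for (idx, payload) in ["insert", "commit"].into_iter().enumerate() {
        if sender
            .send(SourceMessage { id: idx, port: 0, message: payload.to_string() })
            .await
            .is_err()
        {
            break; // pipeline shut down
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel(8);
    tokio::spawn(forward(tx));
    while let Some(msg) = rx.recv().await {
        println!("{} -> port {}: {}", msg.id, msg.port, msg.message);
    }
}
```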
diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs
index 9607278bad..c18e244ff4 100644
--- a/dozer-cli/src/pipeline/dummy_sink.rs
+++ b/dozer-cli/src/pipeline/dummy_sink.rs
@@ -6,7 +6,6 @@ use dozer_core::{
     node::{PortHandle, Sink, SinkFactory},
     DEFAULT_PORT_HANDLE,
 };
-use dozer_types::log::debug;
 use dozer_types::{
     chrono::Local,
     errors::internal::BoxedError,
@@ -14,6 +13,7 @@ use dozer_types::{
     node::OpIdentifier,
     types::{FieldType, Operation, Schema, TableOperation},
 };
+use dozer_types::{log::debug, node::SourceState};
 
 use crate::async_trait::async_trait;
@@ -179,15 +179,15 @@ impl Sink for DummySink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
-        Ok(None)
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
+        Ok(SourceState::NotStarted)
     }
 }
diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs
index e2041af90b..6ff3b9f8bb 100644
--- a/dozer-core/src/builder_dag.rs
+++ b/dozer-core/src/builder_dag.rs
@@ -6,7 +6,7 @@ use std::{
 use daggy::{petgraph::visit::IntoNodeIdentifiers, NodeIndex};
 use dozer_types::{
     log::warn,
-    node::{NodeHandle, OpIdentifier},
+    node::{NodeHandle, SourceState},
 };
 
 use crate::{
@@ -31,7 +31,7 @@ pub struct NodeType {
 pub enum NodeKind {
     Source {
         source: Box<dyn Source>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     },
     Processor(Box<dyn Processor>),
     Sink(Box<dyn Sink>),
@@ -76,8 +76,8 @@ impl BuilderDag {
         // Build the sinks and load checkpoint.
         let event_hub = EventHub::new(event_hub_capacity);
         let mut graph = daggy::Dag::new();
-        let mut source_states = HashMap::new();
-        let mut source_op_ids = HashMap::new();
+        let mut source_state_data = HashMap::new();
+        let mut source_states: HashMap<NodeHandle, SourceState> = HashMap::new();
         let mut source_id_to_sinks = HashMap::<NodeHandle, Vec<NodeIndex>>::new();
         let mut node_index_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
         for (node_index, node) in nodes.iter_mut().enumerate() {
@@ -99,9 +99,9 @@ impl BuilderDag {
                     .await
                     .map_err(ExecutionError::Factory)?;
 
-                let state = sink.get_source_state().map_err(ExecutionError::Sink)?;
+                let state = sink.get_source_state_data().map_err(ExecutionError::Sink)?;
                 if let Some(state) = state {
-                    match source_states.entry(source.clone()) {
+                    match source_state_data.entry(source.clone()) {
                         Entry::Occupied(entry) => {
                             if entry.get() != &state {
                                 return Err(ExecutionError::SourceStateConflict(source));
@@ -113,16 +113,18 @@ impl BuilderDag {
                     }
                 }
 
-                let op_id = sink.get_latest_op_id().map_err(ExecutionError::Sink)?;
-                if let Some(op_id) = op_id {
-                    match source_op_ids.entry(source.clone()) {
-                        Entry::Occupied(mut entry) => {
-                            *entry.get_mut() = op_id.min(*entry.get());
-                        }
-                        Entry::Vacant(entry) => {
-                            entry.insert(op_id);
+                let resume_state = sink.get_source_state().map_err(ExecutionError::Sink)?;
+                match source_states.entry(source.clone()) {
+                    Entry::Occupied(mut entry) => {
+                        if let Some(merged) = entry.get().clone().merge(resume_state) {
+                            *entry.get_mut() = merged;
+                        } else {
+                            return Err(ExecutionError::ResumeStateConflict(source));
                         }
                     }
+                    Entry::Vacant(entry) => {
+                        entry.insert(resume_state);
+                    }
                 }
 
                 let new_node_index = graph.add_node(NodeType {
@@ -151,7 +153,7 @@ impl BuilderDag {
                     .remove(&node_index)
                     .expect("we collected all output schemas"),
                 event_hub.clone(),
-                source_states.remove(&node.handle),
+                source_state_data.remove(&node.handle),
             )
             .map_err(ExecutionError::Factory)?;
 
@@ -163,23 +165,27 @@ impl BuilderDag {
                 let mut checkpoint = None;
                 for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
                     let sink = &mut graph[sink];
-                    let sink_handle = &sink.handle;
                     let NodeKind::Sink(sink) = &mut sink.kind else {
                         unreachable!()
                     };
-                    sink.set_source_state(&state)
+                    sink.set_source_state_data(&state)
                         .map_err(ExecutionError::Sink)?;
-                    if let Some(sink_checkpoint) = source_op_ids.remove(sink_handle) {
-                        checkpoint =
-                            Some(checkpoint.unwrap_or(sink_checkpoint).min(sink_checkpoint));
-                    }
+                    let resume_state = source_states.remove(&node.handle);
+                    checkpoint =
+                        match (checkpoint, resume_state) {
+                            (None, new) => new,
+                            (old, None) => old,
+                            (Some(old), Some(new)) => Some(old.merge(new).ok_or(
+                                ExecutionError::ResumeStateConflict(node.handle.clone()),
+                            )?),
+                        };
                 }
                 NodeType {
                     handle: node.handle,
                     kind: NodeKind::Source {
                         source,
-                        last_checkpoint: checkpoint,
+                        last_checkpoint: checkpoint.unwrap_or(SourceState::NotStarted),
                     },
                 }
             }
diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs
index 72d8493045..6e7b4c327a 100644
--- a/dozer-core/src/errors.rs
+++ b/dozer-core/src/errors.rs
@@ -39,6 +39,8 @@ pub enum ExecutionError {
     Sink(#[source] BoxedError),
     #[error("State of {0} is not consistent across sinks")]
     SourceStateConflict(NodeHandle),
+    #[error("Resume state of {0} is not consistent across sinks")]
+    ResumeStateConflict(NodeHandle),
     #[error("File system error {0:?}: {1}")]
     FileSystemError(PathBuf, #[source] std::io::Error),
     #[error("Checkpoint writer thread panicked")]
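Note: `SourceState::merge` is what `BuilderDag` now uses to reconcile the resume state reported by each sink of the same source, with `None` mapping to the new `ResumeStateConflict` error. The exact merge rules live in dozer-types and are not shown in this patch; the sketch below assumes one plausible rule set (matching variants merge, and two checkpoints resolve to the earlier one, mirroring the `op_id.min(...)` logic this patch removes):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct OpIdentifier { txid: u64, seq_in_tx: u64 }

#[derive(Clone, Debug, PartialEq, Eq)]
enum SourceState {
    NotStarted,
    Started,
    Restartable(OpIdentifier),
}

impl SourceState {
    /// Combine the resume states reported by two sinks of the same source.
    /// `None` means they cannot be reconciled (-> ResumeStateConflict).
    /// Assumed rules; the real implementation lives in dozer-types.
    fn merge(self, other: SourceState) -> Option<SourceState> {
        use SourceState::*;
        match (self, other) {
            (NotStarted, NotStarted) => Some(NotStarted),
            (Started, Started) => Some(Started),
            // Resume from the earlier checkpoint so no sink misses data.
            (Restartable(a), Restartable(b)) => Some(Restartable(a.min(b))),
            _ => None,
        }
    }
}

fn main() {
    let a = SourceState::Restartable(OpIdentifier { txid: 5, seq_in_tx: 0 });
    let b = SourceState::Restartable(OpIdentifier { txid: 3, seq_in_tx: 7 });
    assert_eq!(
        a.merge(b),
        Some(SourceState::Restartable(OpIdentifier { txid: 3, seq_in_tx: 7 }))
    );
    assert_eq!(SourceState::Started.merge(SourceState::NotStarted), None);
}
```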
diff --git a/dozer-core/src/executor/source_node/mod.rs b/dozer-core/src/executor/source_node/mod.rs
index 0fa913c004..0809d5e910 100644
--- a/dozer-core/src/executor/source_node/mod.rs
+++ b/dozer-core/src/executor/source_node/mod.rs
@@ -1,9 +1,7 @@
 use std::{fmt::Debug, future::Future, pin::pin, sync::Arc, time::SystemTime};
 
 use daggy::petgraph::visit::IntoNodeIdentifiers;
-use dozer_types::{
-    log::debug, models::ingestion_types::TransactionInfo, node::OpIdentifier, types::TableOperation,
-};
+use dozer_types::{log::debug, models::ingestion_types::TransactionInfo, types::TableOperation};
 use dozer_types::{models::ingestion_types::IngestionMessage, node::SourceState};
 use futures::{future::Either, StreamExt};
 use tokio::{
@@ -11,13 +9,10 @@ use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
 };
 
+use crate::node::SourceMessage;
 use crate::{
-    builder_dag::NodeKind,
-    epoch::Epoch,
-    errors::ExecutionError,
-    executor_operation::ExecutorOperation,
-    forwarder::ChannelManager,
-    node::{PortHandle, Source},
+    builder_dag::NodeKind, epoch::Epoch, errors::ExecutionError,
+    executor_operation::ExecutorOperation, forwarder::ChannelManager, node::Source,
 };
 
 use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
@@ -30,7 +25,7 @@ pub struct SourceNode {
     /// Structs for running a source.
     source_runners: Vec<SourceRunner>,
     /// Receivers from sources.
-    receivers: Vec<Receiver<(PortHandle, IngestionMessage)>>,
+    receivers: Vec<Receiver<SourceMessage>>,
     /// The current epoch id.
     epoch_id: u64,
     /// The shutdown future.
@@ -68,7 +63,12 @@ impl Node for SourceNode {
         let next = next.expect("We return just when the stream ends");
         self.shutdown = shutdown;
         let index = next.0;
-        let Some((port, message)) = next.1 else {
+        let Some(SourceMessage {
+            id: message_id,
+            port,
+            message,
+        }) = next.1
+        else {
             debug!("[{}] quit", self.sources[index].channel_manager.owner().id);
             match self.runtime.block_on(
                 handles[index]
@@ -92,7 +92,7 @@ impl Node for SourceNode {
         let source = &mut self.sources[index];
         match message {
             IngestionMessage::OperationEvent { op, id, .. } => {
-                source.state = SourceState::NonRestartable;
+                source.state.set(SourceState::Started);
                 source
                     .channel_manager
                     .send_op(TableOperation { op, id, port })?;
             }
@@ -100,9 +100,9 @@ impl Node for SourceNode {
             IngestionMessage::TransactionInfo(info) => match info {
                 TransactionInfo::Commit { id, source_time } => {
                     if let Some(id) = id {
-                        source.state = SourceState::Restartable(id);
+                        source.state.set(SourceState::Restartable(id));
                     } else {
-                        source.state = SourceState::NonRestartable;
+                        source.state.set(SourceState::Started);
                     }
 
                     let source_states = Arc::new(
@@ -117,7 +117,8 @@ impl Node for SourceNode {
                             .collect(),
                     );
                     let mut epoch =
-                        Epoch::new(self.epoch_id, source_states, SystemTime::now());
+                        Epoch::new(self.epoch_id, source_states, SystemTime::now())
+                            .with_originating_msg(message_id);
                     if let Some(st) = source_time {
                         epoch = epoch.with_source_time(st);
                     }
@@ -155,8 +156,8 @@ struct RunningSource {
 #[derive(Debug)]
 struct SourceRunner {
     source: Box<dyn Source>,
-    last_checkpoint: Option<OpIdentifier>,
-    sender: Sender<(PortHandle, IngestionMessage)>,
+    last_checkpoint: SourceState,
+    sender: Sender<SourceMessage>,
 }
 
 /// Returns if the operation is sent successfully.
diff --git a/dozer-core/src/lib.rs b/dozer-core/src/lib.rs
index 33f8663d9a..e94898bbcb 100644
--- a/dozer-core/src/lib.rs
+++ b/dozer-core/src/lib.rs
@@ -17,7 +17,46 @@ pub mod shutdown;
 pub use tokio;
 
 #[cfg(test)]
-pub mod tests;
+mod tests;
+
+pub mod test_utils {
+    use std::sync::atomic::AtomicUsize;
+
+    use dozer_types::{models::ingestion_types::IngestionMessage, types::PortHandle};
+    use tokio::sync::mpsc::Sender;
+
+    use crate::node::SourceMessage;
+
+    pub struct CountingSender {
+        count: AtomicUsize,
+        sender: Sender<SourceMessage>,
+    }
+
+    impl CountingSender {
+        pub fn new(sender: Sender<SourceMessage>) -> Self {
+            Self {
+                count: 0.into(),
+                sender,
+            }
+        }
+
+        pub async fn send(
+            &self,
+            port: PortHandle,
+            message: IngestionMessage,
+        ) -> Result<(), tokio::sync::mpsc::error::SendError<SourceMessage>> {
+            let idx = self.count.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+            self.sender
+                .send(SourceMessage {
+                    id: idx,
+                    port,
+                    message,
+                })
+                .await?;
+            Ok(())
+        }
+    }
+}
 
 pub use daggy::{self, petgraph};
 pub use dozer_types::{epoch, event};
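Note: `CountingSender` lets test sources keep their old `(port, message)` call shape while message ids are assigned behind the scenes. A self-contained usage sketch with simplified stand-ins for the dozer types (`tokio` assumed):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc::{channel, Sender};

type PortHandle = u16;

#[derive(Debug)]
struct SourceMessage { id: usize, port: PortHandle, message: String }

// Same idea as dozer-core's test_utils::CountingSender: the wrapper
// numbers messages itself, so callers never see the id.
struct CountingSender {
    count: AtomicUsize,
    sender: Sender<SourceMessage>,
}

impl CountingSender {
    fn new(sender: Sender<SourceMessage>) -> Self {
        Self { count: 0.into(), sender }
    }
    async fn send(&self, port: PortHandle, message: String) {
        let id = self.count.fetch_add(1, Ordering::AcqRel);
        let _ = self.sender.send(SourceMessage { id, port, message }).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel(4);
    let sender = CountingSender::new(tx);
    sender.send(100, "op".into()).await;
    sender.send(100, "commit".into()).await;
    drop(sender); // close the channel so the loop below terminates
    while let Some(m) = rx.recv().await {
        println!("#{} on port {}", m.id, m.port);
    }
}
```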
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index d9eb136b10..ef735f2eb1 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -4,7 +4,7 @@ use crate::event::EventHub;
 
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{Schema, TableOperation};
@@ -56,14 +56,20 @@ pub trait SourceFactory: Send + Sync + Debug {
     ) -> Result<Box<dyn Source>, BoxedError>;
 }
 
+pub struct SourceMessage {
+    pub id: usize,
+    pub port: PortHandle,
+    pub message: IngestionMessage,
+}
+
 #[async_trait]
 pub trait Source: Send + Sync + Debug {
     async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError>;
 
     async fn start(
         &mut self,
-        sender: Sender<(PortHandle, IngestionMessage)>,
-        last_checkpoint: Option<OpIdentifier>,
+        sender: Sender<SourceMessage>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError>;
 }
 
@@ -121,9 +127,9 @@ pub trait Sink: Send + Debug {
     ) -> Result<(), BoxedError>;
 
     // Pipeline state management.
-    fn set_source_state(&mut self, source_state: &[u8]) -> Result<(), BoxedError>;
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>;
+    fn set_source_state_data(&mut self, source_state: &[u8]) -> Result<(), BoxedError>;
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError>;
 
     fn preferred_batch_size(&self) -> Option<usize> {
         None
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index e8a2bed55a..4ec1c2238e 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -3,7 +3,7 @@ use crate::epoch::Epoch;
 use crate::event::EventHub;
 use crate::node::{
     OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
-    Source, SourceFactory,
+    Source, SourceFactory, SourceMessage,
 };
 use crate::tests::dag_base_run::NoopProcessorFactory;
 use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
@@ -11,7 +11,7 @@ use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::{IngestionMessage, TransactionInfo};
-use dozer_types::node::{NodeHandle, OpIdentifier};
+use dozer_types::node::{NodeHandle, OpIdentifier, SourceState};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, TableOperation,
@@ -25,6 +25,7 @@ use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 
 use super::run_dag;
+use crate::test_utils::CountingSender;
 
 // Test when error is generated by a processor
@@ -332,16 +333,17 @@ impl Source for ErrGeneratorSource {
 
     async fn start(
         &mut self,
-        sender: Sender<(PortHandle, IngestionMessage)>,
-        _last_checkpoint: Option<OpIdentifier>,
+        sender: Sender<SourceMessage>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
+        let sender = CountingSender::new(sender);
         for n in 1..(self.count + 1) {
             if n == self.err_at {
                 return Err("Generated Error".to_string().into());
             }
 
             sender
-                .send((
+                .send(
                     GENERATOR_SOURCE_OUTPUT_PORT,
                     IngestionMessage::OperationEvent {
                         table_index: 0,
@@ -353,16 +355,16 @@ impl Source for ErrGeneratorSource {
                         },
                         id: Some(OpIdentifier::new(0, n)),
                     },
-                ))
+                )
                 .await?;
             sender
-                .send((
+                .send(
                     GENERATOR_SOURCE_OUTPUT_PORT,
                     IngestionMessage::TransactionInfo(TransactionInfo::Commit {
                         id: Some(OpIdentifier::new(0, n)),
                         source_time: None,
                     }),
-                ))
+                )
                 .await?;
         }
         Ok(())
@@ -487,16 +489,16 @@ impl Sink for ErrSink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
-        Ok(None)
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
+        Ok(SourceState::NotStarted)
     }
 }
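Note: the `Sink` trait now separates the opaque connector state blob (`set_source_state_data` / `get_source_state_data`) from the resume position (`get_source_state`, which replaces `get_latest_op_id`). A minimal stand-in impl showing the division of labor (simplified trait and types, not the real dozer-core API):

```rust
type BoxedError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug)]
struct OpIdentifier { txid: u64, seq_in_tx: u64 }

#[derive(Debug)]
enum SourceState { NotStarted, Started, Restartable(OpIdentifier) }

// Stand-in for the state-management slice of dozer-core's Sink trait.
trait SinkState {
    /// Opaque connector state bytes, replicated verbatim across sinks.
    fn set_source_state_data(&mut self, _data: &[u8]) -> Result<(), BoxedError> { Ok(()) }
    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> { Ok(None) }
    /// Resume position: replaces the old `get_latest_op_id`.
    fn get_source_state(&mut self) -> Result<SourceState, BoxedError>;
}

struct StatelessSink;

impl SinkState for StatelessSink {
    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
        // A sink that persists nothing always reports a fresh start.
        Ok(SourceState::NotStarted)
    }
}

fn main() {
    let mut sink = StatelessSink;
    println!("{:?}", sink.get_source_state().unwrap());
}
```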
diff --git a/dozer-core/src/tests/sinks.rs b/dozer-core/src/tests/sinks.rs
index 6f67a10216..4fd73b21af 100644
--- a/dozer-core/src/tests/sinks.rs
+++ b/dozer-core/src/tests/sinks.rs
@@ -3,7 +3,7 @@ use crate::event::EventHub;
 use crate::node::{PortHandle, Sink, SinkFactory};
 use crate::DEFAULT_PORT_HANDLE;
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 use dozer_types::types::{Schema, TableOperation};
 
 use dozer_types::log::debug;
@@ -99,16 +99,16 @@ impl Sink for CountingSink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
        Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
-        Ok(None)
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
+        Ok(SourceState::NotStarted)
     }
 }
diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs
index 896b1cf782..e68378962c 100644
--- a/dozer-core/src/tests/sources.rs
+++ b/dozer-core/src/tests/sources.rs
@@ -1,9 +1,11 @@
 use crate::event::EventHub;
-use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
+use crate::node::{
+    OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceMessage,
+};
 use crate::DEFAULT_PORT_HANDLE;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::{IngestionMessage, TransactionInfo};
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -16,6 +18,8 @@ use std::sync::Arc;
 use std::time::Duration;
 
+use crate::test_utils::CountingSender;
+
 pub(crate) const GENERATOR_SOURCE_OUTPUT_PORT: PortHandle = 100;
 
 #[derive(Debug)]
@@ -101,15 +105,18 @@ impl Source for GeneratorSource {
 
     async fn start(
         &mut self,
-        sender: Sender<(PortHandle, IngestionMessage)>,
-        last_checkpoint: Option<OpIdentifier>,
+        sender: Sender<SourceMessage>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        let start = last_checkpoint
-            .map(|checkpoint| checkpoint.seq_in_tx + 1)
-            .unwrap_or(0);
+        let start = if let SourceState::Restartable(op_id) = last_checkpoint {
+            op_id.seq_in_tx + 1
+        } else {
+            0
+        };
+        let sender = CountingSender::new(sender);
         for n in start..(start + self.count) {
             sender
-                .send((
+                .send(
                     GENERATOR_SOURCE_OUTPUT_PORT,
                     IngestionMessage::OperationEvent {
                         table_index: 0,
@@ -121,16 +128,16 @@ impl Source for GeneratorSource {
                         },
                         id: Some(OpIdentifier::new(0, n)),
                     },
-                ))
+                )
                 .await?;
             sender
-                .send((
+                .send(
                     GENERATOR_SOURCE_OUTPUT_PORT,
                     IngestionMessage::TransactionInfo(TransactionInfo::Commit {
                         id: Some(OpIdentifier::new(0, n)),
                         source_time: None,
                     }),
-                ))
+                )
                 .await?;
         }
 
@@ -245,12 +252,13 @@ impl Source for DualPortGeneratorSource {
 
     async fn start(
         &mut self,
-        sender: Sender<(PortHandle, IngestionMessage)>,
-        _last_checkpoint: Option<OpIdentifier>,
+        sender: Sender<SourceMessage>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
+        let sender = CountingSender::new(sender);
         for n in 1..(self.count + 1) {
             sender
-                .send((
+                .send(
                     DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
                     IngestionMessage::OperationEvent {
                         table_index: 0,
@@ -262,10 +270,10 @@ impl Source for DualPortGeneratorSource {
                         },
                         id: Some(OpIdentifier::new(0, n)),
                     },
-                ))
+                )
                 .await?;
             sender
-                .send((
+                .send(
                     DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
                     IngestionMessage::OperationEvent {
                         table_index: 0,
@@ -277,16 +285,16 @@ impl Source for DualPortGeneratorSource {
                         },
                         id: Some(OpIdentifier::new(0, n)),
                     },
-                ))
+                )
                 .await?;
             sender
-                .send((
+                .send(
                     DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
                     IngestionMessage::TransactionInfo(TransactionInfo::Commit {
                         id: Some(OpIdentifier::new(0, n)),
                         source_time: None,
                     }),
-                ))
+                )
                 .await?;
         }
         loop {
diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs
index 663c5fdbcb..63d4232d15 100644
--- a/dozer-ingestion/aerospike/src/connector.rs
+++ b/dozer-ingestion/aerospike/src/connector.rs
@@ -235,7 +235,7 @@ struct PendingMessage {
 
 #[derive(Debug)]
 struct PendingOperationId {
-    operation_id: u64,
+    commit_msg_id: usize,
     sender: oneshot::Sender<()>,
 }
 
@@ -245,28 +245,26 @@ async fn ingestor_loop(
     ingestor: Ingestor,
     operation_id_sender: mpsc::UnboundedSender<PendingOperationId>,
 ) {
-    let mut operation_id = 0;
     while let Some(message) = message_receiver.recv().await {
-        let pending_operation_id = PendingOperationId {
-            operation_id,
-            sender: message.sender,
-        };
-
-        // Propagate panic in the pipeline event processor loop.
-        operation_id_sender.send(pending_operation_id).unwrap();
-
-        // Ignore the error, because the server can be down.
         for message in message.messages {
-            let _ = ingestor.handle_message(message).await;
+            let _ = ingestor.handle_message(message).await.unwrap();
        }
-        let _ = ingestor
+        let commit_msg_id = ingestor
             .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
-                id: Some(OpIdentifier::new(0, operation_id)),
+                id: None,
                 source_time: Some(message.source_time),
             }))
-            .await;
+            .await
+            .unwrap();
 
-        operation_id += 1;
+        let pending_operation_id = PendingOperationId {
+            commit_msg_id,
+            sender: message.sender,
+        };
+
+        // Propagate panic in the pipeline event processor loop.
+        operation_id_sender.send(pending_operation_id).unwrap();
     }
 }
 
@@ -282,7 +280,7 @@ async fn pipeline_event_processor(
         if operation_id_from_pipeline
             < pending_operation_id
                 .as_ref()
-                .map(|operation_id| operation_id.operation_id)
+                .map(|operation_id| operation_id.commit_msg_id)
         {
             // We have pending operation id, wait for pipeline event.
             let event = match event_receiver.recv().await {
@@ -296,7 +294,7 @@ async fn pipeline_event_processor(
                     continue;
                 }
             };
-            if let Some(operation_id) = get_operation_id_from_event(&event, &node_handle) {
+            if let Some(operation_id) = get_msg_id_from_event(&event, &node_handle) {
                 operation_id_from_pipeline = Some(operation_id);
             }
         } else if let Some(pending) = pending_operation_id.take() {
@@ -313,16 +311,10 @@ async fn pipeline_event_processor(
     }
 }
 
-fn get_operation_id_from_event(event: &Event, node_handle: &NodeHandle) -> Option<u64> {
+fn get_msg_id_from_event(event: &Event, node_handle: &NodeHandle) -> Option<usize> {
     match event {
-        Event::SinkFlushed { epoch, .. } => epoch
-            .common_info
-            .source_states
-            .get(node_handle)
-            .and_then(|state| match state {
-                SourceState::Restartable(id) => Some(id.seq_in_tx),
-                _ => None,
-            }),
+        Event::SinkFlushed { epoch, node } if node == node_handle => epoch.originating_msg_id,
+        _ => None,
     }
 }
 
@@ -604,12 +596,12 @@ impl Connector for AerospikeConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         let hosts = CString::new(self.config.hosts.as_str())?;
         let client = Client::new(&hosts).map_err(Box::new)?;
 
-        if last_checkpoint.is_none() {
+        if let SourceState::NotStarted = last_checkpoint {
             let dc_name = self.config.replication.datacenter.clone();
             let namespace = self.config.namespace.clone();
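Note: the Aerospike connector now keys its pending acknowledgements on the message index returned by `handle_message` for the Commit, and releases them once a `SinkFlushed` event reports an `originating_msg_id` at or past that index. A self-contained model of the handshake (tokio channels; the real code also handles panics and event-stream closure):

```rust
use tokio::sync::{mpsc, oneshot};

struct PendingOperationId {
    commit_msg_id: usize,
    sender: oneshot::Sender<()>,
}

#[tokio::main]
async fn main() {
    let (pending_tx, mut pending_rx) = mpsc::unbounded_channel::<PendingOperationId>();
    let (ack_tx, ack_rx) = oneshot::channel();

    // Ingestor side: the commit message was assigned index 3 by the channel.
    pending_tx
        .send(PendingOperationId { commit_msg_id: 3, sender: ack_tx })
        .unwrap();

    // Pipeline side: a SinkFlushed event reported originating_msg_id = 3.
    let flushed_msg_id = 3usize;
    while let Ok(pending) = pending_rx.try_recv() {
        if pending.commit_msg_id <= flushed_msg_id {
            let _ = pending.sender.send(()); // commit is durably flushed
        }
    }

    ack_rx.await.unwrap();
    println!("commit acknowledged");
}
```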
diff --git a/dozer-ingestion/benches/helper.rs b/dozer-ingestion/benches/helper.rs
index fdcd3b5b92..9347017e66 100644
--- a/dozer-ingestion/benches/helper.rs
+++ b/dozer-ingestion/benches/helper.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use dozer_ingestion::dozer_types::event::EventHub;
+use dozer_ingestion::dozer_types::{event::EventHub, node::SourceState};
 use dozer_ingestion_connector::{
     dozer_types::{
         indicatif::{ProgressBar, ProgressStyle},
@@ -52,7 +52,9 @@ pub fn get_connection_iterator(runtime: Arc<Runtime>, config: TestConfig) -> IngestionIterator {
     let tables = runtime.block_on(list_tables(&mut *connector));
     let (ingestor, iterator) = Ingestor::initialize_channel(Default::default());
     runtime.clone().spawn_blocking(move || async move {
-        if let Err(e) = runtime.block_on(connector.start(&ingestor, tables, None)) {
+        if let Err(e) =
+            runtime.block_on(connector.start(&ingestor, tables, SourceState::NotStarted))
+        {
             error!("Error starting connector: {:?}", e);
         }
     });
diff --git a/dozer-ingestion/connector/src/ingestor.rs b/dozer-ingestion/connector/src/ingestor.rs
index 170ee9601e..350601bf25 100644
--- a/dozer-ingestion/connector/src/ingestor.rs
+++ b/dozer-ingestion/connector/src/ingestor.rs
@@ -1,5 +1,10 @@
 use dozer_types::models::ingestion_types::IngestionMessage;
-use std::{error::Error, fmt::Display, time::Duration};
+use std::{
+    error::Error,
+    fmt::Display,
+    sync::{atomic::AtomicUsize, Arc},
+    time::Duration,
+};
 use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
     time::timeout,
@@ -19,21 +24,26 @@ impl Default for IngestionConfig {
 }
 
 #[derive(Debug)]
-/// `IngestionIterator` is the receiver side of a spsc channel. The sender side is `Ingestor`.
+/// `IngestionIterator` is the receiver side of an mpsc channel. The sender side is `Ingestor`.
 pub struct IngestionIterator {
-    pub receiver: Receiver<IngestionMessage>,
+    pub receiver: Receiver<(usize, IngestionMessage)>,
 }
 
 impl Iterator for IngestionIterator {
     type Item = IngestionMessage;
     fn next(&mut self) -> Option<Self::Item> {
-        self.receiver.blocking_recv()
+        let (_idx, msg) = self.receiver.blocking_recv()?;
+        Some(msg)
     }
 }
 
 impl IngestionIterator {
     pub async fn next_timeout(&mut self, duration: Duration) -> Option<IngestionMessage> {
-        timeout(duration, self.receiver.recv()).await.ok().flatten()
+        timeout(duration, self.receiver.recv())
+            .await
+            .ok()
+            .flatten()
+            .map(|(_id, msg)| msg)
     }
 }
 
@@ -42,7 +52,8 @@ impl IngestionIterator {
 ///
 /// `IngestionMessage` is the message type that is sent over the channel.
 pub struct Ingestor {
-    sender: Sender<IngestionMessage>,
+    msg_idx: Arc<AtomicUsize>,
+    sender: Sender<(usize, IngestionMessage)>,
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -59,18 +70,34 @@ impl Error for SendError {}
 
 impl Ingestor {
     pub fn initialize_channel(config: IngestionConfig) -> (Ingestor, IngestionIterator) {
         let (sender, receiver) = channel(config.forwarder_channel_cap);
-        let ingestor = Self { sender };
+        let ingestor = Self {
+            sender,
+            msg_idx: Arc::new(0.into()),
+        };
         let iterator = IngestionIterator { receiver };
         (ingestor, iterator)
     }
 
-    pub async fn handle_message(&self, message: IngestionMessage) -> Result<(), SendError> {
-        self.sender.send(message).await.map_err(|_| SendError)
+    pub async fn handle_message(&self, message: IngestionMessage) -> Result<usize, SendError> {
+        let idx = self
+            .msg_idx
+            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+        self.sender
+            .send((idx, message))
+            .await
+            .map_err(|_| SendError)?;
+        Ok(idx)
     }
 
-    pub fn blocking_handle_message(&self, message: IngestionMessage) -> Result<(), SendError> {
-        self.sender.blocking_send(message).map_err(|_| SendError)
+    pub fn blocking_handle_message(&self, message: IngestionMessage) -> Result<usize, SendError> {
+        let idx = self
+            .msg_idx
+            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+        self.sender
+            .blocking_send((idx, message))
+            .map_err(|_| SendError)?;
+        Ok(idx)
     }
 
     pub fn is_closed(&self) -> bool {
@@ -86,8 +113,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_message_handle() {
-        let (sender, mut rx) = tokio::sync::mpsc::channel(10);
-        let ingestor = Ingestor { sender };
+        let (ingestor, mut rx) = Ingestor::initialize_channel(crate::IngestionConfig {
+            forwarder_channel_cap: 10,
+        });
 
         // Expected seq no - 2
         let operation = Operation::Insert {
@@ -124,14 +152,17 @@ mod tests {
 
         let expected_op_event_message = vec![operation, operation2].into_iter();
 
-        for op in expected_op_event_message {
-            let msg = rx.recv().await.unwrap();
+        for (i, op) in expected_op_event_message.enumerate() {
+            let msg = rx.receiver.recv().await.unwrap();
             assert_eq!(
-                IngestionMessage::OperationEvent {
-                    table_index: 0,
-                    op,
-                    id: None
-                },
+                (
+                    i,
+                    IngestionMessage::OperationEvent {
+                        table_index: 0,
+                        op,
+                        id: None
+                    }
+                ),
                 msg
             );
         }
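Note: `handle_message` now returns the index it assigned, and an `Arc<AtomicUsize>` keeps numbering consistent across clones of the `Ingestor`. A sketch of the pattern (message type simplified to `String`):

```rust
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use tokio::sync::mpsc::{channel, Sender};

#[derive(Clone)]
struct Ingestor {
    msg_idx: Arc<AtomicUsize>,
    sender: Sender<(usize, String)>, // (index, stand-in for IngestionMessage)
}

impl Ingestor {
    async fn handle_message(&self, message: String) -> Result<usize, ()> {
        let idx = self.msg_idx.fetch_add(1, Ordering::AcqRel);
        self.sender.send((idx, message)).await.map_err(|_| ())?;
        Ok(idx) // callers (e.g. the Aerospike connector) key acks on this
    }
}

#[tokio::main]
async fn main() {
    let (sender, mut rx) = channel(8);
    let ingestor = Ingestor { msg_idx: Arc::new(0.into()), sender };
    let clone = ingestor.clone();
    assert_eq!(ingestor.handle_message("op".into()).await, Ok(0));
    // Clones share the counter, so numbering stays globally monotonic.
    assert_eq!(clone.handle_message("commit".into()).await, Ok(1));
    drop((ingestor, clone));
    while let Some((idx, msg)) = rx.recv().await {
        println!("{idx}: {msg}");
    }
}
```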
diff --git a/dozer-ingestion/connector/src/lib.rs b/dozer-ingestion/connector/src/lib.rs
index fdea56e861..bada4cf9be 100644
--- a/dozer-ingestion/connector/src/lib.rs
+++ b/dozer-ingestion/connector/src/lib.rs
@@ -1,7 +1,7 @@
 use std::fmt::Debug;
 
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::SourceState;
 use dozer_types::serde;
 use dozer_types::serde::{Deserialize, Serialize};
 pub use dozer_types::tonic::async_trait;
@@ -107,7 +107,7 @@ pub trait Connector: Send + Sync + Debug {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError>;
 }
 
diff --git a/dozer-ingestion/connector/src/test_util.rs b/dozer-ingestion/connector/src/test_util.rs
index 50703ff266..4bcde4462f 100644
--- a/dozer-ingestion/connector/src/test_util.rs
+++ b/dozer-ingestion/connector/src/test_util.rs
@@ -4,6 +4,7 @@ use dozer_types::{
     constants::DEFAULT_CONFIG_PATH,
     log::error,
     models::{config::Config, connection::ConnectionConfig},
+    node::SourceState,
 };
 use futures::stream::{AbortHandle, Abortable};
 use tokio::runtime::Runtime;
@@ -28,8 +29,11 @@ pub fn spawn_connector(
     let (abort_handle, abort_registration) = AbortHandle::new_pair();
     runtime.clone().spawn_blocking(move || {
         runtime.block_on(async move {
-            if let Ok(Err(e)) =
-                Abortable::new(connector.start(&ingestor, tables, None), abort_registration).await
+            if let Ok(Err(e)) = Abortable::new(
+                connector.start(&ingestor, tables, SourceState::NotStarted),
+                abort_registration,
+            )
+            .await
             {
                 error!("Connector `start` returned error: {e}")
             }
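Note: connectors that only need the checkpoint use an `op_id()` accessor on `SourceState` (see the Kafka, MySQL, Postgres, and Snowflake hunks below). Its body is not shown in this patch, so the implementation here is an assumption based on those call sites:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct OpIdentifier { txid: u64, seq_in_tx: u64 }

#[derive(Clone, Copy, Debug)]
enum SourceState { NotStarted, Started, Restartable(OpIdentifier) }

impl SourceState {
    /// Assumed shape of the accessor used throughout this patch
    /// (`last_checkpoint.op_id()`): only Restartable carries a checkpoint.
    fn op_id(&self) -> Option<OpIdentifier> {
        match self {
            SourceState::Restartable(id) => Some(*id),
            _ => None,
        }
    }
}

fn main() {
    let state = SourceState::Restartable(OpIdentifier { txid: 9, seq_in_tx: 1 });
    // e.g. Postgres: the resume LSN is derived from the checkpoint txid.
    let lsn = state.op_id().map(|checkpoint| checkpoint.txid);
    assert_eq!(lsn, Some(9));
    assert!(SourceState::Started.op_id().is_none());
}
```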
diff --git a/dozer-ingestion/deltalake/src/connector.rs b/dozer-ingestion/deltalake/src/connector.rs
index 958af46360..7b2f45faf4 100644
--- a/dozer-ingestion/deltalake/src/connector.rs
+++ b/dozer-ingestion/deltalake/src/connector.rs
@@ -3,7 +3,7 @@ use crate::schema_helper::SchemaHelper;
 use dozer_ingestion_connector::{
     async_trait,
     dozer_types::{
-        errors::internal::BoxedError, models::ingestion_types::DeltaLakeConfig, node::OpIdentifier,
+        errors::internal::BoxedError, models::ingestion_types::DeltaLakeConfig, node::SourceState,
         types::FieldType,
     },
     utils::{ListOrFilterColumns, TableNotFound},
@@ -117,9 +117,9 @@ impl Connector for DeltaLakeConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        assert!(last_checkpoint.is_none());
+        assert!(last_checkpoint.op_id().is_none());
         let reader = DeltaLakeReader::new(self.config.clone());
         reader.read(&tables, ingestor).await
     }
diff --git a/dozer-ingestion/ethereum/src/log/connector.rs b/dozer-ingestion/ethereum/src/log/connector.rs
index e41661f82d..d28691b168 100644
--- a/dozer-ingestion/ethereum/src/log/connector.rs
+++ b/dozer-ingestion/ethereum/src/log/connector.rs
@@ -3,7 +3,7 @@ use std::{str::FromStr, sync::Arc};
 
 use super::helper;
 use super::sender::{run, EthDetails};
-use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::SourceState;
 use dozer_ingestion_connector::utils::TableNotFound;
 use dozer_ingestion_connector::{
     async_trait,
@@ -242,7 +242,7 @@ impl Connector for EthLogConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        _last_checkpoint: Option<OpIdentifier>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         // Start a new thread that interfaces with ETH node
         let wss_url = self.config.wss_url.to_owned();
diff --git a/dozer-ingestion/ethereum/src/trace/connector.rs b/dozer-ingestion/ethereum/src/trace/connector.rs
index 215aa5717a..e064a33e29 100644
--- a/dozer-ingestion/ethereum/src/trace/connector.rs
+++ b/dozer-ingestion/ethereum/src/trace/connector.rs
@@ -4,7 +4,7 @@ use dozer_ingestion_connector::{
         errors::internal::BoxedError,
         log::{error, info, warn},
         models::ingestion_types::{default_batch_size, EthTraceConfig, IngestionMessage},
-        node::OpIdentifier,
+        node::SourceState,
         types::FieldType,
     },
     utils::TableNotFound,
@@ -106,7 +106,7 @@ impl Connector for EthTraceConnector {
         &mut self,
         ingestor: &Ingestor,
         _tables: Vec<TableInfo>,
-        _last_checkpoint: Option<OpIdentifier>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         let config = self.config.clone();
         let conn_name = self.conn_name.clone();
diff --git a/dozer-ingestion/grpc/src/connector.rs b/dozer-ingestion/grpc/src/connector.rs
index 33ecb3e9dc..00f27606f9 100644
--- a/dozer-ingestion/grpc/src/connector.rs
+++ b/dozer-ingestion/grpc/src/connector.rs
@@ -4,7 +4,7 @@ use crate::Error;
 
 use super::adapter::{GrpcIngestor, IngestAdapter};
 use super::ingest::IngestorServiceImpl;
-use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::SourceState;
 use dozer_ingestion_connector::schema_parser::SchemaParser;
 use dozer_ingestion_connector::utils::TableNotFound;
 use dozer_ingestion_connector::{
@@ -201,7 +201,7 @@ where
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        _last_checkpoint: Option<OpIdentifier>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         self.serve(ingestor, tables).await.map_err(Into::into)
     }
diff --git a/dozer-ingestion/javascript/src/lib.rs b/dozer-ingestion/javascript/src/lib.rs
index ba9fef603a..66c88d9c8d 100644
--- a/dozer-ingestion/javascript/src/lib.rs
+++ b/dozer-ingestion/javascript/src/lib.rs
@@ -5,7 +5,7 @@ use dozer_ingestion_connector::{
     dozer_types::{
         errors::internal::BoxedError,
         models::ingestion_types::{default_bootstrap_path, JavaScriptConfig},
-        node::OpIdentifier,
+        node::SourceState,
         types::{FieldDefinition, FieldType, Schema, SourceDefinition},
     },
     tokio::runtime::Runtime,
@@ -83,7 +83,7 @@ impl Connector for JavaScriptConnector {
         &mut self,
         ingestor: &Ingestor,
         _tables: Vec<TableInfo>,
-        _last_checkpoint: Option<OpIdentifier>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         let js_path = self
             .config
diff --git a/dozer-ingestion/kafka/src/connector.rs b/dozer-ingestion/kafka/src/connector.rs
index e48d672d58..95cfeb7a1a 100644
--- a/dozer-ingestion/kafka/src/connector.rs
+++ b/dozer-ingestion/kafka/src/connector.rs
@@ -2,6 +2,7 @@ use dozer_ingestion_connector::async_trait;
 use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
 use dozer_ingestion_connector::dozer_types::models::ingestion_types::KafkaConfig;
 use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::SourceState;
 use dozer_ingestion_connector::dozer_types::types::FieldType;
 use dozer_ingestion_connector::Connector;
 use dozer_ingestion_connector::Ingestor;
@@ -135,13 +136,13 @@ impl Connector for KafkaConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         let broker = self.config.broker.to_owned();
         run(
             broker,
             tables,
-            last_checkpoint,
+            last_checkpoint.op_id(),
             ingestor,
             &self.config.schema_registry_url,
         )
diff --git a/dozer-ingestion/mongodb/src/lib.rs b/dozer-ingestion/mongodb/src/lib.rs
index 190be41267..c4040cdd1c 100644
--- a/dozer-ingestion/mongodb/src/lib.rs
+++ b/dozer-ingestion/mongodb/src/lib.rs
@@ -8,7 +8,7 @@ use dozer_ingestion_connector::{
         errors::{internal::BoxedError, types::DeserializationError},
         json_types::{serde_json_to_json_value, JsonValue},
         models::ingestion_types::{IngestionMessage, TransactionInfo},
-        node::OpIdentifier,
+        node::SourceState,
         thiserror::{self, Error},
         types::{Field, FieldDefinition, FieldType, Operation, Record, SourceDefinition},
     },
@@ -599,7 +599,7 @@ impl Connector for MongodbConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        _last_checkpoint: Option<OpIdentifier>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         // Snapshot: find
         //
diff --git a/dozer-ingestion/mysql/src/connector.rs b/dozer-ingestion/mysql/src/connector.rs
index 3229335cc7..a2d3d9915d 100644
--- a/dozer-ingestion/mysql/src/connector.rs
+++ b/dozer-ingestion/mysql/src/connector.rs
@@ -13,9 +13,8 @@ use dozer_ingestion_connector::{
     dozer_types::{
         errors::internal::BoxedError,
         log::info,
-        models::ingestion_types::IngestionMessage,
-        models::ingestion_types::TransactionInfo,
-        node::OpIdentifier,
+        models::ingestion_types::{IngestionMessage, TransactionInfo},
+        node::{OpIdentifier, SourceState},
         types::{FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition},
     },
     utils::TableNotFound,
@@ -200,9 +199,9 @@ impl Connector for MySQLConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        self.replicate(ingestor, tables, last_checkpoint)
+        self.replicate(ingestor, tables, last_checkpoint.op_id())
             .await
             .map_err(Into::into)
     }
diff --git a/dozer-ingestion/object-store/src/connector.rs b/dozer-ingestion/object-store/src/connector.rs
index 20d7e44601..fc17c917ba 100644
--- a/dozer-ingestion/object-store/src/connector.rs
+++ b/dozer-ingestion/object-store/src/connector.rs
@@ -3,7 +3,7 @@ use dozer_ingestion_connector::dozer_types::log::error;
 use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
     IngestionMessage, TransactionInfo,
 };
-use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::SourceState;
 use dozer_ingestion_connector::dozer_types::types::FieldType;
 use dozer_ingestion_connector::futures::future::try_join_all;
 use dozer_ingestion_connector::tokio::sync::mpsc::channel;
@@ -103,9 +103,9 @@ impl Connector for ObjectStoreConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        assert!(last_checkpoint.is_none());
+        assert!(!matches!(last_checkpoint, SourceState::Restartable(_)));
         let (sender, mut receiver) =
             channel::<Result<Option<IngestionMessage>, ObjectStoreConnectorError>>(100); // todo: increase buffer size
         let ingestor_clone = ingestor.clone();
@@ -118,7 +118,9 @@ impl Connector for ObjectStoreConnector {
                     .await
                     .ok_or(ObjectStoreConnectorError::RecvError)?;
                 match message {
-                    Ok(Some(evt)) => ingestor_clone.handle_message(evt).await?,
+                    Ok(Some(evt)) => {
+                        ingestor_clone.handle_message(evt).await?;
+                    }
                     Ok(None) => {
                         break;
                     }
diff --git a/dozer-ingestion/oracle/src/lib.rs b/dozer-ingestion/oracle/src/lib.rs
index cf398cb097..3923ad3058 100644
--- a/dozer-ingestion/oracle/src/lib.rs
+++ b/dozer-ingestion/oracle/src/lib.rs
@@ -4,7 +4,7 @@ use dozer_ingestion_connector::{
         errors::internal::BoxedError,
         log::info,
         models::ingestion_types::{IngestionMessage, OracleConfig, TransactionInfo},
-        node::OpIdentifier,
+        node::{OpIdentifier, SourceState},
         types::FieldType,
     },
     tokio, Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo,
@@ -150,9 +150,9 @@ impl Connector for OracleConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        let checkpoint = if let Some(last_checkpoint) = last_checkpoint {
+        let checkpoint = if let SourceState::Restartable(last_checkpoint) = last_checkpoint {
             last_checkpoint.txid
         } else {
             info!("No checkpoint passed, starting snapshotting");
diff --git a/dozer-ingestion/postgres/src/connector.rs b/dozer-ingestion/postgres/src/connector.rs
index 3ba69bc02e..6ea8f6e205 100644
--- a/dozer-ingestion/postgres/src/connector.rs
+++ b/dozer-ingestion/postgres/src/connector.rs
@@ -1,4 +1,4 @@
-use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::SourceState;
 use dozer_ingestion_connector::{
     async_trait,
     dozer_types::{errors::internal::BoxedError, types::FieldType},
@@ -171,9 +171,11 @@ impl Connector for PostgresConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        let lsn = last_checkpoint.map(|checkpoint| checkpoint.txid.into());
+        let lsn = last_checkpoint
+            .op_id()
+            .map(|checkpoint| checkpoint.txid.into());
         if lsn.is_none() {
             let client = helper::connect(self.replication_conn_config.clone()).await?;
diff --git a/dozer-ingestion/snowflake/src/connector/snowflake.rs b/dozer-ingestion/snowflake/src/connector/snowflake.rs
index 6b844568bf..66bed03aca 100644
--- a/dozer-ingestion/snowflake/src/connector/snowflake.rs
+++ b/dozer-ingestion/snowflake/src/connector/snowflake.rs
@@ -4,7 +4,7 @@ use dozer_ingestion_connector::{
         errors::internal::BoxedError,
         log::{info, warn},
         models::ingestion_types::{default_snowflake_poll_interval, SnowflakeConfig},
-        node::OpIdentifier,
+        node::{OpIdentifier, SourceState},
         types::FieldType,
     },
     tokio, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo,
@@ -123,13 +123,14 @@ impl Connector for SnowflakeConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        last_checkpoint: Option<OpIdentifier>,
+        last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
+        let op_id = last_checkpoint.op_id();
         spawn_blocking({
             let name = self.name.clone();
             let config = self.config.clone();
             let ingestor = ingestor.clone();
-            move || run(name, config, tables, last_checkpoint, ingestor)
+            move || run(name, config, tables, op_id, ingestor)
         })
         .await
         .map_err(Into::into)
diff --git a/dozer-ingestion/webhook/src/connector.rs b/dozer-ingestion/webhook/src/connector.rs
index 84d9f08764..6d0ee04d09 100644
--- a/dozer-ingestion/webhook/src/connector.rs
+++ b/dozer-ingestion/webhook/src/connector.rs
@@ -3,7 +3,7 @@ use dozer_ingestion_connector::{
     async_trait,
     dozer_types::{
         self, errors::internal::BoxedError, models::ingestion_types::WebhookConfig,
-        node::OpIdentifier,
+        node::SourceState,
     },
     utils::TableNotFound,
     Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo,
@@ -145,7 +145,7 @@ impl Connector for WebhookConnector {
         &mut self,
         ingestor: &Ingestor,
         tables: Vec<TableInfo>,
-        _last_checkpoint: Option<OpIdentifier>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
         let config = self.config.clone();
         let server = WebhookServer::new(config);
diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs
index c28937932b..16bece75ef 100644
--- a/dozer-sink-aerospike/src/lib.rs
+++ b/dozer-sink-aerospike/src/lib.rs
@@ -5,7 +5,7 @@ use denorm_dag::DenormalizationState;
 use dozer_core::event::EventHub;
 use dozer_types::log::error;
 use dozer_types::models::connection::AerospikeConnection;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 use dozer_types::thiserror;
 use itertools::Itertools;
@@ -511,15 +511,15 @@ impl Sink for AerospikeSink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
         let mut _k = MaybeUninit::uninit();
         let mut _r = std::ptr::null_mut();
         unsafe {
@@ -537,7 +537,7 @@ impl Sink for AerospikeSink {
             Err(AerospikeError {
                 code: as_status_e_AEROSPIKE_ERR_RECORD_NOT_FOUND,
                 message: _,
-            }) => return Ok(None),
+            }) => return Ok(SourceState::NotStarted),
             Err(e) => return Err(e.into()),
         }
         let record = AsRecord(_r.as_mut().unwrap());
@@ -547,12 +547,12 @@ impl Sink for AerospikeSink {
             -1,
         );
         if txid > 0 {
-            Ok(Some(OpIdentifier {
+            Ok(SourceState::Restartable(OpIdentifier {
                 txid: txid as u64,
                 seq_in_tx: 0,
             }))
         } else {
-            Ok(None)
+            Ok(SourceState::Started)
         }
     }
 }
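Note: the Aerospike sink derives the tri-state resume position from what its metadata record contains. A distilled version of that decision, with the record fetch replaced by an `Option<i64>` for illustration:

```rust
#[derive(Debug, PartialEq)]
struct OpIdentifier { txid: u64, seq_in_tx: u64 }

#[derive(Debug, PartialEq)]
enum SourceState { NotStarted, Started, Restartable(OpIdentifier) }

/// Mirrors the sink's logic: what the metadata record contains
/// determines how the source may resume.
fn state_from_metadata(record: Option<i64>) -> SourceState {
    match record {
        // No metadata record at all: this sink never saw data.
        None => SourceState::NotStarted,
        // A record with a valid txid: resume from that transaction.
        Some(txid) if txid > 0 => SourceState::Restartable(OpIdentifier {
            txid: txid as u64,
            seq_in_tx: 0,
        }),
        // A record without a usable txid: data exists but is not restartable.
        Some(_) => SourceState::Started,
    }
}

fn main() {
    assert_eq!(state_from_metadata(None), SourceState::NotStarted);
    assert_eq!(state_from_metadata(Some(0)), SourceState::Started);
    assert_eq!(
        state_from_metadata(Some(12)),
        SourceState::Restartable(OpIdentifier { txid: 12, seq_in_tx: 0 })
    );
}
```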
diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs
index 4b0c70b4bb..5524475b3e 100644
--- a/dozer-sink-clickhouse/src/sink.rs
+++ b/dozer-sink-clickhouse/src/sink.rs
@@ -7,7 +7,7 @@ use dozer_types::errors::internal::BoxedError;
 use dozer_types::log::debug;
 use dozer_types::models::sink::{ClickhouseSinkConfig, ClickhouseTableOptions};
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 
 use crate::client::ClickhouseClient;
 use crate::errors::ClickhouseSinkError;
@@ -310,16 +310,16 @@ impl Sink for ClickhouseSink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
         // self.get_latest_op()
-        Ok(None)
+        Ok(SourceState::NotStarted)
     }
 }
diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs
index 71e9594476..033abc652e 100644
--- a/dozer-sink-oracle/src/lib.rs
+++ b/dozer-sink-oracle/src/lib.rs
@@ -1,6 +1,7 @@
 use dozer_types::{
     log::warn,
     models::sink::OracleSinkConfig,
+    node::SourceState,
     thiserror,
     types::{FieldDefinition, Operation, SourceDefinition, TableOperation},
 };
@@ -23,7 +24,7 @@ use dozer_types::{
 };
 use oracle::{
     sql_type::{OracleType, ToSql},
-    Connection,
+    Connection, RowValue,
 };
 
 const TXN_ID_COL: &str = "__txn_id";
@@ -85,27 +86,132 @@ struct BatchedOperation {
     params: Record,
 }
 
+#[derive(Debug)]
+struct MetadataQueries {
+    insert_metadata: String,
+    update_metadata: String,
+    select_metadata: String,
+}
+
+#[derive(Debug)]
+enum WriteStrategy {
+    AppendOnly {
+        insert_append_stmt: String,
+    },
+    Merge {
+        merge_stmt: String,
+        insert_stmt: String,
+        delete_stmt: String,
+    },
+}
+
+impl WriteStrategy {
+    fn write_batch(
+        &self,
+        connection: &Connection,
+        batch_values: &mut Vec<BatchedOperation>,
+        field_types: &[FieldType],
+    ) -> oracle::Result<()> {
+        match self {
+            WriteStrategy::AppendOnly { insert_append_stmt } => {
+                Self::do_write_appendonly(connection, batch_values, insert_append_stmt, field_types)
+            }
+            WriteStrategy::Merge {
+                merge_stmt,
+                insert_stmt,
+                delete_stmt,
+            } => Self::do_write_merge(
+                connection,
+                batch_values,
+                merge_stmt,
+                insert_stmt,
+                delete_stmt,
+                field_types,
+            ),
+        }
+    }
+
+    fn do_write_appendonly(
+        connection: &Connection,
+        batch_values: &mut Vec<BatchedOperation>,
+        insert_stmt: &str,
+        field_types: &[FieldType],
+    ) -> oracle::Result<()> {
+        let mut batch = connection.batch(insert_stmt, batch_values.len()).build()?;
+        for params in batch_values.drain(..) {
+            let mut bind_idx = 1..;
+            for ((field, typ), i) in params
+                .params
+                .values
+                .into_iter()
+                .zip(field_types)
+                .zip(&mut bind_idx)
+            {
+                batch.set(i, &OraField(field, *typ))?;
+            }
+            batch.append_row(&[])?;
+        }
+        batch.execute()?;
+
+        Ok(())
+    }
+
+    fn do_write_merge(
+        connection: &Connection,
+        batch_values: &mut Vec<BatchedOperation>,
+        merge_stmt: &str,
+        insert_stmt: &str,
+        delete_stmt: &str,
+        field_types: &[FieldType],
+    ) -> oracle::Result<()> {
+        let mut batch = connection.batch(insert_stmt, batch_values.len()).build()?;
+        for params in batch_values.drain(..) {
+            let mut bind_idx = 1..;
+            for ((field, typ), i) in params
+                .params
+                .values
+                .into_iter()
+                .zip(field_types)
+                .zip(&mut bind_idx)
+            {
+                batch.set(i, &OraField(field, *typ))?;
+            }
+            let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip();
+            batch.set(bind_idx.next().unwrap(), &txid)?;
+            batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
+            batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?;
+            batch.append_row(&[])?;
+        }
+        batch.execute()?;
+
+        connection.execute(merge_stmt, &[])?;
+
+        connection.execute(delete_stmt, &[])?;
+
+        Ok(())
+    }
+}
+
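Note: `WriteStrategy` splits the sink's hot path: append-only tables get a single batched `INSERT`, everything else keeps the stage-into-temp-table-then-`MERGE` flow. A condensed model of the dispatch (connection and SQL execution stubbed out with prints):

```rust
#[derive(Debug)]
enum WriteStrategy {
    AppendOnly { insert_append_stmt: String },
    Merge { merge_stmt: String, insert_stmt: String, delete_stmt: String },
}

impl WriteStrategy {
    // Condensed from the sink: one round-trip for append-only, three for merge.
    fn write_batch(&self, rows: &mut Vec<String>) {
        match self {
            WriteStrategy::AppendOnly { insert_append_stmt } => {
                println!("batch {} rows via {insert_append_stmt}", rows.len());
            }
            WriteStrategy::Merge { merge_stmt, insert_stmt, delete_stmt } => {
                println!("stage {} rows via {insert_stmt}", rows.len());
                println!("apply via {merge_stmt}");
                println!("clear staging via {delete_stmt}");
            }
        }
        rows.clear(); // the real code drains batch_values the same way
    }
}

fn main() {
    let strategy = WriteStrategy::AppendOnly {
        insert_append_stmt: "INSERT INTO t (...) VALUES (...)".into(),
    };
    strategy.write_batch(&mut vec!["row".into()]);
}
```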
 #[derive(Debug)]
 struct OracleSink {
+    table: Table,
     conn: Connection,
-    insert_append: String,
     pk: Vec<usize>,
     field_types: Vec<FieldType>,
-    merge_statement: String,
     batch_params: Vec<BatchedOperation>,
     batch_size: usize,
-    insert_metadata: String,
-    update_metadata: String,
-    select_metadata: String,
+    metadata_queries: Option<MetadataQueries>,
     latest_txid: Option<u64>,
-    insert_statement: String,
-    delete_statement: String,
+    batch_insert_stmt: String,
+    append_only: bool,
+    write_strategy: WriteStrategy,
 }
 
 #[derive(Debug)]
 pub struct OracleSinkFactory {
     connection_config: OracleConfig,
     table: Table,
+    append_only: bool,
 }
 
 impl std::fmt::Display for Table {
@@ -126,6 +232,7 @@ impl OracleSinkFactory {
                 name: config.table_name,
                 unique_key: config.unique_key,
             },
+            append_only: config.append_only,
         }
     }
 }
@@ -188,133 +295,143 @@ fn parse_oracle_type(
     };
     Some(typ)
 }
+
+fn validate_table(connection: &Connection, table: &Table, schema: &Schema) -> Result<bool, Error> {
+    let err = |e| Error::IncompatibleSchema {
+        table: table.clone(),
+        inner: e,
+    };
 
-impl OracleSinkFactory {
-    fn validate_table(
-        &self,
-        connection: &Connection,
-        table: &Table,
-        schema: &Schema,
-    ) -> Result<bool, Error> {
-        let err = |e| Error::IncompatibleSchema {
-            table: table.clone(),
-            inner: e,
-        };
+    #[derive(RowValue)]
+    struct Column {
+        column_name: String,
+        data_type: String,
+        data_length: u32,
+        data_precision: Option<u8>,
+        data_scale: Option<i8>,
+        nullable: String,
+        data_default: Option<String>,
+    }
 
-        let results = connection.query_as::<(String, String, u32, Option<u8>, Option<i8>, String)>(
-            "SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE FROM ALL_TAB_COLS WHERE table_name = :1 AND owner = :2",
+    let results = connection.query_as::<Column>(
+        "SELECT COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT FROM ALL_TAB_COLS WHERE table_name = :1 AND owner = :2",
         &[&table.name, &table.owner],
     )?;
 
-        let mut cols = HashMap::new();
-        for col in results {
-            let col = col?;
-            cols.insert(col.0.clone(), col);
-        }
+    let mut cols = HashMap::new();
+    for col in results {
+        let col = col?;
+        cols.insert(col.column_name.clone(), col);
+    }
 
-        // The table does not exist
-        if cols.is_empty() {
-            return Ok(false);
-        }
+    // The table does not exist
+    if cols.is_empty() {
+        return Ok(false);
+    }
 
-        for field in &schema.fields {
-            let definition = cols
-                .remove(&field.name)
-                .ok_or_else(|| err(SchemaValidationError::MissingColumn(field.name.clone())))?;
-            let (_, type_name, length, precision, scale, nullable) = definition;
-            let Some(typ) = parse_oracle_type(&type_name, length, precision, scale) else {
-                return Err(err(SchemaValidationError::UnsupportedType(
-                    type_name.clone(),
-                )));
-            };
-            match (field.typ, typ) {
-                (
-                    FieldType::String | FieldType::Text,
-                    OracleType::Varchar2(_) | OracleType::NVarchar2(_),
-                ) => {}
-                (FieldType::U128 | FieldType::I128, OracleType::Number(precision, 0))
-                    if precision >= 39 => {}
-                (FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0))
-                    if precision >= 20 => {}
-                (FieldType::Float, OracleType::Number(38, 0) | OracleType::BinaryDouble) => {}
-                (FieldType::Boolean, OracleType::Number(_, 0)) => {}
-                (FieldType::Binary, OracleType::Raw(_)) => {}
-                (FieldType::Timestamp, OracleType::Timestamp(_) | OracleType::TimestampTZ(_)) => {}
-                (FieldType::Date, OracleType::Date) => {}
-                (FieldType::Decimal, OracleType::Number(_, _)) => {}
-                (dozer_type, remote_type) => {
-                    return Err(err(SchemaValidationError::IncompatibleType {
-                        field: field.name.clone(),
-                        dozer_type,
-                        remote_type,
-                    }))
-                }
-            }
-            if (field.nullable, nullable.as_str()) == (true, "N") {
-                return Err(err(SchemaValidationError::MismatchedNullability {
-                    src: field.nullable,
-                    sink: false,
-                }));
+    for field in &schema.fields {
+        let definition = cols
+            .remove(&field.name)
+            .ok_or_else(|| err(SchemaValidationError::MissingColumn(field.name.clone())))?;
+        let Some(typ) = parse_oracle_type(
+            &definition.data_type,
+            definition.data_length,
+            definition.data_precision,
+            definition.data_scale,
+        ) else {
+            return Err(err(SchemaValidationError::UnsupportedType(
+                definition.data_type.clone(),
+            )));
+        };
+        match (field.typ, typ) {
+            (
+                FieldType::String | FieldType::Text,
+                OracleType::Varchar2(_) | OracleType::NVarchar2(_),
+            ) => {}
+            (FieldType::U128 | FieldType::I128, OracleType::Number(precision, 0))
+                if precision >= 39 => {}
+            (FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0))
+                if precision >= 20 => {}
+            (FieldType::Float, OracleType::Number(38, 0) | OracleType::BinaryDouble) => {}
+            (FieldType::Boolean, OracleType::Number(_, 0)) => {}
+            (FieldType::Binary, OracleType::Raw(_)) => {}
+            (FieldType::Timestamp, OracleType::Timestamp(_) | OracleType::TimestampTZ(_)) => {}
+            (FieldType::Date, OracleType::Date) => {}
+            (FieldType::Decimal, OracleType::Number(_, _)) => {}
+            (dozer_type, remote_type) => {
+                return Err(err(SchemaValidationError::IncompatibleType {
+                    field: field.name.clone(),
+                    dozer_type,
+                    remote_type,
+                }))
             }
         }
-
-        if !cols.is_empty() {
-            return Err(err(SchemaValidationError::ExtraColumns(
-                cols.keys().cloned().collect(),
-            )));
+        if (field.nullable, definition.nullable.as_str()) == (true, "N") {
+            return Err(err(SchemaValidationError::MismatchedNullability {
+                src: field.nullable,
+                sink: false,
+            }));
         }
-        Ok(true)
     }
 
-    fn validate_or_create_table(
-        &self,
-        connection: &Connection,
-        table: &Table,
-        temp_table: Option<&Table>,
-        schema: &Schema,
-    ) -> Result<(), Error> {
-        let mut column_defs = Vec::with_capacity(schema.fields.len());
-        for field in &schema.fields {
-            let name = &field.name;
-            let col_type = match field.typ {
-                FieldType::UInt => "NUMBER(20)",
-                FieldType::U128 => unimplemented!(),
-                FieldType::Int => "NUMBER(20)",
-                FieldType::I128 => unimplemented!(),
-                // Should this be BINARY_DOUBLE?
-                FieldType::Float => "NUMBER",
-                FieldType::Boolean => "NUMBER",
-                FieldType::String => "VARCHAR2(2000)",
-                FieldType::Text => "VARCHAR2(2000)",
-                FieldType::Binary => "RAW(1000)",
-                FieldType::Decimal => "NUMBER(29, 10)",
-                FieldType::Timestamp => "TIMESTAMP(9) WITH TIME ZONE",
-                FieldType::Date => "TIMESTAMP(0)",
-                FieldType::Json => unimplemented!(),
-                FieldType::Point => unimplemented!("Oracle Point"),
-                FieldType::Duration => unimplemented!(),
-            };
-            column_defs.push(format!(
-                "\"{name}\" {col_type}{}",
-                if field.nullable { "" } else { " NOT NULL" }
-            ));
-        }
+    let left_over: Vec<String> = cols
+        .into_values()
+        .filter(|col| (col.nullable == "N") && col.data_default.is_none())
+        .map(|col| col.column_name)
+        .collect();
+    if !left_over.is_empty() {
+        return Err(err(SchemaValidationError::ExtraColumns(left_over)));
+    }
+    Ok(true)
+}
 
-        if !(self.validate_table(connection, table, schema)?) {
-            let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
-            info!("### CREATE TABLE ####\n{}", table_query);
-            connection.execute(&table_query, &[])?;
-        }
+fn validate_or_create_table(
+    connection: &Connection,
+    table: &Table,
+    temp_table: Option<&Table>,
+    schema: &Schema,
+) -> Result<(), Error> {
+    let mut column_defs = Vec::with_capacity(schema.fields.len());
+    for field in &schema.fields {
+        let name = &field.name;
+        let col_type = match field.typ {
+            FieldType::UInt => "NUMBER(20)",
+            FieldType::U128 => unimplemented!(),
+            FieldType::Int => "NUMBER(20)",
+            FieldType::I128 => unimplemented!(),
+            // Should this be BINARY_DOUBLE?
+            FieldType::Float => "NUMBER",
+            FieldType::Boolean => "NUMBER",
+            FieldType::String => "VARCHAR2(2000)",
+            FieldType::Text => "VARCHAR2(2000)",
+            FieldType::Binary => "RAW(1000)",
+            FieldType::Decimal => "NUMBER(29, 10)",
+            FieldType::Timestamp => "TIMESTAMP(9) WITH TIME ZONE",
+            FieldType::Date => "TIMESTAMP(0)",
+            FieldType::Json => unimplemented!(),
+            FieldType::Point => unimplemented!("Oracle Point"),
+            FieldType::Duration => unimplemented!(),
+        };
+        column_defs.push(format!(
+            "\"{name}\" {col_type}{}",
+            if field.nullable { "" } else { " NOT NULL" }
+        ));
+    }
 
-        if let Some(temp_table) = temp_table {
-            let temp_table_query = format!("CREATE PRIVATE TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT PRESERVE DEFINITION", column_defs.join(",\n")).replace("NOT NULL", "");
-            info!("### CREATE TEMPORARY TABLE ####\n{}", temp_table_query);
-            connection.execute(&temp_table_query, &[])?;
-        }
+    if !(validate_table(connection, table, schema)?) {
+        let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
+        info!("### CREATE TABLE ####\n{}", table_query);
+        connection.execute(&table_query, &[])?;
+    }
 
-        Ok(())
+    if let Some(temp_table) = temp_table {
+        let temp_table_query = format!("CREATE PRIVATE TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT PRESERVE DEFINITION", column_defs.join(",\n")).replace("NOT NULL", "");
+        info!("### CREATE TEMPORARY TABLE ####\n{}", temp_table_query);
+        connection.execute(&temp_table_query, &[])?;
     }
 
+    Ok(())
+}
+
+impl OracleSinkFactory {
     fn create_index(
         &self,
         connection: &Connection,
@@ -510,26 +627,28 @@ impl SinkFactory for OracleSinkFactory {
         let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();
 
         let mut amended_schema = schema.clone();
-        amended_schema.field(
-            dozer_types::types::FieldDefinition {
-                name: TXN_ID_COL.to_owned(),
-                typ: FieldType::UInt,
-                nullable: true,
-                source: dozer_types::types::SourceDefinition::Dynamic,
-                description: None,
-            },
-            false,
-        );
-        amended_schema.field(
-            dozer_types::types::FieldDefinition {
-                name: TXN_SEQ_COL.to_owned(),
-                typ: FieldType::UInt,
-                nullable: true,
-                source: dozer_types::types::SourceDefinition::Dynamic,
-                description: None,
-            },
-            false,
-        );
+        if !self.append_only {
+            amended_schema.field(
+                dozer_types::types::FieldDefinition {
+                    name: TXN_ID_COL.to_owned(),
+                    typ: FieldType::UInt,
+                    nullable: true,
+                    source: dozer_types::types::SourceDefinition::Dynamic,
+                    description: None,
+                },
+                false,
+            );
+            amended_schema.field(
+                dozer_types::types::FieldDefinition {
+                    name: TXN_SEQ_COL.to_owned(),
+                    typ: FieldType::UInt,
+                    nullable: true,
+                    source: dozer_types::types::SourceDefinition::Dynamic,
+                    description: None,
+                },
+                false,
+            );
+        }
 
         let temp_table = Table {
             owner: self.table.owner.clone(),
@@ -537,19 +656,21 @@ impl SinkFactory for OracleSinkFactory {
             unique_key: vec![],
         };
 
-        self.validate_or_create_table(
+        validate_or_create_table(
             &connection,
             &self.table,
-            Some(&temp_table),
+            (!self.append_only).then_some(&temp_table),
             &amended_schema,
         )?;
-        self.create_index(&connection, &self.table, &amended_schema)?;
+        if !self.append_only {
+            self.create_index(&connection, &self.table, &amended_schema)?;
+        }
         let meta_table = Table {
             owner: self.table.owner.clone(),
             name: METADATA_TABLE.to_owned(),
             unique_key: vec![],
         };
-        self.validate_or_create_table(
+        validate_or_create_table(
             &connection,
             &meta_table,
             None,
@@ -578,8 +699,14 @@ impl SinkFactory for OracleSinkFactory {
         let insert_append = format!(
             //"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})",
-            "INSERT INTO {} VALUES ({})",
+            "INSERT INTO {} ({}) VALUES ({})",
             &self.table,
+            amended_schema
+                .fields
+                .iter()
+                .map(|field| format!("\"{}\"", field.name))
+                .collect::<Vec<_>>()
+                .join(", "),
             (1..=amended_schema.fields.len())
                 .map(|i| format!(":{i}"))
                .collect::<Vec<_>>()
 
         let field_types = schema.fields.iter().map(|field| field.typ).collect();
 
-        let merge_statement = generate_merge_statement(&self.table, &temp_table, &schema);
-        info!(target: "oracle_sink", "Merge statement {}", merge_statement);
-
-        let insert_statement = generate_insert_statement(&temp_table, &schema);
-        info!(target: "oracle_sink", "Insert statement {}", insert_statement);
-
-        let delete_statement = generate_delete_statement(&temp_table);
-        info!(target: "oracle_sink", "Delete statement {}", delete_statement);
+        let write_strategy = if self.append_only {
+            WriteStrategy::AppendOnly {
+                insert_append_stmt: insert_append.clone(),
@@ -588,30 +715,31 @@ impl SinkFactory for OracleSinkFactory {
         let field_types = schema.fields.iter().map(|field| field.typ).collect();
 
-        let merge_statement = generate_merge_statement(&self.table, &temp_table, &schema);
-        info!(target: "oracle_sink", "Merge statement {}", merge_statement);
-
-        let insert_statement = generate_insert_statement(&temp_table, &schema);
-        info!(target: "oracle_sink", "Insert statement {}", insert_statement);
-
-        let delete_statement = generate_delete_statement(&temp_table);
-        info!(target: "oracle_sink", "Delete statement {}", delete_statement);
+        let write_strategy = if self.append_only {
+            WriteStrategy::AppendOnly {
+                insert_append_stmt: insert_append.clone(),
+            }
+        } else {
+            WriteStrategy::Merge {
+                merge_stmt: generate_merge_statement(&self.table, &temp_table, &schema),
+                insert_stmt: generate_insert_statement(&temp_table, &schema),
+                delete_stmt: generate_delete_statement(&temp_table),
+            }
+        };
 
         Ok(Box::new(OracleSink {
+            table: self.table.clone(),
             conn: connection,
-            insert_append,
-            merge_statement,
-            insert_statement,
-            delete_statement,
+            batch_insert_stmt: insert_append.clone(),
+            write_strategy,
             field_types,
             pk: schema.primary_index,
             batch_params: Vec::new(),
+            metadata_queries: None,
             //TODO: make this configurable
             batch_size: 10000,
-            insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{}_{}\"', :1)", &self.table.owner, &self.table.name),
-            update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
-            select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
             latest_txid: None,
+            append_only: self.append_only,
         }))
     }
 }
 
@@ -681,32 +809,8 @@ impl OracleSink {
         debug!(target: "oracle_sink", "Executing batch of size {}", self.batch_params.len());
         let started = std::time::Instant::now();
-        let mut batch = self
-            .conn
-            .batch(&self.insert_statement, self.batch_params.len())
-            .build()?;
-        for params in self.batch_params.drain(..) {
-            let mut bind_idx = 1..;
-            for ((field, typ), i) in params
-                .params
-                .values
-                .into_iter()
-                .zip(&self.field_types)
-                .zip(&mut bind_idx)
-            {
-                batch.set(i, &OraField(field, *typ))?;
-            }
-            let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip();
-            batch.set(bind_idx.next().unwrap(), &txid)?;
-            batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
-            batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?;
-            batch.append_row(&[])?;
-        }
-        batch.execute()?;
-
-        self.conn.execute(&self.merge_statement, &[])?;
-
-        self.conn.execute(&self.delete_statement, &[])?;
+        self.write_strategy
+            .write_batch(&self.conn, &mut self.batch_params, &self.field_types)?;
 
         debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed());
         Ok(())
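Editor's note: the `WriteStrategy` enum itself is introduced earlier in this file and is not visible in this excerpt. Judging from how it is constructed and consumed here, its shape is roughly the following sketch, not the actual definition:

enum WriteStrategy {
    // Append-only sinks batch rows straight into the target table.
    AppendOnly { insert_append_stmt: String },
    // Otherwise keep the old three-step flow: batch-insert into the private
    // temporary table, MERGE into the target, then DELETE the staging rows.
    Merge {
        merge_stmt: String,
        insert_stmt: String,
        delete_stmt: String,
    },
}

Encapsulating the choice in `write_batch` lets `exec_batch` stay a single call site instead of branching on `append_only` inline.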
@@ -728,6 +832,49 @@ impl OracleSink {
         }
         Ok(())
     }
+
+    fn create_metadata_queries(&self) -> MetadataQueries {
+        MetadataQueries {
+            insert_metadata: format!("INSERT INTO \"{METADATA_TABLE}\" (\"{META_TABLE_COL}\", \"{META_TXN_ID_COL}\") VALUES (q'\"{}_{}\"', :1)", &self.table.owner, &self.table.name),
+            update_metadata: format!("UPDATE \"{METADATA_TABLE}\" SET \"{META_TXN_ID_COL}\" = :1 WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
+            select_metadata: format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = q'\"{}_{}\"'", &self.table.owner, &self.table.name),
+        }
+    }
+
+    fn init_metadata_table(&mut self) -> Result<(), BoxedError> {
+        let queries = self.create_metadata_queries();
+        validate_or_create_table(
+            &self.conn,
+            &Table {
+                owner: self.table.owner.clone(),
+                name: METADATA_TABLE.to_owned(),
+                unique_key: vec![],
+            },
+            None,
+            Schema::new()
+                .field(
+                    FieldDefinition {
+                        name: META_TABLE_COL.to_owned(),
+                        typ: FieldType::String,
+                        nullable: false,
+                        source: SourceDefinition::Dynamic,
+                        description: None,
+                    },
+                    true,
+                )
+                .field(
+                    FieldDefinition {
+                        name: META_TXN_ID_COL.to_owned(),
+                        typ: FieldType::UInt,
+                        nullable: false,
+                        source: SourceDefinition::Dynamic,
+                        description: None,
+                    },
+                    false,
+                ),
+        )?;
+        self.metadata_queries = Some(queries);
+        Ok(())
+    }
 }
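Editor's note: a sketch of how the lazily created metadata statements work together; `flush_batch` below performs exactly this update-then-insert upsert. `conn`, `queries`, and `txid` are assumed to be in scope, as they are in the sink.

// Try the UPDATE first; fall back to INSERT when no metadata row matched.
if conn.execute(&queries.update_metadata, &[&txid])?.row_count()? == 0 {
    // No metadata row for this table yet, so create one.
    conn.execute(&queries.insert_metadata, &[&txid])?;
}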
 
 impl Sink for OracleSink {
@@ -746,14 +893,19 @@ impl Sink for OracleSink {
     fn flush_batch(&mut self) -> Result<(), BoxedError> {
         self.exec_batch()?;
         if let Some(txid) = self.latest_txid {
+            if self.metadata_queries.is_none() {
+                self.init_metadata_table()?;
+            }
+            let queries = self.metadata_queries.as_ref().unwrap();
+
             // If the row_count == 0, we need to insert instead.
             if self
                 .conn
-                .execute(&self.update_metadata, &[&txid])?
+                .execute(&queries.update_metadata, &[&txid])?
                 .row_count()?
                 == 0
             {
-                self.conn.execute(&self.insert_metadata, &[&txid])?;
+                self.conn.execute(&queries.insert_metadata, &[&txid])?;
             }
         }
         self.conn.commit()?;
@@ -787,7 +939,7 @@ impl Sink for OracleSink {
             Operation::BatchInsert { mut new } => {
                 let mut batch = self
                     .conn
-                    .batch(&self.insert_append, self.batch_size)
+                    .batch(&self.batch_insert_stmt, self.batch_size)
                     .build()?;
 
                 for record in new.drain(..) {
                     let mut bind_idx = 1..;
@@ -799,9 +951,11 @@ impl Sink for OracleSink {
                     {
                         batch.set(i, &OraField(field, *typ))?;
                     }
-                    let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip();
-                    batch.set(bind_idx.next().unwrap(), &txid)?;
-                    batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
+                    if !self.append_only {
+                        let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip();
+                        batch.set(bind_idx.next().unwrap(), &txid)?;
+                        batch.set(bind_idx.next().unwrap(), &seq_in_tx)?;
+                    }
                     batch.append_row(&[])?;
                 }
 
@@ -828,28 +982,52 @@ impl Sink for OracleSink {
         Ok(())
     }
 
-    fn set_source_state(
+    fn set_source_state_data(
         &mut self,
         _source_state: &[u8],
     ) -> Result<(), dozer_types::errors::internal::BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(
+    fn get_source_state_data(
         &mut self,
     ) -> Result<Option<Vec<u8>>, dozer_types::errors::internal::BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(
+    fn get_source_state(
         &mut self,
-    ) -> Result<Option<OpIdentifier>, dozer_types::errors::internal::BoxedError>
-    {
-        match self.conn.query_row_as::<u64>(&self.select_metadata, &[]) {
-            Ok(txid) => Ok(Some(OpIdentifier { txid, seq_in_tx: 0 })),
-            Err(oracle::Error::NoDataFound) => Ok(None),
-            Err(e) => Err(e.into()),
+    ) -> Result<SourceState, dozer_types::errors::internal::BoxedError> {
+        let queries = self.create_metadata_queries();
+        match self.conn.query_row_as::<u64>(&queries.select_metadata, &[]) {
+            Ok(txid) => {
+                return Ok(SourceState::Restartable(OpIdentifier {
+                    txid,
+                    seq_in_tx: 0,
+                }));
+            }
+            // Not found...
+            Err(oracle::Error::NoDataFound) => {}
+            // ... or the table does not exist
+            Err(oracle::Error::OciError(e)) if e.code() == 942 => {}
+            // TODO: Check for data -> ResumeState::Resume
+            Err(e) => {
+                return Err(e.into());
+            }
+        };
+        // If we're here, we could not find an op_id. If there is data, the sink
+        // started, but the source did not supply an op_id.
+        match self.conn.query_row(
+            &format!("SELECT 1 FROM {} WHERE rownum = 1", self.table),
+            &[],
+        ) {
+            Ok(_) => return Ok(SourceState::Started),
+            Err(oracle::Error::NoDataFound) => {}
+            // ... or the table does not exist
+            Err(oracle::Error::OciError(e)) if e.code() == 942 => {}
+            Err(e) => return Err(e.into()),
         }
+        Ok(SourceState::NotStarted)
     }
 }
 
@@ -930,6 +1108,7 @@ mod tests {
             table_name: "test".into(),
             owner: None,
             unique_key: vec![],
+            append_only: false,
         },
     );
     let mut schema = Schema::new();
diff --git a/dozer-sql/Cargo.toml b/dozer-sql/Cargo.toml
index 28bb354f41..01d736d39e 100644
--- a/dozer-sql/Cargo.toml
+++ b/dozer-sql/Cargo.toml
@@ -23,6 +23,7 @@ tokio = { version = "1", features = ["rt", "macros"] }
 
 [dev-dependencies]
 proptest = "1.3.1"
+dozer-core = { path = "../dozer-core" }
 
 [features]
 python = ["dozer-sql-expression/python"]
diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs
index 4d2b398255..2bd3c5a2a7 100644
--- a/dozer-sql/src/tests/builder_test.rs
+++ b/dozer-sql/src/tests/builder_test.rs
@@ -5,13 +5,15 @@ use dozer_core::event::EventHub;
 use dozer_core::executor::DagExecutor;
 use dozer_core::node::{
     OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
+    SourceMessage,
 };
+use dozer_core::test_utils::CountingSender;
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_types::chrono::DateTime;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::log::debug;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 use dozer_types::ordered_float::OrderedFloat;
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{
@@ -111,12 +113,13 @@ impl Source for TestSource {
     async fn start(
         &mut self,
-        sender: Sender<(PortHandle, IngestionMessage)>,
-        _last_checkpoint: Option<OpIdentifier>,
+        sender: Sender<SourceMessage>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
+        let sender = CountingSender::new(sender);
         for _ in 0..10 {
             sender
-                .send((
+                .send(
                     DEFAULT_PORT_HANDLE,
                     IngestionMessage::OperationEvent {
                         table_index: 0,
@@ -132,7 +135,7 @@ impl Source for TestSource {
                         },
                         id: None,
                     },
-                ))
+                )
                 .await
                 .unwrap();
         }
@@ -206,16 +209,16 @@ impl Sink for TestSink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
        Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
-        Ok(None)
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
+        Ok(SourceState::NotStarted)
     }
 }
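Editor's note: `CountingSender` lives in the new `dozer_core::test_utils` module, which this diff does not show. From its use in these tests, it plausibly wraps the raw channel and stamps each message with the sequential id that `SourceMessage` carries. A rough sketch under that assumption, with the dozer types assumed to be in scope:

use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc::{error::SendError, Sender};

pub struct CountingSender {
    count: AtomicUsize,
    sender: Sender<SourceMessage>,
}

impl CountingSender {
    pub fn new(sender: Sender<SourceMessage>) -> Self {
        Self { count: AtomicUsize::new(0), sender }
    }

    // Assigns the next monotonically increasing id and forwards the message.
    pub async fn send(
        &self,
        port: PortHandle,
        message: IngestionMessage,
    ) -> Result<(), SendError<SourceMessage>> {
        let id = self.count.fetch_add(1, Ordering::SeqCst);
        self.sender.send(SourceMessage { id, port, message }).await
    }
}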
diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs
index 434b6ae1c3..5b2c6256fc 100644
--- a/dozer-tests/src/sql_tests/helper/pipeline.rs
+++ b/dozer-tests/src/sql_tests/helper/pipeline.rs
@@ -7,8 +7,10 @@ use dozer_core::errors::ExecutionError;
 use dozer_core::event::EventHub;
 use dozer_core::node::{
     OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
+    SourceMessage,
 };
+use dozer_core::test_utils::CountingSender;
 use dozer_core::{Dag, DEFAULT_PORT_HANDLE};
 
 use dozer_core::executor::DagExecutor;
@@ -18,7 +20,7 @@ use dozer_sql::builder::statement_to_pipeline;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::{OpIdentifier, SourceState};
 use dozer_types::types::{Operation, Record, Schema, SourceDefinition, TableOperation};
 use std::collections::HashMap;
 use std::future::pending;
@@ -127,20 +129,21 @@ impl Source for TestSource {
     async fn start(
         &mut self,
-        sender: tokio::sync::mpsc::Sender<(PortHandle, IngestionMessage)>,
-        _last_checkpoint: Option<OpIdentifier>,
+        sender: tokio::sync::mpsc::Sender<SourceMessage>,
+        _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
+        let sender = CountingSender::new(sender);
         while let Ok(Some((schema_name, op))) = self.receiver.recv() {
             let port = self.name_to_port.get(&schema_name).expect("port not found");
             sender
-                .send((
+                .send(
                     *port,
                     IngestionMessage::OperationEvent {
                         table_index: 0,
                         op,
                         id: None,
                     },
-                ))
+                )
                 .await
                 .unwrap();
         }
@@ -276,16 +279,16 @@ impl Sink for TestSink {
         Ok(())
     }
 
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+    fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
 
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
         Ok(None)
     }
 
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
-        Ok(None)
+    fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
+        Ok(SourceState::NotStarted)
     }
 }
diff --git a/dozer-types/src/epoch.rs b/dozer-types/src/epoch.rs
index f2fc2da674..ad842b5baa 100644
--- a/dozer-types/src/epoch.rs
+++ b/dozer-types/src/epoch.rs
@@ -58,6 +58,7 @@ pub struct Epoch {
     pub common_info: EpochCommonInfo,
     pub decision_instant: SystemTime,
     pub source_time: Option<SystemTime>,
+    pub originating_msg_id: Option<usize>,
 }
 
 impl Epoch {
@@ -66,6 +67,7 @@ impl Epoch {
             common_info: EpochCommonInfo { id, source_states },
             decision_instant,
             source_time: None,
+            originating_msg_id: None,
         }
     }
 
@@ -73,4 +75,9 @@ impl Epoch {
         self.source_time = Some(source_time);
         self
     }
+
+    pub fn with_originating_msg(mut self, originating_msg_id: usize) -> Self {
+        self.originating_msg_id = Some(originating_msg_id);
+        self
+    }
 }
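Editor's note: the new field follows the same builder pattern as `with_source_time`. Hypothetical usage, assuming `Epoch::new(id, source_states, decision_instant)` matches the constructor shown above and that `msg_id` is the originating `SourceMessage` id:

let epoch = Epoch::new(id, source_states, decision_instant)
    .with_source_time(source_time)
    .with_originating_msg(msg_id);
assert_eq!(epoch.originating_msg_id, Some(msg_id));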
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index 10c1f468d5..25e8ac242f 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -248,6 +248,8 @@ pub struct OracleSinkConfig {
     pub unique_key: Vec<String>,
     #[serde(default)]
     pub owner: Option<String>,
+    #[serde(default)]
+    pub append_only: bool,
 }
 
 pub fn default_log_reader_batch_size() -> u32 {
diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs
index b5b5562ae5..580b2ddf71 100644
--- a/dozer-types/src/node.rs
+++ b/dozer-types/src/node.rs
@@ -104,6 +104,13 @@ impl OpIdentifier {
     }
 }
 
+#[derive(Debug, Clone)]
+pub enum ResumeState {
+    Resume,
+    ResumeFromOpid(OpIdentifier),
+    DontResume,
+}
+
 #[derive(
     Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode,
 )]
@@ -111,16 +118,47 @@ impl OpIdentifier {
 pub enum SourceState {
     /// This source hasn't been ingested.
     NotStarted,
-    /// This source has some data ingested, and it can't be restarted.
-    NonRestartable,
+    /// This source has some data ingested, but it doesn't supply OpIdentifiers.
+    Started,
     /// This source has some data ingested, and it can be restarted if it's given the op id.
     Restartable(OpIdentifier),
 }
 
 impl SourceState {
-    pub fn op_id(&self) -> Option<&OpIdentifier> {
+    pub fn set(&mut self, new: SourceState) {
+        match (&self, &new) {
+            (_, Self::NotStarted) => {
+                panic!("Cannot go back to the NotStarted state")
+            }
+            // Dropping back from Restartable to Started is allowed; the op id is lost.
+            (Self::Restartable(_), Self::Started) => {}
+            (Self::Restartable(old_id), Self::Restartable(new_id)) => {
+                assert!(
+                    new_id > old_id,
+                    "SourceState::Restartable op identifier cannot go backwards"
+                );
+            }
+            _ => {}
+        }
+        *self = new;
+    }
+
+    pub fn merge(self, new: SourceState) -> Option<SourceState> {
+        match (self, new) {
+            (Self::NotStarted, Self::NotStarted) => Some(Self::NotStarted),
+            (_, Self::NotStarted) => None,
+            // One side has no op id, so there is no common point to restart from
+            (Self::Restartable(_), Self::Started) => None,
+            // We can restart from the earlier one
+            (Self::Restartable(old_id), Self::Restartable(new_id)) => {
+                Some(Self::Restartable(old_id.min(new_id)))
+            }
+            (_, new) => Some(new),
+        }
+    }
+
+    pub fn op_id(&self) -> Option<OpIdentifier> {
         if let Self::Restartable(op_id) = self {
-            Some(op_id)
+            Some(*op_id)
         } else {
             None
         }
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index 3bac6f286e..d96348471a 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -1466,6 +1466,10 @@
         "table_name"
       ],
       "properties": {
+        "append_only": {
+          "default": false,
+          "type": "boolean"
+        },
         "connection": {
           "type": "string"
        },
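Editor's note: a sketch of how `merge` combines per-sink states into a pipeline-wide resume decision, following the match arms above; `None` means there is no consistent state to resume from.

use dozer_types::node::{OpIdentifier, SourceState};

let a = SourceState::Restartable(OpIdentifier { txid: 5, seq_in_tx: 0 });
let b = SourceState::Restartable(OpIdentifier { txid: 3, seq_in_tx: 0 });

// Two restartable states resume from the earlier op id.
assert_eq!(
    a.clone().merge(b),
    Some(SourceState::Restartable(OpIdentifier { txid: 3, seq_in_tx: 0 }))
);
// A sink that never started means the pipeline cannot resume at all.
assert_eq!(a.merge(SourceState::NotStarted), None);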