
Allow for non-opid-emitting sources to be restartable.
This is useful for the Aerospike source, which uses XDR to track
source offsets.

Additionally, the Oracle sink can now be append-only; in that case it
does not need the additional metadata columns. It also no longer creates
a metadata table if the source does not emit OpIdentifiers.
Jesse-Bakker committed Apr 3, 2024
1 parent 7660863 commit 54f2e0e
Showing 38 changed files with 770 additions and 417 deletions.
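
For context, the hunks below replace the old Option<OpIdentifier> checkpoint with a richer SourceState. The following is only a sketch inferred from how this diff constructs and merges that type (NotStarted, Started, Restartable, and a fallible merge); the actual definition in dozer-types may differ.

use dozer_types::node::OpIdentifier;

// Inferred sketch of the per-source resume state reported by sinks.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SourceState {
    /// No data has been received from this source yet.
    NotStarted,
    /// Data was received, but the source cannot resume from a precise offset
    /// (the source does not emit OpIdentifiers).
    Started,
    /// The source can resume from this operation identifier.
    Restartable(OpIdentifier),
}

impl SourceState {
    /// Reconcile the states reported by two sinks of the same source.
    /// Hypothetical merge rule; the diff only shows that merge returns
    /// Option<SourceState> and that None is treated as a conflict.
    pub fn merge(self, other: SourceState) -> Option<SourceState> {
        use SourceState::*;
        match (self, other) {
            // Resume from the older offset so no sink misses operations.
            (Restartable(a), Restartable(b)) => Some(Restartable(a.min(b))),
            (a, b) if a == b => Some(a),
            _ => None,
        }
    }
}
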
34 changes: 26 additions & 8 deletions dozer-cli/src/pipeline/connector_source.rs
@@ -1,5 +1,7 @@
use dozer_core::event::EventHub;
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
use dozer_core::node::{
OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceMessage,
};
use dozer_core::shutdown::ShutdownReceiver;
use dozer_ingestion::{
get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo,
@@ -13,7 +15,7 @@ use dozer_tracing::{emit_event, DozerMonitorContext};
use dozer_types::errors::internal::BoxedError;
use dozer_types::models::connection::Connection;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::node::OpIdentifier;
use dozer_types::node::SourceState;
use dozer_types::thiserror::{self, Error};
use dozer_types::tracing::info;
use dozer_types::types::{Operation, Schema, SourceDefinition};
@@ -225,8 +227,8 @@ impl Source for ConnectorSource {

async fn start(
&mut self,
sender: Sender<(PortHandle, IngestionMessage)>,
last_checkpoint: Option<OpIdentifier>,
sender: Sender<SourceMessage>,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
let (ingestor, iterator) = Ingestor::initialize_channel(self.ingestion_config.clone());
let connection_name = self.connection_name.clone();
@@ -305,7 +307,7 @@ impl Source for ConnectorSource {

async fn forward_message_to_pipeline(
mut iterator: IngestionIterator,
sender: Sender<(PortHandle, IngestionMessage)>,
sender: Sender<SourceMessage>,
connection_name: String,
tables: Vec<TableInfo>,
ports: Vec<PortHandle>,
@@ -325,7 +327,7 @@ async fn forward_message_to_pipeline(
.init();

let mut counter = vec![(0u64, 0u64); tables.len()];
while let Some(message) = iterator.receiver.recv().await {
while let Some((idx, message)) = iterator.receiver.recv().await {
match &message {
IngestionMessage::OperationEvent {
table_index, op, ..
@@ -374,13 +376,29 @@ }
}

// Send message to the pipeline
if sender.send((port, message)).await.is_err() {
if sender
.send(SourceMessage {
id: idx,
port,
message,
})
.await
.is_err()
{
break;
}
}
IngestionMessage::TransactionInfo(_) => {
// For transaction level messages, we can send to any port.
if sender.send((ports[0], message)).await.is_err() {
if sender
.send(SourceMessage {
id: idx,
port: ports[0],
message,
})
.await
.is_err()
{
break;
}
}
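
The SourceMessage envelope constructed above is new in dozer_core::node; a minimal sketch of its presumed shape follows (field types inferred from this diff; the real struct may carry more).

use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::types::PortHandle;

// Presumed shape: `id` is the per-source message index, later used to tag
// the epoch with the message a commit originated from.
#[derive(Debug)]
pub struct SourceMessage {
    pub id: usize,
    pub port: PortHandle,
    pub message: IngestionMessage,
}
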
10 changes: 5 additions & 5 deletions dozer-cli/src/pipeline/dummy_sink.rs
@@ -6,14 +6,14 @@ use dozer_core::{
node::{PortHandle, Sink, SinkFactory},
DEFAULT_PORT_HANDLE,
};
use dozer_types::log::debug;
use dozer_types::{
chrono::Local,
errors::internal::BoxedError,
log::{info, warn},
node::OpIdentifier,
types::{FieldType, Operation, Schema, TableOperation},
};
use dozer_types::{log::debug, node::SourceState};

use crate::async_trait::async_trait;

@@ -179,15 +179,15 @@ impl Sink for DummySink {
Ok(())
}

fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
fn set_source_state_data(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
Ok(())
}

fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
Ok(None)
}

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
fn get_source_state(&mut self) -> Result<SourceState, BoxedError> {
Ok(SourceState::NotStarted)
}
}
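
The renames above suggest the sink-side state API now separates opaque state bytes from the resume state. A sketch of that portion of the Sink trait, written here as a standalone trait with signatures assumed from this hunk:

use dozer_types::{errors::internal::BoxedError, node::SourceState};

// Only the state-related methods are sketched; the real Sink trait has more.
pub trait SinkStateSketch {
    /// Opaque bytes a sink persists and gets back on restart
    /// (must agree across all sinks of the same source).
    fn set_source_state_data(&mut self, source_state: &[u8]) -> Result<(), BoxedError>;
    fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;

    /// Replaces the old get_latest_op_id: a sink now reports a full SourceState,
    /// so a source that never emits OpIdentifiers can still report Started.
    fn get_source_state(&mut self) -> Result<SourceState, BoxedError>;
}
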
50 changes: 28 additions & 22 deletions dozer-core/src/builder_dag.rs
@@ -6,7 +6,7 @@ use std::{
use daggy::{petgraph::visit::IntoNodeIdentifiers, NodeIndex};
use dozer_types::{
log::warn,
node::{NodeHandle, OpIdentifier},
node::{NodeHandle, SourceState},
};

use crate::{
@@ -31,7 +31,7 @@ pub struct NodeType {
pub enum NodeKind {
Source {
source: Box<dyn Source>,
last_checkpoint: Option<OpIdentifier>,
last_checkpoint: SourceState,
},
Processor(Box<dyn Processor>),
Sink(Box<dyn Sink>),
@@ -76,8 +76,8 @@ impl BuilderDag {
// Build the sinks and load checkpoint.
let event_hub = EventHub::new(event_hub_capacity);
let mut graph = daggy::Dag::new();
let mut source_states = HashMap::new();
let mut source_op_ids = HashMap::new();
let mut source_state_data = HashMap::new();
let mut source_states: HashMap<NodeHandle, SourceState> = HashMap::new();
let mut source_id_to_sinks = HashMap::<NodeHandle, Vec<NodeIndex>>::new();
let mut node_index_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
for (node_index, node) in nodes.iter_mut().enumerate() {
@@ -99,9 +99,9 @@ impl BuilderDag {
.await
.map_err(ExecutionError::Factory)?;

let state = sink.get_source_state().map_err(ExecutionError::Sink)?;
let state = sink.get_source_state_data().map_err(ExecutionError::Sink)?;
if let Some(state) = state {
match source_states.entry(source.clone()) {
match source_state_data.entry(source.clone()) {
Entry::Occupied(entry) => {
if entry.get() != &state {
return Err(ExecutionError::SourceStateConflict(source));
@@ -113,16 +113,18 @@ impl BuilderDag {
}
}

let op_id = sink.get_latest_op_id().map_err(ExecutionError::Sink)?;
if let Some(op_id) = op_id {
match source_op_ids.entry(source.clone()) {
Entry::Occupied(mut entry) => {
*entry.get_mut() = op_id.min(*entry.get());
}
Entry::Vacant(entry) => {
entry.insert(op_id);
let resume_state = sink.get_source_state().map_err(ExecutionError::Sink)?;
match source_states.entry(source.clone()) {
Entry::Occupied(mut entry) => {
if let Some(merged) = entry.get().clone().merge(resume_state) {
*entry.get_mut() = merged;
} else {
return Err(ExecutionError::ResumeStateConflict(source));
}
}
Entry::Vacant(entry) => {
entry.insert(resume_state);
}
}

let new_node_index = graph.add_node(NodeType {
Expand Down Expand Up @@ -151,7 +153,7 @@ impl BuilderDag {
.remove(&node_index)
.expect("we collected all output schemas"),
event_hub.clone(),
source_states.remove(&node.handle),
source_state_data.remove(&node.handle),
)
.map_err(ExecutionError::Factory)?;

Expand All @@ -163,23 +165,27 @@ impl BuilderDag {
let mut checkpoint = None;
for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
let sink = &mut graph[sink];
let sink_handle = &sink.handle;
let NodeKind::Sink(sink) = &mut sink.kind else {
unreachable!()
};
sink.set_source_state(&state)
sink.set_source_state_data(&state)
.map_err(ExecutionError::Sink)?;
if let Some(sink_checkpoint) = source_op_ids.remove(sink_handle) {
checkpoint =
Some(checkpoint.unwrap_or(sink_checkpoint).min(sink_checkpoint));
}
let resume_state = source_states.remove(&node.handle);
checkpoint =
match (checkpoint, resume_state) {
(None, new) => new,
(old, None) => old,
(Some(old), Some(new)) => Some(old.merge(new).ok_or(
ExecutionError::ResumeStateConflict(node.handle.clone()),
)?),
};
}

NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint: checkpoint,
last_checkpoint: checkpoint.unwrap_or(SourceState::NotStarted),
},
}
}
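
As a usage illustration of the merge loop above (hypothetical values; uses the SourceState sketch from earlier and an assumed OpIdentifier::new(txid, seq_in_tx) constructor):

fn merge_example() {
    let a = SourceState::Restartable(OpIdentifier::new(12, 0));
    let b = SourceState::Restartable(OpIdentifier::new(9, 0));
    // Resuming from the older offset means neither sink misses operations.
    assert_eq!(a.merge(b), Some(SourceState::Restartable(OpIdentifier::new(9, 0))));
    // A sink without offsets cannot be reconciled with a restartable one.
    assert_eq!(
        SourceState::Started.merge(SourceState::Restartable(OpIdentifier::new(9, 0))),
        None
    );
}
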
2 changes: 2 additions & 0 deletions dozer-core/src/errors.rs
@@ -39,6 +39,8 @@ pub enum ExecutionError {
Sink(#[source] BoxedError),
#[error("State of {0} is not consistent across sinks")]
SourceStateConflict(NodeHandle),
#[error("Resume state of {0} is not consistent across sinks")]
ResumeStateConflict(NodeHandle),
#[error("File system error {0:?}: {1}")]
FileSystemError(PathBuf, #[source] std::io::Error),
#[error("Checkpoint writer thread panicked")]
35 changes: 18 additions & 17 deletions dozer-core/src/executor/source_node/mod.rs
@@ -1,23 +1,18 @@
use std::{fmt::Debug, future::Future, pin::pin, sync::Arc, time::SystemTime};

use daggy::petgraph::visit::IntoNodeIdentifiers;
use dozer_types::{
log::debug, models::ingestion_types::TransactionInfo, node::OpIdentifier, types::TableOperation,
};
use dozer_types::{log::debug, models::ingestion_types::TransactionInfo, types::TableOperation};
use dozer_types::{models::ingestion_types::IngestionMessage, node::SourceState};
use futures::{future::Either, StreamExt};
use tokio::{
runtime::Runtime,
sync::mpsc::{channel, Receiver, Sender},
};

use crate::node::SourceMessage;
use crate::{
builder_dag::NodeKind,
epoch::Epoch,
errors::ExecutionError,
executor_operation::ExecutorOperation,
forwarder::ChannelManager,
node::{PortHandle, Source},
builder_dag::NodeKind, epoch::Epoch, errors::ExecutionError,
executor_operation::ExecutorOperation, forwarder::ChannelManager, node::Source,
};

use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
@@ -30,7 +25,7 @@ pub struct SourceNode<F> {
/// Structs for running a source.
source_runners: Vec<SourceRunner>,
/// Receivers from sources.
receivers: Vec<Receiver<(PortHandle, IngestionMessage)>>,
receivers: Vec<Receiver<SourceMessage>>,
/// The current epoch id.
epoch_id: u64,
/// The shutdown future.
@@ -68,7 +63,12 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
let next = next.expect("We return just when the stream ends");
self.shutdown = shutdown;
let index = next.0;
let Some((port, message)) = next.1 else {
let Some(SourceMessage {
id: message_id,
port,
message,
}) = next.1
else {
debug!("[{}] quit", self.sources[index].channel_manager.owner().id);
match self.runtime.block_on(
handles[index]
@@ -92,17 +92,17 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
let source = &mut self.sources[index];
match message {
IngestionMessage::OperationEvent { op, id, .. } => {
source.state = SourceState::NonRestartable;
source.state.set(SourceState::Started);
source
.channel_manager
.send_op(TableOperation { op, id, port })?;
}
IngestionMessage::TransactionInfo(info) => match info {
TransactionInfo::Commit { id, source_time } => {
if let Some(id) = id {
source.state = SourceState::Restartable(id);
source.state.set(SourceState::Restartable(id));
} else {
source.state = SourceState::NonRestartable;
source.state.set(SourceState::Started);
}

let source_states = Arc::new(
@@ -117,7 +117,8 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
.collect(),
);
let mut epoch =
Epoch::new(self.epoch_id, source_states, SystemTime::now());
Epoch::new(self.epoch_id, source_states, SystemTime::now())
.with_originating_msg(message_id);
if let Some(st) = source_time {
epoch = epoch.with_source_time(st);
}
@@ -155,8 +156,8 @@ struct RunningSource {
#[derive(Debug)]
struct SourceRunner {
source: Box<dyn Source>,
last_checkpoint: Option<OpIdentifier>,
sender: Sender<(PortHandle, IngestionMessage)>,
last_checkpoint: SourceState,
sender: Sender<SourceMessage>,
}

/// Returns if the operation is sent successfully.
41 changes: 40 additions & 1 deletion dozer-core/src/lib.rs
@@ -17,7 +17,46 @@ pub mod shutdown;
pub use tokio;

#[cfg(test)]
pub mod tests;
mod tests;

pub mod test_utils {
use std::sync::atomic::AtomicUsize;

use dozer_types::{models::ingestion_types::IngestionMessage, types::PortHandle};
use tokio::sync::mpsc::Sender;

use crate::node::SourceMessage;

pub struct CountingSender {
count: AtomicUsize,
sender: Sender<SourceMessage>,
}

impl CountingSender {
pub fn new(sender: Sender<SourceMessage>) -> Self {
Self {
count: 0.into(),
sender,
}
}

pub async fn send(
&self,
port: PortHandle,
message: IngestionMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<SourceMessage>> {
let idx = self.count.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
self.sender
.send(SourceMessage {
id: idx,
port,
message,
})
.await?;
Ok(())
}
}
}
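
A possible way to exercise the new CountingSender helper (sketch only; assumes the SourceMessage fields are public, and the channel size and payload are arbitrary):

use dozer_core::{node::SourceMessage, test_utils::CountingSender};
use dozer_types::models::ingestion_types::{IngestionMessage, TransactionInfo};
use tokio::sync::mpsc::channel;

#[tokio::test]
async fn counting_sender_assigns_increasing_ids() {
    let (tx, mut rx) = channel::<SourceMessage>(16);
    let sender = CountingSender::new(tx);

    // Each send gets the next index as its SourceMessage id.
    sender
        .send(
            0, // port
            IngestionMessage::TransactionInfo(TransactionInfo::Commit {
                id: None,
                source_time: None,
            }),
        )
        .await
        .unwrap();

    let msg = rx.recv().await.unwrap();
    assert_eq!(msg.id, 0);
    assert_eq!(msg.port, 0);
}
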

pub use daggy::{self, petgraph};
pub use dozer_types::{epoch, event};
