refactor: Send source state and op id to sink #2354

Merged
merged 1 commit on Feb 1, 2024
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/connector_source.rs
@@ -339,7 +339,7 @@ async fn forward_message_to_pipeline(
break;
}
}
IngestionMessage::SnapshottingDone | IngestionMessage::SnapshottingStarted => {
IngestionMessage::SnapshottingDone { .. } | IngestionMessage::SnapshottingStarted => {
for port in &ports {
if sender.send((*port, message.clone())).await.is_err() {
break;
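Note: the match arm changes because `SnapshottingDone` evidently stopped being a unit variant. A sketch of the assumed shape, inferred from the `Option<OpIdentifier>` parameter that `on_source_snapshotting_done` gains below; the field and its name are assumptions, not the actual dozer-types definition:

// Assumed shape; `id` is a guess based on the new sink callback signature.
pub struct OpIdentifier {
    pub txid: u64,       // field names assumed
    pub seq_in_tx: u64,
}

pub enum IngestionMessage {
    SnapshottingStarted,
    // Now a struct-like variant, hence the `{ .. }` pattern above.
    SnapshottingDone { id: Option<OpIdentifier> },
    // ...operation variants elided
}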
25 changes: 21 additions & 4 deletions dozer-cli/src/pipeline/dummy_sink.rs
@@ -12,7 +12,8 @@ use dozer_types::{
chrono::Local,
errors::internal::BoxedError,
log::{info, warn},
types::{FieldType, Operation, Schema},
node::OpIdentifier,
types::{FieldType, Operation, OperationWithId, Schema},
};

#[derive(Debug)]
@@ -59,10 +60,10 @@ impl Sink for DummySink {
&mut self,
_from_port: PortHandle,
_record_store: &ProcessorRecordStore,
op: Operation,
op: OperationWithId,
) -> Result<(), BoxedError> {
if let Some(inserted_at_index) = self.inserted_at_index {
if let Operation::Insert { new } = op {
if let Operation::Insert { new } = op.op {
let value = &new.values[inserted_at_index];
if let Some(inserted_at) = value.to_timestamp() {
let latency = Local::now().naive_utc() - inserted_at.naive_utc();
@@ -92,7 +93,11 @@ impl Sink for DummySink {
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
fn on_source_snapshotting_done(
&mut self,
connection_name: String,
_id: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
if let Some(started_instant) = self.snapshotting_started_instant.remove(&connection_name) {
info!(
"Snapshotting for connection {} took {:?}",
@@ -107,4 +112,16 @@
}
Ok(())
}

fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
Ok(())
}

fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
Ok(None)
}

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
}
}
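`DummySink` opts out of the three new hooks by returning `None`. For contrast, a minimal sketch of a sink that actually tracks them, using simplified stand-in types rather than Dozer's real `Sink` trait:

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct OpIdentifier {
    pub txid: u64, // field names assumed
    pub seq_in_tx: u64,
}

#[derive(Default)]
pub struct StatefulSink {
    source_state: Option<Vec<u8>>,
    latest_op_id: Option<OpIdentifier>,
}

impl StatefulSink {
    // The builder pushes the source's serialized state into each sink.
    pub fn set_source_state(&mut self, source_state: &[u8]) {
        self.source_state = Some(source_state.to_vec());
    }

    // Read back on startup; conflicting states across sinks abort the build.
    pub fn get_source_state(&mut self) -> Option<Vec<u8>> {
        self.source_state.clone()
    }

    // Read back on startup; the source resumes from the minimum across sinks.
    pub fn get_latest_op_id(&mut self) -> Option<OpIdentifier> {
        self.latest_op_id
    }

    // A durable sink would persist this only after committing the operation.
    pub fn process(&mut self, id: Option<OpIdentifier>) {
        if let Some(id) = id {
            self.latest_op_id = Some(self.latest_op_id.map_or(id, |cur| cur.max(id)));
        }
    }
}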
26 changes: 21 additions & 5 deletions dozer-cli/src/pipeline/log_sink.rs
@@ -12,9 +12,9 @@ use dozer_core::{
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;
use dozer_types::indicatif::ProgressBar;
use dozer_types::types::Schema;
use dozer_types::{errors::internal::BoxedError, types::Operation};
use dozer_types::{errors::internal::BoxedError, node::OpIdentifier};
use dozer_types::{indicatif::ProgressBar, types::OperationWithId};
use tokio::{runtime::Runtime, sync::Mutex};

#[derive(Debug)]
@@ -89,12 +89,12 @@ impl Sink for LogSink {
&mut self,
_from_port: PortHandle,
_record_store: &ProcessorRecordStore,
op: Operation,
op: OperationWithId,
) -> Result<(), BoxedError> {
let end = self
.runtime
.block_on(self.log.lock())
.write(dozer_cache::dozer_log::replication::LogOperation::Op { op });
.write(dozer_cache::dozer_log::replication::LogOperation::Op { op: op.op });
self.pb.set_position(end as u64);
Ok(())
}
@@ -133,12 +133,28 @@ impl Sink for LogSink {
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
fn on_source_snapshotting_done(
&mut self,
connection_name: String,
_id: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let end = self
.runtime
.block_on(self.log.lock())
.write(LogOperation::SnapshottingDone { connection_name });
self.pb.set_position(end as u64);
Ok(())
}

fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
Ok(())
}

fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
Ok(None)
}

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
}
}
31 changes: 7 additions & 24 deletions dozer-cli/src/simple/build/contract/mod.rs
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
fs::OpenOptions,
path::Path,
};
@@ -11,7 +11,7 @@ use dozer_core::{
node::PortHandle,
petgraph::{
algo::is_isomorphic_matching,
visit::{EdgeRef, IntoEdgesDirected, IntoNodeReferences},
visit::{IntoEdgesDirected, IntoNodeReferences},
Direction,
},
};
@@ -94,7 +94,11 @@ impl Contract {
api,
)?;

let connections = collect_ancestor_sources(dag_schemas, node_index);
let connections = dag_schemas
.collect_ancestor_sources(node_index)
.into_iter()
.map(|handle| handle.id)
.collect();

let schema = EndpointSchema {
path: api.path.clone(),
@@ -195,27 +199,6 @@ fn sink_input_schema(dag: &DagSchemas, node_index: NodeIndex) -> &Schema {
&edge.weight().schema
}

fn collect_ancestor_sources(dag: &DagSchemas, node_index: NodeIndex) -> HashSet<String> {
let mut sources = HashSet::new();
collect_ancestor_sources_recursive(dag, node_index, &mut sources);
sources
}

fn collect_ancestor_sources_recursive(
dag: &DagSchemas,
node_index: NodeIndex,
sources: &mut HashSet<String>,
) {
for edge in dag.graph().edges_directed(node_index, Direction::Incoming) {
let source_node_index = edge.source();
let source_node = &dag.graph()[source_node_index];
if matches!(source_node.kind, dozer_core::NodeKind::Source(_)) {
sources.insert(source_node.handle.id.clone());
}
collect_ancestor_sources_recursive(dag, source_node_index, sources);
}
}

fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
let file = OpenOptions::new()
.create(true)
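The recursive helper is deleted in favor of a `collect_ancestor_sources` method on `DagSchemas`, whose implementation is not part of this diff. A self-contained sketch of that traversal over a plain petgraph graph, iterative and with a visited set so shared ancestors are not walked twice; the `(String, NodeKind)` node weight is a simplification of dozer-core's node type:

use std::collections::HashSet;

use petgraph::{
    graph::{DiGraph, NodeIndex},
    visit::EdgeRef,
    Direction,
};

pub enum NodeKind {
    Source,
    Processor,
    Sink,
}

// Follow incoming edges transitively from `node`, collecting the id of
// every source node encountered along the way.
pub fn collect_ancestor_sources(
    graph: &DiGraph<(String, NodeKind), ()>,
    node: NodeIndex,
) -> HashSet<String> {
    let mut sources = HashSet::new();
    let mut visited = HashSet::new();
    let mut stack = vec![node];
    while let Some(current) = stack.pop() {
        for edge in graph.edges_directed(current, Direction::Incoming) {
            let parent = edge.source();
            if !visited.insert(parent) {
                continue; // already walked this ancestor via another path
            }
            let (id, kind) = &graph[parent];
            if matches!(*kind, NodeKind::Source) {
                sources.insert(id.clone());
            }
            stack.push(parent);
        }
    }
    sources
}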
150 changes: 124 additions & 26 deletions dozer-core/src/builder_dag.rs
@@ -1,16 +1,22 @@
use std::{collections::HashMap, fmt::Debug};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};

use daggy::{
petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences},
NodeIndex,
};
use dozer_types::node::{NodeHandle, OpIdentifier};
use dozer_types::{
log::warn,
node::{NodeHandle, OpIdentifier},
};

use crate::{
checkpoint::OptionCheckpoint,
dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
errors::ExecutionError,
node::{Processor, Sink, Source},
node::{Processor, Sink, SinkFactory, Source},
NodeKind as DagNodeKind,
};

@@ -64,29 +70,119 @@ impl BuilderDag {
}
}

// Build the nodes.
let mut graph = daggy::Dag::new();
// Collect sources that may affect a node.
let mut affecting_sources = dag_schemas
.graph()
.node_identifiers()
.map(|node_index| dag_schemas.collect_ancestor_sources(node_index))
.collect::<Vec<_>>();

// Prepare nodes and edges for consuming.
let (nodes, edges) = dag_schemas.into_graph().into_graph().into_nodes_edges();
for (node_index, node) in nodes.into_iter().enumerate() {
let mut nodes = nodes
.into_iter()
.map(|node| Some(node.weight))
.collect::<Vec<_>>();

// Build the sinks and load checkpoint.
let mut graph = daggy::Dag::new();
let mut source_states = HashMap::new();
let mut source_op_ids = HashMap::new();
let mut source_id_to_sinks = HashMap::<NodeHandle, Vec<NodeIndex>>::new();
let mut node_index_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
for (node_index, node) in nodes.iter_mut().enumerate() {
if let Some((handle, sink)) = take_sink(node) {
let sources = std::mem::take(&mut affecting_sources[node_index]);
if sources.len() > 1 {
warn!("Multiple sources ({sources:?}) connected to same sink: {handle}");
}
let source = sources.into_iter().next().expect("sink must have a source");

let node_index = NodeIndex::new(node_index);
let mut sink = sink
.build(
input_schemas
.remove(&node_index)
.expect("we collected all input schemas"),
)
.await
.map_err(ExecutionError::Factory)?;

let state = sink.get_source_state().map_err(ExecutionError::Sink)?;
if let Some(state) = state {
match source_states.entry(handle.clone()) {
Entry::Occupied(entry) => {
if entry.get() != &state {
return Err(ExecutionError::SourceStateConflict(handle));
}
}
Entry::Vacant(entry) => {
entry.insert(state);
}
}
}

let op_id = sink.get_latest_op_id().map_err(ExecutionError::Sink)?;
if let Some(op_id) = op_id {
match source_op_ids.entry(handle.clone()) {
Entry::Occupied(mut entry) => {
*entry.get_mut() = op_id.min(*entry.get());
}
Entry::Vacant(entry) => {
entry.insert(op_id);
}
}
}

let new_node_index = graph.add_node(NodeType {
handle,
kind: NodeKind::Sink(sink),
});
node_index_map.insert(node_index, new_node_index);
source_id_to_sinks
.entry(source)
.or_default()
.push(new_node_index);
}
}

// Build sources, processors, and collect source states.
for (node_index, node) in nodes.iter_mut().enumerate() {
let Some(node) = node.take() else {
continue;
};
let node_index = NodeIndex::new(node_index);
let node = node.weight;
let node = match node.kind {
DagNodeKind::Source(source) => {
let source_state = checkpoint.get_source_state(&node.handle)?;
let source = source
.build(
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
source_state.map(|state| state.0.to_vec()),
source_states.remove(&node.handle),
)
.map_err(ExecutionError::Factory)?;

// Write state to relevant sink.
let state = source
.serialize_state()
.await
.map_err(ExecutionError::Source)?;
for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
let sink = &mut graph[sink];
let NodeKind::Sink(sink) = &mut sink.kind else {
unreachable!()
};
sink.set_source_state(&state)
.map_err(ExecutionError::Sink)?;
}

let last_checkpoint = source_op_ids.remove(&node.handle);
NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint: source_state.map(|state| state.1),
last_checkpoint,
},
}
}
@@ -111,28 +207,20 @@
kind: NodeKind::Processor(processor),
}
}
DagNodeKind::Sink(sink) => {
let sink = sink
.build(
input_schemas
.remove(&node_index)
.expect("we collected all input schemas"),
)
.await
.map_err(ExecutionError::Factory)?;
NodeType {
handle: node.handle,
kind: NodeKind::Sink(sink),
}
}
DagNodeKind::Sink(_) => unreachable!(),
};
graph.add_node(node);
let new_node_index = graph.add_node(node);
node_index_map.insert(node_index, new_node_index);
}

// Connect the edges.
for edge in edges {
graph
.add_edge(edge.source(), edge.target(), edge.weight)
.add_edge(
node_index_map[&edge.source()],
node_index_map[&edge.target()],
edge.weight,
)
.expect("we know there's no loop");
}

@@ -147,3 +235,13 @@
self.graph
}
}

fn take_sink(node: &mut Option<super::NodeType>) -> Option<(NodeHandle, Box<dyn SinkFactory>)> {
let super::NodeType { handle, kind } = node.take()?;
if let super::NodeKind::Sink(sink) = kind {
Some((handle, sink))
} else {
*node = Some(super::NodeType { handle, kind });
None
}
}
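Two details of the rebuilt loop: sinks are now built in a first pass (`take_sink` pulls them out of the shared `Vec<Option<_>>` so the second pass skips them), and when several sinks share a source the build keeps the minimum of their latest op ids, because resuming any later would drop operations for the sink that is furthest behind. A small illustration of that rule, with `(txid, seq_in_tx)` tuples standing in for `OpIdentifier`:

// Resume point for a source shared by several sinks: the smallest of the
// sinks' persisted op ids; sinks that have persisted nothing are skipped,
// mirroring the `Entry`-based fold in the loop above.
fn resume_point(latest_per_sink: &[Option<(u64, u64)>]) -> Option<(u64, u64)> {
    latest_per_sink.iter().copied().flatten().min()
}

fn main() {
    // Sink A committed up to (7, 2), sink B only up to (5, 0), and sink C
    // has nothing persisted: the source must replay from (5, 0).
    let ids = [Some((7, 2)), Some((5, 0)), None];
    assert_eq!(resume_point(&ids), Some((5, 0)));
}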
4 changes: 2 additions & 2 deletions dozer-core/src/channels.rs
@@ -1,10 +1,10 @@
use crate::node::PortHandle;
use dozer_types::types::Operation;
use dozer_types::types::OperationWithId;

pub trait ProcessorChannelForwarder {
/// Sends an operation to downstream nodes. Panics if the operation cannot be sent.
///
/// We must panic instead of returning an error because this method will be called by `Processor::process`,
/// which only returns recoverable errors.
fn send(&mut self, op: Operation, port: PortHandle);
fn send(&mut self, op: OperationWithId, port: PortHandle);
}
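The forwarder now carries `OperationWithId` instead of a bare `Operation`. Judging by the `op.op` accesses earlier in this PR, the wrapper presumably pairs an operation with its optional source-assigned id; a sketch of the assumed shape, with stand-ins for the referenced types:

// Stand-ins for the dozer_types definitions; fields and names are assumed.
pub struct OpIdentifier {
    pub txid: u64,
    pub seq_in_tx: u64,
}

pub enum Operation {
    // Insert/Delete/Update variants elided
}

pub struct OperationWithId {
    pub id: Option<OpIdentifier>, // None for ops not tied to a source position
    pub op: Operation,
}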