Skip to content

Commit

Permalink
fix: Record writer was not checkpointed
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Oct 19, 2023
1 parent 31787bd commit 1cf2c3c
Show file tree
Hide file tree
Showing 39 changed files with 369 additions and 322 deletions.
3 changes: 2 additions & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ impl<'a> Executor<'a> {
}

pub fn run_dag_executor(
runtime: &Runtime,
dag_executor: DagExecutor,
running: Arc<AtomicBool>,
labels: LabelsAndProgress,
) -> Result<(), OrchestrationError> {
let join_handle = dag_executor.start(running, labels)?;
let join_handle = runtime.block_on(dag_executor.start(running, labels))?;
join_handle
.join()
.map_err(OrchestrationError::ExecutionError)
Expand Down
8 changes: 7 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,14 @@ impl SimpleOrchestrator {
}

let labels = self.labels.clone();
let runtime_clone = self.runtime.clone();
let pipeline_future = self.runtime.spawn_blocking(move || {
run_dag_executor(dag_executor, shutdown.get_running_flag(), labels)
run_dag_executor(
&runtime_clone,
dag_executor,
shutdown.get_running_flag(),
labels,
)
});

let mut futures = FuturesUnordered::new();
Expand Down
43 changes: 9 additions & 34 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use std::{collections::HashMap, fmt::Debug};

use daggy::petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences};
use dozer_types::node::NodeHandle;

use crate::{
checkpoint::{CheckpointFactory, CheckpointFactoryOptions, OptionCheckpoint},
checkpoint::OptionCheckpoint,
dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
errors::ExecutionError,
node::{PortHandle, Processor, Sink, Source, SourceState},
node::{Processor, Sink, Source, SourceState},
NodeKind as DagNodeKind,
};

Expand All @@ -25,7 +25,6 @@ pub struct NodeType {
pub enum NodeKind {
Source {
source: Box<dyn Source>,
port_names: HashMap<PortHandle, String>,
last_checkpoint: SourceState,
},
Processor(Box<dyn Processor>),
Expand All @@ -38,14 +37,11 @@ pub enum NodeKind {
#[derive(Debug)]
pub struct BuilderDag {
graph: daggy::Dag<NodeType, EdgeType>,
checkpoint_factory: Arc<CheckpointFactory>,
initial_epoch_id: u64,
}

impl BuilderDag {
pub async fn new(
checkpoint: OptionCheckpoint,
options: CheckpointFactoryOptions,
checkpoint: &OptionCheckpoint,
dag_schemas: DagSchemas,
) -> Result<Self, ExecutionError> {
// Collect input output schemas.
Expand All @@ -69,21 +65,16 @@ impl BuilderDag {
let graph = dag_schemas.into_graph().try_map(
|node_index, node| match node.kind {
DagNodeKind::Source(source) => {
let mut port_names = HashMap::default();
for port in source.get_output_ports() {
let port_name = source.get_output_port_name(&port.handle);
port_names.insert(port.handle, port_name);
}

let mut last_checkpoint_by_name = checkpoint.get_source_state(&node.handle)?;
let mut last_checkpoint = HashMap::new();
for (port_handle, port_name) in port_names.iter() {
for port_def in source.get_output_ports() {
let port_name = source.get_output_port_name(&port_def.handle);
last_checkpoint.insert(
*port_handle,
port_def.handle,
last_checkpoint_by_name
.as_mut()
.and_then(|last_checkpoint| {
last_checkpoint.remove(port_name).flatten()
last_checkpoint.remove(&port_name).flatten()
}),
);
}
Expand All @@ -100,7 +91,6 @@ impl BuilderDag {
handle: node.handle,
kind: NodeKind::Source {
source,
port_names,
last_checkpoint,
},
})
Expand Down Expand Up @@ -142,28 +132,13 @@ impl BuilderDag {
|_, edge| Ok(edge),
)?;

let initial_epoch_id = checkpoint.next_epoch_id();
let (checkpoint_factory, _) = CheckpointFactory::new(checkpoint, options).await?;

Ok(BuilderDag {
graph,
initial_epoch_id,
checkpoint_factory: Arc::new(checkpoint_factory),
})
Ok(BuilderDag { graph })
}

pub fn graph(&self) -> &daggy::Dag<NodeType, EdgeType> {
&self.graph
}

pub fn checkpoint_factory(&self) -> &Arc<CheckpointFactory> {
&self.checkpoint_factory
}

pub fn initial_epoch_id(&self) -> u64 {
self.initial_epoch_id
}

pub fn into_graph(self) -> daggy::Dag<NodeType, EdgeType> {
self.graph
}
Expand Down
32 changes: 32 additions & 0 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ impl OptionCheckpoint {
Ok(None)
}
}

pub async fn load_record_writer_data(
&self,
node_handle: &NodeHandle,
port_name: &str,
) -> Result<Option<Vec<u8>>, storage::Error> {
if let Some(checkpoint) = &self.checkpoint {
let key = record_writer_key(&checkpoint.processor_prefix, node_handle, port_name);
info!("Loading record writer {node_handle}-{port_name} checkpoint from {key}");
self.storage.download_object(key).await.map(Some)
} else {
Ok(None)
}
}
}

impl CheckpointFactory {
Expand Down Expand Up @@ -248,6 +262,12 @@ fn processor_key(processor_prefix: &str, node_handle: &NodeHandle) -> String {
.into_string()
}

fn record_writer_key(processor_prefix: &str, node_handle: &NodeHandle, port_name: &str) -> String {
AsRef::<Utf8Path>::as_ref(processor_prefix)
.join(format!("{}-{}", node_handle, port_name))
.into_string()
}

impl CheckpointWriter {
pub fn new(
factory: Arc<CheckpointFactory>,
Expand Down Expand Up @@ -281,6 +301,16 @@ impl CheckpointWriter {
.map_err(|_| ExecutionError::CheckpointWriterThreadPanicked)
}

pub fn create_record_writer_object(
&self,
node_handle: &NodeHandle,
port_name: &str,
) -> Result<Object, ExecutionError> {
let key = record_writer_key(&self.processor_prefix, node_handle, port_name);
Object::new(self.factory.queue.clone(), key)
.map_err(|_| ExecutionError::CheckpointWriterThreadPanicked)
}

fn drop(&mut self) -> Result<(), ExecutionError> {
self.factory.write_record_store_slice(
std::mem::take(&mut self.record_store_key),
Expand Down Expand Up @@ -429,6 +459,8 @@ pub async fn create_checkpoint_factory_for_test(
(temp_dir, Arc::new(checkpoint_factory), handle)
}

pub mod serialize;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dozer_core::dozer_log::{storage::Object, tokio::sync::mpsc::error::SendError};
use dozer_log::{storage::Object, tokio::sync::mpsc::error::SendError};
use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore, ProcessorRecordStoreDeserializer};
use dozer_types::{
bincode,
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/dag_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ fn contains_port(
Ok(match node {
NodeKind::Processor(p) => {
if direction == PortDirection::Output {
p.get_output_ports().iter().any(|e| e.handle == port)
p.get_output_ports().iter().any(|e| e == &port)
} else {
p.get_input_ports().contains(&port)
}
Expand Down
54 changes: 35 additions & 19 deletions dozer-core/src/dag_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,37 @@ use super::{EdgeType as DagEdgeType, NodeType};
#[serde(crate = "dozer_types::serde")]
pub struct EdgeType {
pub output_port: PortHandle,
pub output_port_type: OutputPortType,
pub input_port: PortHandle,
pub schema: Schema,
pub edge_kind: EdgeKind,
}

impl EdgeType {
pub fn new(
output_port: PortHandle,
output_port_type: OutputPortType,
input_port: PortHandle,
schema: Schema,
edge_kind: EdgeKind,
) -> Self {
Self {
output_port,
output_port_type,
input_port,
schema,
edge_kind,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(crate = "dozer_types::serde")]
pub enum EdgeKind {
FromSource {
port_type: OutputPortType,
port_name: String,
},
FromProcessor,
}

pub trait EdgeHaveSchema: EdgeHavePorts {
fn schema(&self) -> &Schema;
}
Expand Down Expand Up @@ -191,26 +201,33 @@ fn populate_schemas(
let ports = source.get_output_ports();

for edge in dag.graph().edges(node_index) {
let port = find_output_port_def(&ports, edge);
let port = edge.weight().from;
let port_type = find_output_port_type(&ports, edge);
let port_name = source.get_output_port_name(&port);
let schema = source
.get_output_schema(&port.handle)
.get_output_schema(&port)
.map_err(ExecutionError::Factory)?;
create_edge(&mut edges, edge, port, schema);
create_edge(
&mut edges,
edge,
EdgeKind::FromSource {
port_type,
port_name,
},
schema,
);
}
}

NodeKind::Processor(processor) => {
let input_schemas =
validate_input_schemas(&dag, &edges, node_index, processor.get_input_ports())?;

let ports = processor.get_output_ports();

for edge in dag.graph().edges(node_index) {
let port = find_output_port_def(&ports, edge);
let schema = processor
.get_output_schema(&port.handle, &input_schemas)
.get_output_schema(&edge.weight().from, &input_schemas)
.map_err(ExecutionError::Factory)?;
create_edge(&mut edges, edge, port, schema);
create_edge(&mut edges, edge, EdgeKind::FromProcessor, schema);
}
}

Expand All @@ -229,14 +246,14 @@ fn populate_schemas(
))
}

fn find_output_port_def<'a>(
ports: &'a [OutputPortDef],
fn find_output_port_type(
ports: &[OutputPortDef],
edge: EdgeReference<DagEdgeType>,
) -> &'a OutputPortDef {
) -> OutputPortType {
let handle = edge.weight().from;
for port in ports {
if port.handle == handle {
return port;
return port.typ;
}
}
panic!("BUG: port {handle} not found")
Expand All @@ -245,17 +262,16 @@ fn find_output_port_def<'a>(
fn create_edge(
edges: &mut [Option<EdgeType>],
edge: EdgeReference<DagEdgeType>,
port: &OutputPortDef,
edge_kind: EdgeKind,
schema: Schema,
) {
debug_assert!(port.handle == edge.weight().from);
let edge_ref = &mut edges[edge.id().index()];
debug_assert!(edge_ref.is_none());
*edge_ref = Some(EdgeType::new(
port.handle,
port.typ,
edge.weight().from,
edge.weight().to,
schema,
edge_kind,
));
}

Expand Down
5 changes: 5 additions & 0 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;

use crate::checkpoint::serialize::{DeserializationError, SerializationError};
use crate::node::PortHandle;
use dozer_recordstore::RecordStoreError;
use dozer_types::errors::internal::BoxedError;
Expand Down Expand Up @@ -31,6 +32,8 @@ pub enum ExecutionError {
AppSourceConnectionAlreadyExists(String),
#[error("Factory error: {0}")]
Factory(#[source] BoxedError),
#[error("Failed to restore record writer: {0}")]
RestoreRecordWriter(#[source] DeserializationError),
#[error("Source error: {0}")]
Source(#[source] BoxedError),
#[error("File system error {0:?}: {1}")]
Expand All @@ -52,6 +55,8 @@ pub enum ExecutionError {
},
#[error("Failed to create checkpoint: {0}")]
FailedToCreateCheckpoint(BoxedError),
#[error("Serialization error: {0}")]
Serialization(#[from] SerializationError),
}

impl<T> From<crossbeam::channel::SendError<T>> for ExecutionError {
Expand Down
Loading

0 comments on commit 1cf2c3c

Please sign in to comment.