Skip to content

Commit

Permalink
Aerospike sink: Add transactionally consistent denormalization (#2437)
Browse files Browse the repository at this point in the history
* Resolve sink checkpoint data to sources

* Oracle: send op_id in every op

* Aerospike sink: Transactionally consistent denormalization

* Do batching on aerospike sink transactions

* Denorm resumability

* Aerospike sink: Allow specifying an alternate primary key for sets

* Fix: allocate metadata record if not found in sink

* Update aerospike client
  • Loading branch information
Jesse-Bakker authored Mar 7, 2024
1 parent 050a49a commit 4fcb982
Show file tree
Hide file tree
Showing 15 changed files with 2,535 additions and 1,059 deletions.
16 changes: 13 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,26 @@ impl BuilderDag {
.serialize_state()
.await
.map_err(ExecutionError::Source)?;
let mut checkpoint = None;
for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
let sink = &mut graph[sink];
let sink_handle = &sink.handle;
let NodeKind::Sink(sink) = &mut sink.kind else {
unreachable!()
};
sink.set_source_state(&state)
.map_err(ExecutionError::Sink)?;
if let Some(sink_checkpoint) = source_op_ids.remove(sink_handle) {
checkpoint =
Some(checkpoint.unwrap_or(sink_checkpoint).min(sink_checkpoint));
}
}

let last_checkpoint = source_op_ids.remove(&node.handle);
NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint,
last_checkpoint: checkpoint,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl DagExecutor {
let Some(node) = execution_dag.graph()[node_index].kind.as_ref() else {
continue;
};
match &node {
match node {
NodeKind::Source { .. } => unreachable!("We already started the source node"),
NodeKind::Processor(_) => {
let processor_node = ProcessorNode::new(&mut execution_dag, node_index).await;
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub trait SinkFactory: Send + Sync + Debug {
fn type_name(&self) -> String;
}

pub trait Sink: Send + Sync + Debug {
pub trait Sink: Send + Debug {
fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError>;
fn process(&mut self, op: TableOperation) -> Result<(), BoxedError>;

Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,17 @@ impl Connector {
}
};

for (table_index, op) in transaction.operations {
for (seq, (table_index, op)) in transaction.operations.into_iter().enumerate() {
if ingestor
.blocking_handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
id: Some(OpIdentifier::new(transaction.commit_scn, seq as u64)),
})
.is_err()
{
return;
}
};
}

if ingestor
Expand Down
3 changes: 2 additions & 1 deletion dozer-sink-aerospike/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ edition = "2021"
dozer-core = { path = "../dozer-core" }
dozer-types = { path = "../dozer-types" }
aerospike-client-sys = { path = "./aerospike-client-sys" }
crossbeam-channel = "0.5.11"
itertools = "0.12"
smallvec = "1.13.1"
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
#include <aerospike/as_arraylist.h>
#include <aerospike/as_map.h>
#include <aerospike/as_orderedmap.h>
#include <aerospike/as_exp.h>
168 changes: 168 additions & 0 deletions dozer-sink-aerospike/aerospike-client-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,171 @@
#![allow(non_snake_case)]

include!(concat!(env!("OUT_DIR"), "/generated.rs"));

#[macro_export]
macro_rules! as_exp_build {
($func:ident $args:tt ) => {{
let mut v = Vec::new();
$crate::as_exp_build_inner!(v, $func $args);
$crate::as_exp_compile(v.as_mut_ptr(), v.len() as u32)
}}
}

#[macro_export]
macro_rules! as_exp_build_inner {
($v:expr, as_exp_bin_int($bin_name:expr $(,)?)) => {{
let bin_name: *const i8 = $bin_name;
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_BIN,
count: 3,
sz: 0,
prev_va_args: 0,
v: std::mem::zeroed(),
});
$crate::as_exp_build_inner!($v, as_exp_int($crate::as_exp_type_AS_EXP_TYPE_INT as i64));
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_VAL_RAWSTR,
v: $crate::as_exp_entry__bindgen_ty_1 { str_val: bin_name },
count: 0,
sz: 0,
prev_va_args: 0,
});
}};
($v:expr, as_exp_int($val:expr)) => {
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_VAL_INT,
v: $crate::as_exp_entry__bindgen_ty_1 { int_val: $val },
count: 0,
sz: 0,
prev_va_args: 0,
})
};
($v:expr, as_exp_uint($val:expr)) => {
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_VAL_UINT,
v: $crate::as_exp_entry__bindgen_ty_1 { uint_val: $val },
count: 0,
sz: 0,
prev_va_args: 0,
})
};
($v:expr, as_exp_cmp_eq($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_CMP_EQ,
count: 3,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$crate::as_exp_build_inner!($v, $left_name $left_args);
$crate::as_exp_build_inner!($v, $right_name $right_args);
}};
($v:expr, as_exp_cmp_gt($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_CMP_GT,
count: 3,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$crate::as_exp_build_inner!($v, $left_name $left_args);
$crate::as_exp_build_inner!($v, $right_name $right_args);
}};
($v:expr, as_exp_cmp_ge($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_CMP_GE,
count: 3,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$crate::as_exp_build_inner!($v, $left);
$crate::as_exp_build_inner!($v, $right);
}};
($v:expr, as_exp_cmp_lt($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_CMP_LT,
count: 3,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$crate::as_exp_build_inner!($v, $left_name $left_args);
$crate::as_exp_build_inner!($v, $right_name $right_args);
}};
($v:expr, as_exp_cmp_le($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_CMP_LE,
count: 3,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$crate::as_exp_build_inner!($v, $left_name $left_args);
$crate::as_exp_build_inner!($v, $right_name $right_args);
}};
($v:expr, as_exp_and($($arg_name:ident $arg_args:tt),*)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_AND,
count: 0,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$($crate::as_exp_build_inner!($v, $arg_name $arg_args));*;
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_END_OF_VA_ARGS,
count: 0,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
}};
($v:expr, as_exp_or($($arg_name:ident $arg_args:tt),*)) => {{
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_OR,
count: 0,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
$($crate::as_exp_build_inner!($v, $arg_name $arg_args));*;
$v.push($crate::as_exp_entry {
op: $crate::as_exp_ops__AS_EXP_CODE_END_OF_VA_ARGS,
count: 0,
v: std::mem::zeroed(),
sz: 0,
prev_va_args: 0,
});
}};
}

#[cfg(test)]
mod tests {
use std::ffi::CString;

use super::*;

#[test]
fn test_as_exp_build() {
// Tested that this results in the same compiled expression as when
// using the macros from the C library
let bin_name = CString::new("bin_name").unwrap();
unsafe {
let exp = as_exp_build! {
as_exp_and(
as_exp_cmp_gt(
as_exp_bin_int(bin_name.as_ptr()),
as_exp_int(3)
),
as_exp_cmp_lt(
as_exp_bin_int(bin_name.as_ptr()),
as_exp_int(8)
)
)
};
assert!(!exp.is_null());
as_exp_destroy(exp);
}
}
}
Loading

0 comments on commit 4fcb982

Please sign in to comment.