diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs
index 0b1c53305c..1a98592817 100644
--- a/dozer-sink-aerospike/src/aerospike.rs
+++ b/dozer-sink-aerospike/src/aerospike.rs
@@ -2,8 +2,9 @@ use std::time::Instant;
 use std::{
     alloc::{handle_alloc_error, Layout},
     ffi::{c_char, c_void, CStr, CString, NulError},
+    fmt::Display,
     mem::MaybeUninit,
-    ptr::{addr_of, NonNull},
+    ptr::{addr_of, addr_of_mut, NonNull},
     slice,
 };
 
@@ -18,7 +19,7 @@ use dozer_types::{
     ordered_float::OrderedFloat,
     rust_decimal::prelude::*,
     thiserror,
-    types::{DozerDuration, DozerPoint, Field, Record, Schema},
+    types::{DozerDuration, DozerPoint, Field, Schema},
 };
 
 use crate::{denorm_dag::Error, AerospikeSinkError};
@@ -62,10 +63,10 @@ impl BinNames {
         &self.storage
     }
 
-    pub(crate) fn new<S: AsRef<str>, I: IntoIterator<Item = S>>(names: I) -> Result<Self, NulError> {
+    pub(crate) fn new<'a, I: IntoIterator<Item = &'a str>>(names: I) -> Result<Self, NulError> {
         let storage: Vec<CString> = names
             .into_iter()
-            .map(|name| CString::new(name.as_ref()))
+            .map(CString::new)
             .collect::<Result<_, _>>()?;
         let ptrs = Self::make_ptrs(&storage);
         Ok(Self {
@@ -312,7 +313,6 @@ impl Client {
         &self,
         batch: *mut as_batch_records,
     ) -> Result<(), AerospikeError> {
-        dbg!("Batch get {} records", (*batch).list.size);
         as_try(|err| aerospike_batch_read(self.inner.as_ptr(), err, std::ptr::null(), batch))
     }
 
@@ -476,9 +476,6 @@ unsafe fn init_key_single(
         Field::U128(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
         Field::I128(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
         Field::Decimal(v) => set_str_key(key, namespace, set, v.to_string(), allocated_strings),
-        // For keys, we need to allocate a new CString, because there is no
-        // API to set a key to a string that's not null-terminated. For bin
-        // values, we can. XXX: possible point for optimization
         Field::Text(string) | Field::String(string) => {
             set_str_key(key, namespace, set, string.clone(), allocated_strings);
         }
@@ -516,98 +513,72 @@ unsafe fn init_key_single(
     Ok(())
 }
 
-unsafe fn rec_set_str(
-    record: *mut as_record,
-    name: *const c_char,
-    string: String,
+unsafe fn map_set_str(
+    map: *mut as_orderedmap,
+    key: *const as_val,
+    string: impl Display,
     allocated_strings: &mut Vec<String>,
 ) {
-    rec_set_bytes(
-        record,
-        name,
-        string.as_bytes(),
-        as_bytes_type_e_AS_BYTES_STRING,
-    );
-    allocated_strings.push(string);
-}
+    let string = format!("{string}\0");
 
-unsafe fn rec_set_bytes(
-    record: *mut as_record,
-    name: *const c_char,
-    bytes: &[u8],
-    type_: as_bytes_type,
-) {
-    let ptr = bytes.as_ptr();
-    let len = bytes.len();
-    as_record_set_raw_typep(record, name, ptr, len as u32, type_, false);
+    let cstr = CStr::from_bytes_with_nul(string.as_bytes()).unwrap();
+    let val =
+        as_string_new_wlen(cstr.as_ptr() as *mut c_char, string.len(), false) as *const as_val;
+    as_orderedmap_set(map, key, val);
+    allocated_strings.push(string);
 }
 
-#[allow(unused)]
-pub(crate) unsafe fn init_record(
-    record: *mut as_record,
-    dozer_record: &Record,
+pub(crate) unsafe fn new_record_map(
+    dozer_record: &[Field],
     bin_names: &[CString],
-    n_extra_cols: u16,
     allocated_strings: &mut Vec<String>,
-) -> Result<(), AerospikeSinkError> {
-    as_record_init(
-        record,
-        dozer_record.values.len() as u16 + n_extra_cols /* denorm */ + 2, /* tx_id and seq */
-    );
-    for (def, field) in bin_names.iter().zip(&dozer_record.values) {
-        let name = def.as_ptr();
+) -> Result<*mut as_orderedmap, AerospikeSinkError> {
+    let map = check_alloc(as_orderedmap_new(bin_names.len().try_into().unwrap()));
+    for (def, field) in bin_names.iter().zip(dozer_record) 
{ + let key = check_alloc(as_string_new_strdup(def.as_ptr())) as *const as_val; match field { Field::UInt(v) => { - as_record_set_int64(record, name, *v as i64); + as_orderedmap_set( + map, + key, + check_alloc(as_integer_new((*v).try_into().unwrap())) as *const as_val, + ); } Field::U128(v) => { - rec_set_str(record, name, v.to_string(), allocated_strings); + map_set_str(map, key, v, allocated_strings); } Field::Int(v) => { - as_record_set_int64(record, name, *v); + as_orderedmap_set(map, key, check_alloc(as_integer_new(*v)) as *const as_val); } Field::I128(v) => { - rec_set_str(record, name, v.to_string(), allocated_strings); + map_set_str(map, key, v, allocated_strings); } Field::Float(OrderedFloat(v)) => { - as_record_set_double(record, name, *v); + as_orderedmap_set(map, key, check_alloc(as_double_new(*v)) as *const as_val); } Field::Boolean(v) => { - as_record_set_bool(record, name, *v); + as_orderedmap_set(map, key, check_alloc(as_boolean_new(*v)) as *const as_val); } Field::String(v) | Field::Text(v) => { - as_record_set_raw_typep( - record, - name, - v.as_ptr(), - v.len() as u32, - as_bytes_type_e_AS_BYTES_STRING, - false, - ); + map_set_str(map, key, v, allocated_strings); } Field::Binary(v) => { - as_record_set_rawp(record, name, v.as_ptr(), v.len() as u32, false); + let bytes = check_alloc(as_bytes_new(v.len().try_into().unwrap())); + as_bytes_set(bytes, 0, v.as_ptr(), v.len().try_into().unwrap()); + as_orderedmap_set(map, key, bytes as *const as_val); } Field::Decimal(v) => { - rec_set_str(record, name, v.to_string(), allocated_strings); + map_set_str(map, key, v, allocated_strings); } Field::Timestamp(v) => { - rec_set_str(record, name, v.to_rfc3339(), allocated_strings); + map_set_str(map, key, v.to_rfc3339(), allocated_strings); } // Date's display implementation is RFC3339 compatible Field::Date(v) => { - rec_set_str(record, name, v.to_string(), allocated_strings); - } - Field::Duration(DozerDuration(duration, _)) => { - rec_set_str( - record, - name, - format!("PT{},{:09}S", duration.as_secs(), duration.subsec_nanos()), - allocated_strings, - ); + map_set_str(map, key, v, allocated_strings); } Field::Null => { - as_record_set_nil(record, name); + as_orderedmap_set(map, key, addr_of!(as_nil) as *const as_val); } // XXX: Geojson points have to have coordinates <90. Dozer points can // be arbitrary locations. @@ -616,20 +587,34 @@ pub(crate) unsafe fn init_record( // a plain string format. Instead, we just make sure we include a nul-byte // in our regular string, as that is easiest to integration with the other // string allocations. - let string = format!( - r#"{{"type": "Point", "coordinates": [{}, {}]}}{}"#, - x.0, y.0, '\0' + map_set_str( + map, + key, + format_args!(r#"{{"type": "Point", "coordinates": [{}, {}]}}"#, x.0, y.0), + allocated_strings, ); - as_record_set_geojson_strp(record, name, string.as_ptr().cast(), false); - allocated_strings.push(string); + // Parsing is unimplemented and it's better to fail early + unimplemented!(); } Field::Json(v) => { - let value = convert_json(v)?; - as_record_set(record, name, value); + let val = convert_json(v)? 
as *const as_val;
+                as_orderedmap_set(map, key, val);
+                // Parsing is unimplemented and it's better to fail early
+                unimplemented!();
+            }
+            Field::Duration(DozerDuration(duration, _)) => {
+                map_set_str(
+                    map,
+                    key,
+                    format_args!("PT{},{:09}S", duration.as_secs(), duration.subsec_nanos()),
+                    allocated_strings,
+                );
+                // Parsing is unimplemented and it's better to fail early
+                unimplemented!();
+            }
         }
     }
-    Ok(())
+    Ok(map)
 }
 
 unsafe fn set_operation_str(
@@ -741,6 +726,42 @@ fn map<T>(val: *mut as_val, typ: as_val_type_e, f: impl FnOnce(&T) -> Option<Field>) -> Field {
+pub(crate) fn parse_record_many(
+    record: &as_record,
+    schema: &Schema,
+    list_bin: &CStr,
+    bin_names: &BinNames,
+) -> Result<Vec<Vec<Field>>, Error> {
+    unsafe {
+        let list = as_record_get_list(record, list_bin.as_ptr());
+        let n_recs = as_list_size(list);
+
+        let mut result = Vec::with_capacity(n_recs as usize);
+        for elem in (0..n_recs).map(|i| as_list_get_map(list, i)) {
+            if elem.is_null() {
+                continue;
+            }
+
+            let mut values = Vec::with_capacity(schema.fields.len());
+            for (field, name) in schema.fields.iter().zip(bin_names.names()) {
+                let mut string = MaybeUninit::uninit();
+                let key = as_string_init(string.as_mut_ptr(), name.as_ptr() as *mut c_char, false);
+                let val = as_map_get(elem, key as *const as_val);
+                as_string_destroy(&mut string.assume_init() as *mut as_string);
+                let v = parse_val(val, field)?;
+                values.push(v);
+            }
+            result.push(values);
+        }
+        Ok(result)
+    }
+}
+
+#[inline(always)]
+unsafe fn as_string_destroy(string: *mut as_string_s) {
+    as_val_val_destroy(string as *mut as_val);
+}
+
 pub(crate) fn parse_record(
     record: &as_record,
     schema: &Schema,
@@ -750,99 +771,106 @@ pub(crate) fn parse_record(
     let mut values = Vec::with_capacity(schema.fields.len());
     for (field, name) in schema.fields.iter().zip(bin_names.names()) {
         let val = unsafe { as_record_get(record, name.as_ptr()) as *mut as_val };
-        let v = if val.is_null() {
-            Field::Null
-        } else {
-            match field.typ {
-                dozer_types::types::FieldType::UInt => {
-                    map(val, as_val_type_e_AS_INTEGER, |v: &as_integer| {
-                        Some(Field::UInt(v.value.to_u64()?))
-                    })
-                }
-                dozer_types::types::FieldType::U128 => {
-                    map(val, as_val_type_e_AS_STRING, |v: &as_string| {
-                        Some(Field::U128(unsafe {
-                            CStr::from_ptr(v.value).to_str().ok()?.parse().ok()?
-                        }))
-                    })
-                }
-                dozer_types::types::FieldType::Int => {
-                    map(val, as_val_type_e_AS_INTEGER, |v: &as_integer| {
-                        Some(Field::Int(v.value))
-                    })
-                }
-                dozer_types::types::FieldType::I128 => {
-                    map(val, as_val_type_e_AS_STRING, |v: &as_string| {
-                        Some(Field::I128(unsafe {
-                            CStr::from_ptr(v.value).to_str().ok()?.parse().ok()? 
- })) - }) - } - dozer_types::types::FieldType::Float => { - map(val, as_val_type_e_AS_DOUBLE, |v: &as_double| { - Some(Field::Float(OrderedFloat(v.value))) - }) - } - dozer_types::types::FieldType::Boolean => { - map(val, as_val_type_e_AS_BOOLEAN, |v: &as_boolean| { - Some(Field::Boolean(v.value)) - }) - } - dozer_types::types::FieldType::String => { - map(val, as_val_type_e_AS_STRING, |v: &as_string| { - Some(Field::String( - unsafe { CStr::from_ptr(v.value) }.to_str().ok()?.to_owned(), - )) - }) - } - dozer_types::types::FieldType::Text => { - map(val, as_val_type_e_AS_STRING, |v: &as_string| { - Some(Field::Text( - unsafe { CStr::from_ptr(v.value) }.to_str().ok()?.to_owned(), - )) - }) - } - dozer_types::types::FieldType::Binary => { - map(val, as_val_type_e_AS_BYTES, |v: &as_bytes| { - Some(Field::Binary(unsafe { - slice::from_raw_parts(v.value, v.size as usize).to_vec() - })) - }) - } - dozer_types::types::FieldType::Decimal => { - map(val, as_val_type_e_AS_STRING, |v: &as_string| { - Some(Field::Decimal(unsafe { - CStr::from_ptr(v.value).to_str().ok()?.parse().ok()? - })) - }) - } - dozer_types::types::FieldType::Timestamp => { - map(val, as_val_type_e_AS_STRING, |v: &as_string| { - Some(Field::Timestamp(unsafe { - DateTime::parse_from_rfc3339(CStr::from_ptr(v.value).to_str().ok()?) - .ok()? - })) - }) - } + let v = parse_val(val, field)?; + values.push(v); + } + Ok(values) +} - dozer_types::types::FieldType::Date => { - map(val, as_val_type_e_AS_STRING, |v: &as_string| { - Some(Field::Date(unsafe { - NaiveDate::from_str(CStr::from_ptr(v.value).to_str().ok()?).ok()? - })) - }) - } - dozer_types::types::FieldType::Point => unimplemented!(), - dozer_types::types::FieldType::Duration => unimplemented!(), - dozer_types::types::FieldType::Json => unimplemented!(), +fn parse_val( + val: *mut as_val_s, + field: &dozer_types::types::FieldDefinition, +) -> Result { + let v = if val.is_null() { + Field::Null + } else { + match field.typ { + dozer_types::types::FieldType::UInt => { + map(val, as_val_type_e_AS_INTEGER, |v: &as_integer| { + Some(Field::UInt(v.value.to_u64()?)) + }) } - }; - if !field.nullable && v == Field::Null { - return Err(Error::NotNullNotFound); + dozer_types::types::FieldType::U128 => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::U128(unsafe { + CStr::from_ptr(v.value).to_str().ok()?.parse().ok()? + })) + }) + } + dozer_types::types::FieldType::Int => { + map(val, as_val_type_e_AS_INTEGER, |v: &as_integer| { + Some(Field::Int(v.value)) + }) + } + dozer_types::types::FieldType::I128 => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::I128(unsafe { + CStr::from_ptr(v.value).to_str().ok()?.parse().ok()? 
+ })) + }) + } + dozer_types::types::FieldType::Float => { + map(val, as_val_type_e_AS_DOUBLE, |v: &as_double| { + Some(Field::Float(OrderedFloat(v.value))) + }) + } + dozer_types::types::FieldType::Boolean => { + map(val, as_val_type_e_AS_BOOLEAN, |v: &as_boolean| { + Some(Field::Boolean(v.value)) + }) + } + dozer_types::types::FieldType::String => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::String( + unsafe { CStr::from_ptr(v.value) }.to_str().ok()?.to_owned(), + )) + }) + } + dozer_types::types::FieldType::Text => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::Text( + unsafe { CStr::from_ptr(v.value) }.to_str().ok()?.to_owned(), + )) + }) + } + dozer_types::types::FieldType::Binary => { + map(val, as_val_type_e_AS_BYTES, |v: &as_bytes| { + Some(Field::Binary(unsafe { + slice::from_raw_parts(v.value, v.size as usize).to_vec() + })) + }) + } + dozer_types::types::FieldType::Decimal => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::Decimal(unsafe { + CStr::from_ptr(v.value).to_str().ok()?.parse().ok()? + })) + }) + } + dozer_types::types::FieldType::Timestamp => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::Timestamp(unsafe { + DateTime::parse_from_rfc3339(CStr::from_ptr(v.value).to_str().ok()?).ok()? + })) + }) + } + + dozer_types::types::FieldType::Date => { + map(val, as_val_type_e_AS_STRING, |v: &as_string| { + Some(Field::Date(unsafe { + NaiveDate::from_str(CStr::from_ptr(v.value).to_str().ok()?).ok()? + })) + }) + } + dozer_types::types::FieldType::Point => unimplemented!(), + dozer_types::types::FieldType::Duration => unimplemented!(), + dozer_types::types::FieldType::Json => unimplemented!(), } - values.push(v); + }; + if !field.nullable && v == Field::Null { + return Err(Error::NotNullNotFound); } - Ok(values) + Ok(v) } #[inline(always)] @@ -860,6 +888,7 @@ fn as_util_fromval(v: *mut as_val, typ: as_val_type_e) -> Option> unsafe fn as_vector_reserve(vector: *mut as_vector) -> *mut c_void { if (*vector).size >= (*vector).capacity { as_vector_increase_capacity(vector); + check_alloc((*vector).list); } let item = (*vector) .list @@ -929,3 +958,264 @@ impl Drop for AsOperations { unsafe { as_operations_destroy(self.0) } } } + +macro_rules! as_util_hook { + ($hook:tt, $default:expr, $object:expr $(,$($arg:tt),*)?) => {{ + if !$object.is_null() && !(*$object).hooks.is_null() && (*(*$object).hooks).$hook.is_some() { + (*(*$object).hooks).$hook.unwrap()($object, $($($arg)*)?) 
+        } else {
+            $default
+        }
+    }};
+}
+
+#[inline(always)]
+unsafe fn as_list_size(list: *const as_list) -> u32 {
+    as_util_hook!(size, 0, list)
+}
+
+#[inline(always)]
+unsafe fn as_list_get(list: *const as_list, i: u32) -> *const as_val {
+    as_util_hook!(get, std::ptr::null(), list, i)
+}
+
+#[inline(always)]
+unsafe fn as_list_get_map(list: *const as_list, i: u32) -> *const as_map {
+    let val = as_list_get(list, i);
+    if !val.is_null() && (*val).type_ as u32 == as_val_type_e_AS_MAP {
+        val as *const as_map
+    } else {
+        std::ptr::null()
+    }
+}
+
+#[inline(always)]
+unsafe fn as_map_get(map: *const as_map, key: *const as_val) -> *mut as_val {
+    as_util_hook!(get, std::ptr::null_mut(), map, key)
+}
+
+pub(crate) struct ReadBatchResults {
+    recs: AsBatchRecords,
+}
+
+impl ReadBatchResults {
+    fn vector(&self) -> *const as_vector {
+        &self.recs.as_ref().list
+    }
+
+    pub(crate) fn get(&self, idx: usize) -> Result<Option<&as_record>, AerospikeError> {
+        let rec = unsafe {
+            assert!(idx < (*self.vector()).size as usize);
+            let rec = as_vector_get(self.vector(), idx) as *const as_batch_read_record;
+            rec.as_ref().unwrap()
+        };
+
+        #[allow(non_upper_case_globals)]
+        match rec.result {
+            as_status_e_AEROSPIKE_OK => Ok(Some(&rec.record)),
+            as_status_e_AEROSPIKE_ERR_RECORD_NOT_FOUND => Ok(None),
+            other => Err(AerospikeError::from_code(other)),
+        }
+    }
+}
+
+pub(crate) struct ReadBatch<'a> {
+    client: &'a Client,
+    inner: Option<AsBatchRecords>,
+    allocated_strings: Vec<String>,
+    read_ops: usize,
+}
+
+impl<'a> ReadBatch<'a> {
+    fn reserve_read(&mut self) -> *mut as_batch_read_record {
+        unsafe { check_alloc(as_batch_read_reserve(self.inner.as_mut().unwrap().as_ptr())) }
+    }
+
+    pub(crate) fn add_read_all(
+        &mut self,
+        namespace: &CStr,
+        set: &CStr,
+        key: &[Field],
+    ) -> Result<usize, AerospikeSinkError> {
+        let idx = self.read_ops;
+        let read_rec = self.reserve_read();
+        unsafe {
+            init_key(
+                addr_of_mut!((*read_rec).key),
+                namespace,
+                set,
+                key,
+                &mut self.allocated_strings,
+            )?;
+            (*read_rec).read_all_bins = true;
+        }
+        self.read_ops += 1;
+        Ok(idx)
+    }
+
+    pub(crate) fn execute(mut self) -> Result<ReadBatchResults, AerospikeError> {
+        unsafe { self.client.batch_get(self.inner.as_mut().unwrap().as_ptr()) }?;
+
+        Ok(ReadBatchResults {
+            recs: self.inner.take().unwrap(),
+        })
+    }
+
+    pub(crate) fn new(
+        client: &'a Client,
+        capacity: u32,
+        allocated_strings: Option<Vec<String>>,
+    ) -> Self {
+        Self {
+            client,
+            inner: Some(AsBatchRecords::new(capacity)),
+            allocated_strings: allocated_strings.unwrap_or_default(),
+            read_ops: 0,
+        }
+    }
+}
+
+struct AsBatchRecords(NonNull<as_batch_records>);
+
+impl AsBatchRecords {
+    fn new(capacity: u32) -> Self {
+        // Capacity needs to be at least 1, otherwise growing the vector will fail
+        // because it uses naive doubling of the capacity. We use rustc's heuristic
+        // for the minimum size of the vector (4 if the size of the element <= 1024)
+        // to save some re-allocations for small vectors.
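+        // (Illustration with assumed numbers, not taken from the client docs:
+        // doubling a capacity of 0 would stay at 0 and never make room, while
+        // the floor of 4 grows 4 -> 8 -> 16 for small batches.)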
+ let capacity = capacity.max(4); + unsafe { Self(NonNull::new(as_batch_records_create(capacity)).unwrap()) } + } + + fn as_ref(&self) -> &as_batch_records { + unsafe { self.0.as_ref() } + } + + fn as_ptr(&mut self) -> *mut as_batch_records { + self.0.as_ptr() + } +} + +pub(crate) struct WriteBatch<'a> { + client: &'a Client, + inner: Option, + allocated_strings: Vec, + operations: Vec, +} + +impl<'a> WriteBatch<'a> { + pub(crate) fn new( + client: &'a Client, + capacity: u32, + allocated_strings: Option>, + ) -> Self { + Self { + client, + inner: Some(AsBatchRecords::new(capacity)), + allocated_strings: allocated_strings.unwrap_or_default(), + operations: Vec::with_capacity(capacity as usize), + } + } + + fn batch_ptr(&mut self) -> *mut as_batch_records { + self.inner.as_mut().unwrap().as_ptr() + } + + pub(crate) fn reserve_write(&mut self) -> *mut as_batch_write_record { + unsafe { check_alloc(as_batch_write_reserve(self.batch_ptr())) } + } + + pub(crate) fn reserve_remove(&mut self) -> *mut as_batch_remove_record { + unsafe { check_alloc(as_batch_remove_reserve(self.batch_ptr())) } + } + + pub(crate) fn add_write( + &mut self, + namespace: &CStr, + set: &CStr, + bin_names: &[CString], + key: &[Field], + values: &[Field], + ) -> Result<(), AerospikeSinkError> { + let write_rec = self.reserve_write(); + unsafe { + init_key( + addr_of_mut!((*write_rec).key), + namespace, + set, + key, + &mut self.allocated_strings, + )?; + let mut ops = AsOperations::new(values.len().try_into().unwrap()); + init_batch_write_operations( + ops.as_mut_ptr(), + values, + bin_names, + &mut self.allocated_strings, + )?; + (*write_rec).ops = ops.as_mut_ptr(); + self.operations.push(ops); + } + Ok(()) + } + + pub(crate) fn add_write_list( + &mut self, + namespace: &CStr, + set: &CStr, + bin: &CStr, + key: &[Field], + bin_names: &[CString], + values: &[Vec], + ) -> Result<(), AerospikeSinkError> { + let write_rec = self.reserve_write(); + unsafe { + init_key( + addr_of_mut!((*write_rec).key), + namespace, + set, + key, + &mut self.allocated_strings, + )?; + let mut ops = AsOperations::new(1); + let list = as_arraylist_new(values.len().try_into().unwrap(), 0); + for record in values { + let map = new_record_map(record, bin_names, &mut self.allocated_strings)?; + as_arraylist_append(list, map as *mut as_val); + } + as_operations_add_write(ops.as_mut_ptr(), bin.as_ptr(), list as *mut as_bin_value); + (*write_rec).ops = ops.as_mut_ptr(); + self.operations.push(ops); + } + Ok(()) + } + + pub(crate) fn add_remove( + &mut self, + namespace: &CStr, + set: &CStr, + key: &[Field], + ) -> Result<(), AerospikeSinkError> { + let remove_rec = self.reserve_remove(); + unsafe { + init_key( + addr_of_mut!((*remove_rec).key), + namespace, + set, + key, + &mut self.allocated_strings, + )?; + } + Ok(()) + } + + pub(crate) fn execute(mut self) -> Result<(), AerospikeError> { + unsafe { self.client.write_batch(self.inner.take().unwrap().as_ptr()) } + } +} + +impl Drop for AsBatchRecords { + fn drop(&mut self) { + unsafe { as_batch_records_destroy(self.0.as_ptr()) } + } +} diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs index 488eae0892..3b41675514 100644 --- a/dozer-sink-aerospike/src/denorm_dag.rs +++ b/dozer-sink-aerospike/src/denorm_dag.rs @@ -1,11 +1,8 @@ use std::collections::HashMap; use std::ffi::{CStr, CString, NulError}; -use std::ops::Deref; -use std::ptr::{addr_of_mut, NonNull}; -use aerospike_client_sys::*; use dozer_core::daggy::petgraph::Direction; -use dozer_core::daggy::{self, 
NodeIndex}; +use dozer_core::daggy::{self, EdgeIndex, NodeIndex}; use dozer_core::petgraph::visit::{ EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences, }; @@ -14,16 +11,16 @@ use dozer_types::indexmap::IndexMap; use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable}; use dozer_types::thiserror; use dozer_types::types::{Field, Record, Schema, TableOperation}; -use itertools::Itertools; +use itertools::{Either, Itertools}; use smallvec::SmallVec; use crate::aerospike::{ - as_batch_read_reserve, as_batch_records_create, as_batch_remove_reserve, - as_batch_write_reserve, as_vector_get, check_alloc, init_batch_write_operations, init_key, - parse_record, AerospikeError, AsOperations, BinNames, Client, + parse_record, parse_record_many, BinNames, Client, ReadBatch, ReadBatchResults, WriteBatch, }; use crate::AerospikeSinkError; +const MANY_LIST_BIN: &CStr = unsafe { CStr::from_bytes_with_nul_unchecked("data\0".as_bytes()) }; + #[derive(Debug, Clone)] struct CachedRecord { dirty: bool, @@ -31,25 +28,322 @@ struct CachedRecord { record: Option>, } -impl Deref for CachedRecord { - type Target = Option>; +#[derive(Debug, Clone, Default)] +struct OneToOneBatch(IndexMap, SmallVec<[CachedRecord; 2]>>); + +#[derive(Debug, Clone)] +enum ManyOp { + Add(Vec), + Remove(Vec), +} + +#[derive(Debug, Clone)] +struct ManyRecord { + version: usize, + ops: Vec, +} + +#[derive(Debug, Clone, Default)] +struct OneToManyEntry { + base: Option>>, + ops: Vec, +} + +#[derive(Debug, Clone, Default)] +struct OneToManyBatch(IndexMap, OneToManyEntry>); + +impl OneToManyBatch { + fn insert_point( + &mut self, + key: Vec, + version: usize, + ) -> (&mut OneToManyEntry, usize, usize) { + let entry = self.0.entry(key); + let idx = entry.index(); + let entry = entry.or_default(); + let insert_point = entry + .ops + .iter() + .position(|rec| rec.version >= version) + .unwrap_or(entry.ops.len()); + (entry, idx, insert_point) + } + + fn insert_local(&mut self, key: Vec, value: Vec, version: usize) -> usize { + let (entry, idx, insert_point) = self.insert_point(key, version); + match entry.ops.get_mut(insert_point) { + Some(entry) if entry.version == version => { + entry.ops.push(ManyOp::Add(value)); + } + _ => { + entry.ops.insert( + insert_point, + ManyRecord { + version, + ops: vec![ManyOp::Add(value)], + }, + ); + } + } + idx + } + + fn remove_local(&mut self, key: Vec, old_value: &[Field], version: usize) -> usize { + let (entry, idx, insert_point) = self.insert_point(key, version); + match entry.ops.get_mut(insert_point) { + Some(entry) if entry.version == version => { + if let Some(added) = entry + .ops + .iter() + .position(|entry| matches!(entry, ManyOp::Add(value) if value == old_value)) + { + let _ = entry.ops.swap_remove(added); + } else { + entry.ops.push(ManyOp::Remove(old_value.to_vec())); + } + } + _ => entry.ops.insert( + insert_point, + ManyRecord { + version, + ops: vec![ManyOp::Remove(old_value.to_vec())], + }, + ), + }; + idx + } + + fn replace_local( + &mut self, + key: Vec, + old_value: Vec, + new_value: Vec, + version: usize, + ) -> usize { + let (entry, idx, insert_point) = self.insert_point(key, version); + match entry.ops.get_mut(insert_point) { + Some(entry) if entry.version == version => { + if let Some(added) = entry + .ops + .iter_mut() + .find(|entry| matches!(entry, ManyOp::Add(value) if value == &old_value)) + { + *added = ManyOp::Add(new_value); + } else { + entry.ops.push(ManyOp::Remove(old_value)); + entry.ops.push(ManyOp::Add(new_value)); + } + } + _ => 
entry.ops.insert( + insert_point, + ManyRecord { + version, + ops: vec![ManyOp::Remove(old_value), ManyOp::Add(new_value)], + }, + ), + }; + idx + } + + fn insert_remote(&mut self, index: usize, value: Vec>) { + let (_, record) = self.0.get_index_mut(index).unwrap(); + record.base = Some(value); + } + + fn get(&self, key: &[Field], version: usize) -> Option>> { + let entry = self.0.get(key)?; + + Self::get_inner(entry, version) + } + fn get_index(&self, index: usize, version: usize) -> Option>> { + let (_, entry) = self.0.get_index(index)?; + + Self::get_inner(entry, version) + } + + fn get_inner(entry: &OneToManyEntry, version: usize) -> Option>> { + let mut recs = entry.base.clone()?; + for version in entry.ops.iter().take_while(|ops| ops.version <= version) { + for op in &version.ops { + match op { + ManyOp::Add(rec) => recs.push(rec.clone()), + ManyOp::Remove(to_remove) => { + if let Some(to_remove) = recs.iter().position(|rec| rec == to_remove) { + recs.swap_remove(to_remove); + } + } + } + } + } + if recs.is_empty() { + None + } else { + Some(recs.into_iter()) + } + } - fn deref(&self) -> &Self::Target { - &self.record + fn write( + &mut self, + record_batch: &mut WriteBatch, + schema: &AerospikeSchema, + ) -> Result<(), AerospikeSinkError> { + for (k, v) in self.0.drain(..) { + // We should always have a base, otherwise we can't do idempotent writes + let mut record = v.base.unwrap(); + // Apply ops + for version in v.ops { + for op in version.ops { + match op { + ManyOp::Add(rec) => { + record.push(rec); + } + ManyOp::Remove(rec) => { + if let Some(pos) = record.iter().position(|r| r == &rec) { + record.swap_remove(pos); + } + } + } + } + } + record_batch.add_write_list( + &schema.namespace, + &schema.set, + MANY_LIST_BIN, + &k, + schema.bins.names(), + &record, + )?; + } + Ok(()) } } #[derive(Debug, Clone)] -struct Node { - namespace: CString, - set: CString, - schema: Schema, - batch: IndexMap, SmallVec<[CachedRecord; 2]>>, - bins: BinNames, - denormalize_to: Option<(CString, CString)>, +enum CachedBatch { + One(OneToOneBatch), + Many(OneToManyBatch), +} +struct DirtyRecord<'a> { + idx: usize, + key: &'a [Field], + version: usize, +} + +impl CachedBatch { + fn iter_dirty(&self) -> impl Iterator { + match self { + Self::One(batch) => batch + .0 + .iter() + .enumerate() + .filter_map(|(i, (k, v))| Some((i, k, v.last()?))) + .filter(|(_, _, v)| v.dirty) + .map(|(i, k, v)| DirtyRecord { + idx: i, + key: k, + version: v.version, + }), + Self::Many(_) => unimplemented!(), + } + } + + fn remove_local(&mut self, key: Vec, old_value: &[Field], version: usize) -> usize { + match self { + Self::One(batch) => batch.insert_local(key, None, version), + Self::Many(batch) => batch.remove_local(key, old_value, version), + } + } + + fn insert_local(&mut self, key: Vec, value: Vec, version: usize) -> usize { + match self { + Self::One(batch) => batch.insert_local(key, Some(value), version), + Self::Many(batch) => batch.insert_local(key, value, version), + } + } + + fn replace_local( + &mut self, + key: Vec, + old_value: Vec, + new_value: Vec, + version: usize, + ) -> usize { + match self { + Self::One(batch) => batch.insert_impl(key, Some(new_value), version, true, true), + Self::Many(batch) => batch.replace_local(key, old_value, new_value, version), + } + } + + fn clear(&mut self) { + match self { + Self::One(batch) => batch.clear(), + Self::Many(batch) => batch.0.clear(), + } + } + + fn len(&self) -> usize { + match self { + Self::One(batch) => batch.len(), + Self::Many(batch) => batch.0.len(), + 
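+            // Note: both variants count distinct keys in the batch, not the
+            // buffered versions (or pending ops) stored per key.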
} + } + + fn write( + &mut self, + record_batch: &mut WriteBatch, + schema: &AerospikeSchema, + ) -> Result<(), AerospikeSinkError> { + match self { + Self::One(batch) => batch.write(record_batch, schema), + Self::Many(batch) => batch.write(record_batch, schema), + } + } + + fn get<'a>( + &'a self, + key: &[Field], + version: usize, + ) -> Option> + 'a> { + match self { + Self::One(batch) => { + let record = batch.get(key, version)?.record.clone()?; + Some(Either::Left(std::iter::once(record))) + } + Self::Many(batch) => Some(Either::Right(batch.get(key, version)?)), + } + } + + fn get_index( + &self, + index: usize, + version: usize, + ) -> Option> + '_> { + match self { + Self::One(batch) => { + let record = batch.get_index(index, version)?.record.clone()?; + Some(Either::Left(std::iter::once(record))) + } + Self::Many(batch) => Some(Either::Right(batch.get_index(index, version)?)), + } + } + + fn should_update_at(&mut self, key: Vec, version: usize) -> (bool, usize) { + match self { + Self::One(batch) => { + let (index, exists) = batch.index_or_default(key, version); + (!exists, index) + } + // For a many batch, we always need the base from the remote + Self::Many(batch) => { + let entry = batch.0.entry(key); + let idx = entry.index(); + (entry.or_default().base.is_none(), idx) + } + } + } } -impl Node { +impl OneToOneBatch { fn insert_local( &mut self, key: Vec, @@ -67,7 +361,7 @@ impl Node { replace: bool, dirty: bool, ) -> usize { - let entry = self.batch.entry(key); + let entry = self.0.entry(key); let idx = entry.index(); let versions = entry.or_default(); let record = CachedRecord { @@ -95,15 +389,85 @@ impl Node { idx } - fn insert_remote(&mut self, key: Vec, value: Option>) -> usize { - self.insert_impl(key, value, 0, false, false) + fn insert_remote(&mut self, index: usize, value: Option>) { + let (_, versions) = self.0.get_index_mut(index).unwrap(); + versions.insert( + 0, + CachedRecord { + dirty: false, + version: 0, + record: value, + }, + ); + } + + fn get<'a>(&'a self, key: &[Field], version: usize) -> Option<&'a CachedRecord> { + let versions = self.0.get(key)?; + // Find the last version thats <= version + versions.iter().take_while(|v| v.version <= version).last() } - fn get(&self, key: &[Field], version: usize) -> Option<&CachedRecord> { - let versions = self.batch.get(key)?; + fn get_index(&self, index: usize, version: usize) -> Option<&CachedRecord> { + let (_, versions) = self.0.get_index(index)?; // Find the last version thats <= version - versions.iter().filter(|v| v.version <= version).last() + versions.iter().take_while(|v| v.version <= version).last() + } + + /// Returns the index at which the entry for the given key exists, + /// or was created and whether it existed + fn index_or_default(&mut self, key: Vec, version: usize) -> (usize, bool) { + let entry = self.0.entry(key); + let idx = entry.index(); + let versions = entry.or_default(); + (idx, versions.first().is_some_and(|v| v.version <= version)) + } + + fn clear(&mut self) { + self.0.clear() + } + + fn len(&self) -> usize { + self.0.len() } + + fn write( + &mut self, + batch: &mut WriteBatch, + schema: &AerospikeSchema, + ) -> Result<(), AerospikeSinkError> { + for (key, dirty_record) in self.0.drain(..).filter_map(|(key, mut rec)| { + let last_version = rec.pop()?; + last_version.dirty.then_some((key, last_version.record)) + }) { + if let Some(dirty_record) = dirty_record { + batch.add_write( + &schema.namespace, + &schema.set, + schema.bins.names(), + &key, + &dirty_record, + )?; + } else { + 
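+                // `None` here means the latest buffered version is a delete,
+                // so it is flushed as a batch remove instead of a write.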
batch.add_remove(&schema.namespace, &schema.set, &key)?; + } + } + Ok(()) + } +} + +#[derive(Debug, Clone)] +struct AerospikeSchema { + namespace: CString, + set: CString, + bins: BinNames, +} + +#[derive(Debug, Clone)] +struct Node { + schema: Schema, + batch: CachedBatch, + as_schema: AerospikeSchema, + denormalize_to: Option<(CString, CString, Vec)>, } #[derive(Debug, Clone, PartialEq, Hash, Eq)] @@ -151,7 +515,7 @@ pub(crate) enum Error { pub(crate) struct DenormalizationState { dag: DenormDag, current_transaction: Option, - base_tables: Vec<(NodeIndex, Vec)>, + base_tables: Vec<(NodeIndex, Vec, Vec)>, transaction_counter: usize, } @@ -160,16 +524,25 @@ pub(crate) struct DenormalizedTable { pub(crate) bin_names: Vec, pub(crate) namespace: CString, pub(crate) set: CString, - pub(crate) records: Vec<(Vec, Vec)>, + pub(crate) records: Vec>, + pub(crate) pk: Vec, } type DenormDag = daggy::Dag; fn bin_names_recursive(dag: &DenormDag, nid: NodeIndex, bins: &mut Vec) { - let mut neighbors = dag.neighbors_directed(nid, Direction::Outgoing).detach(); - while let Some((edge, node)) = neighbors.next(dag.graph()) { - let edge = dag.edge_weight(edge).unwrap(); - bins.extend_from_slice(edge.bins.names()); - bin_names_recursive(dag, node, bins); + for edge in dag.edges_directed(nid, Direction::Outgoing) { + bins.extend_from_slice(edge.weight().bins.names()); + bin_names_recursive(dag, edge.target(), bins); + } +} + +impl DenormalizationState { + fn node(&self, index: NodeIndex) -> &Node { + self.dag.node_weight(index).unwrap() + } + + fn edge(&self, index: EdgeIndex) -> &Edge { + self.dag.edge_weight(index).unwrap() } } @@ -180,14 +553,22 @@ impl DenormalizationState { let base_tables: Vec<_> = dag .node_references() // Filter out non-base-tables - .filter_map(|(i, node)| node.denormalize_to.is_some().then_some(i)) + .filter_map(|(i, node)| node.denormalize_to.as_ref().map(|(_, _, pk)| (i, pk))) // Find all added bin names using a depth-first search - .map(|id| { - let mut bin_names = dag.node_weight(id).unwrap().bins.names().to_vec(); + .map(|(id, pk)| -> Result<_, Error> { + let mut bin_names = dag.node_weight(id).unwrap().as_schema.bins.names().to_vec(); bin_names_recursive(&dag, id, &mut bin_names); - (id, bin_names) + let mut primary_key = Vec::new(); + for key in pk { + let idx = bin_names + .iter() + .position(|bin| bin.to_str().is_ok_and(|bin| bin == key)) + .ok_or_else(|| Error::FieldNotFound(key.clone()))?; + primary_key.push(idx); + } + Ok((id, bin_names, primary_key)) }) - .collect(); + .try_collect()?; Ok(Self { dag, current_transaction: None, @@ -200,24 +581,35 @@ impl DenormalizationState { let mut dag: daggy::Dag = daggy::Dag::new(); let mut node_by_name = HashMap::new(); for (table, schema) in tables.iter() { - let bin_names = BinNames::new(schema.fields.iter().map(|field| &field.name))?; + let bin_names = BinNames::new(schema.fields.iter().map(|field| field.name.as_str()))?; let denormalize_to = table .write_denormalized_to .as_ref() .map(|to| -> Result<_, Error> { - let AerospikeSet { namespace, set } = to; + let AerospikeSet { + namespace, + set, + primary_key, + } = to; Ok(( CString::new(namespace.as_str())?, CString::new(set.as_str())?, + primary_key.clone(), )) }) .transpose()?; let idx = dag.add_node(Node { - namespace: CString::new(table.namespace.as_str())?, - set: CString::new(table.set_name.as_str())?, + as_schema: AerospikeSchema { + namespace: CString::new(table.namespace.as_str())?, + set: CString::new(table.set_name.as_str())?, + bins: bin_names, + }, schema: 
schema.clone(), - batch: IndexMap::new(), - bins: bin_names, + batch: if table.aggregate_by_pk { + CachedBatch::Many(OneToManyBatch::default()) + } else { + CachedBatch::One(OneToOneBatch::default()) + }, denormalize_to, }); @@ -275,9 +667,7 @@ impl DenormalizationState { for (denorm_idx, lookup_idx) in key_idx.iter().zip(&from_schema.primary_index) { let denorm_field = &schema.fields[*denorm_idx]; let lookup_field = &from_schema.fields[*lookup_idx]; - if denorm_field.typ != lookup_field.typ - || denorm_field.nullable != lookup_field.nullable - { + if denorm_field.typ != lookup_field.typ { return Err(mismatch_err()); } } @@ -321,126 +711,13 @@ impl DenormalizationState { } } -pub(crate) struct RecordBatch { - inner: NonNull, - allocated_strings: Vec, - operations: Vec, -} - -impl RecordBatch { - pub(crate) fn new(capacity: u32, n_strings_estimate: u32) -> Self { - let ptr = unsafe { NonNull::new(as_batch_records_create(capacity)).unwrap() }; - Self { - inner: ptr, - allocated_strings: Vec::with_capacity(n_strings_estimate as usize), - operations: Vec::with_capacity(capacity as usize), - } - } - - pub(crate) unsafe fn as_mut_ptr(&mut self) -> *mut as_batch_records { - self.inner.as_ptr() - } - - #[inline(always)] - pub(crate) unsafe fn inner(&self) -> &as_batch_records { - self.inner.as_ref() - } - - pub(crate) fn reserve_read(&mut self) -> *mut as_batch_read_record { - unsafe { check_alloc(as_batch_read_reserve(self.as_mut_ptr())) } - } - - pub(crate) fn reserve_write(&mut self) -> *mut as_batch_write_record { - unsafe { check_alloc(as_batch_write_reserve(self.as_mut_ptr())) } - } - - pub(crate) fn reserve_remove(&mut self) -> *mut as_batch_remove_record { - unsafe { check_alloc(as_batch_remove_reserve(self.as_mut_ptr())) } - } - - pub(crate) fn add_write( - &mut self, - namespace: &CStr, - set: &CStr, - bin_names: &[CString], - key: &[Field], - values: &[Field], - ) -> Result<(), AerospikeSinkError> { - let write_rec = self.reserve_write(); - unsafe { - init_key( - addr_of_mut!((*write_rec).key), - namespace, - set, - key, - &mut self.allocated_strings, - )?; - let mut ops = AsOperations::new(values.len().try_into().unwrap()); - init_batch_write_operations( - ops.as_mut_ptr(), - values, - bin_names, - &mut self.allocated_strings, - )?; - (*write_rec).ops = ops.as_mut_ptr(); - self.operations.push(ops); - } - Ok(()) - } - - fn add_remove( - &mut self, - namespace: &CStr, - set: &CStr, - key: &[Field], - ) -> Result<(), AerospikeSinkError> { - let remove_rec = self.reserve_remove(); - unsafe { - init_key( - addr_of_mut!((*remove_rec).key), - namespace, - set, - key, - &mut self.allocated_strings, - )?; - } - Ok(()) - } - - fn add_read_all( - &mut self, - namespace: &CStr, - set: &CStr, - key: &[Field], - ) -> Result<(), AerospikeSinkError> { - let read_rec = self.reserve_read(); - unsafe { - init_key( - addr_of_mut!((*read_rec).key), - namespace, - set, - key, - &mut self.allocated_strings, - )?; - (*read_rec).read_all_bins = true; - } - Ok(()) - } -} - -impl Drop for RecordBatch { - fn drop(&mut self) { - unsafe { as_batch_records_destroy(self.inner.as_ptr()) } - } -} - #[derive(Clone)] struct BatchLookup { - base_table: usize, - index: usize, + node: NodeIndex, + nodebatch_idx: usize, version: usize, - key: Option>, - batch_read_index: Option, + readbatch_idx: Option, + follow: bool, } impl DenormalizationState { @@ -448,7 +725,8 @@ impl DenormalizationState { let node = self.dag.node_weight_mut(node_id).unwrap(); let idx = new.get_key_fields(&node.schema); - node.insert_local(idx, 
Some(new.values.clone()), self.transaction_counter); + node.batch + .insert_local(idx, new.values, self.transaction_counter); } pub(crate) fn process(&mut self, op: TableOperation) -> Result<(), AerospikeSinkError> { @@ -459,7 +737,8 @@ impl DenormalizationState { let node = self.dag.node_weight_mut(node_id).unwrap(); let schema = &node.schema; let idx = old.get_key_fields(schema); - node.insert_local(idx, None, self.transaction_counter); + node.batch + .remove_local(idx, &old.values, self.transaction_counter); } dozer_types::types::Operation::Insert { new } => { self.do_insert(node_id, new); @@ -475,7 +754,8 @@ impl DenormalizationState { new: new_pk.clone(), }); } - node.insert_local(new_pk, Some(new.values), self.transaction_counter); + node.batch + .replace_local(new_pk, old.values, new.values, self.transaction_counter); } dozer_types::types::Operation::BatchInsert { new } => { for value in new { @@ -492,6 +772,18 @@ impl DenormalizationState { } } pub(crate) fn persist(&mut self, client: &Client) -> Result<(), AerospikeSinkError> { + let mut read_batch = ReadBatch::new(client, 0, None); + let mut lookups = Vec::new(); + self.add_manynode_base_lookups(&mut read_batch, &mut lookups)?; + let read_results = read_batch.execute()?; + for lookup in lookups { + self.update_from_lookup( + lookup.readbatch_idx.unwrap(), + lookup.node, + &read_results, + lookup.nodebatch_idx, + )?; + } let batch_size_upper_bound: usize = self .dag .node_references() @@ -499,32 +791,15 @@ impl DenormalizationState { .sum(); let batch_size: u32 = batch_size_upper_bound.try_into().unwrap(); - let mut write_batch = RecordBatch::new(batch_size, batch_size); + let mut write_batch = WriteBatch::new(client, batch_size, None); for node in self.dag.node_weights_mut() { // Only write if the last version is dirty (the newest version was changed by this // batch) - for (key, dirty_record) in node.batch.drain(..).filter_map(|(key, mut rec)| { - let last_version = rec.pop()?; - last_version.dirty.then_some((key, last_version.record)) - }) { - if let Some(dirty_record) = dirty_record { - write_batch.add_write( - &node.namespace, - &node.set, - node.bins.names(), - &key, - &dirty_record, - )?; - } else { - write_batch.add_remove(&node.namespace, &node.set, &key)?; - } - } + node.batch.write(&mut write_batch, &node.as_schema)?; } - unsafe { - client.write_batch(write_batch.as_mut_ptr())?; - } + write_batch.execute()?; self.transaction_counter = 0; Ok(()) } @@ -534,159 +809,233 @@ impl DenormalizationState { client: &Client, ) -> Result, AerospikeSinkError> { let mut lookups = Vec::new(); - for (base_table_idx, (nid, _)) in self.base_tables.iter().enumerate() { - let node = self.dag.node_weight(*nid).unwrap(); - let node_keys = node - .batch - .iter() - .enumerate() - .filter_map(|(i, (k, v))| Some((i, k, v.last()?))) - .filter(|(_, _, v)| v.dirty) - .map(|(i, k, v)| BatchLookup { - base_table: base_table_idx, - index: i, - version: v.version, - key: Some(k.clone()), - batch_read_index: None, - }) - .collect_vec(); - let n_cols = node.schema.fields.len(); - lookups.push((*nid, (0..n_cols).collect_vec(), node_keys)); + for (nid, _, _) in &self.base_tables { + let node = self.node(*nid); + let node_keys = node.batch.iter_dirty().map( + |DirtyRecord { + idx, + key: _, + version, + }| BatchLookup { + version, + node: *nid, + nodebatch_idx: idx, + readbatch_idx: None, + follow: true, + }, + ); + lookups.extend(node_keys); } - let mut results: Vec = self - .base_tables - .iter() - .zip(lookups.iter()) - .map(|((nid, bin_names), (_, _, 
keys))| { - let node = self.dag.node_weight(*nid).unwrap(); - let (namespace, set) = node.denormalize_to.clone().unwrap(); - DenormalizedTable { - bin_names: bin_names.clone(), - namespace, - set, - records: keys - .iter() - .map(|key| { - ( - key.key.clone().unwrap(), - Vec::with_capacity(bin_names.len()), - ) - }) - .collect(), - } - }) - .collect(); - let mut batch = RecordBatch::new(0, 0); + let mut n_lookups = 0; + let mut batch = ReadBatch::new(client, 0, None); while !lookups.is_empty() { - let mut new_lookups = Vec::new(); - let batch_size: usize = lookups.iter().map(|(_, _, keys)| keys.len()).sum(); - let mut new_batch = RecordBatch::new(batch_size as u32, batch_size as u32); - let mut batch_idx = 0; - for (nid, fields, node_lookups) in lookups { - let node = self.dag.node_weight_mut(nid).unwrap(); - for lookup in node_lookups.iter().cloned() { - let BatchLookup { - base_table, - index, - version, - key, - batch_read_index, - } = lookup; - // Update the node, if we retrieved from the remote - if let Some(batch_read_index) = batch_read_index { - unsafe { - let rec = as_vector_get( - &batch.inner().list as *const as_vector, - batch_read_index, - ) as *const as_batch_read_record; - let result = (*rec).result; - if result == as_status_e_AEROSPIKE_ERR_RECORD_NOT_FOUND { - node.insert_remote(key.clone().unwrap(), None); - } else if result == as_status_e_AEROSPIKE_OK { - node.insert_remote( - key.clone().unwrap(), - Some(parse_record(&(*rec).record, &node.schema, &node.bins)?), - ); - } else { - return Err(AerospikeError::from_code(result).into()); - } - } - } - // Look up in node and add to new batch. If it wasn't in there, we looked - // it up remotely, so this is always `Some` - let record = key.as_ref().map(|key| node.get(key, version).unwrap()); - let base_fields = &mut results[base_table].records[index].1; - if let Some(record) = record.and_then(|record| record.record.as_ref()) { - let denorm_fields = fields.iter().copied().map(|i| record[i].clone()); - base_fields.extend(denorm_fields); - } else { - base_fields.extend(std::iter::repeat(Field::Null).take(fields.len())); - } + let batch_results = batch.execute()?; + let mut new_lookups = Vec::with_capacity(lookups.len()); + let mut new_batch = ReadBatch::new(client, lookups.len().try_into().unwrap(), None); + + // For persisting, we need all many-node baselines, so put them in the + // first batch + if n_lookups == 0 { + self.add_manynode_base_lookups(&mut new_batch, &mut new_lookups)?; + } + for BatchLookup { + node: nid, + nodebatch_idx, + version, + readbatch_idx, + follow, + } in lookups + { + // Update the node's local batch + if let Some(readbatch_idx) = readbatch_idx { + self.update_from_lookup(readbatch_idx, nid, &batch_results, nodebatch_idx)?; } - let node = self.dag.node_weight(nid).unwrap(); - - for edge in self.dag.edges_directed(nid, Direction::Outgoing) { - let mut new_node_lookups = Vec::with_capacity(node_lookups.len()); - let key_fields = &edge.weight().key_fields; - let target_node = self.dag.node_weight(edge.target()).unwrap(); - for lookup in &node_lookups { - let BatchLookup { - base_table, - index, - version, - key, - batch_read_index: _, - } = lookup; - let (new_key, new_batch_read_index) = if let Some(record) = key - .as_ref() - .and_then(|key| node.get(key, *version).unwrap().record.as_ref()) - { - let new_key: Vec = key_fields - .iter() - .copied() - .map(|i| record[i].clone()) - .collect(); - let batch_id = if target_node.get(&new_key, *version).is_none() { - new_batch.add_read_all( - 
&target_node.namespace, - &target_node.set, - &new_key, - )?; - let idx = batch_idx; - batch_idx += 1; - Some(idx) - } else { - None - }; - (Some(new_key), batch_id) + if !follow { + continue; + } + let Some(values) = self.node(nid).batch.get_index(nodebatch_idx, version) else { + continue; + }; + let values = values.collect_vec(); + let mut edges = self + .dag + .neighbors_directed(nid, Direction::Outgoing) + .detach(); + while let Some((edge, target)) = edges.next(self.dag.graph()) { + for value in &values { + let key = self + .edge(edge) + .key_fields + .iter() + .copied() + .map(|i| value[i].clone()) + .collect_vec(); + let (should_update, batch_idx) = self + .dag + .node_weight_mut(target) + .unwrap() + .batch + .should_update_at(key.clone(), version); + let target_schema = &self.node(target).as_schema; + let batch_read_index = if should_update { + Some(new_batch.add_read_all( + &target_schema.namespace, + &target_schema.set, + &key, + )?) } else { - (None, None) + None }; - new_node_lookups.push(BatchLookup { - base_table: *base_table, - index: *index, - version: *version, - key: new_key, - batch_read_index: new_batch_read_index, - }); + + new_lookups.push(BatchLookup { + node: target, + nodebatch_idx: batch_idx, + version, + readbatch_idx: batch_read_index, + follow: true, + }) } - new_lookups.push(( - edge.target(), - edge.weight().field_indices.clone(), - new_node_lookups, - )); } } lookups = new_lookups; batch = new_batch; - if batch_idx > 0 { - unsafe { - client.batch_get(batch.as_mut_ptr())?; + n_lookups += 1; + } + + let mut res = Vec::new(); + // Recursively collect results + for (nid, bin_names, pk) in &self.base_tables { + let mut results = Vec::new(); + let node = self.node(*nid); + for DirtyRecord { + idx: _, + key, + version, + } in node.batch.iter_dirty() + { + let field_indices = (0..node.schema.fields.len()).collect_vec(); + results.append(&mut self.recurse_lookup(&field_indices, *nid, key, version)) + } + let (namespace, set, _) = node.denormalize_to.clone().unwrap(); + res.push(DenormalizedTable { + bin_names: bin_names.clone(), + namespace, + set, + records: results, + pk: pk.clone(), + }) + } + Ok(res) + } + + fn add_manynode_base_lookups( + &mut self, + read_batch: &mut ReadBatch<'_>, + lookups: &mut Vec, + ) -> Result<(), AerospikeSinkError> { + for (i, node) in self.dag.node_references() { + if let CachedBatch::Many(node_batch) = &node.batch { + for (batch_idx, key) in node_batch + .0 + .iter() + .enumerate() + .filter_map(|(i, (key, entry))| entry.base.is_none().then_some((i, key))) + { + let batch_read_index = read_batch.add_read_all( + &node.as_schema.namespace, + &node.as_schema.set, + key, + )?; + lookups.push(BatchLookup { + node: i, + nodebatch_idx: batch_idx, + version: 0, + readbatch_idx: Some(batch_read_index), + follow: false, + }); } } } - Ok(results) + Ok(()) + } + + fn update_from_lookup( + &mut self, + readbatch_idx: usize, + nid: NodeIndex, + batch_results: &ReadBatchResults, + nodebatch_idx: usize, + ) -> Result<(), AerospikeSinkError> { + let node = self.dag.node_weight_mut(nid).unwrap(); + let rec = batch_results.get(readbatch_idx)?; + match &mut node.batch { + CachedBatch::One(batch) => batch.insert_remote( + nodebatch_idx, + rec.map(|rec| -> Result<_, Error> { + parse_record(rec, &node.schema, &node.as_schema.bins) + }) + .transpose()?, + ), + CachedBatch::Many(batch) => batch.insert_remote( + nodebatch_idx, + rec.map(|rec| -> Result<_, Error> { + parse_record_many(rec, &node.schema, MANY_LIST_BIN, &node.as_schema.bins) + }) + .transpose()? 
+ .unwrap_or_default(), + ), + } + Ok(()) + } + + fn recurse_lookup( + &self, + field_indices: &[usize], + node_id: NodeIndex, + key: &[Field], + version: usize, + ) -> Vec> { + let node = self.node(node_id); + let records = { + match node.batch.get(key, version) { + Some(t) => Either::Right(t), + None => Either::Left(std::iter::once(vec![Field::Null; node.schema.fields.len()])), + } + }; + + let mut result = Vec::new(); + for record in records { + let mut results_per_edge = Vec::new(); + for edge in self.dag.edges_directed(node_id, Direction::Outgoing) { + let key = edge + .weight() + .key_fields + .iter() + .map(|i| record[*i].clone()) + .collect_vec(); + let edge_results = + self.recurse_lookup(&edge.weight().field_indices, edge.target(), &key, version); + results_per_edge.push(edge_results); + } + + let mut record_result = vec![field_indices + .iter() + .map(|i| record[*i].clone()) + .collect_vec()]; + for edge_result in results_per_edge { + record_result = record_result + .into_iter() + .cartesian_product(edge_result) + .map(|(mut old, mut new)| { + old.append(&mut new); + old + }) + .collect_vec(); + } + result.append(&mut record_result); + } + result } pub(crate) fn commit(&mut self) { @@ -699,10 +1048,7 @@ mod tests { use std::ffi::CString; use dozer_types::{ - models::sink::{ - AerospikeDenormalizations, AerospikeSet, AerospikeSinkTable, DenormColumn, DenormKey, - }, - rust_decimal::Decimal, + models::sink::AerospikeSinkTable, types::{ Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, TableOperation, @@ -713,216 +1059,426 @@ mod tests { use super::DenormalizationState; - #[test] - #[ignore] - fn test_denorm() { - let mut customer_schema = Schema::new(); - customer_schema - .field( - FieldDefinition::new( - "id".into(), - FieldType::String, - false, - SourceDefinition::Dynamic, - ), + macro_rules! schema_row { + ($schema:expr, $f:literal: $t:ident PRIMARY_KEY) => { + $schema.field( + FieldDefinition::new($f.into(), FieldType::$t, true, SourceDefinition::Dynamic), true, - ) - .field( - FieldDefinition::new( - "phone_number".into(), - FieldType::String, - false, - SourceDefinition::Dynamic, - ), - false, ); + }; - let mut account_schema = Schema::new(); - account_schema - .field( - FieldDefinition::new( - "id".into(), - FieldType::UInt, - false, - SourceDefinition::Dynamic, - ), - true, - ) - .field( - FieldDefinition::new( - "customer_id".into(), - FieldType::String, - false, - SourceDefinition::Dynamic, - ), - false, - ) - .field( - FieldDefinition::new( - "transaction_limit".into(), - FieldType::UInt, - true, - SourceDefinition::Dynamic, - ), + ($schema:expr, $f:literal: $t:ident) => { + $schema.field( + FieldDefinition::new($f.into(), FieldType::$t, true, SourceDefinition::Dynamic), false, ); - let mut transaction_schema = Schema::new(); - transaction_schema + }; + } + macro_rules! schema { + ($($f:literal: $t:ident $($pk:ident)?),+$(,)?) => {{ + let mut schema = Schema::new(); + $(schema_row!(schema, $f: $t $($pk)?));+; + schema + }}; + } + + trait Table { + fn schema() -> Schema; + fn to_row(&self) -> Vec; + fn to_record(&self) -> Record { + Record::new(self.to_row()) + } + } + struct Customer { + id: &'static str, + phone_number: &'static str, + } + + impl Table for Customer { + fn schema() -> Schema { + schema! 
{ + "id": String PRIMARY_KEY, + "phone_number": String + } + } + fn to_row(&self) -> Vec { + vec![ + Field::String(self.id.to_owned()), + Field::String(self.phone_number.to_owned()), + ] + } + } + + struct AccountOwner { + account_id: u64, + customer_id: &'static str, + transaction_limit: Option, + } + + impl Table for AccountOwner { + fn schema() -> Schema { + schema! { + "account_id": UInt PRIMARY_KEY, + "customer_id": String, + "transaction_limit": UInt + } + } + + fn to_row(&self) -> Vec { + vec![ + Field::UInt(self.account_id), + Field::String(self.customer_id.to_owned()), + self.transaction_limit.map_or(Field::Null, Field::UInt), + ] + } + } + + struct Transaction { + id: u64, + account_id: u64, + amount: &'static str, + } + + impl Table for Transaction { + fn schema() -> Schema { + schema! { + "id": UInt PRIMARY_KEY, + "account_id": UInt, + "amount": Decimal + } + } + + fn to_row(&self) -> Vec { + vec![ + Field::UInt(self.id), + Field::UInt(self.account_id), + Field::Decimal(self.amount.try_into().unwrap()), + ] + } + } + + #[derive(Debug)] + struct DenormResult { + id: u64, + account_id: u64, + amount: &'static str, + customer_id: Option<&'static str>, + transaction_limit: Option, + phone_number: Option<&'static str>, + } + + impl Table for DenormResult { + fn schema() -> Schema { + schema! { + "id": UInt PRIMARY_KEY, + "account_id": UInt, + "amount": Decimal, + "customer_id": String PRIMARY_KEY, + "transaction_limit": UInt, + "phone_number": String + } + } + + fn to_row(&self) -> Vec { + vec![ + Field::UInt(self.id), + Field::UInt(self.account_id), + Field::Decimal(self.amount.try_into().unwrap()), + self.customer_id + .map_or(Field::Null, |s| Field::String(s.to_owned())), + self.transaction_limit.map_or(Field::Null, Field::UInt), + self.phone_number + .map_or(Field::Null, |s| Field::String(s.to_owned())), + ] + } + } + + impl PartialEq> for DenormalizedTable { + fn eq(&self, other: &Vec) -> bool { + other.eq(self) + } + } + + impl PartialEq for Vec { + fn eq(&self, other: &DenormalizedTable) -> bool { + let DenormalizedTable { + bin_names, + namespace: _, + set: _, + records, + pk, + } = other; + bin_names + .iter() + .map(|name| name.to_str().unwrap()) + .eq(DenormResult::schema() + .fields + .iter() + .map(|field| field.name.as_str())) + && records + .iter() + .cloned() + .eq(self.iter().map(|rec| rec.to_row())) + && pk == &DenormResult::schema().primary_index + } + } + + fn client() -> Client { + let client = Client::new(&CString::new("localhost:3000").unwrap()).unwrap(); + let mut response = std::ptr::null_mut(); + let request = "truncate-namespace:namespace=test"; + let request = CString::new(request).unwrap(); + unsafe { + client.info(&request, &mut response).unwrap(); + } + client + } + + fn lookup_table(name: &str) -> (AerospikeSinkTable, Schema) { + let mut schema = Schema::new(); + schema .field( - FieldDefinition::new( - "id".into(), - FieldType::UInt, - false, - SourceDefinition::Dynamic, - ), + FieldDefinition { + name: "id".into(), + typ: FieldType::UInt, + nullable: false, + source: SourceDefinition::Dynamic, + }, true, ) .field( FieldDefinition::new( - "account_id".into(), + format!("{name}_value"), FieldType::UInt, false, SourceDefinition::Dynamic, ), false, - ) - .field( - FieldDefinition::new( - "amount".into(), - FieldType::Decimal, - false, - SourceDefinition::Dynamic, - ), - false, ); + ( + dozer_types::serde_yaml::from_str(&format!( + r#" + source_table_name: + namespace: test + set_name: {name} + primary_key: + - id + "#, + )) + .unwrap(), + schema, + ) + 
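+    // (The YAML above sets only the table coordinates and primary key; the
+    // other `AerospikeSinkTable` fields are presumably filled in by serde
+    // defaults, making this a plain lookup table with no denormalization.)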
} + #[test] + #[ignore] + fn test_denorm_order() { let tables = vec![ ( - AerospikeSinkTable { - source_table_name: "".into(), - namespace: "test".into(), - set_name: "customers".into(), - denormalize: vec![], - write_denormalized_to: None, - primary_key: vec![], - }, - customer_schema, - ), - ( - AerospikeSinkTable { - source_table_name: "".into(), - namespace: "test".into(), - set_name: "accounts".into(), - denormalize: vec![AerospikeDenormalizations { - from_namespace: "test".into(), - from_set: "customers".into(), - key: DenormKey::Simple("customer_id".into()), - columns: vec![DenormColumn::Direct("phone_number".into())], - }], - write_denormalized_to: None, - primary_key: vec![], - }, - account_schema, - ), - ( - AerospikeSinkTable { - source_table_name: "".into(), - namespace: "test".into(), - set_name: "transactions".into(), - denormalize: vec![AerospikeDenormalizations { - from_namespace: "test".into(), - from_set: "accounts".into(), - key: DenormKey::Simple("account_id".into()), - columns: vec![DenormColumn::Direct("transaction_limit".into())], - }], - write_denormalized_to: Some(AerospikeSet { - namespace: "test".into(), - set: "transactions_denorm".into(), - }), - primary_key: vec![], + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: base + primary_key: + - id + denormalize: + - from_namespace: test + from_set: lookup_0 + key: lookup_0_id + columns: [lookup_0_value] + - from_namespace: test + from_set: lookup_1 + key: lookup_1_id + columns: [lookup_1_value] + write_denormalized_to: + primary_key: [id] + namespace: test + set: denorm + "#, + ) + .unwrap(), + schema! { + "id": UInt PRIMARY_KEY, + "base_value": UInt, + "lookup_0_id": UInt, + "lookup_1_id": UInt, }, - transaction_schema, ), + lookup_table("lookup_0"), + lookup_table("lookup_1"), ]; + let mut state = DenormalizationState::new(&tables).unwrap(); - // Customers state .process(TableOperation { id: None, op: Operation::Insert { - new: dozer_types::types::Record::new(vec![ - Field::String("1001".into()), - Field::String("+1234567".into()), + new: Record::new(vec![ + Field::UInt(1), + Field::UInt(1), + Field::UInt(100), + Field::UInt(200), ]), }, port: 0, }) .unwrap(); - // Accounts state .process(TableOperation { id: None, op: Operation::Insert { - new: dozer_types::types::Record::new(vec![ - Field::UInt(101), - Field::String("1001".into()), - Field::Null, - ]), + new: Record::new(vec![Field::UInt(100), Field::UInt(1000)]), }, port: 1, }) .unwrap(); - // Transactions state .process(TableOperation { id: None, op: Operation::Insert { - new: Record::new(vec![ - Field::UInt(1), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - ]), + new: Record::new(vec![Field::UInt(200), Field::UInt(2000)]), }, port: 2, }) .unwrap(); - let client = Client::new(&CString::new("localhost:3000").unwrap()).unwrap(); - let res = state.perform_denorm(&client).unwrap(); + + let client = &client(); assert_eq!( - res, + state.perform_denorm(client).unwrap(), vec![DenormalizedTable { bin_names: vec![ CString::new("id").unwrap(), - CString::new("account_id").unwrap(), - CString::new("amount").unwrap(), - CString::new("transaction_limit").unwrap(), - CString::new("phone_number").unwrap(), + CString::new("base_value").unwrap(), + CString::new("lookup_0_id").unwrap(), + CString::new("lookup_1_id").unwrap(), + CString::new("lookup_1_value").unwrap(), + CString::new("lookup_0_value").unwrap(), ], - records: vec![( - vec![Field::UInt(1)], - vec![ - Field::UInt(1), - Field::UInt(101), - 
+    #[test]
+    #[ignore]
+    fn test_denorm_missing() {
+        let mut state = state();
+        let client = client();
+
+        state
+            .process(TableOperation {
+                id: None,
+                op: Operation::Insert {
+                    new: Transaction {
+                        id: 0,
+                        account_id: 100,
+                        amount: "10.01",
+                    }
+                    .to_record(),
+                },
+                port: 2,
+            })
+            .unwrap();
+
+        assert_eq!(
+            state.perform_denorm(&client).unwrap(),
+            vec![vec![DenormResult {
+                id: 0,
+                account_id: 100,
+                amount: "10.01",
+                customer_id: None,
+                transaction_limit: None,
+                phone_number: None,
+            }]]
+        )
+    }
+
+    #[test]
+    #[ignore]
+    fn test_denorm() {
+        let mut state = state();
+        let client = client();
+        // Customers
+        state
+            .process(TableOperation {
+                id: None,
+                op: Operation::Insert {
+                    new: Customer {
+                        id: "1001",
+                        phone_number: "+1234567",
+                    }
+                    .to_record(),
+                },
+                port: 0,
+            })
+            .unwrap();
+        // Accounts
+        state
+            .process(TableOperation {
+                id: None,
+                op: Operation::Insert {
+                    new: AccountOwner {
+                        account_id: 101,
+                        customer_id: "1001",
+                        transaction_limit: None,
+                    }
+                    .to_record(),
+                },
+                port: 1,
+            })
+            .unwrap();
+        state.persist(&client).unwrap();
+        assert_eq!(state.perform_denorm(&client).unwrap(), vec![vec![]]);
+        // Transactions
+        state
+            .process(TableOperation {
+                id: None,
+                op: Operation::Insert {
+                    new: Transaction {
+                        id: 1,
+                        account_id: 101,
+                        amount: "1.23",
+                    }
+                    .to_record(),
+                },
+                port: 2,
+            })
+            .unwrap();
+        let res = state.perform_denorm(&client).unwrap();
+        assert_eq!(
+            res,
+            vec![vec![DenormResult {
+                id: 1,
+                account_id: 101,
+                amount: "1.23",
+                customer_id: Some("1001"),
+                transaction_limit: None,
+                phone_number: Some("+1234567"),
+            }]]
+        );
         state.commit();
         state.persist(&client).unwrap();
         state
             .process(TableOperation {
                 id: None,
                 op: Operation::Insert {
-                    new: Record::new(vec![
-                        Field::UInt(2),
-                        Field::UInt(101),
-                        Field::Decimal(Decimal::new(321, 2)),
-                    ]),
+                    new: Transaction {
+                        id: 2,
+                        account_id: 101,
+                        amount: "3.21",
+                    }
+                    .to_record(),
                 },
                 port: 2,
             })
@@ -932,14 +1488,16 @@ mod tests {
             .process(TableOperation {
                 id: None,
                 op: Operation::Update {
-                    old: dozer_types::types::Record::new(vec![
-                        Field::String("1001".into()),
-                        Field::String("+1234567".into()),
-                    ]),
-                    new: dozer_types::types::Record::new(vec![
-                        Field::String("1001".into()),
-                        Field::String("+7654321".into()),
-                    ]),
+                    old: Customer {
+                        id: "1001",
+                        phone_number: "+1234567",
+                    }
+                    .to_record(),
+                    new: Customer {
+                        id: "1001",
+                        phone_number: "+7654321",
+                    }
+                    .to_record(),
                 },
                 port: 0,
             })
@@ -948,52 +1506,126 @@ mod tests {
             .process(TableOperation {
                 id: None,
                 op: Operation::Insert {
-                    new: Record::new(vec![
-                        Field::UInt(3),
-                        Field::UInt(101),
-                        Field::Decimal(Decimal::new(123, 2)),
-                    ]),
+                    new: Transaction {
+                        id: 3,
+                        account_id: 101,
+                        amount: "1.23",
+                    }
+                    .to_record(),
                 },
                 port: 2,
             })
             .unwrap();
+        state
+            .process(TableOperation {
+                id: None,
+                op: Operation::Insert {
+                    new: Customer {
+                        id: "1001",
+                        phone_number: "+2 123",
+                    }
+                    .to_record(),
+                },
+                port: 0,
+            })
+            .unwrap();
         state.commit();
         let res = state.perform_denorm(&client).unwrap();
         assert_eq!(
             res,
-            vec![DenormalizedTable {
-                bin_names: vec![
-                    CString::new("id").unwrap(),
-                    CString::new("account_id").unwrap(),
-                    CString::new("amount").unwrap(),
-                    CString::new("transaction_limit").unwrap(),
-                    CString::new("phone_number").unwrap(),
-                ],
-                records: vec![
-                    (
-                        vec![Field::UInt(2)],
-                        vec![
-                            Field::UInt(2),
-                            Field::UInt(101),
-                            Field::Decimal(Decimal::new(321, 2)),
-                            Field::Null,
-                            Field::String("+1234567".into())
-                        ]
-                    ),
-                    (
-                        vec![Field::UInt(3)],
-                        vec![
-                            Field::UInt(3),
-                            Field::UInt(101),
-                            Field::Decimal(Decimal::new(123, 2)),
-                            Field::Null,
-                            Field::String("+7654321".into())
-                        ]
-                    )
-                ],
-                namespace: CString::new("test").unwrap(),
-                set: CString::new("transactions_denorm").unwrap()
-            }]
+            vec![vec![
+                DenormResult {
+                    id: 2,
+                    account_id: 101,
+                    amount: "3.21",
+                    customer_id: Some("1001"),
+                    transaction_limit: None,
+                    phone_number: Some("+1234567")
+                },
+                DenormResult {
+                    id: 3,
+                    account_id: 101,
+                    amount: "1.23",
+                    customer_id: Some("1001"),
+                    transaction_limit: None,
+                    phone_number: Some("+7654321"),
+                },
+                DenormResult {
+                    id: 3,
+                    account_id: 101,
+                    amount: "1.23",
+                    customer_id: Some("1001"),
+                    transaction_limit: None,
+                    phone_number: Some("+2 123"),
+                },
+            ],]
         );
+        state.persist(&client).unwrap();
+    }
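+
+    // Shared fixture for the tests above: transactions pull `customer_id` and
+    // `transaction_limit` from accounts, accounts pull `phone_number` from
+    // customers, and the fully denormalized rows land in
+    // `test.transactions_denorm`, keyed by (id, customer_id).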
- CString::new("amount").unwrap(), - CString::new("transaction_limit").unwrap(), - CString::new("phone_number").unwrap(), - ], - records: vec![ - ( - vec![Field::UInt(2)], - vec![ - Field::UInt(2), - Field::UInt(101), - Field::Decimal(Decimal::new(321, 2)), - Field::Null, - Field::String("+1234567".into()) - ] - ), - ( - vec![Field::UInt(3)], - vec![ - Field::UInt(3), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - Field::Null, - Field::String("+7654321".into()) - ] - ) - ], - namespace: CString::new("test").unwrap(), - set: CString::new("transactions_denorm").unwrap() - }] + vec![vec![ + DenormResult { + id: 2, + account_id: 101, + amount: "3.21", + customer_id: Some("1001"), + transaction_limit: None, + phone_number: Some("+1234567") + }, + DenormResult { + id: 3, + account_id: 101, + amount: "1.23", + customer_id: Some("1001"), + transaction_limit: None, + phone_number: Some("+7654321"), + }, + DenormResult { + id: 3, + account_id: 101, + amount: "1.23", + customer_id: Some("1001"), + transaction_limit: None, + phone_number: Some("+2 123"), + }, + ],] ); + state.persist(&client).unwrap(); + } + + fn state() -> DenormalizationState { + let tables = vec![ + ( + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: customers + primary_key: + - id + aggregate_by_pk: true + "#, + ) + .unwrap(), + Customer::schema(), + ), + ( + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: accounts + primary_key: + - account_id + denormalize: + - from_namespace: test + from_set: customers + key: customer_id + columns: + - phone_number + "#, + ) + .unwrap(), + AccountOwner::schema(), + ), + ( + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: transactions + primary_key: + - id + denormalize: + - from_namespace: test + from_set: accounts + key: account_id + columns: + - customer_id + - transaction_limit + write_denormalized_to: + namespace: test + set: transactions_denorm + primary_key: + - id + - customer_id + "#, + ) + .unwrap(), + Transaction::schema(), + ), + ]; + + DenormalizationState::new(&tables).unwrap() } } diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index 762436f8a7..6f367010e1 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -15,8 +15,7 @@ use std::mem::MaybeUninit; use std::ptr::NonNull; use std::sync::Arc; -use crate::aerospike::AerospikeError; -use crate::denorm_dag::RecordBatch; +use crate::aerospike::{AerospikeError, WriteBatch}; mod aerospike; mod denorm_dag; @@ -251,7 +250,7 @@ struct AerospikeMetadata { client: Arc, key: NonNull, record: NonNull, - last_base_transaction: Option, + last_denorm_transaction: Option, last_lookup_transaction: Option, } @@ -301,7 +300,7 @@ impl AerospikeMetadata { client, key, record: NonNull::new(record).unwrap(), - last_base_transaction: base, + last_denorm_transaction: base, last_lookup_transaction: lookup, }) } @@ -316,8 +315,8 @@ impl AerospikeMetadata { Ok(()) } - fn write_base(&mut self, txid: TxnId) -> Result<(), AerospikeSinkError> { - self.last_base_transaction = Some(txid); + fn write_denorm(&mut self, txid: TxnId) -> Result<(), AerospikeSinkError> { + self.last_denorm_transaction = Some(txid); self.write(txid, constants::META_BASE_TXN_ID_BIN)?; Ok(()) } @@ -388,7 +387,7 @@ impl AerospikeSinkWorker { fn commit(&mut self, txid: Option) -> Result<(), AerospikeSinkError> { match ( txid, - self.metadata_writer.last_base_transaction, + 
+                let key = table.pk.iter().map(|i| record[*i].clone()).collect_vec();
                 batch.add_write(
                     &table.namespace,
                     &table.set,
@@ -452,11 +451,11 @@
             }
         }
 
-        unsafe { self.client.write_batch(batch.as_mut_ptr())? };
+        batch.execute()?;
 
         // Write denormed txid
         if let Some(txid) = txid {
-            self.metadata_writer.write_base(txid)?;
+            self.metadata_writer.write_denorm(txid)?;
         }
         self.state.persist(&self.client)?;
@@ -674,6 +673,7 @@ mod tests {
                 denormalize: vec![],
                 write_denormalized_to: None,
                 primary_key: vec![],
+                aggregate_by_pk: false,
             }],
             max_batch_duration_ms: None,
             preferred_batch_size: None,
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index 89aa653017..2a2559b1f2 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -156,6 +156,7 @@ pub struct AerospikeDenormalizations {
 pub struct AerospikeSet {
     pub namespace: String,
     pub set: String,
+    pub primary_key: Vec<String>,
 }
 
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, PartialEq, Eq)]
@@ -166,9 +167,12 @@ pub struct AerospikeSinkTable {
     pub set_name: String,
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
     pub denormalize: Vec<AerospikeDenormalizations>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     pub write_denormalized_to: Option<AerospikeSet>,
     #[serde(default)]
     pub primary_key: Vec<String>,
+    #[serde(default)]
+    pub aggregate_by_pk: bool,
 }
 
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index 77be0c2804..a2dcd3c8bb 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -191,12 +191,19 @@
         "type": "object",
         "required": [
           "namespace",
+          "primary_key",
           "set"
         ],
         "properties": {
           "namespace": {
             "type": "string"
           },
+          "primary_key": {
+            "type": "array",
+            "items": {
+              "type": "string"
+            }
+          },
           "set": {
             "type": "string"
           }
@@ -263,6 +270,10 @@
           "source_table_name"
         ],
         "properties": {
+          "aggregate_by_pk": {
+            "default": false,
+            "type": "boolean"
+          },
           "denormalize": {
             "type": "array",
             "items": {