diff --git a/dozer-recordstore/src/in_memory/mod.rs b/dozer-recordstore/src/in_memory/mod.rs index 6eb994bb6f..7289f0fe25 100644 --- a/dozer-recordstore/src/in_memory/mod.rs +++ b/dozer-recordstore/src/in_memory/mod.rs @@ -45,9 +45,16 @@ struct RecordRefInner { unsafe impl Send for RecordRefInner {} unsafe impl Sync for RecordRefInner {} -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct RecordRef(Arc); +impl std::fmt::Debug for RecordRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let values: Vec<_> = self.load().iter().map(|val| val.cloned()).collect(); + write!(f, "RecordRef({:?}), values: {:?}", self.0, &values) + } +} + impl PartialEq for RecordRef { fn eq(&self, other: &Self) -> bool { self.load() == other.load() diff --git a/dozer-sql/src/product/join/processor.rs b/dozer-sql/src/product/join/processor.rs index b6d2b943cb..df1bf9cddb 100644 --- a/dozer-sql/src/product/join/processor.rs +++ b/dozer-sql/src/product/join/processor.rs @@ -177,3 +177,395 @@ impl Processor for ProductProcessor { .map_err(Into::into) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use dozer_core::node::ProcessorFactory; + use dozer_recordstore::{ProcessorRecord, ProcessorRecordStoreDeserializer, RecordRef}; + use dozer_sql_expression::builder::NameOrAlias; + use dozer_sql_expression::sqlparser::ast::JoinOperator as SqlJoinOperator; + use dozer_types::{ + models::app_config::RecordStore, + types::{Field, FieldDefinition, Schema}, + }; + + use crate::product::join::{ + factory::{LEFT_JOIN_PORT, RIGHT_JOIN_PORT}, + operator::JoinType, + }; + use crate::{product::join::factory::JoinProcessorFactory, tests::utils::get_select}; + + use super::*; + + struct TestChannelForwarder { + operations: Vec, + } + + impl ProcessorChannelForwarder for TestChannelForwarder { + fn send(&mut self, op: ProcessorOperation, _port: dozer_core::node::PortHandle) { + self.operations.push(op); + } + } + + fn create_schema(table_name: &'static str) -> Schema { + let mut schema = Schema::new(); + schema + .field( + FieldDefinition { + name: "joinkey".into(), + typ: dozer_types::types::FieldType::UInt, + nullable: false, + source: dozer_types::types::SourceDefinition::Table { + connection: "test".into(), + name: table_name.into(), + }, + }, + true, + ) + .field( + FieldDefinition { + name: "data".into(), + typ: dozer_types::types::FieldType::UInt, + nullable: false, + source: dozer_types::types::SourceDefinition::Table { + connection: "test".into(), + name: table_name.into(), + }, + }, + false, + ); + schema + } + + enum JoinSide { + Left, + Right, + } + + struct Executor { + processor: Box, + forwarder: TestChannelForwarder, + record_store: ProcessorRecordStore, + } + + impl Executor { + fn new(kind: JoinType) -> Self { + let record_store = + ProcessorRecordStoreDeserializer::new(RecordStore::InMemory).unwrap(); + let left_schema = create_schema("left"); + let right_schema = create_schema("right"); + + let stmt = get_select( + "SELECT left.joinkey FROM left INNER JOIN right ON left.joinkey = right.joinkey", + ) + .unwrap(); + let join = &stmt.from[0].joins[0]; + let join_op = join.join_operator.clone(); + let SqlJoinOperator::Inner(constraint) = join_op else { + unreachable!() + }; + let join_op = match kind { + JoinType::Inner => SqlJoinOperator::Inner(constraint), + JoinType::LeftOuter => SqlJoinOperator::LeftOuter(constraint), + JoinType::RightOuter => SqlJoinOperator::RightOuter(constraint), + }; + let factory = JoinProcessorFactory::new( + "test".into(), + Some(NameOrAlias("left".into(), None)), + Some(NameOrAlias("right".into(), None)), + join_op, + false, + ); + + let schemas = [ + (LEFT_JOIN_PORT, left_schema), + (RIGHT_JOIN_PORT, right_schema), + ] + .into_iter() + .collect(); + let processor = factory + .build(schemas, HashMap::new(), &record_store, None) + .unwrap(); + + let record_store = record_store.into_record_store(); + + let forwarder = TestChannelForwarder { operations: vec![] }; + Executor { + processor, + forwarder, + record_store, + } + } + + fn do_op( + &mut self, + operation: ProcessorOperation, + side: JoinSide, + ) -> Vec { + let port = match side { + JoinSide::Left => LEFT_JOIN_PORT, + JoinSide::Right => RIGHT_JOIN_PORT, + }; + self.processor + .process(port, &self.record_store, operation, &mut self.forwarder) + .unwrap(); + let output_ops = self.forwarder.operations.clone(); + self.forwarder.operations.clear(); + output_ops + } + + fn insert( + &mut self, + side: JoinSide, + values: &[Field], + ) -> (RecordRef, Vec) { + let record_ref = self.record_store.create_ref(values).unwrap(); + let processor_record = ProcessorRecord::new(Box::new([record_ref.clone()])); + let op = ProcessorOperation::Insert { + new: processor_record.clone(), + }; + (record_ref, self.do_op(op, side)) + } + + fn update( + &mut self, + side: JoinSide, + old: RecordRef, + new: &[Field], + ) -> (RecordRef, Vec) { + let record_ref = self.record_store.create_ref(new).unwrap(); + let processor_record = ProcessorRecord::new(Box::new([record_ref.clone()])); + let op = ProcessorOperation::Update { + old: ProcessorRecord::new(Box::new([old])), + new: processor_record.clone(), + }; + (record_ref, self.do_op(op, side)) + } + + fn delete(&mut self, side: JoinSide, old: RecordRef) -> Vec { + let op = ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([old])), + }; + self.do_op(op, side) + } + } + + #[test] + fn test_inner_join() { + let mut exec = Executor::new(JoinType::Inner); + + let (left_record, ops) = exec.insert(JoinSide::Left, &[Field::UInt(0), Field::UInt(1)]); + assert_eq!(ops, &[]); + + let (right_record, ops) = exec.insert(JoinSide::Right, &[Field::UInt(0), Field::UInt(2)]); + assert_eq!( + ops, + &[ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([left_record.clone(), right_record.clone()])) + }] + ); + let (new_left_record, ops) = exec.update( + JoinSide::Left, + left_record.clone(), + &[Field::UInt(0), Field::UInt(2)], + ); + assert_eq!( + ops, + &[ + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + left_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + } + ] + ); + + assert_eq!( + exec.delete(JoinSide::Right, right_record.clone()), + &[ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + },] + ); + } + + #[test] + fn test_left_outer_join() { + let mut exec = Executor::new(JoinType::LeftOuter); + + let null_record = exec + .record_store + .create_ref(&[Field::Null, Field::Null]) + .unwrap(); + + let (left_record, ops) = exec.insert(JoinSide::Left, &[Field::UInt(0), Field::UInt(1)]); + assert_eq!( + ops, + &[ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([left_record.clone(), null_record.clone()])) + }] + ); + + let (right_record, ops) = exec.insert(JoinSide::Right, &[Field::UInt(0), Field::UInt(2)]); + assert_eq!( + ops, + &[ + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([left_record.clone(), null_record.clone()])), + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + left_record.clone(), + right_record.clone() + ])) + } + ] + ); + let (new_left_record, ops) = exec.update( + JoinSide::Left, + left_record.clone(), + &[Field::UInt(0), Field::UInt(2)], + ); + assert_eq!( + ops, + &[ + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + left_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + } + ] + ); + + assert_eq!( + exec.delete(JoinSide::Right, right_record.clone()), + &[ + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + null_record.clone(), + ])) + }, + ] + ); + let (right_record, _) = exec.insert(JoinSide::Right, &[Field::UInt(0), Field::UInt(2)]); + + assert_eq!( + exec.delete(JoinSide::Left, new_left_record.clone()), + &[ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + },] + ); + } + + #[test] + fn test_right_outer_join() { + let mut exec = Executor::new(JoinType::RightOuter); + + let null_record = exec + .record_store + .create_ref(&[Field::Null, Field::Null]) + .unwrap(); + + let (left_record, ops) = exec.insert(JoinSide::Left, &[Field::UInt(0), Field::UInt(1)]); + assert_eq!(ops, &[]); + + let (right_record, ops) = exec.insert(JoinSide::Right, &[Field::UInt(0), Field::UInt(2)]); + assert_eq!( + ops, + &[ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([left_record.clone(), right_record.clone()])) + }] + ); + let (new_left_record, ops) = exec.update( + JoinSide::Left, + left_record.clone(), + &[Field::UInt(0), Field::UInt(2)], + ); + assert_eq!( + ops, + &[ + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + left_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + null_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + null_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + } + ] + ); + + assert_eq!( + exec.delete(JoinSide::Left, right_record.clone()), + &[ + ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + }, + ProcessorOperation::Insert { + new: ProcessorRecord::new(Box::new([ + null_record.clone(), + right_record.clone(), + ])) + }, + ] + ); + let (new_left_record, _) = exec.insert(JoinSide::Left, &[Field::UInt(0), Field::UInt(2)]); + + assert_eq!( + exec.delete(JoinSide::Right, new_left_record.clone()), + &[ProcessorOperation::Delete { + old: ProcessorRecord::new(Box::new([ + new_left_record.clone(), + right_record.clone() + ])) + },] + ); + } +}