From 731d608d2490b0344a8845e415443dcfd4457f2c Mon Sep 17 00:00:00 2001 From: Dario Pizzamiglio Date: Thu, 28 Sep 2023 16:35:46 +0800 Subject: [PATCH] build join --- .../src/pipeline_builder/from_builder.rs | 112 ++++++++---- .../src/pipeline_builder/join_builder.rs | 163 ++++++++++++------ dozer-sql/src/table_operator/factory.rs | 13 +- dozer-sql/src/tests/builder_test.rs | 7 +- dozer-sql/src/window/builder.rs | 52 +----- dozer-sql/src/window/factory.rs | 9 +- 6 files changed, 190 insertions(+), 166 deletions(-) diff --git a/dozer-sql/src/pipeline_builder/from_builder.rs b/dozer-sql/src/pipeline_builder/from_builder.rs index 03c00a2df4..7cf1974e69 100644 --- a/dozer-sql/src/pipeline_builder/from_builder.rs +++ b/dozer-sql/src/pipeline_builder/from_builder.rs @@ -37,6 +37,13 @@ pub enum TableOperatorArg { Descriptor(TableOperatorDescriptor), } +#[derive(Clone, Debug)] +pub enum SourceType { + Srouce, + Pipeline, + Processor, +} + pub fn insert_from_to_pipeline( from: &TableWithJoins, pipeline: &mut AppPipeline, @@ -60,13 +67,25 @@ fn insert_table_to_pipeline( let product_processor_name = insert_from_processor_to_pipeline(query_context, relation, pipeline); - insert_table_operator_processor_to_pipeline( + let connection_info = insert_table_operator_processor_to_pipeline( &operator, - product_processor_name, pipeline, pipeline_idx, query_context, - ) + )?; + + pipeline.connect_nodes( + &connection_info.output_node.0, + connection_info.output_node.1, + &product_processor_name.clone(), + DEFAULT_PORT_HANDLE, + ); + + Ok(ConnectionInfo { + processor_name: product_processor_name.clone(), + input_nodes: vec![], + output_node: (product_processor_name, DEFAULT_PORT_HANDLE), + }) } else { insert_table_processor_to_pipeline(relation, pipeline, pipeline_idx, query_context) } @@ -126,18 +145,19 @@ fn insert_table_processor_to_pipeline( }) } -fn insert_table_operator_processor_to_pipeline( +pub fn insert_table_operator_processor_to_pipeline( operator: &TableOperatorDescriptor, - output_node_name: String, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { // the sources names that are used in this pipeline - let mut entry_points = vec![]; + let mut input_nodes = vec![]; if operator.name.to_uppercase() == "TTL" { + let mut entry_points = vec![]; + let processor_name = generate_name("TOP", operator, query_context); if !query_context.processors_list.insert(processor_name.clone()) { return Err(PipelineError::ProcessorAlreadyExists(processor_name)); @@ -154,7 +174,6 @@ fn insert_table_operator_processor_to_pipeline( TableOperatorArg::Descriptor(descriptor) => { let connection_info = insert_table_operator_processor_to_pipeline( descriptor, - processor_name.clone(), pipeline, pipeline_idx, query_context, @@ -163,33 +182,37 @@ fn insert_table_operator_processor_to_pipeline( } }; - if is_an_entry_point(&source_name, query_context, pipeline_idx) { + if is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) { let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle); entry_points.push(entry_point); - query_context.used_sources.push(source_name); - } else { + query_context.used_sources.push(source_name.clone()); + } else if is_a_pipeline_output(&source_name, query_context, pipeline_idx) { input_nodes.push(( - source_name, - processor_name.to_owned(), + source_name.clone(), + processor_name.clone(), DEFAULT_PORT_HANDLE as PortHandle, )); } - pipeline.add_processor(Box::new(processor), &processor_name, entry_points); - - pipeline.connect_nodes( - &processor_name, - DEFAULT_PORT_HANDLE, - &output_node_name, - DEFAULT_PORT_HANDLE, - ); + pipeline.add_processor(Box::new(processor), &processor_name.clone(), entry_points); + + if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) + && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) + { + pipeline.connect_nodes( + &source_name, + DEFAULT_PORT_HANDLE, + &processor_name.clone(), + DEFAULT_PORT_HANDLE, + ); + } Ok(ConnectionInfo { - processor_name, + processor_name: processor_name.clone(), input_nodes, - output_node: (output_node_name, DEFAULT_PORT_HANDLE), + output_node: (processor_name, DEFAULT_PORT_HANDLE), }) } else { Err(PipelineError::UnsupportedTableOperator( @@ -197,6 +220,8 @@ fn insert_table_operator_processor_to_pipeline( )) } } else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" { + let mut entry_points = vec![]; + let processor_name = generate_name("WIN", operator, query_context); if !query_context.processors_list.insert(processor_name.clone()) { return Err(PipelineError::ProcessorAlreadyExists(processor_name)); @@ -209,7 +234,6 @@ fn insert_table_operator_processor_to_pipeline( TableOperatorArg::Descriptor(descriptor) => { let connection_info = insert_table_operator_processor_to_pipeline( descriptor, - processor_name.clone(), pipeline, pipeline_idx, query_context, @@ -223,10 +247,10 @@ fn insert_table_operator_processor_to_pipeline( PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle); entry_points.push(entry_point); - query_context.used_sources.push(source_name); - } else { + query_context.used_sources.push(source_name.clone()); + } else if is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) { input_nodes.push(( - source_name, + source_name.clone(), processor_name.to_owned(), DEFAULT_PORT_HANDLE as PortHandle, )); @@ -234,17 +258,21 @@ fn insert_table_operator_processor_to_pipeline( pipeline.add_processor(Box::new(processor), &processor_name, entry_points); - pipeline.connect_nodes( - &processor_name, - DEFAULT_PORT_HANDLE, - &output_node_name, - DEFAULT_PORT_HANDLE, - ); + if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) + && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) + { + pipeline.connect_nodes( + &source_name, + DEFAULT_PORT_HANDLE, + &processor_name, + DEFAULT_PORT_HANDLE, + ); + } Ok(ConnectionInfo { - processor_name, + processor_name: processor_name.clone(), input_nodes, - output_node: (output_node_name, DEFAULT_PORT_HANDLE), + output_node: (processor_name, DEFAULT_PORT_HANDLE), }) } else { Err(PipelineError::UnsupportedTableOperator( @@ -258,7 +286,7 @@ fn insert_table_operator_processor_to_pipeline( } } -fn generate_name( +pub fn generate_name( prefix: &str, operator: &TableOperatorDescriptor, query_context: &mut QueryContext, @@ -358,6 +386,20 @@ pub fn is_an_entry_point(name: &str, query_context: &QueryContext, pipeline_idx: true } +pub fn is_a_pipeline_output( + name: &str, + query_context: &mut QueryContext, + pipeline_idx: usize, +) -> bool { + if query_context + .pipeline_map + .contains_key(&(pipeline_idx, name.to_owned())) + { + return true; + } + false +} + pub fn string_from_sql_object_name(name: &ObjectName) -> String { name.0 .iter() diff --git a/dozer-sql/src/pipeline_builder/join_builder.rs b/dozer-sql/src/pipeline_builder/join_builder.rs index d26979ac59..a3a5d7ed4b 100644 --- a/dozer-sql/src/pipeline_builder/join_builder.rs +++ b/dozer-sql/src/pipeline_builder/join_builder.rs @@ -1,5 +1,6 @@ use dozer_core::{ app::{AppPipeline, PipelineEntryPoint}, + node::PortHandle, DEFAULT_PORT_HANDLE, }; use dozer_sql_expression::sqlparser::ast::TableWithJoins; @@ -11,12 +12,14 @@ use crate::{ join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT}, table::factory::get_name_or_alias, }, - table_operator::factory::TableOperatorProcessorFactory, + table_operator::factory::{get_source_name, TableOperatorProcessorFactory}, window::factory::WindowProcessorFactory, }; use super::from_builder::{ - is_an_entry_point, is_table_operator, ConnectionInfo, TableOperatorDescriptor, + generate_name, insert_table_operator_processor_to_pipeline, is_a_pipeline_output, + is_an_entry_point, is_table_operator, ConnectionInfo, TableOperatorArg, + TableOperatorDescriptor, }; #[derive(Clone, Debug)] @@ -184,91 +187,137 @@ fn insert_join_source_to_pipeline( } fn insert_table_operator_to_pipeline( - table_operator: &TableOperatorDescriptor, + operator: &TableOperatorDescriptor, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { + let mut entry_points = vec![]; let mut input_nodes = vec![]; - if table_operator.name.to_uppercase() == "TTL" { - let processor_name = format!( - "TOP_{0}_{1}", - table_operator.name, - query_context.get_next_processor_id() - ); + if operator.name.to_uppercase() == "TTL" { + let processor_name = generate_name("TOP", operator, query_context); if !query_context.processors_list.insert(processor_name.clone()) { return Err(PipelineError::ProcessorAlreadyExists(processor_name)); } let processor = TableOperatorProcessorFactory::new( processor_name.clone(), - table_operator.clone(), + operator.clone(), query_context.udfs.to_owned(), ); - let source_name = processor - .get_source_name() - .map_err(PipelineError::TableOperatorError)?; + if let Some(table) = operator.args.get(0) { + let source_name = match table { + TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?, + TableOperatorArg::Descriptor(descriptor) => { + let connection_info = insert_table_operator_processor_to_pipeline( + descriptor, + pipeline, + pipeline_idx, + query_context, + )?; + connection_info.processor_name + } + }; - let mut entry_points = vec![]; + if is_an_entry_point(&source_name, query_context, pipeline_idx) { + let entry_point = + PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle); - if is_an_entry_point(&source_name, query_context, pipeline_idx) { - let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE); + entry_points.push(entry_point); + query_context.used_sources.push(source_name.clone()); + } else if is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) { + input_nodes.push(( + source_name.clone(), + processor_name.to_owned(), + DEFAULT_PORT_HANDLE as PortHandle, + )); + } - entry_points.push(entry_point); - query_context.used_sources.push(source_name); - } else { - input_nodes.push((source_name, processor_name.clone(), DEFAULT_PORT_HANDLE)); - } + pipeline.add_processor(Box::new(processor), &processor_name.clone(), entry_points); - pipeline.add_processor(Box::new(processor), &processor_name, entry_points); + if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) + && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) + { + pipeline.connect_nodes( + &source_name, + DEFAULT_PORT_HANDLE, + &processor_name.clone(), + DEFAULT_PORT_HANDLE, + ); + } - Ok(ConnectionInfo { - processor_name: processor_name.clone(), - input_nodes, - output_node: (processor_name, DEFAULT_PORT_HANDLE), - }) - } else if table_operator.name.to_uppercase() == "TUMBLE" - || table_operator.name.to_uppercase() == "HOP" - { + Ok(ConnectionInfo { + processor_name: processor_name.clone(), + input_nodes, + output_node: (processor_name, DEFAULT_PORT_HANDLE), + }) + } else { + Err(PipelineError::UnsupportedTableOperator( + operator.name.clone(), + )) + } + } else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" { // for now, we only support window operators let processor_name = format!("window_{}", query_context.get_next_processor_id()); if !query_context.processors_list.insert(processor_name.clone()) { return Err(PipelineError::ProcessorAlreadyExists(processor_name)); } - let window_processor_factory = - WindowProcessorFactory::new(processor_name.clone(), table_operator.clone()); - let window_source_name = window_processor_factory.get_source_name()?; - let mut window_entry_points = vec![]; + let processor = WindowProcessorFactory::new(processor_name.clone(), operator.clone()); + if let Some(table) = operator.args.get(0) { + let source_name = match table { + TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?, + TableOperatorArg::Descriptor(descriptor) => { + let connection_info = insert_table_operator_processor_to_pipeline( + descriptor, + pipeline, + pipeline_idx, + query_context, + )?; + connection_info.processor_name + } + }; - if is_an_entry_point(&window_source_name, query_context, pipeline_idx) { - let entry_point = - PipelineEntryPoint::new(window_source_name.clone(), DEFAULT_PORT_HANDLE); + if is_an_entry_point(&source_name, query_context, pipeline_idx) { + let entry_point = + PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle); - window_entry_points.push(entry_point); - query_context.used_sources.push(window_source_name); - } else { - input_nodes.push(( - window_source_name, - processor_name.clone(), - DEFAULT_PORT_HANDLE, - )); - } + entry_points.push(entry_point); + query_context.used_sources.push(source_name.clone()); + } else if is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) { + input_nodes.push(( + source_name.clone(), + processor_name.to_owned(), + DEFAULT_PORT_HANDLE as PortHandle, + )); + } - pipeline.add_processor( - Box::new(window_processor_factory), - &processor_name, - window_entry_points, - ); + pipeline.add_processor(Box::new(processor), &processor_name.clone(), entry_points); + + if !is_an_entry_point(&source_name, query_context, pipeline_idx) + && !is_a_pipeline_output(&source_name, query_context, pipeline_idx) + { + pipeline.connect_nodes( + &source_name, + DEFAULT_PORT_HANDLE, + &processor_name.clone(), + DEFAULT_PORT_HANDLE, + ); + } - Ok(ConnectionInfo { - processor_name: processor_name.clone(), - input_nodes, - output_node: (processor_name, DEFAULT_PORT_HANDLE), - }) + Ok(ConnectionInfo { + processor_name: processor_name.clone(), + input_nodes, + output_node: (processor_name, DEFAULT_PORT_HANDLE), + }) + } else { + Err(PipelineError::UnsupportedTableOperator( + operator.name.clone(), + )) + } } else { Err(PipelineError::UnsupportedTableOperator( - table_operator.name.clone(), + operator.name.clone(), )) } } diff --git a/dozer-sql/src/table_operator/factory.rs b/dozer-sql/src/table_operator/factory.rs index 5db138a74b..eb3f9dadf0 100644 --- a/dozer-sql/src/table_operator/factory.rs +++ b/dozer-sql/src/table_operator/factory.rs @@ -24,7 +24,7 @@ use super::{ processor::TableOperatorProcessor, }; -const SOURCE_TABLE_ARGUMENT: usize = 0; +const _SOURCE_TABLE_ARGUMENT: usize = 0; #[derive(Debug)] pub struct TableOperatorProcessorFactory { @@ -43,17 +43,6 @@ impl TableOperatorProcessorFactory { udfs, } } - - pub(crate) fn get_source_name(&self) -> Result { - todo!() - // let source_arg = self.table.args.get(SOURCE_TABLE_ARGUMENT).ok_or( - // TableOperatorError::MissingSourceArgument(self.table.name.to_owned()), - // )?; - - // let source_name = get_source_name(self.table.name.to_owned(), source_arg)?; - - // Ok(source_name) - } } impl ProcessorFactory for TableOperatorProcessorFactory { diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index 920f400fb1..9242e0fdab 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ b/dozer-sql/src/tests/builder_test.rs @@ -170,9 +170,10 @@ impl Sink for TestSink { fn process( &mut self, _from_port: PortHandle, - _record_store: &ProcessorRecordStore, - _op: ProcessorOperation, + record_store: &ProcessorRecordStore, + op: ProcessorOperation, ) -> Result<(), BoxedError> { + println!("Sink: {:?}", op.load(record_store).unwrap()); Ok(()) } @@ -193,7 +194,7 @@ impl Sink for TestSink { async fn test_pipeline_builder() { let mut pipeline = AppPipeline::new_with_default_flags(); let context = statement_to_pipeline( - "SELECT Spending \ + "SELECT Spending \ FROM TTL(TUMBLE(users, timestamp, '5 MINUTES'), timestamp, '1 MINUTE') \ WHERE Spending >= 1", &mut pipeline, diff --git a/dozer-sql/src/window/builder.rs b/dozer-sql/src/window/builder.rs index e57215d77c..c079455ad6 100644 --- a/dozer-sql/src/window/builder.rs +++ b/dozer-sql/src/window/builder.rs @@ -14,7 +14,7 @@ use crate::{ use super::operator::WindowType; -const ARG_SOURCE: usize = 0; +const _ARG_SOURCE: usize = 0; const ARG_COLUMN: usize = 1; const ARG_TUMBLE_INTERVAL: usize = 2; @@ -78,30 +78,6 @@ pub(crate) fn window_from_table_operator( } } -pub(crate) fn window_source_name( - operator: &TableOperatorDescriptor, -) -> Result { - todo!() - // if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" { - // let source_arg = operator - // .args - // .get(ARG_SOURCE) - // .ok_or(WindowError::WindowMissingSourceArgument)?; - // let argument = if let TableOperatorArg::Argument(arg) = source_arg { - // arg - // } else { - // return Err(WindowError::WindowInvalidSource("".to_string())); - // }; - // let source_name = get_window_source_name(argument)?; - - // Ok(source_name) - // } else { - // Err(WindowError::UnsupportedRelationFunction( - // operator.name.clone(), - // )) - // } -} - fn get_window_interval(interval_arg: &FunctionArg) -> Result { match interval_arg { FunctionArg::Named { name, arg: _ } => { @@ -148,32 +124,6 @@ fn get_window_hop(hop_arg: &FunctionArg) -> Result { } } -fn get_window_source_name(arg: &FunctionArg) -> Result { - match arg { - FunctionArg::Named { name, arg: _ } => { - let source_name = ExpressionBuilder::normalize_ident(name); - Err(WindowError::WindowInvalidSource(source_name)) - } - FunctionArg::Unnamed(arg_expr) => match arg_expr { - FunctionArgExpr::Expr(expr) => match expr { - Expr::Identifier(ident) => { - let source_name = ExpressionBuilder::normalize_ident(ident); - Ok(source_name) - } - Expr::CompoundIdentifier(ident) => { - let source_name = ExpressionBuilder::fullname_from_ident(ident); - Ok(source_name) - } - _ => Err(WindowError::WindowInvalidColumn(expr.to_string())), - }, - FunctionArgExpr::QualifiedWildcard(_) => { - Err(WindowError::WindowInvalidColumn("*".to_string())) - } - FunctionArgExpr::Wildcard => Err(WindowError::WindowInvalidColumn("*".to_string())), - }, - } -} - fn get_window_column_index( args: &[TableOperatorArg], schema: &Schema, diff --git a/dozer-sql/src/window/factory.rs b/dozer-sql/src/window/factory.rs index 9b8594c2f1..d4e55dd9dc 100644 --- a/dozer-sql/src/window/factory.rs +++ b/dozer-sql/src/window/factory.rs @@ -12,10 +12,7 @@ use crate::{ pipeline_builder::from_builder::TableOperatorDescriptor, }; -use super::{ - builder::{window_from_table_operator, window_source_name}, - processor::WindowProcessor, -}; +use super::{builder::window_from_table_operator, processor::WindowProcessor}; #[derive(Debug)] pub struct WindowProcessorFactory { @@ -27,10 +24,6 @@ impl WindowProcessorFactory { pub fn new(id: String, table: TableOperatorDescriptor) -> Self { Self { id, table } } - - pub(crate) fn get_source_name(&self) -> Result { - window_source_name(&self.table).map_err(PipelineError::WindowError) - } } impl ProcessorFactory for WindowProcessorFactory {