From 2098fd12f7f5494a623f30f2fcc20889527cfffd Mon Sep 17 00:00:00 2001
From: chubei <914745487@qq.com>
Date: Tue, 27 Feb 2024 23:11:58 +0800
Subject: [PATCH] chore: Remove clones and duplicated code in
 `dozer_sql::builder`

---
 dozer-cli/src/pipeline/builder.rs             |  21 +-
 dozer-core/src/app.rs                         |  41 ++--
 dozer-core/src/tests/app.rs                   |  44 ++--
 dozer-sql/src/aggregation/factory.rs          |  23 +-
 .../tests/aggregation_test_planner.rs         |   6 +-
 .../tests/aggregation_tests_utils.rs          |   6 +-
 dozer-sql/src/builder/common.rs               |  43 +++-
 dozer-sql/src/builder/from.rs                 |  66 +++---
 dozer-sql/src/builder/join.rs                 | 160 ++++++--------
 dozer-sql/src/builder/mod.rs                  | 123 ++++-------
 dozer-sql/src/builder/table_operator.rs       | 206 +++++++-----------
 dozer-sql/src/planner/projection.rs           |  17 +-
 .../src/planner/tests/projection_tests.rs     |   6 +-
 dozer-sql/src/planner/tests/schema_tests.rs   |  12 +-
 dozer-sql/src/product/table/factory.rs        |  55 +----
 dozer-sql/src/tests/builder_test.rs           |   7 +-
 dozer-tests/src/sql_tests/helper/pipeline.rs  |   7 +-
 17 files changed, 375 insertions(+), 468 deletions(-)

diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs
index 5bdf64beac..974c94e123 100644
--- a/dozer-cli/src/pipeline/builder.rs
+++ b/dozer-cli/src/pipeline/builder.rs
@@ -365,23 +365,24 @@ fn add_sink_to_pipeline(
     id: &str,
     table_infos: Vec<(&OutputTableInfo, PortHandle)>,
 ) {
-    let mut connections = vec![];
-    let mut entry_points = vec![];
+    pipeline.add_sink(sink, id.to_string());
 
     for (table_info, port) in table_infos {
         match table_info {
             OutputTableInfo::Original(table_info) => {
-                entry_points.push(PipelineEntryPoint::new(table_info.table_name.clone(), port))
+                pipeline.add_entry_point(
+                    id.to_string(),
+                    PipelineEntryPoint::new(table_info.table_name.clone(), port),
+                );
             }
             OutputTableInfo::Transformed(table_info) => {
-                connections.push((table_info, port));
+                pipeline.connect_nodes(
+                    table_info.node.clone(),
+                    table_info.port,
+                    id.to_string(),
+                    port,
+                );
             }
         }
     }
-
-    pipeline.add_sink(sink, id, entry_points);
-
-    for (table_info, port) in connections {
-        pipeline.connect_nodes(&table_info.node, table_info.port, id, port);
-    }
 }

diff --git a/dozer-core/src/app.rs b/dozer-core/src/app.rs
index 907682c0b6..29597fca7a 100644
--- a/dozer-core/src/app.rs
+++ b/dozer-core/src/app.rs
@@ -33,44 +33,33 @@ pub struct AppPipeline {
 }
 
 impl AppPipeline {
-    pub fn add_processor(
-        &mut self,
-        proc: Box<dyn ProcessorFactory>,
-        id: &str,
-        entry_points: Vec<PipelineEntryPoint>,
-    ) {
-        let handle = NodeHandle::new(None, id.to_string());
-        self.processors.push((handle.clone(), proc));
+    fn create_handle(id: String) -> NodeHandle {
+        NodeHandle::new(None, id)
+    }
 
-        for entry_point in entry_points {
-            self.entry_points.push((handle.clone(), entry_point));
-        }
+    pub fn add_processor(&mut self, proc: Box<dyn ProcessorFactory>, id: String) {
+        self.processors.push((Self::create_handle(id), proc));
     }
 
-    pub fn add_sink(
-        &mut self,
-        sink: Box<dyn SinkFactory>,
-        id: &str,
-        entry_points: Vec<PipelineEntryPoint>,
-    ) {
-        let handle = NodeHandle::new(None, id.to_string());
-        self.sinks.push((handle.clone(), sink));
+    pub fn add_sink(&mut self, sink: Box<dyn SinkFactory>, id: String) {
+        self.sinks.push((Self::create_handle(id), sink));
+    }
 
-        for entry_point in entry_points {
-            self.entry_points.push((handle.clone(), entry_point));
-        }
+    pub fn add_entry_point(&mut self, id: String, entry_point: PipelineEntryPoint) {
+        self.entry_points
+            .push((Self::create_handle(id), entry_point));
     }
 
     pub fn connect_nodes(
         &mut self,
-        from: &str,
+        from: String,
         from_port: PortHandle,
-        to: &str,
+        to: String,
         to_port: PortHandle,
     ) {
         let edge = Edge::new(
-            Endpoint::new(NodeHandle::new(None, from.to_string()), from_port),
-            Endpoint::new(NodeHandle::new(None, to.to_string()), to_port),
+            Endpoint::new(NodeHandle::new(None, from), from_port),
+            Endpoint::new(NodeHandle::new(None, to), to_port),
         );
         self.edges.push(edge);
     }

diff --git a/dozer-core/src/tests/app.rs b/dozer-core/src/tests/app.rs
index 9c8f917c77..833a6a82a6 100644
--- a/dozer-core/src/tests/app.rs
+++ b/dozer-core/src/tests/app.rs
@@ -171,46 +171,46 @@ fn test_app_dag() {
     let mut app = App::new(asm);
 
     let mut p1 = AppPipeline::new_with_default_flags();
-    p1.add_processor(
-        Box::new(NoopJoinProcessorFactory {}),
-        "join",
-        vec![
-            PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
-            PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
-        ],
+    p1.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string());
+    p1.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
+    );
+    p1.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
     );
 
     p1.add_sink(
         Box::new(CountingSinkFactory::new(20_000, latch.clone())),
-        "sink",
-        vec![],
+        "sink".to_string(),
     );
     p1.connect_nodes(
-        "join",
+        "join".to_string(),
         DEFAULT_PORT_HANDLE,
-        "sink",
+        "sink".to_string(),
         COUNTING_SINK_INPUT_PORT,
     );
 
     app.add_pipeline(p1);
 
     let mut p2 = AppPipeline::new_with_default_flags();
-    p2.add_processor(
-        Box::new(NoopJoinProcessorFactory {}),
-        "join",
-        vec![
-            PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
-            PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
-        ],
+    p2.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string());
+    p2.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
+    );
+    p2.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
    );
 
     p2.add_sink(
         Box::new(CountingSinkFactory::new(20_000, latch)),
-        "sink",
-        vec![],
+        "sink".to_string(),
     );
     p2.connect_nodes(
-        "join",
+        "join".to_string(),
         DEFAULT_PORT_HANDLE,
-        "sink",
+        "sink".to_string(),
         COUNTING_SINK_INPUT_PORT,
     );
 
diff --git a/dozer-sql/src/aggregation/factory.rs b/dozer-sql/src/aggregation/factory.rs
index 37ea84d17a..b70abf655f 100644
--- a/dozer-sql/src/aggregation/factory.rs
+++ b/dozer-sql/src/aggregation/factory.rs
@@ -5,7 +5,7 @@ use dozer_core::{
     node::{PortHandle, Processor, ProcessorFactory},
     DEFAULT_PORT_HANDLE,
 };
-use dozer_sql_expression::sqlparser::ast::Select;
+use dozer_sql_expression::sqlparser::ast::{Expr, SelectItem};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::udf_config::UdfConfig;
 use dozer_types::parking_lot::Mutex;
@@ -18,8 +18,9 @@ use tokio::runtime::Runtime;
 #[derive(Debug)]
 pub struct AggregationProcessorFactory {
     id: String,
-    projection: Select,
-    _stateful: bool,
+    projection: Vec<SelectItem>,
+    group_by: Vec<Expr>,
+    having: Option<Expr>,
     enable_probabilistic_optimizations: bool,
     udfs: Vec<UdfConfig>,
     runtime: Arc<Runtime>,
@@ -31,8 +32,9 @@ impl AggregationProcessorFactory {
     pub fn new(
         id: String,
-        projection: Select,
-        stateful: bool,
+        projection: Vec<SelectItem>,
+        group_by: Vec<Expr>,
+        having: Option<Expr>,
         enable_probabilistic_optimizations: bool,
         udfs: Vec<UdfConfig>,
         runtime: Arc<Runtime>,
     ) -> Self {
         Self {
             id,
             projection,
-            _stateful: stateful,
+            group_by,
+            having,
             enable_probabilistic_optimizations,
             udfs,
             runtime,
@@ -51,7 +54,13 @@ impl AggregationProcessorFactory {
     async fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, PipelineError> {
         let mut projection_planner =
             CommonPlanner::new(input_schema, self.udfs.as_slice(), self.runtime.clone());
-        projection_planner.plan(self.projection.clone()).await?;
+        projection_planner
+            .plan(
+                self.projection.clone(),
+                self.group_by.clone(),
+                self.having.clone(),
+            )
+            .await?;
         Ok(projection_planner)
     }
 }

diff --git a/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs b/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs
index beb343e302..17d80621c4 100644
--- a/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs
+++ b/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs
@@ -76,7 +76,11 @@ fn test_planner_with_aggregator() {
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     let mut processor = AggregationProcessor::new(

diff --git a/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs b/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs
index 0b1e34bace..aec3fd1fd4 100644
--- a/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs
+++ b/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs
@@ -28,7 +28,11 @@ pub(crate) fn init_processor(
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     let processor = AggregationProcessor::new(

diff --git a/dozer-sql/src/builder/common.rs b/dozer-sql/src/builder/common.rs
index fd7026615b..0d24256a15 100644
--- a/dozer-sql/src/builder/common.rs
+++ b/dozer-sql/src/builder/common.rs
@@ -1,4 +1,9 @@
-use dozer_sql_expression::{builder::ExpressionBuilder, sqlparser::ast::ObjectName};
+use dozer_sql_expression::{
+    builder::{ExpressionBuilder, NameOrAlias},
+    sqlparser::ast::{ObjectName, TableFactor},
+};
+
+use crate::errors::{PipelineError, ProductError};
 
 use super::QueryContext;
 
@@ -29,6 +34,42 @@ pub fn is_a_pipeline_output(
     false
 }
 
+pub fn get_name_or_alias(relation: &TableFactor) -> Result<NameOrAlias, PipelineError> {
+    match relation {
+        TableFactor::Table { name, alias, .. } => {
+            let table_name = string_from_sql_object_name(name);
+            if let Some(table_alias) = alias {
+                let alias = table_alias.name.value.clone();
+                return Ok(NameOrAlias(table_name, Some(alias)));
+            }
+            Ok(NameOrAlias(table_name, None))
+        }
+        TableFactor::Derived { alias, .. } => {
+            if let Some(table_alias) = alias {
+                let alias = table_alias.name.value.clone();
+                return Ok(NameOrAlias("dozer_derived".to_string(), Some(alias)));
+            }
+            Ok(NameOrAlias("dozer_derived".to_string(), None))
+        }
+        TableFactor::TableFunction { .. } => Err(PipelineError::ProductError(
+            ProductError::UnsupportedTableFunction,
+        )),
+        TableFactor::UNNEST { .. } => {
+            Err(PipelineError::ProductError(ProductError::UnsupportedUnnest))
+        }
+        TableFactor::NestedJoin { alias, .. } => {
+            if let Some(table_alias) = alias {
+                let alias = table_alias.name.value.clone();
+                return Ok(NameOrAlias("dozer_nested".to_string(), Some(alias)));
+            }
+            Ok(NameOrAlias("dozer_nested".to_string(), None))
+        }
+        TableFactor::Pivot { .. } => {
+            Err(PipelineError::ProductError(ProductError::UnsupportedPivot))
+        }
+    }
+}
+
 pub fn string_from_sql_object_name(name: &ObjectName) -> String {
     name.0
         .iter()

diff --git a/dozer-sql/src/builder/from.rs b/dozer-sql/src/builder/from.rs
index 6ac8c9a822..6fa98bea03 100644
--- a/dozer-sql/src/builder/from.rs
+++ b/dozer-sql/src/builder/from.rs
@@ -11,46 +11,46 @@ use crate::{
 };
 
 use super::{
-    common::is_an_entry_point,
+    common::{get_name_or_alias, is_an_entry_point},
     join::insert_join_to_pipeline,
     table_operator::{insert_table_operator_processor_to_pipeline, is_table_operator},
     ConnectionInfo,
 };
 
 pub fn insert_from_to_pipeline(
-    from: &TableWithJoins,
+    from: TableWithJoins,
     pipeline: &mut AppPipeline,
     pipeline_idx: usize,
     query_context: &mut QueryContext,
 ) -> Result<ConnectionInfo, PipelineError> {
     if from.joins.is_empty() {
-        insert_table_to_pipeline(&from.relation, pipeline, pipeline_idx, query_context)
+        insert_table_to_pipeline(from.relation, pipeline, pipeline_idx, query_context)
     } else {
         insert_join_to_pipeline(from, pipeline, pipeline_idx, query_context)
     }
 }
 
 fn insert_table_to_pipeline(
-    relation: &TableFactor,
+    relation: TableFactor,
     pipeline: &mut AppPipeline,
     pipeline_idx: usize,
     query_context: &mut QueryContext,
 ) -> Result<ConnectionInfo, PipelineError> {
-    if let Some(operator) = is_table_operator(relation)? {
+    if let Some(operator) = is_table_operator(&relation)? {
         let product_processor_name =
-            insert_from_processor_to_pipeline(query_context, relation, pipeline);
+            insert_from_processor_to_pipeline(query_context, relation, pipeline)?;
 
         let connection_info = insert_table_operator_processor_to_pipeline(
-            &operator,
+            operator,
             pipeline,
             pipeline_idx,
             query_context,
         )?;
 
         pipeline.connect_nodes(
-            &connection_info.output_node.0,
+            connection_info.output_node.0,
             connection_info.output_node.1,
-            &product_processor_name.clone(),
+            product_processor_name.clone(),
             DEFAULT_PORT_HANDLE,
         );
@@ -64,51 +64,41 @@ fn insert_table_to_pipeline(
 }
 
 fn insert_table_processor_to_pipeline(
-    relation: &TableFactor,
+    relation: TableFactor,
     pipeline: &mut AppPipeline,
     pipeline_idx: usize,
     query_context: &mut QueryContext,
 ) -> Result<ConnectionInfo, PipelineError> {
-    // let relation_name_or_alias = get_name_or_alias(relation)?;
-    let relation_name_or_alias = get_from_source(relation, pipeline, query_context, pipeline_idx)?;
+    let relation_name_or_alias = get_name_or_alias(&relation)?;
+    let product_input_name = get_from_source(relation, pipeline, query_context, pipeline_idx)?.0;
 
     let processor_name = format!(
         "from:{}--{}",
-        relation_name_or_alias.0,
+        product_input_name,
         query_context.get_next_processor_id()
     );
     if !query_context.processors_list.insert(processor_name.clone()) {
         return Err(PipelineError::ProcessorAlreadyExists(processor_name));
     }
     let product_processor_factory =
-        TableProcessorFactory::new(processor_name.clone(), relation.to_owned());
-
-    let product_input_name = relation_name_or_alias.0;
-
-    let mut input_nodes = vec![];
-    let mut product_entry_points = vec![];
+        TableProcessorFactory::new(processor_name.clone(), relation_name_or_alias.clone());
+    pipeline.add_processor(Box::new(product_processor_factory), processor_name.clone());
 
     // is a node that is an entry point to the pipeline
-    if is_an_entry_point(&product_input_name, query_context, pipeline_idx) {
+    let input_nodes = if is_an_entry_point(&product_input_name, query_context, pipeline_idx) {
         let entry_point = PipelineEntryPoint::new(product_input_name.clone(), DEFAULT_PORT_HANDLE);
-
-        product_entry_points.push(entry_point);
+        pipeline.add_entry_point(processor_name.clone(), entry_point);
         query_context.used_sources.push(product_input_name);
+        vec![]
     }
     // is a node that is connected to another pipeline
     else {
-        input_nodes.push((
+        vec![(
             product_input_name,
             processor_name.clone(),
             DEFAULT_PORT_HANDLE,
-        ));
-    }
-
-    pipeline.add_processor(
-        Box::new(product_processor_factory),
-        &processor_name,
-        product_entry_points,
-    );
+        )]
+    };
 
     Ok(ConnectionInfo {
         input_nodes,
@@ -118,13 +108,15 @@ fn insert_table_processor_to_pipeline(
 
 fn insert_from_processor_to_pipeline(
     query_context: &mut QueryContext,
-    relation: &TableFactor,
+    relation: TableFactor,
     pipeline: &mut AppPipeline,
-) -> String {
+) -> Result<String, PipelineError> {
     let product_processor_name = format!("from--{}", query_context.get_next_processor_id());
-    let product_processor =
-        TableProcessorFactory::new(product_processor_name.clone(), relation.clone());
+    let product_processor = TableProcessorFactory::new(
+        product_processor_name.clone(),
+        get_name_or_alias(&relation)?,
+    );
 
-    pipeline.add_processor(Box::new(product_processor), &product_processor_name, vec![]);
-    product_processor_name
+    pipeline.add_processor(Box::new(product_processor), product_processor_name.clone());
+    Ok(product_processor_name)
 }

diff --git a/dozer-sql/src/builder/join.rs b/dozer-sql/src/builder/join.rs
index 1c508d4004..653010d927 100644
--- a/dozer-sql/src/builder/join.rs
+++ b/dozer-sql/src/builder/join.rs
@@ -1,20 +1,18 @@
 use dozer_core::{
     app::{AppPipeline, PipelineEntryPoint},
+    node::PortHandle,
     DEFAULT_PORT_HANDLE,
 };
-use dozer_sql_expression::sqlparser::ast::TableWithJoins;
+use dozer_sql_expression::sqlparser::ast::{TableFactor, TableWithJoins};
 
 use crate::{
     builder::{get_from_source, QueryContext},
     errors::PipelineError,
-    product::{
-        join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
-        table::factory::get_name_or_alias,
-    },
+    product::join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
 };
 
 use super::{
-    common::is_an_entry_point,
+    common::{get_name_or_alias, is_an_entry_point},
     table_operator::{insert_table_operator_processor_to_pipeline, is_table_operator},
     ConnectionInfo,
 };
@@ -27,21 +25,21 @@ enum JoinSource {
 }
 
 pub fn insert_join_to_pipeline(
-    from: &TableWithJoins,
+    from: TableWithJoins,
     pipeline: &mut AppPipeline,
     pipeline_idx: usize,
     query_context: &mut QueryContext,
 ) -> Result<ConnectionInfo, PipelineError> {
     let mut input_nodes = vec![];
 
-    let left_table = &from.relation;
-    let mut left_name_or_alias = Some(get_name_or_alias(left_table)?);
+    let left_table = from.relation;
+    let mut left_name_or_alias = Some(get_name_or_alias(&left_table)?);
     let mut left_join_source =
-        insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?;
+        insert_join_source_to_pipeline(left_table, pipeline, pipeline_idx, query_context)?;
 
-    for join in &from.joins {
-        let right_table = &join.relation;
-        let right_name_or_alias = Some(get_name_or_alias(right_table)?);
+    for join in from.joins {
+        let right_table = join.relation;
+        let right_name_or_alias = Some(get_name_or_alias(&right_table)?);
         let right_join_source = insert_join_source_to_pipeline(
             right_table.clone(),
             pipeline,
@@ -58,84 +56,36 @@ pub fn insert_join_to_pipeline(
         }
         let join_processor_factory = JoinProcessorFactory::new(
             join_processor_name.clone(),
-            left_name_or_alias.clone(),
+            left_name_or_alias,
             right_name_or_alias,
-            join.join_operator.clone(),
+            join.join_operator,
             pipeline
                 .flags()
                 .enable_probabilistic_optimizations
                 .in_joins
                 .unwrap_or(false),
         );
-
-        let mut pipeline_entry_points = vec![];
-        if let JoinSource::Table(ref source_table) = left_join_source {
-            if is_an_entry_point(source_table, query_context, pipeline_idx) {
-                let entry_point = PipelineEntryPoint::new(source_table.clone(), LEFT_JOIN_PORT);
-
-                pipeline_entry_points.push(entry_point);
-                query_context.used_sources.push(source_table.to_string());
-            } else {
-                input_nodes.push((
-                    source_table.to_string(),
-                    join_processor_name.clone(),
-                    LEFT_JOIN_PORT,
-                ));
-            }
-        }
-
-        if let JoinSource::Table(ref source_table) = right_join_source.clone() {
-            if is_an_entry_point(source_table, query_context, pipeline_idx) {
-                let entry_point = PipelineEntryPoint::new(source_table.clone(), RIGHT_JOIN_PORT);
-
-                pipeline_entry_points.push(entry_point);
-                query_context.used_sources.push(source_table.to_string());
-            } else {
-                input_nodes.push((
-                    source_table.to_string(),
-                    join_processor_name.clone(),
-                    RIGHT_JOIN_PORT,
-                ));
-            }
-        }
-
         pipeline.add_processor(
             Box::new(join_processor_factory),
-            &join_processor_name,
-            pipeline_entry_points,
+            join_processor_name.clone(),
         );
 
-        match left_join_source {
-            JoinSource::Table(_) => {}
-            JoinSource::Operator(ref connection_info) => pipeline.connect_nodes(
-                &connection_info.output_node.0,
-                connection_info.output_node.1,
-                &join_processor_name,
-                LEFT_JOIN_PORT,
-            ),
-            JoinSource::Join(ref connection_info) => pipeline.connect_nodes(
-                &connection_info.output_node.0,
-                connection_info.output_node.1,
-                &join_processor_name,
-                LEFT_JOIN_PORT,
-            ),
-        }
-
-        match right_join_source {
-            JoinSource::Table(_) => {}
-            JoinSource::Operator(connection_info) => pipeline.connect_nodes(
-                &connection_info.output_node.0,
-                connection_info.output_node.1,
-                &join_processor_name,
-                RIGHT_JOIN_PORT,
-            ),
-            JoinSource::Join(connection_info) => pipeline.connect_nodes(
-                &connection_info.output_node.0,
-                connection_info.output_node.1,
-                &join_processor_name,
-                RIGHT_JOIN_PORT,
-            ),
-        }
+        input_nodes.extend(modify_pipeline_graph(
+            left_join_source,
+            join_processor_name.clone(),
+            LEFT_JOIN_PORT,
+            pipeline,
+            pipeline_idx,
+            query_context,
+        ));
+        input_nodes.extend(modify_pipeline_graph(
+            right_join_source,
+            join_processor_name.clone(),
+            RIGHT_JOIN_PORT,
+            pipeline,
+            pipeline_idx,
+            query_context,
+        ));
 
         // TODO: refactor join source name and aliasing logic
         left_name_or_alias = None;
@@ -146,10 +96,7 @@ pub fn insert_join_to_pipeline(
     }
 
     match left_join_source {
-        JoinSource::Table(_) => Err(PipelineError::InvalidJoin(
-            "No JOIN operator found".to_string(),
-        )),
-        JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
+        JoinSource::Table(_) | JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
             "No JOIN operator found".to_string(),
         )),
         JoinSource::Join(connection_info) => Ok(connection_info),
@@ -158,14 +105,14 @@ pub fn insert_join_to_pipeline(
 
 // TODO: refactor this
 fn insert_join_source_to_pipeline(
-    source: dozer_sql_expression::sqlparser::ast::TableFactor,
+    source: TableFactor,
     pipeline: &mut AppPipeline,
     pipeline_idx: usize,
     query_context: &mut QueryContext,
 ) -> Result<JoinSource, PipelineError> {
     let join_source = if let Some(table_operator) = is_table_operator(&source)? {
         let connection_info = insert_table_operator_processor_to_pipeline(
-            &table_operator,
+            table_operator,
             pipeline,
             pipeline_idx,
             query_context,
         )?;
         JoinSource::Operator(connection_info)
     } else if is_nested_join(&source) {
         return Err(PipelineError::InvalidJoin(
             "Nested JOINs are not supported".to_string(),
         ));
     } else {
-        let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?;
+        let name_or_alias = get_from_source(source, pipeline, query_context, pipeline_idx)?;
         JoinSource::Table(name_or_alias.0)
     };
 
     Ok(join_source)
 }
 
-fn is_nested_join(left_table: &dozer_sql_expression::sqlparser::ast::TableFactor) -> bool {
-    matches!(
-        left_table,
-        dozer_sql_expression::sqlparser::ast::TableFactor::NestedJoin { .. }
-    )
+fn is_nested_join(left_table: &TableFactor) -> bool {
+    matches!(left_table, TableFactor::NestedJoin { .. })
+}
+
+/// Returns the input node if there is one
+fn modify_pipeline_graph(
+    source: JoinSource,
+    id: String,
+    port: PortHandle,
+    pipeline: &mut AppPipeline,
+    pipeline_idx: usize,
+    query_context: &mut QueryContext,
+) -> Option<(String, String, u16)> {
+    match source {
+        JoinSource::Table(source_table) => {
+            if is_an_entry_point(&source_table, query_context, pipeline_idx) {
+                let entry_point = PipelineEntryPoint::new(source_table.clone(), port);
+                pipeline.add_entry_point(id, entry_point);
+                query_context.used_sources.push(source_table);
+                None
+            } else {
+                Some((source_table, id, port))
+            }
+        }
+        JoinSource::Operator(connection_info) | JoinSource::Join(connection_info) => {
+            pipeline.connect_nodes(
+                connection_info.output_node.0,
+                connection_info.output_node.1,
+                id,
+                port,
+            );
+            None
+        }
+    }
 }

diff --git a/dozer-sql/src/builder/mod.rs b/dozer-sql/src/builder/mod.rs
index e5f0a17cea..106d71f121 100644
--- a/dozer-sql/src/builder/mod.rs
+++ b/dozer-sql/src/builder/mod.rs
@@ -90,18 +90,17 @@ pub fn statement_to_pipeline(
         .map_err(|err| PipelineError::InternalError(Box::new(err)))?;
     let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None);
 
-    for (idx, statement) in ast.iter().enumerate() {
+    for (idx, statement) in ast.into_iter().enumerate() {
         match statement {
             Statement::Query(query) => {
                 query_to_pipeline(
-                    &TableInfo {
+                    TableInfo {
                         name: query_name.clone(),
                         override_name: override_name.clone(),
                     },
-                    query,
+                    *query,
                     pipeline,
                     &mut ctx,
-                    false,
                     idx,
                     is_top_select,
                 )?;
@@ -123,11 +122,10 @@ struct TableInfo {
 }
 
 fn query_to_pipeline(
-    table_info: &TableInfo,
-    query: &Query,
+    table_info: TableInfo,
+    query: Query,
     pipeline: &mut AppPipeline,
     query_ctx: &mut QueryContext,
-    stateful: bool,
     pipeline_idx: usize,
     is_top_select: bool,
 ) -> Result<(), PipelineError> {
@@ -145,14 +143,14 @@ fn query_to_pipeline(
     }
 
     // Attach the first pipeline if there is with clause
-    if let Some(with) = &query.with {
+    if let Some(with) = query.with {
         if with.recursive {
             return Err(PipelineError::UnsupportedSqlError(
                 UnsupportedSqlError::Recursive,
             ));
         }
 
-        for table in &with.cte_tables {
+        for table in with.cte_tables {
             if table.from.is_some() {
                 return Err(PipelineError::UnsupportedSqlError(
                     UnsupportedSqlError::CteFromError,
@@ -168,28 +166,26 @@ fn query_to_pipeline(
                 )));
             }
             query_to_pipeline(
-                &TableInfo {
+                TableInfo {
                     name: NameOrAlias(table_name.clone(), Some(table_name)),
                     override_name: None,
                 },
-                &table.query,
+                *table.query,
                 pipeline,
                 query_ctx,
-                true,
                 pipeline_idx,
                 false, //Inside a with clause, so not top select
             )?;
         }
     };
 
-    match *query.body.clone() {
+    match *query.body {
         SetExpr::Select(select) => {
             select_to_pipeline(
                 table_info,
                 *select,
                 pipeline,
                 query_ctx,
-                stateful,
                 pipeline_idx,
                 is_top_select,
             )?;
         }
@@ -198,14 +194,13 @@ fn query_to_pipeline(
         SetExpr::Query(query) => {
             let query_name = format!("subquery_{}", query_ctx.get_next_processor_id());
             let mut ctx = QueryContext::new(query_ctx.udfs.clone(), query_ctx.runtime.clone());
             query_to_pipeline(
-                &TableInfo {
+                TableInfo {
                     name: NameOrAlias(query_name, None),
                     override_name: None,
                 },
-                &query,
+                *query,
                 pipeline,
                 &mut ctx,
-                stateful,
                 pipeline_idx,
                 false, //Inside a subquery, so not top select
             )?
@@ -224,7 +219,6 @@ fn query_to_pipeline(
                 set_quantifier,
                 pipeline,
                 query_ctx,
-                stateful,
                 pipeline_idx,
                 is_top_select,
             )?;
@@ -241,23 +235,21 @@ fn query_to_pipeline(
 }
 
 fn select_to_pipeline(
-    table_info: &TableInfo,
+    table_info: TableInfo,
     select: Select,
     pipeline: &mut AppPipeline,
     query_ctx: &mut QueryContext,
-    stateful: bool,
     pipeline_idx: usize,
     is_top_select: bool,
 ) -> Result<String, PipelineError> {
     // FROM clause
-    if select.from.len() != 1 {
+    let Some(from) = select.from.into_iter().next() else {
         return Err(PipelineError::UnsupportedSqlError(
             UnsupportedSqlError::FromCommaSyntax,
         ));
-    }
+    };
 
-    let connection_info =
-        from::insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?;
+    let connection_info = from::insert_from_to_pipeline(from, pipeline, pipeline_idx, query_ctx)?;
 
     let input_nodes = connection_info.input_nodes;
     let output_node = connection_info.output_node;
@@ -267,16 +259,16 @@ fn select_to_pipeline(
     let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id());
     let (gen_product_name, product_output_port) = output_node;
 
-    for (source_name, processor_name, processor_port) in input_nodes.iter() {
+    for (source_name, processor_name, processor_port) in input_nodes {
         if let Some(table_info) = query_ctx
             .pipeline_map
             .get(&(pipeline_idx, source_name.clone()))
         {
             pipeline.connect_nodes(
-                &table_info.node,
+                table_info.node.clone(),
                 table_info.port,
                 processor_name,
-                *processor_port as PortHandle,
+                processor_port,
             );
         // If not present in pipeline_map, insert into used_sources as this is coming from source
         } else {
@@ -286,8 +278,9 @@ fn select_to_pipeline(
 
     let aggregation = AggregationProcessorFactory::new(
         gen_agg_name.clone(),
-        select.clone(),
-        stateful,
+        select.projection,
+        select.group_by,
+        select.having,
         pipeline
             .flags()
             .enable_probabilistic_optimizations
@@ -297,37 +290,37 @@ fn select_to_pipeline(
         query_ctx.runtime.clone(),
     );
 
-    pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]);
+    pipeline.add_processor(Box::new(aggregation), gen_agg_name.clone());
 
     // Where clause
     if let Some(selection) = select.selection {
         let selection = SelectionProcessorFactory::new(
-            gen_selection_name.to_owned(),
+            gen_selection_name.clone(),
             selection,
             query_ctx.udfs.clone(),
             query_ctx.runtime.clone(),
         );
 
-        pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]);
+        pipeline.add_processor(Box::new(selection), gen_selection_name.clone());
 
         pipeline.connect_nodes(
-            &gen_product_name,
+            gen_product_name,
             product_output_port,
-            &gen_selection_name,
+            gen_selection_name.clone(),
             DEFAULT_PORT_HANDLE,
         );
 
         pipeline.connect_nodes(
-            &gen_selection_name,
+            gen_selection_name,
             DEFAULT_PORT_HANDLE,
-            &gen_agg_name,
+            gen_agg_name.clone(),
             DEFAULT_PORT_HANDLE,
         );
     } else {
         pipeline.connect_nodes(
-            &gen_product_name,
+            gen_product_name,
             product_output_port,
-            &gen_agg_name,
+            gen_agg_name.clone(),
             DEFAULT_PORT_HANDLE,
         );
     }
@@ -369,13 +362,12 @@ fn select_to_pipeline(
 
 #[allow(clippy::too_many_arguments)]
 fn set_to_pipeline(
-    table_info: &TableInfo,
+    table_info: TableInfo,
     left_select: Box<SetExpr>,
     right_select: Box<SetExpr>,
     set_quantifier: SetQuantifier,
     pipeline: &mut AppPipeline,
     query_ctx: &mut QueryContext,
-    stateful: bool,
     pipeline_idx: usize,
     is_top_select: bool,
 ) -> Result<String, PipelineError> {
@@ -392,11 +384,10 @@ fn set_to_pipeline(
     let _left_pipeline_name = match *left_select {
         SetExpr::Select(select) => select_to_pipeline(
-            &left_table_info,
+            left_table_info,
             *select,
             pipeline,
             query_ctx,
-            stateful,
             pipeline_idx,
             is_top_select,
         )?,
         SetExpr::SetOperation {
             op: _,
             set_quantifier,
@@ -406,13 +397,12 @@ fn set_to_pipeline(
             left,
             right,
         } => set_to_pipeline(
-            &left_table_info,
+            left_table_info,
             left,
             right,
             set_quantifier,
             pipeline,
             query_ctx,
-            stateful,
             pipeline_idx,
             is_top_select,
         )?,
@@ -425,11 +415,10 @@ fn set_to_pipeline(
 
     let _right_pipeline_name = match *right_select {
         SetExpr::Select(select) => select_to_pipeline(
-            &right_table_info,
+            right_table_info,
             *select,
             pipeline,
             query_ctx,
-            stateful,
             pipeline_idx,
             is_top_select,
         )?,
         SetExpr::SetOperation {
             op: _,
             set_quantifier,
@@ -439,13 +428,12 @@ fn set_to_pipeline(
             left,
             right,
         } => set_to_pipeline(
-            &right_table_info,
+            right_table_info,
             left,
             right,
             set_quantifier,
             pipeline,
             query_ctx,
-            stateful,
             pipeline_idx,
             is_top_select,
         )?,
@@ -458,29 +446,15 @@ fn set_to_pipeline(
 
     let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id());
 
-    let left_pipeline_output_node = match query_ctx
+    let left_pipeline_output_node = query_ctx
         .pipeline_map
         .get(&(pipeline_idx, gen_left_set_name))
-    {
-        Some(pipeline) => pipeline,
-        None => {
-            return Err(PipelineError::InvalidQuery(
-                "Invalid UNION left Query".to_string(),
-            ))
-        }
-    };
+        .ok_or_else(|| PipelineError::InvalidQuery("Invalid UNION left Query".to_string()))?;
 
-    let right_pipeline_output_node = match query_ctx
+    let right_pipeline_output_node = query_ctx
         .pipeline_map
         .get(&(pipeline_idx, gen_right_set_name))
-    {
-        Some(pipeline) => pipeline,
-        None => {
-            return Err(PipelineError::InvalidQuery(
-                "Invalid UNION Right Query".to_string(),
-            ))
-        }
-    };
+        .ok_or_else(|| PipelineError::InvalidQuery("Invalid UNION right Query".to_string()))?;
 
     if table_info.override_name.is_some() {
         gen_set_name = table_info.override_name.to_owned().unwrap();
@@ -496,19 +470,19 @@ fn set_to_pipeline(
             .unwrap_or(false),
     );
 
-    pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]);
+    pipeline.add_processor(Box::new(set_proc_fac), gen_set_name.clone());
 
     pipeline.connect_nodes(
-        &left_pipeline_output_node.node,
+        left_pipeline_output_node.node.clone(),
         left_pipeline_output_node.port,
-        &gen_set_name,
+        gen_set_name.clone(),
         0 as PortHandle,
     );
 
     pipeline.connect_nodes(
-        &right_pipeline_output_node.node,
+        right_pipeline_output_node.node.clone(),
         right_pipeline_output_node.port,
-        &gen_set_name,
+        gen_set_name.clone(),
         1 as PortHandle,
     );
@@ -528,7 +502,7 @@ fn set_to_pipeline(
 }
 
 fn get_from_source(
-    relation: &TableFactor,
+    relation: TableFactor,
     pipeline: &mut AppPipeline,
     query_ctx: &mut QueryContext,
     pipeline_idx: usize,
@@ -559,14 +533,13 @@ fn get_from_source(
             let is_top_select = false; //inside FROM clause, so not top select
             let name_or = NameOrAlias(name, alias_name);
             query_to_pipeline(
-                &TableInfo {
+                TableInfo {
                     name: name_or.clone(),
                     override_name: None,
                 },
-                subquery,
+                *subquery,
                 pipeline,
                 query_ctx,
-                false,
                 pipeline_idx,
                 is_top_select,
             )?;

diff --git a/dozer-sql/src/builder/table_operator.rs b/dozer-sql/src/builder/table_operator.rs
index 8c6b9d97a5..3f1001e6c6 100644
--- a/dozer-sql/src/builder/table_operator.rs
+++ b/dozer-sql/src/builder/table_operator.rs
@@ -1,5 +1,6 @@
 use dozer_core::{
     app::{AppPipeline, PipelineEntryPoint},
+    node::ProcessorFactory,
     DEFAULT_PORT_HANDLE,
 };
 use dozer_sql_expression::sqlparser::ast::{
@@ -37,7 +38,7 @@ pub fn is_table_operator(
     if args.is_none() {
         return Ok(None);
     }
-    let operator = get_table_operator_descriptor(name, args)?;
+    let operator = get_table_operator_descriptor(name, args.as_deref())?;
 
     Ok(operator)
 }
@@ -51,7 +52,7 @@ pub fn is_table_operator(
 
 fn get_table_operator_descriptor(
     name: &ObjectName,
-    args: &Option<Vec<FunctionArg>>,
+    args: Option<&[FunctionArg]>,
 ) -> Result<Option<TableOperatorDescriptor>, PipelineError> {
     let mut operator_args = vec![];
@@ -75,10 +76,8 @@ fn get_table_operator_arg(arg: &FunctionArg) -> Result<TableOperatorArg, PipelineError>
         match arg_expr {
             FunctionArgExpr::Expr(Expr::Function(function)) => {
-                let operator_descriptor = get_table_operator_descriptor(
-                    &function.clone().name,
-                    &Some(function.clone().args),
-                )?;
+                let operator_descriptor =
+                    get_table_operator_descriptor(&function.name, Some(&function.args))?;
                 if let Some(descriptor) = operator_descriptor {
                     Ok(TableOperatorArg::Descriptor(descriptor))
                 } else {
@@ -93,141 +92,88 @@ fn get_table_operator_arg(arg: &FunctionArg) -> Result<TableOperatorArg, PipelineError>
 
 pub fn insert_table_operator_processor_to_pipeline(
-    operator: &TableOperatorDescriptor,
+    operator: TableOperatorDescriptor,
     pipeline: &mut AppPipeline,
     pipeline_idx: usize,
     query_context: &mut QueryContext,
 ) -> Result<ConnectionInfo, PipelineError> {
-    // the sources names that are used in this pipeline
-
-    let mut input_nodes = vec![];
-
-    if operator.name.to_uppercase() == "TTL" {
-        let mut entry_points = vec![];
-
-        let processor_name = generate_name("TOP", operator, query_context);
-        if !query_context.processors_list.insert(processor_name.clone()) {
-            return Err(PipelineError::ProcessorAlreadyExists(processor_name));
-        }
-        let processor = TableOperatorProcessorFactory::new(
-            processor_name.clone(),
-            operator.clone(),
-            query_context.udfs.to_owned(),
-            query_context.runtime.clone(),
-        );
-
-        if let Some(table) = operator.args.first() {
-            let source_name = match table {
-                TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?,
-                TableOperatorArg::Descriptor(descriptor) => {
-                    let connection_info = insert_table_operator_processor_to_pipeline(
-                        descriptor,
-                        pipeline,
-                        pipeline_idx,
-                        query_context,
-                    )?;
-                    connection_info.output_node.0
-                }
-            };
-
-            if is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) {
-                let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE);
-
-                entry_points.push(entry_point);
-                query_context.used_sources.push(source_name.clone());
-            } else if is_a_pipeline_output(&source_name, query_context, pipeline_idx) {
-                input_nodes.push((
-                    source_name.clone(),
-                    processor_name.clone(),
-                    DEFAULT_PORT_HANDLE,
-                ));
-            }
-
-            pipeline.add_processor(Box::new(processor), &processor_name.clone(), entry_points);
-
-            if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx)
-                && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx)
-            {
-                pipeline.connect_nodes(
-                    &source_name,
-                    DEFAULT_PORT_HANDLE,
-                    &processor_name.clone(),
-                    DEFAULT_PORT_HANDLE,
-                );
-            }
-
-            Ok(ConnectionInfo {
-                input_nodes,
-                output_node: (processor_name, DEFAULT_PORT_HANDLE),
-            })
+    let (processor_name, processor): (_, Box<dyn ProcessorFactory>) =
+        if operator.name.to_uppercase() == "TTL" {
+            let processor_name = generate_name("TOP", &operator, query_context);
+            let processor = Box::new(TableOperatorProcessorFactory::new(
+                processor_name.clone(),
+                operator.clone(),
+                query_context.udfs.to_owned(),
+                query_context.runtime.clone(),
+            ));
+            (processor_name, processor)
+        } else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP"
+        {
+            let processor_name = generate_name("WIN", &operator, query_context);
+            let processor = Box::new(WindowProcessorFactory::new(
+                processor_name.clone(),
+                operator.clone(),
+            ));
+            (processor_name, processor)
         } else {
-            Err(PipelineError::UnsupportedTableOperator(
+            return Err(PipelineError::UnsupportedTableOperator(
                 operator.name.clone(),
-            ))
-        }
-    } else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" {
-        let mut entry_points = vec![];
+            ));
+        };
 
-        let processor_name = generate_name("WIN", operator, query_context);
-        if !query_context.processors_list.insert(processor_name.clone()) {
-            return Err(PipelineError::ProcessorAlreadyExists(processor_name));
-        }
-        let processor = WindowProcessorFactory::new(processor_name.clone(), operator.clone());
-
-        if let Some(table) = operator.args.first() {
-            let source_name = match table {
-                TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?,
-                TableOperatorArg::Descriptor(descriptor) => {
-                    let connection_info = insert_table_operator_processor_to_pipeline(
-                        descriptor,
-                        pipeline,
-                        pipeline_idx,
-                        query_context,
-                    )?;
-                    connection_info.output_node.0
-                }
-            };
-
-            if is_an_entry_point(&source_name, query_context, pipeline_idx) {
-                let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE);
-
-                entry_points.push(entry_point);
-                query_context.used_sources.push(source_name.clone());
-            } else if is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) {
-                input_nodes.push((
-                    source_name.clone(),
-                    processor_name.to_owned(),
-                    DEFAULT_PORT_HANDLE,
-                ));
-            }
+    if !query_context.processors_list.insert(processor_name.clone()) {
+        return Err(PipelineError::ProcessorAlreadyExists(processor_name));
+    }
 
-            pipeline.add_processor(Box::new(processor), &processor_name, entry_points);
-
-            if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx)
-                && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx)
-            {
-                pipeline.connect_nodes(
-                    &source_name,
-                    DEFAULT_PORT_HANDLE,
-                    &processor_name,
-                    DEFAULT_PORT_HANDLE,
-                );
-            }
+    pipeline.add_processor(processor, processor_name.clone());
 
-            Ok(ConnectionInfo {
-                input_nodes,
-                output_node: (processor_name, DEFAULT_PORT_HANDLE),
-            })
-        } else {
-            Err(PipelineError::UnsupportedTableOperator(
-                operator.name.clone(),
-            ))
-        }
-    } else {
-        Err(PipelineError::UnsupportedTableOperator(
-            operator.name.clone(),
-        ))
-    }
+    let Some(table) = operator.args.into_iter().next() else {
+        return Err(PipelineError::UnsupportedTableOperator(
+            operator.name.clone(),
+        ));
+    };
+
+    let source_name = match table {
+        TableOperatorArg::Argument(argument) => get_source_name(&operator.name, &argument)?,
+        TableOperatorArg::Descriptor(descriptor) => {
+            let connection_info = insert_table_operator_processor_to_pipeline(
+                descriptor,
+                pipeline,
+                pipeline_idx,
+                query_context,
+            )?;
+            connection_info.output_node.0
+        }
+    };
+
+    let is_an_entry_point = is_an_entry_point(&source_name, query_context, pipeline_idx);
+    let is_a_pipeline_output = is_a_pipeline_output(&source_name, query_context, pipeline_idx);
+
+    let input_nodes = if is_an_entry_point {
+        let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE);
+        pipeline.add_entry_point(processor_name.clone(), entry_point);
+        query_context.used_sources.push(source_name.clone());
+        vec![]
+    } else if is_a_pipeline_output {
+        vec![(
+            source_name.clone(),
+            processor_name.clone(),
+            DEFAULT_PORT_HANDLE,
+        )]
+    } else {
+        pipeline.connect_nodes(
+            source_name,
+            DEFAULT_PORT_HANDLE,
+            processor_name.clone(),
+            DEFAULT_PORT_HANDLE,
+        );
+        vec![]
+    };
+
+    Ok(ConnectionInfo {
+        input_nodes,
+        output_node: (processor_name, DEFAULT_PORT_HANDLE),
+    })
 }
 
 fn generate_name(

diff --git a/dozer-sql/src/planner/projection.rs b/dozer-sql/src/planner/projection.rs
index c987370b4c..6330adfbf1 100644
--- a/dozer-sql/src/planner/projection.rs
+++ b/dozer-sql/src/planner/projection.rs
@@ -5,7 +5,7 @@ use crate::builder::string_from_sql_object_name;
 use crate::errors::PipelineError;
 use dozer_sql_expression::builder::ExpressionBuilder;
 use dozer_sql_expression::execution::Expression;
-use dozer_sql_expression::sqlparser::ast::{Expr, Ident, Select, SelectItem};
+use dozer_sql_expression::sqlparser::ast::{Expr, Ident, SelectItem};
 use dozer_types::models::udf_config::UdfConfig;
 use dozer_types::types::{FieldDefinition, Schema};
 use tokio::runtime::Runtime;
@@ -208,15 +208,20 @@ impl<'a> CommonPlanner<'_> {
         Ok(())
     }
 
-    pub async fn plan(&mut self, select: Select) -> Result<(), PipelineError> {
-        for expr in select.clone().projection {
+    pub async fn plan(
+        &mut self,
+        projection: Vec<SelectItem>,
+        group_by: Vec<Expr>,
+        having: Option<Expr>,
+    ) -> Result<(), PipelineError> {
+        for expr in projection {
             self.add_select_item(expr).await?;
         }
 
-        if !select.group_by.is_empty() {
-            self.add_groupby_items(select.group_by).await?;
+        if !group_by.is_empty() {
+            self.add_groupby_items(group_by).await?;
         }
 
-        if let Some(having) = select.having {
+        if let Some(having) = having {
             self.add_having_item(having).await?;
         }

diff --git a/dozer-sql/src/planner/tests/projection_tests.rs b/dozer-sql/src/planner/tests/projection_tests.rs
index 0ca127d87b..4241957fce 100644
--- a/dozer-sql/src/planner/tests/projection_tests.rs
+++ b/dozer-sql/src/planner/tests/projection_tests.rs
@@ -44,7 +44,11 @@ fn test_basic_projection() {
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     assert_eq!(

diff --git a/dozer-sql/src/planner/tests/schema_tests.rs b/dozer-sql/src/planner/tests/schema_tests.rs
index 360cbb4f8e..1458080fa5 100644
--- a/dozer-sql/src/planner/tests/schema_tests.rs
+++ b/dozer-sql/src/planner/tests/schema_tests.rs
@@ -49,7 +49,11 @@ fn test_schema_index_partial_group_by() {
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     assert!(projection_planner
@@ -105,7 +109,11 @@ fn test_schema_index_full_group_by() {
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     assert_eq!(

diff --git a/dozer-sql/src/product/table/factory.rs b/dozer-sql/src/product/table/factory.rs
index 9889f81945..f8da9641e8 100644
--- a/dozer-sql/src/product/table/factory.rs
+++ b/dozer-sql/src/product/table/factory.rs
@@ -4,28 +4,22 @@ use dozer_core::{
     node::{PortHandle, Processor, ProcessorFactory},
     DEFAULT_PORT_HANDLE,
 };
-use dozer_sql_expression::{
-    builder::{extend_schema_source_def, NameOrAlias},
-    sqlparser::ast::TableFactor,
-};
+use dozer_sql_expression::builder::{extend_schema_source_def, NameOrAlias};
 use dozer_types::{errors::internal::BoxedError, tonic::async_trait, types::Schema};
 
-use crate::{
-    builder::string_from_sql_object_name,
-    errors::{PipelineError, ProductError},
-};
+use crate::errors::PipelineError;
 
 use super::processor::TableProcessor;
 
 #[derive(Debug)]
 pub struct TableProcessorFactory {
     id: String,
-    relation: TableFactor,
+    table: NameOrAlias,
 }
 
 impl TableProcessorFactory {
-    pub fn new(id: String, relation: TableFactor) -> Self {
-        Self { id, relation }
+    pub fn new(id: String, table: NameOrAlias) -> Self {
+        Self { id, table }
     }
 }
 
@@ -53,8 +47,7 @@ impl ProcessorFactory for TableProcessorFactory {
         input_schemas: &HashMap<PortHandle, Schema>,
     ) -> Result<Schema, BoxedError> {
         if let Some(input_schema) = input_schemas.get(&DEFAULT_PORT_HANDLE) {
-            let table = get_name_or_alias(&self.relation)?;
-            let extended_input_schema = extend_schema_source_def(input_schema, &table);
+            let extended_input_schema = extend_schema_source_def(input_schema, &self.table);
             Ok(extended_input_schema)
         } else {
             Err(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE).into())
@@ -73,39 +66,3 @@ impl ProcessorFactory for TableProcessorFactory {
         )))
     }
 }
-
-pub fn get_name_or_alias(relation: &TableFactor) -> Result<NameOrAlias, PipelineError> {
-    match relation {
-        TableFactor::Table { name, alias, .. } => {
-            let table_name = string_from_sql_object_name(name);
-            if let Some(table_alias) = alias {
-                let alias = table_alias.name.value.clone();
-                return Ok(NameOrAlias(table_name, Some(alias)));
-            }
-            Ok(NameOrAlias(table_name, None))
-        }
-        TableFactor::Derived { alias, .. } => {
-            if let Some(table_alias) = alias {
-                let alias = table_alias.name.value.clone();
-                return Ok(NameOrAlias("dozer_derived".to_string(), Some(alias)));
-            }
-            Ok(NameOrAlias("dozer_derived".to_string(), None))
-        }
-        TableFactor::TableFunction { .. } => Err(PipelineError::ProductError(
-            ProductError::UnsupportedTableFunction,
-        )),
-        TableFactor::UNNEST { .. } => {
-            Err(PipelineError::ProductError(ProductError::UnsupportedUnnest))
-        }
-        TableFactor::NestedJoin { alias, .. } => {
-            if let Some(table_alias) = alias {
-                let alias = table_alias.name.value.clone();
-                return Ok(NameOrAlias("dozer_nested".to_string(), Some(alias)));
-            }
-            Ok(NameOrAlias("dozer_nested".to_string(), None))
-        }
-        TableFactor::Pivot { .. } => {
-            Err(PipelineError::ProductError(ProductError::UnsupportedPivot))
-        }
-    }
-}

diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs
index e306d8aaf6..9c3c588cff 100644
--- a/dozer-sql/src/tests/builder_test.rs
+++ b/dozer-sql/src/tests/builder_test.rs
@@ -253,13 +253,12 @@ fn test_pipeline_builder() {
     pipeline.add_sink(
         Box::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
-        "sink",
-        vec![],
+        "sink".to_string(),
     );
     pipeline.connect_nodes(
-        &table_info.node,
+        table_info.node.clone(),
         table_info.port,
-        "sink",
+        "sink".to_string(),
         DEFAULT_PORT_HANDLE,
     );

diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs
index 023d0f5a30..800c456782 100644
--- a/dozer-tests/src/sql_tests/helper/pipeline.rs
+++ b/dozer-tests/src/sql_tests/helper/pipeline.rs
@@ -358,14 +358,13 @@ impl TestPipeline {
         let output = Arc::new(Mutex::new(HashMap::new()));
         pipeline.add_sink(
             Box::new(TestSinkFactory::new(output.clone())),
-            "sink",
-            vec![],
+            "sink".to_string(),
         );
 
         pipeline.connect_nodes(
-            &output_table.node,
+            output_table.node.clone(),
             output_table.port,
-            "sink",
+            "sink".to_string(),
             DEFAULT_PORT_HANDLE,
         );
         let used_schemas = pipeline.get_entry_points_sources_names();
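
For context, a minimal sketch (not part of the patch) of how a pipeline is wired with the reworked `AppPipeline` API, distilled from the updated `dozer-core/src/tests/app.rs` hunk above. Node ids are now owned `String`s, entry points are registered through the separate `add_entry_point` call, and `add_processor`/`add_sink` no longer take an entry-point `Vec`; the factories, ports, and `latch` are the ones used in that test:

    // Build a pipeline with the post-refactor API.
    let mut p1 = AppPipeline::new_with_default_flags();

    // Processors and sinks are registered by owned-String id only.
    p1.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string());
    p1.add_sink(
        Box::new(CountingSinkFactory::new(20_000, latch.clone())),
        "sink".to_string(),
    );

    // Entry points are attached to an already-registered node id,
    // instead of being passed into add_processor/add_sink.
    p1.add_entry_point(
        "join".to_string(),
        PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
    );

    // Edges take owned Strings as well, so callers move ids instead of cloning &str.
    p1.connect_nodes(
        "join".to_string(),
        DEFAULT_PORT_HANDLE,
        "sink".to_string(),
        COUNTING_SINK_INPUT_PORT,
    );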