From 53262c9a7ef788b1c9b35b052202bcdb6bf2188d Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Tue, 27 Feb 2024 17:53:08 +0800 Subject: [PATCH] chore: Clean up `dozer_sql::builder` (#2424) --- dozer-sql/src/builder/common.rs | 38 ++ dozer-sql/src/builder/from.rs | 130 +++++++ .../join_builder.rs => builder/join.rs} | 7 +- dozer-sql/src/{builder.rs => builder/mod.rs} | 346 ++---------------- .../table_operator.rs} | 280 ++++---------- dozer-sql/src/builder/tests.rs | 211 +++++++++++ dozer-sql/src/lib.rs | 1 - dozer-sql/src/pipeline_builder/mod.rs | 2 - dozer-sql/src/planner/projection.rs | 2 +- dozer-sql/src/product/table/factory.rs | 6 +- dozer-sql/src/table_operator/factory.rs | 2 +- dozer-sql/src/window/builder.rs | 12 +- dozer-sql/src/window/factory.rs | 2 +- 13 files changed, 481 insertions(+), 558 deletions(-) create mode 100644 dozer-sql/src/builder/common.rs create mode 100644 dozer-sql/src/builder/from.rs rename dozer-sql/src/{pipeline_builder/join_builder.rs => builder/join.rs} (97%) rename dozer-sql/src/{builder.rs => builder/mod.rs} (62%) rename dozer-sql/src/{pipeline_builder/from_builder.rs => builder/table_operator.rs} (59%) create mode 100644 dozer-sql/src/builder/tests.rs delete mode 100644 dozer-sql/src/pipeline_builder/mod.rs diff --git a/dozer-sql/src/builder/common.rs b/dozer-sql/src/builder/common.rs new file mode 100644 index 0000000000..fd7026615b --- /dev/null +++ b/dozer-sql/src/builder/common.rs @@ -0,0 +1,38 @@ +use dozer_sql_expression::{builder::ExpressionBuilder, sqlparser::ast::ObjectName}; + +use super::QueryContext; + +pub fn is_an_entry_point(name: &str, query_context: &QueryContext, pipeline_idx: usize) -> bool { + if query_context + .pipeline_map + .contains_key(&(pipeline_idx, name.to_owned())) + { + return false; + } + if query_context.processors_list.contains(&name.to_owned()) { + return false; + } + true +} + +pub fn is_a_pipeline_output( + name: &str, + query_context: &mut QueryContext, + pipeline_idx: usize, 
+) -> bool { + if query_context + .pipeline_map + .contains_key(&(pipeline_idx, name.to_owned())) + { + return true; + } + false +} + +pub fn string_from_sql_object_name(name: &ObjectName) -> String { + name.0 + .iter() + .map(ExpressionBuilder::normalize_ident) + .collect::>() + .join(".") +} diff --git a/dozer-sql/src/builder/from.rs b/dozer-sql/src/builder/from.rs new file mode 100644 index 0000000000..6ac8c9a822 --- /dev/null +++ b/dozer-sql/src/builder/from.rs @@ -0,0 +1,130 @@ +use dozer_core::{ + app::{AppPipeline, PipelineEntryPoint}, + DEFAULT_PORT_HANDLE, +}; +use dozer_sql_expression::sqlparser::ast::{TableFactor, TableWithJoins}; + +use crate::{ + builder::{get_from_source, QueryContext}, + errors::PipelineError, + product::table::factory::TableProcessorFactory, +}; + +use super::{ + common::is_an_entry_point, + join::insert_join_to_pipeline, + table_operator::{insert_table_operator_processor_to_pipeline, is_table_operator}, + ConnectionInfo, +}; + +pub fn insert_from_to_pipeline( + from: &TableWithJoins, + pipeline: &mut AppPipeline, + pipeline_idx: usize, + query_context: &mut QueryContext, +) -> Result { + if from.joins.is_empty() { + insert_table_to_pipeline(&from.relation, pipeline, pipeline_idx, query_context) + } else { + insert_join_to_pipeline(from, pipeline, pipeline_idx, query_context) + } +} + +fn insert_table_to_pipeline( + relation: &TableFactor, + pipeline: &mut AppPipeline, + pipeline_idx: usize, + query_context: &mut QueryContext, +) -> Result { + if let Some(operator) = is_table_operator(relation)? 
{ + let product_processor_name = + insert_from_processor_to_pipeline(query_context, relation, pipeline); + + let connection_info = insert_table_operator_processor_to_pipeline( + &operator, + pipeline, + pipeline_idx, + query_context, + )?; + + pipeline.connect_nodes( + &connection_info.output_node.0, + connection_info.output_node.1, + &product_processor_name.clone(), + DEFAULT_PORT_HANDLE, + ); + + Ok(ConnectionInfo { + input_nodes: connection_info.input_nodes, + output_node: (product_processor_name, DEFAULT_PORT_HANDLE), + }) + } else { + insert_table_processor_to_pipeline(relation, pipeline, pipeline_idx, query_context) + } +} + +fn insert_table_processor_to_pipeline( + relation: &TableFactor, + pipeline: &mut AppPipeline, + pipeline_idx: usize, + query_context: &mut QueryContext, +) -> Result { + // let relation_name_or_alias = get_name_or_alias(relation)?; + let relation_name_or_alias = get_from_source(relation, pipeline, query_context, pipeline_idx)?; + + let processor_name = format!( + "from:{}--{}", + relation_name_or_alias.0, + query_context.get_next_processor_id() + ); + if !query_context.processors_list.insert(processor_name.clone()) { + return Err(PipelineError::ProcessorAlreadyExists(processor_name)); + } + let product_processor_factory = + TableProcessorFactory::new(processor_name.clone(), relation.to_owned()); + + let product_input_name = relation_name_or_alias.0; + + let mut input_nodes = vec![]; + let mut product_entry_points = vec![]; + + // is a node that is an entry point to the pipeline + if is_an_entry_point(&product_input_name, query_context, pipeline_idx) { + let entry_point = PipelineEntryPoint::new(product_input_name.clone(), DEFAULT_PORT_HANDLE); + + product_entry_points.push(entry_point); + query_context.used_sources.push(product_input_name); + } + // is a node that is connected to another pipeline + else { + input_nodes.push(( + product_input_name, + processor_name.clone(), + DEFAULT_PORT_HANDLE, + )); + } + + pipeline.add_processor( + 
Box::new(product_processor_factory), + &processor_name, + product_entry_points, + ); + + Ok(ConnectionInfo { + input_nodes, + output_node: (processor_name, DEFAULT_PORT_HANDLE), + }) +} + +fn insert_from_processor_to_pipeline( + query_context: &mut QueryContext, + relation: &TableFactor, + pipeline: &mut AppPipeline, +) -> String { + let product_processor_name = format!("from--{}", query_context.get_next_processor_id()); + let product_processor = + TableProcessorFactory::new(product_processor_name.clone(), relation.clone()); + + pipeline.add_processor(Box::new(product_processor), &product_processor_name, vec![]); + product_processor_name +} diff --git a/dozer-sql/src/pipeline_builder/join_builder.rs b/dozer-sql/src/builder/join.rs similarity index 97% rename from dozer-sql/src/pipeline_builder/join_builder.rs rename to dozer-sql/src/builder/join.rs index 2e1f23cea3..1c508d4004 100644 --- a/dozer-sql/src/pipeline_builder/join_builder.rs +++ b/dozer-sql/src/builder/join.rs @@ -13,8 +13,9 @@ use crate::{ }, }; -use super::from_builder::{ - insert_table_operator_processor_to_pipeline, is_an_entry_point, is_table_operator, +use super::{ + common::is_an_entry_point, + table_operator::{insert_table_operator_processor_to_pipeline, is_table_operator}, ConnectionInfo, }; @@ -25,7 +26,7 @@ enum JoinSource { Join(ConnectionInfo), } -pub(crate) fn insert_join_to_pipeline( +pub fn insert_join_to_pipeline( from: &TableWithJoins, pipeline: &mut AppPipeline, pipeline_idx: usize, diff --git a/dozer-sql/src/builder.rs b/dozer-sql/src/builder/mod.rs similarity index 62% rename from dozer-sql/src/builder.rs rename to dozer-sql/src/builder/mod.rs index 8b6dc6673d..e5f0a17cea 100644 --- a/dozer-sql/src/builder.rs +++ b/dozer-sql/src/builder/mod.rs @@ -3,13 +3,10 @@ use crate::builder::PipelineError::InvalidQuery; use crate::errors::PipelineError; use crate::selection::factory::SelectionProcessorFactory; use dozer_core::app::AppPipeline; -use dozer_core::app::PipelineEntryPoint; use 
dozer_core::node::PortHandle; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_sql_expression::builder::{ExpressionBuilder, NameOrAlias}; -use dozer_sql_expression::sqlparser::ast::{ - Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins, -}; +use dozer_sql_expression::sqlparser::ast::{SetOperator, SetQuantifier, TableFactor}; use dozer_types::models::udf_config::UdfConfig; use dozer_sql_expression::sqlparser::{ @@ -23,7 +20,6 @@ use std::sync::Arc; use tokio::runtime::Runtime; use super::errors::UnsupportedSqlError; -use super::pipeline_builder::from_builder::insert_from_to_pipeline; use super::product::set::set_factory::SetProcessorFactory; @@ -33,21 +29,14 @@ pub struct OutputNodeInfo { pub node: String, // Port to connect in dag pub port: PortHandle, - // If this table is originally from a source or created in transforms - pub is_derived: bool, // TODO add:indexes to the tables } -pub struct TableInfo { - pub name: NameOrAlias, - pub override_name: Option, - pub is_derived: bool, -} /// The struct contains some contexts during query to pipeline. #[derive(Debug, Clone)] pub struct QueryContext { // Internal tables map, used to store the tables that are created by the queries - pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>, + pipeline_map: HashMap<(usize, String), OutputNodeInfo>, // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks. 
pub output_tables_map: HashMap, @@ -56,20 +45,20 @@ pub struct QueryContext { pub used_sources: Vec, // Internal tables map, used to store the tables that are created by the queries - pub processors_list: HashSet, + processors_list: HashSet, // Processors counter - pub processor_counter: usize, + processor_counter: usize, // Udf related configs - pub udfs: Vec, + udfs: Vec, // The tokio runtime - pub runtime: Arc, + runtime: Arc, } impl QueryContext { - pub fn get_next_processor_id(&mut self) -> usize { + fn get_next_processor_id(&mut self) -> usize { self.processor_counter += 1; self.processor_counter } @@ -87,11 +76,6 @@ impl QueryContext { } } -#[derive(Debug, Clone)] -pub struct IndexedTableWithJoins { - pub relation: (NameOrAlias, TableFactor), - pub joins: Vec<(NameOrAlias, Join)>, -} pub fn statement_to_pipeline( sql: &str, pipeline: &mut AppPipeline, @@ -112,7 +96,6 @@ pub fn statement_to_pipeline( query_to_pipeline( &TableInfo { name: query_name.clone(), - is_derived: false, override_name: override_name.clone(), }, query, @@ -134,6 +117,11 @@ pub fn statement_to_pipeline( Ok(ctx) } +struct TableInfo { + name: NameOrAlias, + override_name: Option, +} + fn query_to_pipeline( table_info: &TableInfo, query: &Query, @@ -182,7 +170,6 @@ fn query_to_pipeline( query_to_pipeline( &TableInfo { name: NameOrAlias(table_name.clone(), Some(table_name)), - is_derived: true, override_name: None, }, &table.query, @@ -213,7 +200,6 @@ fn query_to_pipeline( query_to_pipeline( &TableInfo { name: NameOrAlias(query_name, None), - is_derived: true, override_name: None, }, &query, @@ -270,17 +256,8 @@ fn select_to_pipeline( )); } - // let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?; - // - // let (input_nodes, output_node, mut used_sources) = add_from_to_pipeline( - // pipeline, - // &input_tables, - // &mut query_ctx.pipeline_map, - // pipeline_idx, - // )?; - let connection_info = - insert_from_to_pipeline(&select.from[0], pipeline, 
pipeline_idx, query_ctx)?; + from::insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?; let input_nodes = connection_info.input_nodes; let output_node = connection_info.output_node; @@ -360,7 +337,6 @@ fn select_to_pipeline( OutputNodeInfo { node: gen_agg_name.clone(), port: DEFAULT_PORT_HANDLE, - is_derived: table_info.is_derived, }, ); @@ -384,7 +360,6 @@ fn select_to_pipeline( OutputNodeInfo { node: gen_agg_name.clone(), port: DEFAULT_PORT_HANDLE, - is_derived: false, }, ); } @@ -408,13 +383,11 @@ fn set_to_pipeline( let left_table_info = TableInfo { name: NameOrAlias(gen_left_set_name.clone(), None), override_name: None, - is_derived: false, }; let gen_right_set_name = format!("set_right_{}", query_ctx.get_next_processor_id()); let right_table_info = TableInfo { name: NameOrAlias(gen_right_set_name.clone(), None), override_name: None, - is_derived: false, }; let _left_pipeline_name = match *left_select { @@ -548,79 +521,13 @@ fn set_to_pipeline( OutputNodeInfo { node: gen_set_name.clone(), port: DEFAULT_PORT_HANDLE, - is_derived: table_info.is_derived, }, ); Ok(gen_set_name) } -/// Returns a vector of input port handles and relative table name -/// -/// # Errors -/// -/// This function will return an error if it's not possible to get an input name. 
-pub fn get_input_tables( - from: &TableWithJoins, - pipeline: &mut AppPipeline, - query_ctx: &mut QueryContext, - pipeline_idx: usize, -) -> Result { - let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?; - let mut joins = vec![]; - - for join in from.joins.iter() { - let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?; - joins.push((input_name.clone(), join.clone())); - } - - Ok(IndexedTableWithJoins { - relation: (name, from.relation.clone()), - joins, - }) -} - -pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec { - let mut input_names = vec![]; - input_names.push(input_tables.relation.0.clone()); - - for join in &input_tables.joins { - input_names.push(join.0.clone()); - } - input_names -} - -pub fn get_entry_points( - input_tables: &IndexedTableWithJoins, - pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>, - pipeline_idx: usize, -) -> Result, PipelineError> { - let mut endpoints = vec![]; - - let input_names = get_input_names(input_tables); - - for (input_port, table) in input_names.iter().enumerate() { - let name = table.0.clone(); - if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) { - endpoints.push(PipelineEntryPoint::new(name, input_port as PortHandle)); - } - } - - Ok(endpoints) -} - -pub fn is_an_entry_point( - name: &str, - pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>, - pipeline_idx: usize, -) -> bool { - if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) { - return true; - } - false -} - -pub fn get_from_source( +fn get_from_source( relation: &TableFactor, pipeline: &mut AppPipeline, query_ctx: &mut QueryContext, @@ -654,7 +561,6 @@ pub fn get_from_source( query_to_pipeline( &TableInfo { name: name_or.clone(), - is_derived: true, override_name: None, }, subquery, @@ -673,217 +579,19 @@ pub fn get_from_source( } } -#[cfg(test)] -mod tests { - use super::statement_to_pipeline; - use crate::{errors::PipelineError, 
tests::utils::create_test_runtime}; - use dozer_core::app::AppPipeline; - #[test] - #[should_panic] - fn disallow_zero_outgoing_ndes() { - let sql = "select * from film"; - let runtime = create_test_runtime(); - statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ) - .unwrap(); - } - - #[test] - fn test_duplicate_into_clause() { - let sql = "select * into table1 from film1 ; select * into table1 from film2"; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - assert!(matches!( - result, - Err(PipelineError::DuplicateIntoClause(dup_table)) if dup_table == "table1" - )); - } - - #[test] - fn parse_sql_pipeline() { - let sql = r#" - SELECT - a.name as "Genre", - SUM(amount) as "Gross Revenue(in $)" - INTO gross_revenue_stats - FROM - ( - SELECT - c.name, - f.title, - p.amount - FROM film f - LEFT JOIN film_category fc - ON fc.film_id = f.film_id - LEFT JOIN category c - ON fc.category_id = c.category_id - LEFT JOIN inventory i - ON i.film_id = f.film_id - LEFT JOIN rental r - ON r.inventory_id = i.inventory_id - LEFT JOIN payment p - ON p.rental_id = r.rental_id - WHERE p.amount IS NOT NULL - ) a - GROUP BY name; - - SELECT - f.name, f.title, p.amount - INTO film_amounts - FROM film f - LEFT JOIN film_category fc; - - WITH tbl as (select id from a) - select id - into cte_table - from tbl; - - WITH tbl as (select id from a), - tbl2 as (select id from tbl) - select id - into nested_cte_table - from tbl2; - - WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1), - cte_table2 as (select id_ct1 from cte_table1) - select id_ct2 - into nested_derived_table - from cte_table2; - - with tbl as (select id, ticker from stocks) - select tbl.id - into nested_stocks_table - from stocks join tbl on tbl.id = stocks.id; - "#; - - let runtime = create_test_runtime(); - let context = 
statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ) - .unwrap(); - - // Should create as many output tables as into statements - let mut output_keys = context.output_tables_map.keys().collect::>(); - output_keys.sort(); - let mut expected_keys = vec![ - "gross_revenue_stats", - "film_amounts", - "cte_table", - "nested_cte_table", - "nested_derived_table", - "nested_stocks_table", - ]; - expected_keys.sort(); - assert_eq!(output_keys, expected_keys); - } - - #[test] - fn test_missing_into_in_simple_from_clause() { - let sql = r#"SELECT a FROM B "#; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - //check if the result is an error - assert!(matches!(result, Err(PipelineError::MissingIntoClause))) - } - - #[test] - fn test_correct_into_clause() { - let sql = r#"SELECT a INTO C FROM B"#; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - //check if the result is ok - assert!(result.is_ok()); - } +#[derive(Clone, Debug)] +struct ConnectionInfo { + input_nodes: Vec<(String, String, PortHandle)>, + output_node: (String, PortHandle), +} - #[test] - fn test_missing_into_in_nested_from_clause() { - let sql = r#"SELECT a FROM (SELECT a from b)"#; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - //check if the result is an error - assert!(matches!(result, Err(PipelineError::MissingIntoClause))) - } +mod common; +mod from; +mod join; +mod table_operator; - #[test] - fn test_correct_into_in_nested_from() { - let sql = r#"SELECT a INTO c FROM (SELECT a from b)"#; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut 
AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - //check if the result is ok - assert!(result.is_ok()); - } +pub use common::string_from_sql_object_name; +pub use table_operator::{TableOperatorArg, TableOperatorDescriptor}; - #[test] - fn test_missing_into_in_with_clause() { - let sql = r#"WITH tbl as (select a from B) - select B - from tbl;"#; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - //check if the result is an error - assert!(matches!(result, Err(PipelineError::MissingIntoClause))) - } - - #[test] - fn test_correct_into_in_with_clause() { - let sql = r#"WITH tbl as (select a from B) - select B - into C - from tbl;"#; - let runtime = create_test_runtime(); - let result = statement_to_pipeline( - sql, - &mut AppPipeline::new_with_default_flags(), - None, - vec![], - runtime, - ); - //check if the result is ok - assert!(result.is_ok()); - } -} +#[cfg(test)] +mod tests; diff --git a/dozer-sql/src/pipeline_builder/from_builder.rs b/dozer-sql/src/builder/table_operator.rs similarity index 59% rename from dozer-sql/src/pipeline_builder/from_builder.rs rename to dozer-sql/src/builder/table_operator.rs index e6cfba5931..8c6b9d97a5 100644 --- a/dozer-sql/src/pipeline_builder/from_builder.rs +++ b/dozer-sql/src/builder/table_operator.rs @@ -1,28 +1,21 @@ use dozer_core::{ app::{AppPipeline, PipelineEntryPoint}, - node::PortHandle, DEFAULT_PORT_HANDLE, }; -use dozer_sql_expression::{ - builder::ExpressionBuilder, - sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, ObjectName, TableFactor, TableWithJoins}, +use dozer_sql_expression::sqlparser::ast::{ + Expr, FunctionArg, FunctionArgExpr, ObjectName, TableFactor, }; use crate::{ - builder::{get_from_source, QueryContext}, errors::PipelineError, - product::table::factory::TableProcessorFactory, table_operator::factory::{get_source_name, TableOperatorProcessorFactory}, 
window::factory::WindowProcessorFactory, }; -use super::join_builder::insert_join_to_pipeline; - -#[derive(Clone, Debug)] -pub struct ConnectionInfo { - pub input_nodes: Vec<(String, String, PortHandle)>, - pub output_node: (String, PortHandle), -} +use super::{ + common::{is_a_pipeline_output, is_an_entry_point, string_from_sql_object_name}, + ConnectionInfo, QueryContext, +}; #[derive(Clone, Debug)] pub struct TableOperatorDescriptor { @@ -36,103 +29,67 @@ pub enum TableOperatorArg { Descriptor(TableOperatorDescriptor), } -pub fn insert_from_to_pipeline( - from: &TableWithJoins, - pipeline: &mut AppPipeline, - pipeline_idx: usize, - query_context: &mut QueryContext, -) -> Result { - if from.joins.is_empty() { - insert_table_to_pipeline(&from.relation, pipeline, pipeline_idx, query_context) - } else { - insert_join_to_pipeline(from, pipeline, pipeline_idx, query_context) - } -} - -fn insert_table_to_pipeline( +pub fn is_table_operator( relation: &TableFactor, - pipeline: &mut AppPipeline, - pipeline_idx: usize, - query_context: &mut QueryContext, -) -> Result { - if let Some(operator) = is_table_operator(relation)? { - let product_processor_name = - insert_from_processor_to_pipeline(query_context, relation, pipeline); - - let connection_info = insert_table_operator_processor_to_pipeline( - &operator, - pipeline, - pipeline_idx, - query_context, - )?; - - pipeline.connect_nodes( - &connection_info.output_node.0, - connection_info.output_node.1, - &product_processor_name.clone(), - DEFAULT_PORT_HANDLE, - ); +) -> Result, PipelineError> { + match relation { + TableFactor::Table { name, args, .. 
} => { + if args.is_none() { + return Ok(None); + } + let operator = get_table_operator_descriptor(name, args)?; - Ok(ConnectionInfo { - input_nodes: connection_info.input_nodes, - output_node: (product_processor_name, DEFAULT_PORT_HANDLE), - }) - } else { - insert_table_processor_to_pipeline(relation, pipeline, pipeline_idx, query_context) + Ok(operator) + } + TableFactor::Derived { .. } => Ok(None), + TableFactor::TableFunction { .. } => Err(PipelineError::UnsupportedTableFunction), + TableFactor::UNNEST { .. } => Err(PipelineError::UnsupportedUnnest), + TableFactor::NestedJoin { .. } => Err(PipelineError::UnsupportedNestedJoin), + TableFactor::Pivot { .. } => Err(PipelineError::UnsupportedPivot), } } -fn insert_table_processor_to_pipeline( - relation: &TableFactor, - pipeline: &mut AppPipeline, - pipeline_idx: usize, - query_context: &mut QueryContext, -) -> Result { - // let relation_name_or_alias = get_name_or_alias(relation)?; - let relation_name_or_alias = get_from_source(relation, pipeline, query_context, pipeline_idx)?; +fn get_table_operator_descriptor( + name: &ObjectName, + args: &Option>, +) -> Result, PipelineError> { + let mut operator_args = vec![]; - let processor_name = format!( - "from:{}--{}", - relation_name_or_alias.0, - query_context.get_next_processor_id() - ); - if !query_context.processors_list.insert(processor_name.clone()) { - return Err(PipelineError::ProcessorAlreadyExists(processor_name)); + if let Some(args) = args { + for arg in args { + let operator_arg = get_table_operator_arg(arg)?; + operator_args.push(operator_arg); + } } - let product_processor_factory = - TableProcessorFactory::new(processor_name.clone(), relation.to_owned()); - - let product_input_name = relation_name_or_alias.0; - let mut input_nodes = vec![]; - let mut product_entry_points = vec![]; - - // is a node that is an entry point to the pipeline - if is_an_entry_point(&product_input_name, query_context, pipeline_idx) { - let entry_point = 
PipelineEntryPoint::new(product_input_name.clone(), DEFAULT_PORT_HANDLE); + Ok(Some(TableOperatorDescriptor { + name: string_from_sql_object_name(name), + args: operator_args, + })) +} - product_entry_points.push(entry_point); - query_context.used_sources.push(product_input_name); - } - // is a node that is connected to another pipeline - else { - input_nodes.push(( - product_input_name, - processor_name.clone(), - DEFAULT_PORT_HANDLE, - )); +fn get_table_operator_arg(arg: &FunctionArg) -> Result { + match arg { + FunctionArg::Named { name, arg: _ } => { + Err(PipelineError::UnsupportedTableOperator(name.to_string())) + } + FunctionArg::Unnamed(arg_expr) => match arg_expr { + FunctionArgExpr::Expr(Expr::Function(function)) => { + let operator_descriptor = get_table_operator_descriptor( + &function.clone().name, + &Some(function.clone().args), + )?; + if let Some(descriptor) = operator_descriptor { + Ok(TableOperatorArg::Descriptor(descriptor)) + } else { + Err(PipelineError::UnsupportedTableOperator( + string_from_sql_object_name(&function.name), + )) + } + } + _ => Ok(TableOperatorArg::Argument(arg.clone())), + }, } - - pipeline.add_processor( - Box::new(product_processor_factory), - &processor_name, - product_entry_points, - ); - - Ok(ConnectionInfo { - input_nodes, - output_node: (processor_name, DEFAULT_PORT_HANDLE), - }) } pub fn insert_table_operator_processor_to_pipeline( @@ -174,8 +131,7 @@ pub fn insert_table_operator_processor_to_pipeline( }; if is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) { - let entry_point = - PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle); + let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE); entry_points.push(entry_point); query_context.used_sources.push(source_name.clone()); @@ -183,7 +139,7 @@ pub fn insert_table_operator_processor_to_pipeline( input_nodes.push(( source_name.clone(), processor_name.clone(), - DEFAULT_PORT_HANDLE as PortHandle, 
+ DEFAULT_PORT_HANDLE, )); } @@ -233,8 +189,7 @@ pub fn insert_table_operator_processor_to_pipeline( }; if is_an_entry_point(&source_name, query_context, pipeline_idx) { - let entry_point = - PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle); + let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE); entry_points.push(entry_point); query_context.used_sources.push(source_name.clone()); @@ -242,7 +197,7 @@ pub fn insert_table_operator_processor_to_pipeline( input_nodes.push(( source_name.clone(), processor_name.to_owned(), - DEFAULT_PORT_HANDLE as PortHandle, + DEFAULT_PORT_HANDLE, )); } @@ -275,7 +230,7 @@ pub fn insert_table_operator_processor_to_pipeline( } } -pub fn generate_name( +fn generate_name( prefix: &str, operator: &TableOperatorDescriptor, query_context: &mut QueryContext, @@ -288,114 +243,3 @@ pub fn generate_name( ); processor_name } - -fn insert_from_processor_to_pipeline( - query_context: &mut QueryContext, - relation: &TableFactor, - pipeline: &mut AppPipeline, -) -> String { - let product_processor_name = format!("from--{}", query_context.get_next_processor_id()); - let product_processor = - TableProcessorFactory::new(product_processor_name.clone(), relation.clone()); - - pipeline.add_processor(Box::new(product_processor), &product_processor_name, vec![]); - product_processor_name -} - -pub fn get_table_operator_arg(arg: &FunctionArg) -> Result { - match arg { - FunctionArg::Named { name, arg: _ } => { - Err(PipelineError::UnsupportedTableOperator(name.to_string())) - } - FunctionArg::Unnamed(arg_expr) => match arg_expr { - FunctionArgExpr::Expr(Expr::Function(function)) => { - let operator_descriptor = get_table_operator_descriptor( - &function.clone().name, - &Some(function.clone().args), - )?; - if let Some(descriptor) = operator_descriptor { - Ok(TableOperatorArg::Descriptor(descriptor)) - } else { - Err(PipelineError::UnsupportedTableOperator( - 
string_from_sql_object_name(&function.name), - )) - } - } - _ => Ok(TableOperatorArg::Argument(arg.clone())), - }, - } -} - -pub fn get_table_operator_descriptor( - name: &ObjectName, - args: &Option>, -) -> Result, PipelineError> { - let mut operator_args = vec![]; - - if let Some(args) = args { - for arg in args { - let operator_arg = get_table_operator_arg(arg)?; - operator_args.push(operator_arg); - } - } - - Ok(Some(TableOperatorDescriptor { - name: string_from_sql_object_name(name), - args: operator_args, - })) -} - -pub fn is_table_operator( - relation: &TableFactor, -) -> Result, PipelineError> { - match relation { - TableFactor::Table { name, args, .. } => { - if args.is_none() { - return Ok(None); - } - let operator = get_table_operator_descriptor(name, args)?; - - Ok(operator) - } - TableFactor::Derived { .. } => Ok(None), - TableFactor::TableFunction { .. } => Err(PipelineError::UnsupportedTableFunction), - TableFactor::UNNEST { .. } => Err(PipelineError::UnsupportedUnnest), - TableFactor::NestedJoin { .. } => Err(PipelineError::UnsupportedNestedJoin), - TableFactor::Pivot { .. 
} => Err(PipelineError::UnsupportedPivot), - } -} - -pub fn is_an_entry_point(name: &str, query_context: &QueryContext, pipeline_idx: usize) -> bool { - if query_context - .pipeline_map - .contains_key(&(pipeline_idx, name.to_owned())) - { - return false; - } - if query_context.processors_list.contains(&name.to_owned()) { - return false; - } - true -} - -pub fn is_a_pipeline_output( - name: &str, - query_context: &mut QueryContext, - pipeline_idx: usize, -) -> bool { - if query_context - .pipeline_map - .contains_key(&(pipeline_idx, name.to_owned())) - { - return true; - } - false -} - -pub fn string_from_sql_object_name(name: &ObjectName) -> String { - name.0 - .iter() - .map(ExpressionBuilder::normalize_ident) - .collect::>() - .join(".") -} diff --git a/dozer-sql/src/builder/tests.rs b/dozer-sql/src/builder/tests.rs new file mode 100644 index 0000000000..0426c4dc84 --- /dev/null +++ b/dozer-sql/src/builder/tests.rs @@ -0,0 +1,211 @@ +use super::statement_to_pipeline; +use crate::{errors::PipelineError, tests::utils::create_test_runtime}; +use dozer_core::app::AppPipeline; +#[test] +#[should_panic] +fn disallow_zero_outgoing_ndes() { + let sql = "select * from film"; + let runtime = create_test_runtime(); + statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ) + .unwrap(); +} + +#[test] +fn test_duplicate_into_clause() { + let sql = "select * into table1 from film1 ; select * into table1 from film2"; + let runtime = create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + assert!(matches!( + result, + Err(PipelineError::DuplicateIntoClause(dup_table)) if dup_table == "table1" + )); +} + +#[test] +fn parse_sql_pipeline() { + let sql = r#" + SELECT + a.name as "Genre", + SUM(amount) as "Gross Revenue(in $)" + INTO gross_revenue_stats + FROM + ( + SELECT + c.name, + f.title, + p.amount + FROM film f + LEFT JOIN 
film_category fc + ON fc.film_id = f.film_id + LEFT JOIN category c + ON fc.category_id = c.category_id + LEFT JOIN inventory i + ON i.film_id = f.film_id + LEFT JOIN rental r + ON r.inventory_id = i.inventory_id + LEFT JOIN payment p + ON p.rental_id = r.rental_id + WHERE p.amount IS NOT NULL + ) a + GROUP BY name; + + SELECT + f.name, f.title, p.amount + INTO film_amounts + FROM film f + LEFT JOIN film_category fc; + + WITH tbl as (select id from a) + select id + into cte_table + from tbl; + + WITH tbl as (select id from a), + tbl2 as (select id from tbl) + select id + into nested_cte_table + from tbl2; + + WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1), + cte_table2 as (select id_ct1 from cte_table1) + select id_ct2 + into nested_derived_table + from cte_table2; + + with tbl as (select id, ticker from stocks) + select tbl.id + into nested_stocks_table + from stocks join tbl on tbl.id = stocks.id; + "#; + + let runtime = create_test_runtime(); + let context = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ) + .unwrap(); + + // Should create as many output tables as into statements + let mut output_keys = context.output_tables_map.keys().collect::>(); + output_keys.sort(); + let mut expected_keys = vec![ + "gross_revenue_stats", + "film_amounts", + "cte_table", + "nested_cte_table", + "nested_derived_table", + "nested_stocks_table", + ]; + expected_keys.sort(); + assert_eq!(output_keys, expected_keys); +} + +#[test] +fn test_missing_into_in_simple_from_clause() { + let sql = r#"SELECT a FROM B "#; + let runtime = create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + //check if the result is an error + assert!(matches!(result, Err(PipelineError::MissingIntoClause))) +} + +#[test] +fn test_correct_into_clause() { + let sql = r#"SELECT a INTO C FROM B"#; + let runtime = 
create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + //check if the result is ok + assert!(result.is_ok()); +} + +#[test] +fn test_missing_into_in_nested_from_clause() { + let sql = r#"SELECT a FROM (SELECT a from b)"#; + let runtime = create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + //check if the result is an error + assert!(matches!(result, Err(PipelineError::MissingIntoClause))) +} + +#[test] +fn test_correct_into_in_nested_from() { + let sql = r#"SELECT a INTO c FROM (SELECT a from b)"#; + let runtime = create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + //check if the result is ok + assert!(result.is_ok()); +} + +#[test] +fn test_missing_into_in_with_clause() { + let sql = r#"WITH tbl as (select a from B) +select B +from tbl;"#; + let runtime = create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + //check if the result is an error + assert!(matches!(result, Err(PipelineError::MissingIntoClause))) +} + +#[test] +fn test_correct_into_in_with_clause() { + let sql = r#"WITH tbl as (select a from B) +select B +into C +from tbl;"#; + let runtime = create_test_runtime(); + let result = statement_to_pipeline( + sql, + &mut AppPipeline::new_with_default_flags(), + None, + vec![], + runtime, + ); + //check if the result is ok + assert!(result.is_ok()); +} diff --git a/dozer-sql/src/lib.rs b/dozer-sql/src/lib.rs index 8f2e2087c7..5ef05a1d2c 100644 --- a/dozer-sql/src/lib.rs +++ b/dozer-sql/src/lib.rs @@ -2,7 +2,6 @@ mod aggregation; pub mod builder; pub mod errors; mod expression; -mod pipeline_builder; mod planner; mod product; mod projection; diff --git 
a/dozer-sql/src/pipeline_builder/mod.rs b/dozer-sql/src/pipeline_builder/mod.rs deleted file mode 100644 index 6753ef9e55..0000000000 --- a/dozer-sql/src/pipeline_builder/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub(crate) mod from_builder; -pub(crate) mod join_builder; diff --git a/dozer-sql/src/planner/projection.rs b/dozer-sql/src/planner/projection.rs index 924d3cceb2..c987370b4c 100644 --- a/dozer-sql/src/planner/projection.rs +++ b/dozer-sql/src/planner/projection.rs @@ -1,8 +1,8 @@ #![allow(dead_code)] use std::sync::Arc; +use crate::builder::string_from_sql_object_name; use crate::errors::PipelineError; -use crate::pipeline_builder::from_builder::string_from_sql_object_name; use dozer_sql_expression::builder::ExpressionBuilder; use dozer_sql_expression::execution::Expression; use dozer_sql_expression::sqlparser::ast::{Expr, Ident, Select, SelectItem}; diff --git a/dozer-sql/src/product/table/factory.rs b/dozer-sql/src/product/table/factory.rs index 0a32b3be9c..9889f81945 100644 --- a/dozer-sql/src/product/table/factory.rs +++ b/dozer-sql/src/product/table/factory.rs @@ -10,8 +10,10 @@ use dozer_sql_expression::{ }; use dozer_types::{errors::internal::BoxedError, tonic::async_trait, types::Schema}; -use crate::errors::{PipelineError, ProductError}; -use crate::window::builder::string_from_sql_object_name; +use crate::{ + builder::string_from_sql_object_name, + errors::{PipelineError, ProductError}, +}; use super::processor::TableProcessor; diff --git a/dozer-sql/src/table_operator/factory.rs b/dozer-sql/src/table_operator/factory.rs index e2cca99dbf..f4f5670979 100644 --- a/dozer-sql/src/table_operator/factory.rs +++ b/dozer-sql/src/table_operator/factory.rs @@ -14,8 +14,8 @@ use dozer_types::{models::udf_config::UdfConfig, tonic::async_trait}; use tokio::runtime::Runtime; use crate::{ + builder::{TableOperatorArg, TableOperatorDescriptor}, errors::{PipelineError, TableOperatorError}, - pipeline_builder::from_builder::{TableOperatorArg, 
TableOperatorDescriptor}, }; use super::{ diff --git a/dozer-sql/src/window/builder.rs b/dozer-sql/src/window/builder.rs index c079455ad6..65b233caab 100644 --- a/dozer-sql/src/window/builder.rs +++ b/dozer-sql/src/window/builder.rs @@ -1,6 +1,6 @@ use dozer_sql_expression::{ builder::ExpressionBuilder, - sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value}, + sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Ident, Value}, }; use dozer_types::{ chrono::Duration, @@ -8,8 +8,8 @@ use dozer_types::{ }; use crate::{ + builder::{TableOperatorArg, TableOperatorDescriptor}, errors::{JoinError, PipelineError, WindowError}, - pipeline_builder::from_builder::{TableOperatorArg, TableOperatorDescriptor}, }; use super::operator::WindowType; @@ -194,14 +194,6 @@ fn parse_duration_string(duration_string: &str) -> Result<Duration, WindowError> } } -pub fn string_from_sql_object_name(name: &ObjectName) -> String { - name.0 - .iter() - .map(ExpressionBuilder::normalize_ident) - .collect::<Vec<String>>() - .join(".") -} - pub fn get_field_index(ident: &[Ident], schema: &Schema) -> Result<Option<usize>, PipelineError> { let tables_matches = |table_ident: &Ident, fd: &FieldDefinition| -> bool { match fd.source.clone() { diff --git a/dozer-sql/src/window/factory.rs b/dozer-sql/src/window/factory.rs index 857986c16d..785ebf4fb6 100644 --- a/dozer-sql/src/window/factory.rs +++ b/dozer-sql/src/window/factory.rs @@ -7,8 +7,8 @@ use dozer_core::{ use dozer_types::{errors::internal::BoxedError, tonic::async_trait, types::Schema}; use crate::{ + builder::TableOperatorDescriptor, errors::{PipelineError, WindowError}, - pipeline_builder::from_builder::TableOperatorDescriptor, }; use super::{builder::window_from_table_operator, processor::WindowProcessor};