From 25f111bdcb4bd3561a035b048726beee1eee528d Mon Sep 17 00:00:00 2001 From: MG190202 <91785591+MG190202@users.noreply.github.com> Date: Tue, 27 Feb 2024 22:33:13 +0530 Subject: [PATCH 1/6] Remove leftover assertion (#2429) --- dozer-sink-aerospike/src/lib.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index ac3f79d4fd..6323c5e318 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -35,10 +35,7 @@ use aerospike_client_sys::{ as_val_val_reserve, as_vector, as_vector_increase_capacity, as_vector_init, AS_BATCH_WRITE, AS_BIN_NAME_MAX_LEN, }; -use dozer_core::{ - node::{PortHandle, Sink, SinkFactory}, - DEFAULT_PORT_HANDLE, -}; +use dozer_core::node::{PortHandle, Sink, SinkFactory}; use dozer_types::errors::internal::BoxedError; use dozer_types::geo::{Coord, Point}; use dozer_types::ordered_float::OrderedFloat; @@ -1190,7 +1187,6 @@ impl Sink for AerospikeSink { } fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> { - debug_assert_eq!(op.port, DEFAULT_PORT_HANDLE); self.sender.send(op)?; Ok(()) } @@ -1248,6 +1244,7 @@ impl Sink for AerospikeSink { #[cfg(test)] mod tests { + use dozer_core::DEFAULT_PORT_HANDLE; use dozer_log::tokio; use std::time::Duration; From 10aed02f003897b2ab3cc4b69ef42edf2fbaa12f Mon Sep 17 00:00:00 2001 From: Jesse Date: Tue, 27 Feb 2024 18:16:13 +0100 Subject: [PATCH 2/6] Tighten filter on which ignored tests run in CI (#2430) --- .github/workflows/unit.yaml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/unit.yaml b/.github/workflows/unit.yaml index f5ceac0bf1..9737477e96 100644 --- a/.github/workflows/unit.yaml +++ b/.github/workflows/unit.yaml @@ -80,8 +80,11 @@ jobs: SN_DRIVER: ${{ secrets.SN_DRIVER }} shell: bash run: | - cargo test -p 'dozer-ingestion-*' --lib --no-fail-fast -- --ignored - + cargo test \ + -p dozer-ingestion-postgres \ + -p dozer-ingestion-kafka \ + -p dozer-ingestion-mysql \ + --lib --no-fail-fast -- --ignored - name: Run tests shell: bash run: | From 62ea8f02383957ba04161984551cc90e6dec6ae3 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Wed, 28 Feb 2024 01:18:46 +0800 Subject: [PATCH 3/6] chore: Remove clones and duplicated code in `dozer_sql::builder` (#2428) --- dozer-cli/src/pipeline/builder.rs | 21 +- dozer-core/src/app.rs | 41 ++-- dozer-core/src/tests/app.rs | 44 ++-- dozer-sql/src/aggregation/factory.rs | 23 +- .../tests/aggregation_test_planner.rs | 6 +- .../tests/aggregation_tests_utils.rs | 6 +- dozer-sql/src/builder/common.rs | 43 +++- dozer-sql/src/builder/from.rs | 66 +++--- dozer-sql/src/builder/join.rs | 160 ++++++-------- dozer-sql/src/builder/mod.rs | 127 +++++------ dozer-sql/src/builder/table_operator.rs | 206 +++++++----------- dozer-sql/src/planner/projection.rs | 17 +- .../src/planner/tests/projection_tests.rs | 6 +- dozer-sql/src/planner/tests/schema_tests.rs | 12 +- dozer-sql/src/product/table/factory.rs | 55 +---- dozer-sql/src/tests/builder_test.rs | 7 +- dozer-tests/src/sql_tests/helper/pipeline.rs | 7 +- 17 files changed, 377 insertions(+), 470 deletions(-) diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index 5bdf64beac..974c94e123 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -365,23 +365,24 @@ fn add_sink_to_pipeline( id: &str, table_infos: Vec<(&OutputTableInfo, PortHandle)>, ) { - let mut connections = vec![]; - let mut entry_points 
= vec![]; + pipeline.add_sink(sink, id.to_string()); for (table_info, port) in table_infos { match table_info { OutputTableInfo::Original(table_info) => { - entry_points.push(PipelineEntryPoint::new(table_info.table_name.clone(), port)) + pipeline.add_entry_point( + id.to_string(), + PipelineEntryPoint::new(table_info.table_name.clone(), port), + ); } OutputTableInfo::Transformed(table_info) => { - connections.push((table_info, port)); + pipeline.connect_nodes( + table_info.node.clone(), + table_info.port, + id.to_string(), + port, + ); } } } - - pipeline.add_sink(sink, id, entry_points); - - for (table_info, port) in connections { - pipeline.connect_nodes(&table_info.node, table_info.port, id, port); - } } diff --git a/dozer-core/src/app.rs b/dozer-core/src/app.rs index 907682c0b6..29597fca7a 100644 --- a/dozer-core/src/app.rs +++ b/dozer-core/src/app.rs @@ -33,44 +33,33 @@ pub struct AppPipeline { } impl AppPipeline { - pub fn add_processor( - &mut self, - proc: Box, - id: &str, - entry_points: Vec, - ) { - let handle = NodeHandle::new(None, id.to_string()); - self.processors.push((handle.clone(), proc)); + fn create_handle(id: String) -> NodeHandle { + NodeHandle::new(None, id) + } - for entry_point in entry_points { - self.entry_points.push((handle.clone(), entry_point)); - } + pub fn add_processor(&mut self, proc: Box, id: String) { + self.processors.push((Self::create_handle(id), proc)); } - pub fn add_sink( - &mut self, - sink: Box, - id: &str, - entry_points: Vec, - ) { - let handle = NodeHandle::new(None, id.to_string()); - self.sinks.push((handle.clone(), sink)); + pub fn add_sink(&mut self, sink: Box, id: String) { + self.sinks.push((Self::create_handle(id), sink)); + } - for entry_point in entry_points { - self.entry_points.push((handle.clone(), entry_point)); - } + pub fn add_entry_point(&mut self, id: String, entry_point: PipelineEntryPoint) { + self.entry_points + .push((Self::create_handle(id), entry_point)); } pub fn connect_nodes( &mut self, - from: &str, + from: String, from_port: PortHandle, - to: &str, + to: String, to_port: PortHandle, ) { let edge = Edge::new( - Endpoint::new(NodeHandle::new(None, from.to_string()), from_port), - Endpoint::new(NodeHandle::new(None, to.to_string()), to_port), + Endpoint::new(NodeHandle::new(None, from), from_port), + Endpoint::new(NodeHandle::new(None, to), to_port), ); self.edges.push(edge); } diff --git a/dozer-core/src/tests/app.rs b/dozer-core/src/tests/app.rs index 9c8f917c77..833a6a82a6 100644 --- a/dozer-core/src/tests/app.rs +++ b/dozer-core/src/tests/app.rs @@ -171,46 +171,46 @@ fn test_app_dag() { let mut app = App::new(asm); let mut p1 = AppPipeline::new_with_default_flags(); - p1.add_processor( - Box::new(NoopJoinProcessorFactory {}), - "join", - vec![ - PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT), - PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT), - ], + p1.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string()); + p1.add_entry_point( + "join".to_string(), + PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT), + ); + p1.add_entry_point( + "join".to_string(), + PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT), ); p1.add_sink( Box::new(CountingSinkFactory::new(20_000, latch.clone())), - "sink", - vec![], + "sink".to_string(), ); p1.connect_nodes( - "join", + "join".to_string(), DEFAULT_PORT_HANDLE, - "sink", + "sink".to_string(), COUNTING_SINK_INPUT_PORT, ); app.add_pipeline(p1); 
let mut p2 = AppPipeline::new_with_default_flags(); - p2.add_processor( - Box::new(NoopJoinProcessorFactory {}), - "join", - vec![ - PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT), - PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT), - ], + p2.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string()); + p2.add_entry_point( + "join".to_string(), + PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT), + ); + p2.add_entry_point( + "join".to_string(), + PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT), ); p2.add_sink( Box::new(CountingSinkFactory::new(20_000, latch)), - "sink", - vec![], + "sink".to_string(), ); p2.connect_nodes( - "join", + "join".to_string(), DEFAULT_PORT_HANDLE, - "sink", + "sink".to_string(), COUNTING_SINK_INPUT_PORT, ); diff --git a/dozer-sql/src/aggregation/factory.rs b/dozer-sql/src/aggregation/factory.rs index 37ea84d17a..b70abf655f 100644 --- a/dozer-sql/src/aggregation/factory.rs +++ b/dozer-sql/src/aggregation/factory.rs @@ -5,7 +5,7 @@ use dozer_core::{ node::{PortHandle, Processor, ProcessorFactory}, DEFAULT_PORT_HANDLE, }; -use dozer_sql_expression::sqlparser::ast::Select; +use dozer_sql_expression::sqlparser::ast::{Expr, SelectItem}; use dozer_types::errors::internal::BoxedError; use dozer_types::models::udf_config::UdfConfig; use dozer_types::parking_lot::Mutex; @@ -18,8 +18,9 @@ use tokio::runtime::Runtime; #[derive(Debug)] pub struct AggregationProcessorFactory { id: String, - projection: Select, - _stateful: bool, + projection: Vec, + group_by: Vec, + having: Option, enable_probabilistic_optimizations: bool, udfs: Vec, runtime: Arc, @@ -31,8 +32,9 @@ pub struct AggregationProcessorFactory { impl AggregationProcessorFactory { pub fn new( id: String, - projection: Select, - stateful: bool, + projection: Vec, + group_by: Vec, + having: Option, enable_probabilistic_optimizations: bool, udfs: Vec, runtime: Arc, @@ -40,7 +42,8 @@ impl AggregationProcessorFactory { Self { id, projection, - _stateful: stateful, + group_by, + having, enable_probabilistic_optimizations, udfs, runtime, @@ -51,7 +54,13 @@ impl AggregationProcessorFactory { async fn get_planner(&self, input_schema: Schema) -> Result { let mut projection_planner = CommonPlanner::new(input_schema, self.udfs.as_slice(), self.runtime.clone()); - projection_planner.plan(self.projection.clone()).await?; + projection_planner + .plan( + self.projection.clone(), + self.group_by.clone(), + self.having.clone(), + ) + .await?; Ok(projection_planner) } } diff --git a/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs b/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs index beb343e302..17d80621c4 100644 --- a/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs +++ b/dozer-sql/src/aggregation/tests/aggregation_test_planner.rs @@ -76,7 +76,11 @@ fn test_planner_with_aggregator() { let statement = get_select(sql).unwrap(); runtime - .block_on(projection_planner.plan(*statement)) + .block_on(projection_planner.plan( + statement.projection, + statement.group_by, + statement.having, + )) .unwrap(); let mut processor = AggregationProcessor::new( diff --git a/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs b/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs index 0b1e34bace..aec3fd1fd4 100644 --- a/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs +++ b/dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs @@ -28,7 +28,11 @@ 
pub(crate) fn init_processor( let statement = get_select(sql).unwrap(); runtime - .block_on(projection_planner.plan(*statement)) + .block_on(projection_planner.plan( + statement.projection, + statement.group_by, + statement.having, + )) .unwrap(); let processor = AggregationProcessor::new( diff --git a/dozer-sql/src/builder/common.rs b/dozer-sql/src/builder/common.rs index fd7026615b..0d24256a15 100644 --- a/dozer-sql/src/builder/common.rs +++ b/dozer-sql/src/builder/common.rs @@ -1,4 +1,9 @@ -use dozer_sql_expression::{builder::ExpressionBuilder, sqlparser::ast::ObjectName}; +use dozer_sql_expression::{ + builder::{ExpressionBuilder, NameOrAlias}, + sqlparser::ast::{ObjectName, TableFactor}, +}; + +use crate::errors::{PipelineError, ProductError}; use super::QueryContext; @@ -29,6 +34,42 @@ pub fn is_a_pipeline_output( false } +pub fn get_name_or_alias(relation: &TableFactor) -> Result { + match relation { + TableFactor::Table { name, alias, .. } => { + let table_name = string_from_sql_object_name(name); + if let Some(table_alias) = alias { + let alias = table_alias.name.value.clone(); + return Ok(NameOrAlias(table_name, Some(alias))); + } + Ok(NameOrAlias(table_name, None)) + } + TableFactor::Derived { alias, .. } => { + if let Some(table_alias) = alias { + let alias = table_alias.name.value.clone(); + return Ok(NameOrAlias("dozer_derived".to_string(), Some(alias))); + } + Ok(NameOrAlias("dozer_derived".to_string(), None)) + } + TableFactor::TableFunction { .. } => Err(PipelineError::ProductError( + ProductError::UnsupportedTableFunction, + )), + TableFactor::UNNEST { .. } => { + Err(PipelineError::ProductError(ProductError::UnsupportedUnnest)) + } + TableFactor::NestedJoin { alias, .. } => { + if let Some(table_alias) = alias { + let alias = table_alias.name.value.clone(); + return Ok(NameOrAlias("dozer_nested".to_string(), Some(alias))); + } + Ok(NameOrAlias("dozer_nested".to_string(), None)) + } + TableFactor::Pivot { .. } => { + Err(PipelineError::ProductError(ProductError::UnsupportedPivot)) + } + } +} + pub fn string_from_sql_object_name(name: &ObjectName) -> String { name.0 .iter() diff --git a/dozer-sql/src/builder/from.rs b/dozer-sql/src/builder/from.rs index 6ac8c9a822..6fa98bea03 100644 --- a/dozer-sql/src/builder/from.rs +++ b/dozer-sql/src/builder/from.rs @@ -11,46 +11,46 @@ use crate::{ }; use super::{ - common::is_an_entry_point, + common::{get_name_or_alias, is_an_entry_point}, join::insert_join_to_pipeline, table_operator::{insert_table_operator_processor_to_pipeline, is_table_operator}, ConnectionInfo, }; pub fn insert_from_to_pipeline( - from: &TableWithJoins, + from: TableWithJoins, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { if from.joins.is_empty() { - insert_table_to_pipeline(&from.relation, pipeline, pipeline_idx, query_context) + insert_table_to_pipeline(from.relation, pipeline, pipeline_idx, query_context) } else { insert_join_to_pipeline(from, pipeline, pipeline_idx, query_context) } } fn insert_table_to_pipeline( - relation: &TableFactor, + relation: TableFactor, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { - if let Some(operator) = is_table_operator(relation)? { + if let Some(operator) = is_table_operator(&relation)? 
{ let product_processor_name = - insert_from_processor_to_pipeline(query_context, relation, pipeline); + insert_from_processor_to_pipeline(query_context, relation, pipeline)?; let connection_info = insert_table_operator_processor_to_pipeline( - &operator, + operator, pipeline, pipeline_idx, query_context, )?; pipeline.connect_nodes( - &connection_info.output_node.0, + connection_info.output_node.0, connection_info.output_node.1, - &product_processor_name.clone(), + product_processor_name.clone(), DEFAULT_PORT_HANDLE, ); @@ -64,51 +64,41 @@ fn insert_table_to_pipeline( } fn insert_table_processor_to_pipeline( - relation: &TableFactor, + relation: TableFactor, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { - // let relation_name_or_alias = get_name_or_alias(relation)?; - let relation_name_or_alias = get_from_source(relation, pipeline, query_context, pipeline_idx)?; + let relation_name_or_alias = get_name_or_alias(&relation)?; + let product_input_name = get_from_source(relation, pipeline, query_context, pipeline_idx)?.0; let processor_name = format!( "from:{}--{}", - relation_name_or_alias.0, + product_input_name, query_context.get_next_processor_id() ); if !query_context.processors_list.insert(processor_name.clone()) { return Err(PipelineError::ProcessorAlreadyExists(processor_name)); } let product_processor_factory = - TableProcessorFactory::new(processor_name.clone(), relation.to_owned()); - - let product_input_name = relation_name_or_alias.0; - - let mut input_nodes = vec![]; - let mut product_entry_points = vec![]; + TableProcessorFactory::new(processor_name.clone(), relation_name_or_alias.clone()); + pipeline.add_processor(Box::new(product_processor_factory), processor_name.clone()); // is a node that is an entry point to the pipeline - if is_an_entry_point(&product_input_name, query_context, pipeline_idx) { + let input_nodes = if is_an_entry_point(&product_input_name, query_context, pipeline_idx) { let entry_point = PipelineEntryPoint::new(product_input_name.clone(), DEFAULT_PORT_HANDLE); - - product_entry_points.push(entry_point); + pipeline.add_entry_point(processor_name.clone(), entry_point); query_context.used_sources.push(product_input_name); + vec![] } // is a node that is connected to another pipeline else { - input_nodes.push(( + vec![( product_input_name, processor_name.clone(), DEFAULT_PORT_HANDLE, - )); - } - - pipeline.add_processor( - Box::new(product_processor_factory), - &processor_name, - product_entry_points, - ); + )] + }; Ok(ConnectionInfo { input_nodes, @@ -118,13 +108,15 @@ fn insert_table_processor_to_pipeline( fn insert_from_processor_to_pipeline( query_context: &mut QueryContext, - relation: &TableFactor, + relation: TableFactor, pipeline: &mut AppPipeline, -) -> String { +) -> Result { let product_processor_name = format!("from--{}", query_context.get_next_processor_id()); - let product_processor = - TableProcessorFactory::new(product_processor_name.clone(), relation.clone()); + let product_processor = TableProcessorFactory::new( + product_processor_name.clone(), + get_name_or_alias(&relation)?, + ); - pipeline.add_processor(Box::new(product_processor), &product_processor_name, vec![]); - product_processor_name + pipeline.add_processor(Box::new(product_processor), product_processor_name.clone()); + Ok(product_processor_name) } diff --git a/dozer-sql/src/builder/join.rs b/dozer-sql/src/builder/join.rs index 1c508d4004..653010d927 100644 --- a/dozer-sql/src/builder/join.rs +++ b/dozer-sql/src/builder/join.rs @@ 
-1,20 +1,18 @@ use dozer_core::{ app::{AppPipeline, PipelineEntryPoint}, + node::PortHandle, DEFAULT_PORT_HANDLE, }; -use dozer_sql_expression::sqlparser::ast::TableWithJoins; +use dozer_sql_expression::sqlparser::ast::{TableFactor, TableWithJoins}; use crate::{ builder::{get_from_source, QueryContext}, errors::PipelineError, - product::{ - join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT}, - table::factory::get_name_or_alias, - }, + product::join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT}, }; use super::{ - common::is_an_entry_point, + common::{get_name_or_alias, is_an_entry_point}, table_operator::{insert_table_operator_processor_to_pipeline, is_table_operator}, ConnectionInfo, }; @@ -27,21 +25,21 @@ enum JoinSource { } pub fn insert_join_to_pipeline( - from: &TableWithJoins, + from: TableWithJoins, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { let mut input_nodes = vec![]; - let left_table = &from.relation; - let mut left_name_or_alias = Some(get_name_or_alias(left_table)?); + let left_table = from.relation; + let mut left_name_or_alias = Some(get_name_or_alias(&left_table)?); let mut left_join_source = - insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?; + insert_join_source_to_pipeline(left_table, pipeline, pipeline_idx, query_context)?; - for join in &from.joins { - let right_table = &join.relation; - let right_name_or_alias = Some(get_name_or_alias(right_table)?); + for join in from.joins { + let right_table = join.relation; + let right_name_or_alias = Some(get_name_or_alias(&right_table)?); let right_join_source = insert_join_source_to_pipeline( right_table.clone(), pipeline, @@ -58,84 +56,36 @@ pub fn insert_join_to_pipeline( } let join_processor_factory = JoinProcessorFactory::new( join_processor_name.clone(), - left_name_or_alias.clone(), + left_name_or_alias, right_name_or_alias, - join.join_operator.clone(), + join.join_operator, pipeline .flags() .enable_probabilistic_optimizations .in_joins .unwrap_or(false), ); - - let mut pipeline_entry_points = vec![]; - if let JoinSource::Table(ref source_table) = left_join_source { - if is_an_entry_point(source_table, query_context, pipeline_idx) { - let entry_point = PipelineEntryPoint::new(source_table.clone(), LEFT_JOIN_PORT); - - pipeline_entry_points.push(entry_point); - query_context.used_sources.push(source_table.to_string()); - } else { - input_nodes.push(( - source_table.to_string(), - join_processor_name.clone(), - LEFT_JOIN_PORT, - )); - } - } - - if let JoinSource::Table(ref source_table) = right_join_source.clone() { - if is_an_entry_point(source_table, query_context, pipeline_idx) { - let entry_point = PipelineEntryPoint::new(source_table.clone(), RIGHT_JOIN_PORT); - - pipeline_entry_points.push(entry_point); - query_context.used_sources.push(source_table.to_string()); - } else { - input_nodes.push(( - source_table.to_string(), - join_processor_name.clone(), - RIGHT_JOIN_PORT, - )); - } - } - pipeline.add_processor( Box::new(join_processor_factory), - &join_processor_name, - pipeline_entry_points, + join_processor_name.clone(), ); - match left_join_source { - JoinSource::Table(_) => {} - JoinSource::Operator(ref connection_info) => pipeline.connect_nodes( - &connection_info.output_node.0, - connection_info.output_node.1, - &join_processor_name, - LEFT_JOIN_PORT, - ), - JoinSource::Join(ref connection_info) => pipeline.connect_nodes( - &connection_info.output_node.0, - 
connection_info.output_node.1, - &join_processor_name, - LEFT_JOIN_PORT, - ), - } - - match right_join_source { - JoinSource::Table(_) => {} - JoinSource::Operator(connection_info) => pipeline.connect_nodes( - &connection_info.output_node.0, - connection_info.output_node.1, - &join_processor_name, - RIGHT_JOIN_PORT, - ), - JoinSource::Join(connection_info) => pipeline.connect_nodes( - &connection_info.output_node.0, - connection_info.output_node.1, - &join_processor_name, - RIGHT_JOIN_PORT, - ), - } + input_nodes.extend(modify_pipeline_graph( + left_join_source, + join_processor_name.clone(), + LEFT_JOIN_PORT, + pipeline, + pipeline_idx, + query_context, + )); + input_nodes.extend(modify_pipeline_graph( + right_join_source, + join_processor_name.clone(), + RIGHT_JOIN_PORT, + pipeline, + pipeline_idx, + query_context, + )); // TODO: refactor join source name and aliasing logic left_name_or_alias = None; @@ -146,10 +96,7 @@ pub fn insert_join_to_pipeline( } match left_join_source { - JoinSource::Table(_) => Err(PipelineError::InvalidJoin( - "No JOIN operator found".to_string(), - )), - JoinSource::Operator(_) => Err(PipelineError::InvalidJoin( + JoinSource::Table(_) | JoinSource::Operator(_) => Err(PipelineError::InvalidJoin( "No JOIN operator found".to_string(), )), JoinSource::Join(connection_info) => Ok(connection_info), @@ -158,14 +105,14 @@ pub fn insert_join_to_pipeline( // TODO: refactor this fn insert_join_source_to_pipeline( - source: dozer_sql_expression::sqlparser::ast::TableFactor, + source: TableFactor, pipeline: &mut AppPipeline, pipeline_idx: usize, query_context: &mut QueryContext, ) -> Result { let join_source = if let Some(table_operator) = is_table_operator(&source)? { let connection_info = insert_table_operator_processor_to_pipeline( - &table_operator, + table_operator, pipeline, pipeline_idx, query_context, @@ -176,15 +123,44 @@ fn insert_join_source_to_pipeline( "Nested JOINs are not supported".to_string(), )); } else { - let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?; + let name_or_alias = get_from_source(source, pipeline, query_context, pipeline_idx)?; JoinSource::Table(name_or_alias.0) }; Ok(join_source) } -fn is_nested_join(left_table: &dozer_sql_expression::sqlparser::ast::TableFactor) -> bool { - matches!( - left_table, - dozer_sql_expression::sqlparser::ast::TableFactor::NestedJoin { .. } - ) +fn is_nested_join(left_table: &TableFactor) -> bool { + matches!(left_table, TableFactor::NestedJoin { .. 
}) +} + +/// Returns the input node if there is one +fn modify_pipeline_graph( + source: JoinSource, + id: String, + port: PortHandle, + pipeline: &mut AppPipeline, + pipeline_idx: usize, + query_context: &mut QueryContext, +) -> Option<(String, String, u16)> { + match source { + JoinSource::Table(source_table) => { + if is_an_entry_point(&source_table, query_context, pipeline_idx) { + let entry_point = PipelineEntryPoint::new(source_table.clone(), port); + pipeline.add_entry_point(id, entry_point); + query_context.used_sources.push(source_table); + None + } else { + Some((source_table, id, port)) + } + } + JoinSource::Operator(connection_info) | JoinSource::Join(connection_info) => { + pipeline.connect_nodes( + connection_info.output_node.0, + connection_info.output_node.1, + id, + port, + ); + None + } + } } diff --git a/dozer-sql/src/builder/mod.rs b/dozer-sql/src/builder/mod.rs index e5f0a17cea..3ccf3ed356 100644 --- a/dozer-sql/src/builder/mod.rs +++ b/dozer-sql/src/builder/mod.rs @@ -90,18 +90,17 @@ pub fn statement_to_pipeline( .map_err(|err| PipelineError::InternalError(Box::new(err)))?; let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None); - for (idx, statement) in ast.iter().enumerate() { + for (idx, statement) in ast.into_iter().enumerate() { match statement { Statement::Query(query) => { query_to_pipeline( - &TableInfo { + TableInfo { name: query_name.clone(), override_name: override_name.clone(), }, - query, + *query, pipeline, &mut ctx, - false, idx, is_top_select, )?; @@ -123,11 +122,10 @@ struct TableInfo { } fn query_to_pipeline( - table_info: &TableInfo, - query: &Query, + table_info: TableInfo, + query: Query, pipeline: &mut AppPipeline, query_ctx: &mut QueryContext, - stateful: bool, pipeline_idx: usize, is_top_select: bool, ) -> Result<(), PipelineError> { @@ -145,14 +143,14 @@ fn query_to_pipeline( } // Attach the first pipeline if there is with clause - if let Some(with) = &query.with { + if let Some(with) = query.with { if with.recursive { return Err(PipelineError::UnsupportedSqlError( UnsupportedSqlError::Recursive, )); } - for table in &with.cte_tables { + for table in with.cte_tables { if table.from.is_some() { return Err(PipelineError::UnsupportedSqlError( UnsupportedSqlError::CteFromError, @@ -168,28 +166,26 @@ fn query_to_pipeline( ))); } query_to_pipeline( - &TableInfo { + TableInfo { name: NameOrAlias(table_name.clone(), Some(table_name)), override_name: None, }, - &table.query, + *table.query, pipeline, query_ctx, - true, pipeline_idx, false, //Inside a with clause, so not top select )?; } }; - match *query.body.clone() { + match *query.body { SetExpr::Select(select) => { select_to_pipeline( table_info, *select, pipeline, query_ctx, - stateful, pipeline_idx, is_top_select, )?; @@ -198,14 +194,13 @@ fn query_to_pipeline( let query_name = format!("subquery_{}", query_ctx.get_next_processor_id()); let mut ctx = QueryContext::new(query_ctx.udfs.clone(), query_ctx.runtime.clone()); query_to_pipeline( - &TableInfo { + TableInfo { name: NameOrAlias(query_name, None), override_name: None, }, - &query, + *query, pipeline, &mut ctx, - stateful, pipeline_idx, false, //Inside a subquery, so not top select )? 
@@ -224,7 +219,6 @@ fn query_to_pipeline( set_quantifier, pipeline, query_ctx, - stateful, pipeline_idx, is_top_select, )?; @@ -241,23 +235,21 @@ fn query_to_pipeline( } fn select_to_pipeline( - table_info: &TableInfo, + table_info: TableInfo, select: Select, pipeline: &mut AppPipeline, query_ctx: &mut QueryContext, - stateful: bool, pipeline_idx: usize, is_top_select: bool, ) -> Result { // FROM clause - if select.from.len() != 1 { + let Some(from) = select.from.into_iter().next() else { return Err(PipelineError::UnsupportedSqlError( UnsupportedSqlError::FromCommaSyntax, )); - } + }; - let connection_info = - from::insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?; + let connection_info = from::insert_from_to_pipeline(from, pipeline, pipeline_idx, query_ctx)?; let input_nodes = connection_info.input_nodes; let output_node = connection_info.output_node; @@ -267,16 +259,16 @@ fn select_to_pipeline( let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id()); let (gen_product_name, product_output_port) = output_node; - for (source_name, processor_name, processor_port) in input_nodes.iter() { + for (source_name, processor_name, processor_port) in input_nodes { if let Some(table_info) = query_ctx .pipeline_map .get(&(pipeline_idx, source_name.clone())) { pipeline.connect_nodes( - &table_info.node, + table_info.node.clone(), table_info.port, processor_name, - *processor_port as PortHandle, + processor_port, ); // If not present in pipeline_map, insert into used_sources as this is coming from source } else { @@ -286,8 +278,9 @@ fn select_to_pipeline( let aggregation = AggregationProcessorFactory::new( gen_agg_name.clone(), - select.clone(), - stateful, + select.projection, + select.group_by, + select.having, pipeline .flags() .enable_probabilistic_optimizations @@ -297,37 +290,37 @@ fn select_to_pipeline( query_ctx.runtime.clone(), ); - pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]); + pipeline.add_processor(Box::new(aggregation), gen_agg_name.clone()); // Where clause if let Some(selection) = select.selection { let selection = SelectionProcessorFactory::new( - gen_selection_name.to_owned(), + gen_selection_name.clone(), selection, query_ctx.udfs.clone(), query_ctx.runtime.clone(), ); - pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]); + pipeline.add_processor(Box::new(selection), gen_selection_name.clone()); pipeline.connect_nodes( - &gen_product_name, + gen_product_name, product_output_port, - &gen_selection_name, + gen_selection_name.clone(), DEFAULT_PORT_HANDLE, ); pipeline.connect_nodes( - &gen_selection_name, + gen_selection_name, DEFAULT_PORT_HANDLE, - &gen_agg_name, + gen_agg_name.clone(), DEFAULT_PORT_HANDLE, ); } else { pipeline.connect_nodes( - &gen_product_name, + gen_product_name, product_output_port, - &gen_agg_name, + gen_agg_name.clone(), DEFAULT_PORT_HANDLE, ); } @@ -369,13 +362,12 @@ fn select_to_pipeline( #[allow(clippy::too_many_arguments)] fn set_to_pipeline( - table_info: &TableInfo, + table_info: TableInfo, left_select: Box, right_select: Box, set_quantifier: SetQuantifier, pipeline: &mut AppPipeline, query_ctx: &mut QueryContext, - stateful: bool, pipeline_idx: usize, is_top_select: bool, ) -> Result { @@ -392,11 +384,10 @@ fn set_to_pipeline( let _left_pipeline_name = match *left_select { SetExpr::Select(select) => select_to_pipeline( - &left_table_info, + left_table_info, *select, pipeline, query_ctx, - stateful, pipeline_idx, is_top_select, )?, @@ -406,13 +397,12 @@ fn 
set_to_pipeline( left, right, } => set_to_pipeline( - &left_table_info, + left_table_info, left, right, set_quantifier, pipeline, query_ctx, - stateful, pipeline_idx, is_top_select, )?, @@ -425,11 +415,10 @@ fn set_to_pipeline( let _right_pipeline_name = match *right_select { SetExpr::Select(select) => select_to_pipeline( - &right_table_info, + right_table_info, *select, pipeline, query_ctx, - stateful, pipeline_idx, is_top_select, )?, @@ -439,13 +428,12 @@ fn set_to_pipeline( left, right, } => set_to_pipeline( - &right_table_info, + right_table_info, left, right, set_quantifier, pipeline, query_ctx, - stateful, pipeline_idx, is_top_select, )?, @@ -458,29 +446,15 @@ fn set_to_pipeline( let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id()); - let left_pipeline_output_node = match query_ctx + let left_pipeline_output_node = query_ctx .pipeline_map .get(&(pipeline_idx, gen_left_set_name)) - { - Some(pipeline) => pipeline, - None => { - return Err(PipelineError::InvalidQuery( - "Invalid UNION left Query".to_string(), - )) - } - }; + .ok_or_else(|| PipelineError::InvalidQuery("Invalid UNION left Query".to_string()))?; - let right_pipeline_output_node = match query_ctx + let right_pipeline_output_node = query_ctx .pipeline_map .get(&(pipeline_idx, gen_right_set_name)) - { - Some(pipeline) => pipeline, - None => { - return Err(PipelineError::InvalidQuery( - "Invalid UNION Right Query".to_string(), - )) - } - }; + .ok_or_else(|| PipelineError::InvalidQuery("Invalid UNION right Query".to_string()))?; if table_info.override_name.is_some() { gen_set_name = table_info.override_name.to_owned().unwrap(); @@ -496,20 +470,20 @@ fn set_to_pipeline( .unwrap_or(false), ); - pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]); + pipeline.add_processor(Box::new(set_proc_fac), gen_set_name.clone()); pipeline.connect_nodes( - &left_pipeline_output_node.node, + left_pipeline_output_node.node.clone(), left_pipeline_output_node.port, - &gen_set_name, - 0 as PortHandle, + gen_set_name.clone(), + 0, ); pipeline.connect_nodes( - &right_pipeline_output_node.node, + right_pipeline_output_node.node.clone(), right_pipeline_output_node.port, - &gen_set_name, - 1 as PortHandle, + gen_set_name.clone(), + 1, ); for (_, table_name) in query_ctx.pipeline_map.keys() { @@ -528,7 +502,7 @@ fn set_to_pipeline( } fn get_from_source( - relation: &TableFactor, + relation: TableFactor, pipeline: &mut AppPipeline, query_ctx: &mut QueryContext, pipeline_idx: usize, @@ -559,14 +533,13 @@ fn get_from_source( let is_top_select = false; //inside FROM clause, so not top select let name_or = NameOrAlias(name, alias_name); query_to_pipeline( - &TableInfo { + TableInfo { name: name_or.clone(), override_name: None, }, - subquery, + *subquery, pipeline, query_ctx, - false, pipeline_idx, is_top_select, )?; diff --git a/dozer-sql/src/builder/table_operator.rs b/dozer-sql/src/builder/table_operator.rs index 8c6b9d97a5..3f1001e6c6 100644 --- a/dozer-sql/src/builder/table_operator.rs +++ b/dozer-sql/src/builder/table_operator.rs @@ -1,5 +1,6 @@ use dozer_core::{ app::{AppPipeline, PipelineEntryPoint}, + node::ProcessorFactory, DEFAULT_PORT_HANDLE, }; use dozer_sql_expression::sqlparser::ast::{ @@ -37,7 +38,7 @@ pub fn is_table_operator( if args.is_none() { return Ok(None); } - let operator = get_table_operator_descriptor(name, args)?; + let operator = get_table_operator_descriptor(name, args.as_deref())?; Ok(operator) } @@ -51,7 +52,7 @@ pub fn is_table_operator( fn get_table_operator_descriptor( name: 
&ObjectName, - args: &Option>, + args: Option<&[FunctionArg]>, ) -> Result, PipelineError> { let mut operator_args = vec![]; @@ -75,10 +76,8 @@ fn get_table_operator_arg(arg: &FunctionArg) -> Result match arg_expr { FunctionArgExpr::Expr(Expr::Function(function)) => { - let operator_descriptor = get_table_operator_descriptor( - &function.clone().name, - &Some(function.clone().args), - )?; + let operator_descriptor = + get_table_operator_descriptor(&function.name, Some(&function.args))?; if let Some(descriptor) = operator_descriptor { Ok(TableOperatorArg::Descriptor(descriptor)) } else { @@ -93,141 +92,88 @@ fn get_table_operator_arg(arg: &FunctionArg) -> Result Result { - // the sources names that are used in this pipeline - - let mut input_nodes = vec![]; - - if operator.name.to_uppercase() == "TTL" { - let mut entry_points = vec![]; - - let processor_name = generate_name("TOP", operator, query_context); - if !query_context.processors_list.insert(processor_name.clone()) { - return Err(PipelineError::ProcessorAlreadyExists(processor_name)); - } - let processor = TableOperatorProcessorFactory::new( - processor_name.clone(), - operator.clone(), - query_context.udfs.to_owned(), - query_context.runtime.clone(), - ); - - if let Some(table) = operator.args.first() { - let source_name = match table { - TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?, - TableOperatorArg::Descriptor(descriptor) => { - let connection_info = insert_table_operator_processor_to_pipeline( - descriptor, - pipeline, - pipeline_idx, - query_context, - )?; - connection_info.output_node.0 - } - }; - - if is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) { - let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE); - - entry_points.push(entry_point); - query_context.used_sources.push(source_name.clone()); - } else if is_a_pipeline_output(&source_name, query_context, pipeline_idx) { - input_nodes.push(( - source_name.clone(), - processor_name.clone(), - DEFAULT_PORT_HANDLE, - )); - } - - pipeline.add_processor(Box::new(processor), &processor_name.clone(), entry_points); - - if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) - && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) - { - pipeline.connect_nodes( - &source_name, - DEFAULT_PORT_HANDLE, - &processor_name.clone(), - DEFAULT_PORT_HANDLE, - ); - } - - Ok(ConnectionInfo { - input_nodes, - output_node: (processor_name, DEFAULT_PORT_HANDLE), - }) + let (processor_name, processor): (_, Box) = + if operator.name.to_uppercase() == "TTL" { + let processor_name = generate_name("TOP", &operator, query_context); + let processor = Box::new(TableOperatorProcessorFactory::new( + processor_name.clone(), + operator.clone(), + query_context.udfs.to_owned(), + query_context.runtime.clone(), + )); + (processor_name, processor) + } else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" + { + let processor_name = generate_name("WIN", &operator, query_context); + let processor = Box::new(WindowProcessorFactory::new( + processor_name.clone(), + operator.clone(), + )); + (processor_name, processor) } else { - Err(PipelineError::UnsupportedTableOperator( + return Err(PipelineError::UnsupportedTableOperator( operator.name.clone(), - )) - } - } else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" { - let mut entry_points = vec![]; + )); + }; - let processor_name = generate_name("WIN", operator, 
query_context); - if !query_context.processors_list.insert(processor_name.clone()) { - return Err(PipelineError::ProcessorAlreadyExists(processor_name)); - } - let processor = WindowProcessorFactory::new(processor_name.clone(), operator.clone()); - - if let Some(table) = operator.args.first() { - let source_name = match table { - TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?, - TableOperatorArg::Descriptor(descriptor) => { - let connection_info = insert_table_operator_processor_to_pipeline( - descriptor, - pipeline, - pipeline_idx, - query_context, - )?; - connection_info.output_node.0 - } - }; - - if is_an_entry_point(&source_name, query_context, pipeline_idx) { - let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE); - - entry_points.push(entry_point); - query_context.used_sources.push(source_name.clone()); - } else if is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) { - input_nodes.push(( - source_name.clone(), - processor_name.to_owned(), - DEFAULT_PORT_HANDLE, - )); - } + if !query_context.processors_list.insert(processor_name.clone()) { + return Err(PipelineError::ProcessorAlreadyExists(processor_name)); + } - pipeline.add_processor(Box::new(processor), &processor_name, entry_points); - - if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) - && !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) - { - pipeline.connect_nodes( - &source_name, - DEFAULT_PORT_HANDLE, - &processor_name, - DEFAULT_PORT_HANDLE, - ); - } + pipeline.add_processor(processor, processor_name.clone()); - Ok(ConnectionInfo { - input_nodes, - output_node: (processor_name, DEFAULT_PORT_HANDLE), - }) - } else { - Err(PipelineError::UnsupportedTableOperator( - operator.name.clone(), - )) + let Some(table) = operator.args.into_iter().next() else { + return Err(PipelineError::UnsupportedTableOperator( + operator.name.clone(), + )); + }; + + let source_name = match table { + TableOperatorArg::Argument(argument) => get_source_name(&operator.name, &argument)?, + TableOperatorArg::Descriptor(descriptor) => { + let connection_info = insert_table_operator_processor_to_pipeline( + descriptor, + pipeline, + pipeline_idx, + query_context, + )?; + connection_info.output_node.0 } + }; + + let is_an_entry_point = is_an_entry_point(&source_name, query_context, pipeline_idx); + let is_a_pipeline_output = is_a_pipeline_output(&source_name, query_context, pipeline_idx); + + let input_nodes = if is_an_entry_point { + let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE); + pipeline.add_entry_point(processor_name.clone(), entry_point); + query_context.used_sources.push(source_name.clone()); + vec![] + } else if is_a_pipeline_output { + vec![( + source_name.clone(), + processor_name.clone(), + DEFAULT_PORT_HANDLE, + )] } else { - Err(PipelineError::UnsupportedTableOperator( - operator.name.clone(), - )) - } + pipeline.connect_nodes( + source_name, + DEFAULT_PORT_HANDLE, + processor_name.clone(), + DEFAULT_PORT_HANDLE, + ); + vec![] + }; + + Ok(ConnectionInfo { + input_nodes, + output_node: (processor_name, DEFAULT_PORT_HANDLE), + }) } fn generate_name( diff --git a/dozer-sql/src/planner/projection.rs b/dozer-sql/src/planner/projection.rs index c987370b4c..6330adfbf1 100644 --- a/dozer-sql/src/planner/projection.rs +++ b/dozer-sql/src/planner/projection.rs @@ -5,7 +5,7 @@ use crate::builder::string_from_sql_object_name; use crate::errors::PipelineError; use 
dozer_sql_expression::builder::ExpressionBuilder; use dozer_sql_expression::execution::Expression; -use dozer_sql_expression::sqlparser::ast::{Expr, Ident, Select, SelectItem}; +use dozer_sql_expression::sqlparser::ast::{Expr, Ident, SelectItem}; use dozer_types::models::udf_config::UdfConfig; use dozer_types::types::{FieldDefinition, Schema}; use tokio::runtime::Runtime; @@ -208,15 +208,20 @@ impl<'a> CommonPlanner<'_> { Ok(()) } - pub async fn plan(&mut self, select: Select) -> Result<(), PipelineError> { - for expr in select.clone().projection { + pub async fn plan( + &mut self, + projection: Vec, + group_by: Vec, + having: Option, + ) -> Result<(), PipelineError> { + for expr in projection { self.add_select_item(expr).await?; } - if !select.group_by.is_empty() { - self.add_groupby_items(select.group_by).await?; + if !group_by.is_empty() { + self.add_groupby_items(group_by).await?; } - if let Some(having) = select.having { + if let Some(having) = having { self.add_having_item(having).await?; } diff --git a/dozer-sql/src/planner/tests/projection_tests.rs b/dozer-sql/src/planner/tests/projection_tests.rs index 0ca127d87b..4241957fce 100644 --- a/dozer-sql/src/planner/tests/projection_tests.rs +++ b/dozer-sql/src/planner/tests/projection_tests.rs @@ -44,7 +44,11 @@ fn test_basic_projection() { let statement = get_select(sql).unwrap(); runtime - .block_on(projection_planner.plan(*statement)) + .block_on(projection_planner.plan( + statement.projection, + statement.group_by, + statement.having, + )) .unwrap(); assert_eq!( diff --git a/dozer-sql/src/planner/tests/schema_tests.rs b/dozer-sql/src/planner/tests/schema_tests.rs index 360cbb4f8e..1458080fa5 100644 --- a/dozer-sql/src/planner/tests/schema_tests.rs +++ b/dozer-sql/src/planner/tests/schema_tests.rs @@ -49,7 +49,11 @@ fn test_schema_index_partial_group_by() { let statement = get_select(sql).unwrap(); runtime - .block_on(projection_planner.plan(*statement)) + .block_on(projection_planner.plan( + statement.projection, + statement.group_by, + statement.having, + )) .unwrap(); assert!(projection_planner @@ -105,7 +109,11 @@ fn test_schema_index_full_group_by() { let statement = get_select(sql).unwrap(); runtime - .block_on(projection_planner.plan(*statement)) + .block_on(projection_planner.plan( + statement.projection, + statement.group_by, + statement.having, + )) .unwrap(); assert_eq!( diff --git a/dozer-sql/src/product/table/factory.rs b/dozer-sql/src/product/table/factory.rs index 9889f81945..f8da9641e8 100644 --- a/dozer-sql/src/product/table/factory.rs +++ b/dozer-sql/src/product/table/factory.rs @@ -4,28 +4,22 @@ use dozer_core::{ node::{PortHandle, Processor, ProcessorFactory}, DEFAULT_PORT_HANDLE, }; -use dozer_sql_expression::{ - builder::{extend_schema_source_def, NameOrAlias}, - sqlparser::ast::TableFactor, -}; +use dozer_sql_expression::builder::{extend_schema_source_def, NameOrAlias}; use dozer_types::{errors::internal::BoxedError, tonic::async_trait, types::Schema}; -use crate::{ - builder::string_from_sql_object_name, - errors::{PipelineError, ProductError}, -}; +use crate::errors::PipelineError; use super::processor::TableProcessor; #[derive(Debug)] pub struct TableProcessorFactory { id: String, - relation: TableFactor, + table: NameOrAlias, } impl TableProcessorFactory { - pub fn new(id: String, relation: TableFactor) -> Self { - Self { id, relation } + pub fn new(id: String, table: NameOrAlias) -> Self { + Self { id, table } } } @@ -53,8 +47,7 @@ impl ProcessorFactory for TableProcessorFactory { input_schemas: &HashMap, 
) -> Result { if let Some(input_schema) = input_schemas.get(&DEFAULT_PORT_HANDLE) { - let table = get_name_or_alias(&self.relation)?; - let extended_input_schema = extend_schema_source_def(input_schema, &table); + let extended_input_schema = extend_schema_source_def(input_schema, &self.table); Ok(extended_input_schema) } else { Err(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE).into()) @@ -73,39 +66,3 @@ impl ProcessorFactory for TableProcessorFactory { ))) } } - -pub fn get_name_or_alias(relation: &TableFactor) -> Result { - match relation { - TableFactor::Table { name, alias, .. } => { - let table_name = string_from_sql_object_name(name); - if let Some(table_alias) = alias { - let alias = table_alias.name.value.clone(); - return Ok(NameOrAlias(table_name, Some(alias))); - } - Ok(NameOrAlias(table_name, None)) - } - TableFactor::Derived { alias, .. } => { - if let Some(table_alias) = alias { - let alias = table_alias.name.value.clone(); - return Ok(NameOrAlias("dozer_derived".to_string(), Some(alias))); - } - Ok(NameOrAlias("dozer_derived".to_string(), None)) - } - TableFactor::TableFunction { .. } => Err(PipelineError::ProductError( - ProductError::UnsupportedTableFunction, - )), - TableFactor::UNNEST { .. } => { - Err(PipelineError::ProductError(ProductError::UnsupportedUnnest)) - } - TableFactor::NestedJoin { alias, .. } => { - if let Some(table_alias) = alias { - let alias = table_alias.name.value.clone(); - return Ok(NameOrAlias("dozer_nested".to_string(), Some(alias))); - } - Ok(NameOrAlias("dozer_nested".to_string(), None)) - } - TableFactor::Pivot { .. } => { - Err(PipelineError::ProductError(ProductError::UnsupportedPivot)) - } - } -} diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index e306d8aaf6..9c3c588cff 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ b/dozer-sql/src/tests/builder_test.rs @@ -253,13 +253,12 @@ fn test_pipeline_builder() { pipeline.add_sink( Box::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])), - "sink", - vec![], + "sink".to_string(), ); pipeline.connect_nodes( - &table_info.node, + table_info.node.clone(), table_info.port, - "sink", + "sink".to_string(), DEFAULT_PORT_HANDLE, ); diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index 023d0f5a30..800c456782 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -358,14 +358,13 @@ impl TestPipeline { let output = Arc::new(Mutex::new(HashMap::new())); pipeline.add_sink( Box::new(TestSinkFactory::new(output.clone())), - "sink", - vec![], + "sink".to_string(), ); pipeline.connect_nodes( - &output_table.node, + output_table.node.clone(), output_table.port, - "sink", + "sink".to_string(), DEFAULT_PORT_HANDLE, ); let used_schemas = pipeline.get_entry_points_sources_names(); From e2baca33497b97e4f88dca35146fe3a5234c82c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karolis=20Gudi=C5=A1kis?= Date: Wed, 28 Feb 2024 03:03:16 +0200 Subject: [PATCH 4/6] Fix: Revert oracle resumability and fix snapshot data insert (#2425) * Fix snapshot inserts * remove log * Remove import * Fix field names * Fix data types mapping * Remove unused consts * Mark U128 and I128 as unimplemented --- dozer-sink-oracle/src/lib.rs | 67 ++++++++---------------------------- 1 file changed, 15 insertions(+), 52 deletions(-) diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index d665b7c73b..17a32ec083 100644 --- a/dozer-sink-oracle/src/lib.rs +++ 
b/dozer-sink-oracle/src/lib.rs @@ -20,8 +20,6 @@ use oracle::{ Connection, }; -const TXN_ID_COL: &str = "__txn_id"; -const TXN_SEQ_COL: &str = "__txn_seq"; const METADATA_TABLE: &str = "__replication_metadata"; const META_TXN_ID_COL: &str = "txn_id"; const META_TABLE_COL: &str = "table"; @@ -73,7 +71,6 @@ impl From for Error { #[derive(Debug)] struct BatchedOperation { - op_id: Option, op_kind: OpKind, params: Record, } @@ -201,6 +198,8 @@ impl OracleSinkFactory { FieldType::String | FieldType::Text, OracleType::Varchar2(_) | OracleType::NVarchar2(_), ) => {} + (FieldType::U128 | FieldType::I128, OracleType::Number(precision, 0)) + if precision >= 39 => {} (FieldType::UInt | FieldType::Int, OracleType::Number(precision, 0)) if precision >= 20 => {} (FieldType::Float, OracleType::Number(38, 0) | OracleType::BinaryDouble) => {} @@ -246,10 +245,10 @@ impl OracleSinkFactory { for field in &schema.fields { let name = &field.name; let col_type = match field.typ { - dozer_types::types::FieldType::UInt => "INTEGER", - dozer_types::types::FieldType::U128 => "INTEGER", - dozer_types::types::FieldType::Int => "INTEGER", - dozer_types::types::FieldType::I128 => "INTEGER", + dozer_types::types::FieldType::UInt => "NUMBER(20)", + dozer_types::types::FieldType::U128 => unimplemented!(), + dozer_types::types::FieldType::Int => "NUMBER(20)", + dozer_types::types::FieldType::I128 => unimplemented!(), // Should this be BINARY_DOUBLE? dozer_types::types::FieldType::Float => "NUMBER", dozer_types::types::FieldType::Boolean => "NUMBER", @@ -280,11 +279,7 @@ impl OracleSinkFactory { } fn generate_merge_statement(table_name: &str, schema: &Schema) -> String { - let field_names = schema - .fields - .iter() - .map(|field| field.name.as_str()) - .chain([TXN_ID_COL, TXN_SEQ_COL]); + let field_names = schema.fields.iter().map(|field| &field.name); let mut parameter_index = 1usize..; let input_fields = field_names @@ -326,12 +321,6 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String { let opkind_idx = parameter_index.next().unwrap(); - let opid_select = format!( - r#"(D."{TXN_ID_COL}" IS NULL - OR S."{TXN_ID_COL}" > D."{TXN_ID_COL}" - OR (S."{TXN_ID_COL}" = D."{TXN_ID_COL}" AND S."{TXN_SEQ_COL}" > D."{TXN_SEQ_COL}"))"# - ); - // Match on PK and txn_id. 
// If the record does not exist and the op is INSERT, do the INSERT // If the record exists, but the txid is higher than the operation's txid, @@ -339,11 +328,10 @@ fn generate_merge_statement(table_name: &str, schema: &Schema) -> String { format!( r#"MERGE INTO "{table_name}" D USING (SELECT {input_fields}, :{opkind_idx} DOZER_OPKIND FROM DUAL) S - ON ({pk_select}) - WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) WHERE S.DOZER_OPKIND = 0 - WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE S.DOZER_OPKIND = 1 AND {opid_select} - DELETE WHERE S.DOZER_OPKIND = 2 AND {opid_select} - "# + ON (S.DOZER_OPKIND > 0) + WHEN NOT MATCHED THEN INSERT ({destination_columns}) VALUES ({source_values}) + WHEN MATCHED THEN UPDATE SET {destination_assign} WHERE {pk_select} + DELETE WHERE S.DOZER_OPKIND = 2"# ) } @@ -381,27 +369,8 @@ impl SinkFactory for OracleSinkFactory { let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap(); let table_name = &self.table; - let mut amended_schema = schema.clone(); - amended_schema.field( - dozer_types::types::FieldDefinition { - name: TXN_ID_COL.to_owned(), - typ: FieldType::UInt, - nullable: true, - source: dozer_types::types::SourceDefinition::Dynamic, - }, - false, - ); - amended_schema.field( - dozer_types::types::FieldDefinition { - name: TXN_SEQ_COL.to_owned(), - typ: FieldType::UInt, - nullable: true, - source: dozer_types::types::SourceDefinition::Dynamic, - }, - false, - ); - self.validate_or_create_table(&connection, table_name, &amended_schema)?; + self.validate_or_create_table(&connection, table_name, &schema)?; self.validate_or_create_table( &connection, METADATA_TABLE, @@ -429,7 +398,7 @@ impl SinkFactory for OracleSinkFactory { let insert_append = format!( //"INSERT /*+ APPEND */ INTO \"{table_name}\" VALUES ({})", "INSERT INTO \"{table_name}\" VALUES ({})", - (1..=amended_schema.fields.len()) + (1..=schema.fields.len()) .map(|i| format!(":{i}")) .collect::>() .join(", ") @@ -528,9 +497,6 @@ impl OracleSink { { batch.set(i, &OraField(field, *typ))?; } - let (txid, seq_in_tx) = params.op_id.map(|opid| (opid.txid, opid.seq_in_tx)).unzip(); - batch.set(bind_idx.next().unwrap(), &txid)?; - batch.set(bind_idx.next().unwrap(), &seq_in_tx)?; batch.set(bind_idx.next().unwrap(), &(params.op_kind as u64))?; batch.append_row(&[])?; } @@ -540,12 +506,11 @@ impl OracleSink { fn batch( &mut self, - op_id: Option, + _op_id: Option, kind: OpKind, record: Record, ) -> oracle::Result<()> { self.batch_params.push(BatchedOperation { - op_id, op_kind: kind, params: record, }); @@ -620,9 +585,7 @@ impl Sink for OracleSink { { batch.set(i, &OraField(field, *typ))?; } - let (txid, seq_in_tx) = op.id.map(|id| (id.txid, id.seq_in_tx)).unzip(); - batch.set(bind_idx.next().unwrap(), &txid)?; - batch.set(bind_idx.next().unwrap(), &seq_in_tx)?; + batch.append_row(&[])?; } batch.execute()?; } From 16cb605d2784fea0c2c619236829f43a96407f8b Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Wed, 28 Feb 2024 15:33:54 +0800 Subject: [PATCH 5/6] chore: Update version number to 0.4.0 (#2432) --- Cargo.lock | 46 ++++++++++++------------- dozer-cli/Cargo.toml | 2 +- dozer-core/Cargo.toml | 2 +- dozer-deno/Cargo.toml | 2 +- dozer-ingestion/Cargo.toml | 2 +- dozer-ingestion/aerospike/Cargo.toml | 4 +-- dozer-ingestion/connector/Cargo.toml | 2 +- dozer-ingestion/deltalake/Cargo.toml | 6 ++-- dozer-ingestion/ethereum/Cargo.toml | 2 +- dozer-ingestion/grpc/Cargo.toml | 2 +- dozer-ingestion/javascript/Cargo.toml | 2 +- 
dozer-ingestion/kafka/Cargo.toml | 2 +- dozer-ingestion/mongodb/Cargo.toml | 2 +- dozer-ingestion/mysql/Cargo.toml | 2 +- dozer-ingestion/object-store/Cargo.toml | 2 +- dozer-ingestion/postgres/Cargo.toml | 2 +- dozer-ingestion/snowflake/Cargo.toml | 2 +- dozer-log/Cargo.toml | 2 +- dozer-sink-oracle/Cargo.toml | 6 ++-- dozer-sql/Cargo.toml | 2 +- dozer-sql/expression/Cargo.toml | 2 +- dozer-tests/Cargo.toml | 2 +- dozer-tracing/Cargo.toml | 2 +- dozer-types/Cargo.toml | 2 +- dozer-utils/Cargo.toml | 2 +- 25 files changed, 51 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee6a4e3889..81e0674378 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3544,7 +3544,7 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] name = "dozer-cli" -version = "0.3.0" +version = "0.4.0" dependencies = [ "actix-files", "actix-web", @@ -3589,7 +3589,7 @@ dependencies = [ [[package]] name = "dozer-core" -version = "0.3.0" +version = "0.4.0" dependencies = [ "async-stream", "bincode", @@ -3608,7 +3608,7 @@ dependencies = [ [[package]] name = "dozer-deno" -version = "0.3.0" +version = "0.4.0" dependencies = [ "deno_ast", "deno_cache_dir", @@ -3622,7 +3622,7 @@ dependencies = [ [[package]] name = "dozer-ingestion" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bytes", "chrono", @@ -3657,7 +3657,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-aerospike" -version = "0.3.0" +version = "0.4.0" dependencies = [ "actix-web", "base64 0.21.7", @@ -3666,7 +3666,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-connector" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dozer-types", "futures", @@ -3675,7 +3675,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-deltalake" -version = "0.3.0" +version = "0.4.0" dependencies = [ "deltalake", "dozer-ingestion-connector", @@ -3684,7 +3684,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-ethereum" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dozer-ingestion-connector", "dozer-tracing", @@ -3694,7 +3694,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-grpc" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dozer-ingestion-connector", "tonic-reflection", @@ -3704,7 +3704,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-javascript" -version = "0.3.0" +version = "0.4.0" dependencies = [ "camino", "dozer-deno", @@ -3713,7 +3713,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-kafka" -version = "0.3.0" +version = "0.4.0" dependencies = [ "base64 0.21.7", "dozer-ingestion-connector", @@ -3723,7 +3723,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-mongodb" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bson", "dozer-ingestion-connector", @@ -3732,7 +3732,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-mysql" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dozer-ingestion-connector", "geozero", @@ -3746,7 +3746,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-object-store" -version = "0.3.0" +version = "0.4.0" dependencies = [ "datafusion", "dozer-ingestion-connector", @@ -3766,7 +3766,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-postgres" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dozer-ingestion-connector", "postgres-protocol", @@ -3784,7 +3784,7 @@ dependencies = [ [[package]] name = "dozer-ingestion-snowflake" -version = "0.3.0" +version = "0.4.0" dependencies = [ "base64 0.21.7", "dozer-ingestion-connector", @@ -3808,7 +3808,7 @@ dependencies = [ [[package]] 
name = "dozer-log" -version = "0.3.0" +version = "0.4.0" dependencies = [ "async-stream", "aws-config", @@ -3864,7 +3864,7 @@ dependencies = [ [[package]] name = "dozer-sql" -version = "0.3.0" +version = "0.4.0" dependencies = [ "ahash 0.8.6", "bincode", @@ -3884,7 +3884,7 @@ dependencies = [ [[package]] name = "dozer-sql-expression" -version = "0.3.0" +version = "0.4.0" dependencies = [ "async-recursion", "bigdecimal", @@ -3905,7 +3905,7 @@ dependencies = [ [[package]] name = "dozer-tests" -version = "0.3.0" +version = "0.4.0" dependencies = [ "ahash 0.8.6", "async-trait", @@ -3933,7 +3933,7 @@ dependencies = [ [[package]] name = "dozer-tracing" -version = "0.3.0" +version = "0.4.0" dependencies = [ "atty", "console-subscriber", @@ -3950,7 +3950,7 @@ dependencies = [ [[package]] name = "dozer-types" -version = "0.3.0" +version = "0.4.0" dependencies = [ "ahash 0.8.6", "arbitrary", @@ -3988,7 +3988,7 @@ dependencies = [ [[package]] name = "dozer-utils" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dozer-types", ] diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index 3a0935b90d..b312594414 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-cli" -version = "0.3.0" +version = "0.4.0" edition = "2021" default-run = "dozer" authors = ["getdozer/dozer-dev"] diff --git a/dozer-core/Cargo.toml b/dozer-core/Cargo.toml index 9b7ddebfd4..f5f811ae89 100644 --- a/dozer-core/Cargo.toml +++ b/dozer-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-core" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["getdozer/dozer-dev"] diff --git a/dozer-deno/Cargo.toml b/dozer-deno/Cargo.toml index cf443d987e..95d58db870 100644 --- a/dozer-deno/Cargo.toml +++ b/dozer-deno/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-deno" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 1eb381fbb7..b068d5107b 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["getdozer/dozer-dev"] diff --git a/dozer-ingestion/aerospike/Cargo.toml b/dozer-ingestion/aerospike/Cargo.toml index 92cc1c3228..cab030bd3f 100644 --- a/dozer-ingestion/aerospike/Cargo.toml +++ b/dozer-ingestion/aerospike/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-aerospike" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -8,4 +8,4 @@ edition = "2021" [dependencies] dozer-ingestion-connector = { path = "../connector" } actix-web = "4.5.1" -base64 = "0.21.7" \ No newline at end of file +base64 = "0.21.7" diff --git a/dozer-ingestion/connector/Cargo.toml b/dozer-ingestion/connector/Cargo.toml index 2a89adb804..53d05fa6cc 100644 --- a/dozer-ingestion/connector/Cargo.toml +++ b/dozer-ingestion/connector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-connector" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/deltalake/Cargo.toml b/dozer-ingestion/deltalake/Cargo.toml index 71394f1a74..1bd59dc3dd 100644 --- a/dozer-ingestion/deltalake/Cargo.toml +++ b/dozer-ingestion/deltalake/Cargo.toml @@ -1,6 +1,6 @@ 
[package] name = "dozer-ingestion-deltalake" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -14,6 +14,4 @@ git = "https://github.com/delta-io/delta-rs" rev = "72505449e9538371fe5fda35d545dbd662facd07" version = "0.17" default-features = false -features = [ - "datafusion", -] +features = ["datafusion"] diff --git a/dozer-ingestion/ethereum/Cargo.toml b/dozer-ingestion/ethereum/Cargo.toml index 705b7d9dc9..d3d2d279a7 100644 --- a/dozer-ingestion/ethereum/Cargo.toml +++ b/dozer-ingestion/ethereum/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-ethereum" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/grpc/Cargo.toml b/dozer-ingestion/grpc/Cargo.toml index ba74f6eaa9..ed6bbd2b4f 100644 --- a/dozer-ingestion/grpc/Cargo.toml +++ b/dozer-ingestion/grpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-grpc" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/javascript/Cargo.toml b/dozer-ingestion/javascript/Cargo.toml index 1e31616078..e8dfaf038e 100644 --- a/dozer-ingestion/javascript/Cargo.toml +++ b/dozer-ingestion/javascript/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-javascript" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/kafka/Cargo.toml b/dozer-ingestion/kafka/Cargo.toml index 0e43c7f695..ea266c9a48 100644 --- a/dozer-ingestion/kafka/Cargo.toml +++ b/dozer-ingestion/kafka/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-kafka" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/mongodb/Cargo.toml b/dozer-ingestion/mongodb/Cargo.toml index 95c3add16a..789fabddb3 100644 --- a/dozer-ingestion/mongodb/Cargo.toml +++ b/dozer-ingestion/mongodb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-mongodb" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/mysql/Cargo.toml b/dozer-ingestion/mysql/Cargo.toml index a4ab786128..de1a727a9a 100644 --- a/dozer-ingestion/mysql/Cargo.toml +++ b/dozer-ingestion/mysql/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-mysql" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/object-store/Cargo.toml b/dozer-ingestion/object-store/Cargo.toml index 60487806b6..3604b93494 100644 --- a/dozer-ingestion/object-store/Cargo.toml +++ b/dozer-ingestion/object-store/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-object-store" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/postgres/Cargo.toml b/dozer-ingestion/postgres/Cargo.toml index 632b4724bb..25b3fc84cd 100644 --- a/dozer-ingestion/postgres/Cargo.toml +++ b/dozer-ingestion/postgres/Cargo.toml @@ -1,6 +1,6 @@ [package] 
name = "dozer-ingestion-postgres" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-ingestion/snowflake/Cargo.toml b/dozer-ingestion/snowflake/Cargo.toml index 15317f61a1..776e80b5ac 100644 --- a/dozer-ingestion/snowflake/Cargo.toml +++ b/dozer-ingestion/snowflake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-ingestion-snowflake" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-log/Cargo.toml b/dozer-log/Cargo.toml index fae5096eb7..e634766706 100644 --- a/dozer-log/Cargo.toml +++ b/dozer-log/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-log" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/dozer-sink-oracle/Cargo.toml b/dozer-sink-oracle/Cargo.toml index d3a4e26e33..aeb9a7a87c 100644 --- a/dozer-sink-oracle/Cargo.toml +++ b/dozer-sink-oracle/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dozer-core = { version = "0.3.0", path = "../dozer-core" } -dozer-log = { version = "0.3.0", path = "../dozer-log" } -dozer-types = { version = "0.3.0", path = "../dozer-types" } +dozer-core = { path = "../dozer-core" } +dozer-log = { path = "../dozer-log" } +dozer-types = { path = "../dozer-types" } oracle = { version = "0.5.7", features = ["chrono"] } diff --git a/dozer-sql/Cargo.toml b/dozer-sql/Cargo.toml index 6c7d74127c..6f129da09a 100644 --- a/dozer-sql/Cargo.toml +++ b/dozer-sql/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-sql" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["getdozer/dozer-dev"] diff --git a/dozer-sql/expression/Cargo.toml b/dozer-sql/expression/Cargo.toml index 0b0f23d202..29e638a6fa 100644 --- a/dozer-sql/expression/Cargo.toml +++ b/dozer-sql/expression/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-sql-expression" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["getdozer/dozer-dev"] diff --git a/dozer-tests/Cargo.toml b/dozer-tests/Cargo.toml index c1a214c071..efe66d3b12 100644 --- a/dozer-tests/Cargo.toml +++ b/dozer-tests/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-tests" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["getdozer/dozer-dev"] diff --git a/dozer-tracing/Cargo.toml b/dozer-tracing/Cargo.toml index 87204f1fe5..05b9a2d65a 100644 --- a/dozer-tracing/Cargo.toml +++ b/dozer-tracing/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-tracing" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["getdozer/dozer-dev"] diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index d8f3090bf1..942037637c 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-types" -version = "0.3.0" +version = "0.4.0" authors = ["getdozer/dozer-dev"] edition = "2021" diff --git a/dozer-utils/Cargo.toml b/dozer-utils/Cargo.toml index 9c498344a5..4859216f6b 100644 --- a/dozer-utils/Cargo.toml +++ b/dozer-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dozer-utils" -version = "0.3.0" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From 1b88cb8d35c22f5c47bfd9bfc72f7f2899c4e1bf Mon Sep 17 00:00:00 2001 
From: Bei Chu <914745487@qq.com> Date: Wed, 28 Feb 2024 16:04:21 +0800 Subject: [PATCH 6/6] chore: Update integration test (#2433) --- .github/workflows/integration/test-dozer.sh | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.github/workflows/integration/test-dozer.sh b/.github/workflows/integration/test-dozer.sh index b00da6f525..541c2092a1 100644 --- a/.github/workflows/integration/test-dozer.sh +++ b/.github/workflows/integration/test-dozer.sh @@ -2,9 +2,3 @@ set -e # Check if dozer version matches `DOZER_VERSION` dozer -V | grep "$DOZER_VERSION" - -# Install Rust -curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y - -# Run grpc ingest test because it doesn't need docker or ETH secrets. -CARGO_TARGET_DIR=../ DOZER_BIN=dozer RUST_LOG=info "$HOME/.cargo/bin/cargo" run -p dozer-tests --bin dozer-tests -- grpc_ingest
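
For illustration: the reworked generate_merge_statement in the Oracle sink patch drives all three operation kinds through a single MERGE whose branching hinges on the DOZER_OPKIND bind. For a hypothetical table "CUSTOMERS" with primary key "ID" and one data column "NAME" (the column names and bind positions here are assumed for the sketch, not taken from the patch), the generated statement would look roughly like:

    MERGE INTO "CUSTOMERS" D
    USING (SELECT :1 "ID", :2 "NAME", :3 DOZER_OPKIND FROM DUAL) S
    ON (S.DOZER_OPKIND > 0)
    WHEN NOT MATCHED THEN INSERT ("ID", "NAME") VALUES (S."ID", S."NAME")
    WHEN MATCHED THEN UPDATE SET D."NAME" = S."NAME" WHERE D."ID" = S."ID"
    DELETE WHERE S.DOZER_OPKIND = 2

An opkind of 0 (insert) fails the ON condition and takes the NOT MATCHED branch; opkinds 1 (update) and 2 (delete) take the MATCHED branch, where the UPDATE is limited by the primary-key predicate and the row is deleted only when DOZER_OPKIND = 2.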