chore: Remove clones and duplicated code in dozer_sql::builder
chubei committed Feb 27, 2024
1 parent 53262c9 commit 2098fd1
Showing 17 changed files with 375 additions and 468 deletions.
21 changes: 11 additions & 10 deletions dozer-cli/src/pipeline/builder.rs
@@ -365,23 +365,24 @@ fn add_sink_to_pipeline(
     id: &str,
     table_infos: Vec<(&OutputTableInfo, PortHandle)>,
 ) {
-    let mut connections = vec![];
-    let mut entry_points = vec![];
+    pipeline.add_sink(sink, id.to_string());
 
     for (table_info, port) in table_infos {
         match table_info {
             OutputTableInfo::Original(table_info) => {
-                entry_points.push(PipelineEntryPoint::new(table_info.table_name.clone(), port))
+                pipeline.add_entry_point(
+                    id.to_string(),
+                    PipelineEntryPoint::new(table_info.table_name.clone(), port),
+                );
             }
             OutputTableInfo::Transformed(table_info) => {
-                connections.push((table_info, port));
+                pipeline.connect_nodes(
+                    table_info.node.clone(),
+                    table_info.port,
+                    id.to_string(),
+                    port,
+                );
             }
         }
     }
-
-    pipeline.add_sink(sink, id, entry_points);
-
-    for (table_info, port) in connections {
-        pipeline.connect_nodes(&table_info.node, table_info.port, id, port);
-    }
 }
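
Net effect: the sink is registered up front, and each input table is wired immediately. An original table becomes a pipeline entry point, while a transformed table becomes an edge from its upstream node, so the two temporary Vecs and the second pass over them disappear. A minimal sketch of the resulting wiring, with hypothetical ids and port numbers:

    // Hypothetical ids and ports, for illustration only.
    pipeline.add_sink(sink, "my_sink".to_string());
    // Original table: the sink is an entry point fed directly by a source.
    pipeline.add_entry_point(
        "my_sink".to_string(),
        PipelineEntryPoint::new("users".to_string(), 0),
    );
    // Transformed table: the sink is fed by an upstream processor node.
    pipeline.connect_nodes("join".to_string(), 0, "my_sink".to_string(), 1);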
41 changes: 15 additions & 26 deletions dozer-core/src/app.rs
@@ -33,44 +33,33 @@ pub struct AppPipeline {
 }
 
 impl AppPipeline {
-    pub fn add_processor(
-        &mut self,
-        proc: Box<dyn ProcessorFactory>,
-        id: &str,
-        entry_points: Vec<PipelineEntryPoint>,
-    ) {
-        let handle = NodeHandle::new(None, id.to_string());
-        self.processors.push((handle.clone(), proc));
+    fn create_handle(id: String) -> NodeHandle {
+        NodeHandle::new(None, id)
+    }
 
-        for entry_point in entry_points {
-            self.entry_points.push((handle.clone(), entry_point));
-        }
+    pub fn add_processor(&mut self, proc: Box<dyn ProcessorFactory>, id: String) {
+        self.processors.push((Self::create_handle(id), proc));
     }
 
-    pub fn add_sink(
-        &mut self,
-        sink: Box<dyn SinkFactory>,
-        id: &str,
-        entry_points: Vec<PipelineEntryPoint>,
-    ) {
-        let handle = NodeHandle::new(None, id.to_string());
-        self.sinks.push((handle.clone(), sink));
+    pub fn add_sink(&mut self, sink: Box<dyn SinkFactory>, id: String) {
+        self.sinks.push((Self::create_handle(id), sink));
+    }
 
-        for entry_point in entry_points {
-            self.entry_points.push((handle.clone(), entry_point));
-        }
+    pub fn add_entry_point(&mut self, id: String, entry_point: PipelineEntryPoint) {
+        self.entry_points
+            .push((Self::create_handle(id), entry_point));
     }
 
     pub fn connect_nodes(
         &mut self,
-        from: &str,
+        from: String,
         from_port: PortHandle,
-        to: &str,
+        to: String,
         to_port: PortHandle,
     ) {
         let edge = Edge::new(
-            Endpoint::new(NodeHandle::new(None, from.to_string()), from_port),
-            Endpoint::new(NodeHandle::new(None, to.to_string()), to_port),
+            Endpoint::new(NodeHandle::new(None, from), from_port),
+            Endpoint::new(NodeHandle::new(None, to), to_port),
        );
         self.edges.push(edge);
     }
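
Call sites now register nodes, entry points, and edges through separate methods, with ids passed by value; the duplicated handle-cloning loops in add_processor and add_sink collapse into the single add_entry_point method, and NodeHandle creation is centralized in create_handle. A hedged before/after sketch (factory and entry_point are hypothetical locals):

    // Before: entry points rode along with node registration.
    // pipeline.add_processor(factory, "join", vec![entry_point]);

    // After: registration and wiring are separate, and ids move by value.
    pipeline.add_processor(factory, "join".to_string());
    pipeline.add_entry_point("join".to_string(), entry_point);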
44 changes: 22 additions & 22 deletions dozer-core/src/tests/app.rs
@@ -171,46 +171,46 @@ fn test_app_dag() {
     let mut app = App::new(asm);
 
     let mut p1 = AppPipeline::new_with_default_flags();
-    p1.add_processor(
-        Box::new(NoopJoinProcessorFactory {}),
-        "join",
-        vec![
-            PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
-            PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
-        ],
+    p1.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string());
+    p1.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
+    );
+    p1.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
     );
     p1.add_sink(
         Box::new(CountingSinkFactory::new(20_000, latch.clone())),
-        "sink",
-        vec![],
+        "sink".to_string(),
     );
     p1.connect_nodes(
-        "join",
+        "join".to_string(),
         DEFAULT_PORT_HANDLE,
-        "sink",
+        "sink".to_string(),
         COUNTING_SINK_INPUT_PORT,
     );
 
     app.add_pipeline(p1);
 
     let mut p2 = AppPipeline::new_with_default_flags();
-    p2.add_processor(
-        Box::new(NoopJoinProcessorFactory {}),
-        "join",
-        vec![
-            PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
-            PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
-        ],
+    p2.add_processor(Box::new(NoopJoinProcessorFactory {}), "join".to_string());
+    p2.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
+    );
+    p2.add_entry_point(
+        "join".to_string(),
+        PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
     );
     p2.add_sink(
         Box::new(CountingSinkFactory::new(20_000, latch)),
-        "sink",
-        vec![],
+        "sink".to_string(),
     );
     p2.connect_nodes(
-        "join",
+        "join".to_string(),
         DEFAULT_PORT_HANDLE,
-        "sink",
+        "sink".to_string(),
         COUNTING_SINK_INPUT_PORT,
     );
 
23 changes: 16 additions & 7 deletions dozer-sql/src/aggregation/factory.rs
@@ -5,7 +5,7 @@ use dozer_core::{
     node::{PortHandle, Processor, ProcessorFactory},
     DEFAULT_PORT_HANDLE,
 };
-use dozer_sql_expression::sqlparser::ast::Select;
+use dozer_sql_expression::sqlparser::ast::{Expr, SelectItem};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::udf_config::UdfConfig;
 use dozer_types::parking_lot::Mutex;
@@ -18,8 +18,9 @@ use tokio::runtime::Runtime;
 #[derive(Debug)]
 pub struct AggregationProcessorFactory {
     id: String,
-    projection: Select,
-    _stateful: bool,
+    projection: Vec<SelectItem>,
+    group_by: Vec<Expr>,
+    having: Option<Expr>,
     enable_probabilistic_optimizations: bool,
     udfs: Vec<UdfConfig>,
     runtime: Arc<Runtime>,
@@ -31,16 +32,18 @@ pub struct AggregationProcessorFactory {
 impl AggregationProcessorFactory {
     pub fn new(
         id: String,
-        projection: Select,
-        stateful: bool,
+        projection: Vec<SelectItem>,
+        group_by: Vec<Expr>,
+        having: Option<Expr>,
         enable_probabilistic_optimizations: bool,
         udfs: Vec<UdfConfig>,
         runtime: Arc<Runtime>,
     ) -> Self {
         Self {
             id,
             projection,
-            _stateful: stateful,
+            group_by,
+            having,
             enable_probabilistic_optimizations,
             udfs,
             runtime,
@@ -51,7 +54,13 @@ impl AggregationProcessorFactory {
     async fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, PipelineError> {
         let mut projection_planner =
             CommonPlanner::new(input_schema, self.udfs.as_slice(), self.runtime.clone());
-        projection_planner.plan(self.projection.clone()).await?;
+        projection_planner
+            .plan(
+                self.projection.clone(),
+                self.group_by.clone(),
+                self.having.clone(),
+            )
+            .await?;
         Ok(projection_planner)
     }
 }
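
The factory now keeps only the three pieces of the SELECT that planning actually uses, in place of a cloned Select AST and the unused _stateful flag. A sketch of a call site under the new signature (locals are hypothetical; the field names match what the test diffs below pull out of a parsed Select):

    // Destructure the parsed SELECT once instead of storing the whole AST.
    let factory = AggregationProcessorFactory::new(
        id,
        select.projection, // Vec<SelectItem>
        select.group_by,   // Vec<Expr>
        select.having,     // Option<Expr>
        enable_probabilistic_optimizations,
        udfs,
        runtime,
    );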
6 changes: 5 additions & 1 deletion dozer-sql/src/aggregation/tests/aggregation_test_planner.rs
@@ -76,7 +76,11 @@ fn test_planner_with_aggregator() {
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     let mut processor = AggregationProcessor::new(
6 changes: 5 additions & 1 deletion dozer-sql/src/aggregation/tests/aggregation_tests_utils.rs
@@ -28,7 +28,11 @@ pub(crate) fn init_processor(
     let statement = get_select(sql).unwrap();
 
     runtime
-        .block_on(projection_planner.plan(*statement))
+        .block_on(projection_planner.plan(
+            statement.projection,
+            statement.group_by,
+            statement.having,
+        ))
         .unwrap();
 
     let processor = AggregationProcessor::new(
43 changes: 42 additions & 1 deletion dozer-sql/src/builder/common.rs
@@ -1,4 +1,9 @@
-use dozer_sql_expression::{builder::ExpressionBuilder, sqlparser::ast::ObjectName};
+use dozer_sql_expression::{
+    builder::{ExpressionBuilder, NameOrAlias},
+    sqlparser::ast::{ObjectName, TableFactor},
+};
 
+use crate::errors::{PipelineError, ProductError};
+
 use super::QueryContext;
 
@@ -29,6 +34,42 @@ pub fn is_a_pipeline_output(
     false
 }
 
+pub fn get_name_or_alias(relation: &TableFactor) -> Result<NameOrAlias, PipelineError> {
+    match relation {
+        TableFactor::Table { name, alias, .. } => {
+            let table_name = string_from_sql_object_name(name);
+            if let Some(table_alias) = alias {
+                let alias = table_alias.name.value.clone();
+                return Ok(NameOrAlias(table_name, Some(alias)));
+            }
+            Ok(NameOrAlias(table_name, None))
+        }
+        TableFactor::Derived { alias, .. } => {
+            if let Some(table_alias) = alias {
+                let alias = table_alias.name.value.clone();
+                return Ok(NameOrAlias("dozer_derived".to_string(), Some(alias)));
+            }
+            Ok(NameOrAlias("dozer_derived".to_string(), None))
+        }
+        TableFactor::TableFunction { .. } => Err(PipelineError::ProductError(
+            ProductError::UnsupportedTableFunction,
+        )),
+        TableFactor::UNNEST { .. } => {
+            Err(PipelineError::ProductError(ProductError::UnsupportedUnnest))
+        }
+        TableFactor::NestedJoin { alias, .. } => {
+            if let Some(table_alias) = alias {
+                let alias = table_alias.name.value.clone();
+                return Ok(NameOrAlias("dozer_nested".to_string(), Some(alias)));
+            }
+            Ok(NameOrAlias("dozer_nested".to_string(), None))
+        }
+        TableFactor::Pivot { .. } => {
+            Err(PipelineError::ProductError(ProductError::UnsupportedPivot))
+        }
+    }
+}
+
 pub fn string_from_sql_object_name(name: &ObjectName) -> String {
     name.0
         .iter()
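
A hedged sketch of the new helper's behavior on an aliased table, written as free-standing test code rather than code from this commit (it assumes NameOrAlias is a tuple struct with public name and Option<alias> fields, and that sqlparser's Parser and GenericDialect are reachable through dozer_sql_expression::sqlparser):

    use dozer_sql_expression::sqlparser::{
        ast::{SetExpr, Statement},
        dialect::GenericDialect,
        parser::Parser,
    };

    let sql = "SELECT id FROM users AS u";
    let statement = Parser::parse_sql(&GenericDialect {}, sql).unwrap().remove(0);
    let Statement::Query(query) = statement else { unreachable!() };
    let SetExpr::Select(select) = *query.body else { unreachable!() };
    // "users AS u" is a TableFactor::Table, so both name and alias survive.
    let NameOrAlias(name, alias) = get_name_or_alias(&select.from[0].relation).unwrap();
    assert_eq!(name, "users");
    assert_eq!(alias.as_deref(), Some("u"));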