Skip to content

Commit

Permalink
Build from
Browse files Browse the repository at this point in the history
  • Loading branch information
Dario Pizzamiglio committed Sep 28, 2023
1 parent e965fb5 commit 2017880
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 182 deletions.
4 changes: 4 additions & 0 deletions dozer-sql/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dozer_sql_expression::sqlparser::{
parser::Parser,
};
use std::collections::HashMap;
use std::collections::HashSet;

use super::errors::UnsupportedSqlError;
use super::pipeline_builder::from_builder::insert_from_to_pipeline;
Expand Down Expand Up @@ -52,6 +53,9 @@ pub struct QueryContext {
// Used Sources
pub used_sources: Vec<String>,

// Internal tables map, used to store the tables that are created by the queries
pub processors_list: HashSet<String>,

// Processors counter
pub processor_counter: usize,

Expand Down
3 changes: 3 additions & 0 deletions dozer-sql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub enum PipelineError {

#[error("Invalid port handle: {0}")]
InvalidPortHandle(PortHandle),

#[error("Duplicated Processor name: {0}")]
ProcessorAlreadyExists(String),
}

#[derive(Error, Debug)]
Expand Down
252 changes: 151 additions & 101 deletions dozer-sql/src/pipeline_builder/from_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use dozer_core::{
app::{AppPipeline, PipelineEntryPoint},
node::PortHandle,
Expand All @@ -11,17 +9,18 @@ use dozer_sql_expression::{
};

use crate::{
builder::{get_from_source, OutputNodeInfo, QueryContext},
builder::{get_from_source, QueryContext},
errors::PipelineError,
product::table::factory::TableProcessorFactory,
table_operator::factory::TableOperatorProcessorFactory,
table_operator::factory::{get_source_name, TableOperatorProcessorFactory},
window::factory::WindowProcessorFactory,
};

use super::join_builder::insert_join_to_pipeline;

#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    // Name of the processor this connection info describes; also used as the
    // source name when downstream nodes connect to it.
    pub processor_name: String,
    // Upstream edges still to be wired: (source name, target processor name, target port).
    pub input_nodes: Vec<(String, String, PortHandle)>,
    // Node and port that downstream consumers should connect to.
    pub output_node: (String, PortHandle),
}
Expand Down Expand Up @@ -58,9 +57,12 @@ fn insert_table_to_pipeline(
query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
if let Some(operator) = is_table_operator(relation)? {
let product_processor_name =
insert_from_processor_to_pipeline(query_context, relation, pipeline);

insert_table_operator_processor_to_pipeline(
relation,
&operator,
product_processor_name,
pipeline,
pipeline_idx,
query_context,
Expand All @@ -79,25 +81,24 @@ fn insert_table_processor_to_pipeline(
// let relation_name_or_alias = get_name_or_alias(relation)?;
let relation_name_or_alias = get_from_source(relation, pipeline, query_context, pipeline_idx)?;

let product_processor_name = format!(
let processor_name = format!(
"from:{}--{}",
relation_name_or_alias.0,
query_context.get_next_processor_id()
);
if !query_context.processors_list.insert(processor_name.clone()) {
return Err(PipelineError::ProcessorAlreadyExists(processor_name));
}
let product_processor_factory =
TableProcessorFactory::new(product_processor_name.clone(), relation.to_owned());
TableProcessorFactory::new(processor_name.clone(), relation.to_owned());

let product_input_name = relation_name_or_alias.0;

let mut input_nodes = vec![];
let mut product_entry_points = vec![];

// is a node that is an entry point to the pipeline
if is_an_entry_point(
&product_input_name,
&mut query_context.pipeline_map,
pipeline_idx,
) {
if is_an_entry_point(&product_input_name, query_context, pipeline_idx) {
let entry_point = PipelineEntryPoint::new(product_input_name.clone(), DEFAULT_PORT_HANDLE);

product_entry_points.push(entry_point);
Expand All @@ -107,136 +108,183 @@ fn insert_table_processor_to_pipeline(
else {
input_nodes.push((
product_input_name,
product_processor_name.clone(),
processor_name.clone(),
DEFAULT_PORT_HANDLE,
));
}

pipeline.add_processor(
Box::new(product_processor_factory),
&product_processor_name,
&processor_name,
product_entry_points,
);

Ok(ConnectionInfo {
processor_name: processor_name.clone(),
input_nodes,
output_node: (product_processor_name, DEFAULT_PORT_HANDLE),
output_node: (processor_name, DEFAULT_PORT_HANDLE),
})
}

fn insert_table_operator_processor_to_pipeline(
relation: &TableFactor,
operator: &TableOperatorDescriptor,
output_node_name: String,
pipeline: &mut AppPipeline,
pipeline_idx: usize,
query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
// the sources names that are used in this pipeline
let mut entry_points = vec![];
let mut input_nodes = vec![];

let product_processor_name = format!("join--{}", query_context.get_next_processor_id());
let product_processor =
TableProcessorFactory::new(product_processor_name.clone(), relation.clone());

pipeline.add_processor(Box::new(product_processor), &product_processor_name, vec![]);

if operator.name.to_uppercase() == "TTL" {
let processor_name = format!(
"TOP_{0}_{1}",
operator.name,
query_context.get_next_processor_id()
);
let processor_name = generate_name("TOP", operator, query_context);
if !query_context.processors_list.insert(processor_name.clone()) {
return Err(PipelineError::ProcessorAlreadyExists(processor_name));
}
let processor = TableOperatorProcessorFactory::new(
processor_name.clone(),
operator.clone(),
query_context.udfs.to_owned(),
);

let source_name = processor
.get_source_name()
.map_err(PipelineError::TableOperatorError)?;
if let Some(table) = operator.args.get(0) {
let source_name = match table {
TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?,
TableOperatorArg::Descriptor(descriptor) => {
let connection_info = insert_table_operator_processor_to_pipeline(
descriptor,
processor_name.clone(),
pipeline,
pipeline_idx,
query_context,
)?;
connection_info.processor_name
}
};

if is_an_entry_point(&source_name, query_context, pipeline_idx) {
let entry_point =
PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle);

entry_points.push(entry_point);
query_context.used_sources.push(source_name);
} else {
input_nodes.push((
source_name,
processor_name.to_owned(),
DEFAULT_PORT_HANDLE as PortHandle,
));
}

let mut entry_points = vec![];
pipeline.add_processor(Box::new(processor), &processor_name, entry_points);

if is_an_entry_point(&source_name, &mut query_context.pipeline_map, pipeline_idx) {
let entry_point =
PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle);
pipeline.connect_nodes(
&processor_name,
DEFAULT_PORT_HANDLE,
&output_node_name,
DEFAULT_PORT_HANDLE,
);

entry_points.push(entry_point);
query_context.used_sources.push(source_name);
Ok(ConnectionInfo {
processor_name,
input_nodes,
output_node: (output_node_name, DEFAULT_PORT_HANDLE),
})
} else {
input_nodes.push((
source_name,
processor_name.to_owned(),
DEFAULT_PORT_HANDLE as PortHandle,
));
Err(PipelineError::UnsupportedTableOperator(
operator.name.clone(),
))
}

pipeline.add_processor(Box::new(processor), &processor_name, entry_points);

pipeline.connect_nodes(
&processor_name,
DEFAULT_PORT_HANDLE,
&product_processor_name,
DEFAULT_PORT_HANDLE,
);

Ok(ConnectionInfo {
input_nodes,
output_node: (product_processor_name, DEFAULT_PORT_HANDLE),
})
} else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" {
let window_processor_name = format!("window--{}", query_context.get_next_processor_id());
let window_processor =
WindowProcessorFactory::new(window_processor_name.clone(), operator.clone());
let processor_name = generate_name("WIN", operator, query_context);
if !query_context.processors_list.insert(processor_name.clone()) {
return Err(PipelineError::ProcessorAlreadyExists(processor_name));
}
let processor = WindowProcessorFactory::new(processor_name.clone(), operator.clone());

if let Some(table) = operator.args.get(0) {
let source_name = match table {
TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?,
TableOperatorArg::Descriptor(descriptor) => {
let connection_info = insert_table_operator_processor_to_pipeline(
descriptor,
processor_name.clone(),
pipeline,
pipeline_idx,
query_context,
)?;
connection_info.processor_name
}
};

if is_an_entry_point(&source_name, query_context, pipeline_idx) {
let entry_point =
PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle);

entry_points.push(entry_point);
query_context.used_sources.push(source_name);
} else {
input_nodes.push((
source_name,
processor_name.to_owned(),
DEFAULT_PORT_HANDLE as PortHandle,
));
}

let window_source_name = window_processor.get_source_name()?;
let mut window_entry_points = vec![];
pipeline.add_processor(Box::new(processor), &processor_name, entry_points);

if is_an_entry_point(
&window_source_name,
&mut query_context.pipeline_map,
pipeline_idx,
) {
let entry_point = PipelineEntryPoint::new(
window_source_name.clone(),
DEFAULT_PORT_HANDLE as PortHandle,
pipeline.connect_nodes(
&processor_name,
DEFAULT_PORT_HANDLE,
&output_node_name,
DEFAULT_PORT_HANDLE,
);

window_entry_points.push(entry_point);
query_context.used_sources.push(window_source_name);
Ok(ConnectionInfo {
processor_name,
input_nodes,
output_node: (output_node_name, DEFAULT_PORT_HANDLE),
})
} else {
input_nodes.push((
window_source_name,
window_processor_name.clone(),
DEFAULT_PORT_HANDLE as PortHandle,
));
Err(PipelineError::UnsupportedTableOperator(
operator.name.clone(),
))
}

pipeline.add_processor(
Box::new(window_processor),
&window_processor_name,
window_entry_points,
);

pipeline.connect_nodes(
&window_processor_name,
DEFAULT_PORT_HANDLE,
&product_processor_name,
DEFAULT_PORT_HANDLE,
);

Ok(ConnectionInfo {
input_nodes,
output_node: (product_processor_name, DEFAULT_PORT_HANDLE),
})
} else {
Err(PipelineError::UnsupportedTableOperator(
operator.name.clone(),
))
}
}

/// Builds a processor name of the form `{prefix}_{operator name}_{id}`,
/// consuming the next id from `query_context`'s processor counter so
/// successive calls yield distinct names.
fn generate_name(
    prefix: &str,
    operator: &TableOperatorDescriptor,
    query_context: &mut QueryContext,
) -> String {
    // Return the formatted string directly instead of binding it to a local
    // first (fixes clippy's `let_and_return` lint).
    format!(
        "{0}_{1}_{2}",
        prefix,
        operator.name,
        query_context.get_next_processor_id()
    )
}

/// Adds a product (FROM) processor for `relation` to the pipeline and
/// returns the freshly generated processor name.
fn insert_from_processor_to_pipeline(
    query_context: &mut QueryContext,
    relation: &TableFactor,
    pipeline: &mut AppPipeline,
) -> String {
    // NOTE(review): unlike the other name-generation sites, this name is not
    // registered in `query_context.processors_list` — confirm whether
    // duplicate detection should also cover "from--" processors.
    let name = format!("from--{}", query_context.get_next_processor_id());
    let factory = TableProcessorFactory::new(name.clone(), relation.clone());
    pipeline.add_processor(Box::new(factory), &name, vec![]);
    name
}

pub fn get_table_operator_arg(arg: &FunctionArg) -> Result<TableOperatorArg, PipelineError> {
match arg {
FunctionArg::Named { name, arg: _ } => {
Expand Down Expand Up @@ -297,15 +345,17 @@ pub fn is_table_operator(
}
}

pub fn is_an_entry_point(
name: &str,
pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
pipeline_idx: usize,
) -> bool {
if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
return true;
pub fn is_an_entry_point(name: &str, query_context: &QueryContext, pipeline_idx: usize) -> bool {
if query_context
.pipeline_map
.contains_key(&(pipeline_idx, name.to_owned()))
{
return false;
}
if query_context.processors_list.contains(&name.to_owned()) {
return false;
}
false
true
}

pub fn string_from_sql_object_name(name: &ObjectName) -> String {
Expand Down
Loading

0 comments on commit 2017880

Please sign in to comment.