Skip to content

Commit

Permalink
build join
Browse files Browse the repository at this point in the history
  • Loading branch information
Dario Pizzamiglio committed Sep 28, 2023
1 parent 2017880 commit 731d608
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 166 deletions.
112 changes: 77 additions & 35 deletions dozer-sql/src/pipeline_builder/from_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ pub enum TableOperatorArg {
Descriptor(TableOperatorDescriptor),
}

#[derive(Clone, Debug)]
pub enum SourceType {
Srouce,
Pipeline,
Processor,
}

pub fn insert_from_to_pipeline(
from: &TableWithJoins,
pipeline: &mut AppPipeline,
Expand All @@ -60,13 +67,25 @@ fn insert_table_to_pipeline(
let product_processor_name =
insert_from_processor_to_pipeline(query_context, relation, pipeline);

insert_table_operator_processor_to_pipeline(
let connection_info = insert_table_operator_processor_to_pipeline(
&operator,
product_processor_name,
pipeline,
pipeline_idx,
query_context,
)
)?;

pipeline.connect_nodes(
&connection_info.output_node.0,
connection_info.output_node.1,
&product_processor_name.clone(),
DEFAULT_PORT_HANDLE,
);

Ok(ConnectionInfo {
processor_name: product_processor_name.clone(),
input_nodes: vec![],
output_node: (product_processor_name, DEFAULT_PORT_HANDLE),
})
} else {
insert_table_processor_to_pipeline(relation, pipeline, pipeline_idx, query_context)
}
Expand Down Expand Up @@ -126,18 +145,19 @@ fn insert_table_processor_to_pipeline(
})
}

fn insert_table_operator_processor_to_pipeline(
pub fn insert_table_operator_processor_to_pipeline(
operator: &TableOperatorDescriptor,
output_node_name: String,
pipeline: &mut AppPipeline,
pipeline_idx: usize,
query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
// the sources names that are used in this pipeline
let mut entry_points = vec![];

let mut input_nodes = vec![];

if operator.name.to_uppercase() == "TTL" {
let mut entry_points = vec![];

let processor_name = generate_name("TOP", operator, query_context);
if !query_context.processors_list.insert(processor_name.clone()) {
return Err(PipelineError::ProcessorAlreadyExists(processor_name));
Expand All @@ -154,7 +174,6 @@ fn insert_table_operator_processor_to_pipeline(
TableOperatorArg::Descriptor(descriptor) => {
let connection_info = insert_table_operator_processor_to_pipeline(
descriptor,
processor_name.clone(),
pipeline,
pipeline_idx,
query_context,
Expand All @@ -163,40 +182,46 @@ fn insert_table_operator_processor_to_pipeline(
}
};

if is_an_entry_point(&source_name, query_context, pipeline_idx) {
if is_an_entry_point(&source_name.clone(), query_context, pipeline_idx) {
let entry_point =
PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle);

entry_points.push(entry_point);
query_context.used_sources.push(source_name);
} else {
query_context.used_sources.push(source_name.clone());
} else if is_a_pipeline_output(&source_name, query_context, pipeline_idx) {
input_nodes.push((
source_name,
processor_name.to_owned(),
source_name.clone(),
processor_name.clone(),
DEFAULT_PORT_HANDLE as PortHandle,
));
}

pipeline.add_processor(Box::new(processor), &processor_name, entry_points);

pipeline.connect_nodes(
&processor_name,
DEFAULT_PORT_HANDLE,
&output_node_name,
DEFAULT_PORT_HANDLE,
);
pipeline.add_processor(Box::new(processor), &processor_name.clone(), entry_points);

if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx)
&& !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx)
{
pipeline.connect_nodes(
&source_name,
DEFAULT_PORT_HANDLE,
&processor_name.clone(),
DEFAULT_PORT_HANDLE,
);
}

Ok(ConnectionInfo {
processor_name,
processor_name: processor_name.clone(),
input_nodes,
output_node: (output_node_name, DEFAULT_PORT_HANDLE),
output_node: (processor_name, DEFAULT_PORT_HANDLE),
})
} else {
Err(PipelineError::UnsupportedTableOperator(
operator.name.clone(),
))
}
} else if operator.name.to_uppercase() == "TUMBLE" || operator.name.to_uppercase() == "HOP" {
let mut entry_points = vec![];

let processor_name = generate_name("WIN", operator, query_context);
if !query_context.processors_list.insert(processor_name.clone()) {
return Err(PipelineError::ProcessorAlreadyExists(processor_name));
Expand All @@ -209,7 +234,6 @@ fn insert_table_operator_processor_to_pipeline(
TableOperatorArg::Descriptor(descriptor) => {
let connection_info = insert_table_operator_processor_to_pipeline(
descriptor,
processor_name.clone(),
pipeline,
pipeline_idx,
query_context,
Expand All @@ -223,28 +247,32 @@ fn insert_table_operator_processor_to_pipeline(
PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE as PortHandle);

entry_points.push(entry_point);
query_context.used_sources.push(source_name);
} else {
query_context.used_sources.push(source_name.clone());
} else if is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx) {
input_nodes.push((
source_name,
source_name.clone(),
processor_name.to_owned(),
DEFAULT_PORT_HANDLE as PortHandle,
));
}

pipeline.add_processor(Box::new(processor), &processor_name, entry_points);

pipeline.connect_nodes(
&processor_name,
DEFAULT_PORT_HANDLE,
&output_node_name,
DEFAULT_PORT_HANDLE,
);
if !is_an_entry_point(&source_name.clone(), query_context, pipeline_idx)
&& !is_a_pipeline_output(&source_name.clone(), query_context, pipeline_idx)
{
pipeline.connect_nodes(
&source_name,
DEFAULT_PORT_HANDLE,
&processor_name,
DEFAULT_PORT_HANDLE,
);
}

Ok(ConnectionInfo {
processor_name,
processor_name: processor_name.clone(),
input_nodes,
output_node: (output_node_name, DEFAULT_PORT_HANDLE),
output_node: (processor_name, DEFAULT_PORT_HANDLE),
})
} else {
Err(PipelineError::UnsupportedTableOperator(
Expand All @@ -258,7 +286,7 @@ fn insert_table_operator_processor_to_pipeline(
}
}

fn generate_name(
pub fn generate_name(
prefix: &str,
operator: &TableOperatorDescriptor,
query_context: &mut QueryContext,
Expand Down Expand Up @@ -358,6 +386,20 @@ pub fn is_an_entry_point(name: &str, query_context: &QueryContext, pipeline_idx:
true
}

pub fn is_a_pipeline_output(
name: &str,
query_context: &mut QueryContext,
pipeline_idx: usize,
) -> bool {
if query_context
.pipeline_map
.contains_key(&(pipeline_idx, name.to_owned()))
{
return true;
}
false
}

pub fn string_from_sql_object_name(name: &ObjectName) -> String {
name.0
.iter()
Expand Down
Loading

0 comments on commit 731d608

Please sign in to comment.