feat: Add JavaScript UDF (#2221)
* feat: Add `JavaScript` UDF

* refactor: Make `Runtime::call_function` accept `&mut self` to improve ergonomics
chubei authored Nov 17, 2023
1 parent 5803abe commit 3ad02a8
Showing 53 changed files with 1,195 additions and 629 deletions.
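The second bullet in the commit message is an ergonomics refactor: when `call_function` takes `&mut self`, the runtime can keep mutable per-call state (engine handles, caches, counters) in plain fields instead of behind interior-mutability wrappers. The toy sketch below illustrates the idea; the `Runtime` type, the `call_function` signature, and its body are stand-ins, not Dozer's actual API.

// Illustrative only: why `call_function(&mut self, ...)` beats `&self` plus
// RefCell/Mutex for a single-owner runtime. Not Dozer's real types.
struct Runtime {
    // With `&mut self`, mutable scratch state can be a plain field.
    call_count: u64,
}

impl Runtime {
    fn new() -> Self {
        Runtime { call_count: 0 }
    }

    // A `&self` receiver would force RefCell/Mutex around anything the call
    // mutates; `&mut self` lets the compiler enforce exclusive access instead.
    fn call_function(&mut self, name: &str, args: &[f64]) -> f64 {
        self.call_count += 1;
        match name {
            "sum" => args.iter().sum(),
            _ => f64::NAN,
        }
    }
}

fn main() {
    let mut runtime = Runtime::new();
    let result = runtime.call_function("sum", &[1.0, 2.0, 3.0]);
    println!("result = {result}, calls so far = {}", runtime.call_count);
}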
40 changes: 27 additions & 13 deletions Cargo.lock

(Generated file; diff not rendered by default.)

8 changes: 1 addition & 7 deletions dozer-cli/src/lib.rs
@@ -4,8 +4,7 @@ pub mod live;
 pub mod pipeline;
 pub mod simple;
 use dozer_api::shutdown::ShutdownSender;
-use dozer_core::{app::AppPipeline, errors::ExecutionError};
-use dozer_sql::{builder::statement_to_pipeline, errors::PipelineError};
+use dozer_core::errors::ExecutionError;
 use dozer_types::log::debug;
 use errors::OrchestrationError;
@@ -27,11 +26,6 @@ pub use dozer_ingestion::{
     errors::ConnectorError,
     {get_connector, TableInfo},
 };
-pub use dozer_sql::builder::QueryContext;
-pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
-    let mut pipeline = AppPipeline::new_with_default_flags();
-    statement_to_pipeline(sql, &mut pipeline, None, vec![])
-}
 
 pub use dozer_types::models::connection::Connection;
 use dozer_types::tracing::error;
3 changes: 2 additions & 1 deletion dozer-cli/src/live/state.rs
@@ -324,7 +324,7 @@ fn get_contract(dozer_and_contract: &Option<DozerAndContract>) -> Result<&Contra
 pub async fn create_contract(dozer: SimpleOrchestrator) -> Result<Contract, OrchestrationError> {
     let dag = create_dag(&dozer).await?;
     let version = dozer.config.version;
-    let schemas = DagSchemas::new(dag)?;
+    let schemas = DagSchemas::new(dag).await?;
     let contract = Contract::new(
         version as usize,
         &schemas,
@@ -393,6 +393,7 @@ fn get_dozer_run_instance(
         &mut AppPipeline::new(dozer.config.flags.clone().into()),
         None,
         dozer.config.udfs.clone(),
+        dozer.runtime.clone(),
     )
     .map_err(LiveError::PipelineError)?;
 
22 changes: 16 additions & 6 deletions dozer-cli/src/pipeline/builder.rs
@@ -155,7 +155,10 @@ impl<'a> PipelineBuilder<'a> {
 
     // This function is used to figure out the sources that are used in the pipeline
     // based on the SQL and API Endpoints
-    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
+    pub fn calculate_sources(
+        &self,
+        runtime: Arc<Runtime>,
+    ) -> Result<CalculatedSources, OrchestrationError> {
         let mut original_sources = vec![];
 
         let mut query_ctx = None;
@@ -164,8 +167,9 @@
         let mut transformed_sources = vec![];
 
         if let Some(sql) = &self.sql {
-            let query_context = statement_to_pipeline(sql, &mut pipeline, None, self.udfs.to_vec())
-                .map_err(OrchestrationError::PipelineError)?;
+            let query_context =
+                statement_to_pipeline(sql, &mut pipeline, None, self.udfs.to_vec(), runtime)
+                    .map_err(OrchestrationError::PipelineError)?;
 
             query_ctx = Some(query_context.clone());
 
@@ -202,7 +206,7 @@
         runtime: &Arc<Runtime>,
         shutdown: ShutdownReceiver,
     ) -> Result<dozer_core::Dag, OrchestrationError> {
-        let calculated_sources = self.calculate_sources()?;
+        let calculated_sources = self.calculate_sources(runtime.clone())?;
 
         debug!("Used Sources: {:?}", calculated_sources.original_sources);
         let grouped_connections = self
@@ -229,8 +233,14 @@
         }
 
         if let Some(sql) = &self.sql {
-            let query_context = statement_to_pipeline(sql, &mut pipeline, None, self.udfs.to_vec())
-                .map_err(OrchestrationError::PipelineError)?;
+            let query_context = statement_to_pipeline(
+                sql,
+                &mut pipeline,
+                None,
+                self.udfs.to_vec(),
+                runtime.clone(),
+            )
+            .map_err(OrchestrationError::PipelineError)?;
 
             for (name, table_info) in query_context.output_tables_map {
                 available_output_tables
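Note how the diff passes `runtime.clone()` wherever `statement_to_pipeline` is called: cloning an `Arc<Runtime>` is just a reference-count bump, so each layer that may need to start async work (such as initializing a JavaScript UDF engine) can cheaply hold a handle to the one shared executor. A minimal sketch of the pattern, with illustrative names, assuming tokio with the `rt` feature:

use std::sync::Arc;
use tokio::runtime::{Builder, Runtime};

// Stand-in for a builder layer that may need the executor later.
fn calculate_sources(runtime: Arc<Runtime>) -> usize {
    // The handle would normally be kept or passed further down; here we just
    // show that the clone shares the same runtime rather than creating one.
    Arc::strong_count(&runtime)
}

fn main() {
    let runtime = Arc::new(
        Builder::new_current_thread()
            .build()
            .expect("failed to build tokio runtime"),
    );
    // `.clone()` bumps the refcount; no second executor is created.
    let count = calculate_sources(runtime.clone());
    assert!(count >= 2, "original handle plus the clone we passed in");
}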
7 changes: 5 additions & 2 deletions dozer-cli/src/pipeline/connector_source.rs
@@ -11,7 +11,7 @@ use dozer_ingestion::{IngestionConfig, Ingestor};
 use dozer_tracing::LabelsAndProgress;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::indicatif::ProgressBar;
-use dozer_types::log::info;
+use dozer_types::log::{error, info};
 use dozer_types::models::connection::Connection;
 use dozer_types::models::ingestion_types::IngestionMessage;
 use dozer_types::parking_lot::Mutex;
@@ -286,7 +286,10 @@ impl Source for ConnectorSource {
             .await;
         match result {
             Ok(Ok(_)) => {}
-            Ok(Err(e)) => std::panic::panic_any(e),
+            Ok(Err(e)) => {
+                error!("{}", e);
+                std::panic::panic_any(e)
+            }
             // Aborted means we are shutting down
             Err(Aborted) => (),
         }
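The added `error!` before `std::panic::panic_any(e)` is what keeps the failure readable: `panic_any` accepts an arbitrary payload, and the default panic hook prints the text of `&str` and `String` payloads only; for any other type it reports just `Box<dyn Any>`, so the error message would never reach the logs. A self-contained sketch of the behavior, using `eprintln!` as a stand-in for the `log` crate's `error!` macro:

use std::fmt;

// Stand-in for the connector error type carried in the panic payload.
#[derive(Debug)]
struct ConnectorError(String);

impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "connector failed: {}", self.0)
    }
}

fn main() {
    let e = ConnectorError("table `trips` not found".into());
    // Log first: the default panic hook prints only "Box<dyn Any>" for
    // non-string payloads, so this line is what preserves the message.
    eprintln!("{}", e);
    // `panic_any` still forwards the typed payload to any
    // `std::panic::catch_unwind` handler upstream.
    std::panic::panic_any(e);
}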
5 changes: 3 additions & 2 deletions dozer-cli/src/simple/orchestrator.rs
@@ -377,7 +377,7 @@ impl SimpleOrchestrator {
             .runtime
             .block_on(builder.build(&self.runtime, shutdown))?;
         // Populate schemas.
-        let dag_schemas = DagSchemas::new(dag)?;
+        let dag_schemas = self.runtime.block_on(DagSchemas::new(dag))?;
 
         // Get current contract.
         let enable_token = self.config.api.api_security.is_some();
@@ -470,12 +470,13 @@ impl SimpleOrchestrator {
     }
 }
 
-pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
+pub fn validate_sql(sql: String, runtime: Arc<Runtime>) -> Result<(), PipelineError> {
     statement_to_pipeline(
         &sql,
         &mut AppPipeline::new_with_default_flags(),
         None,
         vec![],
+        runtime,
     )
     .map_or_else(
         |e| {
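Since `DagSchemas::new` is now async (see the dozer-core changes below), sync call sites like this one bridge with `Runtime::block_on`, while async call sites such as `create_contract` in live/state.rs simply `.await` it. A minimal sketch of both call shapes, with illustrative types, assuming tokio with the `rt` feature:

use tokio::runtime::Builder;

struct DagSchemas {
    node_count: usize,
}

impl DagSchemas {
    // Async because schema population may now await factory calls,
    // e.g. asking an embedded JS engine for a UDF's output type.
    async fn new(node_count: usize) -> Result<Self, String> {
        Ok(DagSchemas { node_count })
    }
}

fn main() -> Result<(), String> {
    let runtime = Builder::new_current_thread()
        .build()
        .map_err(|e| e.to_string())?;
    // Sync call sites (like SimpleOrchestrator) drive the future with block_on:
    let schemas = runtime.block_on(DagSchemas::new(3))?;
    assert_eq!(schemas.node_count, 3);
    // Async call sites (like `create_contract`) would instead write:
    //     let schemas = DagSchemas::new(3).await?;
    Ok(())
}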
39 changes: 27 additions & 12 deletions dozer-core/src/builder_dag.rs
@@ -1,6 +1,9 @@
 use std::{collections::HashMap, fmt::Debug};
 
-use daggy::petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences};
+use daggy::{
+    petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences},
+    NodeIndex,
+};
 use dozer_types::node::NodeHandle;
 
 use crate::{
@@ -62,8 +65,12 @@ impl BuilderDag {
         }
 
         // Build the nodes.
-        let graph = dag_schemas.into_graph().try_map(
-            |node_index, node| match node.kind {
+        let mut graph = daggy::Dag::new();
+        let (nodes, edges) = dag_schemas.into_graph().into_graph().into_nodes_edges();
+        for (node_index, node) in nodes.into_iter().enumerate() {
+            let node_index = NodeIndex::new(node_index);
+            let node = node.weight;
+            let node = match node.kind {
                 DagNodeKind::Source(source) => {
                     let mut last_checkpoint_by_name = checkpoint.get_source_state(&node.handle)?;
                     let mut last_checkpoint = HashMap::new();
@@ -87,13 +94,13 @@
                     )
                     .map_err(ExecutionError::Factory)?;
 
-                    Ok::<_, ExecutionError>(NodeType {
+                    NodeType {
                         handle: node.handle,
                         kind: NodeKind::Source {
                             source,
                             last_checkpoint,
                         },
-                    })
+                    }
                 }
                 DagNodeKind::Processor(processor) => {
                     let processor = processor
@@ -109,11 +116,12 @@
                         .remove(&node_index)
                         .expect("we collected all processor checkpoint data"),
                     )
+                    .await
                     .map_err(ExecutionError::Factory)?;
-                    Ok(NodeType {
+                    NodeType {
                         handle: node.handle,
                         kind: NodeKind::Processor(processor),
-                    })
+                    }
                 }
                 DagNodeKind::Sink(sink) => {
                     let sink = sink
@@ -123,14 +131,21 @@
                         .expect("we collected all input schemas"),
                     )
                     .map_err(ExecutionError::Factory)?;
-                    Ok(NodeType {
+                    NodeType {
                         handle: node.handle,
                         kind: NodeKind::Sink(sink),
-                    })
+                    }
                 }
-            },
-            |_, edge| Ok(edge),
-        )?;
+            };
+            graph.add_node(node);
+        }
+
+        // Connect the edges.
+        for edge in edges {
+            graph
+                .add_edge(edge.source(), edge.target(), edge.weight)
+                .expect("we know there's no loop");
+        }
 
         Ok(BuilderDag { graph })
     }
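This rewrite is the mechanical consequence of `processor.build(...)` becoming async: daggy's `try_map` takes a plain closure, and a plain closure cannot contain `.await`. The commit therefore tears the DAG into node and edge lists, rebuilds the nodes in a loop where awaiting is legal, and re-adds the edges; `into_nodes_edges` yields nodes in index order, so the rebuilt indices line up with the originals. Below is a compilable sketch of the same pattern with toy node weights; `build_node` stands in for the async factory calls, and the `daggy` and `tokio` dependencies are assumed:

use daggy::Dag;
use tokio::runtime::Builder;

// Stand-in for async node construction (e.g. initializing a JS UDF engine).
async fn build_node(name: &str) -> String {
    format!("built:{name}")
}

async fn map_dag_async(dag: Dag<&'static str, ()>) -> Dag<String, ()> {
    let mut graph = Dag::new();
    // Decompose into plain node/edge lists; nodes come out in index order,
    // so the indices of the rebuilt graph match the original ones.
    let (nodes, edges) = dag.into_graph().into_nodes_edges();
    for node in nodes {
        // `.await` is allowed here, unlike inside a `try_map` closure.
        let built = build_node(node.weight).await;
        graph.add_node(built);
    }
    for edge in edges {
        graph
            .add_edge(edge.source(), edge.target(), edge.weight)
            .expect("rebuilding an acyclic graph cannot introduce a cycle");
    }
    graph
}

fn main() {
    let mut dag = Dag::new();
    let a = dag.add_node("source");
    let b = dag.add_node("sink");
    dag.add_edge(a, b, ()).expect("no cycle");
    let runtime = Builder::new_current_thread().build().expect("runtime");
    let rebuilt = runtime.block_on(map_dag_async(dag));
    assert_eq!(rebuilt.node_count(), 2);
}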
7 changes: 4 additions & 3 deletions dozer-core/src/dag_schemas.rs
@@ -89,10 +89,10 @@ impl DagSchemas {
 impl DagSchemas {
     /// Validate and populate the schemas, the resultant DAG will have the exact same structure as the input DAG,
     /// with validated schema information on the edges.
-    pub fn new(dag: Dag) -> Result<Self, ExecutionError> {
+    pub async fn new(dag: Dag) -> Result<Self, ExecutionError> {
         validate_connectivity(&dag);
 
-        match populate_schemas(dag.into_graph()) {
+        match populate_schemas(dag.into_graph()).await {
             Ok(graph) => {
                 info!("[pipeline] Validation completed");
                 Ok(Self { graph })
@@ -188,7 +188,7 @@ fn validate_connectivity(dag: &Dag) {
 }
 
 /// In topological order, pass output schemas to downstream nodes' input schemas.
-fn populate_schemas(
+async fn populate_schemas(
     dag: daggy::Dag<NodeType, DagEdgeType>,
 ) -> Result<daggy::Dag<NodeType, EdgeType>, ExecutionError> {
     let mut edges = vec![None; dag.graph().edge_count()];
@@ -226,6 +226,7 @@
             for edge in dag.graph().edges(node_index) {
                 let schema = processor
                     .get_output_schema(&edge.weight().from, &input_schemas)
+                    .await
                     .map_err(ExecutionError::Factory)?;
                 create_edge(&mut edges, edge, EdgeKind::FromProcessor, schema);
             }
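`populate_schemas` visits nodes in topological order, so every node's input schemas are complete before its own `get_output_schema` runs; the only change here is that the call may now await. A compilable sketch of that propagation order, using petgraph directly (an assumed dependency) with strings standing in for real schemas:

use petgraph::graph::DiGraph;
use petgraph::visit::Topo;
use petgraph::Direction;

// Stand-in for `get_output_schema`, which may now await (e.g. asking an
// embedded JS engine what a UDF returns).
async fn output_schema(node: &str, inputs: &[String]) -> String {
    format!("{node}({})", inputs.join(","))
}

async fn populate(graph: &DiGraph<&'static str, ()>) -> Vec<String> {
    let mut schemas: Vec<Option<String>> = vec![None; graph.node_count()];
    let mut topo = Topo::new(graph);
    while let Some(ix) = topo.next(graph) {
        // Topological order guarantees all parents were already populated.
        let inputs: Vec<String> = graph
            .neighbors_directed(ix, Direction::Incoming)
            .map(|p| schemas[p.index()].clone().expect("parent visited first"))
            .collect();
        schemas[ix.index()] = Some(output_schema(graph[ix], &inputs).await);
    }
    schemas.into_iter().map(|s| s.expect("all nodes visited")).collect()
}

fn main() {
    let mut graph = DiGraph::new();
    let src = graph.add_node("source");
    let sel = graph.add_node("selection");
    let sink = graph.add_node("sink");
    graph.add_edge(src, sel, ());
    graph.add_edge(sel, sink, ());
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("runtime");
    let schemas = runtime.block_on(populate(&graph));
    // Each schema embeds its upstream schemas, mirroring edge propagation.
    assert_eq!(schemas[sink.index()], "sink(selection(source()))");
}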
6 changes: 3 additions & 3 deletions dozer-core/src/executor/mod.rs
@@ -78,7 +78,7 @@ impl DagExecutor {
         checkpoint: OptionCheckpoint,
         options: ExecutorOptions,
     ) -> Result<Self, ExecutionError> {
-        let dag_schemas = DagSchemas::new(dag)?;
+        let dag_schemas = DagSchemas::new(dag).await?;
 
         let builder_dag = BuilderDag::new(&checkpoint, dag_schemas).await?;
 
@@ -89,8 +89,8 @@
         })
     }
 
-    pub fn validate<T: Clone + Debug>(dag: Dag) -> Result<(), ExecutionError> {
-        DagSchemas::new(dag)?;
+    pub async fn validate<T: Clone + Debug>(dag: Dag) -> Result<(), ExecutionError> {
+        DagSchemas::new(dag).await?;
         Ok(())
     }
 
(Diffs for the remaining changed files are not rendered.)

