Skip to content

Commit

Permalink
refactor: Split dozer-sql-expression
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Sep 19, 2023
1 parent 4748fa3 commit bc99bfc
Show file tree
Hide file tree
Showing 163 changed files with 5,143 additions and 5,440 deletions.
40 changes: 29 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dozer-cli/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use dozer_cache::dozer_log::storage;
use dozer_cache::errors::CacheError;
use dozer_core::errors::ExecutionError;
use dozer_ingestion::errors::ConnectorError;
use dozer_sql::pipeline::errors::PipelineError;
use dozer_sql::errors::PipelineError;
use dozer_types::{constants::LOCK_FILE, thiserror::Error};
use dozer_types::{errors::internal::BoxedError, serde_json};
use dozer_types::{serde_yaml, thiserror};
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod pipeline;
pub mod shutdown;
pub mod simple;
use dozer_core::{app::AppPipeline, errors::ExecutionError};
use dozer_sql::pipeline::{builder::statement_to_pipeline, errors::PipelineError};
use dozer_sql::{builder::statement_to_pipeline, errors::PipelineError};
use dozer_types::log::debug;
use errors::OrchestrationError;
use shutdown::ShutdownSender;
Expand Down Expand Up @@ -58,7 +58,7 @@ pub use dozer_ingestion::{
connectors::{get_connector, TableInfo},
errors::ConnectorError,
};
pub use dozer_sql::pipeline::builder::QueryContext;
pub use dozer_sql::builder::QueryContext;
pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
let mut pipeline = AppPipeline::new_with_default_flags();
statement_to_pipeline(sql, &mut pipeline, None, vec![])
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/live/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::errors::{BuildError, CliError, OrchestrationError};
use dozer_core::errors::ExecutionError;
use dozer_sql::pipeline::errors::PipelineError;
use dozer_sql::errors::PipelineError;

use dozer_types::thiserror;
use dozer_types::thiserror::Error;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::Parser;

use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, Dag};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::builder::statement_to_pipeline;
use dozer_tracing::{Labels, LabelsAndProgress};
use dozer_types::{
constants::DEFAULT_DEFAULT_MAX_NUM_RECORDS,
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::builder::{OutputNodeInfo, QueryContext};
use dozer_sql::builder::statement_to_pipeline;
use dozer_sql::builder::{OutputNodeInfo, QueryContext};
use dozer_tracing::LabelsAndProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use crate::console_helper::PURPLE;
use crate::console_helper::RED;
use dozer_core::errors::ExecutionError;
use dozer_ingestion::connectors::{get_connector, SourceSchema, TableInfo};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::errors::PipelineError;
use dozer_sql::builder::statement_to_pipeline;
use dozer_sql::errors::PipelineError;
use dozer_types::log::info;
use dozer_types::models::config::Config;
use dozer_types::tracing::error;
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/tests/test_suite/connectors/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ fn field_to_sql(field: &Field) -> String {
Field::Date(d) => format!("'{}'", d),
Field::Json(b) => format!("'{b}'::jsonb"),
Field::Point(p) => format!("'({},{})'", p.0.x(), p.0.y()),
Field::Duration(d) => d.to_string(),
Field::Duration(_) => field.to_string(),
Field::Null => "NULL".to_string(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-log-python/src/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn map_value(value: Field, py: Python) -> PyResult<Py<PyAny>> {
Field::Date(v) => Ok(v.to_string().to_object(py)),
Field::Json(v) => map_json_py(v, py),
Field::Point(v) => map_point(v, py),
Field::Duration(v) => Ok(v.to_string().to_object(py)),
Field::Duration(_) => Ok(value.to_string().to_object(py)),
Field::Null => Ok(py.None()),
}
}
Expand Down
16 changes: 3 additions & 13 deletions dozer-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,20 @@ dozer-types = { path = "../dozer-types" }
dozer-storage = { path = "../dozer-storage" }
dozer-core = { path = "../dozer-core" }
dozer-tracing = { path = "../dozer-tracing" }
dozer-sql-expression = { path = "expression" }

ahash = "0.8.3"
enum_dispatch = "0.3.11"
like = "0.3.1"
linked-hash-map = { version = "0.5.6", features = ["serde_impl"] }
metrics = "0.21.0"
multimap = "0.8.3"
num-traits = "0.2.15"
pest = "2.6.0"
pest_derive = "2.5.6"
regex = "1.8.1"
sqlparser = { git = "https://github.com/getdozer/sqlparser-rs.git" }
uuid = { version = "1.3.0", features = ["v1", "v4", "fast-rng"] }
bigdecimal = { version = "0.3", features = ["serde"], optional = true }
ort = { version = "1.15.2", optional = true }
ndarray = { version = "0.15", optional = true }
half = { version = "2.3.1", optional = true }

[dev-dependencies]
tempdir = "0.3.7"
proptest = "1.2.0"
tokio = { version = "1", features = ["rt", "macros"] }

[features]
python = ["dozer-types/python-auto-initialize"]
bigdecimal = ["dep:bigdecimal", "sqlparser/bigdecimal"]
onnx = ["dep:ort", "dep:ndarray", "dep:half"]
python = ["dozer-sql-expression/python"]
onnx = ["dozer-sql-expression/onnx"]
24 changes: 24 additions & 0 deletions dozer-sql/expression/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "dozer-sql-expression"
version = "0.1.39"
edition = "2021"
authors = ["getdozer/dozer-dev"]

[dependencies]
dozer-types = { path = "../../dozer-types" }
num-traits = "0.2.16"
sqlparser = { git = "https://github.com/getdozer/sqlparser-rs.git" }
bigdecimal = { version = "0.3", features = ["serde"], optional = true }
ort = { version = "1.15.2", optional = true }
ndarray = { version = "0.15", optional = true }
half = { version = "2.3.1", optional = true }
like = "0.3.1"
jsonpath = { path = "../jsonpath" }

[dev-dependencies]
proptest = "1.2.0"

[features]
bigdecimal = ["dep:bigdecimal", "sqlparser/bigdecimal"]
python = ["dozer-types/python-auto-initialize"]
onnx = ["dep:ort", "dep:ndarray", "dep:half"]
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::pipeline::errors::PipelineError;
use crate::pipeline::errors::PipelineError::InvalidFunction;
use std::fmt::{Display, Formatter};

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
Expand All @@ -14,16 +12,16 @@ pub enum AggregateFunctionType {
}

impl AggregateFunctionType {
pub(crate) fn new(name: &str) -> Result<AggregateFunctionType, PipelineError> {
pub(crate) fn new(name: &str) -> Option<AggregateFunctionType> {
match name {
"avg" => Ok(AggregateFunctionType::Avg),
"count" => Ok(AggregateFunctionType::Count),
"max" => Ok(AggregateFunctionType::Max),
"max_value" => Ok(AggregateFunctionType::MaxValue),
"min" => Ok(AggregateFunctionType::Min),
"min_value" => Ok(AggregateFunctionType::MinValue),
"sum" => Ok(AggregateFunctionType::Sum),
_ => Err(InvalidFunction(name.to_string())),
"avg" => Some(AggregateFunctionType::Avg),
"count" => Some(AggregateFunctionType::Count),
"max" => Some(AggregateFunctionType::Max),
"max_value" => Some(AggregateFunctionType::MaxValue),
"min" => Some(AggregateFunctionType::Min),
"min_value" => Some(AggregateFunctionType::MinValue),
"sum" => Some(AggregateFunctionType::Sum),
_ => None,
}
}
}
Expand Down
127 changes: 127 additions & 0 deletions dozer-sql/expression/src/arg_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::fmt::Display;
use std::ops::Range;

use crate::error::Error;
use crate::execution::{Expression, ExpressionType};
use dozer_types::chrono::{DateTime, FixedOffset};
use dozer_types::types::{DozerPoint, Field, FieldType, Schema};

pub fn validate_one_argument(
args: &[Expression],
schema: &Schema,
function_name: impl Display,
) -> Result<ExpressionType, Error> {
validate_num_arguments(1..2, args.len(), function_name)?;
args[0].get_type(schema)
}

pub fn validate_two_arguments(
args: &[Expression],
schema: &Schema,
function_name: impl Display,
) -> Result<(ExpressionType, ExpressionType), Error> {
validate_num_arguments(2..3, args.len(), function_name)?;
let arg1 = args[0].get_type(schema)?;
let arg2 = args[1].get_type(schema)?;
Ok((arg1, arg2))
}

pub fn validate_num_arguments(
expected: Range<usize>,
actual: usize,
function_name: impl Display,
) -> Result<(), Error> {
if !expected.contains(&actual) {
Err(Error::InvalidNumberOfArguments {
function_name: function_name.to_string(),
expected,
actual,
})
} else {
Ok(())
}
}

pub fn validate_arg_type(
arg: &Expression,
expected: Vec<FieldType>,
schema: &Schema,
function_name: impl Display,
argument_index: usize,
) -> Result<ExpressionType, Error> {
let arg_t = arg.get_type(schema)?;
if !expected.contains(&arg_t.return_type) {
Err(Error::InvalidFunctionArgumentType {
function_name: function_name.to_string(),
argument_index,
actual: arg_t.return_type,
expected,
})
} else {
Ok(arg_t)
}
}

pub fn extract_uint(
field: Field,
function_name: impl Display,
argument_index: usize,
) -> Result<u64, Error> {
if let Some(value) = field.to_uint() {
Ok(value)
} else {
Err(Error::InvalidFunctionArgument {
function_name: function_name.to_string(),
argument_index,
argument: field,
})
}
}

pub fn extract_float(
field: Field,
function_name: impl Display,
argument_index: usize,
) -> Result<f64, Error> {
if let Some(value) = field.to_float() {
Ok(value)
} else {
Err(Error::InvalidFunctionArgument {
function_name: function_name.to_string(),
argument_index,
argument: field,
})
}
}

pub fn extract_point(
field: Field,
function_name: impl Display,
argument_index: usize,
) -> Result<DozerPoint, Error> {
if let Some(value) = field.to_point() {
Ok(value)
} else {
Err(Error::InvalidFunctionArgument {
function_name: function_name.to_string(),
argument_index,
argument: field,
})
}
}

pub fn extract_timestamp(
field: Field,
function_name: impl Display,
argument_index: usize,
) -> Result<DateTime<FixedOffset>, Error> {
if let Some(value) = field.to_timestamp() {
Ok(value)
} else {
Err(Error::InvalidFunctionArgument {
function_name: function_name.to_string(),
argument_index,
argument: field,
})
}
}
Loading

0 comments on commit bc99bfc

Please sign in to comment.