Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flow): support datafusion scalar function #4142

Merged
merged 11 commits into from
Jun 18, 2024
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,33 @@ workspace = true

[dependencies]
api.workspace = true
arrow-schema.workspace = true
async-trait.workspace = true
bytes.workspace = true
catalog.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"
futures = "0.3"
# This fork is simply for keeping our dependency in our org, and pin the version
# it is the same with upstream repo
async-trait.workspace = true
common-function.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
enum-as-inner = "0.6.0"
greptime-proto.workspace = true
# This fork of hydroflow exists only to keep the dependency within our org and to pin its version;
# otherwise it is identical to the upstream repo
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
minstant = "0.1.7"
Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ mod table_source;

use error::Error;

// TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9
// TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9
/// Name of the placeholder timestamp column (`"__ts_placeholder"`).
///
/// NOTE(review): judging by the name, this is the column name used when a timestamp column
/// is created automatically — confirm against the call sites. Per the TODO above, it is
/// slated to be replaced by `GREPTIME_TIMESTAMP`.
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";

/// Column name `"update_at"`.
///
/// NOTE(review): presumably the timestamp column recording when a row was last updated —
/// confirm against the call sites.
pub const UPDATE_AT_TS_COL: &str = "update_at";

// TODO: refactor common types for flow to a separate module
// TODO(discord9): refactor common types for flow to a separate module
/// Unique identifier for a flow task, represented as a `u64`.
pub type FlowId = u64;
/// A table name made of exactly three string components.
///
/// NOTE(review): presumably `[catalog, schema, table]` — confirm against the call sites.
pub type TableName = [String; 3];
Expand Down
1 change: 0 additions & 1 deletion src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl Default for SourceSender {
}
}

// TODO: make all send operation immut
impl SourceSender {
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}
pub(crate) use id::{GlobalId, Id, LocalId};
pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
pub(crate) use relation::{AggregateExpr, AggregateFunc};
pub(crate) use scalar::{ScalarExpr, TypedExpr};
pub(crate) use scalar::{DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr};
28 changes: 27 additions & 1 deletion src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

use std::any::Any;

use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datafusion_common::DataFusionError;
use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
use snafu::{Location, Snafu};
use snafu::{Location, ResultExt, Snafu};

fn is_send_sync() {
fn check<T: Send + Sync>() {}
Expand Down Expand Up @@ -107,4 +110,27 @@ pub enum EvalError {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Arrow error: {raw:?}, context: {context}"))]
Arrow {
raw: ArrowError,
context: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("DataFusion error: {raw:?}, context: {context}"))]
Datafusion {
raw: DataFusionError,
context: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("External error"))]
External {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
}
2 changes: 1 addition & 1 deletion src/flow/src/expr/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ impl VariadicFunc {
name: &str,
arg_types: &[Option<ConcreteDataType>],
) -> Result<Self, Error> {
// TODO: future variadic funcs to be added might need to check arg_types
// TODO(discord9): future variadic funcs to be added might need to check arg_types
let _ = arg_types;
match name {
"and" => Ok(Self::And),
Expand Down
Loading