Skip to content

Commit

Permalink
feat: basic types for Dataflow Framework (#3186)
Browse files Browse the repository at this point in the history
* feat: basic types for Dataflow Framework

* docs: license header

* chore: add name in todo, remove deprecated code

* chore: typo

* test: linear&repr unit test

* refactor: avoid mod.rs

* feat: Relation Type

* feat: unmat funcs

* feat: simple temporal filter(sliding window)

* refactor: impl Snafu for EvalError

* feat: cast as type

* feat: temporal filter

* feat: time index in RelationType

* refactor: move EvalError to expr

* refactor: error handling for func

* chore: fmt&comment

* make EvalError pub again

* refactor: move ScalarExpr to scalar.rs

* refactor: remove mod.rs for relation

* chore: slim down PR size

* chore: license header

* chore: per review

* chore: more fix per review

* chore: even more fix per review

* chore: fmt

* chore: fmt

* feat: impl From/Into ProtoRow instead

* chore: use cast not cast_with_opt&`Key` struct

* chore: new_unchecked

* feat: `Key::subset_of` method

* chore: toml in order
  • Loading branch information
discord9 authored Jan 30, 2024
1 parent ddbd0ab commit 0a9361a
Show file tree
Hide file tree
Showing 12 changed files with 1,413 additions and 1 deletion.
555 changes: 554 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ members = [
"src/datanode",
"src/datatypes",
"src/file-engine",
"src/flow",
"src/frontend",
"src/log-store",
"src/meta-client",
Expand Down
24 changes: 24 additions & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "flow"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
api.workspace = true
bimap = "0.6.3"
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
hydroflow = "0.5.0"
itertools.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
tokio.workspace = true
tonic.workspace = true
18 changes: 18 additions & 0 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
pub(crate) mod error;
77 changes: 77 additions & 0 deletions src/flow/src/adapter/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};

use crate::expr::EvalError;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
/// TODO(discord9): add detailed location of column
#[snafu(display("Failed to eval stream"))]
Eval {
source: EvalError,
location: Location,
},

#[snafu(display("Table not found: {name}"))]
TableNotFound { name: String, location: Location },

#[snafu(display("Table already exist: {name}"))]
TableAlreadyExist { name: String, location: Location },

#[snafu(display("Failed to join task"))]
JoinTask {
#[snafu(source)]
error: tokio::task::JoinError,
location: Location,
},

#[snafu(display("Invalid query: {reason}"))]
InvalidQuery { reason: String, location: Location },

#[snafu(display("No protobuf type for value: {value}"))]
NoProtoType { value: Value, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Self::Eval { .. } | &Self::JoinTask { .. } => StatusCode::Internal,
&Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. } => StatusCode::TableNotFound,
&Self::InvalidQuery { .. } => StatusCode::PlanQuery,
Self::NoProtoType { .. } => StatusCode::Unexpected,
}
}

fn as_any(&self) -> &dyn Any {
self
}
}

define_into_tonic_status!(Error);
22 changes: 22 additions & 0 deletions src/flow/src/expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! for declare Expression in dataflow, including map, reduce, id and join(TODO!) etc.
pub(crate) mod error;
mod id;

pub(crate) use id::{GlobalId, Id, LocalId};

pub(crate) use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
61 changes: 61 additions & 0 deletions src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
use snafu::{Location, Snafu};

/// EvalError is about errors happen on columnar evaluation
///
/// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum EvalError {
#[snafu(display("Division by zero"))]
DivisionByZero,

#[snafu(display("Type mismatch: expected {expected}, actual {actual}"))]
TypeMismatch {
expected: ConcreteDataType,
actual: ConcreteDataType,
location: Location,
},

/// can't nest datatypes error because EvalError need to be store in map and serialization
#[snafu(display("Fail to unpack from value to given type: {msg}"))]
TryFromValue { msg: String, location: Location },

#[snafu(display("Fail to cast value of type {from} to given type {to}"))]
CastValue {
from: ConcreteDataType,
to: ConcreteDataType,
source: datatypes::Error,
location: Location,
},

#[snafu(display("Invalid argument: {reason}"))]
InvalidArgument { reason: String, location: Location },

#[snafu(display("Internal error: {reason}"))]
Internal { reason: String, location: Location },

#[snafu(display("Optimize error: {reason}"))]
Optimize { reason: String, location: Location },
}
41 changes: 41 additions & 0 deletions src/flow/src/expr/id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

/// Global id's scope is in Current Worker, and is cross-dataflow
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum GlobalId {
/// System namespace.
System(u64),
/// User namespace.
User(u64),
/// Transient namespace.
Transient(u64),
/// Dummy id for query being explained
Explain,
}

/// Local id is used in local scope created by `Plan::Let{id: LocalId, value, body}`
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct LocalId(pub(crate) u64);

/// Id is used to identify a dataflow component in plan like `Plan::Get{id: Id}`
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum Id {
/// An identifier that refers to a local component of a dataflow.
Local(LocalId),
/// An identifier that refers to a global dataflow.
Global(GlobalId),
}
33 changes: 33 additions & 0 deletions src/flow/src/expr/relation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) use func::AggregateFunc;
use serde::{Deserialize, Serialize};

use crate::expr::ScalarExpr;

mod func;

/// Describes an aggregation expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateExpr {
/// Names the aggregation function.
pub func: AggregateFunc,
/// An expression which extracts from each row the input to `func`.
/// TODO(discord9): currently unused, it only used in generate KeyValPlan from AggregateExpr
pub expr: ScalarExpr,
/// Should the aggregation be applied only to distinct results in each group.
#[serde(default)]
pub distinct: bool,
}
19 changes: 19 additions & 0 deletions src/flow/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(unused)]
// allow unused for now because it should be use later
mod adapter;
mod expr;
mod repr;
Loading

0 comments on commit 0a9361a

Please sign in to comment.