From be1eb4efb7c7f0e62ba4c6895401818b5eb70d1c Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Mon, 13 May 2024 19:58:02 +0800 Subject: [PATCH] feat(flow): render source/sink (#3903) * feat(flow): render src/sink * chore: add empty impl * chore: typos * refactor: according to review (WIP) * refactor: re-export df_substrait & use to_sub_plan * fix: add implicit location to error enum * fix: error handling when unwrapping query_ctx --- Cargo.lock | 43 +++- Cargo.toml | 1 + src/common/substrait/src/df_substrait.rs | 15 +- src/common/substrait/src/lib.rs | 5 +- src/flow/Cargo.toml | 16 +- src/flow/src/adapter.rs | 3 + src/flow/src/adapter/error.rs | 64 ++++- src/flow/src/adapter/node_context.rs | 315 +++++++++++++++++++++++ src/flow/src/compute.rs | 4 + src/flow/src/compute/render.rs | 9 +- src/flow/src/compute/render/map.rs | 2 +- src/flow/src/compute/render/reduce.rs | 10 +- src/flow/src/compute/render/src_sink.rs | 161 ++++++++++++ src/flow/src/compute/state.rs | 2 +- src/flow/src/compute/types.rs | 17 +- src/flow/src/expr/error.rs | 5 + src/flow/src/expr/func.rs | 8 +- src/flow/src/expr/id.rs | 2 +- src/flow/src/expr/relation/func.rs | 8 +- src/flow/src/plan.rs | 51 +++- src/flow/src/repr.rs | 10 +- src/flow/src/repr/relation.rs | 79 +++++- src/flow/src/transform.rs | 113 ++++---- src/flow/src/transform/aggr.rs | 57 ++-- src/flow/src/transform/expr.rs | 13 +- src/flow/src/transform/literal.rs | 17 +- src/flow/src/transform/plan.rs | 43 +++- 27 files changed, 932 insertions(+), 141 deletions(-) create mode 100644 src/flow/src/adapter/node_context.rs create mode 100644 src/flow/src/compute/render/src_sink.rs diff --git a/Cargo.lock b/Cargo.lock index 028c1a2f2a72..d8285185d110 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2477,6 +2477,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "darling" version = "0.14.4" @@ -3788,20 +3798,28 @@ name = "flow" version = "0.7.2" dependencies = [ "api", + "async-trait", "catalog", + "common-base", "common-catalog", "common-decimal", "common-error", + "common-frontend", "common-macro", + "common-meta", + "common-runtime", "common-telemetry", "common-time", "datafusion-common 37.0.0", "datafusion-expr 37.0.0", - "datafusion-substrait", "datatypes", "enum_dispatch", + "futures", + "greptime-proto", "hydroflow", "itertools 0.10.5", + "minstant", + "nom", "num-traits", "prost 0.12.4", "query", @@ -3811,6 +3829,7 @@ dependencies = [ "session", "smallvec", "snafu 0.8.2", + "store-api", "strum 0.25.0", "substrait 0.7.2", "table", @@ -5690,6 +5709,16 @@ dependencies = [ "adler", ] +[[package]] +name = "minstant" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db" +dependencies = [ + "ctor", + "web-time 1.1.0", +] + [[package]] name = "mio" version = "0.8.11" @@ -11334,7 +11363,7 @@ dependencies = [ "tracing-core", "tracing-log 0.2.0", "tracing-subscriber", - "web-time", + "web-time 0.2.4", ] [[package]] @@ -12092,6 +12121,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys",
+ "wasm-bindgen", +] + [[package]] name = "webbrowser" version = "0.8.15" diff --git a/Cargo.toml b/Cargo.toml index c48a4fc5e5a4..08ea98ff349a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -206,6 +206,7 @@ common-wal = { path = "src/common/wal" } datanode = { path = "src/datanode" } datatypes = { path = "src/datatypes" } file-engine = { path = "src/file-engine" } +flow = { path = "src/flow" } frontend = { path = "src/frontend" } index = { path = "src/index" } log-store = { path = "src/log-store" } diff --git a/src/common/substrait/src/df_substrait.rs b/src/common/substrait/src/df_substrait.rs index cfc2736829b1..0730f0773b32 100644 --- a/src/common/substrait/src/df_substrait.rs +++ b/src/common/substrait/src/df_substrait.rs @@ -69,14 +69,21 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { fn encode(&self, plan: &Self::Plan) -> Result { let mut buf = BytesMut::new(); + + let substrait_plan = self.to_sub_plan(plan)?; + substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?; + + Ok(buf.freeze()) + } +} + +impl DFLogicalSubstraitConvertor { + pub fn to_sub_plan(&self, plan: &LogicalPlan) -> Result, Error> { let session_state = SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default())) .with_serializer_registry(Arc::new(ExtensionSerializer)); let context = SessionContext::new_with_state(session_state); - let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?; - substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?; - - Ok(buf.freeze()) + to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu) } } diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs index ca7e28e8d142..8a03dd7308ed 100644 --- a/src/common/substrait/src/lib.rs +++ b/src/common/substrait/src/lib.rs @@ -23,11 +23,14 @@ use async_trait::async_trait; use bytes::{Buf, Bytes}; use datafusion::catalog::CatalogProviderList; use datafusion::execution::context::SessionState; +/// Re-export the Substrait module of datafusion, +/// note this is a different version of the `substrait_proto` crate +pub use datafusion_substrait::substrait as substrait_proto_df; +pub use datafusion_substrait::{logical_plan as df_logical_plan, variation_const}; use session::context::QueryContextRef; pub use substrait_proto; pub use crate::df_substrait::DFLogicalSubstraitConvertor; - #[async_trait] pub trait SubstraitPlan { type Error: std::error::Error; diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 28be2e4dae6b..510c92d6e8cf 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -9,27 +9,41 @@ workspace = true [dependencies] api.workspace = true +catalog.workspace = true +common-base.workspace = true common-decimal.workspace = true common-error.workspace = true +common-frontend.workspace = true common-macro.workspace = true +common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true -datafusion-substrait.workspace = true datatypes.workspace = true enum_dispatch = "0.3" +futures = "0.3" # This fork is simply for keeping our dependency in our org, and pin the version # it is the same with upstream repo +async-trait.workspace = true +common-meta.workspace = true +greptime-proto.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true +minstant = "0.1.7" +nom = "7.1.3" num-traits = "0.2" +prost.workspace = true +query.workspace = true serde.workspace = true servers.workspace 
= true +session.workspace = true smallvec.workspace = true snafu.workspace = true +store-api.workspace = true strum.workspace = true substrait.workspace = true +table.workspace = true tokio.workspace = true tonic.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index f53fae567453..9eb68a02c5f4 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -16,3 +16,6 @@ //! and communicating with other parts of the database pub(crate) mod error; +pub(crate) mod node_context; + +pub(crate) use node_context::FlownodeContext; diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/adapter/error.rs index 03e244666d71..3cc74b900d41 100644 --- a/src/flow/src/adapter/error.rs +++ b/src/flow/src/adapter/error.rs @@ -16,12 +16,11 @@ use std::any::Any; +use common_error::ext::BoxedError; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; -use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; -use serde::{Deserialize, Serialize}; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; @@ -32,6 +31,16 @@ use crate::expr::EvalError; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("External error"))] + External { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Internal error"))] + Internal { location: Location, reason: String }, + /// TODO(discord9): add detailed location of column #[snafu(display("Failed to eval stream"))] Eval { @@ -47,6 +56,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Table not found: {msg}, meta error: {source}"))] + TableNotFoundMeta { + source: common_meta::error::Error, + msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Table already exist: {name}"))] TableAlreadyExist { name: String, @@ -62,6 +79,27 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid query plan: {source}"))] + InvalidQueryPlan { + source: query::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))] + InvalidQueryProst { + inner: api::DecodeError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid query, can't transform to substrait: {source}"))] + InvalidQuerySubstrait { + source: substrait::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid query: {reason}"))] InvalidQuery { reason: String, @@ -112,6 +150,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unexpected: {reason}"))] + Unexpected { + reason: String, + #[snafu(implicit)] + location: Location, + }, } /// Result type for flow module @@ -124,14 +169,21 @@ impl ErrorExt for Error { StatusCode::Internal } &Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, - Self::TableNotFound { .. } => StatusCode::TableNotFound, - &Self::InvalidQuery { .. } | &Self::Plan { .. } | &Self::Datatypes { .. } => { - StatusCode::PlanQuery + Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => { + StatusCode::TableNotFound } - Self::NoProtoType { .. } => StatusCode::Unexpected, + Self::InvalidQueryPlan { .. } + | Self::InvalidQuerySubstrait { .. } + | Self::InvalidQueryProst { .. } + | &Self::InvalidQuery { .. } + | &Self::Plan { .. } + | &Self::Datatypes { .. } => StatusCode::PlanQuery, + Self::NoProtoType { .. } | Self::Unexpected { .. 
} => StatusCode::Unexpected, &Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported } + &Self::External { .. } => StatusCode::Unknown, + Self::Internal { .. } => StatusCode::Internal, } } diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs new file mode 100644 index 000000000000..345414182222 --- /dev/null +++ b/src/flow/src/adapter/node_context.rs @@ -0,0 +1,315 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Node context, prone to change with every incoming request + +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::sync::Arc; + +use session::context::QueryContext; +use snafu::{OptionExt, ResultExt}; +use table::metadata::TableId; +use tokio::sync::{broadcast, mpsc}; + +use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; +use crate::expr::error::InternalSnafu; +use crate::expr::GlobalId; +use crate::repr::{DiffRow, RelationType, BROADCAST_CAP}; + +// TODO: refactor common types for flow to a separate module +/// FlowId is a unique identifier for a flow task +pub type FlowId = u64; +pub type TableName = [String; 3]; + +pub struct TableSource {} + +impl TableSource { + pub async fn get_table_name_schema( + &self, + _table_id: &TableId, + ) -> Result<(TableName, RelationType), Error> { + todo!() + } } + +/// A context that holds the information of the dataflow +#[derive(Default)] +pub struct FlownodeContext { + /// mapping from source table to tasks, useful for scheduling which tasks to run when a source table is updated + pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>, + /// mapping from task to sink table, useful for sending data back to the client when a task is done running + pub flow_to_sink: BTreeMap<FlowId, TableName>, + /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender + /// + /// Note that we are getting insert requests with table id, so we should use table id as the key + pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>, + /// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table + /// + /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key + /// note that there should only be one sink receiver, and we are using broadcast as an mpsc channel here + pub sink_receiver: BTreeMap< TableName, ( mpsc::UnboundedSender<DiffRow>, mpsc::UnboundedReceiver<DiffRow>, ), >, + /// store rows in a buffer for each source table, in case the broadcast channel is full + pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>, + /// the schema of the table, queried from metasrv or inferred from TypedPlan + pub schema: HashMap<GlobalId, RelationType>, + /// All the tables that have been registered in the worker + pub table_repr: IdToNameMap, + pub query_context: Option<Arc<QueryContext>>, +} + +impl FlownodeContext { + // return the number of rows actually sent (including what's in the buffer) + pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> { let sender = self .source_sender .get(&table_id) .with_context(|| TableNotFoundSnafu { name: table_id.to_string(), })?; let send_buffer = self.send_buffer.entry(table_id).or_default(); send_buffer.extend(rows); let mut row_cnt = 0; while let Some(row) = send_buffer.pop_front() { if sender.len() >= BROADCAST_CAP { break; } row_cnt += 1; sender .send(row) .map_err(|err| { InternalSnafu { reason: format!( "Failed to send row to table_id = {:?}, error = {:?}", table_id, err ), } .build() }) .with_context(|_| EvalSnafu)?; } Ok(row_cnt) } } impl FlownodeContext { /// map source tables to the task, and the task to its sink table, in the worker context /// /// also adds their corresponding broadcast sender/receiver pub fn register_task_src_sink( &mut self, task_id: FlowId, source_table_ids: &[TableId], sink_table_name: TableName, ) { for source_table_id in source_table_ids { self.add_source_sender(*source_table_id); self.source_to_tasks .entry(*source_table_id) .or_default() .insert(task_id); } self.add_sink_receiver(sink_table_name.clone()); self.flow_to_sink.insert(task_id, sink_table_name); } pub fn add_source_sender(&mut self, table_id: TableId) { self.source_sender .entry(table_id) .or_insert_with(|| broadcast::channel(BROADCAST_CAP).0); } pub fn add_sink_receiver(&mut self, table_name: TableName) { self.sink_receiver .entry(table_name) .or_insert_with(mpsc::unbounded_channel::<DiffRow>); } pub fn get_source_by_global_id( &self, id: &GlobalId, ) -> Result<&broadcast::Sender<DiffRow>, Error> { let table_id = self .table_repr .get_by_global_id(id) .with_context(|| TableNotFoundSnafu { name: format!("Global Id = {:?}", id), })? .1 .with_context(|| TableNotFoundSnafu { name: format!("Table Id = {:?}", id), })?; self.source_sender .get(&table_id) .with_context(|| TableNotFoundSnafu { name: table_id.to_string(), }) } pub fn get_sink_by_global_id( &self, id: &GlobalId, ) -> Result<mpsc::UnboundedSender<DiffRow>, Error> { let table_name = self .table_repr .get_by_global_id(id) .with_context(|| TableNotFoundSnafu { name: format!("{:?}", id), })? .0 .with_context(|| TableNotFoundSnafu { name: format!("Global Id = {:?}", id), })?; self.sink_receiver .get(&table_name) .map(|(s, _r)| s.clone()) .with_context(|| TableNotFoundSnafu { name: table_name.join("."), }) } } impl FlownodeContext { /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. /// /// Returns an error if no table has been registered with the provided names pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> { let id = self .table_repr .get_by_name(name) .map(|(_tid, gid)| gid) .with_context(|| TableNotFoundSnafu { name: name.join("."), })?; let schema = self .schema .get(&id) .cloned() .with_context(|| TableNotFoundSnafu { name: name.join("."), })?; Ok((id, schema)) } /// Assign a global id to a table; if one is already assigned, return the existing global id /// /// requires at least one of `table_name` or `table_id` to be `Some` /// /// and will try to fetch the schema from the table info manager (if the table exists now) /// /// NOTE: this will not actually render the table into a collection referred to by the GlobalId; /// it merely creates a mapping from table id to global id pub async fn assign_global_id_to_table( &mut self, srv_map: &TableSource, mut table_name: Option<TableName>, table_id: Option<TableId>, ) -> Result<GlobalId, Error> { // if we can find it by table name/id, don't assign a new one if let Some(gid) = table_name .as_ref() .and_then(|table_name| self.table_repr.get_by_name(table_name)) .map(|(_, gid)| gid) .or_else(|| { table_id .and_then(|id| self.table_repr.get_by_table_id(&id)) .map(|(_, gid)| gid) }) { Ok(gid) } else { let global_id = self.new_global_id(); if let Some(table_id) = table_id { let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?; table_name = table_name.or(Some(known_table_name)); self.schema.insert(global_id, schema); } // if we don't have a table id, it means the database hasn't assigned one yet, or we don't need it self.table_repr.insert(table_name, table_id, global_id); Ok(global_id) } } /// Assign a schema to a table /// /// TODO(discord9): error handling pub fn assign_table_schema( &mut self, table_name: &TableName, schema: RelationType, ) -> Result<(), Error> { let gid = self .table_repr .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); self.schema.insert(gid, schema); Ok(()) } /// Get a new global id pub fn new_global_id(&self) -> GlobalId { GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64) } } /// A tri-directional map that maps table name, table id, and global id #[derive(Default, Debug)] pub struct IdToNameMap { name_to_global_id: HashMap<TableName, GlobalId>, id_to_global_id: HashMap<TableId, GlobalId>, global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>, } impl IdToNameMap { pub fn new() -> Self { Default::default() } pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) { name.clone() .and_then(|name| self.name_to_global_id.insert(name.clone(), global_id)); id.and_then(|id| self.id_to_global_id.insert(id, global_id)); self.global_id_to_name_id.insert(global_id, (name, id)); } pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> { self.name_to_global_id.get(name).map(|global_id| { let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap(); (*id, *global_id) }) } pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> { self.id_to_global_id.get(id).map(|global_id| { let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap(); (name.clone(), *global_id) }) } pub fn get_by_global_id( &self, global_id: &GlobalId, ) -> Option<(Option<TableName>, Option<TableId>)> { self.global_id_to_name_id.get(global_id).cloned() } } diff --git a/src/flow/src/compute.rs b/src/flow/src/compute.rs
index 294716edf0bc..8463039dcd8a 100644 --- a/src/flow/src/compute.rs +++ b/src/flow/src/compute.rs @@ -17,3 +17,7 @@ mod render; mod state; mod types; + +pub(crate) use render::Context; +pub(crate) use state::DataflowState; +pub(crate) use types::ErrCollector; diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index d0fdc1fdb919..db479eadb352 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -45,6 +45,7 @@ use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement}; mod map; mod reduce; +mod src_sink; /// The Context for build a Operator with id of `GlobalId` pub struct Context<'referred, 'df> { @@ -52,13 +53,15 @@ pub struct Context<'referred, 'df> { pub df: &'referred mut Hydroflow<'df>, pub compute_state: &'referred mut DataflowState, /// a list of all collections being used in the operator + /// + /// TODO(discord9): remove extra clone by counting usage and remove it on last usage? pub input_collection: BTreeMap<Id, CollectionBundle>, /// used by `Get`/`Let` Plan for getting/setting local variables /// /// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead - local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>, + pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>, // Collect all errors in this operator's evaluation - err_collector: ErrCollector, + pub err_collector: ErrCollector, } impl<'referred, 'df> Drop for Context<'referred, 'df> { @@ -235,7 +238,7 @@ mod test { for now in time_range { state.set_current_ts(now); state.run_available_with_schedule(df); - assert!(state.get_err_collector().inner.borrow().is_empty()); + assert!(state.get_err_collector().is_empty()); if let Some(expected) = expected.get(&now) { assert_eq!(*output.borrow(), *expected, "at ts={}", now); } else { diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 42646933919e..4c4b41953cb1 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -153,7 +153,7 @@ fn eval_mfp_core( ) -> Vec<DiffRow> { let mut all_updates = Vec::new(); for (mut row, _sys_time, diff) in input.into_iter() { - // this updates is expected to be only zero to two rows + // these updates are expected to be only zero, one or two rows let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff); // TODO(discord9): refactor error handling // Expect error in a single row to not interrupt the whole evaluation diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index a024021ada9e..c8ebefb7fb04 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -80,7 +80,11 @@ impl<'referred, 'df> Context<'referred, 'df> { out_send_port, move |_ctx, recv, send| { // mfp only need to passively receive updates from recvs - let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter()); + let data = recv + .take_inner() + .into_iter() + .flat_map(|v| v.into_iter()) + .collect_vec(); reduce_subgraph( &reduce_arrange, @@ -378,9 +382,8 @@ fn reduce_accum_subgraph( let mut all_updates = Vec::with_capacity(key_to_vals.len()); let mut all_outputs = Vec::with_capacity(key_to_vals.len()); - // lock the arrange for write for the rest of function body - so to prevent weird race condition since we are going to update the arrangement by write after read + // so to prevent weird race condition since we are going to update the arrangement by write after read // TODO(discord9): consider key-based lock let mut arrange = arrange.write(); for (key, value_diffs) in key_to_vals { @@ -395,6 +398,7 @@ } }; let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); + let accums = accums.inner; + // deser accums from offsets diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs new file mode 100644 index 000000000000..77f3e4105382 --- /dev/null +++ b/src/flow/src/compute/render/src_sink.rs @@ -0,0 +1,161 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Source and Sink for the dataflow + +use std::collections::{BTreeMap, VecDeque}; + +use common_telemetry::{debug, info}; +use hydroflow::scheduled::graph_ext::GraphExt; +use itertools::Itertools; +use snafu::OptionExt; +use tokio::sync::{broadcast, mpsc}; + +use crate::adapter::error::{Error, PlanSnafu}; +use crate::compute::render::Context; +use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff}; +use crate::expr::GlobalId; +use crate::repr::{DiffRow, Row, BROADCAST_CAP}; + +#[allow(clippy::mutable_key_type)] +impl<'referred, 'df> Context<'referred, 'df> { + /// Render a source which comes from a broadcast channel into the dataflow; + /// it will immediately send updates not greater than `now` and buffer the rest in the arrangement + pub fn render_source( + &mut self, + mut src_recv: broadcast::Receiver<DiffRow>, + ) -> Result<CollectionBundle, Error> { + let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source"); + let arrange_handler = self.compute_state.new_arrange(None); + let arrange_handler_inner = + arrange_handler + .clone_future_only() + .with_context(|| PlanSnafu { + reason: "No write is expected at this point", + })?; + + let schd = self.compute_state.get_scheduler(); + let inner_schd = schd.clone(); + let now = self.compute_state.current_time_ref(); + let err_collector = self.err_collector.clone(); + + let sub = self + .df + .add_subgraph_source("source", send_port, move |_ctx, send| { + let now = *now.borrow(); + let arr = arrange_handler_inner.write().get_updates_in_range(..=now); + err_collector.run(|| arrange_handler_inner.write().compact_to(now)); + + let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); + let mut to_send = Vec::new(); + let mut to_arrange = Vec::new(); + + // TODO(discord9): handling tokio broadcast error + while let Ok((r, t, d)) = src_recv.try_recv() { + if t <= now { + to_send.push((r, t, d)); + } else { + to_arrange.push(((r, Row::empty()), t, d)); + } + } + let all = prev_avail.chain(to_send).collect_vec(); + if !all.is_empty() || !to_arrange.is_empty() { + debug!( + "All send: {} rows, not yet send: {} rows", + all.len(), + to_arrange.len() + ); + } + err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); + send.give(all); + // always schedule source to run at next tick + inner_schd.schedule_at(now + 1); + }); + schd.set_cur_subgraph(sub); + let arranged = Arranged::new(arrange_handler); + arranged.writer.borrow_mut().replace(sub); + let arranged = BTreeMap::from([(vec![], arranged)]); + Ok(CollectionBundle { + collection: Collection::from_port(recv_port), + arranged, + }) + } + + pub fn render_unbounded_sink( + &mut self, + bundle: CollectionBundle, + sender: mpsc::UnboundedSender<DiffRow>, + ) { + let CollectionBundle { + collection, + arranged: _, + } = bundle; + + let _sink = self.df.add_subgraph_sink( + "UnboundedSink", + collection.into_inner(), + move |_ctx, recv| { + let data = recv.take_inner(); + for row in data.into_iter().flat_map(|i| i.into_iter()) { + // if the sender is closed, stop sending + if sender.is_closed() { + break; + } + // TODO(discord9): handling tokio error + let _ = sender.send(row); + } + }, + ); + } + + /// Render a sink which sends updates to a broadcast channel, with an internal buffer in case the broadcast channel is full + pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) { + let CollectionBundle { + collection, + arranged: _, + } = bundle; + let mut buf = VecDeque::with_capacity(1000); + + let schd = self.compute_state.get_scheduler(); + let inner_schd = schd.clone(); + let now = self.compute_state.current_time_ref(); + + let sink = self + .df + .add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| { + let data = recv.take_inner(); + buf.extend(data.into_iter().flat_map(|i| i.into_iter())); + if sender.len() >= BROADCAST_CAP { + return; + } else { + while let Some(row) = buf.pop_front() { + // if the sender is full, stop sending + if sender.len() >= BROADCAST_CAP { + break; + } + // TODO(discord9): handling tokio broadcast error + let _ = sender.send(row); + } + } + + // if buffer is not empty, schedule the next run at next tick + // so the buffer can be drained as soon as possible + if !buf.is_empty() { + inner_schd.schedule_at(*now.borrow() + 1); + } + }); + + schd.set_cur_subgraph(sink); + } } diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index 5bcb3a7ab121..a9a431de97ef 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -25,7 +25,7 @@ use crate::utils::{ArrangeHandler, Arrangement}; /// input/output of a dataflow /// One `ComputeState` manage the input/output/schedule of one `Hydroflow` -#[derive(Default)] +#[derive(Debug, Default)] pub struct DataflowState { /// it is important to use a deque to maintain the order of subgraph here /// TODO(discord9): consider dedup? It may also be unnecessary, since hydroflow itself also dedups when scheduling diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs index e14b584a822d..fa8c7315cb4f 100644 --- a/src/flow/src/compute/types.rs +++ b/src/flow/src/compute/types.rs @@ -21,7 +21,8 @@ use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::handoff::TeeingHandoff; use hydroflow::scheduled::port::RecvPort; use hydroflow::scheduled::SubgraphId; -use tokio::sync::RwLock; +use itertools::Itertools; +use tokio::sync::{Mutex, RwLock}; use crate::compute::render::Context; use crate::expr::{EvalError, ScalarExpr}; @@ -146,14 +147,22 @@ impl CollectionBundle { /// /// Using a `VecDeque` to preserve the order of errors /// when running dataflow continuously and need errors in order -#[derive(Default, Clone)] +#[derive(Debug, Default, Clone)] pub struct ErrCollector { - pub inner: Rc<RefCell<VecDeque<EvalError>>>, + pub inner: Arc<Mutex<VecDeque<EvalError>>>, } impl ErrCollector { + pub async fn get_all(&self) -> Vec<EvalError> { + self.inner.lock().await.drain(..).collect_vec() + } + + pub fn is_empty(&self) -> bool { + self.inner.blocking_lock().is_empty() + } + pub fn push_err(&self, err: EvalError) { - self.inner.borrow_mut().push_back(err) + self.inner.blocking_lock().push_back(err) } pub fn run<F, R>(&self, f: F) -> Option<R> diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 20a6a2449023..5a2823423974 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -23,6 +23,11 @@ use datatypes::data_type::ConcreteDataType; use serde::{Deserialize, Serialize}; use snafu::{Location, Snafu}; +fn is_send_sync() { + fn check<T: Send + Sync>() {} + check::<EvalError>(); +} + /// EvalError is about errors happen on columnar evaluation /// /// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 518bb14aded9..c177dcd571ea 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -17,9 +17,9 @@ use std::collections::HashMap; use std::sync::OnceLock; +use common_telemetry::debug; use common_time::DateTime; use datafusion_expr::Operator; -use datafusion_substrait::logical_plan::consumer::name_to_op; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; use datatypes::types::cast::CastOption; @@ -28,6 +28,7 @@ use serde::{Deserialize, Serialize}; use smallvec::smallvec; use snafu::{ensure, OptionExt, ResultExt}; use strum::{EnumIter, IntoEnumIterator}; +use substrait::df_logical_plan::consumer::name_to_op; use crate::adapter::error::{Error, InvalidQuerySnafu, PlanSnafu}; use crate::expr::error::{ @@ -206,8 +207,9 @@ impl UnaryFunc { from: arg_ty, to: to.clone(), } - })?; - Ok(res) + }); + debug!("Cast to type: {to:?}, result: {:?}", res); + res } } } diff --git a/src/flow/src/expr/id.rs b/src/flow/src/expr/id.rs index 9b098f05b333..f88baa70ded6 100644 --- a/src/flow/src/expr/id.rs +++ b/src/flow/src/expr/id.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; -/// Global id's scope is in Current Worker, and is cross-dataflow +/// Global id's scope is the current flow node, and it is cross-dataflow #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] pub enum GlobalId { /// System namespace.
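The `ErrCollector` change above, from `Rc<RefCell<VecDeque<EvalError>>>` to `Arc<Mutex<VecDeque<EvalError>>>` with a tokio `Mutex`, is what lets the collector cross thread and task boundaries: synchronous operator code pushes errors via `blocking_lock`, while async adapter code drains them via `lock().await`. Below is a minimal, self-contained sketch of that pattern; the `EvalError` payload here is a hypothetical stand-in for the real type in `crate::expr::error`, and it assumes a `tokio` dependency with the `sync` and `rt-multi-thread` features enabled.

```rust
use std::collections::VecDeque;
use std::sync::Arc;

use tokio::sync::Mutex;

/// Hypothetical stand-in for flow's `EvalError`.
#[derive(Debug)]
struct EvalError(String);

/// Errors are pushed from synchronous dataflow operators and drained
/// from async adapter code, so the queue sits behind a tokio `Mutex`
/// inside an `Arc`, making the collector `Send + Sync + Clone`.
#[derive(Debug, Default, Clone)]
struct ErrCollector {
    inner: Arc<Mutex<VecDeque<EvalError>>>,
}

impl ErrCollector {
    /// Called from synchronous operator code. Must not run on a thread
    /// inside the async runtime, since `blocking_lock` panics there.
    fn push_err(&self, err: EvalError) {
        self.inner.blocking_lock().push_back(err)
    }

    /// Called from async adapter code to drain all collected errors.
    async fn get_all(&self) -> Vec<EvalError> {
        self.inner.lock().await.drain(..).collect()
    }
}

fn main() {
    let collector = ErrCollector::default();

    // A synchronous "worker" thread pushes an error.
    let c = collector.clone();
    std::thread::spawn(move || c.push_err(EvalError("division by zero".into())))
        .join()
        .unwrap();

    // Async code drains it.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let errs = rt.block_on(collector.get_all());
    assert_eq!(errs.len(), 1);
    println!("collected: {errs:?}");
}
```

This is also why the patch changes `is_empty` in the render test to go through `blocking_lock` rather than `borrow()`: the test runs on a plain thread, outside the async runtime.
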
diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index bcee991d647b..4506bf7a5507 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -206,12 +206,16 @@ impl AggregateFunc { .fail(); } }; - let input_type = arg_type.unwrap_or_else(ConcreteDataType::null_datatype); + let input_type = if matches!(generic_fn, GenericFn::Count) { + ConcreteDataType::null_datatype() + } else { + arg_type.unwrap_or_else(ConcreteDataType::null_datatype) + }; rule.get(&(generic_fn, input_type.clone())) .cloned() .with_context(|| InvalidQuerySnafu { reason: format!( - "No specialization found for binary function {:?} with input type {:?}", + "No specialization found for aggregate function {:?} with input type {:?}", generic_fn, input_type ), }) diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 5e5723e44dda..2ff4ebc67a5c 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -18,12 +18,15 @@ mod join; mod reduce; +use std::collections::BTreeSet; + use datatypes::arrow::ipc::Map; use serde::{Deserialize, Serialize}; use crate::adapter::error::Error; use crate::expr::{ - AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr, + AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, + TypedExpr, }; use crate::plan::join::JoinPlan; pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan}; @@ -71,6 +74,7 @@ impl TypedPlan { let mfp = MapFilterProject::new(input_arity) .map(exprs)? .project(input_arity..input_arity + output_arity)?; + let out_typ = self.typ.apply_mfp(&mfp, &expr_typs)?; // special case for mfp to compose when the plan is already mfp let plan = match self.plan { Plan::Mfp { @@ -85,8 +89,7 @@ mfp, }, }; - let typ = RelationType::new(expr_typs); - Ok(TypedPlan { typ, plan }) + Ok(TypedPlan { typ: out_typ, plan }) } /// Add a new filter to the plan, will filter out the records that do not satisfy the filter @@ -182,3 +185,45 @@ pub enum Plan { consolidate_output: bool, }, } + +impl Plan { + /// Find all the used collections in the plan + pub fn find_used_collection(&self) -> BTreeSet<GlobalId> { + fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) { + match plan { + Plan::Get { id } => { + match id { + Id::Local(_) => (), + Id::Global(g) => { + used.insert(*g); + } + }; + } + Plan::Let { value, body, .. } => { + recur_find_use(value, used); + recur_find_use(body, used); + } + Plan::Mfp { input, .. } => { + recur_find_use(input, used); + } + Plan::Reduce { input, .. } => { + recur_find_use(input, used); + } + Plan::Join { inputs, .. } => { + for input in inputs { + recur_find_use(input, used); + } + } + Plan::Union { inputs, .. } => { + for input in inputs { + recur_find_use(input, used); + } + } + _ => {} + } + } + let mut ret = Default::default(); + recur_find_use(self, &mut ret); + ret + } +} diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index d21da3957978..85bdfa8e4abb 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -27,7 +27,7 @@ use datatypes::types::cast; use datatypes::types::cast::CastOption; use datatypes::value::Value; use itertools::Itertools; -pub(crate) use relation::{ColumnType, RelationDesc, RelationType}; +pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -51,6 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff); /// Row with key-value pair, timestamp and diff pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff); +/// broadcast channel capacity +pub const BROADCAST_CAP: usize = 1024; + /// Convert a value that is or can be converted to Datetime to internal timestamp /// /// support types are: `Date`, `DateTime`, `TimeStamp`, `i64` @@ -104,6 +107,11 @@ impl Row { Self { inner: vec![] } } + /// Returns true if the Row contains no elements. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + /// Create a row from a vector of values pub fn new(row: Vec<Value>) -> Self { Self { inner: row } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index fa110c4564e3..b10881d2c966 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{BTreeMap, HashMap}; + use datatypes::prelude::ConcreteDataType; +use itertools::Itertools; use serde::{Deserialize, Serialize}; -use snafu::ensure; +use snafu::{ensure, OptionExt}; -use crate::adapter::error::{InvalidQuerySnafu, Result}; +use crate::adapter::error::{InvalidQuerySnafu, Result, UnexpectedSnafu}; +use crate::expr::MapFilterProject; /// a set of column indices that are "keys" for the collection. -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] +#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] pub struct Key { /// indicate whose column form key pub column_indices: Vec<usize>, } impl Key { /// create a new Key pub fn new() -> Self { - Self { - column_indices: Vec::new(), - } + Default::default() } /// create a new Key from a vector of column indices @@ -96,6 +98,71 @@ pub struct RelationType { } impl RelationType { + /// Try to apply an mfp on the current types; this will return a new RelationType + /// with the new types, and will also try to preserve key & time index information + /// if the old key & time index columns are preserved in the given mfp + /// + /// i.e. with old columns of size 3, and an mfp's + /// + /// project = `[2, 1]`, + /// + /// the old key = `[1]`, old time index = `[2]`, + /// + /// then new key=`[1]`, new time index=`[0]` + /// + /// note that this function will remove empty keys (e.g. key=`[]`) + pub fn apply_mfp(&self, mfp: &MapFilterProject, expr_typs: &[ColumnType]) -> Result<Self> { + let all_types = self + .column_types + .iter() + .chain(expr_typs.iter()) + .cloned() + .collect_vec(); + let mfp_out_types = mfp + .projection + .iter() + .map(|i| { + all_types.get(*i).cloned().with_context(|| UnexpectedSnafu { + reason: format!( + "MFP index out of bound, len is {}, but the index is {}", + all_types.len(), + *i + ), + }) + }) + .try_collect()?; + let old_to_new_col = BTreeMap::from_iter( + mfp.projection + .clone() + .into_iter() + .enumerate() + .map(|(new, old)| (old, new)), + ); + + // since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform + let keys = self + .keys + .iter() + .filter_map(|key| { + key.column_indices + .iter() + .map(|old| old_to_new_col.get(old).cloned()) + .collect::<Option<Vec<usize>>>() + // remove empty keys + .and_then(|v| if v.is_empty() { None } else { Some(v) }) + .map(Key::from) + }) + .collect_vec(); + + let time_index = self + .time_index + .and_then(|old| old_to_new_col.get(&old).cloned()); + Ok(Self { + column_types: mfp_out_types, + keys, + time_index, + }) + } /// Constructs a `RelationType` representing the relation with no columns and /// no keys. pub fn empty() -> Self { diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index bc1b84cb04fa..d8c514f92011 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -14,12 +14,35 @@ //! Transform Substrait into execution plan use std::collections::HashMap; +use std::sync::Arc; +use common_error::ext::BoxedError; +use common_telemetry::info; use datatypes::data_type::ConcreteDataType as CDT; - -use crate::adapter::error::{Error, NotImplementedSnafu, TableNotFoundSnafu}; +use literal::{from_substrait_literal, from_substrait_type}; +use prost::Message; +use query::parser::QueryLanguageParser; +use query::plan::LogicalPlan; +use query::QueryEngine; +use session::context::QueryContext; +use snafu::{OptionExt, ResultExt}; +/// note here we are using the `substrait_proto_df` crate from the `substrait` module, +/// renaming it to `substrait_proto` +use substrait::{ substrait_proto_df as substrait_proto, DFLogicalSubstraitConvertor, SubstraitPlan, }; +use substrait_proto::proto::extensions::simple_extension_declaration::MappingType; +use substrait_proto::proto::extensions::SimpleExtensionDeclaration; + +use crate::adapter::error::{ Error, ExternalSnafu, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu, NotImplementedSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; +use crate::adapter::FlownodeContext; use crate::expr::GlobalId; +use crate::plan::TypedPlan; use crate::repr::RelationType; + /// a simple macro to generate a not implemented error macro_rules! not_impl_err { ($($arg:tt)*) => { @@ -43,11 +66,6 @@ mod expr; mod literal; mod plan; -use literal::{from_substrait_literal, from_substrait_type}; -use snafu::OptionExt; -use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType; -use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration; - /// In Substrait, a function can be defined by a u32 anchor, and the anchor can be mapped to a name /// /// So in a substrait plan, a ref to a function can be a single u32 anchor instead of a full name as a string @@ -79,38 +97,34 @@ impl FunctionExtensions { } } -/// A context that holds the information of the dataflow -pub struct DataflowContext { - /// `id` refer to any source table in the dataflow, and `name` is the name of the table - /// which is a `Vec<String>` in substrait - id_to_name: HashMap<GlobalId, Vec<String>>, - /// see `id_to_name` - name_to_id: HashMap<Vec<String>, GlobalId>, - /// the schema of the table - schema: HashMap<GlobalId, RelationType>, -} - -impl DataflowContext { - /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. - /// - /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> { - let id = self - .name_to_id - .get(name) - .copied() - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; - let schema = self - .schema - .get(&id) - .cloned() - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; - Ok((id, schema)) - } +/// To reuse existing code for parsing SQL, the SQL is first parsed into a datafusion logical plan, +/// then into a substrait plan, and finally into a flow plan. +pub async fn sql_to_flow_plan( + ctx: &mut FlownodeContext, + engine: &Arc<dyn QueryEngine>, + sql: &str, +) -> Result<TypedPlan, Error> { + let query_ctx = ctx.query_context.clone().ok_or_else(|| { + UnexpectedSnafu { + reason: "Query context is missing", + } + .build() + })?; + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?; + let plan = engine + .planner() + .plan(stmt, query_ctx) + .await + .context(InvalidQueryPlanSnafu)?; + let LogicalPlan::DfPlan(plan) = plan; + let sub_plan = DFLogicalSubstraitConvertor {} + .to_sub_plan(&plan) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan)?; + + Ok(flow_plan) } #[cfg(test)] @@ -124,22 +138,29 @@ mod test { use query::plan::LogicalPlan; use query::QueryEngine; use session::context::QueryContext; - use substrait::substrait_proto::proto; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; + use substrait_proto::proto; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use super::*; + use crate::adapter::node_context::IdToNameMap; use crate::repr::ColumnType; - pub fn create_test_ctx() -> FlownodeContext { let gid = GlobalId::User(0); - let name = vec!["numbers".to_string()]; + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ]; let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); - - DataflowContext { - id_to_name: HashMap::from([(gid, name.clone())]), - name_to_id: HashMap::from([(name.clone(), gid)]), + let mut tri_map = IdToNameMap::new(); + tri_map.insert(Some(name.clone()), Some(0), gid); + FlownodeContext { schema: HashMap::from([(gid, schema)]), + table_repr: tri_map, + query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default() } } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index be4a42a9b568..abe3aef17776 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -16,11 +16,6 @@ use std::collections::HashMap; use common_decimal::Decimal128; use common_time::{Date, Timestamp}; -use datafusion_substrait::variation_const::{ - DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF, - TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF, - UNSIGNED_INTEGER_TYPE_REF, -}; use datatypes::arrow::compute::kernels::window; use datatypes::arrow::ipc::Binary; use datatypes::data_type::ConcreteDataType as CDT; @@ -28,21 +23,26 @@ use datatypes::value::Value; use hydroflow::futures::future::Map; use itertools::Itertools; use snafu::{OptionExt, ResultExt}; -use substrait::substrait_proto::proto::aggregate_function::AggregationInvocation; -use substrait::substrait_proto::proto::aggregate_rel::{Grouping, Measure}; -use substrait::substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference; -use substrait::substrait_proto::proto::expression::literal::LiteralType; -use substrait::substrait_proto::proto::expression::reference_segment::ReferenceType::StructField; -use substrait::substrait_proto::proto::expression::{ +use substrait::variation_const::{ + DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF, + TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF, + UNSIGNED_INTEGER_TYPE_REF, +}; +use substrait_proto::proto::aggregate_function::AggregationInvocation; +use substrait_proto::proto::aggregate_rel::{Grouping, Measure}; +use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference; +use substrait_proto::proto::expression::literal::LiteralType; +use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField; +use substrait_proto::proto::expression::{ IfThen, Literal, MaskExpression, RexType, ScalarFunction, }; -use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType; -use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration; -use substrait::substrait_proto::proto::function_argument::ArgType; -use substrait::substrait_proto::proto::r#type::Kind; -use substrait::substrait_proto::proto::read_rel::ReadType; -use substrait::substrait_proto::proto::rel::RelType; -use substrait::substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel}; +use substrait_proto::proto::extensions::simple_extension_declaration::MappingType; +use substrait_proto::proto::extensions::SimpleExtensionDeclaration; +use substrait_proto::proto::function_argument::ArgType; +use substrait_proto::proto::r#type::Kind; +use substrait_proto::proto::read_rel::ReadType; +use substrait_proto::proto::rel::RelType; +use substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel}; use crate::adapter::error::{ DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, @@ -54,11 +54,11 @@ use crate::expr::{ }; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; -use crate::transform::{DataflowContext, FunctionExtensions}; +use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions}; impl TypedExpr { fn from_substrait_agg_grouping( - ctx: &mut DataflowContext, + ctx: &mut FlownodeContext, groupings: &[Grouping], 
typ: &RelationType, extensions: &FunctionExtensions, @@ -84,7 +84,7 @@ impl TypedExpr { impl AggregateExpr { fn from_substrait_agg_measures( - ctx: &mut DataflowContext, + ctx: &mut FlownodeContext, measures: &[Measure], typ: &RelationType, extensions: &FunctionExtensions, @@ -218,7 +218,7 @@ impl KeyValPlan { impl TypedPlan { /// Convert AggregateRel into Flow's TypedPlan pub fn from_substrait_agg_rel( - ctx: &mut DataflowContext, + ctx: &mut FlownodeContext, agg: &proto::AggregateRel, extensions: &FunctionExtensions, ) -> Result<TypedPlan, Error> { @@ -228,7 +228,7 @@ impl TypedPlan { return not_impl_err!("Aggregate without an input is not supported"); }; - let group_expr = + let group_exprs = TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?; let mut aggr_exprs = @@ -236,14 +236,14 @@ impl TypedPlan { let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan( &mut aggr_exprs, - &group_expr, + &group_exprs, input.typ.column_types.len(), )?; let output_type = { let mut output_types = Vec::new(); // first append group_expr as key, then aggr_expr as value - for expr in &group_expr { + for expr in &group_exprs { output_types.push(expr.typ.clone()); } for aggr in &aggr_exprs { output_types.push(ColumnType::new_nullable( aggr.func.signature().output.clone(), )); } - RelationType::new(output_types) + // TODO(discord9): try best to get time + RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec()) }; // copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs @@ -365,8 +366,8 @@ mod test { }; let expected = TypedPlan { typ: RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), true), - ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::uint32_datatype(), true), // col sum(number) + ColumnType::new(CDT::uint32_datatype(), false), // col number ]), plan: Plan::Mfp { input: Box::new(Plan::Reduce { diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index c8bff7da5c81..f6eda483b226 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -17,11 +17,11 @@ use datatypes::data_type::ConcreteDataType as CDT; use itertools::Itertools; use snafu::{OptionExt, ResultExt}; -use substrait::substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference; -use substrait::substrait_proto::proto::expression::reference_segment::ReferenceType::StructField; -use substrait::substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction}; -use substrait::substrait_proto::proto::function_argument::ArgType; -use substrait::substrait_proto::proto::Expression; +use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference; +use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField; +use substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction}; +use substrait_proto::proto::function_argument::ArgType; +use substrait_proto::proto::Expression; use crate::adapter::error::{ DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, @@ -31,8 +31,7 @@ use crate::expr::{ }; use crate::repr::{ColumnType, RelationType}; use crate::transform::literal::{from_substrait_literal, from_substrait_type}; -use crate::transform::FunctionExtensions; - +use crate::transform::{substrait_proto, FunctionExtensions}; // TODO: found proper place for this /// ref to `arrow_schema::datatype` for type name fn typename_to_cdt(name: &str) -> CDT { diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs index b41a82e26a4b..41008fd992d2 100644 --- a/src/flow/src/transform/literal.rs +++ b/src/flow/src/transform/literal.rs @@ -14,18 +14,19 @@ use common_decimal::Decimal128; use common_time::{Date, Timestamp}; -use datafusion_substrait::variation_const::{ +use datatypes::data_type::ConcreteDataType as CDT; +use datatypes::value::Value; +use substrait::variation_const::{ DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF, TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF, UNSIGNED_INTEGER_TYPE_REF, }; -use datatypes::data_type::ConcreteDataType as CDT; -use datatypes::value::Value; -use substrait::substrait_proto::proto::expression::literal::LiteralType; -use substrait::substrait_proto::proto::expression::Literal; -use substrait::substrait_proto::proto::r#type::Kind; +use substrait_proto::proto::expression::literal::LiteralType; +use substrait_proto::proto::expression::Literal; +use substrait_proto::proto::r#type::Kind; use crate::adapter::error::{Error, NotImplementedSnafu, PlanSnafu}; +use crate::transform::substrait_proto; /// Convert a Substrait literal into a Value and its ConcreteDataType (So that we can know type even if the value is null) pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Error> { @@ -109,9 +110,7 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro } /// convert a Substrait type into a ConcreteDataType -pub fn from_substrait_type( - null_type: &substrait::substrait_proto::proto::Type, -) -> Result<CDT, Error> { +pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<CDT, Error> { if let Some(kind) = &null_type.kind { match kind { Kind::Bool(_) => Ok(CDT::boolean_datatype()), diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index fd73bb33d08d..d2c196fa72ae 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -14,21 +14,23 @@ use itertools::Itertools; use snafu::OptionExt; -use substrait::substrait_proto::proto::expression::MaskExpression; -use substrait::substrait_proto::proto::read_rel::ReadType; -use substrait::substrait_proto::proto::rel::RelType; -use substrait::substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel}; +use substrait_proto::proto::expression::MaskExpression; +use substrait_proto::proto::read_rel::ReadType; +use substrait_proto::proto::rel::RelType; +use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel}; use crate::adapter::error::{ Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu, }; use crate::expr::{MapFilterProject, TypedExpr}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, RelationType}; use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions}; impl TypedPlan { /// Convert Substrait Plan into Flow's TypedPlan pub fn from_substrait_plan( - ctx: &mut DataflowContext, + ctx: &mut FlownodeContext, plan: &SubPlan, ) -> Result<TypedPlan, Error> { // Register function extension @@ -62,7 +64,7 @@ impl TypedPlan { /// Convert Substrait Rel into Flow's TypedPlan /// TODO: SELECT DISTINCT(does it get compile with something else?) pub fn from_substrait_rel( - ctx: &mut DataflowContext, + ctx: &mut FlownodeContext, rel: &Rel, extensions: &FunctionExtensions, ) -> Result<TypedPlan, Error> { @@ -114,7 +116,30 @@ impl TypedPlan { } Some(RelType::Read(read)) => { if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type { - let table_reference = nt.names.clone(); + let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu { + reason: "Query context not found", + })?; + let table_reference = match nt.names.len() { + 1 => [ + query_ctx.current_catalog().to_string(), + query_ctx.current_schema().to_string(), + nt.names[0].clone(), + ], + 2 => [ + query_ctx.current_catalog().to_string(), + nt.names[0].clone(), + nt.names[1].clone(), + ], + 3 => [ + nt.names[0].clone(), + nt.names[1].clone(), + nt.names[2].clone(), + ], + _ => InvalidQuerySnafu { + reason: "Expect table to have name", + } + .fail()?, + }; let table = ctx.table(&table_reference)?; let get_table = Plan::Get { id: crate::expr::Id::Global(table.0),
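The last hunk above normalizes 1-, 2-, and 3-part Substrait `NamedTable` names against the query context's current catalog and schema. For reference, that resolution rule can be summarized as a small standalone function; this is a hedged sketch, not the patch's code — the function name, error type, and `main` demo are illustrative only.

```rust
/// Resolve a 1-, 2-, or 3-part table name into `[catalog, schema, table]`,
/// defaulting missing parts from the query context, mirroring the match on
/// `nt.names.len()` in the hunk above.
fn resolve_table_name(
    names: &[String],
    current_catalog: &str,
    current_schema: &str,
) -> Result<[String; 3], String> {
    match names {
        // bare table name: take catalog and schema from the query context
        [table] => Ok([
            current_catalog.to_string(),
            current_schema.to_string(),
            table.clone(),
        ]),
        // schema-qualified name: only the catalog comes from the query context
        [schema, table] => Ok([
            current_catalog.to_string(),
            schema.clone(),
            table.clone(),
        ]),
        // fully qualified name: use as-is
        [catalog, schema, table] => Ok([catalog.clone(), schema.clone(), table.clone()]),
        _ => Err(format!("expected 1 to 3 name parts, got {}", names.len())),
    }
}

fn main() {
    let resolved =
        resolve_table_name(&["numbers".to_string()], "greptime", "public").unwrap();
    assert_eq!(resolved, ["greptime", "public", "numbers"]);
}
```

This matches the `create_test_ctx` change earlier in the patch, where the test table is now registered under the fully qualified `["greptime", "public", "numbers"]` name instead of the bare `["numbers"]`.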