From 2a49ce1402dcd26c6b0f67668dcd3226277f7655 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 20 May 2024 16:50:29 +0800 Subject: [PATCH 01/22] fix: optional args of tumble --- src/flow/src/transform/aggr.rs | 100 +++++++++++++++++++++++++++++++++ src/flow/src/transform/expr.rs | 10 +++- 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index d21df2cf6907..50ffd8e46d98 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -435,6 +435,106 @@ mod test { use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + #[tokio::test] + async fn test_tumble_parse_optional() { + let engine = create_test_query_engine(); + let sql = "SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour')"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_expr = AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }; + let expected = TypedPlan { + typ: RelationType::new(vec![ + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]), + // TODO(discord9): mfp indirectly ref to key columns + /* + .with_key(vec![1]) + .with_time_index(Some(0)),*/ + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ]) + .unwrap() + .project(vec![2, 3]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .project(vec![0, 1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ]) + .with_key(vec![1]) + .with_time_index(Some(0)), + ), + ), + mfp: MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2), + ScalarExpr::Column(3), + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![4, 5, 6]) + .unwrap(), + }, + }; + assert_eq!(flow_plan, expected); + } + #[tokio::test] async fn test_tumble_parse() { let engine = create_test_query_engine(); diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 7e0dc2df3b62..9058b98c0971 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -102,7 +102,7 @@ impl 
TypedExpr { match arg_len { // because variadic function can also have 1 arguments, we need to check if it's a variadic function first - 1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => { + 1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => { let func = UnaryFunc::from_str_and_type(fn_name, None)?; let arg = arg_exprs[0].clone(); let ret_type = ColumnType::new_nullable(func.signature().output.clone()); @@ -125,7 +125,13 @@ impl TypedExpr { Ok(TypedExpr::new(arg.call_unary(func), ret_type)) } // because variadic function can also have 2 arguments, we need to check if it's a variadic function first - 2 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => { + 2 if BinaryFunc::from_str_expr_and_type( + fn_name, + &arg_exprs, + arg_types.get(0..2).expect("arg have 2 elements"), + ) + .is_ok() => + { let (func, signature) = BinaryFunc::from_str_expr_and_type(fn_name, &arg_exprs, &arg_types[0..2])?; From a6fd5184b2f1acd55a50d2ee0e98526244fa9906 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 20 May 2024 17:41:23 +0800 Subject: [PATCH 02/22] fix(WIP): choose --- src/flow/src/adapter.rs | 13 ++-- src/flow/src/transform/aggr.rs | 127 +++++++++++++++++++++++++++++++++ src/flow/src/utils.rs | 2 + 3 files changed, 135 insertions(+), 7 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 25bb3cb2bf2c..8e2e7cdecdae 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -653,21 +653,20 @@ impl FlownodeManager { /// /// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid /// TSO coord mess -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct FlowTickManager { start: Instant, -} - -impl std::fmt::Debug for FlowTickManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FlowTickManager").finish() - } + start_timestamp: repr::Timestamp, } impl FlowTickManager { pub fn new() -> Self { FlowTickManager { start: Instant::now(), + start_timestamp: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as repr::Timestamp, } } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 50ffd8e46d98..25506853432e 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -435,6 +435,133 @@ mod test { use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + #[tokio::test] + async fn test_tumble_compsite() { + let engine = create_test_query_engine(); + let sql = + "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_exprs = vec![ + AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }, + AggregateExpr { + func: AggregateFunc::Count, + expr: ScalarExpr::Column(0), + distinct: false, + }, + ]; + let avg_expr = ScalarExpr::If { + cond: Box::new(ScalarExpr::Column(4).call_binary( + ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()), + BinaryFunc::NotEq, + )), + then: Box::new(ScalarExpr::Column(3).call_binary( + ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())), + BinaryFunc::DivUInt64, + )), + els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), + }; + let expected = 
TypedPlan { + typ: RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), // number + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]), + // TODO(discord9): mfp indirectly ref to key columns + /* + .with_key(vec![1]) + .with_time_index(Some(0)),*/ + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types(RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ])), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 3_600_000_000_000, + ), + start_time: None, + }, + ), + ScalarExpr::Column(0), + ]) + .unwrap() + .project(vec![2, 3, 4]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .project(vec![0, 1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: aggr_exprs.clone(), + simple_aggrs: vec![ + AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0), + AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1), + ], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start(time index) + ColumnType::new(CDT::datetime_datatype(), false), // window end(pk) + ColumnType::new(CDT::uint32_datatype(), false), // number(pk) + ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) + ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number) + ]) + .with_key(vec![1, 2]) + .with_time_index(Some(0)), + ), + ), + mfp: MapFilterProject::new(5) + .map(vec![ + avg_expr, + ScalarExpr::Column(2), + ScalarExpr::Column(5), + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![6, 7, 8, 9]) + .unwrap(), + }, + }; + assert_eq!(flow_plan, expected); + } + #[tokio::test] async fn test_tumble_parse_optional() { let engine = create_test_query_engine(); diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 93edf176e77a..599dc4541aa4 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -208,7 +208,9 @@ impl Arrangement { for ((key, val), update_ts, diff) in updates { // check if the key is expired if let Some(s) = &mut self.expire_state { + dbg!(now, &key, &s); if let Some(expired_by) = s.update_event_ts(now, &key)? 
{ + dbg!(expired_by, &key); max_expired_by = max_expired_by.max(Some(expired_by)); continue; } From 685c6dc8c342a0e89f6db9badb248ff39a7eca58 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 20 May 2024 19:10:35 +0800 Subject: [PATCH 03/22] feat: rename default ts to GREPTIME_TIMESTAMP --- src/flow/src/adapter.rs | 7 ++++--- src/flow/src/transform/aggr.rs | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 8e2e7cdecdae..5c70862109a8 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -26,6 +26,7 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_frontend::handler::FrontendInvoker; use common_meta::key::TableMetadataManagerRef; +use common_query::prelude::GREPTIME_TIMESTAMP; use common_runtime::JoinHandle; use common_telemetry::{debug, info}; use datatypes::schema::ColumnSchema; @@ -281,7 +282,7 @@ impl FlownodeManager { let schema = meta.schema.column_schemas; let is_auto_create = schema .last() - .map(|s| s.name == "__ts_placeholder") + .map(|s| s.name == GREPTIME_TIMESTAMP) .unwrap_or(false); (primary_keys, schema, is_auto_create) } else { @@ -318,7 +319,7 @@ impl FlownodeManager { ); // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one let ts_col = ColumnSchema::new( - "__ts_placeholder", + "GREPTIME_TIMESTAMP", ConcreteDataType::timestamp_millisecond_datatype(), true, ) @@ -676,6 +677,6 @@ impl FlowTickManager { pub fn tick(&self) -> repr::Timestamp { let current = Instant::now(); let since_the_epoch = current - self.start; - since_the_epoch.as_millis() as repr::Timestamp + since_the_epoch.as_millis() as repr::Timestamp + self.start_timestamp } } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 25506853432e..67f0ac588adc 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -434,7 +434,7 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; - + /// TODO(discord9): add more illegal sql tests #[tokio::test] async fn test_tumble_compsite() { let engine = create_test_query_engine(); From e460f4df785fed85b2669e8b1f702e8d7baa9d8a Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 20 May 2024 19:34:00 +0800 Subject: [PATCH 04/22] fix: default timestamp name --- src/flow/src/adapter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 5c70862109a8..712cd5cbf200 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -319,7 +319,7 @@ impl FlownodeManager { ); // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one let ts_col = ColumnSchema::new( - "GREPTIME_TIMESTAMP", + GREPTIME_TIMESTAMP, ConcreteDataType::timestamp_millisecond_datatype(), true, ) From 14d7522f73e2e26d9d92b6c80dbdb878e878e527 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 22 May 2024 14:55:33 +0800 Subject: [PATCH 05/22] fix: reorder write requests --- src/flow/src/adapter.rs | 10 ++++- src/flow/src/adapter/flownode_impl.rs | 53 ++++++++++++++++++++++++++- src/flow/src/adapter/node_context.rs | 8 ++-- src/flow/src/adapter/table_source.rs | 32 ++++++++++------ src/flow/src/repr/relation.rs | 8 +++- src/flow/src/transform.rs | 4 +- src/flow/src/transform/plan.rs | 2 +- src/flow/src/utils.rs | 2 - 8 files changed, 93 insertions(+), 26 
deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 712cd5cbf200..340ab6991a44 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -303,6 +303,7 @@ impl FlownodeManager { .clone(); // TODO(discord9): use default key from schema let primary_keys = schema + .typ() .keys .first() .map(|v| { @@ -326,11 +327,18 @@ impl FlownodeManager { .with_time_index(true); let wout_ts = schema + .typ() .column_types + .clone() .into_iter() .enumerate() .map(|(idx, typ)| { - ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) + let name = schema + .names + .get(idx) + .cloned() + .unwrap_or(format!("Col_{}", idx)); + ColumnSchema::new(name, typ.scalar_type, typ.nullable) }) .collect_vec(); diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index e770bb5e4cf1..9b9b6d3626a6 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -14,13 +14,17 @@ //! impl `FlowNode` trait for FlowNodeManager so standalone can call them +use std::collections::HashMap; + use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; +use common_telemetry::debug; use itertools::Itertools; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use crate::adapter::FlownodeManager; use crate::repr::{self, DiffRow}; @@ -101,12 +105,57 @@ impl Flownode for FlownodeManager { async fn handle_inserts(&self, request: InsertRequests) -> Result { for write_request in request.requests { let region_id = write_request.region_id; - let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]); + let table_id = RegionId::from(region_id).table_id(); + + let (insert_schema, rows_proto) = write_request + .rows + .map(|r| (r.schema, r.rows)) + .unwrap_or_default(); + // TODO(discord9): reconsider time assignment mechanism let now = self.tick_manager.tick(); + + let fetch_order = { + let ctx = self.node_context.lock().await; + let table_col_names = ctx + .table_repr + .get_by_table_id(&table_id) + .map(|r| r.1) + .and_then(|id| ctx.schema.get(&id)) + .map(|desc| &desc.names) + .context(UnexpectedSnafu { + err_msg: format!("Table not found: {}", table_id), + })?; + let name_to_col = HashMap::<_, _>::from_iter( + insert_schema + .iter() + .enumerate() + .map(|(i, name)| (&name.column_name, i)), + ); + let fetch_order: Vec = table_col_names + .iter() + .map(|names| { + name_to_col.get(names).copied().context(UnexpectedSnafu { + err_msg: format!("Column not found: {}", names), + }) + }) + .try_collect()?; + if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) { + debug!("Reordering columns: {:?}", fetch_order) + } + fetch_order + }; + let rows: Vec = rows_proto .into_iter() .map(repr::Row::from) + .map(|r| { + let reordered = fetch_order + .iter() + .map(|&i| r.inner[i].clone()) + .collect_vec(); + repr::Row::new(reordered) + }) .map(|r| (r, now, 1)) .collect_vec(); self.handle_write_request(region_id.into(), rows) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index b1d01373fb8a..bc88d2bbe555 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, 
TableSource}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; -use crate::repr::{DiffRow, RelationType, BROADCAST_CAP}; +use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP}; /// A context that holds the information of the dataflow #[derive(Default, Debug)] @@ -54,7 +54,7 @@ pub struct FlownodeContext { /// store source in buffer for each source table, in case broadcast channel is full pub send_buffer: BTreeMap>, /// the schema of the table, query from metasrv or inferred from TypedPlan - pub schema: HashMap, + pub schema: HashMap, /// All the tables that have been registered in the worker pub table_repr: IdToNameMap, pub query_context: Option>, @@ -226,7 +226,7 @@ impl FlownodeContext { /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. /// /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> { + pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> { let id = self .table_repr .get_by_name(name) @@ -297,7 +297,7 @@ impl FlownodeContext { .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); - self.schema.insert(gid, schema); + self.schema.insert(gid, schema.into_named(vec![])); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index cfa41f785ac8..36e9cd7561eb 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -17,7 +17,6 @@ use common_error::ext::BoxedError; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; -use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; @@ -25,7 +24,7 @@ use crate::adapter::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; use crate::adapter::TableName; -use crate::repr::{self, ColumnType, RelationType}; +use crate::repr::{self, ColumnType, RelationDesc, RelationType}; /// mapping of table name <-> table id should be query from tableinfo manager pub struct TableSource { @@ -107,7 +106,7 @@ impl TableSource { pub async fn get_table_name_schema( &self, table_id: &TableId, - ) -> Result<(TableName, RelationType), Error> { + ) -> Result<(TableName, RelationDesc), Error> { let table_info_value = self .get_table_info_value(table_id) .await? 
@@ -123,14 +122,20 @@ impl TableSource { ]; let raw_schema = table_info_value.table_info.meta.schema; - let column_types = raw_schema + let (col_names, column_types): (Vec<_>, Vec<_>) = raw_schema .column_schemas + .clone() .into_iter() - .map(|col| ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, + .map(|col| { + ( + col.name.clone(), + ColumnType { + nullable: col.is_nullable(), + scalar_type: col.data_type, + }, + ) }) - .collect_vec(); + .unzip(); let key = table_info_value.table_info.meta.primary_key_indices; let keys = vec![repr::Key::from(key)]; @@ -138,10 +143,13 @@ impl TableSource { let time_index = raw_schema.timestamp_index; Ok(( table_name, - RelationType { - column_types, - keys, - time_index, + RelationDesc { + typ: RelationType { + column_types, + keys, + time_index, + }, + names: col_names, }, )) } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 59edb31616fa..48f4de2894f0 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -262,6 +262,10 @@ impl RelationType { true } + + pub fn into_named(self, names: Vec) -> RelationDesc { + RelationDesc { typ: self, names } + } } /// The type of a `Value` @@ -325,8 +329,8 @@ fn return_true() -> bool { /// Individual column names are optional. #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] pub struct RelationDesc { - typ: RelationType, - names: Vec, + pub typ: RelationType, + pub names: Vec, } impl RelationDesc { diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 9fe0b73d3642..03927f516962 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -211,7 +211,7 @@ mod test { let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema); + schemas.insert(gid, schema.into_named(vec![])); } { @@ -225,7 +225,7 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), ColumnType::new(CDT::datetime_datatype(), false), ]); - schemas.insert(gid, schema); + schemas.insert(gid, schema.into_named(vec![])); tri_map.insert(Some(name.clone()), Some(1025), gid); } diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 0dedc9e5356b..337eba7eef45 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -269,7 +269,7 @@ impl TypedPlan { id: crate::expr::Id::Global(table.0), }; let get_table = TypedPlan { - typ: table.1, + typ: table.1.typ().clone(), plan: get_table, }; diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 599dc4541aa4..93edf176e77a 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -208,9 +208,7 @@ impl Arrangement { for ((key, val), update_ts, diff) in updates { // check if the key is expired if let Some(s) = &mut self.expire_state { - dbg!(now, &key, &s); if let Some(expired_by) = s.update_event_ts(now, &key)? 
{ - dbg!(expired_by, &key); max_expired_by = max_expired_by.max(Some(expired_by)); continue; } From 06c22133539d3abd6b3d4cdba839cc2999cfaf8d Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 22 May 2024 17:50:26 +0800 Subject: [PATCH 06/22] fix: expire state --- src/flow/src/compute/render/reduce.rs | 20 ++++++++++++- src/flow/src/expr/error.rs | 7 +++++ src/flow/src/expr/scalar.rs | 2 ++ src/flow/src/transform/aggr.rs | 25 +++++++++------- src/flow/src/utils.rs | 41 ++++++++++++++++++++++----- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index e46f8c2bedc3..7a443e999140 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -26,7 +26,7 @@ use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::{Context, SubgraphArg}; use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; -use crate::expr::error::{DataTypeSnafu, InternalSnafu}; +use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; @@ -397,6 +397,24 @@ fn reduce_accum_subgraph( // TODO(discord9): consider key-based lock let mut arrange = arrange.write(); for (key, value_diffs) in key_to_vals { + if let Some(expire_man) = &arrange.get_expire_state() { + let mut is_expired = false; + err_collector.run(|| { + if let Some(expired) = expire_man.get_expire_duration(now, &key)? { + is_expired = true; + DataAlreadyExpiredSnafu { + expired_by: expired, + } + .fail() + } else { + Ok(()) + } + }); + if is_expired { + // errors already collected, we can just continue to next key + continue; + } + } let col_diffs = { let row_len = value_diffs[0].0.len(); let res = err_collector.run(|| get_col_diffs(value_diffs, row_len)); diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 5a2823423974..09ad758056ba 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -100,4 +100,11 @@ pub enum EvalError { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Incoming data already expired by {} ms", expired_by))] + DataAlreadyExpired { + expired_by: i64, + #[snafu(implicit)] + location: Location, + }, } diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index dfd5fcd0f214..984d6f1a44a6 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -45,6 +45,8 @@ impl TypedExpr { impl TypedExpr { /// expand multi-value expression to multiple expressions with new indices + /// + /// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling` pub fn expand_multi_value( input_typ: &RelationType, exprs: &[TypedExpr], diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 67f0ac588adc..1217281b1bce 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -434,9 +434,10 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + /// TODO(discord9): add more illegal sql tests #[tokio::test] - async fn test_tumble_compsite() { + async fn test_tumble_composite() { let engine = create_test_query_engine(); 
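        // This test checks that grouping by both tumble(ts, '1 hour') and `number`
        // produces a composite key: window start as time index, window end and
        // `number` as primary keys (see `.with_key(vec![1, 2])` and
        // `.with_time_index(Some(0))` in the expected plan below).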
let sql = "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; @@ -469,12 +470,6 @@ mod test { els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), }; let expected = TypedPlan { - typ: RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), // number - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ]), // TODO(discord9): mfp indirectly ref to key columns /* .with_key(vec![1]) @@ -536,11 +531,13 @@ mod test { } .with_types( RelationType::new(vec![ + // keys ColumnType::new(CDT::datetime_datatype(), false), // window start(time index) ColumnType::new(CDT::datetime_datatype(), false), // window end(pk) ColumnType::new(CDT::uint32_datatype(), false), // number(pk) - ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) - ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number) + // values + ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) + ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number) ]) .with_key(vec![1, 2]) .with_time_index(Some(0)), @@ -548,8 +545,8 @@ mod test { ), mfp: MapFilterProject::new(5) .map(vec![ - avg_expr, - ScalarExpr::Column(2), + avg_expr, // avg(number) + ScalarExpr::Column(2), // number(pk) ScalarExpr::Column(5), ScalarExpr::Column(0), ScalarExpr::Column(1), @@ -558,6 +555,12 @@ mod test { .project(vec![6, 7, 8, 9]) .unwrap(), }, + typ: RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), // number + ColumnType::new(CDT::uint64_datatype(), true), // avg(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]), }; assert_eq!(flow_plan, expected); } diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 93edf176e77a..ea1885e50650 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound; use std::sync::Arc; +use common_telemetry::debug; use itertools::Itertools; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; @@ -86,7 +87,7 @@ impl KeyExpiryManager { /// /// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired. /// - If it's not expired, return None - pub fn update_event_ts( + pub fn get_expire_duration_and_update_event_ts( &mut self, now: Timestamp, row: &Row, @@ -95,6 +96,26 @@ impl KeyExpiryManager { return Ok(None); }; + self.event_ts_to_key + .entry(event_ts) + .or_default() + .insert(row.clone()); + + self.get_expire_duration(now, row) + } + + /// Get the expire duration of a key, if it's expired by now. + /// + /// Return None if the key is not expired + pub fn get_expire_duration( + &self, + now: Timestamp, + row: &Row, + ) -> Result, EvalError> { + let Some(event_ts) = self.extract_event_ts(row)? 
else { + return Ok(None); + }; + if let Some(expire_time) = self.compute_expiration_timestamp(now) { if expire_time > event_ts { // return how much time it's expired @@ -102,10 +123,6 @@ impl KeyExpiryManager { } } - self.event_ts_to_key - .entry(event_ts) - .or_default() - .insert(row.clone()); Ok(None) } @@ -189,6 +206,10 @@ impl Arrangement { } } + pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> { + self.expire_state.as_ref() + } + pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) { self.expire_state = Some(expire_state); } @@ -208,8 +229,12 @@ impl Arrangement { for ((key, val), update_ts, diff) in updates { // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(expired_by) = s.update_event_ts(now, &key)? { + if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? { max_expired_by = max_expired_by.max(Some(expired_by)); + debug!( + "Expired key: {:?}, expired by: {:?} with time being now={}", + key, expired_by, now + ); continue; } } @@ -335,7 +360,9 @@ impl Arrangement { for (key, updates) in batch { // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(expired_by) = s.update_event_ts(now, &key)? { + if let Some(expired_by) = + s.get_expire_duration_and_update_event_ts(now, &key)? + { max_expired_by = max_expired_by.max(Some(expired_by)); continue; } From 1417aaae21a8a39168384acf71bfb9976cf5e526 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 23 May 2024 15:43:25 +0800 Subject: [PATCH 07/22] fix: test of tumble --- src/flow/src/transform/aggr.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 1217281b1bce..8b69146c153a 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -537,7 +537,7 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), // number(pk) // values ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) - ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number) + ColumnType::new(CDT::int64_datatype(), true), // avg.count(number) ]) .with_key(vec![1, 2]) .with_time_index(Some(0)), @@ -545,11 +545,11 @@ mod test { ), mfp: MapFilterProject::new(5) .map(vec![ - avg_expr, // avg(number) + avg_expr, ScalarExpr::Column(2), // number(pk) - ScalarExpr::Column(5), - ScalarExpr::Column(0), - ScalarExpr::Column(1), + ScalarExpr::Column(5), // avg.sum(number) + ScalarExpr::Column(0), // window start + ScalarExpr::Column(1), // window end ]) .unwrap() .project(vec![6, 7, 8, 9]) From 78545f003ccd9fe8f652177f439b70c6d5994a2e Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 23 May 2024 20:58:40 +0800 Subject: [PATCH 08/22] fix: send buf clear --- src/flow/src/adapter.rs | 11 ++++++----- src/flow/src/adapter/node_context.rs | 10 ++++++++-- src/flow/src/compute/render/src_sink.rs | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 340ab6991a44..e0d0428fc398 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -521,18 +521,19 @@ impl FlownodeManager { worker.lock().await.run_available(now).await.unwrap(); } // first check how many inputs were sent - let send_cnt = match self.node_context.lock().await.flush_all_sender() { - Ok(cnt) => cnt, + match self.node_context.lock().await.flush_all_sender() { + Ok(_) => (), Err(err) => { common_telemetry::error!("Flush send buf errors: {:?}", err); break; } }; - // if no inputs - if 
send_cnt == 0 {
+        // if no thing in send buf then break
+        let buf_len = self.node_context.lock().await.get_send_buf_size();
+        if buf_len == 0 {
             break;
         } else {
-            debug!("FlownodeManager::run_available: send_cnt={}", send_cnt);
+            debug!("Send buf len = {}", buf_len);
         }
     }
 
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index bc88d2bbe555..a5db59698dc0 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -51,8 +51,6 @@ pub struct FlownodeContext {
             mpsc::UnboundedReceiver<DiffRow>,
         ),
     >,
-    /// store source in buffer for each source table, in case broadcast channel is full
-    pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
     /// the schema of the table, query from metasrv or inferred from TypedPlan
     pub schema: HashMap<GlobalId, RelationDesc>,
     /// All the tables that have been registered in the worker
     pub table_repr: IdToNameMap,
     pub query_context: Option<Arc<QueryContext>>,
@@ -109,6 +107,7 @@ impl SourceSender {
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
+            debug!("Send buf len = {}", self.send_buf.len());
         }
 
         Ok(row_cnt)
@@ -140,12 +139,19 @@ impl FlownodeContext {
     }
 
     /// flush all sender's buf
+    ///
+    /// return numbers being sent
     pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
         self.source_sender
             .iter_mut()
             .map(|(_table_id, src_sender)| src_sender.try_send_all())
             .try_fold(0, |acc, x| x.map(|x| x + acc))
     }
+
+    /// Return the sum number of rows in all send buf
+    pub fn get_send_buf_size(&self) -> usize {
+        self.source_sender.values().map(|v| v.send_buf.len()).sum()
+    }
 }
 
 impl FlownodeContext {
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 33ecb9670caa..66159c35c687 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -79,8 +79,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
             }
             err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
             send.give(all);
-            // always schedule source to run at next tick
-            inner_schd.schedule_at(now + 1);
+            // always schedule source to run at now so we can repeatly run source if needed
+            inner_schd.schedule_at(now);
         });
         schd.set_cur_subgraph(sub);
         let arranged = Arranged::new(arrange_handler);

From a019362abff196a50aeeb1b021d4ed24d479252a Mon Sep 17 00:00:00 2001
From: discord9
Date: Thu, 23 May 2024 21:02:07 +0800
Subject: [PATCH 09/22] fix: ts<start_time

---

diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
             } => {
                 let ts = get_ts_as_millisecond(arg)?;
-                let start_time = start_time.map(|t| t.val()).unwrap_or(0);
+                let start_time = start_time.map(|t| t.val());
                 let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
-                let window_start = start_time + (ts - start_time) / window_size * window_size;
+                let window_start = get_window_start(ts, window_size, start_time);
 
                 let ret = Timestamp::new_millisecond(window_start);
                 Ok(Value::from(ret))
@@ -290,9 +290,9 @@ impl UnaryFunc {
                 start_time,
             } => {
                 let ts = get_ts_as_millisecond(arg)?;
-                let start_time = start_time.map(|t| t.val()).unwrap_or(0);
+                let start_time = start_time.map(|t| t.val());
                 let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
-                let window_start = start_time + (ts - start_time) / window_size * window_size;
+                let window_start = get_window_start(ts, window_size, start_time);
 
                 let window_end = window_start + window_size;
                 let ret = Timestamp::new_millisecond(window_end);
@@ -302,6 +302,31 @@ impl UnaryFunc {
     }
 }
 
+fn get_window_start(
+    ts: repr::Timestamp,
+    window_size: repr::Duration,
+    start_time: Option<repr::Timestamp>,
+) -> repr::Timestamp {
+    let start_time = start_time.unwrap_or(0);
+    // left close right open
+    if ts >= start_time {
+        start_time + (ts - start_time) / window_size * window_size
+    } else {
+        start_time + (ts - start_time) / window_size * window_size
+            - (start_time - ts) % window_size * window_size
+    }
+}
+
+#[test]
+fn test_get_window_start() {
+    assert_eq!(get_window_start(1, 2, None), 0);
+    assert_eq!(get_window_start(2, 2, None), 2);
+    assert_eq!(get_window_start(0, 2, None), 0);
+
+    assert_eq!(get_window_start(-1, 2, None), -2);
+    assert_eq!(get_window_start(-2, 2, None), -2);
+}
+
 fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
     let ts = if let Some(ts) = arg.as_timestamp() {
         ts.convert_to(TimeUnit::Millisecond)

From b188bfd0c10fa6860495cc37c86a497c119ac110 Mon Sep 17 00:00:00 2001
From: discord9
Date: Thu, 23 May 2024 21:10:46 +0800
Subject: [PATCH 10/22] fix: window_start when ts<start_time

From 62f23a3861ac1dab3ecd86983b6aed5e760ea021 Mon Sep 17 00:00:00 2001
From: discord9
Date: Thu, 23 May 2024 21:13:18 +0800
Subject: [PATCH 11/22] mend

---
 src/flow/src/compute/render/src_sink.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 66159c35c687..2371a98edd82 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -79,7 +79,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
             }
             err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
             send.give(all);
-            // always schedule source to run at now so we can repeatly run source if needed
+            // always schedule source to run at now so we can repeatedly run source if needed
             inner_schd.schedule_at(now);
         });
         schd.set_cur_subgraph(sub);

From 9e48e7bea29e566007b0de376112a2e2f3a10723 Mon Sep 17 00:00:00 2001
From: discord9
Date: Fri, 24 May 2024 16:01:54 +0800
Subject: [PATCH 12/22] fix: range begin>range end

---
 src/flow/src/adapter.rs                 | 5 +++--
 src/flow/src/compute/render/map.rs      | 8 ++++++--
 src/flow/src/compute/render/reduce.rs   | 8 ++++++--
 src/flow/src/compute/render/src_sink.rs | 9 ++++++---
 src/flow/src/utils.rs                   | 4 ++++
 5 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index e0d0428fc398..9e3bcea199e3 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -513,9 +513,8 @@ impl FlownodeManager {
     /// However this is not blocking and can sometimes return while actual computation is still running in worker thread
     /// TODO(discord9): add flag for subgraph that have input since last run
     pub async fn run_available(&self) -> Result<(), Error> {
-        let now = self.tick_manager.tick();
-
         loop {
+            let now = self.tick_manager.tick();
             for worker in self.worker_handles.iter() {
                 // TODO(discord9): consider how to handle error in individual worker
                 worker.lock().await.run_available(now).await.unwrap();
             }
             // first check how many inputs were sent
@@ -553,6 +552,8 @@ impl FlownodeManager {
         );
         let table_id = region_id.table_id();
         self.node_context.lock().await.send(table_id, rows)?;
+        // TODO(discord9): put it in a background task?
+        self.run_available().await?;
         Ok(())
     }
 }
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index 2261f4de14f7..50bd48f5fb70 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -124,9 +124,13 @@ fn mfp_subgraph(
    // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
    // 2. Output the updates.
    // 3. Truncate all updates within that range.
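    // A minimal sketch of why the bounds change below matters (hypothetical
    // values, not part of the patch): with last_compaction_time = Some(5) and
    // now = 5, the old `map(|n| n + 1)` built the inclusive range 6..=5, and
    // `BTreeMap::range` panics when range start > range end. The new bounds
    // (Excluded(5), Included(5)) describe the same empty interval without
    // panicking, matching the commit title "fix: range begin>range end".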
- let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = arrange.read().last_compaction_time(); let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); + let range = ( + std::ops::Bound::Excluded(from), + std::ops::Bound::Included(now), + ); + let output_kv = arrange.read().get_updates_in_range(range); // the output is expected to be key -> empty val let output = output_kv .into_iter() diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 7a443e999140..8321f101872f 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -301,9 +301,13 @@ fn update_reduce_distinct_arrange( // Deal with output: // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. - let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = arrange.read().last_compaction_time(); let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); + let range = ( + std::ops::Bound::Excluded(from), + std::ops::Bound::Included(now), + ); + let output_kv = arrange.read().get_updates_in_range(range); // 2. Truncate all updates stored in arrangement within that range. let run_compaction = || { diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 2371a98edd82..3d5a6a7bd2cd 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -55,9 +55,12 @@ impl<'referred, 'df> Context<'referred, 'df> { .df .add_subgraph_source("source", send_port, move |_ctx, send| { let now = *now.borrow(); - let arr = arrange_handler_inner.write().get_updates_in_range(..=now); - err_collector.run(|| arrange_handler_inner.write().compact_to(now)); + // write lock to prevent unexpected mutation + let mut arranged = arrange_handler_inner.write(); + let arr = arranged.get_updates_in_range(..=now); + err_collector.run(|| arranged.compact_to(now)); + debug!("Call source"); let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); let mut to_send = Vec::new(); let mut to_arrange = Vec::new(); @@ -77,7 +80,7 @@ impl<'referred, 'df> Context<'referred, 'df> { to_arrange.len() ); } - err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); + err_collector.run(|| arranged.apply_updates(now, to_arrange)); send.give(all); // always schedule source to run at now so we can repeatedly run source if needed inner_schd.schedule_at(now); diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index ea1885e50650..1762d0255282 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -567,6 +567,10 @@ impl ArrangeHandler { pub fn set_full_arrangement(&self, full: bool) { self.write().full_arrangement = full; } + + pub fn is_full_arrangement(&self) -> bool { + self.read().full_arrangement + } } #[cfg(test)] From 094e09bc0097e750741dab2ca9a40c613f8ef19f Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 24 May 2024 16:32:36 +0800 Subject: [PATCH 13/22] refactor: per reviews --- src/flow/src/adapter/flownode_impl.rs | 2 +- src/flow/src/adapter/node_context.rs | 2 +- src/flow/src/adapter/table_source.rs | 4 ++-- src/flow/src/repr/relation.rs | 9 +++++++++ src/flow/src/transform.rs | 4 ++-- src/flow/src/transform/expr.rs | 2 -- src/flow/src/utils.rs | 9 ++++++++- 7 files changed, 23 insertions(+), 9 deletions(-) diff --git 
a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 9b9b6d3626a6..152251975ab8 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -148,8 +148,8 @@ impl Flownode for FlownodeManager { let rows: Vec = rows_proto .into_iter() - .map(repr::Row::from) .map(|r| { + let r = repr::Row::from(r); let reordered = fetch_order .iter() .map(|&i| r.inner[i].clone()) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index a5db59698dc0..1ef5823a477e 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -303,7 +303,7 @@ impl FlownodeContext { .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); - self.schema.insert(gid, schema.into_named(vec![])); + self.schema.insert(gid, schema.into_unnamed()); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 36e9cd7561eb..53932cd692c2 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -122,17 +122,17 @@ impl TableSource { ]; let raw_schema = table_info_value.table_info.meta.schema; - let (col_names, column_types): (Vec<_>, Vec<_>) = raw_schema + let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema .column_schemas .clone() .into_iter() .map(|col| { ( - col.name.clone(), ColumnType { nullable: col.is_nullable(), scalar_type: col.data_type, }, + col.name, ) }) .unzip(); diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 48f4de2894f0..09e0b88344b7 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -263,9 +263,18 @@ impl RelationType { true } + /// Return relation describe with column names pub fn into_named(self, names: Vec) -> RelationDesc { RelationDesc { typ: self, names } } + + /// Return relation describe without column names + pub fn into_unnamed(self) -> RelationDesc { + RelationDesc { + typ: self, + names: vec![], + } + } } /// The type of a `Value` diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 03927f516962..6f93e36e9682 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -211,7 +211,7 @@ mod test { let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema.into_named(vec![])); + schemas.insert(gid, schema.into_unnamed()); } { @@ -225,7 +225,7 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), ColumnType::new(CDT::datetime_datatype(), false), ]); - schemas.insert(gid, schema.into_named(vec![])); + schemas.insert(gid, schema.into_unnamed()); tri_map.insert(Some(name.clone()), Some(1025), gid); } diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 9058b98c0971..dfbc2c9125f9 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -101,7 +101,6 @@ impl TypedExpr { .unzip(); match arg_len { - // because variadic function can also have 1 arguments, we need to check if it's a variadic function first 1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => { let func = UnaryFunc::from_str_and_type(fn_name, None)?; let arg = arg_exprs[0].clone(); @@ -124,7 +123,6 @@ impl TypedExpr { Ok(TypedExpr::new(arg.call_unary(func), ret_type)) } - // because variadic function can also have 2 arguments, we need to check if it's a variadic function first 2 if BinaryFunc::from_str_expr_and_type( fn_name, &arg_exprs, diff --git 
a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 1762d0255282..30d48f0319d4 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -101,7 +101,14 @@ impl KeyExpiryManager { .or_default() .insert(row.clone()); - self.get_expire_duration(now, row) + if let Some(expire_time) = self.compute_expiration_timestamp(now) { + if expire_time > event_ts { + // return how much time it's expired + return Ok(Some(expire_time - event_ts)); + } + } + + Ok(None) } /// Get the expire duration of a key, if it's expired by now. From eeef04a37b55795d50a53ed30dc83c2730e90a6f Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 28 May 2024 11:38:49 +0800 Subject: [PATCH 14/22] feat!: ts placeholder rename to __ts_placeholder --- src/flow/src/adapter.rs | 24 ++++++++++-------------- src/flow/src/compute/render/reduce.rs | 13 +++++++++---- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 9e3bcea199e3..277c64fe22e0 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -26,7 +26,6 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_frontend::handler::FrontendInvoker; use common_meta::key::TableMetadataManagerRef; -use common_query::prelude::GREPTIME_TIMESTAMP; use common_runtime::JoinHandle; use common_telemetry::{debug, info}; use datatypes::schema::ColumnSchema; @@ -67,6 +66,10 @@ use error::Error; pub const PER_REQ_MAX_ROW_CNT: usize = 8192; +pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; + +pub const UPDATE_AT_TS_COL: &str = "update_at"; + // TODO: refactor common types for flow to a separate module /// FlowId is a unique identifier for a flow task pub type FlowId = u64; @@ -282,7 +285,7 @@ impl FlownodeManager { let schema = meta.schema.column_schemas; let is_auto_create = schema .last() - .map(|s| s.name == GREPTIME_TIMESTAMP) + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) .unwrap_or(false); (primary_keys, schema, is_auto_create) } else { @@ -314,13 +317,13 @@ impl FlownodeManager { }) .unwrap_or_default(); let update_at = ColumnSchema::new( - "update_at", + UPDATE_AT_TS_COL, ConcreteDataType::timestamp_millisecond_datatype(), true, ); // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one let ts_col = ColumnSchema::new( - GREPTIME_TIMESTAMP, + AUTO_CREATED_PLACEHOLDER_TS_COL, ConcreteDataType::timestamp_millisecond_datatype(), true, ) @@ -357,16 +360,7 @@ impl FlownodeManager { table_name.join("."), reqs ); - let now = SystemTime::now(); - let now = now - .duration_since(SystemTime::UNIX_EPOCH) - .map(|s| s.as_millis() as repr::Timestamp) - .unwrap_or_else(|_| { - -(SystemTime::UNIX_EPOCH - .duration_since(now) - .unwrap() - .as_millis() as repr::Timestamp) - }); + let now = self.tick_manager.tick(); for req in reqs { match req { DiffRequest::Insert(insert) => { @@ -666,7 +660,9 @@ impl FlownodeManager { /// TSO coord mess #[derive(Clone, Debug)] pub struct FlowTickManager { + /// The starting instant of the flow, used with `start_timestamp` to calculate the current timestamp start: Instant, + /// The timestamp when the flow started start_timestamp: repr::Timestamp, } diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 8321f101872f..fa29a6324215 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -406,10 +406,15 @@ fn reduce_accum_subgraph( err_collector.run(|| { if let Some(expired) = expire_man.get_expire_duration(now, 
&key)? { is_expired = true; - DataAlreadyExpiredSnafu { - expired_by: expired, - } - .fail() + // expired data is ignored in computation, and a simple warning is logged + common_telemetry::warn!( + "Data already expired: {}", + DataAlreadyExpiredSnafu { + expired_by: expired, + } + .build() + ); + Ok(()) } else { Ok(()) } From de4eaf91b05a36449ee66e6a64d3a23075dcd88e Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 28 May 2024 15:21:17 +0800 Subject: [PATCH 15/22] refactor: better condition --- src/flow/src/adapter.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 277c64fe22e0..0c4cffd62df2 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -35,12 +35,12 @@ use itertools::Itertools; use query::{QueryEngine, QueryEngineFactory}; use serde::{Deserialize, Serialize}; use session::context::QueryContext; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; use tokio::sync::{oneshot, watch, Mutex, RwLock}; -use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; +use crate::adapter::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; use crate::adapter::util::column_schemas_to_proto; @@ -283,10 +283,16 @@ impl FlownodeManager { .map(|i| meta.schema.column_schemas[i].name.clone()) .collect_vec(); let schema = meta.schema.column_schemas; - let is_auto_create = schema - .last() - .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) - .unwrap_or(false); + // check if the last column is the auto created timestamp column, hence the table is auto created from + // flow's plan type + let is_auto_create = { + let correct_name = schema + .last() + .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL) + .unwrap_or(false); + let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1); + correct_name && correct_time_index + }; (primary_keys, schema, is_auto_create) } else { // TODO(discord9): condiser remove buggy auto create by schema @@ -351,7 +357,7 @@ impl FlownodeManager { (primary_keys, with_ts, true) }; - + let schema_len = schema.len(); let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; debug!( @@ -373,13 +379,23 @@ impl FlownodeManager { ))]); // ts col, if auto create if is_auto_create { + ensure!( + row.len() == schema_len - 1, + InternalSnafu { + reason: format!( + "Row len mismatch, expect {} got {}", + schema_len - 1, + row.len() + ) + } + ); row.extend([Value::from( common_time::Timestamp::new_millisecond(0), )]); } - row.into() + Ok(row.into()) }) - .collect::>(); + .collect::, Error>>()?; let table_name = table_name.last().unwrap().clone(); let req = RowInsertRequest { table_name, From 7e6a6d0cd881046c181a0bb77409205379b509a1 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 28 May 2024 19:41:43 +0800 Subject: [PATCH 16/22] tests(WIP): func sig choose --- src/flow/src/adapter.rs | 1 + src/flow/src/transform/expr.rs | 89 ++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 0c4cffd62df2..6f7d44aecbef 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -66,6 +66,7 @@ use error::Error; pub const PER_REQ_MAX_ROW_CNT: usize = 8192; +// TODO: replace this with 
`GREPTIME_TIMESTAMP` before v0.9 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; pub const UPDATE_AT_TS_COL: &str = "update_at"; diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index dfbc2c9125f9..8b0f641acfb0 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -328,8 +328,11 @@ impl TypedExpr { #[cfg(test)] mod test { + use std::collections::HashMap; + use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; + use substrait::substrait_proto::proto::FunctionArgument; use super::*; use crate::expr::{GlobalId, MapFilterProject}; @@ -514,4 +517,90 @@ mod test { assert_eq!(flow_plan.unwrap(), expected); } + + #[test] + fn test_func_sig() { + fn col(i: usize) -> substrait_proto::proto::FunctionArgument { + use substrait_proto::proto::expression; + let expr = Expression { + rex_type: Some(expression::RexType::Selection(Box::new( + expression::FieldReference { + reference_type: Some( + expression::field_reference::ReferenceType::DirectReference( + expression::ReferenceSegment { + reference_type: Some( + expression::reference_segment::ReferenceType::StructField( + Box::new(expression::reference_segment::StructField { + field: i as i32, + child: None, + }), + ), + ), + }, + ), + ), + root_type: None, + }, + ))), + }; + substrait_proto::proto::FunctionArgument { + arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( + expr, + )), + } + } + + let f = substrait_proto::proto::expression::ScalarFunction { + function_reference: 0, + arguments: vec![col(0)], + options: vec![], + output_type: None, + ..Default::default() + }; + let input_schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); + let extensions = FunctionExtensions { + anchor_to_name: HashMap::from([(0, "is_null".to_string())]), + }; + let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap(); + + assert_eq!( + res, + TypedExpr { + expr: ScalarExpr::Column(0).call_unary(UnaryFunc::IsNull), + typ: ColumnType { + scalar_type: CDT::boolean_datatype(), + nullable: true, + }, + } + ); + + let f = substrait_proto::proto::expression::ScalarFunction { + function_reference: 0, + arguments: vec![col(0), col(1)], + options: vec![], + output_type: None, + ..Default::default() + }; + let input_schema = RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::uint32_datatype(), false), + ]); + let extensions = FunctionExtensions { + anchor_to_name: HashMap::from([(0, "add".to_string())]), + }; + let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap(); + + assert_eq!( + res, + TypedExpr { + expr: ScalarExpr::Column(0) + .call_binary(ScalarExpr::Column(1), BinaryFunc::AddUInt32,), + typ: ColumnType { + scalar_type: CDT::uint32_datatype(), + nullable: true, + }, + } + ); + // TODO: test tumble function&other var functions + } } From 6f743c6e956e6edcb46db349f7abed78a1ccecaa Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 28 May 2024 20:18:36 +0800 Subject: [PATCH 17/22] tests(WIP): tumble func --- src/flow/src/expr/func.rs | 39 ++++++++++++++++++++++++++++++++++ src/flow/src/transform/expr.rs | 39 +++++++++++++++++++++++----------- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index bfbcda77d23b..dfe276ea543f 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -76,6 +76,13 @@ impl UnmaterializableFunc { } } + pub fn 
From 6f743c6e956e6edcb46db349f7abed78a1ccecaa Mon Sep 17 00:00:00 2001
From: discord9 
Date: Tue, 28 May 2024 20:18:36 +0800
Subject: [PATCH 17/22] tests(WIP): tumble func

---
 src/flow/src/expr/func.rs      | 39 ++++++++++++++++++++++++++++++++++
 src/flow/src/transform/expr.rs | 39 +++++++++++++++++++++++-----------
 2 files changed, 66 insertions(+), 12 deletions(-)

diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
index bfbcda77d23b..dfe276ea543f 100644
--- a/src/flow/src/expr/func.rs
+++ b/src/flow/src/expr/func.rs
@@ -76,6 +76,13 @@ impl UnmaterializableFunc {
         }
     }
 
+    pub fn is_valid_func_name(name: &str) -> bool {
+        match name.to_lowercase().as_str() {
+            "now" | "current_schema" | "tumble" => true,
+            _ => false,
+        }
+    }
+
     /// Create a UnmaterializableFunc from a string of the function name
    pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> {
         match name.to_lowercase().as_str() {
@@ -183,6 +190,13 @@ impl UnaryFunc {
         }
     }
 
+    pub fn is_valid_func_name(name: &str) -> bool {
+        match name.to_lowercase().as_str() {
+            "not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast" => true,
+            _ => false,
+        }
+    }
+
     /// Create a UnaryFunc from a string of the function name and given argument type (optional)
     pub fn from_str_and_type(
         name: &str,
@@ -579,6 +593,27 @@ impl BinaryFunc {
         Ok(ret)
     }
 
+    pub fn is_valid_func_name(name: &str) -> bool {
+        matches!(
+            name.to_lowercase().as_str(),
+            "eq" | "equal"
+                | "not_eq"
+                | "not_equal"
+                | "lt"
+                | "lte"
+                | "gt"
+                | "gte"
+                | "add"
+                | "sub"
+                | "subtract"
+                | "mul"
+                | "multiply"
+                | "div"
+                | "divide"
+                | "mod"
+        )
+    }
+
     /// choose the appropriate specialization based on the input types
     /// return a specialization of the binary function and its actual input and output type (so no null type present)
     ///
@@ -770,6 +805,10 @@ impl VariadicFunc {
         }
     }
 
+    pub fn is_valid_func_name(name: &str) -> bool {
+        matches!(name.to_lowercase().as_str(), "and" | "or")
+    }
+
     /// Create a VariadicFunc from a string of the function name and given argument types (optional)
     pub fn from_str_and_types(
         name: &str,
diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs
index 8b0f641acfb0..d9fd5f40ff4c 100644
--- a/src/flow/src/transform/expr.rs
+++ b/src/flow/src/transform/expr.rs
@@ -101,7 +101,7 @@ impl TypedExpr {
             .unzip();
 
         match arg_len {
-            1 if UnaryFunc::from_str_and_type(fn_name, None).is_ok() => {
+            1 if UnaryFunc::is_valid_func_name(fn_name) => {
                 let func = UnaryFunc::from_str_and_type(fn_name, None)?;
                 let arg = arg_exprs[0].clone();
                 let ret_type = ColumnType::new_nullable(func.signature().output.clone());
@@ -123,13 +123,7 @@ impl TypedExpr {
 
                 Ok(TypedExpr::new(arg.call_unary(func), ret_type))
             }
-            2 if BinaryFunc::from_str_expr_and_type(
-                fn_name,
-                &arg_exprs,
-                arg_types.get(0..2).expect("args have 2 elements"),
-            )
-            .is_ok() =>
-            {
+            2 if BinaryFunc::is_valid_func_name(fn_name) => {
                 let (func, signature) =
                     BinaryFunc::from_str_expr_and_type(fn_name, &arg_exprs, &arg_types[0..2])?;
@@ -171,7 +165,8 @@ impl TypedExpr {
                 Ok(TypedExpr::new(ret_expr, ret_type))
             }
             _var => {
-                if let Ok(func) = VariadicFunc::from_str_and_types(fn_name, &arg_types) {
+                if VariadicFunc::is_valid_func_name(fn_name) {
+                    let func = VariadicFunc::from_str_and_types(fn_name, &arg_types)?;
                     let ret_type = ColumnType::new_nullable(func.signature().output.clone());
                     let mut expr = ScalarExpr::CallVariadic {
                         func,
@@ -179,9 +174,8 @@ impl TypedExpr {
                     };
                     expr.optimize();
                     Ok(TypedExpr::new(expr, ret_type))
-                } else if let Ok(func) =
-                    UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs)
-                {
+                } else if UnmaterializableFunc::is_valid_func_name(fn_name) {
+                    let func = UnmaterializableFunc::from_str_args(fn_name, arg_typed_exprs)?;
                     let ret_type = ColumnType::new_nullable(func.signature().output.clone());
                     Ok(TypedExpr::new(
                         ScalarExpr::CallUnmaterializable(func),
@@ -520,6 +514,9 @@ mod test {
 
     #[test]
     fn test_func_sig() {
+        fn lit(v: Value) -> substrait_proto::proto::FunctionArgument {
+            todo!()
+        }
         fn col(i: usize) -> substrait_proto::proto::FunctionArgument {
             use substrait_proto::proto::expression;
@@ -602,5 +599,23 @@ mod test {
             }
         );
         // TODO: test tumble function & other variadic functions
+
+        let f = substrait_proto::proto::expression::ScalarFunction {
+            function_reference: 0,
+            arguments: vec![col(0), col(1)],
+            options: vec![],
+            output_type: None,
+            ..Default::default()
+        };
+        let input_schema = RelationType::new(vec![
+            ColumnType::new(CDT::time_nanosecond_datatype(), false),
+            ColumnType::new(CDT::string_datatype(), false),
+        ]);
+        let extensions = FunctionExtensions {
+            anchor_to_name: HashMap::from([(0, "tumble".to_string())]),
+        };
+        let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
+
+        dbg!(&res);
     }
 }
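Note on [PATCH 17/22]: guarding each arity branch with `is_valid_func_name` instead of probing the constructor with `.is_ok()` means a recognized name that fails to parse now surfaces its real error through `?`, rather than silently falling through to the wrong branch. A toy model of that dispatch order, with a made-up name table (not the flow crate's actual lists):

    fn parse_unary(name: &str) -> Result<&'static str, String> {
        match name {
            "not" => Ok("Not"),
            "is_null" => Ok("IsNull"),
            other => Err(format!("{other} is not a unary function")),
        }
    }

    fn dispatch(name: &str, arg_len: usize) -> Result<String, String> {
        const UNARY: &[&str] = &["not", "is_null"];
        match arg_len {
            // route by name first, then let `?` report any parse failure loudly
            1 if UNARY.contains(&name) => Ok(parse_unary(name)?.to_string()),
            _ => Err(format!("no known function for {name} with {arg_len} args")),
        }
    }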
From be57e46f6d0faf7198b528afffee54449a8b9f9b Mon Sep 17 00:00:00 2001
From: discord9 
Date: Wed, 29 May 2024 16:20:45 +0800
Subject: [PATCH 18/22] feat: make run_available optional blocking

---
 src/flow/src/adapter.rs              | 32 ++++++++++++++++++++++------
 src/flow/src/adapter/node_context.rs |  2 +-
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 6f7d44aecbef..52209a172f63 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -510,9 +510,12 @@ impl FlownodeManager {
         debug!("Starting to run");
         loop {
             // TODO(discord9): only run when new inputs arrive or scheduled to
-            self.run_available().await.unwrap();
+            debug!("call run_available in run every second");
+            self.run_available(true).await.unwrap();
+            debug!("call send_writeback_requests in run every second");
             // TODO(discord9): error handling
             self.send_writeback_requests().await.unwrap();
+            debug!("call log_all_errors in run every second");
             self.log_all_errors().await;
             tokio::time::sleep(std::time::Duration::from_secs(1)).await;
         }
@@ -521,17 +524,33 @@ impl FlownodeManager {
     /// Run all available subgraphs in the flow node
     /// This will try to run all dataflows in this node
     ///
-    /// However this is not blocking and can sometimes return while actual computation is still running in worker thread
+    /// set `blocking` to true to wait until the lock is acquired,
+    /// and false to return immediately if the lock is not acquired
     /// TODO(discord9): add flag for subgraph that have input since last run
-    pub async fn run_available(&self) -> Result<(), Error> {
+    pub async fn run_available(&self, blocking: bool) -> Result<(), Error> {
         loop {
             let now = self.tick_manager.tick();
             for worker in self.worker_handles.iter() {
                 // TODO(discord9): consider how to handle error in individual worker
-                worker.lock().await.run_available(now).await.unwrap();
+                if blocking {
+                    worker.lock().await.run_available(now).await?;
+                } else if let Ok(worker) = worker.try_lock() {
+                    worker.run_available(now).await?;
+                } else {
+                    return Ok(());
+                }
             }
             // first check how many inputs were sent
-            match self.node_context.lock().await.flush_all_sender() {
+            let (flush_res, buf_len) = if blocking {
+                let mut ctx = self.node_context.lock().await;
+                (ctx.flush_all_sender(), ctx.get_send_buf_size())
+            } else {
+                match self.node_context.try_lock() {
+                    Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
+                    Err(_) => return Ok(()),
+                }
+            };
+            match flush_res {
                 Ok(_) => (),
                 Err(err) => {
                     common_telemetry::error!("Flush send buf errors: {:?}", err);
                 }
             };
             // if nothing is in the send buf then break
-            let buf_len = self.node_context.lock().await.get_send_buf_size();
             if buf_len == 0 {
                 break;
             } else {
@@ -564,7 +582,7 @@ impl FlownodeManager {
         let table_id = region_id.table_id();
         self.node_context.lock().await.send(table_id, rows)?;
         // TODO(discord9): put it in a background task?
-        self.run_available().await?;
+        // self.run_available(false).await?;
         Ok(())
     }
 }
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index 1ef5823a477e..f544e80fb255 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -107,7 +107,7 @@ impl SourceSender {
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
-            debug!("Send buf len = {}", self.send_buf.len());
+            debug!("Remaining send_buf.len() = {}", self.send_buf.len());
         }
 
         Ok(row_cnt)
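Note on [PATCH 18/22]: the `blocking` flag decides between `lock().await` (the once-per-second background loop, which may wait) and `try_lock()` (the write path, which must never stall an insert). A minimal sketch of the same pattern over a plain counter — `run_once` and the `u64` state are illustrative, not the adapter's types:

    use std::sync::Arc;
    use tokio::sync::Mutex;

    async fn run_once(state: Arc<Mutex<u64>>, blocking: bool) -> Option<u64> {
        if blocking {
            // background loop: wait until the lock is free
            let mut guard = state.lock().await;
            *guard += 1;
            Some(*guard)
        } else {
            // insert path: skip this round entirely if the lock is contended
            match state.try_lock() {
                Ok(mut guard) => {
                    *guard += 1;
                    Some(*guard)
                }
                Err(_) => None,
            }
        }
    }

Returning early on a contended `try_lock` is what makes the non-blocking variant safe to call from the write path commented out above.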
From 85f0d7633642812f99289e97acb6d6ad9e00b666 Mon Sep 17 00:00:00 2001
From: discord9 
Date: Wed, 29 May 2024 16:40:23 +0800
Subject: [PATCH 19/22] tests: tumble transform

---
 src/flow/src/transform/expr.rs | 66 ++++++++++++++++++++++++++++++----
 1 file changed, 59 insertions(+), 7 deletions(-)

diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs
index d9fd5f40ff4c..74fc7ef61753 100644
--- a/src/flow/src/transform/expr.rs
+++ b/src/flow/src/transform/expr.rs
@@ -324,9 +324,10 @@ impl TypedExpr {
 mod test {
     use std::collections::HashMap;
 
+    use common_time::{DateTime, Interval};
     use datatypes::prelude::ConcreteDataType;
     use datatypes::value::Value;
-    use substrait::substrait_proto::proto::FunctionArgument;
+    use pretty_assertions::assert_eq;
 
     use super::*;
     use crate::expr::{GlobalId, MapFilterProject};
@@ -514,8 +515,20 @@ mod test {
 
     #[test]
     fn test_func_sig() {
-        fn lit(v: Value) -> substrait_proto::proto::FunctionArgument {
-            todo!()
+        fn lit(v: impl ToString) -> substrait_proto::proto::FunctionArgument {
+            use substrait_proto::proto::expression;
+            let expr = Expression {
+                rex_type: Some(expression::RexType::Literal(expression::Literal {
+                    nullable: false,
+                    type_variation_reference: 0,
+                    literal_type: Some(expression::literal::LiteralType::String(v.to_string())),
+                })),
+            };
+            substrait_proto::proto::FunctionArgument {
+                arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value(
+                    expr,
+                )),
+            }
         }
         fn col(i: usize) -> substrait_proto::proto::FunctionArgument {
             use substrait_proto::proto::expression;
@@ -598,17 +611,16 @@ mod test {
             },
         }
     );
-        // TODO: test tumble function & other variadic functions
 
         let f = substrait_proto::proto::expression::ScalarFunction {
             function_reference: 0,
-            arguments: vec![col(0), col(1)],
+            arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
             options: vec![],
             output_type: None,
             ..Default::default()
         };
         let input_schema = RelationType::new(vec![
-            ColumnType::new(CDT::time_nanosecond_datatype(), false),
+            ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
             ColumnType::new(CDT::string_datatype(), false),
         ]);
         let extensions = FunctionExtensions {
             anchor_to_name: HashMap::from([(0, "tumble".to_string())]),
         };
         let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
 
-        dbg!(&res);
+        assert_eq!(
+            res,
+            ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
+                ts: Box::new(
+                    ScalarExpr::Column(0)
+                        .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
+                ),
+                window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
+                start_time: Some(DateTime::new(1625097600000))
+            })
+            .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
+        );
+
+        let f = substrait_proto::proto::expression::ScalarFunction {
+            function_reference: 0,
+            arguments: vec![col(0), lit("1 second")],
+            options: vec![],
+            output_type: None,
+            ..Default::default()
+        };
+        let input_schema = RelationType::new(vec![
+            ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
+            ColumnType::new(CDT::string_datatype(), false),
+        ]);
+        let extensions = FunctionExtensions {
+            anchor_to_name: HashMap::from([(0, "tumble".to_string())]),
+        };
+        let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions).unwrap();
+
+        assert_eq!(
+            res,
+            ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
+                ts: Box::new(
+                    ScalarExpr::Column(0)
+                        .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
+                ),
+                window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
+                start_time: None
+            })
+            .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
+        )
     }
 }
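Note on [PATCH 19/22]: the expected values in these asserts are plain unit conversions — a '1 second' window is 1_000_000_000 ns in the nanosecond slot of a month-day-nano interval, and 2021-07-01 00:00:00 UTC is 1_625_097_600_000 in epoch milliseconds. A standalone check of that arithmetic (plain Rust, no flow types involved):

    fn main() {
        // '1 second' in the nanosecond slot of a month-day-nano interval
        assert_eq!(1_i64 * 1_000_000_000, 1_000_000_000);

        // 2021-07-01 00:00:00 UTC in epoch milliseconds:
        // 51 years of days (13 leap days in 1972..=2020), plus 181 days for Jan..Jun 2021
        let days: i64 = 51 * 365 + 13 + 181;
        assert_eq!(days * 86_400 * 1_000, 1_625_097_600_000);
    }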
From 3b5439c36e444d4c57d0f280099f12df831d221b Mon Sep 17 00:00:00 2001
From: discord9 
Date: Wed, 29 May 2024 17:07:35 +0800
Subject: [PATCH 20/22] chore: clippy

---
 src/flow/src/expr/func.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
index dfe276ea543f..31131a2758eb 100644
--- a/src/flow/src/expr/func.rs
+++ b/src/flow/src/expr/func.rs
@@ -77,10 +77,10 @@ impl UnmaterializableFunc {
     }
 
     pub fn is_valid_func_name(name: &str) -> bool {
-        match name.to_lowercase().as_str() {
-            "now" | "current_schema" | "tumble" => true,
-            _ => false,
-        }
+        matches!(
+            name.to_lowercase().as_str(),
+            "now" | "current_schema" | "tumble"
+        )
     }
 
     /// Create a UnmaterializableFunc from a string of the function name
@@ -191,10 +191,10 @@ impl UnaryFunc {
     }
 
     pub fn is_valid_func_name(name: &str) -> bool {
-        match name.to_lowercase().as_str() {
-            "not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast" => true,
-            _ => false,
-        }
+        matches!(
+            name.to_lowercase().as_str(),
+            "not" | "is_null" | "is_true" | "is_false" | "step_timestamp" | "cast"
+        )
     }
 
     /// Create a UnaryFunc from a string of the function name and given argument type (optional)
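Note on [PATCH 20/22]: `matches!` is an exact, shorter equivalent of the replaced `match ... => true, _ => false` form; this is clippy's `match_like_matches_macro` lint. The two predicates below behave identically (toy example, not the crate's code):

    fn is_valid_long(name: &str) -> bool {
        match name {
            "and" | "or" => true,
            _ => false,
        }
    }

    fn is_valid_short(name: &str) -> bool {
        matches!(name, "and" | "or")
    }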
From 41d0a12589849396a96e616e3d4ab7a8e09246d9 Mon Sep 17 00:00:00 2001
From: discord9 
Date: Wed, 29 May 2024 19:02:15 +0800
Subject: [PATCH 21/22] fix?: lagged missing data

---
 src/flow/src/adapter/node_context.rs    |  3 ++-
 src/flow/src/compute/render/src_sink.rs | 33 ++++++++++++++++++++-----
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index f544e80fb255..ffaa3cc70252 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -71,7 +71,8 @@ pub struct SourceSender {
 impl Default for SourceSender {
     fn default() -> Self {
         Self {
-            sender: broadcast::Sender::new(BROADCAST_CAP),
+            // TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
+            sender: broadcast::Sender::new(BROADCAST_CAP * 2),
             send_buf: Default::default(),
         }
     }
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 3d5a6a7bd2cd..9fed1c415305 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -20,12 +20,14 @@ use common_telemetry::{debug, info};
 use hydroflow::scheduled::graph_ext::GraphExt;
 use itertools::Itertools;
 use snafu::OptionExt;
+use tokio::sync::broadcast::error::TryRecvError;
 use tokio::sync::{broadcast, mpsc};
 
 use crate::adapter::error::{Error, PlanSnafu};
 use crate::compute::render::Context;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
-use crate::expr::GlobalId;
+use crate::expr::error::InternalSnafu;
+use crate::expr::{EvalError, GlobalId};
 use crate::repr::{DiffRow, Row, BROADCAST_CAP};
 
 #[allow(clippy::mutable_key_type)]
@@ -65,11 +67,30 @@ impl<'referred, 'df> Context<'referred, 'df> {
                 let mut to_send = Vec::new();
                 let mut to_arrange = Vec::new();
                 // TODO(discord9): handle tokio broadcast errors
-                while let Ok((r, t, d)) = src_recv.try_recv() {
-                    if t <= now {
-                        to_send.push((r, t, d));
-                    } else {
-                        to_arrange.push(((r, Row::empty()), t, d));
+                loop {
+                    match src_recv.try_recv() {
+                        Ok((r, t, d)) => {
+                            if t <= now {
+                                to_send.push((r, t, d));
+                            } else {
+                                to_arrange.push(((r, Row::empty()), t, d));
+                            }
+                        }
+                        Err(TryRecvError::Lagged(lag_offset)) => {
+                            common_telemetry::error!("Flow source lagged {} rows behind", lag_offset);
+                            break;
+                        }
+                        Err(err) => {
+                            err_collector.run(|| -> Result<(), EvalError> {
+                                InternalSnafu {
+                                    reason: format!(
+                                        "Error receiving from broadcast channel: {}",
+                                        err
+                                    ),
+                                }
+                                .fail()
+                            });
+                        }
                     }
                 }
                 let all = prev_avail.chain(to_send).collect_vec();

From e183323256169c127217dc26903e141529d80f71 Mon Sep 17 00:00:00 2001
From: discord9 
Date: Wed, 29 May 2024 20:01:58 +0800
Subject: [PATCH 22/22] fix: flow source break on empty channel

---
 src/flow/src/compute/render/src_sink.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 9fed1c415305..96411b6d04b0 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -76,6 +76,9 @@ impl<'referred, 'df> Context<'referred, 'df> {
                                 to_arrange.push(((r, Row::empty()), t, d));
                             }
                         }
+                        Err(TryRecvError::Empty) => {
+                            break;
+                        }
                         Err(TryRecvError::Lagged(lag_offset)) => {
                             common_telemetry::error!("Flow source lagged {} rows behind", lag_offset);
                             break;
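The hunk above is cut off in this copy, but taken together [PATCH 21/22] and [PATCH 22/22] leave the source's receive loop with one arm per `try_recv` outcome: drain `Ok` values, stop quietly on `Empty` (before this patch, `Empty` fell into the generic `Err` arm, which never breaks), log and stop on `Lagged`, and report anything else to the error collector. A condensed, self-contained sketch of that final shape (simplified `u64` payload, not the actual src_sink types):

    use tokio::sync::broadcast::error::TryRecvError;
    use tokio::sync::broadcast::Receiver;

    // Drain whatever is currently buffered without ever blocking the operator.
    fn drain(rx: &mut Receiver<u64>) -> Vec<u64> {
        let mut got = Vec::new();
        loop {
            match rx.try_recv() {
                Ok(v) => got.push(v),
                // nothing buffered right now; more may arrive before the next tick
                Err(TryRecvError::Empty) => break,
                // this receiver fell behind and the channel dropped old values
                Err(TryRecvError::Lagged(n)) => {
                    eprintln!("flow source lagged {n} rows behind");
                    break;
                }
                // all senders are gone; nothing more will ever arrive
                Err(TryRecvError::Closed) => break,
            }
        }
        got
    }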