From df5809983773043ddede49d7cec3df7b211d3fc1 Mon Sep 17 00:00:00 2001
From: discord9
Date: Wed, 18 Dec 2024 19:31:02 +0800
Subject: [PATCH 1/5] fix: typed builder
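
Batch::try_from_rows inferred every column's datatype from the first row's
values, so a NULL in the first row (or an all-NULL column) could produce a
wrongly typed batch. Thread explicit datatypes through instead: the flownode
derives them from the insert's proto schema (from_proto_to_column_schema),
rendering takes them from the plan's output RelationType, and batches are
built with the new Batch::try_from_rows_with_types. Illustrative call shape
(the types here are an example only, not taken from this diff):

    let batch = Batch::try_from_rows_with_types(
        vec![Row::new(vec![Value::Null]), Row::new(vec![1i64.into()])],
        &[ConcreteDataType::int64_datatype()],
    )?;

Also rate-limit the run loop's trace output to once every run_per_trace
iterations, and extend the flow_basic sqlness cases with NULL-heavy inputs.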
handle write request"); + to_meta_err(err) + })?; } Ok(Default::default()) } diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 26e1a6483ab8..990fdd129797 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::trace; +use datatypes::prelude::ConcreteDataType; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; @@ -131,7 +132,11 @@ impl SourceSender { } /// return number of rows it actual send(including what's in the buffer) - pub async fn send_rows(&self, rows: Vec) -> Result { + pub async fn send_rows( + &self, + rows: Vec, + batch_datatypes: &[ConcreteDataType], + ) -> Result { METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _); while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 { tokio::task::yield_now().await; @@ -139,8 +144,11 @@ impl SourceSender { // row count metrics is approx so relaxed order is ok self.send_buf_row_cnt .fetch_add(rows.len(), Ordering::SeqCst); - let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect()) - .context(EvalSnafu)?; + let batch = Batch::try_from_rows_with_types( + rows.into_iter().map(|(row, _, _)| row).collect(), + batch_datatypes, + ) + .context(EvalSnafu)?; common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count()); self.send_buf_tx.send(batch).await.map_err(|e| { crate::error::InternalSnafu { @@ -157,14 +165,19 @@ impl FlownodeContext { /// return number of rows it actual send(including what's in the buffer) /// /// TODO(discord9): make this concurrent - pub async fn send(&self, table_id: TableId, rows: Vec) -> Result { + pub async fn send( + &self, + table_id: TableId, + rows: Vec, + batch_datatypes: &[ConcreteDataType], + ) -> Result { let sender = self .source_sender .get(&table_id) .with_context(|| TableNotFoundSnafu { name: table_id.to_string(), })?; - sender.send_rows(rows).await + sender.send_rows(rows, batch_datatypes).await } /// flush all sender's buf diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index a1d2895ba3be..a9da8ef2c35a 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -16,12 +16,27 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::column_def::options_from_column_schema; use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType}; use common_error::ext::BoxedError; +use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use itertools::Itertools; use snafu::ResultExt; use crate::error::{Error, ExternalSnafu}; +pub fn from_proto_to_column_schema( + column_schema: &api::v1::ColumnSchema, +) -> Result { + let wrapper = ColumnDataTypeWrapper::try_new( + column_schema.datatype, + column_schema.datatype_extension.clone(), + ) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let cdt = ConcreteDataType::from(wrapper); + + Ok(cdt) +} + /// convert `ColumnSchema` lists to it's corresponding proto type pub fn column_schemas_to_proto( column_schemas: Vec, diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index 94f00a182921..46ac7e8a1d5e 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -30,7 +30,7 @@ use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff}; use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu}; use crate::expr::{self, Batch, 
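+        // build the batch with caller-provided datatypes instead of guessing them from the first row's values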
+        let batch = Batch::try_from_rows_with_types(
+            rows.into_iter().map(|(row, _, _)| row).collect(),
+            batch_datatypes,
+        )
+        .context(EvalSnafu)?;
         common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());
         self.send_buf_tx.send(batch).await.map_err(|e| {
             crate::error::InternalSnafu {
@@ -157,14 +165,19 @@ impl FlownodeContext {
     /// return the number of rows it actually sends (including what's in the buffer)
     ///
     /// TODO(discord9): make this concurrent
-    pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
+    pub async fn send(
+        &self,
+        table_id: TableId,
+        rows: Vec<DiffRow>,
+        batch_datatypes: &[ConcreteDataType],
+    ) -> Result<usize, Error> {
         let sender = self
             .source_sender
             .get(&table_id)
             .with_context(|| TableNotFoundSnafu {
                 name: table_id.to_string(),
             })?;
-        sender.send_rows(rows).await
+        sender.send_rows(rows, batch_datatypes).await
     }
 
     /// flush all sender's buf
diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs
index a1d2895ba3be..a9da8ef2c35a 100644
--- a/src/flow/src/adapter/util.rs
+++ b/src/flow/src/adapter/util.rs
@@ -16,12 +16,27 @@ use api::helper::ColumnDataTypeWrapper;
 use api::v1::column_def::options_from_column_schema;
 use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
 use common_error::ext::BoxedError;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use itertools::Itertools;
 use snafu::ResultExt;
 
 use crate::error::{Error, ExternalSnafu};
 
+pub fn from_proto_to_column_schema(
+    column_schema: &api::v1::ColumnSchema,
+) -> Result<ConcreteDataType, Error> {
+    let wrapper = ColumnDataTypeWrapper::try_new(
+        column_schema.datatype,
+        column_schema.datatype_extension.clone(),
+    )
+    .map_err(BoxedError::new)
+    .context(ExternalSnafu)?;
+    let cdt = ConcreteDataType::from(wrapper);
+
+    Ok(cdt)
+}
+
 /// convert `ColumnSchema` lists to its corresponding proto type
 pub fn column_schemas_to_proto(
     column_schemas: Vec<ColumnSchema>,
diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs
index 94f00a182921..46ac7e8a1d5e 100644
--- a/src/flow/src/compute/render.rs
+++ b/src/flow/src/compute/render.rs
@@ -30,7 +30,7 @@ use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
 use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
 use crate::expr::{self, Batch, GlobalId, LocalId};
 use crate::plan::{Plan, TypedPlan};
-use crate::repr::{self, DiffRow};
+use crate::repr::{self, DiffRow, RelationType};
 
 mod map;
 mod reduce;
@@ -124,10 +124,10 @@ impl Context<'_, '_> {
     /// Like `render_plan` but in Batch Mode
     pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
         match plan.plan {
-            Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
+            Plan::Constant { rows } => Ok(self.render_constant_batch(rows, &plan.schema.typ)),
             Plan::Get { id } => self.get_batch_by_id(id),
             Plan::Let { id, value, body } => self.eval_batch_let(id, value, body),
-            Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
+            Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp, &plan.schema.typ),
             Plan::Reduce {
                 input,
                 key_val_plan,
@@ -172,7 +172,11 @@ impl Context<'_, '_> {
     /// render Constant, take all rows that have a timestamp not greater than the current time
     /// This function is primarily used for testing
     /// Always assume input is sorted by timestamp
-    pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
+    pub fn render_constant_batch(
+        &mut self,
+        rows: Vec<DiffRow>,
+        output_type: &RelationType,
+    ) -> CollectionBundle<Batch> {
         let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
         let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
         for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
@@ -185,6 +189,8 @@ impl Context<'_, '_> {
         let scheduler_inner = scheduler.clone();
         let err_collector = self.err_collector.clone();
 
+        let output_type = output_type.clone();
+
         let subgraph_id =
             self.df
                 .add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
@@ -199,7 +205,14 @@ impl Context<'_, '_> {
                     not_great_than_now.into_iter().for_each(|(_ts, rows)| {
                         err_collector.run(|| {
                             let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
-                            let batch = Batch::try_from_rows(rows)?;
+                            let batch = Batch::try_from_rows_with_types(
+                                rows,
+                                &output_type
+                                    .column_types
+                                    .iter()
+                                    .map(|ty| ty.scalar_type().clone())
+                                    .collect_vec(),
+                            )?;
                             send_port.give(vec![batch]);
                             Ok(())
                         });
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index 416652328401..059e93fe3956 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -25,7 +25,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector
 use crate::error::{Error, PlanSnafu};
 use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
 use crate::plan::TypedPlan;
-use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
+use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
 use crate::utils::ArrangeHandler;
 
 impl Context<'_, '_> {
@@ -34,6 +34,7 @@ impl Context<'_, '_> {
         &mut self,
         input: Box<TypedPlan>,
         mfp: MapFilterProject,
+        _output_type: &RelationType,
     ) -> Result<CollectionBundle<Batch>, Error> {
         let input = self.render_plan_batch(*input)?;
diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs
index ee20f8ca783f..79b57c4b25c1 100644
--- a/src/flow/src/compute/render/reduce.rs
+++ b/src/flow/src/compute/render/reduce.rs
@@ -47,6 +47,7 @@ impl Context<'_, '_> {
         reduce_plan: &ReducePlan,
         output_type: &RelationType,
     ) -> Result<CollectionBundle<Batch>, Error> {
+        common_telemetry::debug!("render reduce batch");
         let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
             if !accum_plan.distinct_aggrs.is_empty() {
                 NotImplementedSnafu {
@@ -87,6 +88,8 @@ impl Context<'_, '_> {
             })?;
 
         let key_val_plan = key_val_plan.clone();
+        let output_type = output_type.clone();
+
         let now = self.compute_state.current_time_ref();
 
         let err_collector = self.err_collector.clone();
@@ -118,6 +121,7 @@ impl Context<'_, '_> {
                         src_data,
                         &key_val_plan,
                         &accum_plan,
+                        &output_type,
                         SubgraphArg {
                             now,
                             err_collector: &err_collector,
@@ -354,6 +358,7 @@ fn reduce_batch_subgraph(
     src_data: impl IntoIterator<Item = Batch>,
     key_val_plan: &KeyValPlan,
     accum_plan: &AccumulablePlan,
+    output_type: &RelationType,
     SubgraphArg {
         now,
         err_collector,
@@ -535,17 +540,13 @@ fn reduce_batch_subgraph(
         // this output part is not supposed to be resource intensive
         // (because for every batch there wouldn't usually be as many output rows?),
         // so we can do some costly operation here
-        let output_types = all_output_dict.first_entry().map(|entry| {
-            entry
-                .key()
-                .iter()
-                .chain(entry.get().iter())
-                .map(|v| v.data_type())
-                .collect::<Vec<_>>()
-        });
+        let output_types = output_type
+            .column_types
+            .iter()
+            .map(|t| t.scalar_type.clone())
+            .collect_vec();
 
-        if let Some(output_types) = output_types {
-            err_collector.run(|| {
+        err_collector.run(|| {
             let column_cnt = output_types.len();
             let row_cnt = all_output_dict.len();
 
@@ -585,7 +586,6 @@ fn reduce_batch_subgraph(
 
             Ok(())
         });
-    }
 }
 
 /// reduce subgraph, reduce the input data into a single row
@@ -1516,7 +1516,9 @@ mod test {
         let mut ctx = harness_test_ctx(&mut df, &mut state);
 
         let rows = vec![
-            (Row::new(vec![1i64.into()]), 1, 1),
+            (Row::new(vec![Value::Null]), -1, 1),
+            (Row::new(vec![1i64.into()]), 0, 1),
+            (Row::new(vec![Value::Null]), 1, 1),
             (Row::new(vec![2i64.into()]), 2, 1),
             (Row::new(vec![3i64.into()]), 3, 1),
             (Row::new(vec![1i64.into()]), 4, 1),
@@ -1558,13 +1560,15 @@ mod test {
             Box::new(input_plan.with_types(typ.into_unnamed())),
             &key_val_plan,
             &reduce_plan,
-            &RelationType::empty(),
+            &RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
         )
         .unwrap();
 
         {
             let now_inner = now.clone();
             let expected = BTreeMap::<i64, Vec<i64>>::from([
+                (-1, vec![]),
+                (0, vec![1i64]),
                 (1, vec![1i64]),
                 (2, vec![3i64]),
                 (3, vec![6i64]),
diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs
index 2e6019ba4ca7..c5e37956de36 100644
--- a/src/flow/src/expr.rs
+++ b/src/flow/src/expr.rs
@@ -24,7 +24,7 @@ mod scalar;
 mod signature;
 
 use arrow::compute::FilterBuilder;
-use datatypes::prelude::DataType;
+use datatypes::prelude::{ConcreteDataType, DataType};
 use datatypes::value::Value;
 use datatypes::vectors::{BooleanVector, Helper, VectorRef};
 pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
@@ -85,6 +85,48 @@ impl Default for Batch {
 }
 
 impl Batch {
+    /// Get batch from rows, using the given datatypes for the columns
+    pub fn try_from_rows_with_types(
+        rows: Vec<Row>,
+        batch_datatypes: &[ConcreteDataType],
+    ) -> Result<Self, EvalError> {
+        if rows.is_empty() {
+            return Ok(Self::empty());
+        }
+        let len = rows.len();
+        let mut builder = batch_datatypes
+            .iter()
+            .map(|ty| ty.create_mutable_vector(len))
+            .collect_vec();
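+        // push each row's values into the per-column builders; every row must match the schema width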
+        for row in rows {
+            ensure!(
+                row.len() == builder.len(),
+                InvalidArgumentSnafu {
+                    reason: format!(
+                        "row length does not match, expect {}, found {}",
+                        builder.len(),
+                        row.len()
+                    )
+                }
+            );
+            for (idx, value) in row.iter().enumerate() {
+                builder[idx]
+                    .try_push_value_ref(value.as_value_ref())
+                    .context(DataTypeSnafu {
+                        msg: "Failed to convert rows to columns",
+                    })?;
+            }
+        }
+
+        let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
+        let batch = Self::try_new(columns, len)?;
+        Ok(batch)
+    }
+
+    /// Get batch from rows, will try best to determine data type
+    ///
+    /// for test purposes only
+    #[cfg(test)]
     pub fn try_from_rows(rows: Vec<Row>) -> Result<Self, EvalError> {
         if rows.is_empty() {
             return Ok(Self::empty());
         }
         let len = rows.len();
         let mut builder = rows
             .first()
             .unwrap()
             .iter()
-            .map(|v| v.data_type().create_mutable_vector(len))
+            .enumerate()
+            .map(|(i, v)| {
+                let mut ty = None;
+                if v.data_type().is_null() {
+                    for row in rows.iter() {
+                        if let Some(t) = row.get(i)
+                            && !t.data_type().is_null()
+                        {
+                            ty = Some(t.data_type().clone());
+                            break;
+                        }
+                    }
+                }
+                // if all rows are null, use null type
+                let ty = ty.unwrap_or(datatypes::prelude::ConcreteDataType::null_datatype());
+
+                ty.create_mutable_vector(len)
+            })
             .collect_vec();
         for row in rows {
             ensure!(
@@ -221,10 +280,28 @@ impl Batch {
             return Ok(());
         }
 
-        let dts = if self.batch.is_empty() {
-            other.batch.iter().map(|v| v.data_type()).collect_vec()
-        } else {
-            self.batch.iter().map(|v| v.data_type()).collect_vec()
+        let dts = {
+            let max_len = self.batch.len().max(other.batch.len());
+            let mut dts = Vec::with_capacity(max_len);
+            for i in 0..max_len {
+                if let Some(v) = self.batch().get(i)
+                    && !v.data_type().is_null()
+                {
+                    dts.push(v.data_type())
+                } else if let Some(v) = other.batch().get(i)
+                    && !v.data_type().is_null()
+                {
+                    dts.push(v.data_type())
+                } else {
+                    // both are null, so we will push null type
+                    dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
+                }
+            }
+            if self.batch.is_empty() {
+                other.batch.iter().map(|v| v.data_type()).collect_vec()
+            } else {
+                self.batch.iter().map(|v| v.data_type()).collect_vec()
+            }
         };
 
         let batch_builders = dts
diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result
index fa360a6de684..c70fe54fec19 100644
--- a/tests/cases/standalone/common/flow/flow_basic.result
+++ b/tests/cases/standalone/common/flow/flow_basic.result
@@ -390,6 +390,65 @@ GROUP BY
 
 Affected Rows: 0
 
+INSERT INTO
+    bytes_log
+VALUES
+    (NULL, '2023-01-01 00:00:01'),
+    (300, '2023-01-01 00:00:29');
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
++------+---------------------+
+| rate | time_window         |
++------+---------------------+
+| 0.0  | 2023-01-01T00:00:00 |
++------+---------------------+
+
+INSERT INTO
+    bytes_log
+VALUES
+    (NULL, '2022-01-01 00:00:01'),
+    (NULL, '2022-01-01 00:00:29');
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
++------+---------------------+
+| rate | time_window         |
++------+---------------------+
+|      | 2022-01-01T00:00:00 |
+| 0.0  | 2023-01-01T00:00:00 |
++------+---------------------+
+
 INSERT INTO
     bytes_log
 VALUES
@@ -416,6 +475,8 @@ FROM
 +-------------------+---------------------+
 | rate              | time_window         |
 +-------------------+---------------------+
+|                   | 2022-01-01T00:00:00 |
+| 0.0               | 2023-01-01T00:00:00 |
 | 6.633333333333334 | 2025-01-01T00:00:00 |
 +-------------------+---------------------+
 
@@ -445,6 +506,8 @@ FROM
+--------------------+---------------------+ | rate | time_window | +--------------------+---------------------+ +| | 2022-01-01T00:00:00 | +| 0.0 | 2023-01-01T00:00:00 | | 6.633333333333334 | 2025-01-01T00:00:00 | | 1.6666666666666667 | 2025-01-01T00:00:30 | +--------------------+---------------------+ @@ -992,6 +1055,7 @@ CREATE TABLE requests_without_ip ( service_name STRING, val INT, ts TIMESTAMP TIME INDEX, + PRIMARY KEY(service_name) ); Affected Rows: 0 @@ -1009,12 +1073,12 @@ Affected Rows: 0 INSERT INTO requests VALUES - ("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"), + (NULL, "10.0.0.1", 100, "2024-10-18 19:00:00"), ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"), - ("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"), + (NULL, "10.0.0.1", 200, "2024-10-18 19:00:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"), - ("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"), - ("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"), + (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"), + (NULL, "10.0.0.2", 100, "2024-10-18 19:01:01"), ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"); @@ -1037,10 +1101,12 @@ FROM +--------------+-----+---------------------+ | service_name | val | ts | +--------------+-----+---------------------+ +| | 100 | 2024-10-18T19:00:00 | +| | 200 | 2024-10-18T19:00:30 | +| | 300 | 2024-10-18T19:01:00 | +| | 100 | 2024-10-18T19:01:01 | | svc1 | 100 | 2024-10-18T19:00:00 | | svc1 | 200 | 2024-10-18T19:00:30 | -| svc1 | 300 | 2024-10-18T19:01:00 | -| svc1 | 100 | 2024-10-18T19:01:01 | | svc1 | 400 | 2024-10-18T19:01:30 | | svc1 | 200 | 2024-10-18T19:01:31 | +--------------+-----+---------------------+ @@ -1057,6 +1123,106 @@ FROM | 1 | +--------------+ +INSERT INTO + requests +VALUES + (null, "10.0.0.1", 100, "2024-10-19 19:00:00"), + (null, "10.0.0.2", 100, "2024-10-19 19:00:00"), + (null, "10.0.0.1", 200, "2024-10-19 19:00:30"), + (null, "10.0.0.2", 200, "2024-10-19 19:00:30"), + (null, "10.0.0.1", 300, "2024-10-19 19:01:00"), + (null, "10.0.0.2", 100, "2024-10-19 19:01:01"), + (null, "10.0.0.1", 400, "2024-10-19 19:01:30"), + (null, "10.0.0.2", 200, "2024-10-19 19:01:31"); + +Affected Rows: 8 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('requests_long_term') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT + * +FROM + requests_without_ip; + ++--------------+-----+---------------------+ +| service_name | val | ts | ++--------------+-----+---------------------+ +| | 100 | 2024-10-18T19:00:00 | +| | 200 | 2024-10-18T19:00:30 | +| | 300 | 2024-10-18T19:01:00 | +| | 100 | 2024-10-18T19:01:01 | +| | 100 | 2024-10-19T19:00:00 | +| | 200 | 2024-10-19T19:00:30 | +| | 300 | 2024-10-19T19:01:00 | +| | 100 | 2024-10-19T19:01:01 | +| | 400 | 2024-10-19T19:01:30 | +| | 200 | 2024-10-19T19:01:31 | +| svc1 | 100 | 2024-10-18T19:00:00 | +| svc1 | 200 | 2024-10-18T19:00:30 | +| svc1 | 400 | 2024-10-18T19:01:30 | +| svc1 | 200 | 2024-10-18T19:01:31 | ++--------------+-----+---------------------+ + +INSERT INTO + requests +VALUES + ("svc2", "10.0.0.1", 100, "2024-10-18 19:00:00"), + ("svc2", "10.0.0.2", 100, "2024-10-18 19:00:00"), + ("svc2", "10.0.0.1", 200, "2024-10-18 19:00:30"), + ("svc2", "10.0.0.2", 200, "2024-10-18 19:00:30"), + ("svc2", "10.0.0.1", 300, "2024-10-18 19:01:00"), + ("svc2", "10.0.0.2", 100, 
"2024-10-18 19:01:01"), + ("svc2", "10.0.0.1", 400, "2024-10-18 19:01:30"), + ("svc2", "10.0.0.2", 200, "2024-10-18 19:01:31"); + +Affected Rows: 8 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); + ++----------------------------------------+ +| ADMIN FLUSH_FLOW('requests_long_term') | ++----------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------+ + +SELECT + * +FROM + requests_without_ip; + ++--------------+-----+---------------------+ +| service_name | val | ts | ++--------------+-----+---------------------+ +| | 100 | 2024-10-18T19:00:00 | +| | 200 | 2024-10-18T19:00:30 | +| | 300 | 2024-10-18T19:01:00 | +| | 100 | 2024-10-18T19:01:01 | +| | 100 | 2024-10-19T19:00:00 | +| | 200 | 2024-10-19T19:00:30 | +| | 300 | 2024-10-19T19:01:00 | +| | 100 | 2024-10-19T19:01:01 | +| | 400 | 2024-10-19T19:01:30 | +| | 200 | 2024-10-19T19:01:31 | +| svc1 | 100 | 2024-10-18T19:00:00 | +| svc1 | 200 | 2024-10-18T19:00:30 | +| svc1 | 400 | 2024-10-18T19:01:30 | +| svc1 | 200 | 2024-10-18T19:01:31 | +| svc2 | 100 | 2024-10-18T19:00:00 | +| svc2 | 200 | 2024-10-18T19:00:30 | +| svc2 | 300 | 2024-10-18T19:01:00 | +| svc2 | 100 | 2024-10-18T19:01:01 | +| svc2 | 400 | 2024-10-18T19:01:30 | +| svc2 | 200 | 2024-10-18T19:01:31 | ++--------------+-----+---------------------+ + DROP FLOW requests_long_term; Affected Rows: 0 diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 8946c014be36..74abbc85df22 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -214,6 +214,36 @@ from GROUP BY time_window; +INSERT INTO + bytes_log +VALUES + (NULL, '2023-01-01 00:00:01'), + (300, '2023-01-01 00:00:29'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + +SELECT + rate, + time_window +FROM + approx_rate; + +INSERT INTO + bytes_log +VALUES + (NULL, '2022-01-01 00:00:01'), + (NULL, '2022-01-01 00:00:29'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + +SELECT + rate, + time_window +FROM + approx_rate; + INSERT INTO bytes_log VALUES @@ -539,6 +569,7 @@ CREATE TABLE requests_without_ip ( service_name STRING, val INT, ts TIMESTAMP TIME INDEX, + PRIMARY KEY(service_name) ); CREATE FLOW requests_long_term SINK TO requests_without_ip AS @@ -552,12 +583,12 @@ FROM INSERT INTO requests VALUES - ("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"), + (NULL, "10.0.0.1", 100, "2024-10-18 19:00:00"), ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"), - ("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"), + (NULL, "10.0.0.1", 200, "2024-10-18 19:00:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"), - ("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"), - ("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"), + (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"), + (NULL, "10.0.0.2", 100, "2024-10-18 19:01:01"), ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"); @@ -575,6 +606,46 @@ SELECT FROM INFORMATION_SCHEMA.FLOWS; +INSERT INTO + requests +VALUES + (null, "10.0.0.1", 100, "2024-10-19 19:00:00"), + (null, "10.0.0.2", 100, "2024-10-19 19:00:00"), + (null, "10.0.0.1", 200, "2024-10-19 19:00:30"), + (null, "10.0.0.2", 200, "2024-10-19 19:00:30"), + (null, 
"10.0.0.1", 300, "2024-10-19 19:01:00"), + (null, "10.0.0.2", 100, "2024-10-19 19:01:01"), + (null, "10.0.0.1", 400, "2024-10-19 19:01:30"), + (null, "10.0.0.2", 200, "2024-10-19 19:01:31"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); + +SELECT + * +FROM + requests_without_ip; + +INSERT INTO + requests +VALUES + ("svc2", "10.0.0.1", 100, "2024-10-18 19:00:00"), + ("svc2", "10.0.0.2", 100, "2024-10-18 19:00:00"), + ("svc2", "10.0.0.1", 200, "2024-10-18 19:00:30"), + ("svc2", "10.0.0.2", 200, "2024-10-18 19:00:30"), + ("svc2", "10.0.0.1", 300, "2024-10-18 19:01:00"), + ("svc2", "10.0.0.2", 100, "2024-10-18 19:01:01"), + ("svc2", "10.0.0.1", 400, "2024-10-18 19:01:30"), + ("svc2", "10.0.0.2", 200, "2024-10-18 19:01:31"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); + +SELECT + * +FROM + requests_without_ip; + DROP FLOW requests_long_term; DROP TABLE requests_without_ip; From d12f34594ec773cc4e98fc051498d811f9a59c13 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 18 Dec 2024 19:39:37 +0800 Subject: [PATCH 2/5] chore: clippy --- src/flow/src/adapter/flownode_impl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 57c6513359c7..ce72d7a35af3 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -209,7 +209,7 @@ impl Flownode for FlowWorkerManager { .collect_vec(); let batch_datatypes = insert_schema .iter() - .map(|s| from_proto_to_column_schema(s)) + .map(from_proto_to_column_schema) .collect::, _>>() .map_err(to_meta_err)?; self.handle_write_request(region_id.into(), rows, &batch_datatypes) From 07dbb132716fef85bcc020bd727ecd66b30a3e5b Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 18 Dec 2024 19:42:36 +0800 Subject: [PATCH 3/5] chore: rename --- src/flow/src/adapter/flownode_impl.rs | 4 ++-- src/flow/src/adapter/util.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index ce72d7a35af3..1fa11b4d83a2 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use super::util::from_proto_to_column_schema; +use super::util::from_proto_to_data_type; use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::error::InternalSnafu; use crate::metrics::METRIC_FLOW_TASK_COUNT; @@ -209,7 +209,7 @@ impl Flownode for FlowWorkerManager { .collect_vec(); let batch_datatypes = insert_schema .iter() - .map(from_proto_to_column_schema) + .map(from_proto_to_data_type) .collect::, _>>() .map_err(to_meta_err)?; self.handle_write_request(region_id.into(), rows, &batch_datatypes) diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index a9da8ef2c35a..f2a29bec8e9e 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -23,7 +23,7 @@ use snafu::ResultExt; use crate::error::{Error, ExternalSnafu}; -pub fn from_proto_to_column_schema( +pub fn from_proto_to_data_type( column_schema: &api::v1::ColumnSchema, ) -> Result { let wrapper = ColumnDataTypeWrapper::try_new( From 1bc84a9481760fed6e31e34ea53422f737d24bc5 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 19 Dec 2024 11:31:10 

---
 src/flow/src/compute/render/reduce.rs |  6 ++-
 src/flow/src/expr.rs                  | 57 ----
 src/flow/src/expr/linear.rs           | 70 +++++++++++++++++++++------
 3 files changed, 61 insertions(+), 72 deletions(-)

diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs
index 79b57c4b25c1..5376c466fbbe 100644
--- a/src/flow/src/compute/render/reduce.rs
+++ b/src/flow/src/compute/render/reduce.rs
@@ -1585,7 +1585,11 @@ mod test {
 
             if let Some(expected) = expected.get(&now) {
                 let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
-                let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
+                let batch = Batch::try_from_rows_with_types(
+                    vec![batch.into()],
+                    &[CDT::int64_datatype()],
+                )
+                .unwrap();
                 assert_eq!(res.first(), Some(&batch));
             }
         });
diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs
index c5e37956de36..95b99c9e208c 100644
--- a/src/flow/src/expr.rs
+++ b/src/flow/src/expr.rs
@@ -123,63 +123,6 @@ impl Batch {
         Ok(batch)
     }
 
-    /// Get batch from rows, will try best to determine data type
-    ///
-    /// for test purposes only
-    #[cfg(test)]
-    pub fn try_from_rows(rows: Vec<Row>) -> Result<Self, EvalError> {
-        if rows.is_empty() {
-            return Ok(Self::empty());
-        }
-        let len = rows.len();
-        let mut builder = rows
-            .first()
-            .unwrap()
-            .iter()
-            .enumerate()
-            .map(|(i, v)| {
-                let mut ty = None;
-                if v.data_type().is_null() {
-                    for row in rows.iter() {
-                        if let Some(t) = row.get(i)
-                            && !t.data_type().is_null()
-                        {
-                            ty = Some(t.data_type().clone());
-                            break;
-                        }
-                    }
-                }
-                // if all rows are null, use null type
-                let ty = ty.unwrap_or(datatypes::prelude::ConcreteDataType::null_datatype());
-
-                ty.create_mutable_vector(len)
-            })
-            .collect_vec();
-        for row in rows {
-            ensure!(
-                row.len() == builder.len(),
-                InvalidArgumentSnafu {
-                    reason: format!(
-                        "row length not match, expect {}, found {}",
-                        builder.len(),
-                        row.len()
-                    )
-                }
-            );
-            for (idx, value) in row.iter().enumerate() {
-                builder[idx]
-                    .try_push_value_ref(value.as_value_ref())
-                    .context(DataTypeSnafu {
-                        msg: "Failed to convert rows to columns",
-                    })?;
-            }
-        }
-
-        let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
-        let batch = Self::try_new(columns, len)?;
-        Ok(batch)
-    }
-
     pub fn empty() -> Self {
         Self {
             batch: vec![],
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index 373e467aba1b..f96d7827b6bd 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -908,20 +908,33 @@ mod test {
             .unwrap()
             .unwrap();
         assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)]));
-
+        let ty = [
+            ConcreteDataType::int32_datatype(),
+            ConcreteDataType::int32_datatype(),
+            ConcreteDataType::int32_datatype(),
+        ];
         // batch mode
-        let mut batch = Batch::try_from_rows(vec![Row::from(vec![
-            Value::from(4),
-            Value::from(2),
-            Value::from(3),
-        ])])
+        let mut batch = Batch::try_from_rows_with_types(
+            vec![Row::from(vec![
+                Value::from(4),
+                Value::from(2),
+                Value::from(3),
+            ])],
+            &ty,
+        )
         .unwrap();
         let ret = safe_mfp.eval_batch_into(&mut batch).unwrap();
 
         assert_eq!(
             ret,
-            Batch::try_from_rows(vec![Row::from(vec![Value::from(false), Value::from(true)])])
-                .unwrap()
+            Batch::try_from_rows_with_types(
+                vec![Row::from(vec![Value::from(false), Value::from(true)])],
+                &[
+                    ConcreteDataType::boolean_datatype(),
+                    ConcreteDataType::boolean_datatype(),
+                ],
+            )
+            .unwrap()
         );
     }
 
@@ -956,7 +969,15 @@ mod test {
             .unwrap();
         assert_eq!(ret, None);
 
-        let mut input1_batch = 
Batch::try_from_rows(vec![Row::new(input1)]).unwrap(); + let input_type = [ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype(), + ]; + + let mut input1_batch = + Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap(); assert_eq!( ret_batch, @@ -974,7 +995,8 @@ mod test { .unwrap(); assert_eq!(ret, Some(Row::pack(vec![Value::from(11)]))); - let mut input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap(); + let mut input2_batch = + Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap(); assert_eq!( ret_batch, @@ -1027,7 +1049,14 @@ mod test { let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty()); assert!(matches!(ret, Err(EvalError::InvalidArgument { .. }))); - let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap(); + let input_type = [ + ConcreteDataType::int64_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; + let mut input1_batch = + Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch); assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. }))); @@ -1037,7 +1066,13 @@ mod test { .unwrap(); assert_eq!(ret, Some(Row::new(input2.clone()))); - let input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap(); + let input_type = [ + ConcreteDataType::int64_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; + let input2_batch = + Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap(); assert_eq!(ret_batch, input2_batch); @@ -1047,7 +1082,8 @@ mod test { .unwrap(); assert_eq!(ret, None); - let input3_batch = Batch::try_from_rows(vec![Row::new(input3)]).unwrap(); + let input3_batch = + Batch::try_from_rows_with_types(vec![Row::new(input3)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap(); assert_eq!( ret_batch, @@ -1083,7 +1119,13 @@ mod test { let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty()); assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)]))); - let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap(); + let input_type = [ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; + let mut input1_batch = + Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap(); assert_eq!( From 201b68341a194d2b60250405a176419d808ef414 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 19 Dec 2024 15:02:53 +0800 Subject: [PATCH 5/5] refactor: per review --- src/flow/src/adapter.rs | 2 +- src/flow/src/compute/render/reduce.rs | 1 - src/flow/src/expr.rs | 7 ++----- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 52c7ffcc63c9..6d70377cf2aa 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -565,7 +565,7 @@ impl FlowWorkerManager { let default_interval = Duration::from_secs(1); let mut avg_spd = 0; // rows/sec 
let mut since_last_run = tokio::time::Instant::now(); - let run_per_trace = 5; + let run_per_trace = 10; let mut run_cnt = 0; loop { // TODO(discord9): only run when new inputs arrive or scheduled to diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 5376c466fbbe..1d0689c4032f 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -47,7 +47,6 @@ impl Context<'_, '_> { reduce_plan: &ReducePlan, output_type: &RelationType, ) -> Result, Error> { - common_telemetry::debug!("render reduce batch"); let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan { if !accum_plan.distinct_aggrs.is_empty() { NotImplementedSnafu { diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 95b99c9e208c..5dde62b43a69 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -240,11 +240,8 @@ impl Batch { dts.push(datatypes::prelude::ConcreteDataType::null_datatype()) } } - if self.batch.is_empty() { - other.batch.iter().map(|v| v.data_type()).collect_vec() - } else { - self.batch.iter().map(|v| v.data_type()).collect_vec() - } + + dts }; let batch_builders = dts