diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 79b57c4b25c1..5376c466fbbe 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -1585,7 +1585,11 @@ mod test { if let Some(expected) = expected.get(&now) { let batch = expected.iter().map(|v| Value::from(*v)).collect_vec(); - let batch = Batch::try_from_rows(vec![batch.into()]).unwrap(); + let batch = Batch::try_from_rows_with_types( + vec![batch.into()], + &[CDT::int64_datatype()], + ) + .unwrap(); assert_eq!(res.first(), Some(&batch)); } }); diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index c5e37956de36..95b99c9e208c 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -123,63 +123,6 @@ impl Batch { Ok(batch) } - /// Get batch from rows, will try best to determine data type - /// - /// for test purposes only - #[cfg(test)] - pub fn try_from_rows(rows: Vec) -> Result { - if rows.is_empty() { - return Ok(Self::empty()); - } - let len = rows.len(); - let mut builder = rows - .first() - .unwrap() - .iter() - .enumerate() - .map(|(i, v)| { - let mut ty = None; - if v.data_type().is_null() { - for row in rows.iter() { - if let Some(t) = row.get(i) - && !t.data_type().is_null() - { - ty = Some(t.data_type().clone()); - break; - } - } - } - // if all rows are null, use null type - let ty = ty.unwrap_or(datatypes::prelude::ConcreteDataType::null_datatype()); - - ty.create_mutable_vector(len) - }) - .collect_vec(); - for row in rows { - ensure!( - row.len() == builder.len(), - InvalidArgumentSnafu { - reason: format!( - "row length not match, expect {}, found {}", - builder.len(), - row.len() - ) - } - ); - for (idx, value) in row.iter().enumerate() { - builder[idx] - .try_push_value_ref(value.as_value_ref()) - .context(DataTypeSnafu { - msg: "Failed to convert rows to columns", - })?; - } - } - - let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec(); - let batch = Self::try_new(columns, len)?; - Ok(batch) - } - pub fn empty() -> Self { Self { batch: vec![], diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 373e467aba1b..f96d7827b6bd 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -908,20 +908,33 @@ mod test { .unwrap() .unwrap(); assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)])); - + let ty = [ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; // batch mode - let mut batch = Batch::try_from_rows(vec![Row::from(vec![ - Value::from(4), - Value::from(2), - Value::from(3), - ])]) + let mut batch = Batch::try_from_rows_with_types( + vec![Row::from(vec![ + Value::from(4), + Value::from(2), + Value::from(3), + ])], + &ty, + ) .unwrap(); let ret = safe_mfp.eval_batch_into(&mut batch).unwrap(); assert_eq!( ret, - Batch::try_from_rows(vec![Row::from(vec![Value::from(false), Value::from(true)])]) - .unwrap() + Batch::try_from_rows_with_types( + vec![Row::from(vec![Value::from(false), Value::from(true)])], + &[ + ConcreteDataType::boolean_datatype(), + ConcreteDataType::boolean_datatype(), + ], + ) + .unwrap() ); } @@ -956,7 +969,15 @@ mod test { .unwrap(); assert_eq!(ret, None); - let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap(); + let input_type = [ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::string_datatype(), + ]; + + let mut input1_batch = + Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap(); assert_eq!( ret_batch, @@ -974,7 +995,8 @@ mod test { .unwrap(); assert_eq!(ret, Some(Row::pack(vec![Value::from(11)]))); - let mut input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap(); + let mut input2_batch = + Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap(); assert_eq!( ret_batch, @@ -1027,7 +1049,14 @@ mod test { let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty()); assert!(matches!(ret, Err(EvalError::InvalidArgument { .. }))); - let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap(); + let input_type = [ + ConcreteDataType::int64_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; + let mut input1_batch = + Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch); assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. }))); @@ -1037,7 +1066,13 @@ mod test { .unwrap(); assert_eq!(ret, Some(Row::new(input2.clone()))); - let input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap(); + let input_type = [ + ConcreteDataType::int64_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; + let input2_batch = + Batch::try_from_rows_with_types(vec![Row::new(input2)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap(); assert_eq!(ret_batch, input2_batch); @@ -1047,7 +1082,8 @@ mod test { .unwrap(); assert_eq!(ret, None); - let input3_batch = Batch::try_from_rows(vec![Row::new(input3)]).unwrap(); + let input3_batch = + Batch::try_from_rows_with_types(vec![Row::new(input3)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap(); assert_eq!( ret_batch, @@ -1083,7 +1119,13 @@ mod test { let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty()); assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)]))); - let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap(); + let input_type = [ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype(), + ]; + let mut input1_batch = + Batch::try_from_rows_with_types(vec![Row::new(input1)], &input_type).unwrap(); let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap(); assert_eq!(