diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 2748b575f76ae..7dffbcd42d66b 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -27,8 +27,10 @@ use super::expr_in::InExpression; use super::expr_some_all::SomeAllExpression; use super::expr_udf::UdfExpression; use super::expr_vnode::VnodeExpression; -use super::wrapper::{Checked, EvalErrorReport, NonStrict}; -use super::InfallibleExpression; +use super::wrapper::checked::Checked; +use super::wrapper::non_strict::NonStrict; +use super::wrapper::EvalErrorReport; +use super::NonStrictExpression; use crate::expr::{ BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, }; @@ -44,10 +46,10 @@ pub fn build_from_prost(prost: &ExprNode) -> Result { pub fn build_non_strict_from_prost( prost: &ExprNode, error_report: impl EvalErrorReport + 'static, -) -> Result { +) -> Result { ExprBuilder::new_non_strict(error_report) .build(prost) - .map(InfallibleExpression) + .map(NonStrictExpression) } /// Build an expression from protobuf with possibly some wrappers attached to each node. @@ -222,9 +224,9 @@ pub fn build_func_non_strict( ret_type: DataType, children: Vec, error_report: impl EvalErrorReport + 'static, -) -> Result { +) -> Result { let expr = build_func(func, ret_type, children)?; - let wrapped = InfallibleExpression(ExprBuilder::new_non_strict(error_report).wrap(expr)); + let wrapped = NonStrictExpression(ExprBuilder::new_non_strict(error_report).wrap(expr)); Ok(wrapped) } diff --git a/src/expr/core/src/expr/mod.rs b/src/expr/core/src/expr/mod.rs index 78955e7e871ab..7b65eb1fe8144 100644 --- a/src/expr/core/src/expr/mod.rs +++ b/src/expr/core/src/expr/mod.rs @@ -117,17 +117,17 @@ impl E { } #[derive(Debug)] -pub struct InfallibleExpression(E); +pub struct NonStrictExpression(E); -impl InfallibleExpression +impl NonStrictExpression where E: Expression, { - pub fn for_test(inner: E) -> InfallibleExpression + pub fn for_test(inner: E) -> NonStrictExpression where E: 'static, { - InfallibleExpression(inner.boxed()) + NonStrictExpression(inner.boxed()) } pub fn todo(inner: E) -> Self { diff --git a/src/expr/core/src/expr/wrapper/checked.rs b/src/expr/core/src/expr/wrapper/checked.rs index 1e049ad481010..b3b1375c4fa82 100644 --- a/src/expr/core/src/expr/wrapper/checked.rs +++ b/src/expr/core/src/expr/wrapper/checked.rs @@ -22,7 +22,7 @@ use crate::expr::{Expression, ValueImpl}; /// A wrapper of [`Expression`] that does extra checks after evaluation. #[derive(Debug)] -pub struct Checked(pub E); +pub(crate) struct Checked(pub E); // TODO: avoid the overhead of extra boxing. #[async_trait] diff --git a/src/expr/core/src/expr/wrapper/mod.rs b/src/expr/core/src/expr/wrapper/mod.rs index 48241d05de45c..c93da021c882a 100644 --- a/src/expr/core/src/expr/wrapper/mod.rs +++ b/src/expr/core/src/expr/wrapper/mod.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod checked; -mod non_strict; +pub(crate) mod checked; +pub(crate) mod non_strict; -pub use checked::Checked; -pub use non_strict::{EvalErrorReport, NonStrict}; +pub use non_strict::EvalErrorReport; diff --git a/src/expr/core/src/expr/wrapper/non_strict.rs b/src/expr/core/src/expr/wrapper/non_strict.rs index 0859cea27aa49..04213c194e622 100644 --- a/src/expr/core/src/expr/wrapper/non_strict.rs +++ b/src/expr/core/src/expr/wrapper/non_strict.rs @@ -46,7 +46,7 @@ impl EvalErrorReport for ! { /// - When an error occurs during chunk-level evaluation, recompute in row-based execution and pad /// with NULL for each failed row. /// - Report all error occurred during row-level evaluation to the [`EvalErrorReport`]. -pub struct NonStrict { +pub(crate) struct NonStrict { inner: E, report: R, } diff --git a/src/stream/src/executor/aggregation/mod.rs b/src/stream/src/executor/aggregation/mod.rs index b4d69152c2911..b25d95855b42c 100644 --- a/src/stream/src/executor/aggregation/mod.rs +++ b/src/stream/src/executor/aggregation/mod.rs @@ -21,7 +21,7 @@ use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_expr::aggregate::{AggCall, AggKind}; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use risingwave_storage::StateStore; use crate::common::table::state_table::StateTable; @@ -75,7 +75,7 @@ pub async fn agg_call_filter_res( } if let Some(ref filter) = agg_call.filter { - if let Bool(filter_res) = InfallibleExpression::todo(&**filter) + if let Bool(filter_res) = NonStrictExpression::todo(&**filter) .eval_infallible(chunk) .await .as_ref() diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index b86a2a3d67c15..5828ae1e15c1c 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -26,7 +26,7 @@ use risingwave_common::row::{self, once, OwnedRow, OwnedRow as RowData, Row}; use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::{ - build_func_non_strict, InfallibleExpression, InputRefExpression, LiteralExpression, + build_func_non_strict, InputRefExpression, LiteralExpression, NonStrictExpression, }; use risingwave_pb::expr::expr_node::Type as ExprNodeType; use risingwave_pb::expr::expr_node::Type::{ @@ -97,7 +97,7 @@ impl DynamicFilterExecutor, + condition: Option, ) -> Result<(Vec, Bitmap), StreamExecutorError> { let mut new_ops = Vec::with_capacity(chunk.capacity()); let mut new_visibility = BitmapBuilder::with_capacity(chunk.capacity()); diff --git a/src/stream/src/executor/filter.rs b/src/stream/src/executor/filter.rs index 236f203dcbe1e..1a1e645e44e6d 100644 --- a/src/stream/src/executor/filter.rs +++ b/src/stream/src/executor/filter.rs @@ -19,7 +19,7 @@ use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::Schema; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use super::*; @@ -34,14 +34,14 @@ pub struct FilterExecutor { /// Expression of the current filter, note that the filter must always have the same output for /// the same input. - expr: InfallibleExpression, + expr: NonStrictExpression, } impl FilterExecutor { pub fn new( ctx: ActorContextRef, input: Box, - expr: InfallibleExpression, + expr: NonStrictExpression, executor_id: u64, ) -> Self { let input_info = input.info(); diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index cca953022369a..4178012cb9d9e 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -28,7 +28,7 @@ use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, DefaultOrd, ToOwnedDatum}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use risingwave_expr::ExprError; use risingwave_storage::StateStore; use tokio::time::Instant; @@ -242,9 +242,9 @@ pub struct HashJoinExecutor, /// Optional non-equi join conditions - cond: Option, + cond: Option, /// Column indices of watermark output and offset expression of each inequality, respectively. - inequality_pairs: Vec<(Vec, Option)>, + inequality_pairs: Vec<(Vec, Option)>, /// The output watermark of each inequality condition and its value is the minimum of the /// calculation result of both side. It will be used to generate watermark into downstream /// and do state cleaning if `clean_state` field of that inequality is `true`. @@ -313,7 +313,7 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> { side_l: &'a mut JoinSide, side_r: &'a mut JoinSide, actual_output_data_types: &'a [DataType], - cond: &'a mut Option, + cond: &'a mut Option, inequality_watermarks: &'a [Option], chunk: StreamChunk, append_only_optimize: bool, @@ -448,8 +448,8 @@ impl HashJoinExecutor, executor_id: u64, - cond: Option, - inequality_pairs: Vec<(usize, usize, bool, Option)>, + cond: Option, + inequality_pairs: Vec<(usize, usize, bool, Option)>, op_info: String, state_table_l: StateTable, degree_state_table_l: StateTable, @@ -1327,7 +1327,7 @@ mod tests { (state_table, degree_state_table) } - fn create_cond(condition_text: Option) -> InfallibleExpression { + fn create_cond(condition_text: Option) -> NonStrictExpression { build_from_pretty( condition_text .as_deref() @@ -1339,7 +1339,7 @@ mod tests { with_condition: bool, null_safe: bool, condition_text: Option, - inequality_pairs: Vec<(usize, usize, bool, Option)>, + inequality_pairs: Vec<(usize, usize, bool, Option)>, ) -> (MessageSender, MessageSender, BoxedMessageStream) { let schema = Schema { fields: vec![ diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 7c8c44330c862..42d13d790da88 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -19,7 +19,7 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, Op}; use risingwave_common::types::Interval; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use risingwave_expr::ExprError; use super::error::StreamExecutorError; @@ -33,8 +33,8 @@ pub struct HopWindowExecutor { pub time_col_idx: usize, pub window_slide: Interval, pub window_size: Interval, - window_start_exprs: Vec, - window_end_exprs: Vec, + window_start_exprs: Vec, + window_end_exprs: Vec, pub output_indices: Vec, chunk_size: usize, } @@ -48,8 +48,8 @@ impl HopWindowExecutor { time_col_idx: usize, window_slide: Interval, window_size: Interval, - window_start_exprs: Vec, - window_end_exprs: Vec, + window_start_exprs: Vec, + window_end_exprs: Vec, output_indices: Vec, chunk_size: usize, ) -> Self { @@ -251,7 +251,7 @@ mod tests { use risingwave_common::types::test_utils::IntervalTestExt; use risingwave_common::types::{DataType, Interval}; use risingwave_expr::expr::test_utils::make_hop_window_expression; - use risingwave_expr::expr::InfallibleExpression; + use risingwave_expr::expr::NonStrictExpression; use crate::executor::test_utils::MockSource; use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; @@ -305,11 +305,11 @@ mod tests { window_size, window_start_exprs .into_iter() - .map(InfallibleExpression::for_test) + .map(NonStrictExpression::for_test) .collect(), window_end_exprs .into_iter() - .map(InfallibleExpression::for_test) + .map(NonStrictExpression::for_test) .collect(), output_indices, CHUNK_SIZE, diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 1ae92bf3dcd18..cd505093294f1 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -152,7 +152,7 @@ async fn test_merger_sum_aggr() { vec![], vec![ // TODO: use the new streaming_if_null expression here, and add `None` tests - InfallibleExpression::for_test(InputRefExpression::new(DataType::Int64, 1)), + NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)), ], 3, MultiMap::new(), diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index e3a341ba3a5d0..c28d6ec8564d9 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -31,7 +31,7 @@ use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_common::util::tracing::TracingContext; use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; use risingwave_connector::source::SplitImpl; -use risingwave_expr::expr::{Expression, InfallibleExpression}; +use risingwave_expr::expr::{Expression, NonStrictExpression}; use risingwave_pb::data::PbEpoch; use risingwave_pb::expr::PbInputRef; use risingwave_pb::stream_plan::barrier::{BarrierKind, PbMutation}; @@ -641,7 +641,7 @@ impl Watermark { pub async fn transform_with_expr( self, - expr: &InfallibleExpression, + expr: &NonStrictExpression, new_col_idx: usize, ) -> Option { let Self { col_idx, val, .. } = self; diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 2e40568a7167f..8cfebfecd3f33 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -21,7 +21,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use super::*; @@ -38,7 +38,7 @@ struct Inner { info: ExecutorInfo, /// Expressions of the current projection. - exprs: Vec, + exprs: Vec, /// All the watermark derivations, (input_column_index, output_column_index). And the /// derivation expression is the project's expression itself. watermark_derivations: MultiMap, @@ -58,7 +58,7 @@ impl ProjectExecutor { ctx: ActorContextRef, input: Box, pk_indices: PkIndices, - exprs: Vec, + exprs: Vec, executor_id: u64, watermark_derivations: MultiMap, nondecreasing_expr_indices: Vec, @@ -346,7 +346,7 @@ mod tests { let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)"); let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)"); - let c_expr = InfallibleExpression::for_test(DummyNondecreasingExpr); + let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr); let project = Box::new(ProjectExecutor::new( ActorContext::create(123), diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 8bf94b86b69ea..53e1d7a6277bd 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -24,7 +24,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::{DataType, Datum, DatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use risingwave_expr::table_function::ProjectSetSelectItem; use super::error::StreamExecutorError; @@ -262,7 +262,7 @@ impl Inner { watermark .clone() .transform_with_expr( - &InfallibleExpression::todo(expr), + &NonStrictExpression::todo(expr), expr_idx + PROJ_ROW_ID_OFFSET, ) .await diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index bb456ed570d66..82c1e56649672 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -32,7 +32,7 @@ use risingwave_common::hash::{HashKey, NullBitmap}; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; @@ -57,7 +57,7 @@ pub struct TemporalJoinExecutor, right_join_keys: Vec, null_safe: Vec, - condition: Option, + condition: Option, output_indices: Vec, pk_indices: PkIndices, schema: Schema, @@ -338,7 +338,7 @@ impl TemporalJoinExecutor left_join_keys: Vec, right_join_keys: Vec, null_safe: Vec, - condition: Option, + condition: Option, pk_indices: PkIndices, output_indices: Vec, table_output_indices: Vec, diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 7d3c48de3a5e5..13a9237cf0159 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -264,10 +264,10 @@ pub trait StreamExecutorTestExt: MessageStream + Unpin { impl StreamExecutorTestExt for BoxedMessageStream {} pub mod expr { - use risingwave_expr::expr::InfallibleExpression; + use risingwave_expr::expr::NonStrictExpression; - pub fn build_from_pretty(s: impl AsRef) -> InfallibleExpression { - InfallibleExpression::for_test(risingwave_expr::expr::build_from_pretty(s)) + pub fn build_from_pretty(s: impl AsRef) -> NonStrictExpression { + NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s)) } } diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index bb6017ae14410..8c09b56aa3551 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -21,7 +21,7 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::ensure; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use tokio::sync::mpsc::UnboundedReceiver; use super::{ @@ -40,7 +40,7 @@ pub struct ValuesExecutor { barrier_receiver: UnboundedReceiver, progress: CreateMviewProgress, - rows: vec::IntoIter>, + rows: vec::IntoIter>, pk_indices: PkIndices, identity: String, schema: Schema, @@ -51,7 +51,7 @@ impl ValuesExecutor { pub fn new( ctx: ActorContextRef, progress: CreateMviewProgress, - rows: Vec>, + rows: Vec>, schema: Schema, barrier_receiver: UnboundedReceiver, executor_id: u64, @@ -167,7 +167,7 @@ mod tests { }; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, ScalarImpl, StructType}; - use risingwave_expr::expr::{BoxedExpression, InfallibleExpression, LiteralExpression}; + use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression}; use tokio::sync::mpsc::unbounded_channel; use super::ValuesExecutor; @@ -217,7 +217,7 @@ mod tests { progress, vec![exprs .into_iter() - .map(InfallibleExpression::for_test) + .map(NonStrictExpression::for_test) .collect()], Schema { fields }, barrier_receiver, diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 01a6744addafd..5e5454cecff93 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -23,8 +23,8 @@ use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl}; use risingwave_common::{bail, row}; use risingwave_expr::expr::{ - build_func_non_strict, ExpressionBoxExt, InfallibleExpression, InputRefExpression, - LiteralExpression, + build_func_non_strict, ExpressionBoxExt, InputRefExpression, LiteralExpression, + NonStrictExpression, }; use risingwave_expr::Result as ExprResult; use risingwave_pb::expr::expr_node::Type; @@ -45,7 +45,7 @@ use crate::task::ActorEvalErrorReport; pub struct WatermarkFilterExecutor { input: BoxedExecutor, /// The expression used to calculate the watermark value. - watermark_expr: InfallibleExpression, + watermark_expr: NonStrictExpression, /// The column we should generate watermark and filter on. event_time_col_idx: usize, ctx: ActorContextRef, @@ -56,7 +56,7 @@ pub struct WatermarkFilterExecutor { impl WatermarkFilterExecutor { pub fn new( input: BoxedExecutor, - watermark_expr: InfallibleExpression, + watermark_expr: NonStrictExpression, event_time_col_idx: usize, ctx: ActorContextRef, table: StateTable, @@ -299,7 +299,7 @@ impl WatermarkFilterExecutor { event_time_col_idx: usize, watermark: ScalarImpl, eval_error_report: ActorEvalErrorReport, - ) -> ExprResult { + ) -> ExprResult { build_func_non_strict( Type::GreaterThanOrEqual, DataType::Boolean, diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs index c3eed40b18391..87174282e517a 100644 --- a/src/stream/src/from_proto/hash_join.rs +++ b/src/stream/src/from_proto/hash_join.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use risingwave_common::hash::{HashKey, HashKeyDispatcher}; use risingwave_common::types::DataType; use risingwave_expr::expr::{ - build_func_non_strict, build_non_strict_from_prost, InfallibleExpression, InputRefExpression, + build_func_non_strict, build_non_strict_from_prost, InputRefExpression, NonStrictExpression, }; pub use risingwave_pb::expr::expr_node::Type as ExprType; use risingwave_pb::plan_common::JoinType as JoinTypeProto; @@ -176,8 +176,8 @@ struct HashJoinExecutorDispatcherArgs { pk_indices: PkIndices, output_indices: Vec, executor_id: u64, - cond: Option, - inequality_pairs: Vec<(usize, usize, bool, Option)>, + cond: Option, + inequality_pairs: Vec<(usize, usize, bool, Option)>, op_info: String, state_table_l: StateTable, degree_state_table_l: StateTable, diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index 758f1aac96b5a..58699089e8c27 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -18,7 +18,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId, TableOption}; use risingwave_common::hash::{HashKey, HashKeyDispatcher}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; -use risingwave_expr::expr::{build_non_strict_from_prost, InfallibleExpression}; +use risingwave_expr::expr::{build_non_strict_from_prost, NonStrictExpression}; use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc}; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::Distribution; @@ -190,7 +190,7 @@ struct TemporalJoinExecutorDispatcherArgs { left_join_keys: Vec, right_join_keys: Vec, null_safe: Vec, - condition: Option, + condition: Option, pk_indices: PkIndices, output_indices: Vec, table_output_indices: Vec, diff --git a/src/stream/tests/integration_tests/hop_window.rs b/src/stream/tests/integration_tests/hop_window.rs index 04ccd8dbf51ba..9d6d879240fc0 100644 --- a/src/stream/tests/integration_tests/hop_window.rs +++ b/src/stream/tests/integration_tests/hop_window.rs @@ -15,7 +15,7 @@ use risingwave_common::types::test_utils::IntervalTestExt; use risingwave_common::types::{Interval, Timestamp}; use risingwave_expr::expr::test_utils::make_hop_window_expression; -use risingwave_expr::expr::InfallibleExpression; +use risingwave_expr::expr::NonStrictExpression; use risingwave_stream::executor::{ExecutorInfo, HopWindowExecutor}; use crate::prelude::*; @@ -58,11 +58,11 @@ fn create_executor(output_indices: Vec) -> (MessageSender, BoxedMessageSt window_size, window_start_exprs .into_iter() - .map(InfallibleExpression::for_test) + .map(NonStrictExpression::for_test) .collect(), window_end_exprs .into_iter() - .map(InfallibleExpression::for_test) + .map(NonStrictExpression::for_test) .collect(), output_indices, CHUNK_SIZE,