Skip to content

Commit

Permalink
rename to non strict expression
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 17, 2023
1 parent 0058837 commit 0de5792
Show file tree
Hide file tree
Showing 21 changed files with 73 additions and 72 deletions.
14 changes: 8 additions & 6 deletions src/expr/core/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use super::expr_in::InExpression;
use super::expr_some_all::SomeAllExpression;
use super::expr_udf::UdfExpression;
use super::expr_vnode::VnodeExpression;
use super::wrapper::{Checked, EvalErrorReport, NonStrict};
use super::InfallibleExpression;
use super::wrapper::checked::Checked;
use super::wrapper::non_strict::NonStrict;
use super::wrapper::EvalErrorReport;
use super::NonStrictExpression;
use crate::expr::{
BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
};
Expand All @@ -44,10 +46,10 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
pub fn build_non_strict_from_prost(
prost: &ExprNode,
error_report: impl EvalErrorReport + 'static,
) -> Result<InfallibleExpression> {
) -> Result<NonStrictExpression> {
ExprBuilder::new_non_strict(error_report)
.build(prost)
.map(InfallibleExpression)
.map(NonStrictExpression)
}

/// Build an expression from protobuf with possibly some wrappers attached to each node.
Expand Down Expand Up @@ -222,9 +224,9 @@ pub fn build_func_non_strict(
ret_type: DataType,
children: Vec<BoxedExpression>,
error_report: impl EvalErrorReport + 'static,
) -> Result<InfallibleExpression> {
) -> Result<NonStrictExpression> {
let expr = build_func(func, ret_type, children)?;
let wrapped = InfallibleExpression(ExprBuilder::new_non_strict(error_report).wrap(expr));
let wrapped = NonStrictExpression(ExprBuilder::new_non_strict(error_report).wrap(expr));

Ok(wrapped)
}
Expand Down
8 changes: 4 additions & 4 deletions src/expr/core/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,17 @@ impl<E: Expression + 'static> E {
}

#[derive(Debug)]
pub struct InfallibleExpression<E = BoxedExpression>(E);
pub struct NonStrictExpression<E = BoxedExpression>(E);

impl<E> InfallibleExpression<E>
impl<E> NonStrictExpression<E>
where
E: Expression,
{
pub fn for_test(inner: E) -> InfallibleExpression
pub fn for_test(inner: E) -> NonStrictExpression
where
E: 'static,
{
InfallibleExpression(inner.boxed())
NonStrictExpression(inner.boxed())
}

pub fn todo(inner: E) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/wrapper/checked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::expr::{Expression, ValueImpl};

/// A wrapper of [`Expression`] that does extra checks after evaluation.
#[derive(Debug)]
pub struct Checked<E>(pub E);
pub(crate) struct Checked<E>(pub E);

// TODO: avoid the overhead of extra boxing.
#[async_trait]
Expand Down
7 changes: 3 additions & 4 deletions src/expr/core/src/expr/wrapper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod checked;
mod non_strict;
pub(crate) mod checked;
pub(crate) mod non_strict;

pub use checked::Checked;
pub use non_strict::{EvalErrorReport, NonStrict};
pub use non_strict::EvalErrorReport;
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/wrapper/non_strict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl EvalErrorReport for ! {
/// - When an error occurs during chunk-level evaluation, recompute in row-based execution and pad
/// with NULL for each failed row.
/// - Report all error occurred during row-level evaluation to the [`EvalErrorReport`].
pub struct NonStrict<E, R> {
pub(crate) struct NonStrict<E, R> {
inner: E,
report: R,
}
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_expr::aggregate::{AggCall, AggKind};
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
Expand Down Expand Up @@ -75,7 +75,7 @@ pub async fn agg_call_filter_res(
}

if let Some(ref filter) = agg_call.filter {
if let Bool(filter_res) = InfallibleExpression::todo(&**filter)
if let Bool(filter_res) = NonStrictExpression::todo(&**filter)
.eval_infallible(chunk)
.await
.as_ref()
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::row::{self, once, OwnedRow, OwnedRow as RowData, Row};
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::{
build_func_non_strict, InfallibleExpression, InputRefExpression, LiteralExpression,
build_func_non_strict, InputRefExpression, LiteralExpression, NonStrictExpression,
};
use risingwave_pb::expr::expr_node::Type as ExprNodeType;
use risingwave_pb::expr::expr_node::Type::{
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
async fn apply_batch(
&mut self,
chunk: &StreamChunk,
condition: Option<InfallibleExpression>,
condition: Option<NonStrictExpression>,
) -> Result<(Vec<Op>, Bitmap), StreamExecutorError> {
let mut new_ops = Vec::with_capacity(chunk.capacity());
let mut new_visibility = BitmapBuilder::with_capacity(chunk.capacity());
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;

use super::*;

Expand All @@ -34,14 +34,14 @@ pub struct FilterExecutor {

/// Expression of the current filter, note that the filter must always have the same output for
/// the same input.
expr: InfallibleExpression,
expr: NonStrictExpression,
}

impl FilterExecutor {
pub fn new(
ctx: ActorContextRef,
input: Box<dyn Executor>,
expr: InfallibleExpression,
expr: NonStrictExpression,
executor_id: u64,
) -> Self {
let input_info = input.info();
Expand Down
16 changes: 8 additions & 8 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, DefaultOrd, ToOwnedDatum};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_expr::ExprError;
use risingwave_storage::StateStore;
use tokio::time::Instant;
Expand Down Expand Up @@ -242,9 +242,9 @@ pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitiv
/// The parameters of the right join executor
side_r: JoinSide<K, S>,
/// Optional non-equi join conditions
cond: Option<InfallibleExpression>,
cond: Option<NonStrictExpression>,
/// Column indices of watermark output and offset expression of each inequality, respectively.
inequality_pairs: Vec<(Vec<usize>, Option<InfallibleExpression>)>,
inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
/// The output watermark of each inequality condition and its value is the minimum of the
/// calculation result of both side. It will be used to generate watermark into downstream
/// and do state cleaning if `clean_state` field of that inequality is `true`.
Expand Down Expand Up @@ -313,7 +313,7 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
side_l: &'a mut JoinSide<K, S>,
side_r: &'a mut JoinSide<K, S>,
actual_output_data_types: &'a [DataType],
cond: &'a mut Option<InfallibleExpression>,
cond: &'a mut Option<NonStrictExpression>,
inequality_watermarks: &'a [Option<Watermark>],
chunk: StreamChunk,
append_only_optimize: bool,
Expand Down Expand Up @@ -448,8 +448,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
pk_indices: PkIndices,
output_indices: Vec<usize>,
executor_id: u64,
cond: Option<InfallibleExpression>,
inequality_pairs: Vec<(usize, usize, bool, Option<InfallibleExpression>)>,
cond: Option<NonStrictExpression>,
inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
op_info: String,
state_table_l: StateTable<S>,
degree_state_table_l: StateTable<S>,
Expand Down Expand Up @@ -1327,7 +1327,7 @@ mod tests {
(state_table, degree_state_table)
}

fn create_cond(condition_text: Option<String>) -> InfallibleExpression {
fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
build_from_pretty(
condition_text
.as_deref()
Expand All @@ -1339,7 +1339,7 @@ mod tests {
with_condition: bool,
null_safe: bool,
condition_text: Option<String>,
inequality_pairs: Vec<(usize, usize, bool, Option<InfallibleExpression>)>,
inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
) -> (MessageSender, MessageSender, BoxedMessageStream) {
let schema = Schema {
fields: vec![
Expand Down
16 changes: 8 additions & 8 deletions src/stream/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::types::Interval;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_expr::ExprError;

use super::error::StreamExecutorError;
Expand All @@ -33,8 +33,8 @@ pub struct HopWindowExecutor {
pub time_col_idx: usize,
pub window_slide: Interval,
pub window_size: Interval,
window_start_exprs: Vec<InfallibleExpression>,
window_end_exprs: Vec<InfallibleExpression>,
window_start_exprs: Vec<NonStrictExpression>,
window_end_exprs: Vec<NonStrictExpression>,
pub output_indices: Vec<usize>,
chunk_size: usize,
}
Expand All @@ -48,8 +48,8 @@ impl HopWindowExecutor {
time_col_idx: usize,
window_slide: Interval,
window_size: Interval,
window_start_exprs: Vec<InfallibleExpression>,
window_end_exprs: Vec<InfallibleExpression>,
window_start_exprs: Vec<NonStrictExpression>,
window_end_exprs: Vec<NonStrictExpression>,
output_indices: Vec<usize>,
chunk_size: usize,
) -> Self {
Expand Down Expand Up @@ -251,7 +251,7 @@ mod tests {
use risingwave_common::types::test_utils::IntervalTestExt;
use risingwave_common::types::{DataType, Interval};
use risingwave_expr::expr::test_utils::make_hop_window_expression;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;

use crate::executor::test_utils::MockSource;
use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk};
Expand Down Expand Up @@ -305,11 +305,11 @@ mod tests {
window_size,
window_start_exprs
.into_iter()
.map(InfallibleExpression::for_test)
.map(NonStrictExpression::for_test)
.collect(),
window_end_exprs
.into_iter()
.map(InfallibleExpression::for_test)
.map(NonStrictExpression::for_test)
.collect(),
output_indices,
CHUNK_SIZE,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn test_merger_sum_aggr() {
vec![],
vec![
// TODO: use the new streaming_if_null expression here, and add `None` tests
InfallibleExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
],
3,
MultiMap::new(),
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, InfallibleExpression};
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::barrier::{BarrierKind, PbMutation};
Expand Down Expand Up @@ -641,7 +641,7 @@ impl Watermark {

pub async fn transform_with_expr(
self,
expr: &InfallibleExpression<impl Expression>,
expr: &NonStrictExpression<impl Expression>,
new_col_idx: usize,
) -> Option<Self> {
let Self { col_idx, val, .. } = self;
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;

use super::*;

Expand All @@ -38,7 +38,7 @@ struct Inner {
info: ExecutorInfo,

/// Expressions of the current projection.
exprs: Vec<InfallibleExpression>,
exprs: Vec<NonStrictExpression>,
/// All the watermark derivations, (input_column_index, output_column_index). And the
/// derivation expression is the project's expression itself.
watermark_derivations: MultiMap<usize, usize>,
Expand All @@ -58,7 +58,7 @@ impl ProjectExecutor {
ctx: ActorContextRef,
input: Box<dyn Executor>,
pk_indices: PkIndices,
exprs: Vec<InfallibleExpression>,
exprs: Vec<NonStrictExpression>,
executor_id: u64,
watermark_derivations: MultiMap<usize, usize>,
nondecreasing_expr_indices: Vec<usize>,
Expand Down Expand Up @@ -346,7 +346,7 @@ mod tests {

let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
let c_expr = InfallibleExpression::for_test(DummyNondecreasingExpr);
let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);

let project = Box::new(ProjectExecutor::new(
ActorContext::create(123),
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::{DataType, Datum, DatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_expr::table_function::ProjectSetSelectItem;

use super::error::StreamExecutorError;
Expand Down Expand Up @@ -262,7 +262,7 @@ impl Inner {
watermark
.clone()
.transform_with_expr(
&InfallibleExpression::todo(expr),
&NonStrictExpression::todo(expr),
expr_idx + PROJ_ROW_ID_OFFSET,
)
.await
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
Expand All @@ -57,7 +57,7 @@ pub struct TemporalJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrim
left_join_keys: Vec<usize>,
right_join_keys: Vec<usize>,
null_safe: Vec<bool>,
condition: Option<InfallibleExpression>,
condition: Option<NonStrictExpression>,
output_indices: Vec<usize>,
pk_indices: PkIndices,
schema: Schema,
Expand Down Expand Up @@ -338,7 +338,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
left_join_keys: Vec<usize>,
right_join_keys: Vec<usize>,
null_safe: Vec<bool>,
condition: Option<InfallibleExpression>,
condition: Option<NonStrictExpression>,
pk_indices: PkIndices,
output_indices: Vec<usize>,
table_output_indices: Vec<usize>,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ pub trait StreamExecutorTestExt: MessageStream + Unpin {
impl StreamExecutorTestExt for BoxedMessageStream {}

pub mod expr {
use risingwave_expr::expr::InfallibleExpression;
use risingwave_expr::expr::NonStrictExpression;

pub fn build_from_pretty(s: impl AsRef<str>) -> InfallibleExpression {
InfallibleExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
pub fn build_from_pretty(s: impl AsRef<str>) -> NonStrictExpression {
NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
}
}

Expand Down
Loading

0 comments on commit 0de5792

Please sign in to comment.