=
diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index a499a9c6ea3d3..07d2a6c7653e7 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -273,7 +273,7 @@ impl LogicalScan {
self.output_col_idx().to_vec(),
self.core.table_desc.clone(),
self.indexes().to_vec(),
- self.base.ctx.clone(),
+ self.base.ctx().clone(),
predicate,
self.for_system_time_as_of_proctime(),
self.table_cardinality(),
@@ -288,7 +288,7 @@ impl LogicalScan {
output_col_idx,
self.core.table_desc.clone(),
self.indexes().to_vec(),
- self.base.ctx.clone(),
+ self.base.ctx().clone(),
self.predicate().clone(),
self.for_system_time_as_of_proctime(),
self.table_cardinality(),
@@ -309,7 +309,7 @@ impl_plan_tree_node_for_leaf! {LogicalScan}
impl Distill for LogicalScan {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(5);
vec.push(("table", Pretty::from(self.table_name().to_owned())));
let key_is_columns =
@@ -440,7 +440,7 @@ impl LogicalScan {
let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
self.core.table_desc.clone(),
self.base
- .ctx
+ .ctx()
.session_ctx()
.config()
.get_max_split_range_gap(),
@@ -551,7 +551,7 @@ impl ToStream for LogicalScan {
None.into(),
)));
}
- match self.base.stream_key.is_none() {
+ match self.base.stream_key().is_none() {
true => {
let mut col_ids = HashSet::new();
diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs
index d924ee7180168..d6b5711740a98 100644
--- a/src/frontend/src/optimizer/plan_node/logical_share.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_share.rs
@@ -69,7 +69,7 @@ impl LogicalShare {
}
pub(super) fn pretty_fields<'a>(base: &PlanBase, name: &'a str) -> XmlNode<'a> {
- childless_record(name, vec![("id", Pretty::debug(&base.id.0))])
+ childless_record(name, vec![("id", Pretty::debug(&base.id().0))])
}
}
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 1d37da9eaa40f..45a5fbcb2240f 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -28,6 +28,7 @@ use risingwave_connector::source::{ConnectorProperties, DataType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::GeneratedColumnDesc;
+use super::generic::GenericPlanRef;
use super::stream_watermark_filter::StreamWatermarkFilter;
use super::utils::{childless_record, Distill};
use super::{
@@ -204,7 +205,7 @@ impl LogicalSource {
..self.core.clone()
};
let mut new_s3_plan: PlanRef = StreamSource {
- base: PlanBase::new_stream_with_logical(
+ base: PlanBase::new_stream_with_core(
&logical_source,
Distribution::Single,
true, // `list` will keep listing all objects, it must be append-only
@@ -506,7 +507,7 @@ impl PredicatePushdown for LogicalSource {
let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len());
for expr in predicate.conjunctions {
- if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, &self.base.schema) {
+ if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) {
// Not recognized, so push back
new_conjunctions.push(e);
}
diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs
index 51e4e620cf4ca..1f02b026c0020 100644
--- a/src/frontend/src/optimizer/plan_node/logical_union.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_union.rs
@@ -130,7 +130,7 @@ impl ToBatch for LogicalUnion {
if !self.all() {
let batch_union = BatchUnion::new(new_logical).into();
Ok(BatchHashAgg::new(
- generic::Agg::new(vec![], (0..self.base.schema.len()).collect(), batch_union)
+ generic::Agg::new(vec![], (0..self.base.schema().len()).collect(), batch_union)
.with_enable_two_phase(false),
)
.into())
@@ -170,7 +170,7 @@ impl ToStream for LogicalUnion {
&self,
ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
- let original_schema = self.base.schema.clone();
+ let original_schema = self.base.schema().clone();
let original_schema_len = original_schema.len();
let mut rewrites = vec![];
for input in &self.core.inputs {
@@ -353,7 +353,7 @@ mod tests {
// Check the result
let union = plan.as_logical_union().unwrap();
- assert_eq!(union.base.schema.len(), 2);
+ assert_eq!(union.base.schema().len(), 2);
}
#[tokio::test]
diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs
index 80e4f350d8edb..1dbe1d3d3c5c9 100644
--- a/src/frontend/src/optimizer/plan_node/logical_update.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_update.rs
@@ -15,6 +15,7 @@
use risingwave_common::catalog::TableVersionId;
use risingwave_common::error::Result;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, LogicalProject,
diff --git a/src/frontend/src/optimizer/plan_node/logical_values.rs b/src/frontend/src/optimizer/plan_node/logical_values.rs
index c6a3d2ac0564e..e62c6400f2015 100644
--- a/src/frontend/src/optimizer/plan_node/logical_values.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_values.rs
@@ -21,6 +21,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, ScalarImpl};
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
BatchValues, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown,
@@ -144,7 +145,7 @@ impl ColPrunable for LogicalValues {
.iter()
.map(|i| self.schema().fields[*i].clone())
.collect();
- Self::new(rows, Schema { fields }, self.base.ctx.clone()).into()
+ Self::new(rows, Schema { fields }, self.base.ctx().clone()).into()
}
}
diff --git a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
index 73f82e86aa260..9f2e8d94634be 100644
--- a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
+++ b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::hash::Hash;
+use super::generic::GenericPlanRef;
use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan};
use crate::optimizer::plan_visitor;
use crate::utils::{Endo, Visit};
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 188787c93b8c0..f16ebfb0c792c 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -46,7 +46,7 @@ use serde::Serialize;
use smallvec::SmallVec;
use self::batch::BatchPlanRef;
-use self::generic::GenericPlanRef;
+use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanRef;
use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, Order};
@@ -419,29 +419,31 @@ impl PlanTreeNode for PlanRef {
}
}
-impl StreamPlanRef for PlanRef {
- fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
+impl PlanNodeMeta for PlanRef {
+ fn node_type(&self) -> PlanNodeType {
+ self.0.node_type()
}
- fn append_only(&self) -> bool {
- self.plan_base().append_only
+ fn plan_base(&self) -> &PlanBase {
+ self.0.plan_base()
}
- fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
+ fn convention(&self) -> Convention {
+ self.0.convention()
}
}
-impl BatchPlanRef for PlanRef {
- fn order(&self) -> &Order {
- &self.plan_base().order
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+impl<P> GenericPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn id(&self) -> PlanNodeId {
+ self.plan_base().id()
}
-}
-impl GenericPlanRef for PlanRef {
fn schema(&self) -> &Schema {
- &self.plan_base().schema
+ self.plan_base().schema()
}
fn stream_key(&self) -> Option<&[usize]> {
@@ -457,6 +459,47 @@ impl GenericPlanRef for PlanRef {
}
}
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Stream` or `Batch`.
+impl<P> PhysicalPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn distribution(&self) -> &Distribution {
+ self.plan_base().distribution()
+ }
+}
+
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Stream`.
+impl<P> StreamPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn append_only(&self) -> bool {
+ self.plan_base().append_only()
+ }
+
+ fn emit_on_window_close(&self) -> bool {
+ self.plan_base().emit_on_window_close()
+ }
+
+ fn watermark_columns(&self) -> &FixedBitSet {
+ self.plan_base().watermark_columns()
+ }
+}
+
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Batch`.
+impl<P> BatchPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn order(&self) -> &Order {
+ self.plan_base().order()
+ }
+}
+
/// In order to let expression display id started from 1 for explaining, hidden column names and
/// other places. We will reset expression display id to 0 and clone the whole plan to reset the
/// schema.
@@ -512,15 +555,15 @@ pub(crate) fn pretty_config() -> PrettyConfig {
impl dyn PlanNode {
pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
+ self.plan_base().id()
}
pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx.clone()
+ self.plan_base().ctx().clone()
}
pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
+ self.plan_base().schema()
}
pub fn stream_key(&self) -> Option<&[usize]> {
@@ -528,27 +571,28 @@ impl dyn PlanNode {
}
pub fn order(&self) -> &Order {
- &self.plan_base().order
+ self.plan_base().order()
}
+ // TODO: avoid no manual delegation
pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
+ self.plan_base().distribution()
}
pub fn append_only(&self) -> bool {
- self.plan_base().append_only
+ self.plan_base().append_only()
}
pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
+ self.plan_base().emit_on_window_close()
}
pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
+ self.plan_base().functional_dependency()
}
pub fn watermark_columns(&self) -> &FixedBitSet {
- &self.plan_base().watermark_columns
+ self.plan_base().watermark_columns()
}
/// Serialize the plan node and its children to a stream plan proto.
diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs
index e9a5bf26885bf..51b1aa5f41141 100644
--- a/src/frontend/src/optimizer/plan_node/plan_base.rs
+++ b/src/frontend/src/optimizer/plan_node/plan_base.rs
@@ -14,47 +14,132 @@
use educe::Educe;
use fixedbitset::FixedBitSet;
-use paste::paste;
use risingwave_common::catalog::Schema;
use super::generic::GenericPlanNode;
use super::*;
-use crate::for_all_plan_nodes;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};
-/// the common fields of all nodes, please make a field named `base` in
-/// every planNode and correctly value it when construct the planNode.
-#[derive(Clone, Debug, Educe)]
-#[educe(PartialEq, Eq, Hash)]
-pub struct PlanBase {
- #[educe(PartialEq(ignore))]
- #[educe(Hash(ignore))]
- pub id: PlanNodeId,
- #[educe(PartialEq(ignore))]
- #[educe(Hash(ignore))]
- pub ctx: OptimizerContextRef,
- pub schema: Schema,
- /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key
- pub stream_key: Option<Vec<usize>>,
- /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
- /// correctness, but insert unnecessary sort in plan
- pub order: Order,
+/// Common extra fields for physical plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct PhysicalCommonExtra {
/// The distribution property of the PlanNode's output, store an `Distribution::any()` here
/// will not affect correctness, but insert unnecessary exchange in plan
- pub dist: Distribution,
+ dist: Distribution,
+}
+
+/// Extra fields for stream plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct StreamExtra {
+ /// Common fields for physical plan nodes.
+ physical: PhysicalCommonExtra,
+
/// The append-only property of the PlanNode's output is a stream-only property. Append-only
/// means the stream contains only insert operation.
- pub append_only: bool,
+ append_only: bool,
/// Whether the output is emitted on window close.
- pub emit_on_window_close: bool,
- pub functional_dependency: FunctionalDependencySet,
+ emit_on_window_close: bool,
/// The watermark column indices of the PlanNode's output. There could be watermark output from
/// this stream operator.
- pub watermark_columns: FixedBitSet,
+ watermark_columns: FixedBitSet,
+}
+
+/// Extra fields for batch plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct BatchExtra {
+ /// Common fields for physical plan nodes.
+ physical: PhysicalCommonExtra,
+
+ /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
+ /// correctness, but insert unnecessary sort in plan
+ order: Order,
+}
+
+/// Extra fields for physical plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+enum PhysicalExtra {
+ Stream(StreamExtra),
+ Batch(BatchExtra),
+}
+
+impl PhysicalExtra {
+ fn common(&self) -> &PhysicalCommonExtra {
+ match self {
+ PhysicalExtra::Stream(stream) => &stream.physical,
+ PhysicalExtra::Batch(batch) => &batch.physical,
+ }
+ }
+
+ fn common_mut(&mut self) -> &mut PhysicalCommonExtra {
+ match self {
+ PhysicalExtra::Stream(stream) => &mut stream.physical,
+ PhysicalExtra::Batch(batch) => &mut batch.physical,
+ }
+ }
+
+ fn stream(&self) -> &StreamExtra {
+ match self {
+ PhysicalExtra::Stream(extra) => extra,
+ _ => panic!("access stream properties from batch plan node"),
+ }
+ }
+
+ fn batch(&self) -> &BatchExtra {
+ match self {
+ PhysicalExtra::Batch(extra) => extra,
+ _ => panic!("access batch properties from stream plan node"),
+ }
+ }
+}
+
+/// the common fields of all nodes, please make a field named `base` in
+/// every planNode and correctly value it when construct the planNode.
+///
+/// All fields are intentionally made private and immutable, as they should
+/// normally be the same as the given [`GenericPlanNode`] when constructing.
+///
+/// - To access them, use traits including [`GenericPlanRef`],
+/// [`PhysicalPlanRef`], [`StreamPlanRef`] and [`BatchPlanRef`].
+/// - To mutate them, use methods like `new_*` or `clone_with_*`.
+#[derive(Clone, Debug, Educe)]
+#[educe(PartialEq, Eq, Hash)]
+pub struct PlanBase {
+ // -- common fields --
+ #[educe(PartialEq(ignore), Hash(ignore))]
+ id: PlanNodeId,
+ #[educe(PartialEq(ignore), Hash(ignore))]
+ ctx: OptimizerContextRef,
+
+ schema: Schema,
+ /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key
+ // TODO: this is actually a logical and stream only property
+ stream_key: Option<Vec<usize>>,
+ functional_dependency: FunctionalDependencySet,
+
+ /// Extra fields if the plan node is physical.
+ physical_extra: Option<PhysicalExtra>,
+}
+
+impl PlanBase {
+ fn physical_extra(&self) -> &PhysicalExtra {
+ self.physical_extra
+ .as_ref()
+ .expect("access physical properties from logical plan node")
+ }
+
+ fn physical_extra_mut(&mut self) -> &mut PhysicalExtra {
+ self.physical_extra
+ .as_mut()
+ .expect("access physical properties from logical plan node")
+ }
}
impl generic::GenericPlanRef for PlanBase {
+ fn id(&self) -> PlanNodeId {
+ self.id
+ }
+
fn schema(&self) -> &Schema {
&self.schema
}
@@ -72,23 +157,29 @@ impl generic::GenericPlanRef for PlanBase {
}
}
-impl stream::StreamPlanRef for PlanBase {
+impl generic::PhysicalPlanRef for PlanBase {
fn distribution(&self) -> &Distribution {
- &self.dist
+ &self.physical_extra().common().dist
}
+}
+impl stream::StreamPlanRef for PlanBase {
fn append_only(&self) -> bool {
- self.append_only
+ self.physical_extra().stream().append_only
}
fn emit_on_window_close(&self) -> bool {
- self.emit_on_window_close
+ self.physical_extra().stream().emit_on_window_close
+ }
+
+ fn watermark_columns(&self) -> &FixedBitSet {
+ &self.physical_extra().stream().watermark_columns
}
}
impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
- &self.order
+ &self.physical_extra().batch().order
}
}
@@ -100,47 +191,22 @@ impl PlanBase {
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
stream_key,
- dist: Distribution::Single,
- order: Order::any(),
- // Logical plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false,
functional_dependency,
- watermark_columns,
+ physical_extra: None,
}
}
- pub fn new_logical_with_core(node: &impl GenericPlanNode) -> Self {
+ pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
Self::new_logical(
- node.ctx(),
- node.schema(),
- node.stream_key(),
- node.functional_dependency(),
- )
- }
-
- pub fn new_stream_with_logical(
- logical: &impl GenericPlanNode,
- dist: Distribution,
- append_only: bool,
- emit_on_window_close: bool,
- watermark_columns: FixedBitSet,
- ) -> Self {
- Self::new_stream(
- logical.ctx(),
- logical.schema(),
- logical.stream_key(),
- logical.functional_dependency(),
- dist,
- append_only,
- emit_on_window_close,
- watermark_columns,
+ core.ctx(),
+ core.schema(),
+ core.stream_key(),
+ core.functional_dependency(),
)
}
@@ -160,22 +226,36 @@ impl PlanBase {
id,
ctx,
schema,
- dist,
- order: Order::any(),
stream_key,
- append_only,
- emit_on_window_close,
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra::Stream({
+ StreamExtra {
+ physical: PhysicalCommonExtra { dist },
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ }
+ })),
}
}
- pub fn new_batch_from_logical(
- logical: &impl GenericPlanNode,
+ pub fn new_stream_with_core(
+ core: &impl GenericPlanNode,
dist: Distribution,
- order: Order,
+ append_only: bool,
+ emit_on_window_close: bool,
+ watermark_columns: FixedBitSet,
) -> Self {
- Self::new_batch(logical.ctx(), logical.schema(), dist, order)
+ Self::new_stream(
+ core.ctx(),
+ core.schema(),
+ core.stream_key(),
+ core.functional_dependency(),
+ dist,
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ )
}
pub fn new_batch(
@@ -186,75 +266,49 @@ impl PlanBase {
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
- dist,
- order,
stream_key: None,
- // Batch plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false, // TODO(rc): batch EOWC support?
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra::Batch({
+ BatchExtra {
+ physical: PhysicalCommonExtra { dist },
+ order,
+ }
+ })),
}
}
- pub fn derive_stream_plan_base(plan_node: &PlanRef) -> Self {
- PlanBase::new_stream(
- plan_node.ctx(),
- plan_node.schema().clone(),
- plan_node.stream_key().map(|v| v.to_vec()),
- plan_node.functional_dependency().clone(),
- plan_node.distribution().clone(),
- plan_node.append_only(),
- plan_node.emit_on_window_close(),
- plan_node.watermark_columns().clone(),
- )
+ pub fn new_batch_with_core(
+ core: &impl GenericPlanNode,
+ dist: Distribution,
+ order: Order,
+ ) -> Self {
+ Self::new_batch(core.ctx(), core.schema(), dist, order)
}
pub fn clone_with_new_plan_id(&self) -> Self {
let mut new = self.clone();
- new.id = self.ctx.next_plan_node_id();
+ new.id = self.ctx().next_plan_node_id();
+ new
+ }
+
+ /// Clone the plan node with a new distribution.
+ ///
+ /// Panics if the plan node is not physical.
+ pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
+ let mut new = self.clone();
+ new.physical_extra_mut().common_mut().dist = dist;
new
}
}
-macro_rules! impl_base_delegate {
- ($( { $convention:ident, $name:ident }),*) => {
- $(paste! {
- impl [<$convention $name>] {
- pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
- }
- pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx()
- }
- pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
- }
- pub fn stream_key(&self) -> Option<&[usize]> {
- self.plan_base().stream_key()
- }
- pub fn order(&self) -> &Order {
- &self.plan_base().order
- }
- pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
- }
- pub fn append_only(&self) -> bool {
- self.plan_base().append_only
- }
- pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
- }
- pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
- }
- }
- })*
+// Mutators for testing only.
+#[cfg(test)]
+impl PlanBase {
+ pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
+ &mut self.functional_dependency
}
}
-for_all_plan_nodes! { impl_base_delegate }
diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index 2edf997bf91fd..866c62c2413a5 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::generic::GenericPlanRef;
-use crate::optimizer::property::Distribution;
+use fixedbitset::FixedBitSet;
-/// A subtrait of [`GenericPlanRef`] for stream plans.
+use super::generic::PhysicalPlanRef;
+
+/// A subtrait of [`PhysicalPlanRef`] for stream plans.
///
/// Due to the lack of refactoring, all plan nodes currently implement this trait
/// through [`super::PlanBase`]. One may still use this trait as a bound for
-/// expecting a stream plan, in contrast to [`GenericPlanRef`].
-pub trait StreamPlanRef: GenericPlanRef {
- fn distribution(&self) -> &Distribution;
+/// accessing a stream plan, in contrast to [`GenericPlanRef`] or
+/// [`PhysicalPlanRef`].
+///
+/// [`GenericPlanRef`]: super::generic::GenericPlanRef
+pub trait StreamPlanRef: PhysicalPlanRef {
fn append_only(&self) -> bool;
fn emit_on_window_close(&self) -> bool;
+ fn watermark_columns(&self) -> &FixedBitSet;
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs
index 6e96d0eab0e93..51b5e589e886e 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs
@@ -17,7 +17,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::DedupNode;
-use super::generic::{self, GenericPlanNode, GenericPlanRef};
+use super::generic::{self, GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
use super::stream::StreamPlanRef;
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
@@ -37,7 +37,7 @@ impl StreamDedup {
// A dedup operator must be append-only.
assert!(input.append_only());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
true,
diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
index db9e6ac296bbf..bb18f9cffdf0f 100644
--- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
@@ -20,7 +20,7 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
-use super::generic::{self};
+use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
@@ -67,7 +67,7 @@ impl StreamDeltaJoin {
core.i2o_col_mapping().rewrite_bitset(&watermark_columns)
};
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
append_only,
@@ -90,7 +90,7 @@ impl StreamDeltaJoin {
impl Distill for StreamDeltaJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.core.join_type)));
diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs
index c9f969384c3a4..9b000974786e4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dml.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs
@@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
index e1ca18da937e9..a4b74f37208e7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
@@ -17,7 +17,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;
-use super::generic::DynamicFilter;
+use super::generic::{DynamicFilter, GenericPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, column_names_pretty, watermark_pretty, Distill};
use super::{generic, ExprRewritable};
use crate::expr::{Expr, ExprImpl};
@@ -37,7 +38,7 @@ impl StreamDynamicFilter {
let watermark_columns = core.watermark_columns(core.right().watermark_columns()[0]);
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
core.left().distribution().clone(),
false, /* we can have a new abstraction for append only and monotonically increasing
@@ -78,11 +79,11 @@ impl StreamDynamicFilter {
impl Distill for StreamDynamicFilter {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let pred = self.core.pretty_field();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("predicate", pred));
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
vec.push(("output", column_names_pretty(self.schema())));
diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
index 9418af8e4a364..d8c5a9635ce59 100644
--- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
@@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanWindowFunction};
+use super::generic::{self, GenericPlanRef, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -50,7 +50,7 @@ impl StreamEowcOverWindow {
// ancient rows in some rarely updated partitions that are emitted at the end of time.
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
true,
diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
index 0fa1713bf4488..99e6c3c5161a1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, DistributionDisplay};
@@ -78,7 +80,7 @@ impl StreamExchange {
impl Distill for StreamExchange {
fn distill<'a>(&self) -> XmlNode<'a> {
let distribution_display = DistributionDisplay {
- distribution: &self.base.dist,
+ distribution: self.base.distribution(),
input_schema: self.input.schema(),
};
childless_record(
@@ -117,13 +119,13 @@ impl StreamNode for StreamExchange {
})
} else {
Some(DispatchStrategy {
- r#type: match &self.base.dist {
+ r#type: match &self.base.distribution() {
Distribution::HashShard(_) => DispatcherType::Hash,
Distribution::Single => DispatcherType::Simple,
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
- dist_key_indices: match &self.base.dist {
+ dist_key_indices: match &self.base.distribution() {
Distribution::HashShard(keys) => {
keys.iter().map(|num| *num as u32).collect()
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs
index e0f8852a19fb5..5959b8d6be4d2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_expand.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs
@@ -48,7 +48,7 @@ impl StreamExpand {
.map(|idx| idx + input.schema().len()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs
index ff4d344607776..0f000e6b8c0db 100644
--- a/src/frontend/src/optimizer/plan_node/stream_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs
@@ -34,7 +34,7 @@ impl StreamFilter {
let input = core.input.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
index 190c05c0a5ba1..95fd72e9f6aa0 100644
--- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -48,7 +48,7 @@ impl_plan_tree_node_for_unary! { StreamFsFetch }
impl StreamFsFetch {
pub fn new(input: PlanRef, source: generic::Source) -> Self {
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&source,
Distribution::SomeShard,
source.catalog.as_ref().map_or(true, |s| s.append_only),
diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
index 14711d353f9d8..3e8f3c00206c4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
@@ -16,7 +16,8 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{DistillUnit, TopNLimit};
+use super::generic::{DistillUnit, GenericPlanRef, TopNLimit};
+use super::stream::StreamPlanRef;
use super::utils::{plan_node_name, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::generic::GenericPlanNode;
@@ -135,7 +136,7 @@ impl Distill for StreamGroupTopN {
{ "append_only", self.input().append_only() },
);
let mut node = self.core.distill_with_name(name);
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
node.fields.push(("output_watermarks".into(), ow));
}
node
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
index 5047dcb0e78ef..3edfb8256d5b2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
@@ -18,10 +18,11 @@ use pretty_xmlish::XmlNode;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanAggCall};
+use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
@@ -85,7 +86,7 @@ impl StreamHashAgg {
}
// Hash agg executor might change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
emit_on_window_close, // in EOWC mode, we produce append only output
@@ -142,7 +143,7 @@ impl StreamHashAgg {
impl Distill for StreamHashAgg {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record(
@@ -214,7 +215,7 @@ impl StreamNode for StreamHashAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
- emit_on_window_close: self.base.emit_on_window_close,
+ emit_on_window_close: self.base.emit_on_window_close(),
version: PbAggNodeVersion::Issue12140 as _,
})
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
index 0075b1730b4eb..9d9c41425c4b1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
@@ -20,7 +20,8 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DeltaExpression, HashJoinNode, PbInequalityPair};
-use super::generic::Join;
+use super::generic::{GenericPlanRef, Join};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode,
@@ -178,7 +179,7 @@ impl StreamHashJoin {
};
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
append_only,
@@ -291,7 +292,7 @@ impl Distill for StreamHashJoin {
{ "interval", self.clean_left_state_conjunction_idx.is_some() && self.clean_right_state_conjunction_idx.is_some() },
{ "append_only", self.is_append_only },
);
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(6);
vec.push(("type", Pretty::debug(&self.core.join_type)));
@@ -316,7 +317,7 @@ impl Distill for StreamHashJoin {
if let Some(i) = self.clean_right_state_conjunction_idx {
vec.push(("conditions_to_clean_right_state_table", get_cond(i)));
}
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
index c68b1b307d470..e177be6942360 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
@@ -17,6 +17,8 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
@@ -56,7 +58,7 @@ impl StreamHopWindow {
)
.rewrite_bitset(&watermark_columns);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
@@ -75,7 +77,7 @@ impl StreamHopWindow {
impl Distill for StreamHopWindow {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record("StreamHopWindow", vec)
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index fb17537bc90e6..9c87f1a34abbd 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::derive::derive_columns;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion};
use crate::catalog::FragmentId;
use crate::optimizer::plan_node::derive::derive_pk;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -149,7 +151,22 @@ impl StreamMaterialize {
TableType::MaterializedView => {
assert_matches!(user_distributed_by, RequiredDist::Any);
// ensure the same pk will not shuffle to different node
- RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key())
+ let required_dist =
+ RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
+
+ // If the input is a stream join, enforce the stream key as the materialized
+ // view distribution key to avoid slow backfilling caused by
+ // data skew of the dimension table join key.
+ // See the related tracking issue for more information (TODO: restore the dropped issue link).
+ let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
+ || matches!(input.as_stream_temporal_join(), Some(_join))
+ || matches!(input.as_stream_delta_join(), Some(_join));
+
+ if is_stream_join {
+ return Ok(required_dist.enforce(input, &Order::any()));
+ }
+
+ required_dist
}
TableType::Index => {
assert_matches!(
@@ -273,8 +290,8 @@ impl Distill for StreamMaterialize {
vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
- let watermark_columns = &self.base.watermark_columns;
- if self.base.watermark_columns.count_ones(..) > 0 {
+ let watermark_columns = &self.base.watermark_columns();
+ if self.base.watermark_columns().count_ones(..) > 0 {
let watermark_column_names = watermark_columns
.ones()
.map(|i| table.columns()[i].name_with_hidden().to_string())
@@ -294,16 +311,16 @@ impl PlanTreeNodeUnary for StreamMaterialize {
fn clone_with_input(&self, input: PlanRef) -> Self {
let new = Self::new(input, self.table().clone());
new.base
- .schema
+ .schema()
.fields
.iter()
- .zip_eq_fast(self.base.schema.fields.iter())
+ .zip_eq_fast(self.base.schema().fields.iter())
.for_each(|(a, b)| {
assert_eq!(a.data_type, b.data_type);
assert_eq!(a.type_name, b.type_name);
assert_eq!(a.sub_fields, b.sub_fields);
});
- assert_eq!(new.plan_base().stream_key, self.plan_base().stream_key);
+ assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
new
}
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs
index 9eb0a0e0f143e..91ebc344fa51d 100644
--- a/src/frontend/src/optimizer/plan_node/stream_now.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_now.rs
@@ -19,8 +19,7 @@ use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::NowNode;
-use super::generic::GenericPlanRef;
-use super::stream::StreamPlanRef;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode};
use crate::optimizer::plan_node::utils::column_names_pretty;
@@ -59,7 +58,7 @@ impl StreamNow {
impl Distill for StreamNow {
fn distill<'a>(&self) -> XmlNode<'a> {
- let vec = if self.base.ctx.is_explain_verbose() {
+ let vec = if self.base.ctx().is_explain_verbose() {
vec![("output", column_names_pretty(self.schema()))]
} else {
vec![]
diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs
index 0d749f0c7b0e6..5a2f9d98f1340 100644
--- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs
@@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::generic::{GenericPlanNode, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::TableCatalog;
@@ -37,7 +38,7 @@ impl StreamOverWindow {
let input = &core.input;
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
false, // general over window cannot be append-only
@@ -122,7 +123,7 @@ impl StreamNode for StreamOverWindow {
.to_internal_table_prost();
let cache_policy = self
.base
- .ctx
+ .ctx()
.session_ctx()
.config()
.get_streaming_over_window_cache_policy();
diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs
index 8a7665881e0cf..c0ff0d1cf2f43 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project.rs
@@ -17,6 +17,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ProjectNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter, WatermarkDerivation};
@@ -41,7 +43,7 @@ impl Distill for StreamProject {
let schema = self.schema();
let mut vec = self.core.fields_pretty(schema);
if let Some(display_output_watermarks) =
- watermark_pretty(&self.base.watermark_columns, schema)
+ watermark_pretty(self.base.watermark_columns(), schema)
{
vec.push(("output_watermarks", display_output_watermarks));
}
@@ -79,7 +81,7 @@ impl StreamProject {
}
// Project executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs
index cadd600f3c3b7..ba09d79c96c60 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs
@@ -66,7 +66,7 @@ impl StreamProjectSet {
// ProjectSet executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs
index 8b406005f40a6..3acf0b132805e 100644
--- a/src/frontend/src/optimizer/plan_node/stream_share.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_share.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode};
use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
@@ -34,7 +36,7 @@ impl StreamShare {
let input = core.input.borrow().0.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
@@ -79,7 +81,7 @@ impl StreamNode for StreamShare {
impl StreamShare {
pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode {
- let operator_id = self.base.id.0 as u32;
+ let operator_id = self.base.id().0 as u32;
match state.get_share_stream_node(operator_id) {
None => {
diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
index 90a1ae6ead318..ef33d9a3d4d7a 100644
--- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
@@ -21,6 +21,8 @@ use super::generic::{self, PlanAggCall};
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -48,7 +50,7 @@ impl StreamSimpleAgg {
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
// Simple agg executor might change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns);
StreamSimpleAgg {
base,
core,
@@ -99,7 +101,7 @@ impl StreamNode for StreamSimpleAgg {
.collect(),
distribution_key: self
.base
- .dist
+ .distribution()
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index a51380d630331..32e9fb487910c 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -37,6 +37,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;
use super::derive::{derive_columns, derive_pk};
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, StreamNode};
use crate::optimizer::plan_node::PlanTreeNodeUnary;
@@ -57,7 +58,7 @@ pub struct StreamSink {
impl StreamSink {
#[must_use]
pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self {
- let base = PlanBase::derive_stream_plan_base(&input);
+ let base = input.plan_base().clone_with_new_plan_id();
Self {
base,
input,
@@ -389,7 +390,7 @@ impl Distill for StreamSink {
.iter()
.map(|k| k.column_index)
.collect_vec(),
- schema: &self.base.schema,
+ schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
}
@@ -409,7 +410,7 @@ impl StreamNode for StreamSink {
PbNodeBody::Sink(SinkNode {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
- log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() {
+ log_store_type: match self.base.ctx().session_ctx().config().get_sink_decouple() {
SinkDecouple::Default => {
let enable_sink_decouple =
match_sink_name_str!(
diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs
index b82d71068d817..41a56a0fd5df2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sort.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs
@@ -20,6 +20,8 @@ use risingwave_common::catalog::FieldDisplay;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -84,7 +86,7 @@ impl StreamEowcSort {
tbl_builder.add_order_column(self.sort_column_index, OrderType::ascending());
order_cols.insert(self.sort_column_index);
- let dist_key = self.base.dist.dist_column_indices().to_vec();
+ let dist_key = self.base.distribution().dist_column_indices().to_vec();
for idx in &dist_key {
if !order_cols.contains(idx) {
tbl_builder.add_order_column(*idx, OrderType::ascending());
diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs
index 377e2704776bb..ae66cf568118b 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -37,7 +37,7 @@ pub struct StreamSource {
impl StreamSource {
pub fn new(core: generic::Source) -> Self {
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
Distribution::SomeShard,
core.catalog.as_ref().map_or(true, |s| s.append_only),
diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
index e832cf5530452..8c47d73568def 100644
--- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
@@ -20,6 +20,7 @@ use super::generic::{self, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::property::RequiredDist;
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -49,7 +50,7 @@ impl StreamStatelessSimpleAgg {
}
}
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input_dist.clone(),
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 907a41db28525..965ca217a3369 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ChainType, PbStreamNode};
+use super::generic::PhysicalPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, FunctionCall};
use crate::optimizer::plan_node::generic::GenericPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -66,7 +68,7 @@ impl StreamTableScan {
None => Distribution::SomeShard,
}
};
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
core.table_desc.append_only,
@@ -192,7 +194,7 @@ impl_plan_tree_node_for_leaf! { StreamTableScan }
impl Distill for StreamTableScan {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(4);
vec.push(("table", Pretty::from(self.core.table_name.clone())));
vec.push(("columns", self.core.columns_pretty(verbose)));
@@ -200,12 +202,12 @@ impl Distill for StreamTableScan {
if verbose {
let pk = IndicesDisplay {
indices: self.stream_key().unwrap_or_default(),
- schema: &self.base.schema,
+ schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
let dist = Pretty::display(&DistributionDisplay {
distribution: self.distribution(),
- input_schema: &self.base.schema,
+ input_schema: self.base.schema(),
});
vec.push(("dist", dist));
}
@@ -325,7 +327,7 @@ impl StreamTableScan {
..Default::default()
})),
stream_key,
- operator_id: self.base.id.0 as u64,
+ operator_id: self.base.id().0 as u64,
identity: {
let s = self.distill_to_string();
s.replace("StreamTableScan", "Chain")
diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
index 2191ca322342d..675dbeb9ab381 100644
--- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
@@ -18,6 +18,8 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::TemporalJoinNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
@@ -61,7 +63,7 @@ impl StreamTemporalJoin {
.rewrite_bitset(core.left.watermark_columns()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
true,
@@ -88,7 +90,7 @@ impl StreamTemporalJoin {
impl Distill for StreamTemporalJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.core.join_type)));
@@ -101,7 +103,7 @@ impl Distill for StreamTemporalJoin {
}),
));
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs
index e7a880fa7d757..87890625f6be7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs
@@ -40,7 +40,7 @@ impl StreamTopN {
};
let watermark_columns = FixedBitSet::with_capacity(input.schema().len());
- let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns);
StreamTopN { base, core }
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs
index 8f6353d6be44c..6d6dca2d8dd02 100644
--- a/src/frontend/src/optimizer/plan_node/stream_union.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_union.rs
@@ -19,6 +19,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::UnionNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanRef};
use crate::optimizer::plan_node::generic::GenericPlanNode;
@@ -46,7 +48,7 @@ impl StreamUnion {
|acc_watermark_columns, input| acc_watermark_columns.bitand(input.watermark_columns()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
inputs.iter().all(|x| x.append_only()),
@@ -60,7 +62,7 @@ impl StreamUnion {
impl Distill for StreamUnion {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record("StreamUnion", vec)
diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs
index fb0b844411f63..f8cc5db851159 100644
--- a/src/frontend/src/optimizer/plan_node/stream_values.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_values.rs
@@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::values_node::ExprTuple;
use risingwave_pb::stream_plan::ValuesNode;
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
use crate::expr::{Expr, ExprImpl};
diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
index ed5a946603ee4..066bc9a234ca5 100644
--- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
@@ -21,6 +21,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::WatermarkDesc;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{ExprDisplay, ExprImpl};
@@ -85,7 +86,7 @@ impl Distill for StreamWatermarkFilter {
})
.collect();
let display_output_watermarks =
- watermark_pretty(&self.base.watermark_columns, input_schema).unwrap();
+ watermark_pretty(self.base.watermark_columns(), input_schema).unwrap();
let fields = vec![
("watermark_descs", Pretty::Array(display_watermark_descs)),
("output_watermarks", display_output_watermarks),
diff --git a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
index f30f3d9fa4966..7e53b903ac962 100644
--- a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use itertools::Itertools;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare};
use crate::optimizer::PlanRewriter;
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
index 9ab0d4d580ddc..5b9efb9fc7c94 100644
--- a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
use itertools::Itertools;
use crate::catalog::SourceId;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalShare, LogicalSource, PlanNodeId, PlanTreeNode, StreamShare,
};
diff --git a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
index 7d538392f9361..7950b5d81a49c 100644
--- a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
+++ b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use super::{DefaultBehavior, DefaultValue};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::PlanVisitor;
diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs
index 4fcaf959eac87..2df1d7ae00bc3 100644
--- a/src/frontend/src/optimizer/property/distribution.rs
+++ b/src/frontend/src/optimizer/property/distribution.rs
@@ -295,10 +295,12 @@ impl RequiredDist {
pub fn enforce_if_not_satisfies(
&self,
- plan: PlanRef,
+ mut plan: PlanRef,
required_order: &Order,
) -> Result {
- let plan = required_order.enforce_if_not_satisfies(plan)?;
+ if let Convention::Batch = plan.convention() {
+ plan = required_order.enforce_if_not_satisfies(plan)?;
+ }
if !plan.distribution().satisfies(self) {
Ok(self.enforce(plan, required_order))
} else {
@@ -329,7 +331,7 @@ impl RequiredDist {
}
}
- fn enforce(&self, plan: PlanRef, required_order: &Order) -> PlanRef {
+ pub fn enforce(&self, plan: PlanRef, required_order: &Order) -> PlanRef {
let dist = self.to_dist();
match plan.convention() {
Convention::Batch => BatchExchange::new(plan, required_order.clone(), dist).into(),
diff --git a/src/frontend/src/optimizer/property/order.rs b/src/frontend/src/optimizer/property/order.rs
index a70bffb13a8ba..19ad7586e1c11 100644
--- a/src/frontend/src/optimizer/property/order.rs
+++ b/src/frontend/src/optimizer/property/order.rs
@@ -92,7 +92,7 @@ impl Order {
}
}
- pub fn enforce(&self, plan: PlanRef) -> PlanRef {
+ fn enforce(&self, plan: PlanRef) -> PlanRef {
assert_eq!(plan.convention(), Convention::Batch);
BatchSort::new(plan, self.clone()).into()
}
diff --git a/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs b/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs
index 34025eca43032..3e22348e27b49 100644
--- a/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs
+++ b/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::aggregate::AggKind;
use super::super::plan_node::*;
@@ -48,11 +47,11 @@ impl Rule for AggGroupBySimplifyRule {
if !new_group_key.contains(i) {
let data_type = agg_input.schema().fields[i].data_type();
new_agg_calls.push(PlanAggCall {
- agg_kind: AggKind::FirstValue,
+ agg_kind: AggKind::InternalLastSeenValue,
return_type: data_type.clone(),
inputs: vec![InputRef::new(i, data_type)],
distinct: false,
- order_by: vec![ColumnOrder::new(i, OrderType::ascending())],
+ order_by: vec![],
filter: Condition::true_cond(),
direct_args: vec![],
});
diff --git a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
index 02165232372e4..eeba7d9f3be3b 100644
--- a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
+++ b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
@@ -15,6 +15,7 @@
use risingwave_common::types::ScalarImpl;
use super::Rule;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalValues};
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
index 66579248a76f9..7ac121692c81d 100644
--- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
+++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
@@ -23,6 +23,7 @@ use crate::expr::{
CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall,
InputRef,
};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary};
use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder};
use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter;
diff --git a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
index 1ed1da0037aba..01a39042efd98 100644
--- a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
+++ b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
@@ -36,7 +36,7 @@ impl Rule for ExpandToProjectRule {
let column_subset = column_subsets.get(0).unwrap();
// if `column_subsets` len equals 1, convert it into a project
- let mut exprs = Vec::with_capacity(expand.base.schema.len());
+ let mut exprs = Vec::with_capacity(expand.base.schema().len());
// Add original input column first
for i in 0..input.schema().len() {
exprs.push(ExprImpl::InputRef(
diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index 9103d1bc906bc..323cc59ef3558 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -66,6 +66,7 @@ use crate::expr::{
FunctionCall, InputRef,
};
use crate::optimizer::optimizer_context::OptimizerContextRef;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
generic, ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode,
PlanTreeNodeBinary, PredicatePushdown, PredicatePushdownContext,
diff --git a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
index dcbb6f7b015ee..bd2db0ac67cca 100644
--- a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
+++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
@@ -47,6 +47,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
index c496a906400ae..8682db8491a1d 100644
--- a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
+++ b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
@@ -46,6 +46,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
index ea8386bc227f8..c32ae40531cd0 100644
--- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
+++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
@@ -27,7 +27,7 @@ use risingwave_expr::aggregate::AggKind;
use super::{BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{
LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
};
diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
index dfb6963c7fb4f..93637d3ba8193 100644
--- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
+++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
@@ -18,6 +18,7 @@ use risingwave_expr::window_function::WindowFuncKind;
use super::Rule;
use crate::expr::{collect_input_refs, ExprImpl, ExprType};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalTopN, PlanTreeNodeUnary};
use crate::optimizer::property::Order;
use crate::planner::LIMIT_ALL_COUNT;
diff --git a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
index dc5f9c2bc9aba..f34146ba80050 100644
--- a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
+++ b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use super::super::plan_node::*;
use super::{BoxedRule, Rule};
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_visitor::{PlanCorrelatedIdFinder, PlanVisitor};
use crate::optimizer::PlanRef;
use crate::utils::Condition;
diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
index 5a6f1187fdd02..f85ffc2318459 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::types::DataType;
use super::{BoxedRule, Rule};
use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary,
};
@@ -51,7 +52,7 @@ impl Rule for TableFunctionToProjectSetRule {
let logical_values = LogicalValues::create(
vec![vec![]],
Schema::new(vec![]),
- logical_table_function.base.ctx.clone(),
+ logical_table_function.base.ctx().clone(),
);
let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]);
// We need a project to align schema type because
diff --git a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
index 9759739490fe6..a13bef3baa9d9 100644
--- a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
+++ b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::{LogicalCardinalityExt, SideEffectVisitor};
use crate::optimizer::{PlanRef, PlanVisitor};
diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
index 8119b8847b600..7b83c017ab781 100644
--- a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::{PlanRef, PlanTreeNode};
diff --git a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
index 2a12f6b712e0d..f1d203fba1350 100644
--- a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{LogicalUnion, PlanTreeNode};
use crate::optimizer::PlanRef;
@@ -24,7 +24,7 @@ impl Rule for UnionToDistinctRule {
let union: &LogicalUnion = plan.as_logical_union()?;
if !union.all() {
let union_all = LogicalUnion::create(true, union.inputs().into_iter().collect());
- let distinct = Agg::new(vec![], (0..union.base.schema.len()).collect(), union_all)
+ let distinct = Agg::new(vec![], (0..union.base.schema().len()).collect(), union_all)
.with_enable_two_phase(false);
Some(distinct.into())
} else {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 4e16bc6cd0b21..cb20103b3e76f 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -103,7 +103,7 @@ impl Serialize for ExecutionPlanNode {
impl From for ExecutionPlanNode {
fn from(plan_node: PlanRef) -> Self {
Self {
- plan_node_id: plan_node.plan_base().id,
+ plan_node_id: plan_node.plan_base().id(),
plan_node_type: plan_node.node_type(),
node: plan_node.to_batch_prost_body(),
children: vec![],
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index ed6ad289a5a68..d39dde51399d8 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -626,7 +626,7 @@ impl GlobalBarrierManager {
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);
- self.recovery(prev_epoch, paused_reason, true)
+ self.recovery(prev_epoch, paused_reason)
.instrument(span)
.await
};
@@ -981,10 +981,7 @@ impl GlobalBarrierManager {
// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
- *state = self
- .recovery(prev_epoch, None, false)
- .instrument(span)
- .await;
+ *state = self.recovery(prev_epoch, None).instrument(span).await;
self.set_status(BarrierManagerStatus::Running).await;
} else {
panic!("failed to execute barrier: {:?}", err);
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 21197a8df98d4..3e319f0e69a52 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -219,7 +219,6 @@ impl GlobalBarrierManager {
&self,
prev_epoch: TracedEpoch,
paused_reason: Option,
- bootstrap_recovery: bool,
) -> BarrierManagerState {
// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
@@ -227,11 +226,9 @@ impl GlobalBarrierManager {
.await;
tracing::info!("recovery start!");
- if bootstrap_recovery {
- self.clean_dirty_tables()
- .await
- .expect("clean dirty tables should not fail");
- }
+ self.clean_dirty_tables()
+ .await
+ .expect("clean dirty tables should not fail");
self.clean_dirty_fragments()
.await
.expect("clean dirty fragments");
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index bcac32922d180..f988646428aac 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -853,14 +853,18 @@ impl CatalogManager {
database_core.clear_creating_stream_jobs();
let user_core = &mut core.user;
for table in &tables_to_clean {
- // Recovered when init database manager.
- for relation_id in &table.dependent_relations {
- database_core.decrease_ref_count(*relation_id);
+ // If table type is internal, no need to update the ref count OR
+ // user ref count.
+ if table.table_type != TableType::Internal as i32 {
+ // Recovered when init database manager.
+ for relation_id in &table.dependent_relations {
+ database_core.decrease_ref_count(*relation_id);
+ }
+ // Recovered when init user manager.
+ tracing::debug!("decrease ref for {}", table.id);
+ user_core.decrease_ref(table.owner);
}
- // Recovered when init user manager.
- user_core.decrease_ref(table.owner);
}
-
Ok(())
}
@@ -919,10 +923,11 @@ impl CatalogManager {
let database_core = &mut core.database;
let tables = &mut database_core.tables;
let Some(table) = tables.get(&table_id).cloned() else {
- bail!(
- "table_id {} missing when attempting to cancel job",
+ tracing::warn!(
+ "table_id {} missing when attempting to cancel job, could be cleaned on recovery",
table_id
- )
+ );
+ return Ok(());
};
table
};
@@ -938,7 +943,8 @@ impl CatalogManager {
let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table_id in table_ids {
- tables.remove(table_id);
+ let res = tables.remove(table_id);
+ assert!(res.is_some());
}
commit_meta!(self, tables)?;
}
@@ -2032,8 +2038,7 @@ impl CatalogManager {
let user_core = &mut core.user;
let key = (index.database_id, index.schema_id, index.name.clone());
assert!(
- !database_core.indexes.contains_key(&index.id)
- && database_core.has_in_progress_creation(&key),
+ !database_core.indexes.contains_key(&index.id),
"index must be in creating procedure"
);
@@ -2188,8 +2193,7 @@ impl CatalogManager {
let user_core = &mut core.user;
let key = (sink.database_id, sink.schema_id, sink.name.clone());
assert!(
- !database_core.sinks.contains_key(&sink.id)
- && database_core.has_in_progress_creation(&key),
+ !database_core.sinks.contains_key(&sink.id),
"sink must be in creating procedure"
);
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index 76de970a919a9..58fb2d50c6287 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -294,6 +294,7 @@ pub enum Encode {
Json, // Keyword::JSON
Bytes, // Keyword::BYTES
Native,
+ Template,
}
// TODO: unify with `from_keyword`
@@ -309,6 +310,7 @@ impl fmt::Display for Encode {
Encode::Json => "JSON",
Encode::Bytes => "BYTES",
Encode::Native => "NATIVE",
+ Encode::Template => "TEMPLATE",
}
)
}
@@ -322,13 +324,12 @@ impl Encode {
"CSV" => Encode::Csv,
"PROTOBUF" => Encode::Protobuf,
"JSON" => Encode::Json,
+ "TEMPLATE" => Encode::Template,
"NATIVE" => Encode::Native, // used internally for schema change
- _ => {
- return Err(ParserError::ParserError(
- "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE after Encode"
- .to_string(),
- ))
- }
+ _ => return Err(ParserError::ParserError(
+ "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE after Encode"
+ .to_string(),
+ )),
})
}
}
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index f1022ab2fd935..c6fc5531acd33 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -25,7 +25,7 @@ dyn-clone = "1.0.14"
either = "1"
enum-as-inner = "0.6"
fail = "0.5"
-foyer = { git = "https://github.com/mrcroxx/foyer", rev = "438eec8" }
+foyer = { git = "https://github.com/mrcroxx/foyer", rev = "5d0134b" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
diff --git a/src/storage/src/hummock/file_cache/store.rs b/src/storage/src/hummock/file_cache/store.rs
index 9de54552ae077..222700c8376b2 100644
--- a/src/storage/src/hummock/file_cache/store.rs
+++ b/src/storage/src/hummock/file_cache/store.rs
@@ -256,6 +256,7 @@ where
io_size: config.device_io_size,
},
allocator_bits: config.allocator_bits,
+ catalog_bits: 6,
admissions,
reinsertions: config.reinsertions,
buffer_pool_size: config.buffer_pool_size,
diff --git a/src/storage/src/row_serde/value_serde.rs b/src/storage/src/row_serde/value_serde.rs
index 5d56cdba2d96d..9048b90c23a53 100644
--- a/src/storage/src/row_serde/value_serde.rs
+++ b/src/storage/src/row_serde/value_serde.rs
@@ -114,9 +114,10 @@ impl ValueRowSerdeNew for ColumnAwareSerde {
// It's okay since we previously banned impure expressions in default columns.
build_from_prost(&expr.expect("expr should not be none"))
.expect("build_from_prost error")
- .eval_row_infallible(&OwnedRow::empty())
+ .eval_row(&OwnedRow::empty())
.now_or_never()
.expect("constant expression should not be async")
+ .expect("eval_row failed")
};
Some((i, value))
} else {
diff --git a/src/stream/clippy.toml b/src/stream/clippy.toml
index a6969d5bd607b..b7257c4acb98c 100644
--- a/src/stream/clippy.toml
+++ b/src/stream/clippy.toml
@@ -3,8 +3,8 @@ disallowed-methods = [
{ path = "risingwave_expr::expr::build_from_prost", reason = "Expressions in streaming must be in non-strict mode. Please use `build_non_strict_from_prost` instead." },
{ path = "risingwave_expr::expr::build_func", reason = "Expressions in streaming must be in non-strict mode. Please use `build_func_non_strict` instead." },
- { path = "risingwave_expr::expr::Expression::eval", reason = "Please use `Expression::eval_infallible` instead." },
- { path = "risingwave_expr::expr::Expression::eval_row", reason = "Please use `Expression::eval_row_infallible` instead." },
+ { path = "risingwave_expr::expr::Expression::eval", reason = "Please use `NonStrictExpression::eval_infallible` instead." },
+ { path = "risingwave_expr::expr::Expression::eval_row", reason = "Please use `NonStrictExpression::eval_row_infallible` instead." },
{ path = "risingwave_common::error::internal_err", reason = "Please use per-crate error type instead." },
{ path = "risingwave_common::error::internal_error", reason = "Please use per-crate error type instead." },
diff --git a/src/stream/src/executor/aggregation/mod.rs b/src/stream/src/executor/aggregation/mod.rs
index dd0ce9d01c544..9bb1113152962 100644
--- a/src/stream/src/executor/aggregation/mod.rs
+++ b/src/stream/src/executor/aggregation/mod.rs
@@ -21,6 +21,7 @@ use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_expr::aggregate::{AggCall, AggKind};
+use risingwave_expr::expr::{LogReport, NonStrictExpression};
use risingwave_storage::StateStore;
use crate::common::table::state_table::StateTable;
@@ -74,7 +75,12 @@ pub async fn agg_call_filter_res(
}
if let Some(ref filter) = agg_call.filter {
- if let Bool(filter_res) = filter.eval_infallible(chunk).await.as_ref() {
+ // TODO: should we build `filter` in non-strict mode?
+ if let Bool(filter_res) = NonStrictExpression::new_topmost(&**filter, LogReport)
+ .eval_infallible(chunk)
+ .await
+ .as_ref()
+ {
vis &= filter_res.to_bitmap();
} else {
bail!("Filter can only receive bool array");
diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs
index e8eb4da545f2e..ccb55b75c24fc 100644
--- a/src/stream/src/executor/dynamic_filter.rs
+++ b/src/stream/src/executor/dynamic_filter.rs
@@ -26,7 +26,7 @@ use risingwave_common::row::{self, once, OwnedRow, OwnedRow as RowData, Row};
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::{
- build_func_non_strict, BoxedExpression, InputRefExpression, LiteralExpression,
+ build_func_non_strict, InputRefExpression, LiteralExpression, NonStrictExpression,
};
use risingwave_pb::expr::expr_node::Type as ExprNodeType;
use risingwave_pb::expr::expr_node::Type::{
@@ -97,7 +97,7 @@ impl DynamicFilterExecutor,
+ condition: Option,
) -> Result<(Vec, Bitmap), StreamExecutorError> {
let mut new_ops = Vec::with_capacity(chunk.capacity());
let mut new_visibility = BitmapBuilder::with_capacity(chunk.capacity());
@@ -265,7 +265,7 @@ impl DynamicFilterExecutor,
- expr: BoxedExpression,
+ expr: NonStrictExpression,
executor_id: u64,
) -> Self {
let input_info = input.info();
@@ -190,8 +190,8 @@ mod tests {
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
- use risingwave_expr::expr::build_from_pretty;
+ use super::super::test_utils::expr::build_from_pretty;
use super::super::test_utils::MockSource;
use super::super::*;
use super::*;
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 7aed840679c82..4178012cb9d9e 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -28,7 +28,7 @@ use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, DefaultOrd, ToOwnedDatum};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
-use risingwave_expr::expr::BoxedExpression;
+use risingwave_expr::expr::NonStrictExpression;
use risingwave_expr::ExprError;
use risingwave_storage::StateStore;
use tokio::time::Instant;
@@ -242,9 +242,9 @@ pub struct HashJoinExecutor,
/// Optional non-equi join conditions
- cond: Option,
+ cond: Option,
/// Column indices of watermark output and offset expression of each inequality, respectively.
- inequality_pairs: Vec<(Vec, Option)>,
+ inequality_pairs: Vec<(Vec, Option)>,
/// The output watermark of each inequality condition and its value is the minimum of the
/// calculation result of both side. It will be used to generate watermark into downstream
/// and do state cleaning if `clean_state` field of that inequality is `true`.
@@ -313,7 +313,7 @@ struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
side_l: &'a mut JoinSide,
side_r: &'a mut JoinSide,
actual_output_data_types: &'a [DataType],
- cond: &'a mut Option,
+ cond: &'a mut Option,
inequality_watermarks: &'a [Option],
chunk: StreamChunk,
append_only_optimize: bool,
@@ -448,8 +448,8 @@ impl HashJoinExecutor,
executor_id: u64,
- cond: Option,
- inequality_pairs: Vec<(usize, usize, bool, Option)>,
+ cond: Option,
+ inequality_pairs: Vec<(usize, usize, bool, Option)>,
op_info: String,
state_table_l: StateTable,
degree_state_table_l: StateTable,
@@ -912,7 +912,7 @@ impl HashJoinExecutor input_watermark.val = value.unwrap(),
@@ -1275,11 +1275,11 @@ mod tests {
use risingwave_common::hash::{Key128, Key64};
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::sort_util::OrderType;
- use risingwave_expr::expr::build_from_pretty;
use risingwave_storage::memory::MemoryStateStore;
use super::*;
use crate::common::table::state_table::StateTable;
+ use crate::executor::test_utils::expr::build_from_pretty;
use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
use crate::executor::{ActorContext, Barrier, EpochPair};
@@ -1327,7 +1327,7 @@ mod tests {
(state_table, degree_state_table)
}
- fn create_cond(condition_text: Option) -> BoxedExpression {
+ fn create_cond(condition_text: Option) -> NonStrictExpression {
build_from_pretty(
condition_text
.as_deref()
@@ -1339,7 +1339,7 @@ mod tests {
with_condition: bool,
null_safe: bool,
condition_text: Option,
- inequality_pairs: Vec<(usize, usize, bool, Option)>,
+ inequality_pairs: Vec<(usize, usize, bool, Option)>,
) -> (MessageSender, MessageSender, BoxedMessageStream) {
let schema = Schema {
fields: vec![
diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs
index c6fffcd94896d..42d13d790da88 100644
--- a/src/stream/src/executor/hop_window.rs
+++ b/src/stream/src/executor/hop_window.rs
@@ -19,7 +19,7 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::types::Interval;
-use risingwave_expr::expr::BoxedExpression;
+use risingwave_expr::expr::NonStrictExpression;
use risingwave_expr::ExprError;
use super::error::StreamExecutorError;
@@ -33,8 +33,8 @@ pub struct HopWindowExecutor {
pub time_col_idx: usize,
pub window_slide: Interval,
pub window_size: Interval,
- window_start_exprs: Vec,
- window_end_exprs: Vec,
+ window_start_exprs: Vec,
+ window_end_exprs: Vec,
pub output_indices: Vec,
chunk_size: usize,
}
@@ -48,8 +48,8 @@ impl HopWindowExecutor {
time_col_idx: usize,
window_slide: Interval,
window_size: Interval,
- window_start_exprs: Vec,
- window_end_exprs: Vec,
+ window_start_exprs: Vec,
+ window_end_exprs: Vec,
output_indices: Vec,
chunk_size: usize,
) -> Self {
@@ -251,6 +251,7 @@ mod tests {
use risingwave_common::types::test_utils::IntervalTestExt;
use risingwave_common::types::{DataType, Interval};
use risingwave_expr::expr::test_utils::make_hop_window_expression;
+ use risingwave_expr::expr::NonStrictExpression;
use crate::executor::test_utils::MockSource;
use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk};
@@ -302,8 +303,14 @@ mod tests {
2,
window_slide,
window_size,
- window_start_exprs,
- window_end_exprs,
+ window_start_exprs
+ .into_iter()
+ .map(NonStrictExpression::for_test)
+ .collect(),
+ window_end_exprs
+ .into_iter()
+ .map(NonStrictExpression::for_test)
+ .collect(),
output_indices,
CHUNK_SIZE,
)
diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs
index a9c219a25641f..cd505093294f1 100644
--- a/src/stream/src/executor/integration_tests.rs
+++ b/src/stream/src/executor/integration_tests.rs
@@ -152,7 +152,7 @@ async fn test_merger_sum_aggr() {
vec![],
vec![
// TODO: use the new streaming_if_null expression here, and add `None` tests
- Box::new(InputRefExpression::new(DataType::Int64, 1)),
+ NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
],
3,
MultiMap::new(),
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 99b090e21a240..c28d6ec8564d9 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -31,7 +31,7 @@ use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
-use risingwave_expr::expr::BoxedExpression;
+use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::barrier::{BarrierKind, PbMutation};
@@ -641,7 +641,7 @@ impl Watermark {
pub async fn transform_with_expr(
self,
- expr: &BoxedExpression,
+ expr: &NonStrictExpression,
new_col_idx: usize,
) -> Option {
let Self { col_idx, val, .. } = self;
@@ -651,7 +651,7 @@ impl Watermark {
OwnedRow::new(row)
};
let val = expr.eval_row_infallible(&row).await?;
- Some(Self::new(new_col_idx, expr.return_type(), val))
+ Some(Self::new(new_col_idx, expr.inner().return_type(), val))
}
/// Transform the watermark with the given output indices. If this watermark is not in the
diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs
index 56a31bde901b9..8cfebfecd3f33 100644
--- a/src/stream/src/executor/project.rs
+++ b/src/stream/src/executor/project.rs
@@ -21,7 +21,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_expr::expr::BoxedExpression;
+use risingwave_expr::expr::NonStrictExpression;
use super::*;
@@ -38,7 +38,7 @@ struct Inner {
info: ExecutorInfo,
/// Expressions of the current projection.
- exprs: Vec,
+ exprs: Vec,
/// All the watermark derivations, (input_column_index, output_column_index). And the
/// derivation expression is the project's expression itself.
watermark_derivations: MultiMap,
@@ -58,7 +58,7 @@ impl ProjectExecutor {
ctx: ActorContextRef,
input: Box,
pk_indices: PkIndices,
- exprs: Vec,
+ exprs: Vec,
executor_id: u64,
watermark_derivations: MultiMap,
nondecreasing_expr_indices: Vec,
@@ -233,11 +233,12 @@ mod tests {
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, Datum};
- use risingwave_expr::expr::{self, build_from_pretty, Expression, ValueImpl};
+ use risingwave_expr::expr::{self, Expression, ValueImpl};
use super::super::test_utils::MockSource;
use super::super::*;
use super::*;
+ use crate::executor::test_utils::expr::build_from_pretty;
use crate::executor::test_utils::StreamExecutorTestExt;
#[tokio::test]
@@ -345,7 +346,7 @@ mod tests {
let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
- let c_expr = DummyNondecreasingExpr.boxed();
+ let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);
let project = Box::new(ProjectExecutor::new(
ActorContext::create(123),
diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs
index 6867e3d55bfde..ff3214db88eaa 100644
--- a/src/stream/src/executor/project_set.rs
+++ b/src/stream/src/executor/project_set.rs
@@ -24,6 +24,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::{DataType, Datum, DatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqFast;
+use risingwave_expr::expr::{LogReport, NonStrictExpression};
use risingwave_expr::table_function::ProjectSetSelectItem;
use super::error::StreamExecutorError;
@@ -260,7 +261,11 @@ impl Inner {
ProjectSetSelectItem::Expr(expr) => {
watermark
.clone()
- .transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET)
+ .transform_with_expr(
+ // TODO: should we build `expr` in non-strict mode?
+ &NonStrictExpression::new_topmost(expr, LogReport),
+ expr_idx + PROJ_ROW_ID_OFFSET,
+ )
.await
}
ProjectSetSelectItem::TableFunction(_) => {
diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs
index 3c8cde63c4ca9..82c1e56649672 100644
--- a/src/stream/src/executor/temporal_join.rs
+++ b/src/stream/src/executor/temporal_join.rs
@@ -32,7 +32,7 @@ use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
-use risingwave_expr::expr::BoxedExpression;
+use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
@@ -57,7 +57,7 @@ pub struct TemporalJoinExecutor,
right_join_keys: Vec,
null_safe: Vec,
- condition: Option,
+ condition: Option,
output_indices: Vec,
pk_indices: PkIndices,
schema: Schema,
@@ -338,7 +338,7 @@ impl TemporalJoinExecutor
left_join_keys: Vec,
right_join_keys: Vec,
null_safe: Vec,
- condition: Option,
+ condition: Option,
pk_indices: PkIndices,
output_indices: Vec,
table_output_indices: Vec,
diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs
index 5bcb0fe547976..2ae461e62351c 100644
--- a/src/stream/src/executor/test_utils.rs
+++ b/src/stream/src/executor/test_utils.rs
@@ -34,11 +34,11 @@ pub mod prelude {
pub use risingwave_common::test_prelude::StreamChunkTestExt;
pub use risingwave_common::types::DataType;
pub use risingwave_common::util::sort_util::OrderType;
- pub use risingwave_expr::expr::build_from_pretty;
pub use risingwave_storage::memory::MemoryStateStore;
pub use risingwave_storage::StateStore;
pub use crate::common::table::state_table::StateTable;
+ pub use crate::executor::test_utils::expr::build_from_pretty;
pub use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
pub use crate::executor::{ActorContext, BoxedMessageStream, Executor, PkIndices};
}
@@ -263,6 +263,14 @@ pub trait StreamExecutorTestExt: MessageStream + Unpin {
// FIXME: implement on any `impl MessageStream` if the analyzer works well.
impl StreamExecutorTestExt for BoxedMessageStream {}
+pub mod expr {
+ use risingwave_expr::expr::NonStrictExpression;
+
+ pub fn build_from_pretty(s: impl AsRef) -> NonStrictExpression {
+ NonStrictExpression::for_test(risingwave_expr::expr::build_from_pretty(s))
+ }
+}
+
pub mod agg_executor {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs
index 624b2531bf7bd..8c09b56aa3551 100644
--- a/src/stream/src/executor/values.rs
+++ b/src/stream/src/executor/values.rs
@@ -21,7 +21,7 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::ensure;
use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_expr::expr::BoxedExpression;
+use risingwave_expr::expr::NonStrictExpression;
use tokio::sync::mpsc::UnboundedReceiver;
use super::{
@@ -40,7 +40,7 @@ pub struct ValuesExecutor {
barrier_receiver: UnboundedReceiver,
progress: CreateMviewProgress,
- rows: vec::IntoIter>,
+ rows: vec::IntoIter>,
pk_indices: PkIndices,
identity: String,
schema: Schema,
@@ -51,7 +51,7 @@ impl ValuesExecutor {
pub fn new(
ctx: ActorContextRef,
progress: CreateMviewProgress,
- rows: Vec>,
+ rows: Vec>,
schema: Schema,
barrier_receiver: UnboundedReceiver,
executor_id: u64,
@@ -167,7 +167,7 @@ mod tests {
};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, ScalarImpl, StructType};
- use risingwave_expr::expr::{BoxedExpression, LiteralExpression};
+ use risingwave_expr::expr::{BoxedExpression, LiteralExpression, NonStrictExpression};
use tokio::sync::mpsc::unbounded_channel;
use super::ValuesExecutor;
@@ -202,11 +202,11 @@ mod tests {
vec![],
),
Some(ScalarImpl::Struct(value)),
- )) as BoxedExpression,
+ )),
Box::new(LiteralExpression::new(
DataType::Int64,
Some(ScalarImpl::Int64(0)),
- )) as BoxedExpression,
+ )),
];
let fields = exprs
.iter() // for each column
@@ -215,7 +215,10 @@ mod tests {
let values_executor_struct = ValuesExecutor::new(
ActorContext::create(actor_id),
progress,
- vec![exprs],
+ vec![exprs
+ .into_iter()
+ .map(NonStrictExpression::for_test)
+ .collect()],
Schema { fields },
barrier_receiver,
10005,
diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index ad332112ef269..5e5454cecff93 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -23,7 +23,8 @@ use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl};
use risingwave_common::{bail, row};
use risingwave_expr::expr::{
- build_func_non_strict, BoxedExpression, Expression, InputRefExpression, LiteralExpression,
+ build_func_non_strict, ExpressionBoxExt, InputRefExpression, LiteralExpression,
+ NonStrictExpression,
};
use risingwave_expr::Result as ExprResult;
use risingwave_pb::expr::expr_node::Type;
@@ -44,7 +45,7 @@ use crate::task::ActorEvalErrorReport;
pub struct WatermarkFilterExecutor {
input: BoxedExecutor,
/// The expression used to calculate the watermark value.
- watermark_expr: BoxedExpression,
+ watermark_expr: NonStrictExpression,
/// The column we should generate watermark and filter on.
event_time_col_idx: usize,
ctx: ActorContextRef,
@@ -55,7 +56,7 @@ pub struct WatermarkFilterExecutor {
impl WatermarkFilterExecutor {
pub fn new(
input: BoxedExecutor,
- watermark_expr: BoxedExpression,
+ watermark_expr: NonStrictExpression,
event_time_col_idx: usize,
ctx: ActorContextRef,
table: StateTable,
@@ -298,7 +299,7 @@ impl WatermarkFilterExecutor {
event_time_col_idx: usize,
watermark: ScalarImpl,
eval_error_report: ActorEvalErrorReport,
- ) -> ExprResult {
+ ) -> ExprResult {
build_func_non_strict(
Type::GreaterThanOrEqual,
DataType::Boolean,
@@ -350,11 +351,11 @@ mod tests {
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::Date;
use risingwave_common::util::sort_util::OrderType;
- use risingwave_expr::expr::build_from_pretty;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::Distribution;
use super::*;
+ use crate::executor::test_utils::expr::build_from_pretty;
use crate::executor::test_utils::{MessageSender, MockSource};
use crate::executor::ActorContext;
diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs
index 44799af9405c2..87174282e517a 100644
--- a/src/stream/src/from_proto/hash_join.rs
+++ b/src/stream/src/from_proto/hash_join.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::types::DataType;
use risingwave_expr::expr::{
- build_func_non_strict, build_non_strict_from_prost, BoxedExpression, InputRefExpression,
+ build_func_non_strict, build_non_strict_from_prost, InputRefExpression, NonStrictExpression,
};
pub use risingwave_pb::expr::expr_node::Type as ExprType;
use risingwave_pb::plan_common::JoinType as JoinTypeProto;
@@ -109,7 +109,8 @@ impl ExecutorBuilder for HashJoinExecutorBuilder {
build_non_strict_from_prost(
delta_expression.delta.as_ref().unwrap(),
params.eval_error_report.clone(),
- )?,
+ )?
+ .into_inner(),
],
params.eval_error_report.clone(),
)?)
@@ -175,8 +176,8 @@ struct HashJoinExecutorDispatcherArgs {
pk_indices: PkIndices,
output_indices: Vec,
executor_id: u64,
- cond: Option,
- inequality_pairs: Vec<(usize, usize, bool, Option)>,
+ cond: Option,
+ inequality_pairs: Vec<(usize, usize, bool, Option)>,
op_info: String,
state_table_l: StateTable,
degree_state_table_l: StateTable,
diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs
index 8b7b3b6af1335..58699089e8c27 100644
--- a/src/stream/src/from_proto/temporal_join.rs
+++ b/src/stream/src/from_proto/temporal_join.rs
@@ -18,7 +18,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
-use risingwave_expr::expr::{build_non_strict_from_prost, BoxedExpression};
+use risingwave_expr::expr::{build_non_strict_from_prost, NonStrictExpression};
use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc};
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::Distribution;
@@ -190,7 +190,7 @@ struct TemporalJoinExecutorDispatcherArgs {
left_join_keys: Vec,
right_join_keys: Vec,
null_safe: Vec,
- condition: Option,
+ condition: Option,
pk_indices: PkIndices,
output_indices: Vec,
table_output_indices: Vec,
diff --git a/src/stream/tests/integration_tests/hop_window.rs b/src/stream/tests/integration_tests/hop_window.rs
index 167857cc7d9fc..9d6d879240fc0 100644
--- a/src/stream/tests/integration_tests/hop_window.rs
+++ b/src/stream/tests/integration_tests/hop_window.rs
@@ -15,6 +15,7 @@
use risingwave_common::types::test_utils::IntervalTestExt;
use risingwave_common::types::{Interval, Timestamp};
use risingwave_expr::expr::test_utils::make_hop_window_expression;
+use risingwave_expr::expr::NonStrictExpression;
use risingwave_stream::executor::{ExecutorInfo, HopWindowExecutor};
use crate::prelude::*;
@@ -55,8 +56,14 @@ fn create_executor(output_indices: Vec) -> (MessageSender, BoxedMessageSt
TIME_COL_IDX,
window_slide,
window_size,
- window_start_exprs,
- window_end_exprs,
+ window_start_exprs
+ .into_iter()
+ .map(NonStrictExpression::for_test)
+ .collect(),
+ window_end_exprs
+ .into_iter()
+ .map(NonStrictExpression::for_test)
+ .collect(),
output_indices,
CHUNK_SIZE,
)
diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs
index bf1354c25b83b..61a879256108d 100644
--- a/src/stream/tests/integration_tests/project_set.rs
+++ b/src/stream/tests/integration_tests/project_set.rs
@@ -29,10 +29,10 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) {
};
let (tx, source) = MockSource::channel(schema, PkIndices::new());
- let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");
- let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)");
- let tf1 = repeat(build_from_pretty("1:int4"), 1);
- let tf2 = repeat(build_from_pretty("2:int4"), 2);
+ let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)").into_inner();
+ let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)").into_inner();
+ let tf1 = repeat(build_from_pretty("1:int4").into_inner(), 1);
+ let tf2 = repeat(build_from_pretty("2:int4").into_inner(), 2);
let project_set = Box::new(ProjectSetExecutor::new(
ActorContext::create(123),
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 1fd5c90e59e4b..89df82d4c21a0 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -61,6 +61,9 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
.run("create materialized view m1 as select * from t1;")
.await?;
+ // If the CN is killed before first barrier pass for the MV, the MV will be dropped.
+ // This is because its table fragments will NOT be committed until first barrier pass.
+ sleep(Duration::from_secs(5)).await;
kill_cn_and_wait_recover(&cluster).await;
// Send some upstream updates.
diff --git a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs
index c05e52c927424..776692b2fab90 100644
--- a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs
@@ -25,7 +25,7 @@ const ROOT_TABLE_CREATE: &str = "create table t1 (v1 int);";
const MV1: &str = "create materialized view m1 as select * from t1 where v1 > 5;";
const MV2: &str = "create materialized view m2 as select * from t1 where v1 > 10;";
const MV3: &str = "create materialized view m3 as select * from m2 where v1 < 15;";
-const MV4: &str = "create materialized view m4 as select m1.v1 as m1v, m3.v1 as m3v from m1 join m3 on m1.v1 = m3.v1;";
+const MV4: &str = "create materialized view m4 as select m1.v1 as m1v, m3.v1 as m3v from m1 join m3 on m1.v1 = m3.v1 limit 100;";
const MV5: &str = "create materialized view m5 as select * from m4;";
#[tokio::test]
@@ -40,6 +40,7 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
.locate_one_fragment([
identity_contains("materialize"),
no_identity_contains("chain"),
+ no_identity_contains("topn"),
no_identity_contains("hashjoin"),
])
.await?;
@@ -129,6 +130,7 @@ async fn test_diamond_cascade_materialized_view() -> Result<()> {
.locate_one_fragment([
identity_contains("materialize"),
no_identity_contains("chain"),
+ no_identity_contains("topn"),
no_identity_contains("hashjoin"),
])
.await?;
diff --git a/src/tests/simulation/tests/integration_tests/scale/plan.rs b/src/tests/simulation/tests/integration_tests/scale/plan.rs
index c7244dc826b42..8b62a58998a3f 100644
--- a/src/tests/simulation/tests/integration_tests/scale/plan.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/plan.rs
@@ -39,10 +39,7 @@ async fn test_resize_normal() -> Result<()> {
.await?;
let join_fragment = cluster
- .locate_one_fragment([
- identity_contains("hashJoin"),
- identity_contains("materialize"),
- ])
+ .locate_one_fragment([identity_contains("hashJoin")])
.await?;
let join_fragment_id = join_fragment.inner.fragment_id;
@@ -270,7 +267,7 @@ async fn test_resize_no_shuffle() -> Result<()> {
session
.run(
"create materialized view mv7 as select mv1.v as mv1v, mv5.v as mv5v from mv1
-join mv5 on mv1.v = mv5.v;",
+join mv5 on mv1.v = mv5.v limit 1;",
)
.await?;
@@ -316,6 +313,7 @@ join mv5 on mv1.v = mv5.v;",
let top_materialize_fragment = cluster
.locate_one_fragment([
identity_contains("materialize"),
+ no_identity_contains("topn"),
no_identity_contains("chain"),
no_identity_contains("hashJoin"),
])