diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index a499a9c6ea3d3..07d2a6c7653e7 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -273,7 +273,7 @@ impl LogicalScan {
self.output_col_idx().to_vec(),
self.core.table_desc.clone(),
self.indexes().to_vec(),
- self.base.ctx.clone(),
+ self.base.ctx().clone(),
predicate,
self.for_system_time_as_of_proctime(),
self.table_cardinality(),
@@ -288,7 +288,7 @@ impl LogicalScan {
output_col_idx,
self.core.table_desc.clone(),
self.indexes().to_vec(),
- self.base.ctx.clone(),
+ self.base.ctx().clone(),
self.predicate().clone(),
self.for_system_time_as_of_proctime(),
self.table_cardinality(),
@@ -309,7 +309,7 @@ impl_plan_tree_node_for_leaf! {LogicalScan}
impl Distill for LogicalScan {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(5);
vec.push(("table", Pretty::from(self.table_name().to_owned())));
let key_is_columns =
@@ -440,7 +440,7 @@ impl LogicalScan {
let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
self.core.table_desc.clone(),
self.base
- .ctx
+ .ctx()
.session_ctx()
.config()
.get_max_split_range_gap(),
@@ -551,7 +551,7 @@ impl ToStream for LogicalScan {
None.into(),
)));
}
- match self.base.stream_key.is_none() {
+ match self.base.stream_key().is_none() {
true => {
let mut col_ids = HashSet::new();
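The hunks in this file, like most below, are mechanical: `PlanBase` fields such as `ctx` and `stream_key` become private, so call sites move from direct field access to accessor methods. A minimal sketch of the pattern, using illustrative names rather than the actual RisingWave definitions:

```rust
// Sketch of the field-to-accessor migration applied throughout this PR.
// `Base` and its `ctx` field are illustrative stand-ins, not real types.
struct Base {
    ctx: String, // now private: call sites can no longer reach in directly
}

impl Base {
    /// Read-only accessor replacing the former `pub` field.
    fn ctx(&self) -> &String {
        &self.ctx
    }
}

fn main() {
    let base = Base { ctx: "query-1".to_owned() };
    // Before: `base.ctx.clone()` -- after: `base.ctx().clone()`.
    let _owned = base.ctx().clone();
}
```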
diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs
index d924ee7180168..d6b5711740a98 100644
--- a/src/frontend/src/optimizer/plan_node/logical_share.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_share.rs
@@ -69,7 +69,7 @@ impl LogicalShare {
}
pub(super) fn pretty_fields<'a>(base: &PlanBase, name: &'a str) -> XmlNode<'a> {
- childless_record(name, vec![("id", Pretty::debug(&base.id.0))])
+ childless_record(name, vec![("id", Pretty::debug(&base.id().0))])
}
}
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 1d37da9eaa40f..45a5fbcb2240f 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -28,6 +28,7 @@ use risingwave_connector::source::{ConnectorProperties, DataType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::GeneratedColumnDesc;
+use super::generic::GenericPlanRef;
use super::stream_watermark_filter::StreamWatermarkFilter;
use super::utils::{childless_record, Distill};
use super::{
@@ -204,7 +205,7 @@ impl LogicalSource {
..self.core.clone()
};
let mut new_s3_plan: PlanRef = StreamSource {
- base: PlanBase::new_stream_with_logical(
+ base: PlanBase::new_stream_with_core(
&logical_source,
Distribution::Single,
true, // `list` will keep listing all objects, it must be append-only
@@ -506,7 +507,7 @@ impl PredicatePushdown for LogicalSource {
let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len());
for expr in predicate.conjunctions {
- if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, &self.base.schema) {
+ if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) {
// Not recognized, so push back
new_conjunctions.push(e);
}
diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs
index 51e4e620cf4ca..1f02b026c0020 100644
--- a/src/frontend/src/optimizer/plan_node/logical_union.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_union.rs
@@ -130,7 +130,7 @@ impl ToBatch for LogicalUnion {
if !self.all() {
let batch_union = BatchUnion::new(new_logical).into();
Ok(BatchHashAgg::new(
- generic::Agg::new(vec![], (0..self.base.schema.len()).collect(), batch_union)
+ generic::Agg::new(vec![], (0..self.base.schema().len()).collect(), batch_union)
.with_enable_two_phase(false),
)
.into())
@@ -170,7 +170,7 @@ impl ToStream for LogicalUnion {
&self,
ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
- let original_schema = self.base.schema.clone();
+ let original_schema = self.base.schema().clone();
let original_schema_len = original_schema.len();
let mut rewrites = vec![];
for input in &self.core.inputs {
@@ -353,7 +353,7 @@ mod tests {
// Check the result
let union = plan.as_logical_union().unwrap();
- assert_eq!(union.base.schema.len(), 2);
+ assert_eq!(union.base.schema().len(), 2);
}
#[tokio::test]
diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs
index 80e4f350d8edb..1dbe1d3d3c5c9 100644
--- a/src/frontend/src/optimizer/plan_node/logical_update.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_update.rs
@@ -15,6 +15,7 @@
use risingwave_common::catalog::TableVersionId;
use risingwave_common::error::Result;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, LogicalProject,
diff --git a/src/frontend/src/optimizer/plan_node/logical_values.rs b/src/frontend/src/optimizer/plan_node/logical_values.rs
index c6a3d2ac0564e..e62c6400f2015 100644
--- a/src/frontend/src/optimizer/plan_node/logical_values.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_values.rs
@@ -21,6 +21,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, ScalarImpl};
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
BatchValues, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown,
@@ -144,7 +145,7 @@ impl ColPrunable for LogicalValues {
.iter()
.map(|i| self.schema().fields[*i].clone())
.collect();
- Self::new(rows, Schema { fields }, self.base.ctx.clone()).into()
+ Self::new(rows, Schema { fields }, self.base.ctx().clone()).into()
}
}
diff --git a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
index 73f82e86aa260..9f2e8d94634be 100644
--- a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
+++ b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::hash::Hash;
+use super::generic::GenericPlanRef;
use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan};
use crate::optimizer::plan_visitor;
use crate::utils::{Endo, Visit};
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 188787c93b8c0..f16ebfb0c792c 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -46,7 +46,7 @@ use serde::Serialize;
use smallvec::SmallVec;
use self::batch::BatchPlanRef;
-use self::generic::GenericPlanRef;
+use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanRef;
use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, Order};
@@ -419,29 +419,31 @@ impl PlanTreeNode for PlanRef {
}
}
-impl StreamPlanRef for PlanRef {
- fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
+impl PlanNodeMeta for PlanRef {
+ fn node_type(&self) -> PlanNodeType {
+ self.0.node_type()
}
- fn append_only(&self) -> bool {
- self.plan_base().append_only
+ fn plan_base(&self) -> &PlanBase {
+ self.0.plan_base()
}
- fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
+ fn convention(&self) -> Convention {
+ self.0.convention()
}
}
-impl BatchPlanRef for PlanRef {
- fn order(&self) -> &Order {
- &self.plan_base().order
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+impl<P> GenericPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn id(&self) -> PlanNodeId {
+ self.plan_base().id()
}
-}
-impl GenericPlanRef for PlanRef {
fn schema(&self) -> &Schema {
- &self.plan_base().schema
+ self.plan_base().schema()
}
fn stream_key(&self) -> Option<&[usize]> {
@@ -457,6 +459,47 @@ impl GenericPlanRef for PlanRef {
}
}
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Stream` or `Batch`.
+impl<P> PhysicalPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn distribution(&self) -> &Distribution {
+ self.plan_base().distribution()
+ }
+}
+
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Stream`.
+impl<P> StreamPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn append_only(&self) -> bool {
+ self.plan_base().append_only()
+ }
+
+ fn emit_on_window_close(&self) -> bool {
+ self.plan_base().emit_on_window_close()
+ }
+
+ fn watermark_columns(&self) -> &FixedBitSet {
+ self.plan_base().watermark_columns()
+ }
+}
+
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Batch`.
+impl<P> BatchPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn order(&self) -> &Order {
+ self.plan_base().order()
+ }
+}
+
/// In order to let expression display id started from 1 for explaining, hidden column names and
/// other places. We will reset expression display id to 0 and clone the whole plan to reset the
/// schema.
@@ -512,15 +555,15 @@ pub(crate) fn pretty_config() -> PrettyConfig {
impl dyn PlanNode {
pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
+ self.plan_base().id()
}
pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx.clone()
+ self.plan_base().ctx().clone()
}
pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
+ self.plan_base().schema()
}
pub fn stream_key(&self) -> Option<&[usize]> {
@@ -528,27 +571,28 @@ impl dyn PlanNode {
}
pub fn order(&self) -> &Order {
- &self.plan_base().order
+ self.plan_base().order()
}
+ // TODO: avoid manual delegation
pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
+ self.plan_base().distribution()
}
pub fn append_only(&self) -> bool {
- self.plan_base().append_only
+ self.plan_base().append_only()
}
pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
+ self.plan_base().emit_on_window_close()
}
pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
+ self.plan_base().functional_dependency()
}
pub fn watermark_columns(&self) -> &FixedBitSet {
- &self.plan_base().watermark_columns
+ self.plan_base().watermark_columns()
}
/// Serialize the plan node and its children to a stream plan proto.
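The deleted per-type impls above are subsumed by blanket impls: any node type that exposes its `PlanBase` through `PlanNodeMeta` (plus `Eq + Hash`) now gets `GenericPlanRef`, `PhysicalPlanRef`, `StreamPlanRef`, and `BatchPlanRef` for free. A self-contained sketch of the delegation idea, with simplified stand-in traits:

```rust
use std::hash::Hash;

#[derive(PartialEq, Eq, Hash)]
struct PlanBase {
    id: u32,
}

impl PlanBase {
    fn id(&self) -> u32 {
        self.id
    }
}

/// Stand-in for `PlanNodeMeta`: anything that can hand out its `PlanBase`.
trait PlanNodeMeta {
    fn plan_base(&self) -> &PlanBase;
}

/// Stand-in for `GenericPlanRef`, implemented once for all `PlanNodeMeta`.
trait GenericPlanRef {
    fn id(&self) -> u32;
}

// One blanket impl replaces the old per-node `impl_base_delegate!` expansion.
impl<P> GenericPlanRef for P
where
    P: PlanNodeMeta + Eq + Hash,
{
    fn id(&self) -> u32 {
        self.plan_base().id() // single point of delegation
    }
}

#[derive(PartialEq, Eq, Hash)]
struct StreamFilter {
    base: PlanBase,
}

impl PlanNodeMeta for StreamFilter {
    fn plan_base(&self) -> &PlanBase {
        &self.base
    }
}

fn main() {
    let node = StreamFilter {
        base: PlanBase { id: 42 },
    };
    // `id` is provided by the blanket impl; no per-node code is needed.
    assert_eq!(node.id(), 42);
}
```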
diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs
index e9a5bf26885bf..51b1aa5f41141 100644
--- a/src/frontend/src/optimizer/plan_node/plan_base.rs
+++ b/src/frontend/src/optimizer/plan_node/plan_base.rs
@@ -14,47 +14,132 @@
use educe::Educe;
use fixedbitset::FixedBitSet;
-use paste::paste;
use risingwave_common::catalog::Schema;
use super::generic::GenericPlanNode;
use super::*;
-use crate::for_all_plan_nodes;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};
-/// the common fields of all nodes, please make a field named `base` in
-/// every planNode and correctly value it when construct the planNode.
-#[derive(Clone, Debug, Educe)]
-#[educe(PartialEq, Eq, Hash)]
-pub struct PlanBase {
- #[educe(PartialEq(ignore))]
- #[educe(Hash(ignore))]
- pub id: PlanNodeId,
- #[educe(PartialEq(ignore))]
- #[educe(Hash(ignore))]
- pub ctx: OptimizerContextRef,
- pub schema: Schema,
- /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key
- pub stream_key: Option<Vec<usize>>,
- /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
- /// correctness, but insert unnecessary sort in plan
- pub order: Order,
+/// Common extra fields for physical plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct PhysicalCommonExtra {
/// The distribution property of the PlanNode's output, store an `Distribution::any()` here
/// will not affect correctness, but insert unnecessary exchange in plan
- pub dist: Distribution,
+ dist: Distribution,
+}
+
+/// Extra fields for stream plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct StreamExtra {
+ /// Common fields for physical plan nodes.
+ physical: PhysicalCommonExtra,
+
/// The append-only property of the PlanNode's output is a stream-only property. Append-only
/// means the stream contains only insert operation.
- pub append_only: bool,
+ append_only: bool,
/// Whether the output is emitted on window close.
- pub emit_on_window_close: bool,
- pub functional_dependency: FunctionalDependencySet,
+ emit_on_window_close: bool,
/// The watermark column indices of the PlanNode's output. There could be watermark output from
/// this stream operator.
- pub watermark_columns: FixedBitSet,
+ watermark_columns: FixedBitSet,
+}
+
+/// Extra fields for batch plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct BatchExtra {
+ /// Common fields for physical plan nodes.
+ physical: PhysicalCommonExtra,
+
+ /// The order property of the PlanNode's output. Storing an `Order::any()` here will not
+ /// affect correctness, but may insert an unnecessary sort into the plan.
+ order: Order,
+}
+
+/// Extra fields for physical plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+enum PhysicalExtra {
+ Stream(StreamExtra),
+ Batch(BatchExtra),
+}
+
+impl PhysicalExtra {
+ fn common(&self) -> &PhysicalCommonExtra {
+ match self {
+ PhysicalExtra::Stream(stream) => &stream.physical,
+ PhysicalExtra::Batch(batch) => &batch.physical,
+ }
+ }
+
+ fn common_mut(&mut self) -> &mut PhysicalCommonExtra {
+ match self {
+ PhysicalExtra::Stream(stream) => &mut stream.physical,
+ PhysicalExtra::Batch(batch) => &mut batch.physical,
+ }
+ }
+
+ fn stream(&self) -> &StreamExtra {
+ match self {
+ PhysicalExtra::Stream(extra) => extra,
+ _ => panic!("access stream properties from batch plan node"),
+ }
+ }
+
+ fn batch(&self) -> &BatchExtra {
+ match self {
+ PhysicalExtra::Batch(extra) => extra,
+ _ => panic!("access batch properties from stream plan node"),
+ }
+ }
+}
+
+/// The common fields of all plan nodes. Please add a field named `base` in
+/// every plan node and assign it a correct value when constructing the node.
+///
+/// All fields are intentionally made private and immutable, as they should
+/// normally be the same as the given [`GenericPlanNode`] when constructing.
+///
+/// - To access them, use traits including [`GenericPlanRef`],
+/// [`PhysicalPlanRef`], [`StreamPlanRef`] and [`BatchPlanRef`].
+/// - To mutate them, use methods like `new_*` or `clone_with_*`.
+#[derive(Clone, Debug, Educe)]
+#[educe(PartialEq, Eq, Hash)]
+pub struct PlanBase {
+ // -- common fields --
+ #[educe(PartialEq(ignore), Hash(ignore))]
+ id: PlanNodeId,
+ #[educe(PartialEq(ignore), Hash(ignore))]
+ ctx: OptimizerContextRef,
+
+ schema: Schema,
+ /// the pk indices of the PlanNode's output, an empty stream key vec means there is no stream key
+ // TODO: this is actually a logical and stream-only property
+ stream_key: Option<Vec<usize>>,
+ functional_dependency: FunctionalDependencySet,
+
+ /// Extra fields if the plan node is physical.
+ physical_extra: Option<PhysicalExtra>,
+}
+
+impl PlanBase {
+ fn physical_extra(&self) -> &PhysicalExtra {
+ self.physical_extra
+ .as_ref()
+ .expect("access physical properties from logical plan node")
+ }
+
+ fn physical_extra_mut(&mut self) -> &mut PhysicalExtra {
+ self.physical_extra
+ .as_mut()
+ .expect("access physical properties from logical plan node")
+ }
}
impl generic::GenericPlanRef for PlanBase {
+ fn id(&self) -> PlanNodeId {
+ self.id
+ }
+
fn schema(&self) -> &Schema {
&self.schema
}
@@ -72,23 +157,29 @@ impl generic::GenericPlanRef for PlanBase {
}
}
-impl stream::StreamPlanRef for PlanBase {
+impl generic::PhysicalPlanRef for PlanBase {
fn distribution(&self) -> &Distribution {
- &self.dist
+ &self.physical_extra().common().dist
}
+}
+impl stream::StreamPlanRef for PlanBase {
fn append_only(&self) -> bool {
- self.append_only
+ self.physical_extra().stream().append_only
}
fn emit_on_window_close(&self) -> bool {
- self.emit_on_window_close
+ self.physical_extra().stream().emit_on_window_close
+ }
+
+ fn watermark_columns(&self) -> &FixedBitSet {
+ &self.physical_extra().stream().watermark_columns
}
}
impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
- &self.order
+ &self.physical_extra().batch().order
}
}
@@ -100,47 +191,22 @@ impl PlanBase {
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
stream_key,
- dist: Distribution::Single,
- order: Order::any(),
- // Logical plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false,
functional_dependency,
- watermark_columns,
+ physical_extra: None,
}
}
- pub fn new_logical_with_core(node: &impl GenericPlanNode) -> Self {
+ pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
Self::new_logical(
- node.ctx(),
- node.schema(),
- node.stream_key(),
- node.functional_dependency(),
- )
- }
-
- pub fn new_stream_with_logical(
- logical: &impl GenericPlanNode,
- dist: Distribution,
- append_only: bool,
- emit_on_window_close: bool,
- watermark_columns: FixedBitSet,
- ) -> Self {
- Self::new_stream(
- logical.ctx(),
- logical.schema(),
- logical.stream_key(),
- logical.functional_dependency(),
- dist,
- append_only,
- emit_on_window_close,
- watermark_columns,
+ core.ctx(),
+ core.schema(),
+ core.stream_key(),
+ core.functional_dependency(),
)
}
@@ -160,22 +226,36 @@ impl PlanBase {
id,
ctx,
schema,
- dist,
- order: Order::any(),
stream_key,
- append_only,
- emit_on_window_close,
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra::Stream({
+ StreamExtra {
+ physical: PhysicalCommonExtra { dist },
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ }
+ })),
}
}
- pub fn new_batch_from_logical(
- logical: &impl GenericPlanNode,
+ pub fn new_stream_with_core(
+ core: &impl GenericPlanNode,
dist: Distribution,
- order: Order,
+ append_only: bool,
+ emit_on_window_close: bool,
+ watermark_columns: FixedBitSet,
) -> Self {
- Self::new_batch(logical.ctx(), logical.schema(), dist, order)
+ Self::new_stream(
+ core.ctx(),
+ core.schema(),
+ core.stream_key(),
+ core.functional_dependency(),
+ dist,
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ )
}
pub fn new_batch(
@@ -186,75 +266,49 @@ impl PlanBase {
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
- dist,
- order,
stream_key: None,
- // Batch plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false, // TODO(rc): batch EOWC support?
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra::Batch({
+ BatchExtra {
+ physical: PhysicalCommonExtra { dist },
+ order,
+ }
+ })),
}
}
- pub fn derive_stream_plan_base(plan_node: &PlanRef) -> Self {
- PlanBase::new_stream(
- plan_node.ctx(),
- plan_node.schema().clone(),
- plan_node.stream_key().map(|v| v.to_vec()),
- plan_node.functional_dependency().clone(),
- plan_node.distribution().clone(),
- plan_node.append_only(),
- plan_node.emit_on_window_close(),
- plan_node.watermark_columns().clone(),
- )
+ pub fn new_batch_with_core(
+ core: &impl GenericPlanNode,
+ dist: Distribution,
+ order: Order,
+ ) -> Self {
+ Self::new_batch(core.ctx(), core.schema(), dist, order)
}
pub fn clone_with_new_plan_id(&self) -> Self {
let mut new = self.clone();
- new.id = self.ctx.next_plan_node_id();
+ new.id = self.ctx().next_plan_node_id();
+ new
+ }
+
+ /// Clone the plan node with a new distribution.
+ ///
+ /// Panics if the plan node is not physical.
+ pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
+ let mut new = self.clone();
+ new.physical_extra_mut().common_mut().dist = dist;
new
}
}
-macro_rules! impl_base_delegate {
- ($( { $convention:ident, $name:ident }),*) => {
- $(paste! {
- impl [<$convention $name>] {
- pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
- }
- pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx()
- }
- pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
- }
- pub fn stream_key(&self) -> Option<&[usize]> {
- self.plan_base().stream_key()
- }
- pub fn order(&self) -> &Order {
- &self.plan_base().order
- }
- pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
- }
- pub fn append_only(&self) -> bool {
- self.plan_base().append_only
- }
- pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
- }
- pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
- }
- }
- })*
+// Mutators for testing only.
+#[cfg(test)]
+impl PlanBase {
+ pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
+ &mut self.functional_dependency
}
}
-for_all_plan_nodes! { impl_base_delegate }
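The reworked `PlanBase` replaces the flat struct, where logical nodes carried dummy values for `dist`, `append_only`, and friends, with an optional convention-specific `PhysicalExtra`. A compilable sketch of how the storage, accessors, and panics line up, using `String` as a stand-in for `Distribution` and `Order`:

```rust
// Simplified model of the new layout; all types here are illustrative.
struct PhysicalCommonExtra {
    dist: String, // stand-in for `Distribution`
}

struct StreamExtra {
    physical: PhysicalCommonExtra,
    append_only: bool,
}

struct BatchExtra {
    physical: PhysicalCommonExtra,
    order: String, // stand-in for `Order`
}

enum PhysicalExtra {
    Stream(StreamExtra),
    Batch(BatchExtra),
}

struct PlanBase {
    physical_extra: Option<PhysicalExtra>, // `None` for logical nodes
}

impl PlanBase {
    fn physical_extra(&self) -> &PhysicalExtra {
        self.physical_extra
            .as_ref()
            .expect("access physical properties from logical plan node")
    }

    fn distribution(&self) -> &str {
        // Shared by both physical conventions.
        match self.physical_extra() {
            PhysicalExtra::Stream(s) => &s.physical.dist,
            PhysicalExtra::Batch(b) => &b.physical.dist,
        }
    }

    fn append_only(&self) -> bool {
        // Stream-only: a batch node asking for this is now a loud bug
        // instead of silently reading a dummy default.
        match self.physical_extra() {
            PhysicalExtra::Stream(s) => s.append_only,
            _ => panic!("access stream properties from batch plan node"),
        }
    }
}

fn main() {
    let stream = PlanBase {
        physical_extra: Some(PhysicalExtra::Stream(StreamExtra {
            physical: PhysicalCommonExtra { dist: "HashShard([0])".into() },
            append_only: true,
        })),
    };
    assert_eq!(stream.distribution(), "HashShard([0])");
    assert!(stream.append_only());

    let batch = PlanBase {
        physical_extra: Some(PhysicalExtra::Batch(BatchExtra {
            physical: PhysicalCommonExtra { dist: "Single".into() },
            order: "ASC(0)".into(),
        })),
    };
    assert_eq!(batch.distribution(), "Single");
    // `batch.append_only()` would panic: stream-only property on a batch node.
    // A `PlanBase { physical_extra: None }` (logical node) would panic on both.
}
```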
diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index 2edf997bf91fd..866c62c2413a5 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::generic::GenericPlanRef;
-use crate::optimizer::property::Distribution;
+use fixedbitset::FixedBitSet;
-/// A subtrait of [`GenericPlanRef`] for stream plans.
+use super::generic::PhysicalPlanRef;
+
+/// A subtrait of [`PhysicalPlanRef`] for stream plans.
///
/// Due to the lack of refactoring, all plan nodes currently implement this trait
/// through [`super::PlanBase`]. One may still use this trait as a bound for
-/// expecting a stream plan, in contrast to [`GenericPlanRef`].
-pub trait StreamPlanRef: GenericPlanRef {
- fn distribution(&self) -> &Distribution;
+/// accessing a stream plan, in contrast to [`GenericPlanRef`] or
+/// [`PhysicalPlanRef`].
+///
+/// [`GenericPlanRef`]: super::generic::GenericPlanRef
+pub trait StreamPlanRef: PhysicalPlanRef {
fn append_only(&self) -> bool;
fn emit_on_window_close(&self) -> bool;
+ fn watermark_columns(&self) -> &FixedBitSet;
}
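With `distribution` hoisted into the new `PhysicalPlanRef` and `watermark_columns` added here, the traits form a small hierarchy usable as static bounds. A simplified, self-contained sketch (stand-in traits and types, not the real ones):

```rust
// Stand-in trait hierarchy mirroring the shape of the refactored traits.
trait GenericPlanRef {
    fn id(&self) -> u32;
}

trait PhysicalPlanRef: GenericPlanRef {
    fn distribution(&self) -> &str; // stand-in for `&Distribution`
}

trait StreamPlanRef: PhysicalPlanRef {
    fn append_only(&self) -> bool;
    fn watermark_columns(&self) -> &[usize]; // stand-in for `&FixedBitSet`
}

/// Statically requires a *stream* plan; a node implementing only
/// `PhysicalPlanRef` (e.g. batch) would not satisfy this bound.
fn describe_stream(plan: &impl StreamPlanRef) -> String {
    format!(
        "id={} dist={} append_only={} watermarks={:?}",
        plan.id(),
        plan.distribution(),
        plan.append_only(),
        plan.watermark_columns()
    )
}

struct DummyStreamNode;

impl GenericPlanRef for DummyStreamNode {
    fn id(&self) -> u32 {
        1
    }
}

impl PhysicalPlanRef for DummyStreamNode {
    fn distribution(&self) -> &str {
        "Single"
    }
}

impl StreamPlanRef for DummyStreamNode {
    fn append_only(&self) -> bool {
        true
    }

    fn watermark_columns(&self) -> &[usize] {
        &[0]
    }
}

fn main() {
    println!("{}", describe_stream(&DummyStreamNode));
}
```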
diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs
index 6e96d0eab0e93..51b5e589e886e 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs
@@ -17,7 +17,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::DedupNode;
-use super::generic::{self, GenericPlanNode, GenericPlanRef};
+use super::generic::{self, GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
use super::stream::StreamPlanRef;
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
@@ -37,7 +37,7 @@ impl StreamDedup {
// A dedup operator must be append-only.
assert!(input.append_only());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
true,
diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
index db9e6ac296bbf..bb18f9cffdf0f 100644
--- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
@@ -20,7 +20,7 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
-use super::generic::{self};
+use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
@@ -67,7 +67,7 @@ impl StreamDeltaJoin {
core.i2o_col_mapping().rewrite_bitset(&watermark_columns)
};
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
append_only,
@@ -90,7 +90,7 @@ impl StreamDeltaJoin {
impl Distill for StreamDeltaJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.core.join_type)));
diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs
index c9f969384c3a4..9b000974786e4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dml.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs
@@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
index e1ca18da937e9..a4b74f37208e7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
@@ -17,7 +17,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;
-use super::generic::DynamicFilter;
+use super::generic::{DynamicFilter, GenericPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, column_names_pretty, watermark_pretty, Distill};
use super::{generic, ExprRewritable};
use crate::expr::{Expr, ExprImpl};
@@ -37,7 +38,7 @@ impl StreamDynamicFilter {
let watermark_columns = core.watermark_columns(core.right().watermark_columns()[0]);
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
core.left().distribution().clone(),
false, /* we can have a new abstraction for append only and monotonically increasing
@@ -78,11 +79,11 @@ impl StreamDynamicFilter {
impl Distill for StreamDynamicFilter {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let pred = self.core.pretty_field();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("predicate", pred));
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
vec.push(("output", column_names_pretty(self.schema())));
diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
index 9418af8e4a364..d8c5a9635ce59 100644
--- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
@@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanWindowFunction};
+use super::generic::{self, GenericPlanRef, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -50,7 +50,7 @@ impl StreamEowcOverWindow {
// ancient rows in some rarely updated partitions that are emitted at the end of time.
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
true,
diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
index 0fa1713bf4488..99e6c3c5161a1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, DistributionDisplay};
@@ -78,7 +80,7 @@ impl StreamExchange {
impl Distill for StreamExchange {
fn distill<'a>(&self) -> XmlNode<'a> {
let distribution_display = DistributionDisplay {
- distribution: &self.base.dist,
+ distribution: self.base.distribution(),
input_schema: self.input.schema(),
};
childless_record(
@@ -117,13 +119,13 @@ impl StreamNode for StreamExchange {
})
} else {
Some(DispatchStrategy {
- r#type: match &self.base.dist {
+ r#type: match &self.base.distribution() {
Distribution::HashShard(_) => DispatcherType::Hash,
Distribution::Single => DispatcherType::Simple,
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
- dist_key_indices: match &self.base.dist {
+ dist_key_indices: match &self.base.distribution() {
Distribution::HashShard(keys) => {
keys.iter().map(|num| *num as u32).collect()
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs
index e0f8852a19fb5..5959b8d6be4d2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_expand.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs
@@ -48,7 +48,7 @@ impl StreamExpand {
.map(|idx| idx + input.schema().len()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs
index ff4d344607776..0f000e6b8c0db 100644
--- a/src/frontend/src/optimizer/plan_node/stream_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs
@@ -34,7 +34,7 @@ impl StreamFilter {
let input = core.input.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
index 190c05c0a5ba1..95fd72e9f6aa0 100644
--- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -48,7 +48,7 @@ impl_plan_tree_node_for_unary! { StreamFsFetch }
impl StreamFsFetch {
pub fn new(input: PlanRef, source: generic::Source) -> Self {
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&source,
Distribution::SomeShard,
source.catalog.as_ref().map_or(true, |s| s.append_only),
diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
index 14711d353f9d8..3e8f3c00206c4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
@@ -16,7 +16,8 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{DistillUnit, TopNLimit};
+use super::generic::{DistillUnit, GenericPlanRef, TopNLimit};
+use super::stream::StreamPlanRef;
use super::utils::{plan_node_name, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::generic::GenericPlanNode;
@@ -135,7 +136,7 @@ impl Distill for StreamGroupTopN {
{ "append_only", self.input().append_only() },
);
let mut node = self.core.distill_with_name(name);
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
node.fields.push(("output_watermarks".into(), ow));
}
node
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
index 25e1ac801f97c..55ab6b5906e59 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
@@ -18,10 +18,11 @@ use pretty_xmlish::XmlNode;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanAggCall};
+use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
@@ -85,7 +86,7 @@ impl StreamHashAgg {
}
// Hash agg executor might change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
emit_on_window_close, // in EOWC mode, we produce append only output
@@ -142,7 +143,7 @@ impl StreamHashAgg {
impl Distill for StreamHashAgg {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record(
@@ -214,7 +215,7 @@ impl StreamNode for StreamHashAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
- emit_on_window_close: self.base.emit_on_window_close,
+ emit_on_window_close: self.base.emit_on_window_close(),
})
}
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
index 0075b1730b4eb..9d9c41425c4b1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
@@ -20,7 +20,8 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DeltaExpression, HashJoinNode, PbInequalityPair};
-use super::generic::Join;
+use super::generic::{GenericPlanRef, Join};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode,
@@ -178,7 +179,7 @@ impl StreamHashJoin {
};
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
append_only,
@@ -291,7 +292,7 @@ impl Distill for StreamHashJoin {
{ "interval", self.clean_left_state_conjunction_idx.is_some() && self.clean_right_state_conjunction_idx.is_some() },
{ "append_only", self.is_append_only },
);
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(6);
vec.push(("type", Pretty::debug(&self.core.join_type)));
@@ -316,7 +317,7 @@ impl Distill for StreamHashJoin {
if let Some(i) = self.clean_right_state_conjunction_idx {
vec.push(("conditions_to_clean_right_state_table", get_cond(i)));
}
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
index c68b1b307d470..e177be6942360 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
@@ -17,6 +17,8 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
@@ -56,7 +58,7 @@ impl StreamHopWindow {
)
.rewrite_bitset(&watermark_columns);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
@@ -75,7 +77,7 @@ impl StreamHopWindow {
impl Distill for StreamHopWindow {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record("StreamHopWindow", vec)
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index fb17537bc90e6..9c87f1a34abbd 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::derive::derive_columns;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion};
use crate::catalog::FragmentId;
use crate::optimizer::plan_node::derive::derive_pk;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -149,7 +151,22 @@ impl StreamMaterialize {
TableType::MaterializedView => {
assert_matches!(user_distributed_by, RequiredDist::Any);
// ensure the same pk will not shuffle to different node
- RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key())
+ let required_dist =
+ RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key());
+
+ // If the input is a stream join, enforce the stream key as the materialized
+ // view distribution key to avoid slow backfilling caused by
+ // data skew of the dimension table join key.
+ // See the linked issue for more information.
+ let is_stream_join = matches!(input.as_stream_hash_join(), Some(_join))
+ || matches!(input.as_stream_temporal_join(), Some(_join))
+ || matches!(input.as_stream_delta_join(), Some(_join));
+
+ if is_stream_join {
+ return Ok(required_dist.enforce(input, &Order::any()));
+ }
+
+ required_dist
}
TableType::Index => {
assert_matches!(
@@ -273,8 +290,8 @@ impl Distill for StreamMaterialize {
vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
- let watermark_columns = &self.base.watermark_columns;
- if self.base.watermark_columns.count_ones(..) > 0 {
+ let watermark_columns = &self.base.watermark_columns();
+ if self.base.watermark_columns().count_ones(..) > 0 {
let watermark_column_names = watermark_columns
.ones()
.map(|i| table.columns()[i].name_with_hidden().to_string())
@@ -294,16 +311,16 @@ impl PlanTreeNodeUnary for StreamMaterialize {
fn clone_with_input(&self, input: PlanRef) -> Self {
let new = Self::new(input, self.table().clone());
new.base
- .schema
+ .schema()
.fields
.iter()
- .zip_eq_fast(self.base.schema.fields.iter())
+ .zip_eq_fast(self.base.schema().fields.iter())
.for_each(|(a, b)| {
assert_eq!(a.data_type, b.data_type);
assert_eq!(a.type_name, b.type_name);
assert_eq!(a.sub_fields, b.sub_fields);
});
- assert_eq!(new.plan_base().stream_key, self.plan_base().stream_key);
+ assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
new
}
}
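The new `MaterializedView` arm above checks whether the input is one of the three stream join nodes and, if so, eagerly enforces a hash distribution on the stream key instead of leaving the requirement lazy. A toy-sized sketch of that decision shape (the `Plan` enum and helper below are invented for illustration, not the real `RequiredDist` API):

```rust
#[derive(Debug)]
enum Plan {
    Join,
    Filter,
    Exchange(Box<Plan>, Vec<usize>), // (input, hash-distribution key)
}

/// Toy version of the check: joins get wrapped in an exchange hashed by the
/// stream key; anything else keeps the lazy distribution requirement.
fn enforce_mv_distribution(input: Plan, stream_key: &[usize]) -> Plan {
    match input {
        Plan::Join => Plan::Exchange(Box::new(Plan::Join), stream_key.to_vec()),
        other => other,
    }
}

fn main() {
    let enforced = enforce_mv_distribution(Plan::Join, &[0, 1]);
    println!("{enforced:?}"); // Exchange(Join, [0, 1])

    let untouched = enforce_mv_distribution(Plan::Filter, &[0, 1]);
    println!("{untouched:?}"); // Filter
}
```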
diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs
index 9eb0a0e0f143e..91ebc344fa51d 100644
--- a/src/frontend/src/optimizer/plan_node/stream_now.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_now.rs
@@ -19,8 +19,7 @@ use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::NowNode;
-use super::generic::GenericPlanRef;
-use super::stream::StreamPlanRef;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode};
use crate::optimizer::plan_node::utils::column_names_pretty;
@@ -59,7 +58,7 @@ impl StreamNow {
impl Distill for StreamNow {
fn distill<'a>(&self) -> XmlNode<'a> {
- let vec = if self.base.ctx.is_explain_verbose() {
+ let vec = if self.base.ctx().is_explain_verbose() {
vec![("output", column_names_pretty(self.schema()))]
} else {
vec![]
diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs
index 0d749f0c7b0e6..5a2f9d98f1340 100644
--- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs
@@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::generic::{GenericPlanNode, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::TableCatalog;
@@ -37,7 +38,7 @@ impl StreamOverWindow {
let input = &core.input;
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
false, // general over window cannot be append-only
@@ -122,7 +123,7 @@ impl StreamNode for StreamOverWindow {
.to_internal_table_prost();
let cache_policy = self
.base
- .ctx
+ .ctx()
.session_ctx()
.config()
.get_streaming_over_window_cache_policy();
diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs
index 8a7665881e0cf..c0ff0d1cf2f43 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project.rs
@@ -17,6 +17,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ProjectNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter, WatermarkDerivation};
@@ -41,7 +43,7 @@ impl Distill for StreamProject {
let schema = self.schema();
let mut vec = self.core.fields_pretty(schema);
if let Some(display_output_watermarks) =
- watermark_pretty(&self.base.watermark_columns, schema)
+ watermark_pretty(self.base.watermark_columns(), schema)
{
vec.push(("output_watermarks", display_output_watermarks));
}
@@ -79,7 +81,7 @@ impl StreamProject {
}
// Project executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs
index cadd600f3c3b7..ba09d79c96c60 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs
@@ -66,7 +66,7 @@ impl StreamProjectSet {
// ProjectSet executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs
index 8b406005f40a6..3acf0b132805e 100644
--- a/src/frontend/src/optimizer/plan_node/stream_share.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_share.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode};
use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
@@ -34,7 +36,7 @@ impl StreamShare {
let input = core.input.borrow().0.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
@@ -79,7 +81,7 @@ impl StreamNode for StreamShare {
impl StreamShare {
pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode {
- let operator_id = self.base.id.0 as u32;
+ let operator_id = self.base.id().0 as u32;
match state.get_share_stream_node(operator_id) {
None => {
diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
index 59311dd22226c..92d96fdf21b08 100644
--- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
@@ -21,6 +21,8 @@ use super::generic::{self, PlanAggCall};
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -48,7 +50,7 @@ impl StreamSimpleAgg {
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
// Simple agg executor might change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns);
StreamSimpleAgg {
base,
core,
@@ -99,7 +101,7 @@ impl StreamNode for StreamSimpleAgg {
.collect(),
distribution_key: self
.base
- .dist
+ .distribution()
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index a51380d630331..32e9fb487910c 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -37,6 +37,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;
use super::derive::{derive_columns, derive_pk};
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, StreamNode};
use crate::optimizer::plan_node::PlanTreeNodeUnary;
@@ -57,7 +58,7 @@ pub struct StreamSink {
impl StreamSink {
#[must_use]
pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self {
- let base = PlanBase::derive_stream_plan_base(&input);
+ let base = input.plan_base().clone_with_new_plan_id();
Self {
base,
input,
@@ -389,7 +390,7 @@ impl Distill for StreamSink {
.iter()
.map(|k| k.column_index)
.collect_vec(),
- schema: &self.base.schema,
+ schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
}
@@ -409,7 +410,7 @@ impl StreamNode for StreamSink {
PbNodeBody::Sink(SinkNode {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
- log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() {
+ log_store_type: match self.base.ctx().session_ctx().config().get_sink_decouple() {
SinkDecouple::Default => {
let enable_sink_decouple =
match_sink_name_str!(
diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs
index b82d71068d817..41a56a0fd5df2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sort.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs
@@ -20,6 +20,8 @@ use risingwave_common::catalog::FieldDisplay;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -84,7 +86,7 @@ impl StreamEowcSort {
tbl_builder.add_order_column(self.sort_column_index, OrderType::ascending());
order_cols.insert(self.sort_column_index);
- let dist_key = self.base.dist.dist_column_indices().to_vec();
+ let dist_key = self.base.distribution().dist_column_indices().to_vec();
for idx in &dist_key {
if !order_cols.contains(idx) {
tbl_builder.add_order_column(*idx, OrderType::ascending());
diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs
index 377e2704776bb..ae66cf568118b 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -37,7 +37,7 @@ pub struct StreamSource {
impl StreamSource {
pub fn new(core: generic::Source) -> Self {
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
Distribution::SomeShard,
core.catalog.as_ref().map_or(true, |s| s.append_only),
diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
index 0af7ebded94d9..474582ec877c7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
@@ -20,6 +20,7 @@ use super::generic::{self, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::property::RequiredDist;
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -49,7 +50,7 @@ impl StreamStatelessSimpleAgg {
}
}
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input_dist.clone(),
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 907a41db28525..965ca217a3369 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ChainType, PbStreamNode};
+use super::generic::PhysicalPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, FunctionCall};
use crate::optimizer::plan_node::generic::GenericPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -66,7 +68,7 @@ impl StreamTableScan {
None => Distribution::SomeShard,
}
};
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
core.table_desc.append_only,
@@ -192,7 +194,7 @@ impl_plan_tree_node_for_leaf! { StreamTableScan }
impl Distill for StreamTableScan {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(4);
vec.push(("table", Pretty::from(self.core.table_name.clone())));
vec.push(("columns", self.core.columns_pretty(verbose)));
@@ -200,12 +202,12 @@ impl Distill for StreamTableScan {
if verbose {
let pk = IndicesDisplay {
indices: self.stream_key().unwrap_or_default(),
- schema: &self.base.schema,
+ schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
let dist = Pretty::display(&DistributionDisplay {
distribution: self.distribution(),
- input_schema: &self.base.schema,
+ input_schema: self.base.schema(),
});
vec.push(("dist", dist));
}
@@ -325,7 +327,7 @@ impl StreamTableScan {
..Default::default()
})),
stream_key,
- operator_id: self.base.id.0 as u64,
+ operator_id: self.base.id().0 as u64,
identity: {
let s = self.distill_to_string();
s.replace("StreamTableScan", "Chain")
diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
index 2191ca322342d..675dbeb9ab381 100644
--- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
@@ -18,6 +18,8 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::TemporalJoinNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
@@ -61,7 +63,7 @@ impl StreamTemporalJoin {
.rewrite_bitset(core.left.watermark_columns()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
true,
@@ -88,7 +90,7 @@ impl StreamTemporalJoin {
impl Distill for StreamTemporalJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.core.join_type)));
@@ -101,7 +103,7 @@ impl Distill for StreamTemporalJoin {
}),
));
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs
index e7a880fa7d757..87890625f6be7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs
@@ -40,7 +40,7 @@ impl StreamTopN {
};
let watermark_columns = FixedBitSet::with_capacity(input.schema().len());
- let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns);
StreamTopN { base, core }
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs
index 8f6353d6be44c..6d6dca2d8dd02 100644
--- a/src/frontend/src/optimizer/plan_node/stream_union.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_union.rs
@@ -19,6 +19,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::UnionNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanRef};
use crate::optimizer::plan_node::generic::GenericPlanNode;
@@ -46,7 +48,7 @@ impl StreamUnion {
|acc_watermark_columns, input| acc_watermark_columns.bitand(input.watermark_columns()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
inputs.iter().all(|x| x.append_only()),
@@ -60,7 +62,7 @@ impl StreamUnion {
impl Distill for StreamUnion {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record("StreamUnion", vec)
diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs
index fb0b844411f63..f8cc5db851159 100644
--- a/src/frontend/src/optimizer/plan_node/stream_values.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_values.rs
@@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::values_node::ExprTuple;
use risingwave_pb::stream_plan::ValuesNode;
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
use crate::expr::{Expr, ExprImpl};
diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
index ed5a946603ee4..066bc9a234ca5 100644
--- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
@@ -21,6 +21,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::WatermarkDesc;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{ExprDisplay, ExprImpl};
@@ -85,7 +86,7 @@ impl Distill for StreamWatermarkFilter {
})
.collect();
let display_output_watermarks =
- watermark_pretty(&self.base.watermark_columns, input_schema).unwrap();
+ watermark_pretty(self.base.watermark_columns(), input_schema).unwrap();
let fields = vec![
("watermark_descs", Pretty::Array(display_watermark_descs)),
("output_watermarks", display_output_watermarks),
diff --git a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
index f30f3d9fa4966..7e53b903ac962 100644
--- a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use itertools::Itertools;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare};
use crate::optimizer::PlanRewriter;
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
index 9ab0d4d580ddc..5b9efb9fc7c94 100644
--- a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
use itertools::Itertools;
use crate::catalog::SourceId;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalShare, LogicalSource, PlanNodeId, PlanTreeNode, StreamShare,
};
diff --git a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
index 7d538392f9361..7950b5d81a49c 100644
--- a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
+++ b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use super::{DefaultBehavior, DefaultValue};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::PlanVisitor;
diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs
index 4fcaf959eac87..2df1d7ae00bc3 100644
--- a/src/frontend/src/optimizer/property/distribution.rs
+++ b/src/frontend/src/optimizer/property/distribution.rs
@@ -295,10 +295,12 @@ impl RequiredDist {
pub fn enforce_if_not_satisfies(
&self,
- plan: PlanRef,
+ mut plan: PlanRef,
required_order: &Order,
) -> Result<PlanRef> {
- let plan = required_order.enforce_if_not_satisfies(plan)?;
+ if let Convention::Batch = plan.convention() {
+ plan = required_order.enforce_if_not_satisfies(plan)?;
+ }
if !plan.distribution().satisfies(self) {
Ok(self.enforce(plan, required_order))
} else {
@@ -329,7 +331,7 @@ impl RequiredDist {
}
}
- fn enforce(&self, plan: PlanRef, required_order: &Order) -> PlanRef {
+ pub fn enforce(&self, plan: PlanRef, required_order: &Order) -> PlanRef {
let dist = self.to_dist();
match plan.convention() {
Convention::Batch => BatchExchange::new(plan, required_order.clone(), dist).into(),
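
Taken together, the `distribution.rs` and `order.rs` hunks make order enforcement conditional on the plan convention: `Order::enforce` inserts a `BatchSort`, which exists only for batch plans, so stream plans now skip the sort step and receive only an exchange when the required distribution is unsatisfied. A standalone sketch of the resulting control flow (toy enums, not the real planner types):

    #[derive(PartialEq)]
    enum Convention {
        Batch,
        Stream,
    }

    // Mirrors RequiredDist::enforce_if_not_satisfies after this change: sorting
    // is attempted only for batch plans; the exchange applies to both conventions.
    fn enforcers(convention: &Convention, order_ok: bool, dist_ok: bool) -> Vec<&'static str> {
        let mut inserted = Vec::new();
        if *convention == Convention::Batch && !order_ok {
            inserted.push("BatchSort"); // Order::enforce, now private and batch-only
        }
        if !dist_ok {
            inserted.push("Exchange"); // RequiredDist::enforce, now public
        }
        inserted
    }

    fn main() {
        assert_eq!(enforcers(&Convention::Stream, false, false), ["Exchange"]);
        assert_eq!(enforcers(&Convention::Batch, false, false), ["BatchSort", "Exchange"]);
    }
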
diff --git a/src/frontend/src/optimizer/property/order.rs b/src/frontend/src/optimizer/property/order.rs
index a70bffb13a8ba..19ad7586e1c11 100644
--- a/src/frontend/src/optimizer/property/order.rs
+++ b/src/frontend/src/optimizer/property/order.rs
@@ -92,7 +92,7 @@ impl Order {
}
}
- pub fn enforce(&self, plan: PlanRef) -> PlanRef {
+ fn enforce(&self, plan: PlanRef) -> PlanRef {
assert_eq!(plan.convention(), Convention::Batch);
BatchSort::new(plan, self.clone()).into()
}
diff --git a/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs b/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs
index 34025eca43032..3e22348e27b49 100644
--- a/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs
+++ b/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::aggregate::AggKind;
use super::super::plan_node::*;
@@ -48,11 +47,11 @@ impl Rule for AggGroupBySimplifyRule {
if !new_group_key.contains(i) {
let data_type = agg_input.schema().fields[i].data_type();
new_agg_calls.push(PlanAggCall {
- agg_kind: AggKind::FirstValue,
+ agg_kind: AggKind::InternalLastSeenValue,
return_type: data_type.clone(),
inputs: vec![InputRef::new(i, data_type)],
distinct: false,
- order_by: vec![ColumnOrder::new(i, OrderType::ascending())],
+ order_by: vec![],
filter: Condition::true_cond(),
direct_args: vec![],
});
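
The swap above is safe because `AggGroupBySimplifyRule` only fires when the group key already contains a unique key, so any column dropped from the group key is constant within its group; an order-insensitive "any value" suffices, and `InternalLastSeenValue` avoids the ordered state that `first_value` with `ORDER BY` would maintain. A toy equivalence check under that precondition:

    // first_value(x ORDER BY x) and a "last seen value" agree whenever the input
    // column is constant within the group, which the rule's precondition ensures.
    fn first_value<T: Copy>(group: &[T]) -> Option<T> {
        group.first().copied()
    }

    fn last_seen_value<T: Copy>(group: &[T]) -> Option<T> {
        group.last().copied()
    }

    fn main() {
        let group = [42, 42, 42]; // functionally determined by the group key
        assert_eq!(first_value(&group), last_seen_value(&group));
    }
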
diff --git a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
index 02165232372e4..eeba7d9f3be3b 100644
--- a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
+++ b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
@@ -15,6 +15,7 @@
use risingwave_common::types::ScalarImpl;
use super::Rule;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalValues};
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
index 66579248a76f9..7ac121692c81d 100644
--- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
+++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
@@ -23,6 +23,7 @@ use crate::expr::{
CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall,
InputRef,
};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary};
use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder};
use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter;
diff --git a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
index 1ed1da0037aba..01a39042efd98 100644
--- a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
+++ b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
@@ -36,7 +36,7 @@ impl Rule for ExpandToProjectRule {
let column_subset = column_subsets.get(0).unwrap();
// if `column_subsets` len equals 1, convert it into a project
- let mut exprs = Vec::with_capacity(expand.base.schema.len());
+ let mut exprs = Vec::with_capacity(expand.base.schema().len());
// Add original input column first
for i in 0..input.schema().len() {
exprs.push(ExprImpl::InputRef(
diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index 9103d1bc906bc..323cc59ef3558 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -66,6 +66,7 @@ use crate::expr::{
FunctionCall, InputRef,
};
use crate::optimizer::optimizer_context::OptimizerContextRef;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
generic, ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode,
PlanTreeNodeBinary, PredicatePushdown, PredicatePushdownContext,
diff --git a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
index dcbb6f7b015ee..bd2db0ac67cca 100644
--- a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
+++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
@@ -47,6 +47,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
index c496a906400ae..8682db8491a1d 100644
--- a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
+++ b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
@@ -46,6 +46,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
index ea8386bc227f8..c32ae40531cd0 100644
--- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
+++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
@@ -27,7 +27,7 @@ use risingwave_expr::aggregate::AggKind;
use super::{BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{
LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
};
diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
index dfb6963c7fb4f..93637d3ba8193 100644
--- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
+++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
@@ -18,6 +18,7 @@ use risingwave_expr::window_function::WindowFuncKind;
use super::Rule;
use crate::expr::{collect_input_refs, ExprImpl, ExprType};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalTopN, PlanTreeNodeUnary};
use crate::optimizer::property::Order;
use crate::planner::LIMIT_ALL_COUNT;
diff --git a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
index dc5f9c2bc9aba..f34146ba80050 100644
--- a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
+++ b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use super::super::plan_node::*;
use super::{BoxedRule, Rule};
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_visitor::{PlanCorrelatedIdFinder, PlanVisitor};
use crate::optimizer::PlanRef;
use crate::utils::Condition;
diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
index 5a6f1187fdd02..f85ffc2318459 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::types::DataType;
use super::{BoxedRule, Rule};
use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary,
};
@@ -51,7 +52,7 @@ impl Rule for TableFunctionToProjectSetRule {
let logical_values = LogicalValues::create(
vec![vec![]],
Schema::new(vec![]),
- logical_table_function.base.ctx.clone(),
+ logical_table_function.base.ctx().clone(),
);
let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]);
// We need a project to align schema type because
diff --git a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
index 9759739490fe6..a13bef3baa9d9 100644
--- a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
+++ b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::{LogicalCardinalityExt, SideEffectVisitor};
use crate::optimizer::{PlanRef, PlanVisitor};
diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
index 8119b8847b600..7b83c017ab781 100644
--- a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::{PlanRef, PlanTreeNode};
diff --git a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
index 2a12f6b712e0d..f1d203fba1350 100644
--- a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{LogicalUnion, PlanTreeNode};
use crate::optimizer::PlanRef;
@@ -24,7 +24,7 @@ impl Rule for UnionToDistinctRule {
let union: &LogicalUnion = plan.as_logical_union()?;
if !union.all() {
let union_all = LogicalUnion::create(true, union.inputs().into_iter().collect());
- let distinct = Agg::new(vec![], (0..union.base.schema.len()).collect(), union_all)
+ let distinct = Agg::new(vec![], (0..union.base.schema().len()).collect(), union_all)
.with_enable_two_phase(false);
Some(distinct.into())
} else {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 4e16bc6cd0b21..cb20103b3e76f 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -103,7 +103,7 @@ impl Serialize for ExecutionPlanNode {
impl From<PlanRef> for ExecutionPlanNode {
fn from(plan_node: PlanRef) -> Self {
Self {
- plan_node_id: plan_node.plan_base().id,
+ plan_node_id: plan_node.plan_base().id(),
plan_node_type: plan_node.node_type(),
node: plan_node.to_batch_prost_body(),
children: vec![],
diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml
index 69c11a7f21e24..77cafd155000d 100644
--- a/src/jni_core/Cargo.toml
+++ b/src/jni_core/Cargo.toml
@@ -10,6 +10,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]
[dependencies]
+anyhow = "1"
bytes = "1"
cfg-or-panic = "0.2"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs
index 29bbf76929b45..4815cd7368370 100644
--- a/src/jni_core/src/lib.rs
+++ b/src/jni_core/src/lib.rs
@@ -902,14 +902,17 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendSinkWriterRe
'a,
>(
env: EnvParam<'a>,
- channel: Pointer<'a, Sender<SinkWriterStreamResponse>>,
+ channel: Pointer<'a, Sender<anyhow::Result<SinkWriterStreamResponse>>>,
msg: JByteArray<'a>,
) -> jboolean {
execute_and_catch(env, move |env| {
let sink_writer_stream_response: SinkWriterStreamResponse =
Message::decode(to_guarded_slice(&msg, env)?.deref())?;
- match channel.as_ref().blocking_send(sink_writer_stream_response) {
+ match channel
+ .as_ref()
+ .blocking_send(Ok(sink_writer_stream_response))
+ {
Ok(_) => Ok(JNI_TRUE),
Err(e) => {
tracing::info!("send error. {:?}", e);
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 67e9a95026cc7..3e96dfcc7be2f 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -64,13 +64,6 @@ sea-orm = { version = "0.12.0", features = [
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
-sqlx = { version = "0.7", features = [
- "runtime-tokio",
- "postgres",
- "mysql",
- "sqlite",
- "chrono",
-] }
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index 55c7b27b0c80a..bf1bddad2070f 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -14,7 +14,7 @@
#![feature(lint_reasons)]
#![feature(let_chains)]
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
mod server;
use std::time::Duration;
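
This and the many `coverage` hunks that follow track a nightly Rust rename: the `no_coverage` feature and attribute became `coverage_attribute` and `#[coverage(off)]`, with unchanged semantics. The resulting pattern, as applied across the meta crates below:

    // Crate root: enable the renamed nightly feature only for coverage builds.
    #![cfg_attr(coverage, feature(coverage_attribute))]

    // Item level: exclude a function from coverage instrumentation.
    #[cfg_attr(coverage, coverage(off))]
    fn excluded_from_instrumentation() {}

    fn main() {}
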
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 935d398aeacb0..2fa5f50e15666 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -717,7 +717,7 @@ impl DdlService for DdlServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn get_tables(
&self,
request: Request,
diff --git a/src/meta/service/src/heartbeat_service.rs b/src/meta/service/src/heartbeat_service.rs
index 7c51b39346894..e31058ff2bdc5 100644
--- a/src/meta/service/src/heartbeat_service.rs
+++ b/src/meta/service/src/heartbeat_service.rs
@@ -32,7 +32,7 @@ impl HeartbeatServiceImpl {
#[async_trait::async_trait]
impl HeartbeatService for HeartbeatServiceImpl {
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn heartbeat(
&self,
request: Request,
diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs
index 0d473a6ed031f..6c8cc11f8971c 100644
--- a/src/meta/service/src/lib.rs
+++ b/src/meta/service/src/lib.rs
@@ -16,7 +16,7 @@
#![feature(let_chains)]
#![feature(lazy_cell)]
#![feature(impl_trait_in_assoc_type)]
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
use risingwave_meta::*;
diff --git a/src/meta/service/src/meta_member_service.rs b/src/meta/service/src/meta_member_service.rs
index 25c4c7ad4cc84..5753061176e8c 100644
--- a/src/meta/service/src/meta_member_service.rs
+++ b/src/meta/service/src/meta_member_service.rs
@@ -36,7 +36,7 @@ impl MetaMemberServiceImpl {
#[async_trait::async_trait]
impl MetaMemberService for MetaMemberServiceImpl {
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn members(
&self,
_request: Request,
diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs
index bd247c1e18980..0fcc470a70e39 100644
--- a/src/meta/service/src/notification_service.rs
+++ b/src/meta/service/src/notification_service.rs
@@ -207,7 +207,7 @@ impl NotificationServiceImpl {
impl NotificationService for NotificationServiceImpl {
type SubscribeStream = UnboundedReceiverStream;
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn subscribe(
&self,
request: Request,
diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs
index f231ea5f4955d..676180adc7581 100644
--- a/src/meta/service/src/scale_service.rs
+++ b/src/meta/service/src/scale_service.rs
@@ -59,7 +59,7 @@ impl ScaleServiceImpl {
#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn get_cluster_info(
&self,
_: Request,
@@ -110,7 +110,7 @@ impl ScaleService for ScaleServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn reschedule(
&self,
request: Request,
@@ -174,7 +174,7 @@ impl ScaleService for ScaleServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn get_reschedule_plan(
&self,
request: Request,
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index ef232d9b04ffd..92af1d4beb707 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -59,7 +59,7 @@ impl StreamServiceImpl {
#[async_trait::async_trait]
impl StreamManagerService for StreamServiceImpl {
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn flush(&self, request: Request) -> TonicResponse {
self.env.idle_manager().record_activity();
let req = request.into_inner();
@@ -71,7 +71,7 @@ impl StreamManagerService for StreamServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn pause(&self, _: Request) -> Result<Response<PauseResponse>, Status> {
let i = self
.barrier_scheduler
@@ -83,7 +83,7 @@ impl StreamManagerService for StreamServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn resume(&self, _: Request) -> Result<Response<ResumeResponse>, Status> {
let i = self
.barrier_scheduler
@@ -122,7 +122,7 @@ impl StreamManagerService for StreamServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn list_table_fragments(
&self,
request: Request,
@@ -165,7 +165,7 @@ impl StreamManagerService for StreamServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn list_table_fragment_states(
&self,
_request: Request,
@@ -186,7 +186,7 @@ impl StreamManagerService for StreamServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn list_fragment_distribution(
&self,
_request: Request,
@@ -215,7 +215,7 @@ impl StreamManagerService for StreamServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn list_actor_states(
&self,
_request: Request,
diff --git a/src/meta/service/src/user_service.rs b/src/meta/service/src/user_service.rs
index 8c982521b112a..cb290766e6fd1 100644
--- a/src/meta/service/src/user_service.rs
+++ b/src/meta/service/src/user_service.rs
@@ -107,7 +107,7 @@ impl UserServiceImpl {
#[async_trait::async_trait]
impl UserService for UserServiceImpl {
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn create_user(
&self,
request: Request,
@@ -128,7 +128,7 @@ impl UserService for UserServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn drop_user(
&self,
request: Request,
@@ -142,7 +142,7 @@ impl UserService for UserServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn update_user(
&self,
request: Request,
@@ -165,7 +165,7 @@ impl UserService for UserServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn grant_privilege(
&self,
request: Request,
@@ -185,7 +185,7 @@ impl UserService for UserServiceImpl {
}))
}
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
async fn revoke_privilege(
&self,
request: Request,
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index ed6ad289a5a68..d39dde51399d8 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -626,7 +626,7 @@ impl GlobalBarrierManager {
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);
- self.recovery(prev_epoch, paused_reason, true)
+ self.recovery(prev_epoch, paused_reason)
.instrument(span)
.await
};
@@ -981,10 +981,7 @@ impl GlobalBarrierManager {
// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
- *state = self
- .recovery(prev_epoch, None, false)
- .instrument(span)
- .await;
+ *state = self.recovery(prev_epoch, None).instrument(span).await;
self.set_status(BarrierManagerStatus::Running).await;
} else {
panic!("failed to execute barrier: {:?}", err);
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 21197a8df98d4..3e319f0e69a52 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -219,7 +219,6 @@ impl GlobalBarrierManager {
&self,
prev_epoch: TracedEpoch,
paused_reason: Option<PausedReason>,
- bootstrap_recovery: bool,
) -> BarrierManagerState {
// Mark blocked and abort buffered schedules, they might be dirty already.
self.scheduled_barriers
@@ -227,11 +226,9 @@ impl GlobalBarrierManager {
.await;
tracing::info!("recovery start!");
- if bootstrap_recovery {
- self.clean_dirty_tables()
- .await
- .expect("clean dirty tables should not fail");
- }
+ self.clean_dirty_tables()
+ .await
+ .expect("clean dirty tables should not fail");
self.clean_dirty_fragments()
.await
.expect("clean dirty fragments");
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index c0cfcf3baba59..cb37307384aa2 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -15,11 +15,12 @@
use std::iter;
use itertools::Itertools;
+use risingwave_common::bail;
use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_pb::catalog::{
PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
-use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo};
+use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
@@ -30,19 +31,21 @@ use sea_orm::{
};
use tokio::sync::RwLock;
+use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
use crate::controller::utils::{
check_connection_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id,
- ensure_object_not_refer, ensure_schema_empty, ensure_user_id, list_used_by, PartialObject,
+ ensure_object_not_refer, ensure_schema_empty, ensure_user_id, get_referring_objects,
+ get_referring_objects_cascade, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{MetaSrvEnv, NotificationVersion};
-use crate::model_v2::connection::PrivateLinkService;
use crate::model_v2::object::ObjectType;
use crate::model_v2::prelude::*;
use crate::model_v2::{
- connection, database, function, index, object, object_dependency, schema, table, view,
- ConnectionId, DatabaseId, FunctionId, ObjectId, SchemaId, SourceId, TableId, UserId,
+ connection, database, function, index, object, object_dependency, schema, sink, source, table,
+ view, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, SchemaId, SourceId,
+ TableId, UserId,
};
use crate::rpc::ddl_controller::DropMode;
use crate::{MetaError, MetaResult};
@@ -503,7 +506,7 @@ impl CatalogController {
assert_eq!(obj.obj_type, object_type);
let mut to_drop_objects = match drop_mode {
- DropMode::Cascade => list_used_by(object_id, &txn).await?,
+ DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
DropMode::Restrict => {
ensure_object_not_refer(object_type, object_id, &txn).await?;
vec![]
@@ -589,7 +592,7 @@ impl CatalogController {
.into_iter()
.map(|obj| match obj.obj_type {
ObjectType::Table => PbRelation {
- relation_info: Some(RelationInfo::Table(PbTable {
+ relation_info: Some(PbRelationInfo::Table(PbTable {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
@@ -597,7 +600,7 @@ impl CatalogController {
})),
},
ObjectType::Source => PbRelation {
- relation_info: Some(RelationInfo::Source(PbSource {
+ relation_info: Some(PbRelationInfo::Source(PbSource {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
@@ -605,7 +608,7 @@ impl CatalogController {
})),
},
ObjectType::Sink => PbRelation {
- relation_info: Some(RelationInfo::Sink(PbSink {
+ relation_info: Some(PbRelationInfo::Sink(PbSink {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
@@ -613,7 +616,7 @@ impl CatalogController {
})),
},
ObjectType::View => PbRelation {
- relation_info: Some(RelationInfo::View(PbView {
+ relation_info: Some(PbRelationInfo::View(PbView {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
@@ -621,7 +624,7 @@ impl CatalogController {
})),
},
ObjectType::Index => PbRelation {
- relation_info: Some(RelationInfo::Index(PbIndex {
+ relation_info: Some(PbRelationInfo::Index(PbIndex {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
@@ -647,6 +650,142 @@ impl CatalogController {
version,
))
}
+
+ pub async fn alter_relation_name(
+ &self,
+ object_type: ObjectType,
+ object_id: ObjectId,
+ object_name: &str,
+ ) -> MetaResult<NotificationVersion> {
+ let inner = self.inner.write().await;
+ let txn = inner.db.begin().await?;
+ let obj: PartialObject = Object::find_by_id(object_id)
+ .into_partial_model()
+ .one(&txn)
+ .await?
+ .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
+ assert_eq!(obj.obj_type, object_type);
+ check_relation_name_duplicate(
+ object_name,
+ obj.database_id.unwrap(),
+ obj.schema_id.unwrap(),
+ &txn,
+ )
+ .await?;
+
+ let mut to_update_relations = vec![];
+ // rename relation.
+ macro_rules! rename_relation {
+ ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
+ let (mut relation, obj) = $entity::find_by_id($object_id)
+ .find_also_related(Object)
+ .one(&txn)
+ .await?
+ .unwrap();
+ let old_name = relation.name.clone();
+ relation.name = object_name.into();
+ relation.definition = alter_relation_rename(&relation.definition, object_name);
+ let active_model = $table::ActiveModel {
+ $identity: ActiveValue::Set(relation.$identity),
+ name: ActiveValue::Set(object_name.into()),
+ definition: ActiveValue::Set(relation.definition.clone()),
+ ..Default::default()
+ };
+ active_model.update(&txn).await?;
+ to_update_relations.push(PbRelation {
+ relation_info: Some(PbRelationInfo::$entity(
+ ObjectModel(relation, obj.unwrap()).into(),
+ )),
+ });
+ old_name
+ }};
+ }
+
+ let old_name = match object_type {
+ ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
+ ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
+ ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
+ ObjectType::View => rename_relation!(View, view, view_id, object_id),
+ ObjectType::Index => {
+ let (mut index, obj) = Index::find_by_id(object_id)
+ .find_also_related(Object)
+ .one(&txn)
+ .await?
+ .unwrap();
+ index.name = object_name.into();
+ let index_table_id = index.index_table_id;
+
+ // The name of an index and the name of its associated table are the same.
+ let active_model = index::ActiveModel {
+ index_id: ActiveValue::Set(index.index_id),
+ name: ActiveValue::Set(object_name.into()),
+ ..Default::default()
+ };
+ active_model.update(&txn).await?;
+ to_update_relations.push(PbRelation {
+ relation_info: Some(PbRelationInfo::Index(
+ ObjectModel(index, obj.unwrap()).into(),
+ )),
+ });
+ rename_relation!(Table, table, table_id, index_table_id)
+ }
+ _ => unreachable!("only relation name can be altered."),
+ };
+
+ // Rewrite the definitions of all relations that refer to the renamed one.
+ macro_rules! rename_relation_ref {
+ ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
+ let (mut relation, obj) = $entity::find_by_id($object_id)
+ .find_also_related(Object)
+ .one(&txn)
+ .await?
+ .unwrap();
+ relation.definition =
+ alter_relation_rename_refs(&relation.definition, &old_name, object_name);
+ let active_model = $table::ActiveModel {
+ $identity: ActiveValue::Set(relation.$identity),
+ definition: ActiveValue::Set(relation.definition.clone()),
+ ..Default::default()
+ };
+ active_model.update(&txn).await?;
+ to_update_relations.push(PbRelation {
+ relation_info: Some(PbRelationInfo::$entity(
+ ObjectModel(relation, obj.unwrap()).into(),
+ )),
+ });
+ }};
+ }
+ let objs = get_referring_objects(object_id, &txn).await?;
+ for obj in objs {
+ match obj.obj_type {
+ ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
+ ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
+ ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
+ ObjectType::Index => {
+ let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
+ .select_only()
+ .column(index::Column::IndexTableId)
+ .into_tuple()
+ .one(&txn)
+ .await?;
+ rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
+ }
+ _ => bail!("only table, sink, view and index depend on other objects."),
+ }
+ }
+ txn.commit().await?;
+
+ let version = self
+ .notify_frontend(
+ NotificationOperation::Update,
+ NotificationInfo::RelationGroup(PbRelationGroup {
+ relations: to_update_relations,
+ }),
+ )
+ .await;
+
+ Ok(version)
+ }
}
#[cfg(test)]
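
`alter_relation_name` above is a two-step rewrite: rename the target relation and patch its own definition via `alter_relation_rename`, then patch the definition of every directly referring object via `alter_relation_rename_refs`. A minimal usage sketch of the two helpers introduced in `src/meta/src/controller/rename.rs` (below); the import path assumes the `controller` module is reachable from the caller:

    use risingwave_meta::controller::rename::{alter_relation_rename, alter_relation_rename_refs};

    fn main() {
        // Step 1: rewrite the renamed relation's own definition.
        let def = alter_relation_rename("CREATE TABLE foo (a INT)", "bar");
        println!("{def}"); // CREATE TABLE bar (a INT)

        // Step 2: rewrite each referring relation's definition. The old name is
        // kept as an alias, so select items like `foo.v1` remain valid.
        let mv = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 FROM foo";
        println!("{}", alter_relation_rename_refs(mv, "foo", "bar")); // ... FROM bar AS foo
    }
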
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index 5b0ff4ab99bef..07793e30a17fe 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -13,16 +13,23 @@
// limitations under the License.
use anyhow::anyhow;
+use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
-use risingwave_pb::catalog::{PbConnection, PbDatabase, PbSchema};
+use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
+use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType};
+use risingwave_pb::catalog::{
+ PbConnection, PbCreateType, PbDatabase, PbHandleConflictBehavior, PbIndex, PbSchema, PbSink,
+ PbSinkType, PbSource, PbStreamJobStatus, PbTable, PbView,
+};
use sea_orm::{ActiveValue, DatabaseConnection, ModelTrait};
-use crate::model_v2::{connection, database, object, schema};
+use crate::model_v2::{connection, database, index, object, schema, sink, source, table, view};
use crate::MetaError;
#[allow(dead_code)]
pub mod catalog;
pub mod cluster;
+pub mod rename;
pub mod system_param;
pub mod utils;
@@ -61,9 +68,9 @@ pub struct ObjectModel<M: ModelTrait>(M, object::Model);
impl From<ObjectModel<database::Model>> for PbDatabase {
fn from(value: ObjectModel<database::Model>) -> Self {
Self {
- id: value.0.database_id as _,
+ id: value.0.database_id,
name: value.0.name,
- owner: value.1.owner_id as _,
+ owner: value.1.owner_id,
}
}
}
@@ -71,7 +78,7 @@ impl From<ObjectModel<database::Model>> for PbDatabase {
impl From<PbDatabase> for database::ActiveModel {
fn from(db: PbDatabase) -> Self {
Self {
- database_id: ActiveValue::Set(db.id as _),
+ database_id: ActiveValue::Set(db.id),
name: ActiveValue::Set(db.name),
}
}
@@ -80,7 +87,7 @@ impl From<PbDatabase> for database::ActiveModel {
impl From<PbSchema> for schema::ActiveModel {
fn from(schema: PbSchema) -> Self {
Self {
- schema_id: ActiveValue::Set(schema.id as _),
+ schema_id: ActiveValue::Set(schema.id),
name: ActiveValue::Set(schema.name),
}
}
@@ -89,10 +96,159 @@ impl From<PbSchema> for schema::ActiveModel {
impl From<ObjectModel<schema::Model>> for PbSchema {
fn from(value: ObjectModel<schema::Model>) -> Self {
Self {
- id: value.0.schema_id as _,
+ id: value.0.schema_id,
name: value.0.name,
- database_id: value.1.database_id.unwrap() as _,
- owner: value.1.owner_id as _,
+ database_id: value.1.database_id.unwrap(),
+ owner: value.1.owner_id,
+ }
+ }
+}
+
+impl From<ObjectModel<table::Model>> for PbTable {
+ fn from(value: ObjectModel<table::Model>) -> Self {
+ Self {
+ id: value.0.table_id,
+ schema_id: value.1.schema_id.unwrap(),
+ database_id: value.1.database_id.unwrap(),
+ name: value.0.name,
+ columns: value.0.columns.0,
+ pk: value.0.pk.0,
+ dependent_relations: vec![], // todo: deprecate it.
+ table_type: PbTableType::from(value.0.table_type) as _,
+ distribution_key: value.0.distribution_key.0,
+ stream_key: value.0.stream_key.0,
+ append_only: value.0.append_only,
+ owner: value.1.owner_id,
+ properties: value.0.properties.0,
+ fragment_id: value.0.fragment_id as u32,
+ vnode_col_index: value.0.vnode_col_index,
+ row_id_index: value.0.row_id_index,
+ value_indices: value.0.value_indices.0,
+ definition: value.0.definition,
+ handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
+ value.0.handle_pk_conflict_behavior,
+ ) as _,
+ read_prefix_len_hint: value.0.read_prefix_len_hint,
+ watermark_indices: value.0.watermark_indices.0,
+ dist_key_in_pk: value.0.dist_key_in_pk.0,
+ dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32),
+ cardinality: value.0.cardinality.map(|cardinality| cardinality.0),
+ initialized_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
+ ),
+ created_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
+ ),
+ cleaned_by_watermark: value.0.cleaned_by_watermark,
+ stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _,
+ create_type: PbCreateType::from(value.0.create_type) as _,
+ version: Some(value.0.version.0),
+ optional_associated_source_id: value
+ .0
+ .optional_associated_source_id
+ .map(PbOptionalAssociatedSourceId::AssociatedSourceId),
+ }
+ }
+}
+
+impl From<ObjectModel<source::Model>> for PbSource {
+ fn from(value: ObjectModel<source::Model>) -> Self {
+ Self {
+ id: value.0.source_id,
+ schema_id: value.1.schema_id.unwrap(),
+ database_id: value.1.database_id.unwrap(),
+ name: value.0.name,
+ row_id_index: value.0.row_id_index,
+ columns: value.0.columns.0,
+ pk_column_ids: value.0.pk_column_ids.0,
+ properties: value.0.properties.0,
+ owner: value.1.owner_id,
+ info: value.0.source_info.map(|info| info.0),
+ watermark_descs: value.0.watermark_descs.0,
+ definition: value.0.definition,
+ connection_id: value.0.connection_id,
+ // todo: use the timestamp from the database directly.
+ initialized_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
+ ),
+ created_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
+ ),
+ version: value.0.version,
+ optional_associated_table_id: value
+ .0
+ .optional_associated_table_id
+ .map(PbOptionalAssociatedTableId::AssociatedTableId),
+ }
+ }
+}
+
+impl From<ObjectModel<sink::Model>> for PbSink {
+ fn from(value: ObjectModel<sink::Model>) -> Self {
+ Self {
+ id: value.0.sink_id,
+ schema_id: value.1.schema_id.unwrap(),
+ database_id: value.1.database_id.unwrap(),
+ name: value.0.name,
+ columns: value.0.columns.0,
+ plan_pk: value.0.plan_pk.0,
+ dependent_relations: vec![], // todo: deprecate it.
+ distribution_key: value.0.distribution_key.0,
+ downstream_pk: value.0.downstream_pk.0,
+ sink_type: PbSinkType::from(value.0.sink_type) as _,
+ owner: value.1.owner_id,
+ properties: value.0.properties.0,
+ definition: value.0.definition,
+ connection_id: value.0.connection_id,
+ initialized_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
+ ),
+ created_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
+ ),
+ db_name: value.0.db_name,
+ sink_from_name: value.0.sink_from_name,
+ stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _,
+ format_desc: value.0.sink_format_desc.map(|desc| desc.0),
+ }
+ }
+}
+
+impl From<ObjectModel<index::Model>> for PbIndex {
+ fn from(value: ObjectModel<index::Model>) -> Self {
+ Self {
+ id: value.0.index_id,
+ schema_id: value.1.schema_id.unwrap(),
+ database_id: value.1.database_id.unwrap(),
+ name: value.0.name,
+ owner: value.1.owner_id,
+ index_table_id: value.0.index_table_id,
+ primary_table_id: value.0.primary_table_id,
+ index_item: value.0.index_items.0,
+ original_columns: value.0.original_columns.0,
+ initialized_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
+ ),
+ created_at_epoch: Some(
+ Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0,
+ ),
+ stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _,
+ }
+ }
+}
+
+impl From<ObjectModel<view::Model>> for PbView {
+ fn from(value: ObjectModel<view::Model>) -> Self {
+ Self {
+ id: value.0.view_id,
+ schema_id: value.1.schema_id.unwrap(),
+ database_id: value.1.database_id.unwrap(),
+ name: value.0.name,
+ owner: value.1.owner_id,
+ properties: value.0.properties.0,
+ sql: value.0.definition,
+ dependent_relations: vec![], // todo: deprecate it.
+ columns: value.0.columns.0,
}
}
}
@@ -100,11 +256,11 @@ impl From<ObjectModel<schema::Model>> for PbSchema {
impl From<ObjectModel<connection::Model>> for PbConnection {
fn from(value: ObjectModel<connection::Model>) -> Self {
Self {
- id: value.1.oid as _,
- schema_id: value.1.schema_id.unwrap() as _,
- database_id: value.1.database_id.unwrap() as _,
+ id: value.1.oid,
+ schema_id: value.1.schema_id.unwrap(),
+ database_id: value.1.database_id.unwrap(),
name: value.0.name,
- owner: value.1.owner_id as _,
+ owner: value.1.owner_id,
info: Some(PbConnectionInfo::PrivateLinkService(value.0.info.0)),
}
}
diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs
new file mode 100644
index 0000000000000..254565efb391c
--- /dev/null
+++ b/src/meta/src/controller/rename.rs
@@ -0,0 +1,430 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use itertools::Itertools;
+use risingwave_common::util::column_index_mapping::ColIndexMapping;
+use risingwave_pb::expr::expr_node::RexNode;
+use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction};
+use risingwave_sqlparser::ast::{
+ Array, CreateSink, CreateSinkStatement, CreateSourceStatement, Distinct, Expr, Function,
+ FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, SetExpr, Statement,
+ TableAlias, TableFactor, TableWithJoins,
+};
+use risingwave_sqlparser::parser::Parser;
+
+/// `alter_relation_rename` renames a relation to a new name in its `Create` statement and returns
+/// the updated definition as raw SQL. Note that the `definition` must be a `Create` statement and
+/// the `new_name` must be a valid identifier; both should be validated before calling this
+/// function. To update all relations that depend on the renamed one, use `alter_relation_rename_refs`.
+pub fn alter_relation_rename(definition: &str, new_name: &str) -> String {
+ // This happens when we try to rename a table that's created by `CREATE TABLE AS`. Remove it
+ // when we support `SHOW CREATE TABLE` for `CREATE TABLE AS`.
+ if definition.is_empty() {
+ tracing::warn!("found empty definition when renaming relation, ignored.");
+ return definition.into();
+ }
+ let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
+ let mut stmt = ast
+ .into_iter()
+ .exactly_one()
+ .expect("should contains only one statement");
+
+ match &mut stmt {
+ Statement::CreateTable { name, .. }
+ | Statement::CreateView { name, .. }
+ | Statement::CreateIndex { name, .. }
+ | Statement::CreateSource {
+ stmt: CreateSourceStatement {
+ source_name: name, ..
+ },
+ }
+ | Statement::CreateSink {
+ stmt: CreateSinkStatement {
+ sink_name: name, ..
+ },
+ } => replace_table_name(name, new_name),
+ _ => unreachable!(),
+ };
+
+ stmt.to_string()
+}
+
+/// `alter_relation_rename_refs` updates all references to the renamed relation in the definition
+/// of the target relation's `Create` statement.
+pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> String {
+ let ast = Parser::parse_sql(definition).expect("failed to parse relation definition");
+ let mut stmt = ast
+ .into_iter()
+ .exactly_one()
+ .expect("should contains only one statement");
+
+ match &mut stmt {
+ Statement::CreateTable {
+ query: Some(query), ..
+ }
+ | Statement::CreateView { query, .. }
+ | Statement::Query(query) // Used by view, actually we store a query as the definition of view.
+ | Statement::CreateSink {
+ stmt:
+ CreateSinkStatement {
+ sink_from: CreateSink::AsQuery(query),
+ ..
+ },
+ } => {
+ QueryRewriter::rewrite_query(query, from, to);
+ }
+ Statement::CreateIndex { table_name, .. }
+ | Statement::CreateSink {
+ stmt:
+ CreateSinkStatement {
+ sink_from: CreateSink::From(table_name),
+ ..
+ },
+ } => replace_table_name(table_name, to),
+ _ => unreachable!(),
+ };
+ stmt.to_string()
+}
+
+/// Replace the last ident in the `table_name` with the given name; the object name is ensured to
+/// be non-empty, e.g. `schema.table` or `database.schema.table`.
+fn replace_table_name(table_name: &mut ObjectName, to: &str) {
+ let idx = table_name.0.len() - 1;
+ table_name.0[idx] = Ident::new_unchecked(to);
+}
+
+/// `QueryRewriter` is a visitor that rewrites all references to the relation named `from` into
+/// `to` in the given query, which is part of the create statement of the relation.
+struct QueryRewriter<'a> {
+ from: &'a str,
+ to: &'a str,
+}
+
+impl QueryRewriter<'_> {
+ fn rewrite_query(query: &mut Query, from: &str, to: &str) {
+ let rewriter = QueryRewriter { from, to };
+ rewriter.visit_query(query)
+ }
+
+ /// Visit the query and rewrite all references to the relation named `from` into `to`.
+ fn visit_query(&self, query: &mut Query) {
+ if let Some(with) = &mut query.with {
+ for cte_table in &mut with.cte_tables {
+ self.visit_query(&mut cte_table.query);
+ }
+ }
+ self.visit_set_expr(&mut query.body);
+ for expr in &mut query.order_by {
+ self.visit_expr(&mut expr.expr);
+ }
+ }
+
+ /// Visit the table factor and rewrite all references to the relation named `from`.
+ /// Rewrites idents (i.e. `schema.table`, `table`) that contain the old name in the
+ /// following patterns:
+ /// 1. `FROM a` to `FROM new_a AS a`
+ /// 2. `FROM a AS b` to `FROM new_a AS b`
+ ///
+ /// So that we DON'T have to:
+ /// 1. rewrite the select and expr parts like `schema.table.column`, `table.column`,
+ ///    `alias.column` etc.
+ /// 2. handle the case where the old name is used as an alias.
+ /// 3. handle the case where the new name is used as an alias.
+ fn visit_table_factor(&self, table_factor: &mut TableFactor) {
+ match table_factor {
+ TableFactor::Table { name, alias, .. } => {
+ let idx = name.0.len() - 1;
+ if name.0[idx].real_value() == self.from {
+ if alias.is_none() {
+ *alias = Some(TableAlias {
+ name: Ident::new_unchecked(self.from),
+ columns: vec![],
+ });
+ }
+ name.0[idx] = Ident::new_unchecked(self.to);
+ }
+ }
+ TableFactor::Derived { subquery, .. } => self.visit_query(subquery),
+ TableFactor::TableFunction { args, .. } => {
+ for arg in args {
+ self.visit_function_args(arg);
+ }
+ }
+ TableFactor::NestedJoin(table_with_joins) => {
+ self.visit_table_with_joins(table_with_joins);
+ }
+ }
+ }
+
+ /// Visit the table with joins and rewrite all references to the relation named `from`.
+ fn visit_table_with_joins(&self, table_with_joins: &mut TableWithJoins) {
+ self.visit_table_factor(&mut table_with_joins.relation);
+ for join in &mut table_with_joins.joins {
+ self.visit_table_factor(&mut join.relation);
+ }
+ }
+
+ /// Visit query body expression and update all references.
+ fn visit_set_expr(&self, set_expr: &mut SetExpr) {
+ match set_expr {
+ SetExpr::Select(select) => {
+ if let Distinct::DistinctOn(exprs) = &mut select.distinct {
+ for expr in exprs {
+ self.visit_expr(expr);
+ }
+ }
+ for select_item in &mut select.projection {
+ self.visit_select_item(select_item);
+ }
+ for from_item in &mut select.from {
+ self.visit_table_with_joins(from_item);
+ }
+ if let Some(where_clause) = &mut select.selection {
+ self.visit_expr(where_clause);
+ }
+ for expr in &mut select.group_by {
+ self.visit_expr(expr);
+ }
+ if let Some(having) = &mut select.having {
+ self.visit_expr(having);
+ }
+ }
+ SetExpr::Query(query) => self.visit_query(query),
+ SetExpr::SetOperation { left, right, .. } => {
+ self.visit_set_expr(left);
+ self.visit_set_expr(right);
+ }
+ SetExpr::Values(_) => {}
+ }
+ }
+
+ /// Visit function arguments and update all references.
+ fn visit_function_args(&self, function_args: &mut FunctionArg) {
+ match function_args {
+ FunctionArg::Unnamed(arg) | FunctionArg::Named { arg, .. } => match arg {
+ FunctionArgExpr::Expr(expr) | FunctionArgExpr::ExprQualifiedWildcard(expr, _) => {
+ self.visit_expr(expr)
+ }
+ FunctionArgExpr::QualifiedWildcard(_, None) | FunctionArgExpr::Wildcard(None) => {}
+ FunctionArgExpr::QualifiedWildcard(_, Some(exprs))
+ | FunctionArgExpr::Wildcard(Some(exprs)) => {
+ for expr in exprs {
+ self.visit_expr(expr);
+ }
+ }
+ },
+ }
+ }
+
+ /// Visit function and update all references.
+ fn visit_function(&self, function: &mut Function) {
+ for arg in &mut function.args {
+ self.visit_function_args(arg);
+ }
+ }
+
+ /// Visit expression and update all references.
+ fn visit_expr(&self, expr: &mut Expr) {
+ match expr {
+ Expr::FieldIdentifier(expr, ..)
+ | Expr::IsNull(expr)
+ | Expr::IsNotNull(expr)
+ | Expr::IsTrue(expr)
+ | Expr::IsNotTrue(expr)
+ | Expr::IsFalse(expr)
+ | Expr::IsNotFalse(expr)
+ | Expr::IsUnknown(expr)
+ | Expr::IsNotUnknown(expr)
+ | Expr::IsJson { expr, .. }
+ | Expr::InList { expr, .. }
+ | Expr::SomeOp(expr)
+ | Expr::AllOp(expr)
+ | Expr::UnaryOp { expr, .. }
+ | Expr::Cast { expr, .. }
+ | Expr::TryCast { expr, .. }
+ | Expr::AtTimeZone {
+ timestamp: expr, ..
+ }
+ | Expr::Extract { expr, .. }
+ | Expr::Substring { expr, .. }
+ | Expr::Overlay { expr, .. }
+ | Expr::Trim { expr, .. }
+ | Expr::Nested(expr)
+ | Expr::ArrayIndex { obj: expr, .. }
+ | Expr::ArrayRangeIndex { obj: expr, .. } => self.visit_expr(expr),
+
+ Expr::Position { substring, string } => {
+ self.visit_expr(substring);
+ self.visit_expr(string);
+ }
+
+ Expr::InSubquery { expr, subquery, .. } => {
+ self.visit_expr(expr);
+ self.visit_query(subquery);
+ }
+ Expr::Between {
+ expr, low, high, ..
+ } => {
+ self.visit_expr(expr);
+ self.visit_expr(low);
+ self.visit_expr(high);
+ }
+
+ Expr::IsDistinctFrom(expr1, expr2)
+ | Expr::IsNotDistinctFrom(expr1, expr2)
+ | Expr::BinaryOp {
+ left: expr1,
+ right: expr2,
+ ..
+ } => {
+ self.visit_expr(expr1);
+ self.visit_expr(expr2);
+ }
+ Expr::Function(function) => self.visit_function(function),
+ Expr::Exists(query) | Expr::Subquery(query) | Expr::ArraySubquery(query) => {
+ self.visit_query(query)
+ }
+
+ Expr::GroupingSets(exprs_vec) | Expr::Cube(exprs_vec) | Expr::Rollup(exprs_vec) => {
+ for exprs in exprs_vec {
+ for expr in exprs {
+ self.visit_expr(expr);
+ }
+ }
+ }
+
+ Expr::Row(exprs) | Expr::Array(Array { elem: exprs, .. }) => {
+ for expr in exprs {
+ self.visit_expr(expr);
+ }
+ }
+
+ Expr::LambdaFunction { body, args: _ } => self.visit_expr(body),
+
+ // No need to visit.
+ Expr::Identifier(_)
+ | Expr::CompoundIdentifier(_)
+ | Expr::Collate { .. }
+ | Expr::Value(_)
+ | Expr::Parameter { .. }
+ | Expr::TypedString { .. }
+ | Expr::Case { .. } => {}
+ }
+ }
+
+ /// Visit select item and update all references.
+ fn visit_select_item(&self, select_item: &mut SelectItem) {
+ match select_item {
+ SelectItem::UnnamedExpr(expr)
+ | SelectItem::ExprQualifiedWildcard(expr, _)
+ | SelectItem::ExprWithAlias { expr, .. } => self.visit_expr(expr),
+ SelectItem::QualifiedWildcard(_, None) | SelectItem::Wildcard(None) => {}
+ SelectItem::QualifiedWildcard(_, Some(exprs)) | SelectItem::Wildcard(Some(exprs)) => {
+ for expr in exprs {
+ self.visit_expr(expr);
+ }
+ }
+ }
+ }
+}
+
+pub struct ReplaceTableExprRewriter {
+ pub table_col_index_mapping: ColIndexMapping,
+}
+
+impl ReplaceTableExprRewriter {
+ pub fn rewrite_expr(&self, expr: &mut ExprNode) {
+ let rex_node = expr.rex_node.as_mut().unwrap();
+ match rex_node {
+ RexNode::InputRef(input_col_idx) => {
+ *input_col_idx = self.table_col_index_mapping.map(*input_col_idx as usize) as u32
+ }
+ RexNode::Constant(_) => {}
+ RexNode::Udf(udf) => self.rewrite_udf(udf),
+ RexNode::FuncCall(function_call) => self.rewrite_function_call(function_call),
+ RexNode::Now(_) => {}
+ }
+ }
+
+ fn rewrite_udf(&self, udf: &mut UserDefinedFunction) {
+ udf.children
+ .iter_mut()
+ .for_each(|expr| self.rewrite_expr(expr));
+ }
+
+ fn rewrite_function_call(&self, function_call: &mut FunctionCall) {
+ function_call
+ .children
+ .iter_mut()
+ .for_each(|expr| self.rewrite_expr(expr));
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_alter_table_rename() {
+ let definition = "CREATE TABLE foo (a int, b int)";
+ let new_name = "bar";
+ let expected = "CREATE TABLE bar (a INT, b INT)";
+ let actual = alter_relation_rename(definition, new_name);
+ assert_eq!(expected, actual);
+ }
+
+ #[test]
+ fn test_rename_index_refs() {
+ let definition = "CREATE INDEX idx1 ON foo(v1 DESC, v2)";
+ let from = "foo";
+ let to = "bar";
+ let expected = "CREATE INDEX idx1 ON bar(v1 DESC, v2)";
+ let actual = alter_relation_rename_refs(definition, from, to);
+ assert_eq!(expected, actual);
+ }
+
+ #[test]
+ fn test_rename_sink_refs() {
+ let definition =
+ "CREATE SINK sink_t FROM foo WITH (connector = 'kafka', format = 'append_only')";
+ let from = "foo";
+ let to = "bar";
+ let expected =
+ "CREATE SINK sink_t FROM bar WITH (connector = 'kafka', format = 'append_only')";
+ let actual = alter_relation_rename_refs(definition, from, to);
+ assert_eq!(expected, actual);
+ }
+
+ #[test]
+ fn test_rename_with_alias_refs() {
+ let definition =
+ "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM foo";
+ let from = "foo";
+ let to = "bar";
+ let expected =
+ "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM bar AS foo";
+ let actual = alter_relation_rename_refs(definition, from, to);
+ assert_eq!(expected, actual);
+
+ let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE";
+ let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM bar AS foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE";
+ let actual = alter_relation_rename_refs(definition, from, to);
+ assert_eq!(expected, actual);
+
+ let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM foo AS bar WHERE bar.v1 = 1";
+ let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM bar AS bar WHERE bar.v1 = 1";
+ let actual = alter_relation_rename_refs(definition, from, to);
+ assert_eq!(expected, actual);
+ }
+}
diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs
index 964ee24ae99b5..d36918db3820d 100644
--- a/src/meta/src/controller/utils.rs
+++ b/src/meta/src/controller/utils.rs
@@ -115,8 +115,11 @@ pub struct PartialObject {
pub database_id: Option<DatabaseId>,
}
-/// List all objects that are using the given one. It runs a recursive CTE to find all the dependencies.
-pub async fn list_used_by<C>(obj_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
+/// List all objects that are using the given one, cascading through transitive dependencies. It runs a recursive CTE to find all of them.
+pub async fn get_referring_objects_cascade<C>(
+ obj_id: ObjectId,
+ db: &C,
+) -> MetaResult<Vec<PartialObject>>
where
C: ConnectionTrait,
{
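
Conceptually, the recursive CTE that `get_referring_objects_cascade` issues can be sketched as raw SQL over the `object_dependency` and `object` tables; the real query is assembled with sea-orm's query builder, and the column names here are assumed from the entities in this module:

    // Sketch only; not the literal query the function builds.
    let _cascade_sql: &str = r#"
        WITH RECURSIVE used_by_object_ids (used_by) AS (
            SELECT used_by FROM object_dependency WHERE oid = $1
            UNION ALL
            SELECT d.used_by
            FROM object_dependency d
            JOIN used_by_object_ids u ON d.oid = u.used_by
        )
        SELECT o.* FROM object o
        JOIN used_by_object_ids u ON o.oid = u.used_by
    "#;
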
@@ -318,6 +321,24 @@ where
Ok(())
}
+/// List all objects that are using the given one.
+pub async fn get_referring_objects<C>(object_id: ObjectId, db: &C) -> MetaResult<Vec<PartialObject>>
+where
+ C: ConnectionTrait,
+{
+ let objs = ObjectDependency::find()
+ .filter(object_dependency::Column::Oid.eq(object_id))
+ .join(
+ JoinType::InnerJoin,
+ object_dependency::Relation::Object1.def(),
+ )
+ .into_partial_model()
+ .all(db)
+ .await?;
+
+ Ok(objs)
+}
+
/// `ensure_schema_empty` ensures that the schema is empty, used by `DROP SCHEMA`.
pub async fn ensure_schema_empty<C>(schema_id: SchemaId, db: &C) -> MetaResult<()>
where
diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs
index f8332819a4610..03323d53fa0af 100644
--- a/src/meta/src/error.rs
+++ b/src/meta/src/error.rs
@@ -20,7 +20,6 @@ use risingwave_common::error::BoxedError;
use risingwave_connector::sink::SinkError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;
-use sqlx::Error;
use crate::hummock::error::Error as HummockError;
use crate::manager::WorkerId;
@@ -181,12 +180,6 @@ impl From for MetaError {
}
}
-impl From<Error> for MetaError {
- fn from(value: Error) -> Self {
- MetaErrorInner::Election(value.to_string()).into()
- }
-}
-
impl From<RpcError> for MetaError {
fn from(e: RpcError) -> Self {
MetaErrorInner::RpcError(e).into()
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 2b0c3e3db87dc..1b3a284e9ccc9 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -1761,7 +1761,7 @@ impl HummockManager {
}
/// Get version deltas from meta store
- #[cfg_attr(coverage, no_coverage)]
+ #[cfg_attr(coverage, coverage(off))]
#[named]
pub async fn list_version_deltas(
&self,
diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs
index afe66d27ad8e8..f549578f079c6 100644
--- a/src/meta/src/lib.rs
+++ b/src/meta/src/lib.rs
@@ -26,13 +26,12 @@
#![feature(error_generic_member_access)]
#![feature(assert_matches)]
#![feature(try_blocks)]
-#![cfg_attr(coverage, feature(no_coverage))]
+#![cfg_attr(coverage, feature(coverage_attribute))]
#![feature(custom_test_frameworks)]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(is_sorted)]
#![feature(impl_trait_in_assoc_type)]
#![feature(type_name_of_val)]
-#![feature(async_fn_in_trait)]
pub mod backup_restore;
pub mod barrier;
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index bcac32922d180..f988646428aac 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -853,14 +853,18 @@ impl CatalogManager {
database_core.clear_creating_stream_jobs();
let user_core = &mut core.user;
for table in &tables_to_clean {
- // Recovered when init database manager.
- for relation_id in &table.dependent_relations {
- database_core.decrease_ref_count(*relation_id);
+ // If the table is internal, there is no need to update the relation
+ // ref count or the user ref count.
+ if table.table_type != TableType::Internal as i32 {
+ // Recovered when initializing the database manager.
+ for relation_id in &table.dependent_relations {
+ database_core.decrease_ref_count(*relation_id);
+ }
+ // Recovered when initializing the user manager.
+ tracing::debug!("decrease ref for {}", table.id);
+ user_core.decrease_ref(table.owner);
}
- // Recovered when init user manager.
- user_core.decrease_ref(table.owner);
}
-
Ok(())
}
@@ -919,10 +923,11 @@ impl CatalogManager {
let database_core = &mut core.database;
let tables = &mut database_core.tables;
let Some(table) = tables.get(&table_id).cloned() else {
- bail!(
- "table_id {} missing when attempting to cancel job",
+ tracing::warn!(
+ "table_id {} missing when attempting to cancel job, could be cleaned on recovery",
table_id
- )
+ );
+ return Ok(());
};
table
};
@@ -938,7 +943,8 @@ impl CatalogManager {
let tables = &mut database_core.tables;
let mut tables = BTreeMapTransaction::new(tables);
for table_id in table_ids {
- tables.remove(table_id);
+ let res = tables.remove(table_id);
+ assert!(res.is_some());
}
commit_meta!(self, tables)?;
}
@@ -2032,8 +2038,7 @@ impl CatalogManager {
let user_core = &mut core.user;
let key = (index.database_id, index.schema_id, index.name.clone());
assert!(
- !database_core.indexes.contains_key(&index.id)
- && database_core.has_in_progress_creation(&key),
+ !database_core.indexes.contains_key(&index.id),
"index must be in creating procedure"
);
@@ -2188,8 +2193,7 @@ impl CatalogManager {
let user_core = &mut core.user;
let key = (sink.database_id, sink.schema_id, sink.name.clone());
assert!(
- !database_core.sinks.contains_key(&sink.id)
- && database_core.has_in_progress_creation(&key),
+ !database_core.sinks.contains_key(&sink.id),
"sink must be in creating procedure"
);
diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs
index 7e26e32ee62eb..ea579867fc320 100644
--- a/src/meta/src/manager/catalog/utils.rs
+++ b/src/meta/src/manager/catalog/utils.rs
@@ -401,7 +401,7 @@ impl ReplaceTableExprRewriter {
#[cfg(test)]
mod tests {
- use crate::manager::catalog::utils::{alter_relation_rename, alter_relation_rename_refs};
+ use super::*;
#[test]
fn test_alter_table_rename() {
diff --git a/src/meta/src/model_v2/connection.rs b/src/meta/src/model_v2/connection.rs
index f6638ed0b53a4..0096603c843a3 100644
--- a/src/meta/src/model_v2/connection.rs
+++ b/src/meta/src/model_v2/connection.rs
@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use risingwave_pb::catalog::connection::{PbInfo, PbPrivateLinkService};
+use risingwave_pb::catalog::connection::PbInfo;
use risingwave_pb::catalog::PbConnection;
use sea_orm::entity::prelude::*;
-use sea_orm::{ActiveValue, FromJsonQueryResult};
-use serde::{Deserialize, Serialize};
+use sea_orm::ActiveValue;
-use crate::model_v2::ConnectionId;
+use crate::model_v2::{ConnectionId, PrivateLinkService};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "connection")]
@@ -65,11 +64,6 @@ impl Related for Entity {
impl ActiveModelBehavior for ActiveModel {}
-#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
-pub struct PrivateLinkService(pub PbPrivateLinkService);
-
-impl Eq for PrivateLinkService {}
-
impl From<PbConnection> for ActiveModel {
fn from(conn: PbConnection) -> Self {
let Some(PbInfo::PrivateLinkService(private_link_srv)) = conn.info else {
diff --git a/src/meta/src/model_v2/index.rs b/src/meta/src/model_v2/index.rs
index 6a4b7d1b349ca..3b80632e2cfc3 100644
--- a/src/meta/src/model_v2/index.rs
+++ b/src/meta/src/model_v2/index.rs
@@ -14,7 +14,7 @@
use sea_orm::entity::prelude::*;
-use crate::model_v2::{I32Array, IndexId, TableId};
+use crate::model_v2::{ExprNodeArray, I32Array, IndexId, JobStatus, TableId};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "index")]
@@ -24,8 +24,9 @@ pub struct Model {
pub name: String,
pub index_table_id: TableId,
pub primary_table_id: TableId,
- pub index_items: Option<I32Array>,
- pub original_columns: Option<I32Array>,
+ pub index_items: ExprNodeArray,
+ pub original_columns: I32Array,
+ pub job_status: JobStatus,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs
index 43a8e5d24d22f..c9559bd6feda2 100644
--- a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs
+++ b/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs
@@ -404,15 +404,16 @@ impl MigrationTrait for Migration {
.table(Source::Table)
.col(ColumnDef::new(Source::SourceId).integer().primary_key())
.col(ColumnDef::new(Source::Name).string().not_null())
- .col(ColumnDef::new(Source::RowIdIndex).string())
- .col(ColumnDef::new(Source::Columns).json())
- .col(ColumnDef::new(Source::PkColumnIds).json())
- .col(ColumnDef::new(Source::Properties).json())
- .col(ColumnDef::new(Source::Definition).string())
+ .col(ColumnDef::new(Source::RowIdIndex).integer())
+ .col(ColumnDef::new(Source::Columns).json().not_null())
+ .col(ColumnDef::new(Source::PkColumnIds).json().not_null())
+ .col(ColumnDef::new(Source::Properties).json().not_null())
+ .col(ColumnDef::new(Source::Definition).string().not_null())
.col(ColumnDef::new(Source::SourceInfo).json())
- .col(ColumnDef::new(Source::WatermarkDescs).json())
+ .col(ColumnDef::new(Source::WatermarkDescs).json().not_null())
.col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
.col(ColumnDef::new(Source::ConnectionId).integer())
+ .col(ColumnDef::new(Source::Version).big_integer().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_source_object_id")
@@ -442,15 +443,17 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::Columns).json().not_null())
.col(ColumnDef::new(Table::Pk).json().not_null())
.col(ColumnDef::new(Table::DistributionKey).json().not_null())
+ .col(ColumnDef::new(Table::StreamKey).json().not_null())
.col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
.col(ColumnDef::new(Table::Properties).json().not_null())
.col(ColumnDef::new(Table::FragmentId).integer().not_null())
.col(ColumnDef::new(Table::VnodeColIndex).integer())
+ .col(ColumnDef::new(Table::RowIdIndex).integer())
.col(ColumnDef::new(Table::ValueIndices).json().not_null())
.col(ColumnDef::new(Table::Definition).string().not_null())
.col(
ColumnDef::new(Table::HandlePkConflictBehavior)
- .integer()
+ .string()
.not_null(),
)
.col(
@@ -467,6 +470,8 @@ impl MigrationTrait for Migration {
.boolean()
.not_null(),
)
+ .col(ColumnDef::new(Table::JobStatus).string().not_null())
+ .col(ColumnDef::new(Table::CreateType).string().not_null())
.col(ColumnDef::new(Table::Version).json().not_null())
.foreign_key(
&mut ForeignKey::create()
@@ -506,16 +511,18 @@ impl MigrationTrait for Migration {
.table(Sink::Table)
.col(ColumnDef::new(Sink::SinkId).integer().primary_key())
.col(ColumnDef::new(Sink::Name).string().not_null())
- .col(ColumnDef::new(Sink::Columns).json())
- .col(ColumnDef::new(Sink::PkColumnIds).json())
- .col(ColumnDef::new(Sink::DistributionKey).json())
- .col(ColumnDef::new(Sink::DownstreamPk).json())
+ .col(ColumnDef::new(Sink::Columns).json().not_null())
+ .col(ColumnDef::new(Sink::PlanPk).json().not_null())
+ .col(ColumnDef::new(Sink::DistributionKey).json().not_null())
+ .col(ColumnDef::new(Sink::DownstreamPk).json().not_null())
.col(ColumnDef::new(Sink::SinkType).string().not_null())
- .col(ColumnDef::new(Sink::Properties).json())
+ .col(ColumnDef::new(Sink::Properties).json().not_null())
.col(ColumnDef::new(Sink::Definition).string().not_null())
.col(ColumnDef::new(Sink::ConnectionId).integer())
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
+ .col(ColumnDef::new(Sink::SinkFormatDesc).json())
+ .col(ColumnDef::new(Sink::JobStatus).string().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_sink_object_id")
@@ -541,7 +548,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(View::ViewId).integer().primary_key())
.col(ColumnDef::new(View::Name).string().not_null())
.col(ColumnDef::new(View::Properties).json().not_null())
- .col(ColumnDef::new(View::Sql).string().not_null())
+ .col(ColumnDef::new(View::Definition).string().not_null())
.col(ColumnDef::new(View::Columns).json().not_null())
.foreign_key(
&mut ForeignKey::create()
@@ -562,8 +569,9 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::Name).string().not_null())
.col(ColumnDef::new(Index::IndexTableId).integer().not_null())
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
- .col(ColumnDef::new(Index::IndexItems).json())
- .col(ColumnDef::new(Index::OriginalColumns).json())
+ .col(ColumnDef::new(Index::IndexItems).json().not_null())
+ .col(ColumnDef::new(Index::OriginalColumns).json().not_null())
+ .col(ColumnDef::new(Index::JobStatus).string().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_index_object_id")
@@ -862,10 +870,12 @@ enum Table {
Columns,
Pk,
DistributionKey,
+ StreamKey,
AppendOnly,
Properties,
FragmentId,
VnodeColIndex,
+ RowIdIndex,
ValueIndices,
Definition,
HandlePkConflictBehavior,
@@ -875,6 +885,8 @@ enum Table {
DmlFragmentId,
Cardinality,
CleanedByWatermark,
+ JobStatus,
+ CreateType,
Version,
}
@@ -892,6 +904,7 @@ enum Source {
WatermarkDescs,
OptionalAssociatedTableId,
ConnectionId,
+ Version,
}
#[derive(DeriveIden)]
@@ -900,7 +913,7 @@ enum Sink {
SinkId,
Name,
Columns,
- PkColumnIds,
+ PlanPk,
DistributionKey,
DownstreamPk,
SinkType,
@@ -909,6 +922,8 @@ enum Sink {
ConnectionId,
DbName,
SinkFromName,
+ SinkFormatDesc,
+ JobStatus,
}
#[derive(DeriveIden)]
@@ -925,7 +940,7 @@ enum View {
ViewId,
Name,
Properties,
- Sql,
+ Definition,
Columns,
}
@@ -938,6 +953,7 @@ enum Index {
PrimaryTableId,
IndexItems,
OriginalColumns,
+ JobStatus,
}
#[derive(DeriveIden)]
diff --git a/src/meta/src/model_v2/mod.rs b/src/meta/src/model_v2/mod.rs
index d799a608933ac..1c2f928063fff 100644
--- a/src/meta/src/model_v2/mod.rs
+++ b/src/meta/src/model_v2/mod.rs
@@ -14,7 +14,8 @@
use std::collections::HashMap;
-use sea_orm::FromJsonQueryResult;
+use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
+use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
pub mod prelude;
@@ -63,19 +64,73 @@ pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type UserId = u32;
-#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)]
-pub struct I32Array(pub Vec<i32>);
+#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
+#[sea_orm(rs_type = "String", db_type = "String(None)")]
+pub enum JobStatus {
+ #[sea_orm(string_value = "CREATING")]
+ Creating,
+ #[sea_orm(string_value = "CREATED")]
+ Created,
+}
-#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)]
-pub struct DataType(pub risingwave_pb::data::DataType);
+impl From<JobStatus> for PbStreamJobStatus {
+ fn from(job_status: JobStatus) -> Self {
+ match job_status {
+ JobStatus::Creating => Self::Creating,
+ JobStatus::Created => Self::Created,
+ }
+ }
+}
-#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)]
-pub struct DataTypeArray(pub Vec<risingwave_pb::data::DataType>);
+#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
+#[sea_orm(rs_type = "String", db_type = "String(None)")]
+pub enum CreateType {
+ #[sea_orm(string_value = "BACKGROUND")]
+ Background,
+ #[sea_orm(string_value = "FOREGROUND")]
+ Foreground,
+}
-#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
-pub struct FieldArray(pub Vec<risingwave_pb::plan_common::PbField>);
+impl From<CreateType> for PbCreateType {
+ fn from(create_type: CreateType) -> Self {
+ match create_type {
+ CreateType::Background => Self::Background,
+ CreateType::Foreground => Self::Foreground,
+ }
+ }
+}
-impl Eq for FieldArray {}
+/// Defines a struct wrapping a single pb field and derives `FromJsonQueryResult` for it; this helps map a json value stored in the database to the Pb struct.
+macro_rules! derive_from_json_struct {
+ ($struct_name:ident, $field_type:ty) => {
+ #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
+ pub struct $struct_name(pub $field_type);
+ impl Eq for $struct_name {}
+ };
+}
-#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Eq, Serialize, Deserialize, Default)]
-pub struct Property(pub HashMap<String, String>);
+derive_from_json_struct!(I32Array, Vec<i32>);
+derive_from_json_struct!(DataType, risingwave_pb::data::DataType);
+derive_from_json_struct!(DataTypeArray, Vec<risingwave_pb::data::DataType>);
+derive_from_json_struct!(FieldArray, Vec<risingwave_pb::plan_common::PbField>);
+derive_from_json_struct!(Property, HashMap<String, String>);
+derive_from_json_struct!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
+derive_from_json_struct!(
+ ColumnCatalogArray,
+ Vec<risingwave_pb::plan_common::PbColumnCatalog>
+);
+derive_from_json_struct!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
+derive_from_json_struct!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
+derive_from_json_struct!(
+ WatermarkDescArray,
+ Vec<risingwave_pb::catalog::PbWatermarkDesc>
+);
+derive_from_json_struct!(ExprNodeArray, Vec<risingwave_pb::expr::PbExprNode>);
+derive_from_json_struct!(ColumnOrderArray, Vec<risingwave_pb::common::PbColumnOrder>);
+derive_from_json_struct!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
+derive_from_json_struct!(Cardinality, risingwave_pb::plan_common::PbCardinality);
+derive_from_json_struct!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
+derive_from_json_struct!(
+ PrivateLinkService,
+ risingwave_pb::catalog::connection::PbPrivateLinkService
+);
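
To make the macro's effect concrete, the first invocation above expands to roughly the following, mechanically following the macro body:

    #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
    pub struct I32Array(pub Vec<i32>);
    impl Eq for I32Array {}
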
diff --git a/src/meta/src/model_v2/sink.rs b/src/meta/src/model_v2/sink.rs
index 8c22a04a8fd01..bef46f1d7195f 100644
--- a/src/meta/src/model_v2/sink.rs
+++ b/src/meta/src/model_v2/sink.rs
@@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use risingwave_pb::catalog::PbSinkType;
use sea_orm::entity::prelude::*;
-use crate::model_v2::{ConnectionId, I32Array, SinkId};
+use crate::model_v2::{
+ ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, JobStatus, Property,
+ SinkFormatDesc, SinkId,
+};
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
@@ -27,22 +31,34 @@ pub enum SinkType {
Upsert,
}
+impl From<SinkType> for PbSinkType {
+ fn from(sink_type: SinkType) -> Self {
+ match sink_type {
+ SinkType::AppendOnly => Self::AppendOnly,
+ SinkType::ForceAppendOnly => Self::ForceAppendOnly,
+ SinkType::Upsert => Self::Upsert,
+ }
+ }
+}
+
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "sink")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub sink_id: SinkId,
pub name: String,
- pub columns: Option<Json>,
- pub pk_column_ids: Option<I32Array>,
- pub distribution_key: Option<I32Array>,
- pub downstream_pk: Option<I32Array>,
+ pub columns: ColumnCatalogArray,
+ pub plan_pk: ColumnOrderArray,
+ pub distribution_key: I32Array,
+ pub downstream_pk: I32Array,
pub sink_type: SinkType,
- pub properties: Option