Skip to content

Commit

Permalink
some clean up for stream nodes' constructors
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jul 16, 2024
1 parent fe20942 commit 043bb8a
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 59 deletions.
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_changelog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ impl StreamChangeLog {
pub fn new(core: generic::ChangeLog<PlanRef>) -> Self {
let input = core.input.clone();
let dist = input.distribution().clone();
let input_len = input.schema().len();
// Filter executor won't change the append-only behavior of the stream.
let mut watermark_columns = input.watermark_columns().clone();
if core.need_op {
watermark_columns.grow(input.watermark_columns().len() + 2);
watermark_columns.grow(input_len + 2);
} else {
watermark_columns.grow(input.watermark_columns().len() + 1);
watermark_columns.grow(input_len + 1);
}
let base = PlanBase::new_stream_with_core(
&core,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl StreamDeltaJoin {
let watermark_columns = from_left.bitand(&from_right);
core.i2o_col_mapping().rewrite_bitset(&watermark_columns)
};

// TODO: derive from input
let base = PlanBase::new_stream_with_core(
&core,
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct StreamExpand {
impl StreamExpand {
pub fn new(core: generic::Expand<PlanRef>) -> Self {
let input = core.input.clone();
let input_len = input.schema().len();

let dist = match input.distribution() {
Distribution::Single => Distribution::Single,
Expand All @@ -43,12 +44,7 @@ impl StreamExpand {
};

let mut watermark_columns = FixedBitSet::with_capacity(core.output_len());
watermark_columns.extend(
input
.watermark_columns()
.ones()
.map(|idx| idx + input.schema().len()),
);
watermark_columns.extend(input.watermark_columns().ones().map(|idx| idx + input_len));

let base = PlanBase::new_stream_with_core(
&core,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ impl StreamGroupTopN {
let input = &core.input;
let schema = input.schema().clone();

// FIXME(rc): Actually only watermark messages on the first group-by column are propagated
// according to the current GroupTopN implementation. This should be fixed.
let watermark_columns = if input.append_only() {
input.watermark_columns().clone()
} else {
Expand Down
20 changes: 9 additions & 11 deletions src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;

Expand Down Expand Up @@ -41,29 +40,28 @@ impl StreamHopWindow {
window_end_exprs: Vec<ExprImpl>,
) -> Self {
let input = core.input.clone();
let i2o = core.i2o_col_mapping();
let dist = i2o.rewrite_provided_distribution(input.distribution());
let dist = core
.i2o_col_mapping()
.rewrite_provided_distribution(input.distribution());

let mut watermark_columns = input.watermark_columns().clone();
let input2internal = core.input2internal_col_mapping();
let internal2output = core.internal2output_col_mapping();

let mut watermark_columns = input2internal.rewrite_bitset(input.watermark_columns());
watermark_columns.grow(core.internal_column_num());

if watermark_columns.contains(core.time_col.index) {
if input.watermark_columns().contains(core.time_col.index) {
// Watermark on `time_col` indicates watermark on both `window_start` and `window_end`.
watermark_columns.insert(core.internal_window_start_col_idx());
watermark_columns.insert(core.internal_window_end_col_idx());
}
let watermark_columns = ColIndexMapping::with_remaining_columns(
&core.output_indices,
core.internal_column_num(),
)
.rewrite_bitset(&watermark_columns);

let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
input.emit_on_window_close(),
watermark_columns,
internal2output.rewrite_bitset(&watermark_columns),
);
Self {
base,
Expand Down
26 changes: 14 additions & 12 deletions src/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,26 @@ impl StreamProject {

let mut watermark_derivations = vec![];
let mut nondecreasing_exprs = vec![];
let mut watermark_columns = FixedBitSet::with_capacity(core.exprs.len());
let mut out_watermark_columns = FixedBitSet::with_capacity(core.exprs.len());
for (expr_idx, expr) in core.exprs.iter().enumerate() {
use monotonicity_variants::*;
match analyze_monotonicity(expr) {
Inherent(Constant) => {
// XXX(rc): we can produce one watermark on each recovery for this case.
}
Inherent(monotonicity) => {
if monotonicity.is_non_decreasing() {
nondecreasing_exprs.push(expr_idx); // to produce watermarks
out_watermark_columns.insert(expr_idx);
}
}
FollowingInput(input_idx) => {
if input.watermark_columns().contains(input_idx) {
watermark_derivations.push((input_idx, expr_idx));
watermark_columns.insert(expr_idx);
watermark_derivations.push((input_idx, expr_idx)); // to propagate watermarks
out_watermark_columns.insert(expr_idx);
}
}
Inherent(NonDecreasing) => {
nondecreasing_exprs.push(expr_idx);
watermark_columns.insert(expr_idx);
}
Inherent(Constant) => {
// XXX(rc): we can produce one watermark on each recovery for this case.
}
Inherent(_) | _FollowingInputInversely(_) => {}
_FollowingInputInversely(_) => {}
}
}
// Project executor won't change the append-only behavior of the stream, so it depends on
Expand All @@ -108,7 +110,7 @@ impl StreamProject {
distribution,
input.append_only(),
input.emit_on_window_close(),
watermark_columns,
out_watermark_columns,
);

StreamProject {
Expand Down
29 changes: 17 additions & 12 deletions src/frontend/src/optimizer/plan_node/stream_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,29 @@ impl StreamProjectSet {

let mut watermark_derivations = vec![];
let mut nondecreasing_exprs = vec![];
let mut watermark_columns = FixedBitSet::with_capacity(core.output_len());
let mut out_watermark_columns = FixedBitSet::with_capacity(core.output_len());
for (expr_idx, expr) in core.select_list.iter().enumerate() {
let out_expr_idx = expr_idx + 1;

use monotonicity_variants::*;
match analyze_monotonicity(expr) {
Inherent(Constant) => {
// XXX(rc): we can produce one watermark on each recovery for this case.
}
Inherent(monotonicity) => {
if monotonicity.is_non_decreasing() {
// FIXME(rc): we need to check that expr is not a table function
nondecreasing_exprs.push(expr_idx); // to produce watermarks
out_watermark_columns.insert(out_expr_idx);
}
}
FollowingInput(input_idx) => {
if input.watermark_columns().contains(input_idx) {
watermark_derivations.push((input_idx, expr_idx));
watermark_columns.insert(expr_idx + 1);
watermark_derivations.push((input_idx, expr_idx)); // to propagate watermarks
out_watermark_columns.insert(out_expr_idx);
}
}
Inherent(NonDecreasing) => {
nondecreasing_exprs.push(expr_idx);
watermark_columns.insert(expr_idx + 1);
}
Inherent(Constant) => {
// XXX(rc): we can produce one watermark on each recovery for this case.
}
Inherent(_) | _FollowingInputInversely(_) => {}
_FollowingInputInversely(_) => {}
}
}

Expand All @@ -75,7 +80,7 @@ impl StreamProjectSet {
distribution,
input.append_only(),
input.emit_on_window_close(),
watermark_columns,
out_watermark_columns,
);
StreamProjectSet {
base,
Expand Down
28 changes: 12 additions & 16 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,6 @@ impl TableCatalogBuilder {
self.value_indices = Some(value_indices);
}

#[allow(dead_code)]
pub fn set_watermark_columns(&mut self, watermark_columns: FixedBitSet) {
self.watermark_columns = Some(watermark_columns);
}

/// Stores `dist_key_in_pk` on the builder as `Some(..)`, replacing any value set earlier.
pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) {
self.dist_key_in_pk = Some(dist_key_in_pk);
}
Expand Down Expand Up @@ -236,21 +231,22 @@ pub(crate) fn watermark_pretty<'a>(
watermark_columns: &FixedBitSet,
schema: &Schema,
) -> Option<Pretty<'a>> {
if watermark_columns.count_ones(..) > 0 {
Some(watermark_fields_pretty(watermark_columns.ones(), schema))
} else {
None
}
iter_fields_pretty(watermark_columns.ones(), schema)
}
pub(crate) fn watermark_fields_pretty<'a>(
watermark_columns: impl Iterator<Item = usize>,

pub(crate) fn iter_fields_pretty<'a>(
columns: impl Iterator<Item = usize>,
schema: &Schema,
) -> Pretty<'a> {
let arr = watermark_columns
) -> Option<Pretty<'a>> {
let arr = columns
.map(|idx| FieldDisplay(schema.fields.get(idx).unwrap()))
.map(|d| Pretty::display(&d))
.collect();
Pretty::Array(arr)
.collect::<Vec<_>>();
if arr.is_empty() {
None
} else {
Some(Pretty::Array(arr))
}
}

#[derive(Clone, Copy)]
Expand Down

0 comments on commit 043bb8a

Please sign in to comment.