diff --git a/proto/catalog.proto b/proto/catalog.proto index ec7c68a3802ba..f376d4dc3bed7 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -62,9 +62,16 @@ message StreamSourceInfo { SchemaRegistryNameStrategy name_strategy = 10; optional string key_message_name = 11; plan_common.ExternalTableDesc external_table = 12; - // Whether the stream source is a cdc source streaming job. - // We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72. - bool cdc_source_job = 13; + // Whether the stream source has a streaming job. + // This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72). + // Currently, the following sources have streaming jobs: + // - Direct CDC sources (mysql & postgresql) + // - MQ sources (Kafka, Pulsar, Kinesis, etc.) + bool has_streaming_job = 13; + // Only used when `has_streaming_job` is `true`. + // If `false`, `requires_singleton` will be set in the stream plan. + bool is_distributed = 15; + reserved "cdc_source_job"; // deprecated // Options specified by user in the FORMAT ENCODE clause. map format_encode_options = 14; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index db910930b5bee..e2678fd97ac83 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -146,7 +146,7 @@ enum TableJobType { // table streaming jobs excepts the `SHARED_CDC_SOURCE` type TABLE_JOB_TYPE_GENERAL = 1; // table streaming job sharing a CDC source job - TABLE_JOB_TYPE_SHARED_CDC_SOURCE = 2; + TABLE_JOB_TYPE_SOURCE = 2; } message CreateTableRequest { diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index a168ea163f5b5..9a77c54173c50 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -757,6 +757,7 @@ message StreamNode { StreamFsFetchNode stream_fs_fetch = 138; StreamCdcScanNode stream_cdc_scan = 139; CdcFilterNode cdc_filter = 140; + SourceNode source_backfill = 141; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index ce2820752f120..ab5aed59fca2e 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -187,6 +187,12 @@ pub fn visit_stream_node_tables_inner( always!(source.state_table, "FsFetch"); } } + NodeBody::SourceBackfill(node) => { + always!( + node.source_inner.as_mut().unwrap().state_table, + "SourceBackfill" + ) + } // Sink NodeBody::Sink(node) => { diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index b3a2bc6554c60..03dc99ec6a9fd 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -136,7 +136,7 @@ where }; self.table_schema = table_schema; if let Some(info) = source.info.as_ref() { - self.is_multi_table_shared = info.cdc_source_job; + self.is_multi_table_shared = info.has_streaming_job; } } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 8c16f14d7ce71..17c16b16dd5fc 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -58,6 +58,12 @@ impl From<&SourceCatalog> for BoundSource { } } +impl BoundSource { + pub fn can_backfill(&self) -> bool { + self.catalog.info.has_streaming_job + } +} + impl Binder { /// Binds table or source, or logical view according to what we get from the catalog. pub fn bind_relation_by_name_inner( diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 75a7d5a84911c..a3459b898c378 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -474,7 +474,7 @@ fn bind_columns_from_source_for_cdc( row_encode: row_encode_to_prost(&source_schema.row_encode) as i32, format_encode_options, use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - cdc_source_job: true, + has_streaming_job: true, ..Default::default() }; if !format_encode_options_to_consume.is_empty() { @@ -1122,18 +1122,22 @@ pub async fn handle_create_source( ensure_table_constraints_supported(&stmt.constraints)?; let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?; - // gated the feature with a session variable let create_cdc_source_job = if is_cdc_connector(&with_properties) { CdcTableType::from_properties(&with_properties).can_backfill() } else { false }; + let has_streaming_job = create_cdc_source_job || is_kafka_connector(&with_properties); - let (columns_from_resolve_source, source_info) = if create_cdc_source_job { + let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)? } else { bind_columns_from_source(&session, &source_schema, &with_properties).await? }; + if has_streaming_job { + source_info.has_streaming_job = true; + source_info.is_distributed = !create_cdc_source_job; + } let columns_from_sql = bind_sql_columns(&stmt.columns)?; let mut columns = bind_all_columns( @@ -1226,21 +1230,18 @@ pub async fn handle_create_source( let catalog_writer = session.catalog_writer()?; - if create_cdc_source_job { - // create a streaming job for the cdc source, which will mark as *singleton* in the Fragmenter + if has_streaming_job { let graph = { let context = OptimizerContext::from_handler_args(handler_args); - // cdc source is an append-only source in plain json format let source_node = LogicalSource::new( Some(Rc::new(SourceCatalog::from(&source))), columns.clone(), row_id_index, - false, + false, // Do not gen RowID. Gen RowID after backfill node instead. false, context.into(), )?; - // generate stream graph for cdc source job let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?; let mut graph = build_graph(stream_plan); graph.parallelism = diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index f86f31c1e0765..08c8ae1059c0e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -565,6 +565,8 @@ impl ToStream for LogicalSource { { plan_prefix = Some(self.rewrite_new_s3_plan()?); } + + // TODO: after SourceBackfill is added, we shouldn't put generated columns/row id here, and put them after backfill instead. plan = if self.core.for_table { dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), plan_prefix) } else { diff --git a/src/frontend/src/optimizer/plan_node/logical_source_backfill.rs b/src/frontend/src/optimizer/plan_node/logical_source_backfill.rs new file mode 100644 index 0000000000000..3cd3e08986f74 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_source_backfill.rs @@ -0,0 +1,588 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::{max, min}; +use std::ops::Bound; +use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::rc::Rc; + +use fixedbitset::FixedBitSet; +use itertools::Itertools; +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::bail_not_implemented; +use risingwave_common::catalog::{ + ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, +}; +use risingwave_common::error::Result; +use risingwave_connector::source::{ConnectorProperties, DataType}; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; +use risingwave_pb::plan_common::GeneratedColumnDesc; + +use super::generic::GenericPlanRef; +use super::stream_watermark_filter::StreamWatermarkFilter; +use super::utils::{childless_record, Distill}; +use super::{ + generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter, + LogicalProject, PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, + StreamSource, ToBatch, ToStream, +}; +use crate::catalog::source_catalog::SourceCatalog; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, InputRef}; +use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch; +use crate::optimizer::plan_node::utils::column_names_pretty; +use crate::optimizer::plan_node::{ + ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup, + StreamSourceBackfill, ToStreamContext, +}; +use crate::optimizer::property::Distribution::HashShard; +use crate::optimizer::property::{Distribution, Order, RequiredDist}; +use crate::utils::{ColIndexMapping, Condition, IndexRewriter}; + +/// `LogicalSourceBackfill` returns contents of a table or other equivalent object +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalSourceBackfill { + pub base: PlanBase, + pub core: generic::Source, + + /// Expressions to output. This field presents and will be turned to a `Project` when + /// converting to a physical plan, only if there are generated columns. + output_exprs: Option>, +} + +impl LogicalSourceBackfill { + pub fn new( + source_catalog: Option>, + column_catalog: Vec, + row_id_index: Option, + gen_row_id: bool, + for_table: bool, + ctx: OptimizerContextRef, + ) -> Result { + let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); + let core = generic::Source { + catalog: source_catalog, + column_catalog, + row_id_index, + gen_row_id, + for_table, + ctx, + kafka_timestamp_range, + }; + + let base = PlanBase::new_logical_with_core(&core); + + let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?; + + Ok(LogicalSourceBackfill { + base, + core, + output_exprs, + }) + } + + pub fn with_catalog( + source_catalog: Rc, + for_table: bool, + ctx: OptimizerContextRef, + ) -> Result { + let column_catalogs = source_catalog.columns.clone(); + let row_id_index = source_catalog.row_id_index; + let gen_row_id = source_catalog.append_only; + + Self::new( + Some(source_catalog), + column_catalogs, + row_id_index, + gen_row_id, + for_table, + ctx, + ) + } + + pub fn derive_output_exprs_from_generated_columns( + columns: &[ColumnCatalog], + ) -> Result>> { + if !columns.iter().any(|c| c.is_generated()) { + return Ok(None); + } + + let col_mapping = { + let mut mapping = vec![None; columns.len()]; + let mut cur = 0; + for (idx, column) in columns.iter().enumerate() { + if !column.is_generated() { + mapping[idx] = Some(cur); + cur += 1; + } else { + mapping[idx] = None; + } + } + ColIndexMapping::new(mapping, columns.len()) + }; + + let mut rewriter = IndexRewriter::new(col_mapping); + let mut exprs = Vec::with_capacity(columns.len()); + let mut cur = 0; + for column in columns { + let column_desc = &column.column_desc; + let ret_data_type = column_desc.data_type.clone(); + + if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) = + &column_desc.generated_or_default_column + { + let GeneratedColumnDesc { expr } = generated_column; + // TODO(yuhao): avoid this `from_expr_proto`. + let proj_expr = + rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?); + let casted_expr = proj_expr.cast_assign(ret_data_type)?; + exprs.push(casted_expr); + } else { + let input_ref = InputRef { + data_type: ret_data_type, + index: cur, + }; + cur += 1; + exprs.push(ExprImpl::InputRef(Box::new(input_ref))); + } + } + + Ok(Some(exprs)) + } + + fn rewrite_new_s3_plan(&self) -> Result { + let logical_source = generic::Source { + catalog: self.core.catalog.clone(), + column_catalog: vec![ + ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id( + &Field { + name: "filename".to_string(), + data_type: DataType::Varchar, + sub_fields: vec![], + type_name: "".to_string(), + }, + 0, + ), + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id( + &Field { + name: "last_edit_time".to_string(), + data_type: DataType::Timestamptz, + sub_fields: vec![], + type_name: "".to_string(), + }, + 1, + ), + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id( + &Field { + name: "file_size".to_string(), + data_type: DataType::Int64, + sub_fields: vec![], + type_name: "".to_string(), + }, + 0, + ), + is_hidden: false, + }, + ], + row_id_index: None, + gen_row_id: false, + ..self.core.clone() + }; + let mut new_s3_plan: PlanRef = StreamSource { + base: PlanBase::new_stream_with_core( + &logical_source, + Distribution::Single, + true, // `list` will keep listing all objects, it must be append-only + false, + FixedBitSet::with_capacity(logical_source.column_catalog.len()), + ), + core: logical_source, + } + .into(); + new_s3_plan = RequiredDist::shard_by_key(3, &[0]) + .enforce_if_not_satisfies(new_s3_plan, &Order::any())?; + new_s3_plan = StreamDedup::new(generic::Dedup { + input: new_s3_plan, + dedup_cols: vec![0], + }) + .into(); + + Ok(new_s3_plan) + } + + /// `row_id_index` in source node should rule out generated column + #[must_use] + fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option) -> Option { + row_id_index.map(|idx| { + let mut cnt = 0; + for col in columns.iter().take(idx + 1) { + if col.is_generated() { + cnt += 1; + } + } + idx - cnt + }) + } + + pub fn source_catalog(&self) -> Rc { + self.core + .catalog + .clone() + .expect("source catalog should exist for LogicalSourceBackfill") + } + + fn clone_with_kafka_timestamp_range(&self, range: (Bound, Bound)) -> Self { + let mut core = self.core.clone(); + core.kafka_timestamp_range = range; + Self { + base: self.base.clone(), + core, + output_exprs: self.output_exprs.clone(), + } + } + + /// The columns in stream/batch source node indicate the actual columns it will produce, + /// instead of the columns defined in source catalog. The difference is generated columns. + #[must_use] + fn rewrite_to_stream_batch_source(&self) -> generic::Source { + let column_catalog = self.core.column_catalog.clone(); + // Filter out the generated columns. + let row_id_index = Self::rewrite_row_id_idx(&column_catalog, self.core.row_id_index); + let source_column_catalogs = column_catalog + .into_iter() + .filter(|c| !c.is_generated()) + .collect_vec(); + generic::Source { + catalog: self.core.catalog.clone(), + column_catalog: source_column_catalogs, + row_id_index, + ctx: self.core.ctx.clone(), + ..self.core + } + } + + fn wrap_with_optional_generated_columns_stream_proj( + &self, + input: Option, + ) -> Result { + if let Some(exprs) = &self.output_exprs { + let source: PlanRef = + dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), input); + let logical_project = generic::Project::new(exprs.to_vec(), source); + Ok(StreamProject::new(logical_project).into()) + } else { + let source = dispatch_new_s3_plan(self.core.clone(), input); + Ok(source) + } + } + + fn wrap_with_optional_generated_columns_batch_proj(&self) -> Result { + if let Some(exprs) = &self.output_exprs { + let source = BatchSource::new(self.rewrite_to_stream_batch_source()); + let logical_project = generic::Project::new(exprs.to_vec(), source.into()); + Ok(BatchProject::new(logical_project).into()) + } else { + let source = BatchSource::new(self.core.clone()); + Ok(source.into()) + } + } +} + +impl_plan_tree_node_for_leaf! {LogicalSourceBackfill} +impl Distill for LogicalSourceBackfill { + fn distill<'a>(&self) -> XmlNode<'a> { + let src = Pretty::from(self.source_catalog().name.clone()); + let time = Pretty::debug(&self.core.kafka_timestamp_range); + let fields = vec![ + ("source", src), + ("columns", column_names_pretty(self.schema())), + ("time_range", time), + ]; + + childless_record("LogicalSourceBackfill", fields) + } +} + +impl ColPrunable for LogicalSourceBackfill { + fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { + let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len()); + LogicalProject::with_mapping(self.clone().into(), mapping).into() + } +} + +impl ExprRewritable for LogicalSourceBackfill { + fn has_rewritable_expr(&self) -> bool { + self.output_exprs.is_some() + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + let mut output_exprs = self.output_exprs.clone(); + + for expr in output_exprs.iter_mut().flatten() { + *expr = r.rewrite_expr(expr.clone()); + } + + Self { + output_exprs, + ..self.clone() + } + .into() + } +} + +impl ExprVisitable for LogicalSourceBackfill { + fn visit_exprs(&self, v: &mut dyn ExprVisitor) { + self.output_exprs + .iter() + .flatten() + .for_each(|e| v.visit_expr(e)); + } +} + +/// A util function to extract kafka offset timestamp range. +/// +/// Currently we only support limiting kafka offset timestamp range using literals, e.g. we only +/// support expressions like `_rw_kafka_timestamp <= '2022-10-11 1:00:00+00:00'`. +/// +/// # Parameters +/// +/// * `expr`: Expression to be consumed. +/// * `range`: Original timestamp range, if `expr` can be recognized, we will update `range`. +/// * `schema`: Input schema. +/// +/// # Return Value +/// +/// If `expr` can be recognized and consumed by this function, then we return `None`. +/// Otherwise `expr` is returned. +fn expr_to_kafka_timestamp_range( + expr: ExprImpl, + range: &mut (Bound, Bound), + schema: &Schema, +) -> Option { + let merge_upper_bound = |first, second| -> Bound { + match (first, second) { + (first, Unbounded) => first, + (Unbounded, second) => second, + (Included(f1), Included(f2)) => Included(min(f1, f2)), + (Included(f1), Excluded(f2)) => { + if f1 < f2 { + Included(f1) + } else { + Excluded(f2) + } + } + (Excluded(f1), Included(f2)) => { + if f2 < f1 { + Included(f2) + } else { + Excluded(f1) + } + } + (Excluded(f1), Excluded(f2)) => Excluded(min(f1, f2)), + } + }; + + let merge_lower_bound = |first, second| -> Bound { + match (first, second) { + (first, Unbounded) => first, + (Unbounded, second) => second, + (Included(f1), Included(f2)) => Included(max(f1, f2)), + (Included(f1), Excluded(f2)) => { + if f1 > f2 { + Included(f1) + } else { + Excluded(f2) + } + } + (Excluded(f1), Included(f2)) => { + if f2 > f1 { + Included(f2) + } else { + Excluded(f1) + } + } + (Excluded(f1), Excluded(f2)) => Excluded(max(f1, f2)), + } + }; + + let extract_timestampz_literal = |expr: &ExprImpl| -> Result> { + match expr { + ExprImpl::FunctionCall(function_call) if function_call.inputs().len() == 2 => { + match (&function_call.inputs()[0], &function_call.inputs()[1]) { + (ExprImpl::InputRef(input_ref), literal) + if let Some(datum) = literal.try_fold_const().transpose()? + && schema.fields[input_ref.index].name + == KAFKA_TIMESTAMP_COLUMN_NAME + && literal.return_type() == DataType::Timestamptz => + { + Ok(Some(( + datum.unwrap().into_timestamptz().timestamp_millis(), + false, + ))) + } + (literal, ExprImpl::InputRef(input_ref)) + if let Some(datum) = literal.try_fold_const().transpose()? + && schema.fields[input_ref.index].name + == KAFKA_TIMESTAMP_COLUMN_NAME + && literal.return_type() == DataType::Timestamptz => + { + Ok(Some(( + datum.unwrap().into_timestamptz().timestamp_millis(), + true, + ))) + } + _ => Ok(None), + } + } + _ => Ok(None), + } + }; + + match &expr { + ExprImpl::FunctionCall(function_call) => { + if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) { + match function_call.func_type() { + ExprType::GreaterThan => { + if reverse { + range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal)); + } else { + range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal)); + } + + None + } + ExprType::GreaterThanOrEqual => { + if reverse { + range.1 = merge_upper_bound(range.1, Included(timestampz_literal)); + } else { + range.0 = merge_lower_bound(range.0, Included(timestampz_literal)); + } + None + } + ExprType::Equal => { + range.0 = merge_lower_bound(range.0, Included(timestampz_literal)); + range.1 = merge_upper_bound(range.1, Included(timestampz_literal)); + None + } + ExprType::LessThan => { + if reverse { + range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal)); + } else { + range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal)); + } + None + } + ExprType::LessThanOrEqual => { + if reverse { + range.0 = merge_lower_bound(range.0, Included(timestampz_literal)); + } else { + range.1 = merge_upper_bound(range.1, Included(timestampz_literal)); + } + None + } + _ => Some(expr), + } + } else { + Some(expr) + } + } + _ => Some(expr), + } +} + +impl PredicatePushdown for LogicalSourceBackfill { + fn predicate_pushdown( + &self, + predicate: Condition, + _ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + let mut range = self.core.kafka_timestamp_range; + + let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len()); + for expr in predicate.conjunctions { + if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) { + // Not recognized, so push back + new_conjunctions.push(e); + } + } + + let new_source = self.clone_with_kafka_timestamp_range(range).into(); + + if new_conjunctions.is_empty() { + new_source + } else { + LogicalFilter::create( + new_source, + Condition { + conjunctions: new_conjunctions, + }, + ) + } + } +} + +impl ToBatch for LogicalSourceBackfill { + fn to_batch(&self) -> Result { + // TODO: + let source = BatchSource::new(self.core.clone()); + Ok(source.into()) + } +} + +impl ToStream for LogicalSourceBackfill { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { + let mut plan = StreamSourceBackfill::new(self.rewrite_to_stream_batch_source()).into(); + + let catalog = self.source_catalog(); + if !catalog.watermark_descs.is_empty() && !self.core.for_table { + plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); + } + + assert!(!(self.core.gen_row_id && self.core.for_table)); + if let Some(row_id_index) = self.core.row_id_index + && self.core.gen_row_id + { + plan = StreamRowIdGen::new_with_dist(plan, row_id_index, HashShard(vec![row_id_index])) + .into(); + } + Ok(plan) + } + + fn logical_rewrite_for_stream( + &self, + _ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + Ok(( + self.clone().into(), + ColIndexMapping::identity(self.schema().len()), + )) + } +} + +#[inline] +fn dispatch_new_s3_plan(source: generic::Source, input: Option) -> PlanRef { + if let Some(input) = input { + StreamFsFetch::new(input, source).into() + } else { + StreamSource::new(source).into() + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index fd05a4c7ecc9a..91256938539d8 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -687,6 +687,9 @@ impl dyn PlanNode { if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() { return stream_cdc_table_scan.adhoc_to_stream_prost(state); } + if let Some(stream_source_backfill) = self.as_stream_source_backfill() { + return stream_source_backfill.adhoc_to_stream_prost(state); + } if let Some(stream_share) = self.as_stream_share() { return stream_share.adhoc_to_stream_prost(state); } @@ -821,6 +824,7 @@ mod logical_project_set; mod logical_scan; mod logical_share; mod logical_source; +mod logical_source_backfill; mod logical_sys_scan; mod logical_table_function; mod logical_topn; @@ -850,6 +854,7 @@ mod stream_simple_agg; mod stream_sink; mod stream_sort; mod stream_source; +mod stream_source_backfill; mod stream_stateless_simple_agg; mod stream_table_scan; mod stream_topn; @@ -912,6 +917,7 @@ pub use logical_project_set::LogicalProjectSet; pub use logical_scan::LogicalScan; pub use logical_share::LogicalShare; pub use logical_source::LogicalSource; +pub use logical_source_backfill::LogicalSourceBackfill; pub use logical_sys_scan::LogicalSysScan; pub use logical_table_function::LogicalTableFunction; pub use logical_topn::LogicalTopN; @@ -943,6 +949,7 @@ pub use stream_simple_agg::StreamSimpleAgg; pub use stream_sink::StreamSink; pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; +pub use stream_source_backfill::StreamSourceBackfill; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; pub use stream_table_scan::StreamTableScan; pub use stream_temporal_join::StreamTemporalJoin; @@ -983,6 +990,7 @@ macro_rules! for_all_plan_nodes { , { Logical, CdcScan } , { Logical, SysScan } , { Logical, Source } + , { Logical, SourceBackfill } , { Logical, Insert } , { Logical, Delete } , { Logical, Update } @@ -1036,6 +1044,7 @@ macro_rules! for_all_plan_nodes { , { Stream, CdcTableScan } , { Stream, Sink } , { Stream, Source } + , { Stream, SourceBackfill } , { Stream, HashJoin } , { Stream, Exchange } , { Stream, HashAgg } @@ -1079,6 +1088,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, CdcScan } , { Logical, SysScan } , { Logical, Source } + , { Logical, SourceBackfill } , { Logical, Insert } , { Logical, Delete } , { Logical, Update } @@ -1152,6 +1162,7 @@ macro_rules! for_stream_plan_nodes { , { Stream, CdcTableScan } , { Stream, Sink } , { Stream, Source } + , { Stream, SourceBackfill } , { Stream, HashAgg } , { Stream, SimpleAgg } , { Stream, StatelessSimpleAgg } diff --git a/src/frontend/src/optimizer/plan_node/stream_source_backfill.rs b/src/frontend/src/optimizer/plan_node/stream_source_backfill.rs new file mode 100644 index 0000000000000..ce3bea1f03f0b --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/stream_source_backfill.rs @@ -0,0 +1,148 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::rc::Rc; + +use fixedbitset::FixedBitSet; +use itertools::Itertools; +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; +use risingwave_pb::stream_plan::{PbStreamNode, PbStreamSource, SourceNode}; + +use super::stream::prelude::*; +use super::{PlanBase, PlanRef, PlanTreeNodeUnary}; +use crate::catalog::source_catalog::SourceCatalog; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::{childless_record, Distill}; +use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; +use crate::optimizer::property::Distribution; +use crate::stream_fragmenter::BuildFragmentGraphState; +use crate::Explain; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StreamSourceBackfill { + pub base: PlanBase, + core: generic::Source, +} + +impl_plan_tree_node_for_leaf! { StreamSourceBackfill } + +impl StreamSourceBackfill { + pub fn new(source: generic::Source) -> Self { + let base = PlanBase::new_stream_with_core( + &source, + Distribution::SomeShard, + source.catalog.as_ref().map_or(true, |s| s.append_only), + false, + FixedBitSet::with_capacity(source.column_catalog.len()), + ); + + Self { base, core: source } + } + + fn get_columns(&self) -> Vec<&str> { + self.core + .column_catalog + .iter() + .map(|column| column.name()) + .collect() + } + + pub fn source_catalog(&self) -> Option> { + self.core.catalog.clone() + } + + pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { + use risingwave_pb::stream_plan::*; + + let stream_key = self + .stream_key() + .unwrap_or_else(|| { + panic!( + "should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string() + ) + }) + .iter() + .map(|x| *x as u32) + .collect_vec(); + + let source_catalog = self.source_catalog(); + let source_inner = source_catalog.map(|source_catalog| PbStreamSource { + source_id: source_catalog.id, + source_name: source_catalog.name.clone(), + // FIXME: this is not the same as source. + state_table: Some( + generic::Source::infer_internal_table_catalog() + .with_id(state.gen_table_id_wrapped()) + .to_internal_table_prost(), + ), + info: Some(source_catalog.info.clone()), + row_id_index: self.core.row_id_index.map(|index| index as _), + columns: self + .core + .column_catalog + .iter() + .map(|c| c.to_protobuf()) + .collect_vec(), + with_properties: source_catalog.with_properties.clone().into_iter().collect(), + rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + }); + + let stream_scan_body = PbNodeBody::SourceBackfill(SourceNode { source_inner }); + + let fields = self.schema().to_prost(); + // plan: merge -> backfill + PbStreamNode { + fields: fields.clone(), + input: vec![ + // The merge node body will be filled by the `ActorBuilder` on the meta service. + PbStreamNode { + node_body: Some(PbNodeBody::Merge(Default::default())), + identity: "Upstream".into(), + fields, + stream_key: vec![], // not used + ..Default::default() + }, + ], + node_body: Some(stream_scan_body), + stream_key, + operator_id: self.base.id().0 as u64, + identity: self.distill_to_string(), + append_only: self.append_only(), + } + } +} + +impl Distill for StreamSourceBackfill { + fn distill<'a>(&self) -> XmlNode<'a> { + let columns = self + .get_columns() + .iter() + .map(|ele| Pretty::from(ele.to_string())) + .collect(); + let col = Pretty::Array(columns); + childless_record("StreamSourceBackfill", vec![("columns", col)]) + } +} + +impl ExprRewritable for StreamSourceBackfill {} + +impl ExprVisitable for StreamSourceBackfill {} + +impl StreamNode for StreamSourceBackfill { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody { + unreachable!("stream source backfill cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.") + } +} diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index ff51cb56ec9bf..ec93e79375164 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -27,7 +27,8 @@ use crate::binder::{ use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ LogicalApply, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, - LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, + LogicalSource, LogicalSourceBackfill, LogicalSysScan, LogicalTableFunction, LogicalValues, + PlanRef, }; use crate::optimizer::property::Cardinality; use crate::planner::Planner; @@ -85,7 +86,14 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into()) + if source.can_backfill() { + Ok( + LogicalSourceBackfill::with_catalog(Rc::new(source.catalog), false, self.ctx())? + .into(), + ) + } else { + Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into()) + } } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result { diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 291d72b6a8f22..8948884f79f6e 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -263,9 +263,9 @@ fn build_fragment( if let Some(source) = node.source_inner.as_ref() && let Some(source_info) = source.info.as_ref() - && source_info.cdc_source_job + && source_info.has_streaming_job + && !source_info.is_distributed { - tracing::debug!("mark cdc source job as singleton"); current_fragment.requires_singleton = true; } } @@ -308,6 +308,13 @@ fn build_fragment( .upstream_table_ids .push(node.upstream_source_id); } + NodeBody::SourceBackfill(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32; + // memorize upstream source id for later use + let source_id = node.source_inner.as_ref().unwrap().source_id; + state.dependent_table_ids.insert(source_id.into()); + current_fragment.upstream_table_ids.push(source_id); + } NodeBody::Now(_) => { // TODO: Remove this and insert a `BarrierRecv` instead. diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index a9c7459d745d9..7cd623badbd26 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -361,11 +361,13 @@ impl DatabaseManager { .chain(self.indexes.keys().copied()) .chain(self.sources.keys().copied()) .chain( - // filter cdc source jobs self.sources .iter() .filter(|(_, source)| { - source.info.as_ref().is_some_and(|info| info.cdc_source_job) + source + .info + .as_ref() + .is_some_and(|info| info.has_streaming_job) }) .map(|(id, _)| id) .copied(), diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index f272e6d1ff198..8bfaaceda3141 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1144,7 +1144,7 @@ impl CatalogManager { let mut all_sink_ids: HashSet = HashSet::default(); let mut all_source_ids: HashSet = HashSet::default(); let mut all_view_ids: HashSet = HashSet::default(); - let mut all_cdc_source_ids: HashSet = HashSet::default(); + let mut all_streaming_job_source_ids: HashSet = HashSet::default(); let relations_depend_on = |relation_id: RelationId| -> Vec { let tables_depend_on = tables @@ -1407,11 +1407,10 @@ impl CatalogManager { continue; } - // cdc source streaming job if let Some(info) = source.info - && info.cdc_source_job + && info.has_streaming_job { - all_cdc_source_ids.insert(source.id); + all_streaming_job_source_ids.insert(source.id); let source_table_fragments = fragment_manager .select_table_fragments_by_table_id(&source.id.into()) .await?; @@ -1668,7 +1667,7 @@ impl CatalogManager { .into_iter() .map(|id| id.into()) .chain(all_sink_ids.into_iter().map(|id| id.into())) - .chain(all_cdc_source_ids.into_iter().map(|id| id.into())) + .chain(all_streaming_job_source_ids.into_iter().map(|id| id.into())) .collect_vec(); Ok((version, catalog_deleted_ids)) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 72d0086ba2fac..8683b3b24a8fc 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -485,6 +485,7 @@ impl DdlController { unimplemented!("support drop source in v2"); }; // 1. Drop source in catalog. + // If the source has a streaming job, it's also dropped here. let (version, streaming_job_ids) = mgr .catalog_manager .drop_relation( diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 801be6a4ab628..64cd410e03ac0 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -244,6 +244,42 @@ impl ActorBuilder { }) } + // "Leaf" node `StreamScan`. + NodeBody::SourceBackfill(source_backfill) => { + let input = stream_node.get_input(); + assert_eq!(input.len(), 1); + + let merge_node = &input[0]; + assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_))); + + // Index the upstreams by the an external edge ID. + let upstreams = &self.upstreams[&EdgeId::UpstreamExternal { + upstream_table_id: source_backfill.source_inner.as_ref().unwrap().source_id.into(), + downstream_fragment_id: self.fragment_id, + }]; + + let upstream_actor_id = upstreams.actors.as_global_ids(); + assert_eq!(upstream_actor_id.len(), 1); + + let input = vec![ + // Fill the merge node body with correct upstream info. + StreamNode { + node_body: Some(NodeBody::Merge(MergeNode { + upstream_actor_id, + upstream_fragment_id: upstreams.fragment_id.as_global_id(), + upstream_dispatcher_type: DispatcherType::NoShuffle as _, + fields: merge_node.fields.clone(), + })), + ..merge_node.clone() + }, + ]; + + Ok(StreamNode { + input, + ..stream_node.clone() + }) + } + // For other nodes, visit the children recursively. _ => { let mut new_stream_node = stream_node.clone(); diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index e4b2b03004fe8..9e804d549889d 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -178,6 +178,17 @@ impl BuildingFragment { cdc_filter.upstream_source_id.into(), cdc_filter.upstream_column_ids.clone(), ), + NodeBody::SourceBackfill(backfill) => ( + backfill.source_inner.as_ref().unwrap().source_id.into(), + backfill + .source_inner + .as_ref() + .unwrap() + .columns + .iter() + .map(|c| c.column_desc.as_ref().unwrap().column_id) + .collect(), + ), _ => return, }; table_columns @@ -188,7 +199,7 @@ impl BuildingFragment { assert_eq!( table_columns.len(), fragment.upstream_table_ids.len(), - "fragment type: {}", + "fragment type: {:b}", fragment.fragment_type_mask ); diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 9a9e83c0a328f..89ca7779503ae 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -172,5 +172,7 @@ pub async fn create_executor( NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder, NodeBody::OverWindow => OverWindowExecutorBuilder, NodeBody::StreamFsFetch => FsFetchExecutorBuilder, + // TODO: + NodeBody::SourceBackfill => SourceExecutorBuilder, } }