diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 6efd862273624..b8ca322d767a8 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -39,3 +39,6 @@ d70dba827c303373f3220c9733f7c7443e5c2d37 # chore: cargo +nightly fmt (#13162) (format let-chains) c583e2c6c054764249acf484438c7bf7197765f4 + +# chore: replace all ProstXxx with PbXxx (#8621) +6fd8821f2e053957b183d648bea9c95b6703941f diff --git a/Cargo.lock b/Cargo.lock index fac9e97c5f8b9..c54f8f0898050 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9917,6 +9917,7 @@ dependencies = [ "auto_enums", "await-tree", "bytes", + "cfg-if", "criterion", "delta_btree_map", "educe 0.5.7", diff --git a/proto/catalog.proto b/proto/catalog.proto index ec7c68a3802ba..f376d4dc3bed7 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -62,9 +62,16 @@ message StreamSourceInfo { SchemaRegistryNameStrategy name_strategy = 10; optional string key_message_name = 11; plan_common.ExternalTableDesc external_table = 12; - // Whether the stream source is a cdc source streaming job. - // We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72. - bool cdc_source_job = 13; + // Whether the stream source has a streaming job. + // This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72). + // Currently, the following sources have streaming jobs: + // - Direct CDC sources (mysql & postgresql) + // - MQ sources (Kafka, Pulsar, Kinesis, etc.) + bool has_streaming_job = 13; + // Only used when `has_streaming_job` is `true`. + // If `false`, `requires_singleton` will be set in the stream plan. + bool is_distributed = 15; + reserved "cdc_source_job"; // deprecated // Options specified by user in the FORMAT ENCODE clause. map format_encode_options = 14; } diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 0ccf6718ecad3..5101122f47372 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -205,6 +205,23 @@ message StreamFsFetchNode { StreamFsFetch node_inner = 1; } +message SourceBackfillNode { + uint32 source_id = 1; + optional uint32 row_id_index = 3; + // XXX: is this all columns or only required columns? + repeated plan_common.ColumnCatalog columns = 4; + catalog.StreamSourceInfo info = 7; + string source_name = 8; + map with_properties = 6; + // Streaming rate limit + // optional uint32 rate_limit = 9; + + // fields above are the same as StreamSource + + // `| partition_id | backfill_progress |` + catalog.Table state_table = 2; +} + message SinkDesc { reserved 4; reserved "columns"; @@ -758,6 +775,7 @@ message StreamNode { StreamFsFetchNode stream_fs_fetch = 138; StreamCdcScanNode stream_cdc_scan = 139; CdcFilterNode cdc_filter = 140; + SourceBackfillNode source_backfill = 141; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. @@ -852,6 +870,7 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_VALUES = 64; FRAGMENT_TYPE_FLAG_DML = 128; FRAGMENT_TYPE_FLAG_CDC_FILTER = 256; + FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 512; } // The streaming context associated with a stream plan diff --git a/src/common/src/util/iter_util.rs b/src/common/src/util/iter_util.rs index 92f19a0ee46fc..7588171ad2f73 100644 --- a/src/common/src/util/iter_util.rs +++ b/src/common/src/util/iter_util.rs @@ -54,3 +54,29 @@ where { a.into_iter().zip_eq_fast(b) } + +pub trait IntoIteratorExt +where + for<'a> &'a Self: IntoIterator, +{ + /// Shorter version of `self.iter().map(f).collect()`. + fn map_collect(&self, f: F) -> BCollection + where + F: FnMut(&A) -> B, + for<'a> &'a Self: IntoIterator, + BCollection: FromIterator, + { + self.into_iter().map(f).collect() + } + + /// Shorter version of `self.iter().map(f).collect_vec()`. + fn map_to_vec(&self, f: F) -> Vec + where + F: FnMut(&A) -> B, + for<'a> &'a Self: IntoIterator, + { + self.map_collect(f) + } +} + +impl IntoIteratorExt for T where for<'a> &'a Self: IntoIterator {} diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index ce2820752f120..c9518a03c2623 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner( always!(source.state_table, "FsFetch"); } } + NodeBody::SourceBackfill(node) => { + always!(node.state_table, "SourceBackfill") + } // Sink NodeBody::Sink(node) => { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index a6ddb359bacdb..6583aa5a49d1c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -72,8 +72,10 @@ pub trait SourceProperties: TryFromHashmap + Clone + WithOptions { type SplitEnumerator: SplitEnumerator; type SplitReader: SplitReader; + /// Load additional info from `PbSource`. Currently only used by CDC. fn init_from_pb_source(&mut self, _source: &PbSource) {} + /// Load additional info from `ExternalTableDesc`. Currently only used by CDC. fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {} } @@ -447,10 +449,12 @@ impl ConnectorProperties { matches!(self, ConnectorProperties::Kinesis(_)) } + /// Load additional info from `PbSource`. Currently only used by CDC. pub fn init_from_pb_source(&mut self, source: &PbSource) { dispatch_source_prop!(self, prop, prop.init_from_pb_source(source)) } + /// Load additional info from `ExternalTableDesc`. Currently only used by CDC. pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) { dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc)) } diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index b3a2bc6554c60..03dc99ec6a9fd 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -136,7 +136,7 @@ where }; self.table_schema = table_schema; if let Some(info) = source.info.as_ref() { - self.is_multi_table_shared = info.cdc_source_job; + self.is_multi_table_shared = info.has_streaming_job; } } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 8c16f14d7ce71..17c16b16dd5fc 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -58,6 +58,12 @@ impl From<&SourceCatalog> for BoundSource { } } +impl BoundSource { + pub fn can_backfill(&self) -> bool { + self.catalog.info.has_streaming_job + } +} + impl Binder { /// Binds table or source, or logical view according to what we get from the catalog. pub fn bind_relation_by_name_inner( diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f15017926dc46..0d4a9104f7748 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -474,7 +474,7 @@ fn bind_columns_from_source_for_cdc( row_encode: row_encode_to_prost(&source_schema.row_encode) as i32, format_encode_options, use_schema_registry: json_schema_infer_use_schema_registry(&schema_config), - cdc_source_job: true, + has_streaming_job: true, ..Default::default() }; if !format_encode_options_to_consume.is_empty() { @@ -1130,18 +1130,22 @@ pub async fn handle_create_source( ensure_table_constraints_supported(&stmt.constraints)?; let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?; - // gated the feature with a session variable let create_cdc_source_job = if is_cdc_connector(&with_properties) { CdcTableType::from_properties(&with_properties).can_backfill() } else { false }; + let has_streaming_job = create_cdc_source_job || is_kafka_connector(&with_properties); - let (columns_from_resolve_source, source_info) = if create_cdc_source_job { + let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)? } else { bind_columns_from_source(&session, &source_schema, &with_properties).await? }; + if has_streaming_job { + source_info.has_streaming_job = true; + source_info.is_distributed = !create_cdc_source_job; + } let columns_from_sql = bind_sql_columns(&stmt.columns)?; let mut columns = bind_all_columns( @@ -1235,18 +1239,15 @@ pub async fn handle_create_source( let catalog_writer = session.catalog_writer()?; - if create_cdc_source_job { - // create a streaming job for the cdc source, which will mark as *singleton* in the Fragmenter + if has_streaming_job { let graph = { let context = OptimizerContext::from_handler_args(handler_args); - // cdc source is an append-only source in plain json format let source_node = LogicalSource::with_catalog( Rc::new(SourceCatalog::from(&source)), false, context.into(), )?; - // generate stream graph for cdc source job let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?; let mut graph = build_graph(stream_plan)?; graph.parallelism = diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 3d25ee6fb0597..61722d9f6b188 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -568,6 +568,8 @@ impl ToStream for LogicalSource { plan_prefix = Some(self.rewrite_new_s3_plan()?); } + + // TODO: after SourceBackfill is added, we shouldn't put generated columns/row id here, and put them after backfill instead. if self.core.for_table { plan = dispatch_new_s3_plan(self.rewrite_to_stream_batch_source(), plan_prefix) } else { diff --git a/src/frontend/src/optimizer/plan_node/logical_source_backfill.rs b/src/frontend/src/optimizer/plan_node/logical_source_backfill.rs new file mode 100644 index 0000000000000..f652b5159a774 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_source_backfill.rs @@ -0,0 +1,480 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::{max, min}; +use std::ops::Bound; +use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::rc::Rc; + +use itertools::Itertools; +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::catalog::{ColumnCatalog, Schema, KAFKA_TIMESTAMP_COLUMN_NAME}; +use risingwave_common::error::Result; +use risingwave_connector::source::DataType; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; +use risingwave_pb::plan_common::GeneratedColumnDesc; + +use super::generic::GenericPlanRef; +use super::stream_watermark_filter::StreamWatermarkFilter; +use super::utils::{childless_record, Distill}; +use super::{ + generic, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, + PlanBase, PlanRef, PredicatePushdown, StreamRowIdGen, ToBatch, ToStream, +}; +use crate::catalog::source_catalog::SourceCatalog; +use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, InputRef}; +use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::column_names_pretty; +use crate::optimizer::plan_node::{ + ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamSourceBackfill, + ToStreamContext, +}; +use crate::optimizer::property::Distribution::HashShard; +use crate::utils::{ColIndexMapping, Condition, IndexRewriter}; + +/// `LogicalSourceBackfill` returns contents of a table or other equivalent object +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalSourceBackfill { + pub base: PlanBase, + pub core: generic::Source, + + // TODO: generated columns aren't handled yet. + /// Expressions to output. This field presents and will be turned to a `Project` when + /// converting to a physical plan, only if there are generated columns. + output_exprs: Option>, +} + +impl LogicalSourceBackfill { + pub fn new( + source_catalog: Option>, + column_catalog: Vec, + row_id_index: Option, + gen_row_id: bool, + for_table: bool, + ctx: OptimizerContextRef, + ) -> Result { + let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); + let core = generic::Source { + catalog: source_catalog, + column_catalog, + row_id_index, + gen_row_id, + for_table, + ctx, + kafka_timestamp_range, + }; + + let base = PlanBase::new_logical_with_core(&core); + + let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?; + + Ok(LogicalSourceBackfill { + base, + core, + output_exprs, + }) + } + + pub fn with_catalog( + source_catalog: Rc, + for_table: bool, + ctx: OptimizerContextRef, + ) -> Result { + let column_catalogs = source_catalog.columns.clone(); + let row_id_index = source_catalog.row_id_index; + let gen_row_id = source_catalog.append_only; + + Self::new( + Some(source_catalog), + column_catalogs, + row_id_index, + gen_row_id, + for_table, + ctx, + ) + } + + pub fn derive_output_exprs_from_generated_columns( + columns: &[ColumnCatalog], + ) -> Result>> { + if !columns.iter().any(|c| c.is_generated()) { + return Ok(None); + } + + let col_mapping = { + let mut mapping = vec![None; columns.len()]; + let mut cur = 0; + for (idx, column) in columns.iter().enumerate() { + if !column.is_generated() { + mapping[idx] = Some(cur); + cur += 1; + } else { + mapping[idx] = None; + } + } + ColIndexMapping::new(mapping, columns.len()) + }; + + let mut rewriter = IndexRewriter::new(col_mapping); + let mut exprs = Vec::with_capacity(columns.len()); + let mut cur = 0; + for column in columns { + let column_desc = &column.column_desc; + let ret_data_type = column_desc.data_type.clone(); + + if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) = + &column_desc.generated_or_default_column + { + let GeneratedColumnDesc { expr } = generated_column; + // TODO(yuhao): avoid this `from_expr_proto`. + let proj_expr = + rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?); + let casted_expr = proj_expr.cast_assign(ret_data_type)?; + exprs.push(casted_expr); + } else { + let input_ref = InputRef { + data_type: ret_data_type, + index: cur, + }; + cur += 1; + exprs.push(ExprImpl::InputRef(Box::new(input_ref))); + } + } + + Ok(Some(exprs)) + } + + /// `row_id_index` in source node should rule out generated column + #[must_use] + fn rewrite_row_id_idx(columns: &[ColumnCatalog], row_id_index: Option) -> Option { + row_id_index.map(|idx| { + let mut cnt = 0; + for col in columns.iter().take(idx + 1) { + if col.is_generated() { + cnt += 1; + } + } + idx - cnt + }) + } + + pub fn source_catalog(&self) -> Rc { + self.core + .catalog + .clone() + .expect("source catalog should exist for LogicalSourceBackfill") + } + + fn clone_with_kafka_timestamp_range(&self, range: (Bound, Bound)) -> Self { + let mut core = self.core.clone(); + core.kafka_timestamp_range = range; + Self { + base: self.base.clone(), + core, + output_exprs: self.output_exprs.clone(), + } + } + + /// The columns in stream/batch source node indicate the actual columns it will produce, + /// instead of the columns defined in source catalog. The difference is generated columns. + #[must_use] + fn rewrite_to_stream_batch_source(&self) -> generic::Source { + let column_catalog = self.core.column_catalog.clone(); + // Filter out the generated columns. + let row_id_index = Self::rewrite_row_id_idx(&column_catalog, self.core.row_id_index); + let source_column_catalogs = column_catalog + .into_iter() + .filter(|c| !c.is_generated()) + .collect_vec(); + generic::Source { + catalog: self.core.catalog.clone(), + column_catalog: source_column_catalogs, + row_id_index, + ctx: self.core.ctx.clone(), + ..self.core + } + } +} + +impl_plan_tree_node_for_leaf! {LogicalSourceBackfill} +impl Distill for LogicalSourceBackfill { + fn distill<'a>(&self) -> XmlNode<'a> { + let src = Pretty::from(self.source_catalog().name.clone()); + let time = Pretty::debug(&self.core.kafka_timestamp_range); + let fields = vec![ + ("source", src), + ("columns", column_names_pretty(self.schema())), + ("time_range", time), + ]; + + childless_record("LogicalSourceBackfill", fields) + } +} + +impl ColPrunable for LogicalSourceBackfill { + fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { + let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len()); + LogicalProject::with_mapping(self.clone().into(), mapping).into() + } +} + +impl ExprRewritable for LogicalSourceBackfill { + fn has_rewritable_expr(&self) -> bool { + self.output_exprs.is_some() + } + + fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { + let mut output_exprs = self.output_exprs.clone(); + + for expr in output_exprs.iter_mut().flatten() { + *expr = r.rewrite_expr(expr.clone()); + } + + Self { + output_exprs, + ..self.clone() + } + .into() + } +} + +impl ExprVisitable for LogicalSourceBackfill { + fn visit_exprs(&self, v: &mut dyn ExprVisitor) { + self.output_exprs + .iter() + .flatten() + .for_each(|e| v.visit_expr(e)); + } +} + +/// A util function to extract kafka offset timestamp range. +/// +/// Currently we only support limiting kafka offset timestamp range using literals, e.g. we only +/// support expressions like `_rw_kafka_timestamp <= '2022-10-11 1:00:00+00:00'`. +/// +/// # Parameters +/// +/// * `expr`: Expression to be consumed. +/// * `range`: Original timestamp range, if `expr` can be recognized, we will update `range`. +/// * `schema`: Input schema. +/// +/// # Return Value +/// +/// If `expr` can be recognized and consumed by this function, then we return `None`. +/// Otherwise `expr` is returned. +fn expr_to_kafka_timestamp_range( + expr: ExprImpl, + range: &mut (Bound, Bound), + schema: &Schema, +) -> Option { + let merge_upper_bound = |first, second| -> Bound { + match (first, second) { + (first, Unbounded) => first, + (Unbounded, second) => second, + (Included(f1), Included(f2)) => Included(min(f1, f2)), + (Included(f1), Excluded(f2)) => { + if f1 < f2 { + Included(f1) + } else { + Excluded(f2) + } + } + (Excluded(f1), Included(f2)) => { + if f2 < f1 { + Included(f2) + } else { + Excluded(f1) + } + } + (Excluded(f1), Excluded(f2)) => Excluded(min(f1, f2)), + } + }; + + let merge_lower_bound = |first, second| -> Bound { + match (first, second) { + (first, Unbounded) => first, + (Unbounded, second) => second, + (Included(f1), Included(f2)) => Included(max(f1, f2)), + (Included(f1), Excluded(f2)) => { + if f1 > f2 { + Included(f1) + } else { + Excluded(f2) + } + } + (Excluded(f1), Included(f2)) => { + if f2 > f1 { + Included(f2) + } else { + Excluded(f1) + } + } + (Excluded(f1), Excluded(f2)) => Excluded(max(f1, f2)), + } + }; + + let extract_timestampz_literal = |expr: &ExprImpl| -> Result> { + match expr { + ExprImpl::FunctionCall(function_call) if function_call.inputs().len() == 2 => { + match (&function_call.inputs()[0], &function_call.inputs()[1]) { + (ExprImpl::InputRef(input_ref), literal) + if let Some(datum) = literal.try_fold_const().transpose()? + && schema.fields[input_ref.index].name + == KAFKA_TIMESTAMP_COLUMN_NAME + && literal.return_type() == DataType::Timestamptz => + { + Ok(Some(( + datum.unwrap().into_timestamptz().timestamp_millis(), + false, + ))) + } + (literal, ExprImpl::InputRef(input_ref)) + if let Some(datum) = literal.try_fold_const().transpose()? + && schema.fields[input_ref.index].name + == KAFKA_TIMESTAMP_COLUMN_NAME + && literal.return_type() == DataType::Timestamptz => + { + Ok(Some(( + datum.unwrap().into_timestamptz().timestamp_millis(), + true, + ))) + } + _ => Ok(None), + } + } + _ => Ok(None), + } + }; + + match &expr { + ExprImpl::FunctionCall(function_call) => { + if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) { + match function_call.func_type() { + ExprType::GreaterThan => { + if reverse { + range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal)); + } else { + range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal)); + } + + None + } + ExprType::GreaterThanOrEqual => { + if reverse { + range.1 = merge_upper_bound(range.1, Included(timestampz_literal)); + } else { + range.0 = merge_lower_bound(range.0, Included(timestampz_literal)); + } + None + } + ExprType::Equal => { + range.0 = merge_lower_bound(range.0, Included(timestampz_literal)); + range.1 = merge_upper_bound(range.1, Included(timestampz_literal)); + None + } + ExprType::LessThan => { + if reverse { + range.0 = merge_lower_bound(range.0, Excluded(timestampz_literal)); + } else { + range.1 = merge_upper_bound(range.1, Excluded(timestampz_literal)); + } + None + } + ExprType::LessThanOrEqual => { + if reverse { + range.0 = merge_lower_bound(range.0, Included(timestampz_literal)); + } else { + range.1 = merge_upper_bound(range.1, Included(timestampz_literal)); + } + None + } + _ => Some(expr), + } + } else { + Some(expr) + } + } + _ => Some(expr), + } +} + +impl PredicatePushdown for LogicalSourceBackfill { + fn predicate_pushdown( + &self, + predicate: Condition, + _ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + let mut range = self.core.kafka_timestamp_range; + + let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len()); + for expr in predicate.conjunctions { + if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) { + // Not recognized, so push back + new_conjunctions.push(e); + } + } + + let new_source = self.clone_with_kafka_timestamp_range(range).into(); + + if new_conjunctions.is_empty() { + new_source + } else { + LogicalFilter::create( + new_source, + Condition { + conjunctions: new_conjunctions, + }, + ) + } + } +} + +impl ToBatch for LogicalSourceBackfill { + fn to_batch(&self) -> Result { + // TODO: + let source = BatchSource::new(self.core.clone()); + Ok(source.into()) + } +} + +impl ToStream for LogicalSourceBackfill { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { + let mut plan = StreamSourceBackfill::new(self.rewrite_to_stream_batch_source()).into(); + + let catalog = self.source_catalog(); + if !catalog.watermark_descs.is_empty() && !self.core.for_table { + plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); + } + + assert!(!(self.core.gen_row_id && self.core.for_table)); + if let Some(row_id_index) = self.core.row_id_index + && self.core.gen_row_id + { + plan = StreamRowIdGen::new_with_dist(plan, row_id_index, HashShard(vec![row_id_index])) + .into(); + } + Ok(plan) + } + + fn logical_rewrite_for_stream( + &self, + _ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + Ok(( + self.clone().into(), + ColIndexMapping::identity(self.schema().len()), + )) + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index a0475c4ae092e..977d4c00b16a6 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -676,8 +676,8 @@ impl dyn PlanNode { impl dyn PlanNode { /// Serialize the plan node and its children to a stream plan proto. /// - /// Note that [`StreamTableScan`] has its own implementation of `to_stream_prost`. We have a - /// hook inside to do some ad-hoc thing for [`StreamTableScan`]. + /// Note that some operators has their own implementation of `to_stream_prost`. We have a + /// hook inside to do some ad-hoc things. pub fn to_stream_prost( &self, state: &mut BuildFragmentGraphState, @@ -690,6 +690,9 @@ impl dyn PlanNode { if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() { return stream_cdc_table_scan.adhoc_to_stream_prost(state); } + if let Some(stream_source_backfill) = self.as_stream_source_backfill() { + return stream_source_backfill.adhoc_to_stream_prost(state); + } if let Some(stream_share) = self.as_stream_share() { return stream_share.adhoc_to_stream_prost(state); } @@ -824,6 +827,7 @@ mod logical_project_set; mod logical_scan; mod logical_share; mod logical_source; +mod logical_source_backfill; mod logical_sys_scan; mod logical_table_function; mod logical_topn; @@ -853,6 +857,7 @@ mod stream_simple_agg; mod stream_sink; mod stream_sort; mod stream_source; +mod stream_source_backfill; mod stream_stateless_simple_agg; mod stream_table_scan; mod stream_topn; @@ -915,6 +920,7 @@ pub use logical_project_set::LogicalProjectSet; pub use logical_scan::LogicalScan; pub use logical_share::LogicalShare; pub use logical_source::LogicalSource; +pub use logical_source_backfill::LogicalSourceBackfill; pub use logical_sys_scan::LogicalSysScan; pub use logical_table_function::LogicalTableFunction; pub use logical_topn::LogicalTopN; @@ -946,6 +952,7 @@ pub use stream_simple_agg::StreamSimpleAgg; pub use stream_sink::StreamSink; pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; +pub use stream_source_backfill::StreamSourceBackfill; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; pub use stream_table_scan::StreamTableScan; pub use stream_temporal_join::StreamTemporalJoin; @@ -987,6 +994,7 @@ macro_rules! for_all_plan_nodes { , { Logical, CdcScan } , { Logical, SysScan } , { Logical, Source } + , { Logical, SourceBackfill } , { Logical, Insert } , { Logical, Delete } , { Logical, Update } @@ -1040,6 +1048,7 @@ macro_rules! for_all_plan_nodes { , { Stream, CdcTableScan } , { Stream, Sink } , { Stream, Source } + , { Stream, SourceBackfill } , { Stream, HashJoin } , { Stream, Exchange } , { Stream, HashAgg } @@ -1083,6 +1092,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, CdcScan } , { Logical, SysScan } , { Logical, Source } + , { Logical, SourceBackfill } , { Logical, Insert } , { Logical, Delete } , { Logical, Update } @@ -1156,6 +1166,7 @@ macro_rules! for_stream_plan_nodes { , { Stream, CdcTableScan } , { Stream, Sink } , { Stream, Source } + , { Stream, SourceBackfill } , { Stream, HashAgg } , { Stream, SimpleAgg } , { Stream, StatelessSimpleAgg } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 65eaba0525d04..8a1df949a3f11 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -132,6 +132,7 @@ impl StreamNode for StreamCdcTableScan { } impl StreamCdcTableScan { + /// plan: merge -> filter -> exchange(simple) -> `stream_scan` pub fn adhoc_to_stream_prost( &self, state: &mut BuildFragmentGraphState, @@ -241,10 +242,10 @@ impl StreamCdcTableScan { .collect_vec(); tracing::debug!( - "output_column_ids: {:?}, upstream_column_ids: {:?}, output_indices: {:?}", - self.core.output_column_ids(), - upstream_column_ids, - output_indices + output_column_ids=?self.core.output_column_ids(), + ?upstream_column_ids, + ?output_indices, + "stream cdc table scan output indices" ); let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode { diff --git a/src/frontend/src/optimizer/plan_node/stream_source_backfill.rs b/src/frontend/src/optimizer/plan_node/stream_source_backfill.rs new file mode 100644 index 0000000000000..e5f2b8c221611 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/stream_source_backfill.rs @@ -0,0 +1,189 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::rc::Rc; + +use fixedbitset::FixedBitSet; +use itertools::Itertools; +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::catalog::Field; +use risingwave_common::types::DataType; +use risingwave_common::util::sort_util::OrderType; +use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; +use risingwave_pb::stream_plan::PbStreamNode; + +use super::stream::prelude::*; +use super::utils::TableCatalogBuilder; +use super::{PlanBase, PlanRef}; +use crate::catalog::source_catalog::SourceCatalog; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::{childless_record, Distill}; +use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; +use crate::optimizer::property::Distribution; +use crate::scheduler::SchedulerResult; +use crate::stream_fragmenter::BuildFragmentGraphState; +use crate::{Explain, TableCatalog, WithOptions}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StreamSourceBackfill { + pub base: PlanBase, + core: generic::Source, +} + +impl_plan_tree_node_for_leaf! { StreamSourceBackfill } + +impl StreamSourceBackfill { + pub fn new(source: generic::Source) -> Self { + let base = PlanBase::new_stream_with_core( + &source, + Distribution::SomeShard, + source.catalog.as_ref().map_or(true, |s| s.append_only), + false, + FixedBitSet::with_capacity(source.column_catalog.len()), + ); + + Self { base, core: source } + } + + fn get_columns(&self) -> Vec<&str> { + self.core + .column_catalog + .iter() + .map(|column| column.name()) + .collect() + } + + pub fn source_catalog(&self) -> Rc { + self.core + .catalog + .clone() + .expect("source backfill should have source cataglog") + } + + pub fn infer_internal_table_catalog() -> TableCatalog { + // note that source's internal table is to store partition_id -> offset mapping and its + // schema is irrelevant to input schema + // On the premise of ensuring that the materialized_source data can be cleaned up, keep the + // state in source. + // Source state doesn't maintain retention_seconds, internal_table_subset function only + // returns retention_seconds so default is used here + let mut builder = TableCatalogBuilder::new(WithOptions::new(HashMap::default())); + + let key = Field { + data_type: DataType::Varchar, + name: "partition_id".to_string(), + sub_fields: vec![], + type_name: "".to_string(), + }; + let value = Field { + data_type: DataType::Jsonb, + name: "backfill_progress".to_string(), + sub_fields: vec![], + type_name: "".to_string(), + }; + + let ordered_col_idx = builder.add_column(&key); + builder.add_column(&value); + builder.add_order_column(ordered_col_idx, OrderType::ascending()); + + builder.build(vec![], 1) + } + + pub fn adhoc_to_stream_prost( + &self, + state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { + use risingwave_pb::stream_plan::*; + + let stream_key = self + .stream_key() + .unwrap_or_else(|| { + panic!( + "should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string() + ) + }) + .iter() + .map(|x| *x as u32) + .collect_vec(); + + let source_catalog = self.source_catalog(); + let source_inner = SourceBackfillNode { + source_id: source_catalog.id, + source_name: source_catalog.name.clone(), + state_table: Some( + Self::infer_internal_table_catalog() + .with_id(state.gen_table_id_wrapped()) + .to_internal_table_prost(), + ), + info: Some(source_catalog.info.clone()), + // XXX: what's the usage of this? + row_id_index: self.core.row_id_index.map(|index| index as _), + columns: self + .core + .column_catalog + .iter() + .map(|c| c.to_protobuf()) + .collect_vec(), + with_properties: source_catalog.with_properties.clone().into_iter().collect(), + // rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, + }; + + let stream_scan_body = PbNodeBody::SourceBackfill(source_inner); + + let fields = self.schema().to_prost(); + // plan: merge -> backfill + Ok(PbStreamNode { + fields: fields.clone(), + input: vec![ + // The merge node body will be filled by the `ActorBuilder` on the meta service. + PbStreamNode { + node_body: Some(PbNodeBody::Merge(Default::default())), + identity: "Upstream".into(), + fields, + stream_key: vec![], // not used + ..Default::default() + }, + ], + node_body: Some(stream_scan_body), + stream_key, + operator_id: self.base.id().0 as u64, + identity: self.distill_to_string(), + append_only: self.append_only(), + }) + } +} + +impl Distill for StreamSourceBackfill { + fn distill<'a>(&self) -> XmlNode<'a> { + let columns = self + .get_columns() + .iter() + .map(|ele| Pretty::from(ele.to_string())) + .collect(); + let col = Pretty::Array(columns); + childless_record("StreamSourceBackfill", vec![("columns", col)]) + } +} + +impl ExprRewritable for StreamSourceBackfill {} + +impl ExprVisitable for StreamSourceBackfill {} + +impl StreamNode for StreamSourceBackfill { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { + unreachable!("stream source backfill cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.") + } +} diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 42fdc83a3f933..62414cb2ab005 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -27,7 +27,8 @@ use crate::binder::{ use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ LogicalApply, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, - LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, + LogicalSource, LogicalSourceBackfill, LogicalSysScan, LogicalTableFunction, LogicalValues, + PlanRef, }; use crate::optimizer::property::Cardinality; use crate::planner::Planner; @@ -85,7 +86,14 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into()) + if source.can_backfill() { + Ok( + LogicalSourceBackfill::with_catalog(Rc::new(source.catalog), false, self.ctx())? + .into(), + ) + } else { + Ok(LogicalSource::with_catalog(Rc::new(source.catalog), false, self.ctx())?.into()) + } } pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result { diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 344aa103deeb4..8d2185539523b 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -264,9 +264,9 @@ fn build_fragment( if let Some(source) = node.source_inner.as_ref() && let Some(source_info) = source.info.as_ref() - && source_info.cdc_source_job + && source_info.has_streaming_job + && !source_info.is_distributed { - tracing::debug!("mark cdc source job as singleton"); current_fragment.requires_singleton = true; } } @@ -294,6 +294,7 @@ fn build_fragment( } NodeBody::StreamCdcScan(_) => { + // XXX: Should we use a different flag for CDC scan? current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; // the backfill algorithm is not parallel safe current_fragment.requires_singleton = true; @@ -309,6 +310,13 @@ fn build_fragment( .upstream_table_ids .push(node.upstream_source_id); } + NodeBody::SourceBackfill(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceBackfill as u32; + // memorize upstream source id for later use + let source_id = node.source_id; + state.dependent_table_ids.insert(source_id.into()); + current_fragment.upstream_table_ids.push(source_id); + } NodeBody::Now(_) => { // TODO: Remove this and insert a `BarrierRecv` instead. diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index f388944ff008e..0aed7a8aa9480 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -30,9 +30,8 @@ use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::throttle_mutation::RateLimit; use risingwave_pb::stream_plan::update_mutation::*; use risingwave_pb::stream_plan::{ - AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, FragmentTypeFlag, - PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation, - UpdateMutation, + AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, PauseMutation, + ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation, UpdateMutation, }; use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; use uuid::Uuid; @@ -689,24 +688,11 @@ impl CommandContext { pub fn actors_to_track(&self) -> HashSet { match &self.command { Command::CreateStreamingJob { - dispatchers, - table_fragments, - .. - } => { - // cdc backfill table job doesn't need to be tracked - if table_fragments.fragments().iter().any(|fragment| { - fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 - }) { - Default::default() - } else { - dispatchers - .values() - .flatten() - .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter().copied()) - .chain(table_fragments.values_actor_ids()) - .collect() - } - } + table_fragments, .. + } => table_fragments + .tracking_progress_actor_ids() + .into_iter() + .collect(), _ => Default::default(), } } @@ -953,7 +939,7 @@ impl CommandContext { // Extract the fragments that include source operators. let source_fragments = table_fragments.stream_source_fragments(); - + // TODO: handle backfill? self.barrier_manager_context .source_manager .apply_source_change( diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index f91262117be16..e025c54e1690e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -840,6 +840,7 @@ impl GlobalBarrierManager { } commands }; + tracing::trace!("finished_commands: {}", finished_commands.len()); for command in finished_commands { self.checkpoint_control.stash_command_to_finish(command); diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 0c753a3c3f025..8c2b9d5e32641 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -206,6 +206,11 @@ impl TrackingJob { pub(crate) fn notify_finished(self) { match self { TrackingJob::New(command) => { + tracing::trace!( + "notify finished, command: {:?}, curr_epoch: {:?}", + command.context.command, + command.context.curr_epoch + ); command .notifiers .into_iter() @@ -368,6 +373,7 @@ impl CreateMviewProgressTracker { version_stats: &HummockVersionStats, ) -> Option { let actors = command.context.actors_to_track(); + tracing::trace!("add actors to track: {:?}", actors); if actors.is_empty() { // The command can be finished immediately. return Some(TrackingJob::New(command)); @@ -426,6 +432,7 @@ impl CreateMviewProgressTracker { upstream_total_key_count, definition, ); + tracing::trace!("add progress: {:?}", progress); if *ddl_type == DdlType::Sink { // We return the original tracking job immediately. // This is because sink can be decoupled with backfill progress. @@ -450,6 +457,7 @@ impl CreateMviewProgressTracker { progress: &CreateMviewProgress, version_stats: &HummockVersionStats, ) -> Option { + tracing::trace!("update progress: {:?}", progress); let actor = progress.backfill_actor_id; let Some(table_id) = self.actor_map.get(&actor).copied() else { // On restart, backfill will ALWAYS notify CreateMviewProgressTracker, diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index aab3234d620cb..a66673b7fff4d 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -278,21 +278,25 @@ impl BarrierScheduler { let mut infos = Vec::with_capacity(contexts.len()); for (injected_rx, collect_rx, finish_rx) in contexts { + tracing::trace!("waiting for command to be injected"); // Wait for this command to be injected, and record the result. let info = injected_rx .await .map_err(|e| anyhow!("failed to inject barrier: {}", e))?; infos.push(info); + tracing::trace!("injected_rx finished"); // Throw the error if it occurs when collecting this barrier. collect_rx .await .map_err(|e| anyhow!("failed to collect barrier: {}", e))??; + tracing::trace!("collect_rx finished"); // Wait for this command to be finished. finish_rx .await .map_err(|e| anyhow!("failed to finish command: {}", e))?; + tracing::trace!("finish_rx finished"); } Ok(infos) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 70fc1c52a0fce..d472161b30021 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1422,7 +1422,7 @@ impl CatalogController { .map(|obj| obj.oid) .collect_vec(); - // cdc source streaming job. + // source streaming job. if object_type == ObjectType::Source { let source_info: Option = Source::find_by_id(object_id) .select_only() @@ -1432,7 +1432,7 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?; if let Some(source_info) = source_info - && source_info.into_inner().cdc_source_job + && source_info.into_inner().has_streaming_job { to_drop_streaming_jobs.push(object_id); } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 0de826fef9f86..fa4e52bbaf322 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -39,8 +39,7 @@ use risingwave_pb::meta::{ use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamNode, - StreamSource, + PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, }; use sea_orm::sea_query::{Expr, Value}; use sea_orm::ActiveValue::Set; @@ -1160,23 +1159,6 @@ impl CatalogController { Ok(chain_fragments) } - /// Find the external stream source info inside the stream node, if any. - fn find_stream_source(stream_node: &PbStreamNode) -> Option<&StreamSource> { - if let Some(NodeBody::Source(source)) = &stream_node.node_body { - if let Some(inner) = &source.source_inner { - return Some(inner); - } - } - - for child in &stream_node.input { - if let Some(source) = Self::find_stream_source(child) { - return Some(source); - } - } - - None - } - pub async fn load_source_fragment_ids( &self, ) -> MetaResult>> { @@ -1195,9 +1177,9 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, _, stream_node) in fragments { - if let Some(source) = Self::find_stream_source(stream_node.inner_ref()) { + if let Some(source_id) = stream_node.inner_ref().find_stream_source() { source_fragment_ids - .entry(source.source_id as SourceId) + .entry(source_id as SourceId) .or_insert_with(BTreeSet::new) .insert(fragment_id); } @@ -1205,31 +1187,33 @@ impl CatalogController { Ok(source_fragment_ids) } - pub async fn get_stream_source_fragment_ids( + pub async fn load_backfill_fragment_ids( &self, - job_id: ObjectId, - ) -> MetaResult>> { + ) -> MetaResult>> { let inner = self.inner.read().await; - let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + let mut fragments: Vec<(FragmentId, Vec, i32, StreamNode)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, + fragment::Column::UpstreamFragmentId, fragment::Column::FragmentTypeMask, fragment::Column::StreamNode, ]) - .filter(fragment::Column::JobId.eq(job_id)) .into_tuple() .all(&inner.db) .await?; - fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::Source as i32 != 0); + fragments.retain(|(_, _, mask, _)| *mask & PbFragmentTypeFlag::SourceBackfill as i32 != 0); let mut source_fragment_ids = HashMap::new(); - for (fragment_id, _, stream_node) in fragments { - if let Some(source) = Self::find_stream_source(stream_node.inner_ref()) { + for (fragment_id, upstream_fragment_id, _, stream_node) in fragments { + if let Some(source_id) = stream_node.inner_ref().find_source_backfill() { + if upstream_fragment_id.len() != 1 { + bail!("SourceBackfill should have only one upstream fragment, found {} for fragment {}", upstream_fragment_id.len(), fragment_id); + } source_fragment_ids - .entry(source.source_id as SourceId) + .entry(source_id as SourceId) .or_insert_with(BTreeSet::new) - .insert(fragment_id); + .insert((fragment_id, upstream_fragment_id[0])); } } Ok(source_fragment_ids) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 576c5c3c40699..636f188cc186e 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -683,7 +683,7 @@ impl CatalogController { if let Some(table_id) = source.optional_associated_table_id { vec![table_id] } else if let Some(source_info) = &source.source_info - && source_info.inner_ref().cdc_source_job + && source_info.inner_ref().has_streaming_job { vec![source_id] } else { @@ -714,6 +714,7 @@ impl CatalogController { .all(&txn) .await?; + // TODO: limit source backfill? fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index a9c7459d745d9..7cd623badbd26 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -361,11 +361,13 @@ impl DatabaseManager { .chain(self.indexes.keys().copied()) .chain(self.sources.keys().copied()) .chain( - // filter cdc source jobs self.sources .iter() .filter(|(_, source)| { - source.info.as_ref().is_some_and(|info| info.cdc_source_job) + source + .info + .as_ref() + .is_some_and(|info| info.has_streaming_job) }) .map(|(id, _)| id) .copied(), diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 72f903d3b389d..44ad9631ec0cc 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -1009,7 +1009,7 @@ impl FragmentManager { pub async fn get_running_actors_and_upstream_fragment_of_fragment( &self, fragment_id: FragmentId, - ) -> MetaResult<(Vec, Vec)> { + ) -> MetaResult)>> { let map = &self.core.read().await.table_fragments; for table_fragment in map.values() { @@ -1020,9 +1020,9 @@ impl FragmentManager { .filter(|a| { table_fragment.actor_status[&a.actor_id].state == ActorState::Running as i32 }) - .cloned() + .map(|a| (a.actor_id, a.upstream_actor_id.clone())) .collect(); - return Ok((running_actors, fragment.upstream_fragment_ids.clone())); + return Ok(running_actors); } } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 89294fdcd43df..276c892a6448d 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1145,7 +1145,7 @@ impl CatalogManager { let mut all_sink_ids: HashSet = HashSet::default(); let mut all_source_ids: HashSet = HashSet::default(); let mut all_view_ids: HashSet = HashSet::default(); - let mut all_cdc_source_ids: HashSet = HashSet::default(); + let mut all_streaming_job_source_ids: HashSet = HashSet::default(); let relations_depend_on = |relation_id: RelationId| -> Vec { let tables_depend_on = tables @@ -1408,11 +1408,10 @@ impl CatalogManager { continue; } - // cdc source streaming job if let Some(info) = source.info - && info.cdc_source_job + && info.has_streaming_job { - all_cdc_source_ids.insert(source.id); + all_streaming_job_source_ids.insert(source.id); let source_table_fragments = fragment_manager .select_table_fragments_by_table_id(&source.id.into()) .await?; @@ -1669,7 +1668,7 @@ impl CatalogManager { .into_iter() .map(|id| id.into()) .chain(all_sink_ids.into_iter().map(|id| id.into())) - .chain(all_cdc_source_ids.into_iter().map(|id| id.into())) + .chain(all_streaming_job_source_ids.into_iter().map(|id| id.into())) .collect_vec(); Ok((version, catalog_deleted_ids)) diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 30726984a1d99..0d805f201e1b8 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -372,10 +372,10 @@ impl MetadataManager { } } - pub async fn get_running_actors_and_upstream_fragment_of_fragment( + pub async fn get_running_actors_and_upstream_actors_of_fragment( &self, id: FragmentId, - ) -> MetaResult<(Vec, Vec)> { + ) -> MetaResult)>> { match self { MetadataManager::V1(mgr) => { mgr.fragment_manager diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 54993d8fee805..3a96f8a142c3f 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -30,7 +30,7 @@ use risingwave_pb::meta::{PbTableFragments, PbTableParallelism}; use risingwave_pb::plan_common::PbExprContext; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - FragmentTypeFlag, PbFragmentTypeFlag, PbStreamContext, StreamActor, StreamNode, StreamSource, + FragmentTypeFlag, PbFragmentTypeFlag, PbStreamContext, StreamActor, StreamNode, }; use super::{ActorId, FragmentId}; @@ -337,7 +337,7 @@ impl TableFragments { } /// Returns the actor ids with the given fragment type. - fn filter_actor_ids(&self, check_type: impl Fn(u32) -> bool) -> Vec { + pub fn filter_actor_ids(&self, check_type: impl Fn(u32) -> bool) -> Vec { self.fragments .values() .filter(|fragment| check_type(fragment.get_fragment_type_mask())) @@ -367,10 +367,12 @@ impl TableFragments { }) } - /// Returns values actor ids. - pub fn values_actor_ids(&self) -> Vec { + /// Returns actor ids that need to be tracked when creating MV. + pub fn tracking_progress_actor_ids(&self) -> Vec { Self::filter_actor_ids(self, |fragment_type_mask| { - (fragment_type_mask & FragmentTypeFlag::Values as u32) != 0 + (fragment_type_mask + & (FragmentTypeFlag::Values as u32 | FragmentTypeFlag::StreamScan as u32)) + != 0 }) } @@ -411,23 +413,6 @@ impl TableFragments { .collect() } - /// Find the external stream source info inside the stream node, if any. - pub fn find_stream_source(stream_node: &StreamNode) -> Option<&StreamSource> { - if let Some(NodeBody::Source(source)) = stream_node.node_body.as_ref() { - if let Some(inner) = &source.source_inner { - return Some(inner); - } - } - - for child in &stream_node.input { - if let Some(source) = Self::find_stream_source(child) { - return Some(source); - } - } - - None - } - /// Extract the fragments that include source executors that contains an external stream source, /// grouping by source id. pub fn stream_source_fragments(&self) -> HashMap> { @@ -435,10 +420,7 @@ impl TableFragments { for fragment in self.fragments() { for actor in &fragment.actors { - if let Some(source_id) = - TableFragments::find_stream_source(actor.nodes.as_ref().unwrap()) - .map(|s| s.source_id) - { + if let Some(source_id) = actor.nodes.as_ref().unwrap().find_stream_source() { source_fragments .entry(source_id) .or_insert(BTreeSet::new()) @@ -451,6 +433,29 @@ impl TableFragments { source_fragments } + pub fn source_backfill_fragments( + &self, + ) -> MetadataModelResult>> { + let mut source_fragments = HashMap::new(); + + for fragment in self.fragments() { + for actor in &fragment.actors { + if let Some(source_id) = actor.nodes.as_ref().unwrap().find_source_backfill() { + if fragment.upstream_fragment_ids.len() != 1 { + return Err(anyhow::anyhow!("SourceBackfill should have only one upstream fragment, found {:?} for fragment {}", fragment.upstream_fragment_ids, fragment.fragment_id).into()); + } + source_fragments + .entry(source_id) + .or_insert(BTreeSet::new()) + .insert((fragment.fragment_id, fragment.upstream_fragment_ids[0])); + + break; + } + } + } + Ok(source_fragments) + } + /// Resolve dependent table fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap) { let table_id = match stream_node.node_body.as_ref() { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8a017f6aa2e50..2c5319205584c 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -486,6 +486,7 @@ impl DdlController { unimplemented!("support drop source in v2"); }; // 1. Drop source in catalog. + // If the source has a streaming job, it's also dropped here. let (version, streaming_job_ids) = mgr .catalog_manager .drop_relation( @@ -1288,8 +1289,12 @@ impl DdlController { .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) .await?; - let upstream_actors: HashMap<_, _> = upstream_root_fragments + // XXX: do we need to filter here? + let upstream_mview_actors: HashMap<_, _> = upstream_root_fragments .iter() + // .filter(|(_, fragment)| { + // fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 + // }) .map(|(&table_id, fragment)| { ( table_id, @@ -1366,7 +1371,7 @@ impl DdlController { let ctx = CreateStreamingJobContext { dispatchers, - upstream_mview_actors: upstream_actors, + upstream_mview_actors, internal_tables, building_locations, existing_locations, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 00a8c18885dcc..9e0ac3ca1c935 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -606,7 +606,7 @@ impl ScaleController { if (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0 { let stream_node = fragment.actors.first().unwrap().get_nodes().unwrap(); - if TableFragments::find_stream_source(stream_node).is_some() { + if stream_node.find_stream_source().is_some() { stream_source_fragment_ids.insert(*fragment_id); } } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 95f5c219351a3..ed1dd74e45350 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -30,6 +30,7 @@ use risingwave_connector::source::{ }; use risingwave_pb::catalog::Source; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; +use risingwave_pb::stream_plan::Dispatcher; use risingwave_rpc_client::ConnectorClient; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{oneshot, Mutex}; @@ -228,8 +229,8 @@ pub struct SourceManagerCore { managed_sources: HashMap, /// Fragments associated with each source source_fragments: HashMap>, - /// Revert index for source_fragments - fragment_sources: HashMap, + /// `source_id` -> `(fragment_id, upstream_fragment_id)` + backfill_fragments: HashMap>, /// Splits assigned per actor actor_splits: HashMap>, @@ -240,20 +241,14 @@ impl SourceManagerCore { metadata_manager: MetadataManager, managed_sources: HashMap, source_fragments: HashMap>, + backfill_fragments: HashMap>, actor_splits: HashMap>, ) -> Self { - let mut fragment_sources = HashMap::new(); - for (source_id, fragment_ids) in &source_fragments { - for fragment_id in fragment_ids { - fragment_sources.insert(*fragment_id, *source_id); - } - } - Self { metadata_manager, managed_sources, source_fragments, - fragment_sources, + backfill_fragments, actor_splits, } } @@ -267,12 +262,13 @@ impl SourceManagerCore { let mut split_assignment: SplitAssignment = HashMap::new(); for (source_id, handle) in &self.managed_sources { - let fragment_ids = match self.source_fragments.get(source_id) { + let source_fragment_ids = match self.source_fragments.get(source_id) { Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids, _ => { continue; } }; + let backfill_fragment_ids = self.backfill_fragments.get(source_id); let Some(discovered_splits) = handle.discovered_splits().await else { continue; @@ -281,21 +277,18 @@ impl SourceManagerCore { tracing::warn!("No splits discovered for source {}", source_id); } - let mut source_fragments = vec![]; - let mut backfill_fragments = vec![]; - - for fragment_id in fragment_ids { - let (actors, upstream_fragment_ids) = match self + for &fragment_id in source_fragment_ids { + let actors = match self .metadata_manager - .get_running_actors_and_upstream_fragment_of_fragment(*fragment_id) + .get_running_actors_of_fragment(fragment_id) .await { - Ok((actors, upstream_fragment_ids)) => { + Ok(actors) => { if actors.is_empty() { tracing::warn!("No actors found for fragment {}", fragment_id); continue; } - (actors, upstream_fragment_ids) + actors } Err(err) => { tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string()); @@ -303,35 +296,9 @@ impl SourceManagerCore { } }; - if !upstream_fragment_ids.is_empty() { - debug_assert!( - upstream_fragment_ids.len() == 1, - "source backfill fragment should have exactly one upstream fragment, fragment_id: {fragment_id}, upstream_fragment_ids: {upstream_fragment_ids:?}" - ); - for actor in &actors { - debug_assert!( - actor.upstream_actor_id.len() == 1, - "source backfill actor should have exactly one upstream actor, fragment_id: {fragment_id}, actor: {actor:?}" - ); - } - backfill_fragments.push((*fragment_id, upstream_fragment_ids[0], actors)); - } else { - for actor in &actors { - debug_assert!( - actor.upstream_actor_id.is_empty(), - "source actor should not have upstream actors, fragment_id: {fragment_id}, actor: {actor:?}" - ); - } - source_fragments.push((*fragment_id, actors)); - } - } - - // assign splits for source fragments first - for (fragment_id, actors) in source_fragments { let prev_actor_splits: HashMap<_, _> = actors .into_iter() - .map(|actor| { - let actor_id = actor.actor_id; + .map(|actor_id| { ( actor_id, self.actor_splits @@ -354,28 +321,41 @@ impl SourceManagerCore { } } - // align splits for backfill fragments with its upstream source fragment - for (fragment_id, upstream_fragment_id, actors) in backfill_fragments { - let upstream_assignment = split_assignment - .get(&upstream_fragment_id) - .unwrap_or_else(||panic!( - "source backfill fragment's upstream fragment should have assignment, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, split_assignment: {split_assignment:?}")); - split_assignment.insert( - fragment_id, - actors - .into_iter() - .map(|a| { - let actor_id = a.actor_id; - ( - actor_id, - upstream_assignment - .get(&actor_id) - .cloned() - .unwrap_or_else(||panic!("source backfill actor should have upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor: {a:?}, upstream_assignment: {upstream_assignment:?}")), - ) - }) - .collect(), - ); + if let Some(backfill_fragment_ids) = backfill_fragment_ids { + // align splits for backfill fragments with its upstream source fragment + for (fragment_id, upstream_fragment_id) in backfill_fragment_ids { + let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id) + else { + // upstream fragment unchanged, do not update backfill fragment too + continue; + }; + let actors = match self + .metadata_manager + .get_running_actors_and_upstream_actors_of_fragment(*fragment_id) + .await + { + Ok(actors) => { + if actors.is_empty() { + tracing::warn!("No actors found for fragment {}", fragment_id); + continue; + } + actors + } + Err(err) => { + tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string()); + continue; + } + }; + split_assignment.insert( + *fragment_id, + align_backfill_splits( + actors, + upstream_assignment, + *fragment_id, + *upstream_fragment_id, + )?, + ); + } } } @@ -390,10 +370,6 @@ impl SourceManagerCore { ) { if let Some(source_fragments) = source_fragments { for (source_id, mut fragment_ids) in source_fragments { - for fragment_id in &fragment_ids { - self.fragment_sources.insert(*fragment_id, source_id); - } - self.source_fragments .entry(source_id) .or_default() @@ -432,10 +408,6 @@ impl SourceManagerCore { entry.remove(); } } - - for fragment_id in &fragment_ids { - self.fragment_sources.remove(fragment_id); - } } for actor_id in actor_splits { @@ -491,6 +463,7 @@ impl Default for SplitDiffOptions { /// /// If an actor has an upstream actor, it should be a backfill executor, /// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case. +/// Use `align_backfill_splits` instead. /// /// - `fragment_id`: just for logging fn reassign_splits( @@ -586,6 +559,32 @@ where ) } +fn align_backfill_splits( + backfill_actors: impl IntoIterator)>, + upstream_assignment: &HashMap>, + fragment_id: FragmentId, + upstream_fragment_id: FragmentId, +) -> anyhow::Result>> { + backfill_actors + .into_iter() + .map(|(actor_id, upstream_actor_id)| { + let err = || anyhow::anyhow!("source backfill actor should have upstream actor, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, actor_id: {actor_id}, upstream_assignment: {upstream_assignment:?}, upstream_actor_id: {upstream_actor_id:?}"); + if upstream_actor_id.len() != 1 { + return Err(err()); + } + let Some(splits ) = upstream_assignment + .get(&upstream_actor_id[0]) + else { + return Err(err()); + }; + Ok(( + actor_id, + splits.clone(), + )) + }) + .collect() +} + impl SourceManager { const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10); const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10); @@ -611,6 +610,7 @@ impl SourceManager { let mut actor_splits = HashMap::new(); let mut source_fragments = HashMap::new(); + let mut backfill_fragments = HashMap::new(); match &metadata_manager { MetadataManager::V1(mgr) => { @@ -622,6 +622,7 @@ impl SourceManager { .values() { source_fragments.extend(table_fragments.stream_source_fragments()); + backfill_fragments.extend(table_fragments.source_backfill_fragments()?); actor_splits.extend(table_fragments.actor_splits.clone()); } } @@ -638,6 +639,21 @@ impl SourceManager { ) }) .collect(); + backfill_fragments = mgr + .catalog_controller + .load_backfill_fragment_ids() + .await? + .into_iter() + .map(|(source_id, fragment_ids)| { + ( + source_id as SourceId, + fragment_ids + .into_iter() + .map(|(id, up_id)| (id as _, up_id as _)) + .collect(), + ) + }) + .collect(); actor_splits = mgr .catalog_controller .load_actor_splits() @@ -662,6 +678,7 @@ impl SourceManager { metadata_manager, managed_sources, source_fragments, + backfill_fragments, actor_splits, )); @@ -806,6 +823,68 @@ impl SourceManager { Ok(assigned) } + pub async fn allocate_splits_for_backfill( + &self, + table_id: &TableId, + dispatchers: &HashMap>, + ) -> MetaResult { + let core = self.core.lock().await; + let table_fragments = core + .metadata_manager + .get_job_fragments_by_id(table_id) + .await?; + + let upstream_assignment = &core.actor_splits; + let source_backfill_fragments = table_fragments.source_backfill_fragments()?; + tracing::debug!( + ?source_backfill_fragments, + ?table_fragments, + "allocate_splits_for_backfill source backfill fragments" + ); + + let mut assigned = HashMap::new(); + + for (_source_id, fragments) in source_backfill_fragments { + for (fragment_id, upstream_fragment_id) in fragments { + let upstream_actors = core + .metadata_manager + .get_running_actors_of_fragment(upstream_fragment_id) + .await?; + let mut backfill_actors = vec![]; + for upstream_actor in upstream_actors { + if let Some(dispatchers) = dispatchers.get(&upstream_actor) { + let err = || { + anyhow::anyhow!( + "source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}", + fragment_id = fragment_id, + upstream_fragment_id = upstream_fragment_id, + upstream_actor = upstream_actor, + dispatchers = dispatchers + ) + }; + if dispatchers.len() != 1 || dispatchers[0].downstream_actor_id.len() != 1 { + return Err(err().into()); + } + + backfill_actors + .push((dispatchers[0].downstream_actor_id[0], vec![upstream_actor])); + } + } + assigned.insert( + fragment_id, + align_backfill_splits( + backfill_actors, + upstream_assignment, + fragment_id, + upstream_fragment_id, + )?, + ); + } + } + + Ok(assigned) + } + /// register connector worker for source. pub async fn register_source(&self, source: &Source) -> anyhow::Result<()> { let mut core = self.core.lock().await; diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index c42c2f5a51425..52a14af0629c6 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -122,7 +122,7 @@ impl ActorBuilder { /// During this process, the following things will be done: /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the /// compute nodes. - /// 2. Fill the upstream mview info of the `Merge` node under the `StreamScan` node. + /// 2. Fill the upstream mview info of the `Merge` node under the other "leaf" nodes. fn rewrite(&self) -> MetaResult { self.rewrite_inner(&self.nodes, 0) } @@ -254,6 +254,44 @@ impl ActorBuilder { }) } + // "Leaf" node `SourceBackfill`. + NodeBody::SourceBackfill(source_backfill) => { + let input = stream_node.get_input(); + assert_eq!(input.len(), 1); + + let merge_node = &input[0]; + assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_))); + + let upstream_source_id = source_backfill.source_id; + + // Index the upstreams by the an external edge ID. + let upstreams = &self.upstreams[&EdgeId::UpstreamExternal { + upstream_table_id: upstream_source_id.into(), + downstream_fragment_id: self.fragment_id, + }]; + + let upstream_actor_id = upstreams.actors.as_global_ids(); + + // rewrite the input of `SourceBackfill` + let input = vec![ + // Fill the merge node body with correct upstream info. + StreamNode { + node_body: Some(NodeBody::Merge(MergeNode { + upstream_actor_id, + upstream_fragment_id: upstreams.fragment_id.as_global_id(), + upstream_dispatcher_type: DispatcherType::NoShuffle as _, + fields: merge_node.fields.clone(), + })), + ..merge_node.clone() + }, + ]; + + Ok(StreamNode { + input, + ..stream_node.clone() + }) + } + // For other nodes, visit the children recursively. _ => { let mut new_stream_node = stream_node.clone(); @@ -622,6 +660,7 @@ impl ActorGraphBuildState { /// The result of a built actor graph. Will be further embedded into the `Context` for building /// actors on the compute nodes. +#[derive(Debug)] pub struct ActorGraphBuildResult { /// The graph of sealed fragments, including all actors. pub graph: BTreeMap, diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 937dadda21fb8..e4b7dc7b1b3b1 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -24,7 +24,7 @@ use risingwave_common::bail; use risingwave_common::catalog::{ generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM, }; -use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::util::iter_util::{IntoIteratorExt, ZipEqFast}; use risingwave_common::util::stream_graph_visitor; use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::TableJobType; @@ -54,7 +54,8 @@ pub(super) struct BuildingFragment { /// The ID of the job if it's materialized in this fragment. table_id: Option, - /// The required columns of each upstream table. + /// The required column IDs of each upstream table. + /// Will be converted to indices when building the edge connected to the upstream. /// /// For shared CDC source on table, its `vec![]`, since the output is fixed. upstream_table_columns: HashMap>, @@ -177,6 +178,15 @@ impl BuildingFragment { stream_scan.upstream_column_ids.clone(), ), NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]), + NodeBody::SourceBackfill(backfill) => ( + backfill.source_id.into(), + // FIXME: only pass required columns instead of all columns here + backfill + .columns + .iter() + .map(|c| c.column_desc.as_ref().unwrap().column_id) + .collect(), + ), _ => return, }; table_columns @@ -187,7 +197,7 @@ impl BuildingFragment { assert_eq!( table_columns.len(), fragment.upstream_table_ids.len(), - "fragment type: {}", + "fragment type: {:b}", fragment.fragment_type_mask ); @@ -286,7 +296,7 @@ impl StreamFragmentEdge { /// This only includes nodes and edges of the current job itself. It will be converted to [`CompleteStreamFragmentGraph`] later, /// that contains the additional information of pre-existing /// fragments, which are connected to the graph's top-most or bottom-most fragments. -#[derive(Default)] +#[derive(Default, Debug)] pub struct StreamFragmentGraph { /// stores all the fragments in the graph. fragments: HashMap, @@ -513,7 +523,7 @@ pub(super) enum EitherFragment { /// An internal fragment that is being built for the current streaming job. Building(BuildingFragment), - /// An existing fragment that is external but connected to the fragments being built. + /// An existing fragment that is external but connected to the fragments being built.!!!!!!!!!!!!! Existing(Fragment), } @@ -525,6 +535,7 @@ pub(super) enum EitherFragment { /// `Materialize` node will be included in this structure. /// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments /// containing the `StreamScan` nodes will be included in this structure. +#[derive(Debug)] pub struct CompleteStreamFragmentGraph { /// The fragment graph of the streaming job being built. building_graph: StreamFragmentGraph, @@ -655,50 +666,96 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } DdlType::MaterializedView | DdlType::Sink | DdlType::Index => { - // handle MV on MV + // handle MV on MV/Source // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` // of the new materialized view. - let mview_fragment = upstream_root_fragments + let upstream_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream materialized view fragment not found")?; - let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id); - - // Resolve the required output columns from the upstream materialized view. - let (dist_key_indices, output_indices) = { - let nodes = mview_fragment.actors[0].get_nodes().unwrap(); - let mview_node = - nodes.get_node_body().unwrap().as_materialize().unwrap(); - let all_column_ids = mview_node.column_ids(); - let dist_key_indices = mview_node.dist_key_indices(); - let output_indices = output_columns - .iter() - .map(|c| { - all_column_ids - .iter() - .position(|&id| id == *c) - .map(|i| i as u32) - }) - .collect::>>() - .context( - "column not found in the upstream materialized view", - )?; - (dist_key_indices, output_indices) - }; - let dispatch_strategy = mv_on_mv_dispatch_strategy( - uses_arrangement_backfill, - dist_key_indices, - output_indices, - ); - let edge = StreamFragmentEdge { - id: EdgeId::UpstreamExternal { - upstream_table_id, - downstream_fragment_id: id, - }, - dispatch_strategy, - }; - - (mview_id, edge) + let upstream_root_fragment_id = + GlobalFragmentId::new(upstream_fragment.fragment_id); + + if upstream_fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32 + != 0 + { + // Resolve the required output columns from the upstream materialized view. + let (dist_key_indices, output_indices) = { + let nodes = upstream_fragment.actors[0].get_nodes().unwrap(); + let mview_node = + nodes.get_node_body().unwrap().as_materialize().unwrap(); + let all_column_ids = mview_node.column_ids(); + let dist_key_indices = mview_node.dist_key_indices(); + let output_indices = output_columns + .map_collect::<_, _, _, Option>>(|c| { + all_column_ids + .iter() + .position(|&id| id == *c) + .map(|i| i as u32) + }) + .context( + "column not found in the upstream materialized view", + )?; + (dist_key_indices, output_indices) + }; + let dispatch_strategy = mv_on_mv_dispatch_strategy( + uses_arrangement_backfill, + dist_key_indices, + output_indices, + ); + let edge = StreamFragmentEdge { + id: EdgeId::UpstreamExternal { + upstream_table_id, + downstream_fragment_id: id, + }, + dispatch_strategy, + }; + + (upstream_root_fragment_id, edge) + } else if upstream_fragment.fragment_type_mask + & FragmentTypeFlag::Source as u32 + != 0 + { + let source_fragment = upstream_root_fragments + .get(&upstream_table_id) + .context("upstream source fragment not found")?; + let source_job_id = + GlobalFragmentId::new(source_fragment.fragment_id); + + let output_indices = { + let nodes = upstream_fragment.actors[0].get_nodes().unwrap(); + let source_node = + nodes.get_node_body().unwrap().as_source().unwrap(); + + let all_column_ids = source_node.column_ids().unwrap(); + output_columns + .map_collect::<_, _, _, Option>>(|c| { + all_column_ids + .iter() + .position(|&id| id == *c) + .map(|i| i as u32) + }) + .context("column not found in the upstream source node")? + }; + + let edge = StreamFragmentEdge { + id: EdgeId::UpstreamExternal { + upstream_table_id, + downstream_fragment_id: id, + }, + // We always use `NoShuffle` for the exchange between the upstream + // `Source` and the downstream `StreamScan` of the new MV. + dispatch_strategy: DispatchStrategy { + r#type: DispatcherType::NoShuffle as _, + dist_key_indices: vec![], // not used for `NoShuffle` + output_indices, + }, + }; + + (source_job_id, edge) + } else { + bail!("the upstream fragment should be a MView or Source, got fragment type: {:b}", upstream_fragment.fragment_type_mask) + } } DdlType::Source | DdlType::Table(_) => { bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type) diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index ed2dac5be0e06..1ae24ec1b7d51 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -326,6 +326,7 @@ impl Scheduler { /// [`Locations`] represents the parallel unit and worker locations of the actors. #[cfg_attr(test, derive(Default))] +#[derive(Debug)] pub struct Locations { /// actor location map. pub actor_locations: BTreeMap, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 28168d3db64d1..f5fdc4cd8bbd8 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -354,6 +354,7 @@ impl GlobalStreamManager { } } CreatingState::Created => { + tracing::debug!(id=?table_id, "streaming job created"); self.creating_job_info.delete_job(table_id).await; return Ok(()); } @@ -435,6 +436,7 @@ impl GlobalStreamManager { actor_id: actors.clone(), }) .await?; + tracing::debug!("build actors finished"); Ok(()) as MetaResult<()> }) @@ -507,7 +509,7 @@ impl GlobalStreamManager { .await?; let dummy_table_id = table_fragments.table_id(); - + // TODO: need change? let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; @@ -524,7 +526,16 @@ impl GlobalStreamManager { let table_id = table_fragments.table_id(); - let init_split_assignment = self.source_manager.allocate_splits(&table_id).await?; + // Here we need to consider: + // - Source with streaming job (backfill-able source) + // - Table with connector + // - MV on backfill-able source + let mut init_split_assignment = self.source_manager.allocate_splits(&table_id).await?; + init_split_assignment.extend( + self.source_manager + .allocate_splits_for_backfill(&table_id, &dispatchers) + .await?, + ); let command = Command::CreateStreamingJob { table_fragments, @@ -535,7 +546,7 @@ impl GlobalStreamManager { ddl_type, replace_table: replace_table_command, }; - + tracing::trace!(?command, "sending first barrier for creating streaming job"); if let Err(err) = self.barrier_scheduler.run_command(command).await { if create_type == CreateType::Foreground { let mut table_ids = HashSet::from_iter(std::iter::once(table_id)); @@ -569,7 +580,7 @@ impl GlobalStreamManager { .await?; let dummy_table_id = table_fragments.table_id(); - + // TODO: need change? let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; if let Err(err) = self diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index dafa3a568780f..57094cbc8abed 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -190,6 +190,61 @@ impl stream_plan::MaterializeNode { } } +impl stream_plan::SourceNode { + pub fn column_ids(&self) -> Option> { + Some( + self.source_inner + .as_ref()? + .columns + .iter() + .map(|c| c.get_column_desc().unwrap().column_id) + .collect(), + ) + } +} + +impl stream_plan::StreamNode { + /// Find the external stream source info inside the stream node, if any. + /// + /// Returns `source_id`. + pub fn find_stream_source(&self) -> Option { + if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) = + self.node_body.as_ref() + { + if let Some(inner) = &source.source_inner { + return Some(inner.source_id); + } + } + + for child in &self.input { + if let Some(source) = child.find_stream_source() { + return Some(source); + } + } + + None + } + + /// Find the external stream source info inside the stream node, if any. + /// + /// Returns `source_id`. + pub fn find_source_backfill(&self) -> Option { + if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) = + self.node_body.as_ref() + { + return Some(source.source_id); + } + + for child in &self.input { + if let Some(source) = child.find_source_backfill() { + return Some(source); + } + } + + None + } +} + #[cfg(test)] mod tests { use crate::data::{data_type, DataType}; diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 7cce4531be55f..96fb09826d604 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -22,6 +22,7 @@ async-trait = "0.1" auto_enums = "0.8" await-tree = { workspace = true } bytes = "1" +cfg-if = "1" delta_btree_map = { path = "../utils/delta_btree_map" } educe = "0.5" either = "1" @@ -55,6 +56,7 @@ risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } risingwave_storage = { workspace = true } rw_futures_util = { workspace = true } +serde = { version = "1.0", features = ["derive"] } serde_json = "1" smallvec = "1" static_assertions = "1" @@ -90,7 +92,6 @@ risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_hummock_test = { path = "../storage/hummock_test", features = [ "test", ] } -serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9" tracing-test = "0.2" diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs index 7b85633bdd18c..d94e4cfbac841 100644 --- a/src/stream/src/executor/exchange/output.rs +++ b/src/stream/src/executor/exchange/output.rs @@ -67,6 +67,13 @@ impl LocalOutput { } } +impl Drop for LocalOutput { + fn drop(&mut self) { + let backtrace = std::backtrace::Backtrace::capture(); + tracing::debug!(actor_id=?self.actor_id, "dropping LocalOutput, backtrace: {}", backtrace); + } +} + #[async_trait] impl Output for LocalOutput { async fn send(&mut self, message: Message) -> StreamResult<()> { @@ -76,7 +83,7 @@ impl Output for LocalOutput { .await .map_err(|SendError(message)| { anyhow!( - "failed to send message to actor {}: {:?}", + "failed to send message to actor {}, message: {:?}", self.actor_id, message ) @@ -130,7 +137,7 @@ impl Output for RemoteOutput { .await .map_err(|SendError(message)| { anyhow!( - "failed to send message to actor {}: {:#?}", + "failed to send message to actor {}, message: {:?}", self.actor_id, message ) diff --git a/src/stream/src/executor/source/executor_core.rs b/src/stream/src/executor/source/executor_core.rs index 82f151bc9ce31..7ea82017c4da4 100644 --- a/src/stream/src/executor/source/executor_core.rs +++ b/src/stream/src/executor/source/executor_core.rs @@ -41,6 +41,9 @@ pub struct StreamSourceCore { pub(crate) split_state_store: SourceStateTableHandler, /// In-memory cache for the splits. + /// + /// Source messages will only write the cache. + /// It is read on split change and rebuild stream reader on error. pub(crate) state_cache: HashMap, } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 3aa885cfffe1b..1c7254fe5bc3b 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -293,7 +293,7 @@ impl FsFetchExecutor { ) }) .collect(); - state_store_handler.take_snapshot(file_assignment).await?; + state_store_handler.set_states(file_assignment).await?; state_store_handler.state_store.try_flush().await?; } _ => unreachable!(), diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 7065561bbc0f6..ea1b63015c63d 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -237,7 +237,7 @@ impl FsSourceExecutor { if !incompleted.is_empty() { tracing::debug!(actor_id = self.actor_ctx.id, incompleted = ?incompleted, "take snapshot"); - core.split_state_store.take_snapshot(incompleted).await? + core.split_state_store.set_states(incompleted).await? } if !completed.is_empty() { diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs index d1aef5a6bbf2e..a0b71177dac61 100644 --- a/src/stream/src/executor/source/kafka_backfill_executor.rs +++ b/src/stream/src/executor/source/kafka_backfill_executor.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +// FIXME: rebuild_stream_reader_from_error + use std::assert_matches::assert_matches; use std::cmp::Ordering; use std::fmt::Formatter; use std::pin::pin; -use std::time::Duration; use anyhow::anyhow; use either::Either; @@ -24,35 +25,54 @@ use futures::stream::{select_with_strategy, AbortHandle, Abortable}; use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::metrics::GLOBAL_ERROR_METRICS; -use risingwave_common::row::Row; +use risingwave_common::row::{Row, RowExt}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_common::types::JsonbVal; use risingwave_connector::source::{ - BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, - SplitMetaData, StreamChunkWithState, + BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, + StreamChunkWithState, }; use risingwave_connector::ConnectorParams; +use risingwave_pb::plan_common::AdditionalColumnType; use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; use risingwave_storage::StateStore; -use thiserror_ext::AsReport; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::time::Instant; +use serde::{Deserialize, Serialize}; use super::executor_core::StreamSourceCore; +use super::kafka_backfill_state_table::BackfillStateTableHandler; use crate::executor::monitor::StreamingMetrics; -use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; -type ExecutorSplitId = String; +pub type SplitId = String; +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum BackfillState { + /// `None` means not started yet. It's the initial state. + Backfilling(Option), + /// Backfill is stopped at this offset. Source needs to filter out messages before this offset. + SourceCachingUp(String), + Finished, +} +pub type BackfillStates = HashMap; + +impl BackfillState { + // TODO: use a more compact encoding? + pub fn encode_to_json(self) -> JsonbVal { + serde_json::to_value(self).unwrap().into() + } + + pub fn restore_from_json(value: JsonbVal) -> anyhow::Result { + serde_json::from_value(value.take()).map_err(|e| anyhow!(e)) + } +} /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to /// some latencies in network and cost in meta. const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5; pub struct KafkaBackfillExecutorWrapper { - inner: KafkaBackfillExecutor, + pub inner: KafkaBackfillExecutor, /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset` - input: Box, + pub input: Box, } pub struct KafkaBackfillExecutor { @@ -60,7 +80,9 @@ pub struct KafkaBackfillExecutor { info: ExecutorInfo, /// Streaming source for external - stream_source_core: Option>, + // FIXME: some fields e.g. its state table is not used. We might need to refactor + stream_source_core: StreamSourceCore, + backfill_state_store: BackfillStateTableHandler, /// Metrics for monitor. metrics: Arc, @@ -82,19 +104,20 @@ impl KafkaBackfillExecutor { pub fn new( actor_ctx: ActorContextRef, info: ExecutorInfo, - stream_source_core: Option>, + stream_source_core: StreamSourceCore, metrics: Arc, // barrier_receiver: UnboundedReceiver, system_params: SystemParamsReaderRef, source_ctrl_opts: SourceCtrlOpts, connector_params: ConnectorParams, + backfill_state_store: BackfillStateTableHandler, ) -> Self { Self { actor_ctx, info, stream_source_core, + backfill_state_store, metrics, - // barrier_receiver: Some(barrier_receiver), system_params, source_ctrl_opts, connector_params, @@ -105,10 +128,7 @@ impl KafkaBackfillExecutor { &self, source_desc: &SourceDesc, state: ConnectorState, - ) -> StreamExecutorResult<( - BoxSourceWithStateStream, - HashMap, - )> { + ) -> StreamExecutorResult<(BoxSourceWithStateStream, HashMap)> { let column_ids = source_desc .columns .iter() @@ -116,7 +136,7 @@ impl KafkaBackfillExecutor { .collect_vec(); let source_ctx = SourceContext::new_with_suppressor( self.actor_ctx.id, - self.stream_source_core.as_ref().unwrap().source_id, + self.stream_source_core.source_id, self.actor_ctx.fragment_id, source_desc.metrics.clone(), self.source_ctrl_opts.clone(), @@ -150,240 +170,32 @@ impl KafkaBackfillExecutor { } } - async fn apply_split_change( - &mut self, - source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, - split_assignment: &HashMap>, - ) -> StreamExecutorResult>> { - // self.metrics - // .source_split_change_count - // .with_label_values( - // &self - // .get_metric_labels() - // .iter() - // .map(AsRef::as_ref) - // .collect::>(), - // ) - // .inc(); - if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() { - if let Some(target_state) = self.update_state_if_changed(Some(target_splits)).await? { - tracing::info!( - actor_id = self.actor_ctx.id, - state = ?target_state, - "apply split change" - ); - - self.replace_stream_reader_with_target_state( - source_desc, - stream, - target_state.clone(), - ) - .await?; - - return Ok(Some(target_state)); - } - } - - Ok(None) - } - - // Note: `update_state_if_changed` will modify `state_cache` - async fn update_state_if_changed( - &mut self, - state: ConnectorState, - ) -> StreamExecutorResult { - let core = self.stream_source_core.as_mut().unwrap(); - - let target_splits: HashMap<_, _> = state - .unwrap() - .into_iter() - .map(|split| (split.id(), split)) - .collect(); - - let mut target_state: Vec = Vec::with_capacity(target_splits.len()); - - let mut split_changed = false; - - for (split_id, split) in &target_splits { - if let Some(s) = core.state_cache.get(split_id) { - // existing split, no change, clone from cache - target_state.push(s.clone()) - } else { - split_changed = true; - // write new assigned split to state cache. snapshot is base on cache. - - let initial_state = if let Some(recover_state) = core - .split_state_store - .try_recover_from_state_store(split) - .await? - { - recover_state - } else { - split.clone() - }; - - core.state_cache - .entry(split.id()) - .or_insert_with(|| initial_state.clone()); - - target_state.push(initial_state); - } - } - - // state cache may be stale - for existing_split_id in core.stream_source_splits.keys() { - if !target_splits.contains_key(existing_split_id) { - tracing::info!("split dropping detected: {}", existing_split_id); - split_changed = true; - } - } - - Ok(split_changed.then_some(target_state)) - } - - /// Rebuild stream if there is a err in stream - async fn rebuild_stream_reader_from_error( - &mut self, - source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, - split_info: &mut [SplitImpl], - e: StreamExecutorError, - ) -> StreamExecutorResult<()> { - let core = self.stream_source_core.as_mut().unwrap(); - tracing::warn!( - "stream source reader error, actor: {:?}, source: {:?}", - self.actor_ctx.id, - core.source_id, - ); - GLOBAL_ERROR_METRICS.user_source_reader_error.report([ - "SourceReaderError".to_owned(), - e.to_report_string(), - "KafkaBackfillExecutor".to_owned(), - self.actor_ctx.id.to_string(), - core.source_id.to_string(), - ]); - // fetch the newest offset, either it's in cache (before barrier) - // or in state table (just after barrier) - let target_state = if core.state_cache.is_empty() { - for ele in &mut *split_info { - if let Some(recover_state) = core - .split_state_store - .try_recover_from_state_store(ele) - .await? - { - *ele = recover_state; - } - } - split_info.to_owned() - } else { - core.state_cache - .values() - .map(|split_impl| split_impl.to_owned()) - .collect_vec() - }; - - self.replace_stream_reader_with_target_state(source_desc, stream, target_state) - .await - } - - async fn replace_stream_reader_with_target_state( - &mut self, - source_desc: &SourceDesc, - stream: &mut StreamReaderWithPause, - target_state: Vec, - ) -> StreamExecutorResult<()> { - tracing::info!( - "actor {:?} apply source split change to {:?}", - self.actor_ctx.id, - target_state - ); - - // Replace the source reader with a new one of the new state. - - let (reader, abort_handles) = self - .build_stream_source_reader(source_desc, Some(target_state.clone())) - .await?; - - stream.replace_data_stream(reader); - - Ok(()) - } - - async fn take_snapshot_and_clear_cache( - &mut self, - epoch: EpochPair, - target_state: Option>, - should_trim_state: bool, - ) -> StreamExecutorResult<()> { - let core = self.stream_source_core.as_mut().unwrap(); - - let mut cache = core - .state_cache - .values() - .map(|split_impl| split_impl.to_owned()) - .collect_vec(); - - if let Some(target_splits) = target_state { - let target_split_ids: HashSet<_> = - target_splits.iter().map(|split| split.id()).collect(); - - cache.retain(|split| target_split_ids.contains(&split.id())); - - let dropped_splits = core - .stream_source_splits - .extract_if(|split_id, _| !target_split_ids.contains(split_id)) - .map(|(_, split)| split) - .collect_vec(); - - if should_trim_state && !dropped_splits.is_empty() { - // trim dropped splits' state - core.split_state_store.trim_state(&dropped_splits).await?; - } - - core.stream_source_splits = target_splits - .into_iter() - .map(|split| (split.id(), split)) - .collect(); - } - - if !cache.is_empty() { - tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot"); - core.split_state_store.take_snapshot(cache).await? - } - // commit anyway, even if no message saved - core.split_state_store.state_store.commit(epoch).await?; - - core.state_cache.clear(); - - Ok(()) - } - - async fn try_flush_data(&mut self) -> StreamExecutorResult<()> { - let core = self.stream_source_core.as_mut().unwrap(); - core.split_state_store.state_store.try_flush().await?; - - Ok(()) - } - #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute(mut self, input: BoxedExecutor) { - // TODO: these can be inferred in frontend by checking additional_column_type - let split_column_idx = 1; - let offset_column_idx = 2; - let mut input = input.execute(); // Poll the upstream to get the first barrier. let barrier = expect_first_barrier(&mut input).await?; + tracing::debug!("KafkaBackfillExecutor got first barrier: {barrier:?}"); - let mut core = self.stream_source_core.unwrap(); + let mut core = self.stream_source_core; // Build source description from the builder. let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap(); let source_desc = source_desc_builder .build() .map_err(StreamExecutorError::connector_error)?; + let split_column_idx = source_desc + .columns + .iter() + .position(|c| matches!(c.additional_column_type, AdditionalColumnType::Partition)) + .expect("kafka source should have partition column"); + let offset_column_idx = source_desc + .columns + .iter() + .position(|c| matches!(c.additional_column_type, AdditionalColumnType::Offset)) + .expect("kafka source should have offset column"); + tracing::debug!(?split_column_idx, ?offset_column_idx); let mut boot_state = Vec::default(); if let Some(mutation) = barrier.mutation.as_ref() { @@ -405,27 +217,39 @@ impl KafkaBackfillExecutor { _ => {} } } - let mut latest_split_info = boot_state.clone(); + let latest_split_info = boot_state.clone(); + tracing::debug!("latest_split_info: {:?}", latest_split_info); - core.split_state_store.init_epoch(barrier.epoch); + self.backfill_state_store.init_epoch(barrier.epoch); - for ele in &mut boot_state { - if let Some(recover_state) = core - .split_state_store + let mut backfill_states: BackfillStates = HashMap::new(); + + let mut unfinished_splits = vec![]; + for ele in boot_state { + let split_id = ele.id().to_string(); + let (split, backfill_state) = self + .backfill_state_store .try_recover_from_state_store(ele) - .await? - { - *ele = recover_state; + .await?; + + backfill_states.insert(split_id, backfill_state); + if split.is_some() { + unfinished_splits.push(split.unwrap()); } } + let need_backfill = backfill_states + .values() + .any(|state| !matches!(state, BackfillState::Finished)); + tracing::debug!("KafkaBackfillExecutor backfill_state: {backfill_states:?}"); // init in-memory split states with persisted state if any - core.init_split_state(boot_state.clone()); + core.init_split_state(unfinished_splits.clone()); // Return the ownership of `stream_source_core` to the source executor. - self.stream_source_core = Some(core); + self.stream_source_core = core; - let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); + let recover_state: ConnectorState = + (!unfinished_splits.is_empty()).then_some(unfinished_splits); tracing::info!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state"); let (source_chunk_reader, abort_handles) = self .build_stream_source_reader(&source_desc, recover_state) @@ -456,24 +280,6 @@ impl KafkaBackfillExecutor { |_: &mut ()| futures::stream::PollNext::Left, ); - #[derive(Debug)] - enum BackfillState { - /// `None` means not started yet. It's the initial state. - Backfilling(Option), - /// Backfill is stopped at this offset. Source needs to filter out messages before this offset. - SourceCachingUp(String), - Finished, - } - - let split_ids = abort_handles.keys(); - // TODO: recover from state store - let mut backfill_state: HashMap = split_ids - .map(|k| (k.clone(), BackfillState::Backfilling(None))) - .collect(); - let need_backfill = backfill_state - .values() - .any(|state| !matches!(state, BackfillState::Finished)); - if need_backfill { #[for_await] 'backfill_loop: for either in &mut backfill_stream { @@ -482,7 +288,16 @@ impl KafkaBackfillExecutor { Either::Left(msg) => { match msg? { Message::Barrier(barrier) => { - // TODO: handle split change etc. & Persist progress. + // TODO: handle split change etc. + + self.backfill_state_store + .set_states(backfill_states.clone()) + .await?; + self.backfill_state_store + .state_store + .commit(barrier.epoch) + .await?; + yield Message::Barrier(barrier); } Message::Chunk(chunk) => { @@ -490,12 +305,11 @@ impl KafkaBackfillExecutor { // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly. let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len()); for (i, (_, row)) in chunk.rows().enumerate() { - let split = - row.datum_at(split_column_idx).unwrap().into_int64(); + tracing::debug!(row = %row.display()); + let split = row.datum_at(split_column_idx).unwrap().into_utf8(); let offset = - row.datum_at(offset_column_idx).unwrap().into_int64(); - let backfill_state = - backfill_state.get_mut(&split.to_string()).unwrap(); + row.datum_at(offset_column_idx).unwrap().into_utf8(); + let backfill_state = backfill_states.get_mut(split).unwrap(); match backfill_state { BackfillState::Backfilling(backfill_offset) => { new_vis.set(i, false); @@ -509,10 +323,7 @@ impl KafkaBackfillExecutor { Ordering::Equal => { // backfilling for this split is finished just right. *backfill_state = BackfillState::Finished; - abort_handles - .get(&split.to_string()) - .unwrap() - .abort(); + abort_handles.get(split).unwrap().abort(); } Ordering::Greater => { // backfilling for this split produced more data. @@ -562,7 +373,7 @@ impl KafkaBackfillExecutor { yield Message::Chunk(new_chunk); } // TODO: maybe use a counter to optimize this - if backfill_state + if backfill_states .values() .all(|state| matches!(state, BackfillState::Finished)) { @@ -584,18 +395,16 @@ impl KafkaBackfillExecutor { let split_offset_mapping = split_offset_mapping.expect("kafka source should have offsets"); - let state: HashMap<_, _> = split_offset_mapping + let _state: HashMap<_, _> = split_offset_mapping .iter() .flat_map(|(split_id, offset)| { let origin_split_impl = self .stream_source_core - .as_mut() - .unwrap() .stream_source_splits .get_mut(split_id); // update backfill progress - let prev_state = backfill_state.insert( + let prev_state = backfill_states.insert( split_id.to_string(), BackfillState::Backfilling(Some(offset.to_string())), ); @@ -613,14 +422,14 @@ impl KafkaBackfillExecutor { }) .try_collect()?; - self.stream_source_core - .as_mut() - .unwrap() - .state_cache - .extend(state); + // self.stream_source_core + // .as_mut() + // .unwrap() + // .state_cache + // .extend(state); yield Message::Chunk(chunk); - self.try_flush_data().await?; + // self.try_flush_data().await?; } } } @@ -629,7 +438,8 @@ impl KafkaBackfillExecutor { // All splits finished backfilling. Now we only forward the source data. #[for_await] for msg in input { - match msg? { + let msg = msg?; + match msg { Message::Barrier(barrier) => { // TODO: How to handle a split change here? // We might need to persist its state. Is is possible that we need to backfill? @@ -646,10 +456,11 @@ impl KafkaBackfillExecutor { } } -fn compare_kafka_offset(a: Option<&String>, b: i64) -> Ordering { +fn compare_kafka_offset(a: Option<&String>, b: &str) -> Ordering { match a { Some(a) => { let a = a.parse::().unwrap(); + let b = b.parse::().unwrap(); a.cmp(&b) } None => Ordering::Less, @@ -676,14 +487,11 @@ impl Executor for KafkaBackfillExecutorWrapper { impl Debug for KafkaBackfillExecutor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(core) = &self.stream_source_core { - f.debug_struct("KafkaBackfillExecutor") - .field("source_id", &core.source_id) - .field("column_ids", &core.column_ids) - .field("pk_indices", &self.info.pk_indices) - .finish() - } else { - f.debug_struct("KafkaBackfillExecutor").finish() - } + let core = &self.stream_source_core; + f.debug_struct("KafkaBackfillExecutor") + .field("source_id", &core.source_id) + .field("column_ids", &core.column_ids) + .field("pk_indices", &self.info.pk_indices) + .finish() } } diff --git a/src/stream/src/executor/source/kafka_backfill_state_table.rs b/src/stream/src/executor/source/kafka_backfill_state_table.rs new file mode 100644 index 0000000000000..d478140cc4287 --- /dev/null +++ b/src/stream/src/executor/source/kafka_backfill_state_table.rs @@ -0,0 +1,132 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; +use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl}; +use risingwave_common::util::epoch::EpochPair; +use risingwave_common::{bail, row}; +use risingwave_connector::source::{SplitImpl, SplitMetaData}; +use risingwave_pb::catalog::PbTable; +use risingwave_storage::StateStore; + +use super::kafka_backfill_executor::{BackfillState, BackfillStates, SplitId}; +use crate::common::table::state_table::StateTable; +use crate::executor::error::StreamExecutorError; +use crate::executor::StreamExecutorResult; + +pub struct BackfillStateTableHandler { + pub state_store: StateTable, +} + +impl BackfillStateTableHandler { + pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self { + // The state of source should not be cleaned up by retention_seconds + assert!(!table_catalog + .properties + .contains_key(&String::from(PROPERTIES_RETENTION_SECOND_KEY))); + + Self { + state_store: StateTable::from_table_catalog(table_catalog, store, None).await, + } + } + + pub fn init_epoch(&mut self, epoch: EpochPair) { + self.state_store.init_epoch(epoch); + } + + fn string_to_scalar(rhs: impl Into) -> ScalarImpl { + ScalarImpl::Utf8(rhs.into().into_boxed_str()) + } + + pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult> { + self.state_store + .get_row(row::once(Some(Self::string_to_scalar(key)))) + .await + .map_err(StreamExecutorError::from) + } + + pub async fn set(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> { + let row = [ + Some(Self::string_to_scalar(&key)), + Some(ScalarImpl::Jsonb(value)), + ]; + match self.get(&key).await? { + Some(prev_row) => { + self.state_store.update(prev_row, row); + } + None => { + self.state_store.insert(row); + } + } + Ok(()) + } + + pub async fn delete(&mut self, key: &SplitId) -> StreamExecutorResult<()> { + if let Some(prev_row) = self.get(key).await? { + self.state_store.delete(prev_row); + } + + Ok(()) + } + + pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> { + if states.is_empty() { + // TODO should be a clear Error Code + bail!("states require not null"); + } else { + for (split_id, state) in states { + self.set(split_id, state.encode_to_json()).await?; + } + } + Ok(()) + } + + // pub async fn trim_state(&mut self, to_trim: &[SplitImpl]) -> StreamExecutorResult<()> { + // for split in to_trim { + // tracing::info!("trimming source state for split {}", split.id()); + // self.delete(split.id()).await?; + // } + + // Ok(()) + // } + + /// `None` means no need to read from the split anymore (backfill finished) + pub async fn try_recover_from_state_store( + &mut self, + mut stream_source_split: SplitImpl, + ) -> StreamExecutorResult<(Option, BackfillState)> { + Ok( + match self.get(&stream_source_split.id().to_string()).await? { + None => (Some(stream_source_split), BackfillState::Backfilling(None)), + Some(row) => match row.datum_at(1) { + Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { + let state = BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())?; + let new_split = match &state { + BackfillState::Backfilling(None) => Some(stream_source_split), + BackfillState::Backfilling(Some(offset)) => { + stream_source_split.update_in_place(offset.clone())?; + Some(stream_source_split) + } + BackfillState::SourceCachingUp(_) => None, + BackfillState::Finished => None, + }; + (new_split, state) + } + _ => unreachable!(), + }, + }, + ) + } +} diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index dd9fb470e2f32..82ed243535136 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -23,6 +23,8 @@ pub mod fetch_executor; pub use fetch_executor::*; pub mod kafka_backfill_executor; +pub mod kafka_backfill_state_table; +pub use kafka_backfill_state_table::BackfillStateTableHandler; pub mod source_executor; pub mod list_executor; diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 80b57c43b5450..3e9220e621da5 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -171,7 +171,7 @@ impl SourceExecutor { Ok(None) } - // Note: `update_state_if_changed` will modify `state_cache` + /// Note: `update_state_if_changed` will modify `state_cache` async fn update_state_if_changed( &mut self, state: ConnectorState, @@ -292,7 +292,9 @@ impl SourceExecutor { Ok(()) } - async fn take_snapshot_and_clear_cache( + /// - `target_state`: the new split info from barrier. `None` if no split update. + /// - `should_trim_state`: whether to trim state for dropped splits. + async fn persist_state_and_clear_cache( &mut self, epoch: EpochPair, target_state: Option>, @@ -331,16 +333,17 @@ impl SourceExecutor { if !cache.is_empty() { tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot"); - core.split_state_store.take_snapshot(cache).await? + core.split_state_store.set_states(cache).await? } + // commit anyway, even if no message saved core.split_state_store.state_store.commit(epoch).await?; - core.state_cache.clear(); Ok(()) } + /// try mem table spill async fn try_flush_data(&mut self) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); core.split_state_store.state_store.try_flush().await?; @@ -518,7 +521,8 @@ impl SourceExecutor { latest_split_info = target_state.clone(); } - self.take_snapshot_and_clear_cache( + // We clear cache on barrier because ..? + self.persist_state_and_clear_cache( epoch, target_state, should_trim_state, diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index 31f20ddb9d7fa..e3f7b80e6880e 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -12,13 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +cfg_if::cfg_if! { + if #[cfg(test)] { + use risingwave_common::catalog::{DatabaseId, SchemaId}; + use risingwave_pb::catalog::table::TableType; + use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; + use risingwave_pb::data::data_type::TypeName; + use risingwave_pb::data::DataType; + use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc}; + } +} + use std::collections::HashSet; use std::ops::{Bound, Deref}; use std::sync::Arc; use futures::{pin_mut, StreamExt}; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{DatabaseId, SchemaId}; use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row}; @@ -27,12 +37,7 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_common::{bail, row}; use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_hummock_sdk::key::next_key; -use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::PbTable; -use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; -use risingwave_pb::data::data_type::TypeName; -use risingwave_pb::data::DataType; -use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; @@ -140,10 +145,10 @@ impl SourceStateTableHandler { /// set all complete /// can only used by `FsSourceExecutor` - pub(crate) async fn set_all_complete(&mut self, states: Vec) -> StreamExecutorResult<()> - where - SS: SplitMetaData, - { + pub(crate) async fn set_all_complete( + &mut self, + states: Vec, + ) -> StreamExecutorResult<()> { if states.is_empty() { // TODO should be a clear Error Code bail!("states require not null"); @@ -180,11 +185,7 @@ impl SourceStateTableHandler { Ok(()) } - /// This function provides the ability to persist the source state - /// and needs to be invoked by the ``SourceReader`` to call it, - /// and will return the error when the dependent ``StateStore`` handles the error. - /// The caller should ensure that the passed parameters are not empty. - pub async fn take_snapshot(&mut self, states: Vec) -> StreamExecutorResult<()> + pub async fn set_states(&mut self, states: Vec) -> StreamExecutorResult<()> where SS: SplitMetaData, { @@ -200,10 +201,7 @@ impl SourceStateTableHandler { Ok(()) } - pub async fn trim_state(&mut self, to_trim: &[SS]) -> StreamExecutorResult<()> - where - SS: SplitMetaData, - { + pub async fn trim_state(&mut self, to_trim: &[SplitImpl]) -> StreamExecutorResult<()> { for split in to_trim { tracing::info!("trimming source state for split {}", split.id()); self.delete(split.id()).await?; @@ -228,8 +226,9 @@ impl SourceStateTableHandler { } } -// align with schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used -// for test purpose and should not be used in production. +/// align with schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used +/// for test purpose and should not be used in production. +#[cfg(test)] pub fn default_source_internal_table(id: u32) -> PbTable { let make_column = |column_type: TypeName, column_id: i32| -> ColumnCatalog { ColumnCatalog { @@ -325,7 +324,7 @@ pub(crate) mod tests { state_table_handler.init_epoch(epoch_1); state_table_handler - .take_snapshot(vec![split_impl.clone()]) + .set_states(vec![split_impl.clone()]) .await?; state_table_handler.state_store.commit(epoch_2).await?; diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 9a9e83c0a328f..5f3f710341336 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -42,6 +42,7 @@ mod simple_agg; mod sink; mod sort; mod source; +mod source_backfill; mod stateless_simple_agg; mod stream_cdc_scan; mod stream_scan; @@ -84,6 +85,7 @@ use self::simple_agg::*; use self::sink::*; use self::sort::*; use self::source::*; +use self::source_backfill::*; use self::stateless_simple_agg::*; use self::stream_cdc_scan::*; use self::stream_scan::*; @@ -172,5 +174,6 @@ pub async fn create_executor( NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder, NodeBody::OverWindow => OverWindowExecutorBuilder, NodeBody::StreamFsFetch => FsFetchExecutorBuilder, + NodeBody::SourceBackfill => KafkaBackfillExecutorBuilder, } } diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs new file mode 100644 index 0000000000000..e129a7c362975 --- /dev/null +++ b/src/stream/src/from_proto/source_backfill.rs @@ -0,0 +1,170 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::{ + default_key_column_name_version_mapping, ColumnId, TableId, KAFKA_TIMESTAMP_COLUMN_NAME, +}; +use risingwave_connector::source::SourceCtrlOpts; +use risingwave_pb::data::data_type::TypeName as PbTypeName; +use risingwave_pb::plan_common::{ + AdditionalColumnType, ColumnDescVersion, FormatType, PbEncodeType, +}; +use risingwave_pb::stream_plan::SourceBackfillNode; +use risingwave_source::source_desc::SourceDescBuilder; + +use super::*; +use crate::executor::kafka_backfill_executor::{ + KafkaBackfillExecutor, KafkaBackfillExecutorWrapper, +}; +use crate::executor::source::StreamSourceCore; +use crate::executor::state_table_handler::SourceStateTableHandler; +use crate::executor::BackfillStateTableHandler; + +pub struct KafkaBackfillExecutorBuilder; + +impl ExecutorBuilder for KafkaBackfillExecutorBuilder { + type Node = SourceBackfillNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &Self::Node, + store: impl StateStore, + _stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + let [input]: [_; 1] = params.input.try_into().unwrap(); + + // let (sender, barrier_receiver) = unbounded_channel(); + // stream + // .context + // .barrier_manager() + // .register_sender(params.actor_context.id, sender); + let system_params = params.env.system_params_manager_ref().get_params(); + + let source_id = TableId::new(node.source_id); + let source_name = node.source_name.clone(); + let source_info = node.get_info()?; + + let mut source_columns = node.columns.clone(); + + { + // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707 + // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key + if source_info.format() == FormatType::Upsert + && (source_info.row_encode() == PbEncodeType::Avro + || source_info.row_encode() == PbEncodeType::Protobuf) + { + let _ = source_columns.iter_mut().map(|c| { + let _ = c.column_desc.as_mut().map(|desc| { + let is_bytea = desc + .get_column_type() + .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32) + .unwrap(); + if desc.name == default_key_column_name_version_mapping( + &desc.version() + ) + && is_bytea + // the column is from a legacy version + && desc.version == ColumnDescVersion::Unspecified as i32 + { + desc.additional_column_type = AdditionalColumnType::Key as i32; + } + }); + }); + } + } + + { + // compatible code: handle legacy column `_rw_kafka_timestamp` + // the column is auto added for all kafka source to empower batch query on source + // solution: rewrite the column `additional_column_type` to Timestamp + + let _ = source_columns.iter_mut().map(|c| { + let _ = c.column_desc.as_mut().map(|desc| { + let is_timestamp = desc + .get_column_type() + .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32) + .unwrap(); + if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME + && is_timestamp + // the column is from a legacy version + && desc.version == ColumnDescVersion::Unspecified as i32 + { + desc.additional_column_type = AdditionalColumnType::Timestamp as i32; + } + }); + }); + } + + let source_desc_builder = SourceDescBuilder::new( + source_columns.clone(), + params.env.source_metrics(), + node.row_id_index.map(|x| x as _), + node.with_properties.clone(), + source_info.clone(), + params.env.connector_params(), + params.env.config().developer.connector_message_buffer_size, + // `pk_indices` is used to ensure that a message will be skipped instead of parsed + // with null pk when the pk column is missing. + // + // Currently pk_indices for source is always empty since pk information is not + // passed via `StreamSource` so null pk may be emitted to downstream. + // + // TODO: use the correct information to fill in pk_dicies. + // We should consdier add back the "pk_column_ids" field removed by #8841 in + // StreamSource + params.info.pk_indices.clone(), + ); + + let source_ctrl_opts = SourceCtrlOpts { + chunk_size: params.env.config().developer.chunk_size, + }; + + let source_column_ids: Vec<_> = source_columns + .iter() + .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id)) + .collect(); + + // FIXME: remove this. It's wrong + let state_table_handler = SourceStateTableHandler::from_table_catalog( + node.state_table.as_ref().unwrap(), + store.clone(), + ) + .await; + let backfill_state_table = BackfillStateTableHandler::from_table_catalog( + node.state_table.as_ref().unwrap(), + store.clone(), + ) + .await; + let stream_source_core = StreamSourceCore::new( + source_id, + source_name, + source_column_ids, + source_desc_builder, + state_table_handler, + ); + + let exec = KafkaBackfillExecutor::new( + params.actor_context.clone(), + params.info.clone(), + stream_source_core, + params.executor_stats.clone(), + // barrier_receiver, + system_params, + source_ctrl_opts.clone(), + params.env.connector_params(), + backfill_state_table, + ); + Ok(KafkaBackfillExecutorWrapper { inner: exec, input }.boxed()) + } +}