From 1b5e7d83bc69841c555ff12543f5617c45194e4e Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Thu, 9 May 2024 15:35:59 +0800 Subject: [PATCH 1/3] feat: specify column when sink into table (#16587) Co-authored-by: Eric Fu --- .../sink/sink_into_table/specify_column.slt | 54 +++++++ src/frontend/src/catalog/table_catalog.rs | 14 ++ src/frontend/src/handler/create_mv.rs | 23 +-- src/frontend/src/handler/create_sink.rs | 142 +++++++++++------- .../src/optimizer/plan_node/stream_sink.rs | 1 + src/sqlparser/src/ast/statement.rs | 3 +- 6 files changed, 174 insertions(+), 63 deletions(-) create mode 100644 e2e_test/sink/sink_into_table/specify_column.slt diff --git a/e2e_test/sink/sink_into_table/specify_column.slt b/e2e_test/sink/sink_into_table/specify_column.slt new file mode 100644 index 0000000000000..2eefd8abf8fb0 --- /dev/null +++ b/e2e_test/sink/sink_into_table/specify_column.slt @@ -0,0 +1,54 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table s (a int, b int, c int) append only; + +statement ok +create table t (a int, b int default 900, c int default 9000); + +statement error +create sink ss into t(aaa) as select a from s with(type = 'append-only'); + +statement error +create sink ss into t(a) as select a, b from s with(type = 'append-only'); + +statement error +create sink ss into t(a, b) as select b from s with(type = 'append-only'); + +statement error +create sink ss into t(a, b, c, a) as select a, b from s with(type = 'append-only'); + +statement ok +create sink s1 into t(a,B,c) as select c, b, a from s with(type = 'append-only'); + +statement ok +create sink s2 into t(a,B) as select 2*c, 2*b from s with(type = 'append-only'); + +statement ok +create sink s3 into t(c) as select 3*a from s with(type = 'append-only'); + +statement ok +insert into s values(10, 100, 1000); + +query III rowsort +select * from t order by a; +---- +1000 100 10 +2000 200 9000 +NULL 900 30 + +statement ok +drop sink s1; + +statement ok +drop sink s2; + +statement ok +drop sink s3; + +statement ok +drop table s; + +statement ok +drop table t; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index f5f95861d29cd..9b02402571d0f 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -450,6 +450,20 @@ impl TableCatalog { .map(|(i, _)| i) } + pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl { + if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. 
})) = self + .columns[col_idx] + .column_desc + .generated_or_default_column + .as_ref() + { + ExprImpl::from_expr_proto(expr.as_ref().unwrap()) + .expect("expr in default columns corrupted") + } else { + ExprImpl::literal_null(self.columns[col_idx].data_type().clone()) + } + } + pub fn default_columns(&self) -> impl Iterator + '_ { self.columns.iter().enumerate().filter_map(|(i, c)| { if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 94ff4eebbc66e..ceaa92e06b71a 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -37,21 +37,24 @@ use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; +pub(super) fn parse_column_names(columns: &[Ident]) -> Option> { + if columns.is_empty() { + None + } else { + Some(columns.iter().map(|v| v.real_value()).collect()) + } +} + +/// If columns is empty, it means that the user did not specify the column names. +/// In this case, we extract the column names from the query. +/// If columns is not empty, it means that user specify the column names and the user +/// should guarantee that the column names number are consistent with the query. pub(super) fn get_column_names( bound: &BoundQuery, session: &SessionImpl, columns: Vec, ) -> Result>> { - // If columns is empty, it means that the user did not specify the column names. - // In this case, we extract the column names from the query. - // If columns is not empty, it means that user specify the column names and the user - // should guarantee that the column names number are consistent with the query. - let col_names: Option> = if columns.is_empty() { - None - } else { - Some(columns.iter().map(|v| v.real_value()).collect()) - }; - + let col_names = parse_column_names(&columns); if let BoundSetExpr::Select(select) = &bound.body { // `InputRef`'s alias will be implicitly assigned in `bind_project`. // If user provide columns name (col_names.is_some()), we don't need alias. diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f2d8cfb733bf8..c4c1b2ad0aace 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::rc::Rc; use std::sync::{Arc, LazyLock}; @@ -23,9 +23,8 @@ use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert}; -use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, TableId, UserId}; -use risingwave_common::types::Datum; -use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_common::catalog::{ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId}; +use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; @@ -34,7 +33,6 @@ use risingwave_connector::sink::{ }; use risingwave_pb::catalog::{PbSource, Table}; use risingwave_pb::ddl_service::ReplaceTablePlan; -use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; @@ -51,8 +49,9 @@ use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; use crate::catalog::source_catalog::SourceCatalog; use crate::error::{ErrorCode, Result, RwError}; -use crate::expr::{ExprImpl, InputRef, Literal}; +use crate::expr::{ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; +use crate::handler::create_mv::parse_column_names; use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; @@ -81,6 +80,7 @@ pub fn gen_sink_plan( stmt: CreateSinkStatement, partition_info: Option, ) -> Result { + let user_specified_columns = !stmt.columns.is_empty(); let db_name = session.database(); let (sink_schema_name, sink_table_name) = Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?; @@ -119,8 +119,12 @@ pub fn gen_sink_plan( let check_items = resolve_query_privileges(&bound); session.check_privileges(&check_items)?; - // If column names not specified, use the name in materialized view. - let col_names = get_column_names(&bound, session, stmt.columns)?; + let col_names = if sink_into_table_name.is_some() { + parse_column_names(&stmt.columns) + } else { + // If column names not specified, use the name in the bound query, which is equal with the plan root's original field name. + get_column_names(&bound, session, stmt.columns)? 
+ }; let mut with_options = context.with_options().clone(); @@ -173,8 +177,8 @@ pub fn gen_sink_plan( }; let mut plan_root = Planner::new(context).plan_query(bound)?; - if let Some(col_names) = col_names { - plan_root.set_out_names(col_names)?; + if let Some(col_names) = &col_names { + plan_root.set_out_names(col_names.clone())?; }; let without_backfill = match with_options.remove(SINK_WITHOUT_BACKFILL) { @@ -197,6 +201,25 @@ pub fn gen_sink_plan( .map(|table_name| fetch_table_catalog_for_alter(session, table_name)) .transpose()?; + if let Some(target_table_catalog) = &target_table_catalog { + if let Some(col_names) = col_names { + let target_table_columns = target_table_catalog + .columns() + .iter() + .map(|c| c.name()) + .collect::>(); + for c in col_names { + if !target_table_columns.contains(c.as_str()) { + return Err(RwError::from(ErrorCode::BindError(format!( + "Column {} not found in table {}", + c, + target_table_catalog.name() + )))); + } + } + } + } + let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id()); let sink_plan = plan_root.gen_sink_plan( @@ -252,7 +275,12 @@ pub fn gen_sink_plan( ))); } - let exprs = derive_default_column_project_for_sink(&sink_catalog, table_catalog)?; + let exprs = derive_default_column_project_for_sink( + &sink_catalog, + sink_plan.schema(), + table_catalog, + user_specified_columns, + )?; let logical_project = generic::Project::new(exprs, sink_plan); @@ -633,66 +661,78 @@ pub(crate) fn insert_merger_to_union(node: &mut StreamNode) { insert_merger_to_union(input); } } + +fn derive_sink_to_table_expr( + sink_schema: &Schema, + idx: usize, + target_type: &DataType, +) -> Result { + let input_type = &sink_schema.fields()[idx].data_type; + + if target_type != input_type { + bail!( + "column type mismatch: {:?} vs {:?}", + target_type, + input_type + ); + } else { + Ok(ExprImpl::InputRef(Box::new(InputRef::new( + idx, + input_type.clone(), + )))) + } +} + fn derive_default_column_project_for_sink( sink: &SinkCatalog, + sink_schema: &Schema, target_table_catalog: &Arc, + user_specified_columns: bool, ) -> Result> { + assert_eq!(sink.full_schema().len(), sink_schema.len()); + let mut exprs = vec![]; - let sink_visible_columns = sink + let sink_visible_col_idxes = sink .full_columns() .iter() - .enumerate() - .filter(|(_i, c)| !c.is_hidden()) + .positions(|c| !c.is_hidden()) .collect_vec(); + let sink_visible_col_idxes_by_name = sink + .full_columns() + .iter() + .enumerate() + .filter(|(_, c)| !c.is_hidden()) + .map(|(i, c)| (c.name(), i)) + .collect::>(); for (idx, table_column) in target_table_catalog.columns().iter().enumerate() { if table_column.is_generated() { continue; } - let data_type = table_column.data_type(); - - if idx < sink_visible_columns.len() { - let (sink_col_idx, sink_column) = sink_visible_columns[idx]; - - let sink_col_type = sink_column.data_type(); + let default_col_expr = || -> ExprImpl { target_table_catalog.default_column_expr(idx) }; + let sink_col_expr = |sink_col_idx: usize| -> Result { + derive_sink_to_table_expr(sink_schema, sink_col_idx, table_column.data_type()) + }; - if data_type != sink_col_type { - bail!( - "column type mismatch: {:?} vs {:?}", - data_type, - sink_col_type - ); + // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly. + // The missing columns will be filled with default value (`null` if not explicitly defined). + // Otherwhise, e.g. 
`CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table. + #[allow(clippy::collapsible_else_if)] + if user_specified_columns { + if let Some(idx) = sink_visible_col_idxes_by_name.get(table_column.name()) { + exprs.push(sink_col_expr(*idx)?); } else { - exprs.push(ExprImpl::InputRef(Box::new(InputRef::new( - sink_col_idx, - data_type.clone(), - )))); + exprs.push(default_col_expr()); } } else { - let data = match table_column - .column_desc - .generated_or_default_column - .as_ref() - { - // default column with default value - Some(GeneratedOrDefaultColumn::DefaultColumn(default_column)) => { - Datum::from_protobuf(default_column.get_snapshot_value().unwrap(), data_type) - .unwrap() - } - // default column with no default value - None => None, - - // generated column is unreachable - _ => unreachable!(), + if idx < sink_visible_col_idxes.len() { + exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?); + } else { + exprs.push(default_col_expr()); }; - - exprs.push(ExprImpl::Literal(Box::new(Literal::new( - data, - data_type.clone(), - )))); - }; + } } Ok(exprs) } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 0e3c1683009d3..ff8f12b49db0b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -180,6 +180,7 @@ impl StreamSink { .into_stream() .expect("input should be stream plan") .clone_with_new_plan_id(); + Self { base, input, diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 3bc99fe6e9482..24950f19aadb9 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -498,7 +498,6 @@ impl fmt::Display for CreateSink { } } } - // sql_grammar!(CreateSinkStatement { // if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], // sink_name: Ident, @@ -543,7 +542,7 @@ impl ParseTo for CreateSinkStatement { p.expected("FROM or AS after CREATE SINK sink_name", p.peek_token())? }; - let emit_mode = p.parse_emit_mode()?; + let emit_mode: Option = p.parse_emit_mode()?; // This check cannot be put into the `WithProperties::parse_to`, since other // statements may not need the with properties. 
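
Note on the column-specification semantics added by this patch (a minimal sketch that reuses the tables from the e2e test above): the column list in `INTO t(...)` names the query's output columns in order, each target-table column is then filled from the sink column with the matching name, and any target column not listed falls back to its DEFAULT (or NULL).

    create table s (a int, b int, c int) append only;
    create table t (a int, b int default 900, c int default 9000);
    -- t.a takes 2*c, t.b takes 2*b; t.c is not listed, so it takes its DEFAULT (9000).
    create sink s2 into t(a, b) as select 2*c, 2*b from s with (type = 'append-only');
    -- Inserting (10, 100, 1000) into s therefore produces the row (2000, 200, 9000) in t.
    insert into s values (10, 100, 1000);
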
From d4722387ed889a13ff23c081a899d7b0e6d4ed7a Mon Sep 17 00:00:00 2001 From: ShenJiawei <1648645367@qq.com> Date: Thu, 9 May 2024 15:43:25 +0800 Subject: [PATCH 2/3] feat: add pipeline auto-update Helm Charts and Operator on New Release (#16642) --- ...e-helm-and-operator-version-by-release.yml | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 .github/workflows/auto-update-helm-and-operator-version-by-release.yml diff --git a/.github/workflows/auto-update-helm-and-operator-version-by-release.yml b/.github/workflows/auto-update-helm-and-operator-version-by-release.yml new file mode 100644 index 0000000000000..1641caa3301fd --- /dev/null +++ b/.github/workflows/auto-update-helm-and-operator-version-by-release.yml @@ -0,0 +1,71 @@ +name: Update Helm Charts and Risingwave Operator on New Release + +on: + release: + types: [published] + workflow_dispatch: + inputs: + version: + description: 'release version' + required: true + +env: + NEW_APP_VERSION: ${{ github.event.inputs.version || github.event.release.tag_name }} + +jobs: + update-helm-charts: + runs-on: ubuntu-latest + steps: + - name: Checkout Helm Charts Repository + uses: actions/checkout@v3 + with: + repository: 'risingwavelabs/helm-charts' + token: ${{ secrets.GITHUB_TOKEN }} + path: 'helm-charts' + + - name: Update values.yaml + run: | + sed -i "s/^ tag:.*/ tag: \"${{ env.NEW_APP_VERSION }}\"/" helm-charts/charts/risingwave/values.yaml + + - name: Update Chart.yaml + run: | + cd helm-charts/charts/risingwave + CURRENT_VERSION=$(grep 'version:' Chart.yaml | awk '{print $2}' | head -n 1) + NEW_VERSION=$(echo $CURRENT_VERSION | awk -F. -v OFS='.' '{$NF++; print}') + sed -i "/type: application/,/version:/!b; /version:/s/version: .*/version: $NEW_VERSION/" Chart.yaml + sed -i "s/^appVersion: .*/appVersion: \"${{ env.NEW_APP_VERSION }}\"/" Chart.yaml + echo "NEW_CHART_VERSION=$NEW_VERSION" >> $GITHUB_ENV + + - name: Create Pull Request + uses: peter-evans/create-pull-request@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + commit-message: 'chore: bump risingwave to ${{ env.NEW_APP_VERSION }}, release chart ${{ env.NEW_CHART_VERSION }}' + title: 'chore: bump risingwave to ${{ env.NEW_APP_VERSION }}, release chart ${{ env.NEW_CHART_VERSION }}' + body: 'This is an automated pull request to update the chart versions' + branch: 'auto-update-${{ env.NEW_APP_VERSION }}' + + update-risingwave-operator: + runs-on: ubuntu-latest + steps: + - name: Checkout Risingwave Operator Repository + uses: actions/checkout@v3 + with: + repository: 'risingwavelabs/risingwave-operator' + token: ${{ secrets.GITHUB_TOKEN }} + path: 'risingwave-operator' + + - name: Update risingwave-operator image tags + run: | + cd risingwave-operator + PREV_VERSION=$(grep -roh "risingwavelabs/risingwave:v[0-9\.]*" * | head -n 1 | cut -d':' -f2) + grep -rl "risingwavelabs/risingwave:$PREV_VERSION" . 
| xargs sed -i "s|risingwavelabs/risingwave:$PREV_VERSION|risingwavelabs/risingwave:${{ env.NEW_APP_VERSION }}|g" + + - name: Create Pull Request for risingwave-operator + uses: peter-evans/create-pull-request@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + commit-message: 'chore: bump risingwave image tags to ${{ env.NEW_APP_VERSION }}' + title: 'chore: bump risingwave image tags to ${{ env.NEW_APP_VERSION }}' + body: 'This is an automated pull request to update the risingwave image tags' + branch: 'auto-update-${{ env.NEW_APP_VERSION }}' From cb8a92a92b400e0247a4b25df87e3396992b0546 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 9 May 2024 15:49:56 +0800 Subject: [PATCH 3/3] fix: fix the inconsistency issue with the two-stage write to the meta store for sink into table (#16305) --- src/meta/src/barrier/recovery.rs | 2 +- src/meta/src/controller/catalog.rs | 149 +----------- src/meta/src/controller/streaming_job.rs | 281 ++++++++++++++++++++--- src/meta/src/manager/catalog/fragment.rs | 63 ++--- src/meta/src/rpc/ddl_controller_v2.rs | 22 +- 5 files changed, 276 insertions(+), 241 deletions(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 0f21ea956858e..f17b4e163901c 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -285,7 +285,7 @@ impl GlobalBarrierManagerContext { tracing::debug!("recovering stream job {}", id); finished.await.ok().context("failed to finish command")??; tracing::debug!(id, "finished stream job"); - catalog_controller.finish_streaming_job(id).await?; + catalog_controller.finish_streaming_job(id, None).await?; }; if let Err(e) = &res { tracing::error!( diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 481331163c285..af1853aec366d 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -64,9 +64,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - get_fragment_mappings, get_fragment_mappings_by_jobs, get_referring_objects, - get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, - resolve_source_register_info_for_jobs, PartialObject, + get_fragment_mappings_by_jobs, get_referring_objects, get_referring_objects_cascade, + get_user_privilege, list_user_info_by_ids, resolve_source_register_info_for_jobs, + PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -926,149 +926,6 @@ impl CatalogController { Ok(true) } - /// `finish_streaming_job` marks job related objects as `Created` and notify frontend. - pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult { - let inner = self.inner.write().await; - let txn = inner.db.begin().await?; - - let job_type = Object::find_by_id(job_id) - .select_only() - .column(object::Column::ObjType) - .into_tuple() - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?; - - // update `created_at` as now() and `created_at_cluster_version` as current cluster version. 
- let res = Object::update_many() - .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into()) - .col_expr( - object::Column::CreatedAtClusterVersion, - current_cluster_version().into(), - ) - .filter(object::Column::Oid.eq(job_id)) - .exec(&txn) - .await?; - if res.rows_affected == 0 { - return Err(MetaError::catalog_id_not_found("streaming job", job_id)); - } - - // mark the target stream job as `Created`. - let job = streaming_job::ActiveModel { - job_id: Set(job_id), - job_status: Set(JobStatus::Created), - ..Default::default() - }; - job.update(&txn).await?; - - // notify frontend: job, internal tables. - let internal_table_objs = Table::find() - .find_also_related(Object) - .filter(table::Column::BelongsToJobId.eq(job_id)) - .all(&txn) - .await?; - let mut relations = internal_table_objs - .iter() - .map(|(table, obj)| PbRelation { - relation_info: Some(PbRelationInfo::Table( - ObjectModel(table.clone(), obj.clone().unwrap()).into(), - )), - }) - .collect_vec(); - - match job_type { - ObjectType::Table => { - let (table, obj) = Table::find_by_id(job_id) - .find_also_related(Object) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?; - if let Some(source_id) = table.optional_associated_source_id { - let (src, obj) = Source::find_by_id(source_id) - .find_also_related(Object) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?; - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Source( - ObjectModel(src, obj.unwrap()).into(), - )), - }); - } - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Table( - ObjectModel(table, obj.unwrap()).into(), - )), - }); - } - ObjectType::Sink => { - let (sink, obj) = Sink::find_by_id(job_id) - .find_also_related(Object) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?; - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Sink( - ObjectModel(sink, obj.unwrap()).into(), - )), - }); - } - ObjectType::Index => { - let (index, obj) = Index::find_by_id(job_id) - .find_also_related(Object) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?; - { - let (table, obj) = Table::find_by_id(index.index_table_id) - .find_also_related(Object) - .one(&txn) - .await? - .ok_or_else(|| { - MetaError::catalog_id_not_found("table", index.index_table_id) - })?; - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Table( - ObjectModel(table, obj.unwrap()).into(), - )), - }); - } - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Index( - ObjectModel(index, obj.unwrap()).into(), - )), - }); - } - ObjectType::Source => { - let (source, obj) = Source::find_by_id(job_id) - .find_also_related(Object) - .one(&txn) - .await? 
- .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?; - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Source( - ObjectModel(source, obj.unwrap()).into(), - )), - }); - } - _ => unreachable!("invalid job type: {:?}", job_type), - } - - let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; - txn.commit().await?; - - self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) - .await; - let version = self - .notify_frontend( - NotificationOperation::Add, - NotificationInfo::RelationGroup(PbRelationGroup { relations }), - ) - .await; - - Ok(version) - } - pub async fn create_source( &self, mut pb_source: PbSource, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 8b7c5281a9268..c501c14252ffe 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -16,11 +16,11 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; use itertools::Itertools; -use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; +use risingwave_common::{bail, current_cluster_version}; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::actor_dispatcher::DispatcherType; use risingwave_meta_model_v2::object::ObjectType; @@ -43,16 +43,17 @@ use risingwave_pb::meta::subscribe_response::{ }; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbRelation, PbRelationGroup, PbTableFragments, + FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbRelation, PbRelationGroup, + PbTableFragments, Relation, }; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; +use risingwave_pb::stream_plan::update_mutation::{MergeUpdate, PbMergeUpdate}; use risingwave_pb::stream_plan::{ PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, }; -use sea_orm::sea_query::SimpleExpr; +use sea_orm::sea_query::{Expr, SimpleExpr}; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, @@ -572,6 +573,186 @@ impl CatalogController { Ok(obj_id) } + /// `finish_streaming_job` marks job related objects as `Created` and notify frontend. + pub async fn finish_streaming_job( + &self, + job_id: ObjectId, + replace_table_job_info: Option<(crate::manager::StreamingJob, Vec, u32)>, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + let job_type = Object::find_by_id(job_id) + .select_only() + .column(object::Column::ObjType) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?; + + // update `created_at` as now() and `created_at_cluster_version` as current cluster version. 
+ let res = Object::update_many() + .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into()) + .col_expr( + object::Column::CreatedAtClusterVersion, + current_cluster_version().into(), + ) + .filter(object::Column::Oid.eq(job_id)) + .exec(&txn) + .await?; + if res.rows_affected == 0 { + return Err(MetaError::catalog_id_not_found("streaming job", job_id)); + } + + // mark the target stream job as `Created`. + let job = streaming_job::ActiveModel { + job_id: Set(job_id), + job_status: Set(JobStatus::Created), + ..Default::default() + }; + job.update(&txn).await?; + + // notify frontend: job, internal tables. + let internal_table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::BelongsToJobId.eq(job_id)) + .all(&txn) + .await?; + let mut relations = internal_table_objs + .iter() + .map(|(table, obj)| PbRelation { + relation_info: Some(PbRelationInfo::Table( + ObjectModel(table.clone(), obj.clone().unwrap()).into(), + )), + }) + .collect_vec(); + + match job_type { + ObjectType::Table => { + let (table, obj) = Table::find_by_id(job_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?; + if let Some(source_id) = table.optional_associated_source_id { + let (src, obj) = Source::find_by_id(source_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Source( + ObjectModel(src, obj.unwrap()).into(), + )), + }); + } + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Table( + ObjectModel(table, obj.unwrap()).into(), + )), + }); + } + ObjectType::Sink => { + let (sink, obj) = Sink::find_by_id(job_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Sink( + ObjectModel(sink, obj.unwrap()).into(), + )), + }); + } + ObjectType::Index => { + let (index, obj) = Index::find_by_id(job_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?; + { + let (table, obj) = Table::find_by_id(index.index_table_id) + .find_also_related(Object) + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found("table", index.index_table_id) + })?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Table( + ObjectModel(table, obj.unwrap()).into(), + )), + }); + } + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Index( + ObjectModel(index, obj.unwrap()).into(), + )), + }); + } + ObjectType::Source => { + let (source, obj) = Source::find_by_id(job_id) + .find_also_related(Object) + .one(&txn) + .await? 
+ .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Source( + ObjectModel(source, obj.unwrap()).into(), + )), + }); + } + _ => unreachable!("invalid job type: {:?}", job_type), + } + + let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; + + let replace_table_mapping_update = match replace_table_job_info { + Some((streaming_job, merge_updates, dummy_id)) => { + let incoming_sink_id = job_id; + + let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner( + dummy_id as ObjectId, + merge_updates, + None, + Some(incoming_sink_id as _), + None, + &txn, + streaming_job, + ) + .await?; + + Some((relations, fragment_mapping)) + } + None => None, + }; + + txn.commit().await?; + + self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) + .await; + + let mut version = self + .notify_frontend( + NotificationOperation::Add, + NotificationInfo::RelationGroup(PbRelationGroup { relations }), + ) + .await; + + if let Some((relations, fragment_mapping)) = replace_table_mapping_update { + self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) + .await; + version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { relations }), + ) + .await; + } + + Ok(version) + } + pub async fn finish_replace_streaming_job( &self, dummy_id: ObjectId, @@ -581,13 +762,53 @@ impl CatalogController { creating_sink_id: Option, dropping_sink_id: Option, ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + let (relations, fragment_mapping) = Self::finish_replace_streaming_job_inner( + dummy_id, + merge_updates, + table_col_index_mapping, + creating_sink_id, + dropping_sink_id, + &txn, + streaming_job, + ) + .await?; + + txn.commit().await?; + + // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table + // catalog and need to access the old fragment. Let frontend nodes delete the old fragment + // when they receive table catalog change. + // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings) + // .await; + self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) + .await; + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { relations }), + ) + .await; + + Ok(version) + } + + pub async fn finish_replace_streaming_job_inner( + dummy_id: ObjectId, + merge_updates: Vec, + table_col_index_mapping: Option, + creating_sink_id: Option, + dropping_sink_id: Option, + txn: &DatabaseTransaction, + streaming_job: StreamingJob, + ) -> MetaResult<(Vec, Vec)> { // Question: The source catalog should be remain unchanged? let StreamingJob::Table(_, table, ..) = streaming_job else { unreachable!("unexpected job: {streaming_job:?}") }; - let inner = self.inner.write().await; - let txn = inner.db.begin().await?; let job_id = table.id as ObjectId; let mut table = table::ActiveModel::from(table); @@ -605,7 +826,7 @@ impl CatalogController { } table.incoming_sinks = Set(incoming_sinks.into()); - let table = table.update(&txn).await?; + let table = table.update(txn).await?; // Update state table fragment id. 
let fragment_table_ids: Vec<(FragmentId, I32Array)> = Fragment::find() @@ -616,7 +837,7 @@ impl CatalogController { ]) .filter(fragment::Column::JobId.eq(dummy_id)) .into_tuple() - .all(&txn) + .all(txn) .await?; for (fragment_id, state_table_ids) in fragment_table_ids { for state_table_id in state_table_ids.into_inner() { @@ -625,7 +846,7 @@ impl CatalogController { fragment_id: Set(Some(fragment_id)), ..Default::default() } - .update(&txn) + .update(txn) .await?; } } @@ -634,12 +855,12 @@ impl CatalogController { // 1. replace old fragments/actors with new ones. Fragment::delete_many() .filter(fragment::Column::JobId.eq(job_id)) - .exec(&txn) + .exec(txn) .await?; Fragment::update_many() .col_expr(fragment::Column::JobId, SimpleExpr::from(job_id)) .filter(fragment::Column::JobId.eq(dummy_id)) - .exec(&txn) + .exec(txn) .await?; // 2. update merges. @@ -670,7 +891,7 @@ impl CatalogController { actor::Column::UpstreamActorIds, ]) .into_tuple::<(ActorId, FragmentId, ActorUpstreamActors)>() - .one(&txn) + .one(txn) .await? .ok_or_else(|| { MetaError::catalog_id_not_found("actor", merge_update.actor_id) @@ -693,7 +914,7 @@ impl CatalogController { upstream_actor_ids: Set(upstream_actors), ..Default::default() } - .update(&txn) + .update(txn) .await?; to_update_fragment_ids.insert(fragment_id); @@ -708,7 +929,7 @@ impl CatalogController { fragment::Column::UpstreamFragmentId, ]) .into_tuple::<(FragmentId, StreamNode, I32Array)>() - .one(&txn) + .one(txn) .await? .map(|(id, node, upstream)| (id, node.to_protobuf(), upstream)) .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; @@ -732,18 +953,18 @@ impl CatalogController { upstream_fragment_id: Set(upstream_fragment_id), ..Default::default() } - .update(&txn) + .update(txn) .await?; } // 3. remove dummy object. - Object::delete_by_id(dummy_id).exec(&txn).await?; + Object::delete_by_id(dummy_id).exec(txn).await?; // 4. update catalogs and notify. let mut relations = vec![]; let table_obj = table .find_related(Object) - .one(&txn) + .one(txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?; relations.push(PbRelation { @@ -759,7 +980,7 @@ impl CatalogController { .columns([index::Column::IndexId, index::Column::IndexItems]) .filter(index::Column::PrimaryTableId.eq(job_id)) .into_tuple() - .all(&txn) + .all(txn) .await?; for (index_id, nodes) in index_items { let mut pb_nodes = nodes.to_protobuf(); @@ -771,11 +992,11 @@ impl CatalogController { index_items: Set(pb_nodes.into()), ..Default::default() } - .update(&txn) + .update(txn) .await?; let index_obj = index .find_related(Object) - .one(&txn) + .one(txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?; relations.push(PbRelation { @@ -785,25 +1006,11 @@ impl CatalogController { }); } } - let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; - txn.commit().await?; - - // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table - // catalog and need to access the old fragment. Let frontend nodes delete the old fragment - // when they receive table catalog change. 
- // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings) - // .await; - self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) - .await; - let version = self - .notify_frontend( - NotificationOperation::Update, - NotificationInfo::RelationGroup(PbRelationGroup { relations }), - ) - .await; + let fragment_mapping: Vec = + get_fragment_mappings(txn, job_id as _).await?; - Ok(version) + Ok((relations, fragment_mapping)) } /// `try_abort_replacing_streaming_job` is used to abort the replacing streaming job, the input `job_id` is the dummy job id. diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index c10a65eea4195..c52598875d4f2 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -561,7 +561,6 @@ impl FragmentManager { .filter_map(|table_id| map.get(table_id).cloned()) .collect_vec(); - let mut dirty_sink_into_table_upstream_fragment_id = HashSet::new(); let mut table_fragments = BTreeMapTransaction::new(map); let mut table_ids_to_unregister_from_hummock = vec![]; for table_fragment in &to_delete_table_fragments { @@ -596,32 +595,14 @@ impl FragmentManager { }) }); } - - if let Some(sink_fragment) = table_fragment.sink_fragment() { - let dispatchers = sink_fragment - .get_actors() - .iter() - .map(|actor| actor.get_dispatcher()) - .collect_vec(); - - if !dispatchers.is_empty() { - dirty_sink_into_table_upstream_fragment_id.insert(sink_fragment.fragment_id); - } - } } - if !dirty_sink_into_table_upstream_fragment_id.is_empty() { - let to_delete_table_ids: HashSet<_> = to_delete_table_fragments - .iter() - .map(|table| table.table_id()) - .collect(); - - Self::clean_dirty_table_sink_downstreams( - dirty_sink_into_table_upstream_fragment_id, - to_delete_table_ids, - &mut table_fragments, - )?; - } + let to_delete_table_ids: HashSet<_> = to_delete_table_fragments + .iter() + .map(|table| table.table_id()) + .collect(); + + Self::clean_dirty_table_sink_downstreams(to_delete_table_ids, &mut table_fragments)?; if table_ids.is_empty() { commit_meta!(self, table_fragments)?; @@ -647,13 +628,23 @@ impl FragmentManager { // but the union branch that attaches the downstream table to the sink fragment may still exist. // This could lead to issues. Therefore, we need to find the sink fragment’s downstream, then locate its union node and delete the dirty merge. 
fn clean_dirty_table_sink_downstreams( - dirty_sink_into_table_upstream_fragment_id: HashSet, to_delete_table_ids: HashSet, table_fragments: &mut BTreeMapTransaction<'_, TableId, TableFragments>, ) -> MetaResult<()> { tracing::info!("cleaning dirty downstream merge nodes for table sink"); + let mut all_fragment_ids = HashSet::new(); + + for (table_id, table_fragment) in table_fragments.tree_ref() { + if to_delete_table_ids.contains(table_id) { + continue; + } + + all_fragment_ids.extend(table_fragment.fragment_ids()); + } + let mut dirty_downstream_table_ids = HashMap::new(); + for (table_id, table_fragment) in table_fragments.tree_mut() { if to_delete_table_ids.contains(table_id) { continue; @@ -663,9 +654,7 @@ impl FragmentManager { if fragment .get_upstream_fragment_ids() .iter() - .all(|upstream_fragment_id| { - !dirty_sink_into_table_upstream_fragment_id.contains(upstream_fragment_id) - }) + .all(|upstream_fragment_id| all_fragment_ids.contains(upstream_fragment_id)) { continue; } @@ -675,8 +664,7 @@ impl FragmentManager { if let Some(NodeBody::Union(_)) = node.node_body { for input in &mut node.input { if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body - && dirty_sink_into_table_upstream_fragment_id - .contains(&merge_node.upstream_fragment_id) + && !all_fragment_ids.contains(&merge_node.upstream_fragment_id) { dirty_downstream_table_ids .insert(*table_id, fragment.fragment_id); @@ -687,12 +675,6 @@ impl FragmentManager { true }) } - - fragment - .upstream_fragment_ids - .retain(|upstream_fragment_id| { - !dirty_sink_into_table_upstream_fragment_id.contains(upstream_fragment_id) - }); } } @@ -706,13 +688,16 @@ impl FragmentManager { .get_mut(&fragment_id) .with_context(|| format!("fragment not exist: id={}", fragment_id))?; + fragment + .upstream_fragment_ids + .retain(|upstream_fragment_id| all_fragment_ids.contains(upstream_fragment_id)); + for actor in &mut fragment.actors { visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| { if let Some(NodeBody::Union(_)) = node.node_body { node.input.retain_mut(|input| { if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body - && dirty_sink_into_table_upstream_fragment_id - .contains(&merge_node.upstream_fragment_id) + && !all_fragment_ids.contains(&merge_node.upstream_fragment_id) { false } else { diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 3e948e88e2821..13ecfca13c782 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -214,7 +214,7 @@ impl DdlController { ( streaming_job.clone(), ctx.merge_updates.clone(), - table_fragments.table_id(), + table_fragments.table_id().table_id(), ) }, ); @@ -223,25 +223,11 @@ impl DdlController { .create_streaming_job(table_fragments, ctx) .await?; - let mut version = mgr + let version = mgr .catalog_controller - .finish_streaming_job(stream_job_id as _) + .finish_streaming_job(stream_job_id as _, replace_table_job_info) .await?; - if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info { - version = mgr - .catalog_controller - .finish_replace_streaming_job( - table_id.table_id as _, - streaming_job, - merge_updates, - None, - Some(stream_job_id), - None, - ) - .await?; - } - Ok(version) } (CreateType::Background, _) => { @@ -257,7 +243,7 @@ impl DdlController { if result.is_ok() { let _ = mgr .catalog_controller - .finish_streaming_job(stream_job_id as _) + .finish_streaming_job(stream_job_id as _, None) .await.inspect_err(|err| { tracing::error!(id = 
stream_job_id, error = ?err.as_report(), "failed to finish background streaming job"); });
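
Note on the code path covered by this fix (a sketch with hypothetical object names, reusing the sink-into-table syntax from the first patch): creating a sink into an existing table both registers the sink catalog and replaces the target table's streaming plan. With this change, both updates are committed to the meta store inside a single `finish_streaming_job` transaction instead of the previous two-stage write, so a failure between the two stages can no longer leave the catalog and fragments inconsistent.

    create table target_t (v int);
    create table src (v int) append only;
    -- Registering the sink and rewriting target_t's streaming plan now commit atomically on the meta node.
    create sink s_into_t into target_t as select v from src with (type = 'append-only');
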