diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table.slt new file mode 100644 index 0000000000000..b7ebc69a458fa --- /dev/null +++ b/e2e_test/sink/sink_into_table.slt @@ -0,0 +1,365 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# currently, we only support single sink into table + +statement ok +create table t_simple (v1 int, v2 int); + +statement ok +create table m_simple (v1 int primary key, v2 int); + +statement ok +create sink s_simple_1 into m_simple as select v1, v2 from t_simple; + +statement error Feature is not yet implemented: create sink into table with incoming sinks +create sink s_simple_2 into m_simple as select v1, v2 from t_simple; + +# and we can't alter a table with incoming sinks +statement error Feature is not yet implemented: alter table with incoming sinks +alter table m_simple add column v3 int; + +statement ok +drop sink s_simple_1; + +statement ok +drop table t_simple; + +statement ok +drop table m_simple; + +# drop table with associated sink +statement ok +create table t_simple (v1 int, v2 int); + +statement ok +create table m_simple (v1 int primary key, v2 int); + +statement ok +create sink s_simple_1 into m_simple as select v1, v2 from t_simple; + +statement ok +drop table m_simple; + +statement ok +drop table t_simple; + +# target table with row_id as primary key +statement ok +create table t_s1 (v1 int, v2 int); + +statement ok +insert into t_s1 values (1, 11), (2, 12), (3, 13); + +statement ok +create table t_row_id_as_primary_key (v1 int, v2 int, v3 int default 1000, v4 int as v1 + v2); + +statement error Only append-only sinks can sink to a table without primary keys. +create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1; + +statement ok +create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1 with (type = 'append-only', force_append_only = 'true'); + +statement ok +flush; + +query IIII rowsort +select * from t_row_id_as_primary_key; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_s1 values (4, 14), (5, 15), (6, 16); + +query IIII rowsort +select * from t_row_id_as_primary_key; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 + +statement ok +insert into t_row_id_as_primary_key values (100, 100); + +query IIII +select * from t_row_id_as_primary_key order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 +100 100 1000 200 + +# test append only +statement ok +update t_s1 set v2 = 10 where v1 > 3; + +query IIII +select * from t_row_id_as_primary_key order by v1, v2; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 10 1000 14 +4 14 1000 18 +5 10 1000 15 +5 15 1000 20 +6 10 1000 16 +6 16 1000 22 +100 100 1000 200 + +statement ok +delete from t_s1; + +query IIII +select * from t_row_id_as_primary_key order by v1,v2; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 10 1000 14 +4 14 1000 18 +5 10 1000 15 +5 15 1000 20 +6 10 1000 16 +6 16 1000 22 +100 100 1000 200 + +statement ok +drop sink s1; + +statement ok +drop table t_row_id_as_primary_key; + +statement ok +drop table t_s1; + + +# target table with append only + +statement ok +create table t_s2 (v1 int, v2 int); + +statement ok +insert into t_s2 values (1, 11), (2, 12), (3, 13); + +statement ok +create table t_append_only (v1 int, v2 int, v3 int default 1000, v4 int as v1 + v2) append only; + +statement error Only append-only sinks can sink to a table without primary keys. +create sink s2 into t_append_only as select v1, v2 from t_s2; + +statement ok +create sink s2 into t_append_only as select v1, v2 from t_s2 with (type = 'append-only', force_append_only = 'true'); + +statement ok +flush; + +query IIII rowsort +select * from t_append_only; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_s2 values (4, 14), (5, 15), (6, 16); + +query IIII rowsort +select * from t_append_only; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 + +statement ok +insert into t_append_only values (100, 100); + +query IIII +select * from t_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 +100 100 1000 200 + +# test append only +statement ok +update t_s2 set v2 = 10 where v1 > 3; + +query IIII +select * from t_append_only order by v1, v2; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 10 1000 14 +4 14 1000 18 +5 10 1000 15 +5 15 1000 20 +6 10 1000 16 +6 16 1000 22 +100 100 1000 200 + +statement ok +delete from t_s2; + +query IIII +select * from t_append_only order by v1,v2; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 10 1000 14 +4 14 1000 18 +5 10 1000 15 +5 15 1000 20 +6 10 1000 16 +6 16 1000 22 +100 100 1000 200 + +statement ok +drop sink s2; + +statement ok +drop table t_append_only; + +statement ok +drop table t_s2; + + +# target table with primary key + +statement ok +create table t_s3 (v1 int, v2 int); + +statement ok +insert into t_s3 values (1, 11), (2, 12), (3, 13); + +statement ok +create table t_primary_key (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2); + +statement ok +create sink s3 into t_primary_key as select v1, v2 from t_s3; + +statement ok +flush; + +query IIII rowsort +select * from t_primary_key; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_s3 values (4, 14), (5, 15), (6, 16); + +query IIII rowsort +select * from t_primary_key; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 + +statement ok +delete from t_s3 where v1 > 3; + +query IIII rowsort +select * from t_primary_key; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +update t_s3 set v2 = 111 where v1 = 1; + +query IIII rowsort +select * from t_primary_key; +---- +1 111 1000 112 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_primary_key values (100, 100); + +query IIII +select * from t_primary_key order by v1; +---- +1 111 1000 112 +2 12 1000 14 +3 13 1000 16 +100 100 1000 200 + +statement ok +delete from t_s3 where v1 > 3; + +query IIII +select * from t_primary_key order by v1; +---- +1 111 1000 112 +2 12 1000 14 +3 13 1000 16 +100 100 1000 200 + +statement ok +drop sink s3; + +statement ok +drop table t_primary_key; + +statement ok +drop table t_s3; + +# cycle check + +statement ok +create table t_a(v int primary key); + +statement ok +create table t_b(v int primary key); + +statement ok +create table t_c(v int primary key); + +statement ok +create sink s_a into t_b as select v from t_a; + +statement ok +create sink s_b into t_c as select v from t_b; + +statement error Creating such a sink will result in circular dependency +create sink s_c into t_a as select v from t_c; + +statement ok +drop sink s_a; + +statement ok +drop sink s_b; + +statement ok +drop table t_a; + +statement ok +drop table t_b; + +statement ok +drop table t_c; diff --git a/proto/catalog.proto b/proto/catalog.proto index 99e3e7539443e..cbcf4e31182c5 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -144,6 +144,9 @@ message Sink { string sink_from_name = 18; StreamJobStatus stream_job_status = 19; SinkFormatDesc format_desc = 20; + + // Target table id (only applicable for table sink) + optional uint32 target_table = 21; } message Connection { @@ -287,6 +290,9 @@ message Table { // This field is used to store the description set by the `comment on` clause. optional string description = 33; + // This field is used to mark the the sink into this table. + repeated uint32 incoming_sinks = 34; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/proto/connector_service.proto b/proto/connector_service.proto index a887848fde7e8..2ff8dec81e27c 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -27,6 +27,7 @@ message SinkParam { string db_name = 5; string sink_from_name = 6; catalog.SinkFormatDesc format_desc = 7; + optional uint32 target_table = 8; } enum SinkPayloadFormat { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 994af9c092bb0..b613eb4205b86 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -80,6 +80,8 @@ message AlterSourceResponse { message CreateSinkRequest { catalog.Sink sink = 1; stream_plan.StreamFragmentGraph fragment_graph = 2; + // It is used to provide a replace plan for the downstream table in `create sink into table` requests. + optional ReplaceTablePlan affected_table_change = 3; } message CreateSinkResponse { @@ -91,6 +93,7 @@ message CreateSinkResponse { message DropSinkRequest { uint32 sink_id = 1; bool cascade = 2; + optional ReplaceTablePlan affected_table_change = 3; } message DropSinkResponse { @@ -280,7 +283,7 @@ message DropIndexResponse { uint64 version = 2; } -message ReplaceTablePlanRequest { +message ReplaceTablePlan { // The new table catalog, with the correct table ID and a new version. // If the new version does not match the subsequent version in the meta service's // catalog, this request will be rejected. @@ -288,11 +291,16 @@ message ReplaceTablePlanRequest { // The new materialization plan, where all schema are updated. stream_plan.StreamFragmentGraph fragment_graph = 2; // The mapping from the old columns to the new columns of the table. + // If no column modifications occur (such as for sinking into table), this will be None. catalog.ColIndexMapping table_col_index_mapping = 3; // Source catalog of table's associated source catalog.Source source = 4; } +message ReplaceTablePlanRequest { + ReplaceTablePlan plan = 1; +} + message ReplaceTablePlanResponse { common.Status status = 1; // The new global catalog version. diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e610a37ae6bbd..990a69c2caae2 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -90,19 +90,11 @@ message ThrottleMutation { map actor_throttle = 1; } -message Barrier { - enum BarrierKind { - BARRIER_KIND_UNSPECIFIED = 0; - // The first barrier after a fresh start or recovery. - // There will be no data associated with the previous epoch of the barrier. - BARRIER_KIND_INITIAL = 1; - // A normal barrier. Data should be flushed locally. - BARRIER_KIND_BARRIER = 2; - // A checkpoint barrier. Data should be synchorized to the shared storage. - BARRIER_KIND_CHECKPOINT = 3; - } +message CombinedMutation { + repeated BarrierMutation mutations = 1; +} - data.Epoch epoch = 1; +message BarrierMutation { oneof mutation { // Add new dispatchers to some actors, used for creating materialized views. AddMutation add = 3; @@ -119,7 +111,27 @@ message Barrier { ResumeMutation resume = 8; // Throttle specific source exec or chain exec. ThrottleMutation throttle = 10; + // Combined mutation. + CombinedMutation combined = 100; + } +} + +message Barrier { + enum BarrierKind { + BARRIER_KIND_UNSPECIFIED = 0; + // The first barrier after a fresh start or recovery. + // There will be no data associated with the previous epoch of the barrier. + BARRIER_KIND_INITIAL = 1; + // A normal barrier. Data should be flushed locally. + BARRIER_KIND_BARRIER = 2; + // A checkpoint barrier. Data should be synchorized to the shared storage. + BARRIER_KIND_CHECKPOINT = 3; } + + data.Epoch epoch = 1; + + BarrierMutation mutation = 3; + // Used for tracing. map tracing_context = 2; // The kind of the barrier. @@ -210,6 +222,7 @@ message SinkDesc { // it is the name of the sink itself. string sink_from_name = 12; catalog.SinkFormatDesc format_desc = 13; + optional uint32 target_table = 14; } enum SinkLogStoreType { diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 973ccb1390704..34c8205c05f01 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -44,7 +44,7 @@ use risingwave_stream::executor::external::ExternalStorageTable; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::executor::test_utils::MockSource; use risingwave_stream::executor::{ - expect_first_barrier, ActorContext, Barrier, BoxedExecutor as StreamBoxedExecutor, + expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedExecutor as StreamBoxedExecutor, BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef, StreamExecutorError, }; @@ -306,12 +306,13 @@ async fn test_cdc_backfill() -> StreamResult<()> { _phantom: PhantomData, })], ); - let init_barrier = Barrier::new_test_barrier(curr_epoch).with_mutation(Mutation::Add { - adds: HashMap::new(), - added_actors: HashSet::new(), - splits, - pause: false, - }); + let init_barrier = + Barrier::new_test_barrier(curr_epoch).with_mutation(Mutation::Add(AddMutation { + adds: HashMap::new(), + added_actors: HashSet::new(), + splits, + pause: false, + })); tx.send_barrier(init_barrier); diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 1fdd8b44959e3..3fa82e36f976b 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -64,6 +64,9 @@ pub struct SinkDesc { /// Name of the "table" field for Debezium. If the sink is from table or mv, /// it is the name of table/mv. Otherwise, it is the name of the sink. pub sink_from_name: String, + + /// Id of the target table for sink into table. + pub target_table: Option, } impl SinkDesc { @@ -95,6 +98,7 @@ impl SinkDesc { initialized_at_epoch: None, db_name: self.db_name, sink_from_name: self.sink_from_name, + target_table: self.target_table, } } @@ -116,6 +120,7 @@ impl SinkDesc { format_desc: self.format_desc.as_ref().map(|f| f.to_proto()), db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), + target_table: self.target_table.map(|table_id| table_id.table_id()), } } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index ca3a09e7f2eda..55d450adc5ee2 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -296,6 +296,8 @@ pub struct SinkCatalog { /// Name for the table info for Debezium sink pub sink_from_name: String, + + pub target_table: Option, } impl SinkCatalog { @@ -333,6 +335,7 @@ impl SinkCatalog { db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), stream_job_status: PbStreamJobStatus::Creating.into(), + target_table: self.target_table.map(|table_id| table_id.table_id()), } } @@ -421,6 +424,7 @@ impl From for SinkCatalog { initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from), db_name: pb.db_name, sink_from_name: pb.sink_from_name, + target_table: pb.target_table.map(TableId::new), } } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 05e412cf50b3d..795e9f73b52cf 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -31,6 +31,7 @@ pub mod pulsar; pub mod redis; pub mod remote; pub mod starrocks; +pub mod table; pub mod test_sink; pub mod utils; pub mod writer; @@ -42,7 +43,7 @@ use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{ColumnDesc, Field, Schema}; +use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableId}; use risingwave_common::error::{anyhow_error, ErrorCode, RwError}; use risingwave_common::metrics::{ LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, @@ -58,6 +59,7 @@ use self::catalog::{SinkFormatDesc, SinkType}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::log_store::LogReader; +use crate::sink::table::TABLE_SINK; use crate::sink::writer::SinkWriter; use crate::ConnectorParams; @@ -83,7 +85,8 @@ macro_rules! for_all_sinks { { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, - { Test, $crate::sink::test_sink::TestSink } + { Test, $crate::sink::test_sink::TestSink }, + { Table, $crate::sink::table::TableSink } } $(,$arg)* } @@ -144,6 +147,7 @@ pub struct SinkParam { pub format_desc: Option, pub db_name: String, pub sink_from_name: String, + pub target_table: Option, } impl SinkParam { @@ -175,6 +179,7 @@ impl SinkParam { format_desc, db_name: pb_param.db_name, sink_from_name: pb_param.sink_from_name, + target_table: pb_param.target_table.map(TableId::new), } } @@ -190,6 +195,7 @@ impl SinkParam { format_desc: self.format_desc.as_ref().map(|f| f.to_proto()), db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), + target_table: self.target_table.map(|table_id| table_id.table_id()), } } @@ -215,6 +221,7 @@ impl From for SinkParam { format_desc: sink_catalog.format_desc, db_name: sink_catalog.db_name, sink_from_name: sink_catalog.sink_from_name, + target_table: sink_catalog.target_table, } } } @@ -325,10 +332,14 @@ impl SinkImpl { param.properties.remove(PRIVATE_LINK_TARGET_KEY); param.properties.remove(CONNECTION_NAME_KEY); - let sink_type = param - .properties - .get(CONNECTOR_TYPE_KEY) - .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?; + let sink_type = if param.target_table.is_some() { + TABLE_SINK + } else { + param.properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { + SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) + })? + }; + match_sink_name_str!( sink_type.to_lowercase().as_str(), SinkType, diff --git a/src/connector/src/sink/table.rs b/src/connector/src/sink/table.rs new file mode 100644 index 0000000000000..4290a671c6ca2 --- /dev/null +++ b/src/connector/src/sink/table.rs @@ -0,0 +1,76 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; + +use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset}; +use crate::sink::{ + DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam, +}; + +pub const TABLE_SINK: &str = "table"; + +/// A table sink outputs stream into another RisingWave's table. +/// +/// Different from a materialized view, table sinks do not enforce strong consistency between upstream and downstream in principle. As a result, the `create sink` statement returns immediately, which is similar to any other `create sink`. It also allows users to execute DMLs on these target tables. +/// +/// See also [RFC: Create Sink into Table](https://github.com/risingwavelabs/rfcs/pull/52). +#[derive(Debug)] +pub struct TableSink; + +impl TryFrom for TableSink { + type Error = SinkError; + + fn try_from(_value: SinkParam) -> std::result::Result { + Ok(Self) + } +} + +impl Sink for TableSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = Self; + + const SINK_NAME: &'static str = TABLE_SINK; + + async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result { + Ok(Self) + } + + async fn validate(&self) -> Result<()> { + Ok(()) + } +} + +#[async_trait] +impl LogSinker for TableSink { + async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> { + log_reader.init().await?; + loop { + let (epoch, item) = log_reader.next_item().await?; + match item { + LogStoreReadItem::StreamChunk { chunk_id, .. } => { + log_reader + .truncate(TruncateOffset::Chunk { epoch, chunk_id }) + .await?; + } + LogStoreReadItem::Barrier { .. } => { + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + } + LogStoreReadItem::UpdateVnodeBitmap(_) => {} + } + } + } +} diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index f867c29623a06..e5ef528913641 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -592,6 +592,7 @@ mod tests { format_desc: None, db_name: "db".into(), sink_from_name: "table".into(), + target_table: None, }; let rw_schema = param.schema(); diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index ea065af3746f6..79c628ae76aa0 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -791,6 +791,7 @@ impl TestCase { "test_db".into(), "test_table".into(), format_desc, + None, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 69a613c0d59c1..9423734de9f7e 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -26,7 +26,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, create_connection_request, PbTableJobType, + alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan, + PbTableJobType, ReplaceTablePlan, }; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; @@ -108,7 +109,12 @@ pub trait CatalogWriter: Send + Sync { graph: StreamFragmentGraph, ) -> Result<()>; - async fn create_sink(&self, sink: PbSink, graph: StreamFragmentGraph) -> Result<()>; + async fn create_sink( + &self, + sink: PbSink, + graph: StreamFragmentGraph, + affected_table_change: Option, + ) -> Result<()>; async fn create_function(&self, function: PbFunction) -> Result<()>; @@ -136,7 +142,12 @@ pub trait CatalogWriter: Send + Sync { async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>; - async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()>; + async fn drop_sink( + &self, + sink_id: u32, + cascade: bool, + affected_table_change: Option, + ) -> Result<()>; async fn drop_database(&self, database_id: u32) -> Result<()>; @@ -291,8 +302,16 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn create_sink(&self, sink: PbSink, graph: StreamFragmentGraph) -> Result<()> { - let (_id, version) = self.meta_client.create_sink(sink, graph).await?; + async fn create_sink( + &self, + sink: PbSink, + graph: StreamFragmentGraph, + affected_table_change: Option, + ) -> Result<()> { + let (_id, version) = self + .meta_client + .create_sink(sink, graph, affected_table_change) + .await?; self.wait_version(version).await } @@ -358,8 +377,16 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> { - let version = self.meta_client.drop_sink(sink_id, cascade).await?; + async fn drop_sink( + &self, + sink_id: u32, + cascade: bool, + affected_table_change: Option, + ) -> Result<()> { + let version = self + .meta_client + .drop_sink(sink_id, cascade, affected_table_change) + .await?; self.wait_version(version).await } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 750a06da7d231..c8b7b4ef437e4 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -28,7 +28,7 @@ use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; -use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId}; +use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId}; use crate::expr::ExprImpl; use crate::optimizer::property::Cardinality; use crate::user::UserId; @@ -155,6 +155,9 @@ pub struct TableCatalog { /// description of table, set by `comment on`. pub description: Option, + + /// Incoming sinks, used for sink into table + pub incoming_sinks: Vec, } // How the stream job was created will determine @@ -442,6 +445,7 @@ impl TableCatalog { stream_job_status: PbStreamJobStatus::Creating.into(), create_type: self.create_type.to_prost().into(), description: self.description.clone(), + incoming_sinks: self.incoming_sinks.clone(), } } @@ -556,6 +560,7 @@ impl From for TableCatalog { cleaned_by_watermark: matches!(tb.cleaned_by_watermark, true), create_type: CreateType::from_prost(create_type), description: tb.description, + incoming_sinks: tb.incoming_sinks.clone(), } } } @@ -649,6 +654,7 @@ mod tests { stream_job_status: PbStreamJobStatus::Creating.into(), create_type: PbCreateType::Foreground.into(), description: Some("description".to_string()), + incoming_sinks: vec![], } .into(); @@ -705,7 +711,8 @@ mod tests { initialized_at_epoch: None, cleaned_by_watermark: false, create_type: CreateType::Foreground, - description: Some("description".to_string()) + description: Some("description".to_string()), + incoming_sinks: vec![], } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 649fade132e0d..b042595dbf595 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -20,23 +20,19 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::column_index_mapping::ColIndexMapping; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::Table; -use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; -use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement, }; use risingwave_sqlparser::parser::Parser; use super::create_source::get_json_schema_location; -use super::create_table::{gen_create_table_plan, ColumnIdGenerator}; +use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::table_catalog::TableType; -use crate::handler::create_table::gen_create_table_plan_with_source; -use crate::{build_graph, Binder, OptimizerContext, TableCatalog, WithOptions}; +use crate::session::SessionImpl; +use crate::{Binder, TableCatalog, WithOptions}; /// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or /// `DropColumn`. @@ -46,31 +42,11 @@ pub async fn handle_alter_table_column( operation: AlterTableOperation, ) -> Result { let session = handler_args.session; - let db_name = session.database(); - let (schema_name, real_table_name) = - Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; - let search_path = session.config().search_path(); - let user_name = &session.auth_context().user_name; - - let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); - - let original_catalog = { - let reader = session.env().catalog_reader().read_guard(); - let (table, schema_name) = - reader.get_table_by_name(db_name, schema_path, &real_table_name)?; - - match table.table_type() { - TableType::Table => {} - - _ => Err(ErrorCode::InvalidInputSyntax(format!( - "\"{table_name}\" is not a table or cannot be altered" - )))?, - } - - session.check_privilege_for_drop_alter(schema_name, &**table)?; + let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; - table.clone() - }; + if !original_catalog.incoming_sinks.is_empty() { + bail_not_implemented!("alter table with incoming sinks"); + } // TODO(yuhao): alter table with generated columns. if original_catalog.has_generated_column() { @@ -183,61 +159,19 @@ pub async fn handle_alter_table_column( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source) = { - let context = OptimizerContext::from_handler_args(handler_args); - let (plan, source, table) = match source_schema { - Some(source_schema) => { - gen_create_table_plan_with_source( - context, - table_name, - columns, - constraints, - source_schema, - source_watermarks, - col_id_gen, - append_only, - ) - .await? - } - None => gen_create_table_plan( - context, - table_name, - columns, - constraints, - col_id_gen, - source_watermarks, - append_only, - )?, - }; - - // TODO: avoid this backward conversion. - if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() { - Err(ErrorCode::InvalidInputSyntax( - "alter primary key of table is not supported".to_owned(), - ))? - } - - let graph = StreamFragmentGraph { - parallelism: session - .config() - .streaming_parallelism() - .map(|parallelism| Parallelism { - parallelism: parallelism.get(), - }), - ..build_graph(plan) - }; - - // Fill the original table ID. - let table = Table { - id: original_catalog.id().table_id(), - optional_associated_source_id: original_catalog - .associated_source_id() - .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())), - ..table - }; - - (graph, table, source) - }; + let (graph, table, source) = generate_stream_graph_for_table( + &session, + table_name, + &original_catalog, + source_schema, + handler_args, + col_id_gen, + columns, + constraints, + source_watermarks, + append_only, + ) + .await?; // Calculate the mapping from the original columns to the new columns. let col_index_mapping = ColIndexMapping::new( @@ -273,6 +207,39 @@ fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool { } } +pub fn fetch_table_catalog_for_alter( + session: &SessionImpl, + table_name: &ObjectName, +) -> Result> { + let db_name = session.database(); + let (schema_name, real_table_name) = + Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let original_catalog = { + let reader = session.env().catalog_reader().read_guard(); + let (table, schema_name) = + reader.get_table_by_name(db_name, schema_path, &real_table_name)?; + + match table.table_type() { + TableType::Table => {} + + _ => Err(ErrorCode::InvalidInputSyntax(format!( + "\"{table_name}\" is not a table or cannot be altered" + )))?, + } + + session.check_privilege_for_drop_alter(schema_name, &**table)?; + + table.clone() + }; + + Ok(original_catalog) +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 8a30a3d681cc1..73060b7a9f965 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,38 +12,53 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::rc::Rc; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; +use anyhow::Context; use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId}; -use risingwave_common::error::{ErrorCode, Result}; -use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc}; +use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::types::Datum; +use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_common::{bail, bail_not_implemented, catalog}; +use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; +use risingwave_pb::catalog::{PbSource, Table}; +use risingwave_pb::ddl_service::ReplaceTablePlan; +use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; +use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; use risingwave_sqlparser::ast::{ ConnectorSchema, CreateSink, CreateSinkStatement, EmitMode, Encode, Format, ObjectName, Query, - Select, SelectItem, SetExpr, TableFactor, TableWithJoins, + Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins, }; +use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; use super::RwPgResponse; use crate::binder::Binder; +use crate::catalog::catalog_service::CatalogReadGuard; +use crate::expr::{ExprImpl, InputRef, Literal}; +use crate::handler::alter_table_column::fetch_table_catalog_for_alter; +use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; use crate::handler::privilege::resolve_query_privileges; +use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::Explain; +use crate::optimizer::plan_node::{generic, Explain, LogicalSource, StreamProject}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::resolve_privatelink_in_with_option; -use crate::{Planner, WithOptions}; +use crate::{Planner, TableCatalog, WithOptions}; pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { let table_factor = TableFactor::Table { @@ -71,11 +86,19 @@ pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { }) } +// used to store result of `gen_sink_plan` +pub struct SinkPlanContext { + pub query: Box, + pub sink_plan: PlanRef, + pub sink_catalog: SinkCatalog, + pub target_table_catalog: Option>, +} + pub fn gen_sink_plan( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSinkStatement, -) -> Result<(Box, PlanRef, SinkCatalog)> { +) -> Result { let db_name = session.database(); let (sink_schema_name, sink_table_name) = Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?; @@ -93,6 +116,8 @@ pub fn gen_sink_plan( } }; + let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value()); + let (sink_database_id, sink_schema_id) = session.get_database_and_schema_id_for_create(sink_schema_name.clone())?; @@ -111,6 +136,19 @@ pub fn gen_sink_plan( let col_names = get_column_names(&bound, session, stmt.columns)?; let mut with_options = context.with_options().clone(); + + if sink_into_table_name.is_some() { + let prev = with_options + .inner_mut() + .insert(CONNECTOR_TYPE_KEY.to_string(), "table".to_string()); + + if prev.is_some() { + return Err(RwError::from(ErrorCode::BindError( + "In the case of sinking into table, the 'connector' parameter should not be provided.".to_string(), + ))); + } + } + let connection_id = { let conn_id = resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?; @@ -124,16 +162,18 @@ pub fn gen_sink_plan( let connector = with_options .get(CONNECTOR_TYPE_KEY) + .cloned() .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?; + let format_desc = match stmt.sink_schema { // Case A: new syntax `format ... encode ...` Some(f) => { - validate_compatibility(connector, &f)?; + validate_compatibility(&connector, &f)?; Some(bind_sink_format_desc(f)?) - }, + } None => match with_options.get(SINK_TYPE_OPTION) { // Case B: old syntax `type = '...'` - Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?.map(|mut f| { + Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| { session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`."); if let Some(v) = with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) { f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into()); @@ -150,6 +190,14 @@ pub fn gen_sink_plan( plan_root.set_out_names(col_names)?; }; + let target_table_catalog = stmt + .into_table_name + .as_ref() + .map(|table_name| fetch_table_catalog_for_alter(session, table_name)) + .transpose()?; + + let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id()); + let sink_plan = plan_root.gen_sink_plan( sink_table_name, definition, @@ -158,9 +206,11 @@ pub fn gen_sink_plan( db_name.to_owned(), sink_from_table_name, format_desc, + target_table, )?; let sink_desc = sink_plan.sink_desc().clone(); - let sink_plan: PlanRef = sink_plan.into(); + + let mut sink_plan: PlanRef = sink_plan.into(); let ctx = sink_plan.ctx(); let explain_trace = ctx.is_explain_trace(); @@ -180,7 +230,45 @@ pub fn gen_sink_plan( dependent_relations.into_iter().collect_vec(), ); - Ok((query, sink_plan, sink_catalog)) + if let Some(table_catalog) = &target_table_catalog { + for column in sink_catalog.full_columns() { + if column.is_generated() { + unreachable!("can not derive generated columns in a sink's catalog, but meet one"); + } + } + + let user_defined_primary_key_table = + !(table_catalog.append_only || table_catalog.row_id_index.is_some()); + + if !(user_defined_primary_key_table + || sink_catalog.sink_type == SinkType::AppendOnly + || sink_catalog.sink_type == SinkType::ForceAppendOnly) + { + return Err(RwError::from(ErrorCode::BindError( + "Only append-only sinks can sink to a table without primary keys.".to_string(), + ))); + } + + let exprs = derive_default_column_project_for_sink(&sink_catalog, table_catalog)?; + + let logical_project = generic::Project::new(exprs, sink_plan); + + sink_plan = StreamProject::new(logical_project).into(); + + let exprs = + LogicalSource::derive_output_exprs_from_generated_columns(table_catalog.columns())?; + if let Some(exprs) = exprs { + let logical_project = generic::Project::new(exprs, sink_plan); + sink_plan = StreamProject::new(logical_project).into(); + } + }; + + Ok(SinkPlanContext { + query, + sink_plan, + sink_catalog, + target_table_catalog, + }) } pub async fn handle_create_sink( @@ -197,9 +285,16 @@ pub async fn handle_create_sink( return Ok(resp); } - let (sink, graph) = { + let (sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); - let (query, plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?; + + let SinkPlanContext { + query, + sink_plan: plan, + sink_catalog: sink, + target_table_catalog, + } = gen_sink_plan(&session, context.clone(), stmt)?; + let has_order_by = !query.order_by.is_empty(); if has_order_by { context.warn_to_user( @@ -207,7 +302,9 @@ pub async fn handle_create_sink( .to_string(), ); } + let mut graph = build_graph(plan); + graph.parallelism = session .config() @@ -215,9 +312,40 @@ pub async fn handle_create_sink( .map(|parallelism| Parallelism { parallelism: parallelism.get(), }); - (sink, graph) + + (sink, graph, target_table_catalog) }; + let mut target_table_replace_plan = None; + if let Some(table_catalog) = target_table_catalog { + if !table_catalog.incoming_sinks.is_empty() { + bail_not_implemented!(issue = 13818, "create sink into table with incoming sinks"); + } + + check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?; + + let (mut graph, mut table, source) = + reparse_table_for_sink(&session, &table_catalog).await?; + + table.incoming_sinks = table_catalog.incoming_sinks.clone(); + + // for now we only support one incoming sink + assert!(table.incoming_sinks.is_empty()); + + for fragment in graph.fragments.values_mut() { + if let Some(node) = &mut fragment.node { + insert_merger_to_union(node); + } + } + + target_table_replace_plan = Some(ReplaceTablePlan { + source, + table: Some(table), + fragment_graph: Some(graph), + table_col_index_mapping: None, + }); + } + let _job_guard = session .env() @@ -230,11 +358,213 @@ pub async fn handle_create_sink( )); let catalog_writer = session.catalog_writer()?; - catalog_writer.create_sink(sink.to_proto(), graph).await?; + catalog_writer + .create_sink(sink.to_proto(), graph, target_table_replace_plan) + .await?; Ok(PgResponse::empty_result(StatementType::CREATE_SINK)) } +fn check_cycle_for_sink( + session: &SessionImpl, + sink_catalog: SinkCatalog, + table_id: catalog::TableId, +) -> Result<()> { + let reader = session.env().catalog_reader().read_guard(); + + let mut sinks = HashMap::new(); + let db_name = session.database(); + for schema in reader.iter_schemas(db_name)? { + for sink in schema.iter_sink() { + sinks.insert(sink.id.sink_id, sink.as_ref()); + } + } + fn visit_sink( + session: &SessionImpl, + reader: &CatalogReadGuard, + sink_index: &HashMap, + sink: &SinkCatalog, + visited_tables: &mut HashSet, + ) -> Result<()> { + for table_id in &sink.dependent_relations { + if let Ok(table) = reader.get_table_by_id(table_id) { + visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? + } else { + bail!("table not found: {:?}", table_id); + } + } + + Ok(()) + } + + fn visit_table( + session: &SessionImpl, + reader: &CatalogReadGuard, + sink_index: &HashMap, + table: &TableCatalog, + visited_tables: &mut HashSet, + ) -> Result<()> { + if visited_tables.contains(&table.id.table_id) { + return Err(RwError::from(ErrorCode::BindError( + "Creating such a sink will result in circular dependency.".to_string(), + ))); + } + + let _ = visited_tables.insert(table.id.table_id); + for sink_id in &table.incoming_sinks { + if let Some(sink) = sink_index.get(sink_id) { + visit_sink(session, reader, sink_index, sink, visited_tables)? + } else { + bail!("sink not found: {:?}", sink_id); + } + } + + Ok(()) + } + + let mut visited_tables = HashSet::new(); + visited_tables.insert(table_id.table_id); + + visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables) +} + +pub(crate) async fn reparse_table_for_sink( + session: &Arc, + table_catalog: &Arc, +) -> Result<(StreamFragmentGraph, Table, Option)> { + // Retrieve the original table definition and parse it to AST. + let [definition]: [_; 1] = Parser::parse_sql(&table_catalog.definition) + .context("unable to parse original table definition")? + .try_into() + .unwrap(); + let Statement::CreateTable { + name, + source_schema, + .. + } = &definition + else { + panic!("unexpected statement: {:?}", definition); + }; + + let table_name = name.clone(); + let source_schema = source_schema + .clone() + .map(|source_schema| source_schema.into_v2_with_warning()); + + // Create handler args as if we're creating a new table with the altered definition. + let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; + let col_id_gen = ColumnIdGenerator::new_alter(table_catalog); + let Statement::CreateTable { + columns, + constraints, + source_watermarks, + append_only, + .. + } = definition + else { + panic!("unexpected statement type: {:?}", definition); + }; + + let (graph, table, source) = generate_stream_graph_for_table( + session, + table_name, + table_catalog, + source_schema, + handler_args, + col_id_gen, + columns, + constraints, + source_watermarks, + append_only, + ) + .await?; + + Ok((graph, table, source)) +} + +pub(crate) fn insert_merger_to_union(node: &mut StreamNode) { + if let Some(NodeBody::Union(_union_node)) = &mut node.node_body { + node.input.push(StreamNode { + identity: "Merge (sink into table)".to_string(), + fields: node.fields.clone(), + node_body: Some(NodeBody::Merge(MergeNode { + upstream_dispatcher_type: DispatcherType::Hash as _, + ..Default::default() + })), + ..Default::default() + }); + + return; + } + + for input in &mut node.input { + insert_merger_to_union(input); + } +} +fn derive_default_column_project_for_sink( + sink: &SinkCatalog, + target_table_catalog: &Arc, +) -> Result> { + let mut exprs = vec![]; + + let sink_visible_columns = sink + .full_columns() + .iter() + .enumerate() + .filter(|(_i, c)| !c.is_hidden()) + .collect_vec(); + + for (idx, table_column) in target_table_catalog.columns().iter().enumerate() { + if table_column.is_generated() { + continue; + } + + let data_type = table_column.data_type(); + + if idx < sink_visible_columns.len() { + let (sink_col_idx, sink_column) = sink_visible_columns[idx]; + + let sink_col_type = sink_column.data_type(); + + if data_type != sink_col_type { + bail!( + "column type mismatch: {:?} vs {:?}", + data_type, + sink_col_type + ); + } else { + exprs.push(ExprImpl::InputRef(Box::new(InputRef::new( + sink_col_idx, + data_type.clone(), + )))); + } + } else { + let data = match table_column + .column_desc + .generated_or_default_column + .as_ref() + { + // default column with default value + Some(GeneratedOrDefaultColumn::DefaultColumn(default_column)) => { + Datum::from_protobuf(default_column.get_snapshot_value().unwrap(), data_type) + .unwrap() + } + // default column with no default value + None => None, + + // generated column is unreachable + _ => unreachable!(), + }; + + exprs.push(ExprImpl::Literal(Box::new(Literal::new( + data, + data_type.clone(), + )))); + }; + } + Ok(exprs) +} + /// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`. /// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`] /// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`. @@ -248,7 +578,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { F::Upsert => SinkFormat::Upsert, F::Debezium => SinkFormat::Debezium, f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => { - return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into()) + return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into()); } }; let encode = match value.row_encode { @@ -257,7 +587,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { E::Avro => SinkEncode::Avro, E::Template => SinkEncode::Template, e @ (E::Native | E::Csv | E::Bytes) => { - return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()) + return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()); } }; let mut options = WithOptions::try_from(value.row_options.as_slice())?.into_inner(); @@ -303,6 +633,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock Result<()> { let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS .get(connector) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1c1e5573f6010..399dcbd0e4b0a 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; +use std::sync::Arc; use anyhow::anyhow; use either::Either; @@ -31,11 +32,12 @@ use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_connector::source; use risingwave_connector::source::external::{DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY}; use risingwave_pb::catalog::source::OptionalAssociatedTableId; -use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, WatermarkDesc}; +use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; +use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, ObjectName, SourceWatermark, TableConstraint, @@ -671,7 +673,7 @@ fn gen_table_plan_inner( let source_catalog = source.as_ref().map(|source| Rc::new((source).into())); let source_node: PlanRef = LogicalSource::new( - source_catalog, + source_catalog.clone(), columns.clone(), row_id_index, false, @@ -1020,6 +1022,76 @@ pub fn check_create_table_with_source( Ok(source_schema) } +#[allow(clippy::too_many_arguments)] +pub async fn generate_stream_graph_for_table( + session: &Arc, + table_name: ObjectName, + original_catalog: &Arc, + source_schema: Option, + handler_args: HandlerArgs, + col_id_gen: ColumnIdGenerator, + columns: Vec, + constraints: Vec, + source_watermarks: Vec, + append_only: bool, +) -> Result<(StreamFragmentGraph, Table, Option)> { + use risingwave_pb::catalog::table::OptionalAssociatedSourceId; + + let context = OptimizerContext::from_handler_args(handler_args); + let (plan, source, table) = match source_schema { + Some(source_schema) => { + gen_create_table_plan_with_source( + context, + table_name, + columns, + constraints, + source_schema, + source_watermarks, + col_id_gen, + append_only, + ) + .await? + } + None => gen_create_table_plan( + context, + table_name, + columns, + constraints, + col_id_gen, + source_watermarks, + append_only, + )?, + }; + + // TODO: avoid this backward conversion. + if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() { + Err(ErrorCode::InvalidInputSyntax( + "alter primary key of table is not supported".to_owned(), + ))? + } + + let graph = StreamFragmentGraph { + parallelism: session + .config() + .streaming_parallelism() + .map(|parallelism| Parallelism { + parallelism: parallelism.get(), + }), + ..build_graph(plan) + }; + + // Fill the original table ID. + let table = Table { + id: original_catalog.id().table_id(), + optional_associated_source_id: original_catalog + .associated_source_id() + .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())), + ..table + }; + + Ok((graph, table, source)) +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 5554f69ac9fbb..3f4c3f3df6ea9 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -14,11 +14,13 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::Result; +use risingwave_pb::ddl_service::ReplaceTablePlan; use risingwave_sqlparser::ast::ObjectName; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::root_catalog::SchemaPath; +use crate::handler::create_sink::reparse_table_for_sink; use crate::handler::HandlerArgs; pub async fn handle_drop_sink( @@ -34,7 +36,7 @@ pub async fn handle_drop_sink( let user_name = &session.auth_context().user_name; let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); - let sink_id = { + let sink = { let catalog_reader = session.env().catalog_reader().read_guard(); let (sink, schema_name) = match catalog_reader.get_sink_by_name(db_name, schema_path, &sink_name) { @@ -52,11 +54,38 @@ pub async fn handle_drop_sink( session.check_privilege_for_drop_alter(schema_name, &*sink)?; - sink.id + sink }; + let sink_id = sink.id; + + let mut affected_table_change = None; + if let Some(target_table_id) = &sink.target_table { + let table_catalog = { + let reader = session.env().catalog_reader().read_guard(); + let table = reader.get_table_by_id(target_table_id)?; + table.clone() + }; + + let (graph, mut table, source) = reparse_table_for_sink(&session, &table_catalog).await?; + + // for now we only support one incoming sink + assert_eq!(table_catalog.incoming_sinks.len(), 1); + + table.incoming_sinks = vec![]; + + affected_table_change = Some(ReplaceTablePlan { + source, + table: Some(table), + fragment_graph: Some(graph), + table_col_index_mapping: None, + }); + } + let catalog_writer = session.catalog_writer()?; - catalog_writer.drop_sink(sink_id.sink_id, cascade).await?; + catalog_writer + .drop_sink(sink_id.sink_id, cascade, affected_table_change) + .await?; Ok(PgResponse::empty_result(StatementType::DROP_SINK)) } diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 6bec02b7bd4db..ad631662e4f85 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -110,7 +110,7 @@ async fn do_handle_explain( .map(|x| x.0), Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|x| x.1) + gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan) } Statement::CreateIndex { diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index de16b1893de10..f6de31e4b6990 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -16,22 +16,29 @@ use std::collections::HashMap; use std::ops::DerefMut; pub mod plan_node; + pub use plan_node::{Explain, PlanRef}; + pub mod property; mod delta_join_solver; mod heuristic_optimizer; mod plan_rewriter; + pub use plan_rewriter::PlanRewriter; + mod plan_visitor; + pub use plan_visitor::{ ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor, }; + mod logical_optimization; mod optimizer_context; mod plan_expr_rewriter; mod plan_expr_visitor; mod rule; + use fixedbitset::FixedBitSet; use itertools::Itertools as _; pub use logical_optimization::*; @@ -39,7 +46,7 @@ pub use optimizer_context::*; use plan_expr_rewriter::ConstEvalRewriter; use property::Order; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, + ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, }; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -467,7 +474,10 @@ impl PlanRoot { .collect_vec() }; - fn inject_project_if_needed(columns: &[ColumnCatalog], node: PlanRef) -> Result { + fn inject_project_for_generated_column_if_needed( + columns: &[ColumnCatalog], + node: PlanRef, + ) -> Result { let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?; if let Some(exprs) = exprs { let logical_project = generic::Project::new(exprs, node); @@ -476,6 +486,13 @@ impl PlanRoot { Ok(node) } + #[derive(PartialEq, Debug, Copy, Clone)] + enum PrimaryKeyKind { + UserDefinedPrimaryKey, + RowIdAsPrimaryKey, + AppendOnly, + } + fn inject_dml_node( columns: &[ColumnCatalog], append_only: bool, @@ -487,7 +504,7 @@ impl PlanRoot { let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into(); // Add generated columns. - dml_node = inject_project_if_needed(columns, dml_node)?; + dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?; dml_node = match kind { PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::RowIdAsPrimaryKey => { @@ -500,13 +517,6 @@ impl PlanRoot { Ok(dml_node) } - #[derive(PartialEq, Debug, Copy, Clone)] - enum PrimaryKeyKind { - UserDefinedPrimaryKey, - RowIdAsPrimaryKey, - AppendOnly, - } - let kind = if append_only { assert!(row_id_index.is_some()); PrimaryKeyKind::AppendOnly @@ -528,12 +538,14 @@ impl PlanRoot { let union_inputs = if with_external_source { let mut external_source_node = stream_plan; - external_source_node = inject_project_if_needed(&columns, external_source_node)?; + external_source_node = + inject_project_for_generated_column_if_needed(&columns, external_source_node)?; external_source_node = match kind { PrimaryKeyKind::UserDefinedPrimaryKey => { RequiredDist::hash_shard(&pk_column_indices) .enforce_if_not_satisfies(external_source_node, &Order::any())? } + PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => { StreamExchange::new_no_shuffle(external_source_node).into() } @@ -705,6 +717,7 @@ impl PlanRoot { db_name: String, sink_from_table_name: String, format_desc: Option, + target_table: Option, ) -> Result { let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; @@ -713,6 +726,7 @@ impl PlanRoot { sink_name, db_name, sink_from_table_name, + target_table, self.required_dist.clone(), self.required_order.clone(), self.out_fields.clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 8cc509214ee00..efa059e50efd9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -253,6 +253,7 @@ impl StreamMaterialize { cleaned_by_watermark: false, create_type: CreateType::Foreground, // Will be updated in the handler itself. description: None, + incoming_sinks: vec![], }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index ea2144cdfebb1..68814531d9293 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -19,7 +19,7 @@ use anyhow::anyhow; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, Field}; +use risingwave_common::catalog::{ColumnCatalog, Field, TableId}; use risingwave_common::constants::log_store::{ EPOCH_COLUMN_INDEX, KV_LOG_STORE_PREDEFINED_COLUMNS, SEQ_ID_COLUMN_INDEX, }; @@ -82,6 +82,7 @@ impl StreamSink { name: String, db_name: String, sink_from_table_name: String, + target_table: Option, user_distributed_by: RequiredDist, user_order_by: Order, user_cols: FixedBitSet, @@ -97,6 +98,7 @@ impl StreamSink { name, db_name, sink_from_table_name, + target_table, user_order_by, columns, definition, @@ -125,12 +127,14 @@ impl StreamSink { Ok(Self::new(input, sink)) } + #[allow(clippy::too_many_arguments)] fn derive_sink_desc( input: PlanRef, user_distributed_by: RequiredDist, name: String, db_name: String, sink_from_name: String, + target_table: Option, user_order_by: Order, columns: Vec, definition: String, @@ -202,6 +206,7 @@ impl StreamSink { properties: properties.into_inner(), sink_type, format_desc, + target_table, }; Ok((input, sink_desc)) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index f05a8be162554..73e85c12a885d 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -181,6 +181,7 @@ impl TableCatalogBuilder { // It should be ignored for internal tables. create_type: CreateType::Foreground, description: None, + incoming_sinks: vec![], } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 19aa6482f3b88..2bfd561f14a0f 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -40,6 +40,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_set_schema_request, create_connection_request, DdlProgress, PbTableJobType, + ReplaceTablePlan, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -310,7 +311,12 @@ impl CatalogWriter for MockCatalogWriter { self.create_source_inner(source).map(|_| ()) } - async fn create_sink(&self, sink: PbSink, graph: StreamFragmentGraph) -> Result<()> { + async fn create_sink( + &self, + sink: PbSink, + graph: StreamFragmentGraph, + _affected_table_change: Option, + ) -> Result<()> { self.create_sink_inner(sink, graph) } @@ -429,7 +435,12 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } - async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> { + async fn drop_sink( + &self, + sink_id: u32, + cascade: bool, + _target_table_change: Option, + ) -> Result<()> { if cascade { return Err(ErrorCode::NotSupported( "drop cascade in MockCatalogWriter is unsupported".to_string(), diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 55313ec6f5f3d..4b8e9e983a9bc 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -44,7 +44,9 @@ use crate::manager::{ IdCategoryType, MetaSrvEnv, StreamingJob, }; use crate::rpc::cloud_provider::AwsEc2Client; -use crate::rpc::ddl_controller::{DdlCommand, DdlController, DropMode, StreamingJobId}; +use crate::rpc::ddl_controller::{ + DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId, +}; use crate::stream::{GlobalStreamManagerRef, SourceManagerRef}; use crate::{MetaError, MetaResult}; @@ -91,6 +93,36 @@ impl DdlServiceImpl { sink_manager, } } + + fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo { + let mut source = change.source; + let mut fragment_graph = change.fragment_graph.unwrap(); + let mut table = change.table.unwrap(); + if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = + table.optional_associated_source_id + { + let source = source.as_mut().unwrap(); + let table_id = table.id; + fill_table_stream_graph_info( + Some((source, source_id)), + (&mut table, table_id), + TableJobType::General, + &mut fragment_graph, + ); + } + let table_col_index_mapping = change + .table_col_index_mapping + .as_ref() + .map(ColIndexMapping::from_protobuf); + + let stream_job = StreamingJob::Table(source, table, TableJobType::General); + + ReplaceTableInfo { + streaming_job: stream_job, + fragment_graph, + col_index_mapping: table_col_index_mapping, + } + } } #[async_trait::async_trait] @@ -213,6 +245,7 @@ impl DdlService for DdlServiceImpl { stream_job, fragment_graph, CreateType::Foreground, + None, )) .await?; Ok(Response::new(CreateSourceResponse { @@ -249,26 +282,37 @@ impl DdlService for DdlServiceImpl { self.env.idle_manager().record_activity(); let req = request.into_inner(); + let sink = req.get_sink()?.clone(); let fragment_graph = req.get_fragment_graph()?.clone(); + let affected_table_change = req.get_affected_table_change().cloned().ok(); // validate connection before starting the DDL procedure if let Some(connection_id) = sink.connection_id { self.validate_connection(connection_id).await?; } - let mut stream_job = StreamingJob::Sink(sink); + let mut stream_job = match &affected_table_change { + None => StreamingJob::Sink(sink, None), + Some(change) => { + let table = change.table.clone().unwrap(); + let source = change.source.clone(); + StreamingJob::Sink(sink, Some((table, source))) + } + }; + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + stream_job.set_id(id); - let version = self - .ddl_controller - .run_command(DdlCommand::CreateStreamingJob( - stream_job, - fragment_graph, - CreateType::Foreground, - )) - .await?; + let command = DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + CreateType::Foreground, + affected_table_change.map(Self::extract_replace_table_info), + ); + + let version = self.ddl_controller.run_command(command).await?; Ok(Response::new(CreateSinkResponse { status: None, @@ -284,13 +328,16 @@ impl DdlService for DdlServiceImpl { let request = request.into_inner(); let sink_id = request.sink_id; let drop_mode = DropMode::from_request_setting(request.cascade); - let version = self - .ddl_controller - .run_command(DdlCommand::DropStreamingJob( - StreamingJobId::Sink(sink_id), - drop_mode, - )) - .await?; + + let command = DdlCommand::DropStreamingJob( + StreamingJobId::Sink(sink_id), + drop_mode, + request + .affected_table_change + .map(Self::extract_replace_table_info), + ); + + let version = self.ddl_controller.run_command(command).await?; self.sink_manager .stop_sink_coordinator(SinkId::from(sink_id)) @@ -323,6 +370,7 @@ impl DdlService for DdlServiceImpl { stream_job, fragment_graph, create_type, + None, )) .await?; @@ -348,6 +396,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::DropStreamingJob( StreamingJobId::MaterializedView(table_id), drop_mode, + None, )) .await?; @@ -378,6 +427,7 @@ impl DdlService for DdlServiceImpl { stream_job, fragment_graph, CreateType::Foreground, + None, )) .await?; @@ -402,6 +452,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::DropStreamingJob( StreamingJobId::Index(index_id), drop_mode, + None, )) .await?; @@ -483,6 +534,7 @@ impl DdlService for DdlServiceImpl { stream_job, fragment_graph, CreateType::Foreground, + None, )) .await?; @@ -507,6 +559,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::DropStreamingJob( StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id), drop_mode, + None, )) .await?; @@ -566,34 +619,13 @@ impl DdlService for DdlServiceImpl { &self, request: Request, ) -> Result, Status> { - let req = request.into_inner(); - - let mut source = req.source; - let mut fragment_graph = req.fragment_graph.unwrap(); - let mut table = req.table.unwrap(); - if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = - table.optional_associated_source_id - { - let source = source.as_mut().unwrap(); - let table_id = table.id; - fill_table_stream_graph_info( - Some((source, source_id)), - (&mut table, table_id), - TableJobType::General, - &mut fragment_graph, - ); - } - let table_col_index_mapping = - ColIndexMapping::from_protobuf(&req.table_col_index_mapping.unwrap()); - let stream_job = StreamingJob::Table(source, table, TableJobType::General); + let req = request.into_inner().get_plan().cloned()?; let version = self .ddl_controller - .run_command(DdlCommand::ReplaceTable( - stream_job, - fragment_graph, - table_col_index_mapping, - )) + .run_command(DdlCommand::ReplaceTable(Self::extract_replace_table_info( + req, + ))) .await?; Ok(Response::new(ReplaceTablePlanResponse { diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6e33a6a8bd7aa..4a8876c9efbef 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -23,12 +23,14 @@ use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::meta::PausedReason; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; -use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; +use risingwave_pb::stream_plan::barrier::BarrierKind; +use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::throttle_mutation::RateLimit; use risingwave_pb::stream_plan::update_mutation::*; use risingwave_pb::stream_plan::{ - AddMutation, Dispatcher, Dispatchers, FragmentTypeFlag, PauseMutation, ResumeMutation, - SourceChangeSplitMutation, StopMutation, ThrottleMutation, UpdateMutation, + AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, FragmentTypeFlag, + PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation, + UpdateMutation, }; use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; use risingwave_rpc_client::StreamClientPoolRef; @@ -73,6 +75,15 @@ pub struct Reschedule { pub actor_splits: HashMap>, } +#[derive(Debug, Clone)] +pub struct ReplaceTablePlan { + pub old_table_fragments: TableFragments, + pub new_table_fragments: TableFragments, + pub merge_updates: Vec, + pub dispatchers: HashMap>, + pub init_split_assignment: SplitAssignment, +} + /// [`Command`] is the action of [`crate::barrier::GlobalBarrierManager`]. For different commands, /// we'll build different barriers to send, and may do different stuffs after the barrier is /// collected. @@ -118,6 +129,7 @@ pub enum Command { init_split_assignment: SplitAssignment, definition: String, ddl_type: DdlType, + replace_table: Option, }, /// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given /// table fragment. @@ -140,13 +152,7 @@ pub enum Command { /// /// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment /// of the Merge executors are changed additionally. - ReplaceTable { - old_table_fragments: TableFragments, - new_table_fragments: TableFragments, - merge_updates: Vec, - dispatchers: HashMap>, - init_split_assignment: SplitAssignment, - }, + ReplaceTable(ReplaceTablePlan), /// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or /// newly added splits. @@ -176,6 +182,25 @@ impl Command { Command::Plain(_) => CommandChanges::None, Command::Pause(_) => CommandChanges::None, Command::Resume(_) => CommandChanges::None, + Command::CreateStreamingJob { + table_fragments, + replace_table: + Some(ReplaceTablePlan { + old_table_fragments, + new_table_fragments, + .. + }), + .. + } => { + let to_add = new_table_fragments.actor_ids().into_iter().collect(); + let to_remove = old_table_fragments.actor_ids().into_iter().collect(); + + CommandChanges::CreateSinkIntoTable { + sink_id: table_fragments.table_id(), + to_add, + to_remove, + } + } Command::CreateStreamingJob { table_fragments, .. } => CommandChanges::CreateTable(table_fragments.table_id()), @@ -194,11 +219,11 @@ impl Command { .collect(); CommandChanges::Actor { to_add, to_remove } } - Command::ReplaceTable { + Command::ReplaceTable(ReplaceTablePlan { old_table_fragments, new_table_fragments, .. - } => { + }) => { let to_add = new_table_fragments.actor_ids().into_iter().collect(); let to_remove = old_table_fragments.actor_ids().into_iter().collect(); CommandChanges::Actor { to_add, to_remove } @@ -353,6 +378,7 @@ impl CommandContext { table_fragments, dispatchers, init_split_assignment: split_assignment, + replace_table, .. } => { let actor_dispatchers = dispatchers @@ -371,13 +397,38 @@ impl CommandContext { .values() .flat_map(build_actor_connector_splits) .collect(); - Some(Mutation::Add(AddMutation { + let add = Some(Mutation::Add(AddMutation { actor_dispatchers, added_actors, actor_splits, // If the cluster is already paused, the new actors should be paused too. pause: self.current_paused_reason.is_some(), - })) + })); + + if let Some(ReplaceTablePlan { + old_table_fragments, + new_table_fragments: _, + merge_updates, + dispatchers, + init_split_assignment, + }) = replace_table + { + let update = Self::generate_update_mutation_for_replace_table( + old_table_fragments, + merge_updates, + dispatchers, + init_split_assignment, + ); + + Some(Mutation::Combined(CombinedMutation { + mutations: vec![ + BarrierMutation { mutation: add }, + BarrierMutation { mutation: update }, + ], + })) + } else { + add + } } Command::CancelStreamingJob(table_fragments) => { @@ -385,40 +436,18 @@ impl CommandContext { Some(Mutation::Stop(StopMutation { actors })) } - Command::ReplaceTable { + Command::ReplaceTable(ReplaceTablePlan { old_table_fragments, merge_updates, dispatchers, init_split_assignment, .. - } => { - let dropped_actors = old_table_fragments.actor_ids(); - - let actor_new_dispatchers = dispatchers - .iter() - .map(|(&actor_id, dispatchers)| { - ( - actor_id, - Dispatchers { - dispatchers: dispatchers.clone(), - }, - ) - }) - .collect(); - - let actor_splits = init_split_assignment - .values() - .flat_map(build_actor_connector_splits) - .collect(); - - Some(Mutation::Update(UpdateMutation { - actor_new_dispatchers, - merge_update: merge_updates.clone(), - dropped_actors, - actor_splits, - ..Default::default() - })) - } + }) => Self::generate_update_mutation_for_replace_table( + old_table_fragments, + merge_updates, + dispatchers, + init_split_assignment, + ), Command::RescheduleFragment { reschedules, .. } => { let mut dispatcher_update = HashMap::new(); @@ -554,6 +583,40 @@ impl CommandContext { Ok(mutation) } + fn generate_update_mutation_for_replace_table( + old_table_fragments: &TableFragments, + merge_updates: &[MergeUpdate], + dispatchers: &HashMap>, + init_split_assignment: &SplitAssignment, + ) -> Option { + let dropped_actors = old_table_fragments.actor_ids(); + + let actor_new_dispatchers = dispatchers + .iter() + .map(|(&actor_id, dispatchers)| { + ( + actor_id, + Dispatchers { + dispatchers: dispatchers.clone(), + }, + ) + }) + .collect(); + + let actor_splits = init_split_assignment + .values() + .flat_map(build_actor_connector_splits) + .collect(); + + Some(Mutation::Update(UpdateMutation { + actor_new_dispatchers, + merge_update: merge_updates.to_owned(), + dropped_actors, + actor_splits, + ..Default::default() + })) + } + /// Returns the paused reason after executing the current command. pub fn next_paused_reason(&self) -> Option { match &self.command { @@ -768,6 +831,8 @@ impl CommandContext { dispatchers, upstream_mview_actors, init_split_assignment, + definition: _, + replace_table, .. } => { let mut dependent_table_actors = Vec::with_capacity(upstream_mview_actors.len()); @@ -787,6 +852,31 @@ impl CommandContext { ) .await?; + if let Some(ReplaceTablePlan { + old_table_fragments, + new_table_fragments, + merge_updates, + dispatchers, + init_split_assignment: _, + }) = replace_table + { + let table_ids = + HashSet::from_iter(std::iter::once(old_table_fragments.table_id())); + // Tell compute nodes to drop actors. + let node_actors = self.fragment_manager.table_node_actors(&table_ids).await?; + self.clean_up(node_actors).await?; + + // Drop fragment info in meta store. + self.fragment_manager + .post_replace_table( + old_table_fragments, + new_table_fragments, + merge_updates, + dispatchers, + ) + .await?; + } + // Extract the fragments that include source operators. let source_fragments = table_fragments.stream_source_fragments(); @@ -807,13 +897,13 @@ impl CommandContext { self.clean_up(node_dropped_actors).await?; } - Command::ReplaceTable { + Command::ReplaceTable(ReplaceTablePlan { old_table_fragments, new_table_fragments, merge_updates, dispatchers, .. - } => { + }) => { let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id())); // Tell compute nodes to drop actors. diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 7b6cf91fcc72d..334a2394de4fb 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -36,7 +36,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; -use risingwave_pb::stream_plan::Barrier; +use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; use risingwave_pb::stream_service::{ BarrierCompleteRequest, BarrierCompleteResponse, InjectBarrierRequest, }; @@ -74,7 +74,7 @@ mod recovery; mod schedule; mod trace; -pub use self::command::{Command, Reschedule}; +pub use self::command::{Command, ReplaceTablePlan, Reschedule}; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; @@ -153,6 +153,12 @@ pub enum CommandChanges { to_add: HashSet, to_remove: HashSet, }, + /// This is used for sinking into the table, featuring both `CreateTable` and `Actor` changes. + CreateSinkIntoTable { + sink_id: TableId, + to_add: HashSet, + to_remove: HashSet, + }, /// No changes. None, } @@ -279,7 +285,11 @@ impl CheckpointControl { /// created table and added actors into checkpoint control, so that `can_actor_send_or_collect` /// will return `true`. fn pre_resolve(&mut self, command: &Command) { - match command.changes() { + self.pre_resolve_helper(command.changes()); + } + + fn pre_resolve_helper(&mut self, changes: CommandChanges) { + match changes { CommandChanges::CreateTable(table) => { assert!( !self.dropping_tables.contains(&table), @@ -299,6 +309,15 @@ impl CheckpointControl { self.adding_actors.extend(to_add); } + CommandChanges::CreateSinkIntoTable { + sink_id, + to_add, + to_remove, + } => { + self.pre_resolve_helper(CommandChanges::CreateTable(sink_id)); + self.pre_resolve_helper(CommandChanges::Actor { to_add, to_remove }); + } + _ => {} } } @@ -307,7 +326,11 @@ impl CheckpointControl { /// removed actors from checkpoint control, so that `can_actor_send_or_collect` will return /// `false`. fn post_resolve(&mut self, command: &Command) { - match command.changes() { + self.post_resolve_helper(command.changes()); + } + + fn post_resolve_helper(&mut self, change: CommandChanges) { + match change { CommandChanges::DropTables(tables) => { assert!( self.dropping_tables.is_disjoint(&tables), @@ -316,14 +339,14 @@ impl CheckpointControl { self.dropping_tables.extend(tables); } - CommandChanges::Actor { to_remove, .. } => { + CommandChanges::Actor { to_remove, .. } + | CommandChanges::CreateSinkIntoTable { to_remove, .. } => { assert!( self.removing_actors.is_disjoint(&to_remove), "duplicated actor in concurrent checkpoint" ); self.removing_actors.extend(to_remove); } - _ => {} } } @@ -468,6 +491,14 @@ impl CheckpointControl { self.removing_actors.retain(|a| !to_remove.contains(a)); } CommandChanges::None => {} + CommandChanges::CreateSinkIntoTable { + sink_id, + to_add, + to_remove, + } => { + self.remove_changes(CommandChanges::CreateTable(sink_id)); + self.remove_changes(CommandChanges::Actor { to_add, to_remove }); + } } } @@ -845,7 +876,7 @@ impl GlobalBarrierManager { curr: command_context.curr_epoch.value().0, prev: command_context.prev_epoch.value().0, }), - mutation, + mutation: mutation.clone().map(|_| BarrierMutation { mutation }), tracing_context: TracingContext::from_span(command_context.curr_epoch.span()) .to_protobuf(), kind: command_context.kind as i32, diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index d9e3feea70639..ca85b935f07a8 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -23,7 +23,8 @@ use risingwave_common::catalog::TableId; use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy}; use risingwave_pb::meta::PausedReason; -use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; +use risingwave_pb::stream_plan::barrier::BarrierKind; +use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest, diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 80778465f4a86..46e391ddea106 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -135,6 +135,8 @@ impl From> for PbTable { .optional_associated_source_id .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)), description: None, + // TODO: fix it for model v2. + incoming_sinks: vec![], } } } @@ -198,6 +200,8 @@ impl From> for PbSink { sink_from_name: value.0.sink_from_name, stream_job_status: PbStreamJobStatus::from(value.0.job_status) as _, format_desc: value.0.sink_format_desc.map(|desc| desc.0), + // todo: fix this for model v2 + target_table: None, } } } diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 56e14486adedc..64487bba0a9a1 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -294,6 +294,10 @@ impl DatabaseManager { self.tables.get(&table_id) } + pub fn get_sink(&self, sink_id: SinkId) -> Option<&Sink> { + self.sinks.get(&sink_id) + } + pub fn get_all_table_options(&self) -> HashMap { self.tables .iter() diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 75a4b9869aecd..498325660cb04 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -34,8 +34,8 @@ use risingwave_common::catalog::{ use risingwave_common::{bail, ensure}; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType}; use risingwave_pb::catalog::{ - Comment, Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, - Source, StreamJobStatus, Table, View, + Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus, + Schema, Sink, Source, StreamJobStatus, Table, View, }; use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -648,7 +648,7 @@ impl CatalogManager { self.start_create_table_procedure(table, internal_tables) .await } - StreamingJob::Sink(sink) => self.start_create_sink_procedure(sink).await, + StreamingJob::Sink(sink, _) => self.start_create_sink_procedure(sink).await, StreamingJob::Index(index, index_table) => { self.start_create_index_procedure(index, index_table).await } @@ -1104,6 +1104,13 @@ impl CatalogManager { RelationIdEnum::Table(table_id) => { let table = tables.get(&table_id).cloned(); if let Some(table) = table { + for incoming_sink in &table.incoming_sinks { + let sink = sinks.get(incoming_sink).cloned(); + if let Some(sink) = sink { + deque.push_back(RelationInfo::Sink(sink)); + } + } + deque.push_back(RelationInfo::Table(table)); } else { bail!("table doesn't exist"); @@ -1151,6 +1158,7 @@ impl CatalogManager { if !all_table_ids.insert(table_id) { continue; } + let table_fragments = fragment_manager .select_table_fragments_by_table_id(&table_id.into()) .await?; @@ -1345,6 +1353,7 @@ impl CatalogManager { if !all_view_ids.insert(view.id) { continue; } + if let Some(ref_count) = database_core.relation_ref_count.get(&view.id).cloned() { if ref_count > 0 { @@ -1425,6 +1434,34 @@ impl CatalogManager { .map(|sink_id| sinks.remove(*sink_id).unwrap()) .collect_vec(); + if !matches!(relation, RelationIdEnum::Sink(_)) { + let table_sinks = sinks_removed + .iter() + .filter(|sink| { + if let Some(target_table) = sink.target_table { + // Table sink but associated with the table + if matches!(relation, RelationIdEnum::Table(table_id) if table_id == target_table) { + false + } else { + // Table sink + true + } + } else { + // Normal sink + false + } + }) + .collect_vec(); + + // Since dropping the sink into the table requires the frontend to handle some of the logic (regenerating the plan), it’s not compatible with the current cascade dropping. + if !table_sinks.is_empty() { + bail!( + "Found {} sink(s) into table in dependency, please drop them manually", + table_sinks.len() + ); + } + } + let internal_tables = all_internal_table_ids .iter() .map(|internal_table_id| { @@ -2777,7 +2814,11 @@ impl CatalogManager { Ok(version) } - pub async fn cancel_create_sink_procedure(&self, sink: &Sink) { + pub async fn cancel_create_sink_procedure( + &self, + sink: &Sink, + target_table: &Option<(Table, Option)>, + ) { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; @@ -2794,6 +2835,10 @@ impl CatalogManager { } user_core.decrease_ref(sink.owner); refcnt_dec_connection(database_core, sink.connection_id); + + if let Some((table, source)) = target_table { + Self::cancel_replace_table_procedure_inner(source, table, core); + } } /// This is used for `ALTER TABLE ADD/DROP COLUMN`. @@ -2840,7 +2885,8 @@ impl CatalogManager { &self, source: &Option, table: &Table, - table_col_index_mapping: ColIndexMapping, + table_col_index_mapping: Option, + incoming_sink_id: Option, ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; @@ -2879,17 +2925,19 @@ impl CatalogManager { let mut updated_indexes = vec![]; - let expr_rewriter = ReplaceTableExprRewriter { - table_col_index_mapping: table_col_index_mapping.clone(), - }; + if let Some(table_col_index_mapping) = table_col_index_mapping.clone() { + let expr_rewriter = ReplaceTableExprRewriter { + table_col_index_mapping, + }; - for index_id in &index_ids { - let mut index = indexes.get_mut(*index_id).unwrap(); - index - .index_item - .iter_mut() - .for_each(|x| expr_rewriter.rewrite_expr(x)); - updated_indexes.push(indexes.get(index_id).cloned().unwrap()); + for index_id in &index_ids { + let mut index = indexes.get_mut(*index_id).unwrap(); + index + .index_item + .iter_mut() + .for_each(|x| expr_rewriter.rewrite_expr(x)); + updated_indexes.push(indexes.get(index_id).cloned().unwrap()); + } } // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must @@ -2897,7 +2945,12 @@ impl CatalogManager { let mut table = table.clone(); table.stream_job_status = PbStreamJobStatus::Created.into(); + if let Some(incoming_sink_id) = incoming_sink_id { + table.incoming_sinks.push(incoming_sink_id); + } + tables.insert(table.id, table.clone()); + commit_meta!(self, tables, indexes, sources)?; // Group notification @@ -2932,6 +2985,16 @@ impl CatalogManager { unreachable!("unexpected job: {stream_job:?}") }; let core = &mut *self.core.lock().await; + + Self::cancel_replace_table_procedure_inner(source, table, core); + Ok(()) + } + + fn cancel_replace_table_procedure_inner( + source: &Option, + table: &Table, + core: &mut CatalogManagerCore, + ) { let database_core = &mut core.database; let key = (table.database_id, table.schema_id, table.name.clone()); @@ -2957,7 +3020,6 @@ impl CatalogManager { // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must // occur after it's created. We may need to add a new tracker for `alter` procedure.s database_core.unmark_creating(&key); - Ok(()) } pub async fn comment_on(&self, comment: Comment) -> MetaResult { diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 4c2b2a37295b1..f63675ae3a141 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -407,6 +407,7 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let epoch1 = 233; @@ -577,6 +578,7 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let epoch1 = 233; @@ -696,6 +698,7 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker(); @@ -735,6 +738,7 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let epoch = 233; @@ -814,6 +818,7 @@ mod tests { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let epoch = 233; diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index da70542421016..cc7545c29f340 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -26,7 +26,7 @@ use crate::model::FragmentId; #[derive(Debug, Clone)] pub enum StreamingJob { MaterializedView(Table), - Sink(Sink), + Sink(Sink, Option<(Table, Option)>), Table(Option, Table, TableJobType), Index(Index, Table), Source(PbSource), @@ -55,7 +55,7 @@ impl From<&StreamingJob> for DdlType { fn from(job: &StreamingJob) -> Self { match job { StreamingJob::MaterializedView(_) => DdlType::MaterializedView, - StreamingJob::Sink(_) => DdlType::Sink, + StreamingJob::Sink(_, _) => DdlType::Sink, StreamingJob::Table(_, _, _) => DdlType::Table, StreamingJob::Index(_, _) => DdlType::Index, StreamingJob::Source(_) => DdlType::Source, @@ -68,7 +68,7 @@ impl StreamingJob { let created_at_epoch = Some(Epoch::now().0); match self { StreamingJob::MaterializedView(table) => table.created_at_epoch = created_at_epoch, - StreamingJob::Sink(table) => table.created_at_epoch = created_at_epoch, + StreamingJob::Sink(table, _) => table.created_at_epoch = created_at_epoch, StreamingJob::Table(source, table, ..) => { table.created_at_epoch = created_at_epoch; if let Some(source) = source { @@ -90,7 +90,7 @@ impl StreamingJob { StreamingJob::MaterializedView(table) => { table.initialized_at_epoch = initialized_at_epoch } - StreamingJob::Sink(table) => table.initialized_at_epoch = initialized_at_epoch, + StreamingJob::Sink(table, _) => table.initialized_at_epoch = initialized_at_epoch, StreamingJob::Table(source, table, ..) => { table.initialized_at_epoch = initialized_at_epoch; if let Some(source) = source { @@ -111,7 +111,7 @@ impl StreamingJob { pub fn set_id(&mut self, id: u32) { match self { Self::MaterializedView(table) => table.id = id, - Self::Sink(sink) => sink.id = id, + Self::Sink(sink, _) => sink.id = id, Self::Table(_, table, ..) => table.id = id, Self::Index(index, index_table) => { index.id = id; @@ -132,7 +132,7 @@ impl StreamingJob { Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { table.fragment_id = id; } - Self::Sink(_) | Self::Source(_) => {} + Self::Sink(_, _) | Self::Source(_) => {} } } @@ -142,7 +142,7 @@ impl StreamingJob { Self::Table(_, table, ..) => { table.dml_fragment_id = id; } - Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_) => {} + Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_, _) => {} Self::Source(_) => {} } } @@ -150,7 +150,7 @@ impl StreamingJob { pub fn id(&self) -> u32 { match self { Self::MaterializedView(table) => table.id, - Self::Sink(sink) => sink.id, + Self::Sink(sink, _) => sink.id, Self::Table(_, table, ..) => table.id, Self::Index(index, _) => index.id, Self::Source(source) => source.id, @@ -160,7 +160,7 @@ impl StreamingJob { pub fn mv_table(&self) -> Option { match self { Self::MaterializedView(table) => Some(table.id), - Self::Sink(_sink) => None, + Self::Sink(_, _) => None, Self::Table(_, table, ..) => Some(table.id), Self::Index(_, table) => Some(table.id), Self::Source(_) => None, @@ -173,14 +173,14 @@ impl StreamingJob { Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { Some(table) } - Self::Sink(_) | Self::Source(_) => None, + Self::Sink(_, _) | Self::Source(_) => None, } } pub fn schema_id(&self) -> u32 { match self { Self::MaterializedView(table) => table.schema_id, - Self::Sink(sink) => sink.schema_id, + Self::Sink(sink, _) => sink.schema_id, Self::Table(_, table, ..) => table.schema_id, Self::Index(index, _) => index.schema_id, Self::Source(source) => source.schema_id, @@ -190,7 +190,7 @@ impl StreamingJob { pub fn database_id(&self) -> u32 { match self { Self::MaterializedView(table) => table.database_id, - Self::Sink(sink) => sink.database_id, + Self::Sink(sink, _) => sink.database_id, Self::Table(_, table, ..) => table.database_id, Self::Index(index, _) => index.database_id, Self::Source(source) => source.database_id, @@ -200,7 +200,7 @@ impl StreamingJob { pub fn name(&self) -> String { match self { Self::MaterializedView(table) => table.name.clone(), - Self::Sink(sink) => sink.name.clone(), + Self::Sink(sink, _) => sink.name.clone(), Self::Table(_, table, ..) => table.name.clone(), Self::Index(index, _) => index.name.clone(), Self::Source(source) => source.name.clone(), @@ -210,7 +210,7 @@ impl StreamingJob { pub fn owner(&self) -> u32 { match self { StreamingJob::MaterializedView(mv) => mv.owner, - StreamingJob::Sink(sink) => sink.owner, + StreamingJob::Sink(sink, _) => sink.owner, StreamingJob::Table(_, table, ..) => table.owner, StreamingJob::Index(index, _) => index.owner, StreamingJob::Source(source) => source.owner, @@ -222,7 +222,7 @@ impl StreamingJob { Self::MaterializedView(table) => table.definition.clone(), Self::Table(_, table, ..) => table.definition.clone(), Self::Index(_, table) => table.definition.clone(), - Self::Sink(sink) => sink.definition.clone(), + Self::Sink(sink, _) => sink.definition.clone(), Self::Source(source) => source.definition.clone(), } } @@ -230,7 +230,7 @@ impl StreamingJob { pub fn properties(&self) -> HashMap { match self { Self::MaterializedView(table) => table.properties.clone(), - Self::Sink(sink) => sink.properties.clone(), + Self::Sink(sink, _) => sink.properties.clone(), Self::Table(_, table, ..) => table.properties.clone(), Self::Index(_, index_table) => index_table.properties.clone(), Self::Source(source) => source.properties.clone(), diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index aac59d823f7d2..3bee4bd6a0529 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -321,6 +321,15 @@ impl TableFragments { .cloned() } + pub fn sink_fragment(&self) -> Option { + self.fragments + .values() + .find(|fragment| { + (fragment.get_fragment_type_mask() & FragmentTypeFlag::Sink as u32) != 0 + }) + .cloned() + } + /// Returns actors that contains backfill executors. pub fn backfill_actor_ids(&self) -> HashSet { Self::filter_actor_ids(self, |fragment_type_mask| { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 359978813c222..ac70b9139041b 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -21,9 +21,10 @@ use std::time::Duration; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::config::DefaultParallelism; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::{ParallelUnitMapping, VirtualNode}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; +use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_connector::dispatch_source_prop; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, @@ -36,9 +37,11 @@ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, }; +use risingwave_pb::meta::table_fragments::PbFragment; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - FragmentTypeFlag, StreamFragmentGraph as StreamFragmentGraphProto, + Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, + StreamFragmentGraph as StreamFragmentGraphProto, }; use tokio::sync::Semaphore; use tokio::time::sleep; @@ -52,7 +55,7 @@ use crate::manager::{ SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, UserId, ViewId, IGNORED_NOTIFICATION_VERSION, }; -use crate::model::{StreamEnvironment, TableFragments}; +use crate::model::{FragmentId, StreamEnvironment, TableFragments}; use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, @@ -96,6 +99,13 @@ impl StreamingJobId { } } +// It’s used to describe the information of the table that needs to be replaced and it will be used during replacing table and creating sink into table operations. +pub struct ReplaceTableInfo { + pub streaming_job: StreamingJob, + pub fragment_graph: StreamFragmentGraphProto, + pub col_index_mapping: Option, +} + pub enum DdlCommand { CreateDatabase(Database), DropDatabase(DatabaseId), @@ -107,10 +117,15 @@ pub enum DdlCommand { DropFunction(FunctionId), CreateView(View), DropView(ViewId, DropMode), - CreateStreamingJob(StreamingJob, StreamFragmentGraphProto, CreateType), - DropStreamingJob(StreamingJobId, DropMode), - ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), + CreateStreamingJob( + StreamingJob, + StreamFragmentGraphProto, + CreateType, + Option, + ), + DropStreamingJob(StreamingJobId, DropMode, Option), AlterName(alter_name_request::Object, String), + ReplaceTable(ReplaceTableInfo), AlterSourceColumn(Source), AlterTableOwner(Object, UserId), AlterSetSchema(alter_set_schema_request::Object, SchemaId), @@ -127,7 +142,7 @@ impl DdlCommand { | DdlCommand::DropSource(_, _) | DdlCommand::DropFunction(_) | DdlCommand::DropView(_, _) - | DdlCommand::DropStreamingJob(_, _) + | DdlCommand::DropStreamingJob(_, _, _) | DdlCommand::DropConnection(_) => true, // Simply ban all other commands in recovery. _ => false, @@ -260,15 +275,30 @@ impl DdlController { DdlCommand::DropView(view_id, drop_mode) => { ctrl.drop_view(view_id, drop_mode).await } - DdlCommand::CreateStreamingJob(stream_job, fragment_graph, create_type) => { - ctrl.create_streaming_job(stream_job, fragment_graph, create_type) - .await + DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + create_type, + affected_table_replace_info, + ) => { + ctrl.create_streaming_job( + stream_job, + fragment_graph, + create_type, + affected_table_replace_info, + ) + .await } - DdlCommand::DropStreamingJob(job_id, drop_mode) => { - ctrl.drop_streaming_job(job_id, drop_mode).await + DdlCommand::DropStreamingJob(job_id, drop_mode, target_replace_info) => { + ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info) + .await } - DdlCommand::ReplaceTable(stream_job, fragment_graph, table_col_index_mapping) => { - ctrl.replace_table(stream_job, fragment_graph, table_col_index_mapping) + DdlCommand::ReplaceTable(ReplaceTableInfo { + streaming_job, + fragment_graph, + col_index_mapping, + }) => { + ctrl.replace_table(streaming_job, fragment_graph, col_index_mapping) .await } DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await, @@ -445,6 +475,7 @@ impl DdlController { mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, create_type: CreateType, + affected_table_replace_info: Option, ) -> MetaResult { tracing::debug!( id = stream_job.id(), @@ -473,7 +504,12 @@ impl DdlController { let result = try { tracing::debug!(id = stream_job.id(), "building stream job"); let (ctx, table_fragments) = self - .build_stream_job(env, &stream_job, fragment_graph) + .build_stream_job( + env.clone(), + &stream_job, + fragment_graph, + affected_table_replace_info, + ) .await?; internal_tables = ctx.internal_tables(); @@ -487,7 +523,16 @@ impl DdlController { // Register the source on the connector node. self.source_manager.register_source(source).await?; } - StreamingJob::Sink(ref sink) => { + StreamingJob::Sink(ref sink, ref mut target_table) => { + // When sinking into table occurs, some variables of the target table may be modified, + // such as `fragment_id` being altered by `prepare_replace_table`. + // At this point, it’s necessary to update the table info carried with the sink. + if let Some((StreamingJob::Table(source, table, _), ..)) = + &ctx.replace_table_job_info + { + *target_table = Some((table.clone(), source.clone())); + } + // Validate the sink on the connector node. validate_sink(sink).await?; } @@ -576,6 +621,147 @@ impl DdlController { Ok(()) } + // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream. + // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function. + // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here. + async fn inject_replace_table_job( + &self, + env: StreamEnvironment, + sink_table_fragments: &TableFragments, + ReplaceTableInfo { + mut streaming_job, + fragment_graph, + .. + }: ReplaceTableInfo, + ) -> MetaResult<(StreamingJob, ReplaceTableContext, TableFragments)> { + let fragment_graph = self + .prepare_replace_table(&mut streaming_job, fragment_graph) + .await?; + + let (mut replace_table_ctx, mut table_fragments) = self + .build_replace_table(env.clone(), &streaming_job, fragment_graph, None) + .await?; + + let mut union_fragment_id = None; + + for (fragment_id, fragment) in &mut table_fragments.fragments { + for actor in &mut fragment.actors { + if let Some(node) = &mut actor.nodes { + visit_stream_node(node, |body| { + if let NodeBody::Merge(m) = body && m.upstream_actor_id.is_empty() { + if let Some(union_fragment_id) = union_fragment_id.as_mut() { + // The union fragment should be unique. + assert_eq!(*union_fragment_id, *fragment_id); + } else { + union_fragment_id = Some(*fragment_id); + } + }; + }) + }; + } + } + + let table = streaming_job.table().unwrap(); + + let target_fragment_id = + union_fragment_id.expect("fragment of placeholder merger not found"); + + let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); + + Self::inject_replace_table_plan_for_sink( + &sink_fragment, + table, + &mut replace_table_ctx, + &mut table_fragments, + target_fragment_id, + ); + + Ok((streaming_job, replace_table_ctx, table_fragments)) + } + + fn inject_replace_table_plan_for_sink( + sink_fragment: &PbFragment, + table: &Table, + replace_table_ctx: &mut ReplaceTableContext, + table_fragments: &mut TableFragments, + target_fragment_id: FragmentId, + ) { + let sink_actor_ids = sink_fragment + .actors + .iter() + .map(|a| a.actor_id) + .collect_vec(); + + let union_fragment = table_fragments + .fragments + .get_mut(&target_fragment_id) + .unwrap(); + + let downstream_actor_ids = union_fragment + .actors + .iter() + .map(|actor| actor.actor_id) + .collect_vec(); + + let output_indices = table + .columns + .iter() + .enumerate() + .map(|(idx, _)| idx as _) + .collect_vec(); + + let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec(); + + let mapping = downstream_actor_ids + .iter() + .map(|id| { + let actor_status = table_fragments.actor_status.get(id).unwrap(); + let parallel_unit_id = actor_status.parallel_unit.as_ref().unwrap().id; + + (parallel_unit_id, *id) + }) + .collect(); + + let actor_mapping = + ParallelUnitMapping::from_protobuf(union_fragment.vnode_mapping.as_ref().unwrap()) + .to_actor(&mapping); + + let upstream_actors = sink_fragment.get_actors(); + + for actor in upstream_actors { + replace_table_ctx.dispatchers.insert( + actor.actor_id, + vec![Dispatcher { + r#type: DispatcherType::Hash as _, + dist_key_indices: dist_key_indices.clone(), + output_indices: output_indices.clone(), + hash_mapping: Some(actor_mapping.to_protobuf()), + dispatcher_id: union_fragment.fragment_id as _, + downstream_actor_id: downstream_actor_ids.clone(), + }], + ); + } + + let upstream_fragment_id = sink_fragment.fragment_id; + + for actor in &mut union_fragment.actors { + if let Some(node) = &mut actor.nodes { + let fields = node.fields.clone(); + + visit_stream_node(node, |node| { + if let NodeBody::Merge(merge_node) = node && merge_node.upstream_actor_id.is_empty() { + *merge_node = MergeNode { + upstream_actor_id: sink_actor_ids.clone(), + upstream_fragment_id, + upstream_dispatcher_type: DispatcherType::Hash as _, + fields: fields.clone(), + }; + } + }); + } + } + } + /// Let the stream manager to create the actors, and do some cleanup work after it fails or finishes. async fn create_streaming_job_inner( &self, @@ -586,10 +772,12 @@ impl DdlController { ) -> MetaResult { let job_id = stream_job.id(); tracing::debug!(id = job_id, "creating stream job"); + let result = self .stream_manager .create_streaming_job(table_fragments, ctx) .await; + if let Err(e) = result { match stream_job.create_type() { // NOTE: This assumes that we will trigger recovery, @@ -604,9 +792,11 @@ impl DdlController { } return Err(e); }; + tracing::debug!(id = job_id, "finishing stream job"); let version = self.finish_stream_job(stream_job, internal_tables).await?; tracing::debug!(id = job_id, "finished stream job"); + Ok(version) } @@ -614,9 +804,10 @@ impl DdlController { &self, job_id: StreamingJobId, drop_mode: DropMode, + target_replace_info: Option, ) -> MetaResult { let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; - let (version, streaming_job_ids) = match job_id { + let (mut version, streaming_job_ids) = match job_id { StreamingJobId::MaterializedView(table_id) => { self.catalog_manager .drop_relation( @@ -655,9 +846,21 @@ impl DdlController { } }; + if let Some(ReplaceTableInfo { + streaming_job, + fragment_graph, + col_index_mapping, + }) = target_replace_info + { + version = self + .replace_table(streaming_job, fragment_graph, col_index_mapping) + .await?; + } + self.stream_manager .drop_streaming_jobs(streaming_job_ids) .await; + Ok(version) } @@ -736,6 +939,7 @@ impl DdlController { env: StreamEnvironment, stream_job: &StreamingJob, fragment_graph: StreamFragmentGraph, + affected_table_replace_info: Option, ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { let id = stream_job.id(); let default_parallelism = fragment_graph.default_parallelism(); @@ -791,8 +995,20 @@ impl DdlController { // 3. Build the table fragments structure that will be persisted in the stream manager, // and the context that contains all information needed for building the // actors on the compute nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + let table_fragments = TableFragments::new( + id.into(), + graph, + &building_locations.actor_locations, + env.clone(), + ); + + let replace_table_job_info = match affected_table_replace_info { + Some(replace_table_info) => Some( + self.inject_replace_table_job(env, &table_fragments, replace_table_info) + .await?, + ), + None => None, + }; let ctx = CreateStreamingJobContext { dispatchers, @@ -805,6 +1021,7 @@ impl DdlController { mv_table_id: stream_job.mv_table(), create_type: stream_job.create_type(), ddl_type: stream_job.into(), + replace_table_job_info, }; // 4. Mark tables as creating, including internal tables and the table of the stream job. @@ -855,9 +1072,9 @@ impl DdlController { tracing::warn!("Failed to cancel create table procedure, perhaps barrier manager has already cleaned it. Reason: {e:#?}"); } } - StreamingJob::Sink(sink) => { + StreamingJob::Sink(sink, target_table) => { self.catalog_manager - .cancel_create_sink_procedure(sink) + .cancel_create_sink_procedure(sink, target_table) .await; } StreamingJob::Table(source, table, ..) => { @@ -917,10 +1134,24 @@ impl DdlController { .finish_create_table_procedure(internal_tables, table) .await? } - StreamingJob::Sink(sink) => { - self.catalog_manager + StreamingJob::Sink(sink, target_table) => { + let sink_id = sink.id; + + let mut version = self + .catalog_manager .finish_create_sink_procedure(internal_tables, sink) - .await? + .await?; + + if let Some((table, source)) = target_table { + let streaming_job = + StreamingJob::Table(source, table, TableJobType::Unspecified); + + version = self + .finish_replace_table(&streaming_job, None, Some(sink_id)) + .await?; + } + + version } StreamingJob::Table(source, table, ..) => { creating_internal_table_ids.push(table.id); @@ -992,7 +1223,7 @@ impl DdlController { &self, mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, - table_col_index_mapping: ColIndexMapping, + table_col_index_mapping: Option, ) -> MetaResult { let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); @@ -1018,7 +1249,7 @@ impl DdlController { match result { Ok(_) => { - self.finish_replace_table(&stream_job, table_col_index_mapping) + self.finish_replace_table(&stream_job, table_col_index_mapping, None) .await } Err(err) => { @@ -1061,7 +1292,7 @@ impl DdlController { env: StreamEnvironment, stream_job: &StreamingJob, mut fragment_graph: StreamFragmentGraph, - table_col_index_mapping: ColIndexMapping, + table_col_index_mapping: Option, ) -> MetaResult<(ReplaceTableContext, TableFragments)> { let id = stream_job.id(); let default_parallelism = fragment_graph.default_parallelism(); @@ -1085,18 +1316,23 @@ impl DdlController { // Map the column indices in the dispatchers with the given mapping. let downstream_fragments = self - .fragment_manager - .get_downstream_fragments(id.into()) - .await? - .into_iter() - .map(|(d, f)| Some((table_col_index_mapping.rewrite_dispatch_strategy(&d)?, f))) - .collect::>() - .ok_or_else(|| { - // The `rewrite` only fails if some column is dropped. - MetaError::invalid_parameter( - "unable to drop the column due to being referenced by downstream materialized views or sinks", - ) - })?; + .fragment_manager + .get_downstream_fragments(id.into()) + .await? + .into_iter() + .map(|(d, f)| + if let Some(mapping) = &table_col_index_mapping { + Some((mapping.rewrite_dispatch_strategy(&d)?, f)) + } else { + Some((d, f)) + }) + .collect::>() + .ok_or_else(|| { + // The `rewrite` only fails if some column is dropped. + MetaError::invalid_parameter( + "unable to drop the column due to being referenced by downstream materialized views or sinks", + ) + })?; let complete_graph = CompleteStreamFragmentGraph::with_downstreams( fragment_graph, @@ -1158,14 +1394,20 @@ impl DdlController { async fn finish_replace_table( &self, stream_job: &StreamingJob, - table_col_index_mapping: ColIndexMapping, + table_col_index_mapping: Option, + incoming_sink_id: Option, ) -> MetaResult { let StreamingJob::Table(source, table, ..) = stream_job else { unreachable!("unexpected job: {stream_job:?}") }; self.catalog_manager - .finish_replace_table_procedure(source, table, table_col_index_mapping) + .finish_replace_table_procedure( + source, + table, + table_col_index_mapping, + incoming_sink_id, + ) .await } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 81212add27f4c..d2d7e2f041545 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -30,9 +30,9 @@ use tracing::Instrument; use uuid::Uuid; use super::{Locations, ScaleController, ScaleControllerRef}; -use crate::barrier::{BarrierScheduler, Command}; +use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan}; use crate::hummock::HummockManagerRef; -use crate::manager::{ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv}; +use crate::manager::{ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv, StreamingJob}; use crate::model::{ActorId, TableFragments}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -71,6 +71,9 @@ pub struct CreateStreamingJobContext { pub create_type: CreateType, pub ddl_type: DdlType, + + /// Context provided for potential replace table, typically used when sinking into a table. + pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>, } impl CreateStreamingJobContext { @@ -444,8 +447,12 @@ impl GlobalStreamManager { internal_tables, create_type, ddl_type, + replace_table_job_info, }: CreateStreamingJobContext, ) -> MetaResult<()> { + let mut replace_table_command = None; + let mut replace_table_id = None; + // Register to compaction group beforehand. let hummock_manager_ref = self.hummock_manager.clone(); let registered_table_ids = hummock_manager_ref @@ -470,6 +477,37 @@ impl GlobalStreamManager { self.build_actors(&table_fragments, &building_locations, &existing_locations) .await?; + if let Some((_, context, table_fragments)) = replace_table_job_info { + self.build_actors( + &table_fragments, + &context.building_locations, + &context.existing_locations, + ) + .await?; + + // Add table fragments to meta store with state: `State::Initial`. + self.fragment_manager + .start_create_table_fragments(table_fragments.clone()) + .await?; + + let dummy_table_id = table_fragments.table_id(); + + let init_split_assignment = self + .source_manager + .pre_allocate_splits(&dummy_table_id) + .await?; + + replace_table_command = Some(ReplaceTablePlan { + old_table_fragments: context.old_table_fragments, + new_table_fragments: table_fragments, + merge_updates: context.merge_updates, + dispatchers: context.dispatchers, + init_split_assignment, + }); + + replace_table_id = Some(dummy_table_id); + } + // Add table fragments to meta store with state: `State::Initial`. self.fragment_manager .start_create_table_fragments(table_fragments.clone()) @@ -479,23 +517,27 @@ impl GlobalStreamManager { let init_split_assignment = self.source_manager.pre_allocate_splits(&table_id).await?; - if let Err(err) = self - .barrier_scheduler - .run_command(Command::CreateStreamingJob { - table_fragments, - upstream_mview_actors, - dispatchers, - init_split_assignment, - definition: definition.to_string(), - ddl_type, - }) - .await - { + let command = Command::CreateStreamingJob { + table_fragments, + upstream_mview_actors, + dispatchers, + init_split_assignment, + definition: definition.to_string(), + ddl_type, + replace_table: replace_table_command, + }; + + if let Err(err) = self.barrier_scheduler.run_command(command).await { if create_type == CreateType::Foreground { + let mut table_ids = HashSet::from_iter(std::iter::once(table_id)); + if let Some(dummy_table_id) = replace_table_id { + table_ids.insert(dummy_table_id); + } self.fragment_manager - .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(table_id))) + .drop_table_fragments_vec(&table_ids) .await?; } + return Err(err); } @@ -531,13 +573,13 @@ impl GlobalStreamManager { if let Err(err) = self .barrier_scheduler - .run_config_change_command_with_pause(Command::ReplaceTable { + .run_config_change_command_with_pause(Command::ReplaceTable(ReplaceTablePlan { old_table_fragments, new_table_fragments: table_fragments, merge_updates, dispatchers, init_split_assignment, - }) + })) .await { self.fragment_manager diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 8163b19848eff..0351cd8d482fe 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -405,10 +405,12 @@ impl MetaClient { &self, sink: PbSink, graph: StreamFragmentGraph, + affected_table_change: Option, ) -> Result<(u32, CatalogVersion)> { let request = CreateSinkRequest { sink: Some(sink), fragment_graph: Some(graph), + affected_table_change, }; let resp = self.inner.create_sink(request).await?; @@ -504,10 +506,12 @@ impl MetaClient { table_col_index_mapping: ColIndexMapping, ) -> Result { let request = ReplaceTablePlanRequest { - source, - table: Some(table), - fragment_graph: Some(graph), - table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()), + plan: Some(ReplaceTablePlan { + source, + table: Some(table), + fragment_graph: Some(graph), + table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()), + }), }; let resp = self.inner.replace_table_plan(request).await?; // TODO: handle error in `resp.status` here @@ -565,8 +569,17 @@ impl MetaClient { Ok(resp.version) } - pub async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result { - let request = DropSinkRequest { sink_id, cascade }; + pub async fn drop_sink( + &self, + sink_id: u32, + cascade: bool, + affected_table_change: Option, + ) -> Result { + let request = DropSinkRequest { + sink_id, + cascade, + affected_table_change, + }; let resp = self.inner.drop_sink(request).await?; Ok(resp.version) } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 41c651a3556b4..133688875fd6e 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -820,6 +820,7 @@ pub struct CreateSinkStatement { pub columns: Vec, pub emit_mode: Option, pub sink_schema: Option, + pub into_table_name: Option, } impl ParseTo for CreateSinkStatement { @@ -827,6 +828,13 @@ impl ParseTo for CreateSinkStatement { impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p); impl_parse_to!(sink_name: ObjectName, p); + let into_table_name = if p.parse_keyword(Keyword::INTO) { + impl_parse_to!(into_table_name: ObjectName, p); + Some(into_table_name) + } else { + None + }; + let columns = p.parse_parenthesized_column_list(IsOptional::Optional)?; let sink_from = if p.parse_keyword(Keyword::FROM) { @@ -843,11 +851,12 @@ impl ParseTo for CreateSinkStatement { // This check cannot be put into the `WithProperties::parse_to`, since other // statements may not need the with properties. - if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) { + if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() { p.expected("WITH", p.peek_token())? } impl_parse_to!(with_properties: WithProperties, p); - if with_properties.0.is_empty() { + + if with_properties.0.is_empty() && into_table_name.is_none() { return Err(ParserError::ParserError( "sink properties not provided".to_string(), )); @@ -863,6 +872,7 @@ impl ParseTo for CreateSinkStatement { columns, emit_mode, sink_schema, + into_table_name, }) } } diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index c98c06c287ce2..528fa7326b54a 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -552,6 +552,7 @@ mod tests { stream_job_status: PbStreamJobStatus::Created.into(), create_type: PbCreateType::Foreground.into(), description: None, + incoming_sinks: vec![], } } diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs index 46218012d44df..8ea96e280d072 100644 --- a/src/stream/src/executor/chain.rs +++ b/src/stream/src/executor/chain.rs @@ -136,7 +136,9 @@ mod test { use super::ChainExecutor; use crate::executor::test_utils::MockSource; - use crate::executor::{Barrier, Executor, ExecutorInfo, Message, Mutation, PkIndices}; + use crate::executor::{ + AddMutation, Barrier, Executor, ExecutorInfo, Message, Mutation, PkIndices, + }; use crate::task::{CreateMviewProgress, LocalBarrierManager}; #[tokio::test] @@ -163,17 +165,19 @@ mod test { schema.clone(), PkIndices::new(), vec![ - Message::Barrier(Barrier::new_test_barrier(1).with_mutation(Mutation::Add { - adds: maplit::hashmap! { - 0 => vec![Dispatcher { - downstream_actor_id: vec![actor_id], - ..Default::default() - }], + Message::Barrier(Barrier::new_test_barrier(1).with_mutation(Mutation::Add( + AddMutation { + adds: maplit::hashmap! { + 0 => vec![Dispatcher { + downstream_actor_id: vec![actor_id], + ..Default::default() + }], + }, + added_actors: maplit::hashset! { actor_id }, + splits: Default::default(), + pause: false, }, - added_actors: maplit::hashset! { actor_id }, - splits: Default::default(), - pause: false, - })), + ))), Message::Chunk(StreamChunk::from_pretty("I\n + 3")), Message::Chunk(StreamChunk::from_pretty("I\n + 4")), ], diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index bb3d14285fd38..321c177e153b1 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -33,7 +33,7 @@ use tokio::time::Instant; use tracing::{event, Instrument}; use super::exchange::output::{new_output, BoxedOutput}; -use super::Watermark; +use super::{AddMutation, UpdateMutation, Watermark}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, BoxedExecutor, Message, Mutation, StreamConsumer}; @@ -205,16 +205,38 @@ impl DispatchExecutorInner { }; match mutation { - Mutation::Add { adds, .. } => { + Mutation::Add(AddMutation { adds, .. }) => { if let Some(new_dispatchers) = adds.get(&self.actor_id) { self.add_dispatchers(new_dispatchers)?; } } - Mutation::Update { + Mutation::Update(UpdateMutation { dispatchers, actor_new_dispatchers: actor_dispatchers, .. - } => { + }) => { + if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) { + self.add_dispatchers(new_dispatchers)?; + } + + if let Some(updates) = dispatchers.get(&self.actor_id) { + for update in updates { + self.pre_update_dispatcher(update)?; + } + } + } + Mutation::AddAndUpdate( + AddMutation { adds, .. }, + UpdateMutation { + dispatchers, + actor_new_dispatchers: actor_dispatchers, + .. + }, + ) => { + if let Some(new_dispatchers) = adds.get(&self.actor_id) { + self.add_dispatchers(new_dispatchers)?; + } + if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) { self.add_dispatchers(new_dispatchers)?; } @@ -226,7 +248,7 @@ impl DispatchExecutorInner { } } _ => {} - }; + } Ok(()) } @@ -246,11 +268,19 @@ impl DispatchExecutorInner { } } } - Mutation::Update { + Mutation::Update(UpdateMutation { dispatchers, dropped_actors, .. - } => { + }) + | Mutation::AddAndUpdate( + _, + UpdateMutation { + dispatchers, + dropped_actors, + .. + }, + ) => { if let Some(updates) = dispatchers.get(&self.actor_id) { for update in updates { self.post_update_dispatcher(update)?; @@ -263,7 +293,6 @@ impl DispatchExecutorInner { } } } - _ => {} }; @@ -1124,14 +1153,14 @@ mod tests { hash_mapping: Default::default(), }] }; - let b1 = Barrier::new_test_barrier(1).with_mutation(Mutation::Update { + let b1 = Barrier::new_test_barrier(1).with_mutation(Mutation::Update(UpdateMutation { dispatchers: dispatcher_updates, merges: Default::default(), vnode_bitmaps: Default::default(), dropped_actors: Default::default(), actor_splits: Default::default(), actor_new_dispatchers: Default::default(), - }); + })); tx.send(Message::Barrier(b1)).await.unwrap(); executor.next().await.unwrap().unwrap(); @@ -1176,14 +1205,14 @@ mod tests { hash_mapping: Default::default(), }] }; - let b3 = Barrier::new_test_barrier(3).with_mutation(Mutation::Update { + let b3 = Barrier::new_test_barrier(3).with_mutation(Mutation::Update(UpdateMutation { dispatchers: dispatcher_updates, merges: Default::default(), vnode_bitmaps: Default::default(), dropped_actors: Default::default(), actor_splits: Default::default(), actor_new_dispatchers: Default::default(), - }); + })); tx.send(Message::Barrier(b3)).await.unwrap(); executor.next().await.unwrap().unwrap(); diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index ca67fd02da558..38a8958bd9d03 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -142,7 +142,8 @@ impl MergeExecutor { ); barrier.passed_actors.push(actor_id); - if let Some(Mutation::Update { dispatchers, .. }) = barrier.mutation.as_deref() + if let Some(Mutation::Update(UpdateMutation { dispatchers, .. })) = + barrier.mutation.as_deref() { if select_all .upstream_actor_ids() @@ -639,14 +640,14 @@ mod tests { } }; - let b1 = Barrier::new_test_barrier(1).with_mutation(Mutation::Update { + let b1 = Barrier::new_test_barrier(1).with_mutation(Mutation::Update(UpdateMutation { dispatchers: Default::default(), merges: merge_updates, vnode_bitmaps: Default::default(), dropped_actors: Default::default(), actor_splits: Default::default(), actor_new_dispatchers: Default::default(), - }); + })); send!([untouched, old], Message::Barrier(b1.clone())); assert!(recv!().is_none()); // We should not receive the barrier, since merger is waiting for the new upstream new. diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index b02b47fa3aae6..6fa4618e57a20 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -34,12 +34,14 @@ use risingwave_connector::source::SplitImpl; use risingwave_expr::expr::{Expression, NonStrictExpression}; use risingwave_pb::data::PbEpoch; use risingwave_pb::expr::PbInputRef; -use risingwave_pb::stream_plan::barrier::{BarrierKind, PbMutation}; +use risingwave_pb::stream_plan::barrier::BarrierKind; +use risingwave_pb::stream_plan::barrier_mutation::PbMutation; use risingwave_pb::stream_plan::stream_message::StreamMessage; use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate}; use risingwave_pb::stream_plan::{ - AddMutation, Dispatchers, PauseMutation, PbBarrier, PbDispatcher, PbStreamMessage, PbWatermark, - ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation, UpdateMutation, + BarrierMutation, CombinedMutation, Dispatchers, PauseMutation, PbAddMutation, PbBarrier, + PbDispatcher, PbStreamMessage, PbUpdateMutation, PbWatermark, ResumeMutation, + SourceChangeSplitMutation, StopMutation, ThrottleMutation, }; use smallvec::SmallVec; @@ -220,29 +222,36 @@ pub const INVALID_EPOCH: u64 = 0; type UpstreamFragmentId = FragmentId; +#[derive(Debug, Clone, PartialEq)] +pub struct UpdateMutation { + pub dispatchers: HashMap>, + pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>, + pub vnode_bitmaps: HashMap>, + pub dropped_actors: HashSet, + pub actor_splits: HashMap>, + pub actor_new_dispatchers: HashMap>, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct AddMutation { + pub adds: HashMap>, + pub added_actors: HashSet, + // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations. + pub splits: HashMap>, + pub pause: bool, +} + /// See [`PbMutation`] for the semantics of each mutation. #[derive(Debug, Clone, PartialEq)] pub enum Mutation { Stop(HashSet), - Update { - dispatchers: HashMap>, - merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>, - vnode_bitmaps: HashMap>, - dropped_actors: HashSet, - actor_splits: HashMap>, - actor_new_dispatchers: HashMap>, - }, - Add { - adds: HashMap>, - added_actors: HashSet, - // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations. - splits: HashMap>, - pause: bool, - }, + Update(UpdateMutation), + Add(AddMutation), SourceChangeSplit(HashMap>), Pause, Resume, Throttle(HashMap>), + AddAndUpdate(AddMutation, UpdateMutation), } #[derive(Debug, Clone)] @@ -308,7 +317,10 @@ impl Barrier { pub fn all_stop_actors(&self) -> Option<&HashSet> { match self.mutation.as_deref() { Some(Mutation::Stop(actors)) => Some(actors), - Some(Mutation::Update { dropped_actors, .. }) => Some(dropped_actors), + Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) + | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => { + Some(dropped_actors) + } _ => None, } } @@ -320,7 +332,10 @@ impl Barrier { /// added for scaling are not included. pub fn is_newly_added(&self, actor_id: ActorId) -> bool { match self.mutation.as_deref() { - Some(Mutation::Add { added_actors, .. }) => added_actors.contains(&actor_id), + Some(Mutation::Add(AddMutation { added_actors, .. })) + | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => { + added_actors.contains(&actor_id) + } _ => false, } } @@ -330,7 +345,7 @@ impl Barrier { match self.mutation.as_deref() { Some( Mutation::Update { .. } // new actors for scaling - | Mutation::Add { pause: true, .. } // new streaming job, or recovery + | Mutation::Add(AddMutation { pause: true, .. }) // new streaming job, or recovery ) => true, _ => false, } @@ -351,7 +366,11 @@ impl Barrier { self.mutation .as_deref() .and_then(|mutation| match mutation { - Mutation::Update { merges, .. } => merges.get(&(actor_id, upstream_fragment_id)), + Mutation::Update(UpdateMutation { merges, .. }) + | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => { + merges.get(&(actor_id, upstream_fragment_id)) + } + _ => None, }) } @@ -365,7 +384,10 @@ impl Barrier { self.mutation .as_deref() .and_then(|mutation| match mutation { - Mutation::Update { vnode_bitmaps, .. } => vnode_bitmaps.get(&actor_id).cloned(), + Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) + | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => { + vnode_bitmaps.get(&actor_id).cloned() + } _ => None, }) } @@ -414,14 +436,14 @@ impl Mutation { Mutation::Stop(actors) => PbMutation::Stop(StopMutation { actors: actors.iter().copied().collect::>(), }), - Mutation::Update { + Mutation::Update(UpdateMutation { dispatchers, merges, vnode_bitmaps, dropped_actors, actor_splits, actor_new_dispatchers, - } => PbMutation::Update(UpdateMutation { + }) => PbMutation::Update(PbUpdateMutation { dispatcher_update: dispatchers.values().flatten().cloned().collect(), merge_update: merges.values().cloned().collect(), actor_vnode_bitmap_update: vnode_bitmaps @@ -442,12 +464,12 @@ impl Mutation { }) .collect(), }), - Mutation::Add { + Mutation::Add(AddMutation { adds, added_actors, splits, pause, - } => PbMutation::Add(AddMutation { + }) => PbMutation::Add(PbAddMutation { actor_dispatchers: adds .iter() .map(|(&actor_id, dispatchers)| { @@ -484,6 +506,17 @@ impl Mutation { .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })) .collect(), }), + + Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation { + mutations: vec![ + BarrierMutation { + mutation: Some(Mutation::Add(add.clone()).to_protobuf()), + }, + BarrierMutation { + mutation: Some(Mutation::Update(update.clone()).to_protobuf()), + }, + ], + }), } } @@ -491,7 +524,7 @@ impl Mutation { let mutation = match prost { PbMutation::Stop(stop) => Mutation::Stop(HashSet::from_iter(stop.get_actors().clone())), - PbMutation::Update(update) => Mutation::Update { + PbMutation::Update(update) => Mutation::Update(UpdateMutation { dispatchers: update .dispatcher_update .iter() @@ -527,9 +560,9 @@ impl Mutation { .iter() .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone())) .collect(), - }, + }), - PbMutation::Add(add) => Mutation::Add { + PbMutation::Add(add) => Mutation::Add(AddMutation { adds: add .actor_dispatchers .iter() @@ -553,7 +586,7 @@ impl Mutation { }) .collect(), pause: add.pause, - }, + }), PbMutation::Splits(s) => { let mut change_splits: Vec<(ActorId, Vec)> = @@ -581,6 +614,26 @@ impl Mutation { .map(|(actor_id, limit)| (*actor_id, limit.rate_limit)) .collect(), ), + + PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] { + [BarrierMutation { + mutation: Some(add), + }, BarrierMutation { + mutation: Some(update), + }] => { + let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else { + unreachable!(); + }; + + let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else { + unreachable!(); + }; + + Mutation::AddAndUpdate(add_mutation, update_mutation) + } + + _ => unreachable!(), + }, }; Ok(mutation) } @@ -602,7 +655,9 @@ impl Barrier { curr: epoch.curr, prev: epoch.prev, }), - mutation: mutation.map(|mutation| mutation.to_protobuf()), + mutation: mutation.map(|m| BarrierMutation { + mutation: Some(m.to_protobuf()), + }), tracing_context: tracing_context.to_protobuf(), kind: kind as _, passed_actors, @@ -613,7 +668,7 @@ impl Barrier { let mutation = prost .mutation .as_ref() - .map(Mutation::from_protobuf) + .map(|m| Mutation::from_protobuf(m.mutation.as_ref().unwrap())) .transpose()? .map(Arc::new); let epoch = prost.get_epoch()?; diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 5920cc57d2ae3..c2d837194ab68 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -233,7 +233,7 @@ mod tests { use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use super::*; - use crate::executor::{ActorContext, Barrier, Executor, Mutation}; + use crate::executor::{ActorContext, Barrier, Executor, Mutation, UpdateMutation}; use crate::task::test_utils::helper_make_local_actor; #[tokio::test] @@ -336,14 +336,14 @@ mod tests { } }; - let b1 = Barrier::new_test_barrier(1).with_mutation(Mutation::Update { + let b1 = Barrier::new_test_barrier(1).with_mutation(Mutation::Update(UpdateMutation { dispatchers: Default::default(), merges: merge_updates, vnode_bitmaps: Default::default(), dropped_actors: Default::default(), actor_splits: Default::default(), actor_new_dispatchers: Default::default(), - }); + })); send!([new], Message::Barrier(b1.clone())); assert!(recv!().is_none()); // We should not receive the barrier, as new is not the upstream. diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 56bccd0dba72d..86e8dd5ec47c3 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -463,6 +463,7 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let info = ExecutorInfo { @@ -590,6 +591,7 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let info = ExecutorInfo { @@ -714,6 +716,7 @@ mod test { format_desc: None, db_name: "test".into(), sink_from_name: "test".into(), + target_table: None, }; let info = ExecutorInfo { diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index e69390ad37d49..cee773d5b3c9a 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -285,11 +285,11 @@ impl FsSourceExecutor { let mut boot_state = Vec::default(); if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - Mutation::Add { splits, .. } - | Mutation::Update { + Mutation::Add(AddMutation { splits, .. }) + | Mutation::Update(UpdateMutation { actor_splits: splits, .. - } => { + }) => { if let Some(splits) = splits.get(&self.actor_ctx.id) { boot_state = splits.clone(); } @@ -376,7 +376,7 @@ impl FsSourceExecutor { } Mutation::Pause => stream.pause_stream(), Mutation::Resume => stream.resume_stream(), - Mutation::Update { actor_splits, .. } => { + Mutation::Update(UpdateMutation { actor_splits, .. }) => { self.apply_split_change( &source_desc, &mut stream, diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 86fa6e811a287..548d98284c262 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -376,11 +376,11 @@ impl SourceExecutor { let mut boot_state = Vec::default(); if let Some(mutation) = barrier.mutation.as_ref() { match mutation.as_ref() { - Mutation::Add { splits, .. } - | Mutation::Update { + Mutation::Add(AddMutation { splits, .. }) + | Mutation::Update(UpdateMutation { actor_splits: splits, .. - } => { + }) => { if let Some(splits) = splits.get(&self.actor_ctx.id) { tracing::info!( "source exector: actor {:?} boot with splits: {:?}", @@ -493,7 +493,9 @@ impl SourceExecutor { should_trim_state = true; } - Mutation::Update { actor_splits, .. } => { + Mutation::Update(UpdateMutation { + actor_splits, .. + }) => { target_state = self .apply_split_change( &source_desc, @@ -760,7 +762,7 @@ mod tests { ); let mut executor = Box::new(executor).execute(); - let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { + let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add(AddMutation { adds: HashMap::new(), added_actors: HashSet::new(), splits: hashmap! { @@ -773,7 +775,7 @@ mod tests { ], }, pause: false, - }); + })); barrier_tx.send(init_barrier).unwrap(); // Consume barrier. @@ -854,7 +856,7 @@ mod tests { ); let mut handler = Box::new(executor).execute(); - let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { + let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add(AddMutation { adds: HashMap::new(), added_actors: HashSet::new(), splits: hashmap! { @@ -867,7 +869,7 @@ mod tests { ], }, pause: false, - }); + })); barrier_tx.send(init_barrier).unwrap(); // Consume barrier. diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index 868e5ea2bb7c7..ff398ac7f27ec 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -168,7 +168,7 @@ mod tests { use super::ValuesExecutor; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::executor::{ActorContext, Barrier, Executor, ExecutorInfo, Mutation}; + use crate::executor::{ActorContext, AddMutation, Barrier, Executor, ExecutorInfo, Mutation}; use crate::task::{CreateMviewProgress, LocalBarrierManager}; #[tokio::test] @@ -227,12 +227,13 @@ mod tests { let mut values_executor = Box::new(values_executor_struct).execute(); // Init barrier - let first_message = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { - adds: Default::default(), - added_actors: maplit::hashset! {actor_id}, - splits: Default::default(), - pause: false, - }); + let first_message = + Barrier::new_test_barrier(1).with_mutation(Mutation::Add(AddMutation { + adds: Default::default(), + added_actors: maplit::hashset! {actor_id}, + splits: Default::default(), + pause: false, + })); tx.send(first_message).unwrap(); assert!(matches!( diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index c60467fa4ad17..615da9d81219f 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use anyhow::anyhow; -use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::catalog::{ColumnCatalog, TableId}; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ @@ -47,6 +47,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { let sink_id = sink_desc.get_id().into(); let db_name = sink_desc.get_db_name().into(); let sink_from_name = sink_desc.get_sink_from_name().into(); + let target_table = sink_desc.get_target_table().cloned().ok().map(TableId::new); let properties = sink_desc.get_properties().clone(); let downstream_pk = sink_desc .downstream_pk @@ -101,6 +102,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { format_desc, db_name, sink_from_name, + target_table, }; let sink_id_str = format!("{}", sink_id.sink_id); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 0adc359a42335..f7b422026db2d 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -159,6 +159,7 @@ async fn compaction_test( stream_job_status: PbStreamJobStatus::Created.into(), create_type: PbCreateType::Foreground.into(), description: None, + incoming_sinks: vec![], }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;