diff --git a/e2e_test/sink/sink_into_table.slt b/e2e_test/sink/sink_into_table.slt index 9c789a9b35d29..a3df061fc8db4 100644 --- a/e2e_test/sink/sink_into_table.slt +++ b/e2e_test/sink/sink_into_table.slt @@ -393,6 +393,9 @@ create sink s_b into t_m as select v from t_b; statement ok create sink s_c into t_m as select v from t_c; +statement ok +flush; + query I rowsort select * from t_m order by v; ---- diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 2ea441b2649d9..eb383cc06d727 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -21,11 +21,11 @@ use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_common::bail; use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::Datum; use risingwave_common::util::value_encoding::DatumFromProtoExt; -use risingwave_common::{bail, bail_not_implemented}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, @@ -301,20 +301,11 @@ pub async fn handle_create_sink( let mut target_table_replace_plan = None; if let Some(table_catalog) = target_table_catalog { - if !table_catalog.incoming_sinks.is_empty() { - bail_not_implemented!(issue = 13818, "create sink into table with incoming sinks"); - } - let (mut graph, mut table, source) = reparse_table_for_sink(&session, &table_catalog).await?; table.incoming_sinks = table_catalog.incoming_sinks.clone(); - // for now we only support one incoming sink - assert!(table.incoming_sinks.is_empty()); - - table.incoming_sinks = table_catalog.incoming_sinks.clone(); - for _ in 0..(table_catalog.incoming_sinks.len() + 1) { for fragment in graph.fragments.values_mut() { if let Some(node) = &mut fragment.node { diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 5e3bd0e92b76a..a382f65b45e93 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -21,8 +21,6 @@ use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::root_catalog::SchemaPath; use crate::handler::create_sink::{insert_merger_to_union, reparse_table_for_sink}; -use crate::handler::create_table::generate_stream_graph_for_table; -use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; pub async fn handle_drop_sink( diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d49b3e76caff3..eb042710e86cb 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -30,7 +30,6 @@ use risingwave_connector::dispatch_source_prop; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, }; -use risingwave_pb::catalog; use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; use risingwave_pb::catalog::{ connection, Comment, Connection, CreateType, Database, Function, Schema, Sink, Source, Table, @@ -765,7 +764,7 @@ impl DdlController { let sink_table_fragments = guard .table_fragments() - .get(&catalog::TableId::new(*sink_id)) + .get(&risingwave_common::catalog::TableId::new(*sink_id)) .unwrap(); let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); @@ -1018,11 +1017,10 @@ impl DdlController { panic!("additional replace table event only occurs when dropping sink into table") }; - let ReplaceTableJob { + let ReplaceTableJobForSink { streaming_job, context, table_fragments, - col_index_mapping, } = self .inject_replace_table_job_for_table_sink( env, @@ -1039,7 +1037,7 @@ impl DdlController { // ignore error let _version = self - .finish_replace_table(&streaming_job, col_index_mapping, None, Some(sink_id)) + .finish_replace_table(&streaming_job, None, None, Some(sink_id)) .await; }