Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Dec 5, 2023
1 parent 186193d commit 27f3117
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 17 deletions.
3 changes: 3 additions & 0 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ create sink s_b into t_m as select v from t_b;
statement ok
create sink s_c into t_m as select v from t_c;

statement ok
flush;

query I rowsort
select * from t_m order by v;
----
Expand Down
11 changes: 1 addition & 10 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::Datum;
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
Expand Down Expand Up @@ -301,20 +301,11 @@ pub async fn handle_create_sink(

let mut target_table_replace_plan = None;
if let Some(table_catalog) = target_table_catalog {
if !table_catalog.incoming_sinks.is_empty() {
bail_not_implemented!(issue = 13818, "create sink into table with incoming sinks");
}

let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;

table.incoming_sinks = table_catalog.incoming_sinks.clone();

// for now we only support one incoming sink
assert!(table.incoming_sinks.is_empty());

table.incoming_sinks = table_catalog.incoming_sinks.clone();

for _ in 0..(table_catalog.incoming_sinks.len() + 1) {
for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
Expand Down
2 changes: 0 additions & 2 deletions src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::handler::create_sink::{insert_merger_to_union, reparse_table_for_sink};
use crate::handler::create_table::generate_stream_graph_for_table;
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;

pub async fn handle_drop_sink(
Expand Down
8 changes: 3 additions & 5 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use risingwave_connector::dispatch_source_prop;
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
};
use risingwave_pb::catalog;
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
use risingwave_pb::catalog::{
connection, Comment, Connection, CreateType, Database, Function, Schema, Sink, Source, Table,
Expand Down Expand Up @@ -765,7 +764,7 @@ impl DdlController {

let sink_table_fragments = guard
.table_fragments()
.get(&catalog::TableId::new(*sink_id))
.get(&risingwave_common::catalog::TableId::new(*sink_id))
.unwrap();

let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
Expand Down Expand Up @@ -1018,11 +1017,10 @@ impl DdlController {
panic!("additional replace table event only occurs when dropping sink into table")
};

let ReplaceTableJob {
let ReplaceTableJobForSink {
streaming_job,
context,
table_fragments,
col_index_mapping,
} = self
.inject_replace_table_job_for_table_sink(
env,
Expand All @@ -1039,7 +1037,7 @@ impl DdlController {

// ignore error
let _version = self
.finish_replace_table(&streaming_job, col_index_mapping, None, Some(sink_id))
.finish_replace_table(&streaming_job, None, None, Some(sink_id))
.await;
}

Expand Down

0 comments on commit 27f3117

Please sign in to comment.