Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiple sinks into table #13659

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ create table m_simple (v1 int primary key, v2 int);
statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

statement error Feature is not yet implemented: create sink into table with incoming sinks
create sink s_simple_2 into m_simple as select v1, v2 from t_simple;

# and we can't alter a table with incoming sinks
# we can't alter a table with incoming sinks
statement error Feature is not yet implemented: alter table with incoming sinks
alter table m_simple add column v3 int;

Expand Down Expand Up @@ -363,3 +360,96 @@ drop table t_b;

statement ok
drop table t_c;

# multi sinks

statement ok
create table t_a(v int primary key);

statement ok
insert into t_a values (1), (2), (3);

statement ok
create table t_b(v int primary key);

statement ok
insert into t_b values (3), (4), (5);

statement ok
create table t_c(v int primary key);

statement ok
insert into t_c values (5), (6), (7);

statement ok
create table t_m(v int primary key);

statement ok
create sink s_a into t_m as select v from t_a;

statement ok
create sink s_b into t_m as select v from t_b;

statement ok
create sink s_c into t_m as select v from t_c;

statement ok
flush;

query I rowsort
select * from t_m order by v;
----
1
2
3
4
5
6
7

statement ok
drop sink s_a;

statement ok
insert into t_b values (11), (12), (13);

query I
select * from t_m order by v;
----
1
2
3
4
5
6
7
11
12
13

statement ok
drop sink s_b;

statement ok
drop sink s_c;

statement ok
delete from t_m where v > 5;

query I
select * from t_m order by v;
----
1
2
3
4
5

statement ok
drop table t_a;

statement ok
drop table t_b;

statement ok
drop table t_c;
20 changes: 20 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ where
visit_inner(stream_node, &mut f)
}

/// A utility for to accessing the [`StreamNode`]. The returned bool is used to determine whether the access needs to continue.
pub fn visit_stream_node_cont<F>(stream_node: &mut StreamNode, mut f: F)
where
F: FnMut(&mut StreamNode) -> bool,
{
fn visit_inner<F>(stream_node: &mut StreamNode, f: &mut F)
where
F: FnMut(&mut StreamNode) -> bool,
{
if !f(stream_node) {
return;
}
for input in &mut stream_node.input {
visit_inner(input, f);
}
}

visit_inner(stream_node, &mut f)
}

/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a
/// [`StreamFragment`] recursively.
pub fn visit_fragment<F>(fragment: &mut StreamFragment, f: F)
Expand Down
17 changes: 6 additions & 11 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::Datum;
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::{bail, bail_not_implemented, catalog};
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
Expand Down Expand Up @@ -318,23 +318,18 @@ pub async fn handle_create_sink(

let mut target_table_replace_plan = None;
if let Some(table_catalog) = target_table_catalog {
if !table_catalog.incoming_sinks.is_empty() {
bail_not_implemented!(issue = 13818, "create sink into table with incoming sinks");
}

check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?;

let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;

table.incoming_sinks = table_catalog.incoming_sinks.clone();

// for now we only support one incoming sink
assert!(table.incoming_sinks.is_empty());

for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union(node);
for _ in 0..(table_catalog.incoming_sinks.len() + 1) {
for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union(node);
}
}
}

Expand Down
18 changes: 13 additions & 5 deletions src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_sqlparser::ast::ObjectName;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::handler::create_sink::reparse_table_for_sink;
use crate::handler::create_sink::{insert_merger_to_union, reparse_table_for_sink};
use crate::handler::HandlerArgs;

pub async fn handle_drop_sink(
Expand Down Expand Up @@ -67,12 +67,20 @@ pub async fn handle_drop_sink(
table.clone()
};

let (graph, mut table, source) = reparse_table_for_sink(&session, &table_catalog).await?;
let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;

// for now we only support one incoming sink
assert_eq!(table_catalog.incoming_sinks.len(), 1);
assert!(!table_catalog.incoming_sinks.is_empty());

table.incoming_sinks = vec![];
table.incoming_sinks = table_catalog.incoming_sinks.clone();

for _ in 0..(table_catalog.incoming_sinks.len() - 1) {
for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union(node);
}
}
}

affected_table_change = Some(ReplaceTablePlan {
source,
Expand Down
12 changes: 10 additions & 2 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2886,7 +2886,8 @@ impl CatalogManager {
source: &Option<Source>,
table: &Table,
table_col_index_mapping: Option<ColIndexMapping>,
incoming_sink_id: Option<SinkId>,
creating_sink_id: Option<SinkId>,
dropping_sink_id: Option<SinkId>,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
Expand Down Expand Up @@ -2945,10 +2946,17 @@ impl CatalogManager {

let mut table = table.clone();
table.stream_job_status = PbStreamJobStatus::Created.into();
if let Some(incoming_sink_id) = incoming_sink_id {

if let Some(incoming_sink_id) = creating_sink_id {
table.incoming_sinks.push(incoming_sink_id);
}

if let Some(dropping_sink_id) = dropping_sink_id {
table
.incoming_sinks
.retain(|sink_id| *sink_id != dropping_sink_id);
}

tables.insert(table.id, table.clone());

commit_meta!(self, tables, indexes, sources)?;
Expand Down
Loading
Loading