Skip to content

Commit

Permalink
Add visit_stream_node_cont, modify visit_inner & remove check for inc…
Browse files Browse the repository at this point in the history
…oming sinks in handle_create_sink

Implement e2e test for error messages, sink creation, table alteration, query sorting, sink and table dropping.

Code changes: Imports, function rename, argument modifications, variable rename, new arguments.

fix conflict

tmp

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Dec 9, 2023
1 parent 19f4254 commit 821e49a
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 65 deletions.
98 changes: 94 additions & 4 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ create table m_simple (v1 int primary key, v2 int);
statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

statement error Feature is not yet implemented: create sink into table with incoming sinks
create sink s_simple_2 into m_simple as select v1, v2 from t_simple;

# and we can't alter a table with incoming sinks
# we can't alter a table with incoming sinks
statement error Feature is not yet implemented: alter table with incoming sinks
alter table m_simple add column v3 int;

Expand Down Expand Up @@ -363,3 +360,96 @@ drop table t_b;

statement ok
drop table t_c;

# multi sinks

statement ok
create table t_a(v int primary key);

statement ok
insert into t_a values (1), (2), (3);

statement ok
create table t_b(v int primary key);

statement ok
insert into t_b values (3), (4), (5);

statement ok
create table t_c(v int primary key);

statement ok
insert into t_c values (5), (6), (7);

statement ok
create table t_m(v int primary key);

statement ok
create sink s_a into t_m as select v from t_a;

statement ok
create sink s_b into t_m as select v from t_b;

statement ok
create sink s_c into t_m as select v from t_c;

statement ok
flush;

query I rowsort
select * from t_m order by v;
----
1
2
3
4
5
6
7

statement ok
drop sink s_a;

statement ok
insert into t_b values (11), (12), (13);

query I
select * from t_m order by v;
----
1
2
3
4
5
6
7
11
12
13

statement ok
drop sink s_b;

statement ok
drop sink s_c;

statement ok
delete from t_m where v > 5;

query I
select * from t_m order by v;
----
1
2
3
4
5

statement ok
drop table t_a;

statement ok
drop table t_b;

statement ok
drop table t_c;
20 changes: 20 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ where
visit_inner(stream_node, &mut f)
}

/// A utility for to accessing the [`StreamNode`]. The returned bool is used to determine whether the access needs to continue.
pub fn visit_stream_node_cont<F>(stream_node: &mut StreamNode, mut f: F)
where
F: FnMut(&mut StreamNode) -> bool,
{
fn visit_inner<F>(stream_node: &mut StreamNode, f: &mut F)
where
F: FnMut(&mut StreamNode) -> bool,
{
if !f(stream_node) {
return;
}
for input in &mut stream_node.input {
visit_inner(input, f);
}
}

visit_inner(stream_node, &mut f)
}

/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a
/// [`StreamFragment`] recursively.
pub fn visit_fragment<F>(fragment: &mut StreamFragment, f: F)
Expand Down
17 changes: 6 additions & 11 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::Datum;
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::{bail, bail_not_implemented, catalog};
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
Expand Down Expand Up @@ -318,23 +318,18 @@ pub async fn handle_create_sink(

let mut target_table_replace_plan = None;
if let Some(table_catalog) = target_table_catalog {
if !table_catalog.incoming_sinks.is_empty() {
bail_not_implemented!(issue = 13818, "create sink into table with incoming sinks");
}

check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())?;

let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;

table.incoming_sinks = table_catalog.incoming_sinks.clone();

// for now we only support one incoming sink
assert!(table.incoming_sinks.is_empty());

for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union(node);
for _ in 0..(table_catalog.incoming_sinks.len() + 1) {
for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union(node);
}
}
}

Expand Down
18 changes: 13 additions & 5 deletions src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_sqlparser::ast::ObjectName;
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::handler::create_sink::reparse_table_for_sink;
use crate::handler::create_sink::{insert_merger_to_union, reparse_table_for_sink};
use crate::handler::HandlerArgs;

pub async fn handle_drop_sink(
Expand Down Expand Up @@ -67,12 +67,20 @@ pub async fn handle_drop_sink(
table.clone()
};

let (graph, mut table, source) = reparse_table_for_sink(&session, &table_catalog).await?;
let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;

// for now we only support one incoming sink
assert_eq!(table_catalog.incoming_sinks.len(), 1);
assert!(!table_catalog.incoming_sinks.is_empty());

table.incoming_sinks = vec![];
table.incoming_sinks = table_catalog.incoming_sinks.clone();

for _ in 0..(table_catalog.incoming_sinks.len() - 1) {
for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union(node);
}
}
}

affected_table_change = Some(ReplaceTablePlan {
source,
Expand Down
12 changes: 10 additions & 2 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2886,7 +2886,8 @@ impl CatalogManager {
source: &Option<Source>,
table: &Table,
table_col_index_mapping: Option<ColIndexMapping>,
incoming_sink_id: Option<SinkId>,
creating_sink_id: Option<SinkId>,
dropping_sink_id: Option<SinkId>,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
Expand Down Expand Up @@ -2945,10 +2946,17 @@ impl CatalogManager {

let mut table = table.clone();
table.stream_job_status = PbStreamJobStatus::Created.into();
if let Some(incoming_sink_id) = incoming_sink_id {

if let Some(incoming_sink_id) = creating_sink_id {
table.incoming_sinks.push(incoming_sink_id);
}

if let Some(dropping_sink_id) = dropping_sink_id {
table
.incoming_sinks
.retain(|sink_id| *sink_id != dropping_sink_id);
}

tables.insert(table.id, table.clone());

commit_meta!(self, tables, indexes, sources)?;
Expand Down
Loading

0 comments on commit 821e49a

Please sign in to comment.