Skip to content

Commit

Permalink
feat(frontend): support alter shared source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 10, 2024
1 parent d2bb1db commit ca9b4c1
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 31 deletions.
199 changes: 199 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column_shared.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
control substitution on

system ok
rpk topic create shared_source_alter -p 4

system ok
cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a", "v3": "a1"}
1 {"v1": 2, "v2": "b", "v3": "b1"}
2 {"v1": 3, "v2": "c", "v3": "c1"}
3 {"v1": 4, "v2": "d", "v3": "d1"}
EOF

statement ok
create source s (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source_alter',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;


statement ok
create materialized view mv_before_alter as select * from s;

statement ok
create materialized view mv_before_alter_2 as select * from s;


sleep 2s

query ?? rowsort
select * from s;
----
1 a
2 b
3 c
4 d

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d


statement ok
alter source s add column v3 varchar;

# New MV will have v3.

statement ok
create materialized view mv_after_alter as select * from s;

query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1

# Batch select from source will have v3.

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1

# Old MV is not affected.

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d

# Produce new data.

system ok
cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
0 {"v1": 5, "v2": "e", "v3": "e1"}
1 {"v1": 6, "v2": "f", "v3": "f1"}
2 {"v1": 7, "v2": "g", "v3": "g1"}
3 {"v1": 8, "v2": "h", "v3": "h1"}
EOF

sleep 2s


query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1

query error
select * from mv_after_alter_2;
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Catalog error
2: table or source not found: mv_after_alter_2



# Batch select from source will have v3.

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1

# Old MV is not affected.

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d
5 e
6 f
7 g
8 h


statement ok
drop source s cascade;

# Test alter source without downstream

statement ok
create source s (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source_alter',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
alter source s add column v3 varchar;


statement ok
create materialized view mv_after_alter as select * from s;


query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1

statement ok
drop source s cascade;

system ok
rpk topic delete shared_source_alter;

# TODO: test alter source with schema registry
2 changes: 2 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ message DropSourceResponse {
WaitVersion version = 2;
}

// Only for non-shared source
message AlterSourceRequest {
catalog.Source source = 1;
}

// Only for non-shared source
message AlterSourceResponse {
common.Status status = 1;
WaitVersion version = 2;
Expand Down
37 changes: 36 additions & 1 deletion src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_pb::catalog::{
PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType,
Expand Down Expand Up @@ -99,6 +100,13 @@ pub trait CatalogWriter: Send + Sync {
job_type: TableJobType,
) -> Result<()>;

async fn replace_source(
&self,
source: PbSource,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
) -> Result<()>;

async fn create_index(
&self,
index: PbIndex,
Expand Down Expand Up @@ -311,7 +319,34 @@ impl CatalogWriter for CatalogWriterImpl {
) -> Result<()> {
let version = self
.meta_client
.replace_table(source, table, graph, mapping, job_type)
.replace_job(
graph,
mapping,
ReplaceJob::ReplaceTable(ReplaceTable {
source,
table: Some(table),
job_type: job_type as _,
}),
)
.await?;
self.wait_version(version).await
}

async fn replace_source(
&self,
source: PbSource,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
) -> Result<()> {
let version = self
.meta_client
.replace_job(
graph,
mapping,
ReplaceJob::ReplaceSource(ReplaceSource {
source: Some(source),
}),
)
.await?;
self.wait_version(version).await
}
Expand Down
36 changes: 28 additions & 8 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::max_column_id;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;

use super::create_source::generate_stream_graph_for_source;
use super::create_table::bind_sql_columns;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
Expand All @@ -38,7 +39,7 @@ pub async fn handle_alter_source_column(
operation: AlterSourceOperation,
) -> Result<RwPgResponse> {
// Get original definition
let session = handler_args.session;
let session = handler_args.session.clone();
let db_name = session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
Expand Down Expand Up @@ -66,9 +67,6 @@ pub async fn handle_alter_source_column(
)
.into());
};
if catalog.info.is_shared() {
bail_not_implemented!(issue = 16003, "alter shared source");
}

// Currently only allow source without schema registry
let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
Expand Down Expand Up @@ -96,6 +94,7 @@ pub async fn handle_alter_source_column(
SourceEncode::Json | SourceEncode::Csv | SourceEncode::Bytes | SourceEncode::Parquet => {}
}

let old_columns = catalog.columns.clone();
let columns = &mut catalog.columns;
match operation {
AlterSourceOperation::AddColumn { column_def } => {
Expand All @@ -121,9 +120,30 @@ pub async fn handle_alter_source_column(
catalog.version += 1;

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_source(catalog.to_prost(schema_id, db_id))
.await?;
if catalog.info.is_shared() {
let graph = generate_stream_graph_for_source(handler_args, catalog.clone())?;

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::new(
old_columns
.iter()
.map(|old_c| {
catalog
.columns
.iter()
.position(|new_c| new_c.column_id() == old_c.column_id())
})
.collect(),
catalog.columns.len(),
);
catalog_writer
.replace_source(catalog.to_prost(schema_id, db_id), graph, col_index_mapping)
.await?
} else {
catalog_writer
.alter_source(catalog.to_prost(schema_id, db_id))
.await?
};

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}
Expand Down
Loading

0 comments on commit ca9b4c1

Please sign in to comment.