Skip to content

Commit

Permalink
feat(frontend): support alter shared source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 4, 2024
1 parent 0ab300f commit 7c0ac3f
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 8 deletions.
138 changes: 138 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column_shared.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
control substitution on

system ok
rpk topic create shared_source_alter -p 4

system ok
cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a", "v3": "a1"}
1 {"v1": 2, "v2": "b", "v3": "b1"}
2 {"v1": 3, "v2": "c", "v3": "c1"}
3 {"v1": 4, "v2": "d", "v3": "d1"}
EOF

# Create the source WITHOUT v3: the queries below expect two columns, and v3 is
# added later via ALTER SOURCE (declaring it here would also make the ALTER fail
# with a duplicate-column error).
statement ok
create source s (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source_alter',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;


statement ok
create materialized view mv_before_alter as select * from s;

sleep 2s

query ?? rowsort
select * from s;
----
1 a
2 b
3 c
4 d

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d


statement ok
alter source s add column v3 varchar;

# New MV will have v3.

statement ok
create materialized view mv_after_alter as select * from s;

query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1

# Batch select from source will have v3.

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1

# Old MV is not affected.

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d

# Produce new data.

system ok
cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
0 {"v1": 5, "v2": "e", "v3": "e1"}
1 {"v1": 6, "v2": "f", "v3": "f1"}
2 {"v1": 7, "v2": "g", "v3": "g1"}
3 {"v1": 8, "v2": "h", "v3": "h1"}
EOF

sleep 2s


query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1


# Batch select from source will have v3.

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1

# Old MV is not affected.

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d
5 e
6 f
7 g
8 h


statement ok
drop source s cascade;

# TODO: test alter source with schema registry
2 changes: 2 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ message DropSourceResponse {

message AlterSourceRequest {
  catalog.Source source = 1;
  // For a shared source, the fragment graph of its streaming job must be
  // replaced as well, so the plan is attached here; left unset for
  // non-shared sources.
  optional ReplaceStreamingJobPlan plan = 2;
}

message AlterSourceResponse {
Expand Down
17 changes: 14 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,11 @@ pub trait CatalogWriter: Send + Sync {
async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

/// Replace the source in the catalog.
///
/// `replace_streaming_job_plan` carries the new fragment graph when the
/// altered source is shared and its streaming job must be replaced;
/// `None` otherwise.
async fn alter_source(
    &self,
    source: PbSource,
    replace_streaming_job_plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<()>;

async fn alter_parallelism(
&self,
Expand Down Expand Up @@ -498,8 +502,15 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_source(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.alter_source(source).await?;
/// Sends the altered source (plus the optional streaming-job replacement
/// plan for shared sources) to the meta service, then waits until the
/// returned catalog version becomes visible locally.
async fn alter_source(
    &self,
    source: PbSource,
    replace_streaming_job_plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<()> {
    let version = self.meta_client.alter_source(source, replace_streaming_job_plan).await?;
    self.wait_version(version).await
}

Expand Down
72 changes: 71 additions & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::max_column_id;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_pb::catalog::PbSource;
use risingwave_sqlparser::ast::{
AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
};
Expand All @@ -25,6 +26,7 @@ use risingwave_sqlparser::parser::Parser;
use super::create_table::bind_sql_columns;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::Binder;

Expand Down Expand Up @@ -121,8 +123,9 @@ pub async fn handle_alter_source_column(
catalog.version += 1;

let catalog_writer = session.catalog_writer()?;
let replace_plan = todo!();
catalog_writer
.alter_source(catalog.to_prost(schema_id, db_id))
.alter_source(catalog.to_prost(schema_id, db_id), replace_plan)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
Expand All @@ -149,6 +152,73 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul
Ok(stmt.to_string())
}

/// Builds everything needed to replace a (shared) source's streaming job after
/// an `ALTER SOURCE` changed its definition: the new `PbSource`, the new stream
/// fragment graph, and a column-index mapping from the old columns to the new.
///
/// NOTE(review): this body looks work-in-progress — it uses `col_id_gen`
/// (present only as commented-out code below), `new_version_columns`, and
/// `table`, none of which are defined in this scope, so it cannot compile as
/// written. `table.columns.len()` is presumably meant to be the new column
/// count (e.g. `source.columns.len()`) — confirm against the final change.
pub async fn get_replace_source_plan(
    session: &Arc<SessionImpl>,
    table_name: ObjectName,
    new_definition: Statement,
    old_catalog: &Arc<SourceCatalog>,
) -> Result<(PbSource, StreamFragmentGraph, ColIndexMapping, TableJobType)> {
    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
    // let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
    // Destructure the re-parsed definition. Anything other than CREATE TABLE is
    // a caller bug, hence the panic rather than a user-facing error.
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_column,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        ..
    } = new_definition
    else {
        panic!("unexpected statement type: {:?}", new_definition);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    // Reuse the replace-table planner to produce the new fragment graph and
    // source catalog from the altered definition.
    let (mut graph, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        old_catalog,
        format_encode,
        handler_args.clone(),
        col_id_gen,
        columns.clone(),
        wildcard_idx,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_column,
        cdc_table_info,
        new_version_columns,
        include_column_options,
    )
    .await?;

    // Calculate the mapping from the original columns to the new columns.
    // For each old column, find its position in the new column list by column
    // id; columns dropped by the alter map to `None`.
    let col_index_mapping = ColIndexMapping::new(
        old_catalog
            .columns()
            .iter()
            .map(|old_c| {
                source.columns.iter().position(|new_c| {
                    new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id()
                })
            })
            .collect(),
        table.columns.len(),
    );

    Ok((source, graph, col_index_mapping, job_type))
}

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub async fn handle_alter_source_with_sr(
pb_source.version += 1;

let catalog_writer = session.catalog_writer()?;
catalog_writer.alter_source(pb_source).await?;
catalog_writer.alter_source(pb_source, todo!()).await?;

Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,11 @@ impl CatalogWriter for MockCatalogWriter {
}
}

async fn alter_source(&self, source: PbSource) -> Result<()> {
/// Mock implementation for tests: applies the updated source straight to the
/// in-memory catalog. The replace-job plan only matters for a real meta
/// service, so it is ignored here.
async fn alter_source(
    &self,
    source: PbSource,
    _replace_streaming_job_plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<()> {
    let mut catalog = self.catalog.write();
    catalog.update_source(&source);
    Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use risingwave_pb::catalog::{
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress,
TableJobType, WaitVersion,
PbReplaceStreamingJobPlan, TableJobType, WaitVersion,
};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
Expand Down
7 changes: 6 additions & 1 deletion src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,14 @@ impl MetaClient {
.ok_or_else(|| anyhow!("wait version not set"))?)
}

pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
pub async fn alter_source(
&self,
source: PbSource,
plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<WaitVersion> {
let request = AlterSourceRequest {
source: Some(source),
plan,
};
let resp = self.inner.alter_source(request).await?;
Ok(resp
Expand Down

0 comments on commit 7c0ac3f

Please sign in to comment.