Skip to content

Commit

Permalink
feat: support alter source add column (#11350)
Browse files Browse the repository at this point in the history
  • Loading branch information
wugouzi authored Aug 24, 2023
1 parent 451c387 commit 4d1a8e9
Show file tree
Hide file tree
Showing 26 changed files with 565 additions and 9 deletions.
6 changes: 6 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ cargo make ci-start ci-pubsub
cargo run --bin prepare_ci_pubsub
sqllogictest -p 4566 -d dev './e2e_test/source/basic/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/basic/old_row_format_syntax/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka.slt'

echo "--- e2e, kafka alter source"
chmod +x ./scripts/source/prepare_data_after_alter.sh
./scripts/source/prepare_data_after_alter.sh 2
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data.slt'

echo "--- Run CH-benCHmark"
./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
Expand Down
113 changes: 113 additions & 0 deletions e2e_test/source/basic/alter/kafka.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# End-to-end test for `ALTER SOURCE ... ADD COLUMN` on Kafka sources.
# Both sources read the same `kafka_alter` topic, whose first message is
# {"v1": 1, "v2": "11", "v3": 111}, but each source initially declares
# only a subset of the JSON fields.

statement ok
CREATE SOURCE s1 (v1 int) with (
connector = 'kafka',
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE s2 (v2 varchar) with (
connector = 'kafka',
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

# MVs created before the alter; they should keep their original narrow schema.
statement ok
create materialized view mv1 as select * from s1;

statement ok
create materialized view mv2 as select * from s2;

sleep 10s

statement ok
flush;

query I
select * from s1;
----
1

query T
select * from s2;
----
11

# alter source
statement ok
alter source s1 add column v2 varchar;

# alter source with null column
# (v4 is absent from the JSON payload, so it should read back as NULL)
statement ok
alter source s2 add column v4 int;

# MVs created after the alter see the widened schema.
statement ok
create materialized view mv3 as select * from s1;

statement ok
create materialized view mv4 as select * from s2;

sleep 10s

statement ok
flush;

query IT
select * from s1
----
1 11

query TI
select * from s2
----
11 NULL

# Pre-alter MVs are unchanged: still one column each.
query I
select * from mv1
----
1

query T
select * from mv2
----
11

query IT
select * from mv3
----
1 11

query TI
select * from mv4
----
11 NULL

# alter source again
statement ok
alter source s1 add column v3 int;

statement ok
create materialized view mv5 as select * from s1;

sleep 10s

statement ok
flush;

query ITI
select * from s1
----
1 11 111

query ITI
select * from mv5
----
1 11 111

# check definition after altering
query TT
show create source s1;
----
public.s1 CREATE SOURCE s1 (v1 INT, v2 CHARACTER VARYING, v3 INT) WITH (connector = 'kafka', topic = 'kafka_alter', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest') FORMAT PLAIN ENCODE JSON
67 changes: 67 additions & 0 deletions e2e_test/source/basic/alter/kafka_after_new_data.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Verify that altered sources and their materialized views pick up newly
# produced Kafka data. Runs after scripts/source/prepare_data_after_alter.sh
# has sent the second batch: {"v1": 2, "v2": "22", "v3": 222}.

sleep 5s

statement ok
flush;

# s1 has been altered to (v1 int, v2 varchar, v3 int): three result columns,
# so the type string must be ITI (it was incorrectly IT before).
query ITI rowsort
select * from s1
----
1 11 111
2 22 222

# mv1 was created before any alter and keeps its single column.
query I rowsort
select * from mv1
----
1
2

query IT rowsort
select * from mv3
----
1 11
2 22

# v4 never appears in the payload, so it stays NULL for every row.
query TI rowsort
select * from s2
----
11 NULL
22 NULL

query T rowsort
select * from mv2
----
11
22

query TI rowsort
select * from mv4
----
11 NULL
22 NULL

query ITI rowsort
select * from mv5
----
1 11 111
2 22 222

# Clean up: drop MVs before their sources.
statement ok
drop materialized view mv1

statement ok
drop materialized view mv2

statement ok
drop materialized view mv3

statement ok
drop materialized view mv4

statement ok
drop materialized view mv5

statement ok
drop source s1

statement ok
drop source s2
6 changes: 6 additions & 0 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,12 @@ select count(*) from s16
----
0

statement error Feature is not yet implemented: Alter source with schema registry
alter source s18 add column v10 int;

statement error Feature is not yet implemented: Alter source with schema registry
alter source s17 add column v10 int;

query III rowsort
select * from s21;
----
Expand Down
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ message Source {

optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;

// Per-source catalog version, used by schema change.
uint64 version = 100;
}

enum SinkType {
Expand Down
10 changes: 10 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ message DropSourceResponse {
uint64 version = 2;
}

// Request for `ALTER SOURCE` (e.g. `ALTER SOURCE ... ADD COLUMN`).
message AlterSourceRequest {
// The source catalog entry after the schema change — presumably the full
// replacement entry rather than a delta; confirm against the meta handler.
catalog.Source source = 1;
}

// Response for `AlterSource`.
message AlterSourceResponse {
common.Status status = 1;
// Catalog version after the alteration; clients wait for their local
// catalog to reach this version (same pattern as DropSourceResponse).
uint64 version = 2;
}

message CreateSinkRequest {
catalog.Sink sink = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
Expand Down Expand Up @@ -316,6 +325,7 @@ service DdlService {
rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
rpc AlterRelationName(AlterRelationNameRequest) returns (AlterRelationNameResponse);
rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
Expand Down
1 change: 1 addition & 0 deletions scripts/source/alter_data/kafka_alter.2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"v1": 2, "v2": "22", "v3": 222}
18 changes: 18 additions & 0 deletions scripts/source/prepare_data_after_alter.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

# Send one batch of pre-generated test data to the `kafka_alter` topic.
# $1 selects the batch: the file ./source/alter_data/kafka_alter.$1 relative
# to the scripts/ directory. Used by the e2e "alter source" test to produce
# rows after the source schema has been altered.

# Exits as soon as any line fails.
set -e

KCAT_BIN="kcat"
# kcat bin name on linux is "kafkacat"
# (use POSIX `=` instead of the bash-only `==` inside `[ ]`)
if [ "$(uname)" = "Linux" ]
then
    KCAT_BIN="kafkacat"
fi

SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
cd "$SCRIPT_PATH/.." || exit 1

FILE="./source/alter_data/kafka_alter.$1"
echo "Send data from $FILE"
# Redirect the file directly into kcat: quotes guard against word splitting
# (ShellCheck SC2086) and the `cat |` pipeline was unnecessary (SC2002).
"${KCAT_BIN}" -P -b message_queue:29092 -t kafka_alter < "$FILE"
1 change: 1 addition & 0 deletions scripts/source/test_data/kafka_alter.1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"v1": 1, "v2": "11", "v3": 111}
4 changes: 4 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub type CatalogVersion = u64;
pub type TableVersionId = u64;
/// The default version ID for a new table.
pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
/// The version number of the per-source catalog.
pub type SourceVersionId = u64;
/// The default version ID for a new source.
pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;

pub const DEFAULT_DATABASE_NAME: &str = "dev";
pub const DEFAULT_SCHEMA_NAME: &str = "public";
Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
Expand All @@ -30,7 +30,7 @@ use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;

use super::root_catalog::Catalog;
use super::DatabaseId;
use super::{DatabaseId, TableId};
use crate::user::UserId;

pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
Expand Down Expand Up @@ -86,6 +86,8 @@ pub trait CatalogWriter: Send + Sync {
mapping: ColIndexMapping,
) -> Result<()>;

async fn alter_source_column(&self, source: PbSource) -> Result<()>;

async fn create_index(
&self,
index: PbIndex,
Expand Down Expand Up @@ -220,6 +222,11 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

/// Sends the altered source catalog to the meta service, then blocks until
/// the local catalog snapshot has caught up to the version it returns.
async fn alter_source_column(&self, source: PbSource) -> Result<()> {
    self.wait_version(self.meta_client.alter_source_column(source).await?)
        .await
}

async fn replace_table(
&self,
table: PbTable,
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,17 @@ impl Catalog {
.ok_or_else(|| CatalogError::NotFound("schema_id", schema_id.to_string()))
}

/// Looks up a source by database id, schema id and source id.
///
/// Schema-lookup failures propagate via `?`; a schema that exists but has
/// no such source yields `CatalogError::NotFound("source_id", ..)`.
pub fn get_source_by_id(
    &self,
    db_id: &DatabaseId,
    schema_id: &SchemaId,
    source_id: &SourceId,
) -> CatalogResult<&Arc<SourceCatalog>> {
    let schema = self.get_schema_by_id(db_id, schema_id)?;
    match schema.get_source_by_id(source_id) {
        Some(source) => Ok(source),
        None => Err(CatalogError::NotFound("source_id", source_id.to_string())),
    }
}

/// Refer to [`SearchPath`].
pub fn first_valid_schema(
&self,
Expand Down
Loading

0 comments on commit 4d1a8e9

Please sign in to comment.