Skip to content

Commit

Permalink
feat(cdc): support alter cdc table schema (#17334)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Jul 7, 2024
1 parent 69effdd commit 739d005
Show file tree
Hide file tree
Showing 16 changed files with 502 additions and 64 deletions.
1 change: 1 addition & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- PG
DROP TABLE IF EXISTS shipments;
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
Expand Down
242 changes: 242 additions & 0 deletions e2e_test/source/cdc_inline/alter/cdc_table_alter.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
control substitution on

# mysql env vars will be read from the `.risingwave/config/risedev-env` file

system ok
mysql -e "
SET GLOBAL time_zone = '+00:00';
"

system ok
mysql -e "
DROP DATABASE IF EXISTS testdb1;
CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,'scooter','Small 2-wheel scooter'),
(default,'car battery','12V car battery'),
(default,'12-pack drill','12-pack of drill bits with sizes ranging from #40 to #3'),
(default,'hammer','12oz carpenter s hammer'),
(default,'hammer','14oz carpenter s hammer'),
(default,'hammer','16oz carpenter s hammer'),
(default,'rocks','box of assorted rocks'),
(default,'jacket','water resistent black wind breaker'),
(default,'spare tire','24 inch spare tire');
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
"

statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST}',
port = '${MYSQL_TCP_PORT}',
username = 'root',
password = '${MYSQL_PWD}',
database.name = 'testdb1',
server.id = '5185'
);

statement ok
create table my_products ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) from mysql_source table 'testdb1.products';

statement ok
create table my_orders (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) from mysql_source table 'testdb1.orders';

system ok
psql -c "
DROP TABLE IF EXISTS shipments1;
CREATE TABLE shipments1 (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments1_shipment_id_seq RESTART WITH 1001;
INSERT INTO shipments1
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
"

statement ok
create source pg_source with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
slot.name = 'cdc_alter_test'
);

statement ok
create table pg_shipments (
shipment_id INTEGER,
order_id INTEGER,
origin STRING,
destination STRING,
is_arrived boolean,
PRIMARY KEY (shipment_id)
) from pg_source table 'public.shipments1';

# Create a mview join orders, products and shipments
statement ok
create materialized view enriched_orders as SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM my_orders AS o
LEFT JOIN my_products AS p ON o.product_id = p.id
LEFT JOIN pg_shipments AS s ON o.order_id = s.order_id;


sleep 3s

query III
select order_id, product_id, shipment_id from enriched_orders order by order_id;
----
10001 102 1001
10002 105 1002
10003 106 1003


# alter mysql tables
system ok
mysql -e "
USE testdb1;
ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0;
ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255);
"

# alter cdc tables
statement ok
ALTER TABLE my_products ADD COLUMN weight DECIMAL;

statement ok
ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR;

# wait alter ddl
sleep 3s

query ITTT
SELECT id,name,description,weight FROM my_products order by id limit 3
----
101 scooter Small 2-wheel scooter NULL
102 car battery 12V car battery NULL
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL


# update mysql tables
system ok
mysql -e "
USE testdb1;
UPDATE products SET weight = 10.5 WHERE id = 101;
UPDATE products SET weight = 12.5 WHERE id = 102;
UPDATE orders SET order_comment = 'very good' WHERE order_id = 10001;
"

sleep 3s

query ITTT
SELECT id,name,description,weight FROM my_products order by id limit 3
----
101 scooter Small 2-wheel scooter 10.50
102 car battery 12V car battery 12.50
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL

query ITTT
SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2
----
10001 2020-07-30 10:08:22 Jark 102 0 very good
10002 2020-07-30 10:11:09 Sally 105 0 NULL


# alter mysql tables
system ok
mysql -e "
USE testdb1;
ALTER TABLE products DROP COLUMN weight;
"

# alter cdc table to drop column
statement ok
ALTER TABLE my_products DROP COLUMN weight;

# wait alter ddl
sleep 3s

query TTTT
describe my_products;
----
id integer false NULL
name character varying false NULL
description character varying false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description my_products NULL NULL


# alter pg table
system ok
psql -c "
ALTER TABLE shipments1 DROP COLUMN destination;
"

statement error unable to drop the column due to being referenced by downstream materialized views or sinks
ALTER TABLE pg_shipments DROP COLUMN destination;

# wait alter ddl
sleep 3s

# query mv again
query III
select order_id, product_id, shipment_id from enriched_orders order by order_id;
----
10001 102 1001
10002 105 1002
10003 106 1003

statement ok
drop materialized view enriched_orders;

statement ok
drop table my_orders;

statement ok
create table orders_test (*) from mysql_source table 'testdb1.orders';

statement error Not supported: alter a table with empty column definitions
ALTER TABLE orders_test ADD COLUMN order_comment VARCHAR;

statement ok
drop source mysql_source cascade;

statement ok
drop source pg_source cascade;
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ message ReplaceTablePlan {
catalog.ColIndexMapping table_col_index_mapping = 3;
// Source catalog of table's associated source
catalog.Source source = 4;
TableJobType job_type = 5;
}

message ReplaceTablePlanRequest {
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_pb::catalog::{
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan,
PbTableJobType, ReplaceTablePlan,
PbTableJobType, ReplaceTablePlan, TableJobType,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -92,6 +92,7 @@ pub trait CatalogWriter: Send + Sync {
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
job_type: TableJobType,
) -> Result<()>;

async fn alter_source_column(&self, source: PbSource) -> Result<()>;
Expand Down Expand Up @@ -309,10 +310,11 @@ impl CatalogWriter for CatalogWriterImpl {
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
job_type: TableJobType,
) -> Result<()> {
let version = self
.meta_client
.replace_table(source, table, graph, mapping)
.replace_table(source, table, graph, mapping, job_type)
.await?;
self.wait_version(version).await
}
Expand Down
17 changes: 13 additions & 4 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ pub async fn replace_table_with_definition(
on_conflict,
with_version_column,
wildcard_idx,
cdc_table_info,
..
} = definition
else {
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table, source) = generate_stream_graph_for_table(
let (graph, table, source, job_type) = generate_stream_graph_for_table(
session,
table_name,
original_catalog,
Expand All @@ -72,6 +73,7 @@ pub async fn replace_table_with_definition(
append_only,
on_conflict,
with_version_column,
cdc_table_info,
)
.await?;

Expand All @@ -92,7 +94,7 @@ pub async fn replace_table_with_definition(
let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping)
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;
Ok(())
}
Expand Down Expand Up @@ -145,6 +147,13 @@ pub async fn handle_alter_table_column(
}
}

if columns.is_empty() {
Err(ErrorCode::NotSupported(
"alter a table with empty column definitions".to_string(),
"Please recreate the table with column definitions.".to_string(),
))?
}

match operation {
AlterTableOperation::AddColumn {
column_def: new_column,
Expand All @@ -171,7 +180,7 @@ pub async fn handle_alter_table_column(
))?
}

// Add the new column to the table definition.
// Add the new column to the table definition if it is not created by `create table (*)` syntax.
columns.push(new_column);
}

Expand Down Expand Up @@ -210,7 +219,7 @@ pub async fn handle_alter_table_column(
}

_ => unreachable!(),
}
};

replace_table_with_definition(
&session,
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::ReplaceTablePlan;
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
Expand Down Expand Up @@ -466,6 +466,7 @@ pub async fn handle_create_sink(
table: Some(table),
fragment_graph: Some(graph),
table_col_index_mapping: None,
job_type: TableJobType::General as _,
});
}

Expand Down Expand Up @@ -634,7 +635,7 @@ pub(crate) async fn reparse_table_for_sink(
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table, source) = generate_stream_graph_for_table(
let (graph, table, source, _) = generate_stream_graph_for_table(
session,
table_name,
table_catalog,
Expand All @@ -648,6 +649,7 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
None,
)
.await?;

Expand Down
Loading

0 comments on commit 739d005

Please sign in to comment.