Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(cdc): support alter cdc table schema #17334

Merged
merged 19 commits into from
Jul 7, 2024
1 change: 1 addition & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- PG
DROP TABLE IF EXISTS shipments;
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
Expand Down
242 changes: 242 additions & 0 deletions e2e_test/source/cdc_inline/alter/cdc_table_alter.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
control substitution on

# mysql env vars will be read from the `.risingwave/config/risedev-env` file

# Pin the MySQL server to UTC so DATETIME values below compare deterministically.
system ok
mysql -e "
SET GLOBAL time_zone = '+00:00';
"

# Seed upstream MySQL with two tables: `products` (ids start at 101) and
# `orders` (ids start at 10001) in a fresh `testdb1` database.
system ok
mysql -e "
DROP DATABASE IF EXISTS testdb1;
CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,'scooter','Small 2-wheel scooter'),
(default,'car battery','12V car battery'),
(default,'12-pack drill','12-pack of drill bits with sizes ranging from #40 to #3'),
(default,'hammer','12oz carpenter s hammer'),
(default,'hammer','14oz carpenter s hammer'),
(default,'hammer','16oz carpenter s hammer'),
(default,'rocks','box of assorted rocks'),
(default,'jacket','water resistent black wind breaker'),
(default,'spare tire','24 inch spare tire');
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
"

# Shared MySQL CDC source; the per-table CDC tables below are created from it.
# Connection values come from env vars via `control substitution on`.
statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST}',
port = '${MYSQL_TCP_PORT}',
username = 'root',
password = '${MYSQL_PWD}',
database.name = 'testdb1',
server.id = '5185'
);

# CDC table mirroring `testdb1.products`, with explicit column definitions
# (required for the ALTER TABLE statements exercised later in this test).
statement ok
create table my_products ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) from mysql_source table 'testdb1.products';

# CDC table mirroring `testdb1.orders`.
statement ok
create table my_orders (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) from mysql_source table 'testdb1.orders';

system ok
psql -c "
DROP TABLE IF EXISTS shipments1;
CREATE TABLE shipments1 (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments1_shipment_id_seq RESTART WITH 1001;
INSERT INTO shipments1
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
"

# Shared Postgres CDC source; `${VAR:default}` substitutes the default when the
# env var is unset.
statement ok
create source pg_source with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
slot.name = 'cdc_alter_test'
);

# CDC table mirroring `public.shipments1`.
statement ok
create table pg_shipments (
shipment_id INTEGER,
order_id INTEGER,
origin STRING,
destination STRING,
is_arrived boolean,
PRIMARY KEY (shipment_id)
) from pg_source table 'public.shipments1';

# Create a mview join orders, products and shipments
statement ok
create materialized view enriched_orders as SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM my_orders AS o
LEFT JOIN my_products AS p ON o.product_id = p.id
LEFT JOIN pg_shipments AS s ON o.order_id = s.order_id;


# wait for the initial CDC snapshot to be ingested
sleep 3s

# verify the join output before any schema change
query III
select order_id, product_id, shipment_id from enriched_orders order by order_id;
----
10001 102 1001
10002 105 1002
10003 106 1003


# alter mysql tables
system ok
mysql -e "
USE testdb1;
ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0;
ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255);
"

# alter cdc tables
statement ok
ALTER TABLE my_products ADD COLUMN weight DECIMAL;

statement ok
ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR;

# wait alter ddl
sleep 3s

# rows ingested before the upstream ALTER show NULL for the newly added column
query ITTT
SELECT id,name,description,weight FROM my_products order by id limit 3
----
101 scooter Small 2-wheel scooter NULL
102 car battery 12V car battery NULL
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL


# update mysql tables
system ok
mysql -e "
USE testdb1;
UPDATE products SET weight = 10.5 WHERE id = 101;
UPDATE products SET weight = 12.5 WHERE id = 102;
UPDATE orders SET order_comment = 'very good' WHERE order_id = 10001;
"

sleep 3s

# updates made after the ALTER populate the new columns; untouched rows stay NULL
query ITTT
SELECT id,name,description,weight FROM my_products order by id limit 3
----
101 scooter Small 2-wheel scooter 10.50
102 car battery 12V car battery 12.50
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL

query ITTT
SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2
----
10001 2020-07-30 10:08:22 Jark 102 0 very good
10002 2020-07-30 10:11:09 Sally 105 0 NULL


# alter mysql tables
system ok
mysql -e "
USE testdb1;
ALTER TABLE products DROP COLUMN weight;
"

# alter cdc table to drop column
statement ok
ALTER TABLE my_products DROP COLUMN weight;

# wait alter ddl
sleep 3s

# the dropped column no longer appears in the catalog
query TTTT
describe my_products;
----
id integer false NULL
name character varying false NULL
description character varying false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description my_products NULL NULL


# alter pg table
system ok
psql -c "
ALTER TABLE shipments1 DROP COLUMN destination;
"

# dropping a column still referenced by a downstream mview must be rejected
statement error unable to drop the column due to being referenced by downstream materialized views or sinks
ALTER TABLE pg_shipments DROP COLUMN destination;

# wait alter ddl
sleep 3s

# query mv again
query III
select order_id, product_id, shipment_id from enriched_orders order by order_id;
----
10001 102 1001
10002 105 1002
10003 106 1003

statement ok
drop materialized view enriched_orders;

statement ok
drop table my_orders;

# a CDC table created with the `(*)` wildcard has no explicit column definitions
statement ok
create table orders_test (*) from mysql_source table 'testdb1.orders';

# ALTER is not supported on a `(*)`-defined CDC table
statement error Not supported: alter a table with empty column definitions
ALTER TABLE orders_test ADD COLUMN order_comment VARCHAR;

statement ok
drop source mysql_source cascade;

statement ok
drop source pg_source cascade;
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ message ReplaceTablePlan {
catalog.ColIndexMapping table_col_index_mapping = 3;
// Source catalog of table's associated source
catalog.Source source = 4;
TableJobType job_type = 5;
}

message ReplaceTablePlanRequest {
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_pb::catalog::{
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan,
PbTableJobType, ReplaceTablePlan,
PbTableJobType, ReplaceTablePlan, TableJobType,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -92,6 +92,7 @@ pub trait CatalogWriter: Send + Sync {
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
job_type: TableJobType,
) -> Result<()>;

async fn alter_source_column(&self, source: PbSource) -> Result<()>;
Expand Down Expand Up @@ -316,10 +317,11 @@ impl CatalogWriter for CatalogWriterImpl {
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
job_type: TableJobType,
) -> Result<()> {
let version = self
.meta_client
.replace_table(source, table, graph, mapping)
.replace_table(source, table, graph, mapping, job_type)
.await?;
self.wait_version(version).await
}
Expand Down
17 changes: 13 additions & 4 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ pub async fn replace_table_with_definition(
on_conflict,
with_version_column,
wildcard_idx,
cdc_table_info,
..
} = definition
else {
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table, source) = generate_stream_graph_for_table(
let (graph, table, source, job_type) = generate_stream_graph_for_table(
session,
table_name,
original_catalog,
Expand All @@ -72,6 +73,7 @@ pub async fn replace_table_with_definition(
append_only,
on_conflict,
with_version_column,
cdc_table_info,
)
.await?;

Expand All @@ -92,7 +94,7 @@ pub async fn replace_table_with_definition(
let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping)
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;
Ok(())
}
Expand Down Expand Up @@ -145,6 +147,13 @@ pub async fn handle_alter_table_column(
}
}

if columns.is_empty() {
Err(ErrorCode::NotSupported(
"alter a table with empty column definitions".to_string(),
"Please recreate the table with column definitions.".to_string(),
))?
}

match operation {
AlterTableOperation::AddColumn {
column_def: new_column,
Expand All @@ -171,7 +180,7 @@ pub async fn handle_alter_table_column(
))?
}

// Add the new column to the table definition.
// Add the new column to the table definition, unless the table was created with the `create table (*)` syntax.
columns.push(new_column);
}

Expand Down Expand Up @@ -210,7 +219,7 @@ pub async fn handle_alter_table_column(
}

_ => unreachable!(),
}
};

replace_table_with_definition(
&session,
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_connector::sink::{
CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::ReplaceTablePlan;
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
Expand Down Expand Up @@ -466,6 +466,7 @@ pub async fn handle_create_sink(
table: Some(table),
fragment_graph: Some(graph),
table_col_index_mapping: None,
job_type: TableJobType::General as _,
});
}

Expand Down Expand Up @@ -634,7 +635,7 @@ pub(crate) async fn reparse_table_for_sink(
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table, source) = generate_stream_graph_for_table(
let (graph, table, source, _) = generate_stream_graph_for_table(
session,
table_name,
table_catalog,
Expand All @@ -648,6 +649,7 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
None,
)
.await?;

Expand Down
Loading
Loading