diff --git a/Cargo.lock b/Cargo.lock index 633698b84ac36..ea8e7d9276450 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3929,7 +3929,7 @@ dependencies = [ name = "delta_btree_map" version = "2.2.0-alpha" dependencies = [ - "educe", + "educe 0.6.0", "enum-as-inner 0.6.0", ] @@ -4454,6 +4454,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "educe" version = "0.6.0" @@ -10648,7 +10660,7 @@ dependencies = [ "criterion", "darwin-libproc", "easy-ext", - "educe", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "enumflags2", @@ -10740,7 +10752,7 @@ name = "risingwave_common_estimate_size" version = "2.2.0-alpha" dependencies = [ "bytes", - "educe", + "educe 0.6.0", "ethnum", "fixedbitset 0.5.0", "jsonbb", @@ -11055,7 +11067,7 @@ dependencies = [ "rustls-pemfile 2.2.0", "rustls-pki-types", "rw_futures_util", - "sea-schema 0.15.0", + "sea-schema", "serde", "serde_derive", "serde_json", @@ -11230,7 +11242,7 @@ dependencies = [ "const-currying", "downcast-rs", "easy-ext", - "educe", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11273,7 +11285,7 @@ dependencies = [ "chrono", "chrono-tz 0.10.0", "criterion", - "educe", + "educe 0.6.0", "expect-test", "fancy-regex", "futures-async-stream", @@ -11340,7 +11352,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "easy-ext", - "educe", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11593,7 +11605,7 @@ dependencies = [ "comfy-table", "crepe", "easy-ext", - "educe", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11695,6 +11707,7 @@ name = "risingwave_meta_model_migration" version = "2.2.0-alpha" dependencies = [ "async-std", + "easy-ext", "sea-orm", "sea-orm-migration", "serde", @@ -11708,7 
+11721,7 @@ version = "2.2.0-alpha" dependencies = [ "anyhow", "clap", - "educe", + "educe 0.6.0", "either", "futures", "hex", @@ -12133,7 +12146,7 @@ dependencies = [ "cfg-if", "criterion", "delta_btree_map", - "educe", + "educe 0.6.0", "either", "enum-as-inner 0.6.0", "expect-test", @@ -12781,9 +12794,9 @@ dependencies = [ [[package]] name = "sea-orm" -version = "0.12.15" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8814e37dc25de54398ee62228323657520b7f29713b8e238649385dbe473ee0" +checksum = "ea1fee0cf8528dbe6eda29d5798afc522a63b75e44c5b15721e6e64af9c7cc4b" dependencies = [ "async-stream", "async-trait", @@ -12794,12 +12807,12 @@ dependencies = [ "ouroboros", "rust_decimal", "sea-orm-macros", - "sea-query 0.30.7", - "sea-query-binder 0.5.0", + "sea-query", + "sea-query-binder", "serde", "serde_json", "sqlx", - "strum 0.25.0", + "strum 0.26.3", "thiserror", "time", "tracing", @@ -12809,16 +12822,16 @@ dependencies = [ [[package]] name = "sea-orm-cli" -version = "0.12.15" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "620bc560062ae251b1366bde43b3f1508445cab5c2c8cbdb397034638ab1b357" +checksum = "5f0b8869c75cf3fbb1bd860abb025033cd2e514c5f4fa43e792697cb1fe6c882" dependencies = [ "chrono", "clap", "dotenvy", "glob", "regex", - "sea-schema 0.14.2", + "sea-schema", "tracing", "tracing-subscriber", "url", @@ -12826,9 +12839,9 @@ dependencies = [ [[package]] name = "sea-orm-macros" -version = "0.12.15" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e115c6b078e013aa963cc2d38c196c2c40b05f03d0ac872fe06b6e0d5265603" +checksum = "8737b566799ed0444f278d13c300c4c6f1a91782f60ff5825a591852d5502030" dependencies = [ "heck 0.4.1", "proc-macro2", @@ -12840,9 +12853,9 @@ dependencies = [ [[package]] name = "sea-orm-migration" -version = "0.12.15" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "ee8269bc6ff71afd6b78aa4333ac237a69eebd2cdb439036291e64fb4b8db23c" +checksum = "216643749e26ce27ab6c51d3475f2692981d4a902d34455bcd322f412900df5c" dependencies = [ "async-trait", "clap", @@ -12850,20 +12863,20 @@ dependencies = [ "futures", "sea-orm", "sea-orm-cli", - "sea-schema 0.14.2", + "sea-schema", "tracing", "tracing-subscriber", ] [[package]] name = "sea-query" -version = "0.30.7" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4166a1e072292d46dc91f31617c2a1cdaf55a8be4b5c9f4bf2ba248e3ac4999b" +checksum = "b4fd043b8117af233e221f73e3ea8dfbc8e8c3c928017c474296db45c649105c" dependencies = [ "bigdecimal 0.3.1", "chrono", - "derivative", + "educe 0.5.11", "inherent", "ordered-float 3.9.1", "rust_decimal", @@ -12873,42 +12886,22 @@ dependencies = [ "uuid", ] -[[package]] -name = "sea-query" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5073b2cfed767511a57d18115f3b3d8bcb5690bf8c89518caec6cb22c0cd74" -dependencies = [ - "inherent", - "sea-query-derive", -] - [[package]] name = "sea-query-binder" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36bbb68df92e820e4d5aeb17b4acd5cc8b5d18b2c36a4dd6f4626aabfa7ab1b9" +checksum = "754965d4aee6145bec25d0898e5c931e6c22859789ce62fd85a42a15ed5a8ce3" dependencies = [ "bigdecimal 0.3.1", "chrono", "rust_decimal", - "sea-query 0.30.7", + "sea-query", "serde_json", "sqlx", "time", "uuid", ] -[[package]] -name = "sea-query-binder" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "754965d4aee6145bec25d0898e5c931e6c22859789ce62fd85a42a15ed5a8ce3" -dependencies = [ - "sea-query 0.31.0", - "sqlx", -] - [[package]] name = "sea-query-derive" version = "0.4.2" @@ -12923,17 +12916,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "sea-schema" -version = "0.14.2" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d148608012d25222442d1ebbfafd1228dbc5221baf4ec35596494e27a2394e" -dependencies = [ - "futures", - "sea-query 0.30.7", - "sea-schema-derive 0.2.0", -] - [[package]] name = "sea-schema" version = "0.15.0" @@ -12941,24 +12923,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad52149fc81836ea7424c3425d8f6ed8ad448dd16d2e4f6a3907ba46f3f2fd78" dependencies = [ "futures", - "sea-query 0.31.0", - "sea-query-binder 0.6.0", - "sea-schema-derive 0.3.0", + "sea-query", + "sea-query-binder", + "sea-schema-derive", "sqlx", ] -[[package]] -name = "sea-schema-derive" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f686050f76bffc4f635cda8aea6df5548666b830b52387e8bc7de11056d11e" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sea-schema-derive" version = "0.3.0" @@ -13711,7 +13681,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2b6b8f606d3c4cdcaf2031c4320b79d7584e454b79562ba3d675f49701c160e" dependencies = [ "async-trait", - "educe", + "educe 0.6.0", "fs-err", "futures", "glob", diff --git a/Cargo.toml b/Cargo.toml index 2000a730297f2..17ba5bf07551f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -175,12 +175,12 @@ parking_lot = { version = "0.12", features = [ "arc_lock", "deadlock_detection", ] } -sea-orm = { version = "0.12.15", features = [ - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", +sea-orm = { version = "~1.0", features = [ + "sqlx-all", "runtime-tokio-native-tls", + "with-uuid", ] } +sea-orm-migration = "~1.0" sqlx = { version = "0.7.4", default-features = false, features = [ "bigdecimal", "chrono", diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 52335fa592340..3f3cd705f09f9 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -466,29 +466,7 @@ steps: - label: "S3 source check on 
AWS (json parser)" key: "s3-v2-source-check-aws-json-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2.py -t json" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - seek-oss/aws-sm#v2.3.1: - env: - S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - environment: - - S3_SOURCE_TEST_CONF - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - - label: "S3 source new file check on AWS (json)" - key: "s3-v2-source-new-file-check-aws" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2_new_file.py" + command: "ci/scripts/s3-source-test.sh -p ci-release -s file_source.py -t json" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -510,51 +488,7 @@ steps: - label: "S3 sink on parquet and json file" key: "s3-sink-parquet-and-json-encode" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_sink.py" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - seek-oss/aws-sm#v2.3.1: - env: - S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - environment: - - S3_SOURCE_TEST_CONF - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - - label: "S3 sink batching test" - key: "s3-sink-batching-test" - command: 
"ci/scripts/s3-source-test.sh -p ci-release -s fs_sink_batch.py" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - seek-oss/aws-sm#v2.3.1: - env: - S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - environment: - - S3_SOURCE_TEST_CONF - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - - label: "S3 source batch read on AWS (json parser)" - key: "s3-v2-source-batch-read-check-aws-json-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json" + command: "ci/scripts/s3-source-test.sh -p ci-release -s file_sink.py" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -576,7 +510,7 @@ steps: - label: "S3 source check on AWS (csv parser)" key: "s3-v2-source-check-aws-csv-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2.py -t csv_without_header" + command: "ci/scripts/s3-source-test.sh -p ci-release -s file_source.py -t csv_without_header" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -596,23 +530,6 @@ steps: timeout_in_minutes: 25 retry: *auto-retry - - label: "PosixFs source on OpenDAL fs engine (csv parser)" - key: "s3-source-test-for-opendal-fs-engine-csv-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s posix_fs_source.py -t csv_without_header" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes 
"ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - label: "pulsar source check" key: "pulsar-source-tests" command: "ci/scripts/pulsar-source-test.sh -p ci-release" @@ -787,6 +704,46 @@ steps: timeout_in_minutes: 25 retry: *auto-retry + - label: "end-to-end test (postgres backend)" + key: e2e-test-postgres-backend + command: "ci/scripts/e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe-pg-backend" + if: | + !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-test-other-backends" + || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ + depends_on: + - "build" + - "build-other" + - "docslt" + plugins: + - docker-compose#v5.1.0: + run: pg-mysql-backend-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 25 + retry: *auto-retry + + - label: "end-to-end test (mysql backend)" + key: e2e-test-mysql-backend + command: "ci/scripts/e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe-mysql-backend" + if: | + !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-test-other-backends" + || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ + depends_on: + - "build" + - "build-other" + - "docslt" + plugins: + - docker-compose#v5.1.0: + run: pg-mysql-backend-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 25 + retry: *auto-retry + - label: "end-to-end test for opendal (parallel)" key: "e2e-test-opendal-parallel" command: 
"ci/scripts/e2e-test-parallel-for-opendal.sh -p ci-release" @@ -1195,4 +1152,4 @@ steps: # This should be the LAST part of the main-cron file. - label: "trigger failed test notification" if: build.pull_request.labels includes "ci/main-cron/test-notify" || build.branch == "main" - command: "ci/scripts/notify.py" \ No newline at end of file + command: "ci/scripts/notify.py" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 4b6079b72414d..5924f0fe49bfa 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -808,9 +808,7 @@ steps: - label: "end-to-end test (mysql backend)" command: "ci/scripts/e2e-test.sh -p ci-dev -m ci-3streaming-2serving-3fe-mysql-backend" - if: "false" - # TODO: enable this when all bugs of mysql backend are addressed. - # if: build.pull_request.labels includes "ci/run-e2e-test-other-backends" || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ + if: build.pull_request.labels includes "ci/run-e2e-test-other-backends" || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ depends_on: - "build" - "build-other" diff --git a/e2e_test/ddl/alter_swap_rename.slt b/e2e_test/ddl/alter_swap_rename.slt new file mode 100644 index 0000000000000..4accda4256951 --- /dev/null +++ b/e2e_test/ddl/alter_swap_rename.slt @@ -0,0 +1,181 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# Create initial tables and views for testing swap +statement ok +CREATE TABLE t1 (v1 INT primary key, v2 STRUCT>); + +statement ok +CREATE TABLE t2 (v1 INT primary key, v2 STRUCT>); + +# Insert some test data +statement ok +INSERT INTO t1 VALUES(1,(1,(1,2))); + +statement ok +INSERT INTO t2 VALUES(2,(2,(2,4))); + +# Create materialized views referencing the tables +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT v1, (t.v2).v1 AS v21 FROM t1 t; + +statement ok +CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 t; + +# Create regular views +statement ok +CREATE VIEW v1 AS SELECT 
t1.v1 FROM t1; + +statement ok +CREATE VIEW v2 AS SELECT t2.v2 FROM t2; + +# Create sources +statement ok +CREATE SOURCE src1 (v INT) WITH ( + connector = 'datagen', + fields.v.kind = 'sequence', + fields.v.start = '1', + fields.v.end = '5', + datagen.rows.per.second='10', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE src2 (v INT) WITH ( + connector = 'datagen', + fields.v.kind = 'sequence', + fields.v.start = '6', + fields.v.end = '10', + datagen.rows.per.second='10', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +# Create sinks +statement ok +CREATE SINK sink1 AS SELECT * FROM mv1 WITH ( + connector = 'blackhole' +); + +statement ok +CREATE SINK sink2 AS SELECT * FROM mv2 WITH ( + connector = 'blackhole' +); + +# Create subscriptions +statement ok +CREATE SUBSCRIPTION sub1 FROM mv1 WITH ( + retention = '1D' +); + +statement ok +CREATE SUBSCRIPTION sub2 FROM mv2 WITH ( + retention = '1D' +); + +# Test table swap +statement ok +ALTER TABLE t1 SWAP WITH t2; + +statement error Permission denied +ALTER TABLE t1 SWAP WITH mv1; + +statement error not found +ALTER TABLE mv1 SWAP WITH mv2; + +query II rowsort +SELECT * FROM t1; +---- +2 (2,"(2,4)") + +query II rowsort +SELECT * FROM t2; +---- +1 (1,"(1,2)") + +# Test materialized view swap +statement ok +ALTER MATERIALIZED VIEW mv1 SWAP WITH mv2; + +# Verify materialized view contents +query II rowsort +SELECT * FROM mv1; +---- +2 2 + +query II rowsort +SELECT * FROM mv2; +---- +1 1 + +# Test view swap +statement ok +ALTER VIEW v1 SWAP WITH v2; + +# Verify view definitions are swapped +query TT +SHOW CREATE VIEW v1; +---- +public.v1 CREATE VIEW v1 AS SELECT t2.v2 FROM t1 AS t2 + +query TT +SHOW CREATE VIEW v2; +---- +public.v2 CREATE VIEW v2 AS SELECT t1.v1 FROM t2 AS t1 + +# Test source swap +statement ok +ALTER SOURCE src1 SWAP WITH src2; + +# Verify source definitions are swapped +query TT +SHOW CREATE SOURCE src1; +---- +public.src1 CREATE SOURCE src1 (v INT) WITH 
(connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '6', fields.v.end = '10', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON + +query TT +SHOW CREATE SOURCE src2; +---- +public.src2 CREATE SOURCE src2 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '1', fields.v.end = '5', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON + +# Test sink swap +statement ok +ALTER SINK sink1 SWAP WITH sink2; + +# Verify sink definitions are swapped +query TT +SHOW CREATE SINK sink1; +---- +public.sink1 CREATE SINK sink1 AS SELECT * FROM mv1 AS mv2 WITH (connector = 'blackhole') + +query TT +SHOW CREATE SINK sink2; +---- +public.sink2 CREATE SINK sink2 AS SELECT * FROM mv2 AS mv1 WITH (connector = 'blackhole') + +# Test subscription swap +statement ok +ALTER SUBSCRIPTION sub1 SWAP WITH sub2; + +# Verify subscription definitions are swapped +query TT +SHOW CREATE SUBSCRIPTION sub1; +---- +public.sub1 CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D') + +query TT +SHOW CREATE SUBSCRIPTION sub2; +---- +public.sub2 CREATE SUBSCRIPTION sub2 FROM mv2 WITH (retention = '1D') + +# Clean up +statement ok +DROP SOURCE src1; + +statement ok +DROP SOURCE src2; + +statement ok +DROP TABLE t1 CASCADE; + +statement ok +DROP TABLE t2 CASCADE; diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/file_sink.py similarity index 71% rename from e2e_test/s3/fs_sink.py rename to e2e_test/s3/file_sink.py index 344b1b807d7e4..04042eb7817af 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/file_sink.py @@ -10,6 +10,8 @@ from time import sleep from minio import Minio from random import uniform +from time import sleep +import time def gen_data(file_num, item_num_per_file): assert item_num_per_file % 2 == 0, \ @@ -266,9 +268,129 @@ def _assert_eq(field, got, expect): cur.execute(f'drop table test_parquet_sink_table') cur.execute(f'drop sink test_file_sink_json') cur.execute(f'drop table 
test_json_sink_table') + cur.execute(f'drop table s3_test_parquet') cur.close() conn.close() +def test_file_sink_batching(): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE t (v1 int, v2 int);''') + + print('test file sink batching...\n') + cur.execute(f'''CREATE sink test_file_sink_batching as select + v1, v2 from t WITH ( + connector = 's3', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + s3.path = 'test_file_sink_batching/', + s3.file_type = 'parquet', + type = 'append-only', + rollover_seconds = 5, + max_row_count = 5, + force_append_only='true' + ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') + + cur.execute(f'''CREATE TABLE test_file_sink_batching_table( + v1 int, + v2 int, + ) WITH ( + connector = 's3', + match_pattern = 'test_file_sink_batching/*.parquet', + refresh.interval.sec = 1, + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + ) FORMAT PLAIN ENCODE PARQUET;''') + + cur.execute(f'''ALTER SINK test_file_sink_batching SET PARALLELISM = 2;''') + + cur.execute(f'''INSERT INTO t VALUES (10, 10);''') + + + cur.execute(f'select count(*) from test_file_sink_batching_table') + # no item will be selectedpsq + result = cur.fetchone() + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + def _assert_greater(field, got, expect): + assert got > expect, f'{field} assertion failed: got {got}, expect {expect}.' 
+ + _assert_eq('count(*)', result[0], 0) + print('the rollover_seconds has not reached, count(*) = 0') + + + time.sleep(11) + + cur.execute(f'select count(*) from test_file_sink_batching_table') + result = cur.fetchone() + _assert_eq('count(*)', result[0], 1) + print('the rollover_seconds has reached, count(*) = ', result[0]) + + cur.execute(f''' + INSERT INTO t VALUES (20, 20); + INSERT INTO t VALUES (30, 30); + INSERT INTO t VALUES (40, 40); + INSERT INTO t VALUES (50, 10); + ''') + + cur.execute(f'select count(*) from test_file_sink_batching_table') + # count(*) = 1 + result = cur.fetchone() + _assert_eq('count(*)', result[0], 1) + print('the max row count has not reached, count(*) = ', result[0]) + + cur.execute(f''' + INSERT INTO t VALUES (60, 20); + INSERT INTO t VALUES (70, 30); + INSERT INTO t VALUES (80, 10); + INSERT INTO t VALUES (90, 20); + INSERT INTO t VALUES (100, 30); + INSERT INTO t VALUES (100, 10); + ''') + + time.sleep(10) + + cur.execute(f'select count(*) from test_file_sink_batching_table') + result = cur.fetchone() + _assert_greater('count(*)', result[0], 1) + print('the rollover_seconds has reached, count(*) = ', result[0]) + + cur.execute(f'drop sink test_file_sink_batching;') + cur.execute(f'drop table t;') + cur.execute(f'drop table test_file_sink_batching_table;') + cur.close() + conn.close() + # delete objects + + client = Minio( + "127.0.0.1:9301", + "hummockadmin", + "hummockadmin", + secure=False, + ) + objects = client.list_objects("hummock001", prefix="test_file_sink_batching/", recursive=True) + + for obj in objects: + client.remove_object("hummock001", obj.object_name) + print(f"Deleted: {obj.object_name}") + if __name__ == "__main__": @@ -307,7 +429,9 @@ def _assert_eq(field, got, expect): do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) - # clean up s3 files + # clean up s3 files for idx, _ in enumerate(data): client.remove_object("hummock001", _s3(idx)) + # test file sink batching + test_file_sink_batching() \ No newline 
at end of file diff --git a/e2e_test/s3/fs_source_batch.py b/e2e_test/s3/file_source.py similarity index 55% rename from e2e_test/s3/fs_source_batch.py rename to e2e_test/s3/file_source.py index fc09b0ef4b516..0f3aaaed64a76 100644 --- a/e2e_test/s3/fs_source_batch.py +++ b/e2e_test/s3/file_source.py @@ -5,7 +5,9 @@ import random import psycopg2 +# from handle_incremental_file import upload_to_s3_bucket, check_for_new_files from time import sleep +import time from io import StringIO from minio import Minio from functools import partial @@ -29,6 +31,7 @@ def format_json(data): for file in data ] + def format_csv(data, with_header): csv_files = [] @@ -42,7 +45,7 @@ def format_csv(data, with_header): csv_files.append(ostream.getvalue()) return csv_files -def do_test(config, file_num, item_num_per_file, prefix, fmt): +def do_test(config, file_num, item_num_per_file, prefix, fmt, need_drop_table=True): conn = psycopg2.connect( host="localhost", port="4566", @@ -53,7 +56,7 @@ def do_test(config, file_num, item_num_per_file, prefix, fmt): # Open a cursor to execute SQL statements cur = conn.cursor() - def _source(): + def _table(): return f's3_test_{fmt}' def _encode(): @@ -62,33 +65,42 @@ def _encode(): else: return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" + def _include_clause(): + if fmt == 'json': + return 'INCLUDE payload as rw_payload' + else: + return '' + # Execute a SELECT statement - cur.execute(f'''CREATE SOURCE {_source()}( + cur.execute(f'''CREATE TABLE {_table()}( id int, name TEXT, sex int, mark int, - ) WITH ( + ) + {_include_clause()} + WITH ( connector = 's3', match_pattern = '{prefix}*.{fmt}', s3.region_name = '{config['S3_REGION']}', s3.bucket_name = '{config['S3_BUCKET']}', s3.credentials.access = '{config['S3_ACCESS_KEY']}', s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', + refresh.interval.sec = 1 ) 
FORMAT PLAIN ENCODE {_encode()};''') total_rows = file_num * item_num_per_file MAX_RETRIES = 40 for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_source()}') + cur.execute(f'select count(*) from {_table()}') result = cur.fetchone() if result[0] == total_rows: break - print(f"[retry {retry_no}] Now got {result[0]} rows in source, {total_rows} expected, wait 30s") + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") sleep(30) - stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}' + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' print(f'Execute {stmt}') cur.execute(stmt) result = cur.fetchone() @@ -103,13 +115,35 @@ def _assert_eq(field, got, expect): _assert_eq('sum(sex)', result[2], total_rows / 2) _assert_eq('sum(mark)', result[3], 0) - print('Test pass') + # only do payload check for json format, which enables INCLUDE CLAUSE + if fmt == 'json': + # check rw_payload + print('Check rw_payload') + stmt = f"select id, name, sex, mark, rw_payload from {_table()} limit 1;" + cur.execute(stmt) + result = cur.fetchone() + print("Got one line with rw_payload: ", result) + payload = result[4] + _assert_eq('id', payload['id'], result[0]) + _assert_eq('name', payload['name'], result[1]) + _assert_eq('sex', payload['sex'], result[2]) + _assert_eq('mark', payload['mark'], result[3]) - cur.execute(f'drop source {_source()}') + print('Test pass') + + if need_drop_table: + cur.execute(f'drop table {_table()}') cur.close() conn.close() -def test_empty_source(config, prefix, fmt): +FORMATTER = { + 'json': format_json, + 'csv_with_header': partial(format_csv, with_header=True), + 'csv_without_header': partial(format_csv, with_header=False), + } + + +def test_batch_read(config, file_num, item_num_per_file, prefix, fmt): conn = psycopg2.connect( host="localhost", port="4566", @@ -121,7 +155,7 @@ def test_empty_source(config, prefix, fmt): cur = conn.cursor() def _source(): 
- return f's3_test_empty_{fmt}' + return f's3_test_{fmt}' def _encode(): if fmt == 'json': @@ -145,6 +179,16 @@ def _encode(): s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' ) FORMAT PLAIN ENCODE {_encode()};''') + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from {_source()}') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in source, {total_rows} expected, wait 30s") + sleep(30) + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}' print(f'Execute {stmt}') cur.execute(stmt) @@ -155,25 +199,61 @@ def _encode(): def _assert_eq(field, got, expect): assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' - _assert_eq('count(*)', result[0], 0) + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + _assert_eq('sum(sex)', result[2], total_rows / 2) + _assert_eq('sum(mark)', result[3], 0) - print('Empty source test pass') + print('Test batch read pass') cur.execute(f'drop source {_source()}') cur.close() conn.close() + +def upload_to_s3_bucket(config, minio_client, run_id, files, start_bias): + _local = lambda idx, start_bias: f"data_{idx + start_bias}.{fmt}" + _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" + for idx, file_str in enumerate(files): + with open(_local(idx, start_bias), "w") as f: + f.write(file_str) + os.fsync(f.fileno()) + + minio_client.fput_object( + config["S3_BUCKET"], _s3(idx, start_bias), _local(idx, start_bias) + ) + + +def check_for_new_files(file_num, item_num_per_file, fmt): + conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev") + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return f"s3_test_{fmt}" + + total_rows = file_num * item_num_per_file + + MAX_RETRIES = 40 + for retry_no in 
range(MAX_RETRIES): + cur.execute(f"select count(*) from {_table()}") + result = cur.fetchone() + if result[0] == total_rows: + return True + print( + f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s" + ) + time.sleep(10) + return False + + if __name__ == "__main__": FILE_NUM = 4001 ITEM_NUM_PER_FILE = 2 data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) fmt = sys.argv[1] - FORMATTER = { - 'json': format_json, - 'csv_with_header': partial(format_csv, with_header=True), - 'csv_without_header': partial(format_csv, with_header=False), - } assert fmt in FORMATTER, f"Unsupported format: {fmt}" formatted_files = FORMATTER[fmt](data) @@ -201,10 +281,41 @@ def _assert_eq(field, got, expect): ) # do test + print("Test streaming file source...\n") do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) + print("Test batch read file source...\n") + test_batch_read(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) + + # test file source handle incremental files + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + fmt = "json" + + split_idx = 51 + data_batch1 = data[:split_idx] + data_batch2 = data[split_idx:] + run_id = str(random.randint(1000, 9999)) + print(f"S3 Source New File Test: run ID: {run_id} to buckek") + + formatted_batch1 = FORMATTER[fmt](data_batch1) + upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0) + + # config in do_test that fs source's list interval is 1s + do_test( + config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False + ) + + formatted_batch2 = FORMATTER[fmt](data_batch2) + upload_to_s3_bucket(config, client, run_id, formatted_batch2, split_idx) + + success_flag = check_for_new_files(FILE_NUM, ITEM_NUM_PER_FILE, fmt) + if success_flag: + print("Test(add new file) pass") + else: + print("Test(add new file) fail") + + _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" + # clean up s3 files for idx, _ in enumerate(formatted_files): - 
client.remove_object(config["S3_BUCKET"], _s3(idx)) - - test_empty_source(config, run_id, fmt) \ No newline at end of file + client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) diff --git a/e2e_test/s3/fs_sink_batch.py b/e2e_test/s3/fs_sink_batch.py deleted file mode 100644 index 125675f61beed..0000000000000 --- a/e2e_test/s3/fs_sink_batch.py +++ /dev/null @@ -1,142 +0,0 @@ -import os -import sys -import random -import psycopg2 -import json -import time -import pyarrow as pa -import pyarrow.parquet as pq -import pandas as pd -from datetime import datetime, timezone -from time import sleep -from minio import Minio -from random import uniform - -def do_test(config, file_num, item_num_per_file, prefix): - conn = psycopg2.connect( - host="localhost", - port="4566", - user="root", - database="dev" - ) - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - - # Execute a SELECT statement - cur.execute(f'''CREATE TABLE t (v1 int, v2 int);''') - - print('create sink') - cur.execute(f'''CREATE sink test_file_sink_batching as select - v1, v2 from t WITH ( - connector = 's3', - s3.region_name = 'custom', - s3.bucket_name = 'hummock001', - s3.credentials.access = 'hummockadmin', - s3.credentials.secret = 'hummockadmin', - s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', - s3.path = 'test_sink/', - s3.file_type = 'parquet', - type = 'append-only', - rollover_seconds = 5, - max_row_count = 5, - force_append_only='true' - ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') - - cur.execute(f'''CREATE TABLE test_sink_table( - v1 int, - v2 int, - ) WITH ( - connector = 's3', - match_pattern = 'test_sink/*.parquet', - refresh.interval.sec = 1, - s3.region_name = 'custom', - s3.bucket_name = 'hummock001', - s3.credentials.access = 'hummockadmin', - s3.credentials.secret = 'hummockadmin', - s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', - ) FORMAT PLAIN ENCODE PARQUET;''') - - cur.execute(f'''ALTER SINK test_file_sink_batching SET PARALLELISM = 
2;''') - - cur.execute(f'''INSERT INTO t VALUES (10, 10);''') - - - cur.execute(f'select count(*) from test_sink_table') - # no item will be selectedpsq - result = cur.fetchone() - - def _assert_eq(field, got, expect): - assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' - def _assert_greater(field, got, expect): - assert got > expect, f'{field} assertion failed: got {got}, expect {expect}.' - - _assert_eq('count(*)', result[0], 0) - print('the rollover_seconds has not reached, count(*) = 0') - - - time.sleep(11) - - cur.execute(f'select count(*) from test_sink_table') - result = cur.fetchone() - _assert_eq('count(*)', result[0], 1) - print('the rollover_seconds has reached, count(*) = ', result[0]) - - cur.execute(f''' - INSERT INTO t VALUES (20, 20); - INSERT INTO t VALUES (30, 30); - INSERT INTO t VALUES (40, 40); - INSERT INTO t VALUES (50, 10); - ''') - - cur.execute(f'select count(*) from test_sink_table') - # count(*) = 1 - result = cur.fetchone() - _assert_eq('count(*)', result[0], 1) - print('the max row count has not reached, count(*) = ', result[0]) - - cur.execute(f''' - INSERT INTO t VALUES (60, 20); - INSERT INTO t VALUES (70, 30); - INSERT INTO t VALUES (80, 10); - INSERT INTO t VALUES (90, 20); - INSERT INTO t VALUES (100, 30); - INSERT INTO t VALUES (100, 10); - ''') - - time.sleep(10) - - cur.execute(f'select count(*) from test_sink_table') - result = cur.fetchone() - _assert_greater('count(*)', result[0], 1) - print('the rollover_seconds has reached, count(*) = ', result[0]) - - cur.execute(f'drop sink test_file_sink_batching;') - cur.execute(f'drop table t;') - cur.execute(f'drop table test_sink_table;') - cur.close() - conn.close() - -if __name__ == "__main__": - FILE_NUM = 10 - ITEM_NUM_PER_FILE = 2000 - - config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) - client = Minio( - "127.0.0.1:9301", - "hummockadmin", - "hummockadmin", - secure=False, - ) - run_id = str(random.randint(1000, 9999)) - _local = lambda 
idx: f'data_{idx}.parquet' - _s3 = lambda idx: f"{run_id}_data_{idx}.parquet" - - do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) - - objects = client.list_objects("hummock001", prefix="test_sink/", recursive=True) - - for obj in objects: - client.remove_object("hummock001", obj.object_name) - print(f"Deleted: {obj.object_name}") diff --git a/e2e_test/s3/fs_source_v2.py b/e2e_test/s3/fs_source_v2.py deleted file mode 100644 index 343b921540857..0000000000000 --- a/e2e_test/s3/fs_source_v2.py +++ /dev/null @@ -1,180 +0,0 @@ -import os -import sys -import csv -import json -import random -import psycopg2 - -from time import sleep -from io import StringIO -from minio import Minio -from functools import partial - -def gen_data(file_num, item_num_per_file): - assert item_num_per_file % 2 == 0, \ - f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' - return [ - [{ - 'id': file_id * item_num_per_file + item_id, - 'name': f'{file_id}_{item_id}', - 'sex': item_id % 2, - 'mark': (-1) ** (item_id % 2), - } for item_id in range(item_num_per_file)] - for file_id in range(file_num) - ] - -def format_json(data): - return [ - '\n'.join([json.dumps(item) for item in file]) - for file in data - ] - - -def format_csv(data, with_header): - csv_files = [] - - for file_data in data: - ostream = StringIO() - writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys()) - if with_header: - writer.writeheader() - for item_data in file_data: - writer.writerow(item_data) - csv_files.append(ostream.getvalue()) - return csv_files - -def do_test(config, file_num, item_num_per_file, prefix, fmt, need_drop_table=True): - conn = psycopg2.connect( - host="localhost", - port="4566", - user="root", - database="dev" - ) - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f's3_test_{fmt}' - - def _encode(): - if fmt == 'json': - return 'JSON' - else: - return f"CSV (delimiter = ',', without_header = {str('without' in 
fmt).lower()})" - - def _include_clause(): - if fmt == 'json': - return 'INCLUDE payload as rw_payload' - else: - return '' - - # Execute a SELECT statement - cur.execute(f'''CREATE TABLE {_table()}( - id int, - name TEXT, - sex int, - mark int, - ) - {_include_clause()} - WITH ( - connector = 's3', - match_pattern = '{prefix}*.{fmt}', - s3.region_name = '{config['S3_REGION']}', - s3.bucket_name = '{config['S3_BUCKET']}', - s3.credentials.access = '{config['S3_ACCESS_KEY']}', - s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', - refresh.interval.sec = 1 - ) FORMAT PLAIN ENCODE {_encode()};''') - - total_rows = file_num * item_num_per_file - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_table()}') - result = cur.fetchone() - if result[0] == total_rows: - break - print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") - sleep(30) - - stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' - print(f'Execute {stmt}') - cur.execute(stmt) - result = cur.fetchone() - - print('Got:', result) - - def _assert_eq(field, got, expect): - assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
- - _assert_eq('count(*)', result[0], total_rows) - _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) - _assert_eq('sum(sex)', result[2], total_rows / 2) - _assert_eq('sum(mark)', result[3], 0) - - # only do payload check for json format, which enables INCLUDE CLAUSE - if fmt == 'json': - # check rw_payload - print('Check rw_payload') - stmt = f"select id, name, sex, mark, rw_payload from {_table()} limit 1;" - cur.execute(stmt) - result = cur.fetchone() - print("Got one line with rw_payload: ", result) - payload = result[4] - _assert_eq('id', payload['id'], result[0]) - _assert_eq('name', payload['name'], result[1]) - _assert_eq('sex', payload['sex'], result[2]) - _assert_eq('mark', payload['mark'], result[3]) - - print('Test pass') - - if need_drop_table: - cur.execute(f'drop table {_table()}') - cur.close() - conn.close() - -FORMATTER = { - 'json': format_json, - 'csv_with_header': partial(format_csv, with_header=True), - 'csv_without_header': partial(format_csv, with_header=False), - } - -if __name__ == "__main__": - FILE_NUM = 4001 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - - fmt = sys.argv[1] - assert fmt in FORMATTER, f"Unsupported format: {fmt}" - formatted_files = FORMATTER[fmt](data) - - config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) - client = Minio( - config["S3_ENDPOINT"], - access_key=config["S3_ACCESS_KEY"], - secret_key=config["S3_SECRET_KEY"], - secure=True, - ) - run_id = str(random.randint(1000, 9999)) - _local = lambda idx: f'data_{idx}.{fmt}' - _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}" - - # put s3 files - for idx, file_str in enumerate(formatted_files): - with open(_local(idx), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - - client.fput_object( - config["S3_BUCKET"], - _s3(idx), - _local(idx) - ) - - # do test - do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) - - # clean up s3 files - for idx, _ in enumerate(formatted_files): - 
client.remove_object(config["S3_BUCKET"], _s3(idx)) diff --git a/e2e_test/s3/fs_source_v2_new_file.py b/e2e_test/s3/fs_source_v2_new_file.py deleted file mode 100644 index c90103e15c127..0000000000000 --- a/e2e_test/s3/fs_source_v2_new_file.py +++ /dev/null @@ -1,90 +0,0 @@ -from fs_source_v2 import gen_data, FORMATTER, do_test -import json -import os -import random -import psycopg2 -import time -from minio import Minio - - -def upload_to_s3_bucket(config, minio_client, run_id, files, start_bias): - _local = lambda idx, start_bias: f"data_{idx + start_bias}.{fmt}" - _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" - for idx, file_str in enumerate(files): - with open(_local(idx, start_bias), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - - minio_client.fput_object( - config["S3_BUCKET"], _s3(idx, start_bias), _local(idx, start_bias) - ) - - -def check_for_new_files(file_num, item_num_per_file, fmt): - conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev") - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f"s3_test_{fmt}" - - total_rows = file_num * item_num_per_file - - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f"select count(*) from {_table()}") - result = cur.fetchone() - if result[0] == total_rows: - return True - print( - f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s" - ) - time.sleep(10) - return False - - -if __name__ == "__main__": - FILE_NUM = 101 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - fmt = "json" - - split_idx = 51 - data_batch1 = data[:split_idx] - data_batch2 = data[split_idx:] - - config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) - client = Minio( - config["S3_ENDPOINT"], - access_key=config["S3_ACCESS_KEY"], - secret_key=config["S3_SECRET_KEY"], - secure=True, - ) - run_id = str(random.randint(1000, 9999)) - print(f"S3 Source New File Test: 
run ID: {run_id} to bucket {config['S3_BUCKET']}") - - formatted_batch1 = FORMATTER[fmt](data_batch1) - upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0) - - # config in do_test that fs source's list interval is 1s - do_test( - config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False - ) - - formatted_batch2 = FORMATTER[fmt](data_batch2) - upload_to_s3_bucket(config, client, run_id, formatted_batch2, split_idx) - - success_flag = check_for_new_files(FILE_NUM, ITEM_NUM_PER_FILE, fmt) - if success_flag: - print("Test(add new file) pass") - else: - print("Test(add new file) fail") - - _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" - # clean up s3 files - for idx, _ in enumerate(data): - client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) - - if success_flag == False: - exit(1) diff --git a/e2e_test/s3/posix_fs_source.py b/e2e_test/s3/posix_fs_source.py deleted file mode 100644 index a7cea46fa496a..0000000000000 --- a/e2e_test/s3/posix_fs_source.py +++ /dev/null @@ -1,136 +0,0 @@ -import os -import sys -import csv -import random -import psycopg2 -import opendal - -from time import sleep -from io import StringIO -from functools import partial - -def gen_data(file_num, item_num_per_file): - assert item_num_per_file % 2 == 0, \ - f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' - return [ - [{ - 'id': file_id * item_num_per_file + item_id, - 'name': f'{file_id}_{item_id}', - 'sex': item_id % 2, - 'mark': (-1) ** (item_id % 2), - } for item_id in range(item_num_per_file)] - for file_id in range(file_num) - ] - -def format_csv(data, with_header): - csv_files = [] - - for file_data in data: - ostream = StringIO() - writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys()) - if with_header: - writer.writeheader() - for item_data in file_data: - writer.writerow(item_data) - csv_files.append(ostream.getvalue()) - return csv_files - - -def do_test(file_num, 
item_num_per_file, prefix, fmt): - conn = psycopg2.connect( - host="localhost", - port="4566", - user="root", - database="dev" - ) - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f'posix_fs_test_{fmt}' - - def _encode(): - return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" - - # Execute a SELECT statement - cur.execute(f'''CREATE TABLE {_table()}( - id int, - name TEXT, - sex int, - mark int, - ) WITH ( - connector = 'posix_fs', - match_pattern = '{prefix}*.{fmt}', - posix_fs.root = '/tmp', - ) FORMAT PLAIN ENCODE {_encode()};''') - - total_rows = file_num * item_num_per_file - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_table()}') - result = cur.fetchone() - if result[0] == total_rows: - break - print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") - sleep(30) - - stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' - print(f'Execute {stmt}') - cur.execute(stmt) - result = cur.fetchone() - - print('Got:', result) - - def _assert_eq(field, got, expect): - assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
- - _assert_eq('count(*)', result[0], total_rows) - _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) - _assert_eq('sum(sex)', result[2], total_rows / 2) - _assert_eq('sum(mark)', result[3], 0) - - print('Test pass') - - cur.execute(f'drop table {_table()}') - cur.close() - conn.close() - - -if __name__ == "__main__": - FILE_NUM = 4001 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - - fmt = sys.argv[1] - FORMATTER = { - 'csv_with_header': partial(format_csv, with_header=True), - 'csv_without_header': partial(format_csv, with_header=False), - } - assert fmt in FORMATTER, f"Unsupported format: {fmt}" - formatted_files = FORMATTER[fmt](data) - - run_id = str(random.randint(1000, 9999)) - _local = lambda idx: f'data_{idx}.{fmt}' - _posix = lambda idx: f"{run_id}_data_{idx}.{fmt}" - # put local files - op = opendal.Operator("fs", root="/tmp") - - print("write file to /tmp") - for idx, file_str in enumerate(formatted_files): - with open(_local(idx), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - file_name = _posix(idx) - file_bytes = file_str.encode('utf-8') - op.write(file_name, file_bytes) - - # do test - print("do test") - do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) - - # clean up local files - print("clean up local files in /tmp") - for idx, _ in enumerate(formatted_files): - file_name = _posix(idx) - op.delete(file_name) diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index 53148c66836e8..63eebe5e36375 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -232,10 +232,10 @@ SELECT order_id,order_date,customer_name,product_id FROM orders_test order by or 10003 2020-07-30 12:00:30 Edward 106 query IIIIITTTTTTTTT -SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM 
mysql_all_types order by c_bigint; +SELECT c_boolean, c_bit, c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint; ---- --128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 -NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL +t, t, -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 +f, f, NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL statement ok create secret pg_pwd with ( diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 09f69df349e37..872d6e900c911 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -212,6 +212,23 @@ public DbzConnectorConfig( } } + // adapt value of sslmode to the expected value + var sslMode = postgresProps.getProperty("database.sslmode"); + if (sslMode != null) { + switch (sslMode) { + case "disabled": + sslMode = "disable"; + break; + case "preferred": + sslMode = "prefer"; + break; + case "required": + sslMode = "require"; + break; + } + postgresProps.setProperty("database.sslmode", sslMode); + } + dbzProps.putAll(postgresProps); if (isCdcSourceJob) { diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index de860593e8105..6467bd6e1d7e7 100644 --- 
a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -268,6 +268,26 @@ message AlterOwnerResponse { WaitVersion version = 2; } +message AlterSwapRenameRequest { + message ObjectNameSwapPair { + uint32 src_object_id = 1; + uint32 dst_object_id = 2; + } + oneof object { + ObjectNameSwapPair schema = 1; + ObjectNameSwapPair table = 2; + ObjectNameSwapPair view = 3; + ObjectNameSwapPair source = 4; + ObjectNameSwapPair sink = 5; + ObjectNameSwapPair subscription = 6; + } +} + +message AlterSwapRenameResponse { + common.Status status = 1; + WaitVersion version = 2; +} + message CreateFunctionRequest { catalog.Function function = 1; } @@ -513,4 +533,5 @@ service DdlService { rpc Wait(WaitRequest) returns (WaitResponse); rpc CommentOn(CommentOnRequest) returns (CommentOnResponse); rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse); + rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse); } diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index e9a8eeba70cb3..9970ad50003c1 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -75,6 +75,14 @@ pub fn mysql_datum_to_rw_datum( ) -> Result { match rw_data_type { DataType::Boolean => { + // TinyInt(1) is used to represent boolean in MySQL + // This handles backwards compatibility, + // before https://github.com/risingwavelabs/risingwave/pull/19071 + // we permit boolean and tinyint(1) to be equivalent to boolean in RW. 
+ if let Some(Ok(val)) = mysql_row.get_opt::, _>(mysql_datum_index) { + let _ = mysql_row.take::(mysql_datum_index); + return Ok(val.map(ScalarImpl::from)); + } // Bit(1) match mysql_row.take_opt::>, _>(mysql_datum_index) { None => bail!( diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 456415a62e0d6..35d1033d8ee1b 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -371,7 +371,7 @@ pub struct StarrocksSinkWriter { pk_indices: Vec, is_append_only: bool, client: Option, - txn_client: StarrocksTxnClient, + txn_client: Arc, row_encoder: JsonEncoder, executor_id: u64, curr_txn_label: Option, @@ -432,7 +432,7 @@ impl StarrocksSinkWriter { pk_indices, is_append_only, client: None, - txn_client: StarrocksTxnClient::new(txn_request_builder), + txn_client: Arc::new(StarrocksTxnClient::new(txn_request_builder)), row_encoder: JsonEncoder::new_with_starrocks(schema, None), executor_id, curr_txn_label: None, @@ -527,6 +527,23 @@ impl StarrocksSinkWriter { } } +impl Drop for StarrocksSinkWriter { + fn drop(&mut self) { + if let Some(txn_label) = self.curr_txn_label.take() { + let txn_client = self.txn_client.clone(); + tokio::spawn(async move { + if let Err(e) = txn_client.rollback(txn_label.clone()).await { + tracing::error!( + "starrocks rollback transaction error: {:?}, txn label: {}", + e.as_report(), + txn_label + ); + } + }); + } + } +} + #[async_trait] impl SinkWriter for StarrocksSinkWriter { type CommitMetadata = Option; diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 2d3fbd7e0178a..271d395181df8 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{ PbSubscription, PbTable, PbView, }; use risingwave_pb::ddl_service::{ - alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request, - 
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion, + alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request, + create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, + WaitVersion, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync { object: alter_set_schema_request::Object, new_schema_id: u32, ) -> Result<()>; + + async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>; } #[derive(Clone)] @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl { Ok(()) } + + async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> { + let version = self.meta_client.alter_swap_rename(object).await?; + self.wait_version(version).await + } } impl CatalogWriterImpl { diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 2a14799ce9ab0..86d0353f29073 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -115,10 +115,14 @@ impl SchemaCatalog { let table_ref = Arc::new(table); let old_table = self.table_by_id.get(&id).unwrap(); - // check if table name get updated. - if old_table.name() != name { + // check if the table name gets updated. + if old_table.name() != name + && let Some(t) = self.table_by_name.get(old_table.name()) + && t.id == id + { self.table_by_name.remove(old_table.name()); } + self.table_by_name.insert(name, table_ref.clone()); self.table_by_id.insert(id, table_ref.clone()); table_ref @@ -137,8 +141,11 @@ impl SchemaCatalog { let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table); let index_ref = Arc::new(index); - // check if index name get updated. - if old_index.name != name { + // check if the index name gets updated. 
+ if old_index.name != name + && let Some(idx) = self.index_by_name.get(&old_index.name) + && idx.id == id + { self.index_by_name.remove(&old_index.name); } self.index_by_name.insert(name, index_ref.clone()); @@ -245,8 +252,11 @@ impl SchemaCatalog { let source_ref = Arc::new(source); let old_source = self.source_by_id.get(&id).unwrap(); - // check if source name get updated. - if old_source.name != name { + // check if the source name gets updated. + if old_source.name != name + && let Some(src) = self.source_by_name.get(&old_source.name) + && src.id == id + { self.source_by_name.remove(&old_source.name); } @@ -294,8 +304,11 @@ impl SchemaCatalog { let sink_ref = Arc::new(sink); let old_sink = self.sink_by_id.get(&id).unwrap(); - // check if sink name get updated. - if old_sink.name != name { + // check if the sink name gets updated. + if old_sink.name != name + && let Some(s) = self.sink_by_name.get(&old_sink.name) + && s.id.sink_id == id + { self.sink_by_name.remove(&old_sink.name); } @@ -331,8 +344,11 @@ impl SchemaCatalog { let subscription_ref = Arc::new(subscription); let old_subscription = self.subscription_by_id.get(&id).unwrap(); - // check if subscription name get updated. - if old_subscription.name != name { + // check if the subscription name gets updated. + if old_subscription.name != name + && let Some(s) = self.subscription_by_name.get(&old_subscription.name) + && s.id.subscription_id == id + { self.subscription_by_name.remove(&old_subscription.name); } @@ -365,8 +381,11 @@ impl SchemaCatalog { let view_ref = Arc::new(view); let old_view = self.view_by_id.get(&id).unwrap(); - // check if view name get updated. - if old_view.name != name { + // check if the view name gets updated. 
+ if old_view.name != name + && let Some(v) = self.view_by_name.get(old_view.name()) + && v.id == id + { self.view_by_name.remove(&old_view.name); } @@ -438,8 +457,11 @@ impl SchemaCatalog { .function_by_name .get_mut(&old_function_by_id.name) .unwrap(); - // check if function name get updated. - if old_function_by_id.name != name { + // check if the function name gets updated. + if old_function_by_id.name != name + && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types) + && f.id == id + { old_function_by_name.remove(&old_function_by_id.arg_types); if old_function_by_name.is_empty() { self.function_by_name.remove(&old_function_by_id.name); @@ -473,8 +495,11 @@ impl SchemaCatalog { let connection_ref = Arc::new(connection); let old_connection = self.connection_by_id.get(&id).unwrap(); - // check if connection name get updated. - if old_connection.name != name { + // check if the connection name gets updated. + if old_connection.name != name + && let Some(conn) = self.connection_by_name.get(&old_connection.name) + && conn.id == id + { self.connection_by_name.remove(&old_connection.name); } @@ -513,8 +538,11 @@ impl SchemaCatalog { let secret_ref = Arc::new(secret); let old_secret = self.secret_by_id.get(&id).unwrap(); - // check if secret name get updated. - if old_secret.name != name { + // check if the secret name gets updated. + if old_secret.name != name + && let Some(s) = self.secret_by_name.get(&old_secret.name) + && s.id == id + { self.secret_by_name.remove(&old_secret.name); } diff --git a/src/frontend/src/handler/alter_swap_rename.rs b/src/frontend/src/handler/alter_swap_rename.rs new file mode 100644 index 0000000000000..a1d23484576f8 --- /dev/null +++ b/src/frontend/src/handler/alter_swap_rename.rs @@ -0,0 +1,179 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use pgwire::pg_response::StatementType; +use risingwave_common::bail_not_implemented; +use risingwave_pb::ddl_service::alter_swap_rename_request; +use risingwave_pb::ddl_service::alter_swap_rename_request::ObjectNameSwapPair; +use risingwave_sqlparser::ast::ObjectName; + +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::CatalogError; +use crate::error::{ErrorCode, Result}; +use crate::handler::{HandlerArgs, RwPgResponse}; +use crate::session::SessionImpl; +use crate::user::UserId; +use crate::Binder; + +/// Check if the session user has the privilege to swap and rename the objects. 
+fn check_swap_rename_privilege( + session_impl: &Arc, + src_owner: UserId, + target_owner: UserId, +) -> Result<()> { + if !session_impl.is_super_user() + && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id()) + { + return Err(ErrorCode::PermissionDenied(format!( + "{} is not super user and not the owner of the objects.", + session_impl.user_name() + )) + .into()); + } + Ok(()) +} + +pub async fn handle_swap_rename( + handler_args: HandlerArgs, + source_object: ObjectName, + target_object: ObjectName, + stmt_type: StatementType, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (src_schema_name, src_obj_name) = + Binder::resolve_schema_qualified_name(db_name, source_object)?; + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name); + let (target_schema_name, target_obj_name) = + Binder::resolve_schema_qualified_name(db_name, target_object)?; + let target_schema_path = + SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name); + + let obj = match stmt_type { + StatementType::ALTER_SCHEMA => { + // TODO: support it until resolves https://github.com/risingwavelabs/risingwave/issues/19028 + bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet"); + } + StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_table, _) = catalog_reader.get_created_table_by_name( + db_name, + src_schema_path, + &src_obj_name, + )?; + let (target_table, _) = catalog_reader.get_created_table_by_name( + db_name, + target_schema_path, + &target_obj_name, + )?; + + if src_table.table_type != target_table.table_type { + return Err(ErrorCode::PermissionDenied(format!( + "cannot swap between {} and {}: type mismatch", + src_obj_name, target_obj_name + )) + 
.into()); + } + if stmt_type == StatementType::ALTER_TABLE && !src_table.is_table() { + return Err(CatalogError::NotFound("table", src_obj_name.to_string()).into()); + } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() { + return Err( + CatalogError::NotFound("materialized view", src_obj_name.to_string()).into(), + ); + } + + check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?; + + alter_swap_rename_request::Object::Table(ObjectNameSwapPair { + src_object_id: src_table.id.table_id, + dst_object_id: target_table.id.table_id, + }) + } + StatementType::ALTER_VIEW => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_view, _) = + catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_view, _) = + catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?; + check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?; + + alter_swap_rename_request::Object::View(ObjectNameSwapPair { + src_object_id: src_view.id, + dst_object_id: target_view.id, + }) + } + StatementType::ALTER_SOURCE => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_source, _) = + catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_source, _) = + catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?; + check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?; + + alter_swap_rename_request::Object::Source(ObjectNameSwapPair { + src_object_id: src_source.id, + dst_object_id: target_source.id, + }) + } + StatementType::ALTER_SINK => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_sink, _) = + catalog_reader.get_sink_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_sink, _) = + catalog_reader.get_sink_by_name(db_name, target_schema_path, &target_obj_name)?; + 
check_swap_rename_privilege( + &session, + src_sink.owner.user_id, + target_sink.owner.user_id, + )?; + + alter_swap_rename_request::Object::Sink(ObjectNameSwapPair { + src_object_id: src_sink.id.sink_id, + dst_object_id: target_sink.id.sink_id, + }) + } + StatementType::ALTER_SUBSCRIPTION => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_subscription, _) = + catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_subscription, _) = catalog_reader.get_subscription_by_name( + db_name, + target_schema_path, + &target_obj_name, + )?; + check_swap_rename_privilege( + &session, + src_subscription.owner.user_id, + target_subscription.owner.user_id, + )?; + + alter_swap_rename_request::Object::Subscription(ObjectNameSwapPair { + src_object_id: src_subscription.id.subscription_id, + dst_object_id: target_subscription.id.subscription_id, + }) + } + _ => { + unreachable!("handle_swap_rename: unsupported statement type") + } + }; + + let catalog_writer = session.catalog_writer()?; + catalog_writer.alter_swap_rename(obj).await?; + + Ok(RwPgResponse::empty_result(stmt_type)) +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index d9a190ca319e7..9cf94a37c65b0 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -46,6 +46,7 @@ mod alter_set_schema; mod alter_source_column; mod alter_source_with_sr; mod alter_streaming_rate_limit; +mod alter_swap_rename; mod alter_system; mod alter_table_column; mod alter_table_with_sr; @@ -637,6 +638,18 @@ pub async fn handle( ) .await } + Statement::AlterSchema { + name, + operation: AlterSchemaOperation::SwapRenameSchema { target_schema }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_schema, + StatementType::ALTER_SCHEMA, + ) + .await + } Statement::AlterTable { name, operation: @@ -720,6 +733,18 @@ pub async fn handle( ) .await } + Statement::AlterTable { + name, + 
operation: AlterTableOperation::SwapRenameTable { target_table }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_table, + StatementType::ALTER_TABLE, + ) + .await + } Statement::AlterIndex { name, operation: AlterIndexOperation::RenameIndex { index_name }, @@ -837,6 +862,19 @@ pub async fn handle( ) .await } + Statement::AlterView { + materialized, + name, + operation: AlterViewOperation::SwapRenameView { target_view }, + } => { + let statement_type = if materialized { + StatementType::ALTER_MATERIALIZED_VIEW + } else { + StatementType::ALTER_VIEW + }; + alter_swap_rename::handle_swap_rename(handler_args, name, target_view, statement_type) + .await + } Statement::AlterSink { name, operation: AlterSinkOperation::RenameSink { sink_name }, @@ -884,6 +922,18 @@ pub async fn handle( ) .await } + Statement::AlterSink { + name, + operation: AlterSinkOperation::SwapRenameSink { target_sink }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_sink, + StatementType::ALTER_SINK, + ) + .await + } Statement::AlterSubscription { name, operation: AlterSubscriptionOperation::RenameSubscription { subscription_name }, @@ -913,6 +963,21 @@ pub async fn handle( ) .await } + Statement::AlterSubscription { + name, + operation: + AlterSubscriptionOperation::SwapRenameSubscription { + target_subscription, + }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_subscription, + StatementType::ALTER_SUBSCRIPTION, + ) + .await + } Statement::AlterSource { name, operation: AlterSourceOperation::RenameSource { source_name }, @@ -969,6 +1034,18 @@ pub async fn handle( ) .await } + Statement::AlterSource { + name, + operation: AlterSourceOperation::SwapRenameSource { target_source }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_source, + StatementType::ALTER_SOURCE, + ) + .await + } Statement::AlterFunction { name, args, diff --git 
a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ae5201427b563..d94b1dd2652d6 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -44,8 +44,8 @@ use risingwave_pb::catalog::{ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, create_connection_request, DdlProgress, - PbTableJobType, ReplaceTablePlan, TableJobType, + alter_name_request, alter_set_schema_request, alter_swap_rename_request, + create_connection_request, DdlProgress, PbTableJobType, ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -645,6 +645,10 @@ impl CatalogWriter for MockCatalogWriter { ) -> Result<()> { todo!() } + + async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> { + todo!() + } } impl MockCatalogWriter { diff --git a/src/meta/model/migration/Cargo.toml b/src/meta/model/migration/Cargo.toml index a54753da16fbb..9d9fbd5292d25 100644 --- a/src/meta/model/migration/Cargo.toml +++ b/src/meta/model/migration/Cargo.toml @@ -15,11 +15,9 @@ normal = ["workspace-hack"] [dependencies] async-std = { version = "1", features = ["attributes", "tokio1"] } +easy-ext = "1" sea-orm = { workspace = true } +sea-orm-migration = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" uuid = { version = "1", features = ["v4"] } - -[dependencies.sea-orm-migration] -version = "0.12.14" -features = ["sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "with-uuid"] diff --git a/src/meta/model/migration/README.md b/src/meta/model/migration/README.md index d0253be5c05d4..e354d556ed6de 100644 --- a/src/meta/model/migration/README.md +++ b/src/meta/model/migration/README.md @@ -54,3 +54,9 @@ ## Adding a migration - Add a new column to some catalogs. 
You can checkout the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference. + +### Special notes on MySQL backend + +MySQL data types typically have stricter length limits compared to those in PostgreSQL or SQLite. For example, the `VARCHAR`, `TEXT`, `BLOB`, and `BINARY` data types have a maximum length of 65,535 bytes. + +If you need to store more data, e.g., SQL definition, UDF body, protobuf-encoded internal data, etc., please **avoid** using the built-in constructors like `ColumnDef::text` or `ColumnDef::blob`, but use the extensions defined in [`./src/utils.rs`](./src/utils.rs) instead. diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index 0c8c244c0624e..b84a29891eee3 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -24,6 +24,7 @@ mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; mod m20241016_065621_hummock_gc_history; mod m20241025_062548_singleton_vnode_count; +mod utils; pub struct Migrator; diff --git a/src/meta/model/migration/src/m20230908_072257_init.rs b/src/meta/model/migration/src/m20230908_072257_init.rs index 7840c40cd8f30..38fba91011569 100644 --- a/src/meta/model/migration/src/m20230908_072257_init.rs +++ b/src/meta/model/migration/src/m20230908_072257_init.rs @@ -1,6 +1,7 @@ use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *}; use crate::sea_orm::{DatabaseBackend, DbBackend, Statement}; +use crate::utils::ColumnDefExt; use crate::{assert_not_has_tables, drop_tables}; #[derive(DeriveMigrationName)] @@ -140,7 +141,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(User::CanCreateDb).boolean().not_null()) .col(ColumnDef::new(User::CanCreateUser).boolean().not_null()) .col(ColumnDef::new(User::CanLogin).boolean().not_null()) - .col(ColumnDef::new(User::AuthInfo).binary()) + 
.col(ColumnDef::new(User::AuthInfo).rw_binary(manager)) .to_owned(), ) .await?; @@ -378,10 +379,14 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(Fragment::StreamNode) - .blob(BlobSize::Long) + .rw_binary(manager) + .not_null(), + ) + .col( + ColumnDef::new(Fragment::VnodeMapping) + .rw_binary(manager) .not_null(), ) - .col(ColumnDef::new(Fragment::VnodeMapping).binary().not_null()) .col(ColumnDef::new(Fragment::StateTableIds).json_binary()) .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary()) .foreign_key( @@ -407,12 +412,16 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Actor::FragmentId).integer().not_null()) .col(ColumnDef::new(Actor::Status).string().not_null()) - .col(ColumnDef::new(Actor::Splits).binary()) + .col(ColumnDef::new(Actor::Splits).rw_binary(manager)) .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null()) .col(ColumnDef::new(Actor::WorkerId).integer().not_null()) .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary()) - .col(ColumnDef::new(Actor::VnodeBitmap).binary()) - .col(ColumnDef::new(Actor::ExprContext).binary().not_null()) + .col(ColumnDef::new(Actor::VnodeBitmap).rw_binary(manager)) + .col( + ColumnDef::new(Actor::ExprContext) + .rw_binary(manager) + .not_null(), + ) .foreign_key( &mut ForeignKey::create() .name("FK_actor_fragment_id") @@ -454,7 +463,7 @@ impl MigrationTrait for Migration { .json_binary() .not_null(), ) - .col(ColumnDef::new(ActorDispatcher::HashMapping).binary()) + .col(ColumnDef::new(ActorDispatcher::HashMapping).rw_binary(manager)) .col( ColumnDef::new(ActorDispatcher::DispatcherId) .integer() @@ -495,7 +504,11 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Connection::Name).string().not_null()) - .col(ColumnDef::new(Connection::Info).binary().not_null()) + .col( + ColumnDef::new(Connection::Info) + .rw_binary(manager) + .not_null(), + ) .foreign_key( &mut ForeignKey::create() .name("FK_connection_object_id") @@ -514,16 +527,28 @@ impl 
MigrationTrait for Migration { .col(ColumnDef::new(Source::SourceId).integer().primary_key()) .col(ColumnDef::new(Source::Name).string().not_null()) .col(ColumnDef::new(Source::RowIdIndex).integer()) - .col(ColumnDef::new(Source::Columns).binary().not_null()) + .col( + ColumnDef::new(Source::Columns) + .rw_binary(manager) + .not_null(), + ) .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null()) .col( ColumnDef::new(Source::WithProperties) .json_binary() .not_null(), ) - .col(ColumnDef::new(Source::Definition).text().not_null()) - .col(ColumnDef::new(Source::SourceInfo).binary()) - .col(ColumnDef::new(Source::WatermarkDescs).binary().not_null()) + .col( + ColumnDef::new(Source::Definition) + .rw_long_text(manager) + .not_null(), + ) + .col(ColumnDef::new(Source::SourceInfo).rw_binary(manager)) + .col( + ColumnDef::new(Source::WatermarkDescs) + .rw_binary(manager) + .not_null(), + ) .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer()) .col(ColumnDef::new(Source::ConnectionId).integer()) .col(ColumnDef::new(Source::Version).big_integer().not_null()) @@ -562,8 +587,8 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer()) .col(ColumnDef::new(Table::TableType).string().not_null()) .col(ColumnDef::new(Table::BelongsToJobId).integer()) - .col(ColumnDef::new(Table::Columns).binary().not_null()) - .col(ColumnDef::new(Table::Pk).binary().not_null()) + .col(ColumnDef::new(Table::Columns).rw_binary(manager).not_null()) + .col(ColumnDef::new(Table::Pk).rw_binary(manager).not_null()) .col( ColumnDef::new(Table::DistributionKey) .json_binary() @@ -575,7 +600,11 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::VnodeColIndex).integer()) .col(ColumnDef::new(Table::RowIdIndex).integer()) .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null()) - .col(ColumnDef::new(Table::Definition).text().not_null()) + .col( + ColumnDef::new(Table::Definition) + .rw_long_text(manager) + .not_null(), 
+ ) .col( ColumnDef::new(Table::HandlePkConflictBehavior) .string() @@ -593,14 +622,14 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null()) .col(ColumnDef::new(Table::DmlFragmentId).integer()) - .col(ColumnDef::new(Table::Cardinality).binary()) + .col(ColumnDef::new(Table::Cardinality).rw_binary(manager)) .col( ColumnDef::new(Table::CleanedByWatermark) .boolean() .not_null(), ) .col(ColumnDef::new(Table::Description).string()) - .col(ColumnDef::new(Table::Version).binary()) + .col(ColumnDef::new(Table::Version).rw_binary(manager)) .col(ColumnDef::new(Table::RetentionSeconds).integer()) .col( ColumnDef::new(Table::IncomingSinks) @@ -655,8 +684,8 @@ impl MigrationTrait for Migration { .table(Sink::Table) .col(ColumnDef::new(Sink::SinkId).integer().primary_key()) .col(ColumnDef::new(Sink::Name).string().not_null()) - .col(ColumnDef::new(Sink::Columns).binary().not_null()) - .col(ColumnDef::new(Sink::PlanPk).binary().not_null()) + .col(ColumnDef::new(Sink::Columns).rw_binary(manager).not_null()) + .col(ColumnDef::new(Sink::PlanPk).rw_binary(manager).not_null()) .col( ColumnDef::new(Sink::DistributionKey) .json_binary() @@ -665,11 +694,15 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null()) .col(ColumnDef::new(Sink::SinkType).string().not_null()) .col(ColumnDef::new(Sink::Properties).json_binary().not_null()) - .col(ColumnDef::new(Sink::Definition).text().not_null()) + .col( + ColumnDef::new(Sink::Definition) + .rw_long_text(manager) + .not_null(), + ) .col(ColumnDef::new(Sink::ConnectionId).integer()) .col(ColumnDef::new(Sink::DbName).string().not_null()) .col(ColumnDef::new(Sink::SinkFromName).string().not_null()) - .col(ColumnDef::new(Sink::SinkFormatDesc).binary()) + .col(ColumnDef::new(Sink::SinkFormatDesc).rw_binary(manager)) .col(ColumnDef::new(Sink::TargetTable).integer()) .foreign_key( &mut ForeignKey::create() @@ -703,8 +736,12 @@ impl MigrationTrait for 
Migration { .col(ColumnDef::new(View::ViewId).integer().primary_key()) .col(ColumnDef::new(View::Name).string().not_null()) .col(ColumnDef::new(View::Properties).json_binary().not_null()) - .col(ColumnDef::new(View::Definition).text().not_null()) - .col(ColumnDef::new(View::Columns).binary().not_null()) + .col( + ColumnDef::new(View::Definition) + .rw_long_text(manager) + .not_null(), + ) + .col(ColumnDef::new(View::Columns).rw_binary(manager).not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_view_object_id") @@ -724,7 +761,11 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Index::Name).string().not_null()) .col(ColumnDef::new(Index::IndexTableId).integer().not_null()) .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null()) - .col(ColumnDef::new(Index::IndexItems).binary().not_null()) + .col( + ColumnDef::new(Index::IndexItems) + .rw_binary(manager) + .not_null(), + ) .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null()) .foreign_key( &mut ForeignKey::create() @@ -760,13 +801,22 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Function::FunctionId).integer().primary_key()) .col(ColumnDef::new(Function::Name).string().not_null()) .col(ColumnDef::new(Function::ArgNames).string().not_null()) - .col(ColumnDef::new(Function::ArgTypes).binary().not_null()) - .col(ColumnDef::new(Function::ReturnType).binary().not_null()) + .col( + ColumnDef::new(Function::ArgTypes) + .rw_binary(manager) + .not_null(), + ) + .col( + ColumnDef::new(Function::ReturnType) + .rw_binary(manager) + .not_null(), + ) .col(ColumnDef::new(Function::Language).string().not_null()) .col(ColumnDef::new(Function::Link).string()) .col(ColumnDef::new(Function::Identifier).string()) - .col(ColumnDef::new(Function::Body).string()) - .col(ColumnDef::new(Function::CompressedBinary).string()) + .col(ColumnDef::new(Function::Body).rw_long_text(manager)) + // XXX: should this be binary type instead? 
+ .col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager)) .col(ColumnDef::new(Function::Kind).string().not_null()) .col( ColumnDef::new(Function::AlwaysRetryOnNetworkError) diff --git a/src/meta/model/migration/src/m20231008_020431_hummock.rs b/src/meta/model/migration/src/m20231008_020431_hummock.rs index 42fe0c424f99c..ea499d67182e3 100644 --- a/src/meta/model/migration/src/m20231008_020431_hummock.rs +++ b/src/meta/model/migration/src/m20231008_020431_hummock.rs @@ -1,5 +1,6 @@ use sea_orm_migration::prelude::*; +use crate::utils::ColumnDefExt; use crate::{assert_not_has_tables, drop_tables}; #[derive(DeriveMigrationName)] @@ -32,7 +33,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(CompactionTask::Task) - .blob(BlobSize::Long) + .rw_binary(manager) .not_null(), ) .col( @@ -54,7 +55,7 @@ impl MigrationTrait for Migration { .not_null() .primary_key(), ) - .col(ColumnDef::new(CompactionConfig::Config).blob(BlobSize::Long)) + .col(ColumnDef::new(CompactionConfig::Config).rw_binary(manager)) .to_owned(), ) .await?; @@ -69,7 +70,7 @@ impl MigrationTrait for Migration { .not_null() .primary_key(), ) - .col(ColumnDef::new(CompactionStatus::Status).blob(BlobSize::Long)) + .col(ColumnDef::new(CompactionStatus::Status).rw_binary(manager)) .to_owned(), ) .await?; @@ -142,7 +143,7 @@ impl MigrationTrait for Migration { .boolean() .not_null(), ) - .col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).blob(BlobSize::Long)) + .col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).rw_binary(manager)) .to_owned(), ) .await?; diff --git a/src/meta/model/migration/src/m20240304_074901_subscription.rs b/src/meta/model/migration/src/m20240304_074901_subscription.rs index 244c6abe11604..f759d303b25a9 100644 --- a/src/meta/model/migration/src/m20240304_074901_subscription.rs +++ b/src/meta/model/migration/src/m20240304_074901_subscription.rs @@ -1,5 +1,6 @@ use sea_orm_migration::prelude::{Table as MigrationTable, *}; +use 
crate::utils::ColumnDefExt; use crate::{assert_not_has_tables, drop_tables}; #[derive(DeriveMigrationName)] @@ -19,8 +20,16 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Subscription::Name).string().not_null()) - .col(ColumnDef::new(Subscription::Columns).binary().not_null()) - .col(ColumnDef::new(Subscription::PlanPk).binary().not_null()) + .col( + ColumnDef::new(Subscription::Columns) + .rw_binary(manager) + .not_null(), + ) + .col( + ColumnDef::new(Subscription::PlanPk) + .rw_binary(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::DistributionKey) .json_binary() @@ -31,7 +40,11 @@ impl MigrationTrait for Migration { .json_binary() .not_null(), ) - .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::Definition) + .rw_long_text(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::SubscriptionFromName) .string() diff --git a/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs b/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs index 715be0e7cd23f..e82f7aa59d1ab 100644 --- a/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs +++ b/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs @@ -1,6 +1,7 @@ use sea_orm_migration::prelude::{Table as MigrationTable, *}; use crate::drop_tables; +use crate::utils::ColumnDefExt; #[derive(DeriveMigrationName)] pub struct Migration; @@ -20,7 +21,11 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Subscription::Name).string().not_null()) - .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::Definition) + .rw_long_text(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::RetentionSeconds) .string() @@ -66,9 +71,21 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Subscription::Name).string().not_null()) - 
.col(ColumnDef::new(Subscription::Definition).string().not_null()) - .col(ColumnDef::new(Subscription::Columns).binary().not_null()) - .col(ColumnDef::new(Subscription::PlanPk).binary().not_null()) + .col( + ColumnDef::new(Subscription::Definition) + .rw_long_text(manager) + .not_null(), + ) + .col( + ColumnDef::new(Subscription::Columns) + .rw_binary(manager) + .not_null(), + ) + .col( + ColumnDef::new(Subscription::PlanPk) + .rw_binary(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::DistributionKey) .json_binary() diff --git a/src/meta/model/migration/src/m20240525_090457_secret.rs b/src/meta/model/migration/src/m20240525_090457_secret.rs index ed23085c66574..734c0a6d1fd61 100644 --- a/src/meta/model/migration/src/m20240525_090457_secret.rs +++ b/src/meta/model/migration/src/m20240525_090457_secret.rs @@ -1,5 +1,6 @@ use sea_orm_migration::prelude::{Table as MigrationTable, *}; +use crate::utils::ColumnDefExt; use crate::{assert_not_has_tables, drop_tables}; #[derive(DeriveMigrationName)] @@ -21,7 +22,7 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Secret::Name).string().not_null()) - .col(ColumnDef::new(Secret::Value).binary().not_null()) + .col(ColumnDef::new(Secret::Value).rw_binary(manager).not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_secret_object_id") @@ -42,7 +43,7 @@ impl MigrationTrait for Migration { .alter_table( MigrationTable::alter() .table(Sink::Table) - .add_column(ColumnDef::new(Sink::SecretRef).binary()) + .add_column(ColumnDef::new(Sink::SecretRef).rw_binary(manager)) .to_owned(), ) .await?; @@ -52,7 +53,7 @@ impl MigrationTrait for Migration { .alter_table( MigrationTable::alter() .table(Source::Table) - .add_column(ColumnDef::new(Source::SecretRef).binary()) + .add_column(ColumnDef::new(Source::SecretRef).rw_binary(manager)) .to_owned(), ) .await?; diff --git a/src/meta/model/migration/src/m20240617_070131_index_column_properties.rs 
b/src/meta/model/migration/src/m20240617_070131_index_column_properties.rs index daff755da999f..f8f113e7974db 100644 --- a/src/meta/model/migration/src/m20240617_070131_index_column_properties.rs +++ b/src/meta/model/migration/src/m20240617_070131_index_column_properties.rs @@ -1,5 +1,7 @@ use sea_orm_migration::prelude::*; +use crate::utils::ColumnDefExt; + #[derive(DeriveMigrationName)] pub struct Migration; @@ -10,7 +12,7 @@ impl MigrationTrait for Migration { .alter_table( Table::alter() .table(Index::Table) - .add_column(ColumnDef::new(Index::IndexColumnProperties).binary()) + .add_column(ColumnDef::new(Index::IndexColumnProperties).rw_binary(manager)) .to_owned(), ) .await diff --git a/src/meta/model/migration/src/m20240617_071625_sink_into_table_column.rs b/src/meta/model/migration/src/m20240617_071625_sink_into_table_column.rs index 4904dd3623245..c4e01c3bcd5be 100644 --- a/src/meta/model/migration/src/m20240617_071625_sink_into_table_column.rs +++ b/src/meta/model/migration/src/m20240617_071625_sink_into_table_column.rs @@ -1,5 +1,6 @@ use sea_orm_migration::prelude::{Table as MigrationTable, *}; +use crate::utils::ColumnDefExt; use crate::SubQueryStatement::SelectStatement; #[derive(DeriveMigrationName)] @@ -12,7 +13,7 @@ impl MigrationTrait for Migration { .alter_table( MigrationTable::alter() .table(Sink::Table) - .add_column(ColumnDef::new(Sink::OriginalTargetColumns).binary()) + .add_column(ColumnDef::new(Sink::OriginalTargetColumns).rw_binary(manager)) .to_owned(), ) .await?; diff --git a/src/meta/model/migration/src/m20240618_072634_function_compressed_binary.rs b/src/meta/model/migration/src/m20240618_072634_function_compressed_binary.rs index 6b4ef6157bcf6..92d27da1e1385 100644 --- a/src/meta/model/migration/src/m20240618_072634_function_compressed_binary.rs +++ b/src/meta/model/migration/src/m20240618_072634_function_compressed_binary.rs @@ -1,6 +1,7 @@ use sea_orm_migration::prelude::*; use crate::sea_orm::{DatabaseBackend, DbBackend, 
Statement}; +use crate::utils::ColumnDefExt; #[derive(DeriveMigrationName)] pub struct Migration; @@ -19,7 +20,7 @@ impl MigrationTrait for Migration { Table::alter() .table(Function::Table) .modify_column( - ColumnDef::new(Function::CompressedBinary).blob(BlobSize::Medium), + ColumnDef::new(Function::CompressedBinary).rw_binary(manager), ) .to_owned(), ) diff --git a/src/meta/model/migration/src/m20240630_131430_remove_parallel_unit.rs b/src/meta/model/migration/src/m20240630_131430_remove_parallel_unit.rs index 5acc8dc1a6165..dabcb8d3daa79 100644 --- a/src/meta/model/migration/src/m20240630_131430_remove_parallel_unit.rs +++ b/src/meta/model/migration/src/m20240630_131430_remove_parallel_unit.rs @@ -2,6 +2,7 @@ use sea_orm_migration::prelude::*; use serde::{Deserialize, Serialize}; use crate::sea_orm::{FromJsonQueryResult, FromQueryResult, Statement}; +use crate::utils::ColumnDefExt; #[derive(DeriveMigrationName)] pub struct Migration; @@ -75,7 +76,11 @@ impl MigrationTrait for Migration { .alter_table( Table::alter() .table(Fragment::Table) - .add_column(ColumnDef::new(Fragment::VnodeMapping).binary().not_null()) + .add_column( + ColumnDef::new(Fragment::VnodeMapping) + .rw_binary(manager) + .not_null(), + ) .to_owned(), ) .await?; diff --git a/src/meta/model/migration/src/m20240701_060504_hummock_time_travel.rs b/src/meta/model/migration/src/m20240701_060504_hummock_time_travel.rs index 7dec44913dc8f..619537078e28e 100644 --- a/src/meta/model/migration/src/m20240701_060504_hummock_time_travel.rs +++ b/src/meta/model/migration/src/m20240701_060504_hummock_time_travel.rs @@ -1,6 +1,7 @@ use sea_orm_migration::prelude::*; use crate::drop_tables; +use crate::utils::ColumnDefExt; #[derive(DeriveMigrationName)] pub struct Migration; @@ -26,7 +27,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(HummockSstableInfo::SstableInfo) - .blob(BlobSize::Long) + .rw_binary(manager) .null(), ) .to_owned(), @@ -46,7 +47,7 @@ impl MigrationTrait for Migration { ) 
.col( ColumnDef::new(HummockTimeTravelVersion::Version) - .blob(BlobSize::Long) + .rw_binary(manager) .null(), ) .to_owned(), @@ -66,7 +67,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(HummockTimeTravelDelta::VersionDelta) - .blob(BlobSize::Long) + .rw_binary(manager) .null(), ) .to_owned(), diff --git a/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs index bd2b12bd0724c..be5b1fef90fdf 100644 --- a/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs +++ b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs @@ -6,22 +6,21 @@ pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Hack for unifying the test for different backends. + let json_is_empty_array = |col| { + Expr::col(col) + .cast_as(Alias::new("char(2)")) + .eq(Expr::value("[]")) + }; + // Fill vnode count with 1 for singleton tables. 
manager .exec_stmt( UpdateStatement::new() .table(Table::Table) .values([(Table::VnodeCount, Expr::value(1))]) - .and_where( - Expr::col(Table::DistributionKey) - .cast_as(Alias::new("varchar")) - .eq(Expr::value("[]")), - ) - .and_where( - Expr::col(Table::DistKeyInPk) - .cast_as(Alias::new("varchar")) - .eq(Expr::value("[]")), - ) + .and_where(json_is_empty_array(Table::DistributionKey)) + .and_where(json_is_empty_array(Table::DistKeyInPk)) .and_where(Expr::col(Table::VnodeColIndex).is_null()) .to_owned(), ) diff --git a/src/meta/model/migration/src/utils.rs b/src/meta/model/migration/src/utils.rs new file mode 100644 index 0000000000000..7d3eb1fbb3ae2 --- /dev/null +++ b/src/meta/model/migration/src/utils.rs @@ -0,0 +1,29 @@ +use sea_orm::DatabaseBackend; +use sea_orm_migration::prelude::*; + +#[easy_ext::ext(ColumnDefExt)] +impl ColumnDef { + /// Set column type as `longblob` for MySQL, `bytea` for Postgres, and `blob` for Sqlite. + /// + /// Should be preferred over [`binary`](ColumnDef::binary) or [`blob`](ColumnDef::blob) for large binary fields, + /// typically the fields wrapping protobuf or other serialized data. Otherwise, MySQL will return an error + /// when the length exceeds 65535 bytes. + pub fn rw_binary(&mut self, manager: &SchemaManager) -> &mut Self { + match manager.get_database_backend() { + DatabaseBackend::MySql => self.custom(extension::mysql::MySqlType::LongBlob), + DatabaseBackend::Postgres | DatabaseBackend::Sqlite => self.blob(), + } + } + + /// Set column type as `longtext` for MySQL, and `text` for Postgres and Sqlite. + /// + /// Should be preferred over [`text`](ColumnDef::text) or [`string`](ColumnDef::string) for large text fields, + /// typically user-specified contents like UDF body or SQL definition. Otherwise, MySQL will return an error + /// when the length exceeds 65535 bytes. 
+ pub fn rw_long_text(&mut self, manager: &SchemaManager) -> &mut Self { + match manager.get_database_backend() { + DatabaseBackend::MySql => self.custom(Alias::new("longtext")), + DatabaseBackend::Postgres | DatabaseBackend::Sqlite => self.text(), + } + } +} diff --git a/src/meta/model/src/actor.rs b/src/meta/model/src/actor.rs index cbbbca543679a..2278d7e9fd373 100644 --- a/src/meta/model/src/actor.rs +++ b/src/meta/model/src/actor.rs @@ -21,7 +21,7 @@ use crate::{ }; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum ActorStatus { #[sea_orm(string_value = "INACTIVE")] Inactive, diff --git a/src/meta/model/src/actor_dispatcher.rs b/src/meta/model/src/actor_dispatcher.rs index 7d40af6967d35..1ef2e251dd750 100644 --- a/src/meta/model/src/actor_dispatcher.rs +++ b/src/meta/model/src/actor_dispatcher.rs @@ -23,7 +23,7 @@ use crate::{ActorId, ActorMapping, FragmentId, I32Array}; #[derive( Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, )] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum DispatcherType { #[sea_orm(string_value = "HASH")] Hash, diff --git a/src/meta/model/src/catalog_version.rs b/src/meta/model/src/catalog_version.rs index cfcd90c12f6a0..3aa797800561e 100644 --- a/src/meta/model/src/catalog_version.rs +++ b/src/meta/model/src/catalog_version.rs @@ -16,7 +16,7 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum VersionCategory { #[sea_orm(string_value = "NOTIFICATION")] Notification, diff --git a/src/meta/model/src/fragment.rs 
b/src/meta/model/src/fragment.rs index 9a365f37377db..45f59ed3c0aec 100644 --- a/src/meta/model/src/fragment.rs +++ b/src/meta/model/src/fragment.rs @@ -33,7 +33,7 @@ pub struct Model { } #[derive(Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum DistributionType { #[sea_orm(string_value = "SINGLE")] Single, diff --git a/src/meta/model/src/function.rs b/src/meta/model/src/function.rs index 5589c1daa023f..0fea52c6c3488 100644 --- a/src/meta/model/src/function.rs +++ b/src/meta/model/src/function.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; use crate::{DataType, DataTypeArray, FunctionId}; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum FunctionKind { #[sea_orm(string_value = "Scalar")] Scalar, diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 0b99b48914b2a..6610484a89185 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -93,7 +93,7 @@ pub type FragmentId = i32; pub type ActorId = i32; #[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum JobStatus { #[sea_orm(string_value = "INITIAL")] Initial, @@ -125,7 +125,7 @@ impl From for PbStreamJobState { } #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum CreateType { #[sea_orm(string_value = "BACKGROUND")] Background, diff --git a/src/meta/model/src/object.rs b/src/meta/model/src/object.rs index 
4ee0e5a4f4c49..a9a6fbe524881 100644 --- a/src/meta/model/src/object.rs +++ b/src/meta/model/src/object.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; use crate::{DatabaseId, ObjectId, SchemaId, UserId}; #[derive(Clone, Debug, PartialEq, Eq, Copy, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum ObjectType { #[sea_orm(string_value = "DATABASE")] Database, diff --git a/src/meta/model/src/secret.rs b/src/meta/model/src/secret.rs index 0d122267bb4bc..bea3f87f56bc9 100644 --- a/src/meta/model/src/secret.rs +++ b/src/meta/model/src/secret.rs @@ -23,7 +23,6 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub secret_id: i32, pub name: String, - #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")] pub value: Vec, } diff --git a/src/meta/model/src/sink.rs b/src/meta/model/src/sink.rs index c5a72fbb748c8..39b77fcc6381b 100644 --- a/src/meta/model/src/sink.rs +++ b/src/meta/model/src/sink.rs @@ -23,7 +23,7 @@ use crate::{ }; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum SinkType { #[sea_orm(string_value = "APPEND_ONLY")] AppendOnly, diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index 1d950f277396e..b8ba38d438b7c 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -29,7 +29,7 @@ use crate::{ #[derive( Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, )] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum TableType { #[sea_orm(string_value = "TABLE")] Table, @@ -65,7 +65,7 @@ impl From for TableType { } #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, 
Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum HandleConflictBehavior { #[sea_orm(string_value = "OVERWRITE")] Overwrite, diff --git a/src/meta/model/src/user_privilege.rs b/src/meta/model/src/user_privilege.rs index 48f9a38a5504c..bebf8485f4643 100644 --- a/src/meta/model/src/user_privilege.rs +++ b/src/meta/model/src/user_privilege.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use crate::{ObjectId, PrivilegeId, UserId}; #[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum Action { #[sea_orm(string_value = "INSERT")] Insert, diff --git a/src/meta/model/src/worker.rs b/src/meta/model/src/worker.rs index ee2ab9b22d3c8..82b13184eabe6 100644 --- a/src/meta/model/src/worker.rs +++ b/src/meta/model/src/worker.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; use crate::{TransactionId, WorkerId}; #[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum WorkerType { #[sea_orm(string_value = "FRONTEND")] Frontend, @@ -61,7 +61,7 @@ impl From for PbWorkerType { } #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "String(None)")] +#[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum WorkerStatus { #[sea_orm(string_value = "STARTING")] Starting, diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 71b45b1887eff..1d0cc36fff3f2 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -1013,6 +1013,23 @@ impl DdlService for DdlServiceImpl { 
Ok(Response::new(AutoSchemaChangeResponse {})) } + + async fn alter_swap_rename( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let version = self + .ddl_controller + .run_command(DdlCommand::AlterSwapRename(req.object.unwrap())) + .await?; + + Ok(Response::new(AlterSwapRenameResponse { + status: None, + version, + })) + } } fn add_auto_schema_change_fail_event_log( diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 10e6338bc2136..6a1cddc0ef748 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1140,7 +1140,10 @@ impl GlobalBarrierWorker { .instrument(span) .await; } else { - panic!("failed to execute barrier: {}", err.as_report()); + panic!( + "a streaming error occurred while recovery is disabled, aborting: {:?}", + err.as_report() + ); } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index bcff662f94f08..1f7324d97493b 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -63,8 +63,10 @@ use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::info; -use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs}; -use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs}; +use super::utils::{ + check_subscription_name_duplicate, get_fragment_ids_by_jobs, rename_relation, + rename_relation_refer, +}; use crate::controller::utils::{ build_relation_group_for_delete, check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, @@ -2483,139 +2485,104 @@ impl CatalogController { ) .await?; - let mut to_update_relations = vec![]; // rename relation. - macro_rules! rename_relation { - ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ - let (mut relation, obj) = $entity::find_by_id($object_id) - .find_also_related(Object) - .one(&txn) - .await? 
- .unwrap(); - let obj = obj.unwrap(); - let old_name = relation.name.clone(); - relation.name = object_name.into(); - if obj.obj_type != ObjectType::View { - relation.definition = alter_relation_rename(&relation.definition, object_name); - } - let active_model = $table::ActiveModel { - $identity: Set(relation.$identity), - name: Set(object_name.into()), - definition: Set(relation.definition.clone()), - ..Default::default() - }; - active_model.update(&txn).await?; - to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::$entity(ObjectModel(relation, obj).into())), - }); - old_name - }}; - } + let (mut to_update_relations, old_name) = + rename_relation(&txn, object_type, object_id, object_name).await?; + // rename referring relation name. + to_update_relations.extend( + rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?, + ); - // TODO: check is there any thing to change for shared source? - let old_name = match object_type { - ObjectType::Table => rename_relation!(Table, table, table_id, object_id), - ObjectType::Source => rename_relation!(Source, source, source_id, object_id), - ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id), - ObjectType::Subscription => { - rename_relation!(Subscription, subscription, subscription_id, object_id) - } - ObjectType::View => rename_relation!(View, view, view_id, object_id), - ObjectType::Index => { - let (mut index, obj) = Index::find_by_id(object_id) - .find_also_related(Object) - .one(&txn) - .await? - .unwrap(); - index.name = object_name.into(); - let index_table_id = index.index_table_id; - let old_name = rename_relation!(Table, table, table_id, index_table_id); - - // the name of index and its associated table is the same. 
- let active_model = index::ActiveModel { - index_id: Set(index.index_id), - name: Set(object_name.into()), - ..Default::default() - }; - active_model.update(&txn).await?; - to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Index( - ObjectModel(index, obj.unwrap()).into(), - )), - }); - old_name - } - _ => unreachable!("only relation name can be altered."), - }; + txn.commit().await?; - // rename referring relation name. - macro_rules! rename_relation_ref { - ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ - let (mut relation, obj) = $entity::find_by_id($object_id) - .find_also_related(Object) - .one(&txn) - .await? - .unwrap(); - relation.definition = - alter_relation_rename_refs(&relation.definition, &old_name, object_name); - let active_model = $table::ActiveModel { - $identity: Set(relation.$identity), - definition: Set(relation.definition.clone()), - ..Default::default() - }; - active_model.update(&txn).await?; - to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::$entity( - ObjectModel(relation, obj.unwrap()).into(), - )), - }); - }}; - } - let mut objs = get_referring_objects(object_id, &txn).await?; - if object_type == ObjectType::Table { - let incoming_sinks: I32Array = Table::find_by_id(object_id) + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { + relations: to_update_relations, + }), + ) + .await; + + Ok(version) + } + + pub async fn alter_swap_rename( + &self, + object_type: ObjectType, + object_id: ObjectId, + dst_object_id: ObjectId, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let dst_name: String = match object_type { + ObjectType::Table => Table::find_by_id(dst_object_id) .select_only() - .column(table::Column::IncomingSinks) + .column(table::Column::Name) .into_tuple() .one(&txn) .await? 
- .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::Source => Source::find_by_id(dst_object_id) + .select_only() + .column(source::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::Sink => Sink::find_by_id(dst_object_id) + .select_only() + .column(sink::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::View => View::find_by_id(dst_object_id) + .select_only() + .column(view::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::Subscription => Subscription::find_by_id(dst_object_id) + .select_only() + .column(subscription::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + _ => { + return Err(MetaError::permission_denied(format!( + "swap rename not supported for object type: {:?}", + object_type + ))); + } + }; - objs.extend( - incoming_sinks - .into_inner() - .into_iter() - .map(|id| PartialObject { - oid: id, - obj_type: ObjectType::Sink, - schema_id: None, - database_id: None, - }), - ); - } + // rename relations. + let (mut to_update_relations, src_name) = + rename_relation(&txn, object_type, object_id, &dst_name).await?; + let (to_update_relations2, _) = + rename_relation(&txn, object_type, dst_object_id, &src_name).await?; + to_update_relations.extend(to_update_relations2); + // rename referring relation name. 
+ to_update_relations.extend( + rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?, + ); + to_update_relations.extend( + rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?, + ); - for obj in objs { - match obj.obj_type { - ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), - ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid), - ObjectType::Subscription => { - rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid) - } - ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid), - ObjectType::Index => { - let index_table_id: Option = Index::find_by_id(obj.oid) - .select_only() - .column(index::Column::IndexTableId) - .into_tuple() - .one(&txn) - .await?; - rename_relation_ref!(Table, table, table_id, index_table_id.unwrap()); - } - _ => { - bail!("only table, sink, subscription, view and index depend on other objects.") - } - } - } txn.commit().await?; let version = self diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 2240f701a47bf..7a2158b8584d2 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -17,8 +17,8 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash; use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::{bail, hash}; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; @@ -27,7 +27,7 @@ use risingwave_meta_model::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege, view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, - 
PrivilegeId, SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId, + PrivilegeId, SchemaId, SourceId, StreamNode, TableId, UserId, VnodeBitmap, WorkerId, }; use risingwave_meta_model_migration::WithQuery; use risingwave_pb::catalog::{ @@ -49,12 +49,15 @@ use sea_orm::sea_query::{ WithClause, }; use sea_orm::{ - ColumnTrait, ConnectionTrait, DerivePartialModel, EntityTrait, FromQueryResult, JoinType, - Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, + ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait, + FromQueryResult, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Set, + Statement, }; use thiserror_ext::AsReport; +use crate::controller::ObjectModel; use crate::{MetaError, MetaResult}; + /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. /// /// # Examples @@ -1177,6 +1180,170 @@ pub fn extract_external_table_name_from_definition(table_definition: &str) -> Op } } +/// `rename_relation` renames the target relation and its definition, +/// it commits the changes to the transaction and returns the updated relations and the old name. +pub async fn rename_relation( + txn: &DatabaseTransaction, + object_type: ObjectType, + object_id: ObjectId, + object_name: &str, +) -> MetaResult<(Vec, String)> { + use sea_orm::ActiveModelTrait; + + use crate::controller::rename::alter_relation_rename; + + let mut to_update_relations = vec![]; + // rename relation. + macro_rules! rename_relation { + ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ + let (mut relation, obj) = $entity::find_by_id($object_id) + .find_also_related(Object) + .one(txn) + .await? 
+ .unwrap(); + let obj = obj.unwrap(); + let old_name = relation.name.clone(); + relation.name = object_name.into(); + if obj.obj_type != ObjectType::View { + relation.definition = alter_relation_rename(&relation.definition, object_name); + } + let active_model = $table::ActiveModel { + $identity: Set(relation.$identity), + name: Set(object_name.into()), + definition: Set(relation.definition.clone()), + ..Default::default() + }; + active_model.update(txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::$entity(ObjectModel(relation, obj).into())), + }); + old_name + }}; + } + // TODO: check is there any thing to change for shared source? + let old_name = match object_type { + ObjectType::Table => rename_relation!(Table, table, table_id, object_id), + ObjectType::Source => rename_relation!(Source, source, source_id, object_id), + ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id), + ObjectType::Subscription => { + rename_relation!(Subscription, subscription, subscription_id, object_id) + } + ObjectType::View => rename_relation!(View, view, view_id, object_id), + ObjectType::Index => { + let (mut index, obj) = Index::find_by_id(object_id) + .find_also_related(Object) + .one(txn) + .await? + .unwrap(); + index.name = object_name.into(); + let index_table_id = index.index_table_id; + let old_name = rename_relation!(Table, table, table_id, index_table_id); + + // the name of index and its associated table is the same. 
+ let active_model = index::ActiveModel { + index_id: sea_orm::ActiveValue::Set(index.index_id), + name: sea_orm::ActiveValue::Set(object_name.into()), + ..Default::default() + }; + active_model.update(txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Index( + ObjectModel(index, obj.unwrap()).into(), + )), + }); + old_name + } + _ => unreachable!("only relation name can be altered."), + }; + + Ok((to_update_relations, old_name)) +} + +/// `rename_relation_refer` updates the definition of relations that refer to the target one, +/// it commits the changes to the transaction and returns all the updated relations. +pub async fn rename_relation_refer( + txn: &DatabaseTransaction, + object_type: ObjectType, + object_id: ObjectId, + object_name: &str, + old_name: &str, +) -> MetaResult> { + use sea_orm::ActiveModelTrait; + + use crate::controller::rename::alter_relation_rename_refs; + + let mut to_update_relations = vec![]; + macro_rules! rename_relation_ref { + ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ + let (mut relation, obj) = $entity::find_by_id($object_id) + .find_also_related(Object) + .one(txn) + .await? + .unwrap(); + relation.definition = + alter_relation_rename_refs(&relation.definition, old_name, object_name); + let active_model = $table::ActiveModel { + $identity: Set(relation.$identity), + definition: Set(relation.definition.clone()), + ..Default::default() + }; + active_model.update(txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::$entity( + ObjectModel(relation, obj.unwrap()).into(), + )), + }); + }}; + } + let mut objs = get_referring_objects(object_id, txn).await?; + if object_type == ObjectType::Table { + let incoming_sinks: I32Array = Table::find_by_id(object_id) + .select_only() + .column(table::Column::IncomingSinks) + .into_tuple() + .one(txn) + .await? 
+ .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + + objs.extend( + incoming_sinks + .into_inner() + .into_iter() + .map(|id| PartialObject { + oid: id, + obj_type: ObjectType::Sink, + schema_id: None, + database_id: None, + }), + ); + } + + for obj in objs { + match obj.obj_type { + ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), + ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid), + ObjectType::Subscription => { + rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid) + } + ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid), + ObjectType::Index => { + let index_table_id: Option = Index::find_by_id(obj.oid) + .select_only() + .column(index::Column::IndexTableId) + .into_tuple() + .one(txn) + .await?; + rename_relation_ref!(Table, table, table_id, index_table_id.unwrap()); + } + _ => { + bail!("only table, sink, subscription, view and index depend on other objects.") + } + } + } + + Ok(to_update_relations) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 6c8eb32e6a6c8..4187ce818b345 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -385,12 +385,7 @@ impl HummockManager { .exec(db) .await?; hummock_gc_history::Entity::insert_many(models) - .on_conflict( - OnConflict::column(hummock_gc_history::Column::ObjectId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(db) .await?; Ok(()) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 4e28fa512abbd..fab79fe01f9a0 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -34,7 +34,6 @@ use risingwave_meta_model::{ hummock_time_travel_version, }; use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta}; -use 
sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; use sea_orm::{ ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect, @@ -383,12 +382,7 @@ impl HummockManager { sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())), }; hummock_sstable_info::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_sstable_info::Column::SstId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(txn) .await?; count += 1; @@ -437,12 +431,7 @@ impl HummockManager { .into()), }; hummock_time_travel_version::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_time_travel_version::Column::VersionId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(txn) .await?; } @@ -468,12 +457,7 @@ impl HummockManager { .into()), }; hummock_time_travel_delta::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_time_travel_delta::Column::VersionId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(txn) .await?; } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d787be50f0ba3..f0cbd1d5a477f 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -48,7 +48,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion, + alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress, + TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -141,6 +142,7 @@ pub enum DdlCommand { ), DropStreamingJob(StreamingJobId, DropMode, Option), AlterName(alter_name_request::Object, String), + AlterSwapRename(alter_swap_rename_request::Object), 
ReplaceTable(ReplaceTableInfo), AlterSourceColumn(Source), AlterObjectOwner(Object, UserId), @@ -339,6 +341,7 @@ impl DdlController { DdlCommand::DropSubscription(subscription_id, drop_mode) => { ctrl.drop_subscription(subscription_id, drop_mode).await } + DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await, } } .in_current_span(); @@ -1878,6 +1881,55 @@ impl DdlController { .await } + async fn alter_swap_rename( + &self, + object: alter_swap_rename_request::Object, + ) -> MetaResult { + let (obj_type, src_id, dst_id) = match object { + alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"), + alter_swap_rename_request::Object::Table(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Table, src_id, dst_id) + } + alter_swap_rename_request::Object::View(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::View, src_id, dst_id) + } + alter_swap_rename_request::Object::Source(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Source, src_id, dst_id) + } + alter_swap_rename_request::Object::Sink(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Sink, src_id, dst_id) + } + alter_swap_rename_request::Object::Subscription(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Subscription, src_id, dst_id) + } + }; + + self.metadata_manager + .catalog_controller + .alter_swap_rename(obj_type, src_id, dst_id) + .await + } + async fn alter_owner( &self, object: Object, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 0b096e53ddfa9..24375d395a069 100644 --- a/src/rpc_client/src/meta_client.rs +++ 
b/src/rpc_client/src/meta_client.rs @@ -573,6 +573,19 @@ impl MetaClient { Ok(()) } + pub async fn alter_swap_rename( + &self, + object: alter_swap_rename_request::Object, + ) -> Result { + let request = AlterSwapRenameRequest { + object: Some(object), + }; + let resp = self.inner.alter_swap_rename(request).await?; + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) + } + pub async fn replace_table( &self, source: Option, @@ -2096,6 +2109,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse } ,{ ddl_client, wait, WaitRequest, WaitResponse } ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse } + ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 15ac6a9d27623..d94cf80cb9f2d 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -38,6 +38,7 @@ pub enum AlterDatabaseOperation { pub enum AlterSchemaOperation { ChangeOwner { new_owner_name: Ident }, RenameSchema { schema_name: ObjectName }, + SwapRenameSchema { target_schema: ObjectName }, } /// An `ALTER TABLE` (`Statement::AlterTable`) operation @@ -110,6 +111,10 @@ pub enum AlterTableOperation { SetBackfillRateLimit { rate_limit: i32, }, + /// `SWAP WITH ` + SwapRenameTable { + target_table: ObjectName, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -146,6 +151,10 @@ pub enum AlterViewOperation { SetBackfillRateLimit { rate_limit: i32, }, + /// `SWAP WITH ` + SwapRenameView { + target_view: ObjectName, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -165,6 +174,10 @@ pub enum 
AlterSinkOperation { parallelism: SetVariableValue, deferred: bool, }, + /// `SWAP WITH ` + SwapRenameSink { + target_sink: ObjectName, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -173,6 +186,7 @@ pub enum AlterSubscriptionOperation { RenameSubscription { subscription_name: ObjectName }, ChangeOwner { new_owner_name: Ident }, SetSchema { new_schema_name: ObjectName }, + SwapRenameSubscription { target_subscription: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -185,6 +199,7 @@ pub enum AlterSourceOperation { FormatEncode { format_encode: FormatEncodeOptions }, RefreshSchema, SetSourceRateLimit { rate_limit: i32 }, + SwapRenameSource { target_source: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -221,6 +236,9 @@ impl fmt::Display for AlterSchemaOperation { AlterSchemaOperation::RenameSchema { schema_name } => { write!(f, "RENAME TO {}", schema_name) } + AlterSchemaOperation::SwapRenameSchema { target_schema } => { + write!(f, "SWAP WITH {}", target_schema) + } } } } @@ -300,6 +318,9 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::SetBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterTableOperation::SwapRenameTable { target_table } => { + write!(f, "SWAP WITH {}", target_table) + } } } } @@ -351,6 +372,9 @@ impl fmt::Display for AlterViewOperation { AlterViewOperation::SetBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterViewOperation::SwapRenameView { target_view } => { + write!(f, "SWAP WITH {}", target_view) + } } } } @@ -378,6 +402,9 @@ impl fmt::Display for AlterSinkOperation { if *deferred { " DEFERRED" } else { "" } ) } + AlterSinkOperation::SwapRenameSink { target_sink } => { + write!(f, "SWAP WITH {}", target_sink) + } } } } @@ -394,6 +421,11 @@ impl fmt::Display for AlterSubscriptionOperation { AlterSubscriptionOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", 
new_schema_name) } + AlterSubscriptionOperation::SwapRenameSubscription { + target_subscription, + } => { + write!(f, "SWAP WITH {}", target_subscription) + } } } } @@ -422,6 +454,9 @@ impl fmt::Display for AlterSourceOperation { AlterSourceOperation::SetSourceRateLimit { rate_limit } => { write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) } + AlterSourceOperation::SwapRenameSource { target_source } => { + write!(f, "SWAP WITH {}", target_source) + } } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 8ec7191f749c2..93e47d7a6b11a 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -503,6 +503,7 @@ define_keywords!( SUCCEEDS, SUM, SUPERUSER, + SWAP, SYMMETRIC, SYNC, SYSTEM, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index b383869c0d3d0..d5582f31a64de 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3096,8 +3096,11 @@ impl Parser<'_> { self.expect_keyword(Keyword::TO)?; let schema_name = self.parse_object_name()?; AlterSchemaOperation::RenameSchema { schema_name } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_schema = self.parse_object_name()?; + AlterSchemaOperation::SwapRenameSchema { target_schema } } else { - return self.expected("RENAME OR OWNER TO after ALTER SCHEMA"); + return self.expected("RENAME, OWNER TO, OR SWAP WITH after ALTER SCHEMA"); }; Ok(Statement::AlterSchema { @@ -3216,8 +3219,12 @@ impl Parser<'_> { AlterTableOperation::AlterColumn { column_name, op } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterTableOperation::RefreshSchema + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_table = self.parse_object_name()?; + AlterTableOperation::SwapRenameTable { target_table } } else { - return self.expected("ADD or RENAME or OWNER TO or SET or DROP after ALTER TABLE"); + return self + .expected("ADD or RENAME or OWNER TO or SET or DROP or SWAP 
after ALTER TABLE"); }; Ok(Statement::AlterTable { name: table_name, @@ -3322,6 +3329,9 @@ impl Parser<'_> { AlterViewOperation::ChangeOwner { new_owner_name: owner_name, } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_view = self.parse_object_name()?; + AlterViewOperation::SwapRenameView { target_view } } else if self.parse_keyword(Keyword::SET) { if self.parse_keyword(Keyword::SCHEMA) { let schema_name = self.parse_object_name()?; @@ -3352,7 +3362,7 @@ impl Parser<'_> { } } else { return self.expected(&format!( - "RENAME or OWNER TO or SET after ALTER {}VIEW", + "RENAME or OWNER TO or SET or SWAP after ALTER {}VIEW", if materialized { "MATERIALIZED " } else { "" } )); }; @@ -3401,6 +3411,9 @@ impl Parser<'_> { } else { return self.expected("SCHEMA/PARALLELISM after SET"); } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_sink = self.parse_object_name()?; + AlterSinkOperation::SwapRenameSink { target_sink } } else { - return self.expected("RENAME or OWNER TO or SET after ALTER SINK"); + return self.expected("RENAME or OWNER TO or SET or SWAP after ALTER SINK"); }; @@ -3434,8 +3447,13 @@ impl Parser<'_> { } else { return self.expected("SCHEMA after SET"); } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_subscription = self.parse_object_name()?; + AlterSubscriptionOperation::SwapRenameSubscription { + target_subscription, + } } else { - return self.expected("RENAME or OWNER TO or SET after ALTER SUBSCRIPTION"); + return self.expected("RENAME or OWNER TO or SET or SWAP after ALTER SUBSCRIPTION"); }; Ok(Statement::AlterSubscription { @@ -3482,6 +3500,9 @@ impl Parser<'_> { AlterSourceOperation::FormatEncode { format_encode } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterSourceOperation::RefreshSchema + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_source = self.parse_object_name()?; + AlterSourceOperation::SwapRenameSource { target_source } } else { return self.expected( "RENAME, ADD
COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",