From 7690f674c48a7cb824dcd44fe3940a03d42e54ed Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:41:54 +0800 Subject: [PATCH 1/7] chore(deps): update sqlx to 0.8.2, sea-orm to 1.1, sea-orm-migration to 1.1 (#19145) --- Cargo.lock | 160 ++++++++++++++++++++------------------- Cargo.toml | 14 ++-- src/connector/Cargo.toml | 2 +- 3 files changed, 89 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea8e7d9276450..fb9830efdcc41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3929,7 +3929,7 @@ dependencies = [ name = "delta_btree_map" version = "2.2.0-alpha" dependencies = [ - "educe 0.6.0", + "educe", "enum-as-inner 0.6.0", ] @@ -4454,18 +4454,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "educe" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" -dependencies = [ - "enum-ordinalize", - "proc-macro2", - "quote", - "syn 2.0.79", -] - [[package]] name = "educe" version = "0.6.0" @@ -5884,9 +5872,9 @@ checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" [[package]] name = "hashlink" -version = "0.8.4" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ "hashbrown 0.14.5", ] @@ -5919,9 +5907,6 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "heck" @@ -7062,9 +7047,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.27.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ "cc", "pkg-config", @@ -8602,9 +8587,9 @@ dependencies = [ [[package]] name = "ouroboros" -version = "0.17.2" +version = "0.18.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2ba07320d39dfea882faa70554b4bd342a5f273ed59ba7c1c6b4c840492c954" +checksum = "944fa20996a25aded6b4795c6d63f10014a7a83f8be9828a11860b08c5fc4a67" dependencies = [ "aliasable", "ouroboros_macro", @@ -8613,13 +8598,14 @@ dependencies = [ [[package]] name = "ouroboros_macro" -version = "0.17.2" +version = "0.18.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec4c6225c69b4ca778c0aea097321a64c421cf4577b331c61b229267edabb6f8" +checksum = "39b0deead1528fd0e5947a8546a9642a9777c25f6e1e26f34c97b204bbb465bd" dependencies = [ "heck 0.4.1", - "proc-macro-error 1.0.4", + "itertools 0.12.1", "proc-macro2", + "proc-macro2-diagnostics", "quote", "syn 2.0.79", ] @@ -9356,7 +9342,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" dependencies = [ "diff", - "yansi", + "yansi 0.5.1", ] [[package]] @@ -9472,6 +9458,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" 
+dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "version_check", + "yansi 1.0.1", +] + [[package]] name = "procfs" version = "0.14.2" @@ -10660,7 +10659,7 @@ dependencies = [ "criterion", "darwin-libproc", "easy-ext", - "educe 0.6.0", + "educe", "either", "enum-as-inner 0.6.0", "enumflags2", @@ -10752,7 +10751,7 @@ name = "risingwave_common_estimate_size" version = "2.2.0-alpha" dependencies = [ "bytes", - "educe 0.6.0", + "educe", "ethnum", "fixedbitset 0.5.0", "jsonbb", @@ -11242,7 +11241,7 @@ dependencies = [ "const-currying", "downcast-rs", "easy-ext", - "educe 0.6.0", + "educe", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11285,7 +11284,7 @@ dependencies = [ "chrono", "chrono-tz 0.10.0", "criterion", - "educe 0.6.0", + "educe", "expect-test", "fancy-regex", "futures-async-stream", @@ -11352,7 +11351,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "easy-ext", - "educe 0.6.0", + "educe", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11605,7 +11604,7 @@ dependencies = [ "comfy-table", "crepe", "easy-ext", - "educe 0.6.0", + "educe", "either", "enum-as-inner 0.6.0", "expect-test", @@ -11721,7 +11720,7 @@ version = "2.2.0-alpha" dependencies = [ "anyhow", "clap", - "educe 0.6.0", + "educe", "either", "futures", "hex", @@ -12146,7 +12145,7 @@ dependencies = [ "cfg-if", "criterion", "delta_btree_map", - "educe 0.6.0", + "educe", "either", "enum-as-inner 0.6.0", "expect-test", @@ -12794,13 +12793,13 @@ dependencies = [ [[package]] name = "sea-orm" -version = "1.0.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea1fee0cf8528dbe6eda29d5798afc522a63b75e44c5b15721e6e64af9c7cc4b" +checksum = "d5680a8b686985116607ef5f5af2b1f9e1cc2c228330e93101816a0baa279afa" dependencies = [ "async-stream", "async-trait", - "bigdecimal 0.3.1", + "bigdecimal 0.4.5", "chrono", "futures", "log", @@ -12822,9 +12821,9 @@ dependencies = [ [[package]] name = "sea-orm-cli" -version = "1.0.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0b8869c75cf3fbb1bd860abb025033cd2e514c5f4fa43e792697cb1fe6c882" +checksum = "70a157f42d291ccbd6e913b9d9b12dbe2ccbcf0472efc60c8715dd1254083aec" dependencies = [ "chrono", "clap", @@ -12839,9 +12838,9 @@ dependencies = [ [[package]] name = "sea-orm-macros" -version = "1.0.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8737b566799ed0444f278d13c300c4c6f1a91782f60ff5825a591852d5502030" +checksum = "3a239e3bb1b566ad4ec2654d0d193d6ceddfd733487edc9c21a64d214c773910" dependencies = [ "heck 0.4.1", "proc-macro2", @@ -12853,9 +12852,9 @@ dependencies = [ [[package]] name = "sea-orm-migration" -version = "1.0.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "216643749e26ce27ab6c51d3475f2692981d4a902d34455bcd322f412900df5c" +checksum = "63ba07e9f2479cc671758fcb1edee42ff2e32c34b3e67ab41d0af1e41f73c74e" dependencies = [ "async-trait", "clap", @@ -12870,13 +12869,12 @@ dependencies = [ [[package]] name = "sea-query" -version = "0.31.1" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4fd043b8117af233e221f73e3ea8dfbc8e8c3c928017c474296db45c649105c" +checksum = "ff504d13b5e4b52fffcf2fb203d0352a5722fa5151696db768933e41e1e591bb" dependencies = [ - "bigdecimal 0.3.1", + "bigdecimal 0.4.5", "chrono", - "educe 0.5.11", "inherent", "ordered-float 3.9.1", "rust_decimal", @@ -12888,11 +12886,11 @@ dependencies = [ [[package]] name 
= "sea-query-binder" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "754965d4aee6145bec25d0898e5c931e6c22859789ce62fd85a42a15ed5a8ce3" +checksum = "b0019f47430f7995af63deda77e238c17323359af241233ec768aba1faea7608" dependencies = [ - "bigdecimal 0.3.1", + "bigdecimal 0.4.5", "chrono", "rust_decimal", "sea-query", @@ -12918,9 +12916,9 @@ dependencies = [ [[package]] name = "sea-schema" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad52149fc81836ea7424c3425d8f6ed8ad448dd16d2e4f6a3907ba46f3f2fd78" +checksum = "aab1592d17860a9a8584d9b549aebcd06f7bdc3ff615f71752486ba0b05b1e6e" dependencies = [ "futures", "sea-query", @@ -13681,7 +13679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2b6b8f606d3c4cdcaf2031c4320b79d7584e454b79562ba3d675f49701c160e" dependencies = [ "async-trait", - "educe 0.6.0", + "educe", "fs-err", "futures", "glob", @@ -13730,8 +13728,8 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = "git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ "sqlx-core", "sqlx-macros", @@ -13742,24 +13740,24 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = "git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ - "ahash 0.8.11", "atoi", - "bigdecimal 0.3.1", + "bigdecimal 0.4.5", "byteorder", "bytes", "chrono", "crc", "crossbeam-queue", "either", - "event-listener 2.5.3", + "event-listener 5.3.1", "futures-channel", "futures-core", "futures-intrusive", "futures-io", "futures-util", + "hashbrown 0.14.5", "hashlink", "hex", "indexmap 2.6.0", @@ -13786,24 +13784,24 @@ dependencies = [ [[package]] name = "sqlx-macros" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = "git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ "proc-macro2", "quote", "sqlx-core", "sqlx-macros-core", - "syn 1.0.109", + "syn 2.0.79", ] [[package]] name = "sqlx-macros-core" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = "git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ "dotenvy", "either", - "heck 0.4.1", + "heck 0.5.0", "hex", "madsim-tokio", "once_cell", @@ -13816,19 +13814,19 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 1.0.109", + "syn 2.0.79", "tempfile", "url", ] [[package]] name = "sqlx-mysql" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = 
"git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ "atoi", - "base64 0.21.7", - "bigdecimal 0.3.1", + "base64 0.22.0", + "bigdecimal 0.4.5", "bitflags 2.6.0", "byteorder", "bytes", @@ -13869,12 +13867,12 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = "git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ "atoi", - "base64 0.21.7", - "bigdecimal 0.3.1", + "base64 0.22.0", + "bigdecimal 0.4.5", "bitflags 2.6.0", "byteorder", "chrono", @@ -13912,8 +13910,8 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.7.4" -source = "git+https://github.com/risingwavelabs/sqlx.git?rev=ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a#ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" +version = "0.8.2" +source = "git+https://github.com/madsim-rs/sqlx.git?rev=3efe6d0065963db2a2b7f30dee69f647e28dec81#3efe6d0065963db2a2b7f30dee69f647e28dec81" dependencies = [ "atoi", "chrono", @@ -13927,11 +13925,11 @@ dependencies = [ "madsim-tokio", "percent-encoding", "serde", + "serde_urlencoded", "sqlx-core", "time", "tracing", "url", - "urlencoding", "uuid", ] @@ -16504,6 +16502,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yup-oauth2" version = "8.3.0" diff --git a/Cargo.toml b/Cargo.toml index 17ba5bf07551f..0e1c1725baa8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -175,13 +175,13 @@ parking_lot = { version = "0.12", features = [ "arc_lock", "deadlock_detection", ] } -sea-orm = { version = "~1.0", features = [ +sea-orm = { version = "~1.1", features = [ "sqlx-all", "runtime-tokio-native-tls", "with-uuid", ] } -sea-orm-migration = "~1.0" -sqlx = { version = "0.7.4", default-features = false, features = [ +sea-orm-migration = "~1.1" +sqlx = { version = "0.8.2", default-features = false, features = [ "bigdecimal", "chrono", "json", @@ -350,11 +350,9 @@ getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae # tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" } tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" } - -# sqlx version: v0.7.4 -# patch diffs: https://github.com/madsim-rs/sqlx/pull/3 -sqlx = { git = "https://github.com/risingwavelabs/sqlx.git", rev = "ff6d6d2dc0e9e8e47282fd29be006eed7ae3421a" } - +# sqlx version: v0.8.2 +# patch diffs: https://github.com/madsim-rs/sqlx/pull/4 +sqlx = { git = "https://github.com/madsim-rs/sqlx.git", rev = "3efe6d0065963db2a2b7f30dee69f647e28dec81" } futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" } # patch to remove preserve_order from serde_json bson = { git = "https://github.com/risingwavelabs/bson-rust", rev = "e5175ec" } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 14257251041f4..3fe5499e41d08 100644 --- 
a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -128,7 +128,7 @@ rustls-native-certs = "0.7" rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } -sea-schema = { version = "0.15", default-features = false, features = [ +sea-schema = { version = "0.16", default-features = false, features = [ "discovery", "sqlx-postgres", "sqlx-mysql", From a9f8945658dc2044f8e2f4eef5037ec6b7f42330 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 4 Nov 2024 18:20:30 +0800 Subject: [PATCH 2/7] refactor(meta): simplify stream job table/source id assignment (#19171) Signed-off-by: xxchan Co-authored-by: Bugen Zhao --- .git-blame-ignore-revs | 3 + proto/catalog.proto | 2 + .../src/handler/alter_table_column.rs | 4 +- src/frontend/src/handler/create_sink.rs | 4 +- src/frontend/src/handler/create_table.rs | 18 +++-- src/meta/service/src/ddl_service.rs | 37 +++++---- src/meta/src/manager/streaming_job.rs | 4 - src/meta/src/rpc/ddl_controller.rs | 76 +------------------ src/meta/src/stream/source_manager.rs | 1 + src/meta/src/stream/stream_graph/fragment.rs | 60 ++++++++++----- 10 files changed, 85 insertions(+), 124 deletions(-) diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index b8ca322d767a8..c8e1f42656e47 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -42,3 +42,6 @@ c583e2c6c054764249acf484438c7bf7197765f4 # chore: replace all ProstXxx with PbXxx (#8621) 6fd8821f2e053957b183d648bea9c95b6703941f + +# chore: cleanup v2 naming for sql metastore (#18941) +9a6a7f9052d5679165ff57cc01417c742c95351c diff --git a/proto/catalog.proto b/proto/catalog.proto index 169347c199eb9..5383104e9c0f2 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -95,6 +95,8 @@ message StreamSourceInfo { } message Source { + // For shared source, this is the same as the job id. + // For non-shared source and table with connector, this is a different oid. 
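+  // (In either case the id is assigned on the meta side, in create_job_catalog.)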
uint32 id = 1; uint32 schema_id = 2; uint32 database_id = 3; diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 1241553aff04a..ce90fa94253f3 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -35,7 +35,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_source::get_json_schema_location; -use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; +use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; @@ -192,7 +192,7 @@ pub async fn get_replace_table_plan( panic!("unexpected statement type: {:?}", definition); }; - let (mut graph, table, source, job_type) = generate_stream_graph_for_table( + let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table( session, table_name, original_catalog, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index ea9f9e98f9b71..24a7e96fe15d7 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -54,7 +54,7 @@ use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{rewrite_now_to_proctime, ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; use crate::handler::create_mv::parse_column_names; -use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; +use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; @@ -672,7 +672,7 @@ pub(crate) async fn reparse_table_for_sink( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table, source, _) = generate_stream_graph_for_table( + let (graph, table, source, _) = generate_stream_graph_for_replace_table( session, table_name, table_catalog, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index eab38a44c4ff4..1e5dc489c1a0c 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -35,6 +35,7 @@ use risingwave_connector::source::cdc::external::{ ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; use risingwave_connector::{source, WithOptionsSecResolved}; +use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; @@ -1322,7 +1323,7 @@ pub fn check_create_table_with_source( } #[allow(clippy::too_many_arguments)] -pub async fn generate_stream_graph_for_table( +pub async fn generate_stream_graph_for_replace_table( _session: &Arc<SessionImpl>, table_name: ObjectName, original_catalog: &Arc<TableCatalog>, ) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; - let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) { + let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
(Some(format_encode), None) => ( gen_create_table_plan_with_source( handler_args, @@ -1441,13 +1442,18 @@ pub async fn generate_stream_graph_for_table( let graph = build_graph(plan)?; // Fill the original table ID. - let table = Table { + let mut table = Table { id: original_catalog.id().table_id(), - optional_associated_source_id: original_catalog - .associated_source_id() - .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())), ..table }; + if let Some(source_id) = original_catalog.associated_source_id() { + table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId( + source_id.table_id, + )); + source.as_mut().unwrap().id = source_id.table_id; + source.as_mut().unwrap().optional_associated_table_id = + Some(OptionalAssociatedTableId::AssociatedTableId(table.id)) + } Ok((graph, table, source, job_type)) } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1d0cc36fff3f2..e59c4a4100141 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -23,9 +23,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; -use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_meta::rpc::metrics::MetaMetrics; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; @@ -84,27 +82,28 @@ impl DdlServiceImpl { } } - fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo { - let job_type = change.get_job_type().unwrap_or_default(); - let mut source = change.source; - let mut fragment_graph = change.fragment_graph.unwrap(); - let mut table = change.table.unwrap(); - if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = - table.optional_associated_source_id - { - source.as_mut().unwrap().id = source_id; - fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph); - } - let table_col_index_mapping = change - .table_col_index_mapping + fn extract_replace_table_info( + ReplaceTablePlan { + table, + fragment_graph, + table_col_index_mapping, + source, + job_type, + }: ReplaceTablePlan, + ) -> ReplaceTableInfo { + let table = table.unwrap(); + let col_index_mapping = table_col_index_mapping .as_ref() .map(ColIndexMapping::from_protobuf); - let stream_job = StreamingJob::Table(source, table, job_type); ReplaceTableInfo { - streaming_job: stream_job, - fragment_graph, - col_index_mapping: table_col_index_mapping, + streaming_job: StreamingJob::Table( + source, + table, + TableJobType::try_from(job_type).unwrap(), + ), + fragment_graph: fragment_graph.unwrap(), + col_index_mapping, } } } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 5257b2c1aa24a..9d5b135095fb1 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -328,8 +328,4 @@ impl StreamingJob { StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()), } } - - pub fn is_source_job(&self) -> bool { - matches!(self, StreamingJob::Source(_)) - } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f0cbd1d5a477f..fd336a1d0a5b4 100644 --- 
a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -27,7 +27,7 @@ use risingwave_common::secret::SecretEncryption; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::{ - visit_fragment, visit_stream_node, visit_stream_node_cont_mut, + visit_stream_node, visit_stream_node_cont_mut, }; use risingwave_common::{bail, hash, must_match}; use risingwave_connector::error::ConnectorError; @@ -40,11 +40,9 @@ use risingwave_meta_model::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, }; -use risingwave_pb::catalog::source::OptionalAssociatedTableId; -use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret, - Sink, Source, Subscription, Table, View, + Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source, + Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -906,7 +904,7 @@ impl DdlController { pub async fn create_streaming_job( &self, mut streaming_job: StreamingJob, - mut fragment_graph: StreamFragmentGraphProto, + fragment_graph: StreamFragmentGraphProto, affected_table_replace_info: Option<ReplaceTableInfo>, ) -> MetaResult<NotificationVersion> { let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -921,24 +919,6 @@ impl DdlController { .await?; let job_id = streaming_job.id(); - match &mut streaming_job { - StreamingJob::Table(src, table, job_type) => { - // If we're creating a table with connector, we should additionally fill its ID first. - fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph); - } - StreamingJob::Source(src) => { - // set the inner source id of source node. - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - source_node.source_inner.as_mut().unwrap().source_id = src.id; - } - }); - } - } - _ => {} - } - tracing::debug!( id = job_id, definition = streaming_job.definition(), @@ -2003,51 +1983,3 @@ impl DdlController { .await } } - -/// Fill in necessary information for `Table` stream graph. -/// e.g., fill source id for table with connector, fill external table id for CDC table. -pub fn fill_table_stream_graph_info( - source: &mut Option<PbSource>, - table: &mut PbTable, - table_job_type: TableJobType, - fragment_graph: &mut PbStreamFragmentGraph, -) { - let mut source_count = 0; - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - if source_node.source_inner.is_none() { - // skip empty source for dml node - return; - } - - // If we're creating a table with connector, we should additionally fill its ID first. - if let Some(source) = source { - source_node.source_inner.as_mut().unwrap().source_id = source.id; - source_count += 1; - - assert_eq!( - source_count, 1, - "require exactly 1 external stream source when creating table with a connector" - ); - - // Fill in the correct table id for source. - source.optional_associated_table_id = - Some(OptionalAssociatedTableId::AssociatedTableId(table.id)); - // Fill in the correct source id for mview.
- table.optional_associated_source_id = - Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id)); - } - } - - // fill table id for cdc backfill - if let NodeBody::StreamCdcScan(node) = node_body - && table_job_type == TableJobType::SharedCdcSource - { - if let Some(table_desc) = node.cdc_table_desc.as_mut() { - table_desc.table_id = table.id; - } - } - }); - } -} diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index b13acb68ac39b..ef8c1298264b9 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -1002,6 +1002,7 @@ impl SourceManager { /// create and register connector worker for source. pub async fn register_source(&self, source: &Source) -> MetaResult<()> { + tracing::debug!("register_source: {}", source.get_id()); let mut core = self.core.lock().await; if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) { let handle = create_source_worker_handle(source, self.metrics.clone()) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 86a2197a9d5bc..c2ccd4300ccf1 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -56,7 +56,7 @@ pub(super) struct BuildingFragment { inner: StreamFragment, /// The ID of the job if it contains the streaming job node. - table_id: Option<u32>, + job_id: Option<u32>, /// The required column IDs of each upstream table. /// Will be converted to indices when building the edge connected to the upstream. @@ -82,12 +82,12 @@ impl BuildingFragment { // Fill the information of the internal tables in the fragment. Self::fill_internal_tables(&mut fragment, job, table_id_gen); - let table_id = Self::fill_job(&mut fragment, job).then(|| job.id()); + let job_id = Self::fill_job(&mut fragment, job).then(|| job.id()); let upstream_table_columns = Self::extract_upstream_table_columns(&mut fragment); Self { inner: fragment, - table_id, + job_id, upstream_table_columns, } } @@ -126,17 +126,17 @@ impl BuildingFragment { /// Fill the information with the job in the fragment. fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool { - let table_id = job.id(); + let job_id = job.id(); let fragment_id = fragment.fragment_id; - let mut has_table = false; + let mut has_job = false; stream_graph_visitor::visit_fragment(fragment, |node_body| match node_body { NodeBody::Materialize(materialize_node) => { - materialize_node.table_id = table_id; + materialize_node.table_id = job_id; // Fill the ID of the `Table`.
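// For a materialize node, the job id doubles as the table id.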
let table = materialize_node.table.as_mut().unwrap(); - table.id = table_id; + table.id = job_id; table.database_id = job.database_id(); table.schema_id = job.schema_id(); table.fragment_id = fragment_id; @@ -145,27 +145,49 @@ impl BuildingFragment { table.definition = job.name(); } - has_table = true; + has_job = true; } NodeBody::Sink(sink_node) => { - sink_node.sink_desc.as_mut().unwrap().id = table_id; + sink_node.sink_desc.as_mut().unwrap().id = job_id; - has_table = true; + has_job = true; } NodeBody::Dml(dml_node) => { - dml_node.table_id = table_id; + dml_node.table_id = job_id; dml_node.table_version_id = job.table_version_id().unwrap(); } - NodeBody::Source(_) => { - // Notice: Table job has a dumb Source node, we should be careful that `has_table` should not be overwrite to `false` - if !has_table { - has_table = job.is_source_job(); + NodeBody::Source(source_node) => { + match job { + // Note: For table without connector, it has a dummy Source node. + // Note: For table with connector, its source node has a source id different from the table id (job id), assigned in create_job_catalog. + StreamingJob::Table(source, _table, _table_job_type) => { + if let Some(source_inner) = source_node.source_inner.as_mut() { + if let Some(source) = source { + debug_assert_ne!(source.id, job_id); + source_inner.source_id = source.id; + } + } + } + StreamingJob::Source(source) => { + has_job = true; + if let Some(source_inner) = source_node.source_inner.as_mut() { + debug_assert_eq!(source.id, job_id); + source_inner.source_id = source.id; + } + } + // For other job types, no need to fill the source id, since it refers to an existing source. + _ => {} + } + } + NodeBody::StreamCdcScan(node) => { + if let Some(table_desc) = node.cdc_table_desc.as_mut() { + table_desc.table_id = job_id; } } _ => {} }); - has_table + has_job } /// Extract the required columns (in IDs) of each upstream table.
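For reference, the id rule that the refactored `fill_job` above encodes can be sketched as a tiny self-contained Rust program. The `Job` enum below is an illustrative stand-in, not RisingWave's actual `StreamingJob`:

enum Job {
    /// A table job; for a table with a connector, the embedded source has its
    /// own id, distinct from the job id (both assigned in create_job_catalog).
    Table { job_id: u32, source_id: Option<u32> },
    /// A standalone source job: the source id equals the job id.
    Source { job_id: u32 },
}

/// Mirrors the `NodeBody::Source` arm above: which id should be filled into
/// the source node, if any.
fn source_id_to_fill(job: &Job) -> Option<u32> {
    match job {
        Job::Table { job_id, source_id } => {
            if let Some(id) = source_id {
                debug_assert_ne!(*id, *job_id); // cf. the debug_assert_ne! above
            }
            *source_id
        }
        Job::Source { job_id } => Some(*job_id), // cf. the debug_assert_eq! above
    }
}

fn main() {
    assert_eq!(source_id_to_fill(&Job::Source { job_id: 42 }), Some(42));
    let table = Job::Table { job_id: 42, source_id: Some(43) };
    assert_eq!(source_id_to_fill(&table), Some(43));
}

The point of the refactor is that this rule now lives in one place (`fill_job`) instead of being patched in afterwards by the removed `fill_table_stream_graph_info`.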
@@ -499,7 +521,7 @@ impl StreamFragmentGraph { pub fn table_fragment_id(&self) -> FragmentId { self.fragments .values() - .filter(|b| b.table_id.is_some()) + .filter(|b| b.job_id.is_some()) .map(|b| b.fragment_id) .exactly_one() .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job") @@ -1095,7 +1117,7 @@ impl CompleteStreamFragmentGraph { let internal_tables = building_fragment.extract_internal_tables(); let BuildingFragment { inner, - table_id, + job_id, upstream_table_columns: _, } = building_fragment; @@ -1104,7 +1126,7 @@ impl CompleteStreamFragmentGraph { let materialized_fragment_id = if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 { - table_id + job_id } else { None }; From 946b50089673b026720905e5cccece0485192553 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 4 Nov 2024 18:40:53 +0800 Subject: [PATCH 3/7] fix: fix index create privilege check and ensure consistency between the owner and its associated table (#19252) --- src/frontend/src/handler/create_index.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index 4b32bedc01ea2..1bf7ad16e1c93 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -21,11 +21,9 @@ use either::Either; use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::acl::AclMode; use risingwave_common::catalog::{IndexId, TableDesc, TableId}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus, PbTable}; -use risingwave_pb::user::grant_privilege::Object; use risingwave_sqlparser::ast; use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr}; @@ -34,7 +32,6 @@ use crate::binder::Binder; use crate::catalog::root_catalog::SchemaPath; use crate::error::{ErrorCode, Result}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef}; -use crate::handler::privilege::ObjectCheckItem; use crate::handler::HandlerArgs; use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter; use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize}; @@ -83,11 +80,11 @@ pub(crate) fn gen_create_index_plan( ); } - session.check_privileges(&[ObjectCheckItem::new( - table.owner, - AclMode::Select, - Object::TableId(table.id.table_id), - )])?; + if !session.is_super_user() && session.user_id() != table.owner { + return Err( + ErrorCode::PermissionDenied(format!("must be owner of table {}", table.name)).into(), + ); + } let mut binder = Binder::new_for_stream(session); binder.bind_table(Some(&schema_name), &table_name, None)?; @@ -202,7 +199,7 @@ pub(crate) fn gen_create_index_plan( &index_columns_ordered_expr, &include_columns_expr, // We use the first index column as distributed key by default if users - // haven't specify the distributed by columns. + // haven't specified the distributed by columns. 
if distributed_columns_expr.is_empty() { 1 } else { @@ -221,7 +218,7 @@ pub(crate) fn gen_create_index_plan( index_table_prost.retention_seconds = table.retention_seconds; } - index_table_prost.owner = session.user_id(); + index_table_prost.owner = table.owner; index_table_prost.dependent_relations = vec![table.id.table_id]; let index_columns_len = index_columns_ordered_expr.len() as u32; From bb0d786b39c9eef9dbf1e16814e8d12e46b8a676 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Mon, 4 Nov 2024 19:13:05 +0800 Subject: [PATCH 4/7] fix(connector): handle backwards compat parsing for `boolean` datatype (#19251) Co-authored-by: William Wen --- e2e_test/source_legacy/cdc/cdc.share_stream.slt | 6 +++--- src/connector/src/parser/mysql.rs | 8 ++++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_legacy/cdc/cdc.share_stream.slt b/e2e_test/source_legacy/cdc/cdc.share_stream.slt index 53148c66836e8..cf1000957b6fb 100644 --- a/e2e_test/source_legacy/cdc/cdc.share_stream.slt +++ b/e2e_test/source_legacy/cdc/cdc.share_stream.slt @@ -232,10 +232,10 @@ SELECT order_id,order_date,customer_name,product_id FROM orders_test order by or 10003 2020-07-30 12:00:30 Edward 106 query IIIIITTTTTTTTT -SELECT c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint; +SELECT c_boolean, c_bit, c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint; ---- --128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 -NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL +t t -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b 1001-01-01 00:00:00 1998-01-01 00:00:00 1970-01-01 00:00:01+00:00 +f f NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d 1001-01-01 NULL 2000-01-01 00:00:00 NULL statement ok create secret pg_pwd with ( diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index e9a8eeba70cb3..9970ad50003c1 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -75,6 +75,14 @@ pub fn mysql_datum_to_rw_datum( ) -> Result { match rw_data_type { DataType::Boolean => { + // TinyInt(1) is used to represent boolean in MySQL. + // This handles backwards compatibility: + // before https://github.com/risingwavelabs/risingwave/pull/19071, + // we permitted both boolean and tinyint(1) to be equivalent to boolean in RW.
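+ // Try the bool interpretation first; if it fails, fall through to the Bit(1) handling below.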
+ if let Some(Ok(val)) = mysql_row.get_opt::<Option<bool>, _>(mysql_datum_index) { + let _ = mysql_row.take::<bool, _>(mysql_datum_index); + return Ok(val.map(ScalarImpl::from)); + } // Bit(1) match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) { None => bail!( From b796d0ddb02112ddedf2a4ae755924865fbf0ba5 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 4 Nov 2024 22:09:25 +0800 Subject: [PATCH 5/7] refactor(common): simplify chunk iteration with `filter_by_bitmap` (#19254) Signed-off-by: Richard Chien --- src/batch/src/executor/group_top_n.rs | 15 ++- src/batch/src/executor/hash_agg.rs | 9 +- src/batch/src/executor/join/hash_join.rs | 94 +++++-------------- .../src/executor/join/lookup_join_base.rs | 9 +- src/common/src/bitmap.rs | 10 ++ 5 files changed, 48 insertions(+), 89 deletions(-) diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index b1f4791358131..7dda468066e93 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -20,6 +20,7 @@ use futures_async_stream::try_stream; use hashbrown::HashMap; use itertools::Itertools; use risingwave_common::array::DataChunk; +use risingwave_common::bitmap::FilterByBitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc}; @@ -198,16 +199,12 @@ impl<K: HashKey> GroupTopNExecutor<K> { let chunk = Arc::new(chunk?); let keys = K::build_many(self.group_key.as_slice(), &chunk); - for (row_id, ((encoded_row, key), visible)) in - encode_chunk(&chunk, &self.column_orders)? - .into_iter() - .zip_eq_fast(keys.into_iter()) - .zip_eq_fast(chunk.visibility().iter()) - .enumerate() + for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)?
+ .into_iter() + .zip_eq_fast(keys.into_iter()) + .enumerate() + .filter_by_bitmap(chunk.visibility()) { - if !visible { - continue; - } let heap = groups.entry(key).or_insert_with(|| { TopNHeap::new( self.limit, diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index bde5d36bd8c64..ea780d7bf83b3 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -21,7 +21,7 @@ use futures_async_stream::try_stream; use hashbrown::hash_map::Entry; use itertools::Itertools; use risingwave_common::array::{DataChunk, StreamChunk}; -use risingwave_common::bitmap::Bitmap; +use risingwave_common::bitmap::{Bitmap, FilterByBitmap}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; @@ -545,14 +545,11 @@ impl HashAggExecutor { let chunk = StreamChunk::from(chunk?); let keys = K::build_many(self.group_key_columns.as_slice(), &chunk); let mut memory_usage_diff = 0; - for (row_id, (key, visible)) in keys + for (row_id, key) in keys .into_iter() - .zip_eq_fast(chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(chunk.visibility()) { - if !visible { - continue; - } let mut new_group = false; let states = match groups.entry(key) { Entry::Occupied(entry) => entry.into_mut(), diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 834320b60d2b0..5ec115e0990a2 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{Array, DataChunk, RowRef}; -use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; +use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap}; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc}; @@ -514,15 +514,12 @@ impl HashJoinExecutor { for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { let build_keys = K::build_many(&self.build_key_idxs, build_chunk); - for (build_row_id, (build_key, visible)) in build_keys + for (build_row_id, build_key) in build_keys .into_iter() - .zip_eq_fast(build_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(build_chunk.visibility()) { self.shutdown_rx.check()?; - if !visible { - continue; - } // Only insert key to hash map if it is consistent with the null safe restriction. 
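// That is, the key may contain NULL values only in columns whose equality condition is null-safe.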
if build_key.null_bitmap().is_subset(&null_matched) { let row_id = RowId::new(build_chunk_id, build_row_id); @@ -765,14 +762,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -828,14 +822,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) @@ -898,14 +889,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state .first_output_row_id @@ -979,14 +967,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } shutdown_rx.check()?; if !ANTI_JOIN { if hash_map.contains_key(probe_key) { @@ -1043,14 +1028,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state @@ -1119,14 +1101,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { non_equi_state @@ -1201,14 +1180,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = 
K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1266,14 +1242,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1338,13 +1311,7 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_key, visible) in probe_keys - .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) - { - if !visible { - continue; - } + for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) { for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1392,14 +1359,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } for build_row_id in next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) { @@ -1465,14 +1429,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { for build_row_id in next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) @@ -1547,14 +1508,11 @@ impl HashJoinExecutor { for probe_chunk in probe_side.execute() { let probe_chunk = probe_chunk?; let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk); - for (probe_row_id, (probe_key, visible)) in probe_keys + for (probe_row_id, probe_key) in probe_keys .iter() - .zip_eq_fast(probe_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(probe_chunk.visibility()) { - if !visible { - continue; - } left_non_equi_state.found_matched = false; if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { left_non_equi_state diff --git a/src/batch/src/executor/join/lookup_join_base.rs b/src/batch/src/executor/join/lookup_join_base.rs index 39a39b9a1424d..743ae25cf4d6a 100644 --- a/src/batch/src/executor/join/lookup_join_base.rs +++ b/src/batch/src/executor/join/lookup_join_base.rs @@ -18,13 +18,13 @@ use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; +use 
risingwave_common::bitmap::FilterByBitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::BoxedExpression; @@ -150,14 +150,11 @@ impl<K: HashKey> LookupJoinBase<K> { for (build_chunk_id, build_chunk) in build_side.iter().enumerate() { let build_keys = K::build_many(&hash_join_build_side_key_idxs, build_chunk); - for (build_row_id, (build_key, visible)) in build_keys + for (build_row_id, build_key) in build_keys .into_iter() - .zip_eq_fast(build_chunk.visibility().iter()) .enumerate() + .filter_by_bitmap(build_chunk.visibility()) { - if !visible { - continue; - } // Only insert key to hash map if it is consistent with the null safe // restriction. if build_key.null_bitmap().is_subset(&null_matched) { diff --git a/src/common/src/bitmap.rs b/src/common/src/bitmap.rs index 22869b23bc1d8..995771970ece3 100644 --- a/src/common/src/bitmap.rs +++ b/src/common/src/bitmap.rs @@ -42,6 +42,7 @@ use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, Range, Ran use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::PbBuffer; +use rw_iter_util::ZipEqFast; #[derive(Default, Debug, Clone, EstimateSize)] pub struct BitmapBuilder { @@ -826,6 +827,15 @@ impl iter::Iterator for BitmapOnesIter<'_> { } } +pub trait FilterByBitmap: ExactSizeIterator + Sized { + fn filter_by_bitmap(self, bitmap: &Bitmap) -> impl Iterator<Item = Self::Item> { + self.zip_eq_fast(bitmap.iter()) + .filter_map(|(item, bit)| bit.then_some(item)) + } +} + +impl<T> FilterByBitmap for T where T: ExactSizeIterator {} + #[cfg(test)] mod tests { use super::*; From 46ad59877c7d5115d5285e3e24ac6590b1478b28 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Tue, 5 Nov 2024 00:49:33 +0800 Subject: [PATCH 6/7] fix(meta): record delta for new compaction group (#19253) --- src/meta/src/hummock/manager/commit_epoch.rs | 110 +++++++++---------- src/meta/src/hummock/manager/time_travel.rs | 26 +++-- src/meta/src/hummock/manager/versioning.rs | 2 - 3 files changed, 65 insertions(+), 73 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 93e5d5be1dfb8..5875feabf6db0 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -44,9 +44,9 @@ use crate::hummock::metrics_utils::{ }; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; +use crate::hummock::time_travel::should_mark_next_time_travel_version_snapshot; use crate::hummock::{ - commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, - HummockManager, + commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager, }; pub enum NewTableFragmentInfo { @@ -109,13 +109,6 @@ impl HummockManager { ); } - let previous_time_travel_toggle_check = versioning.time_travel_toggle_check; - versioning.time_travel_toggle_check = self.time_travel_enabled().await; - if !previous_time_travel_toggle_check && 
versioning.time_travel_toggle_check { - // Take a snapshot for the first commit epoch after enabling time travel. - versioning.mark_next_time_travel_version_snapshot(); - } - let mut version = HummockVersionTransaction::new( &mut versioning.current_version, &mut versioning.hummock_version_deltas, @@ -214,6 +207,10 @@ impl HummockManager { new_table_watermarks, change_log_delta, ); + if should_mark_next_time_travel_version_snapshot(&time_travel_delta) { + // Unable to invoke mark_next_time_travel_version_snapshot because versioning is already mutably borrowed. + versioning.time_travel_snapshot_interval_counter = u64::MAX; + } // Apply stats changes. let mut version_stats = HummockVersionStatsTransaction::new( @@ -247,59 +244,50 @@ impl HummockManager { ); table_metrics.inc_write_throughput(stats_value as u64); } - if versioning.time_travel_toggle_check { - let mut time_travel_version = None; - if versioning.time_travel_snapshot_interval_counter - >= self.env.opts.hummock_time_travel_snapshot_interval - { - versioning.time_travel_snapshot_interval_counter = 0; - time_travel_version = Some(version.latest_version()); - } else { - versioning.time_travel_snapshot_interval_counter = versioning - .time_travel_snapshot_interval_counter - .saturating_add(1); - } - let group_parents = version - .latest_version() - .levels - .values() - .map(|g| (g.group_id, g.parent_group_id)) - .collect(); - let time_travel_tables_to_commit = - table_compaction_group_mapping - .iter() - .filter_map(|(table_id, cg_id)| { - tables_to_commit - .get(table_id) - .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) - }); - let mut txn = self.env.meta_store_ref().conn.begin().await?; - let version_snapshot_sst_ids = self - .write_time_travel_metadata( - &txn, - time_travel_version, - time_travel_delta, - &group_parents, - &versioning.last_time_travel_snapshot_sst_ids, - time_travel_tables_to_commit, - ) - .await?; - commit_multi_var_with_provided_txn!( - txn, - version, - version_stats, - compaction_group_manager_txn - )?; - if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { - versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; - } + let mut time_travel_version = None; + if versioning.time_travel_snapshot_interval_counter + >= self.env.opts.hummock_time_travel_snapshot_interval + { + versioning.time_travel_snapshot_interval_counter = 0; + time_travel_version = Some(version.latest_version()); } else { - commit_multi_var!( - self.meta_store_ref(), - version, - version_stats, - compaction_group_manager_txn - )?; + versioning.time_travel_snapshot_interval_counter = versioning + .time_travel_snapshot_interval_counter + .saturating_add(1); + } + let group_parents = version + .latest_version() + .levels + .values() + .map(|g| (g.group_id, g.parent_group_id)) + .collect(); + let time_travel_tables_to_commit = + table_compaction_group_mapping + .iter() + .filter_map(|(table_id, cg_id)| { + tables_to_commit + .get(table_id) + .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) + }); + let mut txn = self.env.meta_store_ref().conn.begin().await?; + let version_snapshot_sst_ids = self + .write_time_travel_metadata( + &txn, + time_travel_version, + time_travel_delta, + &group_parents, + &versioning.last_time_travel_snapshot_sst_ids, + time_travel_tables_to_commit, + ) + .await?; + commit_multi_var_with_provided_txn!( txn, version, version_stats, compaction_group_manager_txn )?; + if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; } for compaction_group_id in &modified_compaction_groups { diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index fab79fe01f9a0..1ceaad5cfd390 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -17,14 +17,13 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::time_travel::{ refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, }; -use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, }; @@ -45,14 +44,6 @@ use crate::hummock::HummockManager; /// Time travel. impl HummockManager { - pub(crate) async fn time_travel_enabled(&self) -> bool { - self.env - .system_params_reader() - .await - .time_travel_retention_ms() - > 0 - } - pub(crate) async fn init_time_travel_state(&self) -> Result<()> { let sql_store = self.env.meta_store_ref(); let mut guard = self.versioning.write().await; @@ -473,6 +464,11 @@ fn replay_archive( let mut last_version = HummockVersion::from_persisted_protobuf(&version); for d in deltas { let d = HummockVersionDelta::from_persisted_protobuf(&d); + debug_assert!( + !should_mark_next_time_travel_version_snapshot(&d), + "unexpected time travel delta {:?}", + d + ); // Need to work around the assertion in `apply_version_delta`. // Because compaction deltas are not included in time travel archive. while last_version.id < d.prev_id { @@ -506,3 +502,13 @@ fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { pub fn require_sql_meta_store_err() -> Error { Error::TimeTravel(anyhow!("require SQL meta store")) } + +/// Time travel delta replay only expects `NewL0SubLevel`. In all other cases, a new version snapshot should be created. +pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool { + delta.group_deltas.iter().any(|(_, deltas)| { + deltas + .group_deltas + .iter() + .any(|d| !matches!(d, GroupDeltaCommon::NewL0SubLevel(_))) + }) +} diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 7fa93f72d02b8..fd516a2ba3e1e 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -55,8 +55,6 @@ pub struct Versioning { pub time_travel_snapshot_interval_counter: u64, /// Used to avoid the attempts to rewrite the same SST to meta store pub last_time_travel_snapshot_sst_ids: HashSet<HummockSstableId>, /// Whether time travel is enabled during last commit epoch.
- pub time_travel_toggle_check: bool, // Persistent states below pub hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta>, From 9d83354629bde178ba31e5fbea4ebc51c814fecf Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 5 Nov 2024 11:42:08 +0800 Subject: [PATCH 7/7] chore(deps): Bump speedate from 0.14.0 to 0.15.0 (#19257) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 11 ++++------- src/common/Cargo.toml | 2 +- src/frontend/Cargo.toml | 2 +- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb9830efdcc41..2e84e822497b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13599,12 +13599,12 @@ dependencies = [ [[package]] name = "speedate" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c323c4e6fece5a5a1a2a7f726d243144cce9fbcfe3ce4d9f3c6ede726a2bc780" +checksum = "9a5e7adf4e07e7de39a64d77962ca14a09165e592d42d0c9f9acadb679f4f937" dependencies = [ - "strum 0.25.0", - "strum_macros 0.25.3", + "strum 0.26.3", + "strum_macros 0.26.4", ] [[package]] @@ -14015,9 +14015,6 @@ name = "strum" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" -dependencies = [ - "strum_macros 0.25.3", -] [[package]] name = "strum" diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 8b341fb621fe6..4b002bd8084c6 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -97,7 +97,7 @@ serde_default = "0.2" serde_json = "1" serde_with = "3" smallbitset = "0.7.1" -speedate = "0.14.0" +speedate = "0.15.0" stacker = "0.1" static_assertions = "1" strum = "0.26" diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 76ee2aa076e8e..502eaeba2f685 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -81,7 +81,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10.7" smallvec = { version = "1.13.1", features = ["serde"] } -speedate = "0.14.0" +speedate = "0.15.0" tempfile = "3" thiserror = "1" thiserror-ext = { workspace = true }
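A closing note on patch 5: the sketch below shows how the new `FilterByBitmap` helper is meant to be used. It is deliberately self-contained — `Bitmap` here is a stand-in over `Vec<bool>` rather than RisingWave's `risingwave_common::bitmap::Bitmap`, and plain `zip` stands in for `zip_eq_fast` (which additionally asserts equal lengths):

// Stand-in bitmap type, just enough to demonstrate the trait.
struct Bitmap(Vec<bool>);

impl Bitmap {
    fn iter(&self) -> impl ExactSizeIterator<Item = bool> + '_ {
        self.0.iter().copied()
    }
}

// Same shape as the trait added to src/common/src/bitmap.rs: keep only the
// items whose corresponding visibility bit is set.
trait FilterByBitmap: ExactSizeIterator + Sized {
    fn filter_by_bitmap(self, bitmap: &Bitmap) -> impl Iterator<Item = Self::Item> {
        self.zip(bitmap.iter())
            .filter_map(|(item, bit)| bit.then_some(item))
    }
}

impl<T> FilterByBitmap for T where T: ExactSizeIterator {}

fn main() {
    let visibility = Bitmap(vec![true, false, true]);
    let rows = ["a", "b", "c"];
    // Replaces the old `zip_eq_fast(...)` + `if !visible { continue; }` pattern.
    let visible: Vec<_> = rows.iter().enumerate().filter_by_bitmap(&visibility).collect();
    assert_eq!(visible, vec![(0, &"a"), (2, &"c")]);
}

This is the pattern that replaces the repeated `zip_eq_fast(chunk.visibility().iter())` plus `continue` boilerplate across the batch executors in the diff above.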