From b17569d774f25351f4458b3c42723bf51e6bccd0 Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Mon, 18 Sep 2023 20:13:30 -0700 Subject: [PATCH 1/3] feat(stream): add nkey and jwt auth methods for nats connector (#12227) Co-authored-by: yufansong --- Cargo.lock | 256 ++++++------------ src/connector/Cargo.toml | 3 + src/connector/src/common.rs | 118 +++++--- .../src/source/nats/enumerator/mod.rs | 14 +- src/connector/src/source/nats/mod.rs | 6 + .../src/source/nats/source/message.rs | 30 +- .../src/source/nats/source/reader.rs | 43 ++- src/connector/src/source/nats/split.rs | 26 +- src/workspace-hack/Cargo.toml | 4 - 9 files changed, 273 insertions(+), 227 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2519cf2a63356..7abddeda699db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ checksum = "9c0fdddc3fdac97394ffcc5c89c634faa9c1c166ced54189af34e407c97b6ee7" dependencies = [ "apache-avro-derive", "byteorder", - "digest 0.10.7", + "digest", "lazy_static", "libflate", "log", @@ -189,7 +189,7 @@ dependencies = [ "byteorder", "bzip2", "crc32fast", - "digest 0.10.7", + "digest", "lazy_static", "libflate", "log", @@ -795,7 +795,7 @@ dependencies = [ "once_cell", "percent-encoding", "regex", - "sha2 0.10.7", + "sha2", "time", "tracing", ] @@ -829,7 +829,7 @@ dependencies = [ "md-5", "pin-project-lite", "sha1", - "sha2 0.10.7", + "sha2", "tracing", ] @@ -1224,15 +1224,6 @@ dependencies = [ "triple_accel", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -1776,12 +1767,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "const-oid" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" - [[package]] name = "const-oid" version = "0.9.5" @@ -2091,15 +2076,29 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "3.2.0" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" +checksum = "622178105f911d937a42cdb140730ba4a3ed2becd8ae6ce39c7d28b5d75d4588" dependencies = [ - "byteorder", - "digest 0.9.0", - "rand_core 0.5.1", + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "platforms", + "rustc_version", "subtle", - "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.33", ] [[package]] @@ -2300,23 +2299,14 @@ dependencies = [ "uuid", ] -[[package]] -name = "der" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4" -dependencies = [ - "const-oid 0.6.2", -] - [[package]] name = "der" version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ - "const-oid 0.9.5", - "pem-rfc7468 0.7.0", + "const-oid", + "pem-rfc7468", "zeroize", ] @@ -2395,23 +2385,14 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" -[[package]] -name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array", -] - [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer 0.10.4", - "const-oid 0.9.5", + "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -2483,23 +2464,23 @@ checksum = "49457524c7e65648794c98283282a0b7c73b10018e7091f1cdcfff314fd7ae59" [[package]] name = "ed25519" -version = "1.5.3" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7" +checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d" dependencies = [ - "signature 1.6.4", + "signature", ] [[package]] name = "ed25519-dalek" -version = "1.0.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" +checksum = "7277392b266383ef8396db7fdeb1e77b6c52fed775f5df15bb24f35b72156980" dependencies = [ "curve25519-dalek", "ed25519", - "sha2 0.9.9", - "zeroize", + "sha2", + "signature", ] [[package]] @@ -2720,6 +2701,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "fiat-crypto" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0870c84016d4b481be5c9f323c24f65e31e901ae618f0e80f4308fb00de1d2d" + [[package]] name = "fiemap" version = "0.1.1" @@ -3439,7 +3426,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest 0.10.7", + "digest", ] [[package]] @@ -4373,7 +4360,7 @@ version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "digest 0.10.7", + "digest", ] [[package]] @@ -4610,7 +4597,7 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2 0.10.7", + "sha2", "smallvec", "subprocess", "thiserror", @@ -4686,9 +4673,9 @@ dependencies = [ [[package]] name = "nkeys" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9261eb915c785ea65708bc45ef43507ea46914e1a73f1412d1a38aba967c8e" +checksum = "aad178aad32087b19042ee36dfd450b73f5f934fbfb058b59b198684dfec4c47" dependencies = [ "byteorder", "data-encoding", @@ -4922,7 +4909,7 @@ dependencies = [ "serde", "serde_json", "serde_path_to_error", - "sha2 0.10.7", + "sha2", "thiserror", "url", ] @@ -4948,12 +4935,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "opendal" version = "0.39.0" @@ -4982,7 +4963,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "sha2 0.10.7", + "sha2", "tokio", "uuid", ] @@ -5415,15 +5396,6 @@ dependencies = [ "serde", ] -[[package]] -name = "pem-rfc7468" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f22eb0e3c593294a99e9ff4b24cf6b752d43f193aa4415fe5077c159996d497" -dependencies = [ - "base64ct", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5558,21 +5530,9 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der 0.7.8", - "pkcs8 0.10.2", - "spki 0.7.2", -] - -[[package]] -name = "pkcs8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee3ef9b64d26bad0536099c816c6734379e45bbd5f14798def6809e5cc350447" -dependencies = [ - "der 0.4.5", - "pem-rfc7468 0.2.3", - "spki 0.4.1", - "zeroize", + "der", + "pkcs8", + "spki", ] [[package]] @@ -5581,8 +5541,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der 0.7.8", - "spki 0.7.2", + "der", + "spki", ] [[package]] @@ -5591,6 +5551,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" + [[package]] name = "plotters" version = "0.3.5" @@ -5681,7 +5647,7 @@ dependencies = [ "md-5", "memchr", "rand", - "sha2 0.10.7", + "sha2", "stringprep", ] @@ -6163,7 +6129,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -6173,15 +6139,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", + "rand_core", ] -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" - [[package]] name = "rand_core" version = "0.6.4" @@ -6197,7 +6157,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -6359,7 +6319,7 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2 0.10.7", + "sha2", "tokio", ] @@ -6894,6 +6854,7 @@ dependencies = [ "mysql_async", "mysql_common", "nexmark", + "nkeys", "num-bigint", "opendal", "parking_lot 0.12.1", @@ -6919,6 +6880,7 @@ dependencies = [ "simd-json", "tempfile", "thiserror", + "time", "tokio-retry", "tokio-stream", "tokio-util", @@ -7017,7 +6979,7 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2 0.10.7", + "sha2", "smallvec", "static_assertions", "thiserror", @@ -7090,7 +7052,7 @@ dependencies = [ "risingwave_variables", "serde", "serde_json", - "sha2 0.10.7", + "sha2", "smallvec", "tempfile", "thiserror", @@ -7756,17 +7718,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ab43bb47d23c1a631b4b680199a45255dce26fa9ab2fa902581f624ff13e6a8" dependencies = [ "byteorder", - "const-oid 0.9.5", - "digest 0.10.7", + "const-oid", + "digest", "num-bigint-dig", "num-integer", "num-iter", "num-traits", "pkcs1", - "pkcs8 0.10.2", - "rand_core 0.6.4", - "signature 2.1.0", - "spki 0.7.2", + "pkcs8", + "rand_core", + "signature", + "spki", "subtle", "zeroize", ] @@ -8343,7 +8305,7 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.7", + "digest", ] [[package]] @@ -8352,19 +8314,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" -[[package]] -name = "sha2" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" -dependencies = [ - "block-buffer 0.9.0", - "cfg-if", - "cpufeatures", - "digest 0.9.0", - "opaque-debug", -] - [[package]] name = "sha2" version = "0.10.7" @@ -8373,7 +8322,7 @@ checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.7", + "digest", ] [[package]] @@ -8439,30 +8388,24 @@ dependencies = [ [[package]] name = "signatory" -version = "0.23.2" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfecc059e81632eef1dd9b79e22fc28b8fe69b30d3357512a77a0ad8ee3c782" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" dependencies = [ - "pkcs8 0.7.6", - "rand_core 0.6.4", - "signature 1.6.4", + "pkcs8", + "rand_core", + "signature", "zeroize", ] [[package]] name = "signature" -version = "1.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" - -[[package]] -name = "signature" -version = "2.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e1788eed21689f9cf370582dfc467ef36ed9c707f073528ddafa8d83e3b8500" +checksum = "8fe458c98333f9c8152221191a77e2a44e8325d0193484af2e9421a53019e57d" dependencies = [ - "digest 0.10.7", - "rand_core 0.6.4", + "digest", + "rand_core", ] [[package]] @@ -8624,15 +8567,6 @@ dependencies = [ "lock_api", ] -[[package]] -name = "spki" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c01a0c15da1b0b0e1494112e7af814a678fec9bd157881b49beac661e9b6f32" -dependencies = [ - "der 0.4.5", -] - [[package]] name = "spki" version = "0.7.2" @@ -8640,7 +8574,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a" dependencies = [ "base64ct", - "der 0.7.8", + "der", ] [[package]] @@ -10063,7 +9997,6 @@ dependencies = [ "base64 0.21.3", "bit-vec", "bitflags 2.4.0", - "byteorder", "bytes", "cc", "chrono", @@ -10121,7 +10054,7 @@ dependencies = [ "prost", "rand", "rand_chacha", - "rand_core 0.6.4", + "rand_core", "regex", "regex-automata 0.3.8", "regex-syntax 0.7.5", @@ -10154,7 +10087,6 @@ dependencies = [ "unicode-normalization", "url", "uuid", - "zeroize", ] [[package]] @@ -10238,20 +10170,6 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.33", -] [[package]] name = "zstd" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 6b0aebf4b0f18..bb1b4b2b4ee0f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -66,6 +66,7 @@ mysql_common = { version = "0.30", default-features = false, features = [ "chrono", ] } nexmark = { version = "0.2", features = ["serde"] } +nkeys = "0.3.2" num-bigint = "0.4" opendal = "0.39" parking_lot = "0.12" @@ -101,6 +102,7 @@ serde_with = { version = "3", features = ["json"] } simd-json = "0.10.6" tempfile = "3" thiserror = "1" +time = "0.3.28" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", @@ -117,6 +119,7 @@ tonic = { workspace = true } tracing = "0.1" url = "2" urlencoding = "2" + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 5a03fc7bfd9af..4e2142fda0adb 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; +use time::OffsetDateTime; use crate::aws_auth::AwsAuthProps; use crate::deserialize_duration_from_string; use crate::sink::SinkError; - +use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -342,27 +343,33 @@ pub struct UpsertMessage<'a> { #[serde_as] #[derive(Deserialize, Serialize, Debug, Clone)] pub struct NatsCommon { - #[serde(rename = "nats.server_url")] + #[serde(rename = "server_url")] pub server_url: String, - #[serde(rename = "nats.subject")] + #[serde(rename = "subject")] pub subject: String, - #[serde(rename = "nats.user")] + #[serde(rename = "connect_mode")] + pub connect_mode: Option, + #[serde(rename = "username")] pub user: Option, - #[serde(rename = "nats.password")] + #[serde(rename = "password")] pub password: Option, - #[serde(rename = "nats.max_bytes")] + #[serde(rename = "jwt")] + pub jwt: Option, + #[serde(rename = "nkey")] + pub nkey: Option, + #[serde(rename = "max_bytes")] #[serde_as(as = "Option")] pub max_bytes: Option, - #[serde(rename = "nats.max_messages")] + #[serde(rename = "max_messages")] #[serde_as(as = "Option")] pub max_messages: Option, - #[serde(rename = "nats.max_messages_per_subject")] + #[serde(rename = "max_messages_per_subject")] #[serde_as(as = "Option")] pub max_messages_per_subject: Option, - #[serde(rename = "nats.max_consumers")] + #[serde(rename = "max_consumers")] #[serde_as(as = "Option")] pub max_consumers: Option, - #[serde(rename = "nats.max_message_size")] + #[serde(rename = "max_message_size")] #[serde_as(as = "Option")] pub max_message_size: Option, } @@ -370,9 +377,39 @@ pub struct NatsCommon { impl NatsCommon { pub(crate) async fn build_client(&self) -> anyhow::Result { let mut connect_options = async_nats::ConnectOptions::new(); - if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) { - connect_options = connect_options.user_and_password(v_user.into(), v_password.into()); - } + match self.connect_mode.as_deref() { + Some("user_and_password") => { + if let (Some(v_user), Some(v_password)) = + (self.user.as_ref(), self.password.as_ref()) + { + connect_options = + connect_options.user_and_password(v_user.into(), v_password.into()) + } else { + return Err(anyhow_error!( + "nats connect mode is user_and_password, but user or password is empty" + )); + } + } + + Some("credential") => { + if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) { + connect_options = connect_options + .credentials(&self.create_credential(v_nkey, v_jwt)?) + .expect("failed to parse static creds") + } else { + return Err(anyhow_error!( + "nats connect mode is credential, but nkey or jwt is empty" + )); + } + } + Some("plain") => {} + _ => { + return Err(anyhow_error!( + "nats connect mode only accept user_and_password/credential/plain" + )); + } + }; + let servers = self.server_url.split(',').collect::>(); let client = connect_options .connect( @@ -394,8 +431,8 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, - split_id: i32, - start_sequence: Option, + split_id: String, + start_sequence: NatsOffset, ) -> anyhow::Result< async_nats::jetstream::consumer::Consumer, > { @@ -406,23 +443,28 @@ impl NatsCommon { ack_policy: jetstream::consumer::AckPolicy::None, ..Default::default() }; - match start_sequence { - Some(v) => { - let consumer = stream - .get_or_create_consumer(&name, { - config.deliver_policy = DeliverPolicy::ByStartSequence { - start_sequence: v + 1, - }; - config - }) - .await?; - Ok(consumer) - } - None => { - let consumer = stream.get_or_create_consumer(&name, config).await?; - Ok(consumer) + + let deliver_policy = match start_sequence { + NatsOffset::Earliest => DeliverPolicy::All, + NatsOffset::Latest => DeliverPolicy::Last, + NatsOffset::SequenceNumber(v) => { + let parsed = v.parse::()?; + DeliverPolicy::ByStartSequence { + start_sequence: 1 + parsed, + } } - } + NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { + start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?, + }, + NatsOffset::None => DeliverPolicy::All, + }; + let consumer = stream + .get_or_create_consumer(&name, { + config.deliver_policy = deliver_policy; + config + }) + .await?; + Ok(consumer) } pub(crate) async fn build_or_get_stream( @@ -432,6 +474,7 @@ impl NatsCommon { let mut config = jetstream::stream::Config { // the subject default use name value name: self.subject.clone(), + max_bytes: 1000000, ..Default::default() }; if let Some(v) = self.max_bytes { @@ -452,4 +495,17 @@ impl NatsCommon { let stream = jetstream.get_or_create_stream(config).await?; Ok(stream) } + + pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> anyhow::Result { + let creds = format!( + "-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\ + ************************* IMPORTANT *************************\n\ + NKEY Seed printed below can be used to sign and prove identity.\n\ + NKEYs are sensitive and should be treated as secrets.\n\n\ + -----BEGIN USER NKEY SEED-----\n{}\n------END USER NKEY SEED------\n\n\ + *************************************************************", + jwt, seed + ); + Ok(creds) + } } diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 88384bfb685e6..e987a45188114 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use anyhow; use async_trait::async_trait; -use super::source::NatsSplit; +use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; #[derive(Debug, Clone, Eq, PartialEq)] pub struct NatsSplitEnumerator { subject: String, - split_num: i32, + split_id: SplitId, } #[async_trait] @@ -36,7 +38,7 @@ impl SplitEnumerator for NatsSplitEnumerator { ) -> anyhow::Result { Ok(Self { subject: properties.common.subject, - split_num: 0, + split_id: Arc::from("0"), }) } @@ -44,8 +46,8 @@ impl SplitEnumerator for NatsSplitEnumerator { // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(), - split_num: 0, // be the same as `from_nats_jetstream_message` - start_sequence: None, + split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message` + start_sequence: NatsOffset::None, }; Ok(vec![nats_split]) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 1d887e342f1f8..3e8cc57bc1da8 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -29,6 +29,12 @@ pub const NATS_CONNECTOR: &str = "nats"; pub struct NatsProperties { #[serde(flatten)] pub common: NatsCommon, + + #[serde(rename = "scan.startup.mode")] + pub scan_startup_mode: Option, + + #[serde(rename = "scan.startup.timestamp_millis")] + pub start_time: Option, } impl NatsProperties {} diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index afb3029d3b917..e582df86664e8 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -13,19 +13,37 @@ // limitations under the License. use async_nats; +use async_nats::jetstream::Message; use crate::source::base::SourceMessage; -use crate::source::SourceMeta; +use crate::source::{SourceMeta, SplitId}; -impl SourceMessage { - pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self { +#[derive(Clone, Debug)] +pub struct NatsMessage { + pub split_id: SplitId, + pub sequence_number: String, + pub payload: Vec, +} + +impl From for SourceMessage { + fn from(message: NatsMessage) -> Self { SourceMessage { key: None, - payload: Some(message.message.payload.to_vec()), + payload: Some(message.payload), // For nats jetstream, use sequence id as offset - offset: message.info().unwrap().stream_sequence.to_string(), - split_id: "0".into(), + offset: message.sequence_number, + split_id: message.split_id, meta: SourceMeta::Empty, } } } + +impl NatsMessage { + pub fn new(split_id: SplitId, message: Message) -> Self { + NatsMessage { + split_id, + sequence_number: message.info().unwrap().stream_sequence.to_string(), + payload: message.message.payload.to_vec(), + } + } +} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index d958b5a898495..6e22748bcf468 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,18 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; +use super::message::NatsMessage; +use super::{NatsOffset, NatsSplit}; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; -use crate::source::nats::split::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitReader, + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader, }; pub struct NatsSplitReader { @@ -31,6 +32,8 @@ pub struct NatsSplitReader { properties: NatsProperties, parser_config: ParserConfig, source_ctx: SourceContextRef, + start_position: NatsOffset, + split_id: SplitId, } #[async_trait] @@ -47,15 +50,42 @@ impl SplitReader for NatsSplitReader { ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); + let split = splits.into_iter().next().unwrap(); + let split_id = split.split_id; + let start_position = match &split.start_sequence { + NatsOffset::None => match &properties.scan_startup_mode { + None => NatsOffset::Earliest, + Some(mode) => match mode.as_str() { + "latest" => NatsOffset::Latest, + "earliest" => NatsOffset::Earliest, + "timestamp_millis" => { + if let Some(time) = &properties.start_time { + NatsOffset::Timestamp(time.parse()?) + } else { + return Err(anyhow!("scan_startup_timestamp_millis is required")); + } + } + _ => { + return Err(anyhow!( + "invalid scan_startup_mode, accept earliest/latest/timestamp_millis" + )) + } + }, + }, + start_position => start_position.to_owned(), + }; + let consumer = properties .common - .build_consumer(0, splits[0].start_sequence) + .build_consumer(split_id.to_string(), start_position.clone()) .await?; Ok(Self { consumer, properties, parser_config, source_ctx, + start_position, + split_id, }) } @@ -75,7 +105,10 @@ impl CommonSplitReader for NatsSplitReader { for msgs in messages.ready_chunks(capacity) { let mut msg_vec = Vec::with_capacity(capacity); for msg in msgs { - msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?)); + msg_vec.push(SourceMessage::from(NatsMessage::new( + self.split_id.clone(), + msg?, + ))); } yield msg_vec; } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index f0fcfaff35481..4072de230d983 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -18,20 +18,29 @@ use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum NatsOffset { + Earliest, + Latest, + SequenceNumber(String), + Timestamp(i128), + None, +} + /// The states of a NATS split, which will be persisted to checkpoint. #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct NatsSplit { pub(crate) subject: String, // TODO: to simplify the logic, return 1 split for first version. May use parallelism in // future. - pub(crate) split_num: i32, - pub(crate) start_sequence: Option, + pub(crate) split_id: SplitId, + pub(crate) start_sequence: NatsOffset, } impl SplitMetaData for NatsSplit { fn id(&self) -> SplitId { // TODO: should avoid constructing a string every time - format!("{}", self.split_num).into() + format!("{}", self.split_id).into() } fn restore_from_json(value: JsonbVal) -> anyhow::Result { @@ -44,16 +53,21 @@ impl SplitMetaData for NatsSplit { } impl NatsSplit { - pub fn new(subject: String, split_num: i32, start_sequence: Option) -> Self { + pub fn new(subject: String, split_id: SplitId, start_sequence: NatsOffset) -> Self { Self { subject, - split_num, + split_id, start_sequence, } } pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { - self.start_sequence = Some(start_sequence.as_str().parse::().unwrap()); + let start_sequence = if start_sequence.is_empty() { + NatsOffset::Earliest + } else { + NatsOffset::SequenceNumber(start_sequence) + }; + self.start_sequence = start_sequence; Ok(()) } } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 603619f3a8b27..bc01c7f871d89 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -26,7 +26,6 @@ aws-smithy-client = { version = "0.55", default-features = false, features = ["n base64 = { version = "0.21", features = ["alloc"] } bit-vec = { version = "0.6" } bitflags = { version = "2", default-features = false, features = ["std"] } -byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", features = ["alloc", "serde"] } clap = { version = "4", features = ["cargo", "derive", "env"] } @@ -112,7 +111,6 @@ unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } -zeroize = { version = "1", features = ["zeroize_derive"] } [build-dependencies] ahash = { version = "0.8" } @@ -124,7 +122,6 @@ aws-smithy-client = { version = "0.55", default-features = false, features = ["n base64 = { version = "0.21", features = ["alloc"] } bit-vec = { version = "0.6" } bitflags = { version = "2", default-features = false, features = ["std"] } -byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", features = ["alloc", "serde"] } @@ -215,6 +212,5 @@ unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } -zeroize = { version = "1", features = ["zeroize_derive"] } ### END HAKARI SECTION From 588bb80eec402eeb63dc1c8bc66661e996ba85fc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 19 Sep 2023 11:39:18 +0800 Subject: [PATCH 2/3] chore(deps): Bump aho-corasick from 1.0.5 to 1.1.0 (#12402) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7abddeda699db..891ab13dc4a91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,9 +49,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "0f2135563fb5c609d2b2b87c1e8ce7bc41b0b45430fa9661f457981503dd5bf0" dependencies = [ "memchr", ] From 161b9b0ef684aa065b3c78f1103925f13aafdf30 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 19 Sep 2023 12:03:12 +0800 Subject: [PATCH 3/3] feat(frontend): support background ddl for materialized views (#12355) --- ci/scripts/run-e2e-test.sh | 1 + e2e_test/background_ddl/basic.slt | 73 ++++++++++++++++++ proto/ddl_service.proto | 7 ++ src/common/src/session_config/mod.rs | 20 ++++- src/frontend/src/catalog/catalog_service.rs | 14 +++- src/frontend/src/handler/create_mv.rs | 12 ++- src/frontend/src/test_utils.rs | 6 +- src/meta/src/manager/notification.rs | 2 + src/meta/src/rpc/ddl_controller.rs | 85 +++++++++++++++++---- src/meta/src/rpc/service/ddl_service.rs | 24 +++++- src/rpc_client/src/meta_client.rs | 2 + 11 files changed, 220 insertions(+), 26 deletions(-) create mode 100644 e2e_test/background_ddl/basic.slt diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index ea97c93276eea..fc0fb5e648865 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -65,6 +65,7 @@ echo "--- e2e, $mode, batch" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cluster_start sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}" +sqllogictest -p 4566 -d dev './e2e_test/background_ddl/**/*.slt' --junit "batch-ddl-${profile}" sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}" sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt' sqllogictest -p 4566 -d test './e2e_test/database/test.slt' diff --git a/e2e_test/background_ddl/basic.slt b/e2e_test/background_ddl/basic.slt new file mode 100644 index 0000000000000..6a8141697d006 --- /dev/null +++ b/e2e_test/background_ddl/basic.slt @@ -0,0 +1,73 @@ +statement ok +SET BACKGROUND_DDL=true; + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4; + +statement ok +CREATE TABLE t (v1 int); + +statement ok +INSERT INTO t select * from generate_series(1, 500000); + +statement ok +FLUSH; + +statement ok +CREATE MATERIALIZED VIEW m1 as SELECT * FROM t; + +statement ok +CREATE MATERIALIZED VIEW m2 as SELECT * FROM t; + +statement ok +CREATE MATERIALIZED VIEW m3 as SELECT * FROM t; + +sleep 3s + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +3 + +statement error +SELECT * FROM m1; + +# Meta should always reject duplicate mview. +statement error +CREATE MATERIALIZED VIEW m3 as SELECT * FROM t; + +# Wait for background ddl to finish +sleep 30s + +query I +select count(*) from m1; +---- +500000 + +query I +select count(*) from m2; +---- +500000 + +query I +select count(*) from m3; +---- +500000 + +statement ok +DROP MATERIALIZED VIEW m1; + +statement ok +DROP MATERIALIZED VIEW m2; + +statement ok +DROP MATERIALIZED VIEW m3; + +statement ok +DROP TABLE t; + +statement ok +SET BACKGROUND_DDL=false; + +statement ok +ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 27c9f2ee82f83..68a74d421462c 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -97,9 +97,16 @@ message DropSinkResponse { uint64 version = 2; } +enum StreamJobExecutionMode { + STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0; + BACKGROUND = 1; + FOREGROUND = 2; +} + message CreateMaterializedViewRequest { catalog.Table materialized_view = 1; stream_plan.StreamFragmentGraph fragment_graph = 2; + StreamJobExecutionMode stream_job_execution_mode = 3; } message CreateMaterializedViewResponse { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 2643f9e0643d4..367cf4ce35ac4 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -36,7 +36,7 @@ use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 37] = [ +const CONFIG_KEYS: [&str; 38] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -74,6 +74,7 @@ const CONFIG_KEYS: [&str; 37] = [ "STREAMING_RATE_LIMIT", "CDC_BACKFILL", "RW_STREAMING_OVER_WINDOW_CACHE_POLICY", + "BACKGROUND_DDL", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -115,6 +116,7 @@ const STANDARD_CONFORMING_STRINGS: usize = 33; const STREAMING_RATE_LIMIT: usize = 34; const CDC_BACKFILL: usize = 35; const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36; +const BACKGROUND_DDL: usize = 37; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -339,6 +341,7 @@ type RowSecurity = ConfigBool; type StandardConformingStrings = ConfigString; type StreamingRateLimit = ConfigU64; type CdcBackfill = ConfigBool; +type BackgroundDdl = ConfigBool; /// Report status or notice to caller. pub trait ConfigReporter { @@ -486,6 +489,8 @@ pub struct ConfigMap { /// Cache policy for partition cache in streaming over window. /// Can be "full", "recent", "recent_first_n" or "recent_last_n". streaming_over_window_cache_policy: OverWindowCachePolicy, + + background_ddl: BackgroundDdl, } impl ConfigMap { @@ -603,6 +608,8 @@ impl ConfigMap { self.cdc_backfill = val.as_slice().try_into()? } else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) { self.streaming_over_window_cache_policy = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) { + self.background_ddl = val.as_slice().try_into()?; } else { return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); } @@ -690,6 +697,8 @@ impl ConfigMap { Ok(self.cdc_backfill.to_string()) } else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) { Ok(self.streaming_over_window_cache_policy.to_string()) + } else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) { + Ok(self.background_ddl.to_string()) } else { Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) } @@ -882,6 +891,11 @@ impl ConfigMap { setting: self.streaming_over_window_cache_policy.to_string(), description: String::from(r#"Cache policy for partition cache in streaming over window. Can be "full", "recent", "recent_first_n" or "recent_last_n"."#), }, + VariableInfo{ + name: BackgroundDdl::entry_name().to_lowercase(), + setting: self.background_ddl.to_string(), + description: String::from("Run DDL statements in background"), + }, ] } @@ -1020,4 +1034,8 @@ impl ConfigMap { pub fn get_streaming_over_window_cache_policy(&self) -> OverWindowCachePolicy { self.streaming_over_window_cache_policy } + + pub fn get_background_ddl(&self) -> bool { + self.background_ddl.0 + } } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 14a9f9ad104cc..bc2648fe105bd 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -24,7 +24,7 @@ use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::create_connection_request; +use risingwave_pb::ddl_service::{create_connection_request, StreamJobExecutionMode}; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; use tokio::sync::watch::Receiver; @@ -70,6 +70,7 @@ pub trait CatalogWriter: Send + Sync { &self, table: PbTable, graph: StreamFragmentGraph, + stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()>; async fn create_table( @@ -190,12 +191,19 @@ impl CatalogWriter for CatalogWriterImpl { &self, table: PbTable, graph: StreamFragmentGraph, + stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()> { let (_, version) = self .meta_client - .create_materialized_view(table, graph) + .create_materialized_view(table, graph, stream_job_execution_mode) .await?; - self.wait_version(version).await + if matches!( + stream_job_execution_mode, + StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified + ) { + self.wait_version(version).await? + } + Ok(()) } async fn create_view(&self, view: PbView) -> Result<()> { diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index cc08064700aff..3983291e2845f 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -16,6 +16,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::catalog::PbTable; +use risingwave_pb::ddl_service::StreamJobExecutionMode; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::user::grant_privilege::Action; use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; @@ -188,6 +189,7 @@ It only indicates the physical clustering of the data, which may improve the per (table, graph) }; + // Ensure writes to `StreamJobTracker` are atomic. let _job_guard = session .env() @@ -199,9 +201,17 @@ It only indicates the physical clustering of the data, which may improve the per table.name.clone(), )); + let run_in_background = session.config().get_background_ddl(); + let stream_job_execution_mode = if run_in_background { + StreamJobExecutionMode::Background + } else { + StreamJobExecutionMode::Foreground + }; + + let session = session.clone(); let catalog_writer = session.catalog_writer()?; catalog_writer - .create_materialized_view(table, graph) + .create_materialized_view(table, graph, stream_job_execution_mode) .await?; Ok(PgResponse::empty_result( diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 20eb252fc5053..1823dcec91281 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; -use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; +use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, StreamJobExecutionMode}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, @@ -235,6 +235,7 @@ impl CatalogWriter for MockCatalogWriter { &self, mut table: PbTable, _graph: StreamFragmentGraph, + _stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()> { table.id = self.gen_id(); self.catalog.write().create_table(&table); @@ -260,7 +261,8 @@ impl CatalogWriter for MockCatalogWriter { table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); } - self.create_materialized_view(table, graph).await?; + self.create_materialized_view(table, graph, StreamJobExecutionMode::Foreground) + .await?; Ok(()) } diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 5e4172911ba70..2e6272a79c68f 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -36,6 +36,8 @@ pub type MessageStatus = Status; pub type Notification = Result; pub type NotificationManagerRef = Arc; pub type NotificationVersion = u64; +/// NOTE(kwannoel): This is just ignored, used in background DDL +pub const IGNORED_NOTIFICATION_VERSION: u64 = 0; #[derive(Clone, Debug)] pub enum LocalNotification { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 254e1ae5eca29..42b46da61c604 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -26,7 +26,7 @@ use risingwave_pb::catalog::{ connection, Connection, Database, Function, Schema, Source, Table, View, }; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::DdlProgress; +use risingwave_pb::ddl_service::{DdlProgress, StreamJobExecutionMode}; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tokio::sync::Semaphore; use tracing::log::warn; @@ -37,6 +37,7 @@ use crate::manager::{ CatalogManagerRef, ClusterManagerRef, ConnectionId, DatabaseId, FragmentManagerRef, FunctionId, IdCategory, IndexId, LocalNotification, MetaSrvEnv, NotificationVersion, RelationIdEnum, SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, ViewId, + IGNORED_NOTIFICATION_VERSION, }; use crate::model::{StreamEnvironment, TableFragments}; use crate::rpc::cloud_provider::AwsEc2Client; @@ -92,7 +93,11 @@ pub enum DdlCommand { DropFunction(FunctionId), CreateView(View), DropView(ViewId, DropMode), - CreateStreamingJob(StreamingJob, StreamFragmentGraphProto), + CreateStreamingJob( + StreamingJob, + StreamFragmentGraphProto, + StreamJobExecutionMode, + ), DropStreamingJob(StreamingJobId, DropMode), ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), AlterRelationName(Relation, String), @@ -235,8 +240,13 @@ impl DdlController { DdlCommand::DropView(view_id, drop_mode) => { ctrl.drop_view(view_id, drop_mode).await } - DdlCommand::CreateStreamingJob(stream_job, fragment_graph) => { - ctrl.create_streaming_job(stream_job, fragment_graph).await + DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + stream_job_execution_mode, + ) => { + ctrl.create_streaming_job(stream_job, fragment_graph, stream_job_execution_mode) + .await } DdlCommand::DropStreamingJob(job_id, drop_mode) => { ctrl.drop_streaming_job(job_id, drop_mode).await @@ -404,6 +414,7 @@ impl DdlController { &self, mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, + stream_job_execution_mode: StreamJobExecutionMode, ) -> MetaResult { let _permit = self .creating_streaming_job_permits @@ -429,32 +440,76 @@ impl DdlController { internal_tables = ctx.internal_tables(); - match &stream_job { - StreamingJob::Table(Some(source), _) => { + match stream_job { + StreamingJob::Table(Some(ref source), _) => { // Register the source on the connector node. self.source_manager.register_source(source).await?; } - StreamingJob::Sink(sink) => { + StreamingJob::Sink(ref sink) => { // Validate the sink on the connector node. validate_sink(sink, self.env.connector_client()).await?; } _ => {} } - - self.stream_manager - .create_streaming_job(table_fragments, ctx) - .await?; + (ctx, table_fragments) }; - match result { - Ok(_) => self.finish_stream_job(stream_job, internal_tables).await, - Err(err) => { + let (ctx, table_fragments) = match result { + Ok(r) => r, + Err(e) => { self.cancel_stream_job(&stream_job, internal_tables).await; - Err(err) + return Err(e); + } + }; + + match stream_job_execution_mode { + StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified => { + self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables) + .await + } + StreamJobExecutionMode::Background => { + let ctrl = self.clone(); + let definition = stream_job.definition(); + let fut = async move { + let result = ctrl + .create_streaming_job_inner( + stream_job, + table_fragments, + ctx, + internal_tables, + ) + .await; + match result { + Err(e) => tracing::error!(definition, error = ?e, "stream_job_error"), + Ok(_) => { + tracing::info!(definition, "stream_job_ok") + } + } + }; + tokio::spawn(fut); + Ok(IGNORED_NOTIFICATION_VERSION) } } } + async fn create_streaming_job_inner( + &self, + stream_job: StreamingJob, + table_fragments: TableFragments, + ctx: CreateStreamingJobContext, + internal_tables: Vec, + ) -> MetaResult { + let result = self + .stream_manager + .create_streaming_job(table_fragments, ctx) + .await; + if let Err(e) = result { + self.cancel_stream_job(&stream_job, internal_tables).await; + return Err(e); + }; + self.finish_stream_job(stream_job, internal_tables).await + } + async fn drop_streaming_job( &self, job_id: StreamingJobId, diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index 8cf8f3e419a50..e22deadf947e8 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -229,7 +229,11 @@ impl DdlService for DdlServiceImpl { let version = self .ddl_controller - .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + StreamJobExecutionMode::Foreground, + )) .await?; Ok(Response::new(CreateSinkResponse { @@ -280,7 +284,11 @@ impl DdlService for DdlServiceImpl { let version = self .ddl_controller - .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + req.stream_job_execution_mode(), + )) .await?; Ok(Response::new(CreateMaterializedViewResponse { @@ -331,7 +339,11 @@ impl DdlService for DdlServiceImpl { let version = self .ddl_controller - .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + StreamJobExecutionMode::Foreground, + )) .await?; Ok(Response::new(CreateIndexResponse { @@ -423,7 +435,11 @@ impl DdlService for DdlServiceImpl { let version = self .ddl_controller - .run_command(DdlCommand::CreateStreamingJob(stream_job, fragment_graph)) + .run_command(DdlCommand::CreateStreamingJob( + stream_job, + fragment_graph, + StreamJobExecutionMode::Foreground, + )) .await?; Ok(Response::new(CreateTableResponse { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f3d9f8ad12fd2..50cab50c92682 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -332,10 +332,12 @@ impl MetaClient { &self, table: PbTable, graph: StreamFragmentGraph, + stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<(TableId, CatalogVersion)> { let request = CreateMaterializedViewRequest { materialized_view: Some(table), fragment_graph: Some(graph), + stream_job_execution_mode: stream_job_execution_mode as i32, }; let resp = self.inner.create_materialized_view(request).await?; // TODO: handle error in `resp.status` here