From 31fdc265c0ab7ed22fc081e71c5af3c3d88f82a1 Mon Sep 17 00:00:00 2001
From: Xu
Date: Fri, 15 Sep 2023 21:01:14 -0400
Subject: [PATCH 1/7] feat(expr): switch to `fancy-regex` crate & update the
 original version (#12329)

Co-authored-by: xzhseh
---
 Cargo.lock                                    | 21 +++++++++++++
 e2e_test/batch/basic/func.slt.part            | 24 +++++++++++++++
 src/expr/Cargo.toml                           |  1 +
 src/expr/src/table_function/regexp_matches.rs |  1 +
 src/expr/src/vector_op/regexp.rs              | 30 ++++++++++++-------
 src/workspace-hack/Cargo.toml                 |  2 ++
 6 files changed, 68 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9e73822d5a77d..b6a0183a9f564 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1160,6 +1160,15 @@ dependencies = [
  "shlex",
 ]
 
+[[package]]
+name = "bit-set"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1"
+dependencies = [
+ "bit-vec",
+]
+
 [[package]]
 name = "bit-vec"
 version = "0.6.3"
@@ -2668,6 +2677,16 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
 
+[[package]]
+name = "fancy-regex"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2"
+dependencies = [
+ "bit-set",
+ "regex",
+]
+
 [[package]]
 name = "faster-hex"
 version = "0.8.1"
@@ -6967,6 +6986,7 @@ dependencies = [
  "easy-ext",
  "either",
  "expect-test",
+ "fancy-regex",
  "futures",
  "futures-async-stream",
  "futures-util",
@@ -10031,6 +10051,7 @@ dependencies = [
  "aws-smithy-client",
  "axum",
  "base64 0.21.3",
+ "bit-vec",
  "bitflags 2.4.0",
  "byteorder",
  "bytes",
diff --git a/e2e_test/batch/basic/func.slt.part b/e2e_test/batch/basic/func.slt.part
index 22c07c7dd2b45..ec60f6512406d 100644
--- a/e2e_test/batch/basic/func.slt.part
+++ b/e2e_test/batch/basic/func.slt.part
@@ -570,6 +570,30 @@ select regexp_replace('💩💩💩💩💩foo🤔️bar亲爱的😭baz这不
 ----
 💩💩💩💩💩foo🤔️bar亲爱的😭这是🥵爱情❤️‍🔥
 
+# Positive Lookahead
+query T
+select regexp_replace('foobarbaz', 'a(?=r)', 'X');
+----
+foobXrbaz
+
+# Negative Lookahead
+query T
+select regexp_replace('chocolate', 'o(?!c)', 'X');
+----
+chocXlate
+
+# Positive Lookbehind
+query T
+select regexp_replace('foobarXaz', '(?<=X)a', 'X');
+----
+foobarXXz
+
+# Negative Lookbehind
+query T
+select regexp_replace('foobarXaz', '(?<!X)a', 'X');
+----
+foobXrXaz
diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml
--- a/src/expr/Cargo.toml
+++ b/src/expr/Cargo.toml
@@ ... @@
+fancy-regex = "0.11"
diff --git a/src/expr/src/table_function/regexp_matches.rs b/src/expr/src/table_function/regexp_matches.rs
--- a/src/expr/src/table_function/regexp_matches.rs
+++ b/src/expr/src/table_function/regexp_matches.rs
@@ ... @@
        // If there are multiple captures, then the first one is the whole match, and should be
        // ignored in PostgreSQL's behavior.
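        // (fancy-regex wraps captures in a `Result` because matching itself can now
        // fail at run time, e.g. when a backtracking limit is hit; that is why an
        // extra `.unwrap()` appears in the capture chain below.)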
let skip_flag = regex.regex.captures_len() > 1; let list = capture + .unwrap() .iter() .skip(if skip_flag { 1 } else { 0 }) .map(|mat| mat.map(|m| m.as_str().into())) diff --git a/src/expr/src/vector_op/regexp.rs b/src/expr/src/vector_op/regexp.rs index 9cf713551cff4..0962d0bf16c4f 100644 --- a/src/expr/src/vector_op/regexp.rs +++ b/src/expr/src/vector_op/regexp.rs @@ -16,7 +16,7 @@ use std::str::FromStr; -use regex::{Regex, RegexBuilder}; +use fancy_regex::{Regex, RegexBuilder}; use risingwave_common::array::ListValue; use risingwave_expr_macro::function; @@ -32,10 +32,17 @@ pub struct RegexpContext { impl RegexpContext { fn new(pattern: &str, flags: &str, replacement: &str) -> Result { let options = RegexpOptions::from_str(flags)?; + + let origin = if options.case_insensitive { + format!("(?i:{})", pattern) + } else { + pattern.to_string() + }; + Ok(Self { - regex: RegexBuilder::new(pattern) - .case_insensitive(options.case_insensitive) - .build()?, + regex: RegexBuilder::new(&origin) + .build() + .map_err(|e| ExprError::Parse(e.to_string().into()))?, global: options.global, replacement: make_replacement(replacement), }) @@ -142,7 +149,7 @@ fn regexp_match(text: &str, regex: &RegexpContext) -> Option { // If there are multiple captures, then the first one is the whole match, and should be // ignored in PostgreSQL's behavior. let skip_first = regex.regex.captures_len() > 1; - let capture = regex.regex.captures(text)?; + let capture = regex.regex.captures(text).unwrap()?; let list = capture .iter() .skip(if skip_first { 1 } else { 0 }) @@ -190,7 +197,7 @@ fn regexp_count(text: &str, start: i32, regex: &RegexpContext) -> Result { }; let mut count = 0; - while let Some(captures) = regex.regex.captures(&text[start..]) { + while let Ok(Some(captures)) = regex.regex.captures(&text[start..]) { count += 1; start += captures.get(0).unwrap().end(); } @@ -297,7 +304,7 @@ fn regexp_replace( let mut ret = text[..search_start].to_string(); // Begin the actual replace logic - while let Some(capture) = ctx.regex.captures(&text[search_start..]) { + while let Ok(Some(capture)) = ctx.regex.captures(&text[search_start..]) { let match_start = capture.get(0).unwrap().start(); let match_end = capture.get(0).unwrap().end(); @@ -344,7 +351,7 @@ fn regexp_replace( let mut count = 1; // The absolute index for the start of searching let mut search_start = start; - while let Some(capture) = ctx.regex.captures(&text[search_start..]) { + while let Ok(Some(capture)) = ctx.regex.captures(&text[search_start..]) { // Get the current start & end index let match_start = capture.get(0).unwrap().start(); let match_end = capture.get(0).unwrap().end(); @@ -378,7 +385,7 @@ fn regexp_replace( if let Some(n) = n { // Replace only the N-th match let mut count = 1; - while let Some(capture) = ctx.regex.captures(&text[start..]) { + while let Ok(Some(capture)) = ctx.regex.captures(&text[start..]) { if count == n { // We've reached the pattern to replace let match_start = capture.get(0).unwrap().start(); @@ -406,12 +413,13 @@ fn regexp_replace( } } else { // `N` is not specified - if ctx.regex.captures(&text[start..]).is_none() { + if let Ok(None) = ctx.regex.captures(&text[start..]) { // No match return Ok(text.into()); } + // Otherwise replace the source text - if let Some(capture) = ctx.regex.captures(&text[start..]) { + if let Ok(Some(capture)) = ctx.regex.captures(&text[start..]) { let match_start = capture.get(0).unwrap().start(); let match_end = capture.get(0).unwrap().end(); diff --git a/src/workspace-hack/Cargo.toml 
b/src/workspace-hack/Cargo.toml
index 63cf30bf854df..e4f7f52a66ca3 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -24,6 +24,7 @@ aws-credential-types = { version = "0.55", default-features = false, features =
 aws-sdk-s3 = { version = "0.28", features = ["native-tls"] }
 aws-smithy-client = { version = "0.55", default-features = false, features = ["native-tls", "rustls"] }
 base64 = { version = "0.21", features = ["alloc"] }
+bit-vec = { version = "0.6" }
 bitflags = { version = "2", default-features = false, features = ["std"] }
 byteorder = { version = "1", features = ["i128"] }
 bytes = { version = "1", features = ["serde"] }
@@ -121,6 +122,7 @@ aws-credential-types = { version = "0.55", default-features = false, features =
 aws-sdk-s3 = { version = "0.28", features = ["native-tls"] }
 aws-smithy-client = { version = "0.55", default-features = false, features = ["native-tls", "rustls"] }
 base64 = { version = "0.21", features = ["alloc"] }
+bit-vec = { version = "0.6" }
 bitflags = { version = "2", default-features = false, features = ["std"] }
 byteorder = { version = "1", features = ["i128"] }
 bytes = { version = "1", features = ["serde"] }

From 8ef74ad37194848ab7c75c7d39d503ad1d64025f Mon Sep 17 00:00:00 2001
From: Runji Wang
Date: Sat, 16 Sep 2023 12:16:02 +0800
Subject: [PATCH 2/7] fix(udf): handle visibility of input chunks in UDTF
 (#12357)

Signed-off-by: Runji Wang
---
 e2e_test/udf/udf.slt                        | 29 ++++++++++++++++
 src/common/src/array/arrow.rs               |  6 ++--
 src/common/src/array/data_chunk.rs          | 22 ++++++++++++
 src/expr/src/table_function/mod.rs          |  4 +--
 src/expr/src/table_function/user_defined.rs | 38 ++++++++++++++++-----
 5 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/e2e_test/udf/udf.slt b/e2e_test/udf/udf.slt
index 33579a825832e..110b1c0f373dd 100644
--- a/e2e_test/udf/udf.slt
+++ b/e2e_test/udf/udf.slt
@@ -224,6 +224,35 @@ select (extract_tcp_info(E'\\x45000034a8a8400040065b8ac0a8000ec0a80001035d20b6d9
 ----
 192.168.0.14 192.168.0.1 861 8374
 
+# streaming
+# to ensure UDF & UDTF respect visibility
+
+statement ok
+create table t (x int);
+
+statement ok
+create materialized view mv as select gcd(x, x), series(x) from t where x <> 2;
+
+statement ok
+insert into t values (1), (2), (3);
+
+statement ok
+flush;
+
+query II
+select * from mv;
+----
+1 0
+3 0
+3 1
+3 2
+
+statement ok
+drop materialized view mv;
+
+statement ok
+drop table t;
+
 # error handling
 
 statement error
diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs
index 9b4165b608d98..0f89e6b4f53f4 100644
--- a/src/common/src/array/arrow.rs
+++ b/src/common/src/array/arrow.rs
@@ -27,6 +27,7 @@ use crate::util::iter_util::ZipEqDebug;
 
 // Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`.
+// note: DataChunk -> arrow RecordBatch will IGNORE the visibilities.
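+// (If the visibility must be applied, compact the chunk first, e.g. with the
+// `compact_cow` helper added below, before converting; the explicit row count
+// passed via `RecordBatchOptions` keeps even zero-column batches well-formed.)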
impl TryFrom<&DataChunk> for arrow_array::RecordBatch { type Error = ArrayError; @@ -47,8 +48,9 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch { .collect(); let schema = Arc::new(Schema::new(fields)); - - arrow_array::RecordBatch::try_new(schema, columns) + let opts = + arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity())); + arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts) .map_err(|err| ArrayError::ToArrow(err.to_string())) } } diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index cc4bef12cccff..657dfd3c366f9 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::fmt::Display; use std::hash::BuildHasher; use std::sync::Arc; @@ -265,6 +266,27 @@ impl DataChunk { } } + /// Convert the chunk to compact format. + /// + /// If the chunk is not compacted, return a new compacted chunk, otherwise return a reference to self. + pub fn compact_cow(&self) -> Cow<'_, Self> { + match &self.vis2 { + Vis::Compact(_) => Cow::Borrowed(self), + Vis::Bitmap(visibility) => { + let cardinality = visibility.count_ones(); + let columns = self + .columns + .iter() + .map(|col| { + let array = col; + array.compact(visibility, cardinality).into() + }) + .collect::>(); + Cow::Owned(Self::new(columns, cardinality)) + } + } + } + pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult { let mut columns = vec![]; for any_col in proto.get_columns() { diff --git a/src/expr/src/table_function/mod.rs b/src/expr/src/table_function/mod.rs index 23453d9f7b956..245eebd7f3720 100644 --- a/src/expr/src/table_function/mod.rs +++ b/src/expr/src/table_function/mod.rs @@ -51,7 +51,7 @@ pub trait TableFunction: std::fmt::Debug + Sync + Send { /// # Contract of the output /// /// The returned `DataChunk` contains exact two columns: - /// - The first column is an I32Array containing row indexes of input chunk. It should be + /// - The first column is an I32Array containing row indices of input chunk. It should be /// monotonically increasing. /// - The second column is the output values. The data type of the column is `return_type`. /// @@ -82,7 +82,7 @@ pub trait TableFunction: std::fmt::Debug + Sync + Send { /// (You don't need to understand this section to implement a `TableFunction`) /// /// The output of the `TableFunction` is different from the output of the `ProjectSet` executor. - /// `ProjectSet` executor uses the row indexes to stitch multiple table functions and produces + /// `ProjectSet` executor uses the row indices to stitch multiple table functions and produces /// `projected_row_id`. 
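    /// (With the visibility fix in this patch, a UDTF is fed a *compacted* chunk,
    /// so the indices it returns are remapped through the original visibility
    /// bitmap before `ProjectSet` ever sees them; see `user_defined.rs` below.)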
/// /// ## Example diff --git a/src/expr/src/table_function/user_defined.rs b/src/expr/src/table_function/user_defined.rs index 7b0385854c544..813cf23504482 100644 --- a/src/expr/src/table_function/user_defined.rs +++ b/src/expr/src/table_function/user_defined.rs @@ -14,9 +14,10 @@ use std::sync::Arc; +use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; use futures_util::stream; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{DataChunk, I32Array}; use risingwave_common::bail; use risingwave_udf::ArrowFlightUdfClient; @@ -25,6 +26,7 @@ use super::*; #[derive(Debug)] pub struct UserDefinedTableFunction { children: Vec, + #[allow(dead_code)] arg_schema: SchemaRef, return_type: DataType, client: Arc, @@ -49,25 +51,42 @@ impl TableFunction for UserDefinedTableFunction { impl UserDefinedTableFunction { #[try_stream(boxed, ok = DataChunk, error = ExprError)] async fn eval_inner<'a>(&'a self, input: &'a DataChunk) { + // evaluate children expressions let mut columns = Vec::with_capacity(self.children.len()); for c in &self.children { - let val = c.eval_checked(input).await?.as_ref().try_into()?; + let val = c.eval_checked(input).await?; columns.push(val); } + let direct_input = DataChunk::new(columns, input.vis().clone()); + + // compact the input chunk and record the row mapping + let visible_rows = direct_input.vis().iter_ones().collect_vec(); + let compacted_input = direct_input.compact_cow(); + let arrow_input = RecordBatch::try_from(compacted_input.as_ref())?; - let opts = - arrow_array::RecordBatchOptions::default().with_row_count(Some(input.cardinality())); - let input = - arrow_array::RecordBatch::try_new_with_options(self.arg_schema.clone(), columns, &opts) - .expect("failed to build record batch"); + // call UDTF #[for_await] for res in self .client - .call_stream(&self.identifier, stream::once(async { input })) + .call_stream(&self.identifier, stream::once(async { arrow_input })) .await? 
{ let output = DataChunk::try_from(&res?)?; self.check_output(&output)?; + + // we send the compacted input to UDF, so we need to map the row indices back to the original input + let origin_indices = output + .column_at(0) + .as_int32() + .raw_iter() + // we have checked all indices are non-negative + .map(|idx| visible_rows[idx as usize] as i32) + .collect::(); + + let output = DataChunk::new( + vec![origin_indices.into_ref(), output.column_at(1).clone()], + output.vis().clone(), + ); yield output; } } @@ -87,6 +106,9 @@ impl UserDefinedTableFunction { DataType::Int32, ); } + if output.column_at(0).as_int32().raw_iter().any(|i| i < 0) { + bail!("UDF returned negative row index"); + } if !output .column_at(1) .data_type() From a975d9346333c53f34c6f73793fb718076177d7b Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Sun, 17 Sep 2023 19:04:24 +0800 Subject: [PATCH 3/7] fix: handle kafka sink message timeout error (#12350) --- src/connector/src/sink/kafka.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 72234bcbbb907..34403c4e44ad4 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -434,7 +434,14 @@ impl KafkaSinkWriter { Err((e, rec)) => { record = rec; match e { - KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => { + err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) + | err @ KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut) => { + tracing::warn!( + "producing message (key {:?}) to topic {} failed, err {:?}, retrying", + record.key.map(|k| k.to_bytes()), + record.topic, + err + ); tokio::time::sleep(self.config.retry_interval).await; continue; } From f304ed214295b26fd068ef4e408524ef095a9c7f Mon Sep 17 00:00:00 2001 From: xxchan Date: Sun, 17 Sep 2023 20:20:17 +0800 Subject: [PATCH 4/7] revert: Revert "chore: add platforms to hakari (#12333)" (#12363) --- .config/hakari.toml | 8 +- Cargo.lock | 14 ---- src/workspace-hack/Cargo.toml | 144 ---------------------------------- 3 files changed, 4 insertions(+), 162 deletions(-) diff --git a/.config/hakari.toml b/.config/hakari.toml index e3e58e80cbf16..fd5ad4473184f 100644 --- a/.config/hakari.toml +++ b/.config/hakari.toml @@ -15,10 +15,10 @@ resolver = "2" # Add triples corresponding to platforms commonly used by developers here. # https://doc.rust-lang.org/rustc/platform-support.html platforms = [ - "x86_64-unknown-linux-gnu", - "aarch64-unknown-linux-gnu", - "x86_64-apple-darwin", - "aarch64-apple-darwin", + # "x86_64-unknown-linux-gnu", + # "aarch64-unknown-linux-gnu", + # "x86_64-apple-darwin", + # "aarch64-apple-darwin", ] # Write out exact versions rather than a semver range. (Defaults to false.) 
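(For reference: the workspace-hack manifest below is generated, not hand-edited.
After flipping the platform list above, it would typically be refreshed with
cargo-hakari, assuming that tool is installed:

    cargo hakari generate   # regenerate src/workspace-hack/Cargo.toml
    cargo hakari verify     # check the generated file is up to date

which is what removes the per-target sections in the later hunks of this patch.)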
diff --git a/Cargo.lock b/Cargo.lock index b6a0183a9f564..6cb0662b32155 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10049,7 +10049,6 @@ dependencies = [ "aws-credential-types", "aws-sdk-s3", "aws-smithy-client", - "axum", "base64 0.21.3", "bit-vec", "bitflags 2.4.0", @@ -10092,18 +10091,12 @@ dependencies = [ "log", "madsim-rdkafka", "madsim-tokio", - "memchr", - "mime_guess", - "miniz_oxide", "mio", "multimap", "nom", "num-bigint", "num-integer", "num-traits", - "once_cell", - "openssl-sys", - "opentelemetry", "opentelemetry_api", "opentelemetry_sdk", "parking_lot 0.12.1", @@ -10118,15 +10111,12 @@ dependencies = [ "rand", "rand_chacha", "rand_core 0.6.4", - "rdkafka-sys", "regex", "regex-automata 0.3.8", "regex-syntax 0.7.5", "reqwest", "ring", "rust_decimal", - "rustix 0.38.11", - "rustls 0.21.7", "scopeguard", "serde", "serde_json", @@ -10135,8 +10125,6 @@ dependencies = [ "subtle", "syn 1.0.109", "syn 2.0.33", - "tikv-jemalloc-sys", - "tikv-jemallocator", "time", "time-macros", "tinyvec", @@ -10148,7 +10136,6 @@ dependencies = [ "toml_edit", "tonic", "tower", - "tower-http", "tracing", "tracing-core", "tracing-subscriber", @@ -10157,7 +10144,6 @@ dependencies = [ "url", "uuid", "zeroize", - "zstd-sys", ] [[package]] diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index e4f7f52a66ca3..603619f3a8b27 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -217,148 +217,4 @@ url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } zeroize = { version = "1", features = ["zeroize_derive"] } -[target.x86_64-unknown-linux-gnu.dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.x86_64-unknown-linux-gnu.build-dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", 
"trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.aarch64-unknown-linux-gnu.dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.aarch64-unknown-linux-gnu.build-dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = 
["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.x86_64-apple-darwin.dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.x86_64-apple-darwin.build-dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.aarch64-apple-darwin.dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = 
["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - -[target.aarch64-apple-darwin.build-dependencies] -axum = { version = "0.6" } -memchr = { version = "2" } -mime_guess = { version = "2" } -miniz_oxide = { version = "0.7", default-features = false, features = ["with-alloc"] } -once_cell = { version = "1", features = ["unstable"] } -openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry = { version = "0.20", default-features = false, features = ["metrics", "rt-tokio", "trace"] } -opentelemetry_sdk = { version = "0.20", default-features = false, features = ["rt-tokio"] } -rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8ea07c4", default-features = false, features = ["cmake-build", "gssapi", "libz", "ssl-vendored", "zstd"] } -rustix = { version = "0.38", features = ["fs", "termios"] } -rustls = { version = "0.21", features = ["dangerous_configuration"] } -serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } -tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } -zstd-sys = { version = "2", features = ["std"] } - ### END HAKARI SECTION From 1877aed241f9db69468fb7122bf8540ffc7b4dd9 Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Mon, 18 Sep 2023 13:49:15 +0800 Subject: [PATCH 5/7] refactor(sink): impl SinkFormatter for AppendOnly and Upsert (#12321) --- src/connector/src/lib.rs | 1 + .../src/sink/formatter/append_only.rs | 55 +++++++++++++ src/connector/src/sink/formatter/mod.rs | 50 ++++++++++++ src/connector/src/sink/formatter/upsert.rs | 61 ++++++++++++++ src/connector/src/sink/kafka.rs | 72 +++++++---------- src/connector/src/sink/kinesis.rs | 81 +++++++++---------- src/connector/src/sink/mod.rs | 41 ++++++++++ src/connector/src/sink/utils.rs | 48 ----------- 8 files changed, 274 insertions(+), 135 deletions(-) create mode 100644 src/connector/src/sink/formatter/append_only.rs create mode 100644 src/connector/src/sink/formatter/mod.rs create mode 
100644 src/connector/src/sink/formatter/upsert.rs diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 75a895a5f80cd..f103346bf14fa 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -30,6 +30,7 @@ #![feature(async_fn_in_trait)] #![feature(associated_type_defaults)] #![feature(impl_trait_in_assoc_type)] +#![feature(iter_from_generator)] use std::time::Duration; diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs new file mode 100644 index 0000000000000..ba7d018cd7fbc --- /dev/null +++ b/src/connector/src/sink/formatter/append_only.rs @@ -0,0 +1,55 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::Op; + +use super::{Result, SinkFormatter, StreamChunk}; +use crate::sink::encoder::RowEncoder; +use crate::tri; + +pub struct AppendOnlyFormatter { + key_encoder: KE, + val_encoder: VE, +} + +impl AppendOnlyFormatter { + pub fn new(key_encoder: KE, val_encoder: VE) -> Self { + Self { + key_encoder, + val_encoder, + } + } +} + +impl SinkFormatter for AppendOnlyFormatter { + type K = KE::Output; + type V = VE::Output; + + fn format_chunk( + &self, + chunk: &StreamChunk, + ) -> impl Iterator, Option)>> { + std::iter::from_generator(|| { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + let event_object = Some(tri!(self.val_encoder.encode(row))); + + yield Ok((event_key_object, event_object)) + } + }) + } +} diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs new file mode 100644 index 0000000000000..432e4d33e0f2b --- /dev/null +++ b/src/connector/src/sink/formatter/mod.rs @@ -0,0 +1,50 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::StreamChunk; + +use crate::sink::Result; + +mod append_only; +mod upsert; + +pub use append_only::AppendOnlyFormatter; +pub use upsert::UpsertFormatter; + +/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format, +/// for example append-only, upsert or debezium. +pub trait SinkFormatter { + type K; + type V; + + fn format_chunk( + &self, + chunk: &StreamChunk, + ) -> impl Iterator, Option)>>; +} + +/// `tri!` in generators yield `Err` and return `()` +/// `?` in generators return `Err` +#[macro_export] +macro_rules! 
tri { + ($expr:expr) => { + match $expr { + Ok(val) => val, + Err(err) => { + yield Err(err); + return; + } + } + }; +} diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs new file mode 100644 index 0000000000000..6ef2b5f2ca333 --- /dev/null +++ b/src/connector/src/sink/formatter/upsert.rs @@ -0,0 +1,61 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::array::Op; + +use super::{Result, SinkFormatter, StreamChunk}; +use crate::sink::encoder::RowEncoder; +use crate::tri; + +pub struct UpsertFormatter { + key_encoder: KE, + val_encoder: VE, +} + +impl UpsertFormatter { + pub fn new(key_encoder: KE, val_encoder: VE) -> Self { + Self { + key_encoder, + val_encoder, + } + } +} + +impl SinkFormatter for UpsertFormatter { + type K = KE::Output; + type V = VE::Output; + + fn format_chunk( + &self, + chunk: &StreamChunk, + ) -> impl Iterator, Option)>> { + std::iter::from_generator(|| { + for (op, row) in chunk.rows() { + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + + let event_object = match op { + Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), + // Empty value with a key + Op::Delete => None, + Op::UpdateDelete => { + // upsert semantic does not require update delete event + continue; + } + }; + + yield Ok((event_key_object, event_object)) + } + }) + } +} diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 34403c4e44ad4..378c4ee8d930a 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -30,19 +30,16 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; use serde_derive::{Deserialize, Serialize}; -use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; use super::encoder::{JsonEncoder, TimestampHandlingMode}; +use super::formatter::{AppendOnlyFormatter, UpsertFormatter}; use super::{ - Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::common::KafkaCommon; -use crate::sink::utils::{ - gen_append_only_message_stream, gen_debezium_message_stream, gen_upsert_message_stream, - AppendOnlyAdapterOpts, DebeziumAdapterOpts, UpsertAdapterOpts, -}; +use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts}; use crate::sink::{ DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter, }; @@ -460,20 +457,20 @@ impl KafkaSinkWriter { ret } - async fn write_json_objects( + async fn write_inner( &mut self, - event_key_object: Option, - event_object: Option, + event_key_object: Option>, + event_object: Option>, ) -> Result<()> { let topic = self.config.common.topic.clone(); // here we assume the key part always exists and value part is optional. 
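        // (an absent value is how `UpsertFormatter` encodes a delete, i.e. a Kafka tombstone)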
// if value is None, we will skip the payload part. - let key_str = event_key_object.unwrap().to_string(); - let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes()); + let key_str = event_key_object.unwrap(); + let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(&key_str); let payload; if let Some(value) = event_object { - payload = value.to_string(); - record = record.payload(payload.as_bytes()); + payload = value; + record = record.payload(&payload); } // Send the data but not wait it to finish sinking // Will join all `DeliveryFuture` during commit @@ -544,8 +541,11 @@ impl KafkaSinkWriter { #[for_await] for msg in dbz_stream { let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object) - .await?; + self.write_inner( + event_key_object.map(|j| j.to_string().into_bytes()), + event_object.map(|j| j.to_string().into_bytes()), + ) + .await?; } Ok(()) } @@ -559,20 +559,9 @@ impl KafkaSinkWriter { let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli); // Initialize the upsert_stream - let upsert_stream = gen_upsert_message_stream( - chunk, - UpsertAdapterOpts::default(), - key_encoder, - val_encoder, - ); + let f = UpsertFormatter::new(key_encoder, val_encoder); - #[for_await] - for msg in upsert_stream { - let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object) - .await?; - } - Ok(()) + self.write_chunk(chunk, f).await } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { @@ -584,20 +573,18 @@ impl KafkaSinkWriter { let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli); // Initialize the append_only_stream - let append_only_stream = gen_append_only_message_stream( - chunk, - AppendOnlyAdapterOpts::default(), - key_encoder, - val_encoder, - ); + let f = AppendOnlyFormatter::new(key_encoder, val_encoder); - #[for_await] - for msg in append_only_stream { - let (event_key_object, event_object) = msg?; - self.write_json_objects(event_key_object, event_object) - .await?; - } - Ok(()) + self.write_chunk(chunk, f).await + } +} + +impl FormattedSink for KafkaSinkWriter { + type K = Vec; + type V = Vec; + + async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { + self.write_inner(k, v).await } } @@ -652,6 +639,7 @@ mod test { use risingwave_common::catalog::Field; use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::DataType; + use serde_json::Value; use super::*; use crate::sink::utils::*; diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index b73b48771c2bd..e319984de8a34 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -29,13 +29,11 @@ use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; -use super::SinkParam; +use super::formatter::{AppendOnlyFormatter, UpsertFormatter}; +use super::{FormattedSink, SinkParam}; use crate::common::KinesisCommon; use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; -use crate::sink::utils::{ - gen_append_only_message_stream, gen_debezium_message_stream, gen_upsert_message_stream, - AppendOnlyAdapterOpts, DebeziumAdapterOpts, UpsertAdapterOpts, -}; +use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, 
SINK_TYPE_UPSERT, @@ -173,7 +171,8 @@ impl KinesisSinkWriter { }) } - async fn put_record(&self, key: &str, payload: Blob) -> Result { + async fn put_record(&self, key: &str, payload: Vec) -> Result { + let payload = Blob::new(payload); // todo: switch to put_records() for batching Retry::spawn( ExponentialBackoff::from_millis(100).map(jitter).take(3), @@ -202,40 +201,28 @@ impl KinesisSinkWriter { }) } - async fn upsert(&self, chunk: StreamChunk) -> Result<()> { + async fn upsert(mut self: &Self, chunk: StreamChunk) -> Result<()> { let key_encoder = JsonEncoder::new( &self.schema, Some(&self.pk_indices), TimestampHandlingMode::Milli, ); let val_encoder = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::Milli); - let upsert_stream = gen_upsert_message_stream( - chunk, - UpsertAdapterOpts::default(), - key_encoder, - val_encoder, - ); + let f = UpsertFormatter::new(key_encoder, val_encoder); - crate::impl_load_stream_write_record!(upsert_stream, self.put_record); - Ok(()) + self.write_chunk(chunk, f).await } - async fn append_only(&self, chunk: StreamChunk) -> Result<()> { + async fn append_only(mut self: &Self, chunk: StreamChunk) -> Result<()> { let key_encoder = JsonEncoder::new( &self.schema, Some(&self.pk_indices), TimestampHandlingMode::Milli, ); let val_encoder = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::Milli); - let append_only_stream = gen_append_only_message_stream( - chunk, - AppendOnlyAdapterOpts::default(), - key_encoder, - val_encoder, - ); + let f = AppendOnlyFormatter::new(key_encoder, val_encoder); - crate::impl_load_stream_write_record!(append_only_stream, self.put_record); - Ok(()) + self.write_chunk(chunk, f).await } async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> { @@ -249,12 +236,36 @@ impl KinesisSinkWriter { &self.sink_from_name, ); - crate::impl_load_stream_write_record!(dbz_stream, self.put_record); + #[for_await] + for msg in dbz_stream { + let (event_key_object, event_object) = msg?; + let key_str = event_key_object.unwrap().to_string(); + self.put_record( + &key_str, + if let Some(value) = event_object { + value.to_string().into_bytes() + } else { + vec![] + }, + ) + .await?; + } Ok(()) } } +impl FormattedSink for &KinesisSinkWriter { + type K = String; + type V = Vec; + + async fn write_one(&mut self, k: Option, v: Option) -> Result<()> { + self.put_record(&k.unwrap(), v.unwrap_or_default()) + .await + .map(|_| ()) + } +} + #[async_trait::async_trait] impl SinkWriter for KinesisSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { @@ -285,23 +296,3 @@ impl SinkWriter for KinesisSinkWriter { Ok(()) } } - -#[macro_export] -macro_rules! 
impl_load_stream_write_record { - ($stream:ident, $op_fn:stmt) => { - #[for_await] - for msg in $stream { - let (event_key_object, event_object) = msg?; - let key_str = event_key_object.unwrap().to_string(); - $op_fn( - &key_str, - Blob::new(if let Some(value) = event_object { - value.to_string().into_bytes() - } else { - vec![] - }), - ) - .await?; - } - }; -} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 3e785edc7a9e9..b8fba46813199 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -17,6 +17,7 @@ pub mod catalog; pub mod clickhouse; pub mod coordinate; pub mod encoder; +pub mod formatter; pub mod iceberg; pub mod kafka; pub mod kinesis; @@ -47,6 +48,8 @@ pub use tracing; use self::catalog::SinkType; use self::clickhouse::{ClickHouseConfig, ClickHouseSink}; +use self::encoder::SerTo; +use self::formatter::SinkFormatter; use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK}; use crate::sink::boxed::BoxSink; use crate::sink::catalog::{SinkCatalog, SinkId}; @@ -202,6 +205,44 @@ pub trait SinkWriterV1: Send + 'static { async fn abort(&mut self) -> Result<()>; } +/// A free-form sink that may output in multiple formats and encodings. Examples include kafka, +/// kinesis, nats and redis. +/// +/// The implementor specifies required key & value type (likely string or bytes), as well as how to +/// write a single pair. The provided `write_chunk` method would handle the interaction with a +/// `SinkFormatter`. +/// +/// Currently kafka takes `&mut self` while kinesis takes `&self`. So we use `&mut self` in trait +/// but implement it for `&Kinesis`. This allows us to hold `&mut &Kinesis` and `&Kinesis` +/// simultaneously, preventing the schema clone issue propagating from kafka to kinesis. 
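+///
+/// A minimal usage sketch (names from this patch): an implementor picks
+/// `type K = Vec<u8>; type V = Vec<u8>;` and supplies `write_one`; a sink then
+/// calls `self.write_chunk(chunk, UpsertFormatter::new(key_encoder, val_encoder))`.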
+pub trait FormattedSink { + type K; + type V; + async fn write_one(&mut self, k: Option, v: Option) -> Result<()>; + + async fn write_chunk( + &mut self, + chunk: StreamChunk, + formatter: F, + ) -> Result<()> + where + F::K: SerTo, + F::V: SerTo, + { + for r in formatter.format_chunk(&chunk) { + let (event_key_object, event_object) = r?; + + self.write_one( + event_key_object.map(SerTo::ser_to).transpose()?, + event_object.map(SerTo::ser_to).transpose()?, + ) + .await?; + } + + Ok(()) + } +} + pub struct SinkWriterV1Adapter { is_empty: bool, epoch: u64, diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs index cb3023b9e4f9b..e1bae62c2b5c2 100644 --- a/src/connector/src/sink/utils.rs +++ b/src/connector/src/sink/utils.rs @@ -249,51 +249,3 @@ pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result> Ok(records) } - -#[derive(Debug, Clone, Default)] -pub struct UpsertAdapterOpts {} - -#[try_stream(ok = (Option, Option), error = SinkError)] -pub async fn gen_upsert_message_stream<'a>( - chunk: StreamChunk, - _opts: UpsertAdapterOpts, - key_encoder: JsonEncoder<'a>, - val_encoder: JsonEncoder<'a>, -) { - for (op, row) in chunk.rows() { - let event_key_object = Some(Value::Object(key_encoder.encode(row)?)); - - let event_object = match op { - Op::Insert => Some(Value::Object(val_encoder.encode(row)?)), - Op::Delete => Some(Value::Null), - Op::UpdateDelete => { - // upsert semantic does not require update delete event - continue; - } - Op::UpdateInsert => Some(Value::Object(val_encoder.encode(row)?)), - }; - - yield (event_key_object, event_object); - } -} - -#[derive(Debug, Clone, Default)] -pub struct AppendOnlyAdapterOpts {} - -#[try_stream(ok = (Option, Option), error = SinkError)] -pub async fn gen_append_only_message_stream<'a>( - chunk: StreamChunk, - _opts: AppendOnlyAdapterOpts, - key_encoder: JsonEncoder<'a>, - val_encoder: JsonEncoder<'a>, -) { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let event_key_object = Some(Value::Object(key_encoder.encode(row)?)); - let event_object = Some(Value::Object(val_encoder.encode(row)?)); - - yield (event_key_object, event_object); - } -} From 711ecd5c06ded04a13d8e8753a961c73453804c8 Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Mon, 18 Sep 2023 14:11:24 +0800 Subject: [PATCH 6/7] feat(state_table): add iterator sub range under a certain pk prefix (#12251) --- src/stream/src/common/table/state_table.rs | 94 +++++++-- .../src/common/table/test_state_table.rs | 182 ++++++++++++++++++ 2 files changed, 261 insertions(+), 15 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index be3c2620ddfe9..045fb1fdaeba9 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -17,6 +17,7 @@ use std::ops::Bound::*; use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; +use either::Either; use futures::{pin_mut, FutureExt, Stream, StreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; @@ -1195,6 +1196,27 @@ where .await } + /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same + /// `vnode`. 
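+    /// For example, with `pk_prefix = (1,)` and `sub_range = (Included((11,)), Excluded((33,)))`,
+    /// this yields exactly the rows whose key starts with `1` and whose next pk column
+    /// falls in `[11, 33)`, as exercised by the new test below.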
+ pub async fn iter_row_with_pk_prefix_sub_range( + &self, + pk_prefix: impl Row, + sub_range: &(Bound, Bound), + prefetch_options: PrefetchOptions, + ) -> StreamExecutorResult> { + let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes(); + + let memcomparable_range = + prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix); + + let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode); + Ok(deserialize_keyed_row_stream( + self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options) + .await?, + &self.row_serde, + )) + } + /// This function scans raw key-values from the relational table with specific `pk_range` under /// the same `vnode`. async fn iter_kv_with_pk_range( @@ -1297,15 +1319,38 @@ pub fn prefix_range_to_memcomparable( range: &(Bound, Bound), ) -> (Bound, Bound) { ( - to_memcomparable(pk_serde, &range.0, false), - to_memcomparable(pk_serde, &range.1, true), + start_range_to_memcomparable(pk_serde, &range.0), + end_range_to_memcomparable(pk_serde, &range.1, None), + ) +} + +fn prefix_and_sub_range_to_memcomparable( + pk_serde: &OrderedRowSerde, + sub_range: &(Bound, Bound), + pk_prefix: impl Row, +) -> (Bound, Bound) { + let (range_start, range_end) = sub_range; + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer); + let start_range = match range_start { + Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))), + Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))), + Unbounded => Bound::Included(Either::Right(&pk_prefix)), + }; + let end_range = match range_end { + Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)), + Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)), + Unbounded => Unbounded, + }; + ( + start_range_to_memcomparable(pk_serde, &start_range), + end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)), ) } -fn to_memcomparable( +fn start_range_to_memcomparable( pk_serde: &OrderedRowSerde, bound: &Bound, - is_upper: bool, ) -> Bound { let serialize_pk_prefix = |pk_prefix: &R| { let prefix_serializer = pk_serde.prefix(pk_prefix.len()); @@ -1315,20 +1360,39 @@ fn to_memcomparable( Unbounded => Unbounded, Included(r) => { let serialized = serialize_pk_prefix(r); - if is_upper { - end_bound_of_prefix(&serialized) - } else { - Included(serialized) - } + + Included(serialized) } Excluded(r) => { let serialized = serialize_pk_prefix(r); - if !is_upper { - // if lower - start_bound_of_excluded_prefix(&serialized) - } else { - Excluded(serialized) - } + + start_bound_of_excluded_prefix(&serialized) + } + } +} + +fn end_range_to_memcomparable( + pk_serde: &OrderedRowSerde, + bound: &Bound, + serialized_pk_prefix: Option, +) -> Bound { + let serialize_pk_prefix = |pk_prefix: &R| { + let prefix_serializer = pk_serde.prefix(pk_prefix.len()); + serialize_pk(pk_prefix, &prefix_serializer) + }; + match bound { + Unbounded => match serialized_pk_prefix { + Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix), + None => Unbounded, + }, + Included(r) => { + let serialized = serialize_pk_prefix(r); + + end_bound_of_prefix(&serialized) + } + Excluded(r) => { + let serialized = serialize_pk_prefix(r); + Excluded(serialized) } } } diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index c3e5759a47ae6..2f5dc3202adb9 100644 --- 
a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; + use futures::{pin_mut, StreamExt}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; @@ -1833,3 +1835,183 @@ async fn test_state_table_watermark_cache_refill() { .as_scalar_ref_impl() ) } + +#[tokio::test] +async fn test_state_table_iter_prefix_and_sub_range() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let test_env = prepare_hummock_test_env().await; + + let order_types = vec![OrderType::ascending(), OrderType::ascending()]; + let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; + let column_descs = vec![ + ColumnDesc::unnamed(column_ids[0], DataType::Int32), + ColumnDesc::unnamed(column_ids[1], DataType::Int32), + ColumnDesc::unnamed(column_ids[2], DataType::Int32), + ]; + let pk_index = vec![0_usize, 1_usize]; + let read_prefix_len_hint = 0; + let table = gen_prost_table( + TEST_TABLE_ID, + column_descs, + order_types, + pk_index, + read_prefix_len_hint, + ); + + test_env.register_table(table.clone()).await; + let mut state_table = + StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None) + .await; + let mut epoch = EpochPair::new_test_epoch(1); + state_table.init_epoch(epoch); + + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ])); + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ])); + state_table.insert(OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ])); + + state_table.insert(OwnedRow::new(vec![ + Some(4_i32.into()), + Some(44_i32.into()), + Some(444_i32.into()), + ])); + + epoch.inc(); + state_table.commit(epoch).await.unwrap(); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + + let sub_range1 = ( + std::ops::Bound::Included(OwnedRow::new(vec![Some(11_i32.into())])), + std::ops::Bound::Excluded(OwnedRow::new(vec![Some(33_i32.into())])), + ); + + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); + + let sub_range2: (Bound, Bound) = ( + std::ops::Bound::Excluded(OwnedRow::new(vec![Some(11_i32.into())])), + std::ops::Bound::Unbounded, + ); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ]), + res.as_ref() + ); + + let res = 
iter.next().await; + assert!(res.is_none()); + + let sub_range3: (Bound, Bound) = ( + std::ops::Bound::Unbounded, + std::ops::Bound::Included(OwnedRow::new(vec![Some(33_i32.into())])), + ); + + let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]); + let iter = state_table + .iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default()) + .await + .unwrap(); + + pin_mut!(iter); + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(11_i32.into()), + Some(111_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(22_i32.into()), + Some(222_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await.unwrap().unwrap(); + + assert_eq!( + &OwnedRow::new(vec![ + Some(1_i32.into()), + Some(33_i32.into()), + Some(333_i32.into()), + ]), + res.as_ref() + ); + + let res = iter.next().await; + assert!(res.is_none()); +} From 784fe568d44e40fa29915292ac2df61faf5e5c91 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Mon, 18 Sep 2023 14:47:49 +0800 Subject: [PATCH 7/7] fix(backup): ensure correct delta log order (#12371) --- .../backup_restore/meta_snapshot_builder.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 0df9966607b93..e54c9f443f125 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::future::Future; use anyhow::anyhow; @@ -65,18 +65,22 @@ impl MetaSnapshotBuilder { // hummock_version and version_stats is guaranteed to exist in a initialized cluster. let hummock_version = { let mut redo_state = hummock_version; - let hummock_version_deltas = - HummockVersionDelta::list_at_snapshot::(&meta_store_snapshot).await?; - for version_delta in &hummock_version_deltas { + let hummock_version_deltas: BTreeMap<_, _> = + HummockVersionDelta::list_at_snapshot::(&meta_store_snapshot) + .await? + .into_iter() + .map(|d| (d.id, d)) + .collect(); + for version_delta in hummock_version_deltas.values() { if version_delta.prev_id == redo_state.id { redo_state.apply_version_delta(version_delta); } } - if let Some(log) = hummock_version_deltas.iter().next_back() { - if log.id != redo_state.id { + if let Some((max_log_id, _)) = hummock_version_deltas.last_key_value() { + if *max_log_id != redo_state.id { return Err(BackupError::Other(anyhow::anyhow!(format!( "inconsistent hummock version: expected {}, actual {}", - log.id, redo_state.id + max_log_id, redo_state.id )))); } }
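For reference, the bug patch 7 fixes is purely one of ordering: `list_at_snapshot`
gives no ordering guarantee for the delta log, so replaying deltas in arrival order
could skip `prev_id` links. Collecting them into a `BTreeMap` keyed by id makes the
replay monotonic. A minimal standalone illustration (not RisingWave code):

    use std::collections::BTreeMap;

    fn main() {
        // Deltas may be listed in arbitrary order...
        let deltas: BTreeMap<u64, &str> = [(3, "delta-3"), (1, "delta-1"), (2, "delta-2")]
            .into_iter()
            .collect();
        // ...but a BTreeMap always iterates in ascending key order, so the
        // redo state is advanced 1 -> 2 -> 3 regardless of listing order.
        assert_eq!(deltas.keys().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
        // `last_key_value` yields the max id, used above to cross-check `redo_state.id`.
        assert_eq!(deltas.last_key_value().map(|(id, _)| *id), Some(3));
    }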