diff --git a/.github/labeler.yml b/.github/labeler.yml index c80afc15ce8fb..f988abd5d3f4e 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -24,6 +24,10 @@ labels: title: "^doc.*" - label: "type/deprecate" title: "^deprecate.*" +- label: "cherry-pick" + title: "^cherry-pick.*" +- label: "cherry-pick" + title: "^cherry pick.*" - label: "breaking-change" body: '- \[x\] My PR contains breaking changes' diff --git a/.github/pr-title-checker-config.json b/.github/pr-title-checker-config.json index 1bb53a7b89ca1..9549c9c4f7709 100644 --- a/.github/pr-title-checker-config.json +++ b/.github/pr-title-checker-config.json @@ -4,7 +4,7 @@ "color": "B60205" }, "CHECKS": { - "regexp": "^(feat|fix|test|refactor|chore|style|doc|perf|build|ci|revert|deprecate)(\\(.*\\))?:.*", + "regexp": "^(feat|fix|test|refactor|chore|style|doc|perf|build|ci|revert|deprecate|cherry pick|cherry-pick)(\\(.*\\))?:.*", "ignoreLabels" : ["ignore-title"] }, "MESSAGES": { diff --git a/Cargo.lock b/Cargo.lock index 52ce898515317..27caa570579df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1028,6 +1028,32 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-lc-rs" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df33e4a55b03f8780ba55041bc7be91a2a8ec8c03517b0379d2d6c96d2c30d95" +dependencies = [ + "aws-lc-sys", + "mirai-annotations", + "paste", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ede3d6e360a48436fee127cb81710834407b1ec0c48a001cc29dec9005f73e" +dependencies = [ + "bindgen 0.69.4", + "cmake", + "dunce", + "fs_extra", + "libc", + "paste", +] + [[package]] name = "aws-runtime" version = "1.0.1" @@ -1568,6 +1594,29 @@ dependencies = [ "shlex", ] +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.5.0", + "cexpr", + "clang-sys", + "itertools 0.12.0", + "lazy_static", + "lazycell", + "log", + "prettyplease 0.2.15", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.48", + "which", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -3529,6 +3578,12 @@ dependencies = [ "shared_child", ] +[[package]] +name = "dunce" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" + [[package]] name = "duration-str" version = "0.7.0" @@ -4363,6 +4418,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "function_name" version = "0.3.0" @@ -4974,6 +5035,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "hostname" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" +dependencies = [ + "cfg-if", + "libc", + "windows 0.52.0", +] + [[package]] name = "http" version = "0.2.9" @@ -6223,6 +6295,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mirai-annotations" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" + [[package]] name = "mockall" version = "0.11.4" @@ -6361,7 +6439,7 @@ checksum = "8a60cb978c0a1d654edcc1460f8d6092dacf21346ed6017d81fb76a23ef5a8de" dependencies = [ "base64 0.21.7", "bigdecimal 0.4.2", - "bindgen", + "bindgen 0.59.2", "bitflags 2.5.0", "bitvec", "btoi", @@ -8632,7 +8710,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00" dependencies = [ - "hostname", + "hostname 0.3.1", "quick-error", ] @@ -8913,7 +8991,7 @@ dependencies = [ "risingwave_meta_node", "risingwave_rt", "shell-words", - "strum 0.25.0", + "strum 0.26.1", "strum_macros 0.26.1", "tempfile", "thiserror-ext", @@ -9821,7 +9899,7 @@ dependencies = [ "sea-orm", "serde", "serde_json", - "strum 0.25.0", + "strum 0.26.1", "sync-point", "thiserror", "thiserror-ext", @@ -9963,7 +10041,7 @@ dependencies = [ "opendal", "prometheus", "risingwave_common", - "rustls 0.22.3", + "rustls 0.23.4", "spin 0.9.8", "thiserror", "thiserror-ext", @@ -9985,7 +10063,7 @@ dependencies = [ "prost-helpers", "risingwave_error", "serde", - "strum 0.25.0", + "strum 0.26.1", "thiserror", "walkdir", "workspace-hack", @@ -10069,7 +10147,7 @@ dependencies = [ "console-subscriber", "either", "futures", - "hostname", + "hostname 0.4.0", "madsim-tokio", "opentelemetry", "opentelemetry-otlp", @@ -10667,6 +10745,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c4d6d8ad9f2492485e13453acbb291dd08f64441b6609c491f1c2cd2c6b4fe1" +dependencies = [ + "aws-lc-rs", + "log", + "once_cell", + "rustls-pki-types", + "rustls-webpki 0.102.2", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -10733,6 +10826,7 @@ version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ + "aws-lc-rs", "ring 0.17.5", "rustls-pki-types", "untrusted 0.9.0", @@ -11525,9 +11619,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "similar" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32fea41aca09ee824cc9724996433064c89f7777e60762749a4170a14abbfa21" +checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" [[package]] name = "simple_asn1" @@ -12078,6 +12172,9 @@ name = "strum" version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" +dependencies = [ + "strum_macros 0.26.1", +] [[package]] name = "strum_macros" @@ -13954,10 +14051,20 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" dependencies = [ - "windows-core", + "windows-core 0.51.1", "windows-targets 0.48.5", ] +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.0", +] + [[package]] name = "windows-core" version = "0.51.1" @@ -13967,6 +14074,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" @@ -14363,6 +14479,7 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "strum 0.25.0", + "strum 0.26.1", "subtle", "syn 1.0.109", "syn 2.0.48", diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index cc9befc068d0c..1d1c617bfa1fe 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -37,7 +37,7 @@ risingwave_frontend = { workspace = true } risingwave_meta_node = { workspace = true } risingwave_rt = { workspace = true } shell-words = "1.1.0" -strum = "0.25" +strum = "0.26" strum_macros = "0.26" tempfile = "3" tikv-jemallocator = { workspace = true, features = [ diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 4441be8c9db21..697fe421a5fb2 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -312,7 +312,8 @@ impl KafkaSplitEnumerator { for elem in offsets.elements_for_topic(self.topic.as_str()) { match elem.offset() { Offset::Offset(offset) => { - result.insert(elem.partition(), Some(offset)); + // XXX(rc): currently in RW source, `offset` means the last consumed offset, so we need to subtract 1 + result.insert(elem.partition(), Some(offset - 1)); } _ => { let (_, high_watermark) = self diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 98349ec7bfde5..675107b18dd83 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -71,7 +71,6 @@ use risingwave_rpc_client::{ComputeClientPool, ComputeClientPoolRef, MetaClient} use risingwave_sqlparser::ast::{ObjectName, Statement}; use risingwave_sqlparser::parser::Parser; use thiserror::Error; -use thiserror_ext::AsReport; use tokio::runtime::Builder; use tokio::sync::oneshot::Sender; use tokio::sync::watch; @@ -885,9 +884,7 @@ impl SessionImpl { formats: Vec, ) -> std::result::Result, BoxedError> { // Parse sql. - let mut stmts = Parser::parse_sql(&sql).inspect_err( - |e| tracing::error!(error = %e.as_report(), %sql, "failed to parse sql"), - )?; + let mut stmts = Parser::parse_sql(&sql)?; if stmts.is_empty() { return Ok(PgResponse::empty_result( pgwire::pg_response::StatementType::EMPTY, @@ -901,9 +898,7 @@ impl SessionImpl { ); } let stmt = stmts.swap_remove(0); - let rsp = handle(self, stmt, sql.clone(), formats).await.inspect_err( - |e| tracing::error!(error = %e.as_report(), %sql, "failed to handle sql"), - )?; + let rsp = handle(self, stmt, sql.clone(), formats).await?; Ok(rsp) } @@ -1093,11 +1088,7 @@ impl Session for SessionImpl { let string = stmt.to_string(); let sql_str = string.as_str(); let sql: Arc = Arc::from(sql_str); - let rsp = handle(self, stmt, sql.clone(), vec![format]) - .await - .inspect_err( - |e| tracing::error!(error = %e.as_report(), %sql, "failed to handle sql"), - )?; + let rsp = handle(self, stmt, sql.clone(), vec![format]).await?; Ok(rsp) } @@ -1140,9 +1131,7 @@ impl Session for SessionImpl { self: Arc, portal: Portal, ) -> std::result::Result, BoxedError> { - let rsp = handle_execute(self, portal) - .await - .inspect_err(|e| tracing::error!(error=%e.as_report(), "failed to handle execute"))?; + let rsp = handle_execute(self, portal).await?; Ok(rsp) } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 822a71d3ad17b..0119f6abfe06c 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -65,7 +65,7 @@ scopeguard = "1.2.0" sea-orm = { workspace = true } serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" -strum = { version = "0.25", features = ["derive"] } +strum = { version = "0.26", features = ["derive"] } sync-point = { path = "../utils/sync-point" } thiserror = "1" thiserror-ext = { workspace = true } diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index d775e44f0e0c2..accdc1ab09652 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -30,7 +30,7 @@ madsim = "0.2.22" opendal = "0.45.1" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } -rustls = "0.22.3" +rustls = "0.23.4" spin = "0.9" thiserror = "1" thiserror-ext = { workspace = true } diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 322b2260c5d16..196638f1e3689 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -608,9 +608,22 @@ impl S3ObjectStore { } Err(_) => { // s3 - let sdk_config = sdk_config_loader.load().await; - Client::new(&sdk_config) + #[cfg(madsim)] + let client = Client::new(&sdk_config); + #[cfg(not(madsim))] + let client = Client::from_conf( + aws_sdk_s3::config::Builder::from(&sdk_config) + .identity_cache( + aws_sdk_s3::config::IdentityCache::lazy() + .load_timeout(Duration::from_secs( + config.s3.identity_resolution_timeout_s, + )) + .build(), + ) + .build(), + ); + client } }; diff --git a/src/prost/Cargo.toml b/src/prost/Cargo.toml index d46e1baf2b4fe..af12b1683959c 100644 --- a/src/prost/Cargo.toml +++ b/src/prost/Cargo.toml @@ -14,7 +14,7 @@ prost = { workspace = true } prost-helpers = { path = "helpers" } risingwave_error = { workspace = true } serde = { version = "1", features = ["derive"] } -strum = "0.25" +strum = "0.26" thiserror = "1" tonic = { workspace = true } diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 496403fbc48e8..c7912e4334898 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -508,12 +508,10 @@ impl MaterializeCache { if update_cache { match conflict_behavior { - ConflictBehavior::Overwrite => { - self.data.push(key, Some(CompactedRow { row: value })); - } - ConflictBehavior::IgnoreConflict => { + ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict => { self.data.push(key, Some(CompactedRow { row: value })); } + _ => unreachable!(), } } @@ -521,7 +519,7 @@ impl MaterializeCache { Op::Delete | Op::UpdateDelete => { match conflict_behavior { - ConflictBehavior::Overwrite => { + ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict => { match self.force_get(&key) { Some(old_row) => { fixed_changes().delete(key.clone(), old_row.row.clone()); @@ -529,16 +527,7 @@ impl MaterializeCache { None => (), // delete a nonexistent value }; } - ConflictBehavior::IgnoreConflict => { - match self.force_get(&key) { - Some(old_row) => { - if old_row.row == value { - fixed_changes().delete(key.clone(), old_row.row.clone()); - } - } - None => (), // delete a nonexistent value - }; - } + _ => unreachable!(), }; @@ -1460,10 +1449,7 @@ mod tests { ) .await .unwrap(); - assert_eq!( - row, - Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())])) - ); + assert_eq!(row, None); // check delete wrong pk let row = table @@ -1505,7 +1491,7 @@ mod tests { .unwrap(); assert_eq!( row, - Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())])) + Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())])) ); // check update wrong pk, should become insert diff --git a/src/tests/sqlsmith/Cargo.toml b/src/tests/sqlsmith/Cargo.toml index 8e76dc20d71b8..8b6047467777d 100644 --- a/src/tests/sqlsmith/Cargo.toml +++ b/src/tests/sqlsmith/Cargo.toml @@ -27,7 +27,7 @@ risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } risingwave_pb = { workspace = true } risingwave_sqlparser = { workspace = true } -similar = "2.4.0" +similar = "2.5.0" thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio" } tokio-postgres = "0.7" diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index fe421d242efae..38eba6e92d5b6 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -295,6 +295,8 @@ where let elapsed = start.elapsed(); // Always log if an error occurs. + // Note: all messages will be processed through this code path, making it the + // only necessary place to log errors. if let Err(error) = &result { tracing::error!(error = %error.as_report(), "error when process message"); } @@ -568,11 +570,8 @@ where session: Arc, ) -> PsqlResult<()> { // Parse sql. - let stmts = Parser::parse_sql(&sql) - .inspect_err( - |e| tracing::error!(sql = &*sql, error = %e.as_report(), "failed to parse sql"), - ) - .map_err(|err| PsqlError::SimpleQueryError(err.into()))?; + let stmts = + Parser::parse_sql(&sql).map_err(|err| PsqlError::SimpleQueryError(err.into()))?; if stmts.is_empty() { self.stream.write_no_flush(&BeMessage::EmptyQueryResponse)?; } @@ -691,9 +690,6 @@ where let stmt = { let stmts = Parser::parse_sql(sql) - .inspect_err( - |e| tracing::error!(sql, error = %e.as_report(), "failed to parse sql"), - ) .map_err(|err| PsqlError::ExtendedPrepareError(err.into()))?; if stmts.len() > 1 { diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml index 1f4e085632eae..0e5a156361fb7 100644 --- a/src/utils/runtime/Cargo.toml +++ b/src/utils/runtime/Cargo.toml @@ -20,7 +20,7 @@ console = "0.15" console-subscriber = "0.2.0" either = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } -hostname = "0.3" +hostname = "0.4" parking_lot = { version = "0.12", features = ["deadlock_detection"] } pprof = { version = "0.13", features = ["flamegraph"] } risingwave_common = { workspace = true } diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index da77f2e76c0b6..7dd834afbcbdc 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -274,7 +274,10 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .compact() .with_filter(FilterFn::new(|metadata| metadata.is_event())) // filter-out all span-related info .boxed(), - Deployment::Cloud => fmt_layer.json().boxed(), + Deployment::Cloud => fmt_layer + .json() + .map_event_format(|e| e.with_current_span(false)) // avoid duplication as there's a span list field + .boxed(), Deployment::Other => fmt_layer.boxed(), }; diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 177ce846390dd..1ccd14ff77a17 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -129,7 +129,8 @@ sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "bigd sqlx-mysql = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] } sqlx-postgres = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] } sqlx-sqlite = { version = "0.7", default-features = false, features = ["chrono", "json", "time", "uuid"] } -strum = { version = "0.25", features = ["derive"] } +strum-2f80eeee3b1b6c7e = { package = "strum", version = "0.26", features = ["derive"] } +strum-2ffb4c3fe830441c = { package = "strum", version = "0.25", features = ["derive"] } subtle = { version = "2" } syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } target-lexicon = { version = "0.12", features = ["std"] }