diff --git a/Cargo.lock b/Cargo.lock index f110fec5c16c..220d030807e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5072,9 +5072,9 @@ dependencies = [ [[package]] name = "futures-async-stream" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "379790776b0d953337df4ab7ecc51936c66ea112484cad7912907b1d34253ebf" +checksum = "6cce57e88ba9fe4953f476112b2c8e315a2da07725a14dc091ac3e5b6e4cca72" dependencies = [ "futures-async-stream-macro", "futures-core", @@ -5083,9 +5083,9 @@ dependencies = [ [[package]] name = "futures-async-stream-macro" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df2c13d48c8cb8a3ec093ede6f0f4482f327d7bb781120c5fb483ef0f17e758" +checksum = "5ac45ed0bddbd110eb68862768a194f88700f5b91c39931d2f432fab67a16d08" dependencies = [ "proc-macro2", "quote", @@ -7626,6 +7626,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -13827,13 +13833,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -13849,10 +13856,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = 
"3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ + "num-conv", "time-core", ] diff --git a/Cargo.toml b/Cargo.toml index 0c7b8a8999e6..1785398917b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -247,11 +247,16 @@ rw_iter_util = { path = "src/utils/iter_util" } [workspace.lints.rust] # `forbid` will also prevent the misuse of `#[allow(unused)]` unused_must_use = "forbid" -future_incompatible = "warn" -nonstandard_style = "warn" -rust_2018_idioms = "warn" +future_incompatible = { level = "warn", priority = -1 } +nonstandard_style = { level = "warn", priority = -1 } +rust_2018_idioms = { level = "warn", priority = -1 } # Backward compatibility is not important for an application. async_fn_in_trait = "allow" +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(madsim)', + 'cfg(coverage)', + 'cfg(dashboard_built)', +] } [workspace.lints.clippy] uninlined_format_args = "allow" diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 329bf01b49c5..6602509824e0 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240729 +export BUILD_ENV_VERSION=v20240731 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 89287926edc7..83cb000566d4 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql - db @@ -107,12 +107,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: 
public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -123,7 +123,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: db: condition: service_healthy diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 92ef4a5ff27b..6bc57a2a65d8 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-03-12" +channel = "nightly-2024-06-06" diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh index 65c50462f97a..d59e8ea876cc 100755 --- a/ci/scripts/build-other.sh +++ b/ci/scripts/build-other.sh @@ -5,9 +5,9 @@ set -euo pipefail source ci/scripts/common.sh - echo "--- Build Rust UDF" cd e2e_test/udf/wasm +rustup target add wasm32-wasi cargo build --release cd ../../.. diff --git a/clippy.toml b/clippy.toml index 551de0eb6c47..21b972376b0e 100644 --- a/clippy.toml +++ b/clippy.toml @@ -39,3 +39,6 @@ doc-valid-idents = [ avoid-breaking-exported-api = false upper-case-acronyms-aggressive = true too-many-arguments-threshold = 10 +ignore-interior-mutability = [ + "risingwave_frontend::expr::ExprImpl" # XXX: Where does ExprImpl have interior mutability? 
+] diff --git a/e2e_test/source_inline/pubsub/prepare-data.rs b/e2e_test/source_inline/pubsub/prepare-data.rs index a792c6cbb261..e084b4691942 100755 --- a/e2e_test/source_inline/pubsub/prepare-data.rs +++ b/e2e_test/source_inline/pubsub/prepare-data.rs @@ -1,5 +1,5 @@ #!/usr/bin/env -S cargo -Zscript -```cargo +---cargo [dependencies] anyhow = "1" google-cloud-googleapis = { version = "0.13", features = ["pubsub"] } @@ -13,7 +13,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -``` +--- use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::client::{Client, ClientConfig}; diff --git a/lints/Cargo.lock b/lints/Cargo.lock index 3a9bd8384c0d..e3b748e6da67 100644 --- a/lints/Cargo.lock +++ b/lints/Cargo.lock @@ -162,8 +162,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clippy_config" -version = "0.1.79" -source = "git+https://github.com/rust-lang/rust-clippy?rev=fca4e16ffb8c07186ee23becd44cd5c9fb51896c#fca4e16ffb8c07186ee23becd44cd5c9fb51896c" +version = "0.1.80" dependencies = [ "rustc-semver", "serde", @@ -172,8 +171,7 @@ dependencies = [ [[package]] name = "clippy_utils" -version = "0.1.79" -source = "git+https://github.com/rust-lang/rust-clippy?rev=fca4e16ffb8c07186ee23becd44cd5c9fb51896c#fca4e16ffb8c07186ee23becd44cd5c9fb51896c" +version = "0.1.80" dependencies = [ "arrayvec", "clippy_config", diff --git a/lints/Cargo.toml b/lints/Cargo.toml index 74fc49c3fd08..43ece1f6fc5b 100644 --- a/lints/Cargo.toml +++ b/lints/Cargo.toml @@ -14,7 +14,7 @@ path = "ui/format_error.rs" # See `README.md` before bumping the version. # Remember to update the version in `ci/Dockerfile` as well. 
[dependencies] -clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "fca4e16ffb8c07186ee23becd44cd5c9fb51896c" } +clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "5e2a7c6adebdb0478ee6d5b67ab4ee94153b2997" } dylint_linting = "3.1.0" itertools = "0.12" diff --git a/lints/README.md b/lints/README.md index 5007474227ab..3ab55f0bbfe7 100644 --- a/lints/README.md +++ b/lints/README.md @@ -30,8 +30,13 @@ Duplicate `.vscode/settings.json.example` to `.vscode/settings.json` to enable r ## Bump toolchain -The version of the toolchain is specified in `rust-toolchain` file under current directory. -It does not have to be exactly the same as the one used to build RisingWave, but it should be close enough to avoid compile errors. +The version of the toolchain is specified in the `rust-toolchain` file under the current directory. It will be used to build the lints, and also be used by `dylint` to compile RisingWave, instead of the root-level `rust-toolchain`. + +So the chosen toolchain needs to +1. be close enough to the root-level `rust-toolchain` to make RisingWave compile. It does not have to be exactly the same version though. +2. be close enough to the dependency `clippy_utils`'s corresponding `rust-toolchain` in the Clippy repo. + +(Note: `clippy_utils` depends on rustc's internal unstable API. When rustc has breaking changes, the `rust` repo's Clippy will be updated. And then it's [synced back to the Clippy repo bi-weekly](https://doc.rust-lang.org/clippy/development/infrastructure/sync.html#syncing-changes-between-clippy-and-rust-langrust). So ideally we can use `clippy_utils` in the rust repo corresponding to our root-level nightly version, but that repo is too large. Perhaps we can also consider copying the code out to work around this problem.) The information below can be helpful in finding the appropriate version to bump to. 
diff --git a/lints/rust-toolchain b/lints/rust-toolchain index 3bbdf2b2d53f..a146af66cd63 100644 --- a/lints/rust-toolchain +++ b/lints/rust-toolchain @@ -1,5 +1,5 @@ # See `README.md` before bumping the version. [toolchain] -channel = "nightly-2024-04-18" +channel = "nightly-2024-06-06" components = ["llvm-tools-preview", "rustc-dev"] diff --git a/lints/src/format_error.rs b/lints/src/format_error.rs index 9b6d9be5b8c4..402adc4aa5af 100644 --- a/lints/src/format_error.rs +++ b/lints/src/format_error.rs @@ -14,7 +14,7 @@ use clippy_utils::diagnostics::span_lint_and_help; use clippy_utils::macros::{ - find_format_arg_expr, find_format_args, is_format_macro, macro_backtrace, + find_format_arg_expr, is_format_macro, macro_backtrace, FormatArgsStorage, }; use clippy_utils::ty::{implements_trait, match_type}; use clippy_utils::{ @@ -56,7 +56,15 @@ declare_tool_lint! { } #[derive(Default)] -pub struct FormatError; +pub struct FormatError { + format_args: FormatArgsStorage, +} + +impl FormatError { + pub fn new(format_args: FormatArgsStorage) -> Self { + Self { format_args } + } +} impl_lint_pass!(FormatError => [FORMAT_ERROR]); @@ -90,7 +98,7 @@ impl<'tcx> LateLintPass<'tcx> for FormatError { for macro_call in macro_backtrace(expr.span) { if is_format_macro(cx, macro_call.def_id) - && let Some(format_args) = find_format_args(cx, expr, macro_call.expn) + && let Some(format_args) = self.format_args.get(cx, expr, macro_call.expn) { for piece in &format_args.template { if let FormatArgsPiece::Placeholder(placeholder) = piece diff --git a/lints/src/lib.rs b/lints/src/lib.rs index df77538d3cf1..6928bcd028a8 100644 --- a/lints/src/lib.rs +++ b/lints/src/lib.rs @@ -14,7 +14,6 @@ #![feature(rustc_private)] #![feature(let_chains)] -#![feature(lazy_cell)] #![warn(unused_extern_crates)] extern crate rustc_ast; @@ -36,13 +35,19 @@ pub fn register_lints(_sess: &rustc_session::Session, lint_store: &mut rustc_lin // -- Begin lint registration -- // Preparation steps. 
- lint_store.register_early_pass(|| { - Box::::default() + let format_args_storage = clippy_utils::macros::FormatArgsStorage::default(); + let format_args = format_args_storage.clone(); + lint_store.register_early_pass(move || { + Box::new(utils::format_args_collector::FormatArgsCollector::new( + format_args.clone(), + )) }); // Actual lints. lint_store.register_lints(&[format_error::FORMAT_ERROR]); - lint_store.register_late_pass(|_| Box::::default()); + let format_args = format_args_storage.clone(); + lint_store + .register_late_pass(move |_| Box::new(format_error::FormatError::new(format_args.clone()))); // -- End lint registration -- diff --git a/lints/src/utils/format_args_collector.rs b/lints/src/utils/format_args_collector.rs index 7524169666d9..a3b144a6a8ae 100644 --- a/lints/src/utils/format_args_collector.rs +++ b/lints/src/utils/format_args_collector.rs @@ -12,15 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Copied from `https://github.com/rust-lang/rust-clippy/blob/8b0bf6423dfaf5545014db85fcba7bc745beed4c/clippy_lints/src/utils/format_args_collector.rs` -//! -//! Init `AST_FORMAT_ARGS` before running the late pass, so that we can call `find_format_args`. +//! 
Copied from `https://github.com/rust-lang/rust-clippy/blob/993d8ae2a7b26ac779fde923b2ce9ce35d7143a8/clippy_lints/src/utils/format_args_collector.rs` use std::iter::once; use std::mem; -use std::rc::Rc; -use clippy_utils::macros::AST_FORMAT_ARGS; +use clippy_utils::macros::FormatArgsStorage; use clippy_utils::source::snippet_opt; use itertools::Itertools; use rustc_ast::{Crate, Expr, ExprKind, FormatArgs}; @@ -30,11 +27,19 @@ use rustc_lint::{EarlyContext, EarlyLintPass}; use rustc_session::impl_lint_pass; use rustc_span::{hygiene, Span}; -/// Collects [`rustc_ast::FormatArgs`] so that future late passes can call -/// [`clippy_utils::macros::find_format_args`] -#[derive(Default)] +/// Populates [`FormatArgsStorage`] with AST [`FormatArgs`] nodes pub struct FormatArgsCollector { - format_args: FxHashMap>, + format_args: FxHashMap, + storage: FormatArgsStorage, +} + +impl FormatArgsCollector { + pub fn new(storage: FormatArgsStorage) -> Self { + Self { + format_args: FxHashMap::default(), + storage, + } + } } impl_lint_pass!(FormatArgsCollector => []); @@ -47,15 +52,12 @@ impl EarlyLintPass for FormatArgsCollector { } self.format_args - .insert(expr.span.with_parent(None), Rc::new((**args).clone())); + .insert(expr.span.with_parent(None), (**args).clone()); } } fn check_crate_post(&mut self, _: &EarlyContext<'_>, _: &Crate) { - AST_FORMAT_ARGS.with(|ast_format_args| { - let result = ast_format_args.set(mem::take(&mut self.format_args)); - debug_assert!(result.is_ok()); - }); + self.storage.set(mem::take(&mut self.format_args)); } } diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 5783c788bb5e..3614673ee941 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -206,21 +206,24 @@ impl LogRowSeqScanExecutor { .batch_iter_log_with_pk_bounds(old_epoch, new_epoch) .await? 
.flat_map(|r| { - futures::stream::iter(std::iter::from_coroutine(move || { - match r { - Ok(change_log_row) => { - fn with_op(op: Op, row: impl Row) -> impl Row { - row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + futures::stream::iter(std::iter::from_coroutine( + #[coroutine] + move || { + match r { + Ok(change_log_row) => { + fn with_op(op: Op, row: impl Row) -> impl Row { + row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + } + for (op, row) in change_log_row.into_op_value_iter() { + yield Ok(with_op(op, row)); + } } - for (op, row) in change_log_row.into_op_value_iter() { - yield Ok(with_op(op, row)); + Err(e) => { + yield Err(e); } - } - Err(e) => { - yield Err(e); - } - }; - })) + }; + }, + )) }); pin_mut!(iter); diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index ce479daa2afc..414f27b33b4a 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -28,7 +28,6 @@ #![feature(allocator_api)] #![feature(impl_trait_in_assoc_type)] #![feature(assert_matches)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(map_try_insert)] #![feature(iter_from_coroutine)] diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index 73180c75032c..9b95af8dce4d 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(let_chains)] mod common; diff --git a/src/common/metrics/src/lib.rs b/src/common/metrics/src/lib.rs index fa0250f4f0c7..35e8b1126584 100644 --- a/src/common/metrics/src/lib.rs +++ b/src/common/metrics/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#![feature(lazy_cell)] #![feature(type_alias_impl_trait)] #![feature(impl_trait_in_assoc_type)] use std::ops::Deref; diff --git a/src/common/secret/src/lib.rs b/src/common/secret/src/lib.rs index 8ac065e5ea18..17319a296734 100644 --- a/src/common/secret/src/lib.rs +++ b/src/common/secret/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - type SecretId = u32; mod secret_manager; diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 467b313720f2..8d47d0c62164 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -26,7 +26,6 @@ #![feature(lint_reasons)] #![feature(coroutines)] #![feature(map_try_insert)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(let_chains)] #![feature(portable_simd)] @@ -35,7 +34,6 @@ #![allow(incomplete_features)] #![feature(iterator_try_collect)] #![feature(iter_order_by)] -#![feature(exclusive_range_pattern)] #![feature(binary_heap_into_iter_sorted)] #![feature(impl_trait_in_assoc_type)] #![feature(map_entry_replace)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index cee364ceb26d..d91fb56b1cb8 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -18,7 +18,6 @@ #![feature(let_chains)] #![feature(lint_reasons)] #![feature(impl_trait_in_assoc_type)] -#![feature(lazy_cell)] #![cfg_attr(coverage, feature(coverage_attribute))] #[macro_use] diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index 172931562efe..f59ae08a9ec1 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -18,8 +18,6 @@ //! `ByteStreamSourceParserImpl::create` based on the given configuration, rather //! than depending on a specific internal implementation. 
-#![feature(lazy_cell)] - use std::sync::LazyLock; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 651fa84e109f..cbf0ad14046f 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -22,7 +22,6 @@ #![feature(box_patterns)] #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 85bb7740ae9f..02a3b8c84b50 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -20,7 +20,6 @@ #![feature(box_patterns)] #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index dde74a999ac9..ac93ab3e6980 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -87,7 +87,7 @@ impl AvroAccessBuilder { /// ## Confluent schema registry /// /// - In Kafka ([Confluent schema registry wire format](https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format)): - /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. + /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. 
async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult> { // parse payload to avro value // if use confluent schema, get writer schema from confluent schema registry diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs index 8a4b7feda6a4..c0c47fa37e17 100644 --- a/src/connector/src/sink/formatter/append_only.rs +++ b/src/connector/src/sink/formatter/append_only.rs @@ -40,19 +40,22 @@ impl SinkFormatter for AppendOnlyFormatter impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let event_key_object = match &self.key_encoder { - Some(key_encoder) => Some(tri!(key_encoder.encode(row))), - None => None, - }; - let event_object = Some(tri!(self.val_encoder.encode(row))); + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let event_key_object = match &self.key_encoder { + Some(key_encoder) => Some(tri!(key_encoder.encode(row))), + None => None, + }; + let event_object = Some(tri!(self.val_encoder.encode(row))); - yield Ok((event_key_object, event_object)) - } - }) + yield Ok((event_key_object, event_object)) + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index fd4813e78541..a9bf0404f473 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -100,100 +100,103 @@ impl SinkFormatter for DebeziumJsonFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - let DebeziumJsonFormatter { - schema, - pk_indices, - db_name, - sink_from_name, - opts, - key_encoder, - val_encoder, - } = self; - let ts_ms = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let source_field = json!({ - // todo: still some missing fields in 
source field - // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events - "db": db_name, - "table": sink_from_name, - "ts_ms": ts_ms, - }); - - let mut update_cache: Option> = None; - - for (op, row) in chunk.rows() { - let event_key_object: Option = Some(json!({ - "schema": json!({ - "type": "struct", - "fields": fields_pk_to_json(&schema.fields, pk_indices), - "optional": false, - "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), - }), - "payload": tri!(key_encoder.encode(row)), - })); - let event_object: Option = match op { - Op::Insert => Some(json!({ - "schema": schema_to_json(schema, db_name, sink_from_name), - "payload": { - "before": null, - "after": tri!(val_encoder.encode(row)), - "op": "c", - "ts_ms": ts_ms, - "source": source_field, - } - })), - Op::Delete => { - let value_obj = Some(json!({ + std::iter::from_coroutine( + #[coroutine] + || { + let DebeziumJsonFormatter { + schema, + pk_indices, + db_name, + sink_from_name, + opts, + key_encoder, + val_encoder, + } = self; + let ts_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let source_field = json!({ + // todo: still some missing fields in source field + // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events + "db": db_name, + "table": sink_from_name, + "ts_ms": ts_ms, + }); + + let mut update_cache: Option> = None; + + for (op, row) in chunk.rows() { + let event_key_object: Option = Some(json!({ + "schema": json!({ + "type": "struct", + "fields": fields_pk_to_json(&schema.fields, pk_indices), + "optional": false, + "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), + }), + "payload": tri!(key_encoder.encode(row)), + })); + let event_object: Option = match op { + Op::Insert => Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": tri!(val_encoder.encode(row)), - "after": null, - 
"op": "d", + "before": null, + "after": tri!(val_encoder.encode(row)), + "op": "c", "ts_ms": ts_ms, "source": source_field, } - })); - yield Ok((event_key_object.clone(), value_obj)); - - if opts.gen_tombstone { - // Tomestone event - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events - yield Ok((event_key_object, None)); - } - - continue; - } - Op::UpdateDelete => { - update_cache = Some(tri!(val_encoder.encode(row))); - continue; - } - Op::UpdateInsert => { - if let Some(before) = update_cache.take() { - Some(json!({ + })), + Op::Delete => { + let value_obj = Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": before, - "after": tri!(val_encoder.encode(row)), - "op": "u", + "before": tri!(val_encoder.encode(row)), + "after": null, + "op": "d", "ts_ms": ts_ms, "source": source_field, } - })) - } else { - warn!( - "not found UpdateDelete in prev row, skipping, row index {:?}", - row.index() - ); + })); + yield Ok((event_key_object.clone(), value_obj)); + + if opts.gen_tombstone { + // Tomestone event + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events + yield Ok((event_key_object, None)); + } + continue; } - } - }; - yield Ok((event_key_object, event_object)); - } - }) + Op::UpdateDelete => { + update_cache = Some(tri!(val_encoder.encode(row))); + continue; + } + Op::UpdateInsert => { + if let Some(before) = update_cache.take() { + Some(json!({ + "schema": schema_to_json(schema, db_name, sink_from_name), + "payload": { + "before": before, + "after": tri!(val_encoder.encode(row)), + "op": "u", + "ts_ms": ts_ms, + "source": source_field, + } + })) + } else { + warn!( + "not found UpdateDelete in prev row, skipping, row index {:?}", + row.index() + ); + continue; + } + } + }; + yield Ok((event_key_object, event_object)); + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/upsert.rs 
b/src/connector/src/sink/formatter/upsert.rs index 612c22aaaf86..7e586a7917ea 100644 --- a/src/connector/src/sink/formatter/upsert.rs +++ b/src/connector/src/sink/formatter/upsert.rs @@ -40,22 +40,25 @@ impl SinkFormatter for UpsertFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - let event_key_object = Some(tri!(self.key_encoder.encode(row))); - - let event_object = match op { - Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), - // Empty value with a key - Op::Delete => None, - Op::UpdateDelete => { - // upsert semantic does not require update delete event - continue; - } - }; - - yield Ok((event_key_object, event_object)) - } - }) + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + + let event_object = match op { + Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), + // Empty value with a key + Op::Delete => None, + Op::UpdateDelete => { + // upsert semantic does not require update delete event + continue; + } + }; + + yield Ok((event_key_object, event_object)) + } + }, + ) } } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index c6e65fb00c39..3a6914d2249c 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -403,7 +403,16 @@ struct KafkaPayloadWriter<'a> { config: &'a KafkaConfig, } -pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; +mod opaque_type { + use super::*; + pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; + + pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture { + future.map(KafkaPayloadWriter::<'static>::map_future_result) + } +} +use opaque_type::map_delivery_future; +pub use opaque_type::KafkaSinkDeliveryFuture; pub struct KafkaSinkWriter { formatter: SinkFormatterImpl, @@ 
-482,7 +491,7 @@ impl<'w> KafkaPayloadWriter<'w> { Ok(delivery_future) => { if self .add_future - .add_future_may_await(Self::map_delivery_future(delivery_future)) + .add_future_may_await(map_delivery_future(delivery_future)) .await? { tracing::warn!( @@ -567,10 +576,6 @@ impl<'w> KafkaPayloadWriter<'w> { Err(_) => Err(KafkaError::Canceled.into()), } } - - fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture { - future.map(KafkaPayloadWriter::<'static>::map_future_result) - } } impl<'a> FormattedSink for KafkaPayloadWriter<'a> { diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 771d3c8a6f91..3b5d49da4603 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -201,8 +201,36 @@ impl KinesisSinkWriter { } } -pub type KinesisSinkPayloadWriterDeliveryFuture = - impl TryFuture + Unpin + Send + 'static; +mod opaque_type { + use super::*; + pub type KinesisSinkPayloadWriterDeliveryFuture = + impl TryFuture + Unpin + Send + 'static; + + impl KinesisSinkPayloadWriter { + pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { + async move { + let builder = self.builder.expect("should not be None"); + let context_fmt = format!( + "failed to put record to {}", + builder + .get_stream_name() + .as_ref() + .expect("should have set stream name") + ); + Retry::spawn( + ExponentialBackoff::from_millis(100).map(jitter).take(3), + || builder.clone().send(), + ) + .await + .with_context(|| context_fmt.clone()) + .map_err(SinkError::Kinesis)?; + Ok(()) + } + .boxed() + } + } +} +pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture; impl KinesisSinkPayloadWriter { fn put_record(&mut self, key: String, payload: Vec) { @@ -216,28 +244,6 @@ impl KinesisSinkPayloadWriter { ), ); } - - fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { - async move { - let builder = self.builder.expect("should not be None"); - let context_fmt = format!( - "failed to put record to {}", 
- builder - .get_stream_name() - .as_ref() - .expect("should have set stream name") - ); - Retry::spawn( - ExponentialBackoff::from_millis(100).map(jitter).take(3), - || builder.clone().send(), - ) - .await - .with_context(|| context_fmt.clone()) - .map_err(SinkError::Kinesis)?; - Ok(()) - } - .boxed() - } } impl FormattedSink for KinesisSinkPayloadWriter { diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 3f016ad94946..a92d5b16f85e 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -235,15 +235,20 @@ struct PulsarPayloadWriter<'w> { add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>, } -pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; - -fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { - future.map(|result| { - result - .map(|_| ()) - .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) - }) +mod opaque_type { + use super::*; + pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; + + pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { + future.map(|result| { + result + .map(|_| ()) + .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) + }) + } } +use opaque_type::may_delivery_future; +pub use opaque_type::PulsarDeliveryFuture; impl PulsarSinkWriter { pub async fn new( diff --git a/src/expr/core/src/lib.rs b/src/expr/core/src/lib.rs index b250b8ce901f..d45d4ca11f80 100644 --- a/src/expr/core/src/lib.rs +++ b/src/expr/core/src/lib.rs @@ -15,7 +15,6 @@ #![feature(let_chains)] #![feature(lint_reasons)] #![feature(iterator_try_collect)] -#![feature(lazy_cell)] #![feature(coroutines)] #![feature(never_type)] #![feature(error_generic_member_access)] diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index 56bdbe3b8110..e5c69c2660ee 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -25,8 +25,6 @@ #![feature(assert_matches)] #![feature(lint_reasons)] 
#![feature(iterator_try_collect)] -#![feature(exclusive_range_pattern)] -#![feature(lazy_cell)] #![feature(coroutines)] #![feature(test)] #![feature(iter_array_chunks)] diff --git a/src/expr/impl/src/window_function/buffer.rs b/src/expr/impl/src/window_function/buffer.rs index bd1c10d162b2..57217dda6fd4 100644 --- a/src/expr/impl/src/window_function/buffer.rs +++ b/src/expr/impl/src/window_function/buffer.rs @@ -970,7 +970,7 @@ mod tests { let key = |key: i64| -> StateKey { StateKey { - order_key: memcmp_encoding::encode_value(&Some(ScalarImpl::from(key)), order_type) + order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type) .unwrap(), pk: OwnedRow::empty().into(), } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 185d65cb567a..4bdf9fa398f7 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -25,7 +25,6 @@ #![feature(assert_matches)] #![feature(lint_reasons)] #![feature(box_patterns)] -#![feature(lazy_cell)] #![feature(macro_metavar_expr)] #![feature(min_specialization)] #![feature(extend_one)] diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 18d1807948d2..419f4ffd21cb 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. 
#![feature(error_generic_member_access)] -#![feature(lazy_cell)] #![feature(once_cell_try)] #![feature(type_alias_impl_trait)] #![feature(try_blocks)] @@ -243,25 +242,28 @@ struct JavaClassMethodCache { utc: OnceLock, } -// TODO: may only return a RowRef -pub type StreamChunkRowIterator<'a> = impl Iterator + 'a; +mod opaque_type { + use super::*; + // TODO: may only return a RowRef + pub type StreamChunkRowIterator<'a> = impl Iterator + 'a; + + impl<'a> JavaBindingIteratorInner<'a> { + pub(super) fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> { + JavaBindingIteratorInner::StreamChunk( + chunk + .rows() + .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())), + ) + } + } +} +pub use opaque_type::StreamChunkRowIterator; pub type HummockJavaBindingIterator = BoxStream<'static, anyhow::Result<(Bytes, OwnedRow)>>; - pub enum JavaBindingIteratorInner<'a> { Hummock(HummockJavaBindingIterator), StreamChunk(StreamChunkRowIterator<'a>), } -impl<'a> JavaBindingIteratorInner<'a> { - fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> { - JavaBindingIteratorInner::StreamChunk( - chunk - .rows() - .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())), - ) - } -} - enum RowExtra { Op(Op), Key(Bytes), diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index 982ccda06ecf..2e7d095e0bd4 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -375,7 +375,6 @@ macro_rules! to_jvalue { /// Generate the jni signature of a given function /// ``` -/// #![feature(lazy_cell)] /// use risingwave_jni_core::gen_jni_sig; /// assert_eq!(gen_jni_sig!(boolean f(int, short, byte[])), "(IS[B)Z"); /// assert_eq!( diff --git a/src/license/src/lib.rs b/src/license/src/lib.rs index 0e641be9789b..7f2b25d8f3fb 100644 --- a/src/license/src/lib.rs +++ b/src/license/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#![feature(lazy_cell)] - mod feature; mod manager; diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 80a83349f2cc..9ab248802772 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -14,7 +14,6 @@ #![feature(lint_reasons)] #![feature(let_chains)] -#![feature(lazy_cell)] #![feature(impl_trait_in_assoc_type)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 71c99a7e065b..811b3b152d06 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -20,7 +20,6 @@ #![feature(extract_if)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] #![feature(assert_matches)] diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index 811d482587ac..d9e768b7f029 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(lazy_cell)] #![feature(lint_reasons)] #![feature(error_generic_member_access)] #![feature(let_chains)] diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 72ec08c46537..8ee8dc078fe1 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#![expect( + unexpected_cfgs, + reason = "feature(hdfs-backend) is banned https://github.com/risingwavelabs/risingwave/pull/7875" +)] + pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 0ef12f3da3a3..001eb8128a5b 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -116,10 +116,7 @@ impl S3StreamingUploader { /// Reference: const MIN_PART_SIZE: usize = 5 * 1024 * 1024; const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024; - let part_size = config - .upload_part_size - .min(MAX_PART_SIZE) - .max(MIN_PART_SIZE); + let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE); Self { client, diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 6b569277c54d..dbb150030194 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -21,7 +21,6 @@ #![feature(map_try_insert)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/storage/benches/bench_block_iter.rs b/src/storage/benches/bench_block_iter.rs index f58499e07282..a0ea7cfd844d 100644 --- a/src/storage/benches/bench_block_iter.rs +++ b/src/storage/benches/bench_block_iter.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] use std::sync::LazyLock; use bytes::{BufMut, Bytes, BytesMut}; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index aeeee6927d36..531652065f01 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#![feature(lazy_cell)] - use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::{Arc, LazyLock}; diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 2ad827d8cae5..c894ecf18412 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -22,7 +22,6 @@ #![feature(is_sorted)] #![feature(let_chains)] #![feature(btree_cursors)] -#![feature(lazy_cell)] mod key_cmp; use std::cmp::Ordering; diff --git a/src/storage/hummock_trace/src/lib.rs b/src/storage/hummock_trace/src/lib.rs index 64417832206e..48b0a71010a7 100644 --- a/src/storage/hummock_trace/src/lib.rs +++ b/src/storage/hummock_trace/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(cursor_remaining)] #![feature(trait_alias)] #![feature(coroutines)] diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index ab69c45093b4..e11d3e1cee1c 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -31,12 +31,10 @@ #![feature(is_sorted)] #![feature(btree_extract_if)] #![feature(exact_size_is_empty)] -#![feature(lazy_cell)] #![cfg_attr(coverage, feature(coverage_attribute))] #![recursion_limit = "256"] #![feature(error_generic_member_access)] #![feature(let_chains)] -#![feature(exclusive_range_pattern)] #![feature(impl_trait_in_assoc_type)] #![feature(maybe_uninit_uninit_array)] #![feature(maybe_uninit_array_assume_init)] diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 15d60f6546f9..4b837ce6d098 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -193,21 +193,24 @@ impl ChangeLogValue { } pub fn into_op_value_iter(self) -> impl Iterator { - std::iter::from_coroutine(move || match self { - Self::Insert(row) => { - yield (Op::Insert, row); - } - Self::Delete(row) => { - yield (Op::Delete, row); - } - Self::Update { - old_value, - 
new_value, - } => { - yield (Op::UpdateDelete, old_value); - yield (Op::UpdateInsert, new_value); - } - }) + std::iter::from_coroutine( + #[coroutine] + move || match self { + Self::Insert(row) => { + yield (Op::Insert, row); + } + Self::Delete(row) => { + yield (Op::Delete, row); + } + Self::Update { + old_value, + new_value, + } => { + yield (Op::UpdateDelete, old_value); + yield (Op::UpdateInsert, new_value); + } + }, + ) } } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 7e1c24758624..3ee9e849dda4 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -42,9 +42,27 @@ use crate::monitor::{ use crate::opts::StorageOpts; use crate::StateStore; -pub type HummockStorageType = impl StateStore + AsHummock; -pub type MemoryStateStoreType = impl StateStore + AsHummock; -pub type SledStateStoreType = impl StateStore + AsHummock; +mod opaque_type { + use super::*; + + pub type HummockStorageType = impl StateStore + AsHummock; + pub type MemoryStateStoreType = impl StateStore + AsHummock; + pub type SledStateStoreType = impl StateStore + AsHummock; + + pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType { + may_dynamic_dispatch(state_store) + } + + pub fn hummock(state_store: HummockStorage) -> HummockStorageType { + may_dynamic_dispatch(may_verify(state_store)) + } + + pub fn sled(state_store: SledStateStore) -> SledStateStoreType { + may_dynamic_dispatch(state_store) + } +} +use opaque_type::{hummock, in_memory, sled}; +pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType}; /// The type erased [`StateStore`]. #[derive(Clone, EnumAsInner)] @@ -114,7 +132,7 @@ impl StateStoreImpl { storage_metrics: Arc, ) -> Self { // The specific type of MemoryStateStoreType in deducted here. 
- Self::MemoryStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics)) + Self::MemoryStateStore(in_memory(state_store).monitored(storage_metrics)) } pub fn hummock( @@ -122,16 +140,14 @@ impl StateStoreImpl { storage_metrics: Arc, ) -> Self { // The specific type of HummockStateStoreType in deducted here. - Self::HummockStateStore( - may_dynamic_dispatch(may_verify(state_store)).monitored(storage_metrics), - ) + Self::HummockStateStore(hummock(state_store).monitored(storage_metrics)) } pub fn sled( state_store: SledStateStore, storage_metrics: Arc, ) -> Self { - Self::SledStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics)) + Self::SledStateStore(sled(state_store).monitored(storage_metrics)) } pub fn shared_in_memory_store(storage_metrics: Arc) -> Self { diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index a3fb15eef544..876deabc80f9 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -29,7 +29,6 @@ #![feature(map_try_insert)] #![feature(never_type)] #![feature(btreemap_alloc)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(btree_extract_if)] #![feature(iter_order_by)] diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs index ae2614e094a6..aa6303b8e2f6 100644 --- a/src/tests/simulation/src/lib.rs +++ b/src/tests/simulation/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(try_blocks)] #![feature(register_tool)] diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 31cc48804464..e45c4a3285ac 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -13,7 +13,6 @@ // limitations under the License. 
#![cfg_attr(not(madsim), allow(dead_code))] -#![feature(lazy_cell)] use std::path::PathBuf; diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 0ad0725dde40..fe57a2c896f1 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -18,7 +18,6 @@ //! for the rationale behind this approach. #![feature(stmt_expr_attributes)] -#![feature(lazy_cell)] #![feature(extract_if)] mod backfill_tests; diff --git a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs index 30d9e0c73fec..e87659d4f54d 100644 --- a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs +++ b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs @@ -81,7 +81,9 @@ async fn test_delta_join() -> Result<()> { .assert_result_eq(result); #[allow(unused_assignments)] - test_times += 1; + { + test_times += 1; + } }; } diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index 0c215c9146a4..2eb9c59d1bf3 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -14,7 +14,6 @@ #![feature(let_chains)] #![feature(if_let_guard)] -#![feature(lazy_cell)] #![feature(box_patterns)] #![feature(register_tool)] #![register_tool(rw)] diff --git a/src/utils/pgwire/src/lib.rs b/src/utils/pgwire/src/lib.rs index 2279156d89b3..8d1c00541bb9 100644 --- a/src/utils/pgwire/src/lib.rs +++ b/src/utils/pgwire/src/lib.rs @@ -16,7 +16,6 @@ #![feature(trait_alias)] #![feature(iterator_try_collect)] #![feature(trusted_len)] -#![feature(lazy_cell)] #![feature(buf_read_has_data_left)] #![feature(round_char_boundary)] #![feature(never_type)]