diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 88542b4aa5f12..9d00b47bcd3aa 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240812 +export BUILD_ENV_VERSION=v20240911 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 4b1954ff5ae2c..11d29d7236367 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911 depends_on: - mysql - sqlserver-server @@ -85,7 +85,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911 depends_on: - mysql - db @@ -108,12 +108,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -124,7 +124,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911 depends_on: db: condition: service_healthy diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 6bc57a2a65d8f..158ecbbdb0dfd 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-06-06" +channel = "nightly-2024-07-19" diff --git a/lints/Cargo.lock b/lints/Cargo.lock index e3b748e6da670..aa1e1e4ef9b32 100644 --- a/lints/Cargo.lock +++ b/lints/Cargo.lock @@ -162,7 +162,8 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clippy_config" -version = "0.1.80" +version = "0.1.81" +source = "git+https://github.com/risingwavelabs/clippy?rev=5135d0218365e85f3371405b604a7fb1459eb256#5135d0218365e85f3371405b604a7fb1459eb256" dependencies = [ "rustc-semver", "serde", @@ -171,12 +172,14 @@ dependencies = [ [[package]] name = "clippy_utils" -version = "0.1.80" +version = "0.1.81" +source = "git+https://github.com/risingwavelabs/clippy?rev=5135d0218365e85f3371405b604a7fb1459eb256#5135d0218365e85f3371405b604a7fb1459eb256" dependencies = [ "arrayvec", "clippy_config", "itertools", "rustc-semver", + "rustc_apfloat", ] [[package]] @@ -869,6 +872,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5be1bdc7edf596692617627bbfeaba522131b18e06ca4df2b6b689e3c5d5ce84" +[[package]] +name = "rustc_apfloat" +version = "0.2.1+llvm-462a31f5a5ab" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "886d94c63c812a8037c4faca2607453a0fa4cf82f734665266876b022244543f" +dependencies = [ + "bitflags 1.3.2", + "smallvec", +] + [[package]] name = "rustfix" version = "0.6.1" @@ -975,6 +988,12 @@ dependencies = [ "digest", ] +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "syn" version = "2.0.39" diff --git a/lints/Cargo.toml b/lints/Cargo.toml index 43ece1f6fc5b7..e0b8fe5d96664 100644 --- a/lints/Cargo.toml +++ b/lints/Cargo.toml @@ -14,7 +14,7 @@ path = "ui/format_error.rs" # See `README.md` before bumping the version. # Remember to update the version in `ci/Dockerfile` as well. [dependencies] -clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "5e2a7c6adebdb0478ee6d5b67ab4ee94153b2997" } +clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "61e1d2fd7062e46ccf1237707ee6da5aac018f70" } dylint_linting = "3.1.0" itertools = "0.12" diff --git a/lints/rust-toolchain b/lints/rust-toolchain index a146af66cd637..31dbc57d04b2b 100644 --- a/lints/rust-toolchain +++ b/lints/rust-toolchain @@ -1,5 +1,5 @@ # See `README.md` before bumping the version. [toolchain] -channel = "nightly-2024-06-06" +channel = "nightly-2024-07-19" components = ["llvm-tools-preview", "rustc-dev"] diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 3bfb583d6459d..863e53035626a 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -162,9 +162,8 @@ impl<'a> Iterator for RowIdIter<'a> { type Item = RowId; fn next(&mut self) -> Option { - self.current_row_id.map(|row_id| { - self.current_row_id = self.next_row_id[row_id]; - row_id + self.current_row_id.inspect(|row_id| { + self.current_row_id = self.next_row_id[*row_id]; }) } } diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 414f27b33b4a7..9b88c3be9cd68 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -20,7 +20,6 @@ #![feature(coroutines)] #![feature(proc_macro_hygiene, stmt_expr_attributes)] #![feature(iterator_try_collect)] -#![feature(lint_reasons)] #![feature(is_sorted)] #![recursion_limit = "256"] #![feature(let_chains)] diff --git a/src/common/benches/bench_data_chunk_encoding.rs b/src/common/benches/bench_data_chunk_encoding.rs index 96413a4305205..4b09aeaeed5c2 100644 --- a/src/common/benches/bench_data_chunk_encoding.rs +++ b/src/common/benches/bench_data_chunk_encoding.rs @@ -55,7 +55,7 @@ fn bench_data_chunk_encoding(c: &mut Criterion) { for null_ratio in NULL_RATIOS { for chunk_size in CHUNK_SIZES { let chunk = rand_chunk::gen_chunk(&case.data_types, *chunk_size, SEED, *null_ratio); - let mut group = c.benchmark_group(&format!( + let mut group = c.benchmark_group(format!( "data chunk encoding: {}, {} rows, Pr[null]={}", case.name, chunk_size, null_ratio )); diff --git a/src/common/benches/bench_sequencer.rs b/src/common/benches/bench_sequencer.rs index 12e92f1f3332d..591b5fd64ee3a 100644 --- a/src/common/benches/bench_sequencer.rs +++ b/src/common/benches/bench_sequencer.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] - use std::cell::RefCell; use std::hint::black_box; use std::sync::atomic::{AtomicUsize, Ordering}; diff --git a/src/common/common_service/src/lib.rs b/src/common/common_service/src/lib.rs index 2cf9a56e076f3..ecf89a84fce88 100644 --- a/src/common/common_service/src/lib.rs +++ b/src/common/common_service/src/lib.rs @@ -14,7 +14,6 @@ // This is a stub lib.rs. -#![feature(lint_reasons)] #![feature(impl_trait_in_assoc_type)] #![feature(error_generic_member_access)] diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 7751eb591239d..e3417853b0201 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -23,7 +23,6 @@ #![feature(test)] #![feature(trusted_len)] #![feature(allocator_api)] -#![feature(lint_reasons)] #![feature(coroutines)] #![feature(map_try_insert)] #![feature(error_generic_member_access)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index d91fb56b1cb88..1336a84980cea 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -16,7 +16,6 @@ #![feature(coroutines)] #![feature(type_alias_impl_trait)] #![feature(let_chains)] -#![feature(lint_reasons)] #![feature(impl_trait_in_assoc_type)] #![cfg_attr(coverage, feature(coverage_attribute))] @@ -103,8 +102,9 @@ pub struct ComputeNodeOpts { pub role: Role, /// Used for control the metrics level, similar to log level. - /// 0 = disable metrics - /// >0 = enable metrics + /// + /// level = 0: disable metrics + /// level > 0: enable metrics #[clap(long, hide = true, env = "RW_METRICS_LEVEL")] #[override_opts(path = server.metrics_level)] pub metrics_level: Option, diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index cbf0ad14046f7..2119c1ece4e57 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -21,7 +21,6 @@ #![feature(stmt_expr_attributes)] #![feature(box_patterns)] #![feature(trait_alias)] -#![feature(lint_reasons)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 6ee28a2161aa1..f66b5116c110b 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -19,7 +19,6 @@ #![feature(stmt_expr_attributes)] #![feature(box_patterns)] #![feature(trait_alias)] -#![feature(lint_reasons)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index f1ac65d79a654..e9c9436fd295f 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -297,10 +297,9 @@ mod tests { .unwrap() .into_iter() .filter(|c| c.cardinality() > 0) - .map(|c| { + .inspect(|c| { // 5 data messages in a single chunk assert_eq!(5, c.cardinality()); - c }) .collect_vec(); diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index ea0e0e4776318..ff9079591a2f5 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -14,11 +14,7 @@ use std::collections::BTreeMap; -use anyhow::{anyhow, Context}; -use futures::future::try_join_all; -use futures::prelude::future::FutureExt; -use futures::prelude::TryFuture; -use futures::TryFutureExt; +use anyhow::anyhow; use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::apiv1; @@ -26,7 +22,7 @@ use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile use google_cloud_pubsub::client::google_cloud_auth::project; use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider; use google_cloud_pubsub::client::{Client, ClientConfig}; -use google_cloud_pubsub::publisher::{Awaiter, Publisher}; +use google_cloud_pubsub::publisher::Publisher; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use serde_derive::Deserialize; @@ -46,19 +42,33 @@ use crate::dispatch_sink_formatter_str_key_impl; pub const PUBSUB_SINK: &str = "google_pubsub"; const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536; -fn may_delivery_future(awaiter: Vec) -> GooglePubSubSinkDeliveryFuture { - try_join_all(awaiter.into_iter().map(|awaiter| { - awaiter.get().map(|result| { - result - .context("Google Pub/Sub sink error") - .map_err(SinkError::GooglePubSub) - .map(|_| ()) - }) - })) - .map_ok(|_: Vec<()>| ()) - .boxed() +mod delivery_future { + use anyhow::Context; + use futures::future::try_join_all; + use futures::{FutureExt, TryFuture, TryFutureExt}; + use google_cloud_pubsub::publisher::Awaiter; + + use crate::sink::SinkError; + + pub type GooglePubSubSinkDeliveryFuture = + impl TryFuture + Unpin + 'static; + + pub(super) fn may_delivery_future(awaiter: Vec) -> GooglePubSubSinkDeliveryFuture { + try_join_all(awaiter.into_iter().map(|awaiter| { + awaiter.get().map(|result| { + result + .context("Google Pub/Sub sink error") + .map_err(SinkError::GooglePubSub) + .map(|_| ()) + }) + })) + .map_ok(|_: Vec<()>| ()) + .boxed() + } } +use delivery_future::*; + #[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct GooglePubSubConfig { @@ -172,9 +182,6 @@ struct GooglePubSubPayloadWriter<'w> { add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>, } -pub type GooglePubSubSinkDeliveryFuture = - impl TryFuture + Unpin + 'static; - impl GooglePubSubSinkWriter { pub async fn new( config: GooglePubSubConfig, diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs index d85d712c41ac3..463b1f3c9dbd4 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_partition_writer.rs @@ -27,7 +27,6 @@ pub struct MonitoredFanoutPartitionedWriterBuilder { } impl MonitoredFanoutPartitionedWriterBuilder { - #[expect(dead_code)] pub fn new( inner: FanoutPartitionedWriterBuilder, partition_num: LabelGuardedIntGauge<2>, diff --git a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs index dc44434e5d9c2..aebb5939ff143 100644 --- a/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitored_write_writer.rs @@ -28,7 +28,6 @@ pub struct MonitoredWriteWriterBuilder { impl MonitoredWriteWriterBuilder { /// Create writer context. - #[expect(dead_code)] pub fn new( inner: B, write_qps: LabelGuardedIntCounter<2>, diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 49207e668e41b..763d7e9bba49a 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -288,7 +288,7 @@ impl RedisSinkPayloadWriter { return Ok(()); } } - self.pipe.query(self.conn.as_mut().unwrap()).await?; + self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?; self.pipe.clear(); Ok(()) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 6fcef5d41b654..4988a00b95645 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -23,7 +23,6 @@ use async_trait::async_trait; use await_tree::InstrumentAwait; use futures::future::select; use futures::TryStreamExt; -use itertools::Itertools; use jni::JavaVM; use prost::Message; use risingwave_common::array::StreamChunk; @@ -175,7 +174,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe bail!("Es sink only supports single pk or pk with delimiter option"); } // FIXME: support struct and array in stream sink - param.columns.iter().map(|col| { + param.columns.iter().try_for_each(|col| { match &col.data_type { DataType::Int16 | DataType::Int32 @@ -218,7 +217,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}", col.name, col.data_type, - )))}}).try_collect()?; + )))}})?; let jvm = JVM.get_or_init()?; let sink_param = param.to_proto(); diff --git a/src/dml/src/lib.rs b/src/dml/src/lib.rs index a15a4dfb3fba9..f0034a630a823 100644 --- a/src/dml/src/lib.rs +++ b/src/dml/src/lib.rs @@ -14,7 +14,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(lint_reasons)] #![feature(coroutines)] #![feature(hash_extract_if)] #![feature(type_alias_impl_trait)] diff --git a/src/error/src/lib.rs b/src/error/src/lib.rs index 4dde816be458b..010308bf95cc8 100644 --- a/src/error/src/lib.rs +++ b/src/error/src/lib.rs @@ -21,7 +21,6 @@ //! access if `risingwave_common` is already a dependency. #![feature(error_generic_member_access)] -#![feature(lint_reasons)] #![feature(register_tool)] #![register_tool(rw)] #![feature(trait_alias)] diff --git a/src/expr/core/src/lib.rs b/src/expr/core/src/lib.rs index d45d4ca11f80a..73e3b6a6ed2e3 100644 --- a/src/expr/core/src/lib.rs +++ b/src/expr/core/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(let_chains)] -#![feature(lint_reasons)] #![feature(iterator_try_collect)] #![feature(coroutines)] #![feature(never_type)] diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index e5c69c2660eeb..e710749a122d6 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -23,7 +23,6 @@ #![allow(non_snake_case)] // for `ctor` generated code #![feature(let_chains)] #![feature(assert_matches)] -#![feature(lint_reasons)] #![feature(iterator_try_collect)] #![feature(coroutines)] #![feature(test)] diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 8fd03e344db89..630c82a87701b 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] #![feature(let_chains)] use std::vec; diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index f650fa3cb521b..c7acdfa5c4a3c 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -988,10 +988,9 @@ impl ExprImpl { _ => return None, }; let list: Vec<_> = inputs - .map(|expr| { + .inspect(|expr| { // Non constant IN will be bound to OR assert!(expr.is_const()); - expr }) .collect(); diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index d8b484e3d6fa2..d3d5d1623bd58 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -23,7 +23,6 @@ #![feature(if_let_guard)] #![feature(let_chains)] #![feature(assert_matches)] -#![feature(lint_reasons)] #![feature(box_patterns)] #![feature(macro_metavar_expr)] #![feature(min_specialization)] @@ -142,8 +141,9 @@ pub struct FrontendOpts { pub config_path: String, /// Used for control the metrics level, similar to log level. - /// 0 = disable metrics - /// >0 = enable metrics + /// + /// level = 0: disable metrics + /// level > 0: enable metrics #[clap(long, hide = true, env = "RW_METRICS_LEVEL")] #[override_opts(path = server.metrics_level)] pub metrics_level: Option, diff --git a/src/frontend/src/optimizer/delta_join_solver.rs b/src/frontend/src/optimizer/delta_join_solver.rs index 5dc1bb30cc9f9..470fc0426d7d5 100644 --- a/src/frontend/src/optimizer/delta_join_solver.rs +++ b/src/frontend/src/optimizer/delta_join_solver.rs @@ -66,7 +66,8 @@ //! possible that every lookup path produces different distribution. We need to shuffle them //! before feeding data to union. -#![expect(dead_code)] +// FIXME: https://github.com/rust-lang/rust-analyzer/issues/17685 +#![allow(dead_code)] use std::collections::{BTreeMap, BTreeSet}; diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 7a81b164fbafe..bb78380482752 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -548,11 +548,10 @@ impl ColPrunable for LogicalOverWindow { let new_window_functions = req_cols_win_func_part .indices() .map(|idx| self.window_functions()[idx - input_len].clone()) - .map(|func| { + .inspect(|func| { tmp.extend(func.args.iter().map(|x| x.index())); tmp.extend(func.partition_by.iter().map(|x| x.index())); tmp.extend(func.order_by.iter().map(|x| x.column_index)); - func }) .collect_vec(); (tmp, new_window_functions) diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs index 548fda7b92af4..a995dd9878620 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -48,7 +48,7 @@ use std::cmp::min; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; use itertools::Itertools; @@ -962,17 +962,6 @@ impl ExprVisitor for TableScanIoEstimator<'_> { } } -#[derive(Default)] -struct ExprInputRefFinder { - pub input_ref_index_set: HashSet, -} - -impl ExprVisitor for ExprInputRefFinder { - fn visit_input_ref(&mut self, input_ref: &InputRef) { - self.input_ref_index_set.insert(input_ref.index); - } -} - struct ShiftInputRefRewriter { offset: usize, } diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index 86a54cf9c0f98..2d977cfb675e6 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -230,14 +230,13 @@ impl QueryManager { self.query_metrics.clone(), ) .await - .map_err(|err| { + .inspect_err(|_| { // Clean up query execution on error. context .session() .env() .query_manager() .delete_query(&query_id); - err })?; Ok(query_result_fetcher.stream_from_channel()) } diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 88a76d1a1c706..6fa88fd412e31 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] #![feature(let_chains)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index e2f57d4a26bbb..2e327dc47a59e 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] #![feature(let_chains)] #![feature(impl_trait_in_assoc_type)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 1e7d9b5dfa759..97b3636e8dba3 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -14,14 +14,13 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; -use std::future::Future; use std::time::Duration; use anyhow::anyhow; use fail::fail_point; use futures::future::try_join_all; use futures::stream::{BoxStream, FuturesUnordered}; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorId; @@ -58,33 +57,47 @@ struct ControlStreamNode { sender: UnboundedSender, } -fn into_future( - worker_id: WorkerId, - stream: BoxStream< - 'static, - risingwave_rpc_client::error::Result, - >, -) -> ResponseStreamFuture { - stream.into_future().map(move |(opt, stream)| { - ( - worker_id, - stream, - opt.ok_or_else(|| anyhow!("end of stream").into()) - .and_then(|result| result.map_err(|e| e.into())), - ) - }) +mod response_stream_future { + use std::future::Future; + + use anyhow::anyhow; + use futures::stream::BoxStream; + use futures::{FutureExt, StreamExt}; + use risingwave_pb::stream_service::StreamingControlStreamResponse; + + use crate::manager::WorkerId; + use crate::MetaResult; + + pub(super) fn into_future( + worker_id: WorkerId, + stream: BoxStream< + 'static, + risingwave_rpc_client::error::Result, + >, + ) -> ResponseStreamFuture { + stream.into_future().map(move |(opt, stream)| { + ( + worker_id, + stream, + opt.ok_or_else(|| anyhow!("end of stream").into()) + .and_then(|result| result.map_err(|e| e.into())), + ) + }) + } + + pub(super) type ResponseStreamFuture = impl Future< + Output = ( + WorkerId, + BoxStream< + 'static, + risingwave_rpc_client::error::Result, + >, + MetaResult, + ), + > + 'static; } -type ResponseStreamFuture = impl Future< - Output = ( - WorkerId, - BoxStream< - 'static, - risingwave_rpc_client::error::Result, - >, - MetaResult, - ), - > + 'static; +use response_stream_future::*; pub(super) struct ControlStreamManager { context: GlobalBarrierManagerContext, @@ -360,7 +373,7 @@ impl ControlStreamManager { self.nodes .iter_mut() - .map(|(node_id, node)| { + .try_for_each(|(node_id, node)| { let actor_ids_to_collect: Vec<_> = pre_applied_graph_info .actor_ids_to_collect(*node_id) .collect(); @@ -427,7 +440,6 @@ impl ControlStreamManager { Result::<_, MetaError>::Ok(()) } }) - .try_collect() .inspect_err(|e| { // Record failure in event log. use risingwave_pb::meta::event_log; diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 2e20d1fcf65ea..807ba6f3fd35f 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -231,12 +231,9 @@ impl HummockManager { let mut is_group_init = false; group_id = *new_compaction_group_id .get_or_try_init(|| async { - next_compaction_group_id(&self.env) - .await - .map(|new_group_id| { - is_group_init = true; - new_group_id - }) + next_compaction_group_id(&self.env).await.inspect(|_| { + is_group_init = true; + }) }) .await?; if is_group_init { diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 61e29b2fb1129..eab9dd1287ebf 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -15,7 +15,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(lint_reasons)] #![feature(map_try_insert)] #![feature(extract_if)] #![feature(hash_extract_if)] diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 12c1596841f67..4db6711862810 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1811,15 +1811,11 @@ impl CatalogManager { all_table_ids.extend(index_table_ids.iter().cloned()); for index_table_id in &index_table_ids { - let internal_table_ids = match fragment_manager + let internal_table_ids = fragment_manager .select_table_fragments_by_table_id(&(index_table_id.into())) .await .map(|fragments| fragments.internal_table_ids()) - { - Ok(v) => v, - // Handle backwards compat with no state persistence. - Err(_) => vec![], - }; + .unwrap_or_default(); // 1 should be used by table scan. if internal_table_ids.len() == 1 { @@ -1901,15 +1897,11 @@ impl CatalogManager { } all_table_ids.insert(index.index_table_id); - let internal_table_ids = match fragment_manager + let internal_table_ids = fragment_manager .select_table_fragments_by_table_id(&(index.index_table_id.into())) .await .map(|fragments| fragments.internal_table_ids()) - { - Ok(v) => v, - // Handle backwards compat with no state persistence. - Err(_) => vec![], - }; + .unwrap_or_default(); // 1 should be used by table scan. if internal_table_ids.len() == 1 { diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 751ee92beebc1..ae5ca2a610b9c 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -188,10 +188,9 @@ impl ConnectorSourceWorker

{ let source_is_up = |res: i64| { self.source_is_up.set(res); }; - let splits = self.enumerator.list_splits().await.map_err(|e| { + let splits = self.enumerator.list_splits().await.inspect_err(|_| { source_is_up(0); self.fail_cnt += 1; - e })?; source_is_up(1); self.fail_cnt = 0; diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index d9e768b7f0290..c70d38eb90a90 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(lint_reasons)] #![feature(error_generic_member_access)] #![feature(let_chains)] diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index c8ad9de582edc..e965f76282da4 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -15,7 +15,6 @@ // for derived code of `Message` #![expect(clippy::all)] #![expect(clippy::doc_markdown)] -#![feature(lint_reasons)] use std::str::FromStr; diff --git a/src/risedevtool/src/lib.rs b/src/risedevtool/src/lib.rs index 57294e5a7eafa..e7b2fdf56f777 100644 --- a/src/risedevtool/src/lib.rs +++ b/src/risedevtool/src/lib.rs @@ -15,7 +15,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(exit_status_error)] #![feature(let_chains)] -#![feature(lint_reasons)] mod config; pub use config::*; diff --git a/src/risedevtool/src/task/task_kafka_ready_check.rs b/src/risedevtool/src/task/task_kafka_ready_check.rs index 79838bf8eca66..b749822a1ebe2 100644 --- a/src/risedevtool/src/task/task_kafka_ready_check.rs +++ b/src/risedevtool/src/task/task_kafka_ready_check.rs @@ -42,7 +42,7 @@ impl Task for KafkaReadyCheckTask { let mut config = ClientConfig::new(); config.set( "bootstrap.servers", - &format!("{}:{}", self.config.address, self.config.port), + format!("{}:{}", self.config.address, self.config.port), ); let rt = tokio::runtime::Builder::new_current_thread() diff --git a/src/sqlparser/src/lib.rs b/src/sqlparser/src/lib.rs index a102e5428edae..07967d4cf75a7 100644 --- a/src/sqlparser/src/lib.rs +++ b/src/sqlparser/src/lib.rs @@ -31,7 +31,6 @@ //! ``` #![cfg_attr(not(feature = "std"), no_std)] -#![feature(lint_reasons)] #![feature(let_chains)] #![expect(clippy::doc_markdown)] #![expect(clippy::upper_case_acronyms)] diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 8dfba1b62a181..e543d139b44f0 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -17,7 +17,6 @@ #![feature(type_alias_impl_trait)] #![feature(extract_if)] #![feature(custom_test_frameworks)] -#![feature(lint_reasons)] #![feature(map_try_insert)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 22e70ac759aed..4c503f3d7a8d5 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] - mod compactor_observer; mod rpc; pub mod server; diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 9e6962ab117aa..12f04dbaf5c33 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -15,7 +15,6 @@ #![feature(async_closure)] #![feature(extract_if)] #![feature(hash_extract_if)] -#![feature(lint_reasons)] #![feature(map_many_mut)] #![feature(type_alias_impl_trait)] #![feature(impl_trait_in_assoc_type)] diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 9181e37c992e2..7760d7ce530c6 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -31,7 +31,7 @@ use clap::Parser; use foyer::HybridCacheBuilder; use replay_impl::{get_replay_notification_client, GlobalReplayImpl}; use risingwave_common::config::{ - extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, StorageConfig, + extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, }; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_hummock_trace::{ @@ -46,7 +46,6 @@ use risingwave_storage::filter_key_extractor::{ use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig}; use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics}; use risingwave_storage::opts::StorageOpts; -use serde::{Deserialize, Serialize}; // use a large offset to avoid collision with real sstables const SST_OFFSET: u64 = 2147383647000; @@ -183,8 +182,3 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result Option<(HummockValue, EpochWithGap)> { - imm.get(table_key, read_epoch, read_options).map(|v| { + imm.get(table_key, read_epoch, read_options).inspect(|_| { local_stats.get_shared_buffer_hit_counts += 1; - v }) } diff --git a/src/storage/src/hummock/sstable/bloom.rs b/src/storage/src/hummock/sstable/bloom.rs index f2ca47ba00e12..b38a4c10ada30 100644 --- a/src/storage/src/hummock/sstable/bloom.rs +++ b/src/storage/src/hummock/sstable/bloom.rs @@ -102,7 +102,7 @@ impl BloomFilterReader { true } else { let nbits = self.data.bit_len(); - let delta = (h >> 17) | (h << 15); + let delta = h.rotate_left(15); for _ in 0..self.k { let bit_pos = h % (nbits as u32); if !self.data.get_bit(bit_pos as usize) { @@ -171,7 +171,7 @@ impl FilterBuilder for BloomFilterBuilder { filter.resize(nbytes, 0); for h in &self.key_hash_entries { let mut h = *h; - let delta = (h >> 17) | (h << 15); + let delta = h.rotate_left(15); for _ in 0..k { let bit_pos = (h as usize) % nbits; filter.set_bit(bit_pos, true); diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index e11d3e1cee1ca..779062767c7ae 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -18,7 +18,6 @@ #![feature(extract_if)] #![feature(coroutines)] #![feature(hash_extract_if)] -#![feature(lint_reasons)] #![feature(proc_macro_hygiene)] #![feature(stmt_expr_attributes)] #![feature(strict_provenance)] diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 5497b989a0873..c84db97002b02 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -16,7 +16,7 @@ use std::future::Future; use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::pin::Pin; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::anyhow; use await_tree::InstrumentAwait; @@ -53,18 +53,28 @@ use crate::common::log_store_impl::kv_log_store::serde::{ }; use crate::common::log_store_impl::kv_log_store::KvLogStoreMetrics; -type RewindBackoffPolicy = impl Iterator; pub(crate) const REWIND_BASE_DELAY: Duration = Duration::from_secs(1); pub(crate) const REWIND_BACKOFF_FACTOR: u64 = 2; pub(crate) const REWIND_MAX_DELAY: Duration = Duration::from_secs(180); -fn initial_rewind_backoff_policy() -> RewindBackoffPolicy { - tokio_retry::strategy::ExponentialBackoff::from_millis(REWIND_BASE_DELAY.as_millis() as _) - .factor(REWIND_BACKOFF_FACTOR) - .max_delay(REWIND_MAX_DELAY) - .map(tokio_retry::strategy::jitter) +mod rewind_backoff_policy { + use std::time::Duration; + + use crate::common::log_store_impl::kv_log_store::{ + REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY, + }; + + pub(super) type RewindBackoffPolicy = impl Iterator; + pub(super) fn initial_rewind_backoff_policy() -> RewindBackoffPolicy { + tokio_retry::strategy::ExponentialBackoff::from_millis(REWIND_BASE_DELAY.as_millis() as _) + .factor(REWIND_BACKOFF_FACTOR) + .max_delay(REWIND_MAX_DELAY) + .map(tokio_retry::strategy::jitter) + } } +use rewind_backoff_policy::*; + struct RewindDelay { last_rewind_truncate_offset: Option, backoff_policy: RewindBackoffPolicy, @@ -218,58 +228,71 @@ impl bool> AutoRebuildStateStoreReadIter } } -type TimeoutAutoRebuildIter = - AutoRebuildStateStoreReadIter bool + Send>; +mod timeout_auto_rebuild { + use std::time::{Duration, Instant}; -async fn iter_with_timeout_rebuild( - state_store: S, - range: TableKeyRange, - epoch: HummockEpoch, - options: ReadOptions, - timeout: Duration, -) -> StorageResult> { - const CHECK_TIMEOUT_PERIOD: usize = 100; - // use a struct here to avoid accidental copy instead of move on primitive usize - struct Count(usize); - let mut check_count = Count(0); - let mut total_count = Count(0); - let mut curr_iter_item_count = Count(0); - let mut start_time = Instant::now(); - let initial_start_time = start_time; - AutoRebuildStateStoreReadIter::new( - state_store, - move || { - check_count.0 += 1; - curr_iter_item_count.0 += 1; - total_count.0 += 1; - if check_count.0 == CHECK_TIMEOUT_PERIOD { - check_count.0 = 0; - if start_time.elapsed() > timeout { - let prev_iter_item_count = curr_iter_item_count.0; - curr_iter_item_count.0 = 0; - start_time = Instant::now(); - info!( - table_id = options.table_id.table_id, - iter_exist_time_secs = initial_start_time.elapsed().as_secs(), - prev_iter_item_count, - total_iter_item_count = total_count.0, - "kv log store iter is rebuilt" - ); - true + use risingwave_hummock_sdk::key::TableKeyRange; + use risingwave_hummock_sdk::HummockEpoch; + use risingwave_storage::error::StorageResult; + use risingwave_storage::store::{ReadOptions, StateStoreRead}; + + use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter; + + pub(super) type TimeoutAutoRebuildIter = + AutoRebuildStateStoreReadIter bool + Send>; + + pub(super) async fn iter_with_timeout_rebuild( + state_store: S, + range: TableKeyRange, + epoch: HummockEpoch, + options: ReadOptions, + timeout: Duration, + ) -> StorageResult> { + const CHECK_TIMEOUT_PERIOD: usize = 100; + // use a struct here to avoid accidental copy instead of move on primitive usize + struct Count(usize); + let mut check_count = Count(0); + let mut total_count = Count(0); + let mut curr_iter_item_count = Count(0); + let mut start_time = Instant::now(); + let initial_start_time = start_time; + AutoRebuildStateStoreReadIter::new( + state_store, + move || { + check_count.0 += 1; + curr_iter_item_count.0 += 1; + total_count.0 += 1; + if check_count.0 == CHECK_TIMEOUT_PERIOD { + check_count.0 = 0; + if start_time.elapsed() > timeout { + let prev_iter_item_count = curr_iter_item_count.0; + curr_iter_item_count.0 = 0; + start_time = Instant::now(); + info!( + table_id = options.table_id.table_id, + iter_exist_time_secs = initial_start_time.elapsed().as_secs(), + prev_iter_item_count, + total_iter_item_count = total_count.0, + "kv log store iter is rebuilt" + ); + true + } else { + false + } } else { false } - } else { - false - } - }, - range, - epoch, - options, - ) - .await + }, + range, + epoch, + options, + ) + .await + } } +use timeout_auto_rebuild::*; + impl bool + Send> StateStoreIter for AutoRebuildStateStoreReadIter { diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index e00a0da45979a..7ecac2c625e69 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -15,16 +15,13 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use anyhow::{anyhow, Context as _}; -use futures::pin_mut; -use futures_async_stream::try_stream; +use anyhow::anyhow; +use local_input::LocalInputStreamInner; use pin_project::pin_project; use risingwave_common::util::addr::{is_local_address, HostAddr}; -use risingwave_pb::task_service::{permits, GetStreamResponse}; use risingwave_rpc_client::ComputeClientPool; use tokio::sync::mpsc; -use super::error::ExchangeChannelClosed; use super::permit::Receiver; use crate::executor::prelude::*; use crate::executor::{DispatcherBarrier, DispatcherMessage}; @@ -64,7 +61,6 @@ pub struct LocalInput { actor_id: ActorId, } -type LocalInputStreamInner = impl MessageStream; async fn process_msg<'a>( msg: DispatcherMessage, @@ -110,7 +106,7 @@ impl LocalInput { local_barrier_manager: LocalBarrierManager, ) -> Self { Self { - inner: Self::run( + inner: local_input::run( channel, upstream_actor_id, self_actor_id, @@ -119,9 +115,36 @@ impl LocalInput { actor_id: upstream_actor_id, } } +} + +mod local_input { + use await_tree::InstrumentAwait; + + use crate::executor::exchange::error::ExchangeChannelClosed; + use crate::executor::exchange::input::process_msg; + use crate::executor::exchange::permit::Receiver; + use crate::executor::prelude::try_stream; + use crate::executor::{Message, StreamExecutorError}; + use crate::task::{ActorId, LocalBarrierManager}; + + pub(super) type LocalInputStreamInner = impl crate::executor::MessageStream; + + pub(super) fn run( + channel: Receiver, + upstream_actor_id: ActorId, + self_actor_id: ActorId, + local_barrier_manager: LocalBarrierManager, + ) -> LocalInputStreamInner { + run_inner( + channel, + upstream_actor_id, + self_actor_id, + local_barrier_manager, + ) + } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn run( + async fn run_inner( mut channel: Receiver, upstream_actor_id: ActorId, self_actor_id: ActorId, @@ -166,7 +189,8 @@ pub struct RemoteInput { actor_id: ActorId, } -type RemoteInputStreamInner = impl MessageStream; + +use remote_input::RemoteInputStreamInner; impl RemoteInput { /// Create a remote input from compute client and related info. Should provide the corresponding @@ -184,7 +208,7 @@ impl RemoteInput { Self { actor_id, - inner: Self::run( + inner: remote_input::run( local_barrier_manager, client_pool, upstream_addr, @@ -195,9 +219,48 @@ impl RemoteInput { ), } } +} + +mod remote_input { + use std::sync::Arc; + + use anyhow::Context; + use await_tree::InstrumentAwait; + use risingwave_common::util::addr::HostAddr; + use risingwave_pb::task_service::{permits, GetStreamResponse}; + use risingwave_rpc_client::ComputeClientPool; + + use crate::executor::exchange::error::ExchangeChannelClosed; + use crate::executor::exchange::input::process_msg; + use crate::executor::monitor::StreamingMetrics; + use crate::executor::prelude::{pin_mut, try_stream, StreamExt}; + use crate::executor::{DispatcherMessage, Message, StreamExecutorError}; + use crate::task::{LocalBarrierManager, UpDownActorIds, UpDownFragmentIds}; + + pub(super) type RemoteInputStreamInner = impl crate::executor::MessageStream; + + pub(super) fn run( + local_barrier_manager: LocalBarrierManager, + client_pool: ComputeClientPool, + upstream_addr: HostAddr, + up_down_ids: UpDownActorIds, + up_down_frag: UpDownFragmentIds, + metrics: Arc, + batched_permits_limit: usize, + ) -> RemoteInputStreamInner { + run_inner( + local_barrier_manager, + client_pool, + upstream_addr, + up_down_ids, + up_down_frag, + metrics, + batched_permits_limit, + ) + } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn run( + async fn run_inner( local_barrier_manager: LocalBarrierManager, client_pool: ComputeClientPool, upstream_addr: HostAddr, diff --git a/src/stream/src/executor/nested_loop_temporal_join.rs b/src/stream/src/executor/nested_loop_temporal_join.rs index 0888d8981fc8c..55d21b468a777 100644 --- a/src/stream/src/executor/nested_loop_temporal_join.rs +++ b/src/stream/src/executor/nested_loop_temporal_join.rs @@ -98,8 +98,7 @@ async fn phase1_handle_chunk( } impl NestedLoopTemporalJoinExecutor { - #[allow(clippy::too_many_arguments)] - #[expect(dead_code)] + #[expect(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, info: ExecutorInfo, diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 30de0804b0ac0..bd22e47c737ad 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -16,7 +16,7 @@ use std::pin::Pin; use std::task::Poll; use either::Either; -use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy}; +use futures::stream::BoxStream; use futures::{Stream, StreamExt, TryStreamExt}; use crate::executor::error::StreamExecutorResult; @@ -25,8 +25,34 @@ use crate::executor::Message; type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; type StreamReaderData = StreamExecutorResult>; type ReaderArm = BoxStream<'static, StreamReaderData>; -type StreamReaderWithPauseInner = - SelectWithStrategy, ReaderArm, impl FnMut(&mut PollNext) -> PollNext, PollNext>; + +mod stream_reader_with_pause { + use futures::stream::{select_with_strategy, PollNext, SelectWithStrategy}; + + use crate::executor::stream_reader::ReaderArm; + + pub(super) type StreamReaderWithPauseInner = SelectWithStrategy< + ReaderArm, + ReaderArm, + impl FnMut(&mut PollNext) -> PollNext, + PollNext, + >; + + pub(super) fn new_inner( + message_stream: ReaderArm, + data_stream: ReaderArm, + ) -> StreamReaderWithPauseInner { + let strategy = if BIASED { + |_: &mut PollNext| PollNext::Left + } else { + // The poll strategy is not biased: we poll the two streams in a round robin way. + |last: &mut PollNext| last.toggle() + }; + select_with_strategy(message_stream, data_stream, strategy) + } +} + +use stream_reader_with_pause::*; /// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types /// of messages) and the other receiving data only (no barrier). The merged stream can be paused @@ -40,7 +66,7 @@ type StreamReaderWithPauseInner = /// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin /// fashion. pub(super) struct StreamReaderWithPause { - inner: StreamReaderWithPauseInner, + inner: StreamReaderWithPauseInner, /// Whether the source stream is paused. paused: bool, } @@ -54,26 +80,13 @@ impl StreamReaderWithPause { ) -> Self { let message_stream_arm = message_stream.map_ok(Either::Left).boxed(); let data_stream_arm = data_stream.map_ok(Either::Right).boxed(); - let inner = Self::new_inner(message_stream_arm, data_stream_arm); + let inner = new_inner(message_stream_arm, data_stream_arm); Self { inner, paused: false, } } - fn new_inner( - message_stream: ReaderArm, - data_stream: ReaderArm, - ) -> StreamReaderWithPauseInner { - let strategy = if BIASED { - |_: &mut PollNext| PollNext::Left - } else { - // The poll strategy is not biased: we poll the two streams in a round robin way. - |last: &mut PollNext| last.toggle() - }; - select_with_strategy(message_stream, data_stream, strategy) - } - /// Replace the data stream with a new one for given `stream`. Used for split change. pub fn replace_data_stream( &mut self, @@ -87,7 +100,7 @@ impl StreamReaderWithPause { // Note: create a new `SelectWithStrategy` instead of replacing the source stream arm here, // to ensure the internal state of the `SelectWithStrategy` is reset. (#6300) - self.inner = Self::new_inner( + self.inner = new_inner( barrier_receiver_arm, data_stream.map_ok(Either::Right).boxed(), ); diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 876deabc80f98..577b829945620 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -17,7 +17,6 @@ #![feature(trait_alias)] #![feature(type_alias_impl_trait)] #![feature(more_qualified_paths)] -#![feature(lint_reasons)] #![feature(let_chains)] #![feature(hash_extract_if)] #![feature(extract_if)] diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 43a979af2b568..8f4ab2b49ea2e 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -42,7 +42,7 @@ use super::{BarrierCompleteResult, SubscribeMutationItem}; use crate::error::{StreamError, StreamResult}; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; -use crate::task::{await_tree_key, ActorId, PartialGraphId, SharedContext, StreamActorManager}; +use crate::task::{ActorId, PartialGraphId, SharedContext, StreamActorManager}; struct IssuedState { pub mutation: Option>, @@ -88,8 +88,59 @@ pub(super) struct BarrierState { inner: ManagedBarrierStateInner, } -type AwaitEpochCompletedFuture = - impl Future)> + 'static; +mod await_epoch_completed_future { + use std::future::Future; + + use futures::future::BoxFuture; + use futures::FutureExt; + use risingwave_hummock_sdk::SyncResult; + use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress; + + use crate::error::StreamResult; + use crate::executor::Barrier; + use crate::task::{await_tree_key, BarrierCompleteResult}; + + pub(super) type AwaitEpochCompletedFuture = + impl Future)> + 'static; + + pub(super) fn instrument_complete_barrier_future( + complete_barrier_future: Option>>, + barrier: Barrier, + barrier_await_tree_reg: Option<&await_tree::Registry>, + create_mview_progress: Vec, + ) -> AwaitEpochCompletedFuture { + let prev_epoch = barrier.epoch.prev; + let future = async move { + if let Some(future) = complete_barrier_future { + let result = future.await; + result.map(Some) + } else { + Ok(None) + } + } + .map(move |result| { + ( + barrier, + result.map(|sync_result| BarrierCompleteResult { + sync_result, + create_mview_progress, + }), + ) + }); + if let Some(reg) = barrier_await_tree_reg { + reg.register( + await_tree_key::BarrierAwait { prev_epoch }, + format!("SyncEpoch({})", prev_epoch), + ) + .instrument(future) + .left_future() + } else { + future.right_future() + } + } +} + +use await_epoch_completed_future::*; fn sync_epoch( state_store: &S, @@ -787,33 +838,12 @@ impl PartialGraphManagedBarrierState { let barrier = barrier_state.barrier.clone(); self.await_epoch_completed_futures.push_back({ - let future = async move { - if let Some(future) = complete_barrier_future { - let result = future.await; - result.map(Some) - } else { - Ok(None) - } - } - .map(move |result| { - ( - barrier, - result.map(|sync_result| BarrierCompleteResult { - sync_result, - create_mview_progress, - }), - ) - }); - if let Some(reg) = &self.barrier_await_tree_reg { - reg.register( - await_tree_key::BarrierAwait { prev_epoch }, - format!("SyncEpoch({})", prev_epoch), - ) - .instrument(future) - .left_future() - } else { - future.right_future() - } + instrument_complete_barrier_future( + complete_barrier_future, + barrier, + self.barrier_await_tree_reg.as_ref(), + create_mview_progress, + ) }); } } diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs index aa6303b8e2f65..af9cf158a3350 100644 --- a/src/tests/simulation/src/lib.rs +++ b/src/tests/simulation/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(trait_alias)] -#![feature(lint_reasons)] #![feature(let_chains)] #![feature(try_blocks)] #![feature(register_tool)] diff --git a/src/utils/futures_util/src/lib.rs b/src/utils/futures_util/src/lib.rs index 4d086951dbb5f..115da2e7676f9 100644 --- a/src/utils/futures_util/src/lib.rs +++ b/src/utils/futures_util/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] - use std::future::Future; use futures::stream::TryStream; diff --git a/src/utils/iter_util/src/lib.rs b/src/utils/iter_util/src/lib.rs index 58758c64a1ce5..92f19a0ee46fc 100644 --- a/src/utils/iter_util/src/lib.rs +++ b/src/utils/iter_util/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] - pub trait ZipEqFast: ExactSizeIterator + Sized where B::IntoIter: ExactSizeIterator, diff --git a/src/utils/local_stats_alloc/src/lib.rs b/src/utils/local_stats_alloc/src/lib.rs index 3950d0cb4931e..94265768815c2 100644 --- a/src/utils/local_stats_alloc/src/lib.rs +++ b/src/utils/local_stats_alloc/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(allocator_api)] -#![feature(lint_reasons)] use std::alloc::Allocator; use std::ops::Deref; diff --git a/src/utils/pgwire/src/lib.rs b/src/utils/pgwire/src/lib.rs index 8d1c00541bb95..fae5489e81097 100644 --- a/src/utils/pgwire/src/lib.rs +++ b/src/utils/pgwire/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lint_reasons)] #![feature(trait_alias)] #![feature(iterator_try_collect)] #![feature(trusted_len)]