Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: bump toolchain to 2024-07-19 #18470

Merged
merged 13 commits into from
Sep 11, 2024
2 changes: 1 addition & 1 deletion ci/rust-toolchain
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints

[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-07-19"
23 changes: 21 additions & 2 deletions lints/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "ui/format_error.rs"
# See `README.md` before bumping the version.
# Remember to update the version in `ci/Dockerfile` as well.
[dependencies]
clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "5e2a7c6adebdb0478ee6d5b67ab4ee94153b2997" }
clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "61e1d2fd7062e46ccf1237707ee6da5aac018f70" }
dylint_linting = "3.1.0"
itertools = "0.12"

Expand Down
2 changes: 1 addition & 1 deletion lints/rust-toolchain
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# See `README.md` before bumping the version.

[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-07-19"
components = ["llvm-tools-preview", "rustc-dev"]
5 changes: 2 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ impl<'a> Iterator for RowIdIter<'a> {
type Item = RowId;

fn next(&mut self) -> Option<Self::Item> {
self.current_row_id.map(|row_id| {
self.current_row_id = self.next_row_id[row_id];
row_id
self.current_row_id.inspect(|row_id| {
self.current_row_id = self.next_row_id[*row_id];
})
}
}
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#![feature(coroutines)]
#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(iterator_try_collect)]
#![feature(lint_reasons)]
#![feature(is_sorted)]
#![recursion_limit = "256"]
#![feature(let_chains)]
Expand Down
2 changes: 1 addition & 1 deletion src/common/benches/bench_data_chunk_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn bench_data_chunk_encoding(c: &mut Criterion) {
for null_ratio in NULL_RATIOS {
for chunk_size in CHUNK_SIZES {
let chunk = rand_chunk::gen_chunk(&case.data_types, *chunk_size, SEED, *null_ratio);
let mut group = c.benchmark_group(&format!(
let mut group = c.benchmark_group(format!(
"data chunk encoding: {}, {} rows, Pr[null]={}",
case.name, chunk_size, null_ratio
));
Expand Down
2 changes: 0 additions & 2 deletions src/common/benches/bench_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]

use std::cell::RefCell;
use std::hint::black_box;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
1 change: 0 additions & 1 deletion src/common/common_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

// This is a stub lib.rs.

#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]

Expand Down
1 change: 0 additions & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![feature(test)]
#![feature(trusted_len)]
#![feature(allocator_api)]
#![feature(lint_reasons)]
#![feature(coroutines)]
#![feature(map_try_insert)]
#![feature(error_generic_member_access)]
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#![feature(coroutines)]
#![feature(type_alias_impl_trait)]
#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![cfg_attr(coverage, feature(coverage_attribute))]

Expand Down Expand Up @@ -103,8 +102,9 @@ pub struct ComputeNodeOpts {
pub role: Role,

/// Used for control the metrics level, similar to log level.
/// 0 = disable metrics
/// >0 = enable metrics
///
/// level = 0: disable metrics
/// level > 0: enable metrics
#[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
#[override_opts(path = server.metrics_level)]
pub metrics_level: Option<MetricLevel>,
Expand Down
1 change: 0 additions & 1 deletion src/connector/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,9 @@ mod tests {
.unwrap()
.into_iter()
.filter(|c| c.cardinality() > 0)
.map(|c| {
.inspect(|c| {
// 5 data messages in a single chunk
assert_eq!(5, c.cardinality());
c
})
.collect_vec();

Expand Down
47 changes: 27 additions & 20 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@

use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use anyhow::anyhow;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::apiv1;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use google_cloud_pubsub::publisher::Publisher;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
Expand All @@ -46,19 +42,33 @@ use crate::dispatch_sink_formatter_str_key_impl;
pub const PUBSUB_SINK: &str = "google_pubsub";
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
try_join_all(awaiter.into_iter().map(|awaiter| {
awaiter.get().map(|result| {
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
})
}))
.map_ok(|_: Vec<()>| ())
.boxed()
mod delivery_future {
use anyhow::Context;
use futures::future::try_join_all;
use futures::{FutureExt, TryFuture, TryFutureExt};
use google_cloud_pubsub::publisher::Awaiter;

use crate::sink::SinkError;

pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
try_join_all(awaiter.into_iter().map(|awaiter| {
awaiter.get().map(|result| {
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
})
}))
.map_ok(|_: Vec<()>| ())
.boxed()
}
}

use delivery_future::*;

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
Expand Down Expand Up @@ -172,9 +182,6 @@ struct GooglePubSubPayloadWriter<'w> {
add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
pub async fn new(
config: GooglePubSubConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
}

impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
#[expect(dead_code)]
pub fn new(
inner: FanoutPartitionedWriterBuilder<B>,
partition_num: LabelGuardedIntGauge<2>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub struct MonitoredWriteWriterBuilder<B: IcebergWriterBuilder> {

impl<B: IcebergWriterBuilder> MonitoredWriteWriterBuilder<B> {
/// Create writer context.
#[expect(dead_code)]
pub fn new(
inner: B,
write_qps: LabelGuardedIntCounter<2>,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl RedisSinkPayloadWriter {
return Ok(());
}
}
self.pipe.query(self.conn.as_mut().unwrap()).await?;
self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
self.pipe.clear();
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use async_trait::async_trait;
use await_tree::InstrumentAwait;
use futures::future::select;
use futures::TryStreamExt;
use itertools::Itertools;
use jni::JavaVM;
use prost::Message;
use risingwave_common::array::StreamChunk;
Expand Down Expand Up @@ -175,7 +174,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
bail!("Es sink only supports single pk or pk with delimiter option");
}
// FIXME: support struct and array in stream sink
param.columns.iter().map(|col| {
param.columns.iter().try_for_each(|col| {
match &col.data_type {
DataType::Int16
| DataType::Int32
Expand Down Expand Up @@ -218,7 +217,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
"remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
col.name,
col.data_type,
)))}}).try_collect()?;
)))}})?;

let jvm = JVM.get_or_init()?;
let sink_param = param.to_proto();
Expand Down
1 change: 0 additions & 1 deletion src/dml/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(coroutines)]
#![feature(hash_extract_if)]
#![feature(type_alias_impl_trait)]
Expand Down
1 change: 0 additions & 1 deletion src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! access if `risingwave_common` is already a dependency.

#![feature(error_generic_member_access)]
#![feature(lint_reasons)]
#![feature(register_tool)]
#![register_tool(rw)]
#![feature(trait_alias)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(coroutines)]
#![feature(never_type)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![allow(non_snake_case)] // for `ctor` generated code
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(coroutines)]
#![feature(test)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]
#![feature(let_chains)]

use std::vec;
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,10 +988,9 @@ impl ExprImpl {
_ => return None,
};
let list: Vec<_> = inputs
.map(|expr| {
.inspect(|expr| {
// Non constant IN will be bound to OR
assert!(expr.is_const());
expr
})
.collect();

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![feature(if_let_guard)]
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(lint_reasons)]
#![feature(box_patterns)]
#![feature(macro_metavar_expr)]
#![feature(min_specialization)]
Expand Down Expand Up @@ -142,8 +141,9 @@ pub struct FrontendOpts {
pub config_path: String,

/// Used for control the metrics level, similar to log level.
/// 0 = disable metrics
/// >0 = enable metrics
///
/// level = 0: disable metrics
/// level > 0: enable metrics
#[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
#[override_opts(path = server.metrics_level)]
pub metrics_level: Option<MetricLevel>,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/delta_join_solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
//! possible that every lookup path produces different distribution. We need to shuffle them
//! before feeding data to union.

#![expect(dead_code)]
// FIXME: https://github.com/rust-lang/rust-analyzer/issues/17685
#![allow(dead_code)]

use std::collections::{BTreeMap, BTreeSet};

Expand Down
Loading
Loading