Commit
bump toolchain
Signed-off-by: TennyZhuang <[email protected]>
TennyZhuang committed Jul 28, 2023
1 parent 94feca2 commit 9d3e587
Showing 37 changed files with 75 additions and 69 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"scripts/source/prepare_ci_pubsub",
"src/batch",
2 changes: 1 addition & 1 deletion ci/rust-toolchain
@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2023-05-31"
channel = "nightly-2023-07-28"
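
Most of the changes below follow from this bump: on recent nightlies the unstable `drain_filter` family was renamed to `extract_if`, and the feature gates were renamed accordingly (`drain_filter` to `extract_if`, `hash_drain_filter` to `hash_extract_if`, `btree_drain_filter` to `btree_extract_if`). A minimal sketch of the renamed `Vec` API, assuming a nightly from this era; it is illustrative only, not RisingWave code:

```rust
// Illustrative only (assumes a nightly where `Vec::extract_if` takes just a closure).
#![feature(extract_if)]

fn main() {
    let mut v = vec![1, 2, 3, 4, 5];

    // Before this bump: let evens: Vec<_> = v.drain_filter(|x| *x % 2 == 0).collect();
    // After: same call, new name. `extract_if` is lazy, so the iterator must be
    // consumed for elements to actually be removed.
    let evens: Vec<i32> = v.extract_if(|x| *x % 2 == 0).collect();

    assert_eq!(evens, [2, 4]);
    assert_eq!(v, [1, 3, 5]);
}
```

Besides the name, the drop behavior differs: the old `Vec::drain_filter` kept removing matching elements when its iterator was dropped early, while `extract_if` leaves unvisited elements in place, which is why some call sites below either switch to `retain` or consume the iterator explicitly.
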
2 changes: 1 addition & 1 deletion src/common/src/lib.rs
@@ -13,7 +13,7 @@
// limitations under the License.

#![allow(rustdoc::private_intra_doc_links)]
#![feature(drain_filter)]
#![feature(extract_if)]
#![feature(trait_alias)]
#![feature(binary_heap_drain_sorted)]
#![feature(is_sorted)]
2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
@@ -49,7 +49,7 @@ pub fn place_vnode(
// evenly among workers.
let mut selected_pu_ids = Vec::new();
while !new_pus.is_empty() {
new_pus.drain_filter(|ps| {
let _ = new_pus.extract_if(|ps| {
if let Some(p) = ps.next() {
selected_pu_ids.push(p.id);
false
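
One caveat for hunks like this one, where the call is made only for the closure's side effects: `extract_if` is lazy, and unlike the old `drain_filter` it does not keep filtering if the returned iterator is dropped unconsumed, so something has to drive it to completion (presumably handled in the elided remainder of this hunk). A standalone sketch of that round-robin pattern with made-up names, not the real `new_pus` or processing-unit types:

```rust
#![feature(extract_if)]

fn main() {
    // Illustrative data only.
    let mut per_worker: Vec<Vec<u32>> = vec![vec![1, 2], vec![3]];
    let mut selected = Vec::new();

    while !per_worker.is_empty() {
        per_worker
            .extract_if(|ids| {
                if let Some(id) = ids.pop() {
                    selected.push(id); // side effect: take one id, keep the entry
                    false
                } else {
                    true // entry exhausted: remove it
                }
            })
            // `extract_if` is lazy; if this iterator were dropped unconsumed,
            // the closure would never run (unlike the old `drain_filter`).
            .for_each(drop);
    }

    assert_eq!(selected.len(), 3);
}
```
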
2 changes: 1 addition & 1 deletion src/ctl/src/lib.rs
@@ -13,7 +13,7 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(hash_drain_filter)]
#![feature(hash_extract_if)]

use anyhow::Result;
use clap::{Parser, Subcommand};
2 changes: 1 addition & 1 deletion src/frontend/src/expr/utils.rs
@@ -307,7 +307,7 @@ pub fn factorization_expr(expr: ExprImpl) -> Vec<ExprImpl> {
let (last, remaining) = disjunctions.split_last_mut().unwrap();
// now greatest_common_factor == [C, D]
let greatest_common_divider: Vec<_> = last
.drain_filter(|factor| remaining.iter().all(|expr| expr.contains(factor)))
.extract_if(|factor| remaining.iter().all(|expr| expr.contains(factor)))
.collect();
for disjunction in remaining {
// remove common factors
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_table_column.rs
@@ -131,7 +131,7 @@ pub async fn handle_alter_table_column(
// Locate the column by name and remove it.
let column_name = column_name.real_value();
let removed_column = columns
.drain_filter(|c| c.name.real_value() == column_name)
.extract_if(|c| c.name.real_value() == column_name)
.at_most_one()
.ok()
.unwrap();
2 changes: 1 addition & 1 deletion src/frontend/src/lib.rs
@@ -19,7 +19,7 @@
#![feature(generators)]
#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(trait_alias)]
#![feature(drain_filter)]
#![feature(extract_if)]
#![feature(if_let_guard)]
#![feature(let_chains)]
#![feature(assert_matches)]
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/join.rs
@@ -407,7 +407,7 @@ pub fn push_down_into_join(
// Do not push `now()` expressions onto the `on` condition; they will be pulled up into a filter instead.
let on = Condition {
conjunctions: conjunctions
.drain_filter(|expr| expr.count_nows() == 0)
.extract_if(|expr| expr.count_nows() == 0)
.collect(),
};
predicate.conjunctions = conjunctions;
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
@@ -222,7 +222,7 @@ impl LogicalJoin {
Condition {
conjunctions: others
.conjunctions
.drain_filter(|expr| expr.count_nows() == 0)
.extract_if(|expr| expr.count_nows() == 0)
.collect(),
}
} else {
@@ -651,8 +651,8 @@ impl ExprRewritable for LogicalJoin {
/// then we proceed. Else abort.
/// 2. Then, we collect `InputRef`s in the conjunction.
/// 3. If they are all columns in the given side of join eq condition, then we proceed. Else abort.
/// 4. We then rewrite the `ExprImpl`, by replacing `InputRef` column indices with
/// the equivalent in the other side.
/// 4. We then rewrite the `ExprImpl`, by replacing `InputRef` column indices with the equivalent in
/// the other side.
///
/// # Arguments
///
@@ -649,7 +649,7 @@ impl ColPrunable for LogicalOverWindow {

let (req_cols_input_part, req_cols_win_func_part) = {
let mut in_input = required_cols.to_vec();
let in_win_funcs: IndexSet = in_input.drain_filter(|i| *i >= input_len).collect();
let in_win_funcs: IndexSet = in_input.extract_if(|i| *i >= input_len).collect();
(IndexSet::from(in_input), in_win_funcs)
};

2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -401,7 +401,7 @@ impl PredicatePushdown for LogicalScan {
}
let non_pushable_predicate: Vec<_> = predicate
.conjunctions
.drain_filter(|expr| expr.count_nows() > 0 || HasCorrelated {}.visit_expr(expr))
.extract_if(|expr| expr.count_nows() > 0 || HasCorrelated {}.visit_expr(expr))
.collect();
let predicate = predicate.rewrite_expr(&mut ColIndexMapping::with_target_size(
self.output_col_idx().iter().map(|i| Some(*i)).collect(),
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
@@ -203,7 +203,7 @@ where
async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult<bool> {
for command in self
.finished_commands
.drain_filter(|c| checkpoint || c.context.kind.is_barrier())
.extract_if(|c| checkpoint || c.context.kind.is_barrier())
{
// The command is ready to finish. We can now call `pre_finish`.
command.context.pre_finish().await?;
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -106,7 +106,7 @@ impl<S: MetaStore> HummockManager<S> {
== Some(true);
let mut pairs = vec![];
if let Some(mv_table) = mv_table {
if internal_tables.drain_filter(|t| *t == mv_table).count() > 0 {
if internal_tables.extract_if(|t| *t == mv_table).count() > 0 {
tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
}
// materialized_view
7 changes: 3 additions & 4 deletions src/meta/src/lib.rs
@@ -16,12 +16,12 @@
#![feature(trait_alias)]
#![feature(binary_heap_drain_sorted)]
#![feature(type_alias_impl_trait)]
#![feature(drain_filter)]
#![feature(extract_if)]
#![feature(custom_test_frameworks)]
#![feature(lint_reasons)]
#![feature(map_try_insert)]
#![feature(hash_drain_filter)]
#![feature(btree_drain_filter)]
#![feature(hash_extract_if)]
#![feature(btree_extract_if)]
#![feature(result_option_inspect)]
#![feature(lazy_cell)]
#![feature(let_chains)]
@@ -32,7 +32,6 @@
#![cfg_attr(coverage, feature(no_coverage))]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(is_sorted)]
#![feature(string_leak)]
#![feature(impl_trait_in_assoc_type)]
#![feature(type_name_of_val)]

4 changes: 2 additions & 2 deletions src/meta/src/manager/catalog/fragment.rs
@@ -736,7 +736,7 @@ where
assert!(actor_id_set.contains(actor_id));
}

actors.drain_filter(|actor_id| to_remove.contains(actor_id));
actors.retain(|actor_id| !to_remove.contains(actor_id));
actors.extend_from_slice(to_create);
}

@@ -775,7 +775,7 @@
for table_id in to_update_table_fragments {
// Takes out the reschedules of the fragments in this table.
let reschedules = reschedules
.drain_filter(|fragment_id, _| {
.extract_if(|fragment_id, _| {
table_fragments
.get(&table_id)
.unwrap()
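
Two migration patterns appear in this file. Where the removed elements are simply discarded, `retain` with the negated predicate is the equivalent (and stable) replacement for a fire-and-forget `drain_filter` call; where the removed entries are needed, as with the `reschedules` map above, `extract_if` hands them back. A small sketch with toy data rather than the real actor and fragment types:

```rust
#![feature(hash_extract_if)]

use std::collections::HashMap;

fn main() {
    // Pattern 1: removed elements are not needed, so `retain` with the
    // negated predicate replaces the old `actors.drain_filter(..)` statement.
    let to_remove = [2u32, 4];
    let mut actors = vec![1u32, 2, 3, 4, 5];
    actors.retain(|a| !to_remove.contains(a));
    assert_eq!(actors, [1, 3, 5]);

    // Pattern 2: the removed entries are needed, so `extract_if` yields them.
    let mut reschedules: HashMap<u32, &str> = HashMap::from([(1, "a"), (2, "b"), (3, "c")]);
    let taken: HashMap<_, _> = reschedules.extract_if(|k, _| *k != 2).collect();
    assert_eq!(taken.len(), 2);
    assert_eq!(reschedules.len(), 1);
}
```
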
2 changes: 1 addition & 1 deletion src/meta/src/manager/cluster.rs
@@ -695,7 +695,7 @@ impl ClusterManagerCore {
let mut streaming_worker_node = self.list_streaming_worker_node(Some(State::Running));

let unschedulable_worker_node = streaming_worker_node
.drain_filter(|worker| {
.extract_if(|worker| {
worker
.property
.as_ref()
6 changes: 3 additions & 3 deletions src/meta/src/stream/scale.rs
@@ -1546,7 +1546,7 @@ where
{
dispatcher
.downstream_actor_id
.drain_filter(|id| downstream_actors_to_remove.contains_key(id));
.retain(|id| !downstream_actors_to_remove.contains_key(id));
}

if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
@@ -1828,8 +1828,8 @@
}
}

target_plan.drain_filter(|_, plan| {
plan.added_parallel_units.is_empty() && plan.removed_parallel_units.is_empty()
target_plan.retain(|_, plan| {
!(plan.added_parallel_units.is_empty() && plan.removed_parallel_units.is_empty())
});

Ok(target_plan)
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/schedule.rs
@@ -218,7 +218,7 @@ impl Scheduler {
// Visit the parallel units in a round-robin manner on each worker.
let mut round_robin = Vec::new();
while !parallel_units.is_empty() {
parallel_units.drain_filter(|ps| {
let _ = parallel_units.extract_if(|ps| {
if let Some(p) = ps.next() {
round_robin.push(p);
false
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
@@ -22,7 +22,7 @@
#![feature(associated_type_defaults)]
#![feature(generators)]
#![feature(iterator_try_collect)]
#![feature(hash_drain_filter)]
#![feature(hash_extract_if)]
#![feature(try_blocks)]
#![feature(let_chains)]
#![feature(impl_trait_in_assoc_type)]
2 changes: 1 addition & 1 deletion src/source/src/lib.rs
@@ -19,7 +19,7 @@
#![feature(lint_reasons)]
#![feature(result_option_inspect)]
#![feature(generators)]
#![feature(hash_drain_filter)]
#![feature(hash_extract_if)]
#![feature(type_alias_impl_trait)]
#![feature(box_patterns)]

6 changes: 3 additions & 3 deletions src/storage/backup/src/lib.rs
@@ -16,12 +16,12 @@
#![feature(trait_alias)]
#![feature(binary_heap_drain_sorted)]
#![feature(type_alias_impl_trait)]
#![feature(drain_filter)]
#![feature(extract_if)]
#![feature(custom_test_frameworks)]
#![feature(lint_reasons)]
#![feature(map_try_insert)]
#![feature(hash_drain_filter)]
#![feature(btree_drain_filter)]
#![feature(hash_extract_if)]
#![feature(btree_extract_if)]
#![feature(result_option_inspect)]
#![feature(lazy_cell)]
#![feature(let_chains)]
@@ -313,7 +313,7 @@ impl HummockVersionUpdateExt for HummockVersion {
);
sub_level
.table_infos
.drain_filter(|sst_info| sst_info.table_ids.is_empty())
.extract_if(|sst_info| sst_info.table_ids.is_empty())
.for_each(|sst_info| {
sub_level.total_file_size -= sst_info.file_size;
sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size;
@@ -363,7 +363,7 @@ impl HummockVersionUpdateExt for HummockVersion {
});
level
.table_infos
.drain_filter(|sst_info| sst_info.table_ids.is_empty())
.extract_if(|sst_info| sst_info.table_ids.is_empty())
.for_each(|sst_info| {
level.total_file_size -= sst_info.file_size;
level.uncompressed_file_size -= sst_info.uncompressed_file_size;
@@ -407,7 +407,7 @@ impl HummockVersionUpdateExt for HummockVersion {
.expect("compaction group should exist");
let mut moving_tables = levels
.member_table_ids
.drain_filter(|t| group_change.table_ids.contains(t))
.extract_if(|t| group_change.table_ids.contains(t))
.collect_vec();
self.levels
.get_mut(compaction_group_id)
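
The hunks above remove SST entries with no remaining table ids while subtracting their sizes from the level's running totals in the same pass. A reduced sketch of that remove-and-account pattern, using a hypothetical `Sst` stand-in rather than the real SST metadata type:

```rust
#![feature(extract_if)]

struct Sst {
    file_size: u64,
    table_ids: Vec<u32>,
}

fn main() {
    let mut total_file_size: u64 = 30;
    let mut table_infos = vec![
        Sst { file_size: 10, table_ids: vec![] },
        Sst { file_size: 20, table_ids: vec![7] },
    ];

    // Remove SSTs with no live table ids and subtract their sizes as they go.
    table_infos
        .extract_if(|sst| sst.table_ids.is_empty())
        .for_each(|sst| total_file_size -= sst.file_size);

    assert_eq!(total_file_size, 20);
    assert_eq!(table_infos.len(), 1);
}
```
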
4 changes: 2 additions & 2 deletions src/storage/hummock_sdk/src/lib.rs
@@ -13,8 +13,8 @@
// limitations under the License.

#![feature(async_closure)]
#![feature(drain_filter)]
#![feature(hash_drain_filter)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
#![feature(lint_reasons)]
#![feature(map_many_mut)]
#![feature(bound_map)]
@@ -385,7 +385,7 @@ impl HummockEventHandler {
);
self.uploader.clear();

for (epoch, result_sender) in self.pending_sync_requests.drain_filter(|_, _| true) {
for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) {
send_sync_result(
result_sender,
Err(HummockError::other(format!(
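
With a predicate that is always true, `extract_if(|_, _| true)` simply empties the map while yielding every entry; on stable Rust the same effect is spelled `HashMap::drain`. A toy sketch with placeholder values, not the real epoch or sender types:

```rust
use std::collections::HashMap;

fn main() {
    let mut pending_sync_requests: HashMap<u64, &str> =
        HashMap::from([(1, "tx-a"), (2, "tx-b")]);

    // Equivalent to `extract_if(|_, _| true)` once fully consumed: every entry
    // is handed back and the map ends up empty.
    for (epoch, sender) in pending_sync_requests.drain() {
        let _ = (epoch, sender); // a real handler would send an error result here
    }

    assert!(pending_sync_requests.is_empty());
}
```
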
2 changes: 1 addition & 1 deletion src/storage/src/hummock/iterator/merge_inner.rs
@@ -203,7 +203,7 @@

self.heap = self
.unused_iters
.drain_filter(|i| i.iter.is_valid())
.extract_if(|i| i.iter.is_valid())
.collect();
}
}
6 changes: 3 additions & 3 deletions src/storage/src/lib.rs
@@ -18,9 +18,9 @@
#![feature(bound_as_ref)]
#![feature(bound_map)]
#![feature(custom_test_frameworks)]
#![feature(drain_filter)]
#![feature(extract_if)]
#![feature(generators)]
#![feature(hash_drain_filter)]
#![feature(hash_extract_if)]
#![feature(lint_reasons)]
#![feature(proc_macro_hygiene)]
#![feature(result_option_inspect)]
@@ -33,7 +33,7 @@
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(assert_matches)]
#![feature(is_sorted)]
#![feature(btree_drain_filter)]
#![feature(btree_extract_if)]
#![feature(exact_size_is_empty)]
#![feature(lazy_cell)]
#![cfg_attr(coverage, feature(no_coverage))]
14 changes: 10 additions & 4 deletions src/storage/src/monitor/monitored_store.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;

@@ -87,7 +88,7 @@ pub type MonitoredStateStoreIterStream<'s, S: StateStoreIterItemStream + 's> =
// `'a`. If we simply use `impl StateStoreIterItemStream + 's`, the rust compiler will also capture
// the lifetime `'a` defined in the enclosing scope.
impl<S> MonitoredStateStore<S> {
async fn monitored_iter<'a, 's, St: StateStoreIterItemStream + 's>(
async fn monitored_iter<'a, 's: 'a, St: StateStoreIterItemStream + 's>(
&'a self,
table_id: TableId,
iter_stream_future: impl Future<Output = StorageResult<St>> + 'a,
@@ -122,6 +123,7 @@ impl<S> MonitoredStateStore<S> {
storage_metrics: self.storage_metrics.clone(),
table_id,
},
_phantom: Default::default(),
};
Ok(monitored.into_stream())
}
@@ -359,9 +361,10 @@ impl MonitoredStateStore<HummockStorage> {
}

/// A state store iterator wrapper for monitoring metrics.
pub struct MonitoredStateStoreIter<S> {
pub struct MonitoredStateStoreIter<'s, S: 's> {
inner: S,
stats: MonitoredStateStoreIterStats,
_phantom: PhantomData<&'s ()>,
}

struct MonitoredStateStoreIterStats {
@@ -373,7 +376,10 @@ struct MonitoredStateStoreIterStats {
table_id: TableId,
}

impl<S: StateStoreIterItemStream> MonitoredStateStoreIter<S> {
impl<'s, S: StateStoreIterItemStream> MonitoredStateStoreIter<'s, S>
where
S: 's,
{
#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(self) {
let inner = self.inner;
@@ -392,7 +398,7 @@ impl<S: StateStoreIterItemStream> MonitoredStateStoreIter<S> {
drop(stats);
}

fn into_stream(self) -> impl StateStoreIterItemStream {
fn into_stream(self) -> MonitoredStateStoreIterStream<'s, S> {
Self::into_stream_inner(self).instrument(tracing::trace_span!("store_iter"))
}
}
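
The thread of this change: giving `MonitoredStateStoreIter` a named lifetime (carried by a `PhantomData<&'s ()>` field, plus the `'s: 'a` bound on `monitored_iter`) lets `into_stream` name its return type as `MonitoredStateStoreIterStream<'s, S>` instead of relying on an anonymous `impl ...` capture; this is the part of the commit tied to the type-alias-impl-trait breakage discussed in the comments below. A generic, self-contained sketch of the same shape using plain iterators; the names here are hypothetical and only the structure mirrors the diff:

```rust
use std::marker::PhantomData;

// Hypothetical wrapper: the otherwise-unused lifetime `'s` is carried via
// `PhantomData` so that methods can spell it out in their return types.
struct MonitoredIter<'s, I: 's> {
    inner: I,
    _phantom: PhantomData<&'s ()>,
}

impl<'s, I> MonitoredIter<'s, I>
where
    I: Iterator + 's,
{
    fn new(inner: I) -> Self {
        Self {
            inner,
            _phantom: PhantomData,
        }
    }

    // The named lifetime lets the return type state what it captures,
    // mirroring `into_stream(self) -> MonitoredStateStoreIterStream<'s, S>`.
    fn into_counting(self) -> impl Iterator<Item = (usize, I::Item)> + 's {
        self.inner.enumerate()
    }
}

fn main() {
    let counted: Vec<_> = MonitoredIter::new([10, 20, 30].into_iter())
        .into_counting()
        .collect();
    assert_eq!(counted, vec![(0, 10), (1, 20), (2, 30)]);
}
```
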

2 comments on commit 9d3e587

@xxchan (Member) commented on 9d3e587 on Aug 10, 2023:
The TAIT regression is in this commit:

searched nightlies: from nightly-2023-06-16 to nightly-2023-07-08
regressed nightly: nightly-2023-06-18
searched commit range: rust-lang/rust@6bba061...3b2073f
regressed commit: rust-lang/rust@0cc541e

bisected with cargo-bisect-rustc v0.6.6

Host triple: aarch64-apple-darwin
Reproduce with:

cargo bisect-rustc --start 2023-06-16 --end 2023-07-08 -- build -p risingwave_storage 

error: concrete type differs from previous defining opaque type use
   --> src/storage/src/table/batch_table/storage_table.rs:504:9
    |
504 |         Ok(iter)
    |         ^^^^^^^^ expected `StorageTableInnerIter<S, SD>`, got `impl Stream<Item = std::result::Result<(Vec<u8>, OwnedRow), error::StorageError>> + 'static`
    |
note: previous use here
   --> src/storage/src/table/batch_table/storage_table.rs:412:54
    |
412 |       ) -> StorageResult<StorageTableInnerIter<S, SD>> {
    |  ______________________________________________________^
413 | |         let cache_policy = match (
414 | |             encoded_key_range.start_bound(),
415 | |             encoded_key_range.end_bound(),
...   |
504 | |         Ok(iter)
505 | |     }
    | |_____^

warning: `risingwave_storage` (lib) generated 1 warning
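
For readers unfamiliar with that diagnostic: with `type_alias_impl_trait`, every defining use of the alias must infer the same concrete type. The toy program below is deliberately non-compiling and only illustrates the error class on nightlies of that era; it is not a reproduction of the rustc regression above:

```rust
// Expected to FAIL to compile; it only demonstrates the
// "concrete type differs from previous defining opaque type use" diagnostic.
#![feature(type_alias_impl_trait)]

type Payload = impl std::fmt::Debug;

fn defining_use_a() -> Payload {
    42u32 // here `Payload` is inferred to be `u32`
}

fn defining_use_b() -> Payload {
    "forty-two" // error: concrete type differs from previous defining opaque type use
}

fn main() {
    println!("{:?}", defining_use_a());
    println!("{:?}", defining_use_b());
}
```
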

@xxchan (Member) commented on 9d3e587 on Aug 11, 2023