From c583e2c6c054764249acf484438c7bf7197765f4 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 31 Oct 2023 12:16:40 +0800
Subject: [PATCH] chore: cargo +nightly fmt (#13162)

---
 src/batch/src/executor/project_set.rs | 18 +-
 src/batch/src/executor/top_n.rs | 4 +-
 src/common/src/cache.rs | 8 +-
 src/common/src/types/interval.rs | 8 +-
 src/common/src/util/column_index_mapping.rs | 6 +-
 src/compute/tests/cdc_tests.rs | 6 +-
 src/connector/src/parser/mod.rs | 19 +-
 src/connector/src/parser/mysql.rs | 4 +-
 src/connector/src/parser/unified/debezium.rs | 62 +++---
 src/connector/src/parser/unified/upsert.rs | 4 +-
 src/connector/src/parser/upsert_parser.rs | 4 +-
 src/connector/src/sink/encoder/template.rs | 11 +-
 src/connector/src/sink/log_store.rs | 4 +-
 src/connector/src/source/external.rs | 4 +-
 .../src/source/google_pubsub/source/reader.rs | 4 +-
 .../src/source/kafka/private_link.rs | 8 +-
 src/ctl/src/cmd_impl/meta/reschedule.rs | 4 +-
 src/ctl/src/cmd_impl/meta/serving.rs | 4 +-
 src/ctl/src/cmd_impl/scale/resize.rs | 4 +-
 src/expr/core/src/expr/build.rs | 4 +-
 src/expr/core/src/expr/expr_coalesce.rs | 4 +-
 .../impl/src/aggregate/percentile_cont.rs | 29 +--
 .../impl/src/aggregate/percentile_disc.rs | 22 ++-
 src/expr/impl/src/scalar/to_timestamp.rs | 4 +-
 src/expr/macro/src/gen.rs | 28 ++-
 src/expr/macro/src/parse.rs | 4 +-
 src/frontend/src/binder/expr/column.rs | 6 +-
 src/frontend/src/binder/expr/function.rs | 19 +-
 src/frontend/src/binder/relation/join.rs | 4 +-
 src/frontend/src/binder/relation/mod.rs | 21 +-
 .../src/binder/relation/table_or_source.rs | 8 +-
 src/frontend/src/binder/select.rs | 90 ++++++---
 src/frontend/src/expr/function_call.rs | 8 +-
 src/frontend/src/expr/mod.rs | 16 +-
 src/frontend/src/handler/alter_user.rs | 4 +-
 src/frontend/src/handler/create_user.rs | 4 +-
 src/frontend/src/handler/extended_handle.rs | 8 +-
 .../plan_expr_rewriter/cse_rewriter.rs | 4 +-
 .../optimizer/plan_node/generic/hop_window.rs | 4 +-
 .../src/optimizer/plan_node/logical_join.rs | 5 +-
 .../plan_node/logical_over_window.rs | 185 ++++++++----------
 .../src/optimizer/plan_node/logical_source.rs | 4 +-
 .../optimizer/plan_node/stream_group_topn.rs | 4 +-
 .../optimizer/plan_node/stream_hash_join.rs | 4 +-
 .../src/optimizer/rule/except_merge_rule.rs | 4 +-
 .../optimizer/rule/index_delta_join_rule.rs | 7 +-
 .../optimizer/rule/intersect_merge_rule.rs | 4 +-
 .../rule/over_window_to_topn_rule.rs | 4 +-
 .../src/optimizer/rule/union_merge_rule.rs | 4 +-
 src/frontend/src/planner/query.rs | 4 +-
 src/frontend/src/planner/select.rs | 8 +-
 .../src/scheduler/distributed/stage.rs | 5 +-
 src/frontend/src/scheduler/local.rs | 13 +-
 src/frontend/src/utils/condition.rs | 4 +-
 .../src/utils/stream_graph_formatter.rs | 71 +++----
 .../picker/space_reclaim_compaction_picker.rs | 8 +-
 src/meta/src/hummock/manager/mod.rs | 2 +-
 src/meta/src/manager/catalog/fragment.rs | 13 +-
 src/meta/src/manager/catalog/mod.rs | 3 +-
 src/meta/src/rpc/ddl_controller.rs | 8 +-
 src/meta/src/rpc/election/etcd.rs | 4 +-
 src/meta/src/stream/stream_manager.rs | 6 +-
 src/object_store/src/object/mem.rs | 4 +-
 .../opendal_engine/opendal_object_store.rs | 4 +-
 src/object_store/src/object/s3.rs | 4 +-
 .../src/config/provide_expander.rs | 4 +-
 src/risedevtool/src/preflight_check.rs | 5 +-
 src/risedevtool/src/task/compactor_service.rs | 4 +-
 .../src/task/compute_node_service.rs | 4 +-
 src/risedevtool/src/task/frontend_service.rs | 4 +-
 src/risedevtool/src/task/meta_node_service.rs | 4 +-
 src/rpc_client/src/meta_client.rs | 3 +-
 src/sqlparser/src/tokenizer.rs | 8 +-
 .../src/hummock/compactor/compactor_runner.rs | 16 +-
 .../src/hummock/event_handler/uploader.rs | 4 +-
 .../hummock/iterator/delete_range_iterator.rs | 5 +-
 src/storage/src/hummock/mod.rs | 10 +-
 .../src/hummock/sstable/multi_builder.rs | 17 +-
 src/storage/src/hummock/sstable_store.rs | 4 +-
 src/storage/src/memory.rs | 4 +-
 .../src/common/log_store_impl/in_mem.rs | 4 +-
 .../log_store_impl/kv_log_store/buffer.rs | 8 +-
 .../log_store_impl/kv_log_store/serde.rs | 4 +-
 src/stream/src/common/table/state_table.rs | 30 ++-
 .../executor/backfill/arrangement_backfill.rs | 18 +-
 .../src/executor/backfill/cdc_backfill.rs | 52 +++--
 src/stream/src/executor/hash_agg.rs | 11 +-
 src/stream/src/executor/hash_join.rs | 3 +-
 .../src/executor/over_window/general.rs | 4 +-
 src/stream/src/executor/project_set.rs | 18 +-
 .../executor/source/state_table_handler.rs | 7 +-
 src/stream/src/executor/temporal_join.rs | 6 +-
 src/stream/src/executor/top_n/top_n_cache.rs | 42 ++--
 src/stream/src/executor/top_n/top_n_state.rs | 8 +-
 src/stream/src/executor/watermark_filter.rs | 9 +-
 .../src/executor/wrapper/epoch_check.rs | 4 +-
 .../src/from_proto/source/trad_source.rs | 17 +-
 src/stream/src/task/stream_manager.rs | 4 +-
 src/tests/regress/src/schedule.rs | 12 +-
 src/tests/sqlsmith/src/runner.rs | 14 +-
 src/tests/sqlsmith/tests/frontend/mod.rs | 16 +-
 src/utils/runtime/src/logger.rs | 4 +-
 src/utils/runtime/src/panic_hook.rs | 4 +-
 103 files changed, 793 insertions(+), 465 deletions(-)

diff --git a/src/batch/src/executor/project_set.rs b/src/batch/src/executor/project_set.rs
index fa3dfac917e8..1df7f9e246d7 100644
--- a/src/batch/src/executor/project_set.rs
+++ b/src/batch/src/executor/project_set.rs
@@ -92,11 +92,15 @@ impl ProjectSetExecutor {
             // for each column
             for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
                 *value = match item {
-                    Either::Left(state) => if let Some((i, value)) = state.peek() && i == row_idx {
-                        valid = true;
-                        value
-                    } else {
-                        None
+                    Either::Left(state) => {
+                        if let Some((i, value)) = state.peek()
+                            && i == row_idx
+                        {
+                            valid = true;
+                            value
+                        } else {
+                            None
+                        }
                     }
                     Either::Right(array) => array.value_at(row_idx),
                 };
@@ -110,7 +114,9 @@
             }
             // move to the next row
             for item in &mut results {
-                if let Either::Left(state) = item && matches!(state.peek(), Some((i, _)) if i == row_idx) {
+                if let Either::Left(state) = item
+                    && matches!(state.peek(), Some((i, _)) if i == row_idx)
+                {
                     state.next().await?;
                 }
             }
diff --git a/src/batch/src/executor/top_n.rs b/src/batch/src/executor/top_n.rs
index cffbae855de6..b43f1bc07c24 100644
--- a/src/batch/src/executor/top_n.rs
+++ b/src/batch/src/executor/top_n.rs
@@ -180,7 +180,9 @@ impl TopNHeap {
             let mut ties_with_peek = vec![];
             // pop all the ties with peek
             ties_with_peek.push(self.heap.pop().unwrap());
-            while let Some(e) = self.heap.peek() && e.encoded_row == peek.encoded_row {
+            while let Some(e) = self.heap.peek()
+                && e.encoded_row == peek.encoded_row
+            {
                 ties_with_peek.push(self.heap.pop().unwrap());
             }
             self.heap.push(elem);
diff --git a/src/common/src/cache.rs b/src/common/src/cache.rs
index 5f80592fed27..f6af1ec60c0d 100644
--- a/src/common/src/cache.rs
+++ b/src/common/src/cache.rs
@@ -757,7 +757,9 @@ impl LruCache {
             shard.release(handle)
         };
         // do not deallocate data with holding mutex.
-        if let Some((key, value)) = data && let Some(listener) = &self.listener {
+        if let Some((key, value)) = data
+            && let Some(listener) = &self.listener
+        {
             listener.on_release(key, value);
         }
     }
@@ -819,7 +821,9 @@
             shard.erase(hash, key)
         };
         // do not deallocate data with holding mutex.
-        if let Some((key, value)) = data && let Some(listener) = &self.listener {
+        if let Some((key, value)) = data
+            && let Some(listener) = &self.listener
+        {
             listener.on_release(key, value);
         }
     }
diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs
index aca4d090bcac..c921905d8d9f 100644
--- a/src/common/src/types/interval.rs
+++ b/src/common/src/types/interval.rs
@@ -1386,7 +1386,9 @@ impl Interval {
     fn parse_postgres(s: &str) -> Result {
         use DateTimeField::*;
         let mut tokens = parse_interval(s)?;
-        if tokens.len() % 2 != 0 && let Some(TimeStrToken::Num(_)) = tokens.last() {
+        if tokens.len() % 2 != 0
+            && let Some(TimeStrToken::Num(_)) = tokens.last()
+        {
             tokens.push(TimeStrToken::TimeUnit(DateTimeField::Second));
         }
         if tokens.len() % 2 != 0 {
@@ -1394,7 +1396,9 @@
         }
         let mut token_iter = tokens.into_iter();
         let mut result = Interval::from_month_day_usec(0, 0, 0);
-        while let Some(num) = token_iter.next() && let Some(interval_unit) = token_iter.next() {
+        while let Some(num) = token_iter.next()
+            && let Some(interval_unit) = token_iter.next()
+        {
             match (num, interval_unit) {
                 (TimeStrToken::Num(num), TimeStrToken::TimeUnit(interval_unit)) => {
                     result = (|| match interval_unit {
diff --git a/src/common/src/util/column_index_mapping.rs b/src/common/src/util/column_index_mapping.rs
index 2c12dc47efb1..212c07df1e28 100644
--- a/src/common/src/util/column_index_mapping.rs
+++ b/src/common/src/util/column_index_mapping.rs
@@ -67,8 +67,10 @@ impl ColIndexMapping {
             return false;
         }
         for (src, tar) in self.map.iter().enumerate() {
-            if let Some(tar_value) = tar && src == *tar_value {
-                continue
+            if let Some(tar_value) = tar
+                && src == *tar_value
+            {
+                continue;
             } else {
                 return false;
             }
diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs
index 6a50b8410bbd..fff56a17d911 100644
--- a/src/compute/tests/cdc_tests.rs
+++ b/src/compute/tests/cdc_tests.rs
@@ -323,9 +323,11 @@ async fn consume_message_stream(mut stream: BoxedMessageStream) -> StreamResult<
                 println!("[mv] chunk: {:#?}", c);
             }
             Message::Barrier(b) => {
-                if let Some(m) = b.mutation && matches!(*m, Mutation::Stop(_)) {
+                if let Some(m) = b.mutation
+                    && matches!(*m, Mutation::Stop(_))
+                {
                     println!("encounter stop barrier");
-                    break
+                    break;
                 }
             }
         }
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index c7b8bf702e1c..bdbb110daf7f 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -169,12 +169,19 @@ impl MessageMeta<'_> {
             SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
             // Extract custom meta data per connector.
             SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
-                assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
-                kafka_meta.timestamp.map(|ts| {
-                    risingwave_common::cast::i64_to_timestamptz(ts)
-                        .unwrap()
-                        .to_scalar_value()
-                }).into()
+                assert_eq!(
+                    desc.name.as_str(),
+                    KAFKA_TIMESTAMP_COLUMN_NAME,
+                    "unexpected meta column name"
+                );
+                kafka_meta
+                    .timestamp
+                    .map(|ts| {
+                        risingwave_common::cast::i64_to_timestamptz(ts)
+                            .unwrap()
+                            .to_scalar_value()
+                    })
+                    .into()
             }
             // For other cases, return `None`.
diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs
index 58be305a6911..0a0f8f52e90b 100644
--- a/src/connector/src/parser/mysql.rs
+++ b/src/connector/src/parser/mysql.rs
@@ -143,7 +143,9 @@ mod tests {
         });
         pin_mut!(row_stream);
         while let Some(row) = row_stream.next().await {
-            if let Ok(ro) = row && ro.is_some() {
+            if let Ok(ro) = row
+                && ro.is_some()
+            {
                 let owned_row = ro.unwrap();
                 let d = owned_row.datum_at(2);
                 if let Some(scalar) = d {
diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs
index e16df28aebdf..e392e31e3644 100644
--- a/src/connector/src/parser/unified/debezium.rs
+++ b/src/connector/src/parser/unified/debezium.rs
@@ -145,42 +145,36 @@ pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyh
         .get("_id")
         .ok_or_else(|| anyhow::format_err!("Debezuim Mongo requires document has a `_id` field"))?;
     let id: Datum = match id_type {
-    DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
-    DataType::Varchar => match id_field {
-        serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
-        serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
-            obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
-        )),
-        _ => anyhow::bail!(
-            "Can not convert bson {:?} to {:?}",
-            id_field, id_type
-        ),
-    },
-    DataType::Int32 => {
-        if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberInt") {
-            let int_str = obj["$numberInt"].as_str().unwrap_or_default();
-            Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
-        } else {
-            anyhow::bail!(
-                "Can not convert bson {:?} to {:?}",
-                id_field, id_type
-            )
+        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
+        DataType::Varchar => match id_field {
+            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
+            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
+                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
+            )),
+            _ => anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type),
+        },
+        DataType::Int32 => {
+            if let serde_json::Value::Object(ref obj) = id_field
+                && obj.contains_key("$numberInt")
+            {
+                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
+                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
+            } else {
+                anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type)
+            }
         }
-    }
-    DataType::Int64 => {
-        if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberLong")
-        {
-            let int_str = obj["$numberLong"].as_str().unwrap_or_default();
-            Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
-        } else {
-            anyhow::bail!(
-                "Can not convert bson {:?} to {:?}",
-                id_field, id_type
-            )
+        DataType::Int64 => {
+            if let serde_json::Value::Object(ref obj) = id_field
+                && obj.contains_key("$numberLong")
+            {
+                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
+                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
+            } else {
+                anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type)
+            }
         }
-    }
-    _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
-};
+        _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
+    };
     Ok(id)
 }

 impl MongoProjection {
diff --git a/src/connector/src/parser/unified/upsert.rs b/src/connector/src/parser/unified/upsert.rs
index a0be4f050b9c..2697d4bdf815 100644
--- a/src/connector/src/parser/unified/upsert.rs
+++ b/src/connector/src/parser/unified/upsert.rs
@@ -114,7 +114,9 @@ where
             other => return other,
         };

-        if let Some(key_as_column_name) = &self.key_as_column_name && name == key_as_column_name {
+        if let Some(key_as_column_name) = &self.key_as_column_name
+            && name == key_as_column_name
+        {
             return self.access(&["key"], Some(type_expected));
         }

diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs
index f9ce0caa7e25..71210b9e4b8f 100644
--- a/src/connector/src/parser/upsert_parser.rs
+++ b/src/connector/src/parser/upsert_parser.rs
@@ -102,7 +102,9 @@ impl UpsertParser {
             row_op = row_op.with_key(self.key_builder.generate_accessor(data).await?);
         }
         // Empty payload of kafka is Some(vec![])
-        if let Some(data) = payload && !data.is_empty() {
+        if let Some(data) = payload
+            && !data.is_empty()
+        {
             row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?);
             change_event_op = ChangeEventOperation::Upsert;
         }
diff --git a/src/connector/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs
index 97d8271f9e83..1f70836ab453 100644
--- a/src/connector/src/sink/encoder/template.rs
+++ b/src/connector/src/sink/encoder/template.rs
@@ -50,9 +50,14 @@ impl TemplateEncoder {
             ));
         }
         for capture in re.captures_iter(format) {
-            if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()){
-                return Err(SinkError::Redis(format!("Can't find field({:?}) in key_format or value_format",inner_content.as_str())))
-            }
+            if let Some(inner_content) = capture.get(1)
+                && !set.contains(inner_content.as_str())
+            {
+                return Err(SinkError::Redis(format!(
+                    "Can't find field({:?}) in key_format or value_format",
+                    inner_content.as_str()
+                )));
+            }
         }
         Ok(())
     }
diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs
index f7d99141139f..49e28bce2f79 100644
--- a/src/connector/src/sink/log_store.rs
+++ b/src/connector/src/sink/log_store.rs
@@ -400,7 +400,9 @@ impl<'a, F: TryFuture + Unpin + 'static> DeliveryFutureManagerAddFuture
     pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
         for (_, item) in &mut self.0.items {
-            if let DeliveryFutureManagerItem::Chunk {futures, ..} = item && let Some(mut delivery_future) = futures.pop_front() {
+            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
+                && let Some(mut delivery_future) = futures.pop_front()
+            {
                 self.0.future_count -= 1;
                 return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
             } else {
diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs
index 9eff3991a4d4..953277ba3610 100644
--- a/src/connector/src/source/external.rs
+++ b/src/connector/src/source/external.rs
@@ -286,7 +286,9 @@ impl ExternalTableReader for MySqlExternalTableReader {

 impl MySqlExternalTableReader {
     pub fn new(properties: HashMap, rw_schema: Schema) -> ConnectorResult {
-        if let Some(field) = rw_schema.fields.last() && field.name.as_str() != OFFSET_COLUMN_NAME {
+        if let Some(field) = rw_schema.fields.last()
+            && field.name.as_str() != OFFSET_COLUMN_NAME
+        {
             return Err(ConnectorError::Config(anyhow!(
                 "last column of schema must be `_rw_offset`"
             )));
diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs
index dfe95eeb1b80..8241e1657c49 100644
--- a/src/connector/src/source/google_pubsub/source/reader.rs
+++ b/src/connector/src/source/google_pubsub/source/reader.rs
@@ -96,7 +96,9 @@ impl CommonSplitReader for PubsubSplitReader {
             yield chunk;

             // Stop if we've approached the stop_offset
-            if let Some(stop_offset) = self.stop_offset && latest_offset >= stop_offset {
+            if let Some(stop_offset) = self.stop_offset
+                && latest_offset >= stop_offset
+            {
                 return Ok(());
             }
         }
diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs
index 573e14c3e073..5a6688a4cf8e 100644
--- a/src/connector/src/source/kafka/private_link.rs
+++ b/src/connector/src/source/kafka/private_link.rs
@@ -120,7 +120,9 @@ impl PrivateLinkConsumerContext {

 impl ClientContext for PrivateLinkConsumerContext {
     /// this func serves as a callback when `poll` is completed.
     fn stats(&self, statistics: Statistics) {
-        if let Some(metrics) = &self.metrics && let Some(id) = &self.identifier {
+        if let Some(metrics) = &self.metrics
+            && let Some(id) = &self.identifier
+        {
             metrics.report(id.as_str(), &statistics);
         }
     }
@@ -160,7 +162,9 @@ impl PrivateLinkProducerContext {

 impl ClientContext for PrivateLinkProducerContext {
     fn stats(&self, statistics: Statistics) {
-        if let Some(metrics) = &self.metrics && let Some(id) = &self.identifier {
+        if let Some(metrics) = &self.metrics
+            && let Some(id) = &self.identifier
+        {
             metrics.report(id.as_str(), &statistics);
         }
     }
diff --git a/src/ctl/src/cmd_impl/meta/reschedule.rs b/src/ctl/src/cmd_impl/meta/reschedule.rs
index 6d7765e7b1a2..737d5c2e88b0 100644
--- a/src/ctl/src/cmd_impl/meta/reschedule.rs
+++ b/src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -273,7 +273,9 @@ pub async fn unregister_workers(
             .ok()
             .or_else(|| worker_index_by_host.get(&worker).cloned());

-        if let Some(worker_id) = worker_id && worker_ids.contains(&worker_id) {
+        if let Some(worker_id) = worker_id
+            && worker_ids.contains(&worker_id)
+        {
             if !target_worker_ids.insert(worker_id) {
                 println!("Warn: {} and {} are the same worker", worker, worker_id);
             }
diff --git a/src/ctl/src/cmd_impl/meta/serving.rs b/src/ctl/src/cmd_impl/meta/serving.rs
index 867317c0915b..cff3c6f91128 100644
--- a/src/ctl/src/cmd_impl/meta/serving.rs
+++ b/src/ctl/src/cmd_impl/meta/serving.rs
@@ -82,7 +82,9 @@ pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Res
                 )
                     .into(),
             );
-            if let Some(w) = worker && let Some(addr) = w.host.as_ref() {
+            if let Some(w) = worker
+                && let Some(addr) = w.host.as_ref()
+            {
                 row.add_cell(format!("id: {}; {}:{}", w.id, addr.host, addr.port).into());
             } else {
                 row.add_cell("".into());
diff --git a/src/ctl/src/cmd_impl/scale/resize.rs b/src/ctl/src/cmd_impl/scale/resize.rs
index 786d0fa4c83b..59c2280d1787 100644
--- a/src/ctl/src/cmd_impl/scale/resize.rs
+++ b/src/ctl/src/cmd_impl/scale/resize.rs
@@ -393,7 +393,9 @@ pub async fn update_schedulability(
             .ok()
             .or_else(|| worker_index_by_host.get(&worker).cloned());

-        if let Some(worker_id) = worker_id && worker_ids.contains(&worker_id){
+        if let Some(worker_id) = worker_id
+            && worker_ids.contains(&worker_id)
+        {
             if !target_worker_ids.insert(worker_id) {
                 println!("Warn: {} and {} are the same worker", worker, worker_id);
             }
diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs
index 7dffbcd42d66..f0fd3397c4fa 100644
--- a/src/expr/core/src/expr/build.rs
+++ b/src/expr/core/src/expr/build.rs
@@ -349,7 +349,9 @@ pub(crate) fn lexer(input: &str) -> Vec {
             ':' => Token::Colon,
             '$' => {
                 let mut number = String::new();
-                while let Some(c) = chars.peek() && c.is_ascii_digit() {
+                while let Some(c) = chars.peek()
+                    && c.is_ascii_digit()
+                {
                     number.push(chars.next().unwrap());
                 }
                 let index = number.parse::().expect("Invalid number");
diff --git a/src/expr/core/src/expr/expr_coalesce.rs b/src/expr/core/src/expr/expr_coalesce.rs
index 71c7392c7ec3..b7916f414136 100644
--- a/src/expr/core/src/expr/expr_coalesce.rs
+++ b/src/expr/core/src/expr/expr_coalesce.rs
@@ -56,7 +56,9 @@ impl Expression for CoalesceExpression {
         }
         let mut builder = self.return_type.create_array_builder(len);
         for (i, sel) in selection.iter().enumerate() {
-            if init_vis.is_set(i) && let Some(child_idx) = sel {
+            if init_vis.is_set(i)
+                && let Some(child_idx) = sel
+            {
                 builder.append(children_array[*child_idx].value_at(i));
             } else {
                 builder.append_null()
             }
         }
diff --git a/src/expr/impl/src/aggregate/percentile_cont.rs b/src/expr/impl/src/aggregate/percentile_cont.rs
index 46002d1f596f..eeaa257627cd 100644
--- a/src/expr/impl/src/aggregate/percentile_cont.rs
+++ b/src/expr/impl/src/aggregate/percentile_cont.rs
@@ -118,19 +118,22 @@ impl AggregateFunction for PercentileCont {

     async fn get_result(&self, state: &AggregateState) -> Result {
         let state = &state.downcast_ref::().0;
-        Ok(if let Some(fraction) = self.fraction && !state.is_empty() {
-            let rn = fraction * (state.len() - 1) as f64;
-            let crn = f64::ceil(rn);
-            let frn = f64::floor(rn);
-            let result = if crn == frn {
-                state[crn as usize]
+        Ok(
+            if let Some(fraction) = self.fraction
+                && !state.is_empty()
+            {
+                let rn = fraction * (state.len() - 1) as f64;
+                let crn = f64::ceil(rn);
+                let frn = f64::floor(rn);
+                let result = if crn == frn {
+                    state[crn as usize]
+                } else {
+                    (crn - rn) * state[frn as usize] + (rn - frn) * state[crn as usize]
+                };
+                Some(result.into())
             } else {
-                (crn - rn) * state[frn as usize]
-                    + (rn - frn) * state[crn as usize]
-            };
-            Some(result.into())
-        } else {
-            None
-        })
+                None
+            },
+        )
     }
 }
diff --git a/src/expr/impl/src/aggregate/percentile_disc.rs b/src/expr/impl/src/aggregate/percentile_disc.rs
index c9143dcf8e64..80ebbfd24e54 100644
--- a/src/expr/impl/src/aggregate/percentile_disc.rs
+++ b/src/expr/impl/src/aggregate/percentile_disc.rs
@@ -143,15 +143,19 @@ impl AggregateFunction for PercentileDisc {

     async fn get_result(&self, state: &AggregateState) -> Result {
         let state = &state.downcast_ref::().0;
-        Ok(if let Some(fractions) = self.fractions && !state.is_empty() {
-            let idx = if fractions == 0.0 {
-                0
+        Ok(
+            if let Some(fractions) = self.fractions
+                && !state.is_empty()
+            {
+                let idx = if fractions == 0.0 {
+                    0
+                } else {
+                    f64::ceil(fractions * state.len() as f64) as usize - 1
+                };
+                Some(state[idx].clone())
             } else {
-                f64::ceil(fractions * state.len() as f64) as usize - 1
-            };
-            Some(state[idx].clone())
-        } else {
-            None
-        })
+                None
+            },
+        )
     }
 }
diff --git a/src/expr/impl/src/scalar/to_timestamp.rs b/src/expr/impl/src/scalar/to_timestamp.rs
index bc93720373c7..e4ef9edc235e 100644
--- a/src/expr/impl/src/scalar/to_timestamp.rs
+++ b/src/expr/impl/src/scalar/to_timestamp.rs
@@ -108,7 +108,9 @@ fn build_dummy(_return_type: DataType, _children: Vec) -> Resul
 )]
 pub fn to_date(s: &str, tmpl: &ChronoPattern) -> Result {
     let mut parsed = parse(s, tmpl)?;
-    if let Some(year) = &mut parsed.year && *year < 0 {
+    if let Some(year) = &mut parsed.year
+        && *year < 0
+    {
         *year += 1;
     }
     Ok(parsed.to_naive_date()?.into())
diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs
index 454d2a316913..5056d5dc24d1 100644
--- a/src/expr/macro/src/gen.rs
+++ b/src/expr/macro/src/gen.rs
@@ -609,14 +609,18 @@ impl FunctionAttr {
             .collect_vec();
         let downcast_state = if custom_state.is_some() {
             quote! { let mut state: &mut #state_type = state0.downcast_mut(); }
-        } else if let Some(s) = &self.state && s == "ref" {
+        } else if let Some(s) = &self.state
+            && s == "ref"
+        {
             quote! { let mut state: Option<#state_type> = state0.as_datum_mut().as_ref().map(|x| x.as_scalar_ref_impl().try_into().unwrap()); }
         } else {
             quote! { let mut state: Option<#state_type> = state0.as_datum_mut().take().map(|s| s.try_into().unwrap()); }
         };
         let restore_state = if custom_state.is_some() {
             quote! {}
-        } else if let Some(s) = &self.state && s == "ref" {
+        } else if let Some(s) = &self.state
+            && s == "ref"
+        {
             quote! { *state0.as_datum_mut() = state.map(|x| x.to_owned_scalar().into()); }
         } else {
             quote! { *state0.as_datum_mut() = state.map(|s| s.into()); }
@@ -694,10 +698,14 @@
         let first_state = if self.init_state.is_some() {
             // for count, the state will never be None
             quote! { unreachable!() }
-        } else if let Some(s) = &self.state && s == "ref" {
+        } else if let Some(s) = &self.state
+            && s == "ref"
+        {
             // for min/max/first/last, the state is the first value
             quote! { Some(v0) }
-        } else if let AggregateFnOrImpl::Impl(impl_) = user_fn && impl_.create_state.is_some() {
+        } else if let AggregateFnOrImpl::Impl(impl_) = user_fn
+            && impl_.create_state.is_some()
+        {
             // use user-defined create_state function
             quote! {{
                 let state = self.function.create_state();
@@ -727,7 +735,9 @@
         };
         let get_result = if custom_state.is_some() {
             quote! { Ok(Some(state.downcast_ref::<#state_type>().into())) }
-        } else if let AggregateFnOrImpl::Impl(impl_) = user_fn && impl_.finalize.is_some() {
+        } else if let AggregateFnOrImpl::Impl(impl_) = user_fn
+            && impl_.finalize.is_some()
+        {
             quote! {
                 let state = match state.as_datum() {
                     Some(s) => s.as_scalar_ref_impl().try_into().unwrap(),
@@ -1109,8 +1119,12 @@ fn data_type(ty: &str) -> TokenStream2 {
 /// output_types("struct") -> ["varchar", "jsonb"]
 /// ```
 fn output_types(ty: &str) -> Vec<&str> {
-    if let Some(s) = ty.strip_prefix("struct<") && let Some(args) = s.strip_suffix('>') {
-        args.split(',').map(|s| s.split_whitespace().nth(1).unwrap()).collect()
+    if let Some(s) = ty.strip_prefix("struct<")
+        && let Some(args) = s.strip_suffix('>')
+    {
+        args.split(',')
+            .map(|s| s.split_whitespace().nth(1).unwrap())
+            .collect()
     } else {
         vec![ty]
     }
diff --git a/src/expr/macro/src/parse.rs b/src/expr/macro/src/parse.rs
index 8e2e8c6d0b2f..fc9e4d45437e 100644
--- a/src/expr/macro/src/parse.rs
+++ b/src/expr/macro/src/parse.rs
@@ -314,7 +314,9 @@ fn strip_iterator(ty: &syn::Type) -> Option<&syn::Type> {
         return None;
     };
     for arg in &angle_bracketed.args {
-        if let syn::GenericArgument::AssocType(b) = arg && b.ident == "Item" {
+        if let syn::GenericArgument::AssocType(b) = arg
+            && b.ident == "Item"
+        {
             return Some(&b.ty);
         }
     }
diff --git a/src/frontend/src/binder/expr/column.rs b/src/frontend/src/binder/expr/column.rs
index 41ca86919def..4c1a426950b6 100644
--- a/src/frontend/src/binder/expr/column.rs
+++ b/src/frontend/src/binder/expr/column.rs
@@ -148,8 +148,10 @@ impl Binder {
                 // FIXME: The type of `CTID` should be `tid`.
                 // FIXME: The `CTID` column should be unique, so literal may break something.
                 // FIXME: At least we should add a notice here.
-                if let ErrorCode::ItemNotFound(_) = err && column_name == "ctid" {
-                    return Ok(Literal::new(Some("".into()), DataType::Varchar).into())
+                if let ErrorCode::ItemNotFound(_) = err
+                    && column_name == "ctid"
+                {
+                    return Ok(Literal::new(Some("".into()), DataType::Varchar).into());
                 }
                 Err(err.into())
             }
diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs
index a32e07f72ce5..145b11581be9 100644
--- a/src/frontend/src/binder/expr/function.rs
+++ b/src/frontend/src/binder/expr/function.rs
@@ -93,7 +93,9 @@ impl Binder {
         };

         // agg calls
-        if f.over.is_none() && let Ok(kind) = function_name.parse() {
+        if f.over.is_none()
+            && let Ok(kind) = function_name.parse()
+        {
             return self.bind_agg(f, kind);
         }

@@ -154,11 +156,12 @@
         // user defined function
         // TODO: resolve schema name https://github.com/risingwavelabs/risingwave/issues/12422
-        if let Ok(schema) = self.first_valid_schema() &&
-            let Some(func) = schema.get_function_by_name_args(
+        if let Ok(schema) = self.first_valid_schema()
+            && let Some(func) = schema.get_function_by_name_args(
                 &function_name,
                 &inputs.iter().map(|arg| arg.return_type()).collect_vec(),
-            ) {
+            )
+        {
             use crate::catalog::function_catalog::FunctionKind::*;
             match &func.kind {
                 Scalar { .. } => return Ok(UserDefinedFunction::new(func.clone(), inputs).into()),
@@ -360,8 +363,12 @@
         // check signature and do implicit cast
         match (kind, direct_args.as_mut_slice(), args.as_mut_slice()) {
             (AggKind::PercentileCont | AggKind::PercentileDisc, [fraction], [arg]) => {
-                if fraction.cast_implicit_mut(DataType::Float64).is_ok() && let Ok(casted) = fraction.fold_const() {
-                    if let Some(ref casted) = casted && !(0.0..=1.0).contains(&casted.as_float64().0) {
+                if fraction.cast_implicit_mut(DataType::Float64).is_ok()
+                    && let Ok(casted) = fraction.fold_const()
+                {
+                    if let Some(ref casted) = casted
+                        && !(0.0..=1.0).contains(&casted.as_float64().0)
+                    {
                         return Err(ErrorCode::InvalidInputSyntax(format!(
                             "direct arg in `{}` must between 0.0 and 1.0",
                             kind
diff --git a/src/frontend/src/binder/relation/join.rs b/src/frontend/src/binder/relation/join.rs
index eb4ce96f9ab3..10c7cd2b646e 100644
--- a/src/frontend/src/binder/relation/join.rs
+++ b/src/frontend/src/binder/relation/join.rs
@@ -193,7 +193,9 @@ impl Binder {
                     // TODO: is it ok to ignore quote style?
                     // If we have a `USING` constraint, we only bind the columns appearing in the
                     // constraint.
-                    if let Some(cols) = &using_columns && !cols.contains(&column) {
+                    if let Some(cols) = &using_columns
+                        && !cols.contains(&column)
+                    {
                         continue;
                     }
                     let indices_l = match old_context.get_unqualified_indices(&column.real_value())
diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs
index a6a1a8d2b02f..856f221ec985 100644
--- a/src/frontend/src/binder/relation/mod.rs
+++ b/src/frontend/src/binder/relation/mod.rs
@@ -193,11 +193,13 @@ impl Binder {
         let schema_name = identifiers.pop().map(|ident| ident.real_value());
         let database_name = identifiers.pop().map(|ident| ident.real_value());

-        if let Some(database_name) = database_name && database_name != db_name {
+        if let Some(database_name) = database_name
+            && database_name != db_name
+        {
             return Err(ResolveQualifiedNameError::new(
                 formatted_name,
-                ResolveQualifiedNameErrorKind::NotCurrentDatabase)
-            );
+                ResolveQualifiedNameErrorKind::NotCurrentDatabase,
+            ));
         }

         Ok((schema_name, name))
@@ -330,7 +332,9 @@
         for_system_time_as_of_proctime: bool,
     ) -> Result {
         let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;
-        if schema_name.is_none() && let Some(item) = self.context.cte_to_relation.get(&table_name) {
+        if schema_name.is_none()
+            && let Some(item) = self.context.cte_to_relation.get(&table_name)
+        {
             // Handles CTE

             let (share_id, query, mut original_alias) = item.deref().clone();
@@ -341,9 +345,7 @@
                 original_alias.columns = original_alias
                     .columns
                     .into_iter()
-                    .zip_longest(
-                        from_alias.columns
-                    )
+                    .zip_longest(from_alias.columns)
                     .map(EitherOrBoth::into_right)
                     .collect();
             }
@@ -360,7 +362,10 @@
             )?;

             // Share the CTE.
-            let input_relation = Relation::Subquery(Box::new(BoundSubquery { query, lateral: false }));
+            let input_relation = Relation::Subquery(Box::new(BoundSubquery {
+                query,
+                lateral: false,
+            }));
             let share_relation = Relation::Share(Box::new(BoundShare {
                 share_id,
                 input: input_relation,
diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs
index b05b5db42b30..cd2d2ef45efa 100644
--- a/src/frontend/src/binder/relation/table_or_source.rs
+++ b/src/frontend/src/binder/relation/table_or_source.rs
@@ -145,9 +145,11 @@ impl Binder {
             let user_name = &self.auth_context.user_name;

             for path in self.search_path.path() {
-                if is_system_schema(path) &&
-                    let Ok(sys_table_catalog) =
-                        self.catalog.get_sys_table_by_name(&self.db_name, path, table_name) {
+                if is_system_schema(path)
+                    && let Ok(sys_table_catalog) =
+                        self.catalog
+                            .get_sys_table_by_name(&self.db_name, path, table_name)
+                {
                     return Ok(resolve_sys_table_relation(sys_table_catalog));
                 } else {
                     let schema_name = if path == USER_NAME_WILD_CARD {
diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs
index ac1a53e75f63..ceb7d55312f4 100644
--- a/src/frontend/src/binder/select.rs
+++ b/src/frontend/src/binder/select.rs
@@ -227,21 +227,50 @@ impl Binder {
         self.context.clause = Some(Clause::GroupBy);

         // Only support one grouping item in group by clause
-        let group_by = if select.group_by.len() == 1 && let Expr::GroupingSets(grouping_sets) = &select.group_by[0] {
-            GroupBy::GroupingSets(self.bind_grouping_items_expr_in_select(grouping_sets.clone(), &out_name_to_index, &select_items)?)
-        } else if select.group_by.len() == 1 && let Expr::Rollup(rollup) = &select.group_by[0] {
-            GroupBy::Rollup(self.bind_grouping_items_expr_in_select(rollup.clone(), &out_name_to_index, &select_items)?)
-        } else if select.group_by.len() == 1 && let Expr::Cube(cube) = &select.group_by[0] {
-            GroupBy::Cube(self.bind_grouping_items_expr_in_select(cube.clone(), &out_name_to_index, &select_items)?)
+        let group_by = if select.group_by.len() == 1
+            && let Expr::GroupingSets(grouping_sets) = &select.group_by[0]
+        {
+            GroupBy::GroupingSets(self.bind_grouping_items_expr_in_select(
+                grouping_sets.clone(),
+                &out_name_to_index,
+                &select_items,
+            )?)
+        } else if select.group_by.len() == 1
+            && let Expr::Rollup(rollup) = &select.group_by[0]
+        {
+            GroupBy::Rollup(self.bind_grouping_items_expr_in_select(
+                rollup.clone(),
+                &out_name_to_index,
+                &select_items,
+            )?)
+        } else if select.group_by.len() == 1
+            && let Expr::Cube(cube) = &select.group_by[0]
+        {
+            GroupBy::Cube(self.bind_grouping_items_expr_in_select(
+                cube.clone(),
+                &out_name_to_index,
+                &select_items,
+            )?)
         } else {
-            if select.group_by.iter().any(|expr| matches!(expr, Expr::GroupingSets(_)) || matches!(expr, Expr::Rollup(_)) || matches!(expr, Expr::Cube(_))) {
-                return Err(ErrorCode::BindError("Only support one grouping item in group by clause".to_string()).into());
+            if select.group_by.iter().any(|expr| {
+                matches!(expr, Expr::GroupingSets(_))
+                    || matches!(expr, Expr::Rollup(_))
+                    || matches!(expr, Expr::Cube(_))
+            }) {
+                return Err(ErrorCode::BindError(
+                    "Only support one grouping item in group by clause".to_string(),
+                )
+                .into());
             }
-            GroupBy::GroupKey(select
-                .group_by
-                .into_iter()
-                .map(|expr| self.bind_group_by_expr_in_select(expr, &out_name_to_index, &select_items))
-                .try_collect()?)
+            GroupBy::GroupKey(
+                select
+                    .group_by
+                    .into_iter()
+                    .map(|expr| {
+                        self.bind_group_by_expr_in_select(expr, &out_name_to_index, &select_items)
+                    })
+                    .try_collect()?,
+            )
         };

         self.context.clause = None;
@@ -795,7 +824,9 @@
         let mut bound_exprs = vec![];
         for expr in exprs {
             let expr_impl = match expr {
-                Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
+                Expr::Identifier(name)
+                    if let Some(index) = name_to_index.get(&name.real_value()) =>
+                {
                     match *index {
                         usize::MAX => {
                             return Err(ErrorCode::BindError(format!(
@@ -809,24 +840,21 @@
                     }
                 }
-                Expr::Value(Value::Number(number)) => {
-                    match number.parse::() {
-                        Ok(index) if 1 <= index && index <= select_items.len() => {
-                            let idx_from_0 = index - 1;
-                            InputRef::new(idx_from_0, select_items[idx_from_0].return_type()).into()
-                        }
-                        _ => {
-                            return Err(ErrorCode::InvalidInputSyntax(format!(
-                                "Invalid ordinal number in DISTINCT ON: {}",
-                                number
-                            ))
-                            .into())
-                        }
+                Expr::Value(Value::Number(number)) => match number.parse::() {
+                    Ok(index) if 1 <= index && index <= select_items.len() => {
+                        let idx_from_0 = index - 1;
+                        InputRef::new(idx_from_0, select_items[idx_from_0].return_type())
+                            .into()
                     }
-                }
-                expr => {
-                    self.bind_expr(expr)?
-                }
+                    _ => {
+                        return Err(ErrorCode::InvalidInputSyntax(format!(
+                            "Invalid ordinal number in DISTINCT ON: {}",
+                            number
+                        ))
+                        .into())
+                    }
+                },
+                expr => self.bind_expr(expr)?,
             };
             bound_exprs.push(expr_impl);
         }
diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs
index f5e618892fc5..7f2c84c6cce7 100644
--- a/src/frontend/src/expr/function_call.rs
+++ b/src/frontend/src/expr/function_call.rs
@@ -111,12 +111,16 @@ impl FunctionCall {
         target: DataType,
         allows: CastContext,
     ) -> Result<(), CastError> {
-        if let ExprImpl::Parameter(expr) = child && !expr.has_infer() {
+        if let ExprImpl::Parameter(expr) = child
+            && !expr.has_infer()
+        {
             // Always Ok below. Safe to mutate `expr` (from `child`).
             expr.cast_infer_type(target);
             return Ok(());
         }
-        if let ExprImpl::FunctionCall(func) = child && func.func_type == ExprType::Row {
+        if let ExprImpl::FunctionCall(func) = child
+            && func.func_type == ExprType::Row
+        {
             // Row function will have empty fields in Datatype::Struct at this point. Therefore,
             // we will need to take some special care to generate the cast types. For normal struct
             // types, they will be handled in `cast_ok`.
diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs
index 6eec2983f5c9..27e781a88690 100644
--- a/src/frontend/src/expr/mod.rs
+++ b/src/frontend/src/expr/mod.rs
@@ -822,13 +822,19 @@ impl ExprImpl {
             match expr_type {
                 ExprType::Add | ExprType::Subtract => {
                     let (_, lhs, rhs) = function_call.clone().decompose_as_binary();
-                    if let ExprImpl::InputRef(input_ref) = &lhs && rhs.is_const() {
+                    if let ExprImpl::InputRef(input_ref) = &lhs
+                        && rhs.is_const()
+                    {
                         // Currently we will return `None` for non-literal because the result of the expression might be '1 day'. However, there will definitely exist false positives such as '1 second + 1 second'.
                         // We will treat the expression as an input offset when rhs is `null`.
-                        if rhs.return_type() == DataType::Interval && rhs.as_literal().map_or(true, |literal| literal.get_data().as_ref().map_or(false, |scalar| {
-                            let interval = scalar.as_interval();
-                            interval.months() != 0 || interval.days() != 0
-                        })) {
+                        if rhs.return_type() == DataType::Interval
+                            && rhs.as_literal().map_or(true, |literal| {
+                                literal.get_data().as_ref().map_or(false, |scalar| {
+                                    let interval = scalar.as_interval();
+                                    interval.months() != 0 || interval.days() != 0
+                                })
+                            })
+                        {
                             None
                         } else {
                             Some((input_ref.index(), Some((expr_type, rhs))))
diff --git a/src/frontend/src/handler/alter_user.rs b/src/frontend/src/handler/alter_user.rs
index 0d83c3ae867d..b72d8d32ae8b 100644
--- a/src/frontend/src/handler/alter_user.rs
+++ b/src/frontend/src/handler/alter_user.rs
@@ -102,7 +102,9 @@ fn alter_prost_user_info(
             }
             UserOption::Password(opt) => {
                 // TODO: Behaviour of PostgreSQL: Notice when password is empty string.
-                if let Some(password) = opt && !password.0.is_empty() {
+                if let Some(password) = opt
+                    && !password.0.is_empty()
+                {
                     user_info.auth_info = encrypted_password(&user_info.name, &password.0);
                 } else {
                     user_info.auth_info = None;
diff --git a/src/frontend/src/handler/create_user.rs b/src/frontend/src/handler/create_user.rs
index 8659e1b647c3..0bac084db2c8 100644
--- a/src/frontend/src/handler/create_user.rs
+++ b/src/frontend/src/handler/create_user.rs
@@ -85,7 +85,9 @@ fn make_prost_user_info(
             }
             UserOption::Password(opt) => {
                 // TODO: Behaviour of PostgreSQL: Notice when password is empty string.
-                if let Some(password) = opt && !password.0.is_empty() {
+                if let Some(password) = opt
+                    && !password.0.is_empty()
+                {
                     user_info.auth_info = encrypted_password(&user_info.name, &password.0);
                 }
             }
diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs
index bcd65782d59a..1c0f0d36f0cb 100644
--- a/src/frontend/src/handler/extended_handle.rs
+++ b/src/frontend/src/handler/extended_handle.rs
@@ -100,7 +100,9 @@ pub fn handle_parse(
             Ok(PrepareStatement::PureStatement(statement))
         }
         Statement::CreateTable { query, .. } => {
-            if let Some(query) = query && have_parameter_in_query(query) {
+            if let Some(query) = query
+                && have_parameter_in_query(query)
+            {
                 Err(ErrorCode::NotImplemented(
                     "CREATE TABLE AS SELECT with parameters".to_string(),
                     None.into(),
@@ -111,7 +113,9 @@
             }
         }
         Statement::CreateSink { stmt } => {
-            if let CreateSink::AsQuery(query) = &stmt.sink_from && have_parameter_in_query(query) {
+            if let CreateSink::AsQuery(query) = &stmt.sink_from
+                && have_parameter_in_query(query)
+            {
                 Err(ErrorCode::NotImplemented(
                     "CREATE SINK AS SELECT with parameters".to_string(),
                     None.into(),
diff --git a/src/frontend/src/optimizer/plan_expr_rewriter/cse_rewriter.rs b/src/frontend/src/optimizer/plan_expr_rewriter/cse_rewriter.rs
index d027bf927369..bc59b0c89a8f 100644
--- a/src/frontend/src/optimizer/plan_expr_rewriter/cse_rewriter.rs
+++ b/src/frontend/src/optimizer/plan_expr_rewriter/cse_rewriter.rs
@@ -36,7 +36,9 @@ impl CseRewriter {

 impl ExprRewriter for CseRewriter {
     fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
-        if let Some(count) = self.expr_counter.counter.get(&func_call) && *count > 1 {
+        if let Some(count) = self.expr_counter.counter.get(&func_call)
+            && *count > 1
+        {
             if let Some(expr) = self.cse_mapping.get(&func_call) {
                 let expr: ExprImpl = ExprImpl::InputRef(expr.clone().into());
                 return expr;
diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs
index 9bd0dec4b70c..36095041b638 100644
--- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs
@@ -106,7 +106,9 @@ impl GenericPlanNode for HopWindow {
                 internal2output.try_map(self.internal_window_end_col_idx()),
             )
         };
-        if let Some(start_idx) = start_idx_in_output && let Some(end_idx) = end_idx_in_output {
+        if let Some(start_idx) = start_idx_in_output
+            && let Some(end_idx) = end_idx_in_output
+        {
             fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
             fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
         }
diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs
index c594ededa40c..35a15a5d6479 100644
--- a/src/frontend/src/optimizer/plan_node/logical_join.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_join.rs
@@ -977,7 +977,10 @@ impl LogicalJoin {
             // Use primary table.
             let mut result_plan = self.to_stream_temporal_join(predicate.clone(), ctx);
             // Return directly if this temporal join can match the pk of its right table.
-            if let Ok(temporal_join) = &result_plan && temporal_join.eq_join_predicate().eq_indexes().len() == logical_scan.primary_key().len() {
+            if let Ok(temporal_join) = &result_plan
+                && temporal_join.eq_join_predicate().eq_indexes().len()
+                    == logical_scan.primary_key().len()
+            {
                 return result_plan;
             }
             let indexes = logical_scan.indexes();
diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs
index a78a145ab199..6193c072563c 100644
--- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs
@@ -115,45 +115,44 @@ impl<'a> LogicalOverWindowBuilder<'a> {
             match agg_kind {
                 AggKind::Avg => {
                     assert_eq!(args.len(), 1);
-                    let left_ref = ExprImpl::from(self.push_window_func(WindowFunction::new(
+                    let left_ref = ExprImpl::from(self.push_window_func(WindowFunction::new(
                         WindowFuncKind::Aggregate(AggKind::Sum),
                         partition_by.clone(),
                         order_by.clone(),
                         args.clone(),
                         frame.clone(),
-                    )?)).cast_explicit(return_type)?;
+                    )?))
+                    .cast_explicit(return_type)?;
                     let right_ref = ExprImpl::from(self.push_window_func(WindowFunction::new(
-                        WindowFuncKind::Aggregate(AggKind::Count),
-                        partition_by,
-                        order_by,
-                        args,
-                        frame,
-                    )?));
-
-                    let new_expr = ExprImpl::from(
-                        FunctionCall::new(ExprType::Divide, vec![left_ref, right_ref])?,
-                    );
+                        WindowFuncKind::Aggregate(AggKind::Count),
+                        partition_by,
+                        order_by,
+                        args,
+                        frame,
+                    )?));
+
+                    let new_expr = ExprImpl::from(FunctionCall::new(
+                        ExprType::Divide,
+                        vec![left_ref, right_ref],
+                    )?);
                     Ok(new_expr)
                 }
-                AggKind::StddevPop
-                | AggKind::StddevSamp
-                | AggKind::VarPop
-                | AggKind::VarSamp => {
+                AggKind::StddevPop | AggKind::StddevSamp | AggKind::VarPop | AggKind::VarSamp => {
                     let input = args.first().unwrap();
-                    let squared_input_expr = ExprImpl::from(
-                        FunctionCall::new(
-                            ExprType::Multiply,
-                            vec![input.clone(), input.clone()],
-                        )?,
-                    );
-
-                    let sum_of_squares_expr = ExprImpl::from(self.push_window_func(WindowFunction::new(
-                        WindowFuncKind::Aggregate(AggKind::Sum),
-                        partition_by.clone(),
-                        order_by.clone(),
-                        vec![squared_input_expr],
-                        frame.clone(),
-                    )?)).cast_explicit(return_type.clone())?;
+                    let squared_input_expr = ExprImpl::from(FunctionCall::new(
+                        ExprType::Multiply,
+                        vec![input.clone(), input.clone()],
+                    )?);
+
+                    let sum_of_squares_expr =
+                        ExprImpl::from(self.push_window_func(WindowFunction::new(
+                            WindowFuncKind::Aggregate(AggKind::Sum),
+                            partition_by.clone(),
+                            order_by.clone(),
+                            vec![squared_input_expr],
+                            frame.clone(),
+                        )?))
+                        .cast_explicit(return_type.clone())?;

                     let sum_expr = ExprImpl::from(self.push_window_func(WindowFunction::new(
                         WindowFuncKind::Aggregate(AggKind::Sum),
@@ -161,7 +160,8 @@
                         order_by.clone(),
                         args.clone(),
                         frame.clone(),
-                    )?)).cast_explicit(return_type.clone())?;
+                    )?))
+                    .cast_explicit(return_type.clone())?;

                     let count_expr = ExprImpl::from(self.push_window_func(WindowFunction::new(
                         WindowFuncKind::Aggregate(AggKind::Count),
@@ -171,32 +171,26 @@
                         frame,
                     )?));

-                    let square_of_sum_expr = ExprImpl::from(
-                        FunctionCall::new(
-                            ExprType::Multiply,
-                            vec![sum_expr.clone(), sum_expr],
-                        )?,
-                    );
-
-                    let numerator_expr = ExprImpl::from(
-                        FunctionCall::new(
-                            ExprType::Subtract,
-                            vec![
-                                sum_of_squares_expr,
-                                ExprImpl::from(
-                                    FunctionCall::new(
-                                        ExprType::Divide,
-                                        vec![square_of_sum_expr, count_expr.clone()],
-                                    )?,
-                                ),
-                            ],
-                        )?,
-                    );
+                    let square_of_sum_expr = ExprImpl::from(FunctionCall::new(
+                        ExprType::Multiply,
+                        vec![sum_expr.clone(), sum_expr],
+                    )?);
+
+                    let numerator_expr = ExprImpl::from(FunctionCall::new(
+                        ExprType::Subtract,
+                        vec![
+                            sum_of_squares_expr,
+                            ExprImpl::from(FunctionCall::new(
+                                ExprType::Divide,
+                                vec![square_of_sum_expr, count_expr.clone()],
+                            )?),
+                        ],
+                    )?);

                     let denominator_expr = match agg_kind {
                         AggKind::StddevPop | AggKind::VarPop => count_expr.clone(),
-                        AggKind::StddevSamp | AggKind::VarSamp => ExprImpl::from(
-                            FunctionCall::new(
+                        AggKind::StddevSamp | AggKind::VarSamp => {
+                            ExprImpl::from(FunctionCall::new(
                                 ExprType::Subtract,
                                 vec![
                                     count_expr.clone(),
                                     ExprImpl::from(Literal::new(
                                         Datum::from(ScalarImpl::Int64(1)),
                                         DataType::Int64,
                                     )),
                                 ],
-                            )?,
-                        ),
+                            )?)
+                        }
                         _ => unreachable!(),
                     };

-                    let mut target_expr = ExprImpl::from(
-                        FunctionCall::new(
-                            ExprType::Divide,
-                            vec![numerator_expr, denominator_expr],
-                        )?,
-                    );
+                    let mut target_expr = ExprImpl::from(FunctionCall::new(
+                        ExprType::Divide,
+                        vec![numerator_expr, denominator_expr],
+                    )?);

                     if matches!(agg_kind, AggKind::StddevPop | AggKind::StddevSamp) {
                         target_expr = ExprImpl::from(
                     }

                     match agg_kind {
-                        AggKind::VarPop | AggKind::StddevPop => {
-                            Ok(target_expr)
-                        }
+                        AggKind::VarPop | AggKind::StddevPop => Ok(target_expr),
                         AggKind::StddevSamp | AggKind::VarSamp => {
-                            let less_than_expr = ExprImpl::from(
-                                FunctionCall::new(
-                                    ExprType::LessThanOrEqual,
-                                    vec![
-                                        count_expr,
-                                        ExprImpl::from(Literal::new(
-                                            Datum::from(ScalarImpl::Int64(1)),
-                                            DataType::Int64,
-                                        )),
-                                    ],
-                                )?,
-                            );
-                            let null_expr =
-                                ExprImpl::from(Literal::new(None, return_type));
-
-                            let case_expr = ExprImpl::from(
-                                FunctionCall::new(
-                                    ExprType::Case,
-                                    vec![less_than_expr, null_expr, target_expr],
-                                )?,
-                            );
+                            let less_than_expr = ExprImpl::from(FunctionCall::new(
+                                ExprType::LessThanOrEqual,
+                                vec![
+                                    count_expr,
+                                    ExprImpl::from(Literal::new(
+                                        Datum::from(ScalarImpl::Int64(1)),
+                                        DataType::Int64,
+                                    )),
+                                ],
+                            )?);
+                            let null_expr = ExprImpl::from(Literal::new(None, return_type));
+
+                            let case_expr = ExprImpl::from(FunctionCall::new(
+                                ExprType::Case,
+                                vec![less_than_expr, null_expr, target_expr],
+                            )?);
                             Ok(case_expr)
                         }
                         _ => unreachable!(),
@@ -307,21 +292,19 @@ impl<'a> OverWindowProjectBuilder<'a> {
         window_function: &WindowFunction,
     ) -> std::result::Result<(), ErrorCode> {
         if let WindowFuncKind::Aggregate(agg_kind) = window_function.kind
-            && matches!(
-                agg_kind,
-                AggKind::StddevPop
-                | AggKind::StddevSamp
-                | AggKind::VarPop
-                | AggKind::VarSamp
-            )
-        {
-            let input = window_function.args.iter().exactly_one().unwrap();
-            let squared_input_expr = ExprImpl::from(
-                FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()])
-                    .unwrap(),
-            );
-            self.builder.add_expr(&squared_input_expr).map_err(|err| ErrorCode::NotImplemented(format!("{err} inside args"), None.into()))?;
-        }
+            && matches!(
+                agg_kind,
+                AggKind::StddevPop | AggKind::StddevSamp | AggKind::VarPop | AggKind::VarSamp
+            )
+        {
+            let input = window_function.args.iter().exactly_one().unwrap();
+            let squared_input_expr = ExprImpl::from(
+                FunctionCall::new(ExprType::Multiply, vec![input.clone(), input.clone()]).unwrap(),
+            );
+            self.builder.add_expr(&squared_input_expr).map_err(|err| {
+                ErrorCode::NotImplemented(format!("{err} inside args"), None.into())
+            })?;
+        }
         for arg in &window_function.args {
             self.builder.add_expr(arg).map_err(|err| {
                 ErrorCode::NotImplemented(format!("{err} inside args"), None.into())
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index cac051957b0a..542178a830b7 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -571,7 +571,9 @@ impl ToStream for LogicalSource {
         }

         assert!(!(self.core.gen_row_id && self.core.for_table));
-        if let Some(row_id_index) = self.core.row_id_index && self.core.gen_row_id {
+        if let Some(row_id_index) = self.core.row_id_index
+            && self.core.gen_row_id
+        {
             plan = StreamRowIdGen::new(plan, row_id_index).into();
         }
         Ok(plan)
diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
index d0c3077f8328..ca208f94df05 100644
--- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
@@ -57,7 +57,9 @@ impl StreamGroupTopN {
         let mut stream_key = core
             .stream_key()
             .expect("logical node should have stream key here");
-        if let Some(vnode_col_idx) = vnode_col_idx && stream_key.len() > 1 {
+        if let Some(vnode_col_idx) = vnode_col_idx
+            && stream_key.len() > 1
+        {
             // The output stream key of `GroupTopN` is a union of group key and input stream key,
             // while vnode is calculated from a subset of input stream key. So we can safely remove
             // the vnode column from output stream key. While at meanwhile we cannot leave the stream key
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
index 36aff15d9605..1dc35bb90990 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
@@ -161,7 +161,9 @@ impl StreamHashJoin {
             )
         };
         let mut is_valuable_inequality = do_state_cleaning;
-        if let Some(internal) = internal && !watermark_columns.contains(internal) {
+        if let Some(internal) = internal
+            && !watermark_columns.contains(internal)
+        {
             watermark_columns.insert(internal);
             is_valuable_inequality = true;
         }
diff --git a/src/frontend/src/optimizer/rule/except_merge_rule.rs b/src/frontend/src/optimizer/rule/except_merge_rule.rs
index 0979aaf262f0..77b4c827d07c 100644
--- a/src/frontend/src/optimizer/rule/except_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/except_merge_rule.rs
@@ -26,7 +26,9 @@ impl Rule for ExceptMergeRule {
         let top_except_inputs = top_except.inputs();
         let (left_most_input, remain_vec) = top_except_inputs.split_at(1);

-        if let Some(bottom_except) = left_most_input[0].as_logical_except() && bottom_except.all() == top_all {
+        if let Some(bottom_except) = left_most_input[0].as_logical_except()
+            && bottom_except.all() == top_all
+        {
             let mut new_inputs = vec![];
             new_inputs.extend(bottom_except.inputs());
             new_inputs.extend(remain_vec.iter().cloned());
diff --git a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
index 30435d635568..5ae42877f645 100644
--- a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
@@ -122,11 +122,8 @@ impl Rule for IndexDeltaJoinRule {
                 if chain_type != table_scan.chain_type() {
                     Some(
-                        StreamTableScan::new_with_chain_type(
-                            table_scan.core().clone(),
-                            chain_type,
-                        )
-                        .into(),
+                        StreamTableScan::new_with_chain_type(table_scan.core().clone(), chain_type)
+                            .into(),
                     )
                 } else {
                     Some(table_scan.clone().into())
                 }
             }
diff --git a/src/frontend/src/optimizer/rule/intersect_merge_rule.rs b/src/frontend/src/optimizer/rule/intersect_merge_rule.rs
index c21ea85e6baf..3110b9e3881b 100644
--- a/src/frontend/src/optimizer/rule/intersect_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/intersect_merge_rule.rs
@@ -24,7 +24,9 @@ impl Rule for IntersectMergeRule {
         let mut new_inputs = vec![];
         let mut has_merge = false;
         for input in top_intersect.inputs() {
-            if let Some(bottom_intersect) = input.as_logical_intersect() && bottom_intersect.all() == top_all {
+            if let Some(bottom_intersect) = input.as_logical_intersect()
+                && bottom_intersect.all() == top_all
+            {
                 new_inputs.extend(bottom_intersect.inputs());
                 has_merge = true;
             } else {
diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
index 93637d3ba819..496a51d6d9f3 100644
--- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
+++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
@@ -161,7 +161,9 @@ fn handle_rank_preds(rank_preds: &[ExprImpl], window_func_pos: usize) -> Option<
             assert_eq!(input_ref.index, window_func_pos);
             let v = v.cast_implicit(DataType::Int64).ok()?.fold_const().ok()??;
             let v = *v.as_int64();
-            if let Some(eq) = eq && eq != v {
+            if let Some(eq) = eq
+                && eq != v
+            {
                 tracing::warn!(
                     "Failed to optimize rank predicate with conflicting equal conditions."
                 );
diff --git a/src/frontend/src/optimizer/rule/union_merge_rule.rs b/src/frontend/src/optimizer/rule/union_merge_rule.rs
index 169c9b72c530..cd7199c2a505 100644
--- a/src/frontend/src/optimizer/rule/union_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_merge_rule.rs
@@ -24,7 +24,9 @@ impl Rule for UnionMergeRule {
         let mut new_inputs = vec![];
         let mut has_merge = false;
         for input in top_union.inputs() {
-            if let Some(bottom_union) = input.as_logical_union() && bottom_union.all() == top_all {
+            if let Some(bottom_union) = input.as_logical_union()
+                && bottom_union.all() == top_all
+            {
                 new_inputs.extend(bottom_union.inputs());
                 has_merge = true;
             } else {
diff --git a/src/frontend/src/planner/query.rs b/src/frontend/src/planner/query.rs
index a5f9651c4141..d00ce93598a1 100644
--- a/src/frontend/src/planner/query.rs
+++ b/src/frontend/src/planner/query.rs
@@ -56,7 +56,9 @@ impl Planner {
         }
         let mut out_fields = FixedBitSet::with_capacity(plan.schema().len());
         out_fields.insert_range(..plan.schema().len() - extra_order_exprs_len);
-        if let Some(field) = plan.schema().fields.get(0) && field.name == "projected_row_id" {
+        if let Some(field) = plan.schema().fields.get(0)
+            && field.name == "projected_row_id"
+        {
             // Do not output projected_row_id hidden column.
             out_fields.set(0, false);
         }
diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs
index 96b32680309d..5266ce55710e 100644
--- a/src/frontend/src/planner/select.rs
+++ b/src/frontend/src/planner/select.rs
@@ -67,7 +67,9 @@ impl Planner {
             exprs.iter().map(|expr| (expr.clone(), false)).collect();
         let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
         let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
-        while uncovered_distinct_on_exprs_cnt > 0 && let Some(order_expr) = order_iter.next() {
+        while uncovered_distinct_on_exprs_cnt > 0
+            && let Some(order_expr) = order_iter.next()
+        {
             match distinct_on_exprs.get_mut(order_expr) {
                 Some(has_been_covered) => {
                     if !*has_been_covered {
@@ -179,7 +181,9 @@
         if let BoundDistinct::Distinct = distinct {
             let fields = root.schema().fields();
-            let group_key = if let Some(field) = fields.get(0) && field.name == "projected_row_id" {
+            let group_key = if let Some(field) = fields.get(0)
+                && field.name == "projected_row_id"
+            {
                 // Do not group by projected_row_id hidden column.
                 (1..fields.len()).collect()
             } else {
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 7c3030370d56..cee3db2986cf 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -344,7 +344,10 @@ impl StageRunner {
                     // the task.
                     // We schedule the task to the worker node that owns the data partition.
                     let parallel_unit_ids = vnode_bitmaps.keys().cloned().collect_vec();
-                    let workers = self.worker_node_manager.manager.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
+                    let workers = self
+                        .worker_node_manager
+                        .manager
+                        .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
                     for (i, (parallel_unit_id, worker)) in parallel_unit_ids
                         .into_iter()
                         .zip_eq_fast(workers.into_iter())
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 28cfa25b70bf..d3d558ef4eff 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -282,7 +282,10 @@ impl LocalQueryExecution {
                 // `exchange_source`.
let (parallel_unit_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = vnode_bitmaps.clone().into_iter().unzip();
- let workers = self.worker_node_manager.manager.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
+ let workers = self
+ .worker_node_manager
+ .manager
+ .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
for (idx, (worker_node, partition)) in
(workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
{
@@ -355,8 +358,12 @@ impl LocalQueryExecution {
sources.push(exchange_source);
}
} else {
- let second_stage_plan_node =
- self.convert_plan_node(&second_stage.root, &mut None, None, next_executor_id)?;
+ let second_stage_plan_node = self.convert_plan_node(
+ &second_stage.root,
+ &mut None,
+ None,
+ next_executor_id,
+ )?;
let second_stage_plan_fragment = PlanFragment {
root: Some(second_stage_plan_node),
exchange_info: Some(ExchangeInfo {
diff --git a/src/frontend/src/utils/condition.rs b/src/frontend/src/utils/condition.rs
index 5305f9f1f356..332dda739bb3 100644
--- a/src/frontend/src/utils/condition.rs
+++ b/src/frontend/src/utils/condition.rs
@@ -889,7 +889,9 @@ impl Condition {
}
// remove all constant boolean `true`
res.retain(|expr| {
- if let Some(v) = try_get_bool_constant(expr) && v {
+ if let Some(v) = try_get_bool_constant(expr)
+ && v
+ {
false
} else {
true
diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs
index 2e9e6d1bb01e..500167f9380d 100644
--- a/src/frontend/src/utils/stream_graph_formatter.rs
+++ b/src/frontend/src/utils/stream_graph_formatter.rs
@@ -261,57 +261,58 @@ impl StreamGraphFormatter {
self.pretty_add_table(node.get_state_table().unwrap()),
));
}
- stream_node::NodeBody::Chain(node) => {
- fields.push((
- "state table",
- self.pretty_add_table(node.get_state_table().unwrap()),
- ))
- }
- stream_node::NodeBody::Sort(node) => {
+ stream_node::NodeBody::Chain(node) => fields.push((
+ "state table",
+ self.pretty_add_table(node.get_state_table().unwrap()),
+ )),
+ stream_node::NodeBody::Sort(node) => {
fields.push((
"state table",
self.pretty_add_table(node.get_state_table().unwrap()),
));
}
stream_node::NodeBody::WatermarkFilter(node) => {
- let vec = node.tables.iter().map(|tb| self.pretty_add_table(tb) ).collect_vec();
- fields.push(("state tables", Pretty::Array(vec)
- ));
+ let vec = node
+ .tables
+ .iter()
+ .map(|tb| self.pretty_add_table(tb))
+ .collect_vec();
+ fields.push(("state tables", Pretty::Array(vec)));
}
stream_node::NodeBody::EowcOverWindow(node) => {
fields.push((
- "state table",
- self.pretty_add_table(node.get_state_table().unwrap()),
+ "state table",
+ self.pretty_add_table(node.get_state_table().unwrap()),
));
}
- stream_node::NodeBody::OverWindow(node) =>{
+ stream_node::NodeBody::OverWindow(node) => {
fields.push((
"state table",
self.pretty_add_table(node.get_state_table().unwrap()),
));
}
- stream_node::NodeBody::Project(_) |
- stream_node::NodeBody::Filter(_) |
- stream_node::NodeBody::StatelessSimpleAgg(_) |
- stream_node::NodeBody::HopWindow(_) |
- stream_node::NodeBody::Merge(_) |
- stream_node::NodeBody::Exchange(_) |
- stream_node::NodeBody::BatchPlan(_) |
- stream_node::NodeBody::Lookup(_) |
- stream_node::NodeBody::LookupUnion(_) |
- stream_node::NodeBody::Union(_) |
- stream_node::NodeBody::DeltaIndexJoin(_) |
- stream_node::NodeBody::Sink(_) |
- stream_node::NodeBody::Expand(_) |
- stream_node::NodeBody::ProjectSet(_) |
- stream_node::NodeBody::Dml(_) |
- stream_node::NodeBody::RowIdGen(_) |
- stream_node::NodeBody::TemporalJoin(_) |
- 
stream_node::NodeBody::BarrierRecv(_) | - stream_node::NodeBody::Values(_) | - stream_node::NodeBody::Source(_) | - stream_node::NodeBody::StreamFsFetch(_) | - stream_node::NodeBody::NoOp(_) => {} + stream_node::NodeBody::Project(_) + | stream_node::NodeBody::Filter(_) + | stream_node::NodeBody::StatelessSimpleAgg(_) + | stream_node::NodeBody::HopWindow(_) + | stream_node::NodeBody::Merge(_) + | stream_node::NodeBody::Exchange(_) + | stream_node::NodeBody::BatchPlan(_) + | stream_node::NodeBody::Lookup(_) + | stream_node::NodeBody::LookupUnion(_) + | stream_node::NodeBody::Union(_) + | stream_node::NodeBody::DeltaIndexJoin(_) + | stream_node::NodeBody::Sink(_) + | stream_node::NodeBody::Expand(_) + | stream_node::NodeBody::ProjectSet(_) + | stream_node::NodeBody::Dml(_) + | stream_node::NodeBody::RowIdGen(_) + | stream_node::NodeBody::TemporalJoin(_) + | stream_node::NodeBody::BarrierRecv(_) + | stream_node::NodeBody::Values(_) + | stream_node::NodeBody::Source(_) + | stream_node::NodeBody::StreamFsFetch(_) + | stream_node::NodeBody::NoOp(_) => {} }; if self.verbose { diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index 7dc7a4688e64..64b2e24c756d 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -63,12 +63,16 @@ impl SpaceReclaimCompactionPicker { ) -> Option { assert!(!levels.levels.is_empty()); let mut select_input_ssts = vec![]; - if let Some(l0) = levels.l0.as_ref() && state.last_level == 0 { + if let Some(l0) = levels.l0.as_ref() + && state.last_level == 0 + { // only pick trivial reclaim sstables because this kind of task could be optimized and do not need send to compactor. for level in &l0.sub_levels { for sst in &level.table_infos { let exist_count = self.exist_table_count(sst); - if exist_count == sst.table_ids.len() || level_handlers[0].is_pending_compact( &sst.sst_id) { + if exist_count == sst.table_ids.len() + || level_handlers[0].is_pending_compact(&sst.sst_id) + { if !select_input_ssts.is_empty() { break; } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 1b3a284e9ccc..678374b8c571 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -353,7 +353,7 @@ impl HummockManager { && !env.opts.do_not_config_object_storage_lifecycle { let is_bucket_expiration_configured = s3.inner().configure_bucket_lifecycle().await; - if is_bucket_expiration_configured{ + if is_bucket_expiration_configured { return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in. 
Please disable object expiration and restart the cluster.") .into()); diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 8b26b8afa11d..c3445da8cc57 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -78,7 +78,9 @@ impl FragmentManagerCore { .filter(|tf| tf.state() != State::Initial) .flat_map(|table_fragments| { table_fragments.fragments.values().filter_map(|fragment| { - if let Some(id_filter) = id_filter.as_ref() && !id_filter.contains(&fragment.fragment_id) { + if let Some(id_filter) = id_filter.as_ref() + && !id_filter.contains(&fragment.fragment_id) + { return None; } let parallelism = match fragment.vnode_mapping.as_ref() { @@ -687,7 +689,9 @@ impl FragmentManager { .with_context(|| format!("table_fragment not exist: id={}", table_id))?; for status in table_fragment.actor_status.values_mut() { - if let Some(pu) = &status.parallel_unit && migration_plan.parallel_unit_plan.contains_key(&pu.id) { + if let Some(pu) = &status.parallel_unit + && migration_plan.parallel_unit_plan.contains_key(&pu.id) + { status.parallel_unit = Some(migration_plan.parallel_unit_plan[&pu.id].clone()); } } @@ -717,8 +721,9 @@ impl FragmentManager { .values() .filter(|tf| { for status in tf.actor_status.values() { - if let Some(pu) = &status.parallel_unit && - migration_plan.parallel_unit_plan.contains_key(&pu.id) { + if let Some(pu) = &status.parallel_unit + && migration_plan.parallel_unit_plan.contains_key(&pu.id) + { return true; } } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index d2007dcab45d..781dd244c1c5 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -742,7 +742,8 @@ impl CatalogManager { fn assert_table_creating(tables: &BTreeMap, table: &Table) { if let Some(t) = tables.get(&table.id) && let Ok(StreamJobStatus::Creating) = t.get_stream_job_status() - {} else { + { + } else { panic!("Table must be in creating procedure: {table:#?}") } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8f6e7c0be691..c08281a2f59e 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -402,11 +402,15 @@ impl DdlController { async fn delete_vpc_endpoint(&self, connection: &Connection) -> MetaResult<()> { // delete AWS vpc endpoint if let Some(connection::Info::PrivateLinkService(svc)) = &connection.info - && svc.get_provider()? == PbPrivateLinkProvider::Aws { + && svc.get_provider()? 
== PbPrivateLinkProvider::Aws + { if let Some(aws_cli) = self.aws_client.as_ref() { aws_cli.delete_vpc_endpoint(&svc.endpoint_id).await?; } else { - warn!("AWS client is not initialized, skip deleting vpc endpoint {}", svc.endpoint_id); + warn!( + "AWS client is not initialized, skip deleting vpc endpoint {}", + svc.endpoint_id + ); } } Ok(()) diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index f30d8253cb95..dbcd62abfdbb 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -94,7 +94,9 @@ impl ElectionClient for EtcdElectionClient { let (mut keeper, mut resp_stream) = self.client.keep_alive(lease_id).await?; let _resp = keeper.keep_alive().await?; let resp = resp_stream.message().await?; - if let Some(resp) = resp && resp.ttl() <= 0 { + if let Some(resp) = resp + && resp.ttl() <= 0 + { tracing::info!("lease {} expired or revoked, re-granting", lease_id); if restored_leader { tracing::info!("restored leader lease {} lost", lease_id); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 77a784c64ac0..003252582d6c 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -126,7 +126,11 @@ impl CreatingStreamingJobInfo { && let Some(shutdown_tx) = job.shutdown_tx.take() { let (tx, rx) = oneshot::channel(); - if shutdown_tx.send(CreatingState::Canceling { finish_tx: tx }).await.is_ok() { + if shutdown_tx + .send(CreatingState::Canceling { finish_tx: tx }) + .await + .is_ok() + { receivers.insert(job_id, rx); } else { tracing::warn!("failed to send canceling state"); diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs index 02ee6c744d33..0cd266abb293 100644 --- a/src/object_store/src/object/mem.rs +++ b/src/object_store/src/object/mem.rs @@ -237,7 +237,9 @@ impl InMemObjectStore { .map(|(_, obj)| obj) .ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)))?; - if let Some(end) = range.end() && end > obj.len() { + if let Some(end) = range.end() + && end > obj.len() + { return Err(Error::other("bad block offset and size").into()); } diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index ff682946b065..204d9cac2575 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -86,7 +86,9 @@ impl ObjectStore for OpendalObjectStore { .await? 
}; - if let Some(len) = range.len() && len != data.len() { + if let Some(len) = range.len() + && len != data.len() + { return Err(ObjectError::internal(format!( "mismatched size: expected {}, found {} when reading {} at {:?}", len, diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 89f9aa5a053d..6dce56ed4c75 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -386,7 +386,9 @@ impl ObjectStore for S3ObjectStore { ) .await?; - if let Some(len) = range.len() && len != val.len() { + if let Some(len) = range.len() + && len != val.len() + { return Err(ObjectError::internal(format!( "mismatched size: expected {}, found {} when reading {} at {:?}", len, diff --git a/src/risedevtool/src/config/provide_expander.rs b/src/risedevtool/src/config/provide_expander.rs index 9948c81b0336..2860ffd6c985 100644 --- a/src/risedevtool/src/config/provide_expander.rs +++ b/src/risedevtool/src/config/provide_expander.rs @@ -70,7 +70,9 @@ impl ProvideExpander { .into_hash() .ok_or_else(|| anyhow!("expect a hashmap"))?; let map = map.into_iter().map(|(k, v)| { - if let Some(k) = k.as_str() && k.starts_with("provide-") { + if let Some(k) = k.as_str() + && k.starts_with("provide-") + { let array = v .as_vec() .ok_or_else(|| anyhow!("expect an array of provide-"))?; diff --git a/src/risedevtool/src/preflight_check.rs b/src/risedevtool/src/preflight_check.rs index 47fb8235495f..e29e3f4709c9 100644 --- a/src/risedevtool/src/preflight_check.rs +++ b/src/risedevtool/src/preflight_check.rs @@ -26,7 +26,10 @@ fn preflight_check_proxy() -> Result<()> { || env::var("all_proxy").is_ok() || env::var("ALL_PROXY").is_ok() { - if let Ok(x) = env::var("no_proxy") && x.contains("127.0.0.1") && x.contains("::1") { + if let Ok(x) = env::var("no_proxy") + && x.contains("127.0.0.1") + && x.contains("::1") + { println!( "[{}] {} - You are using proxies for all RisingWave components. 
Please make sure that `no_proxy` is set for all worker nodes within the cluster.",
style("risedev-preflight-check").bold(),
diff --git a/src/risedevtool/src/task/compactor_service.rs b/src/risedevtool/src/task/compactor_service.rs
index adecc007b820..c7dcbb8c1179 100644
--- a/src/risedevtool/src/task/compactor_service.rs
+++ b/src/risedevtool/src/task/compactor_service.rs
@@ -34,7 +34,9 @@ impl CompactorService {
fn compactor(&self) -> Result<Command> {
let prefix_bin = env::var("PREFIX_BIN")?;
- if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" {
+ if let Ok(x) = env::var("ENABLE_ALL_IN_ONE")
+ && x == "true"
+ {
Ok(Command::new(
Path::new(&prefix_bin).join("risingwave").join("compactor"),
))
diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs
index ced6bec115f6..20d01f33f53d 100644
--- a/src/risedevtool/src/task/compute_node_service.rs
+++ b/src/risedevtool/src/task/compute_node_service.rs
@@ -34,7 +34,9 @@ impl ComputeNodeService {
fn compute_node(&self) -> Result<Command> {
let prefix_bin = env::var("PREFIX_BIN")?;
- if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" {
+ if let Ok(x) = env::var("ENABLE_ALL_IN_ONE")
+ && x == "true"
+ {
Ok(Command::new(
Path::new(&prefix_bin)
.join("risingwave")
diff --git a/src/risedevtool/src/task/frontend_service.rs b/src/risedevtool/src/task/frontend_service.rs
index cf0213028e46..b19167abbfdd 100644
--- a/src/risedevtool/src/task/frontend_service.rs
+++ b/src/risedevtool/src/task/frontend_service.rs
@@ -35,7 +35,9 @@ impl FrontendService {
fn frontend(&self) -> Result<Command> {
let prefix_bin = env::var("PREFIX_BIN")?;
- if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" {
+ if let Ok(x) = env::var("ENABLE_ALL_IN_ONE")
+ && x == "true"
+ {
Ok(Command::new(
Path::new(&prefix_bin)
.join("risingwave")
diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs
index 2494a9eceaf1..df48b59c2f1f 100644
--- a/src/risedevtool/src/task/meta_node_service.rs
+++ b/src/risedevtool/src/task/meta_node_service.rs
@@ -35,7 +35,9 @@ impl MetaNodeService {
fn meta_node(&self) -> Result<Command> {
let prefix_bin = env::var("PREFIX_BIN")?;
- if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" {
+ if let Ok(x) = env::var("ENABLE_ALL_IN_ONE")
+ && x == "true"
+ {
Ok(Command::new(
Path::new(&prefix_bin).join("risingwave").join("meta-node"),
))
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index b8603fbe46e6..b58a13027261 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -247,7 +247,8 @@ impl MetaClient {
})
.await?;
if let Some(status) = &add_worker_resp.status
- && status.code() == risingwave_pb::common::status::Code::UnknownWorker {
+ && status.code() == risingwave_pb::common::status::Code::UnknownWorker
+ {
tracing::error!("invalid worker: {}", status.message);
std::process::exit(1);
}
diff --git a/src/sqlparser/src/tokenizer.rs b/src/sqlparser/src/tokenizer.rs
index 4fafde820f41..1f3b99314d14 100644
--- a/src/sqlparser/src/tokenizer.rs
+++ b/src/sqlparser/src/tokenizer.rs
@@ -1019,7 +1019,9 @@ impl<'a> Tokenizer<'a> {
) -> Result<(), String> {
let mut unicode_seq: String = String::with_capacity(len);
for _ in 0..len {
- if let Some(c) = chars.peek() && c.is_ascii_hexdigit() {
+ if let Some(c) = chars.peek()
+ && c.is_ascii_hexdigit()
+ {
unicode_seq.push(chars.next().unwrap());
} else {
break;
@@ -1063,7 +1065,9 @@ impl<'a> Tokenizer<'a> {
let mut unicode_seq: String = 
String::with_capacity(3); unicode_seq.push(digit); for _ in 0..2 { - if let Some(c) = chars.peek() && matches!(*c, '0'..='7') { + if let Some(c) = chars.peek() + && matches!(*c, '0'..='7') + { unicode_seq.push(chars.next().unwrap()); } else { break; diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 1925acbce753..e46965b16e5f 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -701,7 +701,9 @@ where while iter.is_valid() { progress_key_num += 1; - if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL { + if let Some(task_progress) = task_progress.as_ref() + && progress_key_num >= PROGRESS_KEY_INTERVAL + { task_progress.inc_progress_key(progress_key_num); progress_key_num = 0; } @@ -751,7 +753,9 @@ where } del_iter.next(); progress_key_num += 1; - if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL { + if let Some(task_progress) = task_progress.as_ref() + && progress_key_num >= PROGRESS_KEY_INTERVAL + { task_progress.inc_progress_key(progress_key_num); progress_key_num = 0; } @@ -858,14 +862,18 @@ where .await?; del_iter.next(); progress_key_num += 1; - if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL { + if let Some(task_progress) = task_progress.as_ref() + && progress_key_num >= PROGRESS_KEY_INTERVAL + { task_progress.inc_progress_key(progress_key_num); progress_key_num = 0; } } } - if let Some(task_progress) = task_progress.as_ref() && progress_key_num > 0 { + if let Some(task_progress) = task_progress.as_ref() + && progress_key_num > 0 + { // Avoid losing the progress_key_num in the last Interval task_progress.inc_progress_key(progress_key_num); } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index a07da55fb704..3b9e9dc587fe 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1333,7 +1333,9 @@ mod tests { assert_eq!(epoch, uploader.max_sealed_epoch); // check sealed data has two imms let imms_by_epoch = uploader.sealed_data.imms_by_epoch(); - if let Some((e, imms)) = imms_by_epoch.last_key_value() && *e == epoch{ + if let Some((e, imms)) = imms_by_epoch.last_key_value() + && *e == epoch + { assert_eq!(2, imms.len()); } diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs index 7936fb994a92..4d943ce63d3f 100644 --- a/src/storage/src/hummock/iterator/delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs @@ -291,7 +291,10 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { async { self.tmp_buffer .push(self.heap.pop().expect("no inner iter")); - while let Some(node) = self.heap.peek() && node.is_valid() && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() { + while let Some(node) = self.heap.peek() + && node.is_valid() + && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() + { self.tmp_buffer.push(self.heap.pop().unwrap()); } for node in &self.tmp_buffer { diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 60553b5aa09a..11af5e7deaea 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -77,7 +77,15 @@ pub async fn 
get_from_sstable_info( // Bloom filter key is the distribution key, which is no need to be the prefix of pk, and do not // contain `TablePrefix` and `VnodePrefix`. if let Some(hash) = dist_key_hash - && !hit_sstable_bloom_filter(sstable.value(), &(Bound::Included(full_key.user_key), Bound::Included(full_key.user_key)), hash, local_stats) + && !hit_sstable_bloom_filter( + sstable.value(), + &( + Bound::Included(full_key.user_key), + Bound::Included(full_key.user_key), + ), + hash, + local_stats, + ) { if !read_options.ignore_range_tombstone { let delete_epoch = get_min_delete_range_epoch_from_sstable( diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 9bee67e78ca6..4baabb4fdafe 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -212,9 +212,17 @@ where } } } - if need_seal_current && let Some(event) = builder.last_range_tombstone() && event.new_epoch != HummockEpoch::MAX { + if need_seal_current + && let Some(event) = builder.last_range_tombstone() + && event.new_epoch != HummockEpoch::MAX + { last_range_tombstone_epoch = event.new_epoch; - if event.event_key.left_user_key.as_ref().eq(&full_key.user_key) { + if event + .event_key + .left_user_key + .as_ref() + .eq(&full_key.user_key) + { // If the last range tombstone equals the new key, we can not create new file because we must keep the new key in origin file. need_seal_current = false; } else { @@ -295,7 +303,10 @@ where /// Add kv pair to sstable. pub async fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) -> HummockResult<()> { - if let Some(builder) = self.current_builder.as_mut() && builder.reach_capacity() && event.new_epoch != HummockEpoch::MAX { + if let Some(builder) = self.current_builder.as_mut() + && builder.reach_capacity() + && event.new_epoch != HummockEpoch::MAX + { if builder.last_range_tombstone_epoch() != HummockEpoch::MAX { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: event.event_key.clone(), diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 73d6110cacd2..25ac82636c77 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -821,7 +821,9 @@ impl SstableWriter for StreamingUploadWriter { self.sstable_store.insert_meta_cache(self.object_id, meta); // Add block cache. 
- if let CachePolicy::Fill(fill_high_priority_cache) = self.policy && !self.blocks.is_empty() { + if let CachePolicy::Fill(fill_high_priority_cache) = self.policy + && !self.blocks.is_empty() + { for (block_idx, block) in self.blocks.into_iter().enumerate() { self.sstable_store.block_cache.insert( self.object_id, diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index ac52b65e5488..c454ad94339d 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -524,7 +524,9 @@ impl RangeKvStateStore { } last_user_key = Some(key.user_key.clone()); } - if let Some(limit) = limit && data.len() >= limit { + if let Some(limit) = limit + && data.len() >= limit + { break; } } diff --git a/src/stream/src/common/log_store_impl/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs index 35040be82c93..8ff3e05d0bea 100644 --- a/src/stream/src/common/log_store_impl/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -222,7 +222,9 @@ impl LogReader for BoundedInMemLogStoreReader { next_epoch, } = &self.epoch_progress { - if let TruncateOffset::Barrier {epoch} = offset && epoch == *sealed_epoch { + if let TruncateOffset::Barrier { epoch } = offset + && epoch == *sealed_epoch + { let sealed_epoch = *sealed_epoch; self.epoch_progress = Consuming(*next_epoch); self.truncated_epoch_tx diff --git a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs index ed1c495c81d7..d0773b008c16 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs @@ -250,7 +250,9 @@ impl LogStoreBufferSender { pub(crate) fn pop_truncation(&self, curr_epoch: u64) -> Option { let mut inner = self.buffer.inner(); let mut ret = None; - while let Some((epoch, _)) = inner.truncation_list.front() && *epoch < curr_epoch { + while let Some((epoch, _)) = inner.truncation_list.front() + && *epoch < curr_epoch + { ret = inner.truncation_list.pop_front(); } ret @@ -380,7 +382,9 @@ impl LogStoreBufferReceiver { } } if let Some((epoch, seq_id)) = latest_offset { - if let Some((prev_epoch, ref mut prev_seq_id)) = inner.truncation_list.back_mut() && *prev_epoch == epoch { + if let Some((prev_epoch, ref mut prev_seq_id)) = inner.truncation_list.back_mut() + && *prev_epoch == epoch + { *prev_seq_id = seq_id; } else { inner.truncation_list.push_back((epoch, seq_id)); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index d3102aa936fa..ba69209887b6 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -586,7 +586,9 @@ impl LogStoreRowOpStream { .pop() .expect("have check non-empty"); self.row_streams.push(stream.into_future()); - while let Some((stream_epoch, _)) = self.not_started_streams.last() && *stream_epoch == epoch { + while let Some((stream_epoch, _)) = self.not_started_streams.last() + && *stream_epoch == epoch + { let (_, stream) = self.not_started_streams.pop().expect("should not be empty"); self.row_streams.push(stream.into_future()); } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 37e788a3e7ab..f0ef04ab7ec9 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -852,14 +852,14 @@ where match op { Op::Insert | Op::UpdateInsert => { if USE_WATERMARK_CACHE && let Some(ref pk) 
= key { - self.watermark_cache.insert(pk); - } + self.watermark_cache.insert(pk); + } self.insert_inner(TableKey(key_bytes), value); } Op::Delete | Op::UpdateDelete => { if USE_WATERMARK_CACHE && let Some(ref pk) = key { - self.watermark_cache.delete(pk); - } + self.watermark_cache.delete(pk); + } self.delete_inner(TableKey(key_bytes), value); } } @@ -870,14 +870,14 @@ where match op { Op::Insert | Op::UpdateInsert => { if USE_WATERMARK_CACHE && let Some(ref pk) = key { - self.watermark_cache.insert(pk); - } + self.watermark_cache.insert(pk); + } self.insert_inner(TableKey(key_bytes), value); } Op::Delete | Op::UpdateDelete => { if USE_WATERMARK_CACHE && let Some(ref pk) = key { - self.watermark_cache.delete(pk); - } + self.watermark_cache.delete(pk); + } self.delete_inner(TableKey(key_bytes), value); } } @@ -1026,11 +1026,21 @@ where }); // Compute Delete Ranges - if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix && let Some(first_byte) = watermark_suffix.first() { + if should_clean_watermark + && let Some(watermark_suffix) = watermark_suffix + && let Some(first_byte) = watermark_suffix.first() + { trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{ self.vnodes.iter_vnodes().collect_vec() }, "delete range"); - if prefix_serializer.as_ref().unwrap().get_order_types().first().unwrap().is_ascending() { + if prefix_serializer + .as_ref() + .unwrap() + .get_order_types() + .first() + .unwrap() + .is_ascending() + { // We either serialize null into `0u8`, data into `(1u8 || scalar)`, or serialize null // into `1u8`, data into `(0u8 || scalar)`. We do not want to delete null // here, so `range_begin_suffix` cannot be `vec![]` when null is represented as `0u8`. diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index ae5e8696de6c..0bd6e4784158 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -450,7 +450,9 @@ where while let Some(Ok(msg)) = upstream.next().await { if let Some(msg) = mapping_message(msg, &self.output_indices) { // If not finished then we need to update state, otherwise no need. - if let Message::Barrier(barrier) = &msg && !is_completely_finished { + if let Message::Barrier(barrier) = &msg + && !is_completely_finished + { // If snapshot was empty, we do not need to backfill, // but we still need to persist the finished state. // We currently persist it on the second barrier here rather than first. @@ -458,9 +460,13 @@ where // since it expects to have been initialized in previous epoch // (there's no epoch before the first epoch). 
if is_snapshot_empty { - let finished_state = construct_initial_finished_state(pk_in_output_indices.len()); + let finished_state = + construct_initial_finished_state(pk_in_output_indices.len()); for vnode in upstream_table.vnodes().iter_vnodes() { - backfill_state.update_progress(vnode, BackfillProgressPerVnode::InProgress(finished_state.clone())); + backfill_state.update_progress( + vnode, + BackfillProgressPerVnode::InProgress(finished_state.clone()), + ); } } @@ -471,9 +477,11 @@ where &backfill_state, &mut committed_progress, &mut temporary_state, - ).await?; + ) + .await?; - self.progress.finish(barrier.epoch.curr, total_snapshot_processed_rows); + self.progress + .finish(barrier.epoch.curr, total_snapshot_processed_rows); yield msg; break; } diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs index c17aad1d2d62..e3121e1bd0a7 100644 --- a/src/stream/src/executor/backfill/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc_backfill.rs @@ -509,35 +509,31 @@ impl CdcBackfillExecutor { .await?; if let Some(SplitImpl::MysqlCdc(split)) = cdc_split.as_mut() - && let Some(s) = split.mysql_split.as_mut() { - let start_offset = - last_binlog_offset.as_ref().map(|cdc_offset| { - let source_offset = - if let CdcOffset::MySql(o) = cdc_offset - { - DebeziumSourceOffset { - file: Some(o.filename.clone()), - pos: Some(o.position), - ..Default::default() - } - } else { - DebeziumSourceOffset::default() - }; - - let mut server = "RW_CDC_".to_string(); - server.push_str( - upstream_table_id.to_string().as_str(), - ); - DebeziumOffset { - source_partition: hashmap! { - "server".to_string() => server - }, - source_offset, - // upstream heartbeat event would not emit to the cdc backfill executor, - // since we don't parse heartbeat event in the source parser. - is_heartbeat: false, + && let Some(s) = split.mysql_split.as_mut() + { + let start_offset = last_binlog_offset.as_ref().map(|cdc_offset| { + let source_offset = if let CdcOffset::MySql(o) = cdc_offset { + DebeziumSourceOffset { + file: Some(o.filename.clone()), + pos: Some(o.position), + ..Default::default() } - }); + } else { + DebeziumSourceOffset::default() + }; + + let mut server = "RW_CDC_".to_string(); + server.push_str(upstream_table_id.to_string().as_str()); + DebeziumOffset { + source_partition: hashmap! { + "server".to_string() => server + }, + source_offset, + // upstream heartbeat event would not emit to the cdc backfill executor, + // since we don't parse heartbeat event in the source parser. 
+ is_heartbeat: false, + } + }); // persist the last binlog offset into split state s.inner.start_offset = start_offset.map(|o| { diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 8d02cc328fa4..2a321dcc2b64 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -384,7 +384,9 @@ impl HashAggExecutor { .zip_eq_fast(&mut this.storages) .zip_eq_fast(call_visibilities.iter()) { - if let AggStateStorage::MaterializedInput { table, mapping } = storage && !call.distinct { + if let AggStateStorage::MaterializedInput { table, mapping } = storage + && !call.distinct + { let chunk = chunk.project_with_vis(mapping.upstream_columns(), visibility.clone()); table.write_chunk(chunk); } @@ -413,8 +415,11 @@ impl HashAggExecutor { .zip_eq_fast(&mut this.storages) .zip_eq_fast(visibilities.iter()) { - if let AggStateStorage::MaterializedInput { table, mapping } = storage && call.distinct { - let chunk = chunk.project_with_vis(mapping.upstream_columns(), visibility.clone()); + if let AggStateStorage::MaterializedInput { table, mapping } = storage + && call.distinct + { + let chunk = + chunk.project_with_vis(mapping.upstream_columns(), visibility.clone()); table.write_chunk(chunk); } } diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 75414fe24a37..93f1734a3ec0 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -912,7 +912,8 @@ impl HashJoinExecutor input_watermark.val = value.unwrap(), diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 9e66835b54b0..c9717f9defe6 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -389,7 +389,9 @@ impl OverWindowExecutor { } // Update recently accessed range for later shrinking cache. 
- if !this.cache_policy.is_full() && let Some(accessed_range) = accessed_range { + if !this.cache_policy.is_full() + && let Some(accessed_range) = accessed_range + { match vars.recently_accessed_ranges.entry(part_key) { btree_map::Entry::Vacant(vacant) => { vacant.insert(accessed_range); diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index ff3214db88ea..e1000122af24 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -189,11 +189,15 @@ impl Inner { // for each column for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) { *value = match item { - Either::Left(state) => if let Some((i, value)) = state.peek() && i == row_idx { - valid = true; - value - } else { - None + Either::Left(state) => { + if let Some((i, value)) = state.peek() + && i == row_idx + { + valid = true; + value + } else { + None + } } Either::Right(array) => array.value_at(row_idx), }; @@ -211,7 +215,9 @@ impl Inner { } // move to the next row for item in &mut results { - if let Either::Left(state) = item && matches!(state.peek(), Some((i, _)) if i == row_idx) { + if let Either::Left(state) = item + && matches!(state.peek(), Some((i, _)) if i == row_idx) + { state.next().await?; } } diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index d742e72a4c7a..170535042696 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -226,10 +226,13 @@ impl SourceStateTableHandler { Some(row) => match row.datum_at(1) { Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { let mut split_impl = SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?; - if let SplitImpl::MysqlCdc(ref mut split) = split_impl && let Some(mysql_split) = split.mysql_split.as_mut() { + if let SplitImpl::MysqlCdc(ref mut split) = split_impl + && let Some(mysql_split) = split.mysql_split.as_mut() + { // if the snapshot_done is not set, we should check whether the backfill is finished if !mysql_split.inner.snapshot_done { - mysql_split.inner.snapshot_done = self.recover_cdc_snapshot_state(split_id).await?; + mysql_split.inner.snapshot_done = + self.recover_cdc_snapshot_state(split_id).await?; } } Some(split_impl) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 82c1e5664967..ddfcfd6b8e04 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -444,7 +444,8 @@ impl TemporalJoinExecutor }; if key.null_bitmap().is_subset(&null_matched) && let join_entry = self.right_table.lookup(&key, epoch).await? 
- && !join_entry.is_empty() { + && !join_entry.is_empty() + { for right_row in join_entry.cached.values() { // check join condition let ok = if let Some(ref mut cond) = self.condition { @@ -458,7 +459,8 @@ impl TemporalJoinExecutor }; if ok { - if let Some(chunk) = builder.append_row(op, left_row, right_row) { + if let Some(chunk) = builder.append_row(op, left_row, right_row) + { yield Message::Chunk(chunk); } } diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index b8275eba52b1..a1b7e26e8ae3 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -232,7 +232,9 @@ impl TopNCache { return; } // For direct insert, we need to check if the key is smaller than the largest key - if let Some(high_last) = self.high.last_key_value() && cache_key <= *high_last.0 { + if let Some(high_last) = self.high.last_key_value() + && cache_key <= *high_last.0 + { debug_assert!(cache_key != *high_last.0, "cache_key should be unique"); self.high.insert(cache_key, row); } @@ -260,15 +262,16 @@ impl TopNCacheTrait for TopNCache { self.low.insert(cache_key, (&row).into()); return; } - let elem_to_compare_with_middle = - if let Some(low_last) = self.low.last_entry() && cache_key <= *low_last.key() { - // Take the last element of `cache.low` and insert input row to it. - let low_last = low_last.remove_entry(); - self.low.insert(cache_key, (&row).into()); - low_last - } else { - (cache_key, (&row).into()) - }; + let elem_to_compare_with_middle = if let Some(low_last) = self.low.last_entry() + && cache_key <= *low_last.key() + { + // Take the last element of `cache.low` and insert input row to it. + let low_last = low_last.remove_entry(); + self.low.insert(cache_key, (&row).into()); + low_last + } else { + (cache_key, (&row).into()) + }; if !self.is_middle_cache_full() { self.middle.insert( @@ -586,15 +589,16 @@ impl AppendOnlyTopNCacheTrait for TopNCache { return Ok(()); } - let elem_to_insert_into_middle = - if let Some(low_last) = self.low.last_entry() && &cache_key <= low_last.key() { - // Take the last element of `cache.low` and insert input row to it. - let low_last = low_last.remove_entry(); - self.low.insert(cache_key, row_ref.into()); - low_last - } else { - (cache_key, row_ref.into()) - }; + let elem_to_insert_into_middle = if let Some(low_last) = self.low.last_entry() + && &cache_key <= low_last.key() + { + // Take the last element of `cache.low` and insert input row to it. + let low_last = low_last.remove_entry(); + self.low.insert(cache_key, row_ref.into()); + low_last + } else { + (cache_key, row_ref.into()) + }; if !self.is_middle_cache_full() { self.middle.insert( diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 841e7f5bb50d..7214eb91064b 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -136,7 +136,9 @@ impl ManagedTopNState { while let Some(item) = state_table_iter.next().await { // Note(bugen): should first compare with start key before constructing TopNStateRow. 
let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); - if let Some(start_key) = start_key.as_ref() && &topn_row.cache_key <= start_key { + if let Some(start_key) = start_key.as_ref() + && &topn_row.cache_key <= start_key + { continue; } // let row= &topn_row.row; @@ -225,7 +227,9 @@ impl ManagedTopNState { topn_cache.high_capacity > 0, "topn cache high_capacity should always > 0" ); - while !topn_cache.is_high_cache_full() && let Some(item) = state_table_iter.next().await { + while !topn_cache.is_high_cache_full() + && let Some(item) = state_table_iter.next().await + { let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); topn_cache .high diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 5e5454cecff9..c8899553ac46 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -217,7 +217,9 @@ impl WatermarkFilterExecutor { if watermark.col_idx == event_time_col_idx { tracing::warn!("WatermarkFilterExecutor received a watermark on the event it is filtering."); let watermark = watermark.val; - if let Some(cur_watermark) = current_watermark.clone() && cur_watermark.default_cmp(&watermark).is_lt() { + if let Some(cur_watermark) = current_watermark.clone() + && cur_watermark.default_cmp(&watermark).is_lt() + { current_watermark = Some(watermark.clone()); idle_input = false; yield Message::Watermark(Watermark::new( @@ -267,7 +269,10 @@ impl WatermarkFilterExecutor { let global_max_watermark = Self::get_global_max_watermark(&table).await?; - current_watermark = if let Some(global_max_watermark) = global_max_watermark.clone() && let Some(watermark) = current_watermark.clone(){ + current_watermark = if let Some(global_max_watermark) = + global_max_watermark.clone() + && let Some(watermark) = current_watermark.clone() + { Some(cmp::max_by( watermark, global_max_watermark, diff --git a/src/stream/src/executor/wrapper/epoch_check.rs b/src/stream/src/executor/wrapper/epoch_check.rs index 3b2975d08366..fb3a24b3dda2 100644 --- a/src/stream/src/executor/wrapper/epoch_check.rs +++ b/src/stream/src/executor/wrapper/epoch_check.rs @@ -47,7 +47,9 @@ pub async fn epoch_check(info: Arc, input: impl MessageStream) { ); } - if let Some(last_epoch) = last_epoch && !b.is_with_stop_mutation() { + if let Some(last_epoch) = last_epoch + && !b.is_with_stop_mutation() + { assert_eq!( b.epoch.prev, last_epoch, diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index b87ce5ff39dc..b105837d9b09 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -160,8 +160,11 @@ impl ExecutorBuilder for SourceExecutorBuilder { ); let table_type = ExternalTableType::from_properties(&source.properties); - if table_type.can_backfill() && let Some(table_desc) = source_info.upstream_table.clone() { - let upstream_table_name = SchemaTableName::from_properties(&source.properties); + if table_type.can_backfill() + && let Some(table_desc) = source_info.upstream_table.clone() + { + let upstream_table_name = + SchemaTableName::from_properties(&source.properties); let table_pk_indices = table_desc .pk .iter() @@ -173,7 +176,8 @@ impl ExecutorBuilder for SourceExecutorBuilder { .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap())) .collect_vec(); - let table_reader = table_type.create_table_reader(source.properties.clone(), schema.clone())?; + let table_reader = 
table_type + .create_table_reader(source.properties.clone(), schema.clone())?; let external_table = ExternalStorageTable::new( TableId::new(source.source_id), upstream_table_name, @@ -188,18 +192,19 @@ impl ExecutorBuilder for SourceExecutorBuilder { let source_state_handler = SourceStateTableHandler::from_table_catalog( source.state_table.as_ref().unwrap(), store.clone(), - ).await; + ) + .await; let cdc_backfill = CdcBackfillExecutor::new( params.actor_context.clone(), external_table, Box::new(source_exec), - (0..source.columns.len()).collect_vec(), // eliminate the last column (_rw_offset) + (0..source.columns.len()).collect_vec(), /* eliminate the last column (_rw_offset) */ None, schema.clone(), params.pk_indices, params.executor_stats, source_state_handler, - source_ctrl_opts.chunk_size + source_ctrl_opts.chunk_size, ); cdc_backfill.boxed() } else { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index f54eb9921f77..4386413029ae 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -821,7 +821,9 @@ impl LocalStreamManagerCore { let mut actor_infos = self.context.actor_infos.write(); for actor in new_actor_infos { let ret = actor_infos.insert(actor.get_actor_id(), actor.clone()); - if let Some(prev_actor) = ret && actor != &prev_actor { + if let Some(prev_actor) = ret + && actor != &prev_actor + { bail!( "actor info mismatch when broadcasting {}", actor.get_actor_id() diff --git a/src/tests/regress/src/schedule.rs b/src/tests/regress/src/schedule.rs index 357cab473d13..c33cf5e91ed5 100644 --- a/src/tests/regress/src/schedule.rs +++ b/src/tests/regress/src/schedule.rs @@ -329,12 +329,16 @@ impl TestCase { // Find the matching output line, and collect lines before the next matching line. let mut expected_output = vec![]; - while let Some(line) = expected_lines.next() && line != original_input_line { + while let Some(line) = expected_lines.next() + && line != original_input_line + { expected_output.push(line); } let mut actual_output = vec![]; - while let Some(line) = actual_lines.next() && line != input_line { + while let Some(line) = actual_lines.next() + && line != input_line + { actual_output.push(line); } @@ -371,7 +375,9 @@ fn compare_output(query: &[&str], expected: &[String], actual: &[String]) -> boo eq }; - if let Some(l) = query.last() && l.starts_with(PREFIX_IGNORE) { + if let Some(l) = query.last() + && l.starts_with(PREFIX_IGNORE) + { return true; } if !expected.is_empty() diff --git a/src/tests/sqlsmith/src/runner.rs b/src/tests/sqlsmith/src/runner.rs index 5efc793cdd95..cebd50f7e008 100644 --- a/src/tests/sqlsmith/src/runner.rs +++ b/src/tests/sqlsmith/src/runner.rs @@ -473,7 +473,9 @@ fn validate_response( Ok(rows) => Ok((0, rows)), Err(e) => { // Permit runtime errors conservatively. 
- if let Some(e) = e.as_db_error() && is_permissible_error(&e.to_string()) { + if let Some(e) = e.as_db_error() + && is_permissible_error(&e.to_string()) + { tracing::info!("[SKIPPED ERROR]: {:#?}", e); return Ok((1, vec![])); } @@ -509,16 +511,20 @@ async fn run_query_inner( ), }; if let Err(e) = &response - && let Some(e) = e.as_db_error() { + && let Some(e) = e.as_db_error() + { if is_recovery_in_progress_error(&e.to_string()) { let tries = 5; let interval = 1; - for _ in 0..tries { // retry 5 times + for _ in 0..tries { + // retry 5 times sleep(Duration::from_secs(interval)).await; let query_task = client.simple_query(query); let response = timeout(Duration::from_secs(timeout_duration), query_task).await; match response { - Ok(Ok(r)) => { return Ok((0, r)); } + Ok(Ok(r)) => { + return Ok((0, r)); + } Err(_) => bail!( "[UNEXPECTED ERROR] Query timeout after {timeout_duration}s:\n{:?}", query diff --git a/src/tests/sqlsmith/tests/frontend/mod.rs b/src/tests/sqlsmith/tests/frontend/mod.rs index a0ab1d59cf58..8f681ab38a95 100644 --- a/src/tests/sqlsmith/tests/frontend/mod.rs +++ b/src/tests/sqlsmith/tests/frontend/mod.rs @@ -149,7 +149,9 @@ async fn test_stream_query( setup_sql: &str, ) -> Result<()> { let mut rng; - if let Ok(x) = env::var("RW_RANDOM_SEED_SQLSMITH") && x == "true" { + if let Ok(x) = env::var("RW_RANDOM_SEED_SQLSMITH") + && x == "true" + { rng = SmallRng::from_entropy(); } else { rng = SmallRng::seed_from_u64(seed); @@ -205,7 +207,9 @@ fn test_batch_query( setup_sql: &str, ) -> Result<()> { let mut rng; - if let Ok(x) = env::var("RW_RANDOM_SEED_SQLSMITH") && x == "true" { + if let Ok(x) = env::var("RW_RANDOM_SEED_SQLSMITH") + && x == "true" + { rng = SmallRng::from_entropy(); } else { rng = SmallRng::seed_from_u64(seed); @@ -248,7 +252,9 @@ async fn setup_sqlsmith_with_seed_inner(seed: u64) -> Result { let session = frontend.session_ref(); let mut rng; - if let Ok(x) = env::var("RW_RANDOM_SEED_SQLSMITH") && x == "true" { + if let Ok(x) = env::var("RW_RANDOM_SEED_SQLSMITH") + && x == "true" + { rng = SmallRng::from_entropy(); } else { rng = SmallRng::seed_from_u64(seed); @@ -266,7 +272,9 @@ async fn setup_sqlsmith_with_seed_inner(seed: u64) -> Result { /// Otherwise no error: skip status: false. fn validate_result(result: Result) -> Result { if let Err(e) = result { - if let Some(s) = e.message() && is_permissible_error(s) { + if let Some(s) = e.message() + && is_permissible_error(s) + { return Ok(true); } else { return Err(e); diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 916dd93d7a32..c4fe90c29408 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -188,7 +188,9 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { } // Overrides from env var. - if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV) && !rust_log.is_empty() { + if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV) + && !rust_log.is_empty() + { let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`"); if let Some(default_level) = rust_log_targets.default_level() { filter = filter.with_default(default_level); diff --git a/src/utils/runtime/src/panic_hook.rs b/src/utils/runtime/src/panic_hook.rs index 848e7df8509c..0a43fac48f19 100644 --- a/src/utils/runtime/src/panic_hook.rs +++ b/src/utils/runtime/src/panic_hook.rs @@ -15,7 +15,9 @@ /// Set panic hook to abort the process if we're not catching unwind, without losing the information /// of stack trace and await-tree. 
pub fn set_panic_hook() { - if let Ok(limit) = rlimit::Resource::CORE.get_soft() && limit > 0 { + if let Ok(limit) = rlimit::Resource::CORE.get_soft() + && limit > 0 + { tracing::info!(limit, "coredump on panic is likely to be enabled"); };
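
Note for readers skimming the patch: every hunk above is the same mechanical rewrite. Nightly `rustfmt` splits a `let`-chain in an `if`/`while` condition so that each `&&`-joined condition sits on its own line and the opening brace moves to a line of its own. A minimal before/after sketch of that shape, using hypothetical identifiers (`maybe_value`, `use_value`) purely for illustration; `let`-chains require the nightly `let_chains` feature, which this codebase already uses:

    // Before formatting: the whole chain and the brace share one line.
    if let Some(x) = maybe_value && x > 0 {
        use_value(x);
    }

    // After nightly rustfmt: one condition per line, brace on its own line.
    if let Some(x) = maybe_value
        && x > 0
    {
        use_value(x);
    }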