diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index e8463813e9f00..f6440c74430a6 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -54,7 +54,7 @@ use crate::task::{BatchTaskContext, ShutdownToken}; /// 2. Iterate over the probe side (i.e. left table) and compute the hash value of each row. /// Then find the matched build side row for each probe side row in the hash map. /// 3. Concatenate the matched pair of probe side row and build side row into a single row and push -/// it into the data chunk builder. +/// it into the data chunk builder. /// 4. Yield chunks from the builder. pub struct HashJoinExecutor { /// Join type e.g. inner, left outer, ... @@ -1669,7 +1669,7 @@ impl HashJoinExecutor { /// | 4 | 3 | 3 | - | /// /// 3. Remove duplicate rows with NULL build side. This is done by setting the visibility bitmap - /// of the chunk. + /// of the chunk. /// /// | offset | v1 | v2 | v3 | /// |---|---|---|---| diff --git a/src/batch/src/executor/test_utils.rs b/src/batch/src/executor/test_utils.rs index 0deee6da0a5bc..1d949d6e0e06a 100644 --- a/src/batch/src/executor/test_utils.rs +++ b/src/batch/src/executor/test_utils.rs @@ -75,7 +75,7 @@ pub fn gen_sorted_data( let mut array_builder = DataType::Int64.create_array_builder(batch_size); for _ in 0..batch_size { - array_builder.append(&data_gen.generate_datum(0)); + array_builder.append(data_gen.generate_datum(0)); } let array = array_builder.finish(); @@ -102,7 +102,7 @@ pub fn gen_projected_data( let mut array_builder = DataType::Int64.create_array_builder(batch_size); for j in 0..batch_size { - array_builder.append(&data_gen.generate_datum(((i + 1) * (j + 1)) as u64)); + array_builder.append(data_gen.generate_datum(((i + 1) * (j + 1)) as u64)); } let chunk = DataChunk::new(vec![array_builder.finish().into()], batch_size); diff --git a/src/common/metrics/src/relabeled_metric.rs b/src/common/metrics/src/relabeled_metric.rs index 3ccbcb201c5c7..8c0582f15f875 100644 --- a/src/common/metrics/src/relabeled_metric.rs +++ b/src/common/metrics/src/relabeled_metric.rs @@ -24,8 +24,9 @@ use crate::{ /// - when `metric_level` <= `relabel_threshold`, they behaves exactly the same as their inner /// metric. /// - when `metric_level` > `relabel_threshold`, all their input label values are rewrite to "" when -/// calling `with_label_values`. That's means the metric vec is aggregated into a single metric. - +/// calling `with_label_values`. That's means the metric vec is aggregated into a single metric. +/// +/// /// These wrapper classes add a `metric_level` field to corresponding metric. /// We could have use one single struct to represent all `MetricVec`, rather /// than specializing them one by one. 
However, that's undoable because prometheus crate doesn't diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index 7fb908ecc138f..4e08163817e23 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -804,7 +804,7 @@ impl DataChunkTestExt for DataChunk { let arr = col; let mut builder = arr.create_builder(n * 2); for v in arr.iter() { - builder.append(&v.to_owned_datum()); + builder.append(v.to_owned_datum()); builder.append_null(); } diff --git a/src/common/src/cache.rs b/src/common/src/cache.rs index bb4af1d615eb9..e86ef432eea95 100644 --- a/src/common/src/cache.rs +++ b/src/common/src/cache.rs @@ -1033,6 +1033,7 @@ mod tests { pub struct Block { pub offset: u64, + #[allow(dead_code)] pub sst: u64, } diff --git a/src/common/src/session_config/search_path.rs b/src/common/src/session_config/search_path.rs index 4573294ab24c4..7a99f07833b96 100644 --- a/src/common/src/session_config/search_path.rs +++ b/src/common/src/session_config/search_path.rs @@ -23,12 +23,12 @@ pub const USER_NAME_WILD_CARD: &str = "\"$user\""; /// see /// /// 1. when we `select` or `drop` object and don't give a specified schema, it will search the -/// object from the valid items in schema `rw_catalog`, `pg_catalog` and `search_path`. If schema -/// `rw_catalog` and `pg_catalog` are not in `search_path`, we will search them firstly. If they're -/// in `search_path`, we will follow the order in `search_path`. +/// object from the valid items in schema `rw_catalog`, `pg_catalog` and `search_path`. If schema +/// `rw_catalog` and `pg_catalog` are not in `search_path`, we will search them firstly. If they're +/// in `search_path`, we will follow the order in `search_path`. /// /// 2. when we `create` a `source` or `mv` and don't give a specified schema, it will use the first -/// valid schema in `search_path`. +/// valid schema in `search_path`. /// /// 3. when we `create` a `index` or `sink`, it will use the schema of the associated table. 
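As a side illustration (not part of the patch itself): rule 1 above is subtle, because `rw_catalog` and `pg_catalog` are implicitly searched first only when the user did not list them in `search_path`. The sketch below uses plain `std` and made-up names rather than RisingWave's actual types, and the relative order of the two system schemas is illustrative only.

```rust
/// Hypothetical helper mirroring rule 1 of the `search_path` doc comment above.
fn resolution_order(search_path: &[&str]) -> Vec<String> {
    let mut order = Vec::new();
    // Implicit system schemas come first, unless the user listed them
    // explicitly -- in that case their position inside `search_path` wins.
    for implicit in ["rw_catalog", "pg_catalog"] {
        if !search_path.contains(&implicit) {
            order.push(implicit.to_string());
        }
    }
    order.extend(search_path.iter().map(|s| s.to_string()));
    order
}

fn main() {
    // `pg_catalog` is explicit here, so it keeps its user-given position.
    assert_eq!(
        resolution_order(&["\"$user\"", "public", "pg_catalog"]),
        ["rw_catalog", "\"$user\"", "public", "pg_catalog"]
    );
    // Neither system schema is listed, so both are searched first.
    assert_eq!(
        resolution_order(&["public"]),
        ["rw_catalog", "pg_catalog", "public"]
    );
}
```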
#[derive(Clone, Debug, PartialEq)] diff --git a/src/common/src/util/chunk_coalesce.rs b/src/common/src/util/chunk_coalesce.rs index 5cdeb8026a0f1..94ca4cd783449 100644 --- a/src/common/src/util/chunk_coalesce.rs +++ b/src/common/src/util/chunk_coalesce.rs @@ -459,13 +459,13 @@ mod tests { let mut left_array_builder = DataType::Int32.create_array_builder(5); for v in [1, 2, 3, 4, 5] { - left_array_builder.append(&Some(ScalarImpl::Int32(v))); + left_array_builder.append(Some(ScalarImpl::Int32(v))); } let left_arrays = [left_array_builder.finish()]; let mut right_array_builder = DataType::Int64.create_array_builder(5); for v in [5, 4, 3, 2, 1] { - right_array_builder.append(&Some(ScalarImpl::Int64(v))); + right_array_builder.append(Some(ScalarImpl::Int64(v))); } let right_arrays = [right_array_builder.finish()]; diff --git a/src/common/src/util/memcmp_encoding.rs b/src/common/src/util/memcmp_encoding.rs index 7f262d0ff8d2c..5a5ad598093af 100644 --- a/src/common/src/util/memcmp_encoding.rs +++ b/src/common/src/util/memcmp_encoding.rs @@ -543,7 +543,7 @@ mod tests { use rand::seq::SliceRandom; fn serialize(f: F32) -> MemcmpEncoded { - encode_value(&Some(ScalarImpl::from(f)), OrderType::default()).unwrap() + encode_value(Some(ScalarImpl::from(f)), OrderType::default()).unwrap() } fn deserialize(data: MemcmpEncoded) -> F32 { diff --git a/src/connector/src/schema/schema_registry/util.rs b/src/connector/src/schema/schema_registry/util.rs index 0d43f33baa31c..44b7a350e6823 100644 --- a/src/connector/src/schema/schema_registry/util.rs +++ b/src/connector/src/schema/schema_registry/util.rs @@ -150,6 +150,7 @@ pub struct Subject { #[derive(Debug, Deserialize)] pub struct SchemaReference { /// The name of the reference. + #[allow(dead_code)] pub name: String, /// The subject that the referenced schema belongs to pub subject: String, diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index a9b78fad2572e..35b48c6e31faf 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; -use std::usize; use anyhow::{anyhow, Context}; use aws_sdk_dynamodb as dynamodb; diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index a0515bb7bc165..40d85625c5b1d 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -58,7 +58,7 @@ pub trait RowEncoder { /// * an json object /// * a protobuf message /// * an avro record -/// into +/// into /// * string (required by kinesis key) /// * bytes /// diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index 5f8c22948d849..a0e4d41dc58de 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -205,6 +205,7 @@ impl MaybeData for () { /// * Top level is always a message. /// * All message fields can be omitted in proto3. /// * All repeated elements must have a value. +/// /// So we handle [`ScalarRefImpl`] rather than [`DatumRef`] here. impl MaybeData for ScalarRefImpl<'_> { type Out = Value; diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 9fd1a5cb6f2be..6239a55d862f4 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -45,9 +45,9 @@ pub trait SinkFormatter { type V; /// * Key may be None so that messages are partitioned using round-robin. 
- /// For example append-only without `primary_key` (aka `downstream_pk`) set. + /// For example append-only without `primary_key` (aka `downstream_pk`) set. /// * Value may be None so that messages with same key are removed during log compaction. - /// For example debezium tombstone event. + /// For example debezium tombstone event. fn format_chunk( &self, chunk: &StreamChunk, diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index 498f15e3f3fb2..9f250b6692cd2 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::BTreeMap; -use std::usize; use anyhow::{anyhow, Context}; use google_cloud_gax::conn::Environment; diff --git a/src/connector/src/source/pulsar/topic.rs b/src/connector/src/source/pulsar/topic.rs index 352c7e47d8da2..f4cb95bd01437 100644 --- a/src/connector/src/source/pulsar/topic.rs +++ b/src/connector/src/source/pulsar/topic.rs @@ -108,6 +108,7 @@ pub fn get_partition_index(topic: &str) -> Result> { /// The short topic name can be: /// - `` /// - `//` +/// /// The fully qualified topic name can be: /// `:////` pub fn parse_topic(topic: &str) -> Result { diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 02dbc6a28bc10..2876ec4aa9447 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -107,6 +107,7 @@ pub enum BindingCteState { pub struct RecursiveUnion { /// currently this *must* be true, /// otherwise binding will fail. + #[allow(dead_code)] pub all: bool, /// lhs part of the `UNION ALL` operator pub base: Box, diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 8d72aa7e3d286..cf12417334612 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -261,17 +261,17 @@ impl UdfContext { /// following the rules: /// 1. At the beginning, it contains the user specified parameters type. /// 2. When the binder encounters a parameter, it will record it as unknown(call `record_new_param`) -/// if it didn't exist in `ParameterTypes`. +/// if it didn't exist in `ParameterTypes`. /// 3. When the binder encounters a cast on parameter, if it's a unknown type, the cast function -/// will record the target type as infer type for that parameter(call `record_infer_type`). If the -/// parameter has been inferred, the cast function will act as a normal cast. +/// will record the target type as infer type for that parameter(call `record_infer_type`). If the +/// parameter has been inferred, the cast function will act as a normal cast. /// 4. After bind finished: /// (a) parameter not in `ParameterTypes` means that the user didn't specify it and it didn't -/// occur in the query. `export` will return error if there is a kind of -/// parameter. This rule is compatible with PostgreSQL +/// occur in the query. `export` will return error if there is a kind of +/// parameter. This rule is compatible with PostgreSQL /// (b) parameter is None means that it's a unknown type. The user didn't specify it -/// and we can't infer it in the query. We will treat it as VARCHAR type finally. This rule is -/// compatible with PostgreSQL. +/// and we can't infer it in the query. We will treat it as VARCHAR type finally. This rule is +/// compatible with PostgreSQL. /// (c) parameter is Some means that it's a known type. 
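The `ParameterTypes` rules re-wrapped above are easiest to see in miniature. The following is a toy model only (made-up names, `String` standing in for the data type, rule 4(a)'s error case omitted), not the binder's real code: a parameter is recorded as unknown on first use, a cast pins its inferred type, and whatever is still unknown at export time defaults to VARCHAR.

```rust
use std::collections::HashMap;

/// Toy stand-in for the binder's parameter bookkeeping described above:
/// `None` means "seen, but type still unknown".
#[derive(Default)]
struct ParamTypes {
    types: HashMap<usize, Option<String>>,
}

impl ParamTypes {
    /// Rule 2: the first encounter of `$idx` records it as unknown.
    fn record_new_param(&mut self, idx: usize) {
        self.types.entry(idx).or_insert(None);
    }

    /// Rule 3: a cast on an unknown parameter fixes its inferred type.
    fn record_infer_type(&mut self, idx: usize, ty: &str) {
        self.types.insert(idx, Some(ty.to_string()));
    }

    /// Rule 4(b): parameters that stayed unknown fall back to VARCHAR.
    fn export(self) -> Vec<(usize, String)> {
        let mut out: Vec<_> = self
            .types
            .into_iter()
            .map(|(i, t)| (i, t.unwrap_or_else(|| "VARCHAR".to_string())))
            .collect();
        out.sort();
        out
    }
}

fn main() {
    let mut p = ParamTypes::default();
    p.record_new_param(1); // $1 used without a cast
    p.record_new_param(2);
    p.record_infer_type(2, "INT4"); // $2 appears as $2::int4
    assert_eq!(
        p.export(),
        vec![(1, "VARCHAR".to_string()), (2, "INT4".to_string())]
    );
}
```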
#[derive(Clone, Debug)] pub struct ParameterTypes(Arc>>>); diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index d66e01bac1dc1..bfe4537fa7085 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -60,7 +60,7 @@ use crate::user::UserId; /// - **Order Key**: the primary key for storage, used to sort and access data. /// /// For an MV, the columns in `ORDER BY` clause will be put at the beginning of the order key. And -/// the remaining columns in pk will follow behind. +/// the remaining columns in pk will follow behind. /// /// If there's no `ORDER BY` clause, the order key will be the same as pk. /// diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index a30b78d44e00c..a5ae569456336 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -406,7 +406,7 @@ impl LogicalMultiJoin { /// a. a filter with the non eq conditions /// b. a projection which reorders the output column ordering to agree with the /// original ordering of the joins. - /// The filter will then be pushed down by another filter pushdown pass. + /// The filter will then be pushed down by another filter pushdown pass. pub(crate) fn heuristic_ordering(&self) -> Result> { let mut labeller = ConnectedComponentLabeller::new(self.inputs.len()); @@ -494,9 +494,9 @@ impl LogicalMultiJoin { /// 2. Second, for every isolated node will create connection to every other nodes. /// 3. Third, select and merge one node for a iteration, and use a bfs policy for which node the /// selected node merged with. - /// i. The select node mentioned above is the node with least number of relations and the + /// i. The select node mentioned above is the node with least number of relations and the /// lowerst join tree. - /// ii. nodes with a join tree higher than the temporal optimal join tree will be pruned. + /// ii. nodes with a join tree higher than the temporal optimal join tree will be pruned. pub fn as_bushy_tree_join(&self) -> Result { let (nodes, condition) = self.get_join_graph()?; diff --git a/src/frontend/src/optimizer/plan_node/predicate_pushdown.rs b/src/frontend/src/optimizer/plan_node/predicate_pushdown.rs index af56ae3f76582..b7c429f4f6399 100644 --- a/src/frontend/src/optimizer/plan_node/predicate_pushdown.rs +++ b/src/frontend/src/optimizer/plan_node/predicate_pushdown.rs @@ -29,14 +29,14 @@ pub trait PredicatePushdown { /// There are three kinds of predicates: /// /// 1. those can't be pushed down. We just create a `LogicalFilter` for them above the current - /// `PlanNode`. i.e., + /// `PlanNode`. i.e., /// /// ```ignore /// LogicalFilter::create(self.clone().into(), predicate) /// ``` /// /// 2. those can be merged with current `PlanNode` (e.g., `LogicalJoin`). We just merge - /// the predicates with the `Condition` of it. + /// the predicates with the `Condition` of it. /// /// 3. those can be pushed down. We pass them to current `PlanNode`'s input. 
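A compact way to picture the split that the `PredicatePushdown` doc above describes — keep non-pushable predicates in a new `LogicalFilter` on top, hand pushable ones to the input — is sketched below with hypothetical toy types (case 2, merging into the current node's `Condition`, is left out). This is an editorial illustration, not the planner's real interface.

```rust
/// Toy predicate: just the set of column indices it references.
#[derive(Debug)]
struct Pred {
    cols: Vec<usize>,
}

/// Predicates referencing only columns produced by the input are pushed
/// down; the rest stay behind for a filter created above the current node.
fn split_pushdown(preds: Vec<Pred>, input_cols: &[usize]) -> (Vec<Pred>, Vec<Pred>) {
    preds
        .into_iter()
        .partition(|p| p.cols.iter().all(|c| input_cols.contains(c)))
}

fn main() {
    let preds = vec![Pred { cols: vec![0] }, Pred { cols: vec![0, 3] }];
    let (pushed, kept) = split_pushdown(preds, &[0, 1, 2]);
    println!("pushed down: {:?}, kept above: {:?}", pushed, kept);
}
```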
fn predicate_pushdown( diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 458707910c089..35991349f17cc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -147,6 +147,7 @@ impl StreamTableScan { /// | 1002 | Int64(1) | t | 10 | /// | 1003 | Int64(1) | t | 10 | /// | 1003 | Int64(1) | t | 10 | + /// /// Eventually we should track progress per vnode, to support scaling with both mview and /// the corresponding `no_shuffle_backfill`. /// However this is not high priority, since we are working on supporting arrangement backfill, diff --git a/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs b/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs index ebc724c200481..67f7e40ed3455 100644 --- a/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs +++ b/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs @@ -33,6 +33,7 @@ impl Rule for LogicalFilterExpressionSimplifyRule { /// The pattern we aim to optimize, e.g., /// 1. (NOT (e)) OR (e) => True /// 2. (NOT (e)) AND (e) => False + /// /// NOTE: `e` should only contain at most a single column /// otherwise we will not conduct the optimization fn apply(&self, plan: PlanRef) -> Option { @@ -153,6 +154,7 @@ fn check_optimizable_pattern(e1: ExprImpl, e2: ExprImpl) -> (bool, Option True /// 2. False and (...) | (...) and False => False +/// /// NOTE: the `True` and `False` here not only represent a single `ExprImpl::Literal` /// but represent every `ExprImpl` that can be *evaluated* to `ScalarImpl::Bool` /// during optimization phase as well diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index c727d2e61b1ff..8b7e07a0aefcd 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -94,7 +94,7 @@ pub enum StageEvent { reason: SchedulerError, }, /// All tasks in stage finished. - Completed(StageId), + Completed(#[allow(dead_code)] StageId), } #[derive(Clone)] diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs index 860981762cde4..84bbce4dc1ee3 100644 --- a/src/meta/src/controller/rename.rs +++ b/src/meta/src/controller/rename.rs @@ -170,7 +170,7 @@ impl QueryRewriter<'_> { /// /// So that we DON'T have to: /// 1. rewrite the select and expr part like `schema.table.column`, `table.column`, - /// `alias.column` etc. + /// `alias.column` etc. /// 2. handle the case that the old name is used as alias. /// 3. handle the case that the new name is used as alias. 
fn visit_table_factor(&self, table_factor: &mut TableFactor) { diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 7e33123272684..f6dc46c99106c 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -97,18 +97,3 @@ pub trait CompactionPicker { stats: &mut LocalPickerStatistic, ) -> Option; } - -#[derive(Default, Clone, Debug)] -pub struct PartitionLevelInfo { - pub level_id: u32, - pub sub_level_id: u64, - pub left_idx: usize, - pub right_idx: usize, - pub total_file_size: u64, -} - -#[derive(Default, Clone, Debug)] -pub struct LevelPartition { - pub sub_levels: Vec, - pub total_file_size: u64, -} diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index fcbac63817fff..d9b5a481f420a 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -111,12 +111,12 @@ impl Compactor { /// `CompactTaskAssignment`. /// /// A compact task can be in one of these states: -/// - 1. Success: an assigned task is reported as success via `CompactStatus::report_compact_task`. -/// It's the final state. -/// - 2. Failed: an Failed task is reported as success via `CompactStatus::report_compact_task`. -/// It's the final state. -/// - 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's -/// the final state. +/// 1. Success: an assigned task is reported as success via `CompactStatus::report_compact_task`. +/// It's the final state. +/// 2. Failed: an Failed task is reported as success via `CompactStatus::report_compact_task`. +/// It's the final state. +/// 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's +/// the final state. pub struct CompactorManagerInner { pub task_expired_seconds: u64, pub heartbeat_expired_seconds: u64, diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 363055316bac7..5f5150b7777cb 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -173,7 +173,7 @@ impl HummockManager { /// Starts a full GC. /// 1. Meta node sends a `FullScanTask` to a compactor in this method. /// 2. The compactor returns scan result of object store to meta node. See - /// `HummockManager::full_scan_inner` in storage crate. + /// `HummockManager::full_scan_inner` in storage crate. /// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`. /// /// Returns Ok(false) if there is no worker available. diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index de9c637df9dda..3aeab64bef128 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -187,6 +187,7 @@ impl CatalogManager { /// We identify a 'legacy' source based on two conditions: /// 1. The `format_encode_options` in `source_info` is empty. /// 2. Keys with certain prefixes belonging to `format_encode_options` exist in `with_properties` instead. + /// /// And if the source is identified as 'legacy', we copy the misplaced keys from `with_properties` to `format_encode_options`. async fn source_backward_compat_check(&self) -> MetaResult<()> { let core = &mut *self.core.lock().await; @@ -897,6 +898,7 @@ impl CatalogManager { /// with: /// 1. `stream_job_status` = CREATING /// 2. Not belonging to a background stream job. + /// /// Clean up these hanging tables by the id. 
pub async fn clean_dirty_tables(&self, fragment_manager: FragmentManagerRef) -> MetaResult<()> { let core = &mut *self.core.lock().await; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index af7218219afc4..0b0c2ed2a3ced 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -192,7 +192,7 @@ pub struct MetaOpts { /// The Dashboard service uses this in the following ways: /// 1. Query Prometheus for relevant metrics to find Stream Graph Bottleneck, and display it. /// 2. Provide cluster diagnostics, at `/api/monitor/diagnose` to troubleshoot cluster. - /// These are just examples which show how the Meta Dashboard Service queries Prometheus. + /// These are just examples which show how the Meta Dashboard Service queries Prometheus. pub prometheus_endpoint: Option, /// The additional selector used when querying Prometheus. @@ -251,7 +251,7 @@ pub struct MetaOpts { /// When `hybrid_partition_vnode_count` > 0, in hybrid compaction group /// - Tables with high write throughput will be split at vnode granularity /// - Tables with high size tables will be split by table granularity - /// When `hybrid_partition_vnode_count` = 0,no longer be special alignment operations for the hybird compaction group + /// When `hybrid_partition_vnode_count` = 0,no longer be special alignment operations for the hybird compaction group pub hybrid_partition_node_count: u32, pub event_log_enabled: bool, diff --git a/src/meta/src/storage/transaction.rs b/src/meta/src/storage/transaction.rs index 9a9df92b5f833..daac28f276049 100644 --- a/src/meta/src/storage/transaction.rs +++ b/src/meta/src/storage/transaction.rs @@ -17,9 +17,9 @@ use crate::storage::{ColumnFamily, Key, Value}; /// A `Transaction` executes several writes(aka. operations) to meta store atomically with optional /// preconditions checked. It executes as follow: /// 1. If all `preconditions` are valid, all `operations` are executed; Otherwise no operation -/// is executed. +/// is executed. /// 2. Upon `commit` the transaction, the `TransactionAbort` error will be returned if -/// any precondition was not met in previous step. +/// any precondition was not met in previous step. #[derive(Default)] pub struct Transaction { preconditions: Vec, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d3bd0683674fd..0b7a711b38643 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -237,19 +237,19 @@ impl RescheduleContext { /// The specific process is as follows /// /// 1. Calculate the number of target actors, and calculate the average value and the remainder, and -/// use the average value as expected. +/// use the average value as expected. /// /// 2. Filter out the actor to be removed and the actor to be retained, and sort them from largest -/// to smallest (according to the number of virtual nodes held). +/// to smallest (according to the number of virtual nodes held). /// /// 3. Calculate their balance, 1) For the actors to be removed, the number of virtual nodes per -/// actor is the balance. 2) For retained actors, the number of virtual nodes - expected is the -/// balance. 3) For newly created actors, -expected is the balance (always negative). +/// actor is the balance. 2) For retained actors, the number of virtual nodes - expected is the +/// balance. 3) For newly created actors, -expected is the balance (always negative). /// /// 4. Allocate the remainder, high priority to newly created nodes. /// /// 5. 
After that, merge removed, retained and created into a queue, with the head of the queue -/// being the source, and move the virtual nodes to the destination at the end of the queue. +/// being the source, and move the virtual nodes to the destination at the end of the queue. /// /// This can handle scale in, scale out, migration, and simultaneous scaling with as much affinity /// as possible. diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 80004aa48eaa7..7b02920ac3c4d 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -121,7 +121,7 @@ impl ActorBuilder { /// /// During this process, the following things will be done: /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the - /// compute nodes. + /// compute nodes. /// 2. Fill the upstream mview info of the `Merge` node under the other "leaf" nodes. fn rewrite(&self) -> MetaResult { self.rewrite_inner(&self.nodes, 0) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index f55caa3ea67b0..d582095367fdc 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -219,9 +219,9 @@ impl GlobalStreamManager { /// Create streaming job, it works as follows: /// /// 1. Broadcast the actor info based on the scheduling result in the context, build the hanging - /// channels in upstream worker nodes. + /// channels in upstream worker nodes. /// 2. (optional) Get the split information of the `StreamSource` via source manager and patch - /// actors. + /// actors. /// 3. Notify related worker nodes to update and build the actors. /// 4. Store related meta data. pub async fn create_streaming_job( diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index fa3f50964dcfa..43a812081a36c 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -8,6 +8,9 @@ license = { workspace = true } repository = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lints] +workspace = true + [dependencies] async-trait = "0.1" await-tree = { workspace = true } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 9ecd641692ae2..5369d914a7514 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -383,7 +383,7 @@ impl MonitoredStreamingUploader { } /// NOTICE: after #16231, streaming uploader implemented via aws-sdk-s3 will maintain metrics internally in s3.rs -/// so MonitoredStreamingUploader will only be used when the inner object store is opendal. +/// so `MonitoredStreamingUploader` will only be used when the inner object store is opendal. 
impl MonitoredStreamingUploader { async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> { let operation_type = OperationType::StreamingUpload; diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs index b87a83579ed7e..95ff9d6412fa2 100644 --- a/src/risedevtool/src/task/compute_node_service.rs +++ b/src/risedevtool/src/task/compute_node_service.rs @@ -55,9 +55,9 @@ impl ComputeNodeService { .arg("--async-stack-trace") .arg(&config.async_stack_trace) .arg("--parallelism") - .arg(&config.parallelism.to_string()) + .arg(config.parallelism.to_string()) .arg("--total-memory-bytes") - .arg(&config.total_memory_bytes.to_string()) + .arg(config.total_memory_bytes.to_string()) .arg("--role") .arg(&config.role); diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 440d8f58d2802..92683f42e742a 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -28,7 +28,6 @@ use crate::tokenizer::Token; #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterDatabaseOperation { ChangeOwner { new_owner_name: Ident }, RenameDatabase { database_name: ObjectName }, @@ -36,7 +35,6 @@ pub enum AlterDatabaseOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterSchemaOperation { ChangeOwner { new_owner_name: Ident }, RenameSchema { schema_name: ObjectName }, @@ -112,7 +110,6 @@ pub enum AlterTableOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterIndexOperation { RenameIndex { index_name: ObjectName, @@ -126,7 +123,6 @@ pub enum AlterIndexOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterViewOperation { RenameView { view_name: ObjectName, @@ -150,7 +146,6 @@ pub enum AlterViewOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterSinkOperation { RenameSink { sink_name: ObjectName, @@ -170,7 +165,6 @@ pub enum AlterSinkOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterSubscriptionOperation { RenameSubscription { subscription_name: ObjectName }, ChangeOwner { new_owner_name: Ident }, @@ -179,7 +173,6 @@ pub enum AlterSubscriptionOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterSourceOperation { RenameSource { source_name: ObjectName }, AddColumn { column_def: ColumnDef }, @@ -192,14 +185,12 @@ pub enum AlterSourceOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterFunctionOperation { SetSchema { new_schema_name: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", 
derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub enum AlterConnectionOperation { SetSchema { new_schema_name: ObjectName }, } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 0ce72be34bf82..37ff551b99511 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2766,7 +2766,6 @@ impl fmt::Display for DropFunctionOption { /// Function describe in DROP FUNCTION. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] pub struct FunctionDesc { pub name: ObjectName, pub args: Option>, diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index b61041d79c335..c94ef26e993ba 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -12,7 +12,7 @@ //! This module defines //! 1) a list of constants for every keyword that -//! can appear in [crate::tokenizer::Word::keyword]: +//! can appear in [crate::tokenizer::Word::keyword]: //! pub const KEYWORD = "KEYWORD" //! 2) an `ALL_KEYWORDS` array with every keyword in it //! This is not a list of *reserved* keywords: some of these can be @@ -22,7 +22,7 @@ //! As a matter of fact, most of these keywords are not used at all //! and could be removed. //! 3) a `RESERVED_FOR_TABLE_ALIAS` array with keywords reserved in a -//! "table alias" context. +//! "table alias" context. use core::fmt; diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 470cf651e31d6..c01a97563237e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -438,17 +438,16 @@ impl HummockEventHandler { mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion), ) { let instances = { - #[cfg(debug_assertion)] + #[cfg(debug_assertions)] { // check duplication on debug_mode - use std::collections::HashSet; let mut id_set = HashSet::new(); for instance in instances { assert!(id_set.insert(instance)); } id_set } - #[cfg(not(debug_assertion))] + #[cfg(not(debug_assertions))] { instances } diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs index 58f4958919e04..a0c01d8cb80ea 100644 --- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs +++ b/src/storage/src/hummock/sstable/delete_range_aggregator.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::cmp::Ordering; use std::future::Future; #[cfg(test)] @@ -25,33 +24,6 @@ use crate::hummock::iterator::{DeleteRangeIterator, ForwardMergeRangeIterator}; use crate::hummock::sstable_store::TableHolder; use crate::hummock::{HummockResult, Sstable}; -pub struct SortedBoundary { - sequence: HummockEpoch, - user_key: UserKey>, -} - -impl PartialEq for SortedBoundary { - fn eq(&self, other: &Self) -> bool { - self.user_key.eq(&other.user_key) && self.sequence == other.sequence - } -} - -impl PartialOrd for SortedBoundary { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Eq for SortedBoundary {} - -impl Ord for SortedBoundary { - fn cmp(&self, other: &Self) -> Ordering { - self.user_key - .cmp(&other.user_key) - .then_with(|| other.sequence.cmp(&self.sequence)) - } -} - pub struct CompactionDeleteRangeIterator { inner: ForwardMergeRangeIterator, } diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index c49155efdd43e..cfed5f0eb7824 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -491,26 +491,37 @@ pub mod delete_range { /// overlaps among range tombstones among different SSTs/batchs in compaction. /// The core idea contains two parts: /// 1) we only need to keep the smallest epoch of the overlapping - /// range tomstone intervals since the key covered by the range tombstone in lower level must have - /// smaller epoches; + /// range tomstone intervals since the key covered by the range tombstone in lower level must have + /// smaller epoches; /// 2) due to 1), we lose the information to delete a key by tombstone in a single - /// SST so we add a tombstone key in the data block. - /// We leverage `events` to calculate the epoch information mentioned above. + /// SST so we add a tombstone key in the data block. + /// We leverage `events` to calculate the epoch information mentioned above. + /// /// e.g. Delete range [1, 5) at epoch1, delete range [3, 7) at epoch2 and delete range [10, 12) at /// epoch3 will first be transformed into `events` below: + /// /// `<1, +epoch1> <5, -epoch1> <3, +epoch2> <7, -epoch2> <10, +epoch3> <12, -epoch3>` + /// /// Then `events` are sorted by user key: + /// /// `<1, +epoch1> <3, +epoch2> <5, -epoch1> <7, -epoch2> <10, +epoch3> <12, -epoch3>` + /// /// We rely on the fact that keys met in compaction are in order. + /// /// When user key 0 comes, no events have happened yet so no range delete epoch. (will be /// represented as range delete epoch MAX EPOCH) + /// /// When user key 1 comes, event `<1, +epoch1>` happens so there is currently one range delete /// epoch: epoch1. + /// /// When user key 2 comes, no more events happen so the set remains `{epoch1}`. + /// /// When user key 3 comes, event `<3, +epoch2>` happens so the range delete epoch set is now /// `{epoch1, epoch2}`. + /// /// When user key 5 comes, event `<5, -epoch1>` happens so epoch1 exits the set, /// therefore the current range delete epoch set is `{epoch2}`. + /// /// When user key 11 comes, events `<7, -epoch2>` and `<10, +epoch3>` /// both happen, one after another. The set changes to `{epoch3}` from `{epoch2}`. pub fn apply_event(epochs: &mut BTreeSet, event: &CompactionDeleteRangeEvent) { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 92015136a1f3f..beb19a0a98c5d 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -630,10 +630,10 @@ pub struct NewLocalOptions { /// Whether the operation is consistent. 
The term `consistent` requires the following: /// /// 1. A key cannot be inserted or deleted for more than once, i.e. inserting to an existing - /// key or deleting an non-existing key is not allowed. + /// key or deleting an non-existing key is not allowed. /// /// 2. The old value passed from - /// `update` and `delete` should match the original stored value. + /// `update` and `delete` should match the original stored value. pub op_consistency_level: OpConsistencyLevel, pub table_option: TableOption, diff --git a/src/stream/src/common/compact_chunk.rs b/src/stream/src/common/compact_chunk.rs index 5d6acb5125422..b6598a80872ac 100644 --- a/src/stream/src/common/compact_chunk.rs +++ b/src/stream/src/common/compact_chunk.rs @@ -232,6 +232,7 @@ impl StreamChunkCompactor { /// have three kind of patterns Insert, Delete or Update. /// - For the update (-old row, +old row), when old row is exactly same. The two rowOp will be /// removed. + /// /// All UPDATE INSERT and UPDATE DELETE will be converted to INSERT and DELETE, and dropped according to /// certain rules (see `merge_insert` and `merge_delete` for more details). pub fn into_compacted_chunks(self) -> impl Iterator { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 9871139bafddc..57d838044171f 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -742,7 +742,7 @@ impl LogStoreRowOpStream { "a stream has reached the end but some other stream has not started yet" )); } - if cfg!(debug_assertion) { + if cfg!(debug_assertions) { while let Some((opt, _stream)) = self.row_streams.next().await { if let Some(result) = opt { return Err( diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 1746924a08dfe..f67442e723198 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -716,6 +716,7 @@ where /// - Format: | vnode | pk | true | `row_count` | /// - If previous state is `InProgress` / `NotStarted`: Persist. /// - If previous state is Completed: Do not persist. +/// /// TODO(kwannoel): we should check committed state to be all `finished` in the tests. /// TODO(kwannoel): Instead of persisting state per vnode each time, /// we can optimize by persisting state for a subset of vnodes which were updated. 
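Two hunks in this patch (`hummock_event_handler.rs` and `kv_log_store/serde.rs`) rename `debug_assertion` to `debug_assertions`. The typo was easy to miss because an unknown cfg name is not a hard error: `cfg!` simply evaluates to false in every build (recent toolchains at most emit an `unexpected_cfgs` warning), so the debug-only checks were silently skipped. A minimal standalone illustration, not part of the patch:

```rust
fn main() {
    // With the misspelled `cfg!(debug_assertion)` this branch could never be
    // taken; with the correct plural form it runs in debug builds only.
    if cfg!(debug_assertions) {
        println!("debug build: running extra consistency checks");
    } else {
        println!("non-debug build: extra checks skipped");
    }
    // The attribute form `#[cfg(debug_assertions)]` behaves the same way:
    // a typo'd name silently removes the guarded item from every build
    // instead of failing compilation.
}
```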
diff --git a/src/stream/src/executor/over_window/frame_finder.rs b/src/stream/src/executor/over_window/frame_finder.rs index 3154284653f11..343d9e4b3df79 100644 --- a/src/stream/src/executor/over_window/frame_finder.rs +++ b/src/stream/src/executor/over_window/frame_finder.rs @@ -1219,12 +1219,12 @@ mod tests { ]; let ord_key_1 = StateKey { - order_key: memcmp_encoding::encode_value(&Some(ScalarImpl::Int64(1)), order_type) + order_key: memcmp_encoding::encode_value(Some(ScalarImpl::Int64(1)), order_type) .unwrap(), pk: OwnedRow::empty().into(), }; let ord_key_2 = StateKey { - order_key: memcmp_encoding::encode_value(&Some(ScalarImpl::Int64(3)), order_type) + order_key: memcmp_encoding::encode_value(Some(ScalarImpl::Int64(3)), order_type) .unwrap(), pk: OwnedRow::empty().into(), }; @@ -1261,7 +1261,7 @@ mod tests { let ord_key_1 = StateKey { order_key: memcmp_encoding::encode_value( - &Some(ScalarImpl::Timestamp( + Some(ScalarImpl::Timestamp( "2024-01-28 00:30:00".parse().unwrap(), )), order_type, @@ -1271,7 +1271,7 @@ mod tests { }; let ord_key_2 = StateKey { order_key: memcmp_encoding::encode_value( - &Some(ScalarImpl::Timestamp( + Some(ScalarImpl::Timestamp( "2024-01-26 15:47:00".parse().unwrap(), )), order_type, diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index baac154593f7a..a3672fc2061e6 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -249,10 +249,10 @@ pub(super) struct OverPartitionStats { /// represented by [`DeltaBTreeMap`]. /// /// - `first_curr_key` and `last_curr_key` are the current keys of the first and the last -/// windows affected. They are used to pinpoint the bounds where state needs to be updated. +/// windows affected. They are used to pinpoint the bounds where state needs to be updated. /// - `first_frame_start` and `last_frame_end` are the frame start and end of the first and -/// the last windows affected. They are used to pinpoint the bounds where state needs to be -/// included for computing the new state. +/// the last windows affected. They are used to pinpoint the bounds where state needs to be +/// included for computing the new state. #[derive(Debug, Educe)] #[educe(Clone, Copy)] pub(super) struct AffectedRange<'cache> { diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 7dd999d11fb1a..32d5d533d904a 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -315,18 +315,16 @@ impl FsSourceExecutor { let start_with_paused = barrier.is_pause_on_startup(); let mut boot_state = Vec::default(); - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Add(AddMutation { splits, .. }) - | Mutation::Update(UpdateMutation { - actor_splits: splits, - .. - }) => { - if let Some(splits) = splits.get(&self.actor_ctx.id) { - boot_state.clone_from(splits); - } - } - _ => {} + if let Some( + Mutation::Add(AddMutation { splits, .. }) + | Mutation::Update(UpdateMutation { + actor_splits: splits, + .. 
+ }), + ) = barrier.mutation.as_deref() + { + if let Some(splits) = splits.get(&self.actor_ctx.id) { + boot_state.clone_from(splits); } } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 2c80f59af1d26..8e5c3f9726c28 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -396,23 +396,21 @@ impl SourceExecutor { }; let mut boot_state = Vec::default(); - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Add(AddMutation { splits, .. }) - | Mutation::Update(UpdateMutation { - actor_splits: splits, - .. - }) => { - if let Some(splits) = splits.get(&self.actor_ctx.id) { - tracing::debug!( - "source exector: actor {:?} boot with splits: {:?}", - self.actor_ctx.id, - splits - ); - boot_state.clone_from(splits); - } - } - _ => {} + if let Some( + Mutation::Add(AddMutation { splits, .. }) + | Mutation::Update(UpdateMutation { + actor_splits: splits, + .. + }), + ) = barrier.mutation.as_deref() + { + if let Some(splits) = splits.get(&self.actor_ctx.id) { + tracing::debug!( + "source exector: actor {:?} boot with splits: {:?}", + self.actor_ctx.id, + splits + ); + boot_state.clone_from(splits); } } diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 7a6fd40f9231a..77f21b52406f5 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -57,9 +57,9 @@ pub struct SharedContext { /// There are three cases when we need local channels to pass around messages: /// 1. pass `Message` between two local actors /// 2. The RPC client at the downstream actor forwards received `Message` to one channel in - /// `ReceiverExecutor` or `MergerExecutor`. + /// `ReceiverExecutor` or `MergerExecutor`. /// 3. The RPC `Output` at the upstream actor forwards received `Message` to - /// `ExchangeServiceImpl`. + /// `ExchangeServiceImpl`. /// /// The channel serves as a buffer because `ExchangeServiceImpl` /// is on the server-side and we will also introduce backpressure. diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 0ca1c76696f39..e492b64979070 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -69,9 +69,9 @@ impl CompactionTestMetrics { /// 1. Start the cluster with ci-compaction-test config: `./risedev d ci-compaction-test` /// 2. Ingest enough L0 SSTs, for example we can use the tpch-bench tool /// 3. Disable hummock manager commit new epochs: `./risedev ctl hummock disable-commit-epoch`, and -/// it will print the current max committed epoch in Meta. +/// it will print the current max committed epoch in Meta. /// 4. Use the test tool to replay hummock version deltas and trigger compactions: -/// `./risedev compaction-test --state-store hummock+s3://your-bucket -t ` +/// `./risedev compaction-test --state-store hummock+s3://your-bucket -t ` pub async fn compaction_test_main( _listen_addr: SocketAddr, advertise_addr: HostAddr,