Skip to content

Commit

Permalink
feat: introduce the region lease keeper (#2645)
Browse files Browse the repository at this point in the history
* feat: introduce the region lease keeper

* chore: apply suggestions from CR

* refactor: simplify `retain_active_regions`

* refactor: remove Default of RegionStat

* chore: add todo comments

* chore: apply suggestions from CR

* refactor: simplify `retain_active_regions`

* fix: fix ci
  • Loading branch information
WenyXu authored Nov 2, 2023
1 parent 04a8fc5 commit ce867fb
Show file tree
Hide file tree
Showing 17 changed files with 602 additions and 49 deletions.
45 changes: 8 additions & 37 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub mod table_region;
// TODO(weny): removes it.
#[allow(deprecated)]
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
Expand Down Expand Up @@ -684,12 +686,11 @@ mod tests {
use std::sync::Arc;

use bytes::Bytes;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use futures::TryStreamExt;
use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder};
use table::metadata::{RawTableInfo, TableInfo};

use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::utils::region_storage_path;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
Expand Down Expand Up @@ -735,40 +736,6 @@ mod tests {
assert_eq!(removed, to_removed_key(key));
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();

let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}

fn new_test_region_route() -> RegionRoute {
new_region_route(1, 2)
}
Expand All @@ -787,6 +754,10 @@ mod tests {
}
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
test_utils::new_test_table_info(10, region_numbers)
}

#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
Expand Down
34 changes: 34 additions & 0 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
Expand All @@ -23,6 +24,7 @@ use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_P
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute};
use crate::rpc::store::BatchGetRequest;

pub struct TableRouteKey {
pub table_id: TableId,
Expand Down Expand Up @@ -197,6 +199,38 @@ impl TableRouteManager {
.transpose()
}

/// It may return a subset of the `table_ids`.
pub async fn batch_get(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableRouteValue>> {
let lookup_table = table_ids
.iter()
.map(|id| (TableRouteKey::new(*id).as_raw_key(), id))
.collect::<HashMap<_, _>>();

let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;

let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
// Safety: must exist.
**lookup_table.get(kv.key()).unwrap(),
TableRouteValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(values)
}

#[cfg(test)]
pub async fn get_removed(
&self,
Expand Down
57 changes: 57 additions & 0 deletions src/common/meta/src/key/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};

pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();

let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(table_id)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
8 changes: 8 additions & 0 deletions src/common/meta/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl Peer {
addr: addr.into(),
}
}

#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}

impl Display for Peer {
Expand Down
28 changes: 22 additions & 6 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
.collect()
}

pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Peer> {
/// Returns the HashMap<[RegionNumber], &[Peer]>;
///
/// If the region doesn't have a leader peer, the [Region] will be omitted.
pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
region_routes
.iter()
.filter_map(|x| {
Expand All @@ -69,7 +72,10 @@ pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Pee
.collect::<HashMap<_, _>>()
}

pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> {
pub fn find_region_leader(
region_routes: &[RegionRoute],
region_number: RegionNumber,
) -> Option<&Peer> {
region_routes
.iter()
.find(|x| x.region.id.region_number() == region_number)
Expand Down Expand Up @@ -241,12 +247,12 @@ impl RegionRoute {
pub struct RegionRoutes(pub Vec<RegionRoute>);

impl RegionRoutes {
pub fn region_map(&self) -> HashMap<u32, &Peer> {
convert_to_region_map(&self.0)
pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
convert_to_region_leader_map(&self.0)
}

pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> {
self.region_map().get(&region_number).copied()
pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
self.region_leader_map().get(&region_number).copied()
}
}

Expand All @@ -258,6 +264,16 @@ pub struct Region {
pub attrs: BTreeMap<String, String>,
}

impl Region {
#[cfg(any(test, feature = "testing"))]
pub fn new_test(id: RegionId) -> Self {
Self {
id,
..Default::default()
}
}
}

impl From<PbRegion> for Region {
fn from(r: PbRegion) -> Self {
Self {
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ impl HeartbeatTask {
role: RegionRole::from(stat.role).into(),
approximate_bytes,
// TODO(ruihang): scratch more info
..Default::default()
rcus: 0,
wcus: 0,
approximate_rows: 0,
};
region_stats.push(region_stat);
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ uuid.workspace = true
[dev-dependencies]
chrono.workspace = true
client = { workspace = true, features = ["testing"] }
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test = { workspace = true }
session = { workspace = true }
tracing = "0.1"
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {
use store_api::region_engine::RegionRole;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -129,6 +131,8 @@ mod tests {
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: default_engine().to_string(),
role: RegionRole::Follower,
}
}
acc.stat = Some(Stat {
Expand Down
12 changes: 11 additions & 1 deletion src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use api::v1::meta::HeartbeatRequest;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use store_api::region_engine::RegionRole;

use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
Expand All @@ -25,7 +26,9 @@ use crate::keys::StatKey;
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: u64,
// The datanode Id.
pub id: u64,
// The datanode address.
pub addr: String,
/// The read capacity units during this period
pub rcus: i64,
Expand All @@ -38,8 +41,9 @@ pub struct Stat {
pub node_epoch: u64,
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
/// The read capacity units during this period
pub rcus: i64,
Expand All @@ -49,6 +53,10 @@ pub struct RegionStat {
pub approximate_bytes: i64,
/// Approximate number of rows in this region
pub approximate_rows: i64,
/// The engine name.
pub engine: String,
/// The region role.
pub role: RegionRole,
}

impl Stat {
Expand Down Expand Up @@ -132,6 +140,8 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
approximate_rows: value.approximate_rows,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
})
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod test {

use common_meta::key::TableMetadataManager;
use common_meta::{distributed_time_constants, RegionIdent};
use store_api::region_engine::RegionRole;
use store_api::storage::{RegionId, RegionNumber};

use super::*;
Expand Down Expand Up @@ -110,7 +111,12 @@ mod test {
let region_id = RegionId::new(table_id, region_number);
RegionStat {
id: region_id.as_u64(),
..Default::default()
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: String::new(),
role: RegionRole::Leader,
}
};
acc.stat = Some(Stat {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod metrics;
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod region;
pub mod selector;
pub mod service;
pub mod table_meta_alloc;
Expand Down
17 changes: 17 additions & 0 deletions src/meta-srv/src/region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod lease_keeper;

pub use lease_keeper::RegionLeaseKeeper;
Loading

0 comments on commit ce867fb

Please sign in to comment.