Skip to content

Commit

Permalink
feat: introduce the region lease keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 24, 2023
1 parent 0fbde48 commit f300245
Show file tree
Hide file tree
Showing 17 changed files with 984 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ derive_builder = "0.12"
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9fa3c42d5aaf38d8056dffd8273583b04f62c06c" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
45 changes: 8 additions & 37 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub mod table_region;
// TODO(weny): removes it.
#[allow(deprecated)]
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
Expand Down Expand Up @@ -684,12 +686,11 @@ mod tests {
use std::sync::Arc;

use bytes::Bytes;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use futures::TryStreamExt;
use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder};
use table::metadata::{RawTableInfo, TableInfo};

use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::utils::region_storage_path;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
Expand Down Expand Up @@ -735,40 +736,6 @@ mod tests {
assert_eq!(removed, to_removed_key(key));
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();

let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}

fn new_test_region_route() -> RegionRoute {
new_region_route(1, 2)
}
Expand All @@ -786,6 +753,10 @@ mod tests {
}
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
test_utils::new_test_table_info(10, region_numbers)
}

#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
Expand Down
57 changes: 57 additions & 0 deletions src/common/meta/src/key/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};

pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();

let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(table_id)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
8 changes: 8 additions & 0 deletions src/common/meta/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl Peer {
addr: addr.into(),
}
}

#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}

impl Display for Peer {
Expand Down
69 changes: 65 additions & 4 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
.collect()
}

pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Peer> {
/// Returns the HashMap<[RegionNumber], &[Peer]>;
///
/// If the region doesn't have a leader peer, the [Region] will be omitted.
pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Peer> {
region_routes
.iter()
.filter_map(|x| {
Expand All @@ -115,6 +118,50 @@ pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Pee
.collect::<HashMap<_, _>>()
}

/// Returns the HashMap<[RegionNumber], BTreeMap<DatanodeId, &[Peer]>>.
pub fn convert_to_region_followers_map(
region_routes: &[RegionRoute],
) -> HashMap<u32, BTreeMap<u64, &Peer>> {
region_routes
.iter()
.map(|x| {
(
x.region.id.region_number(),
x.follower_peers
.iter()
.map(|peer| (peer.id, peer))
.collect::<BTreeMap<u64, &Peer>>(),
)
})
.collect::<HashMap<_, _>>()
}

/// Returns the HashMap<[RegionNumber], HashSet<DatanodeId>>.
pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap<u32, HashSet<u64>> {
let mut set = region_routes
.iter()
.map(|x| {
(
x.region.id.region_number(),
x.follower_peers
.iter()
.map(|peer| (peer.id))
.collect::<HashSet<u64>>(),
)
})
.collect::<HashMap<_, _>>();

for route in region_routes {
if let Some(peer) = &route.leader_peer {
let entry = set.entry(route.region.id.region_number()).or_default();

entry.insert(peer.id);
}
}

set
}

pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> {
region_routes
.iter()
Expand Down Expand Up @@ -348,12 +395,16 @@ pub struct RegionRoute {
pub struct RegionRoutes(pub Vec<RegionRoute>);

impl RegionRoutes {
pub fn region_map(&self) -> HashMap<u32, &Peer> {
convert_to_region_map(&self.0)
pub fn region_leader_map(&self) -> HashMap<u32, &Peer> {
convert_to_region_leader_map(&self.0)
}

pub fn region_followers_map(&self) -> HashMap<u32, BTreeMap<u64, &Peer>> {
convert_to_region_followers_map(&self.0)
}

pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> {
self.region_map().get(&region_number).copied()
self.region_leader_map().get(&region_number).copied()
}
}

Expand All @@ -365,6 +416,16 @@ pub struct Region {
pub attrs: BTreeMap<String, String>,
}

impl Region {
#[cfg(any(test, feature = "testing"))]
pub fn new_test(id: RegionId) -> Self {
Self {
id,
..Default::default()
}
}
}

impl From<PbRegion> for Region {
fn from(r: PbRegion) -> Self {
Self {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ uuid.workspace = true
[dev-dependencies]
chrono.workspace = true
client = { workspace = true, features = ["testing"] }
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test = { workspace = true }
session = { workspace = true }
tracing = "0.1"
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {
use store_api::region_engine::RegionRole;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -129,6 +131,8 @@ mod tests {
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: default_engine().to_string(),
role: RegionRole::Follower,
}
}
acc.stat = Some(Stat {
Expand Down
26 changes: 25 additions & 1 deletion src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use api::v1::meta::HeartbeatRequest;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use store_api::region_engine::RegionRole;

use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
Expand All @@ -25,7 +26,9 @@ use crate::keys::StatKey;
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: u64,
// The datanode Id.
pub id: u64,
// The datanode address.
pub addr: String,
/// The read capacity units during this period
pub rcus: i64,
Expand All @@ -38,8 +41,9 @@ pub struct Stat {
pub node_epoch: u64,
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
/// The read capacity units during this period
pub rcus: i64,
Expand All @@ -49,6 +53,24 @@ pub struct RegionStat {
pub approximate_bytes: i64,
/// Approximate number of rows in this region
pub approximate_rows: i64,
/// The engine name.
pub engine: String,
/// The region role.
pub role: RegionRole,
}

impl Default for RegionStat {
fn default() -> Self {
Self {
id: 0,
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: String::new(),
role: RegionRole::Follower,
}
}
}

impl Stat {
Expand Down Expand Up @@ -132,6 +154,8 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
approximate_rows: value.approximate_rows,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(async_closure)]
#![feature(result_flattening)]
#![feature(extract_if)]

pub mod bootstrap;
mod cache_invalidator;
Expand All @@ -31,6 +32,7 @@ mod metrics;
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod region;
pub mod selector;
pub mod service;
pub mod table_meta_alloc;
Expand Down
17 changes: 17 additions & 0 deletions src/meta-srv/src/region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod lease_keeper;

pub use lease_keeper::RegionLeaseKeeper;
Loading

0 comments on commit f300245

Please sign in to comment.