Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce the region lease keeper #2645

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 8 additions & 37 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub mod table_region;
// TODO(weny): removes it.
#[allow(deprecated)]
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
Expand Down Expand Up @@ -684,12 +686,11 @@ mod tests {
use std::sync::Arc;

use bytes::Bytes;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use futures::TryStreamExt;
use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder};
use table::metadata::{RawTableInfo, TableInfo};

use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::utils::region_storage_path;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
Expand Down Expand Up @@ -735,40 +736,6 @@ mod tests {
assert_eq!(removed, to_removed_key(key));
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();

let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}

fn new_test_region_route() -> RegionRoute {
new_region_route(1, 2)
}
Expand All @@ -787,6 +754,10 @@ mod tests {
}
}

fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
test_utils::new_test_table_info(10, region_numbers)
}

#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
Expand Down
34 changes: 34 additions & 0 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
Expand All @@ -23,6 +24,7 @@ use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_P
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute};
use crate::rpc::store::BatchGetRequest;

pub struct TableRouteKey {
pub table_id: TableId,
Expand Down Expand Up @@ -197,6 +199,38 @@ impl TableRouteManager {
.transpose()
}

/// It may return a subset of the `table_ids`.
pub async fn batch_get(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableRouteValue>> {
let lookup_table = table_ids
.iter()
.map(|id| (TableRouteKey::new(*id).as_raw_key(), id))
.collect::<HashMap<_, _>>();

let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;

let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
// Safety: must exist.
**lookup_table.get(kv.key()).unwrap(),
TableRouteValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(values)
}

#[cfg(test)]
pub async fn get_removed(
&self,
Expand Down
57 changes: 57 additions & 0 deletions src/common/meta/src/key/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};

pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();

let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(table_id)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
8 changes: 8 additions & 0 deletions src/common/meta/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl Peer {
addr: addr.into(),
}
}

#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}

impl Display for Peer {
Expand Down
28 changes: 22 additions & 6 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
.collect()
}

pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Peer> {
/// Returns the HashMap<[RegionNumber], &[Peer]>;
///
/// If the region doesn't have a leader peer, the [Region] will be omitted.
pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
region_routes
.iter()
.filter_map(|x| {
Expand All @@ -69,7 +72,10 @@ pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Pee
.collect::<HashMap<_, _>>()
}

pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> {
pub fn find_region_leader(
region_routes: &[RegionRoute],
region_number: RegionNumber,
) -> Option<&Peer> {
region_routes
.iter()
.find(|x| x.region.id.region_number() == region_number)
Expand Down Expand Up @@ -241,12 +247,12 @@ impl RegionRoute {
pub struct RegionRoutes(pub Vec<RegionRoute>);

impl RegionRoutes {
pub fn region_map(&self) -> HashMap<u32, &Peer> {
convert_to_region_map(&self.0)
pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
convert_to_region_leader_map(&self.0)
}

pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> {
self.region_map().get(&region_number).copied()
pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
self.region_leader_map().get(&region_number).copied()
}
}

Expand All @@ -258,6 +264,16 @@ pub struct Region {
pub attrs: BTreeMap<String, String>,
}

impl Region {
#[cfg(any(test, feature = "testing"))]
pub fn new_test(id: RegionId) -> Self {
Self {
id,
..Default::default()
}
}
}

impl From<PbRegion> for Region {
fn from(r: PbRegion) -> Self {
Self {
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ impl HeartbeatTask {
role: RegionRole::from(stat.role).into(),
approximate_bytes,
// TODO(ruihang): scratch more info
..Default::default()
rcus: 0,
wcus: 0,
approximate_rows: 0,
};
region_stats.push(region_stat);
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ uuid.workspace = true
[dev-dependencies]
chrono.workspace = true
client = { workspace = true, features = ["testing"] }
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test = { workspace = true }
session = { workspace = true }
tracing = "0.1"
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {
use store_api::region_engine::RegionRole;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
Expand Down Expand Up @@ -129,6 +131,8 @@ mod tests {
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: default_engine().to_string(),
role: RegionRole::Follower,
}
}
acc.stat = Some(Stat {
Expand Down
12 changes: 11 additions & 1 deletion src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use api::v1::meta::HeartbeatRequest;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use store_api::region_engine::RegionRole;

use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
Expand All @@ -25,7 +26,9 @@ use crate::keys::StatKey;
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: u64,
// The datanode Id.
pub id: u64,
// The datanode address.
pub addr: String,
/// The read capacity units during this period
pub rcus: i64,
Expand All @@ -38,8 +41,9 @@ pub struct Stat {
pub node_epoch: u64,
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
/// The read capacity units during this period
pub rcus: i64,
Expand All @@ -49,6 +53,10 @@ pub struct RegionStat {
pub approximate_bytes: i64,
/// Approximate number of rows in this region
pub approximate_rows: i64,
/// The engine name.
pub engine: String,
/// The region role.
pub role: RegionRole,
}

impl Stat {
Expand Down Expand Up @@ -132,6 +140,8 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
approximate_rows: value.approximate_rows,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
})
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod test {

use common_meta::key::TableMetadataManager;
use common_meta::{distributed_time_constants, RegionIdent};
use store_api::region_engine::RegionRole;
use store_api::storage::{RegionId, RegionNumber};

use super::*;
Expand Down Expand Up @@ -110,7 +111,12 @@ mod test {
let region_id = RegionId::new(table_id, region_number);
RegionStat {
id: region_id.as_u64(),
..Default::default()
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
engine: String::new(),
role: RegionRole::Leader,
}
};
acc.stat = Some(Stat {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod metrics;
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod region;
pub mod selector;
pub mod service;
pub mod table_meta_alloc;
Expand Down
17 changes: 17 additions & 0 deletions src/meta-srv/src/region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod lease_keeper;

pub use lease_keeper::RegionLeaseKeeper;
Loading
Loading