diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index 001862d98316..981460cb74a2 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -58,35 +58,12 @@ impl KvBackend for CachedMetaKvBackend { &self.name } - async fn range(&self, req: RangeRequest) -> Result { - self.kv_backend.range(req).await + fn as_any(&self) -> &dyn Any { + self } - async fn get(&self, key: &[u8]) -> Result> { - let _timer = timer!(METRIC_CATALOG_KV_GET); - - let init = async { - let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET); - self.kv_backend.get(key).await.map(|val| { - val.with_context(|| CacheNotGetSnafu { - key: String::from_utf8_lossy(key), - }) - })? - }; - - // currently moka doesn't have `optionally_try_get_with_by_ref` - // TODO(fys): change to moka method when available - // https://github.com/moka-rs/moka/issues/254 - match self.cache.try_get_with_by_ref(key, init).await { - Ok(val) => Ok(Some(val)), - Err(e) => match e.as_ref() { - CacheNotGet { .. } => Ok(None), - _ => Err(e), - }, - } - .map_err(|e| GetKvCache { - err_msg: e.to_string(), - }) + async fn range(&self, req: RangeRequest) -> Result { + self.kv_backend.range(req).await } async fn put(&self, req: PutRequest) -> Result { @@ -119,6 +96,22 @@ impl KvBackend for CachedMetaKvBackend { resp } + async fn batch_get(&self, req: BatchGetRequest) -> Result { + self.kv_backend.batch_get(req).await + } + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + let key = &req.key.clone(); + + let ret = self.kv_backend.compare_and_put(req).await; + + if ret.is_ok() { + self.invalidate_key(key).await; + } + + ret + } + async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result { let prev_kv = req.prev_kv; @@ -159,22 +152,6 @@ impl KvBackend for CachedMetaKvBackend { } } - async fn batch_get(&self, req: BatchGetRequest) -> Result { - self.kv_backend.batch_get(req).await - } - - async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { - let key = &req.key.clone(); - - let ret = self.kv_backend.compare_and_put(req).await; - - if ret.is_ok() { - self.invalidate_key(key).await; - } - - ret - } - async fn move_value(&self, req: MoveValueRequest) -> Result { let from_key = &req.from_key.clone(); let to_key = &req.to_key.clone(); @@ -189,8 +166,31 @@ impl KvBackend for CachedMetaKvBackend { ret } - fn as_any(&self) -> &dyn Any { - self + async fn get(&self, key: &[u8]) -> Result> { + let _timer = timer!(METRIC_CATALOG_KV_GET); + + let init = async { + let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET); + self.kv_backend.get(key).await.map(|val| { + val.with_context(|| CacheNotGetSnafu { + key: String::from_utf8_lossy(key), + }) + })? + }; + + // currently moka doesn't have `optionally_try_get_with_by_ref` + // TODO(fys): change to moka method when available + // https://github.com/moka-rs/moka/issues/254 + match self.cache.try_get_with_by_ref(key, init).await { + Ok(val) => Ok(Some(val)), + Err(e) => match e.as_ref() { + CacheNotGet { .. 
} => Ok(None), + _ => Err(e), + }, + } + .map_err(|e| GetKvCache { + err_msg: e.to_string(), + }) } } diff --git a/src/common/meta/src/ident.rs b/src/common/meta/src/ident.rs index 522a242e2274..3e7bba884bee 100644 --- a/src/common/meta/src/ident.rs +++ b/src/common/meta/src/ident.rs @@ -20,7 +20,7 @@ use snafu::OptionExt; use crate::error::{Error, InvalidProtoMsgSnafu}; -#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)] pub struct TableIdent { pub catalog: String, pub schema: String, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index d068782def40..30532a66fe93 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -40,12 +40,16 @@ where { fn name(&self) -> &str; + fn as_any(&self) -> &dyn Any; + async fn range(&self, req: RangeRequest) -> Result; async fn put(&self, req: PutRequest) -> Result; async fn batch_put(&self, req: BatchPutRequest) -> Result; + async fn batch_get(&self, req: BatchGetRequest) -> Result; + async fn compare_and_put( &self, req: CompareAndPutRequest, @@ -56,27 +60,17 @@ where req: DeleteRangeRequest, ) -> Result; - async fn delete(&self, key: &[u8], prev_kv: bool) -> Result, Self::Error> { - let mut req = DeleteRangeRequest::new().with_key(key.to_vec()); - if prev_kv { - req = req.with_prev_kv(); - } - - let resp = self.delete_range(req).await?; - - if prev_kv { - Ok(resp.prev_kvs.into_iter().next()) - } else { - Ok(None) - } - } - async fn batch_delete( &self, req: BatchDeleteRequest, ) -> Result; - /// Default get is implemented based on `range` method. + /// MoveValue atomically renames the key to the given updated key. + async fn move_value(&self, req: MoveValueRequest) -> Result; + + // The following methods are implemented based on the above methods, + // and a higher-level interface is provided for to simplify usage. + async fn get(&self, key: &[u8]) -> Result, Self::Error> { let req = RangeRequest::new().with_key(key.to_vec()); let mut resp = self.range(req).await?; @@ -87,10 +81,26 @@ where }) } - async fn batch_get(&self, req: BatchGetRequest) -> Result; + /// Check if the key exists, not returning the value. + /// If the value is large, this method is more efficient than `get`. + async fn exists(&self, key: &[u8]) -> Result { + let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only(); + let resp = self.range(req).await?; + Ok(!resp.kvs.is_empty()) + } - /// MoveValue atomically renames the key to the given updated key. 
- async fn move_value(&self, req: MoveValueRequest) -> Result; + async fn delete(&self, key: &[u8], prev_kv: bool) -> Result, Self::Error> { + let mut req = DeleteRangeRequest::new().with_key(key.to_vec()); + if prev_kv { + req = req.with_prev_kv(); + } - fn as_any(&self) -> &dyn Any; + let resp = self.delete_range(req).await?; + + if prev_kv { + Ok(resp.prev_kvs.into_iter().next()) + } else { + Ok(None) + } + } } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 9b4a6f47b35b..719403e4414f 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -80,6 +80,10 @@ impl KvBackend for MemoryKvBackend { "Memory" } + fn as_any(&self) -> &dyn Any { + self + } + async fn range(&self, req: RangeRequest) -> Result { let RangeRequest { key, @@ -155,6 +159,23 @@ impl KvBackend for MemoryKvBackend { Ok(BatchPutResponse { prev_kvs }) } + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let kvs = self.kvs.read().unwrap(); + + let kvs = req + .keys + .into_iter() + .filter_map(|key| { + kvs.get_key_value(&key).map(|(k, v)| KeyValue { + key: k.clone(), + value: v.clone(), + }) + }) + .collect::>(); + + Ok(BatchGetResponse { kvs }) + } + async fn compare_and_put( &self, req: CompareAndPutRequest, @@ -251,23 +272,6 @@ impl KvBackend for MemoryKvBackend { Ok(BatchDeleteResponse { prev_kvs }) } - async fn batch_get(&self, req: BatchGetRequest) -> Result { - let kvs = self.kvs.read().unwrap(); - - let kvs = req - .keys - .into_iter() - .filter_map(|key| { - kvs.get_key_value(&key).map(|(k, v)| KeyValue { - key: k.clone(), - value: v.clone(), - }) - }) - .collect::>(); - - Ok(BatchGetResponse { kvs }) - } - async fn move_value(&self, req: MoveValueRequest) -> Result { let MoveValueRequest { from_key, to_key } = req; @@ -288,10 +292,6 @@ impl KvBackend for MemoryKvBackend { Ok(MoveValueResponse(kv)) } - - fn as_any(&self) -> &dyn Any { - self - } } #[async_trait] diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9996b5f572f9..3ec1a699acdd 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -458,6 +458,9 @@ pub enum Error { source: common_meta::error::Error, location: Location, }, + + #[snafu(display("Invalid heartbeat request: {}", err_msg))] + InvalidHeartbeatRequest { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -469,10 +472,6 @@ impl From for tonic::Status { } impl ErrorExt for Error { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn status_code(&self) -> StatusCode { match self { Error::EtcdFailed { .. } @@ -516,7 +515,8 @@ impl ErrorExt for Error { | Error::InvalidStatKey { .. } | Error::ParseNum { .. } | Error::UnsupportedSelectorType { .. } - | Error::InvalidArguments { .. } => StatusCode::InvalidArguments, + | Error::InvalidArguments { .. } + | Error::InvalidHeartbeatRequest { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::StatKeyFromUtf8 { .. } @@ -558,6 +558,10 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), } } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } // for form tonic diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 5b02ae6f4385..4eb84e95a381 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -13,7 +13,7 @@ // limitations under the License. 
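The collect_stats_handler.rs change that follows replaces the debug!-level log with warn! and surfaces the conversion error, now that `Stat: TryFrom<HeartbeatRequest>` reports why a heartbeat was incomplete instead of returning `()`. Below is a minimal, self-contained sketch of that handler-side pattern; the types and the use of `eprintln!` are simplified stand-ins for the real request, error, and `common_telemetry::warn!` definitions.

```rust
// Sketch of "collect stats, warn on incomplete heartbeat".
// `HeartbeatRequest`, `Stat`, and the error type are stand-ins, not the
// real GreptimeDB definitions.

#[derive(Debug, Default, Clone)]
struct HeartbeatRequest {
    node_id: Option<u64>, // the real request carries header/peer/node_stat
}

#[derive(Debug)]
struct Stat {
    id: u64,
}

#[derive(Debug)]
struct InvalidHeartbeat {
    err_msg: String,
}

impl TryFrom<HeartbeatRequest> for Stat {
    type Error = InvalidHeartbeat;

    fn try_from(req: HeartbeatRequest) -> Result<Self, Self::Error> {
        match req.node_id {
            Some(id) => Ok(Stat { id }),
            None => Err(InvalidHeartbeat {
                err_msg: "missing header, peer or node_stat".to_string(),
            }),
        }
    }
}

#[derive(Default)]
struct HeartbeatAccumulator {
    stat: Option<Stat>,
}

fn collect(req: &HeartbeatRequest, acc: &mut HeartbeatAccumulator) {
    match Stat::try_from(req.clone()) {
        Ok(stat) => {
            let _ = acc.stat.insert(stat);
        }
        // Because the conversion now returns a descriptive error instead of `()`,
        // the handler can log the actual cause at warn level.
        Err(err) => eprintln!("Incomplete heartbeat data: {:?}, err: {:?}", req, err),
    }
}

fn main() {
    let mut acc = HeartbeatAccumulator::default();
    collect(&HeartbeatRequest { node_id: Some(42) }, &mut acc);
    collect(&HeartbeatRequest::default(), &mut acc);
    assert!(acc.stat.is_some());
}
```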
use api::v1::meta::{HeartbeatRequest, Role}; -use common_telemetry::debug; +use common_telemetry::warn; use super::node_stat::Stat; use crate::error::Result; @@ -45,8 +45,8 @@ impl HeartbeatHandler for CollectStatsHandler { Ok(stat) => { let _ = acc.stat.insert(stat); } - Err(_) => { - debug!("Incomplete heartbeat data: {:?}", req); + Err(err) => { + warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err); } }; diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 2504dc171507..72bbf1411310 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -36,7 +36,6 @@ pub(crate) struct DatanodeHeartbeat { pub struct RegionFailureHandler { failure_detect_runner: FailureDetectRunner, - region_failover_manager: Arc, } impl RegionFailureHandler { @@ -52,13 +51,8 @@ impl RegionFailureHandler { Ok(Self { failure_detect_runner, - region_failover_manager, }) } - - pub(crate) fn region_failover_manager(&self) -> &Arc { - &self.region_failover_manager - } } #[async_trait] @@ -89,9 +83,9 @@ impl HeartbeatHandler for RegionFailureHandler { cluster_id: stat.cluster_id, datanode_id: stat.id, table_ident: TableIdent { - catalog: x.catalog.clone(), - schema: x.schema.clone(), - table: x.table.clone(), + catalog: x.table_ident.catalog.clone(), + schema: x.table_ident.schema.clone(), + table: x.table_ident.table.clone(), table_id: RegionId::from(x.id).table_id(), // TODO(#1583): Use the actual table engine. engine: MITO_ENGINE.to_string(), @@ -132,9 +126,13 @@ mod tests { fn new_region_stat(region_id: u64) -> RegionStat { RegionStat { id: region_id, - catalog: "a".to_string(), - schema: "b".to_string(), - table: "c".to_string(), + table_ident: TableIdent { + catalog: "a".to_string(), + schema: "b".to_string(), + table: "c".to_string(), + table_id: 0, + engine: "d".to_string(), + }, rcus: 0, wcus: 0, approximate_bytes: 0, diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 555ac4afd452..7464bc86396f 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -13,9 +13,12 @@ // limitations under the License. 
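The node_stat.rs changes that follow replace the loose catalog/schema/table strings on `RegionStat` with a full `TableIdent`, and turn the infallible `From` conversion into a `TryFrom` that rejects heartbeats missing a `table_ident`. The sketch below illustrates only the shape of that conversion; the proto and error types are simplified stand-ins for `api::v1::meta::RegionStat` and the meta-srv error enum, which uses snafu contexts in the real code.

```rust
// Simplified RegionStat conversion: a missing table_ident is now a hard
// error rather than being papered over with empty strings.

#[derive(Debug, Default, Clone)]
struct TableIdent {
    catalog: String,
    schema: String,
    table: String,
    table_id: u32,
    engine: String,
}

// Stand-in for the protobuf `api::v1::meta::RegionStat`, where `table_ident`
// is optional and may be absent in a malformed heartbeat.
#[derive(Debug, Default)]
struct PbRegionStat {
    region_id: u64,
    table_ident: Option<TableIdent>,
    rcus: i64,
    wcus: i64,
}

#[derive(Debug)]
struct RegionStat {
    id: u64,
    table_ident: TableIdent,
    rcus: i64,
    wcus: i64,
}

#[derive(Debug)]
struct InvalidHeartbeat {
    err_msg: String,
}

impl TryFrom<PbRegionStat> for RegionStat {
    type Error = InvalidHeartbeat;

    fn try_from(value: PbRegionStat) -> Result<Self, Self::Error> {
        let table_ident = value.table_ident.ok_or(InvalidHeartbeat {
            err_msg: "missing table_ident".to_string(),
        })?;
        Ok(Self {
            id: value.region_id,
            table_ident,
            rcus: value.rcus,
            wcus: value.wcus,
        })
    }
}

fn main() {
    let ok = RegionStat::try_from(PbRegionStat {
        region_id: 1,
        table_ident: Some(TableIdent {
            catalog: "greptime".into(),
            schema: "public".into(),
            table: "demo".into(),
            table_id: 42,
            engine: "mito".into(),
        }),
        ..Default::default()
    });
    assert!(ok.is_ok());

    let missing = RegionStat::try_from(PbRegionStat::default());
    assert!(missing.is_err());
}
```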
use api::v1::meta::HeartbeatRequest; +use common_meta::ident::TableIdent; use common_time::util as time_util; use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; #[derive(Debug, Default, Serialize, Deserialize)] @@ -40,9 +43,7 @@ pub struct Stat { #[derive(Debug, Default, Serialize, Deserialize)] pub struct RegionStat { pub id: u64, - pub catalog: String, - pub schema: String, - pub table: String, + pub table_ident: TableIdent, /// The read capacity units during this period pub rcus: i64, /// The write capacity units during this period @@ -63,7 +64,7 @@ impl Stat { } impl TryFrom for Stat { - type Error = (); + type Error = Error; fn try_from(value: HeartbeatRequest) -> Result { let HeartbeatRequest { @@ -82,6 +83,10 @@ impl TryFrom for Stat { } else { None }; + let region_stats = region_stats + .into_iter() + .map(RegionStat::try_from) + .collect::, _>>()?; Ok(Self { timestamp_millis: time_util::current_time_millis(), @@ -92,31 +97,41 @@ impl TryFrom for Stat { wcus: node_stat.wcus, table_num: node_stat.table_num, region_num, - region_stats: region_stats.into_iter().map(RegionStat::from).collect(), + region_stats, node_epoch, }) } - _ => Err(()), + _ => InvalidHeartbeatRequestSnafu { + err_msg: "missing header, peer or node_stat", + } + .fail(), } } } -impl From for RegionStat { - fn from(value: api::v1::meta::RegionStat) -> Self { - let table = value - .table_ident - .as_ref() - .and_then(|t| t.table_name.as_ref()); - Self { +impl TryFrom for RegionStat { + type Error = Error; + + fn try_from(value: api::v1::meta::RegionStat) -> Result { + let table_ident = value.table_ident.context(InvalidHeartbeatRequestSnafu { + err_msg: "missing table_ident", + })?; + let table_ident_result = TableIdent::try_from(table_ident); + let Ok(table_ident) = table_ident_result else { + return InvalidHeartbeatRequestSnafu { + err_msg: format!("invalid table_ident: {:?}", table_ident_result.err()), + } + .fail(); + }; + + Ok(Self { id: value.region_id, - catalog: table.map_or("", |t| &t.catalog_name).to_string(), - schema: table.map_or("", |t| &t.schema_name).to_string(), - table: table.map_or("", |t| &t.table_name).to_string(), + table_ident, rcus: value.rcus, wcus: value.wcus, approximate_bytes: value.approximate_bytes, approximate_rows: value.approximate_rows, - } + }) } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 96df5fea9338..ef8ed381da63 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -13,90 +13,23 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; -use common_meta::ident::TableIdent; -use common_meta::key::TableMetadataManagerRef; -use common_meta::ClusterId; -use common_telemetry::warn; -use snafu::ResultExt; -use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::TableId; +use store_api::storage::RegionId; -use crate::error::{Result, TableMetadataManagerSnafu}; +use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::inactive_node_manager::InactiveNodeManager; use crate::metasrv::Context; -use crate::procedure::region_failover::{RegionFailoverKey, RegionFailoverManager}; /// The lease seconds of a region. 
It's set by two default heartbeat intervals (5 second × 2) plus /// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second). // TODO(LFC): Make region lease seconds calculated from Datanode heartbeat configuration. pub(crate) const REGION_LEASE_SECONDS: u64 = 20; -pub(crate) struct RegionLeaseHandler { - region_failover_manager: Option>, - table_metadata_manager: TableMetadataManagerRef, -} - -impl RegionLeaseHandler { - pub(crate) fn new( - region_failover_manager: Option>, - table_metadata_manager: TableMetadataManagerRef, - ) -> Self { - Self { - region_failover_manager, - table_metadata_manager, - } - } - - async fn find_table_ident(&self, table_id: TableId) -> Result> { - let value = self - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)?; - Ok(value.map(|x| { - let table_info = &x.table_info; - TableIdent { - catalog: table_info.catalog_name.clone(), - schema: table_info.schema_name.clone(), - table: table_info.name.clone(), - table_id, - engine: table_info.meta.engine.clone(), - } - })) - } - - /// Filter out the regions that are currently in failover. - /// It's meaningless to extend the lease of a region if it is in failover. - fn filter_failover_regions( - &self, - cluster_id: ClusterId, - table_ident: &TableIdent, - regions: Vec, - ) -> Vec { - if let Some(region_failover_manager) = &self.region_failover_manager { - let mut region_failover_key = RegionFailoverKey { - cluster_id, - table_ident: table_ident.clone(), - region_number: 0, - }; - - regions - .into_iter() - .filter(|region| { - region_failover_key.region_number = *region; - !region_failover_manager.is_region_failover_running(®ion_failover_key) - }) - .collect() - } else { - regions - } - } -} +#[derive(Default)] +pub(crate) struct RegionLeaseHandler; #[async_trait] impl HeartbeatHandler for RegionLeaseHandler { @@ -107,73 +40,56 @@ impl HeartbeatHandler for RegionLeaseHandler { async fn handle( &self, req: &HeartbeatRequest, - _: &mut Context, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; - let datanode_id = stat.id; - let mut datanode_regions = HashMap::new(); - stat.region_stats.iter().for_each(|x| { - let region_id: RegionId = x.id.into(); - let table_id = region_id.table_id(); - datanode_regions - .entry(table_id) + let mut table_region_leases = HashMap::new(); + stat.region_stats.iter().for_each(|region_stat| { + let table_ident = region_stat.table_ident.clone(); + table_region_leases + .entry(table_ident) .or_insert_with(Vec::new) - .push(RegionId::from(x.id).region_number()); + .push(RegionId::from(region_stat.id).region_number()); }); - let mut region_leases = Vec::with_capacity(datanode_regions.len()); - for (table_id, local_regions) in datanode_regions { - let Some(table_ident) = self.find_table_ident(table_id).await? else { - warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ - Reason: table not found."); - continue; - }; - - let Some(table_region_value) = self - .table_metadata_manager - .table_region_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? else { - warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. 
\ - Reason: table region value not found."); - continue; - }; - let Some(global_regions) = table_region_value - .region_distribution - .get(&datanode_id) else { - warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \ - Reason: not expected to place the region on it."); - continue; - }; - - // Filter out the designated regions from table info value for the given table on the given Datanode. - let designated_regions = local_regions - .into_iter() - .filter(|x| global_regions.contains(x)) - .collect::>(); - - let designated_regions = - self.filter_failover_regions(stat.cluster_id, &table_ident, designated_regions); + let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); + for (table_ident, region_numbers) in table_region_leases.iter_mut() { + inactive_node_manager + .retain_active_regions( + stat.cluster_id, + stat.id, + table_ident.table_id, + region_numbers, + ) + .await?; + } - region_leases.push(RegionLease { + acc.region_leases = table_region_leases + .into_iter() + .filter(|(_, regions)| !regions.is_empty()) // filter out empty region_numbers + .map(|(table_ident, regions)| RegionLease { table_ident: Some(table_ident.into()), - regions: designated_regions, + regions, duration_since_epoch: req.duration_since_epoch, lease_seconds: REGION_LEASE_SECONDS, - }); - } - acc.region_leases = region_leases; + }) + .collect(); + Ok(()) } } #[cfg(test)] mod test { + use std::sync::Arc; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManager; + use common_meta::RegionIdent; + use store_api::storage::RegionNumber; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; @@ -208,18 +124,6 @@ mod test { table_id, engine: "mito".to_string(), }; - let _ = region_failover_manager - .running_procedures() - .write() - .unwrap() - .insert(RegionFailoverKey { - cluster_id: 1, - table_ident: table_ident.clone(), - region_number: 1, - }); - - let handler = - RegionLeaseHandler::new(Some(region_failover_manager), table_metadata_manager); let req = HeartbeatRequest { duration_since_epoch: 1234, @@ -229,15 +133,20 @@ mod test { let builder = MetaSrvBuilder::new(); let metasrv = builder.build().await.unwrap(); let ctx = &mut metasrv.new_ctx(); + let handler = RegionLeaseHandler::default(); let acc = &mut HeartbeatAccumulator::default(); let new_region_stat = |region_number: RegionNumber| -> RegionStat { let region_id = RegionId::new(table_id, region_number); RegionStat { id: region_id.as_u64(), - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: table_name.to_string(), + table_ident: TableIdent { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table_name.to_string(), + table_id: 1, + engine: "mito".to_string(), + }, ..Default::default() } }; @@ -248,10 +157,34 @@ mod test { ..Default::default() }); + let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); + inactive_node_manager + .register_inactive_region(&RegionIdent { + cluster_id: 1, + datanode_id: 1, + table_ident: TableIdent { + table_id: 1, + ..Default::default() + }, + region_number: 1, + }) + .await + .unwrap(); + inactive_node_manager + .register_inactive_region(&RegionIdent { + cluster_id: 1, + datanode_id: 1, + table_ident: TableIdent { + table_id: 1, + ..Default::default() + }, + region_number: 3, + }) + .await + .unwrap(); + handler.handle(&req, ctx, acc).await.unwrap(); - // region 1 is 
during failover and region 3 is not in table region value, - // so only region 2's lease is extended. assert_eq!(acc.region_leases.len(), 1); let lease = acc.region_leases.remove(0); assert_eq!(lease.table_ident.unwrap(), table_ident.into()); diff --git a/src/meta-srv/src/inactive_node_manager.rs b/src/meta-srv/src/inactive_node_manager.rs new file mode 100644 index 000000000000..6462a8a89caa --- /dev/null +++ b/src/meta-srv/src/inactive_node_manager.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_meta::rpc::store::{BatchGetRequest, PutRequest}; +use common_meta::RegionIdent; +use store_api::storage::RegionNumber; + +use crate::error::Result; +use crate::keys::InactiveNodeKey; +use crate::service::store::kv::ResettableKvStoreRef; + +pub struct InactiveNodeManager<'a> { + store: &'a ResettableKvStoreRef, +} + +impl<'a> InactiveNodeManager<'a> { + pub fn new(store: &'a ResettableKvStoreRef) -> Self { + Self { store } + } + + pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { + let key = InactiveNodeKey { + cluster_id: region_ident.cluster_id, + node_id: region_ident.datanode_id, + table_id: region_ident.table_ident.table_id, + region_number: region_ident.region_number, + }; + let req = PutRequest { + key: key.into(), + value: vec![], + prev_kv: false, + }; + self.store.put(req).await?; + Ok(()) + } + + pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { + let key: Vec = InactiveNodeKey { + cluster_id: region_ident.cluster_id, + node_id: region_ident.datanode_id, + table_id: region_ident.table_ident.table_id, + region_number: region_ident.region_number, + } + .into(); + self.store.delete(&key, false).await?; + Ok(()) + } + + /// The input is a list of regions from a table on a specific node. If one or more + /// regions have been set to inactive state by metasrv, the corresponding regions + /// will be removed, then return the remaining regions. 
+ pub async fn retain_active_regions( + &self, + cluster_id: u64, + node_id: u64, + table_id: u32, + region_numbers: &mut Vec, + ) -> Result<()> { + let key_region_numbers: Vec<(Vec, RegionNumber)> = region_numbers + .iter() + .map(|region_number| { + ( + InactiveNodeKey { + cluster_id, + node_id, + table_id, + region_number: *region_number, + } + .into(), + *region_number, + ) + }) + .collect(); + let keys = key_region_numbers + .iter() + .map(|(key, _)| key.clone()) + .collect(); + let resp = self.store.batch_get(BatchGetRequest { keys }).await?; + let kvs = resp.kvs; + if kvs.is_empty() { + return Ok(()); + } + + let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::>(); + let active_region_numbers = key_region_numbers + .into_iter() + .filter(|(key, _)| !inactive_keys.contains(key)) + .map(|(_, region_number)| region_number) + .collect::>(); + *region_numbers = active_region_numbers; + + Ok(()) + } +} diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 3553ffcf4ef4..a64763cbfd8c 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -19,6 +19,7 @@ use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::RegionNumber; use crate::error; use crate::error::Result; @@ -26,6 +27,7 @@ use crate::handler::node_stat::Stat; pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; +pub(crate) const INACTIVE_NODE_PREFIX: &str = "__meta_inactive_node"; pub const DN_STAT_PREFIX: &str = "__meta_dnstat"; @@ -136,7 +138,7 @@ pub fn build_table_route_prefix(catalog: impl AsRef, schema: impl AsRef> for StatValue { } } +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +pub struct InactiveNodeKey { + pub cluster_id: u64, + pub node_id: u64, + pub table_id: u32, + pub region_number: RegionNumber, +} + +impl From for Vec { + fn from(value: InactiveNodeKey) -> Self { + format!( + "{}-{}-{}-{}-{}", + INACTIVE_NODE_PREFIX, + value.cluster_id, + value.node_id, + value.table_id, + value.region_number + ) + .into_bytes() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 6034bbe11cbc..5a0032fa7897 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -39,5 +39,6 @@ pub mod table_routes; pub use crate::error::Result; +mod inactive_node_manager; #[cfg(test)] mod test_util; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index a8bcc066a736..9b24e208e317 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,11 +18,12 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_grpc::channel_manager::ChannelConfig; -use common_meta::key::TableMetadataManager; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::ProcedureManagerRef; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::ddl::DdlManager; +use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; @@ -41,6 +42,7 @@ use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; +use 
crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; @@ -145,63 +147,26 @@ impl MetaSrvBuilder { let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default())); let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default())); - let leader_cached_kv_store = Arc::new(LeaderCachedKvStore::new( - Arc::new(CheckLeaderByElection(election.clone())), - kv_store.clone(), - )); - let meta_peer_client = meta_peer_client.unwrap_or_else(|| { - MetaPeerClientBuilder::default() - .election(election.clone()) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap() - }); + let leader_cached_kv_store = build_leader_cached_kv_store(&election, &kv_store); + let meta_peer_client = meta_peer_client + .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector)); let pushers = Pushers::default(); - let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone()); - let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); - let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); - - let manager_config = ManagerConfig { - max_retry_times: options.procedure.max_retry_times, - retry_delay: options.procedure.retry_delay, - ..Default::default() - }; - - let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store)); + let mailbox = build_mailbox(&kv_store, &pushers); + let procedure_manager = build_procedure_manager(&options, &kv_store); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); let metadata_service = metadata_service .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); - - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); - - let datanode_clients = datanode_clients.unwrap_or_else(|| { - let datanode_client_channel_config = ChannelConfig::new() - .timeout(Duration::from_millis( - options.datanode.client_options.timeout_millis, - )) - .connect_timeout(Duration::from_millis( - options.datanode.client_options.connect_timeout_millis, - )) - .tcp_nodelay(options.datanode.client_options.tcp_nodelay); - Arc::new(DatanodeClients::new(datanode_client_channel_config)) - }); - - // TODO(weny): considers to modify the default config of procedure manager - let ddl_manager = Arc::new(DdlManager::new( - procedure_manager.clone(), - kv_store.clone(), + let table_metadata_manager = build_table_metadata_manager(&kv_store); + let ddl_manager = build_ddl_manager( + &options, datanode_clients, - mailbox.clone(), - options.server_addr.clone(), - table_metadata_manager.clone(), - )); - + &procedure_manager, + &kv_store, + &mailbox, + &table_metadata_manager, + ); let _ = ddl_manager.try_start(); let handler_group = match handler_group { @@ -210,36 +175,30 @@ impl MetaSrvBuilder { let region_failover_handler = if options.disable_region_failover { None } else { + let select_ctx = SelectorContext { + server_addr: options.server_addr.clone(), + datanode_lease_secs: options.datanode_lease_secs, + kv_store: kv_store.clone(), + meta_peer_client: meta_peer_client.clone(), + catalog: None, + schema: None, + table: None, + }; let 
region_failover_manager = Arc::new(RegionFailoverManager::new( + in_memory.clone(), mailbox.clone(), procedure_manager.clone(), selector.clone(), - SelectorContext { - server_addr: options.server_addr.clone(), - datanode_lease_secs: options.datanode_lease_secs, - kv_store: kv_store.clone(), - meta_peer_client: meta_peer_client.clone(), - catalog: None, - schema: None, - table: None, - }, + select_ctx, lock.clone(), table_metadata_manager.clone(), )); - Some( RegionFailureHandler::try_new(election.clone(), region_failover_manager) .await?, ) }; - let region_lease_handler = RegionLeaseHandler::new( - region_failover_handler - .as_ref() - .map(|x| x.region_failover_manager().clone()), - table_metadata_manager.clone(), - ); - let group = HeartbeatHandlerGroup::new(pushers); group.add_handler(ResponseHeaderHandler::default()).await; // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, @@ -253,7 +212,7 @@ impl MetaSrvBuilder { if let Some(region_failover_handler) = region_failover_handler { group.add_handler(region_failover_handler).await; } - group.add_handler(region_lease_handler).await; + group.add_handler(RegionLeaseHandler::default()).await; group.add_handler(PersistStatsHandler::default()).await; group } @@ -280,6 +239,80 @@ impl MetaSrvBuilder { } } +fn build_leader_cached_kv_store( + election: &Option, + kv_store: &KvStoreRef, +) -> Arc { + Arc::new(LeaderCachedKvStore::new( + Arc::new(CheckLeaderByElection(election.clone())), + kv_store.clone(), + )) +} + +fn build_default_meta_peer_client( + election: &Option, + in_memory: &ResettableKvStoreRef, +) -> MetaPeerClientRef { + MetaPeerClientBuilder::default() + .election(election.clone()) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap() +} + +fn build_mailbox(kv_store: &KvStoreRef, pushers: &Pushers) -> MailboxRef { + let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone()); + HeartbeatMailbox::create(pushers.clone(), mailbox_sequence) +} + +fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> ProcedureManagerRef { + let manager_config = ManagerConfig { + max_retry_times: options.procedure.max_retry_times, + retry_delay: options.procedure.retry_delay, + ..Default::default() + }; + let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); + Arc::new(LocalManager::new(manager_config, state_store)) +} + +fn build_table_metadata_manager(kv_store: &KvStoreRef) -> TableMetadataManagerRef { + Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))) +} + +fn build_ddl_manager( + options: &MetaSrvOptions, + datanode_clients: Option>, + procedure_manager: &ProcedureManagerRef, + kv_store: &KvStoreRef, + mailbox: &MailboxRef, + table_metadata_manager: &TableMetadataManagerRef, +) -> DdlManagerRef { + let datanode_clients = datanode_clients.unwrap_or_else(|| { + let datanode_client_channel_config = ChannelConfig::new() + .timeout(Duration::from_millis( + options.datanode.client_options.timeout_millis, + )) + .connect_timeout(Duration::from_millis( + options.datanode.client_options.connect_timeout_millis, + )) + .tcp_nodelay(options.datanode.client_options.tcp_nodelay); + Arc::new(DatanodeClients::new(datanode_client_channel_config)) + }); + // TODO(weny): considers to modify the default config of procedure manager + Arc::new(DdlManager::new( + procedure_manager.clone(), + kv_store.clone(), + datanode_clients, + mailbox.clone(), + options.server_addr.clone(), + 
table_metadata_manager.clone(), + )) +} + impl Default for MetaSrvBuilder { fn default() -> Self { Self::new() diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 73268a4afe12..06b0049460ee 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -45,6 +45,7 @@ use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataMan use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::service::mailbox::MailboxRef; +use crate::service::store::kv::ResettableKvStoreRef; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2); @@ -68,6 +69,7 @@ impl From for RegionFailoverKey { } pub(crate) struct RegionFailoverManager { + in_memory: ResettableKvStoreRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, selector: SelectorRef, @@ -90,6 +92,7 @@ impl Drop for FailoverProcedureGuard { impl RegionFailoverManager { pub(crate) fn new( + in_memory: ResettableKvStoreRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, selector: SelectorRef, @@ -98,6 +101,7 @@ impl RegionFailoverManager { table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { + in_memory, mailbox, procedure_manager, selector, @@ -110,6 +114,7 @@ impl RegionFailoverManager { pub(crate) fn create_context(&self) -> RegionFailoverContext { RegionFailoverContext { + in_memory: self.in_memory.clone(), mailbox: self.mailbox.clone(), selector: self.selector.clone(), selector_ctx: self.selector_ctx.clone(), @@ -133,10 +138,6 @@ impl RegionFailoverManager { }) } - pub(crate) fn is_region_failover_running(&self, key: &RegionFailoverKey) -> bool { - self.running_procedures.read().unwrap().contains(key) - } - fn insert_running_procedures( &self, failed_region: &RegionIdent, @@ -153,11 +154,6 @@ impl RegionFailoverManager { } } - #[cfg(test)] - pub(crate) fn running_procedures(&self) -> Arc>> { - self.running_procedures.clone() - } - pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> { let Some(guard) = self.insert_running_procedures(failed_region) else { warn!("Region failover procedure for region {failed_region} is already running!"); @@ -224,6 +220,7 @@ struct Node { /// The "Context" of region failover procedure state machine. 
#[derive(Clone)] pub struct RegionFailoverContext { + pub in_memory: ResettableKvStoreRef, pub mailbox: MailboxRef, pub selector: SelectorRef, pub selector_ctx: SelectorContext, @@ -463,6 +460,7 @@ mod tests { } pub async fn build(self) -> TestingEnv { + let in_memory = Arc::new(MemStore::new()); let kv_store: KvStoreRef = Arc::new(MemStore::new()); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) @@ -527,6 +525,7 @@ mod tests { TestingEnv { context: RegionFailoverContext { + in_memory, mailbox, selector, selector_ctx, diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 66b8bef58ae4..8b07f6acb641 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -29,6 +29,7 @@ use crate::error::{ Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; +use crate::inactive_node_manager::InactiveNodeManager; use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT; use crate::service::mailbox::{Channel, MailboxReceiver}; @@ -64,6 +65,21 @@ impl ActivateRegion { input: instruction.to_string(), })?; + // Ensure that metasrv will renew the lease for this candidate node. + // + // This operation may not be redundant, imagine the following scenario: + // This candidate once had the current region, and because it did not respond to the `close` + // command in time, it was considered an inactive node by metasrv, then it replied, and the + // current region failed over again, and the node was selected as a candidate, so it needs + // to clear its previous state first. + let candidate = RegionIdent { + datanode_id: self.candidate.id, + ..failed_region.clone() + }; + InactiveNodeManager::new(&ctx.in_memory) + .deregister_inactive_region(&candidate) + .await?; + let ch = Channel::Datanode(self.candidate.id); ctx.mailbox.send(&ch, msg, timeout).await } diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 15ea43625582..c6c1dc23a77e 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -30,6 +30,7 @@ use crate::error::{ }; use crate::handler::region_lease_handler::REGION_LEASE_SECONDS; use crate::handler::HeartbeatMailbox; +use crate::inactive_node_manager::InactiveNodeManager; use crate::procedure::region_failover::CLOSE_REGION_MESSAGE_TIMEOUT; use crate::service::mailbox::{Channel, MailboxReceiver}; @@ -66,12 +67,17 @@ impl DeactivateRegion { input: instruction.to_string(), })?; + InactiveNodeManager::new(&ctx.in_memory) + .register_inactive_region(failed_region) + .await?; + let ch = Channel::Datanode(failed_region.datanode_id); ctx.mailbox.send(&ch, msg, timeout).await } async fn handle_response( self, + ctx: &RegionFailoverContext, mailbox_receiver: MailboxReceiver, failed_region: &RegionIdent, ) -> Result> { @@ -87,6 +93,10 @@ impl DeactivateRegion { }.fail(); }; if result { + InactiveNodeManager::new(&ctx.in_memory) + .deregister_inactive_region(failed_region) + .await?; + Ok(Box::new(ActivateRegion::new(self.candidate))) } else { // Under rare circumstances would a Datanode fail to close a Region. 
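The ordering in the deactivation path above is deliberate: `DeactivateRegion` registers the failed region as inactive before the close instruction is sent, and the entry is removed only after the Datanode confirms the close; `ActivateRegion` likewise clears a possibly stale entry for the candidate before asking it to open the region. Below is a minimal sketch of that bookkeeping with the KV store modelled as an in-memory set; the names and signatures are simplified assumptions, not the real `InactiveNodeManager` API backed by `ctx.in_memory`.

```rust
// Inactive-region bookkeeping around the close/open instructions,
// with the resettable KV store replaced by a plain HashSet of keys.
use std::collections::HashSet;

// (cluster_id, datanode_id, table_id, region_number), mirroring InactiveNodeKey.
type InactiveKey = (u64, u64, u32, u32);

#[derive(Default)]
struct InactiveRegions {
    keys: HashSet<InactiveKey>,
}

impl InactiveRegions {
    // Called before sending the close instruction, so the region's lease stops
    // being renewed even if the Datanode never replies.
    fn register(&mut self, key: InactiveKey) {
        let _ = self.keys.insert(key);
    }

    // Called once the Datanode confirms the close, and for the candidate node,
    // clearing any stale entry left over from a previous failover round.
    fn deregister(&mut self, key: InactiveKey) {
        let _ = self.keys.remove(&key);
    }

    // The filter the lease path relies on: drop region numbers that were
    // marked inactive so their leases are not extended.
    fn retain_active(&self, cluster: u64, node: u64, table: u32, regions: &mut Vec<u32>) {
        regions.retain(|r| !self.keys.contains(&(cluster, node, table, *r)));
    }
}

fn main() {
    let mut inactive = InactiveRegions::default();

    // Region 1 on datanode 1 fails over: mark it inactive before closing it.
    inactive.register((0, 1, 42, 1));

    let mut leased = vec![1, 2, 3];
    inactive.retain_active(0, 1, 42, &mut leased);
    assert_eq!(leased, vec![2, 3]); // region 1's lease is withheld

    // After the Datanode confirms the close (or the candidate re-opens it),
    // the entry is removed and leases behave normally again.
    inactive.deregister((0, 1, 42, 1));
}
```

The same filtering is what `RegionLeaseHandler` applies on the heartbeat path, which is why an inactive region's lease stays withheld until the entry is deregistered.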
@@ -138,7 +148,8 @@ impl State for DeactivateRegion { Err(e) => return Err(e), }; - self.handle_response(mailbox_receiver, failed_region).await + self.handle_response(ctx, mailbox_receiver, failed_region) + .await } } @@ -207,7 +218,7 @@ mod tests { .unwrap(); let next_state = state - .handle_response(mailbox_receiver, &failed_region) + .handle_response(&env.context, mailbox_receiver, &failed_region) .await .unwrap(); assert_eq!( @@ -251,7 +262,7 @@ mod tests { ); let next_state = state - .handle_response(mailbox_receiver, &failed_region) + .handle_response(&env.context, mailbox_receiver, &failed_region) .await .unwrap(); // Timeout or not, proceed to `ActivateRegion`. diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 0e420e1e4746..4045d5a898ef 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -13,6 +13,7 @@ // limitations under the License. use api::v1::meta::Peer; +use common_meta::ident::TableIdent; use common_telemetry::warn; use crate::error::Result; @@ -102,9 +103,13 @@ fn contains_table( if let Some(latest) = may_latest { for RegionStat { - catalog, - schema, - table, + table_ident: + TableIdent { + catalog, + schema, + table, + .. + }, .. } in latest.region_stats.iter() { @@ -121,6 +126,8 @@ fn contains_table( #[cfg(test)] mod tests { + use common_meta::ident::TableIdent; + use crate::handler::node_stat::{RegionStat, Stat}; use crate::keys::StatValue; use crate::selector::load_based::contains_table; @@ -135,21 +142,30 @@ mod tests { Stat { region_stats: vec![ RegionStat { - catalog: "greptime_1".to_string(), - schema: "public_1".to_string(), - table: "demo_1".to_string(), + table_ident: TableIdent { + catalog: "greptime_1".to_string(), + schema: "public_1".to_string(), + table: "demo_1".to_string(), + ..Default::default() + }, ..Default::default() }, RegionStat { - catalog: "greptime_2".to_string(), - schema: "public_2".to_string(), - table: "demo_2".to_string(), + table_ident: TableIdent { + catalog: "greptime_2".to_string(), + schema: "public_2".to_string(), + table: "demo_2".to_string(), + ..Default::default() + }, ..Default::default() }, RegionStat { - catalog: "greptime_3".to_string(), - schema: "public_3".to_string(), - table: "demo_3".to_string(), + table_ident: TableIdent { + catalog: "greptime_3".to_string(), + schema: "public_3".to_string(), + table: "demo_3".to_string(), + ..Default::default() + }, ..Default::default() }, ], @@ -158,21 +174,30 @@ mod tests { Stat { region_stats: vec![ RegionStat { - catalog: "greptime_1".to_string(), - schema: "public_1".to_string(), - table: "demo_1".to_string(), + table_ident: TableIdent { + catalog: "greptime_1".to_string(), + schema: "public_1".to_string(), + table: "demo_1".to_string(), + ..Default::default() + }, ..Default::default() }, RegionStat { - catalog: "greptime_2".to_string(), - schema: "public_2".to_string(), - table: "demo_2".to_string(), + table_ident: TableIdent { + catalog: "greptime_2".to_string(), + schema: "public_2".to_string(), + table: "demo_2".to_string(), + ..Default::default() + }, ..Default::default() }, RegionStat { - catalog: "greptime_3".to_string(), - schema: "public_3".to_string(), - table: "demo_3".to_string(), + table_ident: TableIdent { + catalog: "greptime_3".to_string(), + schema: "public_3".to_string(), + table: "demo_3".to_string(), + ..Default::default() + }, ..Default::default() }, ], @@ -181,21 +206,30 @@ mod tests { Stat { region_stats: vec![ RegionStat { - catalog: "greptime_1".to_string(), 
- schema: "public_1".to_string(), - table: "demo_1".to_string(), + table_ident: TableIdent { + catalog: "greptime_1".to_string(), + schema: "public_1".to_string(), + table: "demo_1".to_string(), + ..Default::default() + }, ..Default::default() }, RegionStat { - catalog: "greptime_2".to_string(), - schema: "public_2".to_string(), - table: "demo_2".to_string(), + table_ident: TableIdent { + catalog: "greptime_2".to_string(), + schema: "public_2".to_string(), + table: "demo_2".to_string(), + ..Default::default() + }, ..Default::default() }, RegionStat { - catalog: "greptime_4".to_string(), - schema: "public_4".to_string(), - table: "demo_4".to_string(), + table_ident: TableIdent { + catalog: "greptime_4".to_string(), + schema: "public_4".to_string(), + table: "demo_4".to_string(), + ..Default::default() + }, ..Default::default() }, ], diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs index 8dbd0771d245..4718ece71edb 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/meta-srv/src/sequence.rs @@ -213,6 +213,10 @@ mod tests { "Noop" } + fn as_any(&self) -> &dyn Any { + self + } + async fn range(&self, _: RangeRequest) -> Result { unreachable!() } @@ -240,16 +244,12 @@ mod tests { unreachable!() } - async fn move_value(&self, _: MoveValueRequest) -> Result { - unreachable!() - } - async fn batch_delete(&self, _: BatchDeleteRequest) -> Result { unreachable!() } - fn as_any(&self) -> &dyn Any { - self + async fn move_value(&self, _: MoveValueRequest) -> Result { + unreachable!() } } diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs index 79ecf36992c9..080bd18f0782 100644 --- a/src/meta-srv/src/service/store/cached_kv.rs +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -120,6 +120,10 @@ impl KvBackend for LeaderCachedKvStore { &self.name } + fn as_any(&self) -> &dyn Any { + self + } + async fn range(&self, req: RangeRequest) -> Result { if !self.is_leader() { return self.store.range(req).await; @@ -173,6 +177,24 @@ impl KvBackend for LeaderCachedKvStore { Ok(res) } + async fn batch_put(&self, req: BatchPutRequest) -> Result { + if !self.is_leader() { + return self.store.batch_put(req).await; + } + + let ver = self.create_new_version(); + + let res = self.store.batch_put(req.clone()).await?; + let _ = self.cache.batch_put(req.clone()).await?; + + if !self.validate_version(ver) { + let keys = req.kvs.into_iter().map(|kv| kv.key).collect::>(); + self.invalid_keys(keys).await?; + } + + Ok(res) + } + async fn batch_get(&self, req: BatchGetRequest) -> Result { if !self.is_leader() { return self.store.batch_get(req).await; @@ -220,36 +242,6 @@ impl KvBackend for LeaderCachedKvStore { Ok(merged_res) } - async fn batch_put(&self, req: BatchPutRequest) -> Result { - if !self.is_leader() { - return self.store.batch_put(req).await; - } - - let ver = self.create_new_version(); - - let res = self.store.batch_put(req.clone()).await?; - let _ = self.cache.batch_put(req.clone()).await?; - - if !self.validate_version(ver) { - let keys = req.kvs.into_iter().map(|kv| kv.key).collect::>(); - self.invalid_keys(keys).await?; - } - - Ok(res) - } - - async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { - if !self.is_leader() { - return self.store.batch_delete(req).await; - } - - let _ = self.create_new_version(); - - let res = self.store.batch_delete(req.clone()).await?; - let _ = self.cache.batch_delete(req).await?; - Ok(res) - } - async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { if !self.is_leader() { return 
self.store.compare_and_put(req).await; @@ -279,6 +271,18 @@ impl KvBackend for LeaderCachedKvStore { Ok(res) } + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + if !self.is_leader() { + return self.store.batch_delete(req).await; + } + + let _ = self.create_new_version(); + + let res = self.store.batch_delete(req.clone()).await?; + let _ = self.cache.batch_delete(req).await?; + Ok(res) + } + async fn move_value(&self, req: MoveValueRequest) -> Result { if !self.is_leader() { return self.store.move_value(req).await; @@ -297,10 +301,6 @@ impl KvBackend for LeaderCachedKvStore { self.invalid_keys(vec![from_key, to_key]).await?; Ok(res) } - - fn as_any(&self) -> &dyn Any { - self - } } #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 50c18fc9be30..21aa4cf7022f 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -97,6 +97,10 @@ impl KvBackend for EtcdStore { "Etcd" } + fn as_any(&self) -> &dyn Any { + self + } + async fn range(&self, req: RangeRequest) -> Result { let Get { key, options } = req.try_into()?; @@ -137,31 +141,6 @@ impl KvBackend for EtcdStore { Ok(PutResponse { prev_kv }) } - async fn batch_get(&self, req: BatchGetRequest) -> Result { - let BatchGet { keys, options } = req.try_into()?; - - let get_ops: Vec<_> = keys - .into_iter() - .map(|k| TxnOp::get(k, options.clone())) - .collect(); - - let txn_responses = self.do_multi_txn(get_ops).await?; - - let mut kvs = vec![]; - for txn_res in txn_responses { - for op_res in txn_res.op_responses() { - let get_res = match op_res { - TxnOpResponse::Get(get_res) => get_res, - _ => unreachable!(), - }; - - kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv)); - } - } - - Ok(BatchGetResponse { kvs }) - } - async fn batch_put(&self, req: BatchPutRequest) -> Result { let BatchPut { kvs, options } = req.try_into()?; @@ -189,32 +168,29 @@ impl KvBackend for EtcdStore { Ok(BatchPutResponse { prev_kvs }) } - async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { - let BatchDelete { keys, options } = req.try_into()?; - - let mut prev_kvs = Vec::with_capacity(keys.len()); + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let BatchGet { keys, options } = req.try_into()?; - let delete_ops = keys + let get_ops: Vec<_> = keys .into_iter() - .map(|k| TxnOp::delete(k, options.clone())) - .collect::>(); + .map(|k| TxnOp::get(k, options.clone())) + .collect(); - let txn_responses = self.do_multi_txn(delete_ops).await?; + let txn_responses = self.do_multi_txn(get_ops).await?; + let mut kvs = vec![]; for txn_res in txn_responses { for op_res in txn_res.op_responses() { - match op_res { - TxnOpResponse::Delete(delete_res) => { - delete_res.prev_kvs().iter().for_each(|kv| { - prev_kvs.push(KvPair::from_etcd_kv(kv)); - }); - } + let get_res = match op_res { + TxnOpResponse::Get(get_res) => get_res, _ => unreachable!(), - } + }; + + kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv)); } } - Ok(BatchDeleteResponse { prev_kvs }) + Ok(BatchGetResponse { kvs }) } async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { @@ -286,6 +262,34 @@ impl KvBackend for EtcdStore { }) } + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + let BatchDelete { keys, options } = req.try_into()?; + + let mut prev_kvs = Vec::with_capacity(keys.len()); + + let delete_ops = keys + .into_iter() + .map(|k| TxnOp::delete(k, options.clone())) + .collect::>(); + + let txn_responses = 
self.do_multi_txn(delete_ops).await?; + + for txn_res in txn_responses { + for op_res in txn_res.op_responses() { + match op_res { + TxnOpResponse::Delete(delete_res) => { + delete_res.prev_kvs().iter().for_each(|kv| { + prev_kvs.push(KvPair::from_etcd_kv(kv)); + }); + } + _ => unreachable!(), + } + } + } + + Ok(BatchDeleteResponse { prev_kvs }) + } + async fn move_value(&self, req: MoveValueRequest) -> Result { let MoveValue { from_key, @@ -358,10 +362,6 @@ impl KvBackend for EtcdStore { } .fail() } - - fn as_any(&self) -> &dyn Any { - self - } } #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index 28b848574028..ebe3b27e0e28 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -65,6 +65,10 @@ impl KvBackend for KvBackendAdapter { self.0.name() } + fn as_any(&self) -> &dyn Any { + self.0.as_any() + } + async fn range(&self, req: RangeRequest) -> Result { self.0 .range(req) @@ -89,6 +93,14 @@ impl KvBackend for KvBackendAdapter { .context(MetaSrvSnafu) } + async fn batch_get(&self, req: BatchGetRequest) -> Result { + self.0 + .batch_get(req) + .await + .map_err(BoxedError::new) + .context(MetaSrvSnafu) + } + async fn compare_and_put( &self, req: CompareAndPutRequest, @@ -122,14 +134,6 @@ impl KvBackend for KvBackendAdapter { .context(MetaSrvSnafu) } - async fn batch_get(&self, req: BatchGetRequest) -> Result { - self.0 - .batch_get(req) - .await - .map_err(BoxedError::new) - .context(MetaSrvSnafu) - } - async fn move_value(&self, req: MoveValueRequest) -> Result { self.0 .move_value(req) @@ -137,8 +141,4 @@ impl KvBackend for KvBackendAdapter { .map_err(BoxedError::new) .context(MetaSrvSnafu) } - - fn as_any(&self) -> &dyn Any { - self.0.as_any() - } } diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 2af0b48023da..e040fc28bff3 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -41,7 +41,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { let in_memory = Arc::new(MemStore::new()); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) - .in_memory(in_memory) + .in_memory(in_memory.clone()) .build() .map(Arc::new) // Safety: all required fields set at initialization @@ -59,6 +59,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { }; Arc::new(RegionFailoverManager::new( + in_memory, mailbox, procedure_manager, selector, diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index f461c20d5a9c..e904f75274ba 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -349,6 +349,7 @@ async fn run_region_failover_procedure( let procedure = RegionFailoverProcedure::new( failed_region.clone(), RegionFailoverContext { + in_memory: meta_srv.in_memory().clone(), mailbox: meta_srv.mailbox().clone(), selector, selector_ctx: SelectorContext {