Skip to content

Commit

Permalink
feat: add random weighted choose in load_based selector (#2234)
Browse files Browse the repository at this point in the history
* feat: add random weigted choose in load_based selector

* fix: meta cannot save heartbeats when cluster have no region

* chore: print some log

* chore: remove unused code

* cr

* add some logs when filter result is empty
  • Loading branch information
fengys1996 authored Nov 20, 2023
1 parent 01867ad commit da68d8c
Show file tree
Hide file tree
Showing 21 changed files with 913 additions and 96 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl MetaClient {
mod tests {
use api::v1::meta::{HeartbeatRequest, Peer};
use meta_srv::metasrv::SelectorContext;
use meta_srv::selector::{Namespace, Selector};
use meta_srv::selector::{Namespace, Selector, SelectorOptions};
use meta_srv::Result as MetaResult;

use super::*;
Expand Down Expand Up @@ -547,7 +547,12 @@ mod tests {
type Context = SelectorContext;
type Output = Vec<Peer>;

async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult<Self::Output> {
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> MetaResult<Self::Output> {
Ok(vec![
Peer {
id: 0,
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ futures.workspace = true
h2 = "0.3"
http-body = "0.4"
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
parking_lot = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result<M
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;

let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef,
SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
};

Expand Down
19 changes: 16 additions & 3 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_macro::stack_trace_debug;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::JoinError;
use rand::distributions::WeightedError;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -114,13 +115,13 @@ pub enum Error {
},

#[snafu(display(
"Failed to request Datanode, expected: {}, but only {} available",
expected,
"Failed to request Datanode, required: {}, but only {} available",
required,
available
))]
NoEnoughAvailableDatanode {
location: Location,
expected: usize,
required: usize,
available: usize,
},

Expand Down Expand Up @@ -562,6 +563,16 @@ pub enum Error {
operation: String,
location: Location,
},

#[snafu(display("Failed to set weight array"))]
WeightArray {
#[snafu(source)]
error: WeightedError,
location: Location,
},

#[snafu(display("Weight array is not set"))]
NotSetWeightArray { location: Location },
}

impl Error {
Expand Down Expand Up @@ -611,6 +622,8 @@ impl ErrorExt for Error {
| Error::NoEnoughAvailableDatanode { .. }
| Error::PublishMessage { .. }
| Error::Join { .. }
| Error::WeightArray { .. }
| Error::NotSetWeightArray { .. }
| Error::Unsupported { .. } => StatusCode::Internal,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Error::EmptyKey { .. }
Expand Down
5 changes: 1 addition & 4 deletions src/meta-srv/src/handler/collect_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ impl HeartbeatHandler for CollectStatsHandler {

match Stat::try_from(req.clone()) {
Ok(stat) => {
// If stat is empty, it means the request is a mailbox response
if !stat.is_empty() {
let _ = acc.stat.insert(stat);
}
let _ = acc.stat.insert(stat);
}
Err(err) => {
warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err);
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/mailbox_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl HeartbeatHandler for MailboxHandler {
) -> Result<()> {
if let Some(message) = &req.mailbox_message {
ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?;
ctx.set_skip_all();
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use store_api::storage::RegionId;
use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: u64,
Expand All @@ -42,7 +42,7 @@ pub struct Stat {
pub node_epoch: u64,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
Expand Down
33 changes: 32 additions & 1 deletion src/meta-srv/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl TryFrom<Vec<u8>> for StatKey {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StatValue {
pub stats: Vec<Stat>,
Expand All @@ -198,6 +198,11 @@ impl StatValue {
pub fn region_num(&self) -> Option<u64> {
self.stats.last().map(|x| x.region_num)
}

/// Get the latest node addr.
pub fn node_addr(&self) -> Option<String> {
self.stats.last().map(|x| x.addr.clone())
}
}

impl TryFrom<StatValue> for Vec<u8> {
Expand Down Expand Up @@ -365,6 +370,32 @@ mod tests {
assert_eq!(new_value, value);
}

#[test]
fn test_get_addr_from_stat_val() {
let empty = StatValue { stats: vec![] };
let addr = empty.node_addr();
assert!(addr.is_none());

let stat_val = StatValue {
stats: vec![
Stat {
addr: "1".to_string(),
..Default::default()
},
Stat {
addr: "2".to_string(),
..Default::default()
},
Stat {
addr: "3".to_string(),
..Default::default()
},
],
};
let addr = stat_val.node_addr().unwrap();
assert_eq!("3", addr);
}

#[test]
fn test_get_region_num_from_stat_val() {
let empty = StatValue { stats: vec![] };
Expand Down
16 changes: 13 additions & 3 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::lock::memory::MemLock;
use crate::selector::{Namespace, Selector};
use crate::selector::{Namespace, Selector, SelectorOptions};
use crate::service::mailbox::Channel;
use crate::test_util;

Expand All @@ -413,7 +413,12 @@ mod tests {
type Context = SelectorContext;
type Output = Vec<Peer>;

async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> Result<Self::Output> {
let mut rng = rand::thread_rng();
let mut nodes = self.nodes.clone();
nodes.shuffle(&mut rng);
Expand Down Expand Up @@ -711,7 +716,12 @@ mod tests {
type Context = SelectorContext;
type Output = Vec<Peer>;

async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> Result<Self::Output> {
let mut peers = self.peers.lock().unwrap();
Ok(if let Some(Some(peer)) = peers.pop() {
vec![peer]
Expand Down
4 changes: 3 additions & 1 deletion src/meta-srv/src/procedure/region_failover/failover_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use snafu::ensure;
use super::deactivate_region::DeactivateRegion;
use super::{RegionFailoverContext, State};
use crate::error::{RegionFailoverCandidatesNotFoundSnafu, Result, RetryLaterSnafu};
use crate::selector::SelectorOptions;

#[derive(Serialize, Deserialize, Debug)]
pub(super) struct RegionFailoverStart {
Expand All @@ -50,9 +51,10 @@ impl RegionFailoverStart {
selector_ctx.table_id = Some(failed_region.table_id);

let cluster_id = failed_region.cluster_id;
let opts = SelectorOptions::default();
let candidates = ctx
.selector
.select(cluster_id, &selector_ctx)
.select(cluster_id, &selector_ctx, opts)
.await?
.iter()
.filter_map(|p| {
Expand Down
27 changes: 26 additions & 1 deletion src/meta-srv/src/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;
pub mod lease_based;
pub mod load_based;
mod weight_compute;
mod weighted_choose;

use serde::{Deserialize, Serialize};

Expand All @@ -27,7 +30,29 @@ pub trait Selector: Send + Sync {
type Context;
type Output;

async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output>;
async fn select(
&self,
ns: Namespace,
ctx: &Self::Context,
opts: SelectorOptions,
) -> Result<Self::Output>;
}

#[derive(Debug)]
pub struct SelectorOptions {
/// Minimum number of selected results.
pub min_required_items: usize,
/// Whether duplicates are allowed in the selected result, default false.
pub allow_duplication: bool,
}

impl Default for SelectorOptions {
fn default() -> Self {
Self {
min_required_items: 1,
allow_duplication: false,
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
Expand Down
Loading

0 comments on commit da68d8c

Please sign in to comment.