Skip to content

Commit

Permalink
feat: metasrvs nodeinfo (#3873)
Browse files Browse the repository at this point in the history
* feat: get metasrv nodeinfo

* fix: sqlness test

* chore: by comment

* feat: proto version
  • Loading branch information
fengjiachun authored May 7, 2024
1 parent 65f80af commit 6e1cc1d
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 69 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a191edaea1089362a86ebc7d8e98ee9a1bd522d1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ impl InformationSchemaClusterInfoBuilder {
return;
}

if peer_type == "FRONTEND" {
// Always set peer_id to be -1 for frontends
if peer_type == "FRONTEND" || peer_type == "METASRV" {
// Always set peer_id to be -1 for frontends and metasrvs
self.peer_ids.push(Some(-1));
} else {
self.peer_ids.push(Some(node_info.peer.id as i64));
Expand Down
20 changes: 8 additions & 12 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,25 @@ impl ClusterInfo for MetaClient {

let mut nodes = if get_metasrv_nodes {
let last_activity_ts = -1; // Metasrv does not provide this information.

// TODO(dennis): Get Metasrv node info
let git_commit = "unknown";
let version = "unknown";
let start_time_ms = 0;

let (leader, followers) = cluster_client.get_metasrv_peers().await?;
followers
.into_iter()
.map(|peer| NodeInfo {
peer,
.map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: version.to_string(),
git_commit: git_commit.to_string(),
version: node.version,
git_commit: node.git_commit,
start_time_ms,
})
.chain(leader.into_iter().map(|leader| NodeInfo {
peer: leader,
.chain(leader.into_iter().map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: version.to_string(),
git_commit: git_commit.to_string(),
version: node.version,
git_commit: node.git_commit,
start_time_ms,
}))
.collect::<Vec<_>>()
Expand Down
15 changes: 6 additions & 9 deletions src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use std::sync::Arc;

use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{MetasrvPeersRequest, ResponseHeader, Role};
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::Peer;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -72,7 +71,9 @@ impl Client {
inner.batch_get(req).await
}

pub async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
pub async fn get_metasrv_peers(
&self,
) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
let inner = self.inner.read().await;
inner.get_metasrv_peers().await
}
Expand Down Expand Up @@ -225,7 +226,7 @@ impl Inner {
.context(ConvertMetaResponseSnafu)
}

async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> {
self.with_retry(
"get_metasrv_peers",
move |mut client| {
Expand All @@ -241,10 +242,6 @@ impl Inner {
|res| &res.header,
)
.await
.map(|res| {
let leader = res.leader.map(|x| x.into());
let peers = res.followers.into_iter().map(|x| x.into()).collect();
(leader, peers)
})
.map(|res| (res.leader, res.followers))
}
}
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ common-procedure.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
use crate::metasrv::LeaderValue;
use crate::metasrv::MetasrvNodeInfo;

pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
Expand Down Expand Up @@ -71,7 +71,7 @@ pub trait Election: Send + Sync {
async fn register_candidate(&self) -> Result<()>;

/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<LeaderValue>>;
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;

/// Campaign waits to acquire leadership in an election.
///
Expand Down
32 changes: 27 additions & 5 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tokio::sync::broadcast::Receiver;
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

pub struct EtcdElection {
leader_value: String,
Expand Down Expand Up @@ -142,9 +142,19 @@ impl Election for EtcdElection {
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();

// The register info: key is the candidate key, value is its leader value.
// The register info: key is the candidate key, value is its node info (addr, version, git_commit).
let key = self.candidate_key().into_bytes();
let value = self.leader_value.clone().into_bytes();
let build_info = common_version::build_info();
let value = MetasrvNodeInfo {
addr: self.leader_value.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
};
let value = serde_json::to_string(&value)
.with_context(|_| error::SerializeToJsonSnafu {
input: format!("{value:?}"),
})?
.into_bytes();
// Puts with the lease id
self.client
.kv_client()
Expand Down Expand Up @@ -175,15 +185,27 @@ impl Election for EtcdElection {
Ok(())
}

async fn all_candidates(&self) -> Result<Vec<LeaderValue>> {
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key = self.candidate_root().into_bytes();
let res = self
.client
.kv_client()
.get(key, Some(GetOptions::new().with_prefix()))
.await
.context(error::EtcdFailedSnafu)?;
res.kvs().iter().map(|kv| Ok(kv.value().into())).collect()

let mut nodes = Vec::with_capacity(res.kvs().len());
for kv in res.kvs() {
let node =
serde_json::from_slice::<MetasrvNodeInfo>(kv.value()).with_context(|_| {
error::DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(kv.value()),
}
})?;
nodes.push(node);
}

Ok(nodes)
}

async fn campaign(&self) -> Result<()> {
Expand Down
24 changes: 24 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl Context {
}
}

/// The value of the leader. It is used to store the leader's address.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
Expand All @@ -216,6 +217,29 @@ impl<T: AsRef<[u8]>> From<T> for LeaderValue {
}
}

/// Information describing a single metasrv node: its address together with
/// the version and git commit of the build it is running.
///
/// Derives `Serialize`/`Deserialize`, so the field names are part of the
/// serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
/// The metasrv's address.
pub addr: String,
/// The node build version.
pub version: String,
/// The node build git commit hash.
pub git_commit: String,
}

impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
fn from(node_info: MetasrvNodeInfo) -> Self {
Self {
peer: Some(api::v1::meta::Peer {
addr: node_info.addr,
..Default::default()
}),
version: node_info.version,
git_commit: node_info.git_commit,
}
}
}

#[derive(Clone)]
pub struct SelectorContext {
pub server_addr: String,
Expand Down
46 changes: 22 additions & 24 deletions src/meta-srv/src/service/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

use api::v1::meta::{
cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
Error, MetasrvPeersRequest, MetasrvPeersResponse, Peer, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse, ResponseHeader,
Error, MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse,
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_telemetry::warn;
use snafu::ResultExt;
use tonic::{Request, Response};

use crate::error;
use crate::metasrv::Metasrv;
use crate::service::GrpcResult;
use crate::{error, metasrv};

#[async_trait::async_trait]
impl cluster_server::Cluster for Metasrv {
Expand Down Expand Up @@ -88,40 +88,38 @@ impl cluster_server::Cluster for Metasrv {
return Ok(Response::new(resp));
}

/// Builds the protobuf node info for `addr`, filling in the version and
/// short commit hash reported by this process's own build info.
///
/// Always returns `Some`; the `Option` wrapper matches the type of the
/// response's `leader` field.
fn make_node_info(addr: String) -> Option<MetasrvNodeInfo> {
    let build = common_version::build_info();
    let local_info = metasrv::MetasrvNodeInfo {
        addr,
        version: build.version.to_string(),
        git_commit: build.commit_short.to_string(),
    };
    Some(local_info.into())
}

let (leader, followers) = match self.election() {
Some(election) => {
let leader = election.leader().await?;
let peers = election.all_candidates().await?;
let followers = peers
let nodes = election.all_candidates().await?;
let followers = nodes
.into_iter()
.filter(|peer| peer.0 != leader.0)
.map(|peer| Peer {
addr: peer.0,
..Default::default()
})
.filter(|node_info| node_info.addr != leader.0)
.map(api::v1::meta::MetasrvNodeInfo::from)
.collect();
(
Some(Peer {
addr: leader.0,
..Default::default()
}),
followers,
)
(make_node_info(leader.0.clone()), followers)
}
None => (
Some(Peer {
addr: self.options().server_addr.clone(),
..Default::default()
}),
vec![],
),
None => (make_node_info(self.options().server_addr.clone()), vec![]),
};

let resp = MetasrvPeersResponse {
header: Some(ResponseHeader::success(0)),
leader,
followers,
};

Ok(Response::new(resp))
}
}
Expand Down
11 changes: 3 additions & 8 deletions tests/cases/distributed/information_schema/cluster_info.result
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,26 @@ DESC TABLE CLUSTER_INFO;
+-------------+----------------------+-----+------+---------+---------------+

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
Expand All @@ -51,18 +48,16 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration|+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||0|METASRV|127.0.0.1:3002|UNKNOWN|UNKNOWN||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE unknown UNKNOWN
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{23}) Start_time
Expand Down
Loading

0 comments on commit 6e1cc1d

Please sign in to comment.