diff --git a/Cargo.lock b/Cargo.lock index 656600d553e5..60dfd87f06ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3894,7 +3894,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=47af36e2b218bdb09fa3a84a31999245152db2ee#47af36e2b218bdb09fa3a84a31999245152db2ee" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e152fcbf173b8759dd8a91ce7f6f4b0ca987828e#e152fcbf173b8759dd8a91ce7f6f4b0ca987828e" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index 6579614ab3ca..526c6a46bf2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "47af36e2b218bdb09fa3a84a31999245152db2ee" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e152fcbf173b8759dd8a91ce7f6f4b0ca987828e" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 788778d49f18..c035a172e322 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -264,7 +264,6 @@ impl ClusterInfo for MetaClient { let mut nodes = if get_metasrv_nodes { let last_activity_ts = -1; // Metasrv does not provide this information. - let start_time_ms = 0; let (leader, followers) = cluster_client.get_metasrv_peers().await?; followers @@ -275,7 +274,7 @@ impl ClusterInfo for MetaClient { status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }), version: node.version, git_commit: node.git_commit, - start_time_ms, + start_time_ms: node.start_time_ms, }) .chain(leader.into_iter().map(|node| NodeInfo { peer: node.peer.map(|p| p.into()).unwrap_or_default(), @@ -283,7 +282,7 @@ impl ClusterInfo for MetaClient { status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }), version: node.version, git_commit: node.git_commit, - start_time_ms, + start_time_ms: node.start_time_ms, })) .collect::>() } else { diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 50cac6a27e77..d73f453a88f0 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -68,7 +68,7 @@ pub trait Election: Send + Sync { fn in_infancy(&self) -> bool; /// Registers a candidate for the election. - async fn register_candidate(&self) -> Result<()>; + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>; /// Gets all candidates in the election. async fn all_candidates(&self) -> Result>; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index e15e7cbc2767..1cc72ace2e20 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -131,7 +131,7 @@ impl Election for EtcdElection { .is_ok() } - async fn register_candidate(&self) -> Result<()> { + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { const CANDIDATE_LEASE_SECS: u64 = 600; const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; @@ -144,15 +144,9 @@ impl Election for EtcdElection { // The register info: key is the candidate key, value is its node info(addr, version, git_commit). let key = self.candidate_key().into_bytes(); - let build_info = common_version::build_info(); - let value = MetasrvNodeInfo { - addr: self.leader_value.clone(), - version: build_info.version.to_string(), - git_commit: build_info.commit_short.to_string(), - }; - let value = serde_json::to_string(&value) + let value = serde_json::to_string(node_info) .with_context(|_| error::SerializeToJsonSnafu { - input: format!("{value:?}"), + input: format!("{node_info:?}"), })? .into_bytes(); // Puts with the lease id diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 311ecd25382c..c34187832e56 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -225,6 +225,8 @@ pub struct MetasrvNodeInfo { pub version: String, // The node build git commit hash pub git_commit: String, + // The node start timestamp in milliseconds + pub start_time_ms: u64, } impl From for api::v1::meta::MetasrvNodeInfo { @@ -236,6 +238,7 @@ impl From for api::v1::meta::MetasrvNodeInfo { }), version: node_info.version, git_commit: node_info.git_commit, + start_time_ms: node_info.start_time_ms, } } } @@ -305,6 +308,7 @@ impl MetaStateHandler { pub struct Metasrv { state: StateRef, started: Arc, + start_time_ms: u64, options: MetasrvOptions, // It is only valid at the leader node and is used to temporarily // store some data that will not be persisted. @@ -339,7 +343,11 @@ impl Metasrv { return Ok(()); } - self.create_default_schema_if_not_exist().await?; + // Creates default schema if not exists + self.table_metadata_manager + .init() + .await + .context(InitMetadataSnafu)?; if let Some(election) = self.election() { let procedure_manager = self.procedure_manager.clone(); @@ -394,9 +402,10 @@ impl Metasrv { { let election = election.clone(); let started = self.started.clone(); + let node_info = self.node_info(); let _handle = common_runtime::spawn_bg(async move { while started.load(Ordering::Relaxed) { - let res = election.register_candidate().await; + let res = election.register_candidate(&node_info).await; if let Err(e) = res { warn!("Metasrv register candidate error: {}", e); } @@ -435,14 +444,8 @@ impl Metasrv { } info!("Metasrv started"); - Ok(()) - } - async fn create_default_schema_if_not_exist(&self) -> Result<()> { - self.table_metadata_manager - .init() - .await - .context(InitMetadataSnafu) + Ok(()) } pub async fn shutdown(&self) -> Result<()> { @@ -453,6 +456,20 @@ impl Metasrv { .context(StopProcedureManagerSnafu) } + pub fn start_time_ms(&self) -> u64 { + self.start_time_ms + } + + pub fn node_info(&self) -> MetasrvNodeInfo { + let build_info = common_version::build_info(); + MetasrvNodeInfo { + addr: self.options().server_addr.clone(), + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + start_time_ms: self.start_time_ms(), + } + } + /// Lookup a peer by peer_id, return it only when it's alive. pub(crate) async fn lookup_peer( &self, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index a2cd95e67b2c..5e082fa4aca5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -360,6 +360,7 @@ impl MetasrvBuilder { Ok(Metasrv { state, started, + start_time_ms: common_time::util::current_time_millis() as u64, options, in_memory, kv_backend, diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index afc0f608a6be..f5f5661b01d5 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -88,35 +88,23 @@ impl cluster_server::Cluster for Metasrv { return Ok(Response::new(resp)); } - fn make_node_info(addr: String) -> Option { - let build_info = common_version::build_info(); - Some( - metasrv::MetasrvNodeInfo { - addr, - version: build_info.version.to_string(), - git_commit: build_info.commit_short.to_string(), - } - .into(), - ) - } - + let leader_addr = &self.options().server_addr; let (leader, followers) = match self.election() { Some(election) => { - let leader = election.leader().await?; let nodes = election.all_candidates().await?; let followers = nodes .into_iter() - .filter(|node_info| node_info.addr != leader.0) + .filter(|node_info| &node_info.addr != leader_addr) .map(api::v1::meta::MetasrvNodeInfo::from) .collect(); - (make_node_info(leader.0.clone()), followers) + (self.node_info().into(), followers) } - None => (make_node_info(self.options().server_addr.clone()), vec![]), + None => (self.make_node_info(leader_addr), vec![]), }; let resp = MetasrvPeersResponse { header: Some(ResponseHeader::success(0)), - leader, + leader: Some(leader), followers, }; @@ -129,4 +117,15 @@ impl Metasrv { // Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader. self.election().map(|x| x.is_leader()).unwrap_or(true) } + + fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo { + let build_info = common_version::build_info(); + metasrv::MetasrvNodeInfo { + addr: addr.to_string(), + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + start_time_ms: self.start_time_ms(), + } + .into() + } } diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result index 6388ebbbf22d..e0817e60f3e7 100644 --- a/tests/cases/distributed/information_schema/cluster_info.result +++ b/tests/cases/distributed/information_schema/cluster_info.result @@ -25,7 +25,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version @@ -35,7 +35,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version @@ -55,7 +55,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version