feat: impl information_schema.cluster_info table
killme2008 committed Apr 29, 2024
1 parent c67f97a commit d5dfd03
Showing 16 changed files with 198 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

7 changes: 7 additions & 0 deletions src/catalog/src/error.rs
@@ -49,6 +49,12 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("Failed to list nodes in cluster: {source}"))]
ListNodes {
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to re-compile script due to internal error"))]
CompileScriptInternal {
location: Location,
@@ -294,6 +300,7 @@ impl ErrorExt for Error {
}

Error::ListCatalogs { source, .. }
| Error::ListNodes { source, .. }
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. } => source.status_code(),

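Editor's note: the new `ListNodes` variant mirrors the neighboring `ListCatalogs`/`ListSchemas` variants: it wraps a boxed source error and, in `ErrorExt::status_code`, simply forwards the source's status code. Below is a minimal, self-contained sketch of that delegation. The real code uses the `snafu` derive and GreptimeDB's `BoxedError`/`ErrorExt`/`StatusCode` types; the sketch hand-rolls simplified stand-ins so it compiles on its own.

use std::fmt;

/// Simplified stand-in for GreptimeDB's `StatusCode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatusCode {
    Internal,
}

/// Simplified stand-in for the `ErrorExt` trait: every error reports a status code.
trait ErrorExt: fmt::Display {
    fn status_code(&self) -> StatusCode;
}

/// Stands in for the boxed source error (`BoxedError` in the real code).
#[derive(Debug)]
struct MetaUnavailable;

impl fmt::Display for MetaUnavailable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "meta server is unavailable")
    }
}

impl ErrorExt for MetaUnavailable {
    fn status_code(&self) -> StatusCode {
        StatusCode::Internal
    }
}

/// Catalog-level error with the same shape as the new `ListNodes` variant.
#[derive(Debug)]
enum Error {
    ListNodes { source: MetaUnavailable },
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Same message shape as `#[snafu(display("Failed to list nodes in cluster: {source}"))]`.
            Error::ListNodes { source } => write!(f, "Failed to list nodes in cluster: {source}"),
        }
    }
}

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        match self {
            // Delegates to the source, exactly like the new match arm in the diff.
            Error::ListNodes { source, .. } => source.status_code(),
        }
    }
}

fn main() {
    let err = Error::ListNodes { source: MetaUnavailable };
    assert_eq!(err.status_code(), StatusCode::Internal);
    println!("{err}");
}
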
10 changes: 10 additions & 0 deletions src/catalog/src/information_schema.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod cluster_info;
pub mod columns;
pub mod key_column_usage;
mod memory_table;
@@ -23,6 +24,7 @@ pub mod schemata;
mod table_constraints;
mod table_names;
pub mod tables;
pub(crate) mod utils;

use std::collections::HashMap;
use std::sync::{Arc, Weak};
@@ -47,6 +49,7 @@ pub use table_names::*;

use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::partitions::InformationSchemaPartitions;
@@ -179,6 +182,10 @@ impl InformationSchemaProvider {
TABLE_CONSTRAINTS.to_string(),
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(
CLUSTER_INFO.to_string(),
self.build_table(CLUSTER_INFO).unwrap(),
);

// Add memory tables
for name in MEMORY_TABLES.iter() {
@@ -251,6 +258,9 @@ impl InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
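
Editor's note: registering the new table takes two touches in the provider: the eager `tables.insert(CLUSTER_INFO.to_string(), ...)` and a new arm in the `build_table` match. A minimal, self-contained sketch of that name-to-table dispatch follows; the types are simplified stand-ins, and the real `InformationSchemaClusterInfo` lives in the new `cluster_info` module, which is not among the files rendered on this page.

use std::collections::HashMap;
use std::sync::Arc;

const TABLE_CONSTRAINTS: &str = "table_constraints";
const CLUSTER_INFO: &str = "cluster_info";

/// Simplified stand-in for an information_schema virtual table.
trait InformationTable {
    fn name(&self) -> &str;
}

struct TableConstraintsTable;
impl InformationTable for TableConstraintsTable {
    fn name(&self) -> &str {
        TABLE_CONSTRAINTS
    }
}

struct ClusterInfoTable;
impl InformationTable for ClusterInfoTable {
    fn name(&self) -> &str {
        CLUSTER_INFO
    }
}

struct Provider {
    tables: HashMap<String, Arc<dyn InformationTable + Send + Sync>>,
}

impl Provider {
    fn new() -> Self {
        let mut tables = HashMap::new();
        // Mirrors the eager inserts in the diff, e.g.
        // `tables.insert(CLUSTER_INFO.to_string(), self.build_table(CLUSTER_INFO).unwrap())`.
        for name in [TABLE_CONSTRAINTS, CLUSTER_INFO] {
            tables.insert(name.to_string(), Self::build_table(name).unwrap());
        }
        Self { tables }
    }

    /// Mirrors the `build_table` match: one arm per table name constant.
    fn build_table(name: &str) -> Option<Arc<dyn InformationTable + Send + Sync>> {
        match name {
            TABLE_CONSTRAINTS => Some(Arc::new(TableConstraintsTable) as _),
            CLUSTER_INFO => Some(Arc::new(ClusterInfoTable) as _),
            _ => None,
        }
    }
}

fn main() {
    let provider = Provider::new();
    println!("registered: {}", provider.tables[CLUSTER_INFO].name());
}

Once registered under `CLUSTER_INFO`, the table becomes addressable in SQL as `information_schema.cluster_info`, alongside the existing virtual tables.
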
38 changes: 19 additions & 19 deletions src/catalog/src/information_schema/runtime_metrics.rs
@@ -45,8 +45,8 @@ pub(super) struct InformationSchemaMetrics {
const METRIC_NAME: &str = "metric_name";
const METRIC_VALUE: &str = "value";
const METRIC_LABELS: &str = "labels";
const NODE: &str = "node";
const NODE_TYPE: &str = "node_type";
const PEER_ADDR: &str = "peer_addr";
const PEER_TYPE: &str = "peer_type";
const TIMESTAMP: &str = "timestamp";

/// The `information_schema.runtime_metrics` virtual table.
@@ -63,8 +63,8 @@ impl InformationSchemaMetrics {
ColumnSchema::new(METRIC_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(METRIC_VALUE, ConcreteDataType::float64_datatype(), false),
ColumnSchema::new(METRIC_LABELS, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(NODE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(NODE_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
TIMESTAMP,
ConcreteDataType::timestamp_millisecond_datatype(),
@@ -119,8 +119,8 @@ struct InformationSchemaMetricsBuilder {
metric_names: StringVectorBuilder,
metric_values: Float64VectorBuilder,
metric_labels: StringVectorBuilder,
nodes: StringVectorBuilder,
node_types: StringVectorBuilder,
peer_addrs: StringVectorBuilder,
peer_types: StringVectorBuilder,
}

impl InformationSchemaMetricsBuilder {
@@ -130,8 +130,8 @@ impl InformationSchemaMetricsBuilder {
metric_names: StringVectorBuilder::with_capacity(42),
metric_values: Float64VectorBuilder::with_capacity(42),
metric_labels: StringVectorBuilder::with_capacity(42),
nodes: StringVectorBuilder::with_capacity(42),
node_types: StringVectorBuilder::with_capacity(42),
peer_addrs: StringVectorBuilder::with_capacity(42),
peer_types: StringVectorBuilder::with_capacity(42),
}
}

@@ -140,14 +140,14 @@ impl InformationSchemaMetricsBuilder {
metric_name: &str,
labels: String,
metric_value: f64,
node: Option<&str>,
node_type: &str,
peer: Option<&str>,
peer_type: &str,
) {
self.metric_names.push(Some(metric_name));
self.metric_values.push(Some(metric_value));
self.metric_labels.push(Some(&labels));
self.nodes.push(node);
self.node_types.push(Some(node_type));
self.peer_addrs.push(peer);
self.peer_types.push(Some(peer_type));
}

async fn make_metrics(&mut self, _request: Option<ScanRequest>) -> Result<RecordBatch> {
@@ -184,9 +184,9 @@ impl InformationSchemaMetricsBuilder {
.join(", "),
// Safety: always has a sample
ts.samples[0].value,
// TODO(dennis): fetching other nodes metrics
// TODO(dennis): fetching other peers' metrics

// The node column is always `None` for standalone
// The peer column is always `None` for standalone
None,
"standalone",
);
@@ -209,8 +209,8 @@ impl InformationSchemaMetricsBuilder {
Arc::new(self.metric_names.finish()),
Arc::new(self.metric_values.finish()),
Arc::new(self.metric_labels.finish()),
Arc::new(self.nodes.finish()),
Arc::new(self.node_types.finish()),
Arc::new(self.peer_addrs.finish()),
Arc::new(self.peer_types.finish()),
timestamps,
];

@@ -247,7 +247,7 @@ mod tests {

#[tokio::test]
async fn test_make_metrics() {
let metrics = InformationSchemaMetrics::new(Mode::Standalone, None);
let metrics = InformationSchemaMetrics::new();

let stream = metrics.to_stream(ScanRequest::default()).unwrap();

@@ -258,8 +258,8 @@
assert!(result_literal.contains(METRIC_NAME));
assert!(result_literal.contains(METRIC_VALUE));
assert!(result_literal.contains(METRIC_LABELS));
assert!(result_literal.contains(NODE));
assert!(result_literal.contains(NODE_TYPE));
assert!(result_literal.contains(PEER_ADDR));
assert!(result_literal.contains(PEER_TYPE));
assert!(result_literal.contains(TIMESTAMP));
}
}
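
Editor's note: the rename from `node`/`node_type` to `peer_addr`/`peer_type` threads through three places that must stay in sync: the column schema, the per-column builders, and the `add_metric` row-append helper. A minimal sketch of that column-builder shape follows, with plain vectors standing in for `StringVectorBuilder`/`Float64VectorBuilder` (simplified, not the real datatypes API).

/// Plain-vector stand-in for the per-column builders in the diff.
#[derive(Default)]
struct MetricsBuilder {
    metric_names: Vec<String>,
    metric_values: Vec<f64>,
    peer_addrs: Vec<Option<String>>,
    peer_types: Vec<String>,
}

impl MetricsBuilder {
    /// Mirrors `add_metric`: one push per column keeps the columns aligned row by row.
    fn add_metric(&mut self, metric_name: &str, metric_value: f64, peer: Option<&str>, peer_type: &str) {
        self.metric_names.push(metric_name.to_string());
        self.metric_values.push(metric_value);
        self.peer_addrs.push(peer.map(|p| p.to_string()));
        self.peer_types.push(peer_type.to_string());
    }

    /// Mirrors the finish step: equal-length columns ready to become a record batch.
    fn finish(self) -> (Vec<String>, Vec<f64>, Vec<Option<String>>, Vec<String>) {
        (self.metric_names, self.metric_values, self.peer_addrs, self.peer_types)
    }
}

fn main() {
    let mut builder = MetricsBuilder::default();
    // The peer address is `None` in standalone mode, as noted in the diff.
    builder.add_metric("process_cpu_seconds_total", 1.25, None, "standalone");
    let (names, values, peer_addrs, peer_types) = builder.finish();
    assert_eq!(names.len(), values.len());
    assert!(peer_addrs[0].is_none());
    assert_eq!(peer_types[0], "standalone");
}
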
1 change: 1 addition & 0 deletions src/catalog/src/information_schema/table_names.rs
@@ -42,3 +42,4 @@ pub const RUNTIME_METRICS: &str = "runtime_metrics";
pub const PARTITIONS: &str = "partitions";
pub const REGION_PEERS: &str = "greptime_region_peers";
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
pub const CLUSTER_INFO: &str = "cluster_info";
2 changes: 2 additions & 0 deletions src/common/catalog/src/consts.rs
@@ -88,6 +88,8 @@ pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28;
pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29;
/// id for information_schema.table_constraints
pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
/// id for information_schema.cluster_info
pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";
20 changes: 20 additions & 0 deletions src/common/meta/src/cluster.rs
@@ -86,6 +86,10 @@ pub struct NodeInfo {
pub last_activity_ts: i64,
/// The status of the node. Different roles have different node status.
pub status: NodeStatus,
/// The node build version.
pub version: String,
/// The node build git commit hash.
pub git_commit: String,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
@@ -100,6 +104,19 @@ pub enum NodeStatus {
Datanode(DatanodeStatus),
Frontend(FrontendStatus),
Metasrv(MetasrvStatus),
Standalone,
}

impl NodeStatus {
/// Returns the role name of the node status.
pub fn role_name(&self) -> &str {
match self {
NodeStatus::Datanode(_) => "DATANODE",
NodeStatus::Frontend(_) => "FRONTEND",
NodeStatus::Metasrv(_) => "METASRV",
NodeStatus::Standalone => "STANDALONE",
}
}
}

/// The status of a datanode.
@@ -271,6 +288,8 @@ mod tests {
leader_regions: 3,
follower_regions: 4,
}),
version: "".to_string(),
git_commit: "".to_string(),
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
@@ -287,6 +306,7 @@
leader_regions: 3,
follower_regions: 4,
}),
..
}
);
}
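
Editor's note: `NodeInfo` gains the build metadata that `cluster_info` will expose, and `NodeStatus::role_name` provides the role string for each variant. A self-contained sketch of how the two are meant to be used together; the status variants are reduced to unit variants and the field values in `main` are purely illustrative.

/// Simplified stand-in for the node status enum in the diff; the real variants
/// carry role-specific payloads such as `DatanodeStatus`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum NodeStatus {
    Datanode,
    Frontend,
    Metasrv,
    Standalone,
}

impl NodeStatus {
    /// Same mapping as the `role_name` method added in the diff.
    fn role_name(&self) -> &str {
        match self {
            NodeStatus::Datanode => "DATANODE",
            NodeStatus::Frontend => "FRONTEND",
            NodeStatus::Metasrv => "METASRV",
            NodeStatus::Standalone => "STANDALONE",
        }
    }
}

/// Simplified `NodeInfo` carrying the two new build-metadata fields.
#[derive(Debug)]
struct NodeInfo {
    last_activity_ts: i64,
    status: NodeStatus,
    version: String,
    git_commit: String,
}

fn main() {
    let node = NodeInfo {
        last_activity_ts: 1_714_348_800_000,
        status: NodeStatus::Standalone,
        version: "0.7.2".to_string(),      // illustrative value only
        git_commit: "d5dfd03".to_string(), // illustrative (this commit's short hash)
    };
    println!(
        "{} {} ({}) last active at {}",
        node.status.role_name(),
        node.version,
        node.git_commit,
        node.last_activity_ts
    );
}
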
7 changes: 7 additions & 0 deletions src/meta-client/src/client.rs
@@ -264,18 +264,25 @@ impl ClusterInfo for MetaClient {

let mut nodes = if get_metasrv_nodes {
let last_activity_ts = -1; // Metasrv does not provide this information.
// TODO(dennis): Get Metasrv build info
let git_commit = "unknown";
let version = "unknown";
let (leader, followers) = cluster_client.get_metasrv_peers().await?;
followers
.into_iter()
.map(|peer| NodeInfo {
peer,
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: version.to_string(),
git_commit: git_commit.to_string(),
})
.chain(leader.into_iter().map(|leader| NodeInfo {
peer: leader,
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: version.to_string(),
git_commit: git_commit.to_string(),
}))
.collect::<Vec<_>>()
} else {
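
Editor's note: for Metasrv peers the client cannot report build metadata yet (see the TODO), so `version` and `git_commit` are filled with "unknown" while the followers and the optional leader are chained into one node list. A simplified sketch of that iterator chaining with local stand-in types (not the real `Peer`/`NodeStatus`/`MetasrvStatus`).

#[derive(Debug)]
struct Peer {
    addr: String,
}

#[derive(Debug)]
struct NodeInfo {
    peer: Peer,
    is_leader: bool,
    version: String,
    git_commit: String,
}

/// Mirrors the shape of the diff: map the followers, then chain the (optional) leader.
fn collect_metasrv_nodes(leader: Option<Peer>, followers: Vec<Peer>) -> Vec<NodeInfo> {
    // As in the diff's TODO, Metasrv build info is not available yet,
    // so both fields are filled with "unknown".
    let version = "unknown";
    let git_commit = "unknown";

    followers
        .into_iter()
        .map(|peer| NodeInfo {
            peer,
            is_leader: false,
            version: version.to_string(),
            git_commit: git_commit.to_string(),
        })
        .chain(leader.into_iter().map(|peer| NodeInfo {
            peer,
            is_leader: true,
            version: version.to_string(),
            git_commit: git_commit.to_string(),
        }))
        .collect()
}

fn main() {
    let nodes = collect_metasrv_nodes(
        Some(Peer { addr: "127.0.0.1:3002".to_string() }),
        vec![Peer { addr: "127.0.0.1:3003".to_string() }],
    );
    for node in &nodes {
        println!(
            "{} leader={} version={} commit={}",
            node.peer.addr, node.is_leader, node.version, node.git_commit
        );
    }
    assert_eq!(nodes.len(), 2);
}
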
5 changes: 4 additions & 1 deletion src/meta-client/src/client/cluster.rs
@@ -146,13 +146,15 @@ impl Inner {
{
let ask_leader = self.ask_leader()?;
let mut times = 0;
let mut last_error = None;

while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
if util::is_not_leader(get_header(&res)) {
last_error = Some(format!("{leader} is not a leader"));
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("Cluster client updated to new leader addr: {leader}");
@@ -164,6 +166,7 @@
Err(status) => {
// The leader may be unreachable.
if util::is_unreachable(&status) {
last_error = Some(status.to_string());
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("Cluster client updated to new leader addr: {leader}");
@@ -180,7 +183,7 @@
}

RetryTimesExceededSnafu {
msg: "Failed to {task}",
msg: format!("Failed to {task}, last error: {:?}", last_error),
times: self.max_retry,
}
.fail()
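
Editor's note: the behavioral change in this retry loop is that the most recent failure is now remembered, so the final `RetryTimesExceeded` error can explain why the retries were exhausted; previously the message was the plain string literal "Failed to {task}", so the placeholder was never even interpolated. A simplified, synchronous sketch of the pattern (the real loop is async and also re-resolves the leader between attempts).

/// Simplified, synchronous sketch: remember the last error so the final
/// failure message can explain why the retries were exhausted.
fn retry_with_last_error<T>(
    max_retry: usize,
    mut attempt: impl FnMut() -> Result<T, String>,
) -> Result<T, String> {
    let mut last_error: Option<String> = None;
    let mut times = 0;

    while times < max_retry {
        match attempt() {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Mirrors `last_error = Some(status.to_string())` in the diff.
                last_error = Some(err);
                times += 1;
            }
        }
    }

    // Mirrors `format!("Failed to {task}, last error: {:?}", last_error)`.
    Err(format!(
        "retry times ({max_retry}) exceeded, last error: {last_error:?}"
    ))
}

fn main() {
    let result: Result<(), String> =
        retry_with_last_error(3, || Err("leader 127.0.0.1:3002 is unreachable".to_string()));
    println!("{}", result.unwrap_err());
}

The same last-error bookkeeping is applied to the procedure client in the next file.
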
5 changes: 4 additions & 1 deletion src/meta-client/src/client/procedure.rs
@@ -162,13 +162,15 @@ impl Inner {
{
let ask_leader = self.ask_leader()?;
let mut times = 0;
let mut last_error = None;

while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
if util::is_not_leader(get_header(&res)) {
last_error = Some(format!("{leader} is not a leader"));
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
@@ -180,6 +182,7 @@
Err(status) => {
// The leader may be unreachable.
if util::is_unreachable(&status) {
last_error = Some(status.to_string());
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("Procedure client updated to new leader addr: {leader}");
@@ -196,7 +199,7 @@
}

error::RetryTimesExceededSnafu {
msg: "Failed to {task}",
msg: format!("Failed to {task}, last error: {:?}", last_error),
times: self.max_retry,
}
.fail()
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
@@ -25,6 +25,7 @@ common-procedure.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
7 changes: 7 additions & 0 deletions src/meta-srv/src/handler/collect_cluster_info_handler.rs
@@ -44,10 +44,13 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
return Ok(HandleControl::Continue);
};

let build_info = common_version::build_info();
let value = NodeInfo {
peer,
last_activity_ts: common_time::util::current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: build_info.version.to_string(),
git_commit: build_info.commit.to_string(),
};

save_to_mem_store(key, value, ctx).await?;
@@ -86,6 +89,8 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
.count();
let follower_regions = stat.region_stats.len() - leader_regions;

let build_info = common_version::build_info();

let value = NodeInfo {
peer,
last_activity_ts: stat.timestamp_millis,
@@ -95,6 +100,8 @@
leader_regions,
follower_regions,
}),
version: build_info.version.to_string(),
git_commit: build_info.commit.to_string(),
};

save_to_mem_store(key, value, ctx).await?;
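
Editor's note: both heartbeat handlers now stamp the collected `NodeInfo` with the node's build metadata from `common_version::build_info()`, whose `version` and `commit` fields are what end up in `cluster_info`. A small sketch of that wiring with a stand-in build-info struct; the constants are illustrative, while the real values are generated at build time.

/// Stand-in for the value returned by `common_version::build_info()`.
struct BuildInfo {
    version: &'static str,
    commit: &'static str,
}

fn build_info() -> BuildInfo {
    // Illustrative constants; the real crate derives these from the build environment.
    BuildInfo {
        version: "0.7.2",
        commit: "d5dfd03",
    }
}

/// Stand-in for the per-node record saved into the in-memory store on heartbeat.
struct NodeInfo {
    last_activity_ts: i64,
    version: String,
    git_commit: String,
}

/// Mirrors the handlers: stamp the heartbeat time and copy the build metadata.
fn on_heartbeat(now_millis: i64) -> NodeInfo {
    let build_info = build_info();
    NodeInfo {
        last_activity_ts: now_millis,
        version: build_info.version.to_string(),
        git_commit: build_info.commit.to_string(),
    }
}

fn main() {
    let info = on_heartbeat(1_714_348_800_000);
    println!("{} @ {} (commit {})", info.version, info.last_activity_ts, info.git_commit);
}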
