From 61f02f0d70e6fef00715037f6db705dd2e13ecfe Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 29 Apr 2024 10:10:50 +0800 Subject: [PATCH] fix: forgot files --- .../src/information_schema/cluster_info.rs | 244 ++++++++++++++++++ src/catalog/src/information_schema/utils.rs | 53 ++++ 2 files changed, 297 insertions(+) create mode 100644 src/catalog/src/information_schema/cluster_info.rs create mode 100644 src/catalog/src/information_schema/utils.rs diff --git a/src/catalog/src/information_schema/cluster_info.rs b/src/catalog/src/information_schema/cluster_info.rs new file mode 100644 index 000000000000..9319226c5aae --- /dev/null +++ b/src/catalog/src/information_schema/cluster_info.rs @@ -0,0 +1,244 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus};
use common_meta::peer::Peer;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::logging::warn;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt64VectorBuilder};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use super::CLUSTER_INFO;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListNodesSnafu, Result};
use crate::information_schema::{utils, InformationTable, Predicates};
use crate::CatalogManager;

// Column names of the `information_schema.cluster_info` virtual table.
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
// TODO(dennis): adds `uptime`, `start_time` columns etc.

// Initial capacity for the per-column vector builders below.
const INIT_CAPACITY: usize = 42;

/// The `CLUSTER_INFO` table provides information about the current topology information of the cluster.
///
/// - `peer_id`: the peer server id.
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
/// - `peer_addr`: the peer gRPC address.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
///
pub(super) struct InformationSchemaClusterInfo {
    /// The fixed five-column schema built by [`Self::schema`].
    schema: SchemaRef,
    /// Weak reference to the catalog manager; upgraded lazily at scan time
    /// to determine the running mode and reach the meta client.
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaClusterInfo {
    /// Creates the table definition backed by the given catalog manager.
    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_manager,
        }
    }

    /// Builds the Arrow-compatible schema: (peer_id, peer_type, peer_addr,
    /// version, git_commit). Only `peer_addr` is nullable.
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), false),
            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
        ]))
    }

    /// Creates a fresh row builder sharing this table's schema and manager.
    fn builder(&self) -> InformationSchemaClusterInfoBuilder {
        InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
    }
}

impl InformationTable for InformationSchemaClusterInfo {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        CLUSTER_INFO
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Produces the table rows as a one-shot record-batch stream. The scan
    /// `request` is forwarded so its filters can be evaluated as predicates
    /// against each row before it is appended.
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        // The whole table is materialized by a single async invocation of
        // `make_cluster_info`, wrapped as a one-element stream.
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_cluster_info(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

/// Accumulates `cluster_info` rows column-by-column before assembling
/// them into a single [`RecordBatch`].
struct InformationSchemaClusterInfoBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    // One builder per output column, in schema order.
    peer_ids: UInt64VectorBuilder,
    peer_types: StringVectorBuilder,
    peer_addrs: StringVectorBuilder,
    versions: StringVectorBuilder,
    git_commits: StringVectorBuilder,
}

impl InformationSchemaClusterInfoBuilder {
    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema,
            catalog_manager,
            peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.cluster_info` virtual table
    async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let predicates = Predicates::from_scan_request(&request);
        // If the running mode cannot be determined, fall back to standalone.
        let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone);

        match mode {
            Mode::Standalone => {
                // Standalone has no cluster: emit exactly one synthetic row
                // describing the local process from its build info.
                let build_info = common_version::build_info();

                self.add_node_info(
                    &predicates,
                    NodeInfo {
                        // For the standalone:
                        // - id always 0
                        // - empty string for peer_addr
                        peer: Peer {
                            id: 0,
                            addr: "".to_string(),
                        },
                        last_activity_ts: -1,
                        status: NodeStatus::Standalone,
                        version: build_info.version.to_string(),
                        git_commit: build_info.commit.to_string(),
                    },
                );
            }
            Mode::Distributed => {
                // Distributed: ask the metasrv for the live node list.
                if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? {
                    let node_infos = meta_client
                        .list_nodes(None)
                        .await
                        .map_err(BoxedError::new)
                        .context(ListNodesSnafu)?;

                    for node_info in node_infos {
                        self.add_node_info(&predicates, node_info);
                    }
                } else {
                    // Best-effort: an absent meta client yields an empty
                    // table rather than an error.
                    warn!("Could not find meta client in distributed mode.");
                }
            }
        }

        self.finish()
    }

    /// Appends one row for `node_info`, unless the pushed-down predicates
    /// reject it.
    fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
        let peer_type = node_info.status.role_name();

        // Evaluate the scan filters against the candidate row first so
        // filtered-out peers never touch the builders.
        let row = [
            (PEER_ID, &Value::from(node_info.peer.id)),
            (PEER_TYPE, &Value::from(peer_type)),
            (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
            (VERSION, &Value::from(node_info.version.as_str())),
            (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.peer_ids.push(Some(node_info.peer.id));
        self.peer_types.push(Some(peer_type));
        self.peer_addrs.push(Some(&node_info.peer.addr));
        self.versions.push(Some(&node_info.version));
        self.git_commits.push(Some(&node_info.git_commit));
    }

    /// Finalizes all column builders into a [`RecordBatch`] matching
    /// `self.schema`.
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.peer_ids.finish()),
            Arc::new(self.peer_types.finish()),
            Arc::new(self.peer_addrs.finish()),
            Arc::new(self.versions.finish()),
            Arc::new(self.git_commits.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

// DataFusion integration: expose the table as a single-partition stream.
// Unlike `to_stream`, no scan request (and thus no predicate) is available
// here, so all rows are produced.
impl DfPartitionStream for InformationSchemaClusterInfo {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_cluster_info(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}
000000000000..e476e1fc9147 --- /dev/null +++ b/src/catalog/src/information_schema/utils.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use common_config::Mode; +use meta_client::client::MetaClient; +use snafu::OptionExt; + +use crate::error::{Result, UpgradeWeakCatalogManagerRefSnafu}; +use crate::kvbackend::KvBackendCatalogManager; +use crate::CatalogManager; + +/// Try to get the server running mode from `[CatalogManager]` weak reference. +pub fn running_mode(catalog_manager: &Weak) -> Result> { + let catalog_manager = catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + Ok(catalog_manager + .as_any() + .downcast_ref::() + .map(|manager| manager.running_mode()) + .copied()) +} + +/// Try to get the `[MetaClient]` from `[CatalogManager]` weak reference. +pub fn meta_client(catalog_manager: &Weak) -> Result>> { + let catalog_manager = catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + let meta_client = match catalog_manager + .as_any() + .downcast_ref::() + { + None => None, + Some(manager) => manager.meta_client(), + }; + + Ok(meta_client) +}