diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 5d4a785bd451..2793351a8e53 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -403,6 +403,8 @@ prepare_sys_catalog! { { BuiltinCatalog::View(&RW_RELATIONS) }, { BuiltinCatalog::Table(&RW_COLUMNS), read_rw_columns_info }, { BuiltinCatalog::Table(&RW_TYPES), read_rw_types }, + { BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_SNAPSHOTS), read_hummock_pinned_snapshots await }, } #[cfg(test)] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 943f1e5d69ae..4c5b79cd1de5 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -19,6 +19,8 @@ mod rw_databases; mod rw_ddl_progress; mod rw_fragments; mod rw_functions; +mod rw_hummock_pinned_snapshots; +mod rw_hummock_pinned_versions; mod rw_indexes; mod rw_materialized_views; mod rw_meta_snapshot; @@ -45,6 +47,8 @@ pub use rw_databases::*; pub use rw_ddl_progress::*; pub use rw_fragments::*; pub use rw_functions::*; +pub use rw_hummock_pinned_snapshots::*; +pub use rw_hummock_pinned_versions::*; pub use rw_indexes::*; pub use rw_materialized_views::*; pub use rw_meta_snapshot::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs new file mode 100644 index 000000000000..8628e2562e69 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs @@ -0,0 +1,49 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_PINNED_SNAPSHOTS: BuiltinTable = BuiltinTable { + name: "rw_hummock_pinned_snapshots", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int32, "worker_node_id"), + (DataType::Int64, "min_pinned_snapshot_id"), + ], + pk: &[], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_pinned_snapshots(&self) -> Result> { + let pinned_snapshots = self + .meta_client + .list_hummock_pinned_snapshots() + .await? + .into_iter() + .map(|s| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int32(s.0 as i32)), + Some(ScalarImpl::Int64(s.1 as i64)), + ]) + }) + .collect_vec(); + Ok(pinned_snapshots) + } +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_versions.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_versions.rs new file mode 100644 index 000000000000..87b9804d8a26 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_versions.rs @@ -0,0 +1,49 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_PINNED_VERSIONS: BuiltinTable = BuiltinTable { + name: "rw_hummock_pinned_versions", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int32, "worker_node_id"), + (DataType::Int64, "min_pinned_version_id"), + ], + pk: &[], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_pinned_versions(&self) -> Result> { + let pinned_versions = self + .meta_client + .list_hummock_pinned_versions() + .await? + .into_iter() + .map(|s| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int32(s.0 as i32)), + Some(ScalarImpl::Int64(s.1 as i64)), + ]) + }) + .collect_vec(); + Ok(pinned_versions) + } +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 0ab4fe6d5993..670cc483a905 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -70,6 +70,12 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_ddl_progress(&self) -> Result>; async fn get_tables(&self, table_ids: &[u32]) -> Result>; + + /// Returns vector of (worker_id, min_pinned_version_id) + async fn list_hummock_pinned_versions(&self) -> Result>; + + /// Returns vector of (worker_id, min_pinned_snapshot_id) + async fn list_hummock_pinned_snapshots(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -145,4 +151,34 @@ impl FrontendMetaClient for FrontendMetaClientImpl { let tables = self.0.get_tables(table_ids).await?; Ok(tables) } + + async fn list_hummock_pinned_versions(&self) -> Result> { + let pinned_versions = self + .0 + .risectl_get_pinned_versions_summary() + .await? + .summary + .unwrap() + .pinned_versions; + let ret = pinned_versions + .into_iter() + .map(|v| (v.context_id, v.min_pinned_id)) + .collect(); + Ok(ret) + } + + async fn list_hummock_pinned_snapshots(&self) -> Result> { + let pinned_snapshots = self + .0 + .risectl_get_pinned_snapshots_summary() + .await? + .summary + .unwrap() + .pinned_snapshots; + let ret = pinned_snapshots + .into_iter() + .map(|s| (s.context_id, s.minimal_pinned_snapshot)) + .collect(); + Ok(ret) + } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 086c4f0c496e..1907c243bd05 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -824,6 +824,14 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn get_tables(&self, _table_ids: &[u32]) -> RpcResult> { Ok(HashMap::new()) } + + async fn list_hummock_pinned_versions(&self) -> RpcResult> { + unimplemented!() + } + + async fn list_hummock_pinned_snapshots(&self) -> RpcResult> { + unimplemented!() + } } #[cfg(test)]