Skip to content

Commit

Permalink
feat(meta): expose hummock pinned versions and snapshots via rw_catal…
Browse files Browse the repository at this point in the history
…og (#12285)
  • Loading branch information
zwang28 authored Sep 13, 2023
1 parent a61b5aa commit 857a20e
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ prepare_sys_catalog! {
{ BuiltinCatalog::View(&RW_RELATIONS) },
{ BuiltinCatalog::Table(&RW_COLUMNS), read_rw_columns_info },
{ BuiltinCatalog::Table(&RW_TYPES), read_rw_types },
{ BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_SNAPSHOTS), read_hummock_pinned_snapshots await },
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ mod rw_databases;
mod rw_ddl_progress;
mod rw_fragments;
mod rw_functions;
mod rw_hummock_pinned_snapshots;
mod rw_hummock_pinned_versions;
mod rw_indexes;
mod rw_materialized_views;
mod rw_meta_snapshot;
Expand All @@ -45,6 +47,8 @@ pub use rw_databases::*;
pub use rw_ddl_progress::*;
pub use rw_fragments::*;
pub use rw_functions::*;
pub use rw_hummock_pinned_snapshots::*;
pub use rw_hummock_pinned_versions::*;
pub use rw_indexes::*;
pub use rw_materialized_views::*;
pub use rw_meta_snapshot::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_PINNED_SNAPSHOTS: BuiltinTable = BuiltinTable {
name: "rw_hummock_pinned_snapshots",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int32, "worker_node_id"),
(DataType::Int64, "min_pinned_snapshot_id"),
],
pk: &[],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_pinned_snapshots(&self) -> Result<Vec<OwnedRow>> {
let pinned_snapshots = self
.meta_client
.list_hummock_pinned_snapshots()
.await?
.into_iter()
.map(|s| {
OwnedRow::new(vec![
Some(ScalarImpl::Int32(s.0 as i32)),
Some(ScalarImpl::Int64(s.1 as i64)),
])
})
.collect_vec();
Ok(pinned_snapshots)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_PINNED_VERSIONS: BuiltinTable = BuiltinTable {
name: "rw_hummock_pinned_versions",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int32, "worker_node_id"),
(DataType::Int64, "min_pinned_version_id"),
],
pk: &[],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_pinned_versions(&self) -> Result<Vec<OwnedRow>> {
let pinned_versions = self
.meta_client
.list_hummock_pinned_versions()
.await?
.into_iter()
.map(|s| {
OwnedRow::new(vec![
Some(ScalarImpl::Int32(s.0 as i32)),
Some(ScalarImpl::Int64(s.1 as i64)),
])
})
.collect_vec();
Ok(pinned_versions)
}
}
36 changes: 36 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub trait FrontendMetaClient: Send + Sync {
async fn list_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

async fn get_tables(&self, table_ids: &[u32]) -> Result<HashMap<u32, Table>>;

/// Returns vector of (worker_id, min_pinned_version_id)
async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;

/// Returns vector of (worker_id, min_pinned_snapshot_id)
async fn list_hummock_pinned_snapshots(&self) -> Result<Vec<(u32, u64)>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -145,4 +151,34 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
let tables = self.0.get_tables(table_ids).await?;
Ok(tables)
}

async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>> {
let pinned_versions = self
.0
.risectl_get_pinned_versions_summary()
.await?
.summary
.unwrap()
.pinned_versions;
let ret = pinned_versions
.into_iter()
.map(|v| (v.context_id, v.min_pinned_id))
.collect();
Ok(ret)
}

async fn list_hummock_pinned_snapshots(&self) -> Result<Vec<(u32, u64)>> {
let pinned_snapshots = self
.0
.risectl_get_pinned_snapshots_summary()
.await?
.summary
.unwrap()
.pinned_snapshots;
let ret = pinned_snapshots
.into_iter()
.map(|s| (s.context_id, s.minimal_pinned_snapshot))
.collect();
Ok(ret)
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,14 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn get_tables(&self, _table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>> {
Ok(HashMap::new())
}

async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(u32, u64)>> {
unimplemented!()
}

async fn list_hummock_pinned_snapshots(&self) -> RpcResult<Vec<(u32, u64)>> {
unimplemented!()
}
}

#[cfg(test)]
Expand Down

0 comments on commit 857a20e

Please sign in to comment.