From 95715da81103fc8469035fe168e96e188e9bc7d0 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Thu, 14 Sep 2023 21:02:01 +0800 Subject: [PATCH] feat(meta): add hummock version relevant tables to rw_catalog (#12309) --- proto/hummock.proto | 14 ++++ .../src/catalog/system_catalog/mod.rs | 4 + .../catalog/system_catalog/rw_catalog/mod.rs | 6 ++ .../rw_catalog/rw_hummock_branched_objects.rs | 48 ++++++++++++ .../rw_catalog/rw_hummock_version.rs | 74 +++++++++++++++++++ .../rw_catalog/rw_hummock_version_deltas.rs | 57 ++++++++++++++ src/frontend/src/meta_client.rs | 35 ++++++++- src/frontend/src/test_utils.rs | 20 ++++- src/meta/src/hummock/manager/versioning.rs | 6 ++ src/meta/src/rpc/service/hummock_service.rs | 23 ++++++ src/rpc_client/src/meta_client.rs | 7 ++ 11 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_branched_objects.rs create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version_deltas.rs diff --git a/proto/hummock.proto b/proto/hummock.proto index 475feda58af48..9de07b714e2e5 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -632,6 +632,12 @@ message RiseCtlListCompactionStatusResponse { repeated CompactTaskProgress task_progress = 3; } +message ListBranchedObjectRequest {} + +message ListBranchedObjectResponse { + repeated BranchedObject branched_objects = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -662,6 +668,7 @@ service HummockManagerService { rpc SplitCompactionGroup(SplitCompactionGroupRequest) returns (SplitCompactionGroupResponse); rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse); rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse); + rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse); } message CompactionConfig { @@ -713,3 +720,10 @@ message WriteLimits { // < compaction group id, write limit info > map write_limits = 1; } + +message BranchedObject { + uint64 object_id = 1; + uint64 sst_id = 2; + // Compaction group id the SST belongs to. + uint64 compaction_group_id = 3; +} diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 2793351a8e532..59685698ddc99 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -405,6 +405,10 @@ prepare_sys_catalog! { { BuiltinCatalog::Table(&RW_TYPES), read_rw_types }, { BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await }, { BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_SNAPSHOTS), read_hummock_pinned_snapshots await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_CURRENT_VERSION), read_hummock_current_version await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_CHECKPOINT_VERSION), read_hummock_checkpoint_version await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_VERSION_DELTAS), read_hummock_version_deltas await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await }, } #[cfg(test)] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 4c5b79cd1de55..58beecbb528a6 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -19,8 +19,11 @@ mod rw_databases; mod rw_ddl_progress; mod rw_fragments; mod rw_functions; +mod rw_hummock_branched_objects; mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; +mod rw_hummock_version; +mod rw_hummock_version_deltas; mod rw_indexes; mod rw_materialized_views; mod rw_meta_snapshot; @@ -47,8 +50,11 @@ pub use rw_databases::*; pub use rw_ddl_progress::*; pub use rw_fragments::*; pub use rw_functions::*; +pub use rw_hummock_branched_objects::*; pub use rw_hummock_pinned_snapshots::*; pub use rw_hummock_pinned_versions::*; +pub use rw_hummock_version::*; +pub use rw_hummock_version_deltas::*; pub use rw_indexes::*; pub use rw_materialized_views::*; pub use rw_meta_snapshot::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_branched_objects.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_branched_objects.rs new file mode 100644 index 0000000000000..5e9ad57107690 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_branched_objects.rs @@ -0,0 +1,48 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_BRANCHED_OBJECTS: BuiltinTable = BuiltinTable { + name: "rw_hummock_branched_objects", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "object_id"), + (DataType::Int64, "sst_id"), + (DataType::Int64, "compaction_group_id"), + ], + pk: &[], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_branched_objects(&self) -> Result> { + let branched_objects = self.meta_client.list_branched_objects().await?; + let rows = branched_objects + .into_iter() + .map(|o| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(o.object_id as _)), + Some(ScalarImpl::Int64(o.sst_id as _)), + Some(ScalarImpl::Int64(o.compaction_group_id as _)), + ]) + }) + .collect(); + Ok(rows) + } +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs new file mode 100644 index 0000000000000..550636a6c9c14 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs @@ -0,0 +1,74 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_pb::hummock::HummockVersion; +use serde_json::json; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_CURRENT_VERSION: BuiltinTable = BuiltinTable { + name: "rw_hummock_current_version", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "version_id"), + (DataType::Int64, "max_committed_epoch"), + (DataType::Int64, "safe_epoch"), + (DataType::Jsonb, "compaction_group"), + ], + pk: &[], +}; + +pub const RW_HUMMOCK_CHECKPOINT_VERSION: BuiltinTable = BuiltinTable { + name: "rw_hummock_checkpoint_version", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "version_id"), + (DataType::Int64, "max_committed_epoch"), + (DataType::Int64, "safe_epoch"), + (DataType::Jsonb, "compaction_group"), + ], + pk: &[], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_current_version(&self) -> Result> { + let version = self.meta_client.get_hummock_current_version().await?; + Ok(version_to_rows(&version)) + } + + pub async fn read_hummock_checkpoint_version(&self) -> Result> { + let version = self.meta_client.get_hummock_checkpoint_version().await?; + Ok(version_to_rows(&version)) + } +} + +fn version_to_rows(version: &HummockVersion) -> Vec { + version + .levels + .values() + .map(|cg| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(version.id as _)), + Some(ScalarImpl::Int64(version.max_committed_epoch as _)), + Some(ScalarImpl::Int64(version.safe_epoch as _)), + // FIXME #8612: The byte array key_range is encoded to a string by serde_json. We need disable this behavior as it makes it harder to understand the key range. + Some(ScalarImpl::Jsonb(json!(cg).into())), + ]) + }) + .collect() +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version_deltas.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version_deltas.rs new file mode 100644 index 0000000000000..059fa5d7d47da --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version_deltas.rs @@ -0,0 +1,57 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; +use serde_json::json; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_VERSION_DELTAS: BuiltinTable = BuiltinTable { + name: "rw_hummock_version_deltas", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "id"), + (DataType::Int64, "prev_id"), + (DataType::Int64, "max_committed_epoch"), + (DataType::Int64, "safe_epoch"), + (DataType::Boolean, "trivial_move"), + (DataType::Jsonb, "gc_object_ids"), + (DataType::Jsonb, "group_deltas"), + ], + pk: &[0], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_version_deltas(&self) -> Result> { + let deltas = self.meta_client.list_version_deltas().await?; + let rows = deltas + .into_iter() + .map(|d| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(d.id as _)), + Some(ScalarImpl::Int64(d.prev_id as _)), + Some(ScalarImpl::Int64(d.max_committed_epoch as _)), + Some(ScalarImpl::Int64(d.safe_epoch as _)), + Some(ScalarImpl::Bool(d.trivial_move)), + Some(ScalarImpl::Jsonb(json!(d.gc_object_ids).into())), + Some(ScalarImpl::Jsonb(json!(d.group_deltas).into())), + ]) + }) + .collect(); + Ok(rows) + } +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 670cc483a905a..c45cf12377269 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -18,7 +18,9 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::DdlProgress; -use risingwave_pb::hummock::HummockSnapshot; +use risingwave_pb::hummock::{ + BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta, +}; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; @@ -76,6 +78,14 @@ pub trait FrontendMetaClient: Send + Sync { /// Returns vector of (worker_id, min_pinned_snapshot_id) async fn list_hummock_pinned_snapshots(&self) -> Result>; + + async fn get_hummock_current_version(&self) -> Result; + + async fn get_hummock_checkpoint_version(&self) -> Result; + + async fn list_version_deltas(&self) -> Result>; + + async fn list_branched_objects(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -181,4 +191,27 @@ impl FrontendMetaClient for FrontendMetaClientImpl { .collect(); Ok(ret) } + + async fn get_hummock_current_version(&self) -> Result { + self.0.get_current_version().await + } + + async fn get_hummock_checkpoint_version(&self) -> Result { + self.0 + .risectl_get_checkpoint_hummock_version() + .await + .map(|v| v.checkpoint_version.unwrap()) + } + + async fn list_version_deltas(&self) -> Result> { + // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks. + self.0 + .list_version_deltas(0, u32::MAX, u64::MAX) + .await + .map(|v| v.version_deltas) + } + + async fn list_branched_objects(&self) -> Result> { + self.0.list_branched_object().await + } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 1907c243bd05d..abdfa90c064b2 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -35,7 +35,9 @@ use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; -use risingwave_pb::hummock::HummockSnapshot; +use risingwave_pb::hummock::{ + BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta, +}; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; @@ -832,6 +834,22 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn list_hummock_pinned_snapshots(&self) -> RpcResult> { unimplemented!() } + + async fn get_hummock_current_version(&self) -> RpcResult { + unimplemented!() + } + + async fn get_hummock_checkpoint_version(&self) -> RpcResult { + unimplemented!() + } + + async fn list_version_deltas(&self) -> RpcResult> { + unimplemented!() + } + + async fn list_branched_objects(&self) -> RpcResult> { + unimplemented!() + } } #[cfg(test)] diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 09a331fa3d9f5..1e939513bbf3d 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -271,6 +271,12 @@ impl HummockManager { let guard = read_lock!(self, versioning).await; guard.write_limit.clone() } + + #[named] + pub async fn list_branched_objects(&self) -> BTreeMap { + let guard = read_lock!(self, versioning).await; + guard.branched_ssts.clone() + } } /// Calculates write limits for `target_groups`. diff --git a/src/meta/src/rpc/service/hummock_service.rs b/src/meta/src/rpc/service/hummock_service.rs index 58310cc6eb110..a8419da6e2077 100644 --- a/src/meta/src/rpc/service/hummock_service.rs +++ b/src/meta/src/rpc/service/hummock_service.rs @@ -513,4 +513,27 @@ impl HummockManagerService for HummockServiceImpl { Ok(Response::new(RwReceiverStream::new(rx))) } + + async fn list_branched_object( + &self, + _request: Request, + ) -> Result, Status> { + let branched_objects = self + .hummock_manager + .list_branched_objects() + .await + .into_iter() + .flat_map(|(object_id, v)| { + v.into_iter() + .map(move |(compaction_group_id, sst_id)| BranchedObject { + object_id, + sst_id, + compaction_group_id, + }) + }) + .collect(); + Ok(Response::new(ListBranchedObjectResponse { + branched_objects, + })) + } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index a6a8b1d5baff4..e4b837ba517f0 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1044,6 +1044,12 @@ impl MetaClient { )) } + pub async fn list_branched_object(&self) -> Result> { + let req = ListBranchedObjectRequest {}; + let resp = self.inner.list_branched_object(req).await?; + Ok(resp.branched_objects) + } + pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> { let _resp = self .inner @@ -1709,6 +1715,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse } ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse } ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest, Streaming } + ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse }