Skip to content

Commit

Permalink
feat(meta): add hummock version relevant tables to rw_catalog (#12309)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Sep 14, 2023
1 parent a740364 commit 5aa5a47
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 2 deletions.
14 changes: 14 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,12 @@ message RiseCtlListCompactionStatusResponse {
repeated CompactTaskProgress task_progress = 3;
}

message ListBranchedObjectRequest {}

message ListBranchedObjectResponse {
repeated BranchedObject branched_objects = 1;
}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -657,6 +663,7 @@ service HummockManagerService {
rpc SplitCompactionGroup(SplitCompactionGroupRequest) returns (SplitCompactionGroupResponse);
rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse);
rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse);
rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse);
}

message CompactionConfig {
Expand Down Expand Up @@ -708,3 +715,10 @@ message WriteLimits {
// < compaction group id, write limit info >
map<uint64, WriteLimit> write_limits = 1;
}

message BranchedObject {
uint64 object_id = 1;
uint64 sst_id = 2;
// Compaction group id the SST belongs to.
uint64 compaction_group_id = 3;
}
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ prepare_sys_catalog! {
{ BuiltinCatalog::Table(&RW_TYPES), read_rw_types },
{ BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_VERSIONS), read_hummock_pinned_versions await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_PINNED_SNAPSHOTS), read_hummock_pinned_snapshots await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_CURRENT_VERSION), read_hummock_current_version await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_CHECKPOINT_VERSION), read_hummock_checkpoint_version await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_VERSION_DELTAS), read_hummock_version_deltas await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await },
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ mod rw_databases;
mod rw_ddl_progress;
mod rw_fragments;
mod rw_functions;
mod rw_hummock_branched_objects;
mod rw_hummock_pinned_snapshots;
mod rw_hummock_pinned_versions;
mod rw_hummock_version;
mod rw_hummock_version_deltas;
mod rw_indexes;
mod rw_materialized_views;
mod rw_meta_snapshot;
Expand All @@ -47,8 +50,11 @@ pub use rw_databases::*;
pub use rw_ddl_progress::*;
pub use rw_fragments::*;
pub use rw_functions::*;
pub use rw_hummock_branched_objects::*;
pub use rw_hummock_pinned_snapshots::*;
pub use rw_hummock_pinned_versions::*;
pub use rw_hummock_version::*;
pub use rw_hummock_version_deltas::*;
pub use rw_indexes::*;
pub use rw_materialized_views::*;
pub use rw_meta_snapshot::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_BRANCHED_OBJECTS: BuiltinTable = BuiltinTable {
name: "rw_hummock_branched_objects",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "object_id"),
(DataType::Int64, "sst_id"),
(DataType::Int64, "compaction_group_id"),
],
pk: &[],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_branched_objects(&self) -> Result<Vec<OwnedRow>> {
let branched_objects = self.meta_client.list_branched_objects().await?;
let rows = branched_objects
.into_iter()
.map(|o| {
OwnedRow::new(vec![
Some(ScalarImpl::Int64(o.object_id as _)),
Some(ScalarImpl::Int64(o.sst_id as _)),
Some(ScalarImpl::Int64(o.compaction_group_id as _)),
])
})
.collect();
Ok(rows)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_pb::hummock::HummockVersion;
use serde_json::json;

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_CURRENT_VERSION: BuiltinTable = BuiltinTable {
name: "rw_hummock_current_version",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "version_id"),
(DataType::Int64, "max_committed_epoch"),
(DataType::Int64, "safe_epoch"),
(DataType::Jsonb, "compaction_group"),
],
pk: &[],
};

pub const RW_HUMMOCK_CHECKPOINT_VERSION: BuiltinTable = BuiltinTable {
name: "rw_hummock_checkpoint_version",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "version_id"),
(DataType::Int64, "max_committed_epoch"),
(DataType::Int64, "safe_epoch"),
(DataType::Jsonb, "compaction_group"),
],
pk: &[],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_current_version(&self) -> Result<Vec<OwnedRow>> {
let version = self.meta_client.get_hummock_current_version().await?;
Ok(version_to_rows(&version))
}

pub async fn read_hummock_checkpoint_version(&self) -> Result<Vec<OwnedRow>> {
let version = self.meta_client.get_hummock_checkpoint_version().await?;
Ok(version_to_rows(&version))
}
}

fn version_to_rows(version: &HummockVersion) -> Vec<OwnedRow> {
version
.levels
.values()
.map(|cg| {
OwnedRow::new(vec![
Some(ScalarImpl::Int64(version.id as _)),
Some(ScalarImpl::Int64(version.max_committed_epoch as _)),
Some(ScalarImpl::Int64(version.safe_epoch as _)),
// FIXME #8612: The byte array key_range is encoded to a string by serde_json. We need disable this behavior as it makes it harder to understand the key range.
Some(ScalarImpl::Jsonb(json!(cg).into())),
])
})
.collect()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use serde_json::json;

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_VERSION_DELTAS: BuiltinTable = BuiltinTable {
name: "rw_hummock_version_deltas",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "id"),
(DataType::Int64, "prev_id"),
(DataType::Int64, "max_committed_epoch"),
(DataType::Int64, "safe_epoch"),
(DataType::Boolean, "trivial_move"),
(DataType::Jsonb, "gc_object_ids"),
(DataType::Jsonb, "group_deltas"),
],
pk: &[0],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_version_deltas(&self) -> Result<Vec<OwnedRow>> {
let deltas = self.meta_client.list_version_deltas().await?;
let rows = deltas
.into_iter()
.map(|d| {
OwnedRow::new(vec![
Some(ScalarImpl::Int64(d.id as _)),
Some(ScalarImpl::Int64(d.prev_id as _)),
Some(ScalarImpl::Int64(d.max_committed_epoch as _)),
Some(ScalarImpl::Int64(d.safe_epoch as _)),
Some(ScalarImpl::Bool(d.trivial_move)),
Some(ScalarImpl::Jsonb(json!(d.gc_object_ids).into())),
Some(ScalarImpl::Jsonb(json!(d.group_deltas).into())),
])
})
.collect();
Ok(rows)
}
}
35 changes: 34 additions & 1 deletion src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::hummock::{
BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
Expand Down Expand Up @@ -76,6 +78,14 @@ pub trait FrontendMetaClient: Send + Sync {

/// Returns vector of (worker_id, min_pinned_snapshot_id)
async fn list_hummock_pinned_snapshots(&self) -> Result<Vec<(u32, u64)>>;

async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;

async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -181,4 +191,27 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
.collect();
Ok(ret)
}

async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
self.0.get_current_version().await
}

async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion> {
self.0
.risectl_get_checkpoint_hummock_version()
.await
.map(|v| v.checkpoint_version.unwrap())
}

async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
// FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
self.0
.list_version_deltas(0, u32::MAX, u64::MAX)
.await
.map(|v| v.version_deltas)
}

async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
self.0.list_branched_object().await
}
}
20 changes: 19 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table,
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::hummock::{
BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
Expand Down Expand Up @@ -832,6 +834,22 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn list_hummock_pinned_snapshots(&self) -> RpcResult<Vec<(u32, u64)>> {
unimplemented!()
}

async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
unimplemented!()
}

async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
unimplemented!()
}

async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
unimplemented!()
}

async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
unimplemented!()
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ impl HummockManager {
let guard = read_lock!(self, versioning).await;
guard.write_limit.clone()
}

#[named]
pub async fn list_branched_objects(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
let guard = read_lock!(self, versioning).await;
guard.branched_ssts.clone()
}
}

/// Calculates write limits for `target_groups`.
Expand Down
23 changes: 23 additions & 0 deletions src/meta/src/rpc/service/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,27 @@ impl HummockManagerService for HummockServiceImpl {

Ok(Response::new(RwReceiverStream::new(rx)))
}

async fn list_branched_object(
&self,
_request: Request<ListBranchedObjectRequest>,
) -> Result<Response<ListBranchedObjectResponse>, Status> {
let branched_objects = self
.hummock_manager
.list_branched_objects()
.await
.into_iter()
.flat_map(|(object_id, v)| {
v.into_iter()
.map(move |(compaction_group_id, sst_id)| BranchedObject {
object_id,
sst_id,
compaction_group_id,
})
})
.collect();
Ok(Response::new(ListBranchedObjectResponse {
branched_objects,
}))
}
}
7 changes: 7 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,12 @@ impl MetaClient {
))
}

pub async fn list_branched_object(&self) -> Result<Vec<BranchedObject>> {
let req = ListBranchedObjectRequest {};
let resp = self.inner.list_branched_object(req).await?;
Ok(resp.branched_objects)
}

pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> {
let _resp = self
.inner
Expand Down Expand Up @@ -1709,6 +1715,7 @@ macro_rules! for_all_meta_rpc {
,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
,{ user_client, drop_user, DropUserRequest, DropUserResponse }
Expand Down

0 comments on commit 5aa5a47

Please sign in to comment.