From 254a3e27b37e3306f5e87f25c257cb1d27491fd1 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 11 Sep 2024 11:17:19 +0800 Subject: [PATCH] remove pin/unpin snapshot --- proto/hummock.proto | 47 ----- src/ctl/src/cmd_impl/hummock/list_version.rs | 34 +--- src/ctl/src/lib.rs | 9 +- .../catalog/system_catalog/rw_catalog/mod.rs | 1 - .../rw_catalog/rw_hummock_pinned_snapshots.rs | 41 ---- src/frontend/src/meta_client.rs | 37 ---- src/frontend/src/scheduler/snapshot.rs | 191 +----------------- src/frontend/src/test_utils.rs | 16 -- src/meta/service/src/hummock_service.rs | 64 ------ src/meta/service/src/notification_service.rs | 7 +- src/meta/src/hummock/manager/context.rs | 132 ++---------- src/meta/src/hummock/manager/mod.rs | 23 +-- src/meta/src/hummock/manager/tests.rs | 18 +- src/meta/src/hummock/manager/transaction.rs | 2 +- src/meta/src/hummock/manager/utils.rs | 2 - src/meta/src/hummock/manager/versioning.rs | 12 +- src/meta/src/hummock/metrics_utils.rs | 19 +- .../src/hummock/mock_hummock_meta_client.rs | 26 --- src/rpc_client/src/hummock_meta_client.rs | 3 - src/rpc_client/src/meta_client.rs | 51 ----- .../src/hummock/hummock_meta_client.rs | 12 -- .../src/compaction_test_runner.rs | 3 +- 22 files changed, 31 insertions(+), 719 deletions(-) delete mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs diff --git a/proto/hummock.proto b/proto/hummock.proto index 8491365227e13..c29c44389d56f 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -280,26 +280,12 @@ message UnpinVersionResponse { common.Status status = 1; } -message PinSnapshotRequest { - uint32 context_id = 1; -} - -message PinSpecificSnapshotRequest { - uint32 context_id = 1; - uint64 epoch = 2; -} - message GetAssignedCompactTaskNumRequest {} message GetAssignedCompactTaskNumResponse { uint32 num_tasks = 1; } -message PinSnapshotResponse { - common.Status status = 1; - HummockSnapshot snapshot = 2; -} - message GetEpochRequest {} message GetEpochResponse { @@ -307,23 +293,6 @@ message GetEpochResponse { HummockSnapshot snapshot = 2; } -message UnpinSnapshotRequest { - uint32 context_id = 1; -} - -message UnpinSnapshotResponse { - common.Status status = 1; -} - -message UnpinSnapshotBeforeRequest { - uint32 context_id = 1; - HummockSnapshot min_snapshot = 3; -} - -message UnpinSnapshotBeforeResponse { - common.Status status = 1; -} - // When right_exclusive=false, it represents [left, right], of which both boundary are open. When right_exclusive=true, // it represents [left, right), of which right is exclusive. message KeyRange { @@ -650,23 +619,12 @@ message PinnedVersionsSummary { map workers = 2; } -message PinnedSnapshotsSummary { - repeated HummockPinnedSnapshot pinned_snapshots = 1; - map workers = 2; -} - message RiseCtlGetPinnedVersionsSummaryRequest {} message RiseCtlGetPinnedVersionsSummaryResponse { PinnedVersionsSummary summary = 1; } -message RiseCtlGetPinnedSnapshotsSummaryRequest {} - -message RiseCtlGetPinnedSnapshotsSummaryResponse { - PinnedSnapshotsSummary summary = 1; -} - message InitMetadataForReplayRequest { repeated catalog.Table tables = 1; repeated CompactionGroupInfo compaction_groups = 2; @@ -862,18 +820,13 @@ service HummockManagerService { rpc GetAssignedCompactTaskNum(GetAssignedCompactTaskNumRequest) returns (GetAssignedCompactTaskNumResponse); rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse); rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse); - rpc PinSnapshot(PinSnapshotRequest) returns (PinSnapshotResponse); - rpc PinSpecificSnapshot(PinSpecificSnapshotRequest) returns (PinSnapshotResponse); rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse); - rpc UnpinSnapshot(UnpinSnapshotRequest) returns (UnpinSnapshotResponse); - rpc UnpinSnapshotBefore(UnpinSnapshotBeforeRequest) returns (UnpinSnapshotBeforeResponse); rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse); rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse); rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse); rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse); rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse); rpc RiseCtlGetPinnedVersionsSummary(RiseCtlGetPinnedVersionsSummaryRequest) returns (RiseCtlGetPinnedVersionsSummaryResponse); - rpc RiseCtlGetPinnedSnapshotsSummary(RiseCtlGetPinnedSnapshotsSummaryRequest) returns (RiseCtlGetPinnedSnapshotsSummaryResponse); rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse); rpc RiseCtlUpdateCompactionConfig(RiseCtlUpdateCompactionConfigRequest) returns (RiseCtlUpdateCompactionConfigResponse); rpc RiseCtlPauseVersionCheckpoint(RiseCtlPauseVersionCheckpointRequest) returns (RiseCtlPauseVersionCheckpointResponse); diff --git a/src/ctl/src/cmd_impl/hummock/list_version.rs b/src/ctl/src/cmd_impl/hummock/list_version.rs index 460dd88621eb9..ac8ba6982316e 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::hummock::{PinnedSnapshotsSummary, PinnedVersionsSummary}; +use risingwave_pb::hummock::PinnedVersionsSummary; use risingwave_rpc_client::HummockMetaClient; use crate::CtlContext; @@ -118,38 +118,6 @@ pub async fn list_pinned_versions(context: &CtlContext) -> anyhow::Result<()> { Ok(()) } -pub async fn list_pinned_snapshots(context: &CtlContext) -> anyhow::Result<()> { - let meta_client = context.meta_client().await?; - let PinnedSnapshotsSummary { - mut pinned_snapshots, - workers, - } = meta_client - .risectl_get_pinned_snapshots_summary() - .await? - .summary - .unwrap(); - pinned_snapshots.sort_by_key(|s| s.minimal_pinned_snapshot); - for pinned_snapshot in pinned_snapshots { - match workers.get(&pinned_snapshot.context_id) { - None => { - println!( - "Worker {} may have been dropped, min_pinned_snapshot {}", - pinned_snapshot.context_id, pinned_snapshot.minimal_pinned_snapshot - ); - } - Some(worker) => { - println!( - "Worker {} type {} min_pinned_snapshot {}", - pinned_snapshot.context_id, - worker.r#type().as_str_name(), - pinned_snapshot.minimal_pinned_snapshot - ); - } - } - } - Ok(()) -} - pub async fn rebuild_table_stats(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; meta_client.risectl_rebuild_table_stats().await?; diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index b35b8d1e42cb2..dead46ba2b2b1 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -27,9 +27,7 @@ use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::Compressi use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; use thiserror_ext::AsReport; -use crate::cmd_impl::hummock::{ - build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions, -}; +use crate::cmd_impl::hummock::{build_compaction_config_vec, list_pinned_versions}; use crate::cmd_impl::meta::EtcdBackend; use crate::cmd_impl::throttle::apply_throttle; use crate::common::CtlContext; @@ -223,8 +221,6 @@ enum HummockCommands { }, /// List pinned versions of each worker. ListPinnedVersions {}, - /// List pinned snapshots of each worker. - ListPinnedSnapshots {}, /// List all compaction groups. ListCompactionGroup, /// Update compaction config for compaction groups. @@ -654,9 +650,6 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::ListPinnedVersions {}) => { list_pinned_versions(context).await? } - Commands::Hummock(HummockCommands::ListPinnedSnapshots {}) => { - list_pinned_snapshots(context).await? - } Commands::Hummock(HummockCommands::ListCompactionGroup) => { cmd_impl::hummock::list_compaction_group(context).await? } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 5e3261c06d186..75825a74320cb 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -29,7 +29,6 @@ mod rw_hummock_compact_task_assignment; mod rw_hummock_compact_task_progress; mod rw_hummock_compaction_group_configs; mod rw_hummock_meta_configs; -mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; mod rw_hummock_version; mod rw_hummock_version_deltas; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs deleted file mode 100644 index e4f18c8fecaf3..0000000000000 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use risingwave_common::types::Fields; -use risingwave_frontend_macro::system_catalog; - -use crate::catalog::system_catalog::SysCatalogReaderImpl; -use crate::error::Result; - -#[derive(Fields)] -struct RwHummockPinnedSnapshot { - #[primary_key] - worker_node_id: i32, - min_pinned_snapshot_id: i64, -} - -#[system_catalog(table, "rw_catalog.rw_hummock_pinned_snapshots")] -async fn read(reader: &SysCatalogReaderImpl) -> Result> { - let pinned_snapshots = reader - .meta_client - .list_hummock_pinned_snapshots() - .await? - .into_iter() - .map(|s| RwHummockPinnedSnapshot { - worker_node_id: s.0 as _, - min_pinned_snapshot_id: s.1 as _, - }) - .collect(); - Ok(pinned_snapshots) -} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index c58dcc365f431..97395dcd786bb 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -47,9 +47,6 @@ use risingwave_rpc_client::{HummockMetaClient, MetaClient}; #[async_trait::async_trait] pub trait FrontendMetaClient: Send + Sync { async fn try_unregister(&self); - - async fn pin_snapshot(&self) -> Result; - async fn get_snapshot(&self) -> Result; async fn flush(&self, checkpoint: bool) -> Result; @@ -73,10 +70,6 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_object_dependencies(&self) -> Result>; - async fn unpin_snapshot(&self) -> Result<()>; - - async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()>; - async fn list_meta_snapshots(&self) -> Result>; async fn get_system_params(&self) -> Result; @@ -98,9 +91,6 @@ pub trait FrontendMetaClient: Send + Sync { /// Returns vector of (worker_id, min_pinned_version_id) async fn list_hummock_pinned_versions(&self) -> Result>; - /// Returns vector of (worker_id, min_pinned_snapshot_id) - async fn list_hummock_pinned_snapshots(&self) -> Result>; - async fn get_hummock_current_version(&self) -> Result; async fn get_hummock_checkpoint_version(&self) -> Result; @@ -149,10 +139,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.try_unregister().await; } - async fn pin_snapshot(&self) -> Result { - self.0.pin_snapshot().await - } - async fn get_snapshot(&self) -> Result { self.0.get_snapshot().await } @@ -196,14 +182,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.list_object_dependencies().await } - async fn unpin_snapshot(&self) -> Result<()> { - self.0.unpin_snapshot().await - } - - async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()> { - self.0.unpin_snapshot_before(epoch).await - } - async fn list_meta_snapshots(&self) -> Result> { let manifest = self.0.get_meta_snapshot_manifest().await?; Ok(manifest.snapshot_metadata) @@ -257,21 +235,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { Ok(ret) } - async fn list_hummock_pinned_snapshots(&self) -> Result> { - let pinned_snapshots = self - .0 - .risectl_get_pinned_snapshots_summary() - .await? - .summary - .unwrap() - .pinned_snapshots; - let ret = pinned_snapshots - .into_iter() - .map(|s| (s.context_id, s.minimal_pinned_snapshot)) - .collect(); - Ok(ret) - } - async fn get_hummock_current_version(&self) -> Result { self.0.get_current_version().await } diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 4eb23e97dd5f7..fa6d91da2b224 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -12,25 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; -use std::collections::btree_map::Entry; -use std::collections::BTreeMap; use std::sync::Arc; -use std::time::Duration; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::hummock::PbHummockSnapshot; -use thiserror_ext::AsReport; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; -/// The interval between two unpin batches. -const UNPIN_INTERVAL_SECS: u64 = 10; - /// The storage snapshot to read from in a query, which can be freely cloned. #[derive(Clone)] pub enum ReadSnapshot { @@ -86,7 +77,6 @@ impl ReadSnapshot { // singleton for each snapshot. Use `PinnedSnapshotRef` instead. pub struct PinnedSnapshot { value: PbHummockSnapshot, - unpin_sender: UnboundedSender, } impl std::fmt::Debug for PinnedSnapshot { @@ -113,12 +103,6 @@ impl PinnedSnapshot { } } -impl Drop for PinnedSnapshot { - fn drop(&mut self) { - let _ = self.unpin_sender.send(Operation::Unpin(self.value)); - } -} - /// Returns an invalid snapshot, used for initial values. fn invalid_snapshot() -> PbHummockSnapshot { PbHummockSnapshot { @@ -128,10 +112,6 @@ fn invalid_snapshot() -> PbHummockSnapshot { /// Cache of hummock snapshot in meta. pub struct HummockSnapshotManager { - /// Send epoch-related operations to [`UnpinWorker`] for managing the pinned snapshots and - /// unpin them in a batch through RPC. - worker_sender: UnboundedSender, - /// The latest snapshot synced from the meta service. /// /// The `max_committed_epoch` and `max_current_epoch` are pushed from meta node to reduce rpc @@ -147,22 +127,14 @@ pub struct HummockSnapshotManager { pub type HummockSnapshotManagerRef = Arc; impl HummockSnapshotManager { - pub fn new(meta_client: Arc) -> Self { - let (worker_sender, worker_receiver) = tokio::sync::mpsc::unbounded_channel(); - - tokio::spawn(UnpinWorker::new(meta_client, worker_receiver).run()); - + pub fn new(_meta_client: Arc) -> Self { let latest_snapshot = Arc::new(PinnedSnapshot { value: invalid_snapshot(), - unpin_sender: worker_sender.clone(), }); let (latest_snapshot, _) = watch::channel(latest_snapshot); - Self { - worker_sender, - latest_snapshot, - } + Self { latest_snapshot } } /// Acquire the latest snapshot by increasing its reference count. @@ -190,13 +162,8 @@ impl HummockSnapshotManager { // Ignore the same snapshot false } else { - // First tell the worker that a new snapshot is going to be pinned. - self.worker_sender.send(Operation::Pin(snapshot)).unwrap(); // Then set the latest snapshot. - *old_snapshot = Arc::new(PinnedSnapshot { - value: snapshot, - unpin_sender: self.worker_sender.clone(), - }); + *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot }); true } @@ -211,155 +178,3 @@ impl HummockSnapshotManager { } } } - -/// The pin state of a snapshot. -#[derive(Debug)] -enum PinState { - /// The snapshot is currently pinned by some sessions in this frontend. - Pinned, - - /// The snapshot is no longer pinned by any session in this frontend, but it's still considered - /// to be pinned by the meta service. It will be unpinned by the [`UnpinWorker`] in the next - /// unpin batch through RPC, and the entry will be removed then. - Unpinned, -} - -/// The operation handled by the [`UnpinWorker`]. -#[derive(Debug)] -enum Operation { - /// Mark the snapshot as pinned, sent when a new snapshot is pinned with `update`. - Pin(PbHummockSnapshot), - - /// Mark the snapshot as unpinned, sent when all references to a [`PinnedSnapshot`] is dropped. - Unpin(PbHummockSnapshot), -} - -impl Operation { - /// Returns whether the operation is for an invalid snapshot, which should be ignored. - fn is_invalid(&self) -> bool { - match self { - Operation::Pin(s) | Operation::Unpin(s) => s, - } - .committed_epoch - == INVALID_EPOCH - } -} - -/// The key for the states map in [`UnpinWorker`]. -/// -/// The snapshot will be first sorted by `committed_epoch`, then by `current_epoch`. -#[derive(Debug, PartialEq, Clone)] -struct SnapshotKey(PbHummockSnapshot); - -impl Eq for SnapshotKey {} - -impl Ord for SnapshotKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.0.committed_epoch.cmp(&other.0.committed_epoch) - } -} - -impl PartialOrd for SnapshotKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// The worker that manages the pin states of snapshots and unpins them periodically in a batch -/// through RPC to the meta service. -struct UnpinWorker { - meta_client: Arc, - - /// The receiver of operations from snapshot updating in [`HummockSnapshotManager`] and - /// dropping of [`PinnedSnapshot`]. - receiver: UnboundedReceiver, - - /// The pin states of existing snapshots in this frontend. - /// - /// All snapshots in this map are considered to be pinned by the meta service, those with - /// [`PinState::Unpinned`] will be unpinned in the next unpin batch through RPC. - states: BTreeMap, -} - -impl UnpinWorker { - fn new( - meta_client: Arc, - receiver: UnboundedReceiver, - ) -> Self { - Self { - meta_client, - receiver, - states: Default::default(), - } - } - - /// Run the loop of handling operations and unpinning snapshots. - async fn run(mut self) { - let mut ticker = tokio::time::interval(Duration::from_secs(UNPIN_INTERVAL_SECS)); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - - loop { - tokio::select! { - operation = self.receiver.recv() => { - let Some(operation) = operation else { return }; // manager dropped - self.handle_operation(operation); - } - - _ = ticker.tick() => { - self.unpin_batch().await; - } - } - } - } - - /// Handle an operation and manipulate the states. - fn handle_operation(&mut self, operation: Operation) { - if operation.is_invalid() { - return; - } - - match operation { - Operation::Pin(snapshot) => { - self.states - .try_insert(SnapshotKey(snapshot), PinState::Pinned) - .unwrap(); - } - Operation::Unpin(snapshot) => match self.states.entry(SnapshotKey(snapshot)) { - Entry::Vacant(_v) => unreachable!("unpin a snapshot that is not pinned"), - Entry::Occupied(o) => { - assert_matches!(o.get(), PinState::Pinned); - *o.into_mut() = PinState::Unpinned; - } - }, - } - } - - /// Try to unpin all continuous snapshots with [`PinState::Unpinned`] in a batch through RPC, - /// and clean up their entries. - async fn unpin_batch(&mut self) { - // Find the minimum snapshot that is pinned. Unpin all snapshots before it. - if let Some(min_snapshot) = self - .states - .iter() - .find(|(_, s)| matches!(s, PinState::Pinned)) - .map(|(k, _)| k.clone()) - { - if &min_snapshot == self.states.first_key_value().unwrap().0 { - // Nothing to unpin. - return; - } - - let min_epoch = min_snapshot.0.committed_epoch; - - match self.meta_client.unpin_snapshot_before(min_epoch).await { - Ok(()) => { - // Remove all snapshots before this one. - self.states = self.states.split_off(&min_snapshot); - } - Err(e) => { - tracing::error!(error = %e.as_report(), min_epoch, "unpin snapshot failed") - } - } - } - } -} diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 6123889262155..f8ee983b50551 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -938,10 +938,6 @@ pub struct MockFrontendMetaClient {} impl FrontendMetaClient for MockFrontendMetaClient { async fn try_unregister(&self) {} - async fn pin_snapshot(&self) -> RpcResult { - Ok(HummockSnapshot { committed_epoch: 0 }) - } - async fn get_snapshot(&self) -> RpcResult { Ok(HummockSnapshot { committed_epoch: 0 }) } @@ -981,14 +977,6 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(vec![]) } - async fn unpin_snapshot(&self) -> RpcResult<()> { - Ok(()) - } - - async fn unpin_snapshot_before(&self, _epoch: u64) -> RpcResult<()> { - Ok(()) - } - async fn list_meta_snapshots(&self) -> RpcResult> { Ok(vec![]) } @@ -1025,10 +1013,6 @@ impl FrontendMetaClient for MockFrontendMetaClient { unimplemented!() } - async fn list_hummock_pinned_snapshots(&self) -> RpcResult> { - unimplemented!() - } - async fn get_hummock_current_version(&self) -> RpcResult { Ok(HummockVersion::default()) } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index c3fc2da229585..ab73dbd2d6a2f 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -163,53 +163,6 @@ impl HummockManagerService for HummockServiceImpl { Ok(Response::new(resp)) } - async fn pin_specific_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let hummock_snapshot = self - .hummock_manager - .pin_specific_snapshot(req.context_id, req.epoch) - .await?; - Ok(Response::new(PinSnapshotResponse { - status: None, - snapshot: Some(hummock_snapshot), - })) - } - - async fn pin_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let hummock_snapshot = self.hummock_manager.pin_snapshot(req.context_id).await?; - Ok(Response::new(PinSnapshotResponse { - status: None, - snapshot: Some(hummock_snapshot), - })) - } - - async fn unpin_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - self.hummock_manager.unpin_snapshot(req.context_id).await?; - Ok(Response::new(UnpinSnapshotResponse { status: None })) - } - - async fn unpin_snapshot_before( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - self.hummock_manager - .unpin_snapshot_before(req.context_id, req.min_snapshot.unwrap()) - .await?; - Ok(Response::new(UnpinSnapshotBeforeResponse { status: None })) - } - async fn get_new_sst_ids( &self, request: Request, @@ -363,23 +316,6 @@ impl HummockManagerService for HummockServiceImpl { })) } - async fn rise_ctl_get_pinned_snapshots_summary( - &self, - _request: Request, - ) -> Result, Status> { - let pinned_snapshots = self.hummock_manager.list_pinned_snapshot().await; - let workers = self - .hummock_manager - .list_workers(&pinned_snapshots.iter().map(|p| p.context_id).collect_vec()) - .await?; - Ok(Response::new(RiseCtlGetPinnedSnapshotsSummaryResponse { - summary: Some(PinnedSnapshotsSummary { - pinned_snapshots, - workers, - }), - })) - } - async fn get_assigned_compact_task_num( &self, _request: Request, diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index aada6c6876afe..909c3a856da6f 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -406,12 +406,7 @@ impl NotificationService for NotificationServiceImpl { let meta_snapshot = match subscribe_type { SubscribeType::Compactor => self.compactor_subscribe().await?, - SubscribeType::Frontend => { - self.hummock_manager - .pin_snapshot(req.get_worker_id()) - .await?; - self.frontend_subscribe().await? - } + SubscribeType::Frontend => self.frontend_subscribe().await?, SubscribeType::Hummock => { self.hummock_manager .pin_version(req.get_worker_id()) diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 1ff31013c2ee3..d90c61b168531 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -17,22 +17,17 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use fail::fail_point; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, INVALID_VERSION_ID, }; -use risingwave_pb::hummock::{ - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, ValidationTask, -}; +use risingwave_pb::hummock::{HummockPinnedVersion, HummockSnapshot, ValidationTask}; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender}; use crate::hummock::manager::{commit_multi_var, start_measure_real_process_timer}; -use crate::hummock::metrics_utils::{ - trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, -}; +use crate::hummock::metrics_utils::trigger_pin_unpin_version_state; use crate::hummock::HummockManager; use crate::manager::{MetaStoreImpl, MetadataManager, META_NODE_ID}; use crate::model::BTreeMapTransaction; @@ -60,7 +55,6 @@ impl Drop for HummockVersionSafePoint { #[derive(Default)] pub(super) struct ContextInfo { pub pinned_versions: BTreeMap, - pub pinned_snapshots: BTreeMap, /// `version_safe_points` is similar to `pinned_versions` expect for being a transient state. pub version_safe_points: Vec, } @@ -82,12 +76,10 @@ impl ContextInfo { ))); let mut pinned_versions = BTreeMapTransaction::new(&mut self.pinned_versions); - let mut pinned_snapshots = BTreeMapTransaction::new(&mut self.pinned_snapshots); for context_id in context_ids.as_ref() { pinned_versions.remove(*context_id); - pinned_snapshots.remove(*context_id); } - commit_multi_var!(meta_store_ref, pinned_versions, pinned_snapshots)?; + commit_multi_var!(meta_store_ref, pinned_versions)?; Ok(()) } @@ -174,7 +166,6 @@ impl HummockManager { .map(|c| c.context_id), ); active_context_ids.extend(context_info.pinned_versions.keys()); - active_context_ids.extend(context_info.pinned_snapshots.keys()); (active_context_ids, context_info) }; @@ -365,123 +356,26 @@ impl HummockManager { Ok(()) } - pub async fn pin_specific_snapshot( - &self, - context_id: HummockContextId, - epoch: HummockEpoch, - ) -> Result { + #[cfg(any(test, feature = "test"))] + pub async fn pin_snapshot(&self, _context_id: HummockContextId) -> Result { + // TODO #18214: remove this method let snapshot = self.latest_snapshot.load(); - let mut guard = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &guard) - .await?; - let mut pinned_snapshots = BTreeMapTransaction::new(&mut guard.pinned_snapshots); - let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( - context_id, - HummockPinnedSnapshot { - context_id, - minimal_pinned_snapshot: INVALID_EPOCH, - }, - ); - let epoch_to_pin = std::cmp::min(epoch, snapshot.committed_epoch); - if context_pinned_snapshot.minimal_pinned_snapshot == INVALID_EPOCH { - context_pinned_snapshot.minimal_pinned_snapshot = epoch_to_pin; - commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - } Ok(HummockSnapshot::clone(&snapshot)) } - /// Make sure `max_committed_epoch` is pinned and return it. - pub async fn pin_snapshot(&self, context_id: HummockContextId) -> Result { - let snapshot = self.latest_snapshot.load(); - let mut guard = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &guard) - .await?; - let _timer = start_measure_real_process_timer!(self, "pin_snapshot"); - let mut pinned_snapshots = BTreeMapTransaction::new(&mut guard.pinned_snapshots); - let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( - context_id, - HummockPinnedSnapshot { - context_id, - minimal_pinned_snapshot: INVALID_EPOCH, - }, - ); - if context_pinned_snapshot.minimal_pinned_snapshot == INVALID_EPOCH { - context_pinned_snapshot.minimal_pinned_snapshot = snapshot.committed_epoch; - commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &guard.pinned_snapshots); - } - Ok(HummockSnapshot::clone(&snapshot)) - } - - pub async fn unpin_snapshot(&self, context_id: HummockContextId) -> Result<()> { - let mut context_info = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &context_info) - .await?; - let _timer = start_measure_real_process_timer!(self, "unpin_snapshot"); - let mut pinned_snapshots = BTreeMapTransaction::new(&mut context_info.pinned_snapshots); - let release_snapshot = pinned_snapshots.remove(context_id); - if release_snapshot.is_some() { - commit_multi_var!(self.meta_store_ref(), pinned_snapshots)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &context_info.pinned_snapshots); - } - - #[cfg(test)] - { - drop(context_info); - self.check_state_consistency().await; - } - + #[cfg(any(test, feature = "test"))] + pub async fn unpin_snapshot(&self, _context_id: HummockContextId) -> Result<()> { + // TODO #18214: remove this method Ok(()) } - /// Unpin all snapshots smaller than specified epoch for current context. + #[cfg(any(test, feature = "test"))] pub async fn unpin_snapshot_before( &self, - context_id: HummockContextId, - hummock_snapshot: HummockSnapshot, + _context_id: HummockContextId, + _hummock_snapshot: HummockSnapshot, ) -> Result<()> { - let versioning = self.versioning.read().await; - let mut context_info = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &context_info) - .await?; - let _timer = start_measure_real_process_timer!(self, "unpin_snapshot_before"); - // Use the max_committed_epoch in storage as the snapshot ts so only committed changes are - // visible in the snapshot. - let max_committed_epoch = versioning.current_version.visible_table_committed_epoch(); - // Ensure the unpin will not clean the latest one. - let snapshot_committed_epoch = hummock_snapshot.committed_epoch; - #[cfg(not(test))] - { - assert!(snapshot_committed_epoch <= max_committed_epoch); - } - let last_read_epoch = std::cmp::min(snapshot_committed_epoch, max_committed_epoch); - - let mut pinned_snapshots = BTreeMapTransaction::new(&mut context_info.pinned_snapshots); - let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( - context_id, - HummockPinnedSnapshot { - context_id, - minimal_pinned_snapshot: INVALID_EPOCH, - }, - ); - - // Unpin the snapshots pinned by meta but frontend doesn't know. Also equal to unpin all - // epochs below specific watermark. - if context_pinned_snapshot.minimal_pinned_snapshot < last_read_epoch - || context_pinned_snapshot.minimal_pinned_snapshot == INVALID_EPOCH - { - context_pinned_snapshot.minimal_pinned_snapshot = last_read_epoch; - commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &context_info.pinned_snapshots); - } - - #[cfg(test)] - { - drop(context_info); - drop(versioning); - self.check_state_consistency().await; - } - + // TODO #18214: remove this method Ok(()) } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index d43b1cc6f5421..4cf29ca060e1d 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,12 +29,12 @@ use risingwave_hummock_sdk::{ HummockContextId, HummockVersionId, }; use risingwave_meta_model_v2::{ - compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version, - hummock_version_delta, hummock_version_stats, + compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta, + hummock_version_stats, }; use risingwave_pb::hummock::{ - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, - PbCompactTaskAssignment, PbCompactionGroupInfo, SubscribeCompactionEventRequest, + HummockPinnedVersion, HummockSnapshot, HummockVersionStats, PbCompactTaskAssignment, + PbCompactionGroupInfo, SubscribeCompactionEventRequest, }; use risingwave_pb::meta::subscribe_response::Operation; use tokio::sync::mpsc::UnboundedSender; @@ -451,21 +451,6 @@ impl HummockManager { .collect(), }; - context_info.pinned_snapshots = match &meta_store { - MetaStoreImpl::Kv(meta_store) => HummockPinnedSnapshot::list(meta_store) - .await? - .into_iter() - .map(|p| (p.context_id, p)) - .collect(), - MetaStoreImpl::Sql(sql_meta_store) => hummock_pinned_snapshot::Entity::find() - .all(&sql_meta_store.conn) - .await - .map_err(MetadataModelError::from)? - .into_iter() - .map(|m| (m.context_id as HummockContextId, m.into())) - .collect(), - }; - self.delete_object_tracker.clear(); // Not delete stale objects when archive or time travel is enabled if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await { diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index d0183d84d23c5..ba00ad153b713 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -92,21 +92,9 @@ fn get_compaction_group_object_ids( .collect_vec() } -async fn list_pinned_snapshot_from_meta_store(env: &MetaSrvEnv) -> Vec { - match env.meta_store_ref() { - MetaStoreImpl::Kv(meta_store) => HummockPinnedSnapshot::list(meta_store).await.unwrap(), - MetaStoreImpl::Sql(sql_meta_store) => { - use risingwave_meta_model_v2::hummock_pinned_snapshot; - use sea_orm::EntityTrait; - hummock_pinned_snapshot::Entity::find() - .all(&sql_meta_store.conn) - .await - .unwrap() - .into_iter() - .map(Into::into) - .collect() - } - } +async fn list_pinned_snapshot_from_meta_store(_env: &MetaSrvEnv) -> Vec { + // TODO #18214: remove this method + vec![] } async fn list_pinned_version_from_meta_store(env: &MetaSrvEnv) -> Vec { diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index d2ca64e522f02..3f650c8380777 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -201,7 +201,7 @@ impl<'a> HummockVersionTransaction<'a> { *table_id, StateTableInfoDelta { committed_epoch, - safe_epoch: info.safe_epoch, + safe_epoch: 0, compaction_group_id: info.compaction_group_id, } ) diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index fd1372b3a5009..2020eee1172e9 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -97,14 +97,12 @@ impl HummockManager { let compact_statuses_copy = compaction_guard.compaction_statuses.clone(); let compact_task_assignment_copy = compaction_guard.compact_task_assignment.clone(); let pinned_versions_copy = context_info_guard.pinned_versions.clone(); - let pinned_snapshots_copy = context_info_guard.pinned_snapshots.clone(); let hummock_version_deltas_copy = versioning_guard.hummock_version_deltas.clone(); let version_stats_copy = versioning_guard.version_stats.clone(); (( compact_statuses_copy, compact_task_assignment_copy, pinned_versions_copy, - pinned_snapshots_copy, hummock_version_deltas_copy, version_stats_copy, ),) diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 3d621a1d59913..fc9fe3b28f650 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::{ use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, TableStats, + HummockPinnedVersion, HummockSnapshot, HummockVersionStats, TableStats, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -118,16 +118,6 @@ impl HummockManager { .collect_vec() } - pub async fn list_pinned_snapshot(&self) -> Vec { - self.context_info - .read() - .await - .pinned_snapshots - .values() - .cloned() - .collect_vec() - } - pub async fn list_workers( &self, context_ids: &[HummockContextId], diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index b80fd0537659a..ad39e2228fba0 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -24,10 +24,10 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_m use risingwave_hummock_sdk::level::Levels; use risingwave_hummock_sdk::table_stats::PbTableStatsMap; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockVersionId}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, LevelType, + CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType, }; use super::compaction::selector::DynamicLevelSelectorCore; @@ -401,21 +401,6 @@ pub fn trigger_pin_unpin_version_state( } } -pub fn trigger_pin_unpin_snapshot_state( - metrics: &MetaMetrics, - pinned_snapshots: &BTreeMap, -) { - if let Some(m) = pinned_snapshots - .values() - .map(|v| v.minimal_pinned_snapshot) - .min() - { - metrics.min_pinned_epoch.set(m as i64); - } else { - metrics.min_pinned_epoch.set(HummockEpoch::MAX as _); - } -} - pub fn trigger_gc_stat( metrics: &MetaMetrics, checkpoint: &HummockVersionCheckpoint, diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 499d9df0958c4..849b79cbaa5a5 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -117,36 +117,10 @@ impl HummockMetaClient for MockHummockMetaClient { Ok(self.hummock_manager.get_current_version().await) } - async fn pin_snapshot(&self) -> Result { - self.hummock_manager - .pin_snapshot(self.context_id) - .await - .map_err(mock_err) - } - async fn get_snapshot(&self) -> Result { Ok(self.hummock_manager.latest_snapshot()) } - async fn unpin_snapshot(&self) -> Result<()> { - self.hummock_manager - .unpin_snapshot(self.context_id) - .await - .map_err(mock_err) - } - - async fn unpin_snapshot_before(&self, pinned_epochs: HummockEpoch) -> Result<()> { - self.hummock_manager - .unpin_snapshot_before( - self.context_id, - HummockSnapshot { - committed_epoch: pinned_epochs, - }, - ) - .await - .map_err(mock_err) - } - async fn get_new_sst_ids(&self, number: u32) -> Result { fail_point!("get_new_sst_ids_err", |_| Err(anyhow!( "failpoint get_new_sst_ids_err" diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index bb62875b3fae1..23a367d3603e6 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -32,9 +32,6 @@ use crate::error::Result; pub trait HummockMetaClient: Send + Sync + 'static { async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()>; async fn get_current_version(&self) -> Result; - async fn pin_snapshot(&self) -> Result; - async fn unpin_snapshot(&self) -> Result<()>; - async fn unpin_snapshot_before(&self, pinned_epochs: HummockEpoch) -> Result<()>; async fn get_snapshot(&self) -> Result; async fn get_new_sst_ids(&self, number: u32) -> Result; // We keep `commit_epoch` only for test/benchmark. diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index c7b7204bff7c8..ddaaea022c173 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1030,15 +1030,6 @@ impl MetaClient { .await } - pub async fn risectl_get_pinned_snapshots_summary( - &self, - ) -> Result { - let request = RiseCtlGetPinnedSnapshotsSummaryRequest {}; - self.inner - .rise_ctl_get_pinned_snapshots_summary(request) - .await - } - pub async fn risectl_get_checkpoint_hummock_version( &self, ) -> Result { @@ -1135,15 +1126,6 @@ impl MetaClient { )) } - pub async fn pin_specific_snapshot(&self, epoch: HummockEpoch) -> Result { - let req = PinSpecificSnapshotRequest { - context_id: self.worker_id(), - epoch, - }; - let resp = self.inner.pin_specific_snapshot(req).await?; - Ok(resp.snapshot.unwrap()) - } - pub async fn get_assigned_compact_task_num(&self) -> Result { let req = GetAssignedCompactTaskNumRequest {}; let resp = self.inner.get_assigned_compact_task_num(req).await?; @@ -1489,40 +1471,12 @@ impl HummockMetaClient for MetaClient { )) } - async fn pin_snapshot(&self) -> Result { - let req = PinSnapshotRequest { - context_id: self.worker_id(), - }; - let resp = self.inner.pin_snapshot(req).await?; - Ok(resp.snapshot.unwrap()) - } - async fn get_snapshot(&self) -> Result { let req = GetEpochRequest {}; let resp = self.inner.get_epoch(req).await?; Ok(resp.snapshot.unwrap()) } - async fn unpin_snapshot(&self) -> Result<()> { - let req = UnpinSnapshotRequest { - context_id: self.worker_id(), - }; - self.inner.unpin_snapshot(req).await?; - Ok(()) - } - - async fn unpin_snapshot_before(&self, pinned_epochs: HummockEpoch) -> Result<()> { - let req = UnpinSnapshotBeforeRequest { - context_id: self.worker_id(), - // For unpin_snapshot_before, we do not care about snapshots list but only min epoch. - min_snapshot: Some(HummockSnapshot { - committed_epoch: pinned_epochs, - }), - }; - self.inner.unpin_snapshot_before(req).await?; - Ok(()) - } - async fn get_new_sst_ids(&self, number: u32) -> Result { let resp = self .inner @@ -2109,18 +2063,13 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse } ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse } ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse } - ,{ hummock_client, pin_snapshot, PinSnapshotRequest, PinSnapshotResponse } - ,{ hummock_client, pin_specific_snapshot, PinSpecificSnapshotRequest, PinSnapshotResponse } ,{ hummock_client, get_epoch, GetEpochRequest, GetEpochResponse } - ,{ hummock_client, unpin_snapshot, UnpinSnapshotRequest, UnpinSnapshotResponse } - ,{ hummock_client, unpin_snapshot_before, UnpinSnapshotBeforeRequest, UnpinSnapshotBeforeResponse } ,{ hummock_client, get_new_sst_ids, GetNewSstIdsRequest, GetNewSstIdsResponse } ,{ hummock_client, report_vacuum_task, ReportVacuumTaskRequest, ReportVacuumTaskResponse } ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse } ,{ hummock_client, report_full_scan_task, ReportFullScanTaskRequest, ReportFullScanTaskResponse } ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse } ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse } - ,{ hummock_client, rise_ctl_get_pinned_snapshots_summary, RiseCtlGetPinnedSnapshotsSummaryRequest, RiseCtlGetPinnedSnapshotsSummaryResponse } ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse } ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse } ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse } diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index d123558acc50b..8accf288b357f 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -56,22 +56,10 @@ impl HummockMetaClient for MonitoredHummockMetaClient { self.meta_client.get_current_version().await } - async fn pin_snapshot(&self) -> Result { - self.meta_client.pin_snapshot().await - } - async fn get_snapshot(&self) -> Result { self.meta_client.get_snapshot().await } - async fn unpin_snapshot(&self) -> Result<()> { - self.meta_client.unpin_snapshot().await - } - - async fn unpin_snapshot_before(&self, _min_epoch: HummockEpoch) -> Result<()> { - unreachable!("Currently CNs should not call this function") - } - async fn get_new_sst_ids(&self, number: u32) -> Result { self.stats.get_new_sst_ids_counts.inc(); let timer = self.stats.get_new_sst_ids_latency.start_timer(); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 328c23f8fbe80..15d73c0eb86ef 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -520,14 +520,13 @@ async fn start_replay( } async fn pin_old_snapshots( - meta_client: &MetaClient, + _meta_client: &MetaClient, replayed_epochs: &[HummockEpoch], num: usize, ) -> Vec { let mut old_epochs = vec![]; for &epoch in replayed_epochs.iter().rev().take(num) { old_epochs.push(epoch); - let _ = meta_client.pin_specific_snapshot(epoch).await; } old_epochs }