Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): remove most usage of max_committed_epoch #18641

Merged
merged 66 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from 65 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
826da0b
feat(storage): notify frontend with more hummock version info
wenym1 Sep 17, 2024
14fdb68
temp save
wenym1 Sep 18, 2024
e4b390c
fix test
wenym1 Sep 18, 2024
9fc23f9
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 18, 2024
3b142d7
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 18, 2024
73674f3
feat(frontend): generate query epoch by committed epoch of involved t…
wenym1 Sep 18, 2024
8fff90b
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 18, 2024
880d15a
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 18, 2024
ab43fbd
rename
wenym1 Sep 18, 2024
d74fe42
fix recursive
wenym1 Sep 18, 2024
64b01d5
feat(frontend): frontend wait hummock version id for ddl
wenym1 Sep 19, 2024
04b112c
fix test
wenym1 Sep 19, 2024
c0ee9cc
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 19, 2024
cfd925d
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 20, 2024
9808588
avoid recalculate
wenym1 Sep 20, 2024
201dde3
fix typo
wenym1 Sep 20, 2024
deb0b17
Merge branch 'main' into yiming/more-frontend-hummock-info
wenym1 Sep 20, 2024
23bb7c6
Merge branch 'yiming/more-frontend-hummock-info' into yiming/per-tabl…
wenym1 Sep 20, 2024
c840b21
Merge branch 'yiming/more-frontend-hummock-info' into yiming/frontend…
wenym1 Sep 20, 2024
f1cf8ec
feat(storage): per table try wait epoch
wenym1 Sep 20, 2024
1836fec
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 20, 2024
f5f78d4
fix test
wenym1 Sep 20, 2024
902c065
fix test
wenym1 Sep 20, 2024
ff24069
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 21, 2024
a520eba
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 21, 2024
4bda7f4
Merge branch 'main' into yiming/frontend-wait-hummock-version
wenym1 Sep 21, 2024
302c576
Merge branch 'yiming/per-table-pin-snapshot' into yiming/no-frontend-…
wenym1 Sep 21, 2024
eda665c
remove frontend max committed epoch
wenym1 Sep 21, 2024
8ead9e7
Merge branch 'yiming/per-table-try-wait-epoch' into yiming/no-fronten…
wenym1 Sep 21, 2024
c81616e
reduce most usage on max committed epoch
wenym1 Sep 21, 2024
f06e7a1
remove HummockSnapshot
wenym1 Sep 22, 2024
5b92fa6
refactor(meta): remove BarrierInfo and simplify pause and resume log …
wenym1 Sep 22, 2024
84c8037
fix comment
wenym1 Sep 23, 2024
c88b9df
simplify now
wenym1 Sep 23, 2024
40f8ac6
refine
wenym1 Sep 23, 2024
6e0e2b7
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 23, 2024
4e897e9
address comment
wenym1 Sep 23, 2024
4723dbe
impl ScanTableVisitor
wenym1 Sep 23, 2024
1b2340c
Merge branch 'main' into yiming/remove-barrier-info
wenym1 Sep 23, 2024
71d25c5
Merge branch 'yiming/remove-barrier-info' into yiming/frontend-wait-h…
wenym1 Sep 23, 2024
5763711
Merge branch 'yiming/frontend-wait-hummock-version' into yiming/no-fr…
wenym1 Sep 23, 2024
5fb9ab5
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 23, 2024
3b8b9aa
Merge branch 'yiming/per-table-try-wait-epoch' into yiming/no-fronten…
wenym1 Sep 23, 2024
6867d41
Merge branch 'yiming/per-table-pin-snapshot' into yiming/no-frontend-…
wenym1 Sep 23, 2024
65bc6fa
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 23, 2024
0d0cc25
remove QuerySnapshot
wenym1 Sep 23, 2024
6d32389
Merge branch 'yiming/per-table-pin-snapshot' into yiming/per-table-tr…
wenym1 Sep 23, 2024
3a94f47
Merge branch 'yiming/per-table-try-wait-epoch' into yiming/no-fronten…
wenym1 Sep 23, 2024
da00775
revert planner change
wenym1 Sep 23, 2024
fee2a0f
use epoch::now
wenym1 Sep 23, 2024
9f8567b
fix test
wenym1 Sep 23, 2024
6cb7b8c
Merge branch 'main' into yiming/per-table-pin-snapshot
wenym1 Sep 24, 2024
0df1339
Merge branch 'yiming/per-table-pin-snapshot' into yiming/per-table-tr…
wenym1 Sep 24, 2024
86eaf2c
Merge branch 'yiming/per-table-try-wait-epoch' into yiming/no-fronten…
wenym1 Sep 24, 2024
44a3b4a
separate wait epoch of batch and streaming
wenym1 Sep 25, 2024
ab3e10d
refine
wenym1 Sep 25, 2024
3b10660
fix
wenym1 Sep 26, 2024
49c7cde
fix typo
wenym1 Sep 26, 2024
02b0682
fix
wenym1 Sep 26, 2024
c74a62a
fix borrow and update
wenym1 Sep 26, 2024
eba2456
add log
wenym1 Sep 26, 2024
cb0ccd2
remove log
wenym1 Sep 27, 2024
d2fe2b6
Merge branch 'yiming/per-table-try-wait-epoch' into yiming/no-fronten…
wenym1 Sep 27, 2024
ba325f1
Merge branch 'main' into yiming/per-table-try-wait-epoch
wenym1 Sep 27, 2024
f4aebc8
Merge branch 'yiming/per-table-try-wait-epoch' into yiming/no-fronten…
wenym1 Sep 27, 2024
daf58b3
Merge branch 'main' into yiming/no-frontend-max-committed-epoch
wenym1 Sep 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,6 @@ message HummockVersionArchive {
repeated HummockVersionDelta version_deltas = 2;
}

// We will have two epoch after decouple
message HummockSnapshot {
// Epoch with checkpoint, we will read durable data with it.
uint64 committed_epoch = 1;
// Epoch without checkpoint, we will read real-time data with it. But it may be rolled back.
reserved 2;
reserved "current_epoch";
}

message VersionUpdatePayload {
oneof payload {
HummockVersionDeltas version_deltas = 1;
Expand Down Expand Up @@ -273,13 +264,6 @@ message GetAssignedCompactTaskNumResponse {
uint32 num_tasks = 1;
}

message GetEpochRequest {}

message GetEpochResponse {
common.Status status = 1;
HummockSnapshot snapshot = 2;
}

// When right_exclusive=false, it represents [left, right], of which both boundary are open. When right_exclusive=true,
// it represents [left, right), of which right is exclusive.
message KeyRange {
Expand Down Expand Up @@ -811,7 +795,6 @@ service HummockManagerService {
rpc GetAssignedCompactTaskNum(GetAssignedCompactTaskNumRequest) returns (GetAssignedCompactTaskNumResponse);
rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse);
rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse);
rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse);
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse);
Expand Down
6 changes: 1 addition & 5 deletions src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ pub async fn list_version(

println!("{:#?}", version);
} else {
println!(
"Version {} max_committed_epoch {}",
version.id,
version.visible_table_committed_epoch()
);
println!("Version {}", version.id);

for (cg, levels) in &version.levels {
println!("CompactionGroup {}", cg);
Expand Down
3 changes: 1 addition & 2 deletions src/ctl/src/cmd_impl/hummock/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> {
let version = meta_client.disable_commit_epoch().await?;
println!(
"Disabled.\
Current version: id {}, max_committed_epoch {}",
Current version: id {}",
version.id,
version.visible_table_committed_epoch()
);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
args.use_new_object_prefix_strategy,
)?)
.await?;
let version = hummock.inner().get_pinned_version().version().clone();
let version = hummock.inner().get_pinned_version().clone();
let sstable_store = hummock.sstable_store();
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
Expand Down
7 changes: 2 additions & 5 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,8 @@ pub async fn print_version_delta_in_archive(
if is_first {
is_first = false;
println!(
"delta: id {}, prev_id {}, max_committed_epoch {}, trivial_move {}",
delta.id,
delta.prev_id,
delta.max_committed_epoch,
delta.trivial_move
"delta: id {}, prev_id {}, trivial_move {}",
delta.id, delta.prev_id, delta.trivial_move
);
}
println!("compaction group id {cg_id}");
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.map(|vd| hummock_version_delta::ActiveModel {
id: Set(vd.id.to_u64() as _),
prev_id: Set(vd.prev_id.to_u64() as _),
max_committed_epoch: Set(vd.visible_table_committed_epoch() as _),
max_committed_epoch: Set(vd.max_committed_epoch_for_migration() as _),
safe_epoch: Set(0 as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set((&vd.to_protobuf()).into()),
Expand Down
9 changes: 8 additions & 1 deletion src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorag
let read_epoch = hummock
.inner()
.get_pinned_version()
.visible_table_committed_epoch();
.table_committed_epoch(table.id);
let Some(read_epoch) = read_epoch else {
println!(
"table {} with id {} not exist in the latest version",
table.name, table.id
);
return Ok(());
};
let storage_table = make_storage_table(hummock, &table)?;
let stream = storage_table
.batch_iter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::error::Result;
struct RwHummockVersion {
#[primary_key]
version_id: i64,
max_committed_epoch: i64,
compaction_group: JsonbVal,
}

Expand Down Expand Up @@ -101,7 +100,6 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
.values()
.map(|cg| RwHummockVersion {
version_id: version.id.to_u64() as _,
max_committed_epoch: version.visible_table_committed_epoch() as _,
compaction_group: json!(cg.to_protobuf()).into(),
})
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct RwHummockVersionDelta {
#[primary_key]
id: i64,
prev_id: i64,
max_committed_epoch: i64,
trivial_move: bool,
group_deltas: JsonbVal,
}
Expand All @@ -40,7 +39,6 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
.map(|d| RwHummockVersionDelta {
id: d.id.to_u64() as _,
prev_id: d.prev_id.to_u64() as _,
max_committed_epoch: d.visible_table_committed_epoch() as _,
trivial_move: d.trivial_move,
group_deltas: json!(d
.group_deltas
Expand Down
6 changes: 0 additions & 6 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand All @@ -47,7 +46,6 @@ use risingwave_rpc_client::{HummockMetaClient, MetaClient};
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
async fn try_unregister(&self);
async fn get_snapshot(&self) -> Result<HummockSnapshot>;

async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId>;

Expand Down Expand Up @@ -139,10 +137,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.try_unregister().await;
}

async fn get_snapshot(&self) -> Result<HummockSnapshot> {
self.0.get_snapshot().await
}

async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
self.0.flush(checkpoint).await
}
Expand Down
5 changes: 0 additions & 5 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use risingwave_pb::ddl_service::{
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -938,10 +937,6 @@ pub struct MockFrontendMetaClient {}
impl FrontendMetaClient for MockFrontendMetaClient {
async fn try_unregister(&self) {}

async fn get_snapshot(&self) -> RpcResult<HummockSnapshot> {
Ok(HummockSnapshot { committed_epoch: 0 })
}

async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockVersionId> {
Ok(INVALID_VERSION_ID)
}
Expand Down
17 changes: 1 addition & 16 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let version_deltas = self
.hummock_manager
.list_version_deltas(
HummockVersionId::new(req.start_id),
req.num_limit,
req.committed_epoch_limit,
)
.list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
.await?;
let resp = ListVersionDeltasResponse {
version_deltas: Some(PbHummockVersionDeltas {
Expand Down Expand Up @@ -249,17 +245,6 @@ impl HummockManagerService for HummockServiceImpl {
}))
}

async fn get_epoch(
&self,
_request: Request<GetEpochRequest>,
) -> Result<Response<GetEpochResponse>, Status> {
let hummock_snapshot = self.hummock_manager.latest_snapshot();
Ok(Response::new(GetEpochResponse {
status: None,
snapshot: Some(hummock_snapshot),
}))
}

async fn report_full_scan_task(
&self,
request: Request<ReportFullScanTaskRequest>,
Expand Down
11 changes: 1 addition & 10 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,16 +712,13 @@ impl GlobalBarrierManager {
}

{
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into());

// Bootstrap recovery. Here we simply trigger a recovery process to achieve the
// consistency.
// Even if there's no actor to recover, we still go through the recovery process to
// inject the first `Initial` barrier.
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);
let span = tracing::info_span!("bootstrap_recovery");
crate::telemetry::report_event(
risingwave_pb::telemetry::TelemetryEventStage::Recovery,
"normal_recovery",
Expand Down Expand Up @@ -1102,12 +1099,9 @@ impl GlobalBarrierManager {
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
err.clone(),
)));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"failure_recovery",
error = %err.as_report(),
prev_epoch = prev_epoch.value().0
);

crate::telemetry::report_event(
Expand All @@ -1134,12 +1128,9 @@ impl GlobalBarrierManager {

self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"adhoc_recovery",
error = %err.as_report(),
prev_epoch = prev_epoch.value().0
);

crate::telemetry::report_event(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl GlobalBarrierManager {
.context
.hummock_manager
.on_current_version(|version| {
let max_committed_epoch = version.visible_table_committed_epoch();
let max_committed_epoch = version.max_committed_epoch_for_meta();
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, max_committed_epoch,
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::compact_task::{self};
use risingwave_pb::hummock::HummockSnapshot;
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
Expand Down Expand Up @@ -288,12 +287,6 @@ impl HummockManager {
)?;
}

if is_visible_table_committed_epoch {
let snapshot = HummockSnapshot { committed_epoch };
let prev_snapshot = self.latest_snapshot.swap(snapshot.into());
assert!(prev_snapshot.committed_epoch < committed_epoch);
}

for compaction_group_id in &modified_compaction_groups {
trigger_sst_stat(
&self.metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::compact_task::ReportTask;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
get_compaction_group_ids, TableGroupInfo,
Expand Down Expand Up @@ -220,10 +221,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let epoch = new_version_delta
.latest_version()
.visible_table_committed_epoch();
let mut new_version_delta = version.new_delta(None);

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down Expand Up @@ -267,7 +265,7 @@ impl HummockManager {
.insert(
TableId::new(*table_id),
PbStateTableInfoDelta {
committed_epoch: epoch,
committed_epoch: INVALID_EPOCH,
compaction_group_id: *raw_group_id,
}
)
Expand Down Expand Up @@ -295,7 +293,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let mut new_version_delta = version.new_delta(None);
let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
HashMap::new();
// Remove member tables
Expand Down Expand Up @@ -483,7 +481,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let mut new_version_delta = version.new_delta(None);

let new_sst_start_id = next_sstable_object_id(
&self.env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let mut new_version_delta = version.new_delta(None);

let target_compaction_group_id = {
// merge right_group_id to left_group_id and remove right_group_id
Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto

impl<'a> HummockVersionTransaction<'a> {
fn apply_compact_task(&mut self, compact_task: &CompactTask) {
let mut version_delta = self.new_delta();
let mut version_delta = self.new_delta(None);
let trivial_move = CompactStatus::is_trivial_move_task(compact_task);
version_delta.trivial_move = trivial_move;

Expand Down Expand Up @@ -1334,9 +1334,8 @@ impl HummockManager {
) -> Result<()> {
self.on_current_version(|old_version| {
tracing::info!(
"Trigger compaction for version {}, epoch {}, groups {:?}",
"Trigger compaction for version {}, groups {:?}",
old_version.id,
old_version.visible_table_committed_epoch(),
compaction_groups
);
})
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ impl HummockManager {
}

if is_visible_table_committed_epoch
&& committed_epoch <= current_version.visible_table_committed_epoch()
&& committed_epoch <= current_version.max_committed_epoch_for_meta()
{
return Err(anyhow::anyhow!(
"Epoch {} <= max_committed_epoch {}",
committed_epoch,
current_version.visible_table_committed_epoch()
current_version.max_committed_epoch_for_meta()
)
.into());
}
Expand Down
Loading
Loading