diff --git a/proto/backup_service.proto b/proto/backup_service.proto index feca5f17b7dc3..c84b628e58550 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -45,6 +45,7 @@ message MetaSnapshotMetadata { uint64 hummock_version_id = 2; uint64 max_committed_epoch = 3; uint64 safe_epoch = 4; + optional uint32 format_version = 5; } service BackupService { diff --git a/src/meta/src/backup_restore/mod.rs b/src/meta/src/backup_restore/mod.rs index 0dfe5b3442415..fc33af5ef4628 100644 --- a/src/meta/src/backup_restore/mod.rs +++ b/src/meta/src/backup_restore/mod.rs @@ -19,6 +19,7 @@ mod meta_snapshot_builder; mod meta_snapshot_builder_v2; mod metrics; mod restore; +mod restore_impl; mod utils; pub use restore::*; diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index 48dd6fc0cb664..5474b43654017 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -14,22 +14,20 @@ use std::sync::Arc; -use itertools::Itertools; use risingwave_backup::error::{BackupError, BackupResult}; -use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1; +use risingwave_backup::meta_snapshot::Metadata; use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; +use risingwave_backup::MetaSnapshotId; use risingwave_common::config::MetaBackend; use risingwave_hummock_sdk::version_checkpoint_path; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint}; +use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1}; +use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2}; +use crate::backup_restore::restore_impl::{Loader, Writer}; use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl}; -use crate::dispatch_meta_store; -use crate::hummock::model::CompactionGroup; -use crate::manager::model::SystemParamsModel; -use crate::model::{ClusterId, MetadataModel, TableFragments}; -use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY}; /// Command-line arguments for restore. #[derive(clap::Args, Debug, Clone)] @@ -98,99 +96,6 @@ async fn restore_hummock_version( Ok(()) } -async fn restore_metadata_model( - meta_store: &S, - metadata: &[T], -) -> BackupResult<()> { - if !T::list(meta_store).await?.is_empty() { - return Err(BackupError::NonemptyMetaStorage); - } - for d in metadata { - d.insert(meta_store).await?; - } - Ok(()) -} - -async fn restore_system_param_model( - meta_store: &S, - metadata: &[T], -) -> BackupResult<()> { - if T::get(meta_store).await?.is_some() { - return Err(BackupError::NonemptyMetaStorage); - } - for d in metadata { - d.insert(meta_store).await?; - } - Ok(()) -} - -async fn restore_cluster_id( - meta_store: &S, - cluster_id: ClusterId, -) -> BackupResult<()> { - if ClusterId::from_meta_store(meta_store).await?.is_some() { - return Err(BackupError::NonemptyMetaStorage); - } - cluster_id.put_at_meta_store(meta_store).await?; - Ok(()) -} - -async fn restore_default_cf( - meta_store: &S, - snapshot: &MetaSnapshotV1, -) -> BackupResult<()> { - if !meta_store.list_cf(DEFAULT_COLUMN_FAMILY).await?.is_empty() { - return Err(BackupError::NonemptyMetaStorage); - } - for (k, v) in &snapshot.metadata.default_cf { - meta_store - .put_cf(DEFAULT_COLUMN_FAMILY, k.clone(), v.clone()) - .await?; - } - Ok(()) -} - -async fn restore_metadata( - meta_store: S, - snapshot: MetaSnapshotV1, -) -> BackupResult<()> { - restore_default_cf(&meta_store, &snapshot).await?; - restore_metadata_model(&meta_store, &[snapshot.metadata.version_stats]).await?; - restore_metadata_model( - &meta_store, - &snapshot - .metadata - .compaction_groups - .into_iter() - .map(CompactionGroup::from_protobuf) - .collect_vec(), - ) - .await?; - restore_metadata_model( - &meta_store, - &snapshot - .metadata - .table_fragments - .into_iter() - .map(TableFragments::from_protobuf) - .collect_vec(), - ) - .await?; - restore_metadata_model(&meta_store, &snapshot.metadata.user_info).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.database).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.schema).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.table).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.index).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.sink).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.view).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.source).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.function).await?; - restore_metadata_model(&meta_store, &snapshot.metadata.connection).await?; - restore_system_param_model(&meta_store, &[snapshot.metadata.system_param]).await?; - restore_cluster_id(&meta_store, snapshot.metadata.cluster_id.into()).await?; - Ok(()) -} - /// Restores a meta store. /// Uses `meta_store` and `backup_store` if provided. /// Otherwise creates them based on `opts`. @@ -212,62 +117,69 @@ async fn restore_impl( Some(b) => b, }; let target_id = opts.meta_snapshot_id; - let snapshot_list = backup_store.manifest().snapshot_metadata.clone(); + let snapshot_list = &backup_store.manifest().snapshot_metadata; if !snapshot_list.iter().any(|m| m.id == target_id) { return Err(BackupError::Other(anyhow::anyhow!( "snapshot id {} not found", target_id ))); } - let mut target_snapshot: MetaSnapshotV1 = backup_store.get(target_id).await?; - tracing::info!( - "snapshot {} before rewrite:\n{}", - target_id, - target_snapshot - ); - let newest_id = snapshot_list - .into_iter() - .map(|m| m.id) - .max() - .expect("should exist"); - assert!(newest_id >= target_id); - // Always use newest snapshot's `default_cf` during restoring, in order not to corrupt shared - // data of snapshots. Otherwise, for example if we restore a older SST id generator, an - // existent SST in object store is at risk of being overwrote by the restored cluster. - // All these risky metadata are in `default_cf`, e.g. id generator, epoch. They must satisfy: - // - Value is monotonically non-decreasing. - // - Value is memcomparable. - // - Keys of newest_snapshot is a superset of that of target_snapshot. - if newest_id > target_id { - let newest_snapshot: MetaSnapshotV1 = backup_store.get(newest_id).await?; - for (k, v) in &target_snapshot.metadata.default_cf { - let newest_v = newest_snapshot - .metadata - .default_cf - .get(k) - .unwrap_or_else(|| panic!("violate superset requirement. key {:x?}", k)); - assert!(newest_v >= v, "violate monotonicity requirement"); + + let format_version = match snapshot_list.iter().find(|m| m.id == target_id) { + None => { + return Err(BackupError::Other(anyhow::anyhow!( + "snapshot id {} not found", + target_id + ))); + } + Some(s) => s.format_version, + }; + match &meta_store { + MetaStoreBackendImpl::Sql(m) => { + if format_version < 2 { + todo!("write model V1 to meta store V2"); + } else { + dispatch( + target_id, + &opts, + LoaderV2::new(backup_store), + WriterModelV2ToMetaStoreV2::new(m.to_owned()), + ) + .await?; + } + } + _ => { + assert!(format_version < 2, "format_version {}", format_version); + dispatch( + target_id, + &opts, + LoaderV1::new(backup_store), + WriterModelV1ToMetaStoreV1::new(meta_store), + ) + .await?; } - target_snapshot.metadata.default_cf = newest_snapshot.metadata.default_cf; - tracing::info!( - "snapshot {} after rewrite by snapshot {}:\n{}", - target_id, - newest_id, - target_snapshot, - ); } + + Ok(()) +} + +async fn dispatch, W: Writer, S: Metadata>( + target_id: MetaSnapshotId, + opts: &RestoreOpts, + loader: L, + writer: W, +) -> BackupResult<()> { + let target_snapshot = loader.load(target_id).await?; if opts.dry_run { return Ok(()); } restore_hummock_version( &opts.hummock_storage_url, &opts.hummock_storage_directory, - &target_snapshot.metadata.hummock_version, + target_snapshot.metadata.hummock_version_ref(), ) .await?; - dispatch_meta_store!(meta_store.clone(), store, { - restore_metadata(store.clone(), target_snapshot.clone()).await?; - }); + writer.write(target_snapshot).await?; Ok(()) } diff --git a/src/meta/src/backup_restore/restore_impl/mod.rs b/src/meta/src/backup_restore/restore_impl/mod.rs new file mode 100644 index 0000000000000..0d997c7af7cab --- /dev/null +++ b/src/meta/src/backup_restore/restore_impl/mod.rs @@ -0,0 +1,32 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_backup::error::BackupResult; +use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata}; +use risingwave_backup::MetaSnapshotId; + +pub mod v1; +pub mod v2; + +/// `Loader` gets, validates and amends `MetaSnapshot`. +#[async_trait::async_trait] +pub trait Loader { + async fn load(&self, id: MetaSnapshotId) -> BackupResult>; +} + +/// `Writer` writes `MetaSnapshot` to meta store. +#[async_trait::async_trait] +pub trait Writer { + async fn write(&self, s: MetaSnapshot) -> BackupResult<()>; +} diff --git a/src/meta/src/backup_restore/restore_impl/v1.rs b/src/meta/src/backup_restore/restore_impl/v1.rs new file mode 100644 index 0000000000000..793ce411714c1 --- /dev/null +++ b/src/meta/src/backup_restore/restore_impl/v1.rs @@ -0,0 +1,202 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_backup::error::{BackupError, BackupResult}; +use risingwave_backup::meta_snapshot::MetaSnapshot; +use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1}; +use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; +use risingwave_backup::MetaSnapshotId; + +use crate::backup_restore::restore_impl::{Loader, Writer}; +use crate::backup_restore::utils::MetaStoreBackendImpl; +use crate::dispatch_meta_store; +use crate::hummock::model::CompactionGroup; +use crate::manager::model::SystemParamsModel; +use crate::model::{ClusterId, MetadataModel, TableFragments}; +use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY}; + +pub struct LoaderV1 { + backup_store: MetaSnapshotStorageRef, +} + +impl LoaderV1 { + pub fn new(backup_store: MetaSnapshotStorageRef) -> Self { + Self { backup_store } + } +} + +#[async_trait::async_trait] +impl Loader for LoaderV1 { + async fn load(&self, target_id: MetaSnapshotId) -> BackupResult> { + let backup_store = &self.backup_store; + let snapshot_list = &backup_store.manifest().snapshot_metadata; + let mut target_snapshot: MetaSnapshotV1 = backup_store.get(target_id).await?; + tracing::info!( + "snapshot {} before rewrite:\n{}", + target_id, + target_snapshot + ); + let newest_id = snapshot_list + .iter() + .map(|m| m.id) + .max() + .expect("should exist"); + assert!( + newest_id >= target_id, + "newest_id={}, target_id={}", + newest_id, + target_id + ); + // Always use newest snapshot's `default_cf` during restoring, in order not to corrupt shared + // data of snapshots. Otherwise, for example if we restore a older SST id generator, an + // existent SST in object store is at risk of being overwrote by the restored cluster. + // All these risky metadata are in `default_cf`, e.g. id generator, epoch. They must satisfy: + // - Value is monotonically non-decreasing. + // - Value is memcomparable. + // - Keys of newest_snapshot is a superset of that of target_snapshot. + if newest_id > target_id { + let newest_snapshot: MetaSnapshotV1 = backup_store.get(newest_id).await?; + for (k, v) in &target_snapshot.metadata.default_cf { + let newest_v = newest_snapshot + .metadata + .default_cf + .get(k) + .unwrap_or_else(|| panic!("violate superset requirement. key {:x?}", k)); + assert!(newest_v >= v, "violate monotonicity requirement"); + } + target_snapshot.metadata.default_cf = newest_snapshot.metadata.default_cf; + tracing::info!( + "snapshot {} after rewrite by snapshot {}:\n{}", + target_id, + newest_id, + target_snapshot, + ); + } + Ok(target_snapshot) + } +} + +pub struct WriterModelV1ToMetaStoreV1 { + meta_store: MetaStoreBackendImpl, +} + +impl WriterModelV1ToMetaStoreV1 { + pub fn new(meta_store: MetaStoreBackendImpl) -> Self { + Self { meta_store } + } +} + +#[async_trait::async_trait] +impl Writer for WriterModelV1ToMetaStoreV1 { + async fn write(&self, target_snapshot: MetaSnapshot) -> BackupResult<()> { + dispatch_meta_store!(&self.meta_store, store, { + restore_metadata(store.clone(), target_snapshot.clone()).await?; + }); + Ok(()) + } +} + +async fn restore_metadata_model( + meta_store: &S, + metadata: &[T], +) -> BackupResult<()> { + if !T::list(meta_store).await?.is_empty() { + return Err(BackupError::NonemptyMetaStorage); + } + for d in metadata { + d.insert(meta_store).await?; + } + Ok(()) +} + +async fn restore_system_param_model( + meta_store: &S, + metadata: &[T], +) -> BackupResult<()> { + if T::get(meta_store).await?.is_some() { + return Err(BackupError::NonemptyMetaStorage); + } + for d in metadata { + d.insert(meta_store).await?; + } + Ok(()) +} + +async fn restore_cluster_id( + meta_store: &S, + cluster_id: ClusterId, +) -> BackupResult<()> { + if ClusterId::from_meta_store(meta_store).await?.is_some() { + return Err(BackupError::NonemptyMetaStorage); + } + cluster_id.put_at_meta_store(meta_store).await?; + Ok(()) +} + +async fn restore_default_cf( + meta_store: &S, + snapshot: &MetaSnapshotV1, +) -> BackupResult<()> { + if !meta_store.list_cf(DEFAULT_COLUMN_FAMILY).await?.is_empty() { + return Err(BackupError::NonemptyMetaStorage); + } + for (k, v) in &snapshot.metadata.default_cf { + meta_store + .put_cf(DEFAULT_COLUMN_FAMILY, k.clone(), v.clone()) + .await?; + } + Ok(()) +} + +async fn restore_metadata( + meta_store: S, + snapshot: MetaSnapshotV1, +) -> BackupResult<()> { + restore_default_cf(&meta_store, &snapshot).await?; + restore_metadata_model(&meta_store, &[snapshot.metadata.version_stats]).await?; + restore_metadata_model( + &meta_store, + &snapshot + .metadata + .compaction_groups + .into_iter() + .map(CompactionGroup::from_protobuf) + .collect_vec(), + ) + .await?; + restore_metadata_model( + &meta_store, + &snapshot + .metadata + .table_fragments + .into_iter() + .map(TableFragments::from_protobuf) + .collect_vec(), + ) + .await?; + restore_metadata_model(&meta_store, &snapshot.metadata.user_info).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.database).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.schema).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.table).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.index).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.sink).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.view).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.source).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.function).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.connection).await?; + restore_system_param_model(&meta_store, &[snapshot.metadata.system_param]).await?; + restore_cluster_id(&meta_store, snapshot.metadata.cluster_id.into()).await?; + Ok(()) +} diff --git a/src/meta/src/backup_restore/restore_impl/v2.rs b/src/meta/src/backup_restore/restore_impl/v2.rs new file mode 100644 index 0000000000000..086ad70f38437 --- /dev/null +++ b/src/meta/src/backup_restore/restore_impl/v2.rs @@ -0,0 +1,96 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::iter; + +use risingwave_backup::error::{BackupError, BackupResult}; +use risingwave_backup::meta_snapshot::MetaSnapshot; +use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2}; +use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; +use risingwave_backup::MetaSnapshotId; + +use crate::backup_restore::restore_impl::{Loader, Writer}; +use crate::controller::SqlMetaStore; + +pub struct LoaderV2 { + backup_store: MetaSnapshotStorageRef, +} + +impl LoaderV2 { + pub fn new(backup_store: MetaSnapshotStorageRef) -> Self { + Self { backup_store } + } +} + +#[async_trait::async_trait] +impl Loader for LoaderV2 { + async fn load(&self, target_id: MetaSnapshotId) -> BackupResult> { + let target_snapshot: MetaSnapshotV2 = self.backup_store.get(target_id).await?; + tracing::info!( + "snapshot {} before rewrite:\n{}", + target_id, + target_snapshot + ); + todo!("validate and rewrite seq") + } +} + +pub struct WriterModelV2ToMetaStoreV2 { + meta_store: SqlMetaStore, +} + +impl WriterModelV2ToMetaStoreV2 { + pub fn new(meta_store: SqlMetaStore) -> Self { + Self { meta_store } + } +} + +#[async_trait::async_trait] +impl Writer for WriterModelV2ToMetaStoreV2 { + async fn write(&self, target_snapshot: MetaSnapshot) -> BackupResult<()> { + let metadata = target_snapshot.metadata; + let db = &self.meta_store.conn; + insert_models(iter::once(metadata.version_stats), db).await?; + insert_models(metadata.compaction_configs, db).await?; + todo!("write other metadata") + } +} + +async fn insert_models( + models: impl IntoIterator, + db: &impl sea_orm::ConnectionTrait, +) -> BackupResult<()> +where + S: sea_orm::ModelTrait + Sync + Send + Sized + sea_orm::IntoActiveModel, + A: sea_orm::ActiveModelTrait + sea_orm::ActiveModelBehavior + Send + Sync + From, + <::Entity as sea_orm::EntityTrait>::Model: + sea_orm::IntoActiveModel, +{ + use sea_orm::EntityTrait; + if ::Entity::find() + .one(db) + .await + .map_err(|e| BackupError::MetaStorage(e.into()))? + .is_some() + { + return Err(BackupError::NonemptyMetaStorage); + } + for m in models { + m.into_active_model() + .insert(db) + .await + .map_err(|e| BackupError::MetaStorage(e.into()))?; + } + Ok(()) +} diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index c17650fb2aead..83cd1095beb25 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -23,6 +23,7 @@ use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; use crate::backup_restore::RestoreOpts; +use crate::controller::SqlMetaStore; use crate::storage::{EtcdMetaStore, MemStore, WrappedEtcdClient as EtcdClient}; use crate::MetaStoreBackend; @@ -30,6 +31,8 @@ use crate::MetaStoreBackend; pub enum MetaStoreBackendImpl { Etcd(EtcdMetaStore), Mem(MemStore), + #[expect(dead_code, reason = "WIP")] + Sql(SqlMetaStore), } #[macro_export] @@ -38,6 +41,7 @@ macro_rules! dispatch_meta_store { match $impl { MetaStoreBackendImpl::Etcd($store) => $body, MetaStoreBackendImpl::Mem($store) => $body, + MetaStoreBackendImpl::Sql(_) => panic!("not supported"), } }}; } diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index af3c989995b9a..a69c4a3479c51 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -56,10 +56,12 @@ pub struct MetaSnapshotMetadata { pub ssts: Vec, pub max_committed_epoch: u64, pub safe_epoch: u64, + #[serde(default)] + pub format_version: u32, } impl MetaSnapshotMetadata { - pub fn new(id: MetaSnapshotId, v: &HummockVersion) -> Self { + pub fn new(id: MetaSnapshotId, v: &HummockVersion, format_version: u32) -> Self { Self { id, hummock_version_id: v.id, @@ -68,6 +70,7 @@ impl MetaSnapshotMetadata { .collect_vec(), max_committed_epoch: v.max_committed_epoch, safe_epoch: v.safe_epoch, + format_version, } } } @@ -104,6 +107,7 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata { hummock_version_id: m.hummock_version_id, max_committed_epoch: m.max_committed_epoch, safe_epoch: m.safe_epoch, + format_version: Some(m.format_version), } } } diff --git a/src/storage/backup/src/storage.rs b/src/storage/backup/src/storage.rs index 5c325809f2650..3db89abfacf33 100644 --- a/src/storage/backup/src/storage.rs +++ b/src/storage/backup/src/storage.rs @@ -124,6 +124,7 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage { .push(MetaSnapshotMetadata::new( snapshot.id, snapshot.metadata.hummock_version_ref(), + snapshot.format_version, )); self.update_manifest(new_manifest).await?; Ok(()) diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index b9fa4a6258bbe..acca6454c2fec 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -192,6 +192,7 @@ impl BackupReader { } else { let this = self.clone(); let f = async move { + // TODO: change to v2 let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store.0.get(snapshot_id).await.map_err(|e| { format!("failed to get meta snapshot {}. {}", snapshot_id, e)