Skip to content

Commit

Permalink
refactor(meta): adapt metadata restore to metadata model V2
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Nov 1, 2023
1 parent 675bdca commit df29566
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 141 deletions.
1 change: 1 addition & 0 deletions proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message MetaSnapshotMetadata {
uint64 hummock_version_id = 2;
uint64 max_committed_epoch = 3;
uint64 safe_epoch = 4;
optional uint32 format_version = 5;
}

service BackupService {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/backup_restore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod meta_snapshot_builder;
mod meta_snapshot_builder_v2;
mod metrics;
mod restore;
mod restore_impl;
mod utils;

pub use restore::*;
192 changes: 52 additions & 140 deletions src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,20 @@

use std::sync::Arc;

use itertools::Itertools;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
use risingwave_backup::meta_snapshot::Metadata;
use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
use risingwave_backup::MetaSnapshotId;
use risingwave_common::config::MetaBackend;
use risingwave_hummock_sdk::version_checkpoint_path;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint};

use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1};
use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
use crate::backup_restore::restore_impl::{Loader, Writer};
use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl};
use crate::dispatch_meta_store;
use crate::hummock::model::CompactionGroup;
use crate::manager::model::SystemParamsModel;
use crate::model::{ClusterId, MetadataModel, TableFragments};
use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY};

/// Command-line arguments for restore.
#[derive(clap::Args, Debug, Clone)]
Expand Down Expand Up @@ -98,99 +96,6 @@ async fn restore_hummock_version(
Ok(())
}

async fn restore_metadata_model<S: MetaStore, T: MetadataModel + Send + Sync>(
meta_store: &S,
metadata: &[T],
) -> BackupResult<()> {
if !T::list(meta_store).await?.is_empty() {
return Err(BackupError::NonemptyMetaStorage);
}
for d in metadata {
d.insert(meta_store).await?;
}
Ok(())
}

async fn restore_system_param_model<S: MetaStore, T: SystemParamsModel + Send + Sync>(
meta_store: &S,
metadata: &[T],
) -> BackupResult<()> {
if T::get(meta_store).await?.is_some() {
return Err(BackupError::NonemptyMetaStorage);
}
for d in metadata {
d.insert(meta_store).await?;
}
Ok(())
}

async fn restore_cluster_id<S: MetaStore>(
meta_store: &S,
cluster_id: ClusterId,
) -> BackupResult<()> {
if ClusterId::from_meta_store(meta_store).await?.is_some() {
return Err(BackupError::NonemptyMetaStorage);
}
cluster_id.put_at_meta_store(meta_store).await?;
Ok(())
}

async fn restore_default_cf<S: MetaStore>(
meta_store: &S,
snapshot: &MetaSnapshotV1,
) -> BackupResult<()> {
if !meta_store.list_cf(DEFAULT_COLUMN_FAMILY).await?.is_empty() {
return Err(BackupError::NonemptyMetaStorage);
}
for (k, v) in &snapshot.metadata.default_cf {
meta_store
.put_cf(DEFAULT_COLUMN_FAMILY, k.clone(), v.clone())
.await?;
}
Ok(())
}

async fn restore_metadata<S: MetaStore>(
meta_store: S,
snapshot: MetaSnapshotV1,
) -> BackupResult<()> {
restore_default_cf(&meta_store, &snapshot).await?;
restore_metadata_model(&meta_store, &[snapshot.metadata.version_stats]).await?;
restore_metadata_model(
&meta_store,
&snapshot
.metadata
.compaction_groups
.into_iter()
.map(CompactionGroup::from_protobuf)
.collect_vec(),
)
.await?;
restore_metadata_model(
&meta_store,
&snapshot
.metadata
.table_fragments
.into_iter()
.map(TableFragments::from_protobuf)
.collect_vec(),
)
.await?;
restore_metadata_model(&meta_store, &snapshot.metadata.user_info).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.database).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.schema).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.table).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.index).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.sink).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.view).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.source).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.function).await?;
restore_metadata_model(&meta_store, &snapshot.metadata.connection).await?;
restore_system_param_model(&meta_store, &[snapshot.metadata.system_param]).await?;
restore_cluster_id(&meta_store, snapshot.metadata.cluster_id.into()).await?;
Ok(())
}

/// Restores a meta store.
/// Uses `meta_store` and `backup_store` if provided.
/// Otherwise creates them based on `opts`.
Expand All @@ -212,62 +117,69 @@ async fn restore_impl(
Some(b) => b,
};
let target_id = opts.meta_snapshot_id;
let snapshot_list = backup_store.manifest().snapshot_metadata.clone();
let snapshot_list = &backup_store.manifest().snapshot_metadata;
if !snapshot_list.iter().any(|m| m.id == target_id) {
return Err(BackupError::Other(anyhow::anyhow!(
"snapshot id {} not found",
target_id
)));
}
let mut target_snapshot: MetaSnapshotV1 = backup_store.get(target_id).await?;
tracing::info!(
"snapshot {} before rewrite:\n{}",
target_id,
target_snapshot
);
let newest_id = snapshot_list
.into_iter()
.map(|m| m.id)
.max()
.expect("should exist");
assert!(newest_id >= target_id);
// Always use newest snapshot's `default_cf` during restoring, in order not to corrupt shared
// data of snapshots. Otherwise, for example if we restore a older SST id generator, an
// existent SST in object store is at risk of being overwrote by the restored cluster.
// All these risky metadata are in `default_cf`, e.g. id generator, epoch. They must satisfy:
// - Value is monotonically non-decreasing.
// - Value is memcomparable.
// - Keys of newest_snapshot is a superset of that of target_snapshot.
if newest_id > target_id {
let newest_snapshot: MetaSnapshotV1 = backup_store.get(newest_id).await?;
for (k, v) in &target_snapshot.metadata.default_cf {
let newest_v = newest_snapshot
.metadata
.default_cf
.get(k)
.unwrap_or_else(|| panic!("violate superset requirement. key {:x?}", k));
assert!(newest_v >= v, "violate monotonicity requirement");

let format_version = match snapshot_list.iter().find(|m| m.id == target_id) {
None => {
return Err(BackupError::Other(anyhow::anyhow!(
"snapshot id {} not found",
target_id
)));
}
Some(s) => s.format_version,
};
match &meta_store {
MetaStoreBackendImpl::Sql(m) => {
if format_version < 2 {
todo!("write model V1 to meta store V2");
} else {
dispatch(
target_id,
&opts,
LoaderV2::new(backup_store),
WriterModelV2ToMetaStoreV2::new(m.to_owned()),
)
.await?;
}
}
_ => {
assert!(format_version < 2, "format_version {}", format_version);
dispatch(
target_id,
&opts,
LoaderV1::new(backup_store),
WriterModelV1ToMetaStoreV1::new(meta_store),
)
.await?;
}
target_snapshot.metadata.default_cf = newest_snapshot.metadata.default_cf;
tracing::info!(
"snapshot {} after rewrite by snapshot {}:\n{}",
target_id,
newest_id,
target_snapshot,
);
}

Ok(())
}

async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
target_id: MetaSnapshotId,
opts: &RestoreOpts,
loader: L,
writer: W,
) -> BackupResult<()> {
let target_snapshot = loader.load(target_id).await?;
if opts.dry_run {
return Ok(());
}
restore_hummock_version(
&opts.hummock_storage_url,
&opts.hummock_storage_directory,
&target_snapshot.metadata.hummock_version,
target_snapshot.metadata.hummock_version_ref(),
)
.await?;
dispatch_meta_store!(meta_store.clone(), store, {
restore_metadata(store.clone(), target_snapshot.clone()).await?;
});
writer.write(target_snapshot).await?;
Ok(())
}

Expand Down
32 changes: 32 additions & 0 deletions src/meta/src/backup_restore/restore_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_backup::error::BackupResult;
use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
use risingwave_backup::MetaSnapshotId;

pub mod v1;
pub mod v2;

/// `Loader` gets, validates and amends `MetaSnapshot`.
#[async_trait::async_trait]
pub trait Loader<S: Metadata> {
async fn load(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>>;
}

/// `Writer` writes `MetaSnapshot` to meta store.
#[async_trait::async_trait]
pub trait Writer<S: Metadata> {
async fn write(&self, s: MetaSnapshot<S>) -> BackupResult<()>;
}
Loading

0 comments on commit df29566

Please sign in to comment.