Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): adapt metadata restore to metadata model V2 #13197

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message MetaSnapshotMetadata {
uint64 hummock_version_id = 2;
uint64 max_committed_epoch = 3;
uint64 safe_epoch = 4;
optional uint32 format_version = 5;
}

service BackupService {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/backup_restore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod meta_snapshot_builder;
mod meta_snapshot_builder_v2;
mod metrics;
mod restore;
mod restore_impl;
mod utils;

pub use restore::*;
192 changes: 52 additions & 140 deletions src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,20 @@

use std::sync::Arc;

use itertools::Itertools;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
use risingwave_backup::meta_snapshot::Metadata;
use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
use risingwave_backup::MetaSnapshotId;
use risingwave_common::config::MetaBackend;
use risingwave_hummock_sdk::version_checkpoint_path;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint};

use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1};
use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
use crate::backup_restore::restore_impl::{Loader, Writer};
use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl};
use crate::dispatch_meta_store;
use crate::hummock::model::CompactionGroup;
use crate::manager::model::SystemParamsModel;
use crate::model::{ClusterId, MetadataModel, TableFragments};
use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY};

/// Command-line arguments for restore.
#[derive(clap::Args, Debug, Clone)]
Expand Down Expand Up @@ -98,99 +96,6 @@ async fn restore_hummock_version(
Ok(())
}

/// Writes every element of `metadata` into `meta_store`.
///
/// The store is first queried through `T::list`; if that listing is not empty the
/// function bails out with [`BackupError::NonemptyMetaStorage`] before inserting
/// anything. Failures from listing or inserting are propagated with `?`.
async fn restore_metadata_model<S: MetaStore, T: MetadataModel + Send + Sync>(
    meta_store: &S,
    metadata: &[T],
) -> BackupResult<()> {
    // Refuse to restore on top of pre-existing models of this type.
    let store_is_empty = T::list(meta_store).await?.is_empty();
    if !store_is_empty {
        return Err(BackupError::NonemptyMetaStorage);
    }

    // Insert one by one, stopping at the first failure.
    for model in metadata {
        model.insert(meta_store).await?;
    }
    Ok(())
}

/// Writes the system parameter model(s) in `metadata` into `meta_store`.
///
/// If `T::get` already yields a value from the store, nothing is written and
/// [`BackupError::NonemptyMetaStorage`] is returned. Failures from reading or
/// inserting are propagated with `?`.
async fn restore_system_param_model<S: MetaStore, T: SystemParamsModel + Send + Sync>(
    meta_store: &S,
    metadata: &[T],
) -> BackupResult<()> {
    // Refuse to restore when system parameters are already present.
    let already_present = T::get(meta_store).await?.is_some();
    if already_present {
        return Err(BackupError::NonemptyMetaStorage);
    }

    // Insert one by one, stopping at the first failure.
    for params in metadata {
        params.insert(meta_store).await?;
    }
    Ok(())
}

/// Stores `cluster_id` in `meta_store`.
///
/// If `ClusterId::from_meta_store` already finds a cluster id, nothing is written and
/// [`BackupError::NonemptyMetaStorage`] is returned. Failures from reading or writing
/// are propagated with `?`.
async fn restore_cluster_id<S: MetaStore>(
    meta_store: &S,
    cluster_id: ClusterId,
) -> BackupResult<()> {
    // Refuse to replace a cluster id that is already recorded.
    let already_present = ClusterId::from_meta_store(meta_store).await?.is_some();
    if already_present {
        return Err(BackupError::NonemptyMetaStorage);
    }

    cluster_id.put_at_meta_store(meta_store).await?;
    Ok(())
}

/// Copies the `default_cf` entries of `snapshot` into `DEFAULT_COLUMN_FAMILY` of `meta_store`.
///
/// If listing `DEFAULT_COLUMN_FAMILY` returns any entry, nothing is written and
/// [`BackupError::NonemptyMetaStorage`] is returned. Failures from listing or putting
/// are propagated with `?`.
async fn restore_default_cf<S: MetaStore>(
    meta_store: &S,
    snapshot: &MetaSnapshotV1,
) -> BackupResult<()> {
    // Refuse to restore into a column family that already holds data.
    let cf_is_empty = meta_store.list_cf(DEFAULT_COLUMN_FAMILY).await?.is_empty();
    if !cf_is_empty {
        return Err(BackupError::NonemptyMetaStorage);
    }

    // `put_cf` is given owned key/value, so each pair is cloned out of the snapshot.
    for (key, value) in snapshot.metadata.default_cf.iter() {
        meta_store
            .put_cf(DEFAULT_COLUMN_FAMILY, key.clone(), value.clone())
            .await?;
    }
    Ok(())
}

/// Writes the whole content of `snapshot` into `meta_store`.
///
/// The pieces are restored strictly in this order: `default_cf`, version stats,
/// compaction groups, table fragments, the catalog/user models, the system parameters
/// and finally the cluster id. The first failing step aborts the function and its error
/// is returned; steps that already completed are not undone here.
async fn restore_metadata<S: MetaStore>(
    meta_store: S,
    snapshot: MetaSnapshotV1,
) -> BackupResult<()> {
    let store = &meta_store;

    // `default_cf` is read through the snapshot by reference, so restore it before the
    // metadata is moved out below.
    restore_default_cf(store, &snapshot).await?;

    let metadata = snapshot.metadata;
    restore_metadata_model(store, &[metadata.version_stats]).await?;

    // These two are stored in the snapshot as protobuf and converted to their model types.
    let compaction_groups: Vec<_> = metadata
        .compaction_groups
        .into_iter()
        .map(CompactionGroup::from_protobuf)
        .collect();
    restore_metadata_model(store, &compaction_groups).await?;

    let table_fragments: Vec<_> = metadata
        .table_fragments
        .into_iter()
        .map(TableFragments::from_protobuf)
        .collect();
    restore_metadata_model(store, &table_fragments).await?;

    restore_metadata_model(store, &metadata.user_info).await?;
    restore_metadata_model(store, &metadata.database).await?;
    restore_metadata_model(store, &metadata.schema).await?;
    restore_metadata_model(store, &metadata.table).await?;
    restore_metadata_model(store, &metadata.index).await?;
    restore_metadata_model(store, &metadata.sink).await?;
    restore_metadata_model(store, &metadata.view).await?;
    restore_metadata_model(store, &metadata.source).await?;
    restore_metadata_model(store, &metadata.function).await?;
    restore_metadata_model(store, &metadata.connection).await?;

    restore_system_param_model(store, &[metadata.system_param]).await?;
    restore_cluster_id(store, metadata.cluster_id.into()).await?;
    Ok(())
}

/// Restores a meta store.
/// Uses `meta_store` and `backup_store` if provided.
/// Otherwise creates them based on `opts`.
Expand All @@ -212,62 +117,69 @@ async fn restore_impl(
Some(b) => b,
};
let target_id = opts.meta_snapshot_id;
let snapshot_list = backup_store.manifest().snapshot_metadata.clone();
let snapshot_list = &backup_store.manifest().snapshot_metadata;
if !snapshot_list.iter().any(|m| m.id == target_id) {
return Err(BackupError::Other(anyhow::anyhow!(
"snapshot id {} not found",
target_id
)));
}
let mut target_snapshot: MetaSnapshotV1 = backup_store.get(target_id).await?;
tracing::info!(
"snapshot {} before rewrite:\n{}",
target_id,
target_snapshot
);
let newest_id = snapshot_list
.into_iter()
.map(|m| m.id)
.max()
.expect("should exist");
assert!(newest_id >= target_id);
// Always use the newest snapshot's `default_cf` when restoring, so that data shared between
// snapshots is not corrupted. Otherwise, for example, if we restore an older SST id generator,
// an existing SST in the object store is at risk of being overwritten by the restored cluster.
// All such risky metadata lives in `default_cf`, e.g. id generator, epoch. It must satisfy:
// - Value is monotonically non-decreasing.
// - Value is memcomparable.
// - Keys of newest_snapshot are a superset of those of target_snapshot.
if newest_id > target_id {
let newest_snapshot: MetaSnapshotV1 = backup_store.get(newest_id).await?;
for (k, v) in &target_snapshot.metadata.default_cf {
let newest_v = newest_snapshot
.metadata
.default_cf
.get(k)
.unwrap_or_else(|| panic!("violate superset requirement. key {:x?}", k));
assert!(newest_v >= v, "violate monotonicity requirement");

let format_version = match snapshot_list.iter().find(|m| m.id == target_id) {
None => {
return Err(BackupError::Other(anyhow::anyhow!(
"snapshot id {} not found",
target_id
)));
}
Some(s) => s.format_version,
};
match &meta_store {
MetaStoreBackendImpl::Sql(m) => {
if format_version < 2 {
todo!("write model V1 to meta store V2");
} else {
dispatch(
target_id,
&opts,
LoaderV2::new(backup_store),
WriterModelV2ToMetaStoreV2::new(m.to_owned()),
)
.await?;
}
}
_ => {
assert!(format_version < 2, "format_version {}", format_version);
dispatch(
target_id,
&opts,
LoaderV1::new(backup_store),
WriterModelV1ToMetaStoreV1::new(meta_store),
)
.await?;
}
target_snapshot.metadata.default_cf = newest_snapshot.metadata.default_cf;
tracing::info!(
"snapshot {} after rewrite by snapshot {}:\n{}",
target_id,
newest_id,
target_snapshot,
);
}

Ok(())
}

/// Runs one restore with a matching `Loader`/`Writer` pair for metadata type `S`.
///
/// Steps, as visible in this block:
/// 1. `loader.load(target_id)` produces the snapshot to restore.
/// 2. If `opts.dry_run` is set, returns `Ok(())` right after loading, before anything
///    is restored or written.
/// 3. `restore_hummock_version` is called with the storage URL and directory from `opts`.
/// 4. `writer.write` receives the loaded snapshot.
///
/// Any error from these steps is returned through `BackupResult`.
///
/// NOTE(review): this view appears to interleave lines from both sides of the diff —
/// there are two hummock-version arguments in a row, and a `dispatch_meta_store!` call
/// referring to a `meta_store` that is not a parameter here sits next to
/// `writer.write`. Confirm the exact body against the merged file.
async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
target_id: MetaSnapshotId,
opts: &RestoreOpts,
loader: L,
writer: W,
) -> BackupResult<()> {
let target_snapshot = loader.load(target_id).await?;
if opts.dry_run {
return Ok(());
}
restore_hummock_version(
&opts.hummock_storage_url,
&opts.hummock_storage_directory,
&target_snapshot.metadata.hummock_version,
target_snapshot.metadata.hummock_version_ref(),
)
.await?;
dispatch_meta_store!(meta_store.clone(), store, {
restore_metadata(store.clone(), target_snapshot.clone()).await?;
});
writer.write(target_snapshot).await?;
Ok(())
}

Expand Down
32 changes: 32 additions & 0 deletions src/meta/src/backup_restore/restore_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_backup::error::BackupResult;
use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
use risingwave_backup::MetaSnapshotId;

pub mod v1;
pub mod v2;

/// `Loader` gets, validates and amends `MetaSnapshot`.
///
/// The trait is generic over the metadata payload type `S`, so each snapshot format
/// can provide its own implementation.
#[async_trait::async_trait]
pub trait Loader<S: Metadata> {
/// Loads the snapshot identified by `id`.
///
/// Returns the resulting `MetaSnapshot<S>`, or an error through `BackupResult`.
async fn load(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>>;
}

/// `Writer` writes `MetaSnapshot` to meta store.
///
/// The trait is generic over the metadata payload type `S` of the snapshot it accepts.
#[async_trait::async_trait]
pub trait Writer<S: Metadata> {
/// Writes the snapshot `s`, which is taken by value.
///
/// Returns `Ok(())` on success, or an error through `BackupResult`.
async fn write(&self, s: MetaSnapshot<S>) -> BackupResult<()>;
}
Loading
Loading