Skip to content

Commit

Permalink
refactor(meta): adapt metadata backup to metadata model V2
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Oct 25, 2023
1 parent abafae0 commit 1acf3f6
Show file tree
Hide file tree
Showing 27 changed files with 800 additions and 367 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Instant;

use arc_swap::ArcSwap;
use risingwave_backup::error::BackupError;
use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
use risingwave_common::bail;
use risingwave_hummock_sdk::HummockSstableObjectId;
Expand All @@ -28,7 +28,7 @@ use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::task::JoinHandle;

use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
use crate::backup_restore::meta_snapshot_builder;
use crate::backup_restore::metrics::BackupManagerMetrics;
use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
Expand Down Expand Up @@ -66,7 +66,7 @@ type StoreConfig = (String, String);
pub struct BackupManager {
env: MetaSrvEnv,
hummock_manager: HummockManagerRef,
backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>,
backup_store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
/// Tracks the running backup job. Concurrent jobs is not supported.
running_job_handle: tokio::sync::Mutex<Option<BackupJobHandle>>,
metrics: BackupManagerMetrics,
Expand Down Expand Up @@ -143,7 +143,7 @@ impl BackupManager {
env: MetaSrvEnv,
hummock_manager: HummockManagerRef,
meta_metrics: Arc<MetaMetrics>,
backup_store: (BoxedMetaSnapshotStorage, StoreConfig),
backup_store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
) -> Self {
Self {
env,
Expand All @@ -169,13 +169,13 @@ impl BackupManager {
}

#[cfg(test)]
pub fn for_test(env: MetaSrvEnv, hummock_manager: HummockManagerRef) -> Self {
pub async fn for_test(env: MetaSrvEnv, hummock_manager: HummockManagerRef) -> Self {
Self::with_store(
env,
hummock_manager,
Arc::new(MetaMetrics::default()),
(
Box::<risingwave_backup::storage::DummyMetaSnapshotStorage>::default(),
risingwave_backup::storage::unused().await,
StoreConfig::default(),
),
)
Expand Down Expand Up @@ -334,8 +334,9 @@ impl BackupWorker {
fn start(self, job_id: u64) -> JoinHandle<()> {
let backup_manager_clone = self.backup_manager.clone();
let job = async move {
let mut snapshot_builder =
MetaSnapshotBuilder::new(backup_manager_clone.env.meta_store_ref());
let mut snapshot_builder = meta_snapshot_builder::MetaSnapshotV1Builder::new(
backup_manager_clone.env.meta_store_ref(),
);
// Reuse job id as snapshot id.
let hummock_manager = backup_manager_clone.hummock_manager.clone();
snapshot_builder
Expand Down Expand Up @@ -364,8 +365,8 @@ impl BackupWorker {
async fn create_snapshot_store(
config: &StoreConfig,
metric: Arc<ObjectStoreMetrics>,
) -> MetaResult<BoxedMetaSnapshotStorage> {
) -> MetaResult<ObjectStoreMetaSnapshotStorage> {
let object_store = Arc::new(parse_remote_object_store(&config.0, metric, "Meta Backup").await);
let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
Ok(Box::new(store))
Ok(store)
}
19 changes: 11 additions & 8 deletions src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::future::Future;

use anyhow::anyhow;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot};
use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};
use risingwave_backup::MetaSnapshotId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
use risingwave_pb::catalog::{
Expand All @@ -33,15 +33,15 @@ use crate::storage::{MetaStore, Snapshot, DEFAULT_COLUMN_FAMILY};

const VERSION: u32 = 1;

pub struct MetaSnapshotBuilder<S> {
snapshot: MetaSnapshot,
pub struct MetaSnapshotV1Builder<S> {
snapshot: MetaSnapshotV1,
meta_store: S,
}

impl<S: MetaStore> MetaSnapshotBuilder<S> {
impl<S: MetaStore> MetaSnapshotV1Builder<S> {
pub fn new(meta_store: S) -> Self {
Self {
snapshot: MetaSnapshot::default(),
snapshot: MetaSnapshotV1::default(),
meta_store,
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
Ok(())
}

pub fn finish(self) -> BackupResult<MetaSnapshot> {
pub fn finish(self) -> BackupResult<MetaSnapshotV1> {
// Any sanity check goes here.
Ok(self.snapshot)
}
Expand All @@ -168,16 +168,19 @@ mod tests {
use assert_matches::assert_matches;
use itertools::Itertools;
use risingwave_backup::error::BackupError;
use risingwave_backup::meta_snapshot::MetaSnapshot;
use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
use risingwave_common::error::ToErrorStr;
use risingwave_common::system_param::system_params_for_test;
use risingwave_pb::hummock::{HummockVersion, HummockVersionStats};

use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
use crate::backup_restore::meta_snapshot_builder;
use crate::manager::model::SystemParamsModel;
use crate::model::{ClusterId, MetadataModel};
use crate::storage::{MemStore, MetaStore, DEFAULT_COLUMN_FAMILY};

type MetaSnapshot = MetaSnapshotV1;
type MetaSnapshotBuilder<S> = meta_snapshot_builder::MetaSnapshotV1Builder<S>;

#[tokio::test]
async fn test_snapshot_builder() {
let meta_store = MemStore::new();
Expand Down
66 changes: 66 additions & 0 deletions src/meta/src/backup_restore/meta_snapshot_builder_v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::future::Future;

use risingwave_backup::error::BackupResult;
use risingwave_backup::meta_snapshot_v2::MetaSnapshotV2;
use risingwave_backup::MetaSnapshotId;
use risingwave_pb::hummock::HummockVersion;

use crate::controller::SqlMetaStore;

const VERSION: u32 = 2;

pub struct MetaSnapshotV2Builder {
snapshot: MetaSnapshotV2,
meta_store: SqlMetaStore,
}

impl MetaSnapshotV2Builder {
pub fn new(meta_store: SqlMetaStore) -> Self {
Self {
snapshot: MetaSnapshotV2::default(),
meta_store,
}
}

pub async fn build<D: Future<Output = HummockVersion>>(
&mut self,
id: MetaSnapshotId,
hummock_version_builder: D,
) -> BackupResult<()> {
self.snapshot.format_version = VERSION;
self.snapshot.id = id;
// Get `hummock_version` before `meta_store_snapshot`.
// We have ensure the required delta logs for replay is available, see
// `HummockManager::delete_version_deltas`.
let hummock_version = hummock_version_builder.await;
todo!("take snapshot");
// hummock_version and version_stats is guaranteed to exist in a initialized cluster.
let hummock_version = {
let mut redo_state = hummock_version;
todo!("apply deltas");
redo_state
};
todo!("other");
Ok(())
}

pub fn finish(self) -> BackupResult<MetaSnapshotV2> {
// Any sanity check goes here.
Ok(self.snapshot)
}
}
1 change: 1 addition & 0 deletions src/meta/src/backup_restore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod backup_manager;
pub use backup_manager::*;
mod error;
mod meta_snapshot_builder;
mod meta_snapshot_builder_v2;
mod metrics;
mod restore;
mod utils;
Expand Down
20 changes: 13 additions & 7 deletions src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_backup::error::{BackupError, BackupResult};
use risingwave_backup::meta_snapshot::MetaSnapshot;
use risingwave_backup::storage::MetaSnapshotStorageRef;
use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
use risingwave_common::config::MetaBackend;
use risingwave_hummock_sdk::version_checkpoint_path;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn restore_cluster_id<S: MetaStore>(

async fn restore_default_cf<S: MetaStore>(
meta_store: &S,
snapshot: &MetaSnapshot,
snapshot: &MetaSnapshotV1,
) -> BackupResult<()> {
if !meta_store.list_cf(DEFAULT_COLUMN_FAMILY).await?.is_empty() {
return Err(BackupError::NonemptyMetaStorage);
Expand All @@ -150,7 +150,10 @@ async fn restore_default_cf<S: MetaStore>(
Ok(())
}

async fn restore_metadata<S: MetaStore>(meta_store: S, snapshot: MetaSnapshot) -> BackupResult<()> {
async fn restore_metadata<S: MetaStore>(
meta_store: S,
snapshot: MetaSnapshotV1,
) -> BackupResult<()> {
restore_default_cf(&meta_store, &snapshot).await?;
restore_metadata_model(&meta_store, &[snapshot.metadata.version_stats]).await?;
restore_metadata_model(
Expand Down Expand Up @@ -216,7 +219,7 @@ async fn restore_impl(
target_id
)));
}
let mut target_snapshot = backup_store.get(target_id).await?;
let mut target_snapshot: MetaSnapshotV1 = backup_store.get(target_id).await?;
tracing::info!(
"snapshot {} before rewrite:\n{}",
target_id,
Expand All @@ -236,7 +239,7 @@ async fn restore_impl(
// - Value is memcomparable.
// - Keys of newest_snapshot is a superset of that of target_snapshot.
if newest_id > target_id {
let newest_snapshot = backup_store.get(newest_id).await?;
let newest_snapshot: MetaSnapshotV1 = backup_store.get(newest_id).await?;
for (k, v) in &target_snapshot.metadata.default_cf {
let newest_v = newest_snapshot
.metadata
Expand Down Expand Up @@ -287,7 +290,8 @@ mod tests {
use std::collections::HashMap;

use itertools::Itertools;
use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot};
use risingwave_backup::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};
use risingwave_backup::storage::MetaSnapshotStorage;
use risingwave_common::config::{MetaBackend, SystemConfig};
use risingwave_pb::hummock::{HummockVersion, HummockVersionStats};
use risingwave_pb::meta::SystemParams;
Expand All @@ -300,6 +304,8 @@ mod tests {
use crate::model::MetadataModel;
use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY};

type MetaSnapshot = MetaSnapshotV1;

fn get_restore_opts() -> RestoreOpts {
RestoreOpts {
meta_snapshot_id: 1,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType};
mod picker;
pub mod selector;

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand Down Expand Up @@ -145,7 +145,7 @@ impl CompactStatus {
compression_algorithm,
target_file_size: ret.target_file_size,
compaction_filter_mask: 0,
table_options: HashMap::default(),
table_options: BTreeMap::default(),
current_epoch_time: 0,
target_sub_level_id: ret.input.target_sub_level_id,
task_type: ret.compaction_task_type as i32,
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,8 @@ mod tests {
let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await;
let context_id = worker_node.id;
let compactor_manager = hummock_manager.compactor_manager_ref_for_test();
let backup_manager = Arc::new(BackupManager::for_test(
env.clone(),
hummock_manager.clone(),
));
let backup_manager =
Arc::new(BackupManager::for_test(env.clone(), hummock_manager.clone()).await);
let vacuum = Arc::new(VacuumManager::new(
env,
hummock_manager.clone(),
Expand Down
12 changes: 9 additions & 3 deletions src/meta/src/model_v2/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::CompactionConfig as PbCompactionConfig;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "compaction_config")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub compaction_group_id: i64,
#[sea_orm(column_type = "JsonBinary", nullable)]
pub config: Option<Json>,
pub compaction_group_id: CompactionGroupId,
pub config: CompactionConfig,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Clone, Debug, PartialEq, Eq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct CompactionConfig(pub PbCompactionConfig);
13 changes: 10 additions & 3 deletions src/meta/src/model_v2/compaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::LevelHandler as PbLevelHandler;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "compaction_status")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub compaction_group_id: i64,
#[sea_orm(column_type = "JsonBinary", nullable)]
pub status: Option<Json>,
pub compaction_group_id: CompactionGroupId,
pub status: LevelHandlers,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Clone, Debug, PartialEq, Eq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct LevelHandlers(pub Vec<PbLevelHandler>);
Loading

0 comments on commit 1acf3f6

Please sign in to comment.