Skip to content

Commit

Permalink
feat(meta): support sql meta store backup and restoration (#16384)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Apr 26, 2024
1 parent 6329101 commit 9e6528a
Show file tree
Hide file tree
Showing 41 changed files with 721 additions and 173 deletions.
25 changes: 14 additions & 11 deletions ci/scripts/run-meta-backup-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,17 @@ cluster_stop() {

download_and_prepare_rw "$profile" common

echo "--- e2e, ci-meta-backup-test"
test_root="src/storage/backup/integration_tests"
BACKUP_TEST_MCLI=".risingwave/bin/mcli" \
BACKUP_TEST_MCLI_CONFIG=".risingwave/config/mcli" \
BACKUP_TEST_RW_ALL_IN_ONE="target/debug/risingwave" \
RW_HUMMOCK_URL="hummock+minio://hummockadmin:[email protected]:9301/hummock001" \
RW_META_ADDR="http://127.0.0.1:5690" \
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
bash "${test_root}/run_all.sh"
echo "--- Kill cluster"
risedev kill
for meta_store_type in "etcd" "sql"; do
echo "--- e2e, ci-meta-backup-test-${meta_store_type}"
test_root="src/storage/backup/integration_tests"
BACKUP_TEST_MCLI=".risingwave/bin/mcli" \
BACKUP_TEST_MCLI_CONFIG=".risingwave/config/mcli" \
BACKUP_TEST_RW_ALL_IN_ONE="target/debug/risingwave" \
RW_HUMMOCK_URL="hummock+minio://hummockadmin:[email protected]:9301/hummock001" \
RW_META_ADDR="http://127.0.0.1:5690" \
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
META_STORE_TYPE="${meta_store_type}" \
bash "${test_root}/run_all.sh"
echo "--- Kill cluster"
risedev kill
done
20 changes: 18 additions & 2 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ profile:
- use: kafka
persist-data: true

ci-meta-backup-test:
ci-meta-backup-test-etcd:
config-path: src/config/ci-meta-backup-test.toml
steps:
- use: etcd
Expand All @@ -931,12 +931,28 @@ profile:
- use: frontend
- use: compactor

ci-meta-backup-test-restore:
ci-meta-backup-test-restore-etcd:
config-path: src/config/ci-meta-backup-test.toml
steps:
- use: etcd
- use: minio

ci-meta-backup-test-sql:
config-path: src/config/ci-meta-backup-test.toml
steps:
- use: sqlite
- use: minio
- use: meta-node
- use: compute-node
- use: frontend
- use: compactor

ci-meta-backup-test-restore-sql:
config-path: src/config/ci-meta-backup-test.toml
steps:
- use: sqlite
- use: minio

ci-meta-etcd-for-migration:
config-path: src/config/ci.toml
steps:
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{
ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, VnodeBitmap, WorkerId,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum ActorStatus {
#[sea_orm(string_value = "INACTIVE")]
Expand Down Expand Up @@ -47,7 +48,7 @@ impl From<ActorStatus> for PbActorState {
}
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "actor")]
pub struct Model {
#[sea_orm(primary_key)]
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/actor_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType};
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{ActorId, ActorMapping, FragmentId, I32Array};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Deserialize, Serialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum DispatcherType {
#[sea_orm(string_value = "HASH")]
Expand Down Expand Up @@ -81,7 +82,7 @@ impl From<Model> for PbDispatcher {
}
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Deserialize, Serialize)]
#[sea_orm(table_name = "actor_dispatcher")]
pub struct Model {
#[sea_orm(primary_key)]
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/catalog_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum VersionCategory {
#[sea_orm(string_value = "NOTIFICATION")]
Expand All @@ -23,7 +24,7 @@ pub enum VersionCategory {
TableRevision,
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "catalog_version")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "cluster")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
1 change: 0 additions & 1 deletion src/meta/model_v2/src/compaction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::{CompactTask as PbCompactTask, CompactTaskAssignment};
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{CompactionTaskId, WorkerId};

Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use risingwave_pb::catalog::connection::PbInfo;
use risingwave_pb::catalog::PbConnection;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{ConnectionId, PrivateLinkService};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "connection")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
use risingwave_pb::catalog::PbDatabase;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::DatabaseId;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "database")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "fragment")]
pub struct Model {
#[sea_orm(primary_key)]
Expand All @@ -31,7 +32,7 @@ pub struct Model {
pub upstream_fragment_id: I32Array,
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum DistributionType {
#[sea_orm(string_value = "SINGLE")]
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use risingwave_pb::catalog::function::Kind;
use risingwave_pb::catalog::PbFunction;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{DataType, DataTypeArray, FunctionId};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum FunctionKind {
#[sea_orm(string_value = "Scalar")]
Expand All @@ -30,7 +31,7 @@ pub enum FunctionKind {
Aggregate,
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "function")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model_v2/src/hummock_sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

use sea_orm::entity::prelude::*;

use serde::{Deserialize, Serialize};
pub const COMPACTION_TASK_ID: &str = "compaction_task";
pub const COMPACTION_GROUP_ID: &str = "compaction_group";
pub const SSTABLE_OBJECT_ID: &str = "sstable_object";
pub const META_BACKUP_ID: &str = "meta_backup";

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default, Serialize, Deserialize)]
#[sea_orm(table_name = "hummock_sequence")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
1 change: 0 additions & 1 deletion src/meta/model_v2/src/hummock_version_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::PbHummockVersionDelta;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{Epoch, HummockVersionId};

Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
use risingwave_pb::catalog::PbIndex;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{ExprNodeArray, IndexId, TableId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "index")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
9 changes: 5 additions & 4 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod index;
pub mod object;
pub mod object_dependency;
pub mod schema;
pub mod serde_seaql_migration;
pub mod session_parameter;
pub mod sink;
pub mod source;
Expand Down Expand Up @@ -83,7 +84,7 @@ pub type HummockSstableObjectId = i64;
pub type FragmentId = i32;
pub type ActorId = i32;

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum JobStatus {
#[sea_orm(string_value = "INITIAL")]
Expand Down Expand Up @@ -115,7 +116,7 @@ impl From<JobStatus> for PbStreamJobState {
}
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum CreateType {
#[sea_orm(string_value = "BACKGROUND")]
Expand Down Expand Up @@ -170,7 +171,7 @@ macro_rules! derive_from_json_struct {
/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct.
macro_rules! derive_from_blob {
($struct_name:ident, $field_type:ty) => {
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, DeriveValueType)]
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
pub struct $struct_name(#[sea_orm] Vec<u8>);

impl $struct_name {
Expand Down Expand Up @@ -212,7 +213,7 @@ macro_rules! derive_from_blob {
/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array.
macro_rules! derive_array_from_blob {
($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
#[derive(Clone, PartialEq, Eq, DeriveValueType)]
#[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
pub struct $struct_name(#[sea_orm] Vec<u8>);

#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
5 changes: 3 additions & 2 deletions src/meta/model_v2/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{DatabaseId, ObjectId, SchemaId, UserId};

#[derive(Clone, Debug, PartialEq, Eq, Copy, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Eq, Copy, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum ObjectType {
#[sea_orm(string_value = "DATABASE")]
Expand Down Expand Up @@ -58,7 +59,7 @@ impl ObjectType {
}
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "object")]
pub struct Model {
#[sea_orm(primary_key)]
Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/object_dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{ObjectId, UserId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "object_dependency")]
pub struct Model {
#[sea_orm(primary_key)]
Expand Down
3 changes: 2 additions & 1 deletion src/meta/model_v2/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
use risingwave_pb::catalog::PbSchema;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::SchemaId;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "schema")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
31 changes: 31 additions & 0 deletions src/meta/model_v2/src/serde_seaql_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// It duplicates the one found in crate sea-orm-migration, but derives serde.
/// It's only used by metadata backup/restore.
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
// One should override the name of migration table via `MigratorTrait::migration_table_name` method
#[sea_orm(table_name = "seaql_migrations")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub version: String,
pub applied_at: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/session_parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use sea_orm::entity::prelude::*;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, serde::Deserialize, serde::Serialize)]
#[sea_orm(table_name = "session_parameter")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
Expand Down
Loading

0 comments on commit 9e6528a

Please sign in to comment.