From 9e6528a45c7417a41f8dd245057d51996d10d3a0 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 26 Apr 2024 15:37:02 +0800 Subject: [PATCH] feat(meta): support sql meta store backup and restoration (#16384) --- ci/scripts/run-meta-backup-test.sh | 25 ++- risedev.yml | 20 +- src/meta/model_v2/src/actor.rs | 5 +- src/meta/model_v2/src/actor_dispatcher.rs | 5 +- src/meta/model_v2/src/catalog_version.rs | 5 +- src/meta/model_v2/src/cluster.rs | 3 +- src/meta/model_v2/src/compaction_task.rs | 1 - src/meta/model_v2/src/connection.rs | 3 +- src/meta/model_v2/src/database.rs | 3 +- src/meta/model_v2/src/fragment.rs | 5 +- src/meta/model_v2/src/function.rs | 5 +- src/meta/model_v2/src/hummock_sequence.rs | 4 +- .../model_v2/src/hummock_version_delta.rs | 1 - src/meta/model_v2/src/index.rs | 3 +- src/meta/model_v2/src/lib.rs | 9 +- src/meta/model_v2/src/object.rs | 5 +- src/meta/model_v2/src/object_dependency.rs | 3 +- src/meta/model_v2/src/schema.rs | 3 +- .../model_v2/src/serde_seaql_migration.rs | 31 +++ src/meta/model_v2/src/session_parameter.rs | 2 +- src/meta/model_v2/src/sink.rs | 5 +- src/meta/model_v2/src/source.rs | 3 +- src/meta/model_v2/src/streaming_job.rs | 3 +- src/meta/model_v2/src/subscription.rs | 3 +- src/meta/model_v2/src/system_parameter.rs | 4 +- src/meta/model_v2/src/table.rs | 9 +- src/meta/model_v2/src/user.rs | 3 +- src/meta/model_v2/src/user_privilege.rs | 5 +- src/meta/model_v2/src/view.rs | 3 +- src/meta/model_v2/src/worker.rs | 7 +- src/meta/model_v2/src/worker_property.rs | 3 +- src/meta/src/backup_restore/backup_manager.rs | 54 +++-- .../meta_snapshot_builder_v2.rs | 163 +++++++++++++-- src/meta/src/backup_restore/restore.rs | 5 +- .../src/backup_restore/restore_impl/v2.rs | 148 ++++++++++++-- src/meta/src/backup_restore/utils.rs | 28 ++- .../backup/integration_tests/Makefile.toml | 1 + .../backup/integration_tests/common.sh | 63 +++++- src/storage/backup/src/error.rs | 8 - src/storage/backup/src/meta_snapshot_v2.rs | 189 +++++++++++++++--- src/storage/src/hummock/backup_reader.rs | 46 +++-- 41 files changed, 721 insertions(+), 173 deletions(-) create mode 100644 src/meta/model_v2/src/serde_seaql_migration.rs diff --git a/ci/scripts/run-meta-backup-test.sh b/ci/scripts/run-meta-backup-test.sh index 120d247cedd4..14d9113ff6f3 100755 --- a/ci/scripts/run-meta-backup-test.sh +++ b/ci/scripts/run-meta-backup-test.sh @@ -67,14 +67,17 @@ cluster_stop() { download_and_prepare_rw "$profile" common -echo "--- e2e, ci-meta-backup-test" -test_root="src/storage/backup/integration_tests" -BACKUP_TEST_MCLI=".risingwave/bin/mcli" \ -BACKUP_TEST_MCLI_CONFIG=".risingwave/config/mcli" \ -BACKUP_TEST_RW_ALL_IN_ONE="target/debug/risingwave" \ -RW_HUMMOCK_URL="hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001" \ -RW_META_ADDR="http://127.0.0.1:5690" \ -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -bash "${test_root}/run_all.sh" -echo "--- Kill cluster" -risedev kill \ No newline at end of file +for meta_store_type in "etcd" "sql"; do + echo "--- e2e, ci-meta-backup-test-${meta_store_type}" + test_root="src/storage/backup/integration_tests" + BACKUP_TEST_MCLI=".risingwave/bin/mcli" \ + BACKUP_TEST_MCLI_CONFIG=".risingwave/config/mcli" \ + BACKUP_TEST_RW_ALL_IN_ONE="target/debug/risingwave" \ + RW_HUMMOCK_URL="hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001" \ + RW_META_ADDR="http://127.0.0.1:5690" \ + RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ + META_STORE_TYPE="${meta_store_type}" \ + bash "${test_root}/run_all.sh" + echo "--- Kill cluster" + risedev kill +done diff --git a/risedev.yml b/risedev.yml index bfef64e6b598..6419519ee1ce 100644 --- a/risedev.yml +++ b/risedev.yml @@ -921,7 +921,7 @@ profile: - use: kafka persist-data: true - ci-meta-backup-test: + ci-meta-backup-test-etcd: config-path: src/config/ci-meta-backup-test.toml steps: - use: etcd @@ -931,12 +931,28 @@ profile: - use: frontend - use: compactor - ci-meta-backup-test-restore: + ci-meta-backup-test-restore-etcd: config-path: src/config/ci-meta-backup-test.toml steps: - use: etcd - use: minio + ci-meta-backup-test-sql: + config-path: src/config/ci-meta-backup-test.toml + steps: + - use: sqlite + - use: minio + - use: meta-node + - use: compute-node + - use: frontend + - use: compactor + + ci-meta-backup-test-restore-sql: + config-path: src/config/ci-meta-backup-test.toml + steps: + - use: sqlite + - use: minio + ci-meta-etcd-for-migration: config-path: src/config/ci.toml steps: diff --git a/src/meta/model_v2/src/actor.rs b/src/meta/model_v2/src/actor.rs index 2ebafd23a51a..c75eac7dbc4c 100644 --- a/src/meta/model_v2/src/actor.rs +++ b/src/meta/model_v2/src/actor.rs @@ -14,12 +14,13 @@ use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{ ActorId, ActorUpstreamActors, ConnectorSplits, ExprContext, FragmentId, VnodeBitmap, WorkerId, }; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum ActorStatus { #[sea_orm(string_value = "INACTIVE")] @@ -47,7 +48,7 @@ impl From for PbActorState { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "actor")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/actor_dispatcher.rs b/src/meta/model_v2/src/actor_dispatcher.rs index 1d6a50665b12..81211cc57270 100644 --- a/src/meta/model_v2/src/actor_dispatcher.rs +++ b/src/meta/model_v2/src/actor_dispatcher.rs @@ -14,10 +14,11 @@ use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType}; use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{ActorId, ActorMapping, FragmentId, I32Array}; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Deserialize, Serialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum DispatcherType { #[sea_orm(string_value = "HASH")] @@ -81,7 +82,7 @@ impl From for PbDispatcher { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Deserialize, Serialize)] #[sea_orm(table_name = "actor_dispatcher")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/catalog_version.rs b/src/meta/model_v2/src/catalog_version.rs index 53c6f9109635..cfcd90c12f6a 100644 --- a/src/meta/model_v2/src/catalog_version.rs +++ b/src/meta/model_v2/src/catalog_version.rs @@ -13,8 +13,9 @@ // limitations under the License. use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum VersionCategory { #[sea_orm(string_value = "NOTIFICATION")] @@ -23,7 +24,7 @@ pub enum VersionCategory { TableRevision, } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "catalog_version")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/cluster.rs b/src/meta/model_v2/src/cluster.rs index 767094f1a1e1..39d0bc343998 100644 --- a/src/meta/model_v2/src/cluster.rs +++ b/src/meta/model_v2/src/cluster.rs @@ -13,8 +13,9 @@ // limitations under the License. use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "cluster")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/compaction_task.rs b/src/meta/model_v2/src/compaction_task.rs index 074fe9af450e..302bd15744f2 100644 --- a/src/meta/model_v2/src/compaction_task.rs +++ b/src/meta/model_v2/src/compaction_task.rs @@ -14,7 +14,6 @@ use risingwave_pb::hummock::{CompactTask as PbCompactTask, CompactTaskAssignment}; use sea_orm::entity::prelude::*; -use serde::{Deserialize, Serialize}; use crate::{CompactionTaskId, WorkerId}; diff --git a/src/meta/model_v2/src/connection.rs b/src/meta/model_v2/src/connection.rs index 0e513e7061fd..a6cfa4aefb58 100644 --- a/src/meta/model_v2/src/connection.rs +++ b/src/meta/model_v2/src/connection.rs @@ -16,10 +16,11 @@ use risingwave_pb::catalog::connection::PbInfo; use risingwave_pb::catalog::PbConnection; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{ConnectionId, PrivateLinkService}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "connection")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/database.rs b/src/meta/model_v2/src/database.rs index f8951f63c968..089a68712969 100644 --- a/src/meta/model_v2/src/database.rs +++ b/src/meta/model_v2/src/database.rs @@ -15,10 +15,11 @@ use risingwave_pb::catalog::PbDatabase; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::DatabaseId; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "database")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs index af1d529a0598..7f6958453859 100644 --- a/src/meta/model_v2/src/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -14,10 +14,11 @@ use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "fragment")] pub struct Model { #[sea_orm(primary_key)] @@ -31,7 +32,7 @@ pub struct Model { pub upstream_fragment_id: I32Array, } -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum DistributionType { #[sea_orm(string_value = "SINGLE")] diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model_v2/src/function.rs index 520df5cd2bf0..5589c1daa023 100644 --- a/src/meta/model_v2/src/function.rs +++ b/src/meta/model_v2/src/function.rs @@ -16,10 +16,11 @@ use risingwave_pb::catalog::function::Kind; use risingwave_pb::catalog::PbFunction; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{DataType, DataTypeArray, FunctionId}; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum FunctionKind { #[sea_orm(string_value = "Scalar")] @@ -30,7 +31,7 @@ pub enum FunctionKind { Aggregate, } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "function")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/hummock_sequence.rs b/src/meta/model_v2/src/hummock_sequence.rs index 63cfe6592458..58156c33266f 100644 --- a/src/meta/model_v2/src/hummock_sequence.rs +++ b/src/meta/model_v2/src/hummock_sequence.rs @@ -13,13 +13,13 @@ // limitations under the License. use sea_orm::entity::prelude::*; - +use serde::{Deserialize, Serialize}; pub const COMPACTION_TASK_ID: &str = "compaction_task"; pub const COMPACTION_GROUP_ID: &str = "compaction_group"; pub const SSTABLE_OBJECT_ID: &str = "sstable_object"; pub const META_BACKUP_ID: &str = "meta_backup"; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default, Serialize, Deserialize)] #[sea_orm(table_name = "hummock_sequence")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs index ae69f5eca9a1..317e3c719986 100644 --- a/src/meta/model_v2/src/hummock_version_delta.rs +++ b/src/meta/model_v2/src/hummock_version_delta.rs @@ -14,7 +14,6 @@ use risingwave_pb::hummock::PbHummockVersionDelta; use sea_orm::entity::prelude::*; -use serde::{Deserialize, Serialize}; use crate::{Epoch, HummockVersionId}; diff --git a/src/meta/model_v2/src/index.rs b/src/meta/model_v2/src/index.rs index 8f6dd60d6d76..ca2f39c0f179 100644 --- a/src/meta/model_v2/src/index.rs +++ b/src/meta/model_v2/src/index.rs @@ -15,10 +15,11 @@ use risingwave_pb::catalog::PbIndex; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{ExprNodeArray, IndexId, TableId}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "index")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 05fb29cb70ba..b34cd5e73e6c 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -43,6 +43,7 @@ pub mod index; pub mod object; pub mod object_dependency; pub mod schema; +pub mod serde_seaql_migration; pub mod session_parameter; pub mod sink; pub mod source; @@ -83,7 +84,7 @@ pub type HummockSstableObjectId = i64; pub type FragmentId = i32; pub type ActorId = i32; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum JobStatus { #[sea_orm(string_value = "INITIAL")] @@ -115,7 +116,7 @@ impl From for PbStreamJobState { } } -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum CreateType { #[sea_orm(string_value = "BACKGROUND")] @@ -170,7 +171,7 @@ macro_rules! derive_from_json_struct { /// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct. macro_rules! derive_from_blob { ($struct_name:ident, $field_type:ty) => { - #[derive(Clone, PartialEq, Eq, Serialize, Deserialize, DeriveValueType)] + #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)] pub struct $struct_name(#[sea_orm] Vec); impl $struct_name { @@ -212,7 +213,7 @@ macro_rules! derive_from_blob { /// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array. macro_rules! derive_array_from_blob { ($struct_name:ident, $field_type:ty, $field_array_name:ident) => { - #[derive(Clone, PartialEq, Eq, DeriveValueType)] + #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)] pub struct $struct_name(#[sea_orm] Vec); #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/meta/model_v2/src/object.rs b/src/meta/model_v2/src/object.rs index 6df5db623ae3..2b9c291f1e4f 100644 --- a/src/meta/model_v2/src/object.rs +++ b/src/meta/model_v2/src/object.rs @@ -13,10 +13,11 @@ // limitations under the License. use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{DatabaseId, ObjectId, SchemaId, UserId}; -#[derive(Clone, Debug, PartialEq, Eq, Copy, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, Copy, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum ObjectType { #[sea_orm(string_value = "DATABASE")] @@ -58,7 +59,7 @@ impl ObjectType { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "object")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/object_dependency.rs b/src/meta/model_v2/src/object_dependency.rs index 0adaacfced3a..d5ca89215a93 100644 --- a/src/meta/model_v2/src/object_dependency.rs +++ b/src/meta/model_v2/src/object_dependency.rs @@ -13,10 +13,11 @@ // limitations under the License. use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{ObjectId, UserId}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "object_dependency")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/schema.rs b/src/meta/model_v2/src/schema.rs index e1183d0bbbb2..89c1c5c75205 100644 --- a/src/meta/model_v2/src/schema.rs +++ b/src/meta/model_v2/src/schema.rs @@ -15,10 +15,11 @@ use risingwave_pb::catalog::PbSchema; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::SchemaId; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "schema")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/serde_seaql_migration.rs b/src/meta/model_v2/src/serde_seaql_migration.rs new file mode 100644 index 000000000000..6cc75f57c031 --- /dev/null +++ b/src/meta/model_v2/src/serde_seaql_migration.rs @@ -0,0 +1,31 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// It duplicates the one found in crate sea-orm-migration, but derives serde. +/// It's only used by metadata backup/restore. +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] +// One should override the name of migration table via `MigratorTrait::migration_table_name` method +#[sea_orm(table_name = "seaql_migrations")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version: String, + pub applied_at: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/model_v2/src/session_parameter.rs b/src/meta/model_v2/src/session_parameter.rs index b0623270982f..1fcb3f5bc2da 100644 --- a/src/meta/model_v2/src/session_parameter.rs +++ b/src/meta/model_v2/src/session_parameter.rs @@ -14,7 +14,7 @@ use sea_orm::entity::prelude::*; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, serde::Deserialize, serde::Serialize)] #[sea_orm(table_name = "session_parameter")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index ab7e869daee6..eafa1beee92f 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -15,13 +15,14 @@ use risingwave_pb::catalog::{PbSink, PbSinkType}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{ ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId, TableId, }; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum SinkType { #[sea_orm(string_value = "APPEND_ONLY")] @@ -53,7 +54,7 @@ impl From for SinkType { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "sink")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index 2b0e511e4afe..be2d2f7110ca 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -16,13 +16,14 @@ use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::PbSource; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{ ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId, WatermarkDescArray, }; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "source")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/streaming_job.rs b/src/meta/model_v2/src/streaming_job.rs index bbb6ac332f2e..1ab9225f43cd 100644 --- a/src/meta/model_v2/src/streaming_job.rs +++ b/src/meta/model_v2/src/streaming_job.rs @@ -13,10 +13,11 @@ // limitations under the License. use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{CreateType, JobStatus, StreamingParallelism}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "streaming_job")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/subscription.rs b/src/meta/model_v2/src/subscription.rs index 8a695c2b4c65..1aa470257449 100644 --- a/src/meta/model_v2/src/subscription.rs +++ b/src/meta/model_v2/src/subscription.rs @@ -15,10 +15,11 @@ use risingwave_pb::catalog::PbSubscription; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{ColumnCatalogArray, ColumnOrderArray, I32Array, Property, SubscriptionId}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscription")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/system_parameter.rs b/src/meta/model_v2/src/system_parameter.rs index f8447c57ab7a..4e76e8aae7f9 100644 --- a/src/meta/model_v2/src/system_parameter.rs +++ b/src/meta/model_v2/src/system_parameter.rs @@ -13,8 +13,8 @@ // limitations under the License. use sea_orm::entity::prelude::*; - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +use serde::{Deserialize, Serialize}; +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "system_parameter")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 974f1c8defac..75fb66fcb2a5 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -18,13 +18,16 @@ use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; use sea_orm::NotSet; +use serde::{Deserialize, Serialize}; use crate::{ Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId, TableId, TableVersion, }; -#[derive(Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum)] +#[derive( + Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, +)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum TableType { #[sea_orm(string_value = "TABLE")] @@ -60,7 +63,7 @@ impl From for TableType { } } -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum HandleConflictBehavior { #[sea_orm(string_value = "OVERWRITE")] @@ -98,7 +101,7 @@ impl From for HandleConflictBehavior { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "table")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/user.rs b/src/meta/model_v2/src/user.rs index f238683a83b0..7991d4f13588 100644 --- a/src/meta/model_v2/src/user.rs +++ b/src/meta/model_v2/src/user.rs @@ -16,10 +16,11 @@ use risingwave_pb::user::PbUserInfo; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; use sea_orm::NotSet; +use serde::{Deserialize, Serialize}; use crate::{AuthInfo, UserId}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "user")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/user_privilege.rs b/src/meta/model_v2/src/user_privilege.rs index 0ce0b78242cc..48f9a38a5504 100644 --- a/src/meta/model_v2/src/user_privilege.rs +++ b/src/meta/model_v2/src/user_privilege.rs @@ -14,10 +14,11 @@ use risingwave_pb::user::grant_privilege::PbAction; use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{ObjectId, PrivilegeId, UserId}; -#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum Action { #[sea_orm(string_value = "INSERT")] @@ -69,7 +70,7 @@ impl From for PbAction { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "user_privilege")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/view.rs b/src/meta/model_v2/src/view.rs index 0e32cd227515..54783f8ac387 100644 --- a/src/meta/model_v2/src/view.rs +++ b/src/meta/model_v2/src/view.rs @@ -15,10 +15,11 @@ use risingwave_pb::catalog::PbView; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{FieldArray, Property, ViewId}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "view")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/model_v2/src/worker.rs b/src/meta/model_v2/src/worker.rs index 8694bb1c6d20..ee2ab9b22d3c 100644 --- a/src/meta/model_v2/src/worker.rs +++ b/src/meta/model_v2/src/worker.rs @@ -16,10 +16,11 @@ use risingwave_pb::common::worker_node::PbState; use risingwave_pb::common::{PbWorkerNode, PbWorkerType}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use serde::{Deserialize, Serialize}; use crate::{TransactionId, WorkerId}; -#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum WorkerType { #[sea_orm(string_value = "FRONTEND")] @@ -59,7 +60,7 @@ impl From for PbWorkerType { } } -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum WorkerStatus { #[sea_orm(string_value = "STARTING")] @@ -101,7 +102,7 @@ impl From<&PbWorkerNode> for ActiveModel { } } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "worker")] pub struct Model { #[sea_orm(primary_key)] diff --git a/src/meta/model_v2/src/worker_property.rs b/src/meta/model_v2/src/worker_property.rs index a20604353282..3ab8d411c8b5 100644 --- a/src/meta/model_v2/src/worker_property.rs +++ b/src/meta/model_v2/src/worker_property.rs @@ -13,10 +13,11 @@ // limitations under the License. use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; use crate::{I32Array, WorkerId}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "worker_property")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 6fdcb1d13ba3..3c7c2dc2f9bd 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -31,11 +31,11 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use thiserror_ext::AsReport; use tokio::task::JoinHandle; -use crate::backup_restore::meta_snapshot_builder; use crate::backup_restore::metrics::BackupManagerMetrics; +use crate::backup_restore::{meta_snapshot_builder, meta_snapshot_builder_v2}; use crate::hummock::sequence::next_meta_backup_id; use crate::hummock::{HummockManagerRef, HummockVersionSafePoint}; -use crate::manager::{LocalNotification, MetaSrvEnv}; +use crate::manager::{LocalNotification, MetaSrvEnv, MetaStoreImpl}; use crate::rpc::metrics::MetaMetrics; use crate::MetaResult; @@ -345,23 +345,41 @@ impl BackupWorker { fn start(self, job_id: u64, remarks: Option) -> JoinHandle<()> { let backup_manager_clone = self.backup_manager.clone(); let job = async move { - let mut snapshot_builder = meta_snapshot_builder::MetaSnapshotV1Builder::new( - backup_manager_clone.env.meta_store().as_kv().clone(), - ); - // Reuse job id as snapshot id. let hummock_manager = backup_manager_clone.hummock_manager.clone(); - snapshot_builder - .build(job_id, async move { - hummock_manager.get_current_version().await - }) - .await?; - let snapshot = snapshot_builder.finish()?; - backup_manager_clone - .backup_store - .load() - .0 - .create(&snapshot, remarks) - .await?; + let hummock_version_builder = + async move { hummock_manager.get_current_version().await }; + match backup_manager_clone.env.meta_store() { + MetaStoreImpl::Kv(kv) => { + let mut snapshot_builder = + meta_snapshot_builder::MetaSnapshotV1Builder::new(kv.clone()); + // Reuse job id as snapshot id. + snapshot_builder + .build(job_id, hummock_version_builder) + .await?; + let snapshot = snapshot_builder.finish()?; + backup_manager_clone + .backup_store + .load() + .0 + .create(&snapshot, remarks) + .await?; + } + MetaStoreImpl::Sql(sql) => { + let mut snapshot_builder = + meta_snapshot_builder_v2::MetaSnapshotV2Builder::new(sql.clone()); + // Reuse job id as snapshot id. + snapshot_builder + .build(job_id, hummock_version_builder) + .await?; + let snapshot = snapshot_builder.finish()?; + backup_manager_clone + .backup_store + .load() + .0 + .create(&snapshot, remarks) + .await?; + } + } Ok(BackupJobResult::Succeeded) }; tokio::spawn(async move { diff --git a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs b/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs index 107db4d27119..c2bafcd071c6 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder_v2.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![expect(dead_code, reason = "WIP")] - use std::future::Future; use itertools::Itertools; @@ -23,12 +21,16 @@ use risingwave_backup::MetaSnapshotId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_meta_model_v2 as model_v2; use risingwave_pb::hummock::PbHummockVersionDelta; -use sea_orm::{EntityTrait, QueryOrder, TransactionTrait}; +use sea_orm::{DbErr, EntityTrait, QueryOrder, TransactionTrait}; use crate::controller::SqlMetaStore; const VERSION: u32 = 2; +fn map_db_err(e: DbErr) -> BackupError { + BackupError::MetaStorage(e.into()) +} + pub struct MetaSnapshotV2Builder { snapshot: MetaSnapshotV2, meta_store: SqlMetaStore, @@ -61,12 +63,12 @@ impl MetaSnapshotV2Builder { Some(sea_orm::AccessMode::ReadOnly), ) .await - .map_err(|e| BackupError::MetaStorage(e.into()))?; + .map_err(map_db_err)?; let version_deltas = model_v2::prelude::HummockVersionDelta::find() .order_by_asc(model_v2::hummock_version_delta::Column::Id) .all(&txn) .await - .map_err(|e| BackupError::MetaStorage(e.into()))? + .map_err(map_db_err)? .into_iter() .map_into::() .map(|pb_delta| HummockVersionDelta::from_persisted_protobuf(&pb_delta)); @@ -89,29 +91,150 @@ impl MetaSnapshotV2Builder { } redo_state }; - let version_stats = model_v2::prelude::HummockVersionStats::find_by_id( - hummock_version.id as model_v2::HummockVersionId, - ) - .one(&txn) - .await - .map_err(|e| BackupError::MetaStorage(e.into()))? - .unwrap_or_else(|| panic!("version stats for version {} not found", hummock_version.id)); + let version_stats = model_v2::prelude::HummockVersionStats::find() + .all(&txn) + .await + .map_err(map_db_err)?; let compaction_configs = model_v2::prelude::CompactionConfig::find() .all(&txn) .await - .map_err(|e| BackupError::MetaStorage(e.into()))?; - - // TODO: other metadata - let cluster_id = "TODO".to_string(); - - txn.commit() + .map_err(map_db_err)?; + let actors = model_v2::prelude::Actor::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let clusters = model_v2::prelude::Cluster::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let actor_dispatchers = model_v2::prelude::ActorDispatcher::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let catalog_versions = model_v2::prelude::CatalogVersion::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let connections = model_v2::prelude::Connection::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let databases = model_v2::prelude::Database::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let fragments = model_v2::prelude::Fragment::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let functions = model_v2::prelude::Function::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let indexes = model_v2::prelude::Index::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let objects = model_v2::prelude::Object::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let object_dependencies = model_v2::prelude::ObjectDependency::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let schemas = model_v2::prelude::Schema::find() + .all(&txn) .await - .map_err(|e| BackupError::MetaStorage(e.into()))?; + .map_err(map_db_err)?; + let sinks = model_v2::prelude::Sink::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let sources = model_v2::prelude::Source::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let streaming_jobs = model_v2::prelude::StreamingJob::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let subscriptions = model_v2::prelude::Subscription::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let system_parameters = model_v2::prelude::SystemParameter::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let tables = model_v2::prelude::Table::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let users = model_v2::prelude::User::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let user_privileges = model_v2::prelude::UserPrivilege::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let views = model_v2::prelude::View::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let workers = model_v2::prelude::Worker::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let worker_properties = model_v2::prelude::WorkerProperty::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let hummock_sequences = model_v2::prelude::HummockSequence::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let seaql_migrations = model_v2::serde_seaql_migration::Entity::find() + .all(&txn) + .await + .map_err(map_db_err)?; + let session_parameters = model_v2::prelude::SessionParameter::find() + .all(&txn) + .await + .map_err(map_db_err)?; + + txn.commit().await.map_err(map_db_err)?; self.snapshot.metadata = MetadataV2 { - cluster_id, + seaql_migrations, hummock_version, version_stats, compaction_configs, + actors, + clusters, + actor_dispatchers, + catalog_versions, + connections, + databases, + fragments, + functions, + indexes, + objects, + object_dependencies, + schemas, + sinks, + sources, + streaming_jobs, + subscriptions, + system_parameters, + tables, + users, + user_privileges, + views, + workers, + worker_properties, + hummock_sequences, + session_parameters, }; Ok(()) } diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index dfc4f37234b3..f742c904ad85 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -41,6 +41,8 @@ pub struct RestoreOpts { /// Type of meta store to restore. #[clap(long, value_enum, default_value_t = MetaBackend::Etcd)] pub meta_store_type: MetaBackend, + #[clap(long, default_value_t = String::from(""))] + pub sql_endpoint: String, /// Etcd endpoints. #[clap(long, default_value_t = String::from(""))] pub etcd_endpoints: String, @@ -140,7 +142,7 @@ async fn restore_impl( match &meta_store { MetaStoreBackendImpl::Sql(m) => { if format_version < 2 { - todo!("write model V1 to meta store V2"); + unimplemented!("not supported: write model V1 to meta store V2"); } else { dispatch( target_id, @@ -227,6 +229,7 @@ mod tests { RestoreOpts { meta_snapshot_id: 1, meta_store_type: MetaBackend::Mem, + sql_endpoint: "".to_string(), etcd_endpoints: "".to_string(), etcd_auth: false, etcd_username: "".to_string(), diff --git a/src/meta/src/backup_restore/restore_impl/v2.rs b/src/meta/src/backup_restore/restore_impl/v2.rs index 5195845f9e25..cccb5c9b641d 100644 --- a/src/meta/src/backup_restore/restore_impl/v2.rs +++ b/src/meta/src/backup_restore/restore_impl/v2.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::iter; - use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot::MetaSnapshot; use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2}; use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; use risingwave_backup::MetaSnapshotId; +use sea_orm::{DatabaseBackend, DbBackend, DbErr, Statement}; use crate::backup_restore::restore_impl::{Loader, Writer}; use crate::controller::SqlMetaStore; @@ -36,13 +35,51 @@ impl LoaderV2 { #[async_trait::async_trait] impl Loader for LoaderV2 { async fn load(&self, target_id: MetaSnapshotId) -> BackupResult> { - let target_snapshot: MetaSnapshotV2 = self.backup_store.get(target_id).await?; + let snapshot_list = &self.backup_store.manifest().snapshot_metadata; + let mut target_snapshot: MetaSnapshotV2 = self.backup_store.get(target_id).await?; tracing::info!( "snapshot {} before rewrite:\n{}", target_id, target_snapshot ); - todo!("validate and rewrite seq") + let newest_id = snapshot_list + .iter() + .map(|m| m.id) + .max() + .expect("should exist"); + assert!( + newest_id >= target_id, + "newest_id={}, target_id={}", + newest_id, + target_id + ); + + // validate and rewrite seq + if newest_id > target_id { + let newest_snapshot: MetaSnapshotV2 = self.backup_store.get(newest_id).await?; + for seq in &target_snapshot.metadata.hummock_sequences { + let newest = newest_snapshot + .metadata + .hummock_sequences + .iter() + .find(|s| s.name == seq.name) + .unwrap_or_else(|| { + panic!( + "violate superset requirement. Hummock sequence name {}", + seq.name + ) + }); + assert!(newest.seq >= seq.seq, "violate monotonicity requirement"); + } + target_snapshot.metadata.hummock_sequences = newest_snapshot.metadata.hummock_sequences; + tracing::info!( + "snapshot {} after rewrite by snapshot {}:\n{}", + target_id, + newest_id, + target_snapshot, + ); + } + Ok(target_snapshot) } } @@ -61,12 +98,102 @@ impl Writer for WriterModelV2ToMetaStoreV2 { async fn write(&self, target_snapshot: MetaSnapshot) -> BackupResult<()> { let metadata = target_snapshot.metadata; let db = &self.meta_store.conn; - insert_models(iter::once(metadata.version_stats), db).await?; - insert_models(metadata.compaction_configs, db).await?; - todo!("write other metadata") + insert_models(metadata.seaql_migrations.clone(), db).await?; + insert_models(metadata.clusters.clone(), db).await?; + insert_models(metadata.version_stats.clone(), db).await?; + insert_models(metadata.compaction_configs.clone(), db).await?; + insert_models(metadata.hummock_sequences.clone(), db).await?; + insert_models(metadata.workers.clone(), db).await?; + insert_models(metadata.worker_properties.clone(), db).await?; + insert_models(metadata.users.clone(), db).await?; + insert_models(metadata.user_privileges.clone(), db).await?; + insert_models(metadata.objects.clone(), db).await?; + insert_models(metadata.object_dependencies.clone(), db).await?; + insert_models(metadata.databases.clone(), db).await?; + insert_models(metadata.schemas.clone(), db).await?; + insert_models(metadata.streaming_jobs.clone(), db).await?; + insert_models(metadata.fragments.clone(), db).await?; + insert_models(metadata.actors.clone(), db).await?; + insert_models(metadata.actor_dispatchers.clone(), db).await?; + insert_models(metadata.connections.clone(), db).await?; + insert_models(metadata.sources.clone(), db).await?; + insert_models(metadata.tables.clone(), db).await?; + insert_models(metadata.sinks.clone(), db).await?; + insert_models(metadata.views.clone(), db).await?; + insert_models(metadata.indexes.clone(), db).await?; + insert_models(metadata.functions.clone(), db).await?; + insert_models(metadata.system_parameters.clone(), db).await?; + insert_models(metadata.catalog_versions.clone(), db).await?; + insert_models(metadata.subscriptions.clone(), db).await?; + insert_models(metadata.session_parameters.clone(), db).await?; + + // update_auto_inc must be called last. + update_auto_inc(&metadata, db).await?; + Ok(()) } } +fn map_db_err(e: DbErr) -> BackupError { + BackupError::MetaStorage(e.into()) +} + +// TODO: the code snippet is similar to the one found in migration.rs +async fn update_auto_inc( + metadata: &MetadataV2, + db: &impl sea_orm::ConnectionTrait, +) -> BackupResult<()> { + match db.get_database_backend() { + DbBackend::MySql => { + if let Some(next_worker_id) = metadata.workers.iter().map(|w| w.worker_id + 1).max() { + db.execute(Statement::from_string( + DatabaseBackend::MySql, + format!("ALTER TABLE worker AUTO_INCREMENT = {next_worker_id};"), + )) + .await + .map_err(map_db_err)?; + } + if let Some(next_object_id) = metadata.objects.iter().map(|o| o.oid + 1).max() { + db.execute(Statement::from_string( + DatabaseBackend::MySql, + format!("ALTER TABLE object AUTO_INCREMENT = {next_object_id};"), + )) + .await + .map_err(map_db_err)?; + } + if let Some(next_user_id) = metadata.users.iter().map(|u| u.user_id + 1).max() { + db.execute(Statement::from_string( + DatabaseBackend::MySql, + format!("ALTER TABLE user AUTO_INCREMENT = {next_user_id};"), + )) + .await + .map_err(map_db_err)?; + } + } + DbBackend::Postgres => { + db.execute(Statement::from_string( + DatabaseBackend::Postgres, + "SELECT setval('worker_worker_id_seq', (SELECT MAX(worker_id) FROM worker));", + )) + .await + .map_err(map_db_err)?; + db.execute(Statement::from_string( + DatabaseBackend::Postgres, + "SELECT setval('object_oid_seq', (SELECT MAX(oid) FROM object) + 1);", + )) + .await + .map_err(map_db_err)?; + db.execute(Statement::from_string( + DatabaseBackend::Postgres, + "SELECT setval('user_user_id_seq', (SELECT MAX(user_id) FROM \"user\") + 1);", + )) + .await + .map_err(map_db_err)?; + } + DbBackend::Sqlite => {} + } + Ok(()) +} + async fn insert_models( models: impl IntoIterator, db: &impl sea_orm::ConnectionTrait, @@ -81,16 +208,13 @@ where if ::Entity::find() .one(db) .await - .map_err(|e| BackupError::MetaStorage(e.into()))? + .map_err(map_db_err)? .is_some() { return Err(BackupError::NonemptyMetaStorage); } for m in models { - m.into_active_model() - .insert(db) - .await - .map_err(|e| BackupError::MetaStorage(e.into()))?; + m.into_active_model().insert(db).await.map_err(map_db_err)?; } Ok(()) } diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 48eec28f4ffe..521acaf557af 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -17,11 +17,12 @@ use std::time::Duration; use anyhow::Context; use etcd_client::ConnectOptions; -use risingwave_backup::error::BackupResult; +use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; use risingwave_common::config::{MetaBackend, ObjectStoreConfig}; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use sea_orm::DbBackend; use crate::backup_restore::RestoreOpts; use crate::controller::SqlMetaStore; @@ -32,7 +33,6 @@ use crate::MetaStoreBackend; pub enum MetaStoreBackendImpl { Etcd(EtcdMetaStore), Mem(MemStore), - #[expect(dead_code, reason = "WIP")] Sql(SqlMetaStore), } @@ -62,7 +62,9 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult MetaStoreBackend::Mem, - MetaBackend::Sql => panic!("not supported"), + MetaBackend::Sql => MetaStoreBackend::Sql { + endpoint: opts.sql_endpoint, + }, }; match meta_store_backend { MetaStoreBackend::Etcd { @@ -80,7 +82,25 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult Ok(MetaStoreBackendImpl::Mem(MemStore::new())), - MetaStoreBackend::Sql { .. } => panic!("not supported"), + MetaStoreBackend::Sql { endpoint } => { + let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) { + // Due to the fact that Sqlite is prone to the error "(code: 5) database is locked" under concurrent access, + // here we forcibly specify the number of connections as 1. + 1 + } else { + 10 + }; + let mut options = sea_orm::ConnectOptions::new(endpoint); + options + .max_connections(max_connection) + .connect_timeout(Duration::from_secs(10)) + .idle_timeout(Duration::from_secs(30)); + let conn = sea_orm::Database::connect(options) + .await + .map_err(|e| BackupError::MetaStorage(e.into()))?; + let meta_store_sql = SqlMetaStore::new(conn); + Ok(MetaStoreBackendImpl::Sql(meta_store_sql)) + } } } diff --git a/src/storage/backup/integration_tests/Makefile.toml b/src/storage/backup/integration_tests/Makefile.toml index ad5b2810c9f1..3e177cda5d43 100644 --- a/src/storage/backup/integration_tests/Makefile.toml +++ b/src/storage/backup/integration_tests/Makefile.toml @@ -13,5 +13,6 @@ BACKUP_TEST_MCLI_CONFIG="${PREFIX_CONFIG}/mcli" \ BACKUP_TEST_RW_ALL_IN_ONE="${BUILD_BIN}/risingwave" \ RW_HUMMOCK_URL="hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001" \ RW_META_ADDR="http://127.0.0.1:5690" \ +RW_SQLITE_DB="${PREFIX_DATA}/sqlite/metadata.db" \ bash "${test_root}/run_all.sh" """ diff --git a/src/storage/backup/integration_tests/common.sh b/src/storage/backup/integration_tests/common.sh index 1c1af196691f..6ac011f8d44e 100644 --- a/src/storage/backup/integration_tests/common.sh +++ b/src/storage/backup/integration_tests/common.sh @@ -13,13 +13,52 @@ function clean_all_data { cargo make --allow-private clean-data 1>/dev/null 2>&1 } +function get_meta_store_type() { + meta_store_type=${META_STORE_TYPE:-etcd} + if [ "${meta_store_type}" = "sql" ] + then + if ! command -v sqlite3 &> /dev/null; + then + echo "SQLite3 is not installed." + exit 1 + fi + fi + echo "${meta_store_type}" +} + +echo "meta store: $(get_meta_store_type)" + +function clean_meta_store() { + meta_store_type=$(get_meta_store_type) + if [ "$(get_meta_store_type)" = "sql" ]; then + clean_sqlite_data + else + clean_etcd_data + fi +} + +function clean_sqlite_data() { + tables=$(sqlite3 "${RW_SQLITE_DB}" "select name from sqlite_master where type='table';") + while IFS= read table + do + if [ -z "${table}" ]; then + break + fi + sqlite3 "${RW_SQLITE_DB}" "delete from [${table}]" + done <<< "${tables}" +} + function clean_etcd_data() { cargo make --allow-private clean-etcd-data 1>/dev/null 2>&1 } function start_cluster() { stop_cluster - cargo make d ci-meta-backup-test 1>/dev/null 2>&1 + if [ "$(get_meta_store_type)" = "sql" ]; then + cargo make d ci-meta-backup-test-sql 1>/dev/null 2>&1 + else + cargo make d ci-meta-backup-test-etcd 1>/dev/null 2>&1 + fi sleep 5 } @@ -34,8 +73,20 @@ function manual_compaction() { ${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock trigger-manual-compaction "$@" 1>/dev/null 2>&1 } +function start_meta_store_minio() { + if [ "$(get_meta_store_type)" = "sql" ]; then + start_sql_minio + else + start_etcd_minio + fi +} + +function start_sql_minio() { + cargo make d ci-meta-backup-test-restore-sql 1>/dev/null 2>&1 +} + function start_etcd_minio() { - cargo make d ci-meta-backup-test-restore 1>/dev/null 2>&1 + cargo make d ci-meta-backup-test-restore-etcd 1>/dev/null 2>&1 } function create_mvs() { @@ -66,17 +117,19 @@ function delete_snapshot() { function restore() { local job_id job_id=$1 + meta_store_type=$(get_meta_store_type) echo "try to restore snapshot ${job_id}" stop_cluster - clean_etcd_data - start_etcd_minio + clean_meta_store + start_meta_store_minio ${BACKUP_TEST_RW_ALL_IN_ONE} \ risectl \ meta \ restore-meta \ - --meta-store-type etcd \ + --meta-store-type "${meta_store_type}" \ --meta-snapshot-id "${job_id}" \ --etcd-endpoints 127.0.0.1:2388 \ + --sql-endpoint "sqlite://${RW_SQLITE_DB}?mode=rwc" \ --backup-storage-url minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001 \ --hummock-storage-url minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001 \ 1>/dev/null 2>&1 diff --git a/src/storage/backup/src/error.rs b/src/storage/backup/src/error.rs index 8bac7c4b1c5d..2a65584749b9 100644 --- a/src/storage/backup/src/error.rs +++ b/src/storage/backup/src/error.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use bincode::Error; use risingwave_common::error::BoxedError; use thiserror::Error; @@ -61,10 +60,3 @@ pub enum BackupError { anyhow::Error, ), } - -impl From for BackupError { - fn from(value: Error) -> Self { - // TODO: match error - BackupError::Other(value.into()) - } -} diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index 20f620f8b654..feb5f12540d0 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -20,41 +20,92 @@ use risingwave_meta_model_v2 as model_v2; use serde::{Deserialize, Serialize}; use crate::meta_snapshot::{MetaSnapshot, Metadata}; -use crate::BackupResult; - +use crate::{BackupError, BackupResult}; pub type MetaSnapshotV2 = MetaSnapshot; +impl From for BackupError { + fn from(value: serde_json::Error) -> Self { + BackupError::Other(value.into()) + } +} + #[derive(Default)] pub struct MetadataV2 { - pub cluster_id: String, + pub seaql_migrations: Vec, pub hummock_version: HummockVersion, - pub version_stats: model_v2::hummock_version_stats::Model, + pub version_stats: Vec, pub compaction_configs: Vec, - // TODO other metadata + pub actors: Vec, + pub clusters: Vec, + pub actor_dispatchers: Vec, + pub catalog_versions: Vec, + pub connections: Vec, + pub databases: Vec, + pub fragments: Vec, + pub functions: Vec, + pub indexes: Vec, + pub objects: Vec, + pub object_dependencies: Vec, + pub schemas: Vec, + pub sinks: Vec, + pub sources: Vec, + pub streaming_jobs: Vec, + pub subscriptions: Vec, + pub system_parameters: Vec, + pub tables: Vec, + pub users: Vec, + pub user_privileges: Vec, + pub views: Vec, + pub workers: Vec, + pub worker_properties: Vec, + pub hummock_sequences: Vec, + pub session_parameters: Vec, } impl Display for MetadataV2 { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - writeln!(f, "cluster_id:")?; - writeln!(f, "{:#?}", self.cluster_id)?; - writeln!(f, "hummock_version:")?; - writeln!(f, "{:#?}", self.hummock_version)?; - writeln!(f, "version_stats:")?; - writeln!(f, "{:#?}", self.version_stats)?; - writeln!(f, "compaction_configs:")?; - writeln!(f, "{:#?}", self.compaction_configs)?; - // TODO: other metadata + writeln!(f, "clusters: {:#?}", self.clusters)?; + writeln!( + f, + "Hummock version: id {}, max_committed_epoch: {}", + self.hummock_version.id, self.hummock_version.max_committed_epoch + )?; + // optionally dump other metadata Ok(()) } } impl Metadata for MetadataV2 { fn encode_to(&self, buf: &mut Vec) -> BackupResult<()> { - put_with_len_prefix(buf, &self.cluster_id)?; - put_with_len_prefix(buf, &self.hummock_version.to_protobuf())?; - put_with_len_prefix(buf, &self.version_stats)?; - put_with_len_prefix(buf, &self.compaction_configs)?; - // TODO: other metadata + put_n(buf, &self.seaql_migrations)?; + put_1(buf, &self.hummock_version.to_protobuf())?; + put_n(buf, &self.version_stats)?; + put_n(buf, &self.compaction_configs)?; + put_n(buf, &self.actors)?; + put_n(buf, &self.clusters)?; + put_n(buf, &self.actor_dispatchers)?; + put_n(buf, &self.catalog_versions)?; + put_n(buf, &self.connections)?; + put_n(buf, &self.databases)?; + put_n(buf, &self.fragments)?; + put_n(buf, &self.functions)?; + put_n(buf, &self.indexes)?; + put_n(buf, &self.objects)?; + put_n(buf, &self.object_dependencies)?; + put_n(buf, &self.schemas)?; + put_n(buf, &self.sinks)?; + put_n(buf, &self.sources)?; + put_n(buf, &self.streaming_jobs)?; + put_n(buf, &self.subscriptions)?; + put_n(buf, &self.system_parameters)?; + put_n(buf, &self.tables)?; + put_n(buf, &self.users)?; + put_n(buf, &self.user_privileges)?; + put_n(buf, &self.views)?; + put_n(buf, &self.workers)?; + put_n(buf, &self.worker_properties)?; + put_n(buf, &self.hummock_sequences)?; + put_n(buf, &self.session_parameters)?; Ok(()) } @@ -62,16 +113,65 @@ impl Metadata for MetadataV2 { where Self: Sized, { - let cluster_id = get_with_len_prefix(&mut buf)?; - let pb_hummock_version = get_with_len_prefix(&mut buf)?; - let version_stats = get_with_len_prefix(&mut buf)?; - let compaction_configs = get_with_len_prefix(&mut buf)?; - // TODO: other metadata + let seaql_migrations = get_n(&mut buf)?; + let pb_hummock_version = get_1(&mut buf)?; + let version_stats = get_n(&mut buf)?; + let compaction_configs = get_n(&mut buf)?; + let actors = get_n(&mut buf)?; + let clusters = get_n(&mut buf)?; + let actor_dispatchers = get_n(&mut buf)?; + let catalog_versions = get_n(&mut buf)?; + let connections = get_n(&mut buf)?; + let databases = get_n(&mut buf)?; + let fragments = get_n(&mut buf)?; + let functions = get_n(&mut buf)?; + let indexes = get_n(&mut buf)?; + let objects = get_n(&mut buf)?; + let object_dependencies = get_n(&mut buf)?; + let schemas = get_n(&mut buf)?; + let sinks = get_n(&mut buf)?; + let sources = get_n(&mut buf)?; + let streaming_jobs = get_n(&mut buf)?; + let subscriptions = get_n(&mut buf)?; + let system_parameters = get_n(&mut buf)?; + let tables = get_n(&mut buf)?; + let users = get_n(&mut buf)?; + let user_privileges = get_n(&mut buf)?; + let views = get_n(&mut buf)?; + let workers = get_n(&mut buf)?; + let worker_properties = get_n(&mut buf)?; + let hummock_sequences = get_n(&mut buf)?; + let session_parameters = get_n(&mut buf)?; Ok(Self { - cluster_id, + seaql_migrations, hummock_version: HummockVersion::from_persisted_protobuf(&pb_hummock_version), version_stats, compaction_configs, + actors, + clusters, + actor_dispatchers, + catalog_versions, + connections, + databases, + fragments, + functions, + indexes, + objects, + object_dependencies, + schemas, + sinks, + sources, + streaming_jobs, + subscriptions, + system_parameters, + tables, + users, + user_privileges, + views, + workers, + worker_properties, + hummock_sequences, + session_parameters, }) } @@ -84,8 +184,39 @@ impl Metadata for MetadataV2 { } } -fn put_with_len_prefix(buf: &mut Vec, data: &T) -> Result<(), bincode::Error> { - let b = bincode::serialize(data)?; +fn put_n(buf: &mut Vec, data: &[T]) -> Result<(), serde_json::Error> { + buf.put_u32_le( + data.len() + .try_into() + .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())), + ); + for d in data { + put_with_len_prefix(buf, d)?; + } + Ok(()) +} + +fn put_1(buf: &mut Vec, data: &T) -> Result<(), serde_json::Error> { + put_n(buf, &[data]) +} + +fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result, serde_json::Error> { + let n = buf.get_u32_le() as usize; + let mut elements = Vec::with_capacity(n); + for _ in 0..n { + elements.push(get_with_len_prefix(buf)?); + } + Ok(elements) +} + +fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result { + let elements = get_n(buf)?; + assert_eq!(elements.len(), 1); + Ok(elements.into_iter().next().unwrap()) +} + +fn put_with_len_prefix(buf: &mut Vec, data: &T) -> Result<(), serde_json::Error> { + let b = serde_json::to_vec(data)?; buf.put_u32_le( b.len() .try_into() @@ -95,9 +226,9 @@ fn put_with_len_prefix(buf: &mut Vec, data: &T) -> Result<(), Ok(()) } -fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result { +fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result { let len = buf.get_u32_le() as usize; - let d = bincode::deserialize(&buf[..len])?; + let d = serde_json::from_slice(&buf[..len])?; buf.advance(len); Ok(d) } diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index cd0a231420c1..f5c78c05c1bd 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -24,7 +24,7 @@ use futures::FutureExt; use risingwave_backup::error::BackupError; use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata}; use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage}; -use risingwave_backup::{meta_snapshot_v1, MetaSnapshotId}; +use risingwave_backup::{meta_snapshot_v1, meta_snapshot_v2, MetaSnapshotId}; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; @@ -187,19 +187,17 @@ impl BackupReader { // Use the same store throughout the call. let current_store = self.store.load_full(); // 1. check manifest to locate snapshot, if any. - let snapshot_id = current_store + let Some(snapshot_metadata) = current_store .0 .manifest() .snapshot_metadata .iter() .find(|v| epoch >= v.safe_epoch && epoch <= v.max_committed_epoch) - .map(|s| s.id); - let snapshot_id = match snapshot_id { - None => { - return Ok(None); - } - Some(s) => s, + .cloned() + else { + return Ok(None); }; + let snapshot_id = snapshot_metadata.id; // 2. load hummock version of chosen snapshot. let future = { let mut req_guard = self.inflight_request.lock(); @@ -211,16 +209,28 @@ impl BackupReader { } else { let this = self.clone(); let f = async move { - // TODO: change to v2 - let snapshot: meta_snapshot_v1::MetaSnapshotV1 = - current_store.0.get(snapshot_id).await.map_err(|e| { - format!( - "failed to get meta snapshot {}: {}", - snapshot_id, - e.as_report() - ) - })?; - let version_holder = build_version_holder(snapshot); + let to_not_found_error = |e: BackupError| { + format!( + "failed to get meta snapshot {}: {}", + snapshot_id, + e.as_report() + ) + }; + let version_holder = if snapshot_metadata.format_version < 2 { + let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store + .0 + .get(snapshot_id) + .await + .map_err(to_not_found_error)?; + build_version_holder(snapshot) + } else { + let snapshot: meta_snapshot_v2::MetaSnapshotV2 = current_store + .0 + .get(snapshot_id) + .await + .map_err(to_not_found_error)?; + build_version_holder(snapshot) + }; let version_clone = version_holder.0.clone(); this.versions.write().insert(snapshot_id, version_holder); Ok(version_clone)