diff --git a/src/meta/src/backup_restore/restore_impl/v2.rs b/src/meta/src/backup_restore/restore_impl/v2.rs index 938050ce4d300..ab67b0f1cb7c4 100644 --- a/src/meta/src/backup_restore/restore_impl/v2.rs +++ b/src/meta/src/backup_restore/restore_impl/v2.rs @@ -17,7 +17,7 @@ use risingwave_backup::meta_snapshot::MetaSnapshot; use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2}; use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef}; use risingwave_backup::MetaSnapshotId; -use sea_orm::{DatabaseBackend, DbBackend, DbErr, Statement}; +use sea_orm::DbErr; use crate::backup_restore::restore_impl::{Loader, Writer}; use crate::controller::SqlMetaStore; @@ -137,60 +137,56 @@ fn map_db_err(e: DbErr) -> BackupError { BackupError::MetaStorage(e.into()) } -// TODO: the code snippet is similar to the one found in migration.rs -async fn update_auto_inc( - metadata: &MetadataV2, - db: &impl sea_orm::ConnectionTrait, -) -> BackupResult<()> { - match db.get_database_backend() { - DbBackend::MySql => { - if let Some(next_worker_id) = metadata.workers.iter().map(|w| w.worker_id + 1).max() { - db.execute(Statement::from_string( - DatabaseBackend::MySql, - format!("ALTER TABLE worker AUTO_INCREMENT = {next_worker_id};"), - )) - .await - .map_err(map_db_err)?; +#[macro_export] +macro_rules! for_all_auto_increment { + ($metadata:ident, $db:ident, $macro:ident) => { + $macro! ($metadata, $db, + {"worker", workers, worker_id}, + {"object", objects, oid}, + {"user", users, user_id}, + {"user_privilege", user_privileges, id}, + {"actor", actors, actor_id}, + {"actor_dispatcher", actor_dispatchers, id}, + {"fragment", fragments, fragment_id}, + {"object_dependency", object_dependencies, id} + ) + }; +} + +macro_rules! reset_mysql_sequence { + ($metadata:ident, $db:ident, $( {$table:expr, $model:ident, $id_field:ident} ),*) => { + $( + match $db.get_database_backend() { + sea_orm::DbBackend::MySql => { + if let Some(v) = $metadata.$model.iter().map(|w| w.$id_field + 1).max() { + $db.execute(sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::MySql, + format!("ALTER TABLE {} AUTO_INCREMENT = {};", $table, v), + )) + .await + .map_err(map_db_err)?; + } } - if let Some(next_object_id) = metadata.objects.iter().map(|o| o.oid + 1).max() { - db.execute(Statement::from_string( - DatabaseBackend::MySql, - format!("ALTER TABLE object AUTO_INCREMENT = {next_object_id};"), + sea_orm::DbBackend::Postgres => { + $db.execute(sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::Postgres, + format!("SELECT setval('{}_{}_seq', (SELECT MAX({}) FROM \"{}\"));", $table, stringify!($id_field), stringify!($id_field), $table), )) .await .map_err(map_db_err)?; } - if let Some(next_user_id) = metadata.users.iter().map(|u| u.user_id + 1).max() { - db.execute(Statement::from_string( - DatabaseBackend::MySql, - format!("ALTER TABLE user AUTO_INCREMENT = {next_user_id};"), - )) - .await - .map_err(map_db_err)?; + sea_orm::DbBackend::Sqlite => {} } - } - DbBackend::Postgres => { - db.execute(Statement::from_string( - DatabaseBackend::Postgres, - "SELECT setval('worker_worker_id_seq', (SELECT MAX(worker_id) FROM worker));", - )) - .await - .map_err(map_db_err)?; - db.execute(Statement::from_string( - DatabaseBackend::Postgres, - "SELECT setval('object_oid_seq', (SELECT MAX(oid) FROM object) + 1);", - )) - .await - .map_err(map_db_err)?; - db.execute(Statement::from_string( - DatabaseBackend::Postgres, - "SELECT setval('user_user_id_seq', (SELECT MAX(user_id) FROM \"user\") + 1);", - )) - .await - .map_err(map_db_err)?; - } - DbBackend::Sqlite => {} - } + )* + }; +} + +/// Fixes `auto_increment` fields. +async fn update_auto_inc( + metadata: &MetadataV2, + db: &impl sea_orm::ConnectionTrait, +) -> BackupResult<()> { + for_all_auto_increment!(metadata, db, reset_mysql_sequence); Ok(()) }