Skip to content

Commit

Permalink
Merge branch 'release-2.1' into auto-release-2.1-82a80631aa94b45284ff…
Browse files Browse the repository at this point in the history
…68c07e61c474a5c6132d-1731055571
  • Loading branch information
lmatz authored Nov 8, 2024
2 parents e65a1f1 + b5d291a commit 830f35e
Showing 1 changed file with 45 additions and 49 deletions.
94 changes: 45 additions & 49 deletions src/meta/src/backup_restore/restore_impl/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_backup::meta_snapshot::MetaSnapshot;
use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2};
use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
use risingwave_backup::MetaSnapshotId;
use sea_orm::{DatabaseBackend, DbBackend, DbErr, Statement};
use sea_orm::DbErr;

use crate::backup_restore::restore_impl::{Loader, Writer};
use crate::controller::SqlMetaStore;
Expand Down Expand Up @@ -137,60 +137,56 @@ fn map_db_err(e: DbErr) -> BackupError {
BackupError::MetaStorage(e.into())
}

// TODO: the code snippet is similar to the one found in migration.rs
async fn update_auto_inc(
metadata: &MetadataV2,
db: &impl sea_orm::ConnectionTrait,
) -> BackupResult<()> {
match db.get_database_backend() {
DbBackend::MySql => {
if let Some(next_worker_id) = metadata.workers.iter().map(|w| w.worker_id + 1).max() {
db.execute(Statement::from_string(
DatabaseBackend::MySql,
format!("ALTER TABLE worker AUTO_INCREMENT = {next_worker_id};"),
))
.await
.map_err(map_db_err)?;
#[macro_export]
macro_rules! for_all_auto_increment {
($metadata:ident, $db:ident, $macro:ident) => {
$macro! ($metadata, $db,
{"worker", workers, worker_id},
{"object", objects, oid},
{"user", users, user_id},
{"user_privilege", user_privileges, id},
{"actor", actors, actor_id},
{"actor_dispatcher", actor_dispatchers, id},
{"fragment", fragments, fragment_id},
{"object_dependency", object_dependencies, id}
)
};
}

macro_rules! reset_mysql_sequence {
($metadata:ident, $db:ident, $( {$table:expr, $model:ident, $id_field:ident} ),*) => {
$(
match $db.get_database_backend() {
sea_orm::DbBackend::MySql => {
if let Some(v) = $metadata.$model.iter().map(|w| w.$id_field + 1).max() {
$db.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::MySql,
format!("ALTER TABLE {} AUTO_INCREMENT = {};", $table, v),
))
.await
.map_err(map_db_err)?;
}
}
if let Some(next_object_id) = metadata.objects.iter().map(|o| o.oid + 1).max() {
db.execute(Statement::from_string(
DatabaseBackend::MySql,
format!("ALTER TABLE object AUTO_INCREMENT = {next_object_id};"),
sea_orm::DbBackend::Postgres => {
$db.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
format!("SELECT setval('{}_{}_seq', (SELECT MAX({}) FROM \"{}\"));", $table, stringify!($id_field), stringify!($id_field), $table),
))
.await
.map_err(map_db_err)?;
}
if let Some(next_user_id) = metadata.users.iter().map(|u| u.user_id + 1).max() {
db.execute(Statement::from_string(
DatabaseBackend::MySql,
format!("ALTER TABLE user AUTO_INCREMENT = {next_user_id};"),
))
.await
.map_err(map_db_err)?;
sea_orm::DbBackend::Sqlite => {}
}
}
DbBackend::Postgres => {
db.execute(Statement::from_string(
DatabaseBackend::Postgres,
"SELECT setval('worker_worker_id_seq', (SELECT MAX(worker_id) FROM worker));",
))
.await
.map_err(map_db_err)?;
db.execute(Statement::from_string(
DatabaseBackend::Postgres,
"SELECT setval('object_oid_seq', (SELECT MAX(oid) FROM object) + 1);",
))
.await
.map_err(map_db_err)?;
db.execute(Statement::from_string(
DatabaseBackend::Postgres,
"SELECT setval('user_user_id_seq', (SELECT MAX(user_id) FROM \"user\") + 1);",
))
.await
.map_err(map_db_err)?;
}
DbBackend::Sqlite => {}
}
)*
};
}

/// Fixes `auto_increment` fields.
async fn update_auto_inc(
metadata: &MetadataV2,
db: &impl sea_orm::ConnectionTrait,
) -> BackupResult<()> {
for_all_auto_increment!(metadata, db, reset_mysql_sequence);
Ok(())
}

Expand Down

0 comments on commit 830f35e

Please sign in to comment.