From 41117ddc5c25428742b41f056713b283189ff02c Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 4 Jul 2024 14:13:19 +0800 Subject: [PATCH] Add parallelism column to WorkerProperty migration --- .../m20240630_131430_remove_parallel_unit.rs | 60 +++++++++++++++++-- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/src/meta/model_v2/migration/src/m20240630_131430_remove_parallel_unit.rs b/src/meta/model_v2/migration/src/m20240630_131430_remove_parallel_unit.rs index bd0fb500c906..5acc8dc1a616 100644 --- a/src/meta/model_v2/migration/src/m20240630_131430_remove_parallel_unit.rs +++ b/src/meta/model_v2/migration/src/m20240630_131430_remove_parallel_unit.rs @@ -1,4 +1,7 @@ use sea_orm_migration::prelude::*; +use serde::{Deserialize, Serialize}; + +use crate::sea_orm::{FromJsonQueryResult, FromQueryResult, Statement}; #[derive(DeriveMigrationName)] pub struct Migration; @@ -10,15 +13,13 @@ impl MigrationTrait for Migration { .alter_table( Table::alter() .table(WorkerProperty::Table) - .add_column( - ColumnDef::new(WorkerProperty::Parallelism) - .integer() - .not_null(), - ) + .add_column(ColumnDef::new(WorkerProperty::Parallelism).integer()) .to_owned(), ) .await?; + set_worker_parallelism(manager).await?; + manager .alter_table( Table::alter() @@ -90,9 +91,58 @@ impl MigrationTrait for Migration { } } +// Set worker parallelism based on the number of parallel unit ids +async fn set_worker_parallelism(manager: &SchemaManager<'_>) -> Result<(), DbErr> { + let connection = manager.get_connection(); + + let database_backend = connection.get_database_backend(); + + let (sql, values) = Query::select() + .columns([ + (WorkerProperty::Table, WorkerProperty::WorkerId), + (WorkerProperty::Table, WorkerProperty::ParallelUnitIds), + ]) + .from(WorkerProperty::Table) + .to_owned() + .build_any(&*database_backend.get_query_builder()); + + let stmt = Statement::from_sql_and_values(database_backend, sql, values); + + for WorkerPropertyParallelUnitIds { + worker_id, + parallel_unit_ids, + } in WorkerPropertyParallelUnitIds::find_by_statement(stmt) + .all(connection) + .await? + { + manager + .exec_stmt( + Query::update() + .table(WorkerProperty::Table) + .value( + WorkerProperty::Parallelism, + Expr::value(parallel_unit_ids.0.len() as i32), + ) + .and_where(Expr::col(WorkerProperty::WorkerId).eq(worker_id)) + .to_owned(), + ) + .await?; + } + Ok(()) +} +#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)] +pub struct I32Array(pub Vec); + +#[derive(Debug, FromQueryResult)] +pub struct WorkerPropertyParallelUnitIds { + worker_id: i32, + parallel_unit_ids: I32Array, +} + #[derive(DeriveIden)] enum WorkerProperty { Table, + WorkerId, Parallelism, ParallelUnitIds, }