Skip to content

Commit

Permalink
Add parallelism column to WorkerProperty migration
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 4, 2024
1 parent 21065d2 commit 41117dd
Showing 1 changed file with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use sea_orm_migration::prelude::*;
use serde::{Deserialize, Serialize};

use crate::sea_orm::{FromJsonQueryResult, FromQueryResult, Statement};

#[derive(DeriveMigrationName)]
pub struct Migration;
Expand All @@ -10,15 +13,13 @@ impl MigrationTrait for Migration {
.alter_table(
Table::alter()
.table(WorkerProperty::Table)
.add_column(
ColumnDef::new(WorkerProperty::Parallelism)
.integer()
.not_null(),
)
.add_column(ColumnDef::new(WorkerProperty::Parallelism).integer())
.to_owned(),
)
.await?;

set_worker_parallelism(manager).await?;

manager
.alter_table(
Table::alter()
Expand Down Expand Up @@ -90,9 +91,58 @@ impl MigrationTrait for Migration {
}
}

// Set worker parallelism based on the number of parallel unit ids
async fn set_worker_parallelism(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
let connection = manager.get_connection();

let database_backend = connection.get_database_backend();

let (sql, values) = Query::select()
.columns([
(WorkerProperty::Table, WorkerProperty::WorkerId),
(WorkerProperty::Table, WorkerProperty::ParallelUnitIds),
])
.from(WorkerProperty::Table)
.to_owned()
.build_any(&*database_backend.get_query_builder());

let stmt = Statement::from_sql_and_values(database_backend, sql, values);

for WorkerPropertyParallelUnitIds {
worker_id,
parallel_unit_ids,
} in WorkerPropertyParallelUnitIds::find_by_statement(stmt)
.all(connection)
.await?
{
manager
.exec_stmt(
Query::update()
.table(WorkerProperty::Table)
.value(
WorkerProperty::Parallelism,
Expr::value(parallel_unit_ids.0.len() as i32),
)
.and_where(Expr::col(WorkerProperty::WorkerId).eq(worker_id))
.to_owned(),
)
.await?;
}
Ok(())
}
#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct I32Array(pub Vec<i32>);

#[derive(Debug, FromQueryResult)]
pub struct WorkerPropertyParallelUnitIds {
worker_id: i32,
parallel_unit_ids: I32Array,
}

#[derive(DeriveIden)]
enum WorkerProperty {
Table,
WorkerId,
Parallelism,
ParallelUnitIds,
}
Expand Down

0 comments on commit 41117dd

Please sign in to comment.