diff --git a/src/domain/task-system/services/src/task_agent_impl.rs b/src/domain/task-system/services/src/task_agent_impl.rs index 84adf0a9b2..2aa4f0ad7f 100644 --- a/src/domain/task-system/services/src/task_agent_impl.rs +++ b/src/domain/task-system/services/src/task_agent_impl.rs @@ -86,19 +86,21 @@ impl TaskAgentImpl { }) .try_collect() .await?; + let batch_size = running_task_ids.len(); - for running_task_id in &running_task_ids { - // TODO: batch loading of tasks - let mut task = Task::load(*running_task_id, task_event_store.as_ref()) - .await - .int_err()?; + let tasks = Task::load_multi(running_task_ids, task_event_store.as_ref()) + .await + .int_err()?; + + for task in tasks { + let mut t = task.int_err()?; // Requeue - task.requeue(self.time_source.now()).int_err()?; - task.save(task_event_store.as_ref()).await.int_err()?; + t.requeue(self.time_source.now()).int_err()?; + t.save(task_event_store.as_ref()).await.int_err()?; } - processed_running_tasks += running_task_ids.len(); + processed_running_tasks += batch_size; } Ok(()) diff --git a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs index 365fd33d16..e880aa0fba 100644 --- a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs +++ b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs @@ -388,7 +388,7 @@ impl EventStore for PostgresFlowEventStore { let event = serde_json::from_value::(event_row.event_payload) .map_err(|e| sqlx::Error::Decode(Box::new(e)))?; - Ok((FlowID::try_from(event_row.flow_id).unwrap(), // todo: handle error + Ok((FlowID::try_from(event_row.flow_id).unwrap(), // ids are always > 0 EventID::new(event_row.event_id), event)) }) diff --git a/src/infra/task-system/postgres/.sqlx/query-4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e.json b/src/infra/task-system/postgres/.sqlx/query-4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e.json new file mode 100644 index 0000000000..0a8a19b21d --- /dev/null +++ b/src/infra/task-system/postgres/.sqlx/query-4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT task_id, event_id, event_payload\n FROM task_events\n WHERE task_id = ANY($1)\n ORDER BY event_id ASC\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "task_id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "event_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "event_payload", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "4ab9fb67546df9ac01da967d2ac00ddbbbcf898c7646c89aebc06cf83045686e" +} diff --git a/src/infra/task-system/postgres/src/postgres_task_event_store.rs b/src/infra/task-system/postgres/src/postgres_task_event_store.rs index b09c644fd0..c34a30300e 100644 --- a/src/infra/task-system/postgres/src/postgres_task_event_store.rs +++ b/src/infra/task-system/postgres/src/postgres_task_event_store.rs @@ -184,6 +184,40 @@ impl EventStore for PostgresTaskEventStore { }) } + fn get_events_multi(&self, queries: Vec) -> MultiEventStream { + let task_ids: Vec = queries.iter().map(|id| (*id).try_into().unwrap()).collect(); + + Box::pin(async_stream::stream! { + let mut tr = self.transaction.lock().await; + let connection_mut = tr + .connection_mut() + .await?; + + let mut query_stream = sqlx::query!( + r#" + SELECT task_id, event_id, event_payload + FROM task_events + WHERE task_id = ANY($1) + ORDER BY event_id ASC + "#, + &task_ids, + ).try_map(|event_row| { + let event = serde_json::from_value::(event_row.event_payload) + .map_err(|e| sqlx::Error::Decode(Box::new(e)))?; + + Ok((TaskID::try_from(event_row.task_id).unwrap(), // ids are always > 0 + EventID::new(event_row.event_id), + event)) + }) + .fetch(connection_mut) + .map_err(|e| GetEventsError::Internal(e.int_err())); + + while let Some((task_id, event_id, event)) = query_stream.try_next().await? { + yield Ok((task_id, event_id, event)); + } + }) + } + async fn save_events( &self, task_id: &TaskID,