Skip to content

Commit

Permalink
Store method to get all un-indexed operation ids
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Jun 14, 2024
1 parent 6da24b0 commit 41981ea
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions aquadoggo/src/db/stores/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,30 @@ fn group_and_parse_operation_rows(
}

impl SqlStore {
/// Returns ids of operations which have not been processed by `reduce` task yet.
pub async fn get_unindexed_operation_ids(
&self,
) -> Result<Vec<OperationId>, OperationStorageError> {
let id_rows: Vec<String> = query_scalar(
"
SELECT
operations_v1.operation_id
FROM
operations_v1
WHERE
operations_v1.sorted_index IS NULL
",
)
.fetch_all(&self.pool)
.await
.map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?;

Ok(id_rows
.iter()
.map(|id| id.parse().expect("invalid operation id in database"))
.collect())
}

/// Update the sorted index of an operation. This method is used in `reduce` tasks as each
/// operation is processed.
pub async fn update_operation_index(
Expand Down

0 comments on commit 41981ea

Please sign in to comment.