Flows: improve sorting of listed flows
demusdev committed Dec 30, 2024
1 parent e16677d commit ccedcd4
Showing 5 changed files with 129 additions and 49 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Changed
- GraphQL: `accountListFlows` now returns the list sorted by status and last event time

## [0.215.1] - 2024-12-30
### Fixed
- GraphQL: in a multi-tenant workspace, `datasets.createEmpty` and `datasets.createFromSnapshot` mutations now return dataset aliases prefixed with account name.
58 changes: 47 additions & 11 deletions src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs
@@ -548,25 +548,61 @@ impl FlowEventStore for InMemoryFlowEventStore {
let flow_ids_page: Vec<_> = {
let state = self.inner.as_state();
let g = state.lock().unwrap();
let mut result: Vec<Result<FlowID, _>> = vec![];
let mut total_count = 0;
for flow_id in g.all_flows.iter().rev() {

// Collect FlowID -> Most recent event time, for sorting purposes
let recent_events: HashMap<FlowID, DateTime<Utc>> = g.events.iter().fold(
HashMap::new(),
|mut acc: HashMap<FlowID, DateTime<Utc>>, i: &FlowEvent| {
let key = i.flow_id();
match acc.get(&key) {
Some(ref current) if current.ge(&&i.event_time()) => {}
_ => {
acc.insert(key, i.event_time());
}
};
acc
},
);

// Split flows into groups by status
let mut waiting_flows: Vec<_> = vec![];
let mut running_flows: Vec<_> = vec![];
let mut finished_flows: Vec<_> = vec![];
for flow_id in &g.all_flows {
// Also apply the given filters at this stage to reduce the number of
// items to process in later steps
let flow_key = g.flow_key_by_flow_id.get(flow_id).unwrap();
if let FlowKey::Dataset(flow_key_dataset) = flow_key {
if dataset_ids.contains(&flow_key_dataset.dataset_id)
&& g.matches_dataset_flow(*flow_id, filters)
{
if result.len() >= pagination.limit {
break;
}
if total_count >= pagination.offset {
result.push(Ok(*flow_id));
if let Some(flow) = g.flow_search_index.get(flow_id) {
let item = (flow_id, recent_events.get(flow_id));
match flow.flow_status {
FlowStatus::Waiting => waiting_flows.push(item),
FlowStatus::Running => running_flows.push(item),
FlowStatus::Finished => finished_flows.push(item),
}
}
total_count += 1;
}
};
}
}
result
// Sort every group separately
waiting_flows.sort_by(|a, b| b.cmp(a));
running_flows.sort_by(|a, b| b.cmp(a));
finished_flows.sort_by(|a, b| b.cmp(a));

let mut ordered_flows = vec![];
ordered_flows.append(&mut waiting_flows);
ordered_flows.append(&mut running_flows);
ordered_flows.append(&mut finished_flows);

ordered_flows
.iter()
.skip(pagination.offset)
.take(pagination.limit)
.map(|(flow_id, _)| Ok(**flow_id))
.collect()
};

Box::pin(futures::stream::iter(flow_ids_page))
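For intuition, here is a minimal, self-contained sketch of the ordering strategy the in-memory store switches to: bucket the filtered flows by status (waiting, then running, then finished), order each bucket by most recent activity, concatenate, and only then apply offset/limit pagination. Types are simplified placeholders (`u64` ids and integer timestamps stand in for `FlowID` and `DateTime<Utc>`); this is illustrative, not the committed code.

```rust
use std::cmp::Reverse;

#[derive(Clone, Copy)]
enum FlowStatus {
    Waiting,
    Running,
    Finished,
}

/// Illustrative item: (flow id, time of the flow's most recent event).
type Item = (u64, i64);

fn list_flow_ids(flows: &[(FlowStatus, Item)], offset: usize, limit: usize) -> Vec<u64> {
    // Split flows into per-status buckets.
    let (mut waiting, mut running, mut finished) = (vec![], vec![], vec![]);
    for (status, item) in flows {
        match status {
            FlowStatus::Waiting => waiting.push(*item),
            FlowStatus::Running => running.push(*item),
            FlowStatus::Finished => finished.push(*item),
        }
    }
    // Order each bucket by most recent event time, newest first.
    for bucket in [&mut waiting, &mut running, &mut finished] {
        bucket.sort_by_key(|&(_, last_event_time)| Reverse(last_event_time));
    }
    // Waiting flows first, then running, then finished; paginate only at the end.
    waiting
        .into_iter()
        .chain(running)
        .chain(finished)
        .skip(offset)
        .take(limit)
        .map(|(flow_id, _)| flow_id)
        .collect()
}

fn main() {
    use FlowStatus::*;
    let flows = [
        (Finished, (1, 40)),
        (Waiting, (2, 10)),
        (Running, (3, 30)),
        (Waiting, (4, 20)),
    ];
    // Waiting (newest first): 4, 2; then running: 3; then finished: 1.
    assert_eq!(list_flow_ids(&flows, 0, 10), vec![4, 2, 3, 1]);
}
```

Sorting the full filtered set before applying offset/limit is what makes the pagination offsets in the tests below line up with the status-grouped order.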
25 changes: 19 additions & 6 deletions src/infra/flow-system/postgres/src/postgres_flow_event_store.rs
@@ -678,12 +678,25 @@ impl FlowEventStore for PostgresFlowEventStore {

let mut query_stream = sqlx::query!(

r#"
SELECT flow_id FROM flows
WHERE dataset_id = ANY($1)
AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)
AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))
ORDER BY flow_id DESC
WITH unsorted_flows AS
(SELECT
f.flow_id,
f.flow_status,
MAX(e.event_time) as last_event_time,
(CASE
WHEN f.flow_status = 'waiting' THEN 1
WHEN f.flow_status = 'running' THEN 2
ELSE 3 END) AS ord_status
FROM flows f
LEFT JOIN flow_events e USING(flow_id)
WHERE
f.dataset_id = ANY($1)
AND (cast($2 as dataset_flow_type) IS NULL OR f.dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR f.flow_status = $3)
AND (cast($4 as TEXT[]) IS NULL OR f.initiator = ANY($4))
GROUP BY f.flow_id, f.flow_status)
SELECT flow_id FROM unsorted_flows
ORDER BY ord_status, last_event_time DESC
LIMIT $5 OFFSET $6
"#,
dataset_ids as Vec<String>,
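Not part of the change itself, but the CTE above is easier to read as a composite sort key: `ord_status` ranks the status (waiting = 1, running = 2, everything else = 3) and `MAX(e.event_time)` is the tiebreaker, newest first. A rough Rust equivalent of that `ORDER BY`, with placeholder types and integer timestamps, might look like:

```rust
use std::cmp::Reverse;

#[derive(Clone, Copy)]
enum FlowStatus {
    Waiting,
    Running,
    Finished,
}

// Mirrors the CASE expression: waiting -> 1, running -> 2, everything else -> 3.
fn status_rank(status: FlowStatus) -> u8 {
    match status {
        FlowStatus::Waiting => 1,
        FlowStatus::Running => 2,
        FlowStatus::Finished => 3,
    }
}

fn main() {
    // (flow_id, status, last_event_time), with integer timestamps for brevity.
    let mut rows = vec![
        (1_u64, FlowStatus::Finished, 40_i64),
        (2, FlowStatus::Waiting, 10),
        (3, FlowStatus::Running, 30),
        (4, FlowStatus::Waiting, 20),
    ];
    // Equivalent of: ORDER BY ord_status, last_event_time DESC
    rows.sort_by_key(|&(_, status, last_event_time)| {
        (status_rank(status), Reverse(last_event_time))
    });
    let ids: Vec<u64> = rows.iter().map(|&(id, _, _)| id).collect();
    assert_eq!(ids, vec![4, 2, 3, 1]);
}
```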
75 changes: 45 additions & 30 deletions src/infra/flow-system/repo-tests/src/test_flow_event_store.rs
@@ -362,29 +362,29 @@ pub async fn test_dataset_flow_filter_by_datasets(catalog: &Catalog) {
(
vec![foo_cases.dataset_id.clone()],
vec![
foo_cases.compaction_flow_ids.flow_id_finished,
foo_cases.compaction_flow_ids.flow_id_running,
foo_cases.compaction_flow_ids.flow_id_waiting,
foo_cases.ingest_flow_ids.flow_id_finished,
foo_cases.ingest_flow_ids.flow_id_running,
foo_cases.ingest_flow_ids.flow_id_waiting,
foo_cases.compaction_flow_ids.flow_id_running,
foo_cases.ingest_flow_ids.flow_id_running,
foo_cases.compaction_flow_ids.flow_id_finished,
foo_cases.ingest_flow_ids.flow_id_finished,
],
),
(
vec![foo_cases.dataset_id.clone(), bar_cases.dataset_id.clone()],
vec![
bar_cases.compaction_flow_ids.flow_id_finished,
bar_cases.compaction_flow_ids.flow_id_running,
bar_cases.compaction_flow_ids.flow_id_waiting,
bar_cases.ingest_flow_ids.flow_id_finished,
bar_cases.ingest_flow_ids.flow_id_running,
bar_cases.ingest_flow_ids.flow_id_waiting,
foo_cases.compaction_flow_ids.flow_id_finished,
foo_cases.compaction_flow_ids.flow_id_running,
foo_cases.compaction_flow_ids.flow_id_waiting,
foo_cases.ingest_flow_ids.flow_id_finished,
foo_cases.ingest_flow_ids.flow_id_running,
foo_cases.ingest_flow_ids.flow_id_waiting,
bar_cases.compaction_flow_ids.flow_id_running,
bar_cases.ingest_flow_ids.flow_id_running,
foo_cases.compaction_flow_ids.flow_id_running,
foo_cases.ingest_flow_ids.flow_id_running,
bar_cases.compaction_flow_ids.flow_id_finished,
bar_cases.ingest_flow_ids.flow_id_finished,
foo_cases.compaction_flow_ids.flow_id_finished,
foo_cases.ingest_flow_ids.flow_id_finished,
],
),
(vec![DatasetID::new_seeded_ed25519(b"wrong")], vec![]),
@@ -461,13 +461,26 @@ pub async fn test_dataset_flow_filter_by_datasets_with_pagination(catalog: &Catalog) {
let bar_cases = make_dataset_test_case(flow_event_store.clone()).await;
make_system_test_case(flow_event_store.clone()).await;

// Expected order:
// bar compact waiting
// bar ingest waiting <- (foo+bar) offset: 1
// foo compact waiting
// foo ingest waiting
// bar compact running
// bar ingest running
// foo compact running <- (foo) offset: 2
// foo ingest running <- (foo+bar) offset: 1, limit: 7
// bar compact finished
// bar ingest finished
// foo compact finished <- (foo) offset: 2, limit: 3
// foo ingest finished
let cases = vec![
(
vec![foo_cases.dataset_id.clone()],
vec![
foo_cases.compaction_flow_ids.flow_id_waiting,
foo_cases.ingest_flow_ids.flow_id_finished,
foo_cases.compaction_flow_ids.flow_id_running,
foo_cases.ingest_flow_ids.flow_id_running,
foo_cases.compaction_flow_ids.flow_id_finished,
],
PaginationOpts {
offset: 2,
@@ -477,13 +477,13 @@ pub async fn test_dataset_flow_filter_by_datasets_with_pagination(catalog: &Catalog) {
(
vec![foo_cases.dataset_id.clone(), bar_cases.dataset_id.clone()],
vec![
bar_cases.ingest_flow_ids.flow_id_waiting,
foo_cases.compaction_flow_ids.flow_id_waiting,
foo_cases.ingest_flow_ids.flow_id_waiting,
bar_cases.compaction_flow_ids.flow_id_running,
bar_cases.compaction_flow_ids.flow_id_waiting,
bar_cases.ingest_flow_ids.flow_id_finished,
bar_cases.ingest_flow_ids.flow_id_running,
bar_cases.ingest_flow_ids.flow_id_waiting,
foo_cases.compaction_flow_ids.flow_id_finished,
foo_cases.compaction_flow_ids.flow_id_running,
foo_cases.ingest_flow_ids.flow_id_running,
],
PaginationOpts {
offset: 1,
@@ -2236,21 +2249,23 @@ struct TestFlowIDs {

async fn make_dataset_test_case(flow_event_store: Arc<dyn FlowEventStore>) -> DatasetTestCase {
let (_, dataset_id) = DatasetID::new_generated_ed25519();
let ingest_flow_ids = make_dataset_test_flows(
&dataset_id,
DatasetFlowType::Ingest,
flow_event_store.clone(),
)
.await;
let compaction_flow_ids = make_dataset_test_flows(
&dataset_id,
DatasetFlowType::HardCompaction,
flow_event_store,
)
.await;

DatasetTestCase {
dataset_id: dataset_id.clone(),
ingest_flow_ids: make_dataset_test_flows(
&dataset_id,
DatasetFlowType::Ingest,
flow_event_store.clone(),
)
.await,
compaction_flow_ids: make_dataset_test_flows(
&dataset_id,
DatasetFlowType::HardCompaction,
flow_event_store,
)
.await,
ingest_flow_ids,
compaction_flow_ids,
}
}

16 changes: 14 additions & 2 deletions src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs
@@ -757,12 +757,24 @@ impl FlowEventStore for SqliteFlowEventStore {

let query_str = format!(
r#"
SELECT flow_id FROM flows
WITH unsorted_flows AS
(SELECT
f.flow_id,
f.flow_status,
MAX(e.event_time) as last_event_time,
(CASE
WHEN f.flow_status = 'waiting' THEN 1
WHEN f.flow_status = 'running' THEN 2
ELSE 3 END) AS ord_status
FROM flows f
LEFT JOIN flow_events e USING(flow_id)
WHERE dataset_id in ({})
AND (cast($1 as dataset_flow_type) IS NULL OR dataset_flow_type = $1)
AND (cast($2 as flow_status_type) IS NULL OR flow_status = $2)
AND ($3 = 0 OR initiator in ({}))
ORDER BY flow_id DESC
GROUP BY f.flow_id, f.flow_status)
SELECT flow_id FROM unsorted_flows
ORDER BY ord_status, last_event_time DESC
LIMIT $4 OFFSET $5
"#,
sqlite_generate_placeholders_list(dataset_ids.len(), 6),
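Unlike the Postgres query, the SQLite statement is built with `format!`, splicing numbered placeholders for the `IN (...)` lists. The helper `sqlite_generate_placeholders_list(dataset_ids.len(), 6)` is assumed here to emit a comma-separated run of placeholders starting at a given index (the five fixed parameters $1..$5 come first); a hypothetical stand-in, not the repository's actual implementation, could be:

```rust
// Hypothetical stand-in: produces "$6, $7, $8" for placeholders(3, 6).
// The real sqlite_generate_placeholders_list may differ in name, signature, and details.
fn placeholders(count: usize, start: usize) -> String {
    (start..start + count)
        .map(|i| format!("${i}"))
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    // Dataset id bindings would then follow the five fixed parameters.
    assert_eq!(placeholders(3, 6), "$6, $7, $8");
}
```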
