Skip to content

Commit

Permalink
Makes it possible to register batching listeners
Browse files Browse the repository at this point in the history
Upon reset_node and apply_update, we want to be
able to react to a batch of changes rather than individual changes.

In quickwit for instance, we want to be able to react to the reception of a batch of deleted shard and group this reaction into a single metastore call.
  • Loading branch information
fulmicoton committed May 2, 2024
1 parent 2e63747 commit f09f842
Show file tree
Hide file tree
Showing 6 changed files with 950 additions and 492 deletions.
6 changes: 2 additions & 4 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ impl Api {
#[oai(path = "/set_kv/", method = "get")]
async fn set_kv(&self, key: Query<String>, value: Query<String>) -> Json<serde_json::Value> {
let mut chitchat_guard = self.chitchat.lock().await;

let cc_state = chitchat_guard.self_node_state();
let mut cc_state = chitchat_guard.self_node_state();
cc_state.set(key.as_str(), value.as_str());

Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
Expand All @@ -48,8 +47,7 @@ impl Api {
#[oai(path = "/mark_for_deletion/", method = "get")]
async fn mark_for_deletion(&self, key: Query<String>) -> Json<serde_json::Value> {
let mut chitchat_guard = self.chitchat.lock().await;

let cc_state = chitchat_guard.self_node_state();
let mut cc_state = chitchat_guard.self_node_state();
cc_state.delete(key.as_str());
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
}
Expand Down
24 changes: 24 additions & 0 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,30 @@ impl NodeDelta {
}
}

impl NodeDelta {
pub(crate) fn applicable_key_value_mutations(
self,
max_version: u64,
last_gc_version: u64,
) -> impl Iterator<Item = KeyValueMutation> {
self.key_values
.into_iter()
.filter(move |key_value_mutation| {
if key_value_mutation.version <= max_version {
// We already know about this KV.
return false;
}
if key_value_mutation.status.scheduled_for_deletion() {
// This KV has already been GCed.
if key_value_mutation.version <= last_gc_version {
return false;
}
}
true
})
}
}

#[derive(Default)]
struct DeltaBuilder {
existing_nodes: HashSet<ChitchatId>,
Expand Down
Loading

0 comments on commit f09f842

Please sign in to comment.