Skip to content

Commit

Permalink
feat(anna): Adding ack/nacks to gossip protocol. (#1465)
Browse files Browse the repository at this point in the history
This ensures that an infected node continues to infect other nodes until
it loses "interest" (because the nodes it contacted are already
infected).
  • Loading branch information
rohitkulshreshtha authored Sep 24, 2024
1 parent 46116e5 commit 789ed17
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 34 deletions.
11 changes: 9 additions & 2 deletions datastores/gossip_kv/protocol/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,21 @@ pub enum GossipMessage {
/// An "infecting message" to share updates with a peer.
Gossip {
message_id: String,
member_id: String,
writes: Namespaces<Clock>,
},
/// An acknowledgement message sent by a peer in response to a Gossip message, to indicate
/// that it hasn't seen some of the writes in the Gossip message before.
Ack { message_id: String },
Ack {
message_id: String,
member_id: String,
},
/// A negative acknowledgement sent by a peer in response to a Gossip message, to indicate
/// that it has seen all of the writes in the Gossip message before.
Nack { message_id: String },
Nack {
message_id: String,
member_id: String,
},
}

#[cfg(test)]
Expand Down
99 changes: 72 additions & 27 deletions datastores/gossip_kv/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::hash::Hash;

use gossip_protocol::membership::{MemberData, MemberId};
use gossip_protocol::model::{
delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableName,
delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName,
};
use gossip_protocol::GossipMessage::{Ack, Nack};
use gossip_protocol::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace};
use hydroflow::futures::{Sink, Stream};
use hydroflow::hydroflow_syntax;
Expand All @@ -15,7 +16,7 @@ use hydroflow::lattices::set_union::SetUnionHashSet;
use hydroflow::lattices::{Lattice, PairBimorphism};
use hydroflow::scheduled::graph::Hydroflow;
use lattices::set_union::SetUnion;
use lattices::{IsTop, Pair};
use lattices::{IsTop, Max, Pair};
use rand::seq::IteratorRandom;
use rand::thread_rng;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -73,14 +74,19 @@ where
E: Debug + 'static,
{
let my_member_id = member_info.id.clone();
// TODO: This is ugly, but the only way this works at the moment.
let member_id_2 = my_member_id.clone();
let member_id_3 = my_member_id.clone();
let member_id_4 = my_member_id.clone();
let member_id_5 = my_member_id.clone();

hydroflow_syntax! {

on_start = initialize() -> tee();
on_start -> for_each(|_| info!("{:?}: Transducer started.", context.current_tick()));

// Setup member metadata for this process.
on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), member_info.id.clone(), serde_json::to_string(&member_info).unwrap()))
on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap()))
-> writes;

client_out =
Expand Down Expand Up @@ -119,35 +125,88 @@ where
-> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr))
-> demux_enum::<GossipRequestWithAddress<Addr>>();

gossip_in[Gossip]
incoming_gossip_messages = gossip_in[Gossip]
-> inspect(|request| trace!("{:?}: Received gossip request: {:?}.", context.current_tick(), request))
-> map(|(_msg_id, writes, _addr)| writes )
-> writes;
-> tee();

gossip_in[Ack]
-> inspect(|request| trace!("{:?}: Received gossip ack: {:?}.", context.current_tick(), request))
-> null();

gossip_in[Nack]
-> inspect(|request| trace!("{:?}: Received gossip nack: {:?}.", context.current_tick(), request))
-> null();
-> map( |(message_id, member_id, _addr)| {
MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: Default::default(), members: BoundedSetLattice::new_from([member_id]) }))
})
-> infecting_writes;

gossip_out = union() -> dest_sink(gossip_outputs);

incoming_gossip_messages
-> map(|(_msg_id, _member_id, writes, _addr)| writes )
-> writes;

gossip_processing_pipeline = incoming_gossip_messages
-> map(|(msg_id, _member_id, writes, sender_address) : (String, MemberId, Namespaces<Max<u64>>, Addr)| {
let namespaces = &#namespaces;
let all_data: &HashMap<Namespace, TableMap<RowValue<Clock>>> = namespaces.as_reveal_ref();
let possible_new_data: &HashMap<Namespace, TableMap<RowValue<Max<u64>>>>= writes.as_reveal_ref();

// Check if any of the data is new
/* TODO: This logic is duplicated in MapUnion::Merge and ideally should be accessed
from the pass-through streaming output from `state`. See
https://www.notion.so/hydro-project/Proposal-for-State-API-10a2a586262f8080b981d1a2948a69ac
for more. */
let gossip_has_new_data = possible_new_data.iter()
.flat_map(|(namespace, tables)| {
tables.as_reveal_ref().iter().flat_map(move |(table, rows)|{
rows.as_reveal_ref().iter().map(move |(row_key, row_value)| (namespace, table, row_key, row_value.as_reveal_ref().0.as_reveal_ref()))
})
})
.any(|(ns,table, row_key, new_ts)| {
let existing_tables = all_data.get(ns);
let existing_rows = existing_tables.and_then(|tables| tables.as_reveal_ref().get(table));
let existing_row = existing_rows.and_then(|rows| rows.as_reveal_ref().get(row_key));
let existing_ts = existing_row.map(|row| row.as_reveal_ref().0.as_reveal_ref());

if let Some(existing_ts) = existing_ts {
trace!("Comparing timestamps: {:?} vs {:?}", new_ts, existing_ts);
new_ts > existing_ts
} else {
true
}
});

gossip_out = dest_sink(gossip_outputs);
if gossip_has_new_data {
(Ack { message_id: msg_id, member_id: member_id_2.clone()}, sender_address, Some(writes))
} else {
(Nack { message_id: msg_id, member_id: member_id_3.clone()}, sender_address, None)
}
})
-> tee();

gossip_processing_pipeline
-> map(|(response, address, _writes)| (response, address))
-> inspect( |(msg, addr)| trace!("{:?}: Sending gossip response: {:?} to {:?}.", context.current_tick(), msg, addr))
-> gossip_out;

gossip_processing_pipeline
-> filter(|(_, _, writes)| writes.is_some())
-> map(|(_, _, writes)| writes.unwrap())
-> writes;

writes = union();

writes -> namespaces;
// writes -> new_writes;

namespaces = state::<'static, Namespaces::<Clock>>();
new_writes = namespaces -> tee();
new_writes = namespaces -> tee(); // TODO: Use the output from here to generate NACKs / ACKs

reads = state::<'tick, MapUnionHashMap<Namespace, MapUnionHashMap<TableName, MapUnionHashMap<RowKey, SetUnionHashSet<Addr>>>>>();

new_writes -> [0]process_system_table_reads;
reads -> [1]process_system_table_reads;


process_system_table_reads = lattice_bimorphism(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(PairBimorphism))), #namespaces, #reads)
-> lattice_reduce::<'tick>() // TODO: This can be removed if we fix https://github.com/hydro-project/hydroflow/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick.
-> flat_map(|result: NamespaceMap<Pair<RowValue<Clock>, SetUnion<HashSet<Addr>>>>| {
Expand Down Expand Up @@ -202,9 +261,7 @@ where
#infecting_writes.as_reveal_ref().clone()
}
)
-> inspect(|x| trace!("Before: {:?}", x))
-> filter(|(_id, infecting_write)| !infecting_write.members.is_top())
-> inspect(|x| trace!("After: {:?}", x))
-> map(|(id, infecting_write)| {
trace!("{:?}: Choosing a peer to gossip to. {:?}:{:?}", context.current_tick(), id, infecting_write);
let peers = #namespaces.as_reveal_ref().get(&Namespace::System).unwrap().as_reveal_ref().get("members").unwrap().as_reveal_ref().clone();
Expand All @@ -219,7 +276,7 @@ where
});

// Exclude self from the list of peers.
peer_names.remove(&my_member_id);
peer_names.remove(&member_id_5);

trace!("{:?}: Peers: {:?}", context.current_tick(), peer_names);

Expand All @@ -237,26 +294,15 @@ where
(id, infecting_write, gossip_address)
})
-> inspect(|(message_id, infecting_write, peer_gossip_address)| trace!("{:?}: Sending write:\nMessageId:{:?}\nWrite:{:?}\nPeer Address:{:?}", context.current_tick(), message_id, infecting_write, peer_gossip_address))
-> tee();

gossip_messages
-> map(|(message_id, infecting_write, peer_gossip_address): (String, InfectingWrite, Addr)| {
let gossip_request = GossipMessage::Gossip {
message_id: message_id.clone(),
member_id: member_id_4.clone(),
writes: infecting_write.write.clone(),
};
(gossip_request, peer_gossip_address)
})
-> gossip_out;

gossip_messages
-> map(|(message_id, write, _peer_gossip_address) : (String, InfectingWrite, Addr)| {
let dummy_peer_id = uuid::Uuid::new_v4().to_string();
trace!("{:?}: Infecting write: {:?}", context.current_tick(), dummy_peer_id);
MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: write.write, members: BoundedSetLattice::new_from([dummy_peer_id]) }))
})
//-> defer_tick()
-> infecting_writes;
}
}

Expand All @@ -265,7 +311,6 @@ mod tests {
use std::collections::HashSet;

use gossip_protocol::membership::{MemberDataBuilder, Protocol};
use hydroflow::tokio;
use hydroflow::util::simulation::{Address, Fleet, Hostname};

use super::*;
Expand Down
38 changes: 33 additions & 5 deletions datastores/gossip_kv/server/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,55 @@ pub enum GossipRequestWithAddress<A> {
/// A gossip request with the message id, writes and the address of the client.
Gossip {
message_id: String,
member_id: String,
writes: Namespaces<Clock>,
addr: A,
},
/// An ack request with the message id and the address of the client.
Ack { message_id: String, addr: A },
Ack {
message_id: String,
member_id: String,
addr: A,
},
/// A nack request with the message id and the address of the client.
Nack { message_id: String, addr: A },
Nack {
message_id: String,
member_id: String,
addr: A,
},
}

impl<A> GossipRequestWithAddress<A> {
/// Create a `GossipRequestWithAddress` from a `GossipMessage` and an address.
pub fn from_request_and_address(request: GossipMessage, addr: A) -> Self {
match request {
GossipMessage::Gossip { message_id, writes } => Self::Gossip {
GossipMessage::Gossip {
message_id,
member_id,
writes,
} => Self::Gossip {
message_id,
member_id,
writes,
addr,
},

GossipMessage::Ack { message_id } => Self::Ack { message_id, addr },
GossipMessage::Nack { message_id } => Self::Nack { message_id, addr },
GossipMessage::Ack {
message_id,
member_id,
} => Self::Ack {
message_id,
addr,
member_id,
},
GossipMessage::Nack {
message_id,
member_id,
} => Self::Nack {
message_id,
addr,
member_id,
},
}
}
}

0 comments on commit 789ed17

Please sign in to comment.