diff --git a/datastores/gossip_kv/protocol/lib.rs b/datastores/gossip_kv/protocol/lib.rs index d06a688e354..be725878a24 100644 --- a/datastores/gossip_kv/protocol/lib.rs +++ b/datastores/gossip_kv/protocol/lib.rs @@ -154,14 +154,21 @@ pub enum GossipMessage { /// An "infecting message" to share updates with a peer. Gossip { message_id: String, + member_id: String, writes: Namespaces, }, /// An acknowledgement message sent by a peer in response to a Gossip message, to indicate /// that it hasn't seen some of the writes in the Gossip message before. - Ack { message_id: String }, + Ack { + message_id: String, + member_id: String, + }, /// A negative acknowledgement sent by a peer in response to a Gossip message, to indicate /// that it has seen all of the writes in the Gossip message before. - Nack { message_id: String }, + Nack { + message_id: String, + member_id: String, + }, } #[cfg(test)] diff --git a/datastores/gossip_kv/server/server.rs b/datastores/gossip_kv/server/server.rs index bdd8047fe61..6d890fc3b1e 100644 --- a/datastores/gossip_kv/server/server.rs +++ b/datastores/gossip_kv/server/server.rs @@ -4,8 +4,9 @@ use std::hash::Hash; use gossip_protocol::membership::{MemberData, MemberId}; use gossip_protocol::model::{ - delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableName, + delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName, }; +use gossip_protocol::GossipMessage::{Ack, Nack}; use gossip_protocol::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace}; use hydroflow::futures::{Sink, Stream}; use hydroflow::hydroflow_syntax; @@ -15,7 +16,7 @@ use hydroflow::lattices::set_union::SetUnionHashSet; use hydroflow::lattices::{Lattice, PairBimorphism}; use hydroflow::scheduled::graph::Hydroflow; use lattices::set_union::SetUnion; -use lattices::{IsTop, Pair}; +use lattices::{IsTop, Max, Pair}; use rand::seq::IteratorRandom; use rand::thread_rng; use serde::de::DeserializeOwned; @@ -73,6 +74,11 @@ where E: Debug + 'static, { let my_member_id = member_info.id.clone(); + // TODO: This is ugly, but the only way this works at the moment. + let member_id_2 = my_member_id.clone(); + let member_id_3 = my_member_id.clone(); + let member_id_4 = my_member_id.clone(); + let member_id_5 = my_member_id.clone(); hydroflow_syntax! { @@ -80,7 +86,7 @@ where on_start -> for_each(|_| info!("{:?}: Transducer started.", context.current_tick())); // Setup member metadata for this process. - on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), member_info.id.clone(), serde_json::to_string(&member_info).unwrap())) + on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap())) -> writes; client_out = @@ -119,10 +125,9 @@ where -> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr)) -> demux_enum::>(); - gossip_in[Gossip] + incoming_gossip_messages = gossip_in[Gossip] -> inspect(|request| trace!("{:?}: Received gossip request: {:?}.", context.current_tick(), request)) - -> map(|(_msg_id, writes, _addr)| writes ) - -> writes; + -> tee(); gossip_in[Ack] -> inspect(|request| trace!("{:?}: Received gossip ack: {:?}.", context.current_tick(), request)) @@ -130,24 +135,78 @@ where gossip_in[Nack] -> inspect(|request| trace!("{:?}: Received gossip nack: {:?}.", context.current_tick(), request)) - -> null(); + -> map( |(message_id, member_id, _addr)| { + MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: Default::default(), members: BoundedSetLattice::new_from([member_id]) })) + }) + -> infecting_writes; + + gossip_out = union() -> dest_sink(gossip_outputs); + + incoming_gossip_messages + -> map(|(_msg_id, _member_id, writes, _addr)| writes ) + -> writes; + + gossip_processing_pipeline = incoming_gossip_messages + -> map(|(msg_id, _member_id, writes, sender_address) : (String, MemberId, Namespaces>, Addr)| { + let namespaces = &#namespaces; + let all_data: &HashMap>> = namespaces.as_reveal_ref(); + let possible_new_data: &HashMap>>>= writes.as_reveal_ref(); + + // Check if any of the data is new + /* TODO: This logic is duplicated in MapUnion::Merge and ideally should be accessed + from the pass-through streaming output from `state`. See + https://www.notion.so/hydro-project/Proposal-for-State-API-10a2a586262f8080b981d1a2948a69ac + for more. */ + let gossip_has_new_data = possible_new_data.iter() + .flat_map(|(namespace, tables)| { + tables.as_reveal_ref().iter().flat_map(move |(table, rows)|{ + rows.as_reveal_ref().iter().map(move |(row_key, row_value)| (namespace, table, row_key, row_value.as_reveal_ref().0.as_reveal_ref())) + }) + }) + .any(|(ns,table, row_key, new_ts)| { + let existing_tables = all_data.get(ns); + let existing_rows = existing_tables.and_then(|tables| tables.as_reveal_ref().get(table)); + let existing_row = existing_rows.and_then(|rows| rows.as_reveal_ref().get(row_key)); + let existing_ts = existing_row.map(|row| row.as_reveal_ref().0.as_reveal_ref()); + + if let Some(existing_ts) = existing_ts { + trace!("Comparing timestamps: {:?} vs {:?}", new_ts, existing_ts); + new_ts > existing_ts + } else { + true + } + }); - gossip_out = dest_sink(gossip_outputs); + if gossip_has_new_data { + (Ack { message_id: msg_id, member_id: member_id_2.clone()}, sender_address, Some(writes)) + } else { + (Nack { message_id: msg_id, member_id: member_id_3.clone()}, sender_address, None) + } + }) + -> tee(); + + gossip_processing_pipeline + -> map(|(response, address, _writes)| (response, address)) + -> inspect( |(msg, addr)| trace!("{:?}: Sending gossip response: {:?} to {:?}.", context.current_tick(), msg, addr)) + -> gossip_out; + + gossip_processing_pipeline + -> filter(|(_, _, writes)| writes.is_some()) + -> map(|(_, _, writes)| writes.unwrap()) + -> writes; writes = union(); writes -> namespaces; - // writes -> new_writes; namespaces = state::<'static, Namespaces::>(); - new_writes = namespaces -> tee(); + new_writes = namespaces -> tee(); // TODO: Use the output from here to generate NACKs / ACKs reads = state::<'tick, MapUnionHashMap>>>>(); new_writes -> [0]process_system_table_reads; reads -> [1]process_system_table_reads; - process_system_table_reads = lattice_bimorphism(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(KeyedBimorphism::, _>::new(PairBimorphism))), #namespaces, #reads) -> lattice_reduce::<'tick>() // TODO: This can be removed if we fix https://github.com/hydro-project/hydroflow/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick. -> flat_map(|result: NamespaceMap, SetUnion>>>| { @@ -202,9 +261,7 @@ where #infecting_writes.as_reveal_ref().clone() } ) - -> inspect(|x| trace!("Before: {:?}", x)) -> filter(|(_id, infecting_write)| !infecting_write.members.is_top()) - -> inspect(|x| trace!("After: {:?}", x)) -> map(|(id, infecting_write)| { trace!("{:?}: Choosing a peer to gossip to. {:?}:{:?}", context.current_tick(), id, infecting_write); let peers = #namespaces.as_reveal_ref().get(&Namespace::System).unwrap().as_reveal_ref().get("members").unwrap().as_reveal_ref().clone(); @@ -219,7 +276,7 @@ where }); // Exclude self from the list of peers. - peer_names.remove(&my_member_id); + peer_names.remove(&member_id_5); trace!("{:?}: Peers: {:?}", context.current_tick(), peer_names); @@ -237,26 +294,15 @@ where (id, infecting_write, gossip_address) }) -> inspect(|(message_id, infecting_write, peer_gossip_address)| trace!("{:?}: Sending write:\nMessageId:{:?}\nWrite:{:?}\nPeer Address:{:?}", context.current_tick(), message_id, infecting_write, peer_gossip_address)) - -> tee(); - - gossip_messages -> map(|(message_id, infecting_write, peer_gossip_address): (String, InfectingWrite, Addr)| { let gossip_request = GossipMessage::Gossip { message_id: message_id.clone(), + member_id: member_id_4.clone(), writes: infecting_write.write.clone(), }; (gossip_request, peer_gossip_address) }) -> gossip_out; - - gossip_messages - -> map(|(message_id, write, _peer_gossip_address) : (String, InfectingWrite, Addr)| { - let dummy_peer_id = uuid::Uuid::new_v4().to_string(); - trace!("{:?}: Infecting write: {:?}", context.current_tick(), dummy_peer_id); - MapUnionSingletonMap::new_from((message_id, InfectingWrite { write: write.write, members: BoundedSetLattice::new_from([dummy_peer_id]) })) - }) - //-> defer_tick() - -> infecting_writes; } } @@ -265,7 +311,6 @@ mod tests { use std::collections::HashSet; use gossip_protocol::membership::{MemberDataBuilder, Protocol}; - use hydroflow::tokio; use hydroflow::util::simulation::{Address, Fleet, Hostname}; use super::*; diff --git a/datastores/gossip_kv/server/util.rs b/datastores/gossip_kv/server/util.rs index 04787534b62..59397027254 100644 --- a/datastores/gossip_kv/server/util.rs +++ b/datastores/gossip_kv/server/util.rs @@ -32,27 +32,55 @@ pub enum GossipRequestWithAddress { /// A gossip request with the message id, writes and the address of the client. Gossip { message_id: String, + member_id: String, writes: Namespaces, addr: A, }, /// An ack request with the message id and the address of the client. - Ack { message_id: String, addr: A }, + Ack { + message_id: String, + member_id: String, + addr: A, + }, /// A nack request with the message id and the address of the client. - Nack { message_id: String, addr: A }, + Nack { + message_id: String, + member_id: String, + addr: A, + }, } impl GossipRequestWithAddress { /// Create a `GossipRequestWithAddress` from a `GossipMessage` and an address. pub fn from_request_and_address(request: GossipMessage, addr: A) -> Self { match request { - GossipMessage::Gossip { message_id, writes } => Self::Gossip { + GossipMessage::Gossip { + message_id, + member_id, + writes, + } => Self::Gossip { message_id, + member_id, writes, addr, }, - GossipMessage::Ack { message_id } => Self::Ack { message_id, addr }, - GossipMessage::Nack { message_id } => Self::Nack { message_id, addr }, + GossipMessage::Ack { + message_id, + member_id, + } => Self::Ack { + message_id, + addr, + member_id, + }, + GossipMessage::Nack { + message_id, + member_id, + } => Self::Nack { + message_id, + addr, + member_id, + }, } } }