Skip to content

Commit

Permalink
refactor(routing): have a specific Command::SendMessageDeliveryGroup …
Browse files Browse the repository at this point in the history
…for when the message needs to be sent only to a subset of recipients
  • Loading branch information
bochaco committed Jul 14, 2021
1 parent e98561f commit 6e1f449
Show file tree
Hide file tree
Showing 11 changed files with 18 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/routing/core/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl Core {
let dst_pk = self.section_key_by_name(&target_name);
wire_msg.set_dst_section_pk(dst_pk);

let command = Command::SendMessage {
let command = Command::SendMessageDeliveryGroup {
recipients: targets,
delivery_group_size: dg_size,
wire_msg,
Expand Down
1 change: 0 additions & 1 deletion src/routing/core/bootstrap/relocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ impl JoiningAsRelocated {

let cmd = Command::SendMessage {
recipients: recipients.to_vec(),
delivery_group_size: recipients.len(),
wire_msg,
};

Expand Down
8 changes: 4 additions & 4 deletions src/routing/core/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::future;
use qp2p::Config;
use std::{net::Ipv4Addr, slice, time::Duration};
use std::{net::Ipv4Addr, time::Duration};
use tokio::{net::UdpSocket, sync::mpsc, time};

const TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -496,7 +496,7 @@ mod tests {
};
let msg0 = WireMsg::new_section_info_msg(&query, dst_location)?;
let _ = send_comm
.send(slice::from_ref(&(name, recv_addr)), 1, msg0.clone())
.send(&[(name, recv_addr)], 1, msg0.clone())
.await?;

let mut msg0_received = false;
Expand All @@ -520,7 +520,7 @@ mod tests {
};
let msg1 = WireMsg::new_section_info_msg(&query, dst_location)?;
let _ = send_comm
.send(slice::from_ref(&(name, recv_addr)), 1, msg1.clone())
.send(&[(name, recv_addr)], 1, msg1.clone())
.await?;

let mut msg1_received = false;
Expand Down Expand Up @@ -548,7 +548,7 @@ mod tests {
// Send a message to establish the connection
let _ = comm1
.send(
slice::from_ref(&(XorName::random(), addr0)),
&[(XorName::random(), addr0)],
1,
new_section_info_message()?,
)
Expand Down
5 changes: 0 additions & 5 deletions src/routing/core/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,10 @@ impl Core {
}

if !others.is_empty() {
let count = others.len();
let dst_section_pk = self.section_key_by_name(&others[0].0);
wire_msg.set_dst_section_pk(dst_section_pk);
commands.push(Command::SendMessage {
recipients: others,
delivery_group_size: count,
wire_msg: wire_msg.clone(),
});
}
Expand Down Expand Up @@ -356,7 +354,6 @@ impl Core {

Ok(Command::SendMessage {
recipients: vec![recipient],
delivery_group_size: 1,
wire_msg,
})
}
Expand All @@ -373,11 +370,9 @@ impl Core {
node_msg,
self.section.authority_provider().section_key(),
)?;
let delivery_group_size = recipients.len();

Ok(Command::SendMessage {
recipients,
delivery_group_size,
wire_msg,
})
}
Expand Down
1 change: 0 additions & 1 deletion src/routing/core/msg_handling/end_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ impl Core {

return Ok(vec![Command::SendMessage {
recipients: vec![(xorname, *addr)],
delivery_group_size: 1,
wire_msg,
}]);
}
Expand Down
1 change: 0 additions & 1 deletion src/routing/core/msg_handling/section_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl Core {
match WireMsg::new_section_info_msg(&response, dst_location) {
Ok(wire_msg) => vec![Command::SendMessage {
recipients: vec![(name, sender)],
delivery_group_size: 1,
wire_msg,
}],
Err(err) => {
Expand Down
4 changes: 0 additions & 4 deletions src/routing/dkg/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@ impl DkgCommand {
let node_msg = NodeMsg::DkgMessage { dkg_key, message };
let wire_msg =
WireMsg::single_src(node, DstLocation::DirectAndUnrouted(key), node_msg, key)?;
let delivery_group_size = recipients.len();

Ok(Command::SendMessage {
recipients,
delivery_group_size,
wire_msg,
})
}
Expand All @@ -85,11 +83,9 @@ impl DkgCommand {
};
let wire_msg =
WireMsg::single_src(node, DstLocation::DirectAndUnrouted(key), node_msg, key)?;
let delivery_group_size = recipients.len();

Ok(Command::SendMessage {
recipients,
delivery_group_size,
wire_msg,
})
}
Expand Down
7 changes: 6 additions & 1 deletion src/routing/routing_api/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ pub(crate) enum Command {
},
/// Handle a DKG failure that was observed by a majority of the DKG participants.
HandleDkgFailure(DkgFailureSigSet),
/// Send a message to `delivery_group_size` peers out of the given `recipients`.
/// Send a message to the given `recipients`.
SendMessage {
recipients: Vec<(XorName, SocketAddr)>,
wire_msg: WireMsg,
},
/// Send a message to `delivery_group_size` peers out of the given `recipients`.
SendMessageDeliveryGroup {
recipients: Vec<(XorName, SocketAddr)>,
delivery_group_size: usize,
wire_msg: WireMsg,
Expand Down
8 changes: 7 additions & 1 deletion src/routing/routing_api/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ impl Dispatcher {
.handle_dkg_failure(signeds)
.map(|command| vec![command]),
Command::SendMessage {
// send to network from routing layer
recipients,
wire_msg,
} => {
self.send_message(&recipients, recipients.len(), wire_msg)
.await
}
Command::SendMessageDeliveryGroup {
recipients,
delivery_group_size,
wire_msg,
Expand Down
1 change: 0 additions & 1 deletion src/routing/routing_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ impl Routing {

let command = Command::SendMessage {
recipients,
delivery_group_size: 1,
wire_msg,
};
return self.dispatcher.clone().handle_commands(command).await;
Expand Down
1 change: 0 additions & 1 deletion src/routing/routing_api/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,6 @@ async fn message_to_self(dst: MessageDst) -> Result<()> {
let commands = dispatcher
.handle_command(Command::SendMessage {
recipients: vec![(node.name(), node.addr)],
delivery_group_size: 1,
wire_msg,
})
.await?;
Expand Down

0 comments on commit 6e1f449

Please sign in to comment.