Skip to content

Commit

Permalink
feat: work on multiplexed layer specific to our networking stack
Browse files Browse the repository at this point in the history
  • Loading branch information
tbraun96 committed Nov 6, 2024
1 parent f6c1656 commit 3f5aea3
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 5 deletions.
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ testcontainers = { version = "0.20.1" }

# Symbiotic
symbiotic-rs = { version = "0.1.0" }
dashmap = "6.1.0"
bincode2 = "2.0.1"

# Config for 'cargo dist'
[workspace.metadata.dist]
Expand Down
Submodule forge-std updated 1 files
+1 −1 package.json
2 changes: 1 addition & 1 deletion blueprints/incredible-squaring/contracts/lib/forge-std
Submodule forge-std updated 1 files
+1 −1 package.json
2 changes: 2 additions & 0 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ gadget-blueprint-proc-macro-core = { workspace = true, default-features = false

# Benchmarking deps
sysinfo = { workspace = true }
dashmap = { workspace = true }
lazy_static = "1.5.0"
bincode2 = { workspace = true }


[target.'cfg(not(target_family = "wasm"))'.dependencies.libp2p]
Expand Down
192 changes: 190 additions & 2 deletions sdk/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use core::fmt::Display;

use async_trait::async_trait;
use core::fmt::Display;
use dashmap::DashMap;
use futures::Stream;
use serde::{Deserialize, Serialize};
use sp_core::ecdsa;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::error::Error;

Expand Down Expand Up @@ -116,6 +121,189 @@ pub trait Network: Send + Sync + Clone + 'static {
}
}

pub struct NetworkMultiplexer {
to_receiving_streams: ActiveStreams,
unclaimed_receiving_streams: Arc<DashMap<StreamKey, MultiplexedReceiver>>,
tx_to_networking_layer: MultiplexedSender,
}

type ActiveStreams = Arc<DashMap<StreamKey, tokio::sync::mpsc::UnboundedSender<ProtocolMessage>>>;

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Default)]
pub struct StreamKey {
pub job_id: u8,
pub task_hash: [u8; 32],
pub stream_key: u16,
}

pub struct MultiplexedReceiver {
inner: tokio::sync::mpsc::UnboundedReceiver<ProtocolMessage>,
stream_id: StreamKey,
// For post-drop removal purposes
active_streams: ActiveStreams,
}

#[derive(Clone)]
pub struct MultiplexedSender {
inner: tokio::sync::mpsc::UnboundedSender<(StreamKey, ProtocolMessage)>,
stream_id: StreamKey,
}

impl MultiplexedSender {
pub fn send(&self, message: ProtocolMessage) -> Result<(), Error> {
self.inner
.send((self.stream_id, message))
.map_err(|err| Error::Other(err.to_string()))
}
}

impl Stream for MultiplexedReceiver {
type Item = <tokio::sync::mpsc::UnboundedReceiver<ProtocolMessage> as Stream>::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().inner).poll_next(cx)
}
}

impl Deref for MultiplexedReceiver {
type Target = tokio::sync::mpsc::UnboundedReceiver<ProtocolMessage>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl Drop for MultiplexedReceiver {
fn drop(&mut self) {
let _ = self.active_streams.remove(&self.stream_id);
}
}

#[derive(Debug, Serialize, Deserialize)]
struct MultiplexedMessage {
payload: Vec<u8>,
stream_id: StreamKey,
}

impl NetworkMultiplexer {
pub fn new<N: Network>(network: N) -> Self {
let (tx_to_networking_layer, mut rx_from_substreams) =
tokio::sync::mpsc::unbounded_channel();
let this = NetworkMultiplexer {
to_receiving_streams: Arc::new(DashMap::new()),
unclaimed_receiving_streams: Arc::new(DashMap::new()),
tx_to_networking_layer: MultiplexedSender {
inner: tx_to_networking_layer,
stream_id: Default::default(), // Start with an arbitrary stream ID, this won't get used
},
};

let active_streams = this.to_receiving_streams.clone();
let unclaimed_streams = this.unclaimed_receiving_streams.clone();
let tx_to_networking_layer = this.tx_to_networking_layer.clone();
tokio::spawn(async move {
let network_clone = &network.clone();
let task0 = network.run();

let task1 = async move {
while let Some((stream_id, proto_message)) = rx_from_substreams.recv().await {
let multiplexed_message = MultiplexedMessage {
payload: proto_message.payload,
stream_id,
};
let message = ProtocolMessage {
identifier_info: proto_message.identifier_info,
sender: proto_message.sender,
recipient: proto_message.recipient,
payload: bincode2::serialize(&multiplexed_message)
.expect("Failed to serialize message"),
};

if let Err(err) = network_clone.send_message(message).await {
crate::error!(%err, "Failed to send message to network");
break;
}
}
};

let task2 = async move {
while let Some(mut msg) = network_clone.next_message().await {
if let Ok(multiplexed_message) =
bincode2::deserialize::<MultiplexedMessage>(&msg.payload)
{
let stream_id = multiplexed_message.stream_id;
msg.payload = multiplexed_message.payload;
// Two possibilities: the entry already exists, or, it doesn't and we need to enqueue
if let Some(active_receiver) = active_streams.get(&stream_id) {
if let Err(err) = active_receiver.send(msg) {
crate::error!(%err, "Failed to send message to receiver");
// Delete entry since the receiver is dead
let _ = active_streams.remove(&stream_id);
}
} else {
// Second possibility: the entry does not exist, and another substream is received for this task.
// In this case, reserve an entry locally and store the message in the unclaimed streams. Later,
// when the user attempts to open the substream with the same ID, the message will be sent to the user.
let (tx, rx) = Self::create_multiplexed_stream_inner(
tx_to_networking_layer.clone(),
&active_streams,
stream_id,
);
let _ = tx.send(msg);
//let _ = active_streams.insert(stream_id, tx); TX already passed into active_streams above
let _ = unclaimed_streams.insert(stream_id, rx);
}
}
}
};

tokio::select! {
_ = task0 => {
crate::error!("Network task exited");
},
_ = task1 => {
crate::error!("Task 1 exited");
},
_ = task2 => {
crate::error!("Task 2 exited");
}
}
});

this
}

pub fn multiplex(&self, id: StreamKey) -> (MultiplexedSender, MultiplexedReceiver) {
let tx_to_networking_layer = self.tx_to_networking_layer.clone();
if let Some(unclaimed) = self.unclaimed_receiving_streams.remove(&id) {
return (tx_to_networking_layer, unclaimed.1);
}

Self::create_multiplexed_stream_inner(
tx_to_networking_layer,
&self.to_receiving_streams,
id,
)
}

fn create_multiplexed_stream_inner(
tx_to_networking_layer: MultiplexedSender,
active_streams: &ActiveStreams,
stream_id: StreamKey,
) -> (MultiplexedSender, MultiplexedReceiver) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
active_streams.insert(stream_id, tx);
(
tx_to_networking_layer,
MultiplexedReceiver {
inner: rx,
stream_id,
active_streams: active_streams.clone(),
},
)
}
}

pub fn deserialize<'a, T>(data: &'a [u8]) -> Result<T, serde_json::Error>
where
T: Deserialize<'a>,
Expand Down

0 comments on commit 3f5aea3

Please sign in to comment.