Skip to content

Commit

Permalink
feat: add a mempool proxy support for concurrent calls
Browse files Browse the repository at this point in the history
  • Loading branch information
uriel-starkware committed Apr 16, 2024
1 parent a268176 commit 33445c0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 29 deletions.
46 changes: 25 additions & 21 deletions crates/mempool_node/src/mempool_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::mempool::{AddTransactionCallType, AddTransactionReturnType, Mempool, MempoolTrait};
use async_trait::async_trait;
use std::sync::Arc;

use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::mpsc::{channel, Sender};
use tokio::task;

enum ProxyFunc {
Expand All @@ -14,48 +12,54 @@ enum ProxyRetValue {
AddTransaction(AddTransactionReturnType),
}

#[derive(Clone)]
pub struct MempoolProxy {
tx_call: Sender<ProxyFunc>,
rx_ret_value: Receiver<ProxyRetValue>,
tx_call: Sender<(ProxyFunc, Sender<ProxyRetValue>)>,
}

impl Default for MempoolProxy {
fn default() -> Self {
Self::new()
}
}

impl MempoolProxy {
pub fn new(mempool: Arc<Mutex<Mempool>>) -> Self {
let (tx_call, mut rx_call) = channel(32);
let (tx_ret_value, rx_ret_value) = channel(32);
pub fn new() -> Self {
let (tx_call, mut rx_call) = channel::<(ProxyFunc, Sender<ProxyRetValue>)>(32);

task::spawn(async move {
let mut mempool = Mempool::default();
while let Some(call) = rx_call.recv().await {
match call {
ProxyFunc::AddTransaction(tx) => {
let ret_value = mempool.lock().await.add_transaction(tx).await;
tx_ret_value
(ProxyFunc::AddTransaction(tx), tx_response) => {
let ret_value = mempool.add_transaction(tx).await;
tx_response
.send(ProxyRetValue::AddTransaction(ret_value))
.await
.expect("Sender of the func call is expecting a return value");
.expect("Receiver should be listening.");
}
}
}
});

MempoolProxy {
tx_call,
rx_ret_value,
}
MempoolProxy { tx_call }
}
}

#[async_trait]
impl MempoolTrait for MempoolProxy {
async fn add_transaction(&mut self, tx: AddTransactionCallType) -> AddTransactionReturnType {
let (tx_response, mut rx_response) = channel(32);
self.tx_call
.send(ProxyFunc::AddTransaction(tx))
.send((ProxyFunc::AddTransaction(tx), tx_response))
.await
.expect("Receiver is always listening in a dedicated task");
.expect("Receiver should be listening.");

match self.rx_ret_value.recv().await.expect(
"Receiver of the function call always returns a return value after sending a func call",
) {
match rx_response
.recv()
.await
.expect("Sender should be responding.")
{
ProxyRetValue::AddTransaction(ret_value) => ret_value,
}
}
Expand Down
40 changes: 32 additions & 8 deletions crates/mempool_node/src/mempool_proxy_test.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,42 @@
mod tests {
use std::sync::Arc;

use tokio::sync::Mutex;
use tokio::task::JoinSet;

use crate::{
mempool::{Mempool, MempoolTrait},
mempool::{AddTransactionCallType, AddTransactionReturnType, MempoolTrait},
mempool_proxy::MempoolProxy,
};

#[tokio::test]
async fn test_proxy_add_transaction() {
let mempool = Arc::new(Mutex::new(Mempool::new()));
let mut proxy = MempoolProxy::new(mempool);
assert_eq!(proxy.add_transaction(1).await, 1);
assert_eq!(proxy.add_transaction(1).await, 2);
async fn test_proxy_simple_add_transaction() {
let mut proxy = MempoolProxy::default();
let tx: AddTransactionCallType = 1;
let expect_result: AddTransactionReturnType = 1;
assert_eq!(proxy.add_transaction(tx).await, expect_result);
}

#[tokio::test]
async fn test_proxy_concurrent_add_transaction() {
let proxy = MempoolProxy::default();

let mut tasks: JoinSet<_> = (0..5)
.map(|_| {
let mut proxy = proxy.clone();
async move {
let tx: AddTransactionCallType = 1;
proxy.add_transaction(tx).await
}
})
.collect();

let mut results: Vec<AddTransactionReturnType> = vec![];
while let Some(result) = tasks.join_next().await {
results.push(result.unwrap());
}

results.sort();

let expected_results: Vec<AddTransactionReturnType> = (1..=5).collect();
assert_eq!(results, expected_results);
}
}

0 comments on commit 33445c0

Please sign in to comment.