Skip to content

Commit

Permalink
Requires refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz committed Nov 22, 2024
1 parent 1339994 commit 5b27c13
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 21 deletions.
22 changes: 12 additions & 10 deletions crates/driver/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,26 @@ fn web3(eth: &Ethereum) -> Web3 {

/// Builds a web3 client that buffers requests and sends them in a
/// batch call.
pub fn buffered_web3_client(ethrpc: &Url) -> Web3 {
web3_client(ethrpc, 20, 10)
pub fn buffered_web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web3 {
web3_client(ethrpc, ethrpc_args)
}

/// Builds a web3 client that sends requests one by one.
pub fn unbuffered_web3_client(ethrpc: &Url) -> Web3 {
web3_client(ethrpc, 0, 0)
web3_client(
ethrpc,
&shared::ethrpc::Arguments {
ethrpc_max_batch_size: 0,
ethrpc_max_concurrent_requests: 0,
ethrpc_batch_delay: Default::default(),
},
)
}

fn web3_client(ethrpc: &Url, max_batch_size: usize, max_concurrent_requests: usize) -> Web3 {
let ethrpc_args = shared::ethrpc::Arguments {
ethrpc_max_batch_size: max_batch_size,
ethrpc_max_concurrent_requests: max_concurrent_requests,
ethrpc_batch_delay: Default::default(),
};
fn web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web3 {
let http_factory =
shared::http_client::HttpClientFactory::new(&shared::http_client::Arguments {
http_timeout: std::time::Duration::from_secs(10),
});
shared::ethrpc::web3(&ethrpc_args, &http_factory, ethrpc, "base")
shared::ethrpc::web3(ethrpc_args, &http_factory, ethrpc, "base")
}
6 changes: 3 additions & 3 deletions crates/driver/src/infra/blockchain/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ impl Contracts {
.0,
);

let archive_node_web3 = archive_node_url
.as_ref()
.map_or(web3.clone(), |url| boundary::buffered_web3_client(url));
let archive_node_web3 = archive_node_url.as_ref().map_or(web3.clone(), |url| {
boundary::buffered_web3_client(url, &shared::ethrpc::Arguments::default())
}); // todo: provide it from the config
let mut cow_amm_registry = cow_amm::Registry::new(archive_node_web3);
for config in addresses.cow_amms {
cow_amm_registry
Expand Down
4 changes: 2 additions & 2 deletions crates/driver/src/infra/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pub struct Rpc {
impl Rpc {
/// Instantiate an RPC client to an Ethereum (or Ethereum-compatible) node
/// at the specifed URL.
pub async fn new(url: &url::Url) -> Result<Self, Error> {
let web3 = boundary::buffered_web3_client(url);
pub async fn new(url: &url::Url, args: &shared::ethrpc::Arguments) -> Result<Self, Error> {
let web3 = boundary::buffered_web3_client(url, args);
let chain = web3.eth().chain_id().await?.into();

Ok(Self {
Expand Down
3 changes: 3 additions & 0 deletions crates/driver/src/infra/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub struct Args {
#[clap(long, env)]
pub ethrpc: Url,

#[clap(flatten)]
pub ethrpc_args: shared::ethrpc::Arguments,

/// Path to the driver configuration file. This file should be in TOML
/// format. For an example see
/// https://github.com/cowprotocol/services/blob/main/crates/driver/example.toml.
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ fn simulator(config: &infra::Config, eth: &Ethereum) -> Simulator {
}

async fn ethrpc(args: &cli::Args) -> blockchain::Rpc {
blockchain::Rpc::new(&args.ethrpc)
blockchain::Rpc::new(&args.ethrpc, &args.ethrpc_args)
.await
.expect("connect ethereum RPC")
}
Expand Down
48 changes: 48 additions & 0 deletions crates/driver/src/tests/cases/settle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
setup::{ab_order, ab_pool, ab_solution},
},
},
futures::future::join_all,
web3::Transport,
};

Expand Down Expand Up @@ -110,3 +111,50 @@ async fn high_gas_limit() {
.unwrap();
test.settle(&id).await.ok().await;
}

#[tokio::test]
#[ignore]
async fn too_many_settle_calls() {
let test = tests::setup()
.allow_multiple_solve_requests()
.pool(ab_pool())
.order(ab_order())
.solution(ab_solution())
.ethrpc_args(shared::ethrpc::Arguments {
ethrpc_max_batch_size: 10,
ethrpc_max_concurrent_requests: 10,
ethrpc_batch_delay: std::time::Duration::from_secs(1),
})
.solve_deadline_timeout(chrono::Duration::seconds(4))
.done()
.await;

let id1 = test.solve().await.ok().id();
let id2 = test.solve().await.ok().id();
let id3 = test.solve().await.ok().id();
let id4 = test.solve().await.ok().id();

assert_ne!(id1, id2);
assert_ne!(id2, id3);
assert_ne!(id1, id3);
assert_ne!(id1, id4);

let results = join_all(vec![
test.settle(&id1),
test.settle(&id2),
test.settle(&id3),
test.settle(&id4),
])
.await;

for (index, result) in results.into_iter().enumerate() {
match index {
0 => {
result.ok().await.ab_order_executed().await;
}
1 | 2 => result.err().kind("FailedToSubmit"),
3 => result.err().kind("QueueAwaitingDeadlineExceeded"),
_ => unreachable!(),
}
}
}
12 changes: 11 additions & 1 deletion crates/driver/src/tests/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Config {
pub enable_simulation: bool,
pub mempools: Vec<Mempool>,
pub order_priority_strategies: Vec<OrderPriorityStrategy>,
pub ethrpc_args: Option<shared::ethrpc::Arguments>,
}

pub struct Driver {
Expand All @@ -40,7 +41,8 @@ impl Driver {
}
};
let (addr_sender, addr_receiver) = oneshot::channel();
let args = vec![

let mut args = vec![
"/test/driver/path".to_owned(),
"--addr".to_owned(),
"0.0.0.0:0".to_owned(),
Expand All @@ -49,6 +51,14 @@ impl Driver {
"--config".to_owned(),
config_file.to_str().unwrap().to_owned(),
];
if let Some(ethrpc_arg) = &config.ethrpc_args {
args.push("--ethrpc-max-batch-size".to_owned());
args.push(ethrpc_arg.ethrpc_max_batch_size.to_string());
args.push("--ethrpc-max-concurrent-requests".to_owned());
args.push(ethrpc_arg.ethrpc_max_concurrent_requests.to_string());
args.push("--ethrpc-batch-delay".to_owned());
args.push(ethrpc_arg.ethrpc_batch_delay.as_millis().to_string() + "ms");
}
tokio::spawn(crate::run(args.into_iter(), Some(addr_sender)));
let addr = addr_receiver.await.unwrap();
Self {
Expand Down
17 changes: 16 additions & 1 deletion crates/driver/src/tests/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ pub fn setup() -> Setup {
rpc_args: vec!["--gas-limit".into(), "10000000".into()],
allow_multiple_solve_requests: false,
auction_id: 1,
solve_deadline_timeout: chrono::Duration::seconds(2),
..Default::default()
}
}
Expand Down Expand Up @@ -532,6 +533,9 @@ pub struct Setup {
allow_multiple_solve_requests: bool,
/// Auction ID used during tests
auction_id: i64,
ethrpc_args: Option<shared::ethrpc::Arguments>,
/// Auction solving deadline timeout
solve_deadline_timeout: chrono::Duration,
}

/// The validity of a solution.
Expand Down Expand Up @@ -842,6 +846,11 @@ impl Setup {
self
}

pub fn ethrpc_args(mut self, ethrpc_args: shared::ethrpc::Arguments) -> Self {
self.ethrpc_args = Some(ethrpc_args);
self
}

/// Create the test: set up onchain contracts and pools, start a mock HTTP
/// server for the solver and start the HTTP server for the driver.
pub async fn done(self) -> Test {
Expand Down Expand Up @@ -945,6 +954,7 @@ impl Setup {
enable_simulation: self.enable_simulation,
mempools: self.mempools,
order_priority_strategies: self.order_priority_strategies,
ethrpc_args: self.ethrpc_args,
},
&solvers_with_address,
&blockchain,
Expand Down Expand Up @@ -981,7 +991,12 @@ impl Setup {
}

fn deadline(&self) -> chrono::DateTime<chrono::Utc> {
crate::infra::time::now() + chrono::Duration::seconds(2)
crate::infra::time::now() + self.solve_deadline_timeout
}

pub fn solve_deadline_timeout(mut self, timeout: chrono::Duration) -> Self {
self.solve_deadline_timeout = timeout;
self
}

pub fn allow_multiple_solve_requests(mut self) -> Self {
Expand Down
4 changes: 3 additions & 1 deletion crates/driver/src/tests/setup/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ impl Solver {
.collect::<HashMap<_, _>>();

let url = config.blockchain.web3_url.parse().unwrap();
let rpc = infra::blockchain::Rpc::new(&url).await.unwrap();
let rpc = infra::blockchain::Rpc::new(&url, &shared::ethrpc::Arguments::default())
.await
.unwrap();
let gas = Arc::new(
infra::blockchain::GasPriceEstimator::new(
rpc.web3(),
Expand Down
11 changes: 9 additions & 2 deletions crates/shared/src/ethrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use ethrpc::{
};
use {
crate::http_client::HttpClientFactory,
clap::Parser,
reqwest::Url,
std::{
fmt::{self, Display, Formatter},
Expand All @@ -17,12 +18,12 @@ use {
pub const MAX_BATCH_SIZE: usize = 100;

/// Command line arguments for the common Ethereum RPC connections.
#[derive(clap::Parser)]
#[derive(clap::Parser, Debug)]
#[group(skip)]
pub struct Arguments {
/// Maximum batch size for Ethereum RPC requests. Use '0' to disable
/// batching.
#[clap(long, env, default_value = "100")]
#[clap(long, env, default_value = "20")]
pub ethrpc_max_batch_size: usize,

/// Maximum number of concurrent requests to send to the node. Use '0' for
Expand All @@ -36,6 +37,12 @@ pub struct Arguments {
pub ethrpc_batch_delay: Duration,
}

impl Default for Arguments {
fn default() -> Self {
Arguments::parse_from([""])
}
}

impl Display for Arguments {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let Self {
Expand Down

0 comments on commit 5b27c13

Please sign in to comment.