From 6023cc7f3587db8e58737ea0125a2f25479b374b Mon Sep 17 00:00:00 2001 From: Markus Pettersson Date: Fri, 12 Jan 2024 15:54:54 +0100 Subject: [PATCH] De-duplicate find `DaemonEvent` function --- test/test-manager/src/tests/account.rs | 51 ++++++++++---------------- test/test-manager/src/tests/helpers.rs | 26 ++++++++----- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/test/test-manager/src/tests/account.rs b/test/test-manager/src/tests/account.rs index 8bcbab8f609f..87ad87405177 100644 --- a/test/test-manager/src/tests/account.rs +++ b/test/test-manager/src/tests/account.rs @@ -305,7 +305,6 @@ pub async fn test_automatic_wireguard_rotation( rpc: ServiceClient, mut mullvad_client: MullvadProxyClient, ) -> Result<(), Error> { - use futures::StreamExt; // Make note of current WG key let old_key = mullvad_client .get_device() @@ -339,36 +338,26 @@ pub async fn test_automatic_wireguard_rotation( // Verify rotation has happened after a minute const KEY_ROTATION_TIMEOUT: Duration = Duration::from_secs(100); - let mut event_stream = mullvad_client.events_listen().await.unwrap(); - let get_pub_key_event = async { - loop { - // TODO(markus): See if this can be refactored. This is exactly the same as helpers:274. - match event_stream.next().await { - Some(Ok(DaemonEvent::Device(device_event))) => { - let pubkey = device_event - .new_state - .into_device() - .expect("Could not get device") - .device - .pubkey; - return Ok(pubkey); - } - Some(Ok(_)) => continue, - Some(Err(status)) => { - break Err(Error::Daemon(format!( - "Failed to get next event: {}", - status - ))) - } - None => break Err(Error::Daemon(String::from("Lost daemon event stream"))), - } - } - }; - - let new_key = tokio::time::timeout(KEY_ROTATION_TIMEOUT, get_pub_key_event) - .await - .unwrap() - .unwrap(); + let new_key = tokio::time::timeout( + KEY_ROTATION_TIMEOUT, + helpers::find_daemon_event( + mullvad_client.events_listen().await.unwrap(), + |daemon_event| match daemon_event { + DaemonEvent::Device(device_event) => Some(device_event), + _ => None, + }, + ), + ) + .await + .map_err(|_error| Error::Daemon(String::from("Tunnel event listener timed out")))? + .map(|device_event| { + device_event + .new_state + .into_device() + .expect("Could not get device") + .device + .pubkey + })?; assert_ne!(old_key, new_key); Ok(()) diff --git a/test/test-manager/src/tests/helpers.rs b/test/test-manager/src/tests/helpers.rs index 478bb2eb46fa..30f89267ccf9 100644 --- a/test/test-manager/src/tests/helpers.rs +++ b/test/test-manager/src/tests/helpers.rs @@ -263,23 +263,29 @@ pub async fn find_next_tunnel_state( ) -> Result { tokio::time::timeout( WAIT_FOR_TUNNEL_STATE_TIMEOUT, - find_next_tunnel_state_inner(stream, accept_state_fn), + find_daemon_event(stream, |daemon_event| match daemon_event { + DaemonEvent::TunnelState(state) if accept_state_fn(&state) => Some(state), + _ => None, + }), ) .await .map_err(|_error| Error::Daemon(String::from("Tunnel event listener timed out")))? } -async fn find_next_tunnel_state_inner( - mut stream: impl futures::Stream> +pub async fn find_daemon_event( + mut event_stream: impl futures::Stream> + Unpin, - accept_state_fn: impl Fn(&mullvad_types::states::TunnelState) -> bool, -) -> Result { + accept_event: Accept, +) -> Result +where + Accept: Fn(DaemonEvent) -> Option, +{ loop { - match stream.next().await { - Some(Ok(DaemonEvent::TunnelState(state))) if accept_state_fn(&state) => { - return Ok(state) - } - Some(Ok(_)) => continue, + match event_stream.next().await { + Some(Ok(daemon_event)) => match accept_event(daemon_event) { + Some(accepted_event) => break Ok(accepted_event), + None => continue, + }, Some(Err(status)) => { break Err(Error::Daemon(format!( "Failed to get next event: {}",