From d684575f8a8a0756aa2b85d3bb70c3ebe592d590 Mon Sep 17 00:00:00 2001 From: Andreas Neuhaus Date: Sun, 22 Dec 2024 15:50:23 +0100 Subject: [PATCH] Add timeouts to API communication --- firmware/.cargo/config.toml | 2 +- firmware/src/main.rs | 2 + firmware/src/mixpanel/mod.rs | 31 ++++++-- firmware/src/vereinsflieger/mod.rs | 115 ++++++++++++++++++----------- 4 files changed, 97 insertions(+), 53 deletions(-) diff --git a/firmware/.cargo/config.toml b/firmware/.cargo/config.toml index 76b1cdf..417afce 100644 --- a/firmware/.cargo/config.toml +++ b/firmware/.cargo/config.toml @@ -1,6 +1,6 @@ [env] # Task arena size of embassy-executor, see https://docs.embassy.dev/embassy-executor/git/cortex-m/index.html#task-arena -EMBASSY_EXECUTOR_TASK_ARENA_SIZE = "34816" +EMBASSY_EXECUTOR_TASK_ARENA_SIZE = "45056" # Log filter for esp-println to apply at runtime. Also, a feature of the log crate strips all # logging above info level from release builds at compile time (feature `release_max_level_info`). ESP_LOG = "info,touch_n_drink=debug" diff --git a/firmware/src/main.rs b/firmware/src/main.rs index 9524eea..c89aa1e 100644 --- a/firmware/src/main.rs +++ b/firmware/src/main.rs @@ -261,6 +261,8 @@ async fn main(spawner: Spawner) { } loop { + // FIXME: Ui::run is a pretty large future, but pinning it to the heap seems even worse + #[allow(clippy::large_futures)] match ui.run().await { // Success: start over again Ok(()) => (), diff --git a/firmware/src/mixpanel/mod.rs b/firmware/src/mixpanel/mod.rs index f0b39c3..505f3ff 100644 --- a/firmware/src/mixpanel/mod.rs +++ b/firmware/src/mixpanel/mod.rs @@ -4,12 +4,15 @@ use crate::http::{self, Http}; use crate::telemetry::Event; use crate::time; use core::fmt; -use embassy_time::Instant; +use embassy_time::{with_timeout, Duration, Instant}; use log::{debug, warn}; /// Mixpanel API base URL const BASE_URL: &str = "https://api-eu.mixpanel.com"; +/// How long to wait for a server response +const TIMEOUT: Duration = Duration::from_secs(10); + /// Mixpanel API error #[derive(Debug)] pub enum Error { @@ -19,6 +22,14 @@ pub enum Error { Connect(http::Error), /// Failed to submit events to API server Submit(http::Error), + /// Timeout waiting for response from API server + Timeout, +} + +impl From for Error { + fn from(_err: embassy_time::TimeoutError) -> Self { + Self::Timeout + } } impl fmt::Display for Error { @@ -27,6 +38,7 @@ impl fmt::Display for Error { Self::CurrentTimeNotSet => write!(f, "Unknown current time"), Self::Connect(err) => write!(f, "MP connect failed ({err})"), Self::Submit(err) => write!(f, "MP submit failed ({err})"), + Self::Timeout => write!(f, "MP timeout"), } } } @@ -72,18 +84,19 @@ impl Connection<'_> { } debug!("Mixpanel: Submitting {} events...", events.len()); - let response: TrackResponse = self - .http - .post( + let response: TrackResponse = with_timeout( + TIMEOUT, + self.http.post( "track?verbose=1", &TrackRequest { token: self.token, device_id: self.device_id, events, }, - ) - .await - .map_err(Error::Submit)?; + ), + ) + .await? + .map_err(Error::Submit)?; debug!( "Mixpanel: Submit successul, status {} {}", response.status, response.error @@ -96,7 +109,9 @@ impl<'a> Connection<'a> { /// Connect to API server async fn new(mp: &'a Mixpanel<'_>, http: &'a mut Http<'_>) -> Result { // Connect to API server - let connection = http.connect(BASE_URL).await.map_err(Error::Connect)?; + let connection = with_timeout(TIMEOUT, http.connect(BASE_URL)) + .await? + .map_err(Error::Connect)?; Ok(Self { http: connection, diff --git a/firmware/src/vereinsflieger/mod.rs b/firmware/src/vereinsflieger/mod.rs index 44f108a..56c961e 100644 --- a/firmware/src/vereinsflieger/mod.rs +++ b/firmware/src/vereinsflieger/mod.rs @@ -11,11 +11,18 @@ use alloc::format; use alloc::string::String; use core::cell::RefCell; use core::fmt; +use embassy_time::{with_timeout, Duration}; use log::{debug, info, warn}; /// Vereinsflieger API base URL const BASE_URL: &str = "https://www.vereinsflieger.de/interface/rest"; +/// How long to wait for a server response +const TIMEOUT: Duration = Duration::from_secs(10); + +/// How long to wait to finish streaming a server's response +const FETCH_TIMEOUT: Duration = Duration::from_secs(60); + /// Vereinsflieger API error #[derive(Debug)] pub enum Error { @@ -31,6 +38,14 @@ pub enum Error { Connect(http::Error), /// Failed to sign in to API server SignIn(http::Error), + /// Timeout waiting for response from API server + Timeout, +} + +impl From for Error { + fn from(_err: embassy_time::TimeoutError) -> Self { + Self::Timeout + } } impl fmt::Display for Error { @@ -42,6 +57,7 @@ impl fmt::Display for Error { Self::Purchase(err) => write!(f, "VF purchase failed ({err})"), Self::Connect(err) => write!(f, "VF connect failed ({err})"), Self::SignIn(err) => write!(f, "VF sign in failed ({err})"), + Self::Timeout => write!(f, "VF timeout"), } } } @@ -116,16 +132,17 @@ impl Connection<'_> { pub async fn get_user_information(&mut self) -> Result<(), Error> { use proto_auth::{UserInformationRequest, UserInformationResponse}; - let response: UserInformationResponse = self - .http - .post( + let response: UserInformationResponse = with_timeout( + TIMEOUT, + self.http.post( "auth/getuser", &UserInformationRequest { accesstoken: self.accesstoken, }, - ) - .await - .map_err(Error::FetchUserInformation)?; + ), + ) + .await? + .map_err(Error::FetchUserInformation)?; debug!("Vereinsflieger: Got user information: {:?}", response); Ok(()) } @@ -144,20 +161,22 @@ impl Connection<'_> { .await .map_err(Error::FetchArticles)?; let mut rx_buf = [0; 4096]; - let mut json = self - .http - .post_json("articles/list", &request_body, &mut rx_buf) - .await - .map_err(Error::FetchArticles)?; + let mut json = with_timeout( + TIMEOUT, + self.http + .post_json("articles/list", &request_body, &mut rx_buf), + ) + .await? + .map_err(Error::FetchArticles)?; articles.clear(); let articles = RefCell::new(articles); - let response: ArticleListResponse = json - .read_object_with_context(&articles) - .await - .map_err(http::Error::MalformedResponse) - .map_err(Error::FetchArticles)?; + let response: ArticleListResponse = + with_timeout(FETCH_TIMEOUT, json.read_object_with_context(&articles)) + .await? + .map_err(http::Error::MalformedResponse) + .map_err(Error::FetchArticles)?; info!( "Vereinsflieger: Refreshed {} of {} articles", articles.borrow().count(), @@ -184,20 +203,21 @@ impl Connection<'_> { .await .map_err(Error::FetchUsers)?; let mut rx_buf = [0; 4096]; - let mut json = self - .http - .post_json("user/list", &request_body, &mut rx_buf) - .await - .map_err(Error::FetchUsers)?; + let mut json = with_timeout( + TIMEOUT, + self.http.post_json("user/list", &request_body, &mut rx_buf), + ) + .await? + .map_err(Error::FetchUsers)?; users.clear(); let users = RefCell::new(users); - let response: UserListResponse = json - .read_object_with_context(&users) - .await - .map_err(http::Error::MalformedResponse) - .map_err(Error::FetchUsers)?; + let response: UserListResponse = + with_timeout(FETCH_TIMEOUT, json.read_object_with_context(&users)) + .await? + .map_err(http::Error::MalformedResponse) + .map_err(Error::FetchUsers)?; info!( "Vereinsflieger: Refreshed {} of {} users", users.borrow().count(), @@ -228,9 +248,9 @@ impl Connection<'_> { amount, article_id, total_price, user_id ); - let _response: SaleAddResponse = self - .http - .post( + let _response: SaleAddResponse = with_timeout( + TIMEOUT, + self.http.post( "sale/add", &SaleAddRequest { accesstoken: self.accesstoken, @@ -241,9 +261,10 @@ impl Connection<'_> { totalprice: Some(total_price), comment: None, }, - ) - .await - .map_err(Error::Purchase)?; + ), + ) + .await? + .map_err(Error::Purchase)?; debug!("Vereinsflieger: Purchase successful"); Ok(()) } @@ -254,15 +275,19 @@ impl<'a> Connection<'a> { /// in. Return connection for authenticated API requests. async fn new(vf: &'a mut Vereinsflieger<'_>, http: &'a mut Http<'_>) -> Result { // Connect to API server - let mut connection = http.connect(BASE_URL).await.map_err(Error::Connect)?; + let mut connection = with_timeout(TIMEOUT, http.connect(BASE_URL)) + .await? + .map_err(Error::Connect)?; // If exist, check validity of access token if let Some(ref accesstoken) = vf.accesstoken { use proto_auth::{UserInformationRequest, UserInformationResponse}; - let response: Result = connection - .post("auth/getuser", &UserInformationRequest { accesstoken }) - .await; + let response: Result = with_timeout( + TIMEOUT, + connection.post("auth/getuser", &UserInformationRequest { accesstoken }), + ) + .await?; match response { Ok(_userinfo) => debug!("Vereinsflieger: Access token valid"), Err(http::Error::Unauthorized) => { @@ -278,10 +303,10 @@ impl<'a> Connection<'a> { use proto_auth::{AccessTokenResponse, SignInRequest, SignInResponse}; // Fetch a new access token - let response: AccessTokenResponse = connection - .get("auth/accesstoken") - .await - .map_err(Error::SignIn)?; + let response: AccessTokenResponse = + with_timeout(TIMEOUT, connection.get("auth/accesstoken")) + .await? + .map_err(Error::SignIn)?; let accesstoken = response.accesstoken; // debug!("Vereinsflieger: Got access token {}", accesstoken); debug!( @@ -290,8 +315,9 @@ impl<'a> Connection<'a> { ); // Use credentials to sign in - let response: Result = connection - .post( + let response: Result = with_timeout( + TIMEOUT, + connection.post( "auth/signin", &SignInRequest { accesstoken: &accesstoken, @@ -301,8 +327,9 @@ impl<'a> Connection<'a> { cid: vf.cid, auth_secret: None, }, - ) - .await; + ), + ) + .await?; match response { Ok(_signin) => { vf.accesstoken = Some(accesstoken);