Skip to content

Commit

Permalink
Add timeouts to API communication
Browse files Browse the repository at this point in the history
  • Loading branch information
zargony committed Dec 22, 2024
1 parent 5b2db1d commit d684575
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 53 deletions.
2 changes: 1 addition & 1 deletion firmware/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[env]
# Task arena size of embassy-executor, see https://docs.embassy.dev/embassy-executor/git/cortex-m/index.html#task-arena
EMBASSY_EXECUTOR_TASK_ARENA_SIZE = "34816"
EMBASSY_EXECUTOR_TASK_ARENA_SIZE = "45056"
# Log filter for esp-println to apply at runtime. Also, a feature of the log crate strips all
# logging above info level from release builds at compile time (feature `release_max_level_info`).
ESP_LOG = "info,touch_n_drink=debug"
Expand Down
2 changes: 2 additions & 0 deletions firmware/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ async fn main(spawner: Spawner) {
}

loop {
// FIXME: Ui::run is a pretty large future, but pinning it to the heap seems even worse
#[allow(clippy::large_futures)]
match ui.run().await {
// Success: start over again
Ok(()) => (),
Expand Down
31 changes: 23 additions & 8 deletions firmware/src/mixpanel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use crate::http::{self, Http};
use crate::telemetry::Event;
use crate::time;
use core::fmt;
use embassy_time::Instant;
use embassy_time::{with_timeout, Duration, Instant};
use log::{debug, warn};

/// Mixpanel API base URL
const BASE_URL: &str = "https://api-eu.mixpanel.com";

/// How long to wait for a server response
const TIMEOUT: Duration = Duration::from_secs(10);

/// Mixpanel API error
#[derive(Debug)]
pub enum Error {
Expand All @@ -19,6 +22,14 @@ pub enum Error {
Connect(http::Error),
/// Failed to submit events to API server
Submit(http::Error),
/// Timeout waiting for response from API server
Timeout,
}

impl From<embassy_time::TimeoutError> for Error {
fn from(_err: embassy_time::TimeoutError) -> Self {
Self::Timeout
}
}

impl fmt::Display for Error {
Expand All @@ -27,6 +38,7 @@ impl fmt::Display for Error {
Self::CurrentTimeNotSet => write!(f, "Unknown current time"),
Self::Connect(err) => write!(f, "MP connect failed ({err})"),
Self::Submit(err) => write!(f, "MP submit failed ({err})"),
Self::Timeout => write!(f, "MP timeout"),
}
}
}
Expand Down Expand Up @@ -72,18 +84,19 @@ impl Connection<'_> {
}

debug!("Mixpanel: Submitting {} events...", events.len());
let response: TrackResponse = self
.http
.post(
let response: TrackResponse = with_timeout(
TIMEOUT,
self.http.post(
"track?verbose=1",
&TrackRequest {
token: self.token,
device_id: self.device_id,
events,
},
)
.await
.map_err(Error::Submit)?;
),
)
.await?
.map_err(Error::Submit)?;
debug!(
"Mixpanel: Submit successul, status {} {}",
response.status, response.error
Expand All @@ -96,7 +109,9 @@ impl<'a> Connection<'a> {
/// Connect to API server
async fn new(mp: &'a Mixpanel<'_>, http: &'a mut Http<'_>) -> Result<Self, Error> {
// Connect to API server
let connection = http.connect(BASE_URL).await.map_err(Error::Connect)?;
let connection = with_timeout(TIMEOUT, http.connect(BASE_URL))
.await?
.map_err(Error::Connect)?;

Ok(Self {
http: connection,
Expand Down
115 changes: 71 additions & 44 deletions firmware/src/vereinsflieger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,18 @@ use alloc::format;
use alloc::string::String;
use core::cell::RefCell;
use core::fmt;
use embassy_time::{with_timeout, Duration};
use log::{debug, info, warn};

/// Vereinsflieger API base URL
const BASE_URL: &str = "https://www.vereinsflieger.de/interface/rest";

/// How long to wait for a server response
const TIMEOUT: Duration = Duration::from_secs(10);

/// How long to wait to finish streaming a server's response
const FETCH_TIMEOUT: Duration = Duration::from_secs(60);

/// Vereinsflieger API error
#[derive(Debug)]
pub enum Error {
Expand All @@ -31,6 +38,14 @@ pub enum Error {
Connect(http::Error),
/// Failed to sign in to API server
SignIn(http::Error),
/// Timeout waiting for response from API server
Timeout,
}

impl From<embassy_time::TimeoutError> for Error {
fn from(_err: embassy_time::TimeoutError) -> Self {
Self::Timeout
}
}

impl fmt::Display for Error {
Expand All @@ -42,6 +57,7 @@ impl fmt::Display for Error {
Self::Purchase(err) => write!(f, "VF purchase failed ({err})"),
Self::Connect(err) => write!(f, "VF connect failed ({err})"),
Self::SignIn(err) => write!(f, "VF sign in failed ({err})"),
Self::Timeout => write!(f, "VF timeout"),
}
}
}
Expand Down Expand Up @@ -116,16 +132,17 @@ impl Connection<'_> {
pub async fn get_user_information(&mut self) -> Result<(), Error> {
use proto_auth::{UserInformationRequest, UserInformationResponse};

let response: UserInformationResponse = self
.http
.post(
let response: UserInformationResponse = with_timeout(
TIMEOUT,
self.http.post(
"auth/getuser",
&UserInformationRequest {
accesstoken: self.accesstoken,
},
)
.await
.map_err(Error::FetchUserInformation)?;
),
)
.await?
.map_err(Error::FetchUserInformation)?;
debug!("Vereinsflieger: Got user information: {:?}", response);
Ok(())
}
Expand All @@ -144,20 +161,22 @@ impl Connection<'_> {
.await
.map_err(Error::FetchArticles)?;
let mut rx_buf = [0; 4096];
let mut json = self
.http
.post_json("articles/list", &request_body, &mut rx_buf)
.await
.map_err(Error::FetchArticles)?;
let mut json = with_timeout(
TIMEOUT,
self.http
.post_json("articles/list", &request_body, &mut rx_buf),
)
.await?
.map_err(Error::FetchArticles)?;

articles.clear();
let articles = RefCell::new(articles);

let response: ArticleListResponse<N> = json
.read_object_with_context(&articles)
.await
.map_err(http::Error::MalformedResponse)
.map_err(Error::FetchArticles)?;
let response: ArticleListResponse<N> =
with_timeout(FETCH_TIMEOUT, json.read_object_with_context(&articles))
.await?
.map_err(http::Error::MalformedResponse)
.map_err(Error::FetchArticles)?;
info!(
"Vereinsflieger: Refreshed {} of {} articles",
articles.borrow().count(),
Expand All @@ -184,20 +203,21 @@ impl Connection<'_> {
.await
.map_err(Error::FetchUsers)?;
let mut rx_buf = [0; 4096];
let mut json = self
.http
.post_json("user/list", &request_body, &mut rx_buf)
.await
.map_err(Error::FetchUsers)?;
let mut json = with_timeout(
TIMEOUT,
self.http.post_json("user/list", &request_body, &mut rx_buf),
)
.await?
.map_err(Error::FetchUsers)?;

users.clear();
let users = RefCell::new(users);

let response: UserListResponse = json
.read_object_with_context(&users)
.await
.map_err(http::Error::MalformedResponse)
.map_err(Error::FetchUsers)?;
let response: UserListResponse =
with_timeout(FETCH_TIMEOUT, json.read_object_with_context(&users))
.await?
.map_err(http::Error::MalformedResponse)
.map_err(Error::FetchUsers)?;
info!(
"Vereinsflieger: Refreshed {} of {} users",
users.borrow().count(),
Expand Down Expand Up @@ -228,9 +248,9 @@ impl Connection<'_> {
amount, article_id, total_price, user_id
);

let _response: SaleAddResponse = self
.http
.post(
let _response: SaleAddResponse = with_timeout(
TIMEOUT,
self.http.post(
"sale/add",
&SaleAddRequest {
accesstoken: self.accesstoken,
Expand All @@ -241,9 +261,10 @@ impl Connection<'_> {
totalprice: Some(total_price),
comment: None,
},
)
.await
.map_err(Error::Purchase)?;
),
)
.await?
.map_err(Error::Purchase)?;
debug!("Vereinsflieger: Purchase successful");
Ok(())
}
Expand All @@ -254,15 +275,19 @@ impl<'a> Connection<'a> {
/// in. Return connection for authenticated API requests.
async fn new(vf: &'a mut Vereinsflieger<'_>, http: &'a mut Http<'_>) -> Result<Self, Error> {
// Connect to API server
let mut connection = http.connect(BASE_URL).await.map_err(Error::Connect)?;
let mut connection = with_timeout(TIMEOUT, http.connect(BASE_URL))
.await?
.map_err(Error::Connect)?;

// If exist, check validity of access token
if let Some(ref accesstoken) = vf.accesstoken {
use proto_auth::{UserInformationRequest, UserInformationResponse};

let response: Result<UserInformationResponse, _> = connection
.post("auth/getuser", &UserInformationRequest { accesstoken })
.await;
let response: Result<UserInformationResponse, _> = with_timeout(
TIMEOUT,
connection.post("auth/getuser", &UserInformationRequest { accesstoken }),
)
.await?;
match response {
Ok(_userinfo) => debug!("Vereinsflieger: Access token valid"),
Err(http::Error::Unauthorized) => {
Expand All @@ -278,10 +303,10 @@ impl<'a> Connection<'a> {
use proto_auth::{AccessTokenResponse, SignInRequest, SignInResponse};

// Fetch a new access token
let response: AccessTokenResponse = connection
.get("auth/accesstoken")
.await
.map_err(Error::SignIn)?;
let response: AccessTokenResponse =
with_timeout(TIMEOUT, connection.get("auth/accesstoken"))
.await?
.map_err(Error::SignIn)?;
let accesstoken = response.accesstoken;
// debug!("Vereinsflieger: Got access token {}", accesstoken);
debug!(
Expand All @@ -290,8 +315,9 @@ impl<'a> Connection<'a> {
);

// Use credentials to sign in
let response: Result<SignInResponse, _> = connection
.post(
let response: Result<SignInResponse, _> = with_timeout(
TIMEOUT,
connection.post(
"auth/signin",
&SignInRequest {
accesstoken: &accesstoken,
Expand All @@ -301,8 +327,9 @@ impl<'a> Connection<'a> {
cid: vf.cid,
auth_secret: None,
},
)
.await;
),
)
.await?;
match response {
Ok(_signin) => {
vf.accesstoken = Some(accesstoken);
Expand Down

0 comments on commit d684575

Please sign in to comment.