Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add urgency support #815

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions autoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ docopt = "1.1"
default = ["bigtable"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
emulator = ["bigtable"]
urgency = ["autoconnect_ws/urgency"]
log_vapid = []
1 change: 1 addition & 0 deletions autoconnect/autoconnect-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ autopush_common.workspace = true

[features]
test-support = []
urgency = []
13 changes: 13 additions & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use uuid::Uuid;

use autopush_common::notification::Notification;

#[cfg(feature = "urgency")]
use autopush_common::db::Urgency;

#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(untagged)]
pub enum BroadcastValue {
Expand Down Expand Up @@ -66,6 +69,11 @@ pub enum ClientMessage {
version: String,
},

p1gp1g marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "urgency")]
Urgency {
min: Urgency,
},

Ping,
}

Expand Down Expand Up @@ -123,6 +131,11 @@ pub enum ServerMessage {

Notification(Notification),

p1gp1g marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "urgency")]
Urgency {
status: u32,
},

Ping,
}

Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ ctor.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
urgency = ["autoconnect_ws_sm/urgency"]
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ serde_json.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
urgency = ["autoconnect_common/urgency"]
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use autoconnect_common::{

use autoconnect_settings::{AppState, Settings};
use autopush_common::{
db::User,
db::{Urgency, User},
notification::Notification,
util::{ms_since_epoch, user_agent::UserAgentInfo},
};
Expand Down Expand Up @@ -291,6 +291,8 @@ pub struct ClientFlags {
pub old_record_version: bool,
/// First time a user has connected "today"
pub emit_channel_metrics: bool,
/// Minimum urgency
pub min_urgency: Urgency,
}

impl Default for ClientFlags {
Expand All @@ -301,6 +303,7 @@ impl Default for ClientFlags {
check_storage: false,
old_record_version: false,
emit_channel_metrics: false,
min_urgency: Urgency::VeryLow,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use autoconnect_common::{
};
use autopush_common::{endpoint::make_endpoint, util::sec_since_epoch};

#[cfg(feature = "urgency")]
use autopush_common::{db::Urgency, util::ms_since_epoch};

use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};

Expand Down Expand Up @@ -38,6 +41,8 @@ impl WebPushClient {
self.nack(code);
Ok(vec![])
}
#[cfg(feature = "urgency")]
ClientMessage::Urgency { min } => Ok(self.change_min_urgency(min).await?),
ClientMessage::Ping => Ok(vec![self.ping()?]),
}
}
Expand Down Expand Up @@ -330,4 +335,40 @@ impl WebPushClient {
Ok(vec![])
}
}

/// Update minimum urgency for the user and the flag
///
/// If the new urgency is lower than the previous one,
/// We check pending messages, to send messages that were
/// retained because of their urgency
#[cfg(feature = "urgency")]
async fn change_min_urgency(
&mut self,
new_min: Urgency,
) -> Result<Vec<ServerMessage>, SMError> {
// Change the min urgency
self.flags.min_urgency = new_min;

if let Some(mut user) = self.app_state.db.get_user(&self.uaid).await? {
// If the user hasn't set a minimum urgency yet, they receive all messages,
// which is equivalent to setting very-low as a minimum
let current_urgency = user.urgency.unwrap_or(Urgency::VeryLow);
jrconlin marked this conversation as resolved.
Show resolved Hide resolved

// We update the user
user.urgency = Some(new_min);
user.connected_at = ms_since_epoch();
self.app_state.db.update_user(&mut user).await?;

let mut res = vec![ServerMessage::Urgency { status: 200 }];
// if new urgency < previous: fetch pending messages
if new_min < current_urgency {
self.ack_state.unacked_stored_highest = None;
self.current_timestamp = None;
res.append(&mut self.check_storage().await?);
}
Ok(res)
} else {
Ok(vec![ServerMessage::Urgency { status: 404 }])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use autopush_common::{
};

use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};
use crate::{
error::{SMError, SMErrorKind},
identified::Urgency,
};

impl WebPushClient {
/// Handle a `ServerNotification` for this user
Expand Down Expand Up @@ -119,7 +122,11 @@ impl WebPushClient {
let mut expired_topic_sort_keys = vec![];
messages.retain(|msg| {
if !msg.expired(now_sec) {
return true;
if let Some(headers) = msg.headers.as_ref() {
return Urgency::from(headers.get("urgency")) >= self.flags.min_urgency;
} else {
return true;
}
}
if msg.sortkey_timestamp.is_none() {
expired_topic_sort_keys.push(msg.chidmessageid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use autoconnect_common::{
};
use autoconnect_settings::{AppState, Settings};
use autopush_common::{
db::{User, USER_RECORD_VERSION},
db::{Urgency, User, USER_RECORD_VERSION},
util::{ms_since_epoch, ms_utc_midnight},
};

Expand Down Expand Up @@ -145,6 +145,7 @@ impl UnidentifiedClient {
.record_version
.map_or(true, |rec_ver| rec_ver < USER_RECORD_VERSION),
emit_channel_metrics: user.connected_at < ms_utc_midnight(),
min_urgency: user.urgency.unwrap_or(Urgency::VeryLow),
..Default::default()
};
user.node_id = Some(self.app_state.router_url.to_owned());
Expand Down
22 changes: 21 additions & 1 deletion autoendpoint/src/extractors/notification_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use std::cmp::min;
use std::collections::HashMap;
use validator::Validate;
use validator::{Validate, ValidationError};
use validator_derive::Validate;

lazy_static! {
Expand Down Expand Up @@ -37,6 +37,9 @@ pub struct NotificationHeaders {
)]
pub topic: Option<String>,

#[validate(custom(function = "validate_urgency"))]
pub urgency: Option<String>,

// These fields are validated separately, because the validation is complex
// and based upon the content encoding
pub encoding: Option<String>,
Expand All @@ -45,10 +48,21 @@ pub struct NotificationHeaders {
pub crypto_key: Option<String>,
}

fn validate_urgency(value: &str) -> Result<(), ValidationError> {
if ["very-low", "low", "normal", "high"].contains(&value) {
Ok(())
} else {
Err(ValidationError::new(
"Value not equal to \"very-low\", \"low\", \"normal\" or \"high\"",
))
}
}

impl From<NotificationHeaders> for HashMap<String, String> {
fn from(headers: NotificationHeaders) -> Self {
let mut map = HashMap::new();

map.insert_opt("urgency", headers.urgency);
map.insert_opt("encoding", headers.encoding);
map.insert_opt("encryption", headers.encryption);
map.insert_opt("encryption_key", headers.encryption_key);
Expand All @@ -73,11 +87,13 @@ impl NotificationHeaders {
.map(|ttl| min(ttl, MAX_NOTIFICATION_TTL as i64))
.ok_or(ApiErrorKind::NoTTL)?;
let topic = get_owned_header(req, "topic");
let urgency = get_owned_header(req, "urgency");

let headers = if has_data {
NotificationHeaders {
ttl,
topic,
urgency,
encoding: get_owned_header(req, "content-encoding"),
encryption: get_owned_header(req, "encryption").map(Self::strip_header),
encryption_key: get_owned_header(req, "encryption-key"),
Expand All @@ -88,6 +104,7 @@ impl NotificationHeaders {
NotificationHeaders {
ttl,
topic,
urgency,
encoding: None,
encryption: None,
encryption_key: None,
Expand Down Expand Up @@ -359,6 +376,7 @@ mod tests {
NotificationHeaders {
ttl: 10,
topic: None,
urgency: None,
encoding: Some("aesgcm".to_string()),
encryption: Some("salt=foo".to_string()),
encryption_key: None,
Expand All @@ -384,6 +402,7 @@ mod tests {
NotificationHeaders {
ttl: 10,
topic: None,
urgency: None,
encoding: Some("aes128gcm".to_string()),
encryption: Some("notsalt=foo".to_string()),
encryption_key: None,
Expand All @@ -410,6 +429,7 @@ mod tests {
NotificationHeaders {
ttl: 10,
topic: None,
urgency: None,
encoding: Some("aesgcm".to_string()),
encryption: Some("salt=foo".to_string()),
encryption_key: None,
Expand Down
1 change: 1 addition & 0 deletions autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub mod tests {
},
headers: NotificationHeaders {
ttl: 0,
urgency: None,
topic: Some("test-topic".to_string()),
encoding: Some("test-encoding".to_string()),
encryption: Some("test-encryption".to_string()),
Expand Down
12 changes: 11 additions & 1 deletion autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use autopush_common::db::Urgency;
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use reqwest::{Response, StatusCode};
use serde_json::Value;
Expand Down Expand Up @@ -46,8 +47,17 @@ impl Router for WebPushRouter {
);
trace!("✉ Notification = {:?}", notification);

let notif_urgency = Urgency::from(notification.headers.urgency.as_ref());
// If the notification urgency is lower than the user one, we do not send it
// If the user hasn't set a minimum urgency, we accept all notifications
if notif_urgency < user.urgency.unwrap_or(Urgency::VeryLow) {
trace!(
"✉ Notification has an urgency lower than the user one: {:?} < {:?}",
&notif_urgency,
&user.urgency
);
// Check if there is a node connected to the client
if let Some(node_id) = &user.node_id {
} else if let Some(node_id) = &user.node_id {
trace!(
"✉ User has a node ID, sending notification to node: {}",
&node_id
Expand Down
35 changes: 33 additions & 2 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use std::result::Result as StdResult;
use derive_builder::Builder;
use lazy_static::lazy_static;
use regex::RegexSet;
use serde::Serializer;
use serde_derive::{Deserialize, Serialize};
use serde::{Serialize, Serializer};
use serde_derive::Deserialize;
use uuid::Uuid;

#[cfg(feature = "bigtable")]
Expand Down Expand Up @@ -148,6 +148,9 @@ pub struct User {
/// Last node/port the client was or may be connected to
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
/// Last minimum urgency set by the client
#[serde(skip_serializing_if = "Option::is_none")]
pub urgency: Option<Urgency>,
/// Record version
#[serde(skip_serializing_if = "Option::is_none")]
pub record_version: Option<u64>,
Expand Down Expand Up @@ -184,6 +187,7 @@ impl Default for User {
router_type: "webpush".to_string(),
router_data: None,
node_id: None,
urgency: None,
record_version: Some(USER_RECORD_VERSION),
current_timestamp: None,
version: Some(Uuid::new_v4()),
Expand All @@ -203,6 +207,33 @@ impl User {
}
}

#[repr(u8)]
#[derive(Debug, PartialEq, PartialOrd, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
pub enum Urgency {
VeryLow = 0,
Low = 1,
Normal = 2,
High = 3,
}

impl From<&str> for Urgency {
fn from(value: &str) -> Self {
match value.to_lowercase().as_str() {
"high" => Urgency::High,
"low" => Urgency::Low,
"very-low" => Urgency::VeryLow,
_ => Urgency::Normal,
}
}
}
jrconlin marked this conversation as resolved.
Show resolved Hide resolved

impl From<Option<&String>> for Urgency {
fn from(value: Option<&String>) -> Self {
Urgency::from(value.and_then(|v| Some(v.as_str())).unwrap_or(""))
}
}

/// A stored Notification record. This is a notification that is to be stored
/// until the User Agent reconnects. These are then converted to publishable
/// [crate::db::Notification] records.
Expand Down
4 changes: 4 additions & 0 deletions autopush-common/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) struct NotificationHeaders {
encryption_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
encoding: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
urgency: Option<String>,
}

#[allow(clippy::implicit_hasher)]
Expand All @@ -29,6 +31,7 @@ impl From<NotificationHeaders> for HashMap<String, String> {
map.insert_opt("encryption", val.encryption);
map.insert_opt("encryption_key", val.encryption_key);
map.insert_opt("encoding", val.encoding);
map.insert_opt("urgency", val.urgency);
map
}
}
Expand All @@ -40,6 +43,7 @@ impl From<HashMap<String, String>> for NotificationHeaders {
encryption: val.get("encryption").map(|v| v.to_string()),
encryption_key: val.get("encryption_key").map(|v| v.to_string()),
encoding: val.get("encoding").map(|v| v.to_string()),
urgency: val.get("urgency").map(|v| v.to_string()),
}
}
}
Expand Down