Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy): dont trigger error alerts for unknown topics #10266

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion proxy/src/redis/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ async fn try_connect(client: &ConnectionWithCredentialsProvider) -> anyhow::Resu
Ok(conn)
}

#[derive(Debug, Deserialize)]
struct NotificationHeader<'a> {
topic: &'a str,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
cloneable marked this conversation as resolved.
Show resolved Hide resolved
#[serde(tag = "topic", content = "data")]
pub(crate) enum Notification {
Expand Down Expand Up @@ -69,6 +74,7 @@ pub(crate) enum Notification {
#[serde(rename = "/cancel_session")]
Cancel(CancelSession),
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedIpsUpdate {
project_id: ProjectIdInt,
Expand Down Expand Up @@ -96,6 +102,7 @@ pub(crate) struct PasswordUpdate {
project_id: ProjectIdInt,
role_name: RoleNameInt,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct CancelSession {
pub(crate) region_id: Option<String>,
Expand Down Expand Up @@ -141,18 +148,23 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
region_id,
}
}

pub(crate) async fn increment_active_listeners(&self) {
self.cache.increment_active_listeners().await;
}

pub(crate) async fn decrement_active_listeners(&self) {
self.cache.decrement_active_listeners().await;
}

#[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]
async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> {
let payload: String = msg.get_payload()?;
tracing::debug!(?payload, "received a message payload");

let msg: Notification = match serde_json::from_str(&payload) {
// For better error handling, we first parse the payload to extract the topic.
// If there's a topic we don't support, we can handle that error more gracefully.
let header: NotificationHeader = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
Expand All @@ -162,6 +174,32 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
return Ok(());
}
};

match header.topic {
"/allowed_ips_updated"
| "/block_public_or_vpc_access_updated"
| "/allowed_vpc_endpoints_updated_for_org"
| "/allowed_vpc_endpoints_updated_for_projects"
| "/password_updated"
| "/cancel_session" => {}
topic => {
// don't update the metric for redis errors if it's just a topic we don't know about.
tracing::warn!(topic, "unknown topic");
return Ok(());
}
}

let msg: Notification = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
Metrics::get().proxy.redis_errors_total.inc(RedisErrors {
channel: msg.get_channel_name(),
});
tracing::error!(topic = header.topic, "broken message: {e}");
return Ok(());
}
};

tracing::debug!(?msg, "received a message");
match msg {
Notification::Cancel(cancel_session) => {
Expand Down
Loading