Skip to content

Commit

Permalink
Merge branch 'master' into release/0.17
Browse files Browse the repository at this point in the history
  • Loading branch information
pjenvey committed Oct 19, 2024
2 parents 68e8149 + c535e5a commit 061206e
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 28 deletions.
96 changes: 96 additions & 0 deletions glean/metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
## This file describes the syncserver-rs daily active user (DAU) metrics.
## This defines the various allowed metrics that are to be captured.
## Each metric is written as a JSON blob to the default logger output.

---
# Schema
$schema: moz://mozilla.org/schemas/glean/metrics/2-0-0

# Category
syncstorage:
sync_event:
type: event
description: |
Event to record an instance of sync backend activity initiated by client.
notification_emails:
- [email protected]
- [email protected]
- [email protected]
bugs:
- https://github.com/mozilla-services/syncstorage-rs/issues
data_reviews:
- https://bugzilla.mozilla.org/show_bug.cgi?id=1923967
expires: never

hashed_fxa_uid:
type: uuid
# yamllint disable
description: >
User identifier. Uses `hashed_fxa_uid` for accurate count of sync actions.
Used to determine which user has initiated sync activity.
A single user could make numerous sync actions in a given time
and this id is required to ensure only a single count of daily active use
is made, given a number of actions. Sync_id is not used due to possibility
of new keys being generated during resets or timeouts, whenever encryption
keys change.
# yamllint enable
notification_emails:
- [email protected]
- [email protected]
- [email protected]
bugs:
- https://github.com/mozilla-services/syncstorage-rs/issues
data_reviews:
- https://bugzilla.mozilla.org/show_bug.cgi?id=1923967
expires: never

platform:
type: string
# yamllint disable
description: |
Platform from which sync action was initiated.
Firefox Desktop, Fenix, or Firefox iOS.
# yamllint enable
notification_emails:
- [email protected]
- [email protected]
- [email protected]
bugs:
- https://github.com/mozilla-services/syncstorage-rs/issues
data_reviews:
- https://bugzilla.mozilla.org/show_bug.cgi?id=1923967
expires: never

device_family:
type: string
# yamllint disable
description: |
Device family from which sync action was initiated.
Desktop PC, Tablet, Mobile, and Other.
# yamllint enable
notification_emails:
- [email protected]
- [email protected]
- [email protected]
bugs:
- https://github.com/mozilla-services/syncstorage-rs/issues
data_reviews:
- https://bugzilla.mozilla.org/show_bug.cgi?id=1923967
expires: never

collection:
type: string
# yamllint disable
description: |
Related individual collection where sync activity took place.
Includes bookmarks, history, forms, prefs, tabs, and passwords.
# yamllint enable
notification_emails:
- [email protected]
- [email protected]
- [email protected]
bugs:
- https://github.com/mozilla-services/syncstorage-rs/issues
data_reviews:
- https://bugzilla.mozilla.org/show_bug.cgi?id=1923967
expires: never
20 changes: 20 additions & 0 deletions glean/pings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## Describes the pings being sent out to Glean.

# Schema
$schema: moz://mozilla.org/schemas/glean/pings/2-0-0

# Name
syncstorage:
# Ping parameters
description: |
Ping record for sync active use metrics.
notification_emails:
- [email protected]
- [email protected]
- [email protected]
bugs:
- https://github.com/mozilla-services/syncstorage-rs/issues
data_reviews:
- https://bugzilla.mozilla.org/show_bug.cgi?id=1923967
expires: never
include_client_id: false
47 changes: 43 additions & 4 deletions syncserver-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod tags;
use std::{
fmt,
sync::atomic::{AtomicU64, Ordering},
sync::Arc,
};

use actix_web::web;
Expand Down Expand Up @@ -103,12 +104,21 @@ pub trait InternalError {
/// mostly useful for running I/O tasks). `BlockingThreadpool` intentionally does not implement
/// `Clone`: not all of its state is shared through `Arc`s internally (the `spawned_tasks`
/// counter is a plain `AtomicU64`), so a `BlockingThreadpool` should be instantiated once
/// and shared by passing around `Arc<BlockingThreadpool>`s.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct BlockingThreadpool {
/// Number of tasks tracked by `spawn`; `metrics` reports the ones that are not
/// currently active as queued.
/// NOTE(review): the matching increment happens outside this view -- confirm.
spawned_tasks: AtomicU64,
/// Number of task closures currently executing. Held in an `Arc` so that the closure
/// handed to `web::block` can own a handle to the counter.
active_threads: Arc<AtomicU64>,
/// Thread ceiling supplied to `new`; `metrics` uses it to derive `max_idle_threads`.
max_thread_count: usize,
}

impl BlockingThreadpool {
pub fn new(max_thread_count: usize) -> Self {
Self {
spawned_tasks: Default::default(),
active_threads: Default::default(),
max_thread_count,
}
}
/// Runs a function as a task on the blocking threadpool.
///
/// WARNING: Spawning a blocking task through means other than calling this method will
Expand All @@ -127,14 +137,43 @@ impl BlockingThreadpool {
self.spawned_tasks.fetch_sub(1, Ordering::Relaxed);
}

web::block(f).await.unwrap_or_else(|_| {
let active_threads = Arc::clone(&self.active_threads);
let f_with_metrics = move || {
active_threads.fetch_add(1, Ordering::Relaxed);
scopeguard::defer! {
active_threads.fetch_sub(1, Ordering::Relaxed);
}
f()
};
web::block(f_with_metrics).await.unwrap_or_else(|_| {
Err(E::internal_error(
"Blocking threadpool operation canceled".to_owned(),
))
})
}

pub fn active_threads(&self) -> u64 {
self.spawned_tasks.load(Ordering::Relaxed)
/// Return the pool's current metrics.
///
/// The two counters are read with separate relaxed loads, so the result is not an atomic
/// snapshot: `active_threads` can momentarily be observed above `spawned_tasks`, or above
/// the configured maximum thread count. The derived values therefore saturate at zero
/// instead of underflowing (which would panic in debug builds and report a huge bogus
/// gauge value in release builds).
pub fn metrics(&self) -> BlockingThreadpoolMetrics {
    let spawned_tasks = self.spawned_tasks.load(Ordering::Relaxed);
    let active_threads = self.active_threads.load(Ordering::Relaxed);
    // Widening cast: `usize` is at most 64 bits wide on supported targets.
    let max_thread_count = self.max_thread_count as u64;
    BlockingThreadpoolMetrics {
        queued_tasks: spawned_tasks.saturating_sub(active_threads),
        active_threads,
        max_idle_threads: max_thread_count.saturating_sub(active_threads),
    }
}
}

/// The thread pool's metrics as read at the time `BlockingThreadpool::metrics` was called
#[derive(Debug)]
pub struct BlockingThreadpoolMetrics {
/// The number of tasks pending: spawned tasks that are not currently running on a
/// thread
pub queued_tasks: u64,
/// The active number of threads running blocking tasks
pub active_threads: u64,
/// The max number of idle threads (the configured max thread count minus the active
/// threads): the actual number of idle threads may be smaller as idle threads may exit
/// when left idle for too long (this is tokio's threadpool behavior, which is the
/// underlying thread pool used by actix-web's web::block)
pub max_idle_threads: u64,
}
4 changes: 2 additions & 2 deletions syncserver-common/src/middleware/sentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
// add an info here, temporarily turn on info level debugging on a given server,
// capture it, and then turn it off before we run out of money.
if let Some(label) = reportable_err.metric_label() {
info!("Sentry: Sending error to metrics: {:?}", reportable_err);
debug!("Sentry: Sending error to metrics: {:?}", reportable_err);
let _ = metrics.incr(&format!("{}.{}", metric_label, label));
}
debug!("Sentry: Not reporting error (service error): {:?}", error);
Expand All @@ -121,7 +121,7 @@ where
if let Some(reportable_err) = error.as_error::<E>() {
if !reportable_err.is_sentry_event() {
if let Some(label) = reportable_err.metric_label() {
info!("Sentry: Sending error to metrics: {:?}", reportable_err);
debug!("Sentry: Sending error to metrics: {:?}", reportable_err);
let _ = metrics.incr(&format!("{}.{}", metric_label, label));
}
debug!("Not reporting error (service error): {:?}", error);
Expand Down
6 changes: 5 additions & 1 deletion syncserver-db-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ impl From<MysqlErrorKind> for MysqlError {

impl ReportableError for MysqlError {
/// Whether this error should be reported to Sentry: `Pool` errors are not, every other
/// error kind is.
fn is_sentry_event(&self) -> bool {
true
// The lint would rewrite this as `matches!`; the explicit `match` is kept instead.
// NOTE(review): presumably so further non-reportable kinds can be added as arms -- confirm.
#[allow(clippy::match_like_matches_macro)]
match &self.kind {
MysqlErrorKind::Pool(_) => false,
_ => true,
}
}

fn metric_label(&self) -> Option<String> {
Expand Down
29 changes: 18 additions & 11 deletions syncserver/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use actix_web::{
};
use cadence::{Gauged, StatsdClient};
use futures::future::{self, Ready};
use syncserver_common::{middleware::sentry::SentryWrapper, BlockingThreadpool, Metrics, Taggable};
use syncserver_common::{
middleware::sentry::SentryWrapper, BlockingThreadpool, BlockingThreadpoolMetrics, Metrics,
Taggable,
};
use syncserver_db_common::{GetPoolState, PoolState};
use syncserver_settings::Settings;
use syncstorage_db::{DbError, DbPool, DbPoolImpl};
Expand Down Expand Up @@ -259,16 +262,16 @@ impl Server {
let host = settings.host.clone();
let port = settings.port;
let deadman = Arc::new(RwLock::new(Deadman::from(&settings.syncstorage)));
let blocking_threadpool = Arc::new(BlockingThreadpool::default());
let blocking_threadpool = Arc::new(BlockingThreadpool::new(
settings.worker_max_blocking_threads,
));
let db_pool = DbPoolImpl::new(
&settings.syncstorage,
&Metrics::from(&metrics),
blocking_threadpool.clone(),
)?;
let worker_thread_count =
calculate_worker_max_blocking_threads(settings.worker_max_blocking_threads);
// This is used as part of the metric reporter, which is only run when tokenserver is not
let total_thread_count = settings.worker_max_blocking_threads;
let limits = Arc::new(settings.syncstorage.limits);
let limits_json =
serde_json::to_string(&*limits).expect("ServerLimits failed to serialize");
Expand Down Expand Up @@ -296,7 +299,6 @@ impl Server {
metrics.clone(),
db_pool.clone(),
blocking_threadpool,
total_thread_count,
)?;

None
Expand Down Expand Up @@ -343,13 +345,13 @@ impl Server {
let host = settings.host.clone();
let port = settings.port;
let secrets = Arc::new(settings.master_secret.clone());
let blocking_threadpool = Arc::new(BlockingThreadpool::default());
// Adjust the thread count to include FxA blocking threads.
let thread_count = settings.worker_max_blocking_threads
+ settings
.tokenserver
.additional_blocking_threads_for_fxa_requests
.unwrap_or(0) as usize;
let blocking_threadpool = Arc::new(BlockingThreadpool::new(thread_count));
let worker_thread_count = calculate_worker_max_blocking_threads(thread_count);
let tokenserver_state = tokenserver::ServerState::from_settings(
&settings.tokenserver,
Expand All @@ -366,7 +368,6 @@ impl Server {
tokenserver_state.metrics.clone(),
tokenserver_state.db_pool.clone(),
blocking_threadpool,
thread_count,
)?;

let server = HttpServer::new(move || {
Expand Down Expand Up @@ -465,7 +466,6 @@ fn spawn_metric_periodic_reporter<T: GetPoolState + Send + 'static>(
metrics: Arc<StatsdClient>,
pool: T,
blocking_threadpool: Arc<BlockingThreadpool>,
thread_count: usize,
) -> Result<(), DbError> {
let hostname = hostname::get()
.expect("Couldn't get hostname")
Expand All @@ -489,14 +489,21 @@ fn spawn_metric_periodic_reporter<T: GetPoolState + Send + 'static>(
.with_tag("hostname", &hostname)
.send();

let active_threads = blocking_threadpool.active_threads();
let idle_threads = thread_count as u64 - active_threads;
let BlockingThreadpoolMetrics {
queued_tasks,
active_threads,
max_idle_threads,
} = blocking_threadpool.metrics();
metrics
.gauge_with_tags("blocking_threadpool.queued", queued_tasks)
.with_tag("hostname", &hostname)
.send();
metrics
.gauge_with_tags("blocking_threadpool.active", active_threads)
.with_tag("hostname", &hostname)
.send();
metrics
.gauge_with_tags("blocking_threadpool.idle", idle_threads)
.gauge_with_tags("blocking_threadpool.max_idle", max_idle_threads)
.with_tag("hostname", &hostname)
.send();

Expand Down
4 changes: 3 additions & 1 deletion syncserver/src/server/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ fn get_test_settings() -> Settings {

async fn get_test_state(settings: &Settings) -> ServerState {
let metrics = Arc::new(Metrics::sink());
let blocking_threadpool = Arc::new(BlockingThreadpool::default());
let blocking_threadpool = Arc::new(BlockingThreadpool::new(
settings.worker_max_blocking_threads,
));

ServerState {
db_pool: Box::new(
Expand Down
1 change: 1 addition & 0 deletions syncserver/src/tokenserver/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl FromRequest for DbWrapper {
.map(Self)
.map_err(|e| TokenserverError {
context: format!("Couldn't acquire a database connection: {}", e),
source: Some(Box::new(e)),
..TokenserverError::internal_error()
})
})
Expand Down
2 changes: 2 additions & 0 deletions syncserver/src/tokenserver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#[allow(clippy::result_large_err)]
pub mod extractors;
#[allow(clippy::result_large_err)]
pub mod handlers;
pub mod logging;

Expand Down
2 changes: 1 addition & 1 deletion syncstorage-db/src/tests/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn db_pool(settings: Option<SyncstorageSettings>) -> Result<DbPoolImpl
settings.database_use_test_transactions = use_test_transactions;

let metrics = Metrics::noop();
let pool = DbPoolImpl::new(&settings, &metrics, Arc::new(BlockingThreadpool::default()))?;
let pool = DbPoolImpl::new(&settings, &metrics, Arc::new(BlockingThreadpool::new(512)))?;
Ok(pool)
}

Expand Down
2 changes: 1 addition & 1 deletion syncstorage-mysql/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn db(settings: &SyncstorageSettings) -> DbResult<MysqlDb> {
let pool = MysqlDbPool::new(
settings,
&Metrics::noop(),
Arc::new(BlockingThreadpool::default()),
Arc::new(BlockingThreadpool::new(512)),
)?;
pool.get_sync()
}
Expand Down
2 changes: 2 additions & 0 deletions tokenserver-auth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ mod crypto;

#[cfg(not(feature = "py"))]
pub use crypto::{JWTVerifier, JWTVerifierImpl};
#[allow(clippy::result_large_err)]
pub mod oauth;
#[allow(clippy::result_large_err)]
mod token;
use syncserver_common::Metrics;
pub use token::Tokenlib;
Expand Down
Loading

0 comments on commit 061206e

Please sign in to comment.