From 63ec6f33cecfbe4c1039bcedfbf323e959a56687 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Mon, 9 Sep 2024 11:27:48 -0700 Subject: [PATCH] Add retry loop to deleting timeseries by name (#6504) We currently support deleting timeseries by name in a schema upgrade. This deletion is implemented as a mutation in ClickHouse, which walks all affected data parts and deletes the relevant records in a merge operation. That's asynchronous by default, and run in a pool of background tasks. Despite that, with large tables, it can take a while for each mutation to complete, which blocks the server from queueing new deletion requests. This can lead to timeouts, like seen in #6501. This should fix #6501, but I'm not certain of that because I don't have a good way to reproduce the bug. It seems likely that this is only seen when the database is already heavily loaded, as it might be when doing these mutations on large tables. --- oximeter/db/src/client/mod.rs | 69 +++++++++++++++++++++++++++++------ 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index c2b07ebaa6..04d34c515d 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -29,6 +29,7 @@ use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; use dropshot::WhichPage; +use omicron_common::backoff; use oximeter::schema::TimeseriesKey; use oximeter::types::Sample; use oximeter::TimeseriesName; @@ -627,12 +628,20 @@ impl Client { upgrade_files .insert(name.to_string(), (path.to_owned(), contents)); } else { - warn!( - log, - "skipping non-SQL schema dir entry"; - "dir" => version_schema_dir.display(), - "path" => path.display(), - ); + // Warn on unexpected files, _except_ the + // timeseries-to-delete.txt files we use to expunge timeseries. 
+ if path + .file_name() + .map(|name| name != crate::TIMESERIES_TO_DELETE_FILE) + .unwrap_or(true) + { + warn!( + log, + "skipping non-SQL schema dir entry"; + "dir" => version_schema_dir.display(), + "path" => path.display(), + ); + } continue; } } @@ -1008,10 +1017,46 @@ impl Client { /// Given a list of timeseries by name, delete their schema and any /// associated data records from all tables. + /// + /// If the database isn't available or the request times out, this method + /// will continue to retry the operation until it succeeds. async fn expunge_timeseries_by_name( &self, replicated: bool, to_delete: &[TimeseriesName], + ) -> Result<(), Error> { + let op = || async { + self.expunge_timeseries_by_name_once(replicated, to_delete) + .await + .map_err(|err| match err { + Error::DatabaseUnavailable(_) => { + backoff::BackoffError::transient(err) + } + _ => backoff::BackoffError::permanent(err), + }) + }; + let notify = |error, count, delay| { + warn!( + self.log, + "failed to delete some timeseries"; + "error" => ?error, + "call_count" => count, + "retry_after" => ?delay, + ); + }; + backoff::retry_notify_ext( + backoff::retry_policy_internal_service(), + op, + notify, + ) + .await + } + + /// Attempt to delete the named timeseries once. + async fn expunge_timeseries_by_name_once( + &self, + replicated: bool, + to_delete: &[TimeseriesName], ) -> Result<(), Error> { // The version table should not have any matching data, but let's avoid // it entirely anyway. @@ -4685,7 +4730,7 @@ mod tests { // timeseries share some field types and have others that are distinct // between them, so that we can test that we don't touch tables we // shouldn't, and only delete the parts we should. 
- let samples = generate_expunge_timeseries_samples(); + let samples = generate_expunge_timeseries_samples(4); client .insert_samples(&samples) .await @@ -4805,7 +4850,9 @@ mod tests { } } - fn generate_expunge_timeseries_samples() -> Vec<Sample> { + fn generate_expunge_timeseries_samples( + n_samples_per_timeseries: u64, + ) -> Vec<Sample> { #[derive(oximeter::Target)] struct FirstTarget { first_field: String, @@ -4833,12 +4880,12 @@ mod tests { }; let mut m = SharedMetric { datum: 0 }; - let mut out = Vec::with_capacity(8); - for i in 0..4 { + let mut out = Vec::with_capacity(2 * n_samples_per_timeseries as usize); + for i in 0..n_samples_per_timeseries { m.datum = i; out.push(Sample::new(&ft, &m).unwrap()); } - for i in 4..8 { + for i in n_samples_per_timeseries..(2 * n_samples_per_timeseries) { m.datum = i; out.push(Sample::new(&st, &m).unwrap()); }