Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry loop to deleting timeseries by name #6504

Merged
merged 2 commits into from
Sep 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use dropshot::EmptyScanParams;
use dropshot::PaginationOrder;
use dropshot::ResultsPage;
use dropshot::WhichPage;
use omicron_common::backoff;
use oximeter::schema::TimeseriesKey;
use oximeter::types::Sample;
use oximeter::TimeseriesName;
Expand Down Expand Up @@ -627,12 +628,20 @@ impl Client {
upgrade_files
.insert(name.to_string(), (path.to_owned(), contents));
} else {
warn!(
log,
"skipping non-SQL schema dir entry";
"dir" => version_schema_dir.display(),
"path" => path.display(),
);
// Warn on unexpected files, _except_ the
// timeseries-to-delete.txt files we use to expunge timeseries.
if path
.file_name()
.map(|name| name == crate::TIMESERIES_TO_DELETE_FILE)
.unwrap_or(true)
{
warn!(
log,
"skipping non-SQL schema dir entry";
"dir" => version_schema_dir.display(),
"path" => path.display(),
);
}
continue;
}
}
Expand Down Expand Up @@ -1008,10 +1017,46 @@ impl Client {

/// Given a list of timeseries by name, delete their schema and any
/// associated data records from all tables.
///
/// If the database isn't available or the request times out, this method
/// will continue to retry the operation until it succeeds.
async fn expunge_timeseries_by_name(
&self,
replicated: bool,
to_delete: &[TimeseriesName],
) -> Result<(), Error> {
let op = || async {
self.expunge_timeseries_by_name_once(replicated, to_delete)
.await
.map_err(|err| match err {
Error::DatabaseUnavailable(_) => {
backoff::BackoffError::transient(err)
}
_ => backoff::BackoffError::permanent(err),
})
};
let notify = |error, count, delay| {
warn!(
self.log,
"failed to delete some timeseries";
"error" => ?error,
"call_count" => count,
"retry_after" => ?delay,
);
};
backoff::retry_notify_ext(
backoff::retry_policy_internal_service(),
op,
notify,
)
.await
}

/// Attempt to delete the named timeseries once.
async fn expunge_timeseries_by_name_once(
&self,
replicated: bool,
to_delete: &[TimeseriesName],
) -> Result<(), Error> {
// The version table should not have any matching data, but let's avoid
// it entirely anyway.
Expand Down Expand Up @@ -4685,7 +4730,7 @@ mod tests {
// timeseries share some field types and have others that are distinct
// between them, so that we can test that we don't touch tables we
// shouldn't, and only delete the parts we should.
let samples = generate_expunge_timeseries_samples();
let samples = generate_expunge_timeseries_samples(4);
client
.insert_samples(&samples)
.await
Expand Down Expand Up @@ -4805,7 +4850,9 @@ mod tests {
}
}

fn generate_expunge_timeseries_samples() -> Vec<Sample> {
fn generate_expunge_timeseries_samples(
n_samples_per_timeseries: u64,
) -> Vec<Sample> {
#[derive(oximeter::Target)]
struct FirstTarget {
first_field: String,
Expand Down Expand Up @@ -4833,12 +4880,12 @@ mod tests {
};
let mut m = SharedMetric { datum: 0 };

let mut out = Vec::with_capacity(8);
for i in 0..4 {
let mut out = Vec::with_capacity(2 * n_samples_per_timeseries as usize);
for i in 0..n_samples_per_timeseries {
m.datum = i;
out.push(Sample::new(&ft, &m).unwrap());
}
for i in 4..8 {
for i in n_samples_per_timeseries..(2 * n_samples_per_timeseries) {
m.datum = i;
out.push(Sample::new(&st, &m).unwrap());
}
Expand Down
Loading