Skip to content

Commit

Permalink
Retry sample insert after keeper cluster changes (#6515)
Browse files Browse the repository at this point in the history
In a [test run](#6508) we
saw that oximeter sample inserts failed because a keeper leader election
was occurring during an insert attempt. Wrap all inserts after keeper
cluster changes in a retry to make them resilient.

Fixes #6508
  • Loading branch information
andrewjstone authored Sep 4, 2024
1 parent 7a742d3 commit ea85b02
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions oximeter/db/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,20 @@ async fn test_cluster() -> anyhow::Result<()> {
.expect("failed to get samples from client1");

// We still have a quorum (2 of 3 keepers), so we should be able to insert
// Removing a node may require a new leader election which would require
// some polling, so we go ahead and do that.
let samples = oximeter_test_utils::generate_test_samples(
input.n_projects,
input.n_instances,
input.n_cpus,
input.n_samples,
);
client3
.insert_samples(&samples)
wait_for_insert(log, &client3, &samples)
.await
.expect("failed to insert samples at server3");
wait_for_num_points(&log, &client2, samples.len() * 3)
.await
.expect("failed to get samples from client1");
.expect("failed to get samples from client2");

// Stop another keeper
deployment.stop_keeper(1.into()).expect("failed to stop keeper 1");
Expand Down Expand Up @@ -355,7 +356,9 @@ async fn test_cluster() -> anyhow::Result<()> {
input.n_cpus,
input.n_samples,
);
client1.insert_samples(&samples).await.expect("failed to insert samples");
wait_for_insert(log, &client1, &samples)
.await
.expect("failed to insert samples at server1");
wait_for_num_points(&log, &client2, samples.len() * 4)
.await
.expect("failed to get samples from client1");
Expand All @@ -375,7 +378,9 @@ async fn test_cluster() -> anyhow::Result<()> {
input.n_cpus,
input.n_samples,
);
client1.insert_samples(&samples).await.expect("failed to insert samples");
wait_for_insert(log, &client1, &samples)
.await
.expect("failed to insert samples at server1");
wait_for_num_points(&log, &client2, samples.len() * 5)
.await
.expect("failed to get samples from client1");
Expand Down Expand Up @@ -519,3 +524,30 @@ async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> {
info!(log, "Clickhouse server ready: {}", client.url());
Ok(())
}

/// Wait for insert to succeed after messing with keeper configuration
async fn wait_for_insert(
log: &Logger,
client: &Client,
samples: &[Sample],
) -> anyhow::Result<()> {
poll::wait_for_condition(
|| async {
client
.insert_samples(&samples)
.await
.map_err(|_| poll::CondCheckError::<oximeter_db::Error>::NotYet)
},
&Duration::from_millis(1000),
&Duration::from_secs(60),
)
.await
.with_context(|| {
format!(
"failed to insert samples at clickhouse server: {}",
client.url()
)
})?;
info!(log, "inserted samples at clickhouse server: {}", client.url());
Ok(())
}

0 comments on commit ea85b02

Please sign in to comment.