Skip to content

Commit

Permalink
Remove timeseries schema from cache on insertion failure (#4348)
Browse files Browse the repository at this point in the history
- Fixes #4335
- Factors out internals of `oximeter_db::Client::insert_samples()` to
aid testing, and add regression
- Remove new timeseries schema from the internal cache, if we fail to
insert them into the schema table.
  • Loading branch information
bnaecker authored Oct 25, 2023
1 parent 6f4923e commit 12029d4
Showing 1 changed file with 191 additions and 70 deletions.
261 changes: 191 additions & 70 deletions oximeter/db/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,20 @@ impl Client {
Ok(res.contains("oximeter_cluster"))
}

// Verifies that the schema for a sample matches the schema in the database.
// Verifies that the schema for a sample matches the schema in the database,
// or cache a new one internally.
//
// If the schema exists in the database, and the sample matches that schema, `None` is
// returned. If the schema does not match, an Err is returned (the caller skips the sample in
// this case). If the schema does not _exist_ in the database, Some(schema) is returned, so
// that the caller can insert it into the database at the appropriate time.
async fn verify_sample_schema(
// this case). If the schema does not _exist_ in the database,
// Some((timeseries_name, schema)) is returned, so that the caller can
// insert it into the database at the appropriate time. Note that the schema
// is added to the internal cache, but not inserted into the DB at this
// time.
async fn verify_or_cache_sample_schema(
&self,
sample: &Sample,
) -> Result<Option<String>, Error> {
) -> Result<Option<(TimeseriesName, String)>, Error> {
let sample_schema = model::schema_for(sample);
let name = sample_schema.timeseries_name.clone();
let mut schema = self.schema.lock().await;
Expand Down Expand Up @@ -413,13 +417,15 @@ impl Client {
}
}
Entry::Vacant(entry) => {
let name = entry.key().clone();
entry.insert(sample_schema.clone());
Ok(Some(
Ok(Some((
name,
serde_json::to_string(&model::DbTimeseriesSchema::from(
sample_schema,
))
.expect("Failed to convert schema to DB model"),
))
)))
}
}
}
Expand Down Expand Up @@ -476,7 +482,6 @@ impl Client {
Ok(timeseries_by_key.into_values().collect())
}

// Initialize ClickHouse with the database and metric table schema.
// Execute a generic SQL statement.
//
// TODO-robustness This currently does no validation of the statement.
Expand Down Expand Up @@ -568,53 +573,32 @@ impl Client {
}
Ok(())
}
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
///
/// The vanilla [`Client`] object allows users to query the timeseries database, returning
/// timeseries samples corresponding to various filtering criteria. This trait segregates the
/// methods required for _writing_ new data into the database, and is intended only for use by the
/// `oximeter-collector` crate.
#[async_trait]
pub trait DbWrite {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;

/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error>;

/// Initialize a single node telemetry database, creating tables as needed.
async fn init_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a single node set up.
async fn wipe_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a replicated set up.
async fn wipe_replicated_db(&self) -> Result<(), Error>;
}

#[async_trait]
impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
// Unroll each sample into its consituent rows, after verifying the schema.
//
// Note that this also inserts the schema into the internal cache, if it
// does not already exist there.
async fn unroll_samples(&self, samples: &[Sample]) -> UnrolledSampleRows {
let mut seen_timeseries = BTreeSet::new();
let mut rows = BTreeMap::new();
let mut new_schema = Vec::new();
let mut new_schema = BTreeMap::new();

for sample in samples.iter() {
match self.verify_sample_schema(sample).await {
match self.verify_or_cache_sample_schema(sample).await {
Err(_) => {
// Skip the sample, but otherwise do nothing. The error is logged in the above
// call.
continue;
}
Ok(schema) => {
if let Some(schema) = schema {
debug!(self.log, "new timeseries schema"; "schema" => ?schema);
new_schema.push(schema);
}
Ok(None) => {}
Ok(Some((name, schema))) => {
debug!(
self.log,
"new timeseries schema";
"timeseries_name" => %name,
"schema" => %schema
);
new_schema.insert(name, schema);
}
}

Expand Down Expand Up @@ -642,34 +626,78 @@ impl DbWrite for Client {
seen_timeseries.insert(key);
}

// Insert the new schema into the database
//
// TODO-robustness There's still a race possible here. If two distinct clients receive new
// but conflicting schema, they will both try to insert those at some point into the schema
// tables. It's not clear how to handle this, since ClickHouse provides no transactions.
// This is unlikely to happen at this point, because the design is such that there will be
// a single `oximeter` instance, which has one client object, connected to a single
// ClickHouse server. But once we start replicating data, the window within which the race
// can occur is much larger, since it includes the time it takes ClickHouse to replicate
// data between nodes.
//
// NOTE: This is an issue even in the case where the schema don't conflict. Two clients may
// receive a sample with a new schema, and both would then try to insert that schema.
UnrolledSampleRows { new_schema, rows }
}

// Save new schema to the database, or remove them from the cache on
// failure.
//
// This attempts to insert the provided schema into the timeseries schema
// table. If that fails, those schema are _also_ removed from the internal
// cache.
//
// TODO-robustness There's still a race possible here. If two distinct clients receive new
// but conflicting schema, they will both try to insert those at some point into the schema
// tables. It's not clear how to handle this, since ClickHouse provides no transactions.
// This is unlikely to happen at this point, because the design is such that there will be
// a single `oximeter` instance, which has one client object, connected to a single
// ClickHouse server. But once we start replicating data, the window within which the race
// can occur is much larger, since it includes the time it takes ClickHouse to replicate
// data between nodes.
//
// NOTE: This is an issue even in the case where the schema don't conflict. Two clients may
// receive a sample with a new schema, and both would then try to insert that schema.
async fn save_new_schema_or_remove(
&self,
new_schema: BTreeMap<TimeseriesName, String>,
) -> Result<(), Error> {
if !new_schema.is_empty() {
debug!(
self.log,
"inserting {} new timeseries schema",
new_schema.len()
);
let body = format!(
"INSERT INTO {db_name}.timeseries_schema FORMAT JSONEachRow\n{row_data}\n",
db_name = crate::DATABASE_NAME,
row_data = new_schema.join("\n"),
const APPROX_ROW_SIZE: usize = 64;
let mut body = String::with_capacity(
APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(),
);
self.execute(body).await?;
body.push_str("INSERT INTO ");
body.push_str(crate::DATABASE_NAME);
body.push_str(".timeseries_schema FORMAT JSONEachRow\n");
for row_data in new_schema.values() {
body.push_str(row_data);
body.push_str("\n");
}

// Try to insert the schema.
//
// If this fails, be sure to remove the schema we've added from the
// internal cache. Since we check the internal cache first for
// schema, if we fail here but _don't_ remove the schema, we'll
// never end up inserting the schema, but we will insert samples.
if let Err(e) = self.execute(body).await {
debug!(
self.log,
"failed to insert new schema, removing from cache";
"error" => ?e,
);
let mut schema = self.schema.lock().await;
for name in new_schema.keys() {
schema
.remove(name)
.expect("New schema should have been cached");
}
return Err(e);
}
}
Ok(())
}

// Insert the actual target/metric field rows and measurement rows.
// Insert unrolled sample rows into the corresponding tables.
async fn insert_unrolled_samples(
&self,
rows: BTreeMap<String, Vec<String>>,
) -> Result<(), Error> {
for (table_name, rows) in rows {
let body = format!(
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
Expand All @@ -682,16 +710,60 @@ impl DbWrite for Client {
self.execute(body).await?;
debug!(
self.log,
"inserted {} rows into table {}",
rows.len(),
table_name
"inserted rows into table";
"n_rows" => rows.len(),
"table_name" => table_name,
);
}

// TODO-correctness We'd like to return all errors to clients here, and there may be as
// many as one per sample. It's not clear how to structure this in a way that's useful.
Ok(())
}
}

#[derive(Debug)]
struct UnrolledSampleRows {
// The timeseries schema rows, keyed by timeseries name.
new_schema: BTreeMap<TimeseriesName, String>,
// The rows to insert in all the other tables, keyed by the table name.
rows: BTreeMap<String, Vec<String>>,
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
///
/// The vanilla [`Client`] object allows users to query the timeseries database, returning
/// timeseries samples corresponding to various filtering criteria. This trait segregates the
/// methods required for _writing_ new data into the database, and is intended only for use by the
/// `oximeter-collector` crate.
#[async_trait]
pub trait DbWrite {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;

/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error>;

/// Initialize a single node telemetry database, creating tables as needed.
async fn init_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a single node set up.
async fn wipe_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a replicated set up.
async fn wipe_replicated_db(&self) -> Result<(), Error>;
}

#[async_trait]
impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
let UnrolledSampleRows { new_schema, rows } =
self.unroll_samples(samples).await;
self.save_new_schema_or_remove(new_schema).await?;
self.insert_unrolled_samples(rows).await
}

/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error> {
Expand Down Expand Up @@ -1435,7 +1507,7 @@ mod tests {
datum: 1,
};
let sample = Sample::new(&bad_name, &metric).unwrap();
let result = client.verify_sample_schema(&sample).await;
let result = client.verify_or_cache_sample_schema(&sample).await;
assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
Expand Down Expand Up @@ -1589,7 +1661,8 @@ mod tests {

// Verify that this sample is considered new, i.e., we return rows to update the timeseries
// schema table.
let result = client.verify_sample_schema(&sample).await.unwrap();
let result =
client.verify_or_cache_sample_schema(&sample).await.unwrap();
assert!(
matches!(result, Some(_)),
"When verifying a new sample, the rows to be inserted should be returned"
Expand Down Expand Up @@ -1622,7 +1695,8 @@ mod tests {

// This should no longer return a new row to be inserted for the schema of this sample, as
// any schema have been included above.
let result = client.verify_sample_schema(&sample).await.unwrap();
let result =
client.verify_or_cache_sample_schema(&sample).await.unwrap();
assert!(
matches!(result, None),
"After inserting new schema, it should no longer be considered new"
Expand Down Expand Up @@ -2675,4 +2749,51 @@ mod tests {
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
}

// Regression test for https://github.com/oxidecomputer/omicron/issues/4335.
//
// This tests that, when cache new schema but _fail_ to insert them, we also
// remove them from the internal cache.
#[tokio::test]
async fn test_new_schema_removed_when_not_inserted() {
usdt::register_probes().unwrap();
let logctx = test_setup_log("test_update_schema_cache_on_new_sample");
let log = &logctx.log;

// Let the OS assign a port and discover it after ClickHouse starts
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new("::1".parse().unwrap(), db.port());

let client = Client::new(address, &log);
client
.init_single_node_db()
.await
.expect("Failed to initialize timeseries database");
let samples = [test_util::make_sample()];

// We're using the components of the `insert_samples()` method here,
// which has been refactored explicitly for this test. We need to insert
// the schema for this sample into the internal cache, which relies on
// access to the database (since they don't exist).
//
// First, insert the sample into the local cache. This method also
// checks the DB, since this schema doesn't exist in the cache.
let UnrolledSampleRows { new_schema, .. } =
client.unroll_samples(&samples).await;
assert_eq!(client.schema.lock().await.len(), 1);

// Next, we'll kill the database, and then try to insert the schema.
// That will fail, since the DB is now inaccessible.
db.cleanup().await.expect("failed to cleanup ClickHouse server");
let res = client.save_new_schema_or_remove(new_schema).await;
assert!(res.is_err(), "Should have failed since the DB is gone");
assert!(
client.schema.lock().await.is_empty(),
"Failed to remove new schema from the cache when \
they could not be inserted into the DB"
);
logctx.cleanup_successful();
}
}

0 comments on commit 12029d4

Please sign in to comment.