diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs
index c2b7c820a89..6339b861ea6 100644
--- a/oximeter/db/src/client.rs
+++ b/oximeter/db/src/client.rs
@@ -370,16 +370,20 @@ impl Client {
Ok(res.contains("oximeter_cluster"))
}
- // Verifies that the schema for a sample matches the schema in the database.
+ // Verifies that the schema for a sample matches the schema in the database,
+ // or caches a new one internally.
//
// If the schema exists in the database, and the sample matches that schema, `None` is
// returned. If the schema does not match, an Err is returned (the caller skips the sample in
- // this case). If the schema does not _exist_ in the database, Some(schema) is returned, so
- // that the caller can insert it into the database at the appropriate time.
- async fn verify_sample_schema(
+ // this case). If the schema does not _exist_ in the database,
+ // Some((timeseries_name, schema)) is returned, so that the caller can
+ // insert it into the database at the appropriate time. Note that the schema
+ // is added to the internal cache, but not inserted into the DB at this
+ // time.
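+ //
+ // For example, the caller (see `unroll_samples` below) handles the three
+ // possible outcomes like so:
+ //
+ //     match self.verify_or_cache_sample_schema(sample).await {
+ //         Err(_) => { /* schema mismatch; skip the sample */ }
+ //         Ok(None) => { /* schema known and matching; nothing to do */ }
+ //         Ok(Some((name, row))) => { /* queue `row` for later insertion */ }
+ //     }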
+ async fn verify_or_cache_sample_schema(
&self,
sample: &Sample,
- ) -> Result<Option<String>, Error> {
+ ) -> Result<Option<(TimeseriesName, String)>, Error> {
let sample_schema = model::schema_for(sample);
let name = sample_schema.timeseries_name.clone();
let mut schema = self.schema.lock().await;
@@ -413,13 +417,15 @@ impl Client {
}
}
Entry::Vacant(entry) => {
+ let name = entry.key().clone();
entry.insert(sample_schema.clone());
- Ok(Some(
+ Ok(Some((
+ name,
serde_json::to_string(&model::DbTimeseriesSchema::from(
sample_schema,
))
.expect("Failed to convert schema to DB model"),
- ))
+ )))
}
}
}
@@ -476,7 +482,6 @@ impl Client {
Ok(timeseries_by_key.into_values().collect())
}
- // Initialize ClickHouse with the database and metric table schema.
// Execute a generic SQL statement.
//
// TODO-robustness This currently does no validation of the statement.
@@ -568,53 +573,32 @@ impl Client {
}
Ok(())
}
-}
-
-/// A trait allowing a [`Client`] to write data into the timeseries database.
-///
-/// The vanilla [`Client`] object allows users to query the timeseries database, returning
-/// timeseries samples corresponding to various filtering criteria. This trait segregates the
-/// methods required for _writing_ new data into the database, and is intended only for use by the
-/// `oximeter-collector` crate.
-#[async_trait]
-pub trait DbWrite {
- /// Insert the given samples into the database.
- async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;
-
- /// Initialize the replicated telemetry database, creating tables as needed.
- async fn init_replicated_db(&self) -> Result<(), Error>;
-
- /// Initialize a single node telemetry database, creating tables as needed.
- async fn init_single_node_db(&self) -> Result<(), Error>;
- /// Wipe the ClickHouse database entirely from a single node set up.
- async fn wipe_single_node_db(&self) -> Result<(), Error>;
-
- /// Wipe the ClickHouse database entirely from a replicated set up.
- async fn wipe_replicated_db(&self) -> Result<(), Error>;
-}
-
-#[async_trait]
-impl DbWrite for Client {
- /// Insert the given samples into the database.
- async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
- debug!(self.log, "unrolling {} total samples", samples.len());
+ // Unroll each sample into its constituent rows, after verifying the schema.
+ //
+ // Note that this also inserts the schema into the internal cache, if it
+ // does not already exist there.
+ async fn unroll_samples(&self, samples: &[Sample]) -> UnrolledSampleRows {
let mut seen_timeseries = BTreeSet::new();
let mut rows = BTreeMap::new();
- let mut new_schema = Vec::new();
+ let mut new_schema = BTreeMap::new();
for sample in samples.iter() {
- match self.verify_sample_schema(sample).await {
+ match self.verify_or_cache_sample_schema(sample).await {
Err(_) => {
// Skip the sample, but otherwise do nothing. The error is logged in the above
// call.
continue;
}
- Ok(schema) => {
- if let Some(schema) = schema {
- debug!(self.log, "new timeseries schema"; "schema" => ?schema);
- new_schema.push(schema);
- }
+ Ok(None) => {}
+ Ok(Some((name, schema))) => {
+ debug!(
+ self.log,
+ "new timeseries schema";
+ "timeseries_name" => %name,
+ "schema" => %schema
+ );
+ new_schema.insert(name, schema);
}
}
@@ -642,34 +626,78 @@ impl DbWrite for Client {
seen_timeseries.insert(key);
}
- // Insert the new schema into the database
- //
- // TODO-robustness There's still a race possible here. If two distinct clients receive new
- // but conflicting schema, they will both try to insert those at some point into the schema
- // tables. It's not clear how to handle this, since ClickHouse provides no transactions.
- // This is unlikely to happen at this point, because the design is such that there will be
- // a single `oximeter` instance, which has one client object, connected to a single
- // ClickHouse server. But once we start replicating data, the window within which the race
- // can occur is much larger, since it includes the time it takes ClickHouse to replicate
- // data between nodes.
- //
- // NOTE: This is an issue even in the case where the schema don't conflict. Two clients may
- // receive a sample with a new schema, and both would then try to insert that schema.
+ UnrolledSampleRows { new_schema, rows }
+ }
+
+ // Flush new schema to the database, or remove them from the cache on
+ // failure.
+ //
+ // This attempts to insert the provided schema into the timeseries schema
+ // table. If that fails, those schema are _also_ removed from the internal
+ // cache.
+ //
+ // TODO-robustness There's still a race possible here. If two distinct clients receive new
+ // but conflicting schema, they will both try to insert those at some point into the schema
+ // tables. It's not clear how to handle this, since ClickHouse provides no transactions.
+ // This is unlikely to happen at this point, because the design is such that there will be
+ // a single `oximeter` instance, which has one client object, connected to a single
+ // ClickHouse server. But once we start replicating data, the window within which the race
+ // can occur is much larger, since it includes the time it takes ClickHouse to replicate
+ // data between nodes.
+ //
+ // NOTE: This is an issue even in the case where the schema don't conflict. Two clients may
+ // receive a sample with a new schema, and both would then try to insert that schema.
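+ //
+ // Concretely: clients A and B each receive a sample for the same brand-new
+ // timeseries, both miss in their local caches, and both run this INSERT.
+ // Without transactions, the schema table can end up with duplicate rows, or
+ // with conflicting rows if the two schema disagree.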
+ async fn flush_new_schema_or_remove(
+ &self,
+ new_schema: BTreeMap<TimeseriesName, String>,
+ ) -> Result<(), Error> {
if !new_schema.is_empty() {
debug!(
self.log,
"inserting {} new timeseries schema",
new_schema.len()
);
- let body = format!(
- "INSERT INTO {db_name}.timeseries_schema FORMAT JSONEachRow\n{row_data}\n",
- db_name = crate::DATABASE_NAME,
- row_data = new_schema.join("\n"),
+ const APPROX_ROW_SIZE: usize = 64;
+ let mut body = String::with_capacity(
+ APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(),
);
- self.execute(body).await?;
+ body.push_str("INSERT INTO ");
+ body.push_str(crate::DATABASE_NAME);
+ body.push_str(".timeseries_schema FORMAT JSONEachRow\n");
+ for row_data in new_schema.values() {
+ body.push_str(row_data);
+ body.push_str("\n");
+ }
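+
+ // At this point the body looks like the following, with one JSON-encoded
+ // schema row per line (hypothetical timeseries names, row contents elided):
+ //
+ //     INSERT INTO oximeter.timeseries_schema FORMAT JSONEachRow
+ //     {"timeseries_name":"foo:bar", ...}
+ //     {"timeseries_name":"baz:quux", ...}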
+
+ // Try to insert the schema.
+ //
+ // If this fails, be sure to remove the schema we've added from the
+ // internal cache. Since we check the internal cache first for
+ // schema, if we fail here but _don't_ remove the schema, we'll
+ // never end up inserting the schema, but we will insert samples.
+ if let Err(e) = self.execute(body).await {
+ debug!(
+ self.log,
+ "failed to insert new schema, removing from cache";
+ "error" => ?e,
+ );
+ let mut schema = self.schema.lock().await;
+ for name in new_schema.keys() {
+ schema
+ .remove(name)
+ .expect("New schema should have been cached");
+ }
+ return Err(e);
+ }
}
+ Ok(())
+ }
- // Insert the actual target/metric field rows and measurement rows.
+ // Insert unrolled sample rows into the corresponding tables.
+ async fn insert_unrolled_samples(
+ &self,
+ rows: BTreeMap<String, Vec<String>>,
+ ) -> Result<(), Error> {
for (table_name, rows) in rows {
let body = format!(
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
@@ -682,9 +710,9 @@ impl DbWrite for Client {
self.execute(body).await?;
debug!(
self.log,
- "inserted {} rows into table {}",
- rows.len(),
- table_name
+ "inserted rows into table";
+ "n_rows" => rows.len(),
+ "table_name" => table_name,
);
}
@@ -692,6 +720,50 @@ impl DbWrite for Client {
// many as one per sample. It's not clear how to structure this in a way that's useful.
Ok(())
}
+}
+
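+// For example, after unrolling a single sample whose metric carries an i64
+// datum, `rows` might map each relevant table to its JSON rows, roughly:
+//
+//     "oximeter.fields_string"    => [ "{...}" ]
+//     "oximeter.measurements_i64" => [ "{...}" ]
+//
+// while `new_schema` holds the schema row if the timeseries was not already
+// known.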
+#[derive(Debug)]
+struct UnrolledSampleRows {
+ // The timeseries schema rows, keyed by timeseries name.
+ new_schema: BTreeMap,
+ // The rows to insert in all the other tables, keyed by the table name.
+ rows: BTreeMap<String, Vec<String>>,
+}
+
+/// A trait allowing a [`Client`] to write data into the timeseries database.
+///
+/// The vanilla [`Client`] object allows users to query the timeseries database, returning
+/// timeseries samples corresponding to various filtering criteria. This trait segregates the
+/// methods required for _writing_ new data into the database, and is intended only for use by the
+/// `oximeter-collector` crate.
+#[async_trait]
+pub trait DbWrite {
+ /// Insert the given samples into the database.
+ async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;
+
+ /// Initialize the replicated telemetry database, creating tables as needed.
+ async fn init_replicated_db(&self) -> Result<(), Error>;
+
+ /// Initialize a single node telemetry database, creating tables as needed.
+ async fn init_single_node_db(&self) -> Result<(), Error>;
+
+ /// Wipe the ClickHouse database entirely from a single node set up.
+ async fn wipe_single_node_db(&self) -> Result<(), Error>;
+
+ /// Wipe the ClickHouse database entirely from a replicated set up.
+ async fn wipe_replicated_db(&self) -> Result<(), Error>;
+}
+
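+// The `oximeter` collector brings the trait into scope alongside the client
+// to write data, e.g.:
+//
+//     use oximeter_db::{Client, DbWrite};
+//     client.insert_samples(&samples).await?;
+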
+#[async_trait]
+impl DbWrite for Client {
+ /// Insert the given samples into the database.
+ async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
+ debug!(self.log, "unrolling {} total samples", samples.len());
+ let UnrolledSampleRows { new_schema, rows } =
+ self.unroll_samples(samples).await;
+ self.flush_new_schema_or_remove(new_schema).await?;
+ self.insert_unrolled_samples(rows).await
+ }
/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error> {
@@ -1435,7 +1507,7 @@ mod tests {
datum: 1,
};
let sample = Sample::new(&bad_name, &metric).unwrap();
- let result = client.verify_sample_schema(&sample).await;
+ let result = client.verify_or_cache_sample_schema(&sample).await;
assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
@@ -1589,7 +1661,8 @@ mod tests {
// Verify that this sample is considered new, i.e., we return rows to update the timeseries
// schema table.
- let result = client.verify_sample_schema(&sample).await.unwrap();
+ let result =
+ client.verify_or_cache_sample_schema(&sample).await.unwrap();
assert!(
matches!(result, Some(_)),
"When verifying a new sample, the rows to be inserted should be returned"
@@ -1622,7 +1695,8 @@ mod tests {
// This should no longer return a new row to be inserted for the schema of this sample, as
// any new schema were already inserted above.
- let result = client.verify_sample_schema(&sample).await.unwrap();
+ let result =
+ client.verify_or_cache_sample_schema(&sample).await.unwrap();
assert!(
matches!(result, None),
"After inserting new schema, it should no longer be considered new"
@@ -2634,4 +2708,51 @@ mod tests {
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
}
+
+ // Regression test for https://github.com/oxidecomputer/omicron/issues/4335.
+ //
+ // This tests that, when we cache new schema but _fail_ to insert them, we also
+ // remove them from the internal cache.
+ #[tokio::test]
+ async fn test_new_schema_removed_when_not_inserted() {
+ usdt::register_probes().unwrap();
+ let logctx = test_setup_log("test_new_schema_removed_when_not_inserted");
+ let log = &logctx.log;
+
+ // Let the OS assign a port and discover it after ClickHouse starts
+ let mut db = ClickHouseInstance::new_single_node(0)
+ .await
+ .expect("Failed to start ClickHouse");
+ let address = SocketAddr::new("::1".parse().unwrap(), db.port());
+
+ let client = Client::new(address, &log);
+ client
+ .init_single_node_db()
+ .await
+ .expect("Failed to initialize timeseries database");
+ let samples = [test_util::make_sample()];
+
+ // We're using the components of the `insert_samples()` method here,
+ // which has been refactored explicitly for this test. We need to insert
+ // the schema for this sample into the internal cache, which requires
+ // access to the database (since the schema doesn't yet exist there).
+ //
+ // First, insert the sample's schema into the local cache. This method
+ // also checks the DB, since the schema doesn't exist in the cache.
+ let UnrolledSampleRows { new_schema, .. } =
+ client.unroll_samples(&samples).await;
+ assert_eq!(client.schema.lock().await.len(), 1);
+
+ // Next, we'll kill the database, and then try to insert the schema.
+ // That will fail, since the DB is now inaccessible.
+ db.cleanup().await.expect("failed to cleanup ClickHouse server");
+ let res = client.flush_new_schema_or_remove(new_schema).await;
+ assert!(res.is_err(), "Should have failed since the DB is gone");
+ assert!(
+ client.schema.lock().await.is_empty(),
+ "Failed to remove new schema from the cache when \
+ they could not be inserted into the DB"
+ );
+ logctx.cleanup_successful();
+ }
}