Remove timeseries schema from cache on insertion failure #4348

Merged · 2 commits · Oct 25, 2023

Changes from all commits
261 changes: 191 additions & 70 deletions oximeter/db/src/client.rs
@@ -370,16 +370,20 @@ impl Client {
Ok(res.contains("oximeter_cluster"))
}

// Verifies that the schema for a sample matches the schema in the database.
// Verifies that the schema for a sample matches the schema in the database,
// or caches a new one internally.
//
// If the schema exists in the database, and the sample matches that schema, `None` is
// returned. If the schema does not match, an Err is returned (the caller skips the sample in
// this case). If the schema does not _exist_ in the database, Some(schema) is returned, so
// that the caller can insert it into the database at the appropriate time.
async fn verify_sample_schema(
// this case). If the schema does not _exist_ in the database,
// Some((timeseries_name, schema)) is returned, so that the caller can
// insert it into the database at the appropriate time. Note that the schema
// is added to the internal cache, but not inserted into the DB at this
// time.
async fn verify_or_cache_sample_schema(
&self,
sample: &Sample,
) -> Result<Option<String>, Error> {
) -> Result<Option<(TimeseriesName, String)>, Error> {
let sample_schema = model::schema_for(sample);
let name = sample_schema.timeseries_name.clone();
let mut schema = self.schema.lock().await;
@@ -413,13 +417,15 @@ impl Client {
}
}
Entry::Vacant(entry) => {
let name = entry.key().clone();
entry.insert(sample_schema.clone());
Ok(Some(
Ok(Some((
name,
serde_json::to_string(&model::DbTimeseriesSchema::from(
sample_schema,
))
.expect("Failed to convert schema to DB model"),
))
)))
}
}
}
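
For orientation, here is a sketch (not part of this diff) of how a caller consumes the new return type; it assumes a `client`, a `sample`, and a `new_schema: BTreeMap<TimeseriesName, String>` in scope, mirroring the `unroll_samples` method below:

match client.verify_or_cache_sample_schema(&sample).await {
    // The sample conflicts with a known schema; the error was already
    // logged, so the caller simply skips the sample.
    Err(_) => {}
    // The schema already exists and matches; nothing new to record.
    Ok(None) => {}
    // A new schema was just cached; stage its JSON row so it can be
    // inserted into the timeseries_schema table later.
    Ok(Some((name, json_row))) => {
        new_schema.insert(name, json_row);
    }
}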
@@ -476,7 +482,6 @@ impl Client {
Ok(timeseries_by_key.into_values().collect())
}

// Initialize ClickHouse with the database and metric table schema.
// Execute a generic SQL statement.
//
// TODO-robustness This currently does no validation of the statement.
@@ -568,53 +573,32 @@ impl Client {
}
Ok(())
}
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
///
/// The vanilla [`Client`] object allows users to query the timeseries database, returning
/// timeseries samples corresponding to various filtering criteria. This trait segregates the
/// methods required for _writing_ new data into the database, and is intended only for use by the
/// `oximeter-collector` crate.
#[async_trait]
pub trait DbWrite {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;

/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error>;

/// Initialize a single node telemetry database, creating tables as needed.
async fn init_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a single node set up.
async fn wipe_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a replicated set up.
async fn wipe_replicated_db(&self) -> Result<(), Error>;
}

#[async_trait]
impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
// Unroll each sample into its constituent rows, after verifying the schema.
//
// Note that this also inserts the schema into the internal cache, if it
// does not already exist there.
async fn unroll_samples(&self, samples: &[Sample]) -> UnrolledSampleRows {
let mut seen_timeseries = BTreeSet::new();
let mut rows = BTreeMap::new();
let mut new_schema = Vec::new();
let mut new_schema = BTreeMap::new();

for sample in samples.iter() {
match self.verify_sample_schema(sample).await {
match self.verify_or_cache_sample_schema(sample).await {
Err(_) => {
// Skip the sample, but otherwise do nothing. The error is logged in the above
// call.
continue;
}
Ok(schema) => {
if let Some(schema) = schema {
debug!(self.log, "new timeseries schema"; "schema" => ?schema);
new_schema.push(schema);
}
Ok(None) => {}
Ok(Some((name, schema))) => {
debug!(
self.log,
"new timeseries schema";
"timeseries_name" => %name,
"schema" => %schema
);
new_schema.insert(name, schema);
}
}

@@ -642,34 +626,78 @@ impl DbWrite for Client {
seen_timeseries.insert(key);
}

// Insert the new schema into the database
//
// TODO-robustness There's still a race possible here. If two distinct clients receive new
// but conflicting schema, they will both try to insert those at some point into the schema
// tables. It's not clear how to handle this, since ClickHouse provides no transactions.
// This is unlikely to happen at this point, because the design is such that there will be
// a single `oximeter` instance, which has one client object, connected to a single
// ClickHouse server. But once we start replicating data, the window within which the race
// can occur is much larger, since it includes the time it takes ClickHouse to replicate
// data between nodes.
//
// NOTE: This is an issue even in the case where the schema don't conflict. Two clients may
// receive a sample with a new schema, and both would then try to insert that schema.
UnrolledSampleRows { new_schema, rows }
}

// Save new schema to the database, or remove them from the cache on
// failure.
//
// This attempts to insert the provided schema into the timeseries schema
// table. If that fails, those schema are _also_ removed from the internal
// cache.
//
// TODO-robustness There's still a race possible here. If two distinct clients receive new
// but conflicting schema, they will both try to insert those at some point into the schema
// tables. It's not clear how to handle this, since ClickHouse provides no transactions.
// This is unlikely to happen at this point, because the design is such that there will be
// a single `oximeter` instance, which has one client object, connected to a single
// ClickHouse server. But once we start replicating data, the window within which the race
// can occur is much larger, since it includes the time it takes ClickHouse to replicate
// data between nodes.
//
// NOTE: This is an issue even in the case where the schema don't conflict. Two clients may
// receive a sample with a new schema, and both would then try to insert that schema.
async fn save_new_schema_or_remove(
&self,
new_schema: BTreeMap<TimeseriesName, String>,
) -> Result<(), Error> {
if !new_schema.is_empty() {
debug!(
self.log,
"inserting {} new timeseries schema",
new_schema.len()
);
let body = format!(
"INSERT INTO {db_name}.timeseries_schema FORMAT JSONEachRow\n{row_data}\n",
db_name = crate::DATABASE_NAME,
row_data = new_schema.join("\n"),
const APPROX_ROW_SIZE: usize = 64;
let mut body = String::with_capacity(
APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(),
);
self.execute(body).await?;
body.push_str("INSERT INTO ");
body.push_str(crate::DATABASE_NAME);
body.push_str(".timeseries_schema FORMAT JSONEachRow\n");
for row_data in new_schema.values() {
body.push_str(row_data);
body.push_str("\n");
}

// Try to insert the schema.
//
// If this fails, be sure to remove the schema we've added from the
// internal cache. Since we check the internal cache first for
// schema, if we fail here but _don't_ remove the schema, we'll
// never end up inserting the schema, but we will insert samples.
if let Err(e) = self.execute(body).await {
debug!(
self.log,
"failed to insert new schema, removing from cache";
"error" => ?e,
);
let mut schema = self.schema.lock().await;
for name in new_schema.keys() {
schema
.remove(name)
.expect("New schema should have been cached");
}
return Err(e);
}
}
Ok(())
}
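
As a rough illustration, the request body assembled above is a single multi-row insert in ClickHouse's JSONEachRow format, one serialized schema per line. The timeseries names and JSON fields here are made up, not the exact `DbTimeseriesSchema` serialization:

INSERT INTO oximeter.timeseries_schema FORMAT JSONEachRow
{"timeseries_name":"example_target:example_metric","datum_type":"I64"}
{"timeseries_name":"other_target:other_metric","datum_type":"F64"}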

// Insert the actual target/metric field rows and measurement rows.
// Insert unrolled sample rows into the corresponding tables.
async fn insert_unrolled_samples(
&self,
rows: BTreeMap<String, Vec<String>>,
) -> Result<(), Error> {
for (table_name, rows) in rows {
let body = format!(
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
@@ -682,16 +710,60 @@
self.execute(body).await?;
debug!(
self.log,
"inserted {} rows into table {}",
rows.len(),
table_name
"inserted rows into table";
"n_rows" => rows.len(),
"table_name" => table_name,
);
}

// TODO-correctness We'd like to return all errors to clients here, and there may be as
// many as one per sample. It's not clear how to structure this in a way that's useful.
Ok(())
}
}

#[derive(Debug)]
struct UnrolledSampleRows {
// The timeseries schema rows, keyed by timeseries name.
new_schema: BTreeMap<TimeseriesName, String>,
// The rows to insert in all the other tables, keyed by the table name.
rows: BTreeMap<String, Vec<String>>,
}
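
For intuition, a populated `UnrolledSampleRows` for one sample of a brand-new timeseries might look roughly like the following (table names and JSON contents are illustrative):

// new_schema: one serialized schema row per newly-seen timeseries:
//   "example_target:example_metric" => "{\"timeseries_name\": ...}"
// rows: serialized field and measurement rows, grouped by table:
//   "oximeter.fields_string"    => ["{...}", "{...}"]
//   "oximeter.measurements_i64" => ["{...}"]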

/// A trait allowing a [`Client`] to write data into the timeseries database.
///
/// The vanilla [`Client`] object allows users to query the timeseries database, returning
/// timeseries samples corresponding to various filtering criteria. This trait segregates the
/// methods required for _writing_ new data into the database, and is intended only for use by the
/// `oximeter-collector` crate.
#[async_trait]
pub trait DbWrite {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;

/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error>;

/// Initialize a single node telemetry database, creating tables as needed.
async fn init_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a single node set up.
async fn wipe_single_node_db(&self) -> Result<(), Error>;

/// Wipe the ClickHouse database entirely from a replicated set up.
async fn wipe_replicated_db(&self) -> Result<(), Error>;
}
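
A minimal usage sketch of the write path (module paths are approximate and error handling is elided; assumes a reachable ClickHouse server at `address`):

use oximeter_db::{Client, DbWrite, Error};

async fn write_samples(
    address: std::net::SocketAddr,
    log: &slog::Logger,
    samples: &[oximeter::types::Sample],
) -> Result<(), Error> {
    let client = Client::new(address, log);
    // Create the database and tables if they don't already exist.
    client.init_single_node_db().await?;
    // Unroll samples, save any new schema (evicting them from the cache
    // on failure), then insert the unrolled rows.
    client.insert_samples(samples).await
}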

#[async_trait]
impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
let UnrolledSampleRows { new_schema, rows } =
self.unroll_samples(samples).await;
self.save_new_schema_or_remove(new_schema).await?;
self.insert_unrolled_samples(rows).await
}

/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error> {
Expand Down Expand Up @@ -1435,7 +1507,7 @@ mod tests {
datum: 1,
};
let sample = Sample::new(&bad_name, &metric).unwrap();
let result = client.verify_sample_schema(&sample).await;
let result = client.verify_or_cache_sample_schema(&sample).await;
assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
@@ -1589,7 +1661,8 @@

// Verify that this sample is considered new, i.e., we return rows to update the timeseries
// schema table.
let result = client.verify_sample_schema(&sample).await.unwrap();
let result =
client.verify_or_cache_sample_schema(&sample).await.unwrap();
assert!(
matches!(result, Some(_)),
"When verifying a new sample, the rows to be inserted should be returned"
@@ -1622,7 +1695,8 @@

// This should no longer return a new row to be inserted for the schema of this sample, as
// any schema have been included above.
let result = client.verify_sample_schema(&sample).await.unwrap();
let result =
client.verify_or_cache_sample_schema(&sample).await.unwrap();
assert!(
matches!(result, None),
"After inserting new schema, it should no longer be considered new"
@@ -2675,4 +2749,51 @@ mod tests {
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
}

// Regression test for https://github.com/oxidecomputer/omicron/issues/4335.
//
// This tests that, when we cache new schema but _fail_ to insert them, we
// also remove them from the internal cache.
#[tokio::test]
async fn test_new_schema_removed_when_not_inserted() {
usdt::register_probes().unwrap();
let logctx = test_setup_log("test_new_schema_removed_when_not_inserted");
let log = &logctx.log;

// Let the OS assign a port and discover it after ClickHouse starts
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new("::1".parse().unwrap(), db.port());

let client = Client::new(address, &log);
client
.init_single_node_db()
.await
.expect("Failed to initialize timeseries database");
let samples = [test_util::make_sample()];

// We're using the components of the `insert_samples()` method here,
// which has been refactored explicitly for this test. We need to insert
// the schema for this sample into the internal cache, which relies on
// access to the database (since they don't exist).
//
// First, unroll the samples, which inserts their schema into the local
// cache. This method also checks the DB, since the schema doesn't exist
// in the cache yet.
let UnrolledSampleRows { new_schema, .. } =
client.unroll_samples(&samples).await;
assert_eq!(client.schema.lock().await.len(), 1);

// Next, we'll kill the database, and then try to insert the schema.
// That will fail, since the DB is now inaccessible.
db.cleanup().await.expect("failed to cleanup ClickHouse server");
let res = client.save_new_schema_or_remove(new_schema).await;
assert!(res.is_err(), "Should have failed since the DB is gone");
assert!(
client.schema.lock().await.is_empty(),
"Failed to remove new schema from the cache when \
they could not be inserted into the DB"
);
logctx.cleanup_successful();
}
}