Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
- Ensure sequential / contiguous version numbers
- Bump keeper startup timeout
  • Loading branch information
bnaecker committed Nov 2, 2023
1 parent e721359 commit fbd8983
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 3 deletions.
56 changes: 55 additions & 1 deletion oximeter/db/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,21 @@ impl Client {
return Err(Error::MissingSchemaVersion(desired_version));
}

// Check we have no gaps in version numbers, starting with the latest
// version and walking through all available ones strictly greater. This
// is to check that the _next_ version is also 1 greater than the
// latest.
let range = (Bound::Excluded(latest), Bound::Included(desired_version));
if available
.range(latest..)
.zip(available.range(range))
.any(|(current, next)| next - current != 1)
{
return Err(Error::NonSequentialSchemaVersions);
}

// Walk through all changes between current version (exclusive) and
// the desired version (inclusive).
let range = (Bound::Excluded(latest), Bound::Included(desired_version));
let versions_to_apply = available.range(range);
let mut current = latest;
for version in versions_to_apply {
Expand Down Expand Up @@ -3863,6 +3875,48 @@ mod tests {
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_ensure_schema_with_version_gaps_fails() {
let logctx =
test_setup_log("test_ensure_schema_with_version_gaps_fails");
let log = &logctx.log;
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port());
let client = Client::new(address, &log);
const REPLICATED: bool = false;
client
.initialize_db_with_version(
REPLICATED,
crate::model::OXIMETER_VERSION,
)
.await
.expect("failed to initialize DB");

const BOGUS_VERSION: u64 = u64::MAX;
let (schema_dir, _) = create_test_upgrade_schema_directory(
REPLICATED,
&[crate::model::OXIMETER_VERSION, BOGUS_VERSION],
)
.await;

let err = client
.ensure_schema(REPLICATED, BOGUS_VERSION, schema_dir.path())
.await
.expect_err(
"Should have received an error when ensuring \
non-sequential version numbers",
);
let Error::NonSequentialSchemaVersions = err else {
panic!(
"Expected an Error::NonSequentialSchemaVersions, found {err:?}"
);
};
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_ensure_schema_with_missing_desired_schema_version_fails() {
let logctx = test_setup_log(
Expand Down
3 changes: 3 additions & 0 deletions oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum Error {

#[error("Schema update SQL files should contain at most 1 statement")]
MultipleSqlStatementsInSchemaUpdate { path: PathBuf },

#[error("Schema update versions must be sequential without gaps")]
NonSequentialSchemaVersions,
}

/// A timeseries name.
Expand Down
7 changes: 5 additions & 2 deletions test-utils/src/dev/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use crate::dev::poll;
// Timeout used when starting up ClickHouse subprocess.
const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(30);

// Timeout used when starting a ClickHouse keeper subprocess.
const CLICKHOUSE_KEEPER_TIMEOUT: Duration = Duration::from_secs(60);

/// A `ClickHouseInstance` is used to start and manage a ClickHouse single node server process.
#[derive(Debug)]
pub struct ClickHouseInstance {
Expand Down Expand Up @@ -520,7 +523,7 @@ async fn find_clickhouse_port_in_log(
pub async fn wait_for_ready(log_path: PathBuf) -> Result<(), anyhow::Error> {
let p = poll::wait_for_condition(
|| async {
let result = discover_ready(&log_path, CLICKHOUSE_TIMEOUT).await;
let result = discover_ready(&log_path, CLICKHOUSE_KEEPER_TIMEOUT).await;
match result {
Ok(ready) => Ok(ready),
Err(e) => {
Expand All @@ -540,7 +543,7 @@ pub async fn wait_for_ready(log_path: PathBuf) -> Result<(), anyhow::Error> {
}
},
&Duration::from_millis(500),
&CLICKHOUSE_TIMEOUT,
&CLICKHOUSE_KEEPER_TIMEOUT,
)
.await
.context("waiting to discover if ClickHouse is ready for connections")?;
Expand Down

0 comments on commit fbd8983

Please sign in to comment.