diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 09159e4d29..150f14ec69 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -429,7 +429,7 @@ dependencies = [ "serde_json", "tar", "tempfile", - "toml 0.8.2", + "toml", "xz2", "zopfli", ] @@ -441,7 +441,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a969e13a7589e9e3e4207e153bae624ade2b5622fb4684a4923b23ec3d57719" dependencies = [ "serde", - "toml 0.8.2", + "toml", ] [[package]] @@ -450,13 +450,14 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "include_dir", "peer-cursor", "peer-postgres", "postgres-connection", "prost", "pt", - "refinery", "serde_json", + "siphasher 1.0.0", "tokio", "tokio-postgres", "tracing", @@ -1308,6 +1309,25 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "include_dir" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e" +dependencies = [ + "include_dir_macros", +] + +[[package]] +name = "include_dir_macros" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -2209,7 +2229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97dc5fea232fc28d2f597b37c4876b348a40e33f3b02cc975c8d006d78d94b1a" dependencies = [ "toml_datetime", - "toml_edit 0.20.2", + "toml_edit", ] [[package]] @@ -2415,51 +2435,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "refinery" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "529664dbccc0a296947615c997a857912d72d1c44be1fafb7bae54ecfa7a8c24" -dependencies = [ - "refinery-core", - "refinery-macros", -] - -[[package]] -name = "refinery-core" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e895cb870cf06e92318cbbeb701f274d022d5ca87a16fa8244e291cd035ef954" -dependencies = [ - "async-trait", - "cfg-if", - "lazy_static", - "log", - "regex", - "serde", - "siphasher 1.0.0", - "thiserror", - "time", - "tokio", - "tokio-postgres", - "toml 0.7.8", - "url", - "walkdir", -] - -[[package]] -name = "refinery-macros" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "123e8b80f8010c3ae38330c81e76938fc7adf6cdbfbaad20295bb8c22718b4f1" -dependencies = [ - "proc-macro2", - "quote", - "refinery-core", - "regex", - "syn 2.0.43", -] - [[package]] name = "regex" version = "1.10.2" @@ -2750,15 +2725,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.22" @@ -3372,18 +3338,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "toml" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.19.15", -] - [[package]] name = "toml" version = "0.8.2" @@ -3393,7 +3347,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.20.2", + "toml_edit", ] [[package]] @@ -3405,19 +3359,6 @@ dependencies = [ "serde", ] -[[package]] -name = "toml_edit" -version = "0.19.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" -dependencies = [ - "indexmap 2.1.0", - "serde", - "serde_spanned", - "toml_datetime", - "winnow", -] - [[package]] name = "toml_edit" version = "0.20.2" @@ -3728,16 +3669,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "walkdir" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.1" diff --git a/nexus/catalog/Cargo.toml b/nexus/catalog/Cargo.toml index 4065833565..280582a010 100644 --- a/nexus/catalog/Cargo.toml +++ b/nexus/catalog/Cargo.toml @@ -12,7 +12,7 @@ prost = "0.12" peer-cursor = { path = "../peer-cursor" } peer-postgres = { path = "../peer-postgres" } pt = { path = "../pt" } -refinery = { version = "0.8", features = ["tokio-postgres"] } +include_dir = { version = "0.7", default-features = false } tokio = { version = "1.13.0", features = ["full"] } tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", @@ -21,4 +21,5 @@ tokio-postgres = { version = "0.7.6", features = [ ] } tracing = "0.1.29" serde_json = "1.0" +siphasher = "1.0" postgres-connection = { path = "../postgres-connection" } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 32ee57c034..69bf8986be 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -1,6 +1,9 @@ +use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, Context}; +use include_dir::{include_dir, Dir, File}; use peer_cursor::QueryExecutor; use peer_postgres::PostgresQueryExecutor; use postgres_connection::{connect_postgres, get_pg_connection_string}; @@ -11,31 +14,65 @@ use pt::{ peerdb_peers::{peer::Config, DbType, Peer}, }; use serde_json::Value; -use tokio_postgres::{types, Client}; +use siphasher::sip::SipHasher13; +use tokio_postgres::{error::SqlState, types, Client}; -mod embedded { - use refinery::embed_migrations; - embed_migrations!("migrations"); +static MIGRATIONS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/migrations"); + +#[derive(Eq)] +struct Migration<'a> { + pub file: &'a File<'a>, + pub version: i32, + pub name: &'a str, } -pub struct Catalog { - pg: Box, - executor: Arc, +impl<'a> Migration<'a> { + pub fn new(file: &'a File<'a>) -> anyhow::Result { + let Some(f) = file.path().to_str() else { + return Err(anyhow!("migration filename must be utf8")); + }; + let Some(f) = f.strip_prefix('V') else { + return Err(anyhow!("migration filename must start with V")); + }; + let Some(f) = f.strip_suffix(".sql") else { + return Err(anyhow!("migration filename must end with .sql")); + }; + let Some(__idx) = f.find("__") else { + return Err(anyhow!("migration filename must contain __")); + }; + let Ok(version) = f[..__idx].parse() else { + return Err(anyhow!("migration filename must have number between V & __")); + }; + let name = &f[__idx + 2..]; + Ok(Self { + file, + version, + name, + }) + } +} + +impl<'a> PartialEq for Migration<'a> { + fn eq(&self, other: &Self) -> bool { + self.version == other.version + } +} + +impl<'a> Ord for Migration<'a> { + fn cmp(&self, other: &Self) -> Ordering { + self.version.cmp(&other.version) + } } -async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { - let migration_report = embedded::migrations::runner() - .run_async(client) - .await - .context("Failed to run migrations")?; - for migration in migration_report.applied_migrations() { - tracing::info!( - "Migration Applied - Name: {}, Version: {}", - migration.name(), - migration.version() - ); +impl<'a> PartialOrd for Migration<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.version.cmp(&other.version)) } - Ok(()) +} + +pub struct Catalog { + pg: Box, + executor: Arc, } #[derive(Debug, Copy, Clone)] @@ -86,7 +123,80 @@ impl Catalog { } pub async fn run_migrations(&mut self) -> anyhow::Result<()> { - run_migrations(&mut self.pg).await + let mut migrations = MIGRATIONS + .files() + .map(Migration::new) + .collect::>>()?; + migrations.sort(); + let tx = self.pg.transaction().await?; + let create = tx + .query( + "create table if not exists refinery_schema_history(\ + version int4 primary key, name text, applied_on text, checksum text)", + &[], + ) + .await; + if let Err(err) = create { + if err.code() != Some(&SqlState::UNIQUE_VIOLATION) { + return Err(err.into()); + } + } + + tx.execute( + "lock table refinery_schema_history in share update exclusive mode", + &[], + ) + .await?; + let rows = tx + .query( + "select version, name from refinery_schema_history order by version", + &[], + ) + .await?; + let mut applied = rows + .iter() + .map(|row| (row.get::(0), row.get::(1))); + + for migration in migrations { + if let Some((applied_version, applied_name)) = applied.next() { + if migration.version != applied_version { + return Err(anyhow!( + "Migration version mismatch: {} & {}", + migration.version, + applied_version + )); + } + if migration.name != applied_name { + return Err(anyhow!( + "Migration name mismatch: '{}' & '{}'", + migration.name, + applied_name + )); + } + continue; + } + let Some(sql) = migration.file.contents_utf8() else { + return Err(anyhow!("migration sql must be utf8")); + }; + let checksum = { + let mut hasher = SipHasher13::new(); + migration.name.hash(&mut hasher); + migration.version.hash(&mut hasher); + sql.hash(&mut hasher); + hasher.finish() + }; + + tx.batch_execute(sql).await?; + tx.execute("insert into refinery_schema_history (version, name, applied_on, checksum) values ($1, $2, NOW(), $3)", + &[&migration.version, &migration.name, &checksum.to_string()]).await?; + tracing::info!( + "Migration Applied: {} {}", + migration.version, + migration.name + ); + } + + tx.commit().await.map_err(|err| err.into()) } pub fn get_executor(&self) -> &Arc {