Skip to content

Commit

Permalink
Replace refinery with our own backwards compatible migration code
Browse files Browse the repository at this point in the history
Currently server_tests must be run with --test-threads=1 because of two things:
1. port collision
2. concurrent migrations failing

I explored fixing these in #926. Ports are easily solved, but migrations are a pain

This migration code works around concurrency with two changes from refinery:
1. ignore unique constraint violation from CREATE TABLE IF NOT EXISTS
2. lock migration table so concurrent migrations are serialized

Considered submitting a PR to refinery with these two fixes,
but even this small change would have been non-trivial upstream, since refinery supports multiple async/sync database drivers
  • Loading branch information
serprex committed Dec 29, 2023
1 parent efef9a6 commit f9abbc7
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 115 deletions.
119 changes: 25 additions & 94 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion nexus/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ prost = "0.12"
peer-cursor = { path = "../peer-cursor" }
peer-postgres = { path = "../peer-postgres" }
pt = { path = "../pt" }
refinery = { version = "0.8", features = ["tokio-postgres"] }
include_dir = { version = "0.7", default-features = false }
tokio = { version = "1.13.0", features = ["full"] }
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
Expand All @@ -21,4 +21,5 @@ tokio-postgres = { version = "0.7.6", features = [
] }
tracing = "0.1.29"
serde_json = "1.0"
siphasher = "1.0"
postgres-connection = { path = "../postgres-connection" }
110 changes: 90 additions & 20 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};

use anyhow::{anyhow, Context};
use peer_cursor::QueryExecutor;
Expand All @@ -11,31 +13,55 @@ use pt::{
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use tokio_postgres::{types, Client};
use siphasher::sip::SipHasher13;
use tokio_postgres::{types, Client, error::SqlState};
use include_dir::{include_dir, Dir, File};

mod embedded {
use refinery::embed_migrations;
embed_migrations!("migrations");
static MIGRATIONS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/migrations");

#[derive(Eq)]
struct Migration<'a> {
pub file: &'a File<'a>,
pub version: i32,
pub name: &'a str,
}

pub struct Catalog {
pg: Box<Client>,
executor: Arc<dyn QueryExecutor>,
impl<'a> Migration<'a> {
pub fn new(file: &'a File<'a>) -> Self {
let f = file.path().to_str().expect("migration name must be utf8");
let f = f.strip_prefix('V').expect("migration name must start with V");
let __idx = f.find("__").expect("migration name must contain __");
let version = f[..__idx].parse().expect("migration name must have number in between V & __");
let name = &f[__idx+2..];
Self {
file,
version,
name,
}
}
}

async fn run_migrations(client: &mut Client) -> anyhow::Result<()> {
let migration_report = embedded::migrations::runner()
.run_async(client)
.await
.context("Failed to run migrations")?;
for migration in migration_report.applied_migrations() {
tracing::info!(
"Migration Applied - Name: {}, Version: {}",
migration.name(),
migration.version()
);
impl<'a> PartialEq for Migration<'a> {
fn eq(&self, other: &Self) -> bool {
self.version == other.version
}
Ok(())
}

impl<'a> Ord for Migration<'a> {
fn cmp(&self, other: &Self) -> Ordering {
self.version.cmp(&other.version)
}
}

impl<'a> PartialOrd for Migration<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.version.cmp(&other.version))
}
}

pub struct Catalog {
pg: Box<Client>,
executor: Arc<dyn QueryExecutor>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -86,7 +112,51 @@ impl Catalog {
}

pub async fn run_migrations(&mut self) -> anyhow::Result<()> {
run_migrations(&mut self.pg).await
let mut migrations: Vec<_> = MIGRATIONS.files().map(Migration::new).collect();
migrations.sort();
let tx = self.pg.transaction().await?;
let create = tx.query(
"create table if not exists refinery_schema_history(\
version int4 primary key, name text, applied_on text, checksum text)", &[]).await;
if let Err(err) = create {
if err.code() != Some(&SqlState::UNIQUE_VIOLATION) {
return Err(err.into());
}
}

tx.query("lock table refinery_schema_history in share update exclusive mode", &[]).await?;
let rows = tx.query("select version, name from refinery_schema_history order by version", &[]).await?;
let mut applied = rows.iter().map(|row| (row.get::<usize, i32>(0), row.get::<usize, &str>(1)));

for migration in migrations {
if let Some((applied_version, applied_name)) = applied.next() {
if migration.version != applied_version {
return Err(anyhow!("Migration version mismatch: {} & {}", migration.version, applied_version));
}
if migration.name != applied_name {
return Err(anyhow!(
"Migration name mismatch: '{}' & '{}'", migration.name, applied_name,
).into())
}
continue
}
let sql = migration.file.contents_utf8().expect("migration file must be utf8");

let checksum = {
let mut hasher = SipHasher13::new();
migration.name.hash(&mut hasher);
migration.version.hash(&mut hasher);
sql.hash(&mut hasher);
hasher.finish()
};

tx.query(sql, &[]).await?;
tx.query("insert into refinery_schema_history (version, name, applied_on, checksum) values ($1, $2, NOW(), $3)",
&[&migration.version, &migration.name, &checksum.to_string()]).await?;
tracing::info!("Migration Applied: {} {}", migration.version, migration.name);
}

Ok(())
}

pub fn get_executor(&self) -> &Arc<dyn QueryExecutor> {
Expand Down

0 comments on commit f9abbc7

Please sign in to comment.