diff --git a/Cargo.lock b/Cargo.lock index dfca3105d..13580b66c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -864,6 +864,15 @@ dependencies = [ "serde", ] +[[package]] +name = "castaway" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5" +dependencies = [ + "rustversion", +] + [[package]] name = "cbc" version = "0.1.2" @@ -1078,6 +1087,20 @@ dependencies = [ "memchr", ] +[[package]] +name = "compact_str" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6050c3a16ddab2e412160b31f2c871015704239bca62f72f6e5f0be631d3f644" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -3204,7 +3227,9 @@ dependencies = [ "serde_json", "serde_yaml", "sqlx", + "syn2mas", "tokio", + "tokio-stream", "tokio-util", "tower", "tower-http", @@ -6044,6 +6069,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "static_assertions_next" version = "1.1.2" @@ -6106,6 +6137,23 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn2mas" +version = "0.12.0" +dependencies = [ + "chrono", + "compact_str", + "futures-util", + "rand", + "sqlx", + "thiserror 2.0.3", + "thiserror-ext", + "tokio", + "tracing", + "ulid", + "uuid", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -6178,6 +6226,28 @@ dependencies = [ "thiserror-impl 2.0.3", ] +[[package]] +name = "thiserror-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa35fd08b65a716e1a91479b00d03ed2ef4c92371a4900ceb6ec2b332f9d71df" +dependencies = [ + "thiserror 1.0.69", + "thiserror-ext-derive", +] + +[[package]] +name = "thiserror-ext-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85ec5bcb8889378397e46bcd9f8ac636e9045f42851561e05a700667151abd18" +dependencies = [ + "either", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thiserror-impl" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 246e7850b..caa051e58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ mas-tasks = { path = "./crates/tasks/", version = "=0.12.0" } mas-templates = { path = "./crates/templates/", version = "=0.12.0" } mas-tower = { path = "./crates/tower/", version = "=0.12.0" } oauth2-types = { path = "./crates/oauth2-types/", version = "=0.12.0" } +syn2mas = { path = "./crates/syn2mas", version = "=0.12.0" } # OpenAPI schema generation and validation [workspace.dependencies.aide] @@ -66,6 +67,9 @@ features = ["axum", "axum-headers", "macros"] version = "7.0.11" features = ["chrono", "url", "tracing"] +[workspace.dependencies.async-stream] +version = "0.3.6" + # Utility to write and implement async traits [workspace.dependencies.async-trait] version = "0.1.83" @@ -91,6 +95,10 @@ version = "1.9.0" [workspace.dependencies.camino] version = "1.1.9" +# Memory optimisation for short strings +[workspace.dependencies.compact_str] +version = "0.8.0" + # Time utilities [workspace.dependencies.chrono] version = "0.4.38" @@ -309,11 +317,17 @@ features = [ [workspace.dependencies.thiserror] version = "2.0.3" +[workspace.dependencies.thiserror-ext] +version = "0.2.0" + 
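The `compact_str` dependency added above is used later for the per-user localpart lookup table. As a minimal illustrative sketch (not part of this patch) of why it helps: `CompactString` stores short strings inline rather than on the heap, which matters when holding one entry per migrated user.

```rust
use compact_str::CompactString;

fn main() {
    // Matrix localparts are usually short, so they fit in CompactString's
    // inline buffer (24 bytes on 64-bit targets) with no heap allocation.
    let localpart = CompactString::new("alice");
    assert_eq!(localpart, "alice");
    assert!(!localpart.is_heap_allocated());
}
```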
# Async runtime [workspace.dependencies.tokio] version = "1.41.1" features = ["full"] +[workspace.dependencies.tokio-stream] version = "0.1.16" + # Useful async utilities [workspace.dependencies.tokio-util] version = "0.7.13" diff --git a/clippy.toml b/clippy.toml index ac0f49bf4..3cbf7c74c 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,4 +1,4 @@ -doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL"] +doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL", "SQLite"] disallowed-methods = [ { path = "rand::thread_rng", reason = "do not create rngs on the fly, pass them as parameters" }, diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 45dd9f8c6..0aa0d8597 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -38,6 +38,7 @@ serde_yaml = "0.9.34" sqlx.workspace = true tokio.workspace = true tokio-util.workspace = true +tokio-stream.workspace = true tower.workspace = true tower-http.workspace = true url.workspace = true @@ -89,6 +90,7 @@ mas-tasks.workspace = true mas-templates.workspace = true mas-tower.workspace = true oauth2-types.workspace = true +syn2mas.workspace = true [features] # Features used for the prebuilt binaries diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index eed27c2f7..97ee47230 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -19,6 +19,7 @@ mod debug; mod doctor; mod manage; mod server; +mod syn2mas; mod templates; mod worker; @@ -48,6 +49,10 @@ enum Subcommand { /// Run diagnostics on the deployment Doctor(self::doctor::Options), + + /// Migrate from Synapse's built-in auth system to MAS. + #[clap(name = "syn2mas")] + Syn2Mas(self::syn2mas::Options), } #[derive(Parser, Debug)] @@ -74,6 +79,7 @@ impl Options { Some(S::Templates(c)) => Box::pin(c.run(figment)).await, Some(S::Debug(c)) => Box::pin(c.run(figment)).await, Some(S::Doctor(c)) => Box::pin(c.run(figment)).await, + Some(S::Syn2Mas(c)) => Box::pin(c.run(figment)).await, None => Box::pin(self::server::Options::default().run(figment)).await, } } diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs new file mode 100644 index 000000000..b814763ed --- /dev/null +++ b/crates/cli/src/commands/syn2mas.rs @@ -0,0 +1,83 @@ +use std::process::ExitCode; + +use anyhow::Context; +use clap::Parser; +use figment::Figment; +use mas_config::{ConfigurationSectionExt, DatabaseConfig}; +use rand::thread_rng; +use sqlx::{Connection, Either, PgConnection}; +use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader}; +use tracing::{error, warn}; + +use crate::util::database_connection_from_config; + +#[derive(Parser, Debug)] +pub(super) struct Options { + #[command(subcommand)] + subcommand: Subcommand, + + /// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is only suitable for TESTING. + /// If you want to use this tool anyway, please pass this argument. + /// + /// If you want to migrate from Synapse to MAS today, please use the Node.js-based tool in the MAS repository. + #[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")] + experimental_accepted: bool, +} + +#[derive(Parser, Debug)] +enum Subcommand { + Check, + Migrate, +} + +/// The number of parallel writing transactions active against the MAS database. +const NUM_WRITER_CONNECTIONS: usize = 8; + +impl Options { + pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> { + warn!("This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE.
Do not use it, except for TESTING."); + if !self.experimental_accepted { + error!("Please agree that you can only use this tool for testing."); + return Ok(ExitCode::FAILURE); + } + + // TODO allow configuring the synapse database location + let mut syn_conn = PgConnection::connect("postgres:///fakesyn").await.unwrap(); + + let config = DatabaseConfig::extract_or_default(figment)?; + + let mut mas_connection = database_connection_from_config(&config).await?; + + let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(&mut mas_connection) + .await + .context("failed to issue query to lock database")? + else { + error!("Failed to acquire syn2mas lock on the database."); + error!("This likely means that another syn2mas instance is already running!"); + return Ok(ExitCode::FAILURE); + }; + + syn2mas::mas_pre_migration_checks(&mut mas_connection).await?; + syn2mas::synapse_pre_migration_checks(&mut syn_conn).await?; + + let mut reader = SynapseReader::new(&mut syn_conn, true).await?; + let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS); + for _ in 0..NUM_WRITER_CONNECTIONS { + writer_mas_connections.push(database_connection_from_config(&config).await?); + } + let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?; + + // TODO is this rng ok? + #[allow(clippy::disallowed_methods)] + let mut rng = thread_rng(); + + // TODO progress reporting + // TODO allow configuring the server name + syn2mas::migrate(&mut reader, &mut writer, "matrix.org", &mut rng).await?; + + reader.finish().await?; + writer.finish().await?; + + Ok(ExitCode::SUCCESS) + } +} diff --git a/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json b/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json new file mode 100644 index 000000000..c7f5fce5e --- /dev/null +++ b/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas_restore_indices (name, table_name, definition)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752" +} diff --git a/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json b/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json new file mode 100644 index 000000000..f1b8bad90 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 2, + "name": "definition!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null, + false, + null + ] + }, + "hash": "12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca" +} diff --git 
a/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json b/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json new file mode 100644 index 000000000..68b0722e1 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT indexname AS \"name!\", indexdef AS \"definition!\", schemaname AS \"table_name!\"\n FROM pg_indexes\n WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 1, + "name": "definition!", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "table_name!", + "type_info": "Name" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0" +} diff --git a/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json b/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json new file mode 100644 index 000000000..3dcc1fc48 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype = 'f' AND confrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 2, + "name": "definition!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null, + false, + null + ] + }, + "hash": "5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299" +} diff --git a/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json b/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json new file mode 100644 index 000000000..855da3ba6 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas_restore_constraints (name, table_name, definition)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93" +} diff --git a/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json b/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json new file mode 100644 index 000000000..759cc5f8b --- /dev/null +++ b/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name", + 
"type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "definition", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84" +} diff --git a/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json b/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json new file mode 100644 index 000000000..9ae8f1e35 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "definition", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2" +} diff --git a/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json b/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json new file mode 100644 index 000000000..df1f3fb7c --- /dev/null +++ b/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema\n AND tablename = ANY($1)\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "_dummy", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "NameArray" + ] + }, + "nullable": [ + null + ] + }, + "hash": "b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76" +} diff --git a/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json new file mode 100644 index 000000000..efa2c4d24 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_passwords\n (user_password_id, user_id, hashed_password, created_at, version)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray", + "Int4Array" + ] + }, + "nullable": [] + }, + "hash": "c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425" +} diff --git a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json new file mode 100644 index 000000000..d8be21736 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], 
$3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray", + "BoolArray" + ] + }, + "nullable": [] + }, + "hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f" +} diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml new file mode 100644 index 000000000..1b9c051d8 --- /dev/null +++ b/crates/syn2mas/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "syn2mas" +version.workspace = true +license.workspace = true +authors.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +thiserror.workspace = true +thiserror-ext.workspace = true +tokio.workspace = true +sqlx.workspace = true +chrono.workspace = true +compact_str.workspace = true +tracing.workspace = true +futures-util = "0.3.30" + +rand.workspace = true +uuid = "1.10.0" +ulid = { workspace = true, features = ["uuid"] } + +[lints] +workspace = true diff --git a/crates/syn2mas/src/checks.rs b/crates/syn2mas/src/checks.rs new file mode 100644 index 000000000..5f56ce982 --- /dev/null +++ b/crates/syn2mas/src/checks.rs @@ -0,0 +1,30 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # Checks +//! +//! This module provides safety checks to run against a Synapse database before running the Synapse-to-MAS migration. + +use sqlx::PgConnection; +use thiserror::Error; + +use crate::mas_writer; + +#[derive(Debug, Error)] +pub enum Error { + #[error("problem with MAS database: {0}")] + MasDatabase(#[source] mas_writer::checks::Error), + + #[error("query failed: {0}")] + Sqlx(#[from] sqlx::Error), +} + +#[tracing::instrument(skip_all)] +pub async fn synapse_pre_migration_checks( + synapse_connection: &mut PgConnection, +) -> Result<(), Error> { + // TODO check that the database looks like a Synapse database and is sane for migration + Ok(()) +} diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs new file mode 100644 index 000000000..ec5455f7d --- /dev/null +++ b/crates/syn2mas/src/lib.rs @@ -0,0 +1,16 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +mod mas_writer; +mod synapse_reader; + +mod checks; +mod migration; + +pub use self::checks::synapse_pre_migration_checks; +pub use self::mas_writer::locking::LockedMasDatabase; +pub use self::mas_writer::{checks::mas_pre_migration_checks, MasWriter}; +pub use self::migration::migrate; +pub use self::synapse_reader::SynapseReader; diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs new file mode 100644 index 000000000..85a6e74a9 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/checks.rs @@ -0,0 +1,74 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # MAS Database Checks +//! +//! This module provides safety checks to run against a MAS database before running the Synapse-to-MAS migration. 
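The Synapse-side check in `checks.rs` above is still a TODO stub. Purely as an illustrative sketch of the kind of check it could grow (the function name and the probed table are assumptions, not part of this patch):

```rust
use sqlx::PgConnection;

// Hypothetical sanity check: every Synapse database has a `users` table,
// so its absence suggests we were pointed at the wrong database.
async fn looks_like_synapse(conn: &mut PgConnection) -> Result<bool, sqlx::Error> {
    let row = sqlx::query(
        "SELECT 1 FROM pg_tables WHERE schemaname = current_schema AND tablename = 'users'",
    )
    .fetch_optional(conn)
    .await?;
    Ok(row.is_some())
}
```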
+ +use thiserror::Error; +use thiserror_ext::ContextInto; + +use super::{is_syn2mas_in_progress, locking::LockedMasDatabase, MAS_TABLES_AFFECTED_BY_MIGRATION}; + +#[derive(Debug, Error, ContextInto)] +pub enum Error { + #[error("the MAS database is not empty: rows found in at least `{table}`")] + MasDatabaseNotEmpty { table: &'static str }, + + #[error("query against {table} failed — is this actually a MAS database?")] + MaybeNotMas { + #[source] + source: sqlx::Error, + table: &'static str, + }, + + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + + #[error("unable to check if syn2mas is already in progress")] + UnableToCheckInProgress(#[source] super::Error), +} + +/// Check that a MAS database is ready for being migrated to. +/// +/// Concretely, this checks that the database is empty. +/// +/// If syn2mas is already in progress on this database, the checks are skipped. +/// +/// # Errors +/// +/// Errors are returned under the following circumstances: +/// +/// - If any database access error occurs. +/// - If any MAS tables involved in the migration are not empty. +/// - If we can't check whether syn2mas is already in progress on this database or not. +#[tracing::instrument(skip_all)] +pub async fn mas_pre_migration_checks<'a>( + mas_connection: &mut LockedMasDatabase<'a>, +) -> Result<(), Error> { + if is_syn2mas_in_progress(mas_connection.as_mut()) + .await + .map_err(Error::UnableToCheckInProgress)? + { + // syn2mas already in progress, so we already performed the checks + return Ok(()); + } + + // Check that the database looks like a MAS database and that it is also an empty database. + + for &table in MAS_TABLES_AFFECTED_BY_MIGRATION { + let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1")) + .fetch_optional(mas_connection.as_mut()) + .await + .into_maybe_not_mas(table)? + .is_some(); + + if row_present { + return Err(Error::MasDatabaseNotEmpty { table }); + } + } + + Ok(()) +} diff --git a/crates/syn2mas/src/mas_writer/constraint_pausing.rs b/crates/syn2mas/src/mas_writer/constraint_pausing.rs new file mode 100644 index 000000000..6a420888f --- /dev/null +++ b/crates/syn2mas/src/mas_writer/constraint_pausing.rs @@ -0,0 +1,151 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +use sqlx::PgConnection; +use tracing::debug; + +use super::{Error, IntoDatabase}; + +/// Description of a constraint, which allows recreating it later. +pub struct ConstraintDescription { + pub name: String, + pub table_name: String, + pub definition: String, +} + +pub struct IndexDescription { + pub name: String, + pub table_name: String, + pub definition: String, +} + +/// Look up and return the definitions of all constraints on a given table. +pub async fn describe_constraints_on_table( + conn: &mut PgConnection, + table_name: &str, +) -> Result<Vec<ConstraintDescription>, Error> { + sqlx::query_as!( + ConstraintDescription, + r#" + SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!" + FROM pg_constraint c + JOIN pg_namespace n ON n.oid = c.connamespace + WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1 + AND n.nspname = current_schema; + "#, + table_name + ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}")) +} + +/// Look up and return the definitions of foreign-key constraints whose +/// target table is the one specified.
+pub async fn describe_foreign_key_constraints_to_table( + conn: &mut PgConnection, + target_table_name: &str, +) -> Result, Error> { + sqlx::query_as!( + ConstraintDescription, + r#" + SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!" + FROM pg_constraint c + JOIN pg_namespace n ON n.oid = c.connamespace + WHERE contype = 'f' AND confrelid::regclass::text = $1 + AND n.nspname = current_schema; + "#, + target_table_name + ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}")) +} + +/// Look up and return the definitions of all indices on a given table. +pub async fn describe_indices_on_table( + conn: &mut PgConnection, + table_name: &str, +) -> Result, Error> { + sqlx::query_as!( + IndexDescription, + r#" + SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!" + FROM pg_indexes + WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL + "#, + table_name + ).fetch_all(&mut *conn).await.into_database("cannot search for indices") +} + +/// Drops a constraint from the database. +/// +/// The constraint must exist prior to this call. +pub async fn drop_constraint( + conn: &mut PgConnection, + constraint: &ConstraintDescription, +) -> Result<(), Error> { + let name = &constraint.name; + let table_name = &constraint.table_name; + debug!("dropping constraint {name} on table {table_name}"); + sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};")) + .execute(&mut *conn) + .await + .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?; + + Ok(()) +} + +/// Drops an index from the database. +/// +/// The index must exist prior to this call. +pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> { + let index_name = &index.name; + debug!("dropping index {index_name}"); + sqlx::query(&format!("DROP INDEX {index_name};")) + .execute(&mut *conn) + .await + .into_database_with(|| format!("failed to temporarily drop {index_name}"))?; + + Ok(()) +} + +/// Restores (recreates) a constraint. +/// +/// The constraint must not exist prior to this call. +pub async fn restore_constraint( + conn: &mut PgConnection, + constraint: &ConstraintDescription, +) -> Result<(), Error> { + let ConstraintDescription { + name, + table_name, + definition, + } = &constraint; + sqlx::query(&format!( + "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};" + )) + .execute(conn) + .await + .into_database_with(|| { + format!("failed to recreate constraint {name} on {table_name} with {definition}") + })?; + + Ok(()) +} + +/// Restores (recreates) a index. +/// +/// The index must not exist prior to this call. +pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> { + let IndexDescription { + name, + table_name, + definition, + } = &index; + + sqlx::query(&format!("{definition};")) + .execute(conn) + .await + .into_database_with(|| { + format!("failed to recreate index {name} on {table_name} with {definition}") + })?; + + Ok(()) +} diff --git a/crates/syn2mas/src/mas_writer/locking.rs b/crates/syn2mas/src/mas_writer/locking.rs new file mode 100644 index 000000000..147bad4a6 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/locking.rs @@ -0,0 +1,58 @@ +// Copyright 2024 New Vector Ltd. 
+// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +use std::sync::LazyLock; + +use sqlx::{ + postgres::{PgAdvisoryLock, PgAdvisoryLockGuard}, + Either, PgConnection, +}; + +static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> = + LazyLock::new(|| PgAdvisoryLock::new("syn2mas-maswriter")); + +/// A wrapper around a Postgres connection which holds a session-wide advisory lock +/// preventing concurrent access by other syn2mas instances. +pub struct LockedMasDatabase<'conn> { + inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>, +} + +impl<'conn> LockedMasDatabase<'conn> { + /// Attempts to lock the MAS database against concurrent access by other syn2mas instances. + /// + /// If the lock can be acquired, returns a `LockedMasDatabase`. + /// If the lock cannot be acquired, returns the connection back to the caller wrapped in `Either::Right`. + /// + /// # Errors + /// + /// Errors are returned for underlying database errors. + pub async fn try_new( + mas_connection: &'conn mut PgConnection, + ) -> Result<Either<LockedMasDatabase<'conn>, &'conn mut PgConnection>, sqlx::Error> { + SYN2MAS_ADVISORY_LOCK + .try_acquire(mas_connection) + .await + .map(|either| match either { + Either::Left(inner) => Either::Left(LockedMasDatabase { inner }), + Either::Right(unlocked) => Either::Right(unlocked), + }) + } + + /// Releases the advisory lock on the MAS database, returning the underlying + /// connection. + /// + /// # Errors + /// + /// Errors are returned for underlying database errors. + pub async fn unlock(self) -> Result<&'conn mut PgConnection, sqlx::Error> { + self.inner.release_now().await + } +} + +impl AsMut<PgConnection> for LockedMasDatabase<'_> { + fn as_mut(&mut self) -> &mut PgConnection { + self.inner.as_mut() + } +} diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs new file mode 100644 index 000000000..fe950592c --- /dev/null +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -0,0 +1,671 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # MAS Writer +//! +//! This module is responsible for writing new records to MAS' database.
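A minimal usage sketch of `LockedMasDatabase` from the locking module above (assuming an open `PgConnection`; the CLI command earlier does essentially this, with proper error reporting):

```rust
use sqlx::{Either, PgConnection};
use syn2mas::LockedMasDatabase;

async fn example(conn: &mut PgConnection) -> Result<(), sqlx::Error> {
    match LockedMasDatabase::try_new(conn).await? {
        Either::Left(locked) => {
            // We hold the session-wide advisory lock; run migration work here,
            // then release the lock and get the plain connection back.
            let _conn = locked.unlock().await?;
        }
        Either::Right(_conn) => {
            // Another syn2mas instance already holds the lock; back off.
        }
    }
    Ok(())
}
```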
+ +use std::fmt::Display; + +use chrono::{DateTime, Utc}; +use futures_util::{future::BoxFuture, TryStreamExt}; +use sqlx::{query, query_as, Executor, PgConnection}; +use thiserror::Error; +use thiserror_ext::{Construct, ContextInto}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tracing::{error, info, warn, Level}; +use uuid::Uuid; + +use self::{ + constraint_pausing::{ConstraintDescription, IndexDescription}, + locking::LockedMasDatabase, +}; + +pub mod checks; +pub mod locking; + +mod constraint_pausing; + +#[derive(Debug, Error, Construct, ContextInto)] +pub enum Error { + #[error("database error whilst {context}")] + Database { + #[source] + source: sqlx::Error, + context: String, + }, + + #[error("writer connection pool shut down due to error")] + #[allow(clippy::enum_variant_names)] + WriterConnectionPoolError, + + #[error("inconsistent database: {0}")] + Inconsistent(String), + + #[error("{0}")] + Multiple(MultipleErrors), +} + +#[derive(Debug)] +pub struct MultipleErrors { + errors: Vec<Error>, +} + +impl Display for MultipleErrors { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "multiple errors")?; + for error in &self.errors { + write!(f, "\n- {error}")?; + } + Ok(()) + } +} + +impl From<Vec<Error>> for MultipleErrors { + fn from(value: Vec<Error>) -> Self { + MultipleErrors { errors: value } + } +} + +struct WriterConnectionPool { + /// How many connections are in circulation + num_connections: usize, + + /// A receiver handle to get a writer connection + /// The writer connection will be mid-transaction! + connection_rx: Receiver<Result<PgConnection, Error>>, + + /// A sender handle to return a writer connection to the pool + /// The connection should still be mid-transaction! + connection_tx: Sender<Result<PgConnection, Error>>, +} + +impl WriterConnectionPool { + pub fn new(connections: Vec<PgConnection>) -> Self { + let num_connections = connections.len(); + let (connection_tx, connection_rx) = mpsc::channel(num_connections); + for connection in connections { + connection_tx + .try_send(Ok(connection)) + .expect("there should be room for this connection"); + } + + WriterConnectionPool { + num_connections, + connection_rx, + connection_tx, + } + } + + pub async fn spawn_with_connection<F>(&mut self, task: F) -> Result<(), Error> + where + F: for<'conn> FnOnce(&'conn mut PgConnection) -> BoxFuture<'conn, Result<(), Error>> + + Send + + Sync + + 'static, + { + match self.connection_rx.recv().await { + Some(Ok(mut connection)) => { + let connection_tx = self.connection_tx.clone(); + tokio::task::spawn(async move { + let to_return = match task(&mut connection).await { + Ok(()) => Ok(connection), + Err(error) => { + error!("error in writer: {error}"); + Err(error) + } + }; + // This should always succeed in sending unless we're already shutting + // down for some other reason. + let _: Result<_, _> = connection_tx.send(to_return).await; + }); + + Ok(()) + } + Some(Err(error)) => { + // This should always succeed in sending unless we're already shutting + // down for some other reason. + let _: Result<_, _> = self.connection_tx.send(Err(error)).await; + + Err(Error::WriterConnectionPoolError) + } + None => { + unreachable!("we still hold a reference to the sender, so this shouldn't happen") + } + } + } + + /// Finishes writing to the database, committing all changes. + /// + /// # Errors + /// + /// - If any errors were returned to the pool. + /// - If committing the changes failed. + /// + /// # Panics + /// + /// - If connections were not returned to the pool. (This indicates a serious bug.)
+ pub async fn finish(self) -> Result<(), Vec<Error>> { + let mut errors = Vec::new(); + + let Self { + num_connections, + mut connection_rx, + connection_tx, + } = self; + // Drop the sender handle so we gracefully allow the receiver to close + drop(connection_tx); + + let mut finished_connections = 0; + + while let Some(connection_or_error) = connection_rx.recv().await { + finished_connections += 1; + + match connection_or_error { + Ok(mut connection) => { + if let Err(err) = query("COMMIT;").execute(&mut connection).await { + errors.push(err.into_database("commit writer transaction")); + } + } + Err(error) => { + errors.push(error); + } + } + } + assert_eq!(finished_connections, num_connections, "syn2mas had a bug: connections went missing {finished_connections} != {num_connections}"); + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +pub struct MasWriter<'c> { + conn: LockedMasDatabase<'c>, + writer_pool: WriterConnectionPool, + + indices_to_restore: Vec<IndexDescription>, + constraints_to_restore: Vec<ConstraintDescription>, +} + +pub struct MasNewUser { + pub user_id: Uuid, + pub username: String, + pub created_at: DateTime<Utc>, + pub locked_at: Option<DateTime<Utc>>, + pub can_request_admin: bool, +} + +pub struct MasNewUserPassword { + pub user_password_id: Uuid, + pub user_id: Uuid, + pub hashed_password: String, + pub created_at: DateTime<Utc>, +} + +/// List of all MAS tables that are written to by syn2mas. +pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords"]; + +/// Detect whether a syn2mas migration has started on the given database. +/// +/// Concretely, this checks for the presence of syn2mas restoration tables. +/// +/// Returns `true` if syn2mas has started, or `false` if it hasn't. +/// +/// # Errors +/// +/// Errors are returned under the following circumstances: +/// +/// - If any database error occurs whilst querying the database. +/// - If some, but not all, syn2mas restoration tables are present. +/// (This shouldn't be possible without syn2mas having been sabotaged!) +pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Error> { + // Names of tables used for syn2mas resumption + // Must be `String`s, not just `&str`, for the query. + let restore_table_names = vec![ + "syn2mas_restore_constraints".to_owned(), + "syn2mas_restore_indices".to_owned(), + ]; + + let num_resumption_tables = query!( + r#" + SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema + AND tablename = ANY($1) + "#, + &restore_table_names, + ) + .fetch_all(conn.as_mut()) + .await + .into_database("failed to query count of resumption tables")? + .len(); + + if num_resumption_tables == 0 { + Ok(false) + } else if num_resumption_tables == restore_table_names.len() { + Ok(true) + } else { + Err(Error::inconsistent( + "some, but not all, syn2mas resumption tables were found", + )) + } +} + +impl<'conn> MasWriter<'conn> { + /// Creates a new MAS writer. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database connection experiences an error. + #[allow(clippy::missing_panics_doc)] // not real + #[tracing::instrument(skip_all)] + pub async fn new( + mut conn: LockedMasDatabase<'conn>, + mut writer_connections: Vec<PgConnection>, + ) -> Result<Self, Error> { + // Given that we don't have any concurrent transactions here, + // the READ COMMITTED isolation level is sufficient.
+ query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") + .execute(conn.as_mut()) + .await + .into_database("begin MAS transaction")?; + + let syn2mas_started = is_syn2mas_in_progress(conn.as_mut()).await?; + + let indices_to_restore; + let constraints_to_restore; + + if syn2mas_started { + // We are resuming from a partially-done syn2mas migration + // We should reset the database so that we're starting from scratch. + warn!("Partial syn2mas migration has already been done; resetting."); + for table in MAS_TABLES_AFFECTED_BY_MIGRATION { + query(&format!("TRUNCATE syn2mas__{table};")) + .execute(conn.as_mut()) + .await + .into_database_with(|| format!("failed to truncate table syn2mas__{table}"))?; + } + + indices_to_restore = query_as!( + IndexDescription, + "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key" + ) + .fetch_all(conn.as_mut()) + .await + .into_database("failed to get syn2mas restore data (index descriptions)")?; + constraints_to_restore = query_as!( + ConstraintDescription, + "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key" + ) + .fetch_all(conn.as_mut()) + .await + .into_database("failed to get syn2mas restore data (constraint descriptions)")?; + } else { + info!("Starting new syn2mas migration"); + + conn.as_mut() + .execute_many(include_str!("syn2mas_temporary_tables.sql")) + // We don't care about any query results + .try_collect::>() + .await + .into_database("could not create temporary tables")?; + + // Pause (temporarily drop) indices and constraints in order to improve + // performance of bulk data loading. + (indices_to_restore, constraints_to_restore) = + Self::pause_indices(conn.as_mut()).await?; + + // Persist these index and constraint definitions. + for IndexDescription { + name, + table_name, + definition, + } in &indices_to_restore + { + query!( + r#" + INSERT INTO syn2mas_restore_indices (name, table_name, definition) + VALUES ($1, $2, $3) + "#, + name, + table_name, + definition + ) + .execute(conn.as_mut()) + .await + .into_database("failed to save restore data (index)")?; + } + for ConstraintDescription { + name, + table_name, + definition, + } in &constraints_to_restore + { + query!( + r#" + INSERT INTO syn2mas_restore_constraints (name, table_name, definition) + VALUES ($1, $2, $3) + "#, + name, + table_name, + definition + ) + .execute(conn.as_mut()) + .await + .into_database("failed to save restore data (index)")?; + } + } + + query("COMMIT;") + .execute(conn.as_mut()) + .await + .into_database("begin MAS transaction")?; + + // Now after all the schema changes have been done, begin writer transactions + for writer_connection in &mut writer_connections { + query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") + .execute(&mut *writer_connection) + .await + .into_database("begin MAS writer transaction")?; + } + + Ok(Self { + conn, + writer_pool: WriterConnectionPool::new(writer_connections), + indices_to_restore, + constraints_to_restore, + }) + } + + #[tracing::instrument(skip_all)] + async fn pause_indices( + conn: &mut PgConnection, + ) -> Result<(Vec, Vec), Error> { + let mut indices_to_restore = Vec::new(); + let mut constraints_to_restore = Vec::new(); + + for &unprefixed_table in MAS_TABLES_AFFECTED_BY_MIGRATION { + let table = format!("syn2mas__{unprefixed_table}"); + // First drop incoming foreign key constraints + for constraint in + constraint_pausing::describe_foreign_key_constraints_to_table(&mut *conn, &table) + .await? 
+ { + constraint_pausing::drop_constraint(&mut *conn, &constraint).await?; + constraints_to_restore.push(constraint); + } + // After all incoming foreign key constraints have been removed, + // we can now drop internal constraints. + for constraint in + constraint_pausing::describe_constraints_on_table(&mut *conn, &table).await? + { + constraint_pausing::drop_constraint(&mut *conn, &constraint).await?; + constraints_to_restore.push(constraint); + } + // After all constraints have been removed, we can drop indices. + for index in constraint_pausing::describe_indices_on_table(&mut *conn, &table).await? { + constraint_pausing::drop_index(&mut *conn, &index).await?; + indices_to_restore.push(index); + } + } + + Ok((indices_to_restore, constraints_to_restore)) + } + + async fn restore_indices<'a>( + conn: &mut LockedMasDatabase<'a>, + indices_to_restore: &[IndexDescription], + constraints_to_restore: &[ConstraintDescription], + ) -> Result<(), Error> { + // First restore all indices. The order is not important as far as I know. + // However the indices are needed before constraints. + for index in indices_to_restore.iter().rev() { + constraint_pausing::restore_index(conn.as_mut(), index).await?; + } + // Then restore all constraints. + // The order here is the reverse of drop order, since some constraints may rely + // on other constraints to work. + for constraint in constraints_to_restore.iter().rev() { + constraint_pausing::restore_constraint(conn.as_mut(), constraint).await?; + } + Ok(()) + } + + /// Finish writing to the MAS database, flushing and committing all changes. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database connection experiences an error. + #[tracing::instrument(skip_all)] + pub async fn finish(mut self) -> Result<(), Error> { + // Commit all writer transactions to the database. + self.writer_pool + .finish() + .await + .map_err(|errors| Error::Multiple(MultipleErrors::from(errors)))?; + + // Now all the data has been migrated, finish off by restoring indices and constraints! + + query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") + .execute(self.conn.as_mut()) + .await + .into_database("begin MAS transaction")?; + + Self::restore_indices( + &mut self.conn, + &self.indices_to_restore, + &self.constraints_to_restore, + ) + .await?; + + self.conn + .as_mut() + .execute_many(include_str!("syn2mas_revert_temporary_tables.sql")) + // We don't care about any query results + .try_collect::<Vec<_>>() + .await + .into_database("could not revert temporary tables")?; + + query("COMMIT;") + .execute(self.conn.as_mut()) + .await + .into_database("ending MAS transaction")?; + + self.conn + .unlock() + .await + .into_database("could not unlock MAS database")?; + + Ok(()) + } + + /// Write a batch of users to the database. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database writer connection pool had an error. + #[allow(clippy::missing_panics_doc)] // not a real panic + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> { + self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { + // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement + // without having to change the statement SQL thus altering the query plan. + // See .
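+ // As an illustration (not from this patch): with two equal-length arrays bound as $1 and $2,
+ //   INSERT INTO t (a, b) SELECT * FROM UNNEST($1::UUID[], $2::TEXT[])
+ // inserts one row per pair of array elements, so an entire batch costs a single
+ // round-trip while the statement text (and hence the prepared plan) stays fixed.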
+ // In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement, + // which is allegedly the best for insert performance, but is less simple to encode. + if users.is_empty() { + return Ok(()); + } + + let mut user_ids: Vec<Uuid> = Vec::with_capacity(users.len()); + let mut usernames: Vec<String> = Vec::with_capacity(users.len()); + let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(users.len()); + let mut locked_ats: Vec<Option<DateTime<Utc>>> = Vec::with_capacity(users.len()); + let mut can_request_admins: Vec<bool> = Vec::with_capacity(users.len()); + for MasNewUser { + user_id, + username, + created_at, + locked_at, + can_request_admin, + } in users + { + user_ids.push(user_id); + usernames.push(username); + created_ats.push(created_at); + locked_ats.push(locked_at); + can_request_admins.push(can_request_admin); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__users + (user_id, username, created_at, locked_at, can_request_admin) + SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[]) + "#, + &user_ids[..], + &usernames[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &locked_ats[..] as &[Option<DateTime<Utc>>], + &can_request_admins[..], + ).execute(&mut *conn).await.into_database("writing users to MAS")?; + + Ok(()) + })).await + } + + /// Write a batch of user passwords to the database. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database writer connection pool had an error. + #[allow(clippy::missing_panics_doc)] // not a real panic + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_passwords( + &mut self, + passwords: Vec<MasNewUserPassword>, + ) -> Result<(), Error> { + self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { + if passwords.is_empty() { + return Ok(()); + } + + let mut user_password_ids: Vec<Uuid> = Vec::with_capacity(passwords.len()); + let mut user_ids: Vec<Uuid> = Vec::with_capacity(passwords.len()); + let mut hashed_passwords: Vec<String> = Vec::with_capacity(passwords.len()); + let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(passwords.len()); + let mut versions: Vec<i32> = Vec::with_capacity(passwords.len()); + for MasNewUserPassword { + user_password_id, + user_id, + hashed_password, + created_at, + } in passwords + { + user_password_ids.push(user_password_id); + user_ids.push(user_id); + hashed_passwords.push(hashed_password); + created_ats.push(created_at); + // TODO hardcoding version to `1` may not be correct long-term? + versions.push(1); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__user_passwords + (user_password_id, user_id, hashed_password, created_at, version) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[]) + "#, + &user_password_ids[..], + &user_ids[..], + &hashed_passwords[..], + &created_ats[..], + &versions[..], + ).execute(&mut *conn).await.into_database("writing passwords to MAS")?; + + Ok(()) + })).await + } +} + +// How many entries to buffer at once, before writing a batch of rows to the database. +// TODO tune: didn't see that much difference between 4k and 64k +// (4k: 13.5~14, 64k: 12.5~13s — streaming the whole way would be better, especially for DB latency, but probably fiiine +// and also we won't be able to stream to two tables at once...)
+const WRITE_BUFFER_BATCH_SIZE: usize = 4096; + +pub struct MasUserWriteBuffer<'writer, 'conn> { + users: Vec<MasNewUser>, + passwords: Vec<MasNewUserPassword>, + writer: &'writer mut MasWriter<'conn>, +} + +impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> { + pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self { + MasUserWriteBuffer { + users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), + passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), + writer, + } + } + + pub async fn finish(mut self) -> Result<(), Error> { + self.flush_users().await?; + self.flush_passwords().await?; + Ok(()) + } + + pub async fn flush_users(&mut self) -> Result<(), Error> { + // via copy: 13s + // not via copy: 14s + // difference probably gets worse with latency + self.writer + .write_users(std::mem::take(&mut self.users)) + .await?; + + self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE); + Ok(()) + } + + pub async fn flush_passwords(&mut self) -> Result<(), Error> { + self.writer + .write_passwords(std::mem::take(&mut self.passwords)) + .await?; + self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE); + + Ok(()) + } + + pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> { + self.users.push(user); + if self.users.len() >= WRITE_BUFFER_BATCH_SIZE { + self.flush_users().await?; + } + Ok(()) + } + + pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> { + self.passwords.push(password); + if self.passwords.len() >= WRITE_BUFFER_BATCH_SIZE { + self.flush_passwords().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + // TODO test me +} diff --git a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql new file mode 100644 index 000000000..d82be82c8 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql @@ -0,0 +1,12 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- This script should revert what `syn2mas_temporary_tables.sql` does. + +DROP TABLE syn2mas_restore_constraints; +DROP TABLE syn2mas_restore_indices; + +ALTER TABLE syn2mas__users RENAME TO users; +ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords; diff --git a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql new file mode 100644 index 000000000..3e5b86e40 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql @@ -0,0 +1,41 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + + +-- # syn2mas Temporary Tables +-- This file takes a MAS database and: +-- +-- 1. creates temporary tables used by syn2mas for storing restore data +-- 2. renames important tables with the `syn2mas__` prefix, to prevent +-- running MAS instances from having any opportunity to see or modify +-- the partial data in the database, especially whilst it is not protected +-- by constraints. +-- +-- All changes in this file must be reverted by `syn2mas_revert_temporary_tables.sql` +-- in the same directory.
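+-- (A note on the rename trick, added for clarity: ALTER TABLE ... RENAME only
+-- updates catalog entries, so it is cheap and transactional, and any MAS
+-- instance querying the usual table names will fail fast rather than observe
+-- half-migrated, unconstrained data.)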
+ +-- corresponds to `ConstraintDescription` +CREATE TABLE syn2mas_restore_constraints ( + -- synthetic auto-incrementing ID so we can load these in order + order_key SERIAL NOT NULL PRIMARY KEY, + + table_name TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL +); + +-- corresponds to `IndexDescription` +CREATE TABLE syn2mas_restore_indices ( + -- synthetic auto-incrementing ID so we can load these in order + order_key SERIAL NOT NULL PRIMARY KEY, + + table_name TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL +); + +-- Now we rename all tables that we touch during the migration. +ALTER TABLE users RENAME TO syn2mas__users; +ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords; diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs new file mode 100644 index 000000000..59afa73d9 --- /dev/null +++ b/crates/syn2mas/src/migration.rs @@ -0,0 +1,165 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # Migration +//! +//! This module provides the high-level logic for performing the Synapse-to-MAS database migration. +//! +//! This module does not implement any of the safety checks that should be run *before* the migration. + +use std::{collections::HashMap, pin::pin}; + +use chrono::{DateTime, Utc}; +use compact_str::CompactString; +use futures_util::StreamExt as _; +use rand::RngCore; +use thiserror::Error; +use thiserror_ext::ContextInto; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{ + mas_writer::{self, MasNewUser, MasNewUserPassword, MasUserWriteBuffer, MasWriter}, + synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseUser}, + SynapseReader, +}; + +#[derive(Debug, Error, ContextInto)] +pub enum Error { + #[error("error when reading Synapse DB ({context}): {source}")] + Synapse { + source: synapse_reader::Error, + context: String, + }, + #[error("error when writing to MAS DB ({context}): {source}")] + Mas { + source: mas_writer::Error, + context: String, + }, + #[error("failed to extract localpart of {user:?}: {source}")] + ExtractLocalpart { + source: ExtractLocalpartError, + user: FullUserId, + }, +} + +struct UsersMigrated { + /// Lookup table from user localpart to that user's UUID in MAS. + user_localparts_to_uuid: HashMap<CompactString, Uuid>, +} + +/// Performs a migration from Synapse's database to MAS' database. +/// +/// # Panics +/// +/// - If there are more than `usize::MAX` users +/// +/// # Errors +/// +/// Errors are returned under the following circumstances: +/// +/// - An underlying database access error, either to MAS or to Synapse. +/// - Invalid data in the Synapse database. +pub async fn migrate( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + rng: &mut impl RngCore, +) -> Result<(), Error> { + let counts = synapse.count_rows().await.into_synapse("counting users")?; + + migrate_users( + synapse, + mas, + counts + .users + .try_into() + .expect("More than usize::MAX users — wow!"), + server_name, + rng, + ) + .await?; + + Ok(()) +} + +async fn migrate_users( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + user_count_hint: usize, + server_name: &str, + rng: &mut impl RngCore, +) -> Result<UsersMigrated, Error> { + let mut write_buffer = MasUserWriteBuffer::new(mas); + let mut users_stream = pin!(synapse.read_users()); + // TODO is 1:1 capacity enough for a hashmap?
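+ // (Note: `HashMap::with_capacity(n)` already guarantees room for at least `n`
+ // elements before reallocating, so a 1:1 hint avoids rehashing mid-migration.)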
+ let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint); + + while let Some(user_res) = users_stream.next().await { + let user = user_res.into_synapse("reading user")?; + let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?; + + user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id); + + write_buffer + .write_user(mas_user) + .await + .into_mas("writing user")?; + + if let Some(mas_password) = mas_password_opt { + write_buffer + .write_password(mas_password) + .await + .into_mas("writing password")?; + } + } + + write_buffer + .finish() + .await + .into_mas("writing users & passwords")?; + + Ok(UsersMigrated { + user_localparts_to_uuid, + }) +} + +fn transform_user( + user: &SynapseUser, + server_name: &str, + rng: &mut impl RngCore, +) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> { + let username = user + .name + .extract_localpart(server_name) + .into_extract_localpart(user.name.clone())? + .to_owned(); + + let new_user = MasNewUser { + user_id: Uuid::from(Ulid::from_datetime_with_source( + DateTime::<Utc>::from(user.creation_ts).into(), + rng, + )), + username, + created_at: user.creation_ts.into(), + locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()), + can_request_admin: bool::from(user.admin), + }; + + let mas_password = user + .password_hash + .clone() + .map(|password_hash| MasNewUserPassword { + user_password_id: Uuid::from(Ulid::from_datetime_with_source( + DateTime::<Utc>::from(user.creation_ts).into(), + rng, + )), + user_id: new_user.user_id, + hashed_password: password_hash, + created_at: new_user.created_at, + }); + + Ok((new_user, mas_password)) +} diff --git a/crates/syn2mas/src/synapse_reader.rs b/crates/syn2mas/src/synapse_reader.rs new file mode 100644 index 000000000..d619307ac --- /dev/null +++ b/crates/syn2mas/src/synapse_reader.rs @@ -0,0 +1,265 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # Synapse Database Reader +//! +//! This module provides facilities for streaming relevant types of database records from a Synapse database. + +use chrono::{DateTime, Utc}; +use futures_util::{Stream, TryStreamExt}; +use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Row, Transaction, Type}; +use thiserror::Error; +use thiserror_ext::ContextInto; + +#[derive(Debug, Error, ContextInto)] +pub enum Error { + #[error("database error whilst {context}")] + Database { + #[source] + source: sqlx::Error, + context: String, + }, +} + +#[derive(Clone, Debug, sqlx::Decode)] +pub struct FullUserId(pub String); + +impl Type<Postgres> for FullUserId { + fn type_info() -> <Postgres as sqlx::Database>::TypeInfo { + <String as Type<Postgres>>::type_info() + } +} + +#[derive(Debug, Error)] +pub enum ExtractLocalpartError { + #[error("user ID does not start with `@` sigil")] + NoAtSigil, + #[error("user ID does not have a `:` separator")] + NoSeparator, + #[error("wrong server name: expected {expected:?}, got {found:?}")] + WrongServerName { expected: String, found: String }, +} + +impl FullUserId { + /// Extract the localpart from the User ID, asserting that the User ID has the correct + /// server name. + /// + /// # Errors + /// + /// A handful of basic validity checks are performed and an error may be returned + /// if the User ID is not valid. + /// However, the User ID grammar is not checked fully. + /// + /// If the wrong server name is asserted, returns an error.
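+ /// + /// Illustrative example (not part of this patch): + /// ```ignore + /// let user = FullUserId("@alice:example.org".to_owned()); + /// assert_eq!(user.extract_localpart("example.org").unwrap(), "alice"); + /// ```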
+    pub fn extract_localpart(
+        &self,
+        expected_server_name: &str,
+    ) -> Result<&str, ExtractLocalpartError> {
+        let Some(without_sigil) = self.0.strip_prefix('@') else {
+            return Err(ExtractLocalpartError::NoAtSigil);
+        };
+
+        let Some((localpart, server_name)) = without_sigil.split_once(':') else {
+            return Err(ExtractLocalpartError::NoSeparator);
+        };
+
+        if server_name != expected_server_name {
+            return Err(ExtractLocalpartError::WrongServerName {
+                expected: expected_server_name.to_owned(),
+                found: server_name.to_owned(),
+            });
+        }
+
+        Ok(localpart)
+    }
+}
+
+/// A Synapse boolean.
+/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite versions
+/// that did not have native boolean support.
+#[derive(Copy, Clone, Debug)]
+pub struct SynapseBool(bool);
+
+impl<'r> sqlx::Decode<'r, Postgres> for SynapseBool {
+    fn decode(
+        value: <Postgres as sqlx::Database>::ValueRef<'r>,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        <i16 as sqlx::Decode<'r, Postgres>>::decode(value)
+            .map(|boolean_int| SynapseBool(boolean_int != 0))
+    }
+}
+
+impl sqlx::Type<Postgres> for SynapseBool {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <i16 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+impl From<SynapseBool> for bool {
+    fn from(SynapseBool(value): SynapseBool) -> Self {
+        value
+    }
+}
+
+/// A timestamp stored as the number of seconds since the Unix epoch.
+/// Note that Synapse stores most timestamps as numbers of **milliseconds** since the Unix epoch,
+/// but some timestamps are still stored in seconds.
+#[derive(Copy, Clone, Debug)]
+pub struct SecondsTimestamp(DateTime<Utc>);
+
+impl From<SecondsTimestamp> for DateTime<Utc> {
+    fn from(SecondsTimestamp(value): SecondsTimestamp) -> Self {
+        value
+    }
+}
+
+impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp {
+    fn decode(
+        value: <Postgres as sqlx::Database>::ValueRef<'r>,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        <i64 as sqlx::Decode<'r, Postgres>>::decode(value).map(|seconds_since_epoch| {
+            SecondsTimestamp(DateTime::from_timestamp_nanos(
+                seconds_since_epoch * 1_000_000_000,
+            ))
+        })
+    }
+}
+
+impl sqlx::Type<Postgres> for SecondsTimestamp {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <i64 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+#[derive(Clone, Debug, FromRow)]
+pub struct SynapseUser {
+    /// Full User ID of the user
+    pub name: FullUserId,
+    /// Password hash string for the user. Optional (null if no password is set).
+    pub password_hash: Option<String>,
+    /// Whether the user is a Synapse Admin
+    pub admin: SynapseBool,
+    /// Whether the user is deactivated
+    pub deactivated: SynapseBool,
+    /// When the user was created
+    pub creation_ts: SecondsTimestamp,
+    // TODO ...
+    // TODO is_guest
+    // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts)
+}
+
+/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
+///
+/// This is a safety measure against other processes changing the data underneath our feet.
+/// It's still not a good idea to run Synapse at the same time as the migration.
+// TODO not complete!
+const TABLES_TO_LOCK: &[&str] = &["users"];
+
+/// Number of migratable rows in various Synapse tables.
+/// Used to estimate progress.
+#[derive(Clone, Debug)]
+pub struct SynapseRowCounts {
+    pub users: i64,
+}
+
+pub struct SynapseReader<'c> {
+    txn: Transaction<'c, Postgres>,
+}
+
+impl<'conn> SynapseReader<'conn> {
+    /// Creates a new Synapse reader, which entails creating a transaction and locking Synapse tables.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned under the following circumstances:
+    ///
+    /// - An underlying database error
+    /// - If we can't lock the Synapse tables (which may suggest that Synapse is still running)
+    pub async fn new(
+        synapse_connection: &'conn mut PgConnection,
+        dry_run: bool,
+    ) -> Result<Self, Error> {
+        let mut txn = synapse_connection
+            .begin()
+            .await
+            .into_database("begin transaction")?;
+
+        query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;")
+            .execute(&mut *txn)
+            .await
+            .into_database("set transaction")?;
+
+        let lock_type = if dry_run {
+            // We expect dry runs to be done alongside Synapse running, so we don't want to
+            // interfere with Synapse's database access in that case.
+            "ACCESS SHARE"
+        } else {
+            "EXCLUSIVE"
+        };
+        for table in TABLES_TO_LOCK {
+            query(&format!("LOCK TABLE {table} IN {lock_type} MODE NOWAIT;"))
+                .execute(&mut *txn)
+                .await
+                .into_database_with(|| format!("locking Synapse table `{table}`"))?;
+        }
+
+        Ok(Self { txn })
+    }
+
+    /// Finishes the Synapse reader, committing the transaction.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned under the following circumstances:
+    ///
+    /// - An underlying database error whilst committing the transaction.
+    pub async fn finish(self) -> Result<(), Error> {
+        // TODO enforce that this is called somehow.
+        self.txn.commit().await.into_database("end transaction")?;
+        Ok(())
+    }
+
+    /// Counts the rows in the Synapse database to get an estimate of how large the migration is going to be.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned under the following circumstances:
+    ///
+    /// - An underlying database error
+    pub async fn count_rows(&mut self) -> Result<SynapseRowCounts, Error> {
+        let users = sqlx::query(
+            "
+            SELECT COUNT(1) FROM users
+            WHERE appservice_id IS NULL AND is_guest = 0
+            ",
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("counting Synapse users")?
+        .try_get::<i64, _>(0)
+        .into_database("couldn't decode count of Synapse users table")?;
+
+        Ok(SynapseRowCounts { users })
+    }
+
+    /// Reads Synapse users, excluding application service users (which do not need to be migrated), from the database.
+    pub fn read_users(&mut self) -> impl Stream<Item = Result<SynapseUser, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+                name, password_hash, admin, deactivated, creation_ts
+            FROM users
+            WHERE appservice_id IS NULL AND is_guest = 0
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse users"))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    // TODO test me
+}
diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml
index de62f4998..c60e250b0 100644
--- a/crates/tasks/Cargo.toml
+++ b/crates/tasks/Cargo.toml
@@ -13,7 +13,7 @@ workspace = true
 
 [dependencies]
 anyhow.workspace = true
-async-stream = "0.3.6"
+async-stream.workspace = true
 async-trait.workspace = true
 cron.workspace = true
 chrono.workspace = true
diff --git a/misc/sqlx_update.sh b/misc/sqlx_update.sh
new file mode 100755
index 000000000..6dee03da4
--- /dev/null
+++ b/misc/sqlx_update.sh
@@ -0,0 +1,35 @@
+#!/bin/sh
+set -eu
+
+if [ "${DATABASE_URL+defined}" != defined ]; then
+    echo "You need to set DATABASE_URL"
+    exit 1
+fi
+
+if [ "$DATABASE_URL" = "postgres:" ]; then
+    # Hacky, but psql doesn't accept `postgres:` on its own like sqlx does
+    export DATABASE_URL="postgres:///"
+fi
+
+crates_dir="$(dirname "$(realpath "$0")")/../crates"
+
+CRATES_WITH_SQLX="storage-pg syn2mas"
+
+for crate in $CRATES_WITH_SQLX; do
+    echo "=== Updating sqlx query info for $crate ==="
+
+    if [ "$crate" = syn2mas ]; then
+        # We need to apply the syn2mas_temporary_tables.sql one-off 'migration'
+        # for checking the syn2mas queries
+
+        # not evident from the help text, but psql accepts connection URLs as the dbname
+        psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql"
+    fi
+
+    (cd "$crates_dir/$crate" && cargo sqlx prepare) || echo "(failed to prepare for $crate)"
+
+    if [ "$crate" = syn2mas ]; then
+        # Revert syn2mas temporary tables
+        psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql"
+    fi
+done
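The `mod test` stub in `synapse_reader.rs` is left as a `TODO`. As a rough sketch of what it could grow into, here is a minimal set of unit tests for `FullUserId::extract_localpart`, covering the happy path and the three error variants; the test names are illustrative and not part of this diff:

```rust
#[cfg(test)]
mod test {
    use super::{ExtractLocalpartError, FullUserId};

    #[test]
    fn extracts_localpart_for_matching_server_name() {
        let user = FullUserId("@alice:example.com".to_owned());
        // The sigil and server name are stripped, leaving only the localpart.
        assert_eq!(user.extract_localpart("example.com").unwrap(), "alice");
    }

    #[test]
    fn rejects_user_id_without_sigil() {
        let user = FullUserId("alice:example.com".to_owned());
        assert!(matches!(
            user.extract_localpart("example.com"),
            Err(ExtractLocalpartError::NoAtSigil)
        ));
    }

    #[test]
    fn rejects_user_id_without_separator() {
        let user = FullUserId("@alice".to_owned());
        assert!(matches!(
            user.extract_localpart("example.com"),
            Err(ExtractLocalpartError::NoSeparator)
        ));
    }

    #[test]
    fn rejects_wrong_server_name() {
        let user = FullUserId("@alice:other.example.com".to_owned());
        assert!(matches!(
            user.extract_localpart("example.com"),
            Err(ExtractLocalpartError::WrongServerName { .. })
        ));
    }
}
```

These tests exercise pure string parsing with no database dependency, so they can run under a plain `cargo test` without the Postgres setup that the sqlx queries (and `misc/sqlx_update.sh`) require.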