From 6ca91db6cd057cf2c226eae66516991a5524c2ec Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 8 Oct 2024 16:02:04 +0100 Subject: [PATCH 01/10] Foundation of new syn2mas tool --- Cargo.lock | 85 +++ Cargo.toml | 14 + clippy.toml | 2 +- crates/cli/Cargo.toml | 2 + crates/cli/src/commands/mod.rs | 6 + crates/cli/src/commands/syn2mas.rs | 83 +++ ...4c8d05c577cf8f049d8822746c7d1dbd23752.json | 16 + ...d9f96a754eba64912566e81a90bd4cbd186f0.json | 34 + ...dbc0cf9e93235ab5679d40b9caf37e022219c.json | 34 + ...664b16ebd813dfa0aa32a6d39dd5c393af299.json | 34 + ...747fcb5e79d7e8c1212b2a679c3bde908ce93.json | 16 + ...2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json | 32 + ...397e2449786972fa514905773de2dd501bd20.json | 20 + ...7cee05ac1a628e51fe61ba6dfed253e0c63c2.json | 32 + ...7cafe3f85d639452fd0593b2773997dfc7425.json | 18 + ...d57488c930fe431311e53e5e1af6fb1d4e56f.json | 18 + crates/syn2mas/Cargo.toml | 45 ++ crates/syn2mas/src/checks.rs | 24 + crates/syn2mas/src/lib.rs | 11 + crates/syn2mas/src/mas_writer.rs | 656 ++++++++++++++++++ crates/syn2mas/src/mas_writer/checks.rs | 67 ++ .../src/mas_writer/constraint_pausing.rs | 148 ++++ crates/syn2mas/src/mas_writer/locking.rs | 53 ++ .../syn2mas_revert_temporary_tables.sql | 12 + .../mas_writer/syn2mas_temporary_tables.sql | 41 ++ crates/syn2mas/src/migration.rs | 159 +++++ crates/syn2mas/src/synapse_reader.rs | 281 ++++++++ crates/tasks/Cargo.toml | 2 +- misc/sqlx_update.sh | 35 + 29 files changed, 1978 insertions(+), 2 deletions(-) create mode 100644 crates/cli/src/commands/syn2mas.rs create mode 100644 crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json create mode 100644 crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json create mode 100644 crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json create mode 100644 
crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json create mode 100644 crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json create mode 100644 crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json create mode 100644 crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json create mode 100644 crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json create mode 100644 crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json create mode 100644 crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json create mode 100644 crates/syn2mas/Cargo.toml create mode 100644 crates/syn2mas/src/checks.rs create mode 100644 crates/syn2mas/src/lib.rs create mode 100644 crates/syn2mas/src/mas_writer.rs create mode 100644 crates/syn2mas/src/mas_writer/checks.rs create mode 100644 crates/syn2mas/src/mas_writer/constraint_pausing.rs create mode 100644 crates/syn2mas/src/mas_writer/locking.rs create mode 100644 crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql create mode 100644 crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql create mode 100644 crates/syn2mas/src/migration.rs create mode 100644 crates/syn2mas/src/synapse_reader.rs create mode 100755 misc/sqlx_update.sh diff --git a/Cargo.lock b/Cargo.lock index dfca3105d..96b82af2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -864,6 +864,15 @@ dependencies = [ "serde", ] +[[package]] +name = "castaway" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5" +dependencies = [ + "rustversion", +] + [[package]] name = "cbc" version = "0.1.2" @@ -1078,6 +1087,20 @@ dependencies = [ "memchr", ] +[[package]] +name = 
"compact_str" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6050c3a16ddab2e412160b31f2c871015704239bca62f72f6e5f0be631d3f644" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -3204,8 +3227,10 @@ dependencies = [ "serde_json", "serde_yaml", "sqlx", + "syn2mas", "tokio", "tokio-util", + "tokio-stream", "tower", "tower-http", "tracing", @@ -6044,6 +6069,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "static_assertions_next" version = "1.1.2" @@ -6106,6 +6137,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn2mas" +version = "0.12.0" +dependencies = [ + "anyhow", + "async-stream", + "async-trait", + "chrono", + "compact_str", + "futures-util", + "mas-data-model", + "mas-iana", + "mas-jose", + "mas-storage", + "oauth2-types", + "opentelemetry-semantic-conventions", + "rand", + "rand_chacha", + "sea-query", + "sea-query-binder", + "serde", + "serde_json", + "sqlx", + "thiserror", + "thiserror-ext", + "tokio", + "tracing", + "ulid", + "url", + "uuid", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -6178,6 +6241,28 @@ dependencies = [ "thiserror-impl 2.0.3", ] +[[package]] +name = "thiserror-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa35fd08b65a716e1a91479b00d03ed2ef4c92371a4900ceb6ec2b332f9d71df" +dependencies = [ + "thiserror", + "thiserror-ext-derive", +] + +[[package]] +name = "thiserror-ext-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85ec5bcb8889378397e46bcd9f8ac636e9045f42851561e05a700667151abd18" +dependencies = [ + "either", + 
"proc-macro2", + "quote", + "syn 2.0.68", +] + [[package]] name = "thiserror-impl" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 246e7850b..2665f27bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ mas-tasks = { path = "./crates/tasks/", version = "=0.12.0" } mas-templates = { path = "./crates/templates/", version = "=0.12.0" } mas-tower = { path = "./crates/tower/", version = "=0.12.0" } oauth2-types = { path = "./crates/oauth2-types/", version = "=0.12.0" } +syn2mas = { path = "./crates/syn2mas", version = "=0.12.0" } # OpenAPI schema generation and validation [workspace.dependencies.aide] @@ -66,6 +67,9 @@ features = ["axum", "axum-headers", "macros"] version = "7.0.11" features = ["chrono", "url", "tracing"] +[workspace.dependencies.async-stream] +version = "0.3.6" + # Utility to write and implement async traits [workspace.dependencies.async-trait] version = "0.1.83" @@ -91,6 +95,10 @@ version = "1.9.0" [workspace.dependencies.camino] version = "1.1.9" +# Memory optimisation for short strings +[workspace.dependencies.compact_str] +version = "0.8.0" + # Time utilities [workspace.dependencies.chrono] version = "0.4.38" @@ -308,11 +316,17 @@ features = [ # Custom error types [workspace.dependencies.thiserror] version = "2.0.3" +version = "1.0.64" +version = "1.0.64" +[workspace.dependencies.thiserror-ext] +version = "0.2.0" # Async runtime [workspace.dependencies.tokio] version = "1.41.1" features = ["full"] +[workspace.dependencies.tokio-stream] +version = "0.1.16" # Useful async utilities [workspace.dependencies.tokio-util] diff --git a/clippy.toml b/clippy.toml index ac0f49bf4..3cbf7c74c 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,4 +1,4 @@ -doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL"] +doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL", "SQLite"] disallowed-methods = [ { path = "rand::thread_rng", reason = "do not create rngs on the fly, pass them as parameters" }, diff --git a/crates/cli/Cargo.toml 
b/crates/cli/Cargo.toml index 45dd9f8c6..0aa0d8597 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -38,6 +38,7 @@ serde_yaml = "0.9.34" sqlx.workspace = true tokio.workspace = true tokio-util.workspace = true +tokio-stream.workspace = true tower.workspace = true tower-http.workspace = true url.workspace = true @@ -89,6 +90,7 @@ mas-tasks.workspace = true mas-templates.workspace = true mas-tower.workspace = true oauth2-types.workspace = true +syn2mas.workspace = true [features] # Features used for the prebuilt binaries diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index eed27c2f7..97ee47230 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -19,6 +19,7 @@ mod debug; mod doctor; mod manage; mod server; +mod syn2mas; mod templates; mod worker; @@ -48,6 +49,10 @@ enum Subcommand { /// Run diagnostics on the deployment Doctor(self::doctor::Options), + + /// Migrate from Synapse's built-in auth system to MAS. + #[clap(name = "syn2mas")] + Syn2Mas(self::syn2mas::Options), } #[derive(Parser, Debug)] @@ -74,6 +79,7 @@ impl Options { Some(S::Templates(c)) => Box::pin(c.run(figment)).await, Some(S::Debug(c)) => Box::pin(c.run(figment)).await, Some(S::Doctor(c)) => Box::pin(c.run(figment)).await, + Some(S::Syn2Mas(c)) => Box::pin(c.run(figment)).await, None => Box::pin(self::server::Options::default().run(figment)).await, } } diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs new file mode 100644 index 000000000..b814763ed --- /dev/null +++ b/crates/cli/src/commands/syn2mas.rs @@ -0,0 +1,83 @@ +use std::process::ExitCode; + +use anyhow::Context; +use clap::Parser; +use figment::Figment; +use mas_config::{ConfigurationSectionExt, DatabaseConfig}; +use rand::thread_rng; +use sqlx::{Connection, Either, PgConnection}; +use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader}; +use tracing::{error, warn}; + +use crate::util::database_connection_from_config; + 
+#[derive(Parser, Debug)] +pub(super) struct Options { + #[command(subcommand)] + subcommand: Subcommand, + + /// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is only suitable for TESTING. + /// If you want to use this tool anyway, please pass this argument. + /// + /// If you want to migrate from Synapse to MAS today, please use the Node.js-based tool in the MAS repository. + #[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")] + experimental_accepted: bool, +} + +#[derive(Parser, Debug)] +enum Subcommand { + Check, + Migrate, +} + +/// The number of parallel writing transactions active against the MAS database. +const NUM_WRITER_CONNECTIONS: usize = 8; + +impl Options { + pub async fn run(self, figment: &Figment) -> anyhow::Result { + warn!("This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. Do not use it, except for TESTING."); + if !self.experimental_accepted { + error!("Please agree that you can only use this tool for testing."); + return Ok(ExitCode::FAILURE); + } + + // TODO allow configuring the synapse database location + let mut syn_conn = PgConnection::connect("postgres:///fakesyn").await.unwrap(); + + let config = DatabaseConfig::extract_or_default(figment)?; + + let mut mas_connection = database_connection_from_config(&config).await?; + + let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(&mut mas_connection) + .await + .context("failed to issue query to lock database")? 
+ else { + error!("Failed to acquire syn2mas lock on the database."); + error!("This likely means that another syn2mas instance is already running!"); + return Ok(ExitCode::FAILURE); + }; + + syn2mas::mas_pre_migration_checks(&mut mas_connection).await?; + syn2mas::synapse_pre_migration_checks(&mut syn_conn).await?; + + let mut reader = SynapseReader::new(&mut syn_conn, true).await?; + let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS); + for _ in 0..NUM_WRITER_CONNECTIONS { + writer_mas_connections.push(database_connection_from_config(&config).await?); + } + let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?; + + // TODO is this rng ok? + #[allow(clippy::disallowed_methods)] + let mut rng = thread_rng(); + + // TODO progress reporting + // TODO allow configuring the server name + syn2mas::migrate(&mut reader, &mut writer, "matrix.org", &mut rng).await?; + + reader.finish().await?; + writer.finish().await?; + + Ok(ExitCode::SUCCESS) + } +} diff --git a/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json b/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json new file mode 100644 index 000000000..c7f5fce5e --- /dev/null +++ b/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas_restore_indices (name, table_name, definition)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752" +} diff --git a/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json b/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json new file mode 100644 index 000000000..68b0722e1 
--- /dev/null +++ b/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT indexname AS \"name!\", indexdef AS \"definition!\", schemaname AS \"table_name!\"\n FROM pg_indexes\n WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 1, + "name": "definition!", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "table_name!", + "type_info": "Name" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0" +} diff --git a/crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json b/crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json new file mode 100644 index 000000000..0eeeb3a62 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype IN ('f', 'p ', 'u') AND conrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 2, + "name": "definition!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null, + false, + null + ] + }, + "hash": "5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c" +} diff --git 
a/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json b/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json new file mode 100644 index 000000000..3dcc1fc48 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype = 'f' AND confrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 2, + "name": "definition!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null, + false, + null + ] + }, + "hash": "5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299" +} diff --git a/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json b/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json new file mode 100644 index 000000000..855da3ba6 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas_restore_constraints (name, table_name, definition)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93" +} diff --git a/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json 
b/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json new file mode 100644 index 000000000..759cc5f8b --- /dev/null +++ b/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "definition", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84" +} diff --git a/crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json b/crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json new file mode 100644 index 000000000..9a98edf47 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema\n AND tablename IN ('syn2mas_restore_constraints', 'syn2mas_restore_indices')\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "_dummy", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20" +} diff --git a/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json b/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json new file mode 100644 index 000000000..9ae8f1e35 --- /dev/null +++ 
b/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "definition", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2" +} diff --git a/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json new file mode 100644 index 000000000..efa2c4d24 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_passwords\n (user_password_id, user_id, hashed_password, created_at, version)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray", + "Int4Array" + ] + }, + "nullable": [] + }, + "hash": "c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425" +} diff --git a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json new file mode 100644 index 000000000..d8be21736 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n 
INSERT INTO syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray", + "BoolArray" + ] + }, + "nullable": [] + }, + "hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f" +} diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml new file mode 100644 index 000000000..03f1beb2b --- /dev/null +++ b/crates/syn2mas/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "syn2mas" +version.workspace = true +license.workspace = true +authors.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# TODO remove anything unnecessary! :-) +anyhow.workspace = true +thiserror.workspace = true +thiserror-ext.workspace = true + +async-stream.workspace = true +async-trait.workspace = true +tokio.workspace = true +sqlx.workspace = true +sea-query.workspace = true +sea-query-binder.workspace = true +chrono.workspace = true +compact_str.workspace = true +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +futures-util = "0.3.30" +opentelemetry-semantic-conventions.workspace = true + +rand.workspace = true +rand_chacha = "0.3.1" +url.workspace = true +uuid = "1.10.0" +ulid = { workspace = true, features = ["uuid"] } + +oauth2-types.workspace = true +mas-storage.workspace = true +mas-data-model.workspace = true +mas-iana.workspace = true +mas-jose.workspace = true + +[lints] +workspace = true diff --git a/crates/syn2mas/src/checks.rs b/crates/syn2mas/src/checks.rs new file mode 100644 index 000000000..54a40125f --- /dev/null +++ b/crates/syn2mas/src/checks.rs @@ -0,0 +1,24 @@ +//! # Checks +//! 
+//! This module provides safety checks to run against a Synapse database before running the Synapse-to-MAS migration. + +use sqlx::PgConnection; +use thiserror::Error; + +use crate::mas_writer; + +#[derive(Debug, Error)] +pub enum Error { + #[error("problem with MAS database: {0}")] + MasDatabase(mas_writer::checks::Error), + + #[error("query failed: {0}")] + Sqlx(#[from] sqlx::Error), +} + +pub async fn synapse_pre_migration_checks( + synapse_connection: &mut PgConnection, +) -> Result<(), Error> { + // TODO check that the database looks like a Synapse database and is sane for migration + Ok(()) +} diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs new file mode 100644 index 000000000..e3130b0bd --- /dev/null +++ b/crates/syn2mas/src/lib.rs @@ -0,0 +1,11 @@ +mod mas_writer; +mod synapse_reader; + +mod checks; +mod migration; + +pub use checks::synapse_pre_migration_checks; +pub use mas_writer::locking::LockedMasDatabase; +pub use mas_writer::{checks::mas_pre_migration_checks, MasWriter}; +pub use migration::migrate; +pub use synapse_reader::SynapseReader; diff --git a/crates/syn2mas/src/mas_writer.rs b/crates/syn2mas/src/mas_writer.rs new file mode 100644 index 000000000..62e4629d5 --- /dev/null +++ b/crates/syn2mas/src/mas_writer.rs @@ -0,0 +1,656 @@ +//! # MAS Writer +//! +//! This module is responsible for writing new records to MAS' database. 
+ +use std::fmt::Display; + +use chrono::{DateTime, Utc}; +use futures_util::{future::BoxFuture, TryStreamExt}; +use sqlx::{query, query_as, Executor, PgConnection}; +use thiserror::Error; +use thiserror_ext::{Construct, ContextInto}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +use self::{ + constraint_pausing::{ConstraintDescription, IndexDescription}, + locking::LockedMasDatabase, +}; + +pub mod checks; +pub mod locking; + +mod constraint_pausing; + +#[derive(Debug, Error, Construct, ContextInto)] +pub enum Error { + #[error("database error whilst {context}: {source}")] + Database { + source: sqlx::Error, + context: String, + }, + + #[error("writer connection pool shut down due to error")] + #[allow(clippy::enum_variant_names)] + WriterConnectionPoolError, + + #[error("inconsistent database: {0}")] + Inconsistent(String), + + #[error("{0}")] + Multiple(MultipleErrors), +} + +#[derive(Debug)] +pub struct MultipleErrors { + errors: Vec, +} + +impl Display for MultipleErrors { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "multiple errors")?; + for error in &self.errors { + write!(f, "\n- {error}")?; + } + Ok(()) + } +} + +impl From> for MultipleErrors { + fn from(value: Vec) -> Self { + MultipleErrors { errors: value } + } +} + +struct WriterConnectionPool { + /// How many connections are in circulation + num_connections: usize, + + /// A receiver handle to get a writer connection + /// The writer connection will be mid-transaction! + connection_rx: Receiver>, + + /// A sender handle to return a writer connection to the pool + /// The connection should still be mid-transaction! 
+ connection_tx: Sender>, +} + +impl WriterConnectionPool { + pub fn new(connections: Vec) -> Self { + let num_connections = connections.len(); + let (connection_tx, connection_rx) = mpsc::channel(num_connections); + for connection in connections { + connection_tx + .try_send(Ok(connection)) + .expect("there should be room for this connection"); + } + + WriterConnectionPool { + num_connections, + connection_rx, + connection_tx, + } + } + + pub async fn spawn_with_connection(&mut self, task: F) -> Result<(), Error> + where + F: for<'conn> FnOnce(&'conn mut PgConnection) -> BoxFuture<'conn, Result<(), Error>> + + Send + + Sync + + 'static, + { + match self.connection_rx.recv().await { + Some(Ok(mut connection)) => { + let connection_tx = self.connection_tx.clone(); + tokio::task::spawn(async move { + let to_return = match task(&mut connection).await { + Ok(()) => Ok(connection), + Err(error) => { + error!("error in writer: {error}"); + Err(error) + } + }; + // This should always succeed in sending unless we're already shutting + // down for some other reason. + let _: Result<_, _> = connection_tx.send(to_return).await; + }); + + Ok(()) + } + Some(Err(error)) => { + // This should always succeed in sending unless we're already shutting + // down for some other reason. + let _: Result<_, _> = self.connection_tx.send(Err(error)).await; + + Err(Error::WriterConnectionPoolError) + } + None => { + unreachable!("we still hold a reference to the sender, so this shouldn't happen") + } + } + } + + /// Finishes writing to the database, committing all changes. + /// + /// # Errors + /// + /// - If any errors were returned to the pool. + /// - If committing the changes failed. + /// + /// # Panics + /// + /// - If connections were not returned to the pool. (This indicates a serious bug.) 
+ pub async fn finish(self) -> Result<(), Vec> { + let mut errors = Vec::new(); + + let Self { + num_connections, + mut connection_rx, + connection_tx, + } = self; + // Drop the sender handle so we gracefully allow the receiver to close + drop(connection_tx); + + let mut finished_connections = 0; + + while let Some(connection_or_error) = connection_rx.recv().await { + finished_connections += 1; + + match connection_or_error { + Ok(mut connection) => { + if let Err(err) = query("COMMIT;").execute(&mut connection).await { + errors.push(err.into_database("commit writer transaction")); + } + } + Err(error) => { + errors.push(error); + } + } + } + assert_eq!(finished_connections, num_connections, "syn2mas had a bug: connections went missing {finished_connections} != {num_connections}"); + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +pub struct MasWriter<'c> { + conn: LockedMasDatabase<'c>, + writer_pool: WriterConnectionPool, + + indices_to_restore: Vec, + constraints_to_restore: Vec, +} + +pub struct MasNewUser { + pub user_id: Uuid, + pub username: String, + pub created_at: DateTime, + pub locked_at: Option>, + pub can_request_admin: bool, +} + +pub struct MasNewUserPassword { + pub user_password_id: Uuid, + pub user_id: Uuid, + pub hashed_password: String, + pub created_at: DateTime, +} + +/// List of all MAS tables that are written to by syn2mas. +pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords"]; + +/// Detect whether a syn2mas migration has started on the given database. +/// +/// Concretly, this checks for the presence of syn2mas restoration tables. +/// +/// Returns `true` if syn2mas has started, or `false` if it hasn't. +/// +/// # Errors +/// +/// Errors are returned under the following circumstances: +/// +/// - If any database error occurs whilst querying the database. +/// - If some, but not all, syn2mas restoration tables are present. 
+/// (This shouldn't be possible without syn2mas having been sabotaged!) +pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result { + // Check if there is a resumption table... + let num_resumption_tables = query!( + r#" + SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema + AND tablename IN ('syn2mas_restore_constraints', 'syn2mas_restore_indices') + "# + ) + .fetch_all(conn.as_mut()) + .await + .into_database("failed to query count of resumption tables")? + .len(); + + match num_resumption_tables { + 0 => Ok(false), + 2 => Ok(true), + _other => Err(Error::inconsistent( + "some, but not all, syn2mas resumption tables were found", + )), + } +} + +impl<'conn> MasWriter<'conn> { + /// Creates a new MAS writer. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database connection experiences an error. + #[allow(clippy::missing_panics_doc)] // not real + pub async fn new( + mut conn: LockedMasDatabase<'conn>, + mut writer_connections: Vec, + ) -> Result { + // Acquire an advisory lock on the database. + // This lets us know that there is no other instance of syn2mas active. + // Given that we don't have any concurrent transactions here, + // the READ COMMITTED isolation level is sufficient. + query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") + .execute(conn.as_mut()) + .await + .into_database("begin MAS transaction")?; + + let syn2mas_started = is_syn2mas_in_progress(conn.as_mut()).await?; + + let indices_to_restore; + let constraints_to_restore; + + if syn2mas_started { + // We are resuming from a partially-done syn2mas migration + // We should reset the database so that we're starting from scratch. 
+ warn!("Partial syn2mas migration has already been done; resetting."); + for table in MAS_TABLES_AFFECTED_BY_MIGRATION { + query(&format!("TRUNCATE syn2mas__{table};")) + .execute(conn.as_mut()) + .await + .into_database_with(|| format!("failed to truncate table syn2mas__{table}"))?; + } + + indices_to_restore = query_as!( + IndexDescription, + "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key" + ) + .fetch_all(conn.as_mut()) + .await + .into_database("failed to get syn2mas restore data (index descriptions)")?; + constraints_to_restore = query_as!( + ConstraintDescription, + "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key" + ) + .fetch_all(conn.as_mut()) + .await + .into_database("failed to get syn2mas restore data (constraint descriptions)")?; + } else { + info!("Starting new syn2mas migration"); + + conn.as_mut() + .execute_many(include_str!("mas_writer/syn2mas_temporary_tables.sql")) + // We don't care about any query results + .try_collect::>() + .await + .into_database("could not create temporary tables")?; + + // Pause (temporarily drop) indices and constraints in order to improve + // performance of bulk data loading. + (indices_to_restore, constraints_to_restore) = + Self::pause_indices(conn.as_mut()).await?; + + // Persist these index and constraint definitions. 
+ for IndexDescription { + name, + table_name, + definition, + } in &indices_to_restore + { + query!( + r#" + INSERT INTO syn2mas_restore_indices (name, table_name, definition) + VALUES ($1, $2, $3) + "#, + name, + table_name, + definition + ) + .execute(conn.as_mut()) + .await + .into_database("failed to save restore data (index)")?; + } + for ConstraintDescription { + name, + table_name, + definition, + } in &constraints_to_restore + { + query!( + r#" + INSERT INTO syn2mas_restore_constraints (name, table_name, definition) + VALUES ($1, $2, $3) + "#, + name, + table_name, + definition + ) + .execute(conn.as_mut()) + .await + .into_database("failed to save restore data (index)")?; + } + } + + query("COMMIT;") + .execute(conn.as_mut()) + .await + .into_database("begin MAS transaction")?; + + // Now after all the schema changes have been done, begin writer transactions + for writer_connection in &mut writer_connections { + query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") + .execute(&mut *writer_connection) + .await + .into_database("begin MAS writer transaction")?; + } + + Ok(Self { + conn, + writer_pool: WriterConnectionPool::new(writer_connections), + indices_to_restore, + constraints_to_restore, + }) + } + + async fn pause_indices( + conn: &mut PgConnection, + ) -> Result<(Vec, Vec), Error> { + let mut indices_to_restore = Vec::new(); + let mut constraints_to_restore = Vec::new(); + + for &unprefixed_table in MAS_TABLES_AFFECTED_BY_MIGRATION { + let table = format!("syn2mas__{unprefixed_table}"); + // First drop incoming foreign key constraints + for constraint in + constraint_pausing::describe_foreign_key_constraints_to_table(&mut *conn, &table) + .await? + { + constraint_pausing::drop_constraint(&mut *conn, &constraint).await?; + constraints_to_restore.push(constraint); + } + // After all incoming foreign key constraints have been removed, + // we can now drop internal constraints. 
+ for constraint in + constraint_pausing::describe_constraints_on_table(&mut *conn, &table).await? + { + constraint_pausing::drop_constraint(&mut *conn, &constraint).await?; + constraints_to_restore.push(constraint); + } + // After all constraints have been removed, we can drop indices. + for index in constraint_pausing::describe_indices_on_table(&mut *conn, &table).await? { + constraint_pausing::drop_index(&mut *conn, &index).await?; + indices_to_restore.push(index); + } + } + + Ok((indices_to_restore, constraints_to_restore)) + } + + async fn restore_indices<'a>( + conn: &mut LockedMasDatabase<'a>, + indices_to_restore: &[IndexDescription], + constraints_to_restore: &[ConstraintDescription], + ) -> Result<(), Error> { + // First restore all indices. The order is not important as far as I know. + // However the indices are needed before constraints. + for index in indices_to_restore.iter().rev() { + constraint_pausing::restore_index(conn.as_mut(), index).await?; + } + // Then restore all constraints. + // The order here is the reverse of drop order, since some constraints may rely + // on other constraints to work. + for constraint in constraints_to_restore.iter().rev() { + constraint_pausing::restore_constraint(conn.as_mut(), constraint).await?; + } + Ok(()) + } + + /// Finish writing to the MAS database, flushing and committing all changes. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database connection experiences an error. + pub async fn finish(mut self) -> Result<(), Error> { + // Commit all writer transactions to the database. + self.writer_pool + .finish() + .await + .map_err(|errors| Error::Multiple(MultipleErrors::from(errors)))?; + + // Now all the data has been migrated, finish off by restoring indices and constraints! 
+ + query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") + .execute(self.conn.as_mut()) + .await + .into_database("begin MAS transaction")?; + + Self::restore_indices( + &mut self.conn, + &self.indices_to_restore, + &self.constraints_to_restore, + ) + .await?; + + self.conn + .as_mut() + .execute_many(include_str!( + "mas_writer/syn2mas_revert_temporary_tables.sql" + )) + // We don't care about any query results + .try_collect::>() + .await + .into_database("could not revert temporary tables")?; + + query("COMMIT;") + .execute(self.conn.as_mut()) + .await + .into_database("ending MAS transaction")?; + + self.conn + .unlock() + .await + .into_database("could not unlock MAS database")?; + + Ok(()) + } + + /// Write a batch of users to the database. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database writer connection pool had an error. + #[allow(clippy::missing_panics_doc)] // not a real panic + pub async fn write_users(&mut self, users: Vec) -> Result<(), Error> { + self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { + // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement + // without having to change the statement SQL thus altering the query plan. + // See . + // In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement, + // which is allegedly the best for insert performance, but is less simple to encode. 
+ if users.is_empty() { + return Ok(()); + } + + let mut user_ids: Vec = Vec::with_capacity(users.len()); + let mut usernames: Vec = Vec::with_capacity(users.len()); + let mut created_ats: Vec> = Vec::with_capacity(users.len()); + let mut locked_ats: Vec>> = Vec::with_capacity(users.len()); + let mut can_request_admins: Vec = Vec::with_capacity(users.len()); + for MasNewUser { + user_id, + username, + created_at, + locked_at, + can_request_admin, + } in users + { + user_ids.push(user_id); + usernames.push(username); + created_ats.push(created_at); + locked_ats.push(locked_at); + can_request_admins.push(can_request_admin); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__users + (user_id, username, created_at, locked_at, can_request_admin) + SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[]) + "#, + &user_ids[..], + &usernames[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &locked_ats[..] as &[Option>], + &can_request_admins[..], + ).execute(&mut *conn).await.into_database("writing users to MAS")?; + + Ok(()) + })).await + } + + /// Write a batch of user passwords to the database. + /// + /// # Errors + /// + /// Errors are returned in the following conditions: + /// + /// - If the database writer connection pool had an error. 
+ #[allow(clippy::missing_panics_doc)] // not a real panic + pub async fn write_passwords( + &mut self, + passwords: Vec, + ) -> Result<(), Error> { + self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { + if passwords.is_empty() { + return Ok(()); + } + + let mut user_password_ids: Vec = Vec::with_capacity(passwords.len()); + let mut user_ids: Vec = Vec::with_capacity(passwords.len()); + let mut hashed_passwords: Vec = Vec::with_capacity(passwords.len()); + let mut created_ats: Vec> = Vec::with_capacity(passwords.len()); + let mut versions: Vec = Vec::with_capacity(passwords.len()); + for MasNewUserPassword { + user_password_id, + user_id, + hashed_password, + created_at, + } in passwords + { + user_password_ids.push(user_password_id); + user_ids.push(user_id); + hashed_passwords.push(hashed_password); + created_ats.push(created_at); + // TODO hardcoding version to `1` may not be correct long-term? + versions.push(1); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__user_passwords + (user_password_id, user_id, hashed_password, created_at, version) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[]) + "#, + &user_password_ids[..], + &user_ids[..], + &hashed_passwords[..], + &created_ats[..], + &versions[..], + ).execute(&mut *conn).await.into_database("writing users to MAS")?; + + Ok(()) + })).await + } +} + +// How many entries to buffer at once, before writing a batch of rows to the database. +// TODO tune: didn't see that much difference between 4k and 64k +// (4k: 13.5~14, 64k: 12.5~13s — streaming the whole way would be better, especially for DB latency, but probably fiiine +// and also we won't be able to stream to two tables at once...) 
+const WRITE_BUFFER_BATCH_SIZE: usize = 4096; + +// TODO should split this out into the different stages +pub struct MasUserWriteBuffer<'writer, 'conn> { + users: Vec, + passwords: Vec, + writer: &'writer mut MasWriter<'conn>, +} + +impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> { + pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self { + MasUserWriteBuffer { + users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), + passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), + writer, + } + } + + pub async fn finish(mut self) -> Result<(), Error> { + self.flush_users().await?; + self.flush_passwords().await?; + Ok(()) + } + + pub async fn flush_users(&mut self) -> Result<(), Error> { + // via copy: 13s + // not via copy: 14s + // difference probably gets worse with latency + self.writer + .write_users(std::mem::take(&mut self.users)) + .await?; + + self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE); + Ok(()) + } + + pub async fn flush_passwords(&mut self) -> Result<(), Error> { + self.writer + .write_passwords(std::mem::take(&mut self.passwords)) + .await?; + self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE); + + Ok(()) + } + + pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> { + self.users.push(user); + if self.users.len() >= WRITE_BUFFER_BATCH_SIZE { + self.flush_users().await?; + } + Ok(()) + } + + pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> { + self.passwords.push(password); + if self.passwords.len() >= WRITE_BUFFER_BATCH_SIZE { + self.flush_passwords().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + // TODO test me +} diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs new file mode 100644 index 000000000..f05de21ce --- /dev/null +++ b/crates/syn2mas/src/mas_writer/checks.rs @@ -0,0 +1,67 @@ +//! # MAS Database Checks +//! +//! 
This module provides safety checks to run against a MAS database before running the Synapse-to-MAS migration. + +use thiserror::Error; +use thiserror_ext::ContextInto; + +use super::{is_syn2mas_in_progress, locking::LockedMasDatabase, MAS_TABLES_AFFECTED_BY_MIGRATION}; + +#[derive(Debug, Error, ContextInto)] +pub enum Error { + #[error("the MAS database is not empty: rows found in at least `{table}`")] + MasDatabaseNotEmpty { table: &'static str }, + + #[error("query against {table} failed — is this actually a MAS database?: {source}")] + MaybeNotMas { + source: sqlx::Error, + table: &'static str, + }, + + #[error("query failed: {0}")] + Sqlx(#[from] sqlx::Error), + + #[error("unable to check if syn2mas is already in progress: {0}")] + UnableToCheckInProgress(super::Error), +} + +/// Check that a MAS database is ready for being migrated to. +/// +/// Concretely, this checks that the database is empty. +/// +/// If syn2mas is already in progress on this database, the checks are skipped. +/// +/// # Errors +/// +/// Errors are returned under the following circumstances: +/// +/// - If any database access error occurs. +/// - If any MAS tables involved in the migration are not empty. +/// - If we can't check whether syn2mas is already in progress on this database or not. +pub async fn mas_pre_migration_checks<'a>( + mas_connection: &mut LockedMasDatabase<'a>, +) -> Result<(), Error> { + if is_syn2mas_in_progress(mas_connection.as_mut()) + .await + .map_err(Error::UnableToCheckInProgress)? + { + // syn2mas already in progress, so we already performed the checks + return Ok(()); + } + + // Check that the database looks like a MAS database and that it is also an empty database. + + for &table in MAS_TABLES_AFFECTED_BY_MIGRATION { + let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1")) + .fetch_optional(mas_connection.as_mut()) + .await + .into_maybe_not_mas(table)? 
+ .is_some(); + + if row_present { + return Err(Error::MasDatabaseNotEmpty { table }); + } + } + + Ok(()) +} diff --git a/crates/syn2mas/src/mas_writer/constraint_pausing.rs b/crates/syn2mas/src/mas_writer/constraint_pausing.rs new file mode 100644 index 000000000..3af157b3c --- /dev/null +++ b/crates/syn2mas/src/mas_writer/constraint_pausing.rs @@ -0,0 +1,148 @@ +use sqlx::{FromRow, PgConnection}; +use tracing::debug; + +use super::{Error, IntoDatabase}; + +/// Description of a constraint, which allows recreating it later. +#[derive(FromRow)] +pub struct ConstraintDescription { + pub name: String, + pub table_name: String, + pub definition: String, +} + +#[derive(FromRow)] +pub struct IndexDescription { + pub name: String, + pub table_name: String, + pub definition: String, +} + +/// Look up and return the definition of a constraint. +pub async fn describe_constraints_on_table( + conn: &mut PgConnection, + table_name: &str, +) -> Result, Error> { + sqlx::query_as!( + ConstraintDescription, + r#" + SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!" + FROM pg_constraint c + JOIN pg_namespace n ON n.oid = c.connamespace + WHERE contype IN ('f', 'p ', 'u') AND conrelid::regclass::text = $1 + AND n.nspname = current_schema; + "#, + table_name + ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}")) +} + +/// Look up and return the definitions of foreign-key constraints whose +/// target table is the one specified. +pub async fn describe_foreign_key_constraints_to_table( + conn: &mut PgConnection, + target_table_name: &str, +) -> Result, Error> { + sqlx::query_as!( + ConstraintDescription, + r#" + SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!" 
+ FROM pg_constraint c + JOIN pg_namespace n ON n.oid = c.connamespace + WHERE contype = 'f' AND confrelid::regclass::text = $1 + AND n.nspname = current_schema; + "#, + target_table_name + ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}")) +} + +/// Look up and return the definitions of all indices on a given table. +pub async fn describe_indices_on_table( + conn: &mut PgConnection, + table_name: &str, +) -> Result, Error> { + sqlx::query_as!( + IndexDescription, + r#" + SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!" + FROM pg_indexes + WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL + "#, + table_name + ).fetch_all(&mut *conn).await.into_database("cannot search for indices") +} + +/// Drops a constraint from the database. +/// +/// The constraint must exist prior to this call. +pub async fn drop_constraint( + conn: &mut PgConnection, + constraint: &ConstraintDescription, +) -> Result<(), Error> { + let name = &constraint.name; + let table_name = &constraint.table_name; + debug!("dropping constraint {name} on table {table_name}"); + sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};")) + .execute(&mut *conn) + .await + .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?; + + Ok(()) +} + +/// Drops an index from the database. +/// +/// The index must exist prior to this call. +pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> { + let index_name = &index.name; + debug!("dropping index {index_name}"); + sqlx::query(&format!("DROP INDEX {index_name};")) + .execute(&mut *conn) + .await + .into_database_with(|| format!("failed to temporarily drop {index_name}"))?; + + Ok(()) +} + +/// Restores (recreates) a constraint. +/// +/// The constraint must not exist prior to this call. 
+pub async fn restore_constraint(
+    conn: &mut PgConnection,
+    constraint: &ConstraintDescription,
+) -> Result<(), Error> {
+    let ConstraintDescription {
+        name,
+        table_name,
+        definition,
+    } = &constraint;
+    sqlx::query(&format!(
+        "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
+    ))
+    .execute(conn)
+    .await
+    .into_database_with(|| {
+        format!("failed to recreate constraint {name} on {table_name} with {definition}")
+    })?;
+
+    Ok(())
+}
+
+/// Restores (recreates) an index.
+///
+/// The index must not exist prior to this call.
+pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
+    let IndexDescription {
+        name,
+        table_name,
+        definition,
+    } = &index;
+
+    sqlx::query(&format!("{definition};"))
+        .execute(conn)
+        .await
+        .into_database_with(|| {
+            format!("failed to recreate index {name} on {table_name} with {definition}")
+        })?;
+
+    Ok(())
+}
diff --git a/crates/syn2mas/src/mas_writer/locking.rs b/crates/syn2mas/src/mas_writer/locking.rs
new file mode 100644
index 000000000..6aab157fa
--- /dev/null
+++ b/crates/syn2mas/src/mas_writer/locking.rs
@@ -0,0 +1,53 @@
+use std::sync::LazyLock;
+
+use sqlx::{
+    postgres::{PgAdvisoryLock, PgAdvisoryLockGuard},
+    Either, PgConnection,
+};
+
+static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> =
+    LazyLock::new(|| PgAdvisoryLock::new("syn2mas-maswriter"));
+
+/// A wrapper around a Postgres connection which holds a session-wide advisory lock
+/// preventing concurrent access by other syn2mas instances.
+pub struct LockedMasDatabase<'conn> {
+    inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>,
+}
+
+impl<'conn> LockedMasDatabase<'conn> {
+    /// Attempts to lock the MAS database against concurrent access by other syn2mas instances.
+    ///
+    /// If the lock can be acquired, returns a `LockedMasDatabase`.
+    /// If the lock cannot be acquired, returns the connection back to the caller wrapped in `Either::Right`.
+ /// + /// # Errors + /// + /// Errors are returned for underlying database errors. + pub async fn try_new( + mas_connection: &'conn mut PgConnection, + ) -> Result, sqlx::Error> { + SYN2MAS_ADVISORY_LOCK + .try_acquire(mas_connection) + .await + .map(|either| match either { + Either::Left(inner) => Either::Left(LockedMasDatabase { inner }), + Either::Right(unlocked) => Either::Right(unlocked), + }) + } + + /// Releases the advisory lock on the MAS database, returning the underlying + /// connection. + /// + /// # Errors + /// + /// Errors are returned for underlying database errors. + pub async fn unlock(self) -> Result<&'conn mut PgConnection, sqlx::Error> { + self.inner.release_now().await + } +} + +impl AsMut for LockedMasDatabase<'_> { + fn as_mut(&mut self) -> &mut PgConnection { + self.inner.as_mut() + } +} diff --git a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql new file mode 100644 index 000000000..d82be82c8 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql @@ -0,0 +1,12 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- This script should revert what `syn2mas_temporary_tables.sql` does. + +DROP TABLE syn2mas_restore_constraints; +DROP TABLE syn2mas_restore_indices; + +ALTER TABLE syn2mas__users RENAME TO users; +ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords; diff --git a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql new file mode 100644 index 000000000..3e5b86e40 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql @@ -0,0 +1,41 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. 
+ + +-- # syn2mas Temporary Tables +-- This file takes a MAS database and: +-- +-- 1. creates temporary tables used by syn2mas for storing restore data +-- 2. renames important tables with the `syn2mas__` prefix, to prevent +-- running MAS instances from having any opportunity to see or modify +-- the partial data in the database, especially whilst it is not protected +-- by constraints. +-- +-- All changes in this file must be reverted by `syn2mas_revert_temporary_tables.sql` +-- in the same directory. + +-- corresponds to `ConstraintDescription` +CREATE TABLE syn2mas_restore_constraints ( + -- synthetic auto-incrementing ID so we can load these in order + order_key SERIAL NOT NULL PRIMARY KEY, + + table_name TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL +); + +-- corresponds to `IndexDescription` +CREATE TABLE syn2mas_restore_indices ( + -- synthetic auto-incrementing ID so we can load these in order + order_key SERIAL NOT NULL PRIMARY KEY, + + table_name TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL +); + +-- Now we rename all tables that we touch during the migration. +ALTER TABLE users RENAME TO syn2mas__users; +ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords; diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs new file mode 100644 index 000000000..33f986205 --- /dev/null +++ b/crates/syn2mas/src/migration.rs @@ -0,0 +1,159 @@ +//! # Migration +//! +//! This module provides the high-level logic for performing the Synapse-to-MAS database migration. +//! +//! This module does not implement any of the safety checks that should be run *before* the migration. 
+ +use std::{collections::HashMap, pin::pin}; + +use compact_str::CompactString; +use futures_util::StreamExt as _; +use rand::rngs::ThreadRng; +use thiserror::Error; +use thiserror_ext::ContextInto; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{ + mas_writer::{self, MasNewUser, MasNewUserPassword, MasUserWriteBuffer, MasWriter}, + synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseUser}, + SynapseReader, +}; + +#[derive(Debug, Error, ContextInto)] +pub enum Error { + #[error("error when reading synapse DB ({context}): {source}")] + Synapse { + source: synapse_reader::Error, + context: String, + }, + #[error("error when writing to MAS DB ({context}): {source}")] + Mas { + source: mas_writer::Error, + context: String, + }, + #[error("failed to extract localpart of {user:?}: {source}")] + ExtractLocalpart { + source: ExtractLocalpartError, + user: FullUserId, + }, +} + +struct UsersMigrated { + /// Lookup table from user localpart to that user's UUID in MAS. + user_localparts_to_uuid: HashMap, +} + +/// Performs a migration from Synapse's database to MAS' database. +/// +/// # Panics +/// +/// - If there are more than `usize::MAX` users +/// +/// # Errors +/// +/// Errors are returned under the following circumstances: +/// +/// - An underlying database access error, either to MAS or to Synapse. +/// - Invalid data in the Synapse database. 
+pub async fn migrate( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + rng: &mut ThreadRng, +) -> Result<(), Error> { + let counts = synapse.count_rows().await.into_synapse("counting users")?; + + migrate_users( + synapse, + mas, + counts + .users + .try_into() + .expect("More than usize::MAX users — wow!"), + server_name, + rng, + ) + .await?; + + Ok(()) +} + +async fn migrate_users( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + user_count_hint: usize, + server_name: &str, + rng: &mut ThreadRng, +) -> Result { + let mut write_buffer = MasUserWriteBuffer::new(mas); + let mut users_stream = pin!(synapse.read_users()); + // TODO is 1:1 capacity enough for a hashmap? + let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint); + + while let Some(user_res) = users_stream.next().await { + let user = user_res.into_synapse("reading user")?; + let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?; + + user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id); + + write_buffer + .write_user(mas_user) + .await + .into_mas("writing user")?; + + if let Some(mas_password) = mas_password_opt { + write_buffer + .write_password(mas_password) + .await + .into_mas("writing password")?; + } + } + + write_buffer + .finish() + .await + .into_mas("writing users & passwords")?; + + Ok(UsersMigrated { + user_localparts_to_uuid, + }) +} + +fn transform_user( + user: &SynapseUser, + server_name: &str, + rng: &mut ThreadRng, +) -> Result<(MasNewUser, Option), Error> { + let username = user + .name + .extract_localpart(server_name) + .into_extract_localpart(user.name.clone())? 
+ .to_owned(); + + let new_user = MasNewUser { + user_id: Uuid::from(Ulid::from_datetime_with_source( + user.creation_ts.0.into(), + rng, + )), + username, + created_at: user.creation_ts.0, + locked_at: user.deactivated.0.then_some(user.creation_ts.0), + can_request_admin: user.admin.0, + }; + + let mas_password = user + .password_hash + .clone() + .map(|password_hash| MasNewUserPassword { + user_password_id: Uuid::from(Ulid::from_datetime_with_source( + user.creation_ts.0.into(), + rng, + )), + user_id: new_user.user_id, + hashed_password: password_hash, + created_at: new_user.created_at, + }); + + Ok((new_user, mas_password)) +} diff --git a/crates/syn2mas/src/synapse_reader.rs b/crates/syn2mas/src/synapse_reader.rs new file mode 100644 index 000000000..75a4f578c --- /dev/null +++ b/crates/syn2mas/src/synapse_reader.rs @@ -0,0 +1,281 @@ +//! # Synapse Database Reader +//! +//! This module provides facilities for streaming relevant types of database records from a Synapse database. + +use async_stream::stream; +use chrono::{DateTime, Utc}; +use futures_util::Stream; +use sea_query::{enum_def, Expr, Iden, PostgresQueryBuilder, Query}; +use sea_query_binder::SqlxBinder; +use sqlx::{query, query_with, FromRow, PgConnection, Postgres, Row, Type}; +use thiserror::Error; +use thiserror_ext::ContextInto; + +#[derive(Debug, Error, ContextInto)] +pub enum Error { + #[error("database error whilst {context}: {source}")] + Database { + source: sqlx::Error, + context: String, + }, +} + +#[derive(Clone, Debug, sqlx::Decode)] +pub struct FullUserId(pub String); + +impl Type for FullUserId { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +#[derive(Debug, Error)] +pub enum ExtractLocalpartError { + #[error("user ID does not start with `@` sigil")] + NoAtSigil, + #[error("user ID does not have a `:` separator")] + NoSeparator, + #[error("wrong server name: expected {expected:?}, got {found:?}")] + WrongServerName { expected: String, found: String }, +} + +impl 
FullUserId { + /// Extract the localpart from the User ID, asserting that the User ID has the correct + /// server name. + /// + /// # Errors + /// + /// A handful of basic validity checks are performed and an error may be returned + /// if the User ID is not valid. + /// However, the User ID grammar is not checked fully. + /// + /// If the wrong server name is asserted, returns an error. + pub fn extract_localpart( + &self, + expected_server_name: &str, + ) -> Result<&str, ExtractLocalpartError> { + let Some(without_sigil) = self.0.strip_prefix('@') else { + return Err(ExtractLocalpartError::NoAtSigil); + }; + + let Some((localpart, server_name)) = without_sigil.split_once(':') else { + return Err(ExtractLocalpartError::NoSeparator); + }; + + if server_name != expected_server_name { + return Err(ExtractLocalpartError::WrongServerName { + expected: expected_server_name.to_owned(), + found: server_name.to_owned(), + }); + }; + + Ok(localpart) + } +} + +/// A Synapse boolean. +/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite versions +/// that did not have native boolean support. +#[derive(Clone, Debug)] +pub struct SynapseBool(pub bool); + +impl<'r> sqlx::Decode<'r, Postgres> for SynapseBool { + fn decode( + value: ::ValueRef<'r>, + ) -> Result { + >::decode(value) + .map(|boolean_int| SynapseBool(boolean_int != 0)) + } +} + +impl sqlx::Type for SynapseBool { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +/// A timestamp stored as the number of seconds since the Unix epoch. +/// Note that Synapse stores MOST timestamps as numbers of **milliseconds** since the Unix epoch. +/// But some timestamps are still stored in seconds. 
+#[derive(Clone, Debug)] +pub struct SecondsTimestamp(pub DateTime); + +impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp { + fn decode( + value: ::ValueRef<'r>, + ) -> Result { + >::decode(value).map(|milliseconds_since_epoch| { + SecondsTimestamp(DateTime::from_timestamp_nanos( + milliseconds_since_epoch * 1_000_000_000, + )) + }) + } +} + +impl sqlx::Type for SecondsTimestamp { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + +#[derive(Clone, Debug, FromRow)] +#[enum_def(table_name = "users")] +pub struct SynapseUser { + /// Full User ID of the user + pub name: FullUserId, + /// Password hash string for the user. Optional (null if no password is set). + pub password_hash: Option, + /// Whether the user is a Synapse Admin + pub admin: SynapseBool, + /// Whether the user is deactivated + pub deactivated: SynapseBool, + /// When the user was created + pub creation_ts: SecondsTimestamp, + // TODO ... + // TODO is_guest + // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts) +} + +#[derive(Iden)] +pub enum ExtraSynapseUserIden { + AppserviceId, + IsGuest, +} + +/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on. +/// +/// This is a safety measure against other processes changing the data underneath our feet. +/// It's still not a good idea to run Synapse at the same time as the migration. +// TODO not complete! +const TABLES_TO_LOCK: &[&str] = &["users"]; + +/// Number of migratable rows in various Synapse tables. +/// Used to estimate progress. +#[derive(Clone, Debug)] +pub struct SynapseRowCounts { + pub users: i64, +} + +pub struct SynapseReader<'c> { + conn: &'c mut PgConnection, +} + +impl<'conn> SynapseReader<'conn> { + /// Create a new Synapse reader, which entails creating a transaction and locking Synapse tables. 
+ /// + /// # Errors + /// + /// Errors are returned under the following circumstances: + /// + /// - An underlying database error + /// - If we can't lock the Synapse tables (pointing to the fact that Synapse may still be running) + pub async fn new( + synapse_connection: &'conn mut PgConnection, + dry_run: bool, + ) -> Result { + query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;") + .execute(&mut *synapse_connection) + .await + .into_database("begin transaction")?; + + let lock_type = if dry_run { + // We expect dry runs to be done alongside Synapse running, so we don't want to + // interfere with Synapse's database access in that case. + "ACCESS SHARE" + } else { + "EXCLUSIVE" + }; + for table in TABLES_TO_LOCK { + query(&format!("LOCK TABLE {table} IN {lock_type} MODE NOWAIT;")) + .execute(&mut *synapse_connection) + .await + .into_database_with(|| format!("locking Synapse table `{table}`"))?; + } + + Ok(Self { + conn: synapse_connection, + }) + } + + /// Finishes the Synapse reader, committing the transaction. + /// + /// # Errors + /// + /// Errors are returned under the following circumstances: + /// + /// - An underlying database error whilst committing the transaction. + pub async fn finish(self) -> Result<(), Error> { + // TODO enforce that this is called somehow. + + query("COMMIT;") + .execute(self.conn) + .await + .into_database("end transaction")?; + Ok(()) + } + + /// Counts the rows in the Synapse database to get an estimate of how large the migration is going to be. 
+ /// + /// # Errors + /// + /// Errors are returned under the following circumstances: + /// + /// - An underlying database error + pub async fn count_rows(&mut self) -> Result { + // TODO no need for query builder here + let (sql, args) = Query::select() + .expr(Expr::val(1).count()) + .and_where(Expr::col(ExtraSynapseUserIden::AppserviceId).is_null()) + // TODO support migrating at least skeleton records for guests + .and_where(Expr::col(ExtraSynapseUserIden::IsGuest).eq(0)) + .from(SynapseUserIden::Table) + .build_sqlx(PostgresQueryBuilder); + let users = query_with(&sql, args) + .fetch_one(&mut *self.conn) + .await + .into_database("counting Synapse users")? + .try_get::(0) + .into_database("couldn't decode count of Synapse users table")?; + + Ok(SynapseRowCounts { users }) + } + + /// Reads Synapse users, excluding application service users (which do not need to be migrated), from the database. + pub fn read_users<'a, 'ret>( + &'a mut self, + ) -> impl Stream> + 'ret + where + 'conn: 'a, + 'a: 'ret, + { + // TODO no need for query builder here + let (sql, args) = Query::select() + .columns([ + SynapseUserIden::Name, + SynapseUserIden::PasswordHash, + SynapseUserIden::Admin, + SynapseUserIden::Deactivated, + SynapseUserIden::CreationTs, + ]) + .and_where(Expr::col(ExtraSynapseUserIden::AppserviceId).is_null()) + // TODO support migrating at least skeleton records for guests + .and_where(Expr::col(ExtraSynapseUserIden::IsGuest).eq(0)) + .from(SynapseUserIden::Table) + .build_sqlx(PostgresQueryBuilder); + + let conn = &mut *self.conn; + + // The async stream macro works around an issue where the QueryAs output stream borrows the SQL. + // See: https://github.com/launchbadge/sqlx/issues/1594#issuecomment-1493146479 + stream! 
{ + for await row in sqlx::query_as_with::<_, SynapseUser, _>(&sql, args).fetch(conn) { + yield row.into_database("reading Synapse users"); + } + } + } +} + +#[cfg(test)] +mod test { + // TODO test me +} diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index de62f4998..c60e250b0 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -13,7 +13,7 @@ workspace = true [dependencies] anyhow.workspace = true -async-stream = "0.3.6" +async-stream.workspace = true async-trait.workspace = true cron.workspace = true chrono.workspace = true diff --git a/misc/sqlx_update.sh b/misc/sqlx_update.sh new file mode 100755 index 000000000..6dee03da4 --- /dev/null +++ b/misc/sqlx_update.sh @@ -0,0 +1,35 @@ +#!/bin/sh +set -eu + +if [ "${DATABASE_URL+defined}" != defined ]; then + echo "You need to set DATABASE_URL" + exit 1 +fi + +if [ "$DATABASE_URL" = "postgres:" ]; then + # Hacky, but psql doesn't accept `postgres:` on its own like sqlx does + export DATABASE_URL="postgres:///" +fi + +crates_dir=$(dirname $(realpath $0))"/../crates" + +CRATES_WITH_SQLX="storage-pg syn2mas" + +for crate in $CRATES_WITH_SQLX; do + echo "=== Updating sqlx query info for $crate ===" + + if [ $crate = syn2mas ]; then + # We need to apply the syn2mas_temporary_tables.sql one-off 'migration' + # for checking the syn2mas queries + + # not evident from the help text, but psql accepts connection URLs as the dbname + psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql" + fi + + (cd "$crates_dir/$crate" && cargo sqlx prepare) || echo "(failed to prepare for $crate)" + + if [ $crate = syn2mas ]; then + # Revert syn2mas temporary tables + psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql" + fi +done From cb691bb448278fe5c5046ccaa88526b3527c2b5c Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 10 Dec 2024 15:36:47 +0000 
Subject: [PATCH 02/10] Misc review tweaks --- Cargo.lock | 8 +++---- Cargo.toml | 4 ++-- ...d4a86bf5758f8c32d9d41a22999b2f0698ca.json} | 4 ++-- crates/syn2mas/src/checks.rs | 2 +- crates/syn2mas/src/lib.rs | 10 ++++---- crates/syn2mas/src/mas_writer.rs | 6 ++--- crates/syn2mas/src/mas_writer/checks.rs | 9 ++++---- .../src/mas_writer/constraint_pausing.rs | 6 ++--- crates/syn2mas/src/migration.rs | 11 +++++---- crates/syn2mas/src/synapse_reader.rs | 23 +++++++++++++++---- 10 files changed, 47 insertions(+), 36 deletions(-) rename crates/syn2mas/.sqlx/{query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json => query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json} (78%) diff --git a/Cargo.lock b/Cargo.lock index 96b82af2a..d5bff614c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3229,8 +3229,8 @@ dependencies = [ "sqlx", "syn2mas", "tokio", - "tokio-util", "tokio-stream", + "tokio-util", "tower", "tower-http", "tracing", @@ -6160,7 +6160,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "thiserror", + "thiserror 2.0.3", "thiserror-ext", "tokio", "tracing", @@ -6247,7 +6247,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa35fd08b65a716e1a91479b00d03ed2ef4c92371a4900ceb6ec2b332f9d71df" dependencies = [ - "thiserror", + "thiserror 1.0.69", "thiserror-ext-derive", ] @@ -6260,7 +6260,7 @@ dependencies = [ "either", "proc-macro2", "quote", - "syn 2.0.68", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2665f27bc..caa051e58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -316,8 +316,7 @@ features = [ # Custom error types [workspace.dependencies.thiserror] version = "2.0.3" -version = "1.0.64" -version = "1.0.64" + [workspace.dependencies.thiserror-ext] version = "0.2.0" @@ -325,6 +324,7 @@ version = "0.2.0" [workspace.dependencies.tokio] version = "1.41.1" features = ["full"] + [workspace.dependencies.tokio-stream] version = "0.1.16" diff --git 
a/crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json b/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json similarity index 78% rename from crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json rename to crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json index 0eeeb3a62..f1b8bad90 100644 --- a/crates/syn2mas/.sqlx/query-5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c.json +++ b/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype IN ('f', 'p ', 'u') AND conrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", + "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", "describe": { "columns": [ { @@ -30,5 +30,5 @@ null ] }, - "hash": "5a28af4699944f11978193a9c62dbc0cf9e93235ab5679d40b9caf37e022219c" + "hash": "12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca" } diff --git a/crates/syn2mas/src/checks.rs b/crates/syn2mas/src/checks.rs index 54a40125f..da7a9ca29 100644 --- a/crates/syn2mas/src/checks.rs +++ b/crates/syn2mas/src/checks.rs @@ -10,7 +10,7 @@ use crate::mas_writer; #[derive(Debug, Error)] pub enum Error { #[error("problem with MAS database: {0}")] - MasDatabase(mas_writer::checks::Error), + MasDatabase(#[source] mas_writer::checks::Error), #[error("query failed: {0}")] 
Sqlx(#[from] sqlx::Error), diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs index e3130b0bd..afba21a5a 100644 --- a/crates/syn2mas/src/lib.rs +++ b/crates/syn2mas/src/lib.rs @@ -4,8 +4,8 @@ mod synapse_reader; mod checks; mod migration; -pub use checks::synapse_pre_migration_checks; -pub use mas_writer::locking::LockedMasDatabase; -pub use mas_writer::{checks::mas_pre_migration_checks, MasWriter}; -pub use migration::migrate; -pub use synapse_reader::SynapseReader; +pub use self::checks::synapse_pre_migration_checks; +pub use self::mas_writer::locking::LockedMasDatabase; +pub use self::mas_writer::{checks::mas_pre_migration_checks, MasWriter}; +pub use self::migration::migrate; +pub use self::synapse_reader::SynapseReader; diff --git a/crates/syn2mas/src/mas_writer.rs b/crates/syn2mas/src/mas_writer.rs index 62e4629d5..22a8cd92b 100644 --- a/crates/syn2mas/src/mas_writer.rs +++ b/crates/syn2mas/src/mas_writer.rs @@ -25,8 +25,9 @@ mod constraint_pausing; #[derive(Debug, Error, Construct, ContextInto)] pub enum Error { - #[error("database error whilst {context}: {source}")] + #[error("database error whilst {context}")] Database { + #[source] source: sqlx::Error, context: String, }, @@ -252,8 +253,6 @@ impl<'conn> MasWriter<'conn> { mut conn: LockedMasDatabase<'conn>, mut writer_connections: Vec, ) -> Result { - // Acquire an advisory lock on the database. - // This lets us know that there is no other instance of syn2mas active. // Given that we don't have any concurrent transactions here, // the READ COMMITTED isolation level is sufficient. query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") @@ -590,7 +589,6 @@ impl<'conn> MasWriter<'conn> { // and also we won't be able to stream to two tables at once...) 
const WRITE_BUFFER_BATCH_SIZE: usize = 4096; -// TODO should split this out into the different stages pub struct MasUserWriteBuffer<'writer, 'conn> { users: Vec<MasNewUser>, passwords: Vec<MasNewUserPassword>, diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs index f05de21ce..f12714d19 100644 --- a/crates/syn2mas/src/mas_writer/checks.rs +++ b/crates/syn2mas/src/mas_writer/checks.rs @@ -12,17 +12,18 @@ pub enum Error { #[error("the MAS database is not empty: rows found in at least `{table}`")] MasDatabaseNotEmpty { table: &'static str }, - #[error("query against {table} failed — is this actually a MAS database?: {source}")] + #[error("query against {table} failed — is this actually a MAS database?")] MaybeNotMas { + #[source] source: sqlx::Error, table: &'static str, }, - #[error("query failed: {0}")] + #[error(transparent)] Sqlx(#[from] sqlx::Error), - #[error("unable to check if syn2mas is already in progress: {0}")] - UnableToCheckInProgress(super::Error), + #[error("unable to check if syn2mas is already in progress")] + UnableToCheckInProgress(#[source] super::Error), } /// Check that a MAS database is ready for being migrated to. diff --git a/crates/syn2mas/src/mas_writer/constraint_pausing.rs b/crates/syn2mas/src/mas_writer/constraint_pausing.rs index 3af157b3c..a8ec914c3 100644 --- a/crates/syn2mas/src/mas_writer/constraint_pausing.rs +++ b/crates/syn2mas/src/mas_writer/constraint_pausing.rs @@ -1,17 +1,15 @@ -use sqlx::{FromRow, PgConnection}; +use sqlx::PgConnection; use tracing::debug; use super::{Error, IntoDatabase}; /// Description of a constraint, which allows recreating it later. 
-#[derive(FromRow)] pub struct ConstraintDescription { pub name: String, pub table_name: String, pub definition: String, } -#[derive(FromRow)] pub struct IndexDescription { pub name: String, pub table_name: String, @@ -29,7 +27,7 @@ pub async fn describe_constraints_on_table( SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!" FROM pg_constraint c JOIN pg_namespace n ON n.oid = c.connamespace - WHERE contype IN ('f', 'p ', 'u') AND conrelid::regclass::text = $1 + WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1 AND n.nspname = current_schema; "#, table_name diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 33f986205..25035fd3a 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, pin::pin}; +use chrono::{DateTime, Utc}; use compact_str::CompactString; use futures_util::StreamExt as _; use rand::rngs::ThreadRng; @@ -133,13 +134,13 @@ fn transform_user( let new_user = MasNewUser { user_id: Uuid::from(Ulid::from_datetime_with_source( - user.creation_ts.0.into(), + DateTime::<Utc>::from(user.creation_ts).into(), rng, )), username, - created_at: user.creation_ts.0, - locked_at: user.deactivated.0.then_some(user.creation_ts.0), - can_request_admin: user.admin.0, + created_at: user.creation_ts.into(), + locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()), + can_request_admin: bool::from(user.admin), }; let mas_password = user @@ -147,7 +148,7 @@ .clone() .map(|password_hash| MasNewUserPassword { user_password_id: Uuid::from(Ulid::from_datetime_with_source( - user.creation_ts.0.into(), + DateTime::<Utc>::from(user.creation_ts).into(), rng, )), user_id: new_user.user_id, diff --git a/crates/syn2mas/src/synapse_reader.rs b/crates/syn2mas/src/synapse_reader.rs index 75a4f578c..f9a0958bc 100644 --- a/crates/syn2mas/src/synapse_reader.rs +++ 
b/crates/syn2mas/src/synapse_reader.rs @@ -13,8 +13,9 @@ use thiserror_ext::ContextInto; #[derive(Debug, Error, ContextInto)] pub enum Error { - #[error("database error whilst {context}: {source}")] + #[error("database error whilst {context}")] Database { + #[source] source: sqlx::Error, context: String, }, @@ -76,8 +77,8 @@ impl FullUserId { /// A Synapse boolean. /// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite versions /// that did not have native boolean support. -#[derive(Clone, Debug)] -pub struct SynapseBool(pub bool); +#[derive(Copy, Clone, Debug)] +pub struct SynapseBool(bool); impl<'r> sqlx::Decode<'r, Postgres> for SynapseBool { fn decode( @@ -94,11 +95,23 @@ impl sqlx::Type<Postgres> for SynapseBool { } } +impl From<SynapseBool> for bool { + fn from(SynapseBool(value): SynapseBool) -> Self { + value + } +} + /// A timestamp stored as the number of seconds since the Unix epoch. /// Note that Synapse stores MOST timestamps as numbers of **milliseconds** since the Unix epoch. /// But some timestamps are still stored in seconds. 
-#[derive(Clone, Debug)] -pub struct SecondsTimestamp(pub DateTime<Utc>); +#[derive(Copy, Clone, Debug)] +pub struct SecondsTimestamp(DateTime<Utc>); + +impl From<SecondsTimestamp> for DateTime<Utc> { + fn from(SecondsTimestamp(value): SecondsTimestamp) -> Self { + value + } +} impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp { fn decode( From f54911ef8f9a0ef3f1dd3c85b0eb3caa9d5c3c9a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 10 Dec 2024 15:36:54 +0000 Subject: [PATCH 03/10] tracing spans --- crates/syn2mas/src/checks.rs | 1 + crates/syn2mas/src/mas_writer.rs | 7 ++++++- crates/syn2mas/src/mas_writer/checks.rs | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/syn2mas/src/checks.rs b/crates/syn2mas/src/checks.rs index da7a9ca29..fae7e1a8c 100644 --- a/crates/syn2mas/src/checks.rs +++ b/crates/syn2mas/src/checks.rs @@ -16,6 +16,7 @@ pub enum Error { Sqlx(#[from] sqlx::Error), } +#[tracing::instrument(skip_all)] pub async fn synapse_pre_migration_checks( synapse_connection: &mut PgConnection, ) -> Result<(), Error> { diff --git a/crates/syn2mas/src/mas_writer.rs b/crates/syn2mas/src/mas_writer.rs index 22a8cd92b..1cf170a11 100644 --- a/crates/syn2mas/src/mas_writer.rs +++ b/crates/syn2mas/src/mas_writer.rs @@ -10,7 +10,7 @@ use sqlx::{query, query_as, Executor, PgConnection}; use thiserror::Error; use thiserror_ext::{Construct, ContextInto}; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tracing::{error, info, warn}; +use tracing::{error, info, warn, Level}; use uuid::Uuid; use self::{ @@ -249,6 +249,7 @@ impl<'conn> MasWriter<'conn> { /// /// - If the database connection experiences an error. 
#[allow(clippy::missing_panics_doc)] // not real + #[tracing::instrument(skip_all)] pub async fn new( mut conn: LockedMasDatabase<'conn>, mut writer_connections: Vec<PgConnection>, ) -> Result<Self, Error> { @@ -367,6 +368,7 @@ impl<'conn> MasWriter<'conn> { }) } + #[tracing::instrument(skip_all)] async fn pause_indices( conn: &mut PgConnection, ) -> Result<(Vec<IndexDescription>, Vec<ConstraintDescription>), Error> { @@ -427,6 +429,7 @@ impl<'conn> MasWriter<'conn> { /// Errors are returned in the following conditions: /// /// - If the database connection experiences an error. + #[tracing::instrument(skip_all)] pub async fn finish(mut self) -> Result<(), Error> { // Commit all writer transactions to the database. self.writer_pool @@ -479,6 +482,7 @@ impl<'conn> MasWriter<'conn> { /// /// - If the database writer connection pool had an error. #[allow(clippy::missing_panics_doc)] // not a real panic + #[tracing::instrument(skip_all, level = Level::DEBUG)] pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> { self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement @@ -536,6 +540,7 @@ impl<'conn> MasWriter<'conn> { /// /// - If the database writer connection pool had an error. #[allow(clippy::missing_panics_doc)] // not a real panic + #[tracing::instrument(skip_all, level = Level::DEBUG)] pub async fn write_passwords( &mut self, passwords: Vec<MasNewUserPassword>, diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs index f12714d19..10678a1a3 100644 --- a/crates/syn2mas/src/mas_writer/checks.rs +++ b/crates/syn2mas/src/mas_writer/checks.rs @@ -39,6 +39,7 @@ pub enum Error { /// - If any database access error occurs. /// - If any MAS tables involved in the migration are not empty. /// - If we can't check whether syn2mas is already in progress on this database or not. 
+#[tracing::instrument(skip_all)] pub async fn mas_pre_migration_checks<'a>( mas_connection: &mut LockedMasDatabase<'a>, ) -> Result<(), Error> { From 78623e76857747f09d196e7a32a2dc43dbd77a34 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 10 Dec 2024 15:41:19 +0000 Subject: [PATCH 04/10] Use rng trait --- crates/syn2mas/src/migration.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 25035fd3a..267d371df 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, pin::pin}; use chrono::{DateTime, Utc}; use compact_str::CompactString; use futures_util::StreamExt as _; -use rand::rngs::ThreadRng; +use rand::RngCore; use thiserror::Error; use thiserror_ext::ContextInto; use ulid::Ulid; @@ -61,7 +61,7 @@ pub async fn migrate( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, server_name: &str, - rng: &mut ThreadRng, + rng: &mut impl RngCore, ) -> Result<(), Error> { let counts = synapse.count_rows().await.into_synapse("counting users")?; @@ -85,7 +85,7 @@ async fn migrate_users( mas: &mut MasWriter<'_>, user_count_hint: usize, server_name: &str, - rng: &mut ThreadRng, + rng: &mut impl RngCore, ) -> Result { let mut write_buffer = MasUserWriteBuffer::new(mas); let mut users_stream = pin!(synapse.read_users()); @@ -124,7 +124,7 @@ async fn migrate_users( fn transform_user( user: &SynapseUser, server_name: &str, - rng: &mut ThreadRng, + rng: &mut impl RngCore, ) -> Result<(MasNewUser, Option), Error> { let username = user .name From f155227b90162b2fd772d752dbbc5fe2a9179893 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 10 Dec 2024 17:44:49 +0000 Subject: [PATCH 05/10] mod.rs --- crates/syn2mas/src/{mas_writer.rs => mas_writer/mod.rs} | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) rename crates/syn2mas/src/{mas_writer.rs => mas_writer/mod.rs} (99%) diff 
--git a/crates/syn2mas/src/mas_writer.rs b/crates/syn2mas/src/mas_writer/mod.rs similarity index 99% rename from crates/syn2mas/src/mas_writer.rs rename to crates/syn2mas/src/mas_writer/mod.rs index 1cf170a11..2fffe3eb6 100644 --- a/crates/syn2mas/src/mas_writer.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -295,7 +295,7 @@ impl<'conn> MasWriter<'conn> { info!("Starting new syn2mas migration"); conn.as_mut() - .execute_many(include_str!("mas_writer/syn2mas_temporary_tables.sql")) + .execute_many(include_str!("syn2mas_temporary_tables.sql")) // We don't care about any query results .try_collect::<Vec<_>>() .await @@ -453,9 +453,7 @@ impl<'conn> MasWriter<'conn> { self.conn .as_mut() - .execute_many(include_str!( - "mas_writer/syn2mas_revert_temporary_tables.sql" - )) + .execute_many(include_str!("syn2mas_revert_temporary_tables.sql")) // We don't care about any query results .try_collect::<Vec<_>>() .await From 64f5c65b5e329c72e6960639c6a030984431f4d7 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 10 Dec 2024 17:52:09 +0000 Subject: [PATCH 06/10] Array of tables --- ...2878ee329ca72070d849eb61ac9c8f9d1c76.json} | 8 +++--- crates/syn2mas/src/mas_writer/mod.rs | 25 +++++++++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) rename crates/syn2mas/.sqlx/{query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json => query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json} (57%) diff --git a/crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json b/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json similarity index 57% rename from crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json rename to crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json index 9a98edf47..df1f3fb7c 100644 --- 
a/crates/syn2mas/.sqlx/query-939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20.json +++ b/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema\n AND tablename IN ('syn2mas_restore_constraints', 'syn2mas_restore_indices')\n ", + "query": "\n SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema\n AND tablename = ANY($1)\n ", "describe": { "columns": [ { @@ -10,11 +10,13 @@ } ], "parameters": { - "Left": [] + "Left": [ + "NameArray" + ] }, "nullable": [ null ] }, - "hash": "939abc131d941f14eb4ad6358b1397e2449786972fa514905773de2dd501bd20" + "hash": "b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76" } diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index 2fffe3eb6..58ef78e90 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -219,24 +219,33 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords /// - If some, but not all, syn2mas restoration tables are present. /// (This shouldn't be possible without syn2mas having been sabotaged!) pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result { - // Check if there is a resumption table... + // Names of tables used for syn2mas resumption + // Must be `String`s, not just `&str`, for the query. + let restore_table_names = vec![ + "syn2mas_restore_constraints".to_owned(), + "syn2mas_restore_indices".to_owned(), + ]; + let num_resumption_tables = query!( r#" SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema - AND tablename IN ('syn2mas_restore_constraints', 'syn2mas_restore_indices') - "# + AND tablename = ANY($1) + "#, + &restore_table_names, ) .fetch_all(conn.as_mut()) .await .into_database("failed to query count of resumption tables")? 
.len(); - match num_resumption_tables { - 0 => Ok(false), - 2 => Ok(true), - _other => Err(Error::inconsistent( + if num_resumption_tables == 0 { + Ok(false) + } else if num_resumption_tables == restore_table_names.len() { + Ok(true) + } else { + Err(Error::inconsistent( "some, but not all, syn2mas resumption tables were found", - )), + )) } } From b9a069ae931ef3aaca5c14c4d1142da9a2aa40ef Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 11 Dec 2024 17:36:18 +0000 Subject: [PATCH 07/10] Copyright headers --- crates/syn2mas/src/checks.rs | 5 +++++ crates/syn2mas/src/lib.rs | 5 +++++ crates/syn2mas/src/mas_writer/checks.rs | 5 +++++ crates/syn2mas/src/mas_writer/constraint_pausing.rs | 5 +++++ crates/syn2mas/src/mas_writer/locking.rs | 5 +++++ crates/syn2mas/src/mas_writer/mod.rs | 5 +++++ crates/syn2mas/src/migration.rs | 5 +++++ crates/syn2mas/src/synapse_reader.rs | 5 +++++ 8 files changed, 40 insertions(+) diff --git a/crates/syn2mas/src/checks.rs b/crates/syn2mas/src/checks.rs index fae7e1a8c..5f56ce982 100644 --- a/crates/syn2mas/src/checks.rs +++ b/crates/syn2mas/src/checks.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + //! # Checks //! //! This module provides safety checks to run against a Synapse database before running the Synapse-to-MAS migration. diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs index afba21a5a..ec5455f7d 100644 --- a/crates/syn2mas/src/lib.rs +++ b/crates/syn2mas/src/lib.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. 
+ mod mas_writer; mod synapse_reader; diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs index 10678a1a3..85a6e74a9 100644 --- a/crates/syn2mas/src/mas_writer/checks.rs +++ b/crates/syn2mas/src/mas_writer/checks.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + //! # MAS Database Checks //! //! This module provides safety checks to run against a MAS database before running the Synapse-to-MAS migration. diff --git a/crates/syn2mas/src/mas_writer/constraint_pausing.rs b/crates/syn2mas/src/mas_writer/constraint_pausing.rs index a8ec914c3..6a420888f 100644 --- a/crates/syn2mas/src/mas_writer/constraint_pausing.rs +++ b/crates/syn2mas/src/mas_writer/constraint_pausing.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + use sqlx::PgConnection; use tracing::debug; diff --git a/crates/syn2mas/src/mas_writer/locking.rs b/crates/syn2mas/src/mas_writer/locking.rs index 6aab157fa..147bad4a6 100644 --- a/crates/syn2mas/src/mas_writer/locking.rs +++ b/crates/syn2mas/src/mas_writer/locking.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + use std::sync::LazyLock; use sqlx::{ diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index 58ef78e90..fe950592c 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + //! # MAS Writer //! //! This module is responsible for writing new records to MAS' database. 
diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 267d371df..59afa73d9 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + //! # Migration //! //! This module provides the high-level logic for performing the Synapse-to-MAS database migration. diff --git a/crates/syn2mas/src/synapse_reader.rs b/crates/syn2mas/src/synapse_reader.rs index f9a0958bc..39f52fe54 100644 --- a/crates/syn2mas/src/synapse_reader.rs +++ b/crates/syn2mas/src/synapse_reader.rs @@ -1,3 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + //! # Synapse Database Reader //! //! This module provides facilities for streaming relevant types of database records from a Synapse database. From 23ce5ed5daa9c092fb3f04a695fad51729f115ef Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 11 Dec 2024 17:46:15 +0000 Subject: [PATCH 08/10] Hardcode `read_users` query for simplicity --- Cargo.lock | 1 - crates/syn2mas/Cargo.toml | 1 - crates/syn2mas/src/synapse_reader.rs | 45 ++++++++-------------------- 3 files changed, 12 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5bff614c..e75394012 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6142,7 +6142,6 @@ name = "syn2mas" version = "0.12.0" dependencies = [ "anyhow", - "async-stream", "async-trait", "chrono", "compact_str", diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml index 03f1beb2b..bcf9aba0b 100644 --- a/crates/syn2mas/Cargo.toml +++ b/crates/syn2mas/Cargo.toml @@ -15,7 +15,6 @@ anyhow.workspace = true thiserror.workspace = true thiserror-ext.workspace = true -async-stream.workspace = true async-trait.workspace = true tokio.workspace = true sqlx.workspace = true diff --git 
a/crates/syn2mas/src/synapse_reader.rs b/crates/syn2mas/src/synapse_reader.rs index 39f52fe54..7cc87881f 100644 --- a/crates/syn2mas/src/synapse_reader.rs +++ b/crates/syn2mas/src/synapse_reader.rs @@ -7,9 +7,8 @@ //! //! This module provides facilities for streaming relevant types of database records from a Synapse database. -use async_stream::stream; use chrono::{DateTime, Utc}; -use futures_util::Stream; +use futures_util::{Stream, TryStreamExt}; use sea_query::{enum_def, Expr, Iden, PostgresQueryBuilder, Query}; use sea_query_binder::SqlxBinder; use sqlx::{query, query_with, FromRow, PgConnection, Postgres, Row, Type}; @@ -259,37 +258,17 @@ impl<'conn> SynapseReader<'conn> { } /// Reads Synapse users, excluding application service users (which do not need to be migrated), from the database. - pub fn read_users<'a, 'ret>( - &'a mut self, - ) -> impl Stream> + 'ret - where - 'conn: 'a, - 'a: 'ret, - { - // TODO no need for query builder here - let (sql, args) = Query::select() - .columns([ - SynapseUserIden::Name, - SynapseUserIden::PasswordHash, - SynapseUserIden::Admin, - SynapseUserIden::Deactivated, - SynapseUserIden::CreationTs, - ]) - .and_where(Expr::col(ExtraSynapseUserIden::AppserviceId).is_null()) - // TODO support migrating at least skeleton records for guests - .and_where(Expr::col(ExtraSynapseUserIden::IsGuest).eq(0)) - .from(SynapseUserIden::Table) - .build_sqlx(PostgresQueryBuilder); - - let conn = &mut *self.conn; - - // The async stream macro works around an issue where the QueryAs output stream borrows the SQL. - // See: https://github.com/launchbadge/sqlx/issues/1594#issuecomment-1493146479 - stream! 
{ - for await row in sqlx::query_as_with::<_, SynapseUser, _>(&sql, args).fetch(conn) { - yield row.into_database("reading Synapse users"); - } - } + pub fn read_users(&mut self) -> impl Stream<Item = Result<SynapseUser, Error>> + '_ { + sqlx::query_as( + " + SELECT + name, password_hash, admin, deactivated, creation_ts + FROM users + WHERE appservice_id IS NULL AND is_guest = 0 + ", + ) + .fetch(&mut *self.conn) + .map_err(|err| err.into_database("reading Synapse users")) + } } From 32fda584fce2ce84138277665a8f82e4ceb6db94 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 11 Dec 2024 17:49:56 +0000 Subject: [PATCH 09/10] Hardcode `count_rows` query for simplicity --- Cargo.lock | 2 -- crates/syn2mas/Cargo.toml | 2 -- crates/syn2mas/src/synapse_reader.rs | 36 ++++++++++------------------ 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e75394012..bbed45f42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6154,8 +6154,6 @@ dependencies = [ "opentelemetry-semantic-conventions", "rand", "rand_chacha", - "sea-query", - "sea-query-binder", "serde", "serde_json", "sqlx", diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml index bcf9aba0b..443ccc2a5 100644 --- a/crates/syn2mas/Cargo.toml +++ b/crates/syn2mas/Cargo.toml @@ -18,8 +18,6 @@ thiserror-ext.workspace = true async-trait.workspace = true tokio.workspace = true sqlx.workspace = true -sea-query.workspace = true -sea-query-binder.workspace = true chrono.workspace = true compact_str.workspace = true serde.workspace = true diff --git a/crates/syn2mas/src/synapse_reader.rs b/crates/syn2mas/src/synapse_reader.rs index 39f52fe54..7cc87881f 100644 --- a/crates/syn2mas/src/synapse_reader.rs +++ b/crates/syn2mas/src/synapse_reader.rs @@ -7,9 +7,8 @@ //! //! This module provides facilities for streaming relevant types of database records from a Synapse database. -use async_stream::stream; use chrono::{DateTime, Utc}; -use futures_util::Stream; +use futures_util::{Stream, TryStreamExt}; use sea_query::{enum_def, Expr, Iden, PostgresQueryBuilder, Query}; use sea_query_binder::SqlxBinder; use sqlx::{query, query_with, FromRow, PgConnection, Postgres, Row, 
Type}; +use sqlx::{query, FromRow, PgConnection, Postgres, Row, Type}; use thiserror::Error; use thiserror_ext::ContextInto; @@ -136,7 +134,6 @@ impl sqlx::Type<Postgres> for SecondsTimestamp { } #[derive(Clone, Debug, FromRow)] -#[enum_def(table_name = "users")] pub struct SynapseUser { /// Full User ID of the user pub name: FullUserId, @@ -153,12 +150,6 @@ pub struct SynapseUser { // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts) } -#[derive(Iden)] -pub enum ExtraSynapseUserIden { - AppserviceId, - IsGuest, -} - /// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on. /// /// This is a safety measure against other processes changing the data underneath our feet. @@ -239,20 +230,17 @@ impl<'conn> SynapseReader<'conn> { /// /// - An underlying database error pub async fn count_rows(&mut self) -> Result<SynapseRowCounts, Error> { - // TODO no need for query builder here - let (sql, args) = Query::select() - .expr(Expr::val(1).count()) - .and_where(Expr::col(ExtraSynapseUserIden::AppserviceId).is_null()) - // TODO support migrating at least skeleton records for guests - .and_where(Expr::col(ExtraSynapseUserIden::IsGuest).eq(0)) - .from(SynapseUserIden::Table) - .build_sqlx(PostgresQueryBuilder); - let users = query_with(&sql, args) - .fetch_one(&mut *self.conn) - .await - .into_database("counting Synapse users")? - .try_get::<i64, _>(0) - .into_database("couldn't decode count of Synapse users table")?; + let users = sqlx::query( + " + SELECT COUNT(1) FROM users + WHERE appservice_id IS NULL AND is_guest = 0 + ", + ) + .fetch_one(&mut *self.conn) + .await + .into_database("counting Synapse users")? 
+ .try_get::<i64, _>(0) + .into_database("couldn't decode count of Synapse users table")?; Ok(SynapseRowCounts { users }) } From e7172d6078f5edcc076527d0cc8a53c969ff9155 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 11 Dec 2024 17:52:44 +0000 Subject: [PATCH 10/10] Remove unused deps --- Cargo.lock | 12 ------------ crates/syn2mas/Cargo.toml | 15 --------------- 2 files changed, 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbed45f42..13580b66c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6141,28 +6141,16 @@ name = "syn2mas" version = "0.12.0" dependencies = [ - "anyhow", - "async-trait", "chrono", "compact_str", "futures-util", - "mas-data-model", - "mas-iana", - "mas-jose", - "mas-storage", - "oauth2-types", - "opentelemetry-semantic-conventions", "rand", - "rand_chacha", - "serde", - "serde_json", "sqlx", "thiserror 2.0.3", "thiserror-ext", "tokio", "tracing", "ulid", - "url", "uuid", ] diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml index 443ccc2a5..1b9c051d8 100644 --- a/crates/syn2mas/Cargo.toml +++ b/crates/syn2mas/Cargo.toml @@ -10,33 +10,18 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# TODO remove anything unnecessary! :-) -anyhow.workspace = true thiserror.workspace = true thiserror-ext.workspace = true - -async-trait.workspace = true tokio.workspace = true sqlx.workspace = true chrono.workspace = true compact_str.workspace = true -serde.workspace = true -serde_json.workspace = true tracing.workspace = true futures-util = "0.3.30" -opentelemetry-semantic-conventions.workspace = true rand.workspace = true -rand_chacha = "0.3.1" -url.workspace = true uuid = "1.10.0" ulid = { workspace = true, features = ["uuid"] } -oauth2-types.workspace = true -mas-storage.workspace = true -mas-data-model.workspace = true -mas-iana.workspace = true -mas-jose.workspace = true - [lints] workspace = true