diff --git a/Cargo.lock b/Cargo.lock index 47021c250540f..ff81905491270 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4581,6 +4581,7 @@ dependencies = [ "fail", "futures", "governor", + "hex", "http 1.1.0", "ipnet", "itertools 0.12.1", @@ -4609,6 +4610,7 @@ dependencies = [ "mz-pgrepr", "mz-pgwire-common", "mz-postgres-util", + "mz-proto", "mz-repr", "mz-rocksdb-types", "mz-secrets", @@ -4624,6 +4626,7 @@ dependencies = [ "mz-transform", "opentelemetry", "prometheus", + "prost", "qcell", "rand 0.8.5", "rand_chacha 0.3.0", diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index e9988ef3c3772..53575be5a2338 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -627,6 +627,17 @@ steps: agents: queue: hetzner-aarch64-4cpu-8gb + - id: mysql-cdc-migration + label: MySQL CDC source-versioning migration tests + depends_on: build-aarch64 + timeout_in_minutes: 360 + plugins: + - ./ci/plugins/mzcompose: + composition: mysql-cdc-old-syntax + run: migration + agents: + queue: hetzner-aarch64-4cpu-8gb + - id: mysql-cdc-resumption-old-syntax label: MySQL CDC resumption tests (before source versioning) depends_on: build-aarch64 @@ -658,6 +669,17 @@ steps: queue: hetzner-aarch64-4cpu-8gb # the mzbuild postgres version will be used, which depends on the Dockerfile specification + - id: pg-cdc-migration + label: Postgres CDC source-versioning migration tests + depends_on: build-aarch64 + timeout_in_minutes: 360 + plugins: + - ./ci/plugins/mzcompose: + composition: pg-cdc-old-syntax + run: migration + agents: + queue: hetzner-aarch64-4cpu-8gb + - id: pg-cdc-resumption-old-syntax label: Postgres CDC resumption tests (before source versioning) depends_on: build-aarch64 @@ -689,6 +711,17 @@ steps: agents: queue: hetzner-aarch64-8cpu-16gb + - id: testdrive-kafka-migration + label: "Testdrive %N migration tests" + depends_on: build-aarch64 + timeout_in_minutes: 180 + parallelism: 8 + plugins: + - ./ci/plugins/mzcompose: + composition: testdrive-old-kafka-src-syntax + run: migration + agents: + queue: hetzner-aarch64-8cpu-16gb - group: AWS key: aws diff --git a/misc/python/materialize/checks/all_checks/upsert.py b/misc/python/materialize/checks/all_checks/upsert.py index dd7f92caf831e..543478f0a0fe8 100644 --- a/misc/python/materialize/checks/all_checks/upsert.py +++ b/misc/python/materialize/checks/all_checks/upsert.py @@ -164,3 +164,59 @@ def validate(self) -> Testdrive: """ ) ) + + +class UpsertLegacy(Check): + """ + An upsert source test that uses the legacy syntax to create the source + on all versions to ensure the source is properly migrated with the + ActivateSourceVersioningMigration scenario + """ + + def initialize(self) -> Testdrive: + return Testdrive( + schemas() + + dedent( + """ + $ kafka-create-topic topic=upsert-legacy-syntax + + $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000 + {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"} + + > CREATE SOURCE upsert_insert_legacy + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ENVELOPE UPSERT + + > CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy; + """ + ) + ) + + def manipulate(self) -> list[Testdrive]: + return [ + Testdrive(schemas() + dedent(s)) + for s in [ + """ + $ kafka-ingest format=avro key-format=avro 
topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000 + {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"} + """, + """ + $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000 + {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"} + """, + ] + ] + + def validate(self) -> Testdrive: + return Testdrive( + dedent( + """ + > SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy + 10000 10000 10000 + + > SELECT * FROM upsert_insert_legacy_view; + 10000 + """ + ) + ) diff --git a/misc/python/materialize/checks/scenarios_upgrade.py b/misc/python/materialize/checks/scenarios_upgrade.py index fe9a9bd26c470..5c404c4f0b9be 100644 --- a/misc/python/materialize/checks/scenarios_upgrade.py +++ b/misc/python/materialize/checks/scenarios_upgrade.py @@ -387,3 +387,49 @@ def actions(self) -> list[Action]: ), Validate(self), ] + + +class ActivateSourceVersioningMigration(Scenario): + """ + Starts MZ, initializes and manipulates, then forces the migration + of sources to the new table model (introducing Source Versioning). + """ + + def base_version(self) -> MzVersion: + return get_last_version() + + def actions(self) -> list[Action]: + print(f"Upgrading from tag {self.base_version()}") + return [ + StartMz( + self, + tag=self.base_version(), + ), + Initialize(self), + Manipulate(self, phase=1), + KillMz( + capture_logs=True + ), # We always use True here otherwise docker-compose will lose the pre-upgrade logs + StartMz( + self, + tag=None, + # Activate the `force_source_table_syntax` flag + # which should trigger the migration of sources + # using the old syntax to the new table model. + additional_system_parameter_defaults={ + "force_source_table_syntax": "true", + }, + ), + Manipulate(self, phase=2), + Validate(self), + # A second restart while already on the new version + KillMz(capture_logs=True), + StartMz( + self, + tag=None, + additional_system_parameter_defaults={ + "force_source_table_syntax": "true", + }, + ), + Validate(self), + ] diff --git a/misc/python/materialize/cli/ci_annotate_errors.py b/misc/python/materialize/cli/ci_annotate_errors.py index 53b7fc82d272b..84865cbbcaff4 100644 --- a/misc/python/materialize/cli/ci_annotate_errors.py +++ b/misc/python/materialize/cli/ci_annotate_errors.py @@ -101,6 +101,8 @@ | (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\] # parallel-workload | worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard + # source-table migration + | source-table-migration\ issue ) .* $ """, diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index bba376d4c250e..de6bc52529c22 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -53,7 +53,9 @@ def get_default_system_parameters( - version: MzVersion | None = None, zero_downtime: bool = False + version: MzVersion | None = None, + zero_downtime: bool = False, + force_source_table_syntax: bool = False, ) -> dict[str, str]: """For upgrade tests we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher) @@ -89,7 +91,7 @@ def get_default_system_parameters( "enable_0dt_deployment": "true" if zero_downtime else "false", "enable_0dt_deployment_panic_after_timeout": "true", "enable_0dt_deployment_sources": ( - "true" if version >= MzVersion.parse_mz("v0.125.0-dev") else 
"false" + "true" if version >= MzVersion.parse_mz("v0.130.0-dev") else "false" ), "enable_alter_swap": "true", "enable_columnation_lgalloc": "true", @@ -125,6 +127,7 @@ def get_default_system_parameters( "persist_record_schema_id": ( "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false" ), + "force_source_table_syntax": "true" if force_source_table_syntax else "false", "persist_batch_columnar_format": "both_v2", "persist_batch_delete_enabled": "true", "persist_batch_structured_order": "true", diff --git a/misc/python/materialize/source_table_migration.py b/misc/python/materialize/source_table_migration.py new file mode 100644 index 0000000000000..45add7e76fd73 --- /dev/null +++ b/misc/python/materialize/source_table_migration.py @@ -0,0 +1,71 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Utilities for testing the source table migration""" +from materialize.mz_version import MzVersion +from materialize.mzcompose.composition import Composition + + +def verify_sources_after_source_table_migration( + c: Composition, file: str, fail: bool = False +) -> None: + source_names_rows = c.sql_query( + "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';" + ) + source_names = [row[0] for row in source_names_rows] + + print(f"Sources created in {file} are: {source_names}") + + c.sql("SET statement_timeout = '20s'") + + for source_name in source_names: + _verify_source(c, file, source_name, fail=fail) + + +def _verify_source( + c: Composition, file: str, source_name: str, fail: bool = False +) -> None: + try: + print(f"Checking source: {source_name}") + + # must not crash + statement = f"SELECT count(*) FROM {source_name};" + print(statement) + c.sql_query(statement) + + statement = f"SHOW CREATE SOURCE {source_name};" + print(statement) + result = c.sql_query(statement) + sql = result[0][1] + assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}" + assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}" + + if not source_name.endswith("_progress"): + assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}" + + print("OK.") + except Exception as e: + print(f"source-table-migration issue in {file}: {str(e)}") + + if fail: + raise e + + +def check_source_table_migration_test_sensible() -> None: + assert MzVersion.parse_cargo() < MzVersion.parse_mz( + "v0.137.0" + ), "migration test probably no longer needed" + + +def get_old_image_for_source_table_migration_test() -> str: + return "materialize/materialized:v0.129.0" + + +def get_new_image_for_source_table_migration_test() -> str | None: + return None diff --git a/src/adapter/BUILD.bazel b/src/adapter/BUILD.bazel index 65920e5ef9811..f833c66a2fd83 100644 --- a/src/adapter/BUILD.bazel +++ b/src/adapter/BUILD.bazel @@ -53,6 +53,7 @@ rust_library( "//src/pgrepr:mz_pgrepr", "//src/pgwire-common:mz_pgwire_common", "//src/postgres-util:mz_postgres_util", + "//src/proto:mz_proto", "//src/repr:mz_repr", "//src/rocksdb-types:mz_rocksdb_types", "//src/secrets:mz_secrets", @@ -119,6 +120,7 @@ rust_test( "//src/pgrepr:mz_pgrepr", "//src/pgwire-common:mz_pgwire_common", 
"//src/postgres-util:mz_postgres_util", + "//src/proto:mz_proto", "//src/repr:mz_repr", "//src/rocksdb-types:mz_rocksdb_types", "//src/secrets:mz_secrets", @@ -165,6 +167,7 @@ rust_doc_test( "//src/pgrepr:mz_pgrepr", "//src/pgwire-common:mz_pgwire_common", "//src/postgres-util:mz_postgres_util", + "//src/proto:mz_proto", "//src/repr:mz_repr", "//src/rocksdb-types:mz_rocksdb_types", "//src/secrets:mz_secrets", @@ -231,6 +234,7 @@ rust_test( "//src/pgrepr:mz_pgrepr", "//src/pgwire-common:mz_pgwire_common", "//src/postgres-util:mz_postgres_util", + "//src/proto:mz_proto", "//src/repr:mz_repr", "//src/rocksdb-types:mz_rocksdb_types", "//src/secrets:mz_secrets", @@ -297,6 +301,7 @@ rust_test( "//src/pgrepr:mz_pgrepr", "//src/pgwire-common:mz_pgwire_common", "//src/postgres-util:mz_postgres_util", + "//src/proto:mz_proto", "//src/repr:mz_repr", "//src/rocksdb-types:mz_rocksdb_types", "//src/secrets:mz_secrets", @@ -363,6 +368,7 @@ rust_test( "//src/pgrepr:mz_pgrepr", "//src/pgwire-common:mz_pgwire_common", "//src/postgres-util:mz_postgres_util", + "//src/proto:mz_proto", "//src/repr:mz_repr", "//src/rocksdb-types:mz_rocksdb_types", "//src/secrets:mz_secrets", diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index a4298b2fe1703..d84638fa3f525 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -23,6 +23,7 @@ enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" governor = "0.6.0" +hex = "0.4.3" http = "1.1.0" ipnet = "2.5.0" itertools = "0.12.1" @@ -53,6 +54,7 @@ mz-pgcopy = { path = "../pgcopy" } mz-pgrepr = { path = "../pgrepr" } mz-pgwire-common = { path = "../pgwire-common" } mz-postgres-util = { path = "../postgres-util" } +mz-proto = { path = "../proto" } mz-repr = { path = "../repr", features = ["tracing_"] } mz-rocksdb-types = { path = "../rocksdb-types" } mz-secrets = { path = "../secrets" } @@ -68,6 +70,7 @@ mz-transform = { path = "../transform" } mz-timestamp-oracle = { path = "../timestamp-oracle" } opentelemetry = { version = "0.24.0", features = ["trace"] } prometheus = { version = "0.13.3", default-features = false } +prost = { version = "0.13.2", features = ["no-recursion-limit"] } qcell = "0.5" rand = "0.8.5" rand_chacha = "0.3" diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 9db55bc11d605..cd6f3fea66a74 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -25,7 +25,7 @@ use mz_catalog::builtin::{ use mz_catalog::durable::objects::{ ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleKey, SchemaKey, }; -use mz_catalog::durable::{CatalogError, DurableCatalogError, SystemObjectMapping}; +use mz_catalog::durable::{CatalogError, SystemObjectMapping}; use mz_catalog::memory::error::{Error, ErrorKind}; use mz_catalog::memory::objects::{ CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log, @@ -54,7 +54,7 @@ use mz_sql::session::vars::{VarError, VarInput}; use mz_sql::{plan, rbac}; use mz_sql_parser::ast::Expr; use mz_storage_types::sources::Timeline; -use tracing::{error, info_span, warn, Instrument}; +use tracing::{info_span, warn, Instrument}; use crate::catalog::state::LocalExpressionCache; use crate::catalog::{BuiltinTableUpdate, CatalogState}; @@ -1029,17 +1029,7 @@ impl CatalogState { } } }; - // We allow sinks to break this invariant due to a know issue with `ALTER SINK`. - // https://github.com/MaterializeInc/materialize/pull/28708. 
- if !entry.is_sink() && entry.uses().iter().any(|id| *id > entry.id) { - let msg = format!( - "item cannot depend on items with larger GlobalIds, item: {:?}, dependencies: {:?}", - entry, - entry.uses() - ); - error!("internal catalog errr: {msg}"); - return Err(CatalogError::Durable(DurableCatalogError::Internal(msg))); - } + self.insert_entry(entry); } StateDiff::Retraction => { @@ -1894,25 +1884,68 @@ fn sort_updates_inner(updates: Vec) -> Vec { } } - /// Sort item updates by [`CatalogItemId`]. + /// Sort item updates by parsing statements to identify any id-based dependencies within + /// this set of updates and then performing a topological sort. fn sort_item_updates( item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>, ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> { - item_updates - .into_iter() - // HACK: due to `ALTER SINK`, sinks can appear before the objects they - // depend upon. Fortunately, because sinks can never have dependencies - // and can never depend upon one another, to fix the topological sort, - // we can just always move sinks to the end. - .sorted_by_key(|(item, _ts, _diff)| { - if item.create_sql.starts_with("CREATE SINK") { - CatalogItemId::User(u64::MAX) - } else { - item.id - } - }) + // Partition items into groups s.t. each item in one group has a predefined order with all + // items in other groups. For example, all sinks are ordered greater than all tables. + let mut types = Vec::new(); + // N.B. Functions can depend on system tables, but not user tables. + let mut funcs = Vec::new(); + let mut secrets = Vec::new(); + let mut connections = Vec::new(); + let mut sources = Vec::new(); + let mut tables = Vec::new(); + let mut derived_items = Vec::new(); + let mut sinks = Vec::new(); + let mut continual_tasks = Vec::new(); + + for update in item_updates { + match update.0.item_type() { + CatalogItemType::Type => types.push(update), + CatalogItemType::Func => funcs.push(update), + CatalogItemType::Secret => secrets.push(update), + CatalogItemType::Connection => connections.push(update), + CatalogItemType::Source => sources.push(update), + CatalogItemType::Table => tables.push(update), + CatalogItemType::View + | CatalogItemType::MaterializedView + | CatalogItemType::Index => derived_items.push(update), + CatalogItemType::Sink => sinks.push(update), + CatalogItemType::ContinualTask => continual_tasks.push(update), + } + } + + // Within each group, sort by ID. + for group in [ + &mut types, + &mut funcs, + &mut secrets, + &mut connections, + &mut sources, + &mut tables, + &mut derived_items, + &mut sinks, + &mut continual_tasks, + ] { + group.sort_by_key(|(item, _, _)| item.id); + } + + iter::empty() + .chain(types) + .chain(funcs) + .chain(secrets) + .chain(connections) + .chain(sources) + .chain(tables) + .chain(derived_items) + .chain(sinks) + .chain(continual_tasks) .collect() } + let item_retractions = sort_item_updates(item_retractions); let item_additions = sort_item_updates(item_additions); @@ -1920,16 +1953,60 @@ fn sort_updates_inner(updates: Vec) -> Vec { fn sort_temp_item_updates( temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>, ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> { - temp_item_updates - .into_iter() - // HACK: due to `ALTER SINK`, sinks can appear before the objects they - // depend upon. Fortunately, because sinks can never have dependencies - // and can never depend upon one another, to fix the topological sort, - // we can just always move sinks to the end. 
- .sorted_by_key(|(item, _ts, _diff)| match item.item.typ() { - CatalogItemType::Sink => CatalogItemId::User(u64::MAX), - _ => item.id, - }) + // Partition items into groups s.t. each item in one group has a predefined order with all + // items in other groups. For example, all sinks are ordered greater than all tables. + let mut types = Vec::new(); + // N.B. Functions can depend on system tables, but not user tables. + let mut funcs = Vec::new(); + let mut secrets = Vec::new(); + let mut connections = Vec::new(); + let mut sources = Vec::new(); + let mut tables = Vec::new(); + let mut derived_items = Vec::new(); + let mut sinks = Vec::new(); + let mut continual_tasks = Vec::new(); + + for update in temp_item_updates { + match update.0.item.typ() { + CatalogItemType::Type => types.push(update), + CatalogItemType::Func => funcs.push(update), + CatalogItemType::Secret => secrets.push(update), + CatalogItemType::Connection => connections.push(update), + CatalogItemType::Source => sources.push(update), + CatalogItemType::Table => tables.push(update), + CatalogItemType::View + | CatalogItemType::MaterializedView + | CatalogItemType::Index => derived_items.push(update), + CatalogItemType::Sink => sinks.push(update), + CatalogItemType::ContinualTask => continual_tasks.push(update), + } + } + + // Within each group, sort by ID. + for group in [ + &mut types, + &mut funcs, + &mut secrets, + &mut connections, + &mut sources, + &mut tables, + &mut derived_items, + &mut sinks, + &mut continual_tasks, + ] { + group.sort_by_key(|(item, _, _)| item.id); + } + + iter::empty() + .chain(types) + .chain(funcs) + .chain(secrets) + .chain(connections) + .chain(sources) + .chain(tables) + .chain(derived_items) + .chain(sinks) + .chain(continual_tasks) .collect() } let temp_item_retractions = sort_temp_item_updates(temp_item_retractions); diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs index c62d768a838e6..4c3e2668b9b4f 100644 --- a/src/adapter/src/catalog/migrate.rs +++ b/src/adapter/src/catalog/migrate.rs @@ -11,15 +11,17 @@ use std::collections::BTreeMap; use mz_catalog::builtin::BuiltinTable; use mz_catalog::durable::Transaction; -use mz_catalog::memory::objects::StateUpdate; +use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate}; use mz_ore::collections::CollectionExt; use mz_ore::now::NowFn; -use mz_repr::{CatalogItemId, Timestamp}; +use mz_repr::{CatalogItemId, GlobalId, Timestamp}; use mz_sql::ast::display::AstDisplay; +use mz_sql::names::FullItemName; use mz_sql_parser::ast::{Raw, Statement}; use semver::Version; use tracing::info; use uuid::Uuid; + // DO NOT add any more imports from `crate` outside of `crate::catalog`. use crate::catalog::open::into_consolidatable_updates_startup; use crate::catalog::state::LocalExpressionCache; @@ -75,6 +77,11 @@ where Ok(()) } +pub(crate) struct MigrateResult { + pub(crate) builtin_table_updates: Vec>, + pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, i64)>, +} + /// Migrates all user items and loads them into `state`. /// /// Returns the builtin updates corresponding to all user items. 
@@ -83,9 +90,9 @@ pub(crate) async fn migrate( tx: &mut Transaction<'_>, local_expr_cache: &mut LocalExpressionCache, item_updates: Vec, - _now: NowFn, + now: NowFn, _boot_ts: Timestamp, -) -> Result>, anyhow::Error> { +) -> Result { let catalog_version = tx.get_catalog_content_version(); let catalog_version = match catalog_version { Some(v) => Version::parse(&v)?, @@ -97,6 +104,12 @@ pub(crate) async fn migrate( catalog_version ); + // Special block for `ast_rewrite_sources_to_tables` migration + // since it requires a feature flag needs to update multiple AST items at once. + if state.system_config().force_source_table_syntax() { + ast_rewrite_sources_to_tables(tx, now)?; + } + rewrite_ast_items(tx, |_tx, _id, _stmt| { // Add per-item AST migrations below. // @@ -118,6 +131,18 @@ pub(crate) async fn migrate( let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts); item_updates.extend(op_item_updates); differential_dataflow::consolidation::consolidate_updates(&mut item_updates); + + // Since some migrations might introduce non-item 'post-item' updates, we sequester those + // so they can be applied with other post-item updates after migrations to avoid + // accumulating negative diffs. + let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates + .into_iter() + // The only post-item update kind we currently generate is to + // update storage collection metadata. + .partition(|(kind, _, _)| { + matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_)) + }); + let item_updates = item_updates .into_iter() .map(|(kind, ts, diff)| StateUpdate { @@ -169,7 +194,10 @@ pub(crate) async fn migrate( "migration from catalog version {:?} complete", catalog_version ); - Ok(ast_builtin_table_updates) + Ok(MigrateResult { + builtin_table_updates: ast_builtin_table_updates, + post_item_updates, + }) } // Add new migrations below their appropriate heading, and precede them with a @@ -182,6 +210,575 @@ pub(crate) async fn migrate( // Please include the adapter team on any code reviews that add or edit // migrations. +/// Migrates all sources to use the new sources as tables model +/// +/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into +/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres, +/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources. +/// +/// Second we migrate existing `CREATE SOURCE` statements for these multi-output +/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS). +/// +/// Third we migrate existing single-output `CREATE SOURCE` statements. +/// This includes existing Kafka and single-output load-generator +/// subsources. This will generate an additional `CREATE TABLE .. FROM SOURCE` +/// statement that copies over all the export-specific options. This table will use +/// to the existing source statement's persist shard but use a new GlobalID. +/// The original source statement will be updated to remove the export-specific options, +/// renamed to `_source`, and use a new empty shard while keeping its +/// same GlobalId. 
+/// +fn ast_rewrite_sources_to_tables( + tx: &mut Transaction<'_>, + now: NowFn, +) -> Result<(), anyhow::Error> { + use maplit::btreemap; + use maplit::btreeset; + use mz_persist_types::ShardId; + use mz_proto::RustType; + use mz_sql::ast::{ + CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName, + CreateSubsourceStatement, CreateTableFromSourceStatement, Ident, + KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName, + RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, + UnresolvedItemName, Value, WithOptionValue, + }; + use mz_storage_client::controller::StorageTxn; + use mz_storage_types::sources::load_generator::LoadGeneratorOutput; + use mz_storage_types::sources::SourceExportStatementDetails; + use prost::Message; + + let items_with_statements = tx + .get_items() + .map(|item| { + let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast; + Ok((item, stmt)) + }) + .collect::, anyhow::Error>>()?; + let items_with_statements_copied = items_with_statements.clone(); + + let item_names_per_schema = items_with_statements_copied + .iter() + .map(|(item, _)| (item.schema_id.clone(), &item.name)) + .fold(BTreeMap::new(), |mut acc, (schema_id, name)| { + acc.entry(schema_id) + .or_insert_with(|| btreeset! {}) + .insert(name); + acc + }); + + // Any GlobalId that should be changed to a new GlobalId in any statements that + // reference it. This is necessary for ensuring downstream statements (e.g. + // mat views, indexes) that reference a single-output source (e.g. kafka) + // will now reference the corresponding new table, with the same data, instead. + let mut changed_ids = BTreeMap::new(); + + for (mut item, stmt) in items_with_statements { + match stmt { + // Migrate each `CREATE SUBSOURCE` statement to an equivalent + // `CREATE TABLE .. FROM SOURCE` statement. + Statement::CreateSubsource(CreateSubsourceStatement { + name, + columns, + constraints, + of_source, + if_not_exists, + mut with_options, + }) => { + let raw_source_name = match of_source { + // If `of_source` is None then this is a `progress` subsource which we + // are not migrating as they are not currently relevant to the new table model. + None => continue, + Some(name) => name, + }; + let source = match raw_source_name { + // Some legacy subsources have named-only references to their `of_source` + // so we ensure we always use an ID-based reference in the stored + // `CREATE TABLE .. FROM SOURCE` statements. + RawItemName::Name(name) => { + // Convert the name reference to an ID reference. + let (source_item, _) = items_with_statements_copied + .iter() + .find(|(_, statement)| match statement { + Statement::CreateSource(stmt) => stmt.name == name, + _ => false, + }) + .expect("source must exist"); + RawItemName::Id(source_item.global_id.to_string(), name, None) + } + RawItemName::Id(..) => raw_source_name, + }; + + // The external reference is a `with_option` on subsource statements but is a + // separate field on table statements. 
+ let external_reference = match with_options + .iter() + .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference) + { + Some(i) => match with_options.remove(i).value { + Some(WithOptionValue::UnresolvedItemName(name)) => name, + _ => unreachable!("external reference must be an unresolved item name"), + }, + None => panic!("subsource must have an external reference"), + }; + + let with_options = with_options + .into_iter() + .map(|option| { + match option.name { + CreateSubsourceOptionName::Details => TableFromSourceOption { + name: TableFromSourceOptionName::Details, + // The `details` option on both subsources and tables is identical, using the same + // ProtoSourceExportStatementDetails serialized value. + value: option.value, + }, + CreateSubsourceOptionName::TextColumns => TableFromSourceOption { + name: TableFromSourceOptionName::TextColumns, + value: option.value, + }, + CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption { + name: TableFromSourceOptionName::ExcludeColumns, + value: option.value, + }, + CreateSubsourceOptionName::Progress => { + panic!("progress option should not exist on this subsource") + } + CreateSubsourceOptionName::ExternalReference => { + unreachable!("This option is handled separately above.") + } + } + }) + .collect::>(); + + let table = CreateTableFromSourceStatement { + name, + constraints, + columns: mz_sql::ast::TableFromSourceColumns::Defined(columns), + if_not_exists, + source, + external_reference: Some(external_reference.clone()), + with_options, + // Subsources don't have `envelope`, `include_metadata`, or `format` options. + envelope: None, + include_metadata: vec![], + format: None, + }; + + info!( + "migrate: converted subsource {} to table {}", + item.create_sql, table + ); + item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable(); + tx.update_item(item.id, item)?; + } + + // Postgres sources are multi-output sources whose subsources are + // migrated above. All we need to do is remove the subsource-related + // options from this statement since they are no longer relevant. + Statement::CreateSource(CreateSourceStatement { + connection: + mut conn @ (CreateSourceConnection::Postgres { .. } + | CreateSourceConnection::Yugabyte { .. }), + name, + if_not_exists, + in_cluster, + include_metadata, + format, + envelope, + col_names, + with_options, + key_constraint, + external_references, + progress_subsource, + }) => { + let options = match &mut conn { + CreateSourceConnection::Postgres { options, .. } => options, + CreateSourceConnection::Yugabyte { options, .. } => options, + _ => unreachable!("match determined above"), + }; + // This option storing text columns on the primary source statement is redundant + // with the option on subsource statements so can just be removed. + // This was kept for round-tripping of `CREATE SOURCE` statements that automatically + // generated subsources, which is no longer necessary. 
+ if options + .iter() + .any(|o| matches!(o.name, PgConfigOptionName::TextColumns)) + { + options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns)); + let stmt = Statement::CreateSource(CreateSourceStatement { + connection: conn, + name, + if_not_exists, + in_cluster, + include_metadata, + format, + envelope, + col_names, + with_options, + key_constraint, + external_references, + progress_subsource, + }); + item.create_sql = stmt.to_ast_string_stable(); + tx.update_item(item.id, item)?; + info!("migrate: converted postgres source {stmt} to remove subsource options"); + } + } + // MySQL sources are multi-output sources whose subsources are + // migrated above. All we need to do is remove the subsource-related + // options from this statement since they are no longer relevant. + Statement::CreateSource(CreateSourceStatement { + connection: mut conn @ CreateSourceConnection::MySql { .. }, + name, + if_not_exists, + in_cluster, + include_metadata, + format, + envelope, + col_names, + with_options, + key_constraint, + external_references, + progress_subsource, + .. + }) => { + let options = match &mut conn { + CreateSourceConnection::MySql { options, .. } => options, + _ => unreachable!("match determined above"), + }; + // These options storing text and exclude columns on the primary source statement + // are redundant with the options on subsource statements so can just be removed. + // They was kept for round-tripping of `CREATE SOURCE` statements that automatically + // generated subsources, which is no longer necessary. + if options.iter().any(|o| { + matches!( + o.name, + MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns + ) + }) { + options.retain(|o| { + !matches!( + o.name, + MySqlConfigOptionName::TextColumns + | MySqlConfigOptionName::ExcludeColumns + ) + }); + let stmt = Statement::CreateSource(CreateSourceStatement { + connection: conn, + name, + if_not_exists, + in_cluster, + include_metadata, + format, + envelope, + col_names, + with_options, + key_constraint, + external_references, + progress_subsource, + }); + item.create_sql = stmt.to_ast_string_stable(); + tx.update_item(item.id, item)?; + info!("migrate: converted mysql source {stmt} to remove subsource options"); + } + } + // Multi-output load generator sources whose subsources are already + // migrated above. There is no need to remove any options from this + // statement since they are not export-specific. + Statement::CreateSource(CreateSourceStatement { + connection: + CreateSourceConnection::LoadGenerator { + generator: + LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch, + .. + }, + .. + }) => {} + // Single-output sources that need to be migrated to tables. These sources currently output + // data to the primary collection of the source statement. We will create a new table + // statement for them and move all export-specific options over from the source statement, + // while moving the `CREATE SOURCE` statement to a new name and moving its shard to the + // new table statement. + Statement::CreateSource(CreateSourceStatement { + connection: + conn @ (CreateSourceConnection::Kafka { .. } + | CreateSourceConnection::LoadGenerator { + generator: + LoadGenerator::Clock + | LoadGenerator::Datums + | LoadGenerator::Counter + | LoadGenerator::KeyValue, + .. 
+ }), + name, + col_names, + include_metadata, + format, + envelope, + with_options, + if_not_exists, + in_cluster, + progress_subsource, + external_references, + key_constraint, + }) => { + // To check if this is a source that has already been migrated we use a basic + // heuristic: if there is at least one existing table for the source, and if + // the envelope/format/include_metadata options are empty, we assume it's + // already been migrated. + let tables_for_source = + items_with_statements_copied + .iter() + .any(|(_, statement)| match statement { + Statement::CreateTableFromSource(stmt) => { + let source: GlobalId = match &stmt.source { + RawItemName::Name(_) => { + unreachable!("tables store source as ID") + } + RawItemName::Id(source_id, _, _) => { + source_id.parse().expect("valid id") + } + }; + source == item.global_id + } + _ => false, + }); + if tables_for_source + && envelope.is_none() + && format.is_none() + && include_metadata.is_empty() + { + info!("migrate: skipping already migrated source: {}", name); + continue; + } + + // Use the current source name as the new table name, and rename the source to + // `_source`. This is intended to allow users to continue using + // queries that reference the source name, since they will now need to query the + // table instead. + + // First find an unused name within the same schema to avoid conflicts. + let mut new_source_item_name = format!("{}_source", item.name); + let mut new_source_name_inner = + format!("{}_source", name.0.last().expect("at least one ident")); + let mut i = 0; + while item_names_per_schema + .get(&item.schema_id) + .expect("schema must exist") + .contains(&new_source_item_name) + { + new_source_item_name = format!("{}_source_{}", item.name, i); + new_source_name_inner = format!( + "{}_source_{}", + name.0.last().expect("at least one ident"), + i + ); + i += 1; + } + // We will use the original item name for the new table item. + let table_item_name = item.name.clone(); + + // Update the source item/statement to use the new name. + let mut new_source_name = name.clone(); + *new_source_name.0.last_mut().expect("at least one ident") = + Ident::new_unchecked(new_source_name_inner); + item.name = new_source_item_name; + + // A reference to the source that will be included in the table statement + let source_ref = + RawItemName::Id(item.global_id.to_string(), new_source_name.clone(), None); + + let columns = if col_names.is_empty() { + TableFromSourceColumns::NotSpecified + } else { + TableFromSourceColumns::Named(col_names) + }; + + // All source tables must have a `details` option, which is a serialized proto + // describing any source-specific details for this table statement. + let details = match &conn { + // For kafka sources this proto is currently empty. + CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {}, + CreateSourceConnection::LoadGenerator { .. } => { + // Since these load generators are single-output we use the default output. + SourceExportStatementDetails::LoadGenerator { + output: LoadGeneratorOutput::Default, + } + } + _ => unreachable!("match determined above"), + }; + let table_with_options = vec![TableFromSourceOption { + name: TableFromSourceOptionName::Details, + value: Some(WithOptionValue::Value(Value::String(hex::encode( + details.into_proto().encode_to_vec(), + )))), + }]; + + // Generate the same external-reference that would have been generated + // during purification for single-output sources. 
+ let external_reference = match &conn { + CreateSourceConnection::Kafka { options, .. } => { + let topic_option = options + .iter() + .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic)) + .expect("kafka sources must have a topic"); + let topic = match &topic_option.value { + Some(WithOptionValue::Value(Value::String(topic))) => topic, + _ => unreachable!("topic must be a string"), + }; + + Some(UnresolvedItemName::qualified(&[Ident::new(topic)?])) + } + CreateSourceConnection::LoadGenerator { generator, .. } => { + // Since these load generators are single-output the external reference + // uses the schema-name for both namespace and name. + let name = FullItemName { + database: mz_sql::names::RawDatabaseSpecifier::Name( + mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME + .to_owned(), + ), + schema: generator.schema_name().to_string(), + item: generator.schema_name().to_string(), + }; + Some(UnresolvedItemName::from(name)) + } + _ => unreachable!("match determined above"), + }; + + // The new table statement, stealing the name and the export-specific fields from + // the create source statement. + let table = CreateTableFromSourceStatement { + name, + constraints: vec![], + columns, + if_not_exists: false, + source: source_ref, + external_reference, + with_options: table_with_options, + envelope, + include_metadata, + format, + }; + + // The source statement with a new name and many of its fields emptied + let source = CreateSourceStatement { + connection: conn, + name: new_source_name, + if_not_exists, + in_cluster, + include_metadata: vec![], + format: None, + envelope: None, + col_names: vec![], + with_options, + key_constraint, + external_references, + progress_subsource, + }; + + let source_id = item.global_id; + let schema_id = item.schema_id.clone(); + let schema = tx.get_schema(&item.schema_id).expect("schema must exist"); + + let owner_id = item.owner_id.clone(); + let privileges = item.privileges.clone(); + let extra_versions = item.extra_versions.clone(); + + // Update the source statement in the catalog first, since the name will + // otherwise conflict with the new table statement. + info!("migrate: updated source {} to {source}", item.create_sql); + item.create_sql = Statement::CreateSource(source).to_ast_string_stable(); + tx.update_item(item.id, item)?; + + // Insert the new table statement into the catalog with a new id. + let ids = tx.allocate_user_item_ids(1)?; + let (new_table_id, new_table_global_id) = ids[0]; + info!("migrate: added table {new_table_id}: {table}"); + tx.insert_user_item( + new_table_id, + new_table_global_id, + schema_id, + &table_item_name, + table.to_ast_string_stable(), + owner_id, + privileges, + &Default::default(), + extra_versions, + )?; + // We need to move the shard currently attached to the source statement to the + // table statement such that the existing data in the shard is preserved and can + // be queried on the new table statement. However, we need to keep the GlobalId of + // the source the same, to preserve existing references to that statement in + // external tools such as DBT and Terraform. We will insert a new shard for the source + // statement which will be automatically created after the migration is complete. + let new_source_shard = ShardId::new(); + let (source_id, existing_source_shard) = tx + .delete_collection_metadata(btreeset! {source_id}) + .pop() + .expect("shard should exist"); + tx.insert_collection_metadata(btreemap! 
{ + new_table_global_id => existing_source_shard, + source_id => new_source_shard + })?; + + add_to_audit_log( + tx, + mz_audit_log::EventType::Create, + mz_audit_log::ObjectType::Table, + mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 { + id: new_table_id.to_string(), + name: mz_audit_log::FullNameV1 { + database: schema + .database_id + .map(|d| d.to_string()) + .unwrap_or_default(), + schema: schema.name, + item: table_item_name, + }, + }), + now(), + )?; + + // We also need to update any other statements that reference the source to use the new + // table id/name instead. + changed_ids.insert(source_id, new_table_global_id); + } + + // When we upgrade to > rust 1.81 we should use #[expect(unreachable_patterns)] + // to enforce that we have covered all CreateSourceStatement variants. + #[allow(unreachable_patterns)] + Statement::CreateSource(_) => {} + _ => (), + } + } + + let mut updated_items = BTreeMap::new(); + for (mut item, mut statement) in items_with_statements_copied { + match &statement { + // Don’t rewrite any of the statements we just migrated. + Statement::CreateSource(_) => {} + Statement::CreateSubsource(_) => {} + Statement::CreateTableFromSource(_) => {} + // We need to rewrite any statements that reference a source id to use the new + // table id instead, since any contained data in the source will now be in the table. + // This assumes the table has stolen the source's name, which is the case + // for all sources that were migrated. + _ => { + if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) { + info!("migrate: updated dependency reference in statement {statement}"); + item.create_sql = statement.to_ast_string_stable(); + updated_items.insert(item.id, item); + } + } + } + } + if !updated_items.is_empty() { + tx.update_items(updated_items)?; + } + + Ok(()) +} + // Durable migrations /// Migrations that run only on the durable catalog before any data is loaded into memory. @@ -202,3 +799,17 @@ pub(crate) fn durable_migrate( // // Please include the adapter team on any code reviews that add or edit // migrations. 
+ +fn add_to_audit_log( + tx: &mut Transaction, + event_type: mz_audit_log::EventType, + object_type: mz_audit_log::ObjectType, + details: mz_audit_log::EventDetails, + occurred_at: mz_ore::now::EpochMillis, +) -> Result<(), anyhow::Error> { + let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?; + let event = + mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at); + tx.insert_audit_log_event(event); + Ok(()) +} diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index eab051fd2057b..7a2e0c08c3a1e 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -379,7 +379,6 @@ impl Catalog { let mut post_item_updates = Vec::new(); let mut audit_log_updates = Vec::new(); for (kind, ts, diff) in updates { - let diff = diff.try_into().expect("valid diff"); match kind { BootstrapStateUpdateKind::Role(_) | BootstrapStateUpdateKind::Database(_) @@ -393,7 +392,7 @@ impl Catalog { pre_item_updates.push(StateUpdate { kind: kind.into(), ts, - diff, + diff: diff.try_into().expect("valid diff"), }) } BootstrapStateUpdateKind::IntrospectionSourceIndex(_) @@ -401,29 +400,25 @@ impl Catalog { system_item_updates.push(StateUpdate { kind: kind.into(), ts, - diff, + diff: diff.try_into().expect("valid diff"), }) } BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate { kind: kind.into(), ts, - diff, + diff: diff.try_into().expect("valid diff"), }), BootstrapStateUpdateKind::Comment(_) | BootstrapStateUpdateKind::StorageCollectionMetadata(_) | BootstrapStateUpdateKind::SourceReferences(_) | BootstrapStateUpdateKind::UnfinalizedShard(_) => { - post_item_updates.push(StateUpdate { - kind: kind.into(), - ts, - diff, - }) + post_item_updates.push((kind, ts, diff)); } BootstrapStateUpdateKind::AuditLog(_) => { audit_log_updates.push(StateUpdate { kind: kind.into(), ts, - diff, + diff: diff.try_into().expect("valid diff"), }); } } @@ -501,7 +496,7 @@ impl Catalog { // Migrate item ASTs. let builtin_table_update = if !config.skip_migrations { - migrate::migrate( + let migrate_result = migrate::migrate( &mut state, &mut txn, &mut local_expr_cache, @@ -516,7 +511,21 @@ impl Catalog { this_version: config.build_info.version, cause: e.to_string(), }) - })? + })?; + if !migrate_result.post_item_updates.is_empty() { + // Include any post-item-updates generated by migrations, and then consolidate + // them to ensure diffs are all positive. + post_item_updates.extend(migrate_result.post_item_updates); + // Push everything to the same timestamp so it consolidates cleanly. 
+ if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() { + for (_, ts, _) in &mut post_item_updates { + *ts = max_ts; + } + } + differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates); + } + + migrate_result.builtin_table_updates } else { state .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache) @@ -524,6 +533,14 @@ impl Catalog { }; builtin_table_updates.extend(builtin_table_update); + let post_item_updates = post_item_updates + .into_iter() + .map(|(kind, ts, diff)| StateUpdate { + kind: kind.into(), + ts, + diff: diff.try_into().expect("valid diff"), + }) + .collect(); let builtin_table_update = state .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache) .await; diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 2f26cff656e02..039365243c075 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1943,29 +1943,6 @@ impl Coordinator { let mut privatelink_connections = BTreeMap::new(); for entry in &entries { - // TODO(database-issues#7922): we should move this invariant into `CatalogEntry`. - mz_ore::soft_assert_or_log!( - // We only expect user objects to objects obey this invariant. - // System objects, for instance, can depend on other system - // objects that belong to a schema that is simply loaded first. - // To meaningfully resolve this, we could need more careful - // loading order or more complex IDs, neither of which seem very - // beneficial. - // - // HACK: sinks are permitted to depend on items with larger IDs, - // due to `ALTER SINK`. - !entry.id().is_user() - || entry.is_sink() - || entry - .uses() - .iter() - .all(|dependency_id| *dependency_id <= entry.id), - "entries should only use to items with lesser `GlobalId`s, but \ - {:?} uses {:?}", - entry.id, - entry.uses() - ); - debug!( "coordinator init: installing {} {}", entry.item().typ(), diff --git a/src/catalog/src/durable/objects.rs b/src/catalog/src/durable/objects.rs index e6bb869ce84fb..1d0002461d61f 100644 --- a/src/catalog/src/durable/objects.rs +++ b/src/catalog/src/durable/objects.rs @@ -509,6 +509,12 @@ pub struct Item { pub extra_versions: BTreeMap, } +impl Item { + pub fn item_type(&self) -> CatalogItemType { + item_type(&self.create_sql) + } +} + impl DurableType for Item { type Key = ItemKey; type Value = ItemValue; @@ -1289,32 +1295,36 @@ pub struct ItemValue { } impl ItemValue { - pub(crate) fn item_type(&self) -> CatalogItemType { - // NOTE(benesch): the implementation of this method is hideous, but is - // there a better alternative? Storing the object type alongside the - // `create_sql` would introduce the possibility of skew. 
- let mut tokens = self.create_sql.split_whitespace(); - assert_eq!(tokens.next(), Some("CREATE")); - match tokens.next() { - Some("TABLE") => CatalogItemType::Table, - Some("SOURCE") | Some("SUBSOURCE") => CatalogItemType::Source, - Some("SINK") => CatalogItemType::Sink, - Some("VIEW") => CatalogItemType::View, - Some("MATERIALIZED") => { - assert_eq!(tokens.next(), Some("VIEW")); - CatalogItemType::MaterializedView - } - Some("CONTINUAL") => { - assert_eq!(tokens.next(), Some("TASK")); - CatalogItemType::ContinualTask - } - Some("INDEX") => CatalogItemType::Index, - Some("TYPE") => CatalogItemType::Type, - Some("FUNCTION") => CatalogItemType::Func, - Some("SECRET") => CatalogItemType::Secret, - Some("CONNECTION") => CatalogItemType::Connection, - _ => panic!("unexpected create sql: {}", self.create_sql), + pub fn item_type(&self) -> CatalogItemType { + item_type(&self.create_sql) + } +} + +fn item_type(create_sql: &str) -> CatalogItemType { + // NOTE(benesch): the implementation of this method is hideous, but is + // there a better alternative? Storing the object type alongside the + // `create_sql` would introduce the possibility of skew. + let mut tokens = create_sql.split_whitespace(); + assert_eq!(tokens.next(), Some("CREATE")); + match tokens.next() { + Some("TABLE") => CatalogItemType::Table, + Some("SOURCE") | Some("SUBSOURCE") => CatalogItemType::Source, + Some("SINK") => CatalogItemType::Sink, + Some("VIEW") => CatalogItemType::View, + Some("MATERIALIZED") => { + assert_eq!(tokens.next(), Some("VIEW")); + CatalogItemType::MaterializedView + } + Some("CONTINUAL") => { + assert_eq!(tokens.next(), Some("TASK")); + CatalogItemType::ContinualTask } + Some("INDEX") => CatalogItemType::Index, + Some("TYPE") => CatalogItemType::Type, + Some("FUNCTION") => CatalogItemType::Func, + Some("SECRET") => CatalogItemType::Secret, + Some("CONNECTION") => CatalogItemType::Connection, + _ => panic!("unexpected create sql: {}", create_sql), } } diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index 544b6702f5721..1255333895ed1 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -2047,6 +2047,13 @@ impl<'a> Transaction<'a> { .map(|(k, v)| DurableType::from_key_value(k, v)) } + pub fn get_schema(&self, id: &SchemaId) -> Option { + let key = SchemaKey { id: *id }; + self.schemas + .get(&key) + .map(|v| DurableType::from_key_value(key, v.clone())) + } + pub fn get_introspection_source_indexes( &self, cluster_id: ClusterId, diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 7bffdcc990681..573aa56f4dfce 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -1294,6 +1294,24 @@ impl AstDisplay for LoadGenerator { } impl_display!(LoadGenerator); +impl LoadGenerator { + /// Corresponds with the same mapping on the `LoadGenerator` enum defined in + /// src/storage-types/src/sources/load_generator.rs, but re-defined here for + /// cases where we only have the AST representation. This can be removed once + /// the `ast_rewrite_sources_to_tables` migration is removed. 
+ pub fn schema_name(&self) -> &'static str { + match self { + LoadGenerator::Counter => "counter", + LoadGenerator::Clock => "clock", + LoadGenerator::Marketing => "marketing", + LoadGenerator::Auction => "auction", + LoadGenerator::Datums => "datums", + LoadGenerator::Tpch => "tpch", + LoadGenerator::KeyValue => "key_value", + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum LoadGeneratorOptionName { ScaleFactor, diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 7af22528fce85..2eb24322e83b1 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -24,6 +24,7 @@ use mz_repr::network_policy_id::NetworkPolicyId; use mz_repr::role_id::RoleId; use mz_repr::{CatalogItemId, GlobalId, RelationVersion}; use mz_repr::{ColumnName, RelationVersionSelector}; +use mz_sql_parser::ast::visit_mut::VisitMutNode; use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr, RawNetworkPolicyName, Version}; use mz_sql_parser::ident; use proptest_derive::Arbitrary; @@ -2386,6 +2387,44 @@ where ResolvedIds::new(visitor.ids) } +#[derive(Debug)] +pub struct ItemDependencyModifier<'a> { + pub modified: bool, + pub id_map: &'a BTreeMap, +} + +impl<'ast, 'a> VisitMut<'ast, Raw> for ItemDependencyModifier<'a> { + fn visit_item_name_mut(&mut self, item_name: &mut RawItemName) { + if let RawItemName::Id(id, _, _) = item_name { + let parsed_id = id.parse::().unwrap(); + if let Some(new_id) = self.id_map.get(&parsed_id) { + *id = new_id.to_string(); + self.modified = true; + } + } + } +} + +/// Updates any references in the provided AST node that are keys in `id_map`. +/// If an id is found it will be updated to the value of the key in `id_map`. +/// This assumes the names of the reference(s) are unmodified (e.g. each pair of +/// ids refer to an item of the same name, whose id has changed). +pub fn modify_dependency_item_ids<'ast, N>( + node: &'ast mut N, + id_map: &BTreeMap, +) -> bool +where + N: VisitMutNode<'ast, Raw>, +{ + let mut modifier = ItemDependencyModifier { + id_map, + modified: false, + }; + node.visit_mut(&mut modifier); + + modifier.modified +} + // Used when displaying a view's source for human creation. If the name // specified is the same as the name in the catalog, we don't use the ID format. #[derive(Debug)] diff --git a/src/storage-types/src/sources/load_generator.rs b/src/storage-types/src/sources/load_generator.rs index 3d9dd471cd71a..0d6ea646992f1 100644 --- a/src/storage-types/src/sources/load_generator.rs +++ b/src/storage-types/src/sources/load_generator.rs @@ -196,6 +196,8 @@ pub enum LoadGenerator { pub const LOAD_GENERATOR_DATABASE_NAME: &str = "mz_load_generators"; impl LoadGenerator { + /// Must be kept in-sync with the same mapping on the `LoadGenerator` enum defined in + /// src/sql-parser/src/ast/defs/ddl.rs. pub fn schema_name(&self) -> &'static str { match self { LoadGenerator::Counter { .. } => "counter", diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs index 14a628e8db25a..56f0016a3faa4 100644 --- a/src/storage/src/source/mysql/replication/events.rs +++ b/src/storage/src/source/mysql/replication/events.rs @@ -197,6 +197,11 @@ pub(super) async fn handle_query_event( (Some("commit"), None) => { is_complete_event = true; } + // Detect `CREATE TABLE ` statements which don't affect existing tables but do + // signify a complete event (e.g. 
for the purposes of advancing the GTID) + (Some("create"), Some("table")) => { + is_complete_event = true; + } _ => {} } diff --git a/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td b/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td index 4240d3a2229de..fcc08c2529d2d 100644 --- a/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td +++ b/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td @@ -24,5 +24,3 @@ c h a e - -> DROP SOURCE kafka_proto_source CASCADE; diff --git a/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td b/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td index 69af351dbbe33..9592f9055b7d5 100644 --- a/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td +++ b/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td @@ -18,5 +18,3 @@ $ set-regex match=\d+ replacement= $ set-regex match=testdrive-upgrade-kafka-source-.*?' replacement=' >[version<8100] SHOW CREATE SOURCE kafka_source materialize.public.kafka_source "CREATE SOURCE \"materialize\".\"public\".\"kafka_source\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = '') FORMAT AVRO USING SCHEMA '{ \"type\": \"record\", \"name\": \"cpx\", \"fields\": [ {\"name\": \"a\", \"type\": \"long\"}, {\"name\": \"b\", \"type\": \"long\"} ] }' ENVELOPE NONE EXPOSE PROGRESS AS \"materialize\".\"public\".\"kafka_source_progress\"" - -> DROP SOURCE kafka_source; diff --git a/test/legacy-upgrade/mzcompose.py b/test/legacy-upgrade/mzcompose.py index a839aabe7e23b..6f88e272d3011 100644 --- a/test/legacy-upgrade/mzcompose.py +++ b/test/legacy-upgrade/mzcompose.py @@ -116,20 +116,56 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: else: if parallelism_count == 1 or parallelism_index == 0: test_upgrade_from_version( - c, f"{version}", priors, filter=args.filter, zero_downtime=True + c, + f"{version}", + priors, + filter=args.filter, + zero_downtime=True, + force_source_table_syntax=False, ) if parallelism_count == 1 or parallelism_index == 1: test_upgrade_from_version( - c, f"{version}", priors, filter=args.filter, zero_downtime=False + c, + f"{version}", + priors, + filter=args.filter, + zero_downtime=False, + force_source_table_syntax=False, + ) + test_upgrade_from_version( + c, + f"{version}", + priors, + filter=args.filter, + zero_downtime=False, + force_source_table_syntax=True, ) if parallelism_count == 1 or parallelism_index == 0: test_upgrade_from_version( - c, "current_source", priors=[], filter=args.filter, zero_downtime=True + c, + "current_source", + priors=[], + filter=args.filter, + zero_downtime=True, + force_source_table_syntax=False, ) if parallelism_count == 1 or parallelism_index == 1: test_upgrade_from_version( - c, "current_source", priors=[], filter=args.filter, zero_downtime=False + c, + "current_source", + priors=[], + filter=args.filter, + zero_downtime=False, + force_source_table_syntax=False, + ) + test_upgrade_from_version( + c, + "current_source", + priors=[], + filter=args.filter, + zero_downtime=False, + force_source_table_syntax=True, ) @@ -152,13 +188,14 @@ def test_upgrade_from_version( priors: list[MzVersion], filter: str, zero_downtime: bool, + force_source_table_syntax: bool, ) -> None: print( f"+++ Testing {'0dt upgrade' if zero_downtime else 'regular upgrade'} from Materialize {from_version} to current_source." 
) system_parameter_defaults = get_default_system_parameters( - zero_downtime=zero_downtime + zero_downtime=zero_downtime, ) deploy_generation = 0 @@ -288,6 +325,13 @@ def test_upgrade_from_version( c.rm(mz_service) print(f"{'0dt-' if zero_downtime else ''}Upgrading to final version") + system_parameter_defaults = get_default_system_parameters( + zero_downtime=zero_downtime, + # We can only force the syntax on the final version so that the migration to convert + # sources to the new model can be applied without preventing sources from being + # created in the old syntax on the older version. + force_source_table_syntax=force_source_table_syntax, + ) mz_to = Materialized( name=mz_service, options=list(mz_options.values()), diff --git a/test/mysql-cdc-old-syntax/mzcompose.py b/test/mysql-cdc-old-syntax/mzcompose.py index 32d78456c45c2..308cd4a9c2856 100644 --- a/test/mysql-cdc-old-syntax/mzcompose.py +++ b/test/mysql-cdc-old-syntax/mzcompose.py @@ -22,9 +22,19 @@ ) from materialize.mzcompose.composition import Composition, WorkflowArgumentParser from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.minio import Minio from materialize.mzcompose.services.mysql import MySql +from materialize.mzcompose.services.postgres import ( + METADATA_STORE, + CockroachOrPostgresMetadata, +) from materialize.mzcompose.services.test_certs import TestCerts from materialize.mzcompose.services.testdrive import Testdrive +from materialize.source_table_migration import ( + get_new_image_for_source_table_migration_test, + get_old_image_for_source_table_migration_test, + verify_sources_after_source_table_migration, +) def create_mysql(mysql_version: str) -> MySql: @@ -46,6 +56,7 @@ def create_mysql_replica(mysql_version: str) -> MySql: SERVICES = [ Materialized( + external_minio=True, additional_system_parameter_defaults={ "log_filter": "mz_storage::source::mysql=trace,info" }, @@ -53,6 +64,8 @@ def create_mysql_replica(mysql_version: str) -> MySql: create_mysql(MySql.DEFAULT_VERSION), create_mysql_replica(MySql.DEFAULT_VERSION), TestCerts(), + CockroachOrPostgresMetadata(), + Minio(setup_materialize=True), Testdrive(default_timeout="60s"), ] @@ -72,7 +85,11 @@ def get_targeted_mysql_version(parser: WorkflowArgumentParser) -> str: def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: workflows_with_internal_sharding = ["cdc"] sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list( - [w for w in c.workflows if w not in workflows_with_internal_sharding], + [ + w + for w in c.workflows + if w not in workflows_with_internal_sharding and w != "migration" + ], lambda w: w, ) print( @@ -273,3 +290,88 @@ def do_inserts(c: Composition): """ ), ) + + +def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: + parser.add_argument( + "filter", + nargs="*", + default=["*.td"], + help="limit to only the files matching filter", + ) + args = parser.parse_args() + + matching_files = [] + for filter in args.filter: + matching_files.extend(glob.glob(filter, root_dir="test/mysql-cdc-old-syntax")) + + sharded_files: list[str] = sorted( + buildkite.shard_list(matching_files, lambda file: file) + ) + print(f"Files: {sharded_files}") + + mysql_version = get_targeted_mysql_version(parser) + + for file in sharded_files: + + mz_old = Materialized( + name="materialized", + image=get_old_image_for_source_table_migration_test(), + external_metadata_store=True, + external_minio=True, + additional_system_parameter_defaults={ + 
"log_filter": "mz_storage::source::mysql=trace,info" + }, + ) + + mz_new = Materialized( + name="materialized", + image=get_new_image_for_source_table_migration_test(), + external_metadata_store=True, + external_minio=True, + additional_system_parameter_defaults={ + "log_filter": "mz_storage::source::mysql=trace,info", + "force_source_table_syntax": "true", + }, + ) + + with c.override(mz_old, create_mysql(mysql_version)): + c.up("materialized", "mysql") + + print(f"Running {file} with mz_old") + + valid_ssl_context = retrieve_ssl_context_for_mysql(c) + wrong_ssl_context = retrieve_invalid_ssl_context_for_mysql(c) + + c.sources_and_sinks_ignored_from_validation.add("drop_table") + + c.run_testdrive_files( + f"--var=ssl-ca={valid_ssl_context.ca}", + f"--var=ssl-client-cert={valid_ssl_context.client_cert}", + f"--var=ssl-client-key={valid_ssl_context.client_key}", + f"--var=ssl-wrong-ca={wrong_ssl_context.ca}", + f"--var=ssl-wrong-client-cert={wrong_ssl_context.client_cert}", + f"--var=ssl-wrong-client-key={wrong_ssl_context.client_key}", + f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}", + "--var=mysql-user-password=us3rp4ssw0rd", + f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}", + f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1", + "--no-reset", + file, + ) + + c.kill("materialized", wait=True) + + with c.override(mz_new): + c.up("materialized") + + print("Running mz_new") + verify_sources_after_source_table_migration(c, file) + + c.kill("materialized", wait=True) + c.kill("mysql", wait=True) + c.kill(METADATA_STORE, wait=True) + c.rm("materialized") + c.rm(METADATA_STORE) + c.rm("mysql") + c.rm_volumes("mzdata") diff --git a/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td b/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td index 1b5662cc78956..c762d583849e5 100644 --- a/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td +++ b/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td @@ -53,6 +53,3 @@ detail: subsources referencing table: x, y mz_source mysql quickstart "" mz_source_progress progress "" t subsource quickstart "" - -$ mysql-execute name=mysql -DROP DATABASE other; diff --git a/test/pg-cdc-old-syntax/mzcompose.py b/test/pg-cdc-old-syntax/mzcompose.py index f44e64c31615b..51ab5ed98d0e9 100644 --- a/test/pg-cdc-old-syntax/mzcompose.py +++ b/test/pg-cdc-old-syntax/mzcompose.py @@ -21,10 +21,20 @@ from materialize.mzcompose.composition import Composition, WorkflowArgumentParser from materialize.mzcompose.service import Service, ServiceConfig from materialize.mzcompose.services.materialized import Materialized -from materialize.mzcompose.services.postgres import Postgres +from materialize.mzcompose.services.minio import Minio +from materialize.mzcompose.services.postgres import ( + METADATA_STORE, + CockroachOrPostgresMetadata, + Postgres, +) from materialize.mzcompose.services.test_certs import TestCerts from materialize.mzcompose.services.testdrive import Testdrive from materialize.mzcompose.services.toxiproxy import Toxiproxy +from materialize.source_table_migration import ( + get_new_image_for_source_table_migration_test, + get_old_image_for_source_table_migration_test, + verify_sources_after_source_table_migration, +) # Set the max slot WAL keep size to 10MB DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"] @@ -88,8 +98,11 @@ def create_postgres( additional_system_parameter_defaults={ "log_filter": 
"mz_storage::source::postgres=trace,debug,info,warn,error" }, + external_minio=True, ), Testdrive(), + CockroachOrPostgresMetadata(), + Minio(setup_materialize=True), TestCerts(), Toxiproxy(), create_postgres(pg_version=None), @@ -323,7 +336,11 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None: def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: workflows_with_internal_sharding = ["cdc"] sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list( - [w for w in c.workflows if w not in workflows_with_internal_sharding], + [ + w + for w in c.workflows + if w not in workflows_with_internal_sharding and w != "migration" + ], lambda w: w, ) print( @@ -348,3 +365,88 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: with c.test_case(name): c.workflow(name, *parser.args) + + +def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: + parser.add_argument( + "filter", + nargs="*", + default=["*.td"], + help="limit to only the files matching filter", + ) + args = parser.parse_args() + + matching_files = [] + for filter in args.filter: + matching_files.extend(glob.glob(filter, root_dir="test/pg-cdc-old-syntax")) + sharded_files: list[str] = sorted( + buildkite.shard_list(matching_files, lambda file: file) + ) + print(f"Files: {sharded_files}") + + ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout + ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout + ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout + ssl_wrong_cert = c.run( + "test-certs", "cat", "/secrets/postgres.crt", capture=True + ).stdout + ssl_wrong_key = c.run( + "test-certs", "cat", "/secrets/postgres.key", capture=True + ).stdout + + pg_version = get_targeted_pg_version(parser) + + for file in sharded_files: + mz_old = Materialized( + name="materialized", + image=get_old_image_for_source_table_migration_test(), + volumes_extra=["secrets:/share/secrets"], + external_metadata_store=True, + external_minio=True, + additional_system_parameter_defaults={ + "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error" + }, + ) + + mz_new = Materialized( + name="materialized", + image=get_new_image_for_source_table_migration_test(), + volumes_extra=["secrets:/share/secrets"], + external_metadata_store=True, + external_minio=True, + additional_system_parameter_defaults={ + "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error", + "force_source_table_syntax": "true", + }, + ) + with c.override(mz_old, create_postgres(pg_version=pg_version)): + c.up("materialized", "test-certs", "postgres") + + print(f"Running {file} with mz_old") + + c.run_testdrive_files( + f"--var=ssl-ca={ssl_ca}", + f"--var=ssl-cert={ssl_cert}", + f"--var=ssl-key={ssl_key}", + f"--var=ssl-wrong-cert={ssl_wrong_cert}", + f"--var=ssl-wrong-key={ssl_wrong_key}", + f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}", + f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1", + "--no-reset", + file, + ) + c.kill("materialized", wait=True) + + with c.override(mz_new): + c.up("materialized") + + print("Running mz_new") + verify_sources_after_source_table_migration(c, file) + + c.kill("materialized", wait=True) + c.kill("postgres", wait=True) + c.kill(METADATA_STORE, wait=True) + c.rm("materialized") + c.rm(METADATA_STORE) + c.rm("postgres") + c.rm_volumes("mzdata") diff --git 
a/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td b/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td index 3f510be4088b8..8f053fd552679 100644 --- a/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td +++ b/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td @@ -57,6 +57,3 @@ detail: subsources referencing table: x, y mz_source postgres quickstart "" mz_source_progress progress "" t subsource quickstart "" - -$ postgres-execute connection=postgres://postgres:postgres@postgres -DROP SCHEMA other CASCADE; diff --git a/test/testdrive-old-kafka-src-syntax/mzcompose.py b/test/testdrive-old-kafka-src-syntax/mzcompose.py index 77fd29709bf9d..075ddd07ea6e5 100644 --- a/test/testdrive-old-kafka-src-syntax/mzcompose.py +++ b/test/testdrive-old-kafka-src-syntax/mzcompose.py @@ -12,10 +12,10 @@ the expected-result/actual-result (aka golden testing) paradigm. A query is retried until it produces the desired result. """ - +import glob from pathlib import Path -from materialize import ci_util +from materialize import ci_util, spawn from materialize.mzcompose import get_default_system_parameters from materialize.mzcompose.composition import Composition, WorkflowArgumentParser from materialize.mzcompose.services.azure import Azurite @@ -24,11 +24,20 @@ from materialize.mzcompose.services.materialized import Materialized from materialize.mzcompose.services.minio import Minio from materialize.mzcompose.services.mysql import MySql -from materialize.mzcompose.services.postgres import Postgres +from materialize.mzcompose.services.postgres import ( + METADATA_STORE, + CockroachOrPostgresMetadata, + Postgres, +) from materialize.mzcompose.services.redpanda import Redpanda from materialize.mzcompose.services.schema_registry import SchemaRegistry from materialize.mzcompose.services.testdrive import Testdrive from materialize.mzcompose.services.zookeeper import Zookeeper +from materialize.source_table_migration import ( + get_new_image_for_source_table_migration_test, + get_old_image_for_source_table_migration_test, + verify_sources_after_source_table_migration, +) SERVICES = [ Zookeeper(), @@ -40,12 +49,25 @@ Minio(setup_materialize=True, additional_directories=["copytos3"]), Azurite(), Materialized(external_blob_store=True), + CockroachOrPostgresMetadata(), FivetranDestination(volumes_extra=["tmp:/share/tmp"]), Testdrive(external_blob_store=True), ] def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: + for name in c.workflows: + if name == "default": + continue + + if name == "migration": + continue + + with c.test_case(name): + c.workflow(name) + + +def workflow_kafka(c: Composition, parser: WorkflowArgumentParser) -> None: """Run testdrive.""" parser.add_argument( "--redpanda", @@ -237,3 +259,204 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: ci_util.upload_junit_report( "testdrive", Path(__file__).parent / junit_report ) + + +def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: + """Run testdrive.""" + parser.add_argument( + "--redpanda", + action="store_true", + help="run against Redpanda instead of the Confluent Platform", + ) + parser.add_argument( + "--aws-region", + help="run against the specified AWS region instead of localstack", + ) + parser.add_argument( + "--kafka-default-partitions", + type=int, + metavar="N", + help="set the default number of kafka partitions per topic", + ) + parser.add_argument( + "--system-param", + type=str, + action="append", + nargs="*", + help="System 
parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`", + ) + + parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas") + + parser.add_argument( + "--default-timeout", + type=str, + help="set the default timeout for Testdrive", + ) + parser.add_argument( + "files", + nargs="*", + default=["*.td"], + help="run against the specified files", + ) + + (args, _) = parser.parse_known_args() + + matching_files = [] + for filter in args.files: + matching_files.extend( + glob.glob(filter, root_dir="test/testdrive-old-kafka-src-syntax") + ) + matching_files = [file for file in matching_files if file != "session.td"] + + dependencies = [ + "fivetran-destination", + "minio", + "materialized", + "postgres", + "mysql", + ] + + if args.redpanda: + kafka_deps = ["redpanda"] + else: + kafka_deps = ["zookeeper", "kafka", "schema-registry"] + + dependencies += kafka_deps + + testdrive = Testdrive( + forward_buildkite_shard=True, + kafka_default_partitions=args.kafka_default_partitions, + aws_region=args.aws_region, + # validate_catalog_store=True, + default_timeout=args.default_timeout, + volumes_extra=["mzdata:/mzdata"], + external_minio=True, + fivetran_destination=True, + fivetran_destination_files_path="/share/tmp", + entrypoint_extra=[f"--var=uses-redpanda={args.redpanda}"], + ) + + sysparams = args.system_param + if not args.system_param: + sysparams = [] + + additional_system_parameter_defaults = {} + for val in sysparams: + x = val[0].split("=", maxsplit=1) + assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>" + key = x[0] + val = x[1] + + additional_system_parameter_defaults[key] = val + + additional_system_parameter_defaults["force_source_table_syntax"] = "true" + + mz_old = Materialized( + default_size=Materialized.Size.DEFAULT_SIZE, + image=get_old_image_for_source_table_migration_test(), + external_metadata_store=True, + external_minio=True, + additional_system_parameter_defaults=additional_system_parameter_defaults, + ) + mz_new = Materialized( + default_size=Materialized.Size.DEFAULT_SIZE, + image=get_new_image_for_source_table_migration_test(), + external_metadata_store=True, + external_minio=True, + additional_system_parameter_defaults=additional_system_parameter_defaults, + ) + + for file in matching_files: + with c.override(testdrive, mz_old): + c.up(*dependencies) + + c.sql( + "ALTER SYSTEM SET max_clusters = 50;", + port=6877, + user="mz_system", + ) + + non_default_testdrive_vars = [] + + if args.replicas > 1: + c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877) + # Make sure a replica named 'r1' always exists + replica_names = [ + "r1" if replica_id == 0 else f"replica{replica_id}" + for replica_id in range(0, args.replicas) + ] + replica_string = ",".join( + f"{replica_name} (SIZE '{mz_old.default_replica_size}')" + for replica_name in replica_names + ) + c.sql( + f"CREATE CLUSTER quickstart REPLICAS ({replica_string})", + user="mz_system", + port=6877, + ) + + # Note that any command that outputs SHOW CLUSTERS will have output + # that depends on the number of replicas testdrive has. This means + # it might be easier to skip certain tests if the number of replicas + # is > 1.
+ c.sql( + f""" + CREATE CLUSTER testdrive_single_replica_cluster SIZE = '{mz_old.default_replica_size}'; + GRANT ALL PRIVILEGES ON CLUSTER testdrive_single_replica_cluster TO materialize; + """, + user="mz_system", + port=6877, + ) + + non_default_testdrive_vars.append(f"--var=replicas={args.replicas}") + non_default_testdrive_vars.append( + "--var=single-replica-cluster=testdrive_single_replica_cluster" + ) + + non_default_testdrive_vars.append( + f"--var=default-replica-size={mz_old.default_replica_size}" + ) + non_default_testdrive_vars.append( + f"--var=default-storage-size={mz_old.default_storage_size}" + ) + + print(f"Running {file} with mz_old") + + c.run_testdrive_files( + *non_default_testdrive_vars, + "--no-reset", + file, + ) + + c.kill("materialized", wait=True) + + with c.override(mz_new): + c.up("materialized") + + print("Running mz_new") + verify_sources_after_source_table_migration(c, file) + + c.kill("materialized", wait=True) + c.kill("postgres", wait=True) + c.kill("mysql", wait=True) + c.kill(METADATA_STORE, wait=True) + + for dep in kafka_deps: + c.kill(dep, wait=True) + + for dep in kafka_deps: + c.rm(dep) + + c.rm("materialized") + c.rm(METADATA_STORE) + c.rm("postgres") + c.rm("mysql") + + # remove the testdrive container which uses the mzdata volume + testdrive_container_id = spawn.capture( + ["docker", "ps", "-a", "--filter", f"volume={c.name}_mzdata", "-q"] + ).strip() + spawn.runv(["docker", "rm", testdrive_container_id]) + + c.rm_volumes("mzdata", force=True)
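The three new `workflow_migration` workflows (mysql-cdc-old-syntax, pg-cdc-old-syntax, and testdrive-old-kafka-src-syntax) all follow the same shape: run each testdrive file against an old Materialize image that still accepts the legacy source syntax, restart the same catalog and blob storage under the new image with `force_source_table_syntax` enabled, and then call `verify_sources_after_source_table_migration` to check the migrated sources. The sketch below is a condensed illustration of that shared pattern, not part of the diff itself; `run_workload` is a hypothetical placeholder for the per-composition testdrive invocation, while the other helpers and keyword arguments are the ones imported and used in the mzcompose files above.

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.postgres import METADATA_STORE
from materialize.source_table_migration import (
    get_new_image_for_source_table_migration_test,
    get_old_image_for_source_table_migration_test,
    verify_sources_after_source_table_migration,
)


def run_migration_case(c: Composition, file: str, run_workload) -> None:
    # Old image: sources in `file` are created with the legacy syntax.
    mz_old = Materialized(
        name="materialized",
        image=get_old_image_for_source_table_migration_test(),
        external_metadata_store=True,
        external_minio=True,
    )
    # New image: booting the same catalog with force_source_table_syntax
    # enabled exercises the source-to-table migration on startup.
    mz_new = Materialized(
        name="materialized",
        image=get_new_image_for_source_table_migration_test(),
        external_metadata_store=True,
        external_minio=True,
        additional_system_parameter_defaults={"force_source_table_syntax": "true"},
    )

    with c.override(mz_old):
        c.up("materialized")
        run_workload(c, file)  # e.g. c.run_testdrive_files("--no-reset", file)
        c.kill("materialized", wait=True)

    with c.override(mz_new):
        c.up("materialized")
        verify_sources_after_source_table_migration(c, file)
        c.kill("materialized", wait=True)

    # Tear everything down so the next file starts from a clean slate.
    c.kill(METADATA_STORE, wait=True)
    c.rm("materialized")
    c.rm(METADATA_STORE)
    c.rm_volumes("mzdata")

Keeping the metadata store and MinIO as external services (rather than inside the `materialized` container) is what lets the catalog and persisted data survive the image swap between the two runs, which is the whole point of the migration check.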