Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/adapter: Opt-in migration of sources to the new table model #30483

Draft
wants to merge 39 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9870b80
Opt-in catalog migration for converting subsources to source tables
rjobanp Sep 16, 2024
f9ebeaf
Add migration logic for source statements
rjobanp Oct 23, 2024
5825163
Rename the feature flag
rjobanp Oct 23, 2024
51b1765
Add new table to audit log
rjobanp Oct 23, 2024
c486e12
Add platform check scenario to test migration
rjobanp Oct 23, 2024
d619d5b
Switch to using system vars instead of flags to allow console access …
rjobanp Oct 24, 2024
b12e394
Fixes to migration based on testing
rjobanp Oct 24, 2024
0437619
Also test the migration in the legacy upgrade tests
rjobanp Oct 24, 2024
55292a9
platform checks: unique source name
nrainer-materialize Oct 25, 2024
70edead
Migration structure cleanup from feedback
rjobanp Oct 25, 2024
8b343d4
Address more feedback; ensure new source name is unique
rjobanp Oct 25, 2024
1fc1c64
Fix legacy-upgrade checks
rjobanp Oct 25, 2024
20c1d39
Fixes caused by rebase on main
rjobanp Oct 25, 2024
1c400f0
ci: print source table migration issues
nrainer-materialize Oct 28, 2024
bb51c06
migration tests: pg-cdc-old-syntax
nrainer-materialize Oct 25, 2024
ede1b97
migration tests: extract logic
nrainer-materialize Oct 28, 2024
d130b66
migration tests: improve verification
nrainer-materialize Oct 28, 2024
2ec7de2
migration tests: mysql-cdc-old-syntax
nrainer-materialize Oct 28, 2024
3968ff2
migration tests: testdrive-old-kafka-syntax
nrainer-materialize Oct 28, 2024
97184a2
migration tests: improve output
nrainer-materialize Oct 29, 2024
1a45397
migration tests: fixes
nrainer-materialize Oct 29, 2024
bd7286b
migration tests: fixes
nrainer-materialize Oct 29, 2024
4c72ba8
Fix for mysql source being restarted after new table added
rjobanp Oct 29, 2024
acec835
Avoid needing to rewrite ids of dependent statements by changing the …
rjobanp Oct 29, 2024
1c78f0f
Fix merge skew
jkosh44 Nov 14, 2024
e158d0e
Fix dependency tracking
jkosh44 Nov 14, 2024
b674f66
Fix lint
jkosh44 Nov 14, 2024
f724904
Fix some issues
jkosh44 Nov 14, 2024
6fb5019
Fix dependency tracking
jkosh44 Nov 14, 2024
d097a28
Fix merge skew
jkosh44 Nov 15, 2024
96cd8e0
Update test versions
jkosh44 Nov 15, 2024
1fa1454
More merge skew fixes
jkosh44 Dec 12, 2024
0195195
Update item sorting to only sort within item groups
jkosh44 Dec 16, 2024
3bc8a27
Experiment for migrate
jkosh44 Dec 17, 2024
5b26267
Fix migration idempotency
jkosh44 Dec 18, 2024
3f6c1b9
Fixup
jkosh44 Dec 18, 2024
2c8c23c
resolve merge conflicts
jkosh44 Jan 2, 2025
7b6be9d
Update versions
jkosh44 Jan 14, 2025
39434cd
Fix more merge skew
jkosh44 Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-migration
label: MySQL CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: mysql-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-resumption-old-syntax
label: MySQL CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -602,6 +613,17 @@ steps:
queue: hetzner-aarch64-4cpu-8gb
# the mzbuild postgres version will be used, which depends on the Dockerfile specification

- id: pg-cdc-migration
label: Postgres CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-resumption-old-syntax
label: Postgres CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -632,6 +654,17 @@ steps:
agents:
queue: hetzner-aarch64-8cpu-16gb

- id: testdrive-kafka-migration
label: "Testdrive %N migration tests"
depends_on: build-aarch64
timeout_in_minutes: 180
parallelism: 8
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive-old-kafka-src-syntax
run: migration
agents:
queue: hetzner-aarch64-8cpu-16gb

- group: AWS
key: aws
Expand Down
56 changes: 56 additions & 0 deletions misc/python/materialize/checks/all_checks/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,59 @@ def validate(self) -> Testdrive:
"""
)
)


class UpsertLegacy(Check):
"""
An upsert source test that uses the legacy syntax to create the source
on all versions to ensure the source is properly migrated with the
ActivateSourceVersioningMigration scenario
"""

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
+ dedent(
"""
$ kafka-create-topic topic=upsert-legacy-syntax

$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

> CREATE SOURCE upsert_insert_legacy
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT

> CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
10000 10000 10000

> SELECT * FROM upsert_insert_legacy_view;
10000
"""
)
)
46 changes: 46 additions & 0 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,49 @@ def actions(self) -> list[Action]:
),
Validate(self),
]


class ActivateSourceVersioningMigration(Scenario):
"""
Starts MZ, initializes and manipulates, then forces the migration
of sources to the new table model (introducing Source Versioning).
"""

def base_version(self) -> MzVersion:
return get_last_version()

def actions(self) -> list[Action]:
print(f"Upgrading from tag {self.base_version()}")
return [
StartMz(
self,
tag=self.base_version(),
),
Initialize(self),
Manipulate(self, phase=1),
KillMz(
capture_logs=True
), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
StartMz(
self,
tag=None,
# Activate the `force_source_table_syntax` flag
# which should trigger the migration of sources
# using the old syntax to the new table model.
additional_system_parameter_defaults={
"force_source_table_syntax": "true",
},
),
Manipulate(self, phase=2),
Validate(self),
# A second restart while already on the new version
KillMz(capture_logs=True),
StartMz(
self,
tag=None,
additional_system_parameter_defaults={
"force_source_table_syntax": "true",
},
),
Validate(self),
]
2 changes: 2 additions & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@
| (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\]
# parallel-workload
| worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard
# source-table migration
| source-table-migration\ issue
)
.* $
""",
Expand Down
7 changes: 5 additions & 2 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@


def get_default_system_parameters(
version: MzVersion | None = None, zero_downtime: bool = False
version: MzVersion | None = None,
zero_downtime: bool = False,
force_source_table_syntax: bool = False,
) -> dict[str, str]:
"""For upgrade tests we only want parameters set when all environmentd /
clusterd processes have reached a specific version (or higher)
Expand Down Expand Up @@ -89,7 +91,7 @@ def get_default_system_parameters(
"enable_0dt_deployment": "true" if zero_downtime else "false",
"enable_0dt_deployment_panic_after_timeout": "true",
"enable_0dt_deployment_sources": (
"true" if version >= MzVersion.parse_mz("v0.125.0-dev") else "false"
"true" if version >= MzVersion.parse_mz("v0.130.0-dev") else "false"
),
"enable_alter_swap": "true",
"enable_columnation_lgalloc": "true",
Expand Down Expand Up @@ -125,6 +127,7 @@ def get_default_system_parameters(
"persist_record_schema_id": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
),
"force_source_table_syntax": "true" if force_source_table_syntax else "false",
"persist_batch_columnar_format": "both_v2",
"persist_batch_delete_enabled": "true",
"persist_batch_structured_order": "true",
Expand Down
71 changes: 71 additions & 0 deletions misc/python/materialize/source_table_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition


def verify_sources_after_source_table_migration(
c: Composition, file: str, fail: bool = False
) -> None:
source_names_rows = c.sql_query(
"SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
)
source_names = [row[0] for row in source_names_rows]

print(f"Sources created in {file} are: {source_names}")

c.sql("SET statement_timeout = '20s'")

for source_name in source_names:
_verify_source(c, file, source_name, fail=fail)


def _verify_source(
c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
try:
print(f"Checking source: {source_name}")

# must not crash
statement = f"SELECT count(*) FROM {source_name};"
print(statement)
c.sql_query(statement)

statement = f"SHOW CREATE SOURCE {source_name};"
print(statement)
result = c.sql_query(statement)
sql = result[0][1]
assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

if not source_name.endswith("_progress"):
assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

print("OK.")
except Exception as e:
print(f"source-table-migration issue in {file}: {str(e)}")

if fail:
raise e


def check_source_table_migration_test_sensible() -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat!

assert MzVersion.parse_cargo() < MzVersion.parse_mz(
"v0.137.0"
), "migration test probably no longer needed"


def get_old_image_for_source_table_migration_test() -> str:
return "materialize/materialized:v0.129.0"


def get_new_image_for_source_table_migration_test() -> str | None:
return None
6 changes: 6 additions & 0 deletions src/adapter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rust_library(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -119,6 +120,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -165,6 +167,7 @@ rust_doc_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -231,6 +234,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -297,6 +301,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -363,6 +368,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
http = "1.1.0"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -53,6 +54,7 @@ mz-pgcopy = { path = "../pgcopy" }
mz-pgrepr = { path = "../pgrepr" }
mz-pgwire-common = { path = "../pgwire-common" }
mz-postgres-util = { path = "../postgres-util" }
mz-proto = { path = "../proto" }
mz-repr = { path = "../repr", features = ["tracing_"] }
mz-rocksdb-types = { path = "../rocksdb-types" }
mz-secrets = { path = "../secrets" }
Expand All @@ -68,6 +70,7 @@ mz-transform = { path = "../transform" }
mz-timestamp-oracle = { path = "../timestamp-oracle" }
opentelemetry = { version = "0.24.0", features = ["trace"] }
prometheus = { version = "0.13.3", default-features = false }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
qcell = "0.5"
rand = "0.8.5"
rand_chacha = "0.3"
Expand Down
Loading
Loading