
storage/adapter: Opt-in migration of sources to the new table model #30483

Draft
wants to merge 37 commits into main

Conversation

@jkosh44 (Contributor) commented Nov 14, 2024

Motivation

A subsequent PR will implement https://github.com/MaterializeInc/database-issues/issues/8678, which will also disable use of the 'old style' source statements behind the same feature flag introduced here. Once this PR and that PR land, enabling the force_source_table_syntax flag will completely switch users over to the new syntax.
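
For context, the difference between the two syntaxes looks roughly like the sketch below (a Postgres source with hypothetical connection, publication, and table names; treat it as illustrative rather than exact syntax):

-- 'Old style' syntax: the upstream tables are ingested as subsources of
-- the source statement itself and queried directly.
CREATE SOURCE pg_src
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source')
  FOR TABLES (t1, t2);

-- New table model: the source only ingests, and data is read through
-- tables created from the source.
CREATE SOURCE pg_src
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source');
CREATE TABLE t1 FROM SOURCE pg_src (REFERENCE public.t1);
CREATE TABLE t2 FROM SOURCE pg_src (REFERENCE public.t2);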

Tips for reviewer

To test this I've added a new platform-checks scenario called ActivateSourceVersioningMigration, which runs Materialize on an existing version for each check's initialize() method and then restarts Materialize on the latest version with force_source_table_syntax enabled, activating the migration of any sources created using the 'old style' syntax. The validate() step is then run on this new version, confirming that all the queries continue to work.

There are already existing platform-checks Checks that use the 'old style' source syntax (TableFromPgSource, TableFromMySqlSource, LoadGeneratorAsOfUpTo, and one I added called UpsertLegacy), which cover the 4 source types we need to test. There are also many other checks that use the old syntax when running on 'old' versions before 0.119, but I wasn't sure how to make the ActivateSourceVersioningMigration scenario target a specific version rather than just the 'previous' version for the base run. @def- @nrainer-materialize let me know if you have ideas on how to do that.

I've also updated the legacy upgrade tests to activate this migration after the upgrade, which should provide additional coverage.
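
Concretely, activating the migration means enabling the two flags below (the same statements shown later in this thread) and then restarting Materialize:

ALTER SYSTEM SET enable_create_table_from_source TO on;
ALTER SYSTEM SET force_source_table_syntax TO on;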

Nightly

https://buildkite.com/materialize/nightly/builds?branch=rjobanp%3Asource-table-migration

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@jkosh44 marked this pull request as ready for review November 14, 2024 18:04
@jkosh44 requested review from a team as code owners November 14, 2024 18:04
@jkosh44 requested a review from ParkMyCar November 14, 2024 18:04
shepherdlybot bot commented Nov 14, 2024

Risk Score: 81/100 | Bug Hotspots: 4 | Resilience Coverage: 50%

Mitigations

Completing required mitigations increases Resilience Coverage.

  • (Required) Code Review 🔍 Detected
  • (Required) Feature Flag
  • (Required) Integration Test 🔍 Detected
  • (Required) Observability 🔍 Detected
  • (Required) QA Review
  • (Required) Run Nightly Tests
  • Unit Test
Risk Summary:

The pull request has a high risk score of 81, driven by predictors such as the "Sum Bug Reports Of Files" and the "Delta of Executable Lines". Historically, PRs with these predictors are 115% more likely to cause a bug than the repo baseline. While the observed and predicted bug trends for the repository are decreasing, the presence of 4 file hotspots suggests a heightened risk.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

Bug Hotspots:

File                 Percentile
../catalog/apply.rs  98
../src/coord.rs      100
../src/names.rs      92
../catalog/open.rs   99

@jkosh44 (Contributor, Author) commented Nov 14, 2024

I've re-opened #30168 with my own fork to avoid CI issues.

Comment on lines 2362 to 2371
if let RawItemName::Id(id, _, _) = item_name {
    let parsed_id = id.parse::<GlobalId>().unwrap();
    self.ids.insert(parsed_id);
}
Contributor Author:

I'm skeptical that this is correct, because it will skip over all named references, leading to a potentially incorrect order.

@jkosh44 force-pushed the source-table-migration branch 2 times, most recently from 1972e06 to db98078 on November 14, 2024 20:11
@ParkMyCar (Member) left a comment:

Mostly focused on migrate.rs but overall the PR LGTM!

raise e


def check_source_table_migration_test_sensible() -> None:
Member:

neat!

Comment on lines 1897 to 1916
// This must be a CTE.
_ => continue,
Member:

Are we sure about this? What about Types and Functions?

Contributor Author:

Definitely not sure; I was just experimenting to see if this would work. I think there are still CI failures, so it might not actually work. Types and functions should be fully qualified, though.

@@ -2351,6 +2352,79 @@ where
ResolvedIds::new(visitor.ids)
}

#[derive(Debug)]
struct RawItemDependencyIds<'a> {
    ids: BTreeSet<GlobalId>,
Member:

this probably needs to switch to CatalogItemId?

impl<'ast, 'a> VisitMut<'ast, Raw> for ItemDependencyModifier<'a> {
    fn visit_item_name_mut(&mut self, item_name: &mut RawItemName) {
        if let RawItemName::Id(id, _, _) = item_name {
            let parsed_id = id.parse::<GlobalId>().unwrap();
Member:

same here, switch to CatalogItemId?

@jkosh44 force-pushed the source-table-migration branch from 02ad145 to 098739e on December 12, 2024 21:05

def check_source_table_migration_test_sensible() -> None:
    assert MzVersion.parse_cargo() < MzVersion.parse_mz(
        "v0.133.0"
Contributor Author:

Note to self: does this version still make sense?



def get_old_image_for_source_table_migration_test() -> str:
    return "materialize/materialized:v0.125.0"
Contributor Author:

Note to self: does this version still make sense?

@jkosh44 force-pushed the source-table-migration branch 3 times, most recently from 0f50e5f to cf8f451 on December 12, 2024 21:20
@jkosh44 marked this pull request as draft December 16, 2024 17:58
@jkosh44 force-pushed the source-table-migration branch 4 times, most recently from e8930e8 to 619116d on December 17, 2024 18:34
@jkosh44 (Contributor, Author) commented Dec 17, 2024

There seems to be an issue with the migration for at least Kafka sources.

If I have the following user objects:

=> SELECT * FROM mz_objects WHERE id LIKE 'u%' ORDER BY id ASC;
 id |  oid  | schema_id |     name      |    type    | owner_id | cluster_id |    privileges     
----+-------+-----------+---------------+------------+----------+------------+-------------------
 u1 | 20189 | u3        | csr_conn      | connection | u1       |            | {u1=U/u1}
 u2 | 20190 | u3        | kafka_conn    | connection | u1       |            | {u1=U/u1}
 u3 | 20191 | u3        | data_progress | source     | u1       |            | {s2=r/u1,u1=r/u1}
 u4 | 20192 | u3        | data          | source     | u1       | u1         | {u1=r/u1}
(4 rows)
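
(For reference, an old-style setup roughly like the following would produce objects like these. The connection details are elided, and everything beyond the object names shown above is hypothetical.)

CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '<...>');
CREATE CONNECTION kafka_conn TO KAFKA (BROKER '<...>');

-- Old-style source, queried directly; 'data_progress' is its progress
-- subsource.
CREATE SOURCE data
  FROM KAFKA CONNECTION kafka_conn (TOPIC '<...>')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NONE;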

Then I enable the migrations via:

=> ALTER SYSTEM SET enable_create_table_from_source TO on;
NOTICE:  variable "enable_create_table_from_source" was updated for the system, this will have no effect on the current session
ALTER SYSTEM
=> ALTER SYSTEM SET force_source_table_syntax TO on;
NOTICE:  variable "force_source_table_syntax" was updated for the system, this will have no effect on the current session
ALTER SYSTEM

Then I restart Materialize, the following user objects will exist:

=> SELECT * FROM mz_objects WHERE id LIKE 'u%' ORDER BY id ASC;
 id |  oid  | schema_id |     name      |    type    | owner_id | cluster_id |    privileges     
----+-------+-----------+---------------+------------+----------+------------+-------------------
 u1 | 20189 | u3        | csr_conn      | connection | u1       |            | {u1=U/u1}
 u2 | 20190 | u3        | kafka_conn    | connection | u1       |            | {u1=U/u1}
 u3 | 20191 | u3        | data_progress | source     | u1       |            | {s2=r/u1,u1=r/u1}
 u4 | 20192 | u3        | data_source   | source     | u1       | u1         | {u1=r/u1}
 u5 | 20193 | u3        | data          | table      | u1       |            | {u1=r/u1}
(5 rows)

That looks correct and is what we would expect to happen. However, if I restart Materialize one more time, then the following user objects will exist:

=> SELECT * FROM mz_objects WHERE id LIKE 'u%' ORDER BY id ASC;
 id |  oid  | schema_id |        name        |    type    | owner_id | cluster_id |    privileges     
----+-------+-----------+--------------------+------------+----------+------------+-------------------
 u1 | 20189 | u3        | csr_conn           | connection | u1       |            | {u1=U/u1}
 u2 | 20190 | u3        | kafka_conn         | connection | u1       |            | {u1=U/u1}
 u3 | 20191 | u3        | data_progress      | source     | u1       |            | {s2=r/u1,u1=r/u1}
 u4 | 20192 | u3        | data_source_source | source     | u1       | u1         | {u1=r/u1}
 u5 | 20193 | u3        | data               | table      | u1       |            | {u1=r/u1}
 u6 | 20194 | u3        | data_source        | table      | u1       |            | {u1=r/u1}
(6 rows)

As you can see, the already migrated objects were migrated again. If I restart Materialize once again, then I see the following:

=> SELECT * FROM mz_objects WHERE id LIKE 'u%' ORDER BY id ASC;
 id |  oid  | schema_id |           name            |    type    | owner_id | cluster_id |    privileges     
----+-------+-----------+---------------------------+------------+----------+------------+-------------------
 u1 | 20189 | u3        | csr_conn                  | connection | u1       |            | {u1=U/u1}
 u2 | 20190 | u3        | kafka_conn                | connection | u1       |            | {u1=U/u1}
 u3 | 20191 | u3        | data_progress             | source     | u1       |            | {s2=r/u1,u1=r/u1}
 u4 | 20192 | u3        | data_source_source_source | source     | u1       | u1         | {u1=r/u1}
 u5 | 20193 | u3        | data                      | table      | u1       |            | {u1=r/u1}
 u6 | 20194 | u3        | data_source               | table      | u1       |            | {u1=r/u1}
 u7 | 20195 | u3        | data_source_source        | table      | u1       |            | {u1=r/u1}
(7 rows)

I'm assuming that each time Materialize is restarted, the migration is re-run.

@jkosh44 (Contributor, Author) commented Dec 18, 2024

I was able to fix the migration idempotency issue in the most recent batch of commits. Now there's another issue that is triggered in the *-compile-proto-sources.td legacy upgrade test. For some reason, after being migrated, the kafka_proto_source source has its upper and since stuck at 0.

materialize=> EXPLAIN TIMESTAMP FOR SELECT * FROM kafka_proto_source;
                                 Timestamp                                 
---------------------------------------------------------------------------
                 query timestamp: 1734549613466 (2024-12-18 19:20:13.466) +
           oracle read timestamp: 1734549613466 (2024-12-18 19:20:13.466) +
 largest not in advance of upper:             0 (1970-01-01 00:00:00.000) +
                           upper:[            0 (1970-01-01 00:00:00.000)]+
                           since:[            0 (1970-01-01 00:00:00.000)]+
         can respond immediately: false                                   +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1734549614451 (2024-12-18 19:20:14.451) +
                                                                          +
 source materialize.public.kafka_proto_source (u5, storage):              +
                   read frontier:[            0 (1970-01-01 00:00:00.000)]+
                  write frontier:[            0 (1970-01-01 00:00:00.000)]+
 
(1 row)

@jkosh44 force-pushed the source-table-migration branch from 5bb01ec to cb45af0 on December 18, 2024 20:16
@jkosh44 force-pushed the source-table-migration branch from cb45af0 to 254b6ed on December 26, 2024 15:01
@jkosh44 (Contributor, Author) commented Dec 26, 2024

I was able to fix the migration idempotency issue in the most recent batch of commits. Now there's another issue that is triggered in the *-compile-proto-sources.td legacy upgrade test. For some reason, after being migrated, the kafka_proto_source source has its upper and since stuck at 0.

materialize=> EXPLAIN TIMESTAMP FOR SELECT * FROM kafka_proto_source;
                                 Timestamp                                 
---------------------------------------------------------------------------
                 query timestamp: 1734549613466 (2024-12-18 19:20:13.466) +
           oracle read timestamp: 1734549613466 (2024-12-18 19:20:13.466) +
 largest not in advance of upper:             0 (1970-01-01 00:00:00.000) +
                           upper:[            0 (1970-01-01 00:00:00.000)]+
                           since:[            0 (1970-01-01 00:00:00.000)]+
         can respond immediately: false                                   +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1734549614451 (2024-12-18 19:20:14.451) +
                                                                          +
 source materialize.public.kafka_proto_source (u5, storage):              +
                   read frontier:[            0 (1970-01-01 00:00:00.000)]+
                  write frontier:[            0 (1970-01-01 00:00:00.000)]+
 
(1 row)

The issue also repros with *-kafka-source.td, which suggests that it's an issue with all Kafka sources. I was also able to repro it with counter load generator sources, which suggests that it's an issue with all single-output sources. I haven't tried multi-output sources yet. A minimal sketch of the load generator repro is below.
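
This sketch assumes the two flags above are enabled and Materialize is restarted in between; the source name matches the output in the next comment:

-- Created with the old syntax before the migration:
CREATE SOURCE lg_source FROM LOAD GENERATOR COUNTER;

-- After enabling the flags and restarting (which runs the migration):
SELECT COUNT(*) > 0 FROM lg_source;
EXPLAIN TIMESTAMP FOR SELECT COUNT(*) > 0 FROM lg_source;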

@jkosh44 (Contributor, Author) commented Dec 26, 2024

The load generator output looks like this, which is slightly different from the Kafka source output above:

materialize=> EXPLAIN TIMESTAMP FOR SELECT COUNT(*) > 0 FROM lg_source;
                                 Timestamp                                 
---------------------------------------------------------------------------
                 query timestamp: 1735236290985 (2024-12-26 18:04:50.985) +
           oracle read timestamp: 1735236290985 (2024-12-26 18:04:50.985) +
 largest not in advance of upper: 1735236080000 (2024-12-26 18:01:20.000) +
                           upper:[1735236080001 (2024-12-26 18:01:20.001)]+
                           since:[            0 (1970-01-01 00:00:00.000)]+
         can respond immediately: false                                   +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1735236291320 (2024-12-26 18:04:51.320) +
                                                                          +
 source materialize.public.lg_source (u3, storage):                       +
                   read frontier:[1735236079000 (2024-12-26 18:01:19.000)]+
                  write frontier:[1735236080001 (2024-12-26 18:01:20.001)]+
 
(1 row)

EDIT: Actually, taking a closer look at this, I don't know how to explain it at all. Why would the since be different from the read frontier?

@jkosh44 force-pushed the source-table-migration branch from 254b6ed to dc960f3 on January 2, 2025 16:29
@jkosh44 (Contributor, Author) commented Jan 2, 2025

Here's my best understanding of the full end-to-end issue:

This describes creating the source. When creating the table from the source, we attempt to restart the source dataflow, but it is also skipped. source_exports is not empty: it contains a single export, the newly created table. resume_uppers is set to {table_id: [0]} because the table's upper is [0].

for (id, export) in ingestion_description.source_exports.iter() {
    // Explicit destructuring to force a compile error when the metadata change
    let CollectionMetadata {
        persist_location,
        remap_shard,
        data_shard,
        relation_desc,
        txns_shard,
    } = &export.storage_metadata;
    assert_eq!(
        txns_shard, &None,
        "source {} unexpectedly using txn-wal",
        id
    );
    let client = persist_clients
        .open(persist_location.clone())
        .await
        .expect("error creating persist client");
    let mut write_handle = client
        .open_writer::<SourceData, (), T, Diff>(
            *data_shard,
            Arc::new(relation_desc.clone()),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("resumption data {}", id),
            },
        )
        .await
        .unwrap();
    let upper = write_handle.fetch_recent_upper().await;
    let upper = match export.data_config.envelope {
        // The CdcV2 envelope must re-ingest everything since the Mz frontier does not have a relation to upstream timestamps.
        // TODO(petrosagg): move this reasoning to the controller
        SourceEnvelope::CdcV2 if upper.is_empty() => Antichain::new(),
        SourceEnvelope::CdcV2 => Antichain::from_elem(Timestamp::minimum()),
        _ => upper.clone(),
    };
    resume_uppers.insert(*id, upper);
    write_handle.expire().await;

However, as_of is still empty because the remap_shard is set to None.

// TODO(petrosagg): The as_of of the ingestion should normally be based
// on the since frontiers of its outputs. Even though the storage
// controller makes sure to make downgrade decisions in an organized
// and ordered fashion, it then proceeds to persist them in an
// asynchronous and disorganized fashion to persist. The net effect is
// that upon restart, or upon observing the persist state like this
// function, one can see non-sensical results like the since of A be in
// advance of B even when B depends on A! This can happen because the
// downgrade of B gets reordered and lost. Here is our best attempt at
// playing detective of what the controller meant to do by blindly
// assuming that the since of the remap shard is a suitable since
// frontier without consulting the since frontier of the outputs. One
// day we will enforce order to chaos and this comment will be deleted.
if let Some(remap_shard) = remap_shard {
    match seen_remap_shard.as_ref() {
        None => {
            let read_handle = client
                .open_leased_reader::<SourceData, (), T, Diff>(
                    *remap_shard,
                    Arc::new(
                        ingestion_description
                            .desc
                            .connection
                            .timestamp_desc(),
                    ),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: ingestion_description
                            .remap_collection_id
                            .to_string(),
                        handle_purpose: format!(
                            "resumption data for {}",
                            id
                        ),
                    },
                    false,
                )
                .await
                .unwrap();
            as_of.clone_from(read_handle.since());
            mz_ore::task::spawn(
                move || "deferred_expire",
                async move {
                    tokio::time::sleep(std::time::Duration::from_secs(
                        300,
                    ))
                    .await;
                    read_handle.expire().await;
                },
            );
            seen_remap_shard = Some(remap_shard.clone());
        }
        Some(shard) => assert_eq!(
            shard, remap_shard,
            "ingestion with multiple remap shards"
        ),
    }
}

I'm still investigating why remap_shard is None.

@jkosh44 (Contributor, Author) commented Jan 3, 2025

I'm still investigating why remap_shard is None.

The table has a data source of type IngestionExport, and only data sources of type Ingestion have a remap_shard, so this is by definition. At this point I'm not really sure how this is supposed to work and will probably need some storage help to continue.

@jkosh44 (Contributor, Author) commented Jan 3, 2025

This small change #30940 (i.e. enabling force_source_table_syntax) breaks all types of sources. So it looks like this issue isn't specific to load gen sources.
