storage/adapter: Opt-in migration of sources to the new table model #30483
base: main
Conversation
Risk Summary: This pull request has a high risk score of 81, driven by predictors such as "Sum Bug Reports Of Files" and "Delta of Executable Lines". Historically, PRs with these predictors are 115% more likely to cause a bug than the repo baseline. While the observed and predicted bug trends for the repository are decreasing, the presence of 4 file hotspots suggests heightened risk. Note: the risk score is not based on semantic analysis but on historical predictors of bug occurrence in this repository; predictors and the score may change as the PR evolves in code, time, and review activity.
Force-pushed from 42c6af0 to c505cfc.
I've re-opened #30168 with my own fork to avoid CI issues.
src/sql/src/names.rs
Outdated
if let RawItemName::Id(id, _, _) = item_name {
    let parsed_id = id.parse::<GlobalId>().unwrap();
    self.ids.insert(parsed_id);
}
I'm skeptical that this is correct, because it will skip over all named references, leading to a potentially incorrect order.
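To make the concern concrete, here is a toy model in Python (not the Materialize AST; the variant tags and ids are hypothetical stand-ins) of how matching only the Id variant drops named references from the collected dependency set:

```python
# Toy stand-in for the visitor: each reference is ("id", ...) or ("name", ...),
# loosely mirroring RawItemName::Id vs RawItemName::Name.
def collect_ids(references):
    ids = set()
    for kind, value in references:
        if kind == "id":  # only the Id variant is recorded
            ids.add(value)
        # ("name", ...) references fall through silently, so any
        # dependency ordering derived from `ids` can be incomplete.
    return ids

deps = collect_ids([("id", "u1"), ("name", "public.t"), ("id", "u2")])
# the named reference to public.t is absent from `deps`
```

If named references can occur in the statements being rewritten, the visitor would also need to resolve those names to ids (or reject them) rather than skipping them.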
Force-pushed from 1972e06 to db98078.
Mostly focused on migrate.rs, but overall the PR LGTM!
raise e

def check_source_table_migration_test_sensible() -> None:
neat!
src/adapter/src/catalog/apply.rs
Outdated
// This must be a CTE.
_ => continue,
Are we sure about this? What about Types and Functions?
Definitely not sure, I was just experimenting to see if this would work. I think there are still CI failures, so it might not actually work. Types and functions should be fully qualified, though.
src/sql/src/names.rs
Outdated
@@ -2351,6 +2352,79 @@ where
    ResolvedIds::new(visitor.ids)
}

#[derive(Debug)]
struct RawItemDependencyIds<'a> {
    ids: BTreeSet<GlobalId>,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this probably needs to switch to CatalogItemId?
impl<'ast, 'a> VisitMut<'ast, Raw> for ItemDependencyModifier<'a> {
    fn visit_item_name_mut(&mut self, item_name: &mut RawItemName) {
        if let RawItemName::Id(id, _, _) = item_name {
            let parsed_id = id.parse::<GlobalId>().unwrap();
same here, switch to CatalogItemId?
Force-pushed from 02ad145 to 098739e.
def check_source_table_migration_test_sensible() -> None:
    assert MzVersion.parse_cargo() < MzVersion.parse_mz(
        "v0.133.0"
Note to self, does this version still make sense?
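The gate above boils down to a semantic-version comparison; here is a standalone sketch using a simplified stand-in for MzVersion (helper names are hypothetical, and pre-release suffixes are ignored):

```python
def parse_version(v: str) -> tuple:
    # "v0.125.0" -> (0, 125, 0); assumes a plain "vMAJOR.MINOR.PATCH" string
    return tuple(int(part) for part in v.lstrip("v").split("."))

def migration_test_sensible(current: str, cutoff: str = "v0.133.0") -> bool:
    # the migration test only makes sense while the current version
    # predates the cutoff after which the old syntax is expected to be gone
    return parse_version(current) < parse_version(cutoff)
```

Tuple comparison gives the expected ordering because Python compares element-wise, so (0, 125, 0) sorts before (0, 133, 0).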
def get_old_image_for_source_table_migration_test() -> str:
    return "materialize/materialized:v0.125.0"
Note to self, does this version still make sense?
Force-pushed from 0f50e5f to cf8f451.
Force-pushed from e8930e8 to 619116d.
There seems to be an issue with the migration with at least Kafka sources. If I have the following user objects:
Then I enable the migrations via:
Then when I restart Materialize, the following user objects will exist:
That looks correct and is what we would expect to happen. However, if I restart Materialize one more time, then the following user objects will exist:
As you can see, the already migrated objects were migrated again. If I restart Materialize once again, then I see the following:
I'm assuming that each time Materialize is restarted, the migration is re-run.
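The usual fix for this class of bug is to make the migration idempotent: since it re-runs on every boot, it must detect already-migrated items and skip them. A hedged sketch (illustrative only, with hypothetical field names, not Materialize's actual migration code):

```python
def migrate_sources(items):
    # `items` are dicts holding a raw statement plus a migration marker;
    # both the "stmt" and "migrated" field names are made up for illustration.
    for item in items:
        if item.get("migrated"):
            continue  # already rewritten on a previous boot; skip
        item["stmt"] = item["stmt"] + " -- rewritten to source + table"
        item["migrated"] = True
    return items

catalog = [{"stmt": "CREATE SOURCE s ..."}]
once = migrate_sources(catalog)
twice = migrate_sources(once)  # second boot: no further rewriting occurs
```

Running the migration twice leaves each statement with exactly one rewrite, which is the property the restart test above was violating.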
I was able to fix the migration idempotency issue in the most recent batch of commits. Now there's another issue, which shows up in the following EXPLAIN TIMESTAMP output:

materialize=> EXPLAIN TIMESTAMP FOR SELECT * FROM kafka_proto_source;
                                 Timestamp
---------------------------------------------------------------------------
                query timestamp: 1734549613466 (2024-12-18 19:20:13.466)
          oracle read timestamp: 1734549613466 (2024-12-18 19:20:13.466)
largest not in advance of upper:             0 (1970-01-01 00:00:00.000)
                          upper:[            0 (1970-01-01 00:00:00.000)]
                          since:[            0 (1970-01-01 00:00:00.000)]
        can respond immediately: false
                       timeline: Some(EpochMilliseconds)
              session wall time: 1734549614451 (2024-12-18 19:20:14.451)

source materialize.public.kafka_proto_source (u5, storage):
                  read frontier:[            0 (1970-01-01 00:00:00.000)]
                 write frontier:[            0 (1970-01-01 00:00:00.000)]
(1 row)
Force-pushed from 5bb01ec to cb45af0.
Force-pushed from cb45af0 to 254b6ed.
The issue also repros with
The load gen output looks like this, which is slightly different from the Kafka source above:
EDIT: Actually, taking a closer look at this, I don't know how to explain it at all. Why would the since be different from the read frontier?
…sorting of item updates
Force-pushed from 254b6ed to dc960f3.
This describes creating the source. When creating the table from the source, we attempt to restart the source dataflow, but it is also skipped. The relevant code is at src/storage/src/storage_state/async_storage_worker.rs, lines 224 to 264 (commit 5ffa224).

However, see src/storage/src/storage_state/async_storage_worker.rs, lines 266 to 323 (commit 5ffa224).

I'm still investigating why
The table has a data source of type
This small change #30940 (i.e. enabling
Motivation
The subsequent PR will implement https://github.com/MaterializeInc/database-issues/issues/8678, which will also disable use of the 'old style' source statements using the same feature flag introduced here. Once this PR and that PR land, enabling the force_source_table_syntax flag will completely switch over users to the new syntax.

Tips for reviewer

To test this I've added a new scenario to platform-checks called ActivateSourceVersioningMigration, which runs Materialize on an existing version for each check's initialize() method, and then restarts Materialize on the latest version with force_source_table_syntax enabled, activating the migration of any sources created using the 'old style' syntax. The validate() step is then run on this new version, confirming that all the queries continue to work.

There are already existing platform-checks Checks that use the 'old style' source syntax: TableFromPgSource, TableFromMySqlSource, LoadGeneratorAsOfUpTo, and one I added called UpsertLegacy, which together cover the 4 source types we need to test. There are also many other checks that use the old syntax when running on 'old' versions before 0.119, but I wasn't sure how to make the ActivateSourceVersioningMigration scenario target a specific version rather than just the 'previous' version for the base run. @def- @nrainer-materialize let me know if you have ideas on doing that.

I've also updated the legacy upgrade tests to activate this migration after the upgrade, which should provide additional coverage too.

Nightly

https://buildkite.com/materialize/nightly/builds?branch=rjobanp%3Asource-table-migration

Checklist

$T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.