design: write design doc for bad data breaking sources #27540
Conversation
Touches #22430.
Looks quite doable, especially if we considered only persist-backed objects in the table factor. We would still satisfy both the "Must haves" and "Nice to haves" from here.
Syntactically, the `IGNORE ERRORS` and `ONLY ERRORS` modifiers may appear before any table factor. For example, all of the following will be syntactically valid:
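For illustration, a sketch of how the proposed modifiers might read in queries — the syntax exists only in this design doc, and `my_source`/`other_table` are hypothetical names:

```sql
-- Proposed (not yet implemented) syntax; `my_source` and `other_table`
-- are hypothetical names.

-- Evaluate `my_source` while discarding any errors it carries.
SELECT * FROM IGNORE ERRORS my_source;

-- Return only the errors carried by `my_source`.
SELECT * FROM ONLY ERRORS my_source;

-- The modifiers would precede any table factor, including inside a join.
SELECT *
FROM IGNORE ERRORS my_source AS s
JOIN other_table AS t ON s.id = t.id;
```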
We should definitely disallow correlated subqueries in the table factor, because the evaluation of correlated subqueries can't be cleanly separated from the rest of the evaluation, so separating the errors in the subquery would be almost impossible.
| Column | Type |
|---|---|
| `key` | `bytea` |
| `value` | `bytea` |
| `error` | `text` |
| `error_code` | `text` |
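Assuming this errors relation is exposed as a queryable object with the columns above, a rollup query over it might look like the following (the relation name `my_source_dlq` is made up for the example):

```sql
-- Hypothetical: `my_source_dlq` is a made-up name for the errors relation
-- with the columns listed above.
SELECT error_code, error, count(*) AS occurrences
FROM my_source_dlq
GROUP BY error_code, error
ORDER BY occurrences DESC;
```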
It occurs to me that one benefit of the DLQ approach is that we could add an `ingested_at` column here that describes when the error occurred. Not sure we have that same luxury with the `ONLY ERRORS` approach ... unless we never advanced the since frontier of that collection?

In general there's a tangle of considerations here around "how do I ignore errors in the DLQ that I've already seen?"

Also need to think through how errors would get retracted from this collection, if at all, for upsert sources.
Force-pushed from 7bd9bfa to 9f5670c.
Per our discussions, I pushed up a major revision of the design document that fleshes out the DLQ approach and marks it as the preferred approach. PTAL.

### Implementation

TODO.
🚨 WARNING 🚨
Whoever picks this up will need to fill this important section in!
The DLQ table is append-only. Allowing users to specify a retention policy on the DLQ table is left to future work. Users cannot issue `INSERT`, `UPDATE`, or `DELETE` commands against the DLQ.
Is this an intended limitation or just one that follows from the DLQ being a subsource? If we implement https://github.com/MaterializeInc/materialize/issues/20208, and make the DLQ tables `DELETE FROM`-able, that would seem desirable.
Just one that follows from it being a subsource! If we supported writing at sources (#15903) then yeah, we should also allow modifying the DLQ!
column will contain details about the error. Additionally, the value is forced into a single nullable column named `value` with a type reflecting the configured value format—flattening of record-type values into top-level columns does not occur.
> Additionally, the value is forced into a single nullable column named `value`

Why is this change necessary? Couldn't we just make all the columns nullable and put null in all value columns when there is an error? Or better yet, if the error happened only when decoding the value of one of the columns, put null only in that column. (Note that distinguishing real nulls from error nulls would still be possible, because one could additionally look at the error column.)

Note that unpacking a record is currently not a very efficient operation, because if you write `SELECT rec.a, rec.b, rec.c, rec.d, rec.e`, we compile this to 5 `record_get` function calls, and each `record_get` call gets the full record, and it iterates through the fields, decoding each `Datum`, to get to the requested one.
> Couldn't we just make all the columns nullable and put null in all value columns when there is an error?

Yes, but you'd weaken nullability constraints unnecessarily. Suppose my upstream table has schema `(a int NOT NULL, b int NOT NULL)`. If that gets ingested into Materialize as `value record(a int NOT NULL, b int NOT NULL) NULL`, then if the optimizer can prove that `value IS NOT NULL`, it knows that `a IS NOT NULL AND b IS NOT NULL`. Whereas if you ingest the record as `(a int, b int)` and filter out `WHERE error IS NOT NULL`, you don't recover the non-nullability of `a` and `b`.
In fact I wish we'd never added the flattening behavior at all, because of weirdness like this. @petrosagg and I had a long discussion about this here: https://github.com/MaterializeInc/materialize/issues/10114#issuecomment-1943143981
> Note that unpacking a record is currently not a very efficient operation

Understood! That said, nested Avro records are very common already, and so I don't think we're making the performance situation here worse by forcing one additional layer of nesting. Would rather aim for the optimal UX and fix the performance issues as they come up—especially since making nested records more performant would benefit a lot of existing use cases that have explicitly nested records in the upstream data.
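To make the nullability argument above concrete, here is a rough sketch using two hypothetical relations, `shape_record` and `shape_flat`; it illustrates the constraint-propagation point rather than actual Materialize behavior:

```sql
-- Shape 1: value kept as a single nullable record column, e.g.
--   value record(a int NOT NULL, b int NOT NULL) NULL
-- If the planner proves `value IS NOT NULL`, the nested NOT NULL
-- constraints on a and b still hold.
SELECT (value).a + (value).b
FROM shape_record
WHERE value IS NOT NULL;

-- Shape 2: flattened into individually nullable columns (a int, b int)
-- plus an error column. Filtering on the error column does not let the
-- planner recover that a and b are non-null.
SELECT a + b
FROM shape_flat
WHERE error IS NULL;
```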
Related question -- is this `value` column also at the top-level of the source relation? So we are introducing two new columns to each upsert source, `error` and `value`?
Yeah, exactly. So if the upsert source used to contain `offset, timestamp, a, b`, it would now contain `offset, timestamp, value, error`, and `value` would be a record type containing columns `a` and `b`.
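As a hypothetical illustration of that reshaping, queries against a source named `my_upsert_source` might look like:

```sql
-- Hypothetical source `my_upsert_source` shaped as described above
-- (..., value record(a, b), error).

-- Rows that decoded cleanly: error is NULL and the data sits inside the
-- record-typed `value` column.
SELECT (value).a, (value).b
FROM my_upsert_source
WHERE error IS NULL;

-- Rows whose latest value failed to decode carry the error text instead.
SELECT error
FROM my_upsert_source
WHERE error IS NOT NULL;
```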
Started a thread on potential performance improvements of record unpacking: https://materializeinc.slack.com/archives/C0761MZ3QD9/p1718655594016239
**Open questions:**

* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a
Right now, this seems similar to the progress subsource, though...if we're going to deprecate those and subsources as a concept soon, we should maybe think about what would make that transition easier.
Yep, exactly.
I was thinking the same thing -- it is likely easier to just treat these as another subsource, very similar to the progress subsource, but then we will have to migrate all the catalog objects over later. If we instead try to represent it as a table now, it might require some additional legwork to introduce the appropriate scaffolding but we would avoid the later migration.
Need to look a bit more at the code to decide this, but are there any concerns from a product perspective in either case?
> but then we will have to migrate all the catalog objects over later.

Is the concern that we'd break user-created indexes/views that expect the DLQ to be a subsource?

Also, @morsapaes what's the rough timeline for eliminating subsources?
Less worried about that, and more worried about the educational impact—just, like, what do we say about this thing in the docs.
> Also, @morsapaes what's the rough timeline for eliminating subsources?

Now that this work took priority over the subsource work, the timeline is..."after this is done". The concern is a mix of what Roshan and Nikhil mentioned: do we make this a subsource to be consistent with the *as is*, but make our lives more complicated down the road when we remove the concept of subsources; or do we model it as a table with the *will be* in mind, but make it awkward to explain to users in light of how other objects currently work.

Since Kafka users will be largely unfamiliar with the concept of a subsource, I'd be inclined towards the latter, if we find it technically feasible and like a good precursor to the (sub)source refactoring work.
table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? Most of the time they will be unreadable bytes.
* Can the `REDIRECT ERRORS` option be added to an existing source (i.e., via
For cases where a source is stuck with an error, would it be possible to recover it by issuing this `ALTER` statement? Should we be creating these DLQs for all sources, by default? It's hard to think of a scenario where users would prefer their source to stall instead of progressing and logging errors, so maybe the option should instead be designed to allow disabling the behavior, rather than enabling it?
> For cases where a source is stuck with an error, would it be possible to recover it by issuing this `ALTER` statement?

That would be the hardest behavior to support. Once we've committed to putting an error in the errs stream, we can't uncommit to that without violating Materialize's correctness guarantees.
Same question as above: is this a question we need to answer before building the first version (to make sure we don't build ourselves into a corner)? Or can we punt on this?
Yeah, the easy thing to do is just not to allow it to be added later.
> Yeah, the easy thing to do is just not to allow it to be added later.

In that case I think we should add a dead letter queue by default, as Marta proposed. The way errors are treated (and can potentially brick sources) does not seem to be obvious. So if folks can't easily fix a broken source (because they cannot retrospectively add a dead letter queue), we should make sure they avoid the problem by adding a dead letter queue by default.
**Open questions:**

* Does the separation of the `REDIRECT ERRORS` and the `VALUE DECODING ERRORS`
Having a hard time understanding why we'd want both options if all errors end up in the DLQ. I'm not sure the inline option helps with the user ask of being able to identify sources that have errors, either, because it requires looping through all existing sources? At least my understanding was that the ask was being able to query something like a system catalog table that rolls up all the affected sources.
> Having a hard time understanding why we'd want both options if all errors end up in the DLQ.

The DLQ only tells you that an error did happen. For an upsert source, it can't tell you whether all keys currently have valid values or not.

> I'm not sure the inline option helps with the user ask of being able to identify sources that have errors, either, because it requires looping through all existing sources? At least my understanding was that the ask was being able to query something like a system catalog table that rolls up all the affected sources.

Yeah, there's no single query that you can run to find all sources that contain errors. I think that's acceptable for an MVP though. We can help the customer solve for that in dbt or the Prometheus SQL exporter in the short term. The long term fix would require doing something like unified compute introspection but for sources.
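For example, the short-term workaround would presumably be a per-source check scheduled in dbt or the Prometheus SQL exporter, along these lines (source name hypothetical, and assuming the proposed inline `error` column):

```sql
-- Run once per source (no catalog-wide rollup exists); assumes the
-- proposed inline `error` column on the hypothetical `my_upsert_source`.
SELECT count(*) AS errored_keys
FROM my_upsert_source
WHERE error IS NOT NULL;
```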
I'm also slightly confused on why we have both the DLQ and the inline errors feature to solve this issue. If we're already adding two new columns to support the INLINE feature for upsert sources, why not just add those same two columns for append-only sources too? And then expose one option to treat decode errors as either real errors or inline errors, with no DLQ at all?
The downside is that append-only source values would become nested in this case, but this seems like it could be an acceptable tradeoff for folks who need inline error support?
> If we're already adding two new columns to support the INLINE feature for upsert sources, why not just add those same two columns for append-only sources too? ... The downside is that append-only source values would become nested in this case, but this seems like it could be an acceptable tradeoff for folks who need inline error support?

Agreed that this works nicely for `ENVELOPE NONE` sources, and agree that the nesting of the `values` column seems acceptable.

The place where I got stuck is: where do you put errors for `ENVELOPE UPSERT` that occur when decoding the key? Without a DLQ there isn't anywhere good to put them.
> For an upsert source, it can't tell you whether all keys currently have valid values or not.

Similar to above, I think we could punt on this. I think the V0 deliverable is "sources don't break when bad data shows up," and the V1+ deliverables can improve developer workflows when something is broken.

Can we cut this from scope? Or would we be painting ourselves into a corner?
Referencing @benesch's follow-up comment here: #27540 (comment)

It sounds like we're all thinking that we could get away with just one option for the MVP and satisfy this customer's issue, but it depends on which. @JLDLaughlin suggested cutting the inline-error feature and just delivering the DLQ, and @benesch is suggesting cutting the DLQ and just supporting the inline errors, which solve the primary issue of value decode errors in upsert sources.

I'm on the fence -- the DLQ handles more cases overall but it means an upsert source may present incorrect data (a stale value for a key), whereas the inline-error option provides correctness but doesn't account for key-decode errors and append-only sources...
Discussed this live with Jessica, since I've been thinking we might want to go in the other direction and only do `VALUE DECODING ERRORS = INLINE`.
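For reference, a sketch of how that option might appear on a Kafka upsert source; the exact spelling is whatever the design doc settles on, and the connection/topic/source names here are placeholders:

```sql
-- Sketch only: option name and placement as proposed in the design doc;
-- connection, topic, and source names are placeholders.
CREATE SOURCE my_upsert_source
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT (VALUE DECODING ERRORS = INLINE);
```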
* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? Most
> Most of the time they will be unreadable bytes.

Does this mean that the way to identify the faulty record upstream (e.g. if you want to emit a fix) is by using the offset? More specifically, do you mean this because error records will either have a bogus `key` or a bogus `value`?
Oh, I just meant because the `key` and `value` will be hex-encoded byte strings like `\xa49f19c19`. Clarified the wording.
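If the raw bytes do happen to be printable, one could still take a best-effort look at them with `convert_from`, though in general they will stay opaque (DLQ name hypothetical):

```sql
-- Best-effort peek at the raw bytes in the hypothetical `my_source_dlq`;
-- only useful when the bytes happen to be valid UTF-8 (e.g. JSON payloads).
SELECT convert_from(key, 'utf8') AS key_text,
       convert_from(value, 'utf8') AS value_text,
       error
FROM my_source_dlq;
```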
Force-pushed from 9f5670c to c14eb56.
Per feedback, the DLQ approach is preferred. Adjust the design proposal accordingly.
Force-pushed from c14eb56 to 7dccfaf.
@rjobanp based on your comment about "why both a DLQ and inline errors?", here's a take on an MVP: just add support for …
**Open questions:**

* Is there a better name for the option than `REDIRECT ERRORS`?
What about `ON ERROR`? It's the Snowpipe syntax for similar behavior, and it makes sense to me!
Hm, is the equivalent Snowpipe option not `ERROR_INTEGRATION`? `ON_ERROR` seems to be about whether to `CONTINUE` or to `SKIP_FILE`.
Aren't they related? At least my understanding is that `ERROR_INTEGRATION` relies on setting up a notification integration, which is only triggered if `ON_ERROR` is set to `SKIP_FILE`. To query the errors, you then have to use the `NOTIFICATION_HISTORY` table function. But this is for Snowpipe (batch).

For the (Snowpipe Streaming-based) Kafka connector, they're also following a DLQ approach (docs).
Yes, they're related in Snowflake! What I'm getting at is that I don't think the `REDIRECT ERRORS` option proposed here maps directly to the `ON_ERROR` option. `REDIRECT ERRORS` seems closer to the `ERROR_INTEGRATION` option than the `ON_ERROR` option.

If we wanted it to be more `ON_ERROR`-like, we could do `ON ERROR = {RECORD | SEND TO <name>}`, where `RECORD` means "record it in this source" like we do presently, and `SEND TO <name>` means "do the DLQ thing."
What I meant is that...it doesn't map to either, because error logging in Snowpipe uses a different logic, whereas in Snowpipe Streaming `REDIRECT ERRORS` is directly equivalent to `errors.deadletterqueue.topic.name`!
* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? The
Is this a question we need to answer before building the first version (to make sure we don't build ourselves into a corner)? Or can we punt on this?
the DLQ table is left to future work. Users cannot issue `INSERT`, `UPDATE`, or `DELETE` commands against the DLQ[^2].

When a DLQ table is attached to a source, no errors are emitted to the source's
How is a DLQ table attached to a source?
Ah, sorry, when the `REDIRECT ERRORS` option is specified.
* Can the `REDIRECT ERRORS` option be added to an existing source (i.e., via `ALTER SOURCE`)? If so, what is the behavior? Does it pick up for all new messages?
* Does the `mz_timestamp` column make sense? Is there something better
I think it makes sense!
**Open questions:**

* How do we ensure consistency? The DLQ shard and data shard need to be
I could once again propose "subsources are actually tables and ingestions write to them atomically". 😅 Though that has its known downsides and is a bigger project!
* When using `VALUE DECODING ERRORS = INLINE`, do we correctly handle retractions across versions if the error text changes? (How do we handle this today?)
I'm slightly confused by this question -- can anyone clarify if my understanding below is incorrect:

- An upsert source receives a (key, value) pair and checks its backend (e.g. rocksdb) to see if the key is already present. If it is already present, it emits a diff `((key, old_value), -1)` to retract the old value for the key before emitting `((key, value), 1)` to signify the new value.
- When an upsert source is restarted, it reads back its own collection from persist, which may or may not have compacted away some of the previous data whose diffs cancel out.
- This is used to rehydrate the upsert backend (rocksdb) with the present value of each key.

Let's say a key contained an inline error-value with some error text at version foo: `(key, error-value-foo)`, and this was rehydrated into a new instance running code at version bar. The error value read from persist on rehydration would still be `(key, error-value-foo)`. And if the bad value was re-read from the upstream source such that the decoding error generated `(key, error-value-bar)`, we would just emit a `((key, error-value-foo), -1)`, `((key, error-value-bar), 1)`, right?

In what case might we accidentally miss a retraction if the error text changed between versions?
Force-pushed from 8f2ab7a to 3f26117.
Implementation makes sense to me, with the caveat that I'm not too familiar with the upsert implementation!
If `propagate_errors` is set to `true`, it will continue to produce an additional row with the `UpsertValueError` error. This will require switching the `map` collection operator to a `flat_map`.
👍🏽
I've just opened PR #27802 to implement the inline errors feature in this design doc.
I added a note that part of the inline-errors design was implemented, and the DLQ was punted for now. We should probably merge this and then edit it in the future with the implementation of the DLQ, if and when we decide to do that work.
Yep, sounds great. Thanks very much, @rjobanp! 🙇🏽
Rendered: https://github.com/benesch/materialize/blob/source-errors-design-doc/doc/developer/design/20240609_error_handling.md
I've not been able to spend as much time on this as I hoped, but I promised I'd spill some ink on this subject this weekend. Please pardon any typos or half-finished thoughts. Leaving in draft to better capture the state.
We'll need to find someone else to pick this up—I won't have time to drive the review and iteration on the design myself.
label.