
design: write design doc for bad data breaking sources #27540

Merged
merged 5 commits into from
Jun 27, 2024

Conversation

@benesch (Contributor) commented Jun 10, 2024

Rendered: https://github.com/benesch/materialize/blob/source-errors-design-doc/doc/developer/design/20240609_error_handling.md

I've not been able to spend as much time on this as I hoped, but I promised I'd spill some ink on this subject this weekend. Please pardon any typos or half-finished thoughts. Leaving in draft to better capture the state.

We'll need to find someone else to pick this up—I won't have time to drive the review and iteration on the design myself.

Motivation

  • This PR adds a design doc for a problem that is known to be desirable to solve.

Checklist

@ggevay (Contributor) left a comment

Looks quite doable, especially if we considered only persist-backed objects in the table factor. We would still satisfy both the "Must haves" and "Nice to haves" from here.

Syntactically, the `IGNORE ERRORS` and `ONLY ERRORS` modifiers may appear before
any table factor. For example, all of the following will be syntactically
Contributor

We should definitely disallow correlated subqueries in the table factor, because the evaluation of correlated subqueries can't be cleanly separated from the rest of the evaluation, so separating the errors in the subquery would be almost impossible.

| Column       | Type    |
| ------------ | ------- |
| `key`        | `bytea` |
| `value`      | `bytea` |
| `error`      | `text`  |
| `error_code` | `text`  |
Contributor Author

It occurs to me that one benefit of the DLQ approach is that we could add an `ingested_at` column here that describes when the error occurred. I'm not sure we have that same luxury with the ONLY ERRORS approach ... unless we never advanced the since frontier of that collection?

In general there's a tangle of considerations here around "how do I ignore errors in the DLQ that I've already seen?"

Also need to think through how errors would get retracted from this collection, if at all, for upsert sources.

@benesch force-pushed the source-errors-design-doc branch from 7bd9bfa to 9f5670c on June 17, 2024 at 06:40.
@benesch (Contributor Author) commented Jun 17, 2024

Per our discussions, I pushed up a major revision of the design document that fleshes out the DLQ approach and marks it as the preferred approach. PTAL.


### Implementation

TODO.
Contributor Author

🚨 WARNING 🚨

Whoever picks this up will need to fill this important section in!


The DLQ table is append only. Allowing users to specify a retention policy on
the DLQ table is left to future work. Users cannot issue `INSERT`, `UPDATE`, or
`DELETE` commands against the DLQ.
Contributor

Is this an intended limitation or just one that follows from the DLQ being a subsource? If we implement https://github.com/MaterializeInc/materialize/issues/20208, and make the DLQ tables DELETE FROM-able, that would seem desirable.

Contributor Author

Just one that follows from it being a subsource! If we supported writing at sources (#15903) then yeah, we should also allow modifying the DLQ!

column will contain details about the error. Additionally, the value is forced
into a single nullable column named `value` with a type reflecting the
configured value format—flattening of record-type values into top-level columns
does not occur.
Contributor

Additionally, the value is forced into a single nullable column named value

Why is this change necessary? Couldn't we just make all the columns nullable and put null in all value columns when there is an error? Or better yet, if the error happened only when decoding the value of one of the columns, put null only in that column. (Note that distinguishing real nulls from error nulls would still be possible, because one could additionally look at the error column.)

Note that unpacking a record is currently not a very efficient operation, because if you write

SELECT rec.a, rec.b, rec.c, rec.d, rec.e

we compile this to 5 record_get function calls, and each record_get call gets the full record, and it iterates through the fields, decoding each Datum, to get to the requested one.
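The cost being described — each `record_get` call re-scanning the record from the start — can be sketched with a toy simulation. This is hypothetical Python, not Materialize's actual Rust implementation; the per-field counter is just a stand-in for the per-Datum decoding work.

```python
def record_get(record, index):
    """Simulate decoding each field sequentially until reaching the requested one."""
    decoded = 0
    for i, field in enumerate(record):
        decoded += 1  # stand-in for the cost of decoding one Datum
        if i == index:
            return field, decoded
    raise IndexError(index)

record = ("a", "b", "c", "d", "e")

# SELECT rec.a, rec.b, rec.c, rec.d, rec.e compiles to five record_get calls,
# each of which gets the full record and iterates through its fields:
values = []
total_work = 0
for i in range(5):
    value, work = record_get(record, i)
    values.append(value)
    total_work += work

print(values)
print(total_work)  # 1+2+3+4+5 = 15 decode steps, instead of 5 with a single pass
```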

Contributor Author

Couldn't we just make all the columns nullable and put null in all value columns when there is an error?

Yes, but you'd weaken nullability constraints unnecessarily. Suppose my upstream table has schema `(a int NOT NULL, b int NOT NULL)`. If that gets ingested into Materialize as `value record(a int NOT NULL, b int NOT NULL) NULL`, then if the optimizer can prove that `value IS NOT NULL`, it knows that `a IS NOT NULL AND b IS NOT NULL`. Whereas if you ingest the record as `(a int, b int)` and filter out `WHERE error IS NOT NULL`, you don't recover the non-nullability of `a` and `b`.

In fact I wish we'd never added the flattening behavior at all, because of weirdness like this. @petrosagg and I had a long discussion about this here: https://github.com/MaterializeInc/materialize/issues/10114#issuecomment-1943143981

Note that unpacking a record is currently not a very efficient operation

Understood! That said, nested Avro records are very common already, and so I don't think we're making the performance situation here worse by forcing one additional layer of nesting. I'd rather aim for the optimal UX and fix the performance issues as they come up—especially since making nested records more performant would benefit a lot of existing use cases that have explicitly nested records in the upstream data.

Contributor

Related question -- is this `value` column also at the top level of the source relation? So we are introducing two new columns to each upsert source, `error` and `value`?

Contributor Author

Yeah, exactly. So if the upsert source used to contain `offset`, `timestamp`, `a`, `b`, it would now contain `offset`, `timestamp`, `value`, `error`, and `value` would be a record type containing columns `a` and `b`.

@ggevay (Contributor) Jun 17, 2024

Started a thread on potential performance improvements of record unpacking: https://materializeinc.slack.com/archives/C0761MZ3QD9/p1718655594016239

**Open questions:**

* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a
Contributor

Right now, this seems similar to the progress subsource, though...if we're going to deprecate those and subsources as a concept soon, we should maybe think about what would make that transition easier.

Contributor Author

Yep, exactly.

Contributor

I was thinking the same thing -- it is likely easier to just treat these as another subsource, very similar to the progress subsource, but then we will have to migrate all the catalog objects over later. If we instead try to represent it as a table now, it might require some additional legwork to introduce the appropriate scaffolding but we would avoid the later migration.
Need to look a bit more at the code to decide this, but are there any concerns from a product perspective in either case?

Contributor

but then we will have to migrate all the catalog objects over later.

Is the concern that we'd break user-created indexes/views that expect the DLQ to be a subsource?

Also, @morsapaes what's the rough timeline for eliminating subsources?

Contributor Author

Less worried about that, and more worried about the educational impact—just, like, what do we say about this thing in the docs.

@morsapaes (Contributor) Jun 18, 2024

Also, @morsapaes what's the rough timeline for eliminating subsources?

Now that this work took priority over the subsource work, the timeline is..."after this is done". The concern is a mix of what Roshan and Nikhil mentioned: do we make this a subsource to be consistent with the "as is" state, but make our lives more complicated down the road when we remove the concept of subsources; or do we model it as a table with the "will be" state in mind, but make it awkward to explain to users in light of how other objects currently work.

Since Kafka users will be largely unfamiliar with the concept of a subsource, I'd be inclined towards the latter, if we find it technically feasible and a good precursor to the (sub)source refactoring work.

table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? Most
of the time they will be unreadable bytes.
* Can the `REDIRECT ERRORS` option be added to an existing source (i.e., via
Contributor

For cases where a source is stuck with an error, would it be possible to recover it by issuing this ALTER statement? Should we be creating these DLQ for all sources, by default? It's hard to think of a scenario where users would prefer their source to stall instead of progress and log errors, so maybe the option should instead be designed to allow disabling the behavior, rather than enabling it?

Contributor Author

For cases where a source is stuck with an error, would it be possible to recover it by issuing this ALTER statement?

That would be the hardest behavior to support. Once we've committed to putting an error in the errs stream, we can't uncommit to that without violating Materialize's correctness guarantees.

Contributor

Same question as above: is this a question we need to answer before building the first version (to make sure we don't build ourselves into a corner)? Or can we punt on this?

Contributor Author

Yeah, the easy thing to do is just not to allow it to be added later.

@sthm (Contributor) Jun 18, 2024

Yeah, the easy thing to do is just not to allow it to be added later.

In that case I think we should add a dead letter queue by default, as Marta proposed. The way errors are treated (and can potentially brick sources) is not obvious. So if folks can't easily fix a broken source (because they cannot retroactively add a dead letter queue), we should make sure they avoid the problem by adding a dead letter queue by default.


**Open questions:**

* Does the separation of the `REDIRECT ERRORS` and the `VALUE DECODING ERRORS`
Contributor

Having a hard time understanding why we'd want both options if all errors end up in the DLQ. I'm not sure the inline option helps with the user ask of being able to identify sources that have errors, either, because it requires looping through all existing sources? At least my understanding was that the ask was being able to query something like a system catalog table that rolls up all the affected sources.

Contributor Author

Having a hard time understanding why we'd want both options if all errors end up in the DLQ.

The DLQ only tells you that an error did happen. For an upsert source, it can't tell you whether all keys currently have valid values or not.

I'm not sure the inline option helps with the user ask of being able to identify sources that have errors, either, because it requires looping through all existing sources? At least my understanding was that the ask was being able to query something like a system catalog table that rolls up all the affected sources.

Yeah, there's no single query that you can run to find all sources that contain errors. I think that's acceptable for an MVP though. We can help the customer solve for that in dbt or the Prometheus SQL exporter in the short term. The long term fix would require doing something like unified compute introspection but for sources.
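The distinction being drawn here — a DLQ records that an error happened, while inline errors show which keys are currently bad — can be illustrated with a toy model. This is a hypothetical Python sketch; the function names and error strings are invented for illustration, and the real implementation lives in the upsert operator.

```python
def apply_dlq(state, dlq, key, raw_value, decoded):
    # REDIRECT ERRORS: the error goes to the DLQ; the upsert state keeps
    # whatever value the key had before, so readers see stale-but-valid data.
    if decoded is None:
        dlq.append((key, raw_value, "decode error"))
    else:
        state[key] = decoded

def apply_inline(state, key, raw_value, decoded):
    # VALUE DECODING ERRORS = INLINE: the key's value is replaced by an error
    # row, so querying the source shows exactly which keys are currently bad.
    if decoded is None:
        state[key] = {"value": None, "error": "decode error"}
    else:
        state[key] = {"value": decoded, "error": None}

dlq_state, dlq = {}, []
inline_state = {}

# A good value arrives for a key, then a corrupt one for the same key.
for key, raw, decoded in [("k1", b"\x01", 42), ("k1", b"\xff", None)]:
    apply_dlq(dlq_state, dlq, key, raw, decoded)
    apply_inline(inline_state, key, raw, decoded)

print(dlq_state)     # the stale value 42 survives; nothing marks k1 as bad
print(len(dlq))      # the error is recorded, but only as history
print(inline_state)  # k1 is visibly in an error state
```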

Contributor

I'm also slightly confused about why we have both the DLQ and the inline errors feature to solve this issue. If we're already adding two new columns to support the INLINE feature for upsert sources, why not just add those same two columns for append-only sources too? And then expose one option to treat decode errors as either real errors or inline errors, with no DLQ at all?

The downside is that append-only source values would become nested in this case, but this seems like it could be an acceptable tradeoff for folks who need inline error support?

Contributor Author

If we're already adding two new columns to support the INLINE feature for upsert sources, why not just add those same two columns for append-only sources too? ... The downside is that append-only source values would become nested in this case, but this seems like it could be an acceptable tradeoff for folks who need inline error support?

Agreed that this works nicely for ENVELOPE NONE sources, and agree that the nesting of the values column seems acceptable.

The place where I got stuck is: where do you put errors for ENVELOPE UPSERT that occur when decoding the key? Without a DLQ there isn't anywhere good to put them.

Contributor

For an upsert source, it can't tell you whether all keys currently have valid values or not.

Similar to above, I think we could punt on this. I think the V0 deliverable is "sources don't break when bad data shows up," and the V1+ deliverables can improve developer workflows when something is broken.

Can we cut this from scope? Or would we be painting ourselves into a corner?

Contributor

referencing @benesch 's follow up comment here: #27540 (comment)

It sounds like we're all thinking that we could get away with just 1 option for the MVP and satisfy this customer's issue, but depends on which. @JLDLaughlin suggested cutting the inline-error feature and just delivering the DLQ, and @benesch is suggesting cutting the DLQ and just supporting the inline-errors which solve the primary issue of value decode errors in upsert sources.

I'm on the fence -- the DLQ handles more cases overall, but it means an upsert source may present incorrect data (a stale value for a key), whereas the inline-error option provides correctness but doesn't account for key-decode errors and append-only sources...

Contributor Author

Discussed this live with Jessica, since I've been thinking we might want to go in the other direction and only do VALUE DECODING ERRORS = INLINE.

* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a
table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? Most
Contributor

Most of the time they will be unreadable bytes.

Does this mean that the way to identify the faulty record upstream (e.g. if you want to emit a fix) is by using the offset? More specifically, do you mean this because error records will either have a bogus key or a bogus value?

Contributor Author

Oh, I just meant because the key and value will be hex-encoded byte strings like `\xa49f19c19`. Clarified the wording.

@benesch force-pushed the source-errors-design-doc branch from 9f5670c to c14eb56 on June 17, 2024 at 18:46.
Per feedback, the DLQ approach is preferred. Adjust the design proposal
accordingly.
@benesch force-pushed the source-errors-design-doc branch from c14eb56 to 7dccfaf on June 17, 2024 at 18:53.
@benesch benesch marked this pull request as ready for review June 17, 2024 19:02
@benesch (Contributor Author) commented Jun 17, 2024

@rjobanp based on your comment about "why both a DLQ and inline errors?", here's a take on an MVP: just add support for `VALUE DECODING ERROR = {PROPAGATE | INLINE}`. That might be sufficient to cover the customer's use case for this feature—I think they're almost entirely using `ENVELOPE UPSERT` sources. Key decoding errors would still be sent to the errs stream and break the source, but I think key decoding errors are much, much less common.


**Open questions:**

* Is there a better name for the option than `REDIRECT ERRORS`?
Contributor

What about ON ERROR? It's the Snowpipe syntax for similar behavior, and it makes sense to me!

Contributor Author

Hm, is the equivalent Snowpipe option not ERROR_INTEGRATION? ON_ERROR seems to be about whether to CONTINUE or to SKIP_FILE.

Contributor

Aren't they related? At least my understanding is that ERROR_INTEGRATION relies on setting up a notification integration, which is only triggered if ON_ERROR is set to SKIP_FILE. To query the errors, you then have to use the NOTIFICATION_HISTORY table function. But this is for Snowpipe (batch).

For the (Snowpipe Streaming-based) Kafka connector, they're also following a DLQ approach (docs).

Contributor Author

Yes, they're related in Snowflake! What I'm getting at is that I don't think the `REDIRECT ERRORS` option proposed here maps directly to the `ON_ERROR` option. `REDIRECT ERRORS` seems closer to the `ERROR_INTEGRATION` option than the `ON_ERROR` option.

If we wanted it to be more `ON_ERROR`-like, we could do `ON ERROR = {RECORD | SEND TO <name>}`, where `RECORD` means "record it in this source" like we do presently, and `SEND TO <name>` means "do the DLQ thing."

Contributor

What I meant is that...it doesn't map to either because error logging in Snowpipe uses a different logic, whereas in Snowpipe Streaming REDIRECT ERRORS is directly equivalent to errors.deadletterqueue.topic.name!


* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a
table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? The
Contributor

Is this a question we need to answer before building the first version (to make sure we don't build ourselves into a corner)? Or can we punt on this?

the DLQ table is left to future work. Users cannot issue `INSERT`, `UPDATE`, or
`DELETE` commands against the DLQ[^2].

When a DLQ table is attached to a source, no errors are emitted to the source's
Contributor

How is a DLQ table attached to a source?

Contributor Author

Ah, sorry, when the REDIRECT ERRORS option is specified.




* Can the `REDIRECT ERRORS` option be added to an existing source (i.e., via
`ALTER SOURCE`)? If so, what is the behavior? Does it pick up for all new
messages?
* Does the `mz_timestamp` column make sense? Is there something better
Contributor

I think it makes sense!


**Open questions:**

* How do we ensure consistency? The DLQ shard and data shard need to be
Contributor

I could once again propose "subsources are actually tables and ingestions write to them atomically." 😅 Though that has its known downsides and is a bigger project!

Comment on lines +197 to +199
* When using `VALUE DECODING ERRORS = INLINE`, do we correctly handle
retractions across versions if the error text changes? (How do we handle
this today?)
Contributor

I'm slightly confused by this question -- can anyone clarify if my understanding below is incorrect:

  • An upsert source receives a (key, value) pair and checks its backend (e.g. rocksdb) to see if the key is already present. If it is already present, it emits a diff ((key, old_value), -1) to retract the old value for the key before emitting ((key, value), 1) to signify the new value.
  • When an upsert source is restarted, it reads back its own collection from persist, which may or may not have compacted away some of the previous data whose diffs cancel out.
  • This is used to rehydrate the upsert backend (rocksdb) with the present value of each key

Let's say a key contained an inline error-value with some error text at version foo: (key, error-value-foo) and this was rehydrated into a new instance running code at version bar.

The error value read from persist on rehydration would still be (key, error-value-foo). And if the bad value was re-read from the upstream source such that the decoding error generated (key, error-value-bar), we would just emit a ((key, error-value-foo), -1), ((key, error-value-bar), 1), right?

In what case might we accidentally miss a retraction if the error text changed between versions?
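The retraction mechanics described in the bullets above can be sketched as a toy state machine. This is hypothetical Python (simplified: no offsets, no persist, no compaction) modeling only the emit-retraction-then-assertion behavior under discussion.

```python
def upsert(state, key, new_value):
    """Emit (row, diff) pairs for one update, retracting any prior value for the key."""
    diffs = []
    if key in state:
        # Retract the old value (which may itself be an inline error value).
        diffs.append(((key, state[key]), -1))
    state[key] = new_value
    diffs.append(((key, new_value), 1))
    return diffs

# Version "foo" wrote an inline error value; rehydration restores it from persist.
state = {"k": "error-value-foo"}

# After an upgrade, re-decoding the same bad input yields different error text,
# and the operator retracts the old error value before asserting the new one.
diffs = upsert(state, "k", "error-value-bar")
print(diffs)
```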

@rjobanp force-pushed the source-errors-design-doc branch from 8f2ab7a to 3f26117 on June 20, 2024 at 17:25.
@benesch (Contributor Author) left a comment

Implementation makes sense to me, with the caveat that I'm not too familiar with the upsert implementation!

Comment on lines +230 to +231
If `propagate_errors` is set to `true`, it will continue to produce an additional row with the
`UpsertValueError` error. This will require switching the `map` collection operator to a `flat_map`.
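Why `map` must become `flat_map` can be sketched in Python. This is a hypothetical simulation, not the actual Rust/timely operator: `map` yields exactly one output per input, whereas an input that should surface an `UpsertValueError` must produce an additional error row, i.e. a variable number of outputs.

```python
def process(record, propagate_errors):
    """Produce the output rows for one input record."""
    key, value, error = record
    out = [(key, value)]  # the ordinary output row
    if propagate_errors and error is not None:
        out.append(("error", error))  # the additional UpsertValueError row
    return out

records = [("k1", 1, None), ("k2", None, "UpsertValueError: bad bytes")]

# map-like: exactly one output row per input -- no room for the extra error row.
mapped = [process(r, propagate_errors=False) for r in records]

# flat_map-like: each input may yield one or more output rows.
flat_mapped = [row for r in records for row in process(r, propagate_errors=True)]

print(flat_mapped)  # three rows from two inputs: the error row rides along
```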
Contributor Author

👍🏽

@rjobanp (Contributor) commented Jun 21, 2024

I've just opened PR #27802 to implement the inline errors feature in this design doc.

@rjobanp (Contributor) left a comment

I added a note that part of the inline-errors design was implemented, and the DLQ was punted for now. We should probably merge this and then edit it in the future with the implementation of the DLQ, if and when we decide to do that work.

@benesch (Contributor Author) commented Jun 27, 2024

I added a note that part of the inline-errors design was implemented, and the DLQ was punted for now. We should probably merge this and then edit it in the future with the implementation of the DLQ, if and when we decide to do that work.

Yep, sounds great. Thanks very much, @rjobanp! 🙇🏽

@benesch benesch merged commit 7d8ad36 into MaterializeInc:main Jun 27, 2024
7 checks passed
@benesch benesch deleted the source-errors-design-doc branch June 27, 2024 01:34
10 participants