From 7bd9bfacb96ec4815d49c01eeea42cdd4b6af1d0 Mon Sep 17 00:00:00 2001
From: Nikhil Benesch
Date: Sat, 15 Jun 2024 10:31:37 -0400
Subject: [PATCH] design: prefer DLQ in error handling design

Per feedback, the DLQ approach is preferred. Adjust the design proposal
accordingly.
---
 .../design/20240609_error_handling.md | 238 +++++++++++++-----
 1 file changed, 177 insertions(+), 61 deletions(-)

diff --git a/doc/developer/design/20240609_error_handling.md b/doc/developer/design/20240609_error_handling.md
index e9d2f43fb6215..e64261b156958 100644
--- a/doc/developer/design/20240609_error_handling.md
+++ b/doc/developer/design/20240609_error_handling.md
@@ -10,6 +10,182 @@ See [Bad Data Kills Sources (Programmable Errors) in Notion][product-brief].
 
 ## Solution Proposal
 
+### Motivation
+
+Only the Kafka source is particularly prone to bad data. With PostgreSQL and
+MySQL sources, it is almost always a bug (either in the upstream system or in
+Materialize) if data fails to decode, as the upstream systems enforce schemas.
+But Kafka does not guarantee schema enforcement, so it is common for bad data to
+slip into a Kafka topic.
+
+Because Kafka sources are the most prone to bad data, we propose to pursue a
+Kafka source-specific solution for handling source decoding errors.
+
+### User experience
+
+#### Dead letter queue
+
+We propose to introduce a [dead letter queue][dlq] (DLQ) for Kafka sources.
+
+Intuitively, the DLQ provides a running log of all decoding errors encountered
+by the source. Materialize users can monitor the count of errors in the DLQ and
+set up alerts whenever the count increases. Users can filter on `mz_timestamp`
+and/or `timestamp` to eliminate old errors that are no longer relevant.
+(Filtering on the former column selects errors by ingestion time, while
+filtering on the latter selects errors by production time.)
+
+Concretely, we propose to add a `REDIRECT ERRORS` option to `CREATE SOURCE` that
+specifies the name of a DLQ table in which to emit information about undecodable
+messages:
+
+```sql
+CREATE SOURCE src FROM KAFKA CONNECTION kconn ... WITH (
+    REDIRECT ERRORS = dlq
+)
+```
+
+The specified DLQ table (`dlq` in the example above) must not exist before the
+`CREATE SOURCE` command is executed. Materialize will automatically create the
+DLQ table with the following Kafka-specific structure.
+
+ Name            | Type           | Nullable | Description
+-----------------|----------------|----------|------------
+ `mz_timestamp`  | `mz_timestamp` | No       | The logical timestamp that the message was reclocked to.
+ `error`         | `text`         | No       | The text of the decoding error.
+ `error_code`    | `text`         | No       | The code of the decoding error.
+ `partition`     | `integer`      | No       | The partition of the Kafka message that failed to decode.
+ `offset`        | `bigint`       | No       | The offset of the Kafka message that failed to decode.
+ `timestamp`     | `timestamp`    | No       | The timestamp of the Kafka message that failed to decode.
+ `key`           | `bytea`        | Yes      | The key bytes of the Kafka message that failed to decode.
+ `value`         | `bytea`        | Yes      | The value bytes of the Kafka message that failed to decode.
+
+The DLQ table works like a subsource. It can be queried via `SELECT`, subscribed
+to via `SUBSCRIBE`, and used as the upstream relation for a Kafka sink.
+
+The DLQ table is append-only. Allowing users to specify a retention policy on
+the DLQ table is left to future work. Users cannot issue `INSERT`, `UPDATE`, or
+`DELETE` commands against the DLQ.
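+
+For illustration, a sketch of how such monitoring might look, assuming the
+`dlq` table and the column names proposed above (`timestamp` and `offset` are
+quoted because they are reserved words):
+
+```sql
+-- Total number of decoding errors, e.g. to alert whenever the count increases.
+SELECT count(*) AS total_errors FROM dlq;
+
+-- Inspect the most recent failures, ordered by the Kafka message timestamp.
+SELECT "timestamp", partition, "offset", error_code, error
+FROM dlq
+ORDER BY "timestamp" DESC
+LIMIT 10;
+
+-- Stream new errors as they arrive.
+SUBSCRIBE (SELECT error_code, error FROM dlq);
+```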
+
+When a DLQ table is attached to a source, no errors are emitted to the source's
+*errs stream* (see the [error handling section][error-handling-docs] of the
+`compute::render` module).
+
+**Open questions:**
+
+ * Is there a better name for the option than `REDIRECT ERRORS`?
+ * What type gets reported in the system catalog for DLQ tables? Is it really a
+   table? Or is it a source or a subsource?
+ * Is it useful to report the `key` and the `value` in the DLQ relation? Most
+   of the time they will be unreadable bytes.
+ * Can the `REDIRECT ERRORS` option be added to an existing source (i.e., via
+   `ALTER SOURCE`)? If so, what is the behavior? Does it pick up for all new
+   messages?
+ * Does the `mz_timestamp` column make sense? Is there something better
+   to call this column? Do the semantics make sense? (Including an
+   `ingested_at` wall clock timestamp would be nice, but that wouldn't
+   be a deterministic function of reclocking.)
+
+#### Upsert errors
+
+While the DLQ is useful for alerting users to data quality issues, it doesn't
+help users answer the question "does my upsert source currently have any keys
+whose most recent value failed to decode?"
+
+To solve this problem, we propose the addition of an `INLINE ERRORS` option
+to `ENVELOPE UPSERT`:
+
+```sql
+CREATE SOURCE src FROM KAFKA CONNECTION kconn ...
+ENVELOPE UPSERT (
+    VALUE DECODING ERRORS = ({INLINE | PROPAGATE}, ...)
+)
+```
+
+The default, which matches today's behavior, is `PROPAGATE`: value decoding
+errors are propagated to the DLQ table or the source's *errs stream*, whichever
+is active.
+
+When the `INLINE` behavior is specified, the source's relation gains a nullable
+column named `error` with a type of `record(description: text, code: text)`. If
+the most recent value for a key has been successfully decoded, this column will
+be `NULL`. If the most recent value for a key was not successfully decoded, this
+column will contain details about the error. Additionally, the value is forced
+into a single nullable column named `value` with a type reflecting the
+configured value format—flattening of record-type values into top-level columns
+does not occur.
+
+When the `PROPAGATE` behavior is not specified, value decoding errors are
+essentially *not* treated as errors. They are neither forwarded to the DLQ table
+nor to the source's *errs stream*.
+
+When both `INLINE` and `PROPAGATE` are specified, errors are both reported
+inline in the source and propagated to the DLQ table or the source's *errs
+stream*, whichever is active.
+
+Even when using `VALUE DECODING ERRORS = INLINE`, users need to monitor the DLQ
+table or the source's *errs stream* for errors, as errors while decoding the key
+still get sent to the DLQ. (There is no good way to represent key decoding
+errors inline without breaking upsert semantics.)
+
+The `INLINE` behavior allows users to discover upsert sources that presently
+have bad data by querying each source and checking `count(error) > 0`. They can
+discover the affected keys by running `SELECT key ... WHERE error IS NOT NULL`.
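+
+As a rough sketch, those checks might look as follows for a hypothetical upsert
+source `src` created with `VALUE DECODING ERRORS = INLINE`. (The `key` column
+name depends on the source's key format, and the error details are accessed
+with record field syntax.)
+
+```sql
+-- Does this upsert source currently have any keys whose most recent value
+-- failed to decode?
+SELECT count(error) > 0 AS has_bad_data FROM src;
+
+-- Which keys are affected, and why?
+SELECT key, (error).code, (error).description
+FROM src
+WHERE error IS NOT NULL;
+```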
+
+**Open questions:**
+
+ * Does the separation of the `REDIRECT ERRORS` and the `VALUE DECODING ERRORS`
+   options make sense?
+
+#### Limitations
+
+The above options do not generalize to handling decoding errors that occur
+outside of the source decoding pipeline. For example, imagine a JSON source that
+is further parsed into columns by a downstream materialized view:
+
+```sql
+CREATE SOURCE my_source
+  FROM KAFKA CONNECTION kafka_connection (TOPIC 'samsa')
+  FORMAT JSON;
+
+CREATE MATERIALIZED VIEW my_view AS
+  SELECT
+    (data->>'userid')::int AS userid
+  FROM my_source;
+```
+
+Bad data can cause both the initial JSON parsing and the cast to `int` in
+`my_view` to fail. The `REDIRECT ERRORS` and `VALUE DECODING ERRORS` options
+do not help with the second type of error. We'll likely need to additionally
+pursue a solution for gracefully handling invalid function calls, like those
+described in [#6367].
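+
+Until such a solution exists, one workaround is to guard fallible casts by
+hand. The following is only an illustrative sketch against the example above;
+it assumes `userid` should be a base-10 integer and maps malformed values to
+`NULL` instead of erroring (out-of-range values would still error):
+
+```sql
+CREATE MATERIALIZED VIEW my_view AS
+  SELECT
+    CASE
+      WHEN data->>'userid' ~ '^-?[0-9]+$'
+      THEN (data->>'userid')::int
+    END AS userid
+  FROM my_source;
+```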
+
+### Implementation
+
+TODO.
+
+**Open questions:**
+
+ * How do we ensure consistency? The DLQ shard and data shard need to be
+   updated in lockstep.
+
+ * When using `VALUE DECODING ERRORS = INLINE`, do we correctly handle
+   retractions across versions if the error text changes? (How do we handle
+   this today?)
+
+### Future extensions
+
+We expect to add other source types in the future that are prone to bad data.
+Imagine S3 sources or Kinesis sources. We'll need to be comfortable extending
+the DLQ concept to these new sources. This seems admittedly straightforward,
+though each new source type will require its own specific DLQ relation
+structure.
+
+## Rejected alternative
+
+> [!CAUTION]
+> This section describes a design that was rejected due to concerns over its
+> implications for correctness.
+
 ### Synopsis
 
 We propose to introduce new table reference modifiers that control the
@@ -227,67 +403,6 @@ or become a `FlavoredGid`.
 
 ### Kafka source-specific dead letter queue
 
-Only the Kafka source is particularly prone to bad data. With PostgreSQL and
-MySQL sources, it is almost always a bug (either in in the upstream system or in
-Materialize) if data fails to decode, as the upstream systems enforce schemas.
-But Kafka does not guarantee schema enforcement, so it is common for bad data to
-slip into a Kafka topic.
-
-Because Kafka sources are the most prone to bad data, we could instead choose to
-pursue a Kafka source-specific solution. For example, imagine that we introduced
-a dead letter queue (DLQ) for Kafka sources. Concretely, we could add a `DEAD
-LETTERS` option (strawman name) to `CREATE SOURCE` that specifies the name of a
-relation in which to emit undecodable messages:
-
-```
-CREATE SOURCE ksrc TO KAFKA CONNECTION kconn ... WITH (
-    DEAD LETTERS = kbad
-)
-```
-
-The `kbad` relation would have a structure like:
-
- Name         | Type
---------------|--------
- `key`        | `bytea`
- `value`      | `bytea`
- `error`      | `text`
- `error_code` | `text`
-
-
-There are a few considerations to note.
-
-First, we expect to add other source types in the future that are prone to bad
-data. Imagine S3 sources or Kinesis sources. We'd need to be comfortable
-extending the DLQ concept to these new sources—which seems admittedly
-straightforward.
-
-Second, the source DLQ does not generalize to handling decoding errors that
-occur outside of the source decoding pipeline. For example, imagine a JSON
-source that is further parsed into columns by a downstream materialized view:
-
-```sql
-CREATE SOURCE my_source
-  FROM KAFKA CONNECTION kafka_connection (TOPIC 'samsa')
-  FORMAT JSON;
-
-CREATE MATERIALIZED VIEW my_view AS
-  SELECT
-    (data->>'userid')::int AS userid,
-  FROM my_source;
-```
-
-Bad data can cause both the initial JSON parsing to fail, or the cast to `int`
-in `my_view` to fail. A source DLQ does not help with the second type of error.
-We'd need to additionally pursue a solution for gracefully handling
-invalid function calls, like those described in [#6367].
-
-**Open questions:**
-
- * Would this alternative be preferable to the proposed design?
- * Is this alternative compatible with introducing `IGNORE ERRORS` and
-   `ONLY ERRORS` in the future?
- * Can a dead letter queue be added to an existing source?
 
 [^1]: Refer to the PostgreSQL documentation for [details on the behavior of
       `LATERAL`][lateral-docs].
@@ -296,6 +411,7 @@ invalid function calls, like those described in [#6367].
 [lateral-docs]: https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-LATERAL
 [table factor]: https://github.com/MaterializeInc/materialize/blob/2c7ca4fb6b2a63134f6c123557bc09c1565524b7/src/sql-parser/src/parser.rs#L6991
 [error-handling-docs]: https://github.com/MaterializeInc/materialize/blob/2c7ca4fb6b2a63134f6c123557bc09c1565524b7/src/compute/src/render.rs#L12-L101
 [@maddyblue]: https://github.com/maddyblue
+[dlq]: https://aws.amazon.com/what-is/dead-letter-queue/
 [#6367]: https://github.com/MaterializeInc/materialize/issues/6367
 [#22430]: https://github.com/MaterializeInc/materialize/issues/22430