design: prefer DLQ in error handling design
Per feedback, the DLQ approach is preferred. Adjust the design proposal
accordingly.
benesch committed Jun 17, 2024
1 parent ae5e2a8 commit 7bd9bfa
Showing 1 changed file with 177 additions and 61 deletions.
238 changes: 177 additions & 61 deletions doc/developer/design/20240609_error_handling.md
@@ -10,6 +10,182 @@ See [Bad Data Kills Sources (Programmable Errors) in Notion][product-brief].

## Solution Proposal

### Motivation

Only the Kafka source is particularly prone to bad data. With PostgreSQL and
MySQL sources, it is almost always a bug (either in the upstream system or in
Materialize) if data fails to decode, as the upstream systems enforce schemas.
But Kafka does not guarantee schema enforcement, so it is common for bad data to
slip into a Kafka topic.

Because Kafka sources are the most prone to bad data, we propose to pursue a
Kafka source-specific solution for handling source decoding errors.

### User experience

#### Dead letter queue

We propose to introduce a [dead letter queue][dlq] (DLQ) for Kafka sources.

Intuitively, the DLQ provides a running log of all decoding errors encountered
by the source. Materialize users can monitor the count of errors in the DLQ and
set up alerts whenever the count increases. Users can filter on `mz_timestamp`
and/or `timestamp` to eliminate old errors that are no longer relevant. (The
former column filters by ingestion time; the latter by the message's production
time.)

Concretely, we propose to add a `REDIRECT ERRORS` option to `CREATE SOURCE` that
specifies the name of a DLQ table in which to emit information about undecodable
messages:

```sql
CREATE SOURCE src FROM KAFKA CONNECTION kconn ... WITH (
    REDIRECT ERRORS = dlq
)
```

The specified DLQ table (`dlq` in the example above) must not exist before the
`CREATE SOURCE` command is executed. Materialize will automatically create the
DLQ table with the following Kafka-specific structure.

Name | Type | Nullable | Description
----------------|----------------|----------|------------
`mz_timestamp` | `mz_timestamp` | No | The logical timestamp that the message was reclocked to.
`error` | `text` | No | The text of the decoding error.
`error_code` | `text` | No | The code of the decoding error.
`partition` | `integer` | No | The partition of the Kafka message that failed to decode.
`offset` | `bigint` | No | The offset of the Kafka message that failed to decode.
`timestamp` | `timestamp` | No | The timestamp of the Kafka message that failed to decode.
`key` | `bytea` | Yes | The key bytes of the Kafka message that failed to decode.
`value` | `bytea` | Yes | The value bytes of the Kafka message that failed to decode.

The DLQ table works like a subsource. It can be queried via `SELECT`, subscribed
to via `SUBSCRIBE`, and used as the upstream relation for a Kafka sink.

The DLQ table is append only. Allowing users to specify a retention policy on
the DLQ table is left to future work. Users cannot issue `INSERT`, `UPDATE`, or
`DELETE` commands against the DLQ.
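
For illustration, monitoring the DLQ might look like the following sketch. The
`dlq` name matches the example above; the one-hour window and the quoting of
reserved-looking column names are incidental choices, not part of the proposal.

```sql
-- Count recent decoding errors by error code. The time window is arbitrary.
SELECT error_code, count(*) AS errors
FROM dlq
WHERE "timestamp" > now() - INTERVAL '1 hour'
GROUP BY error_code;

-- Stream new DLQ entries as they arrive, e.g. to feed an alerting pipeline.
SUBSCRIBE (
    SELECT mz_timestamp, error_code, error, "partition", "offset"
    FROM dlq
);
```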

When a DLQ table is attached to a source, no errors are emitted to the source's
*errs stream* (see the [error handling section][error-handling-docs] of the
`compute::render` module).

**Open questions:**

* Is there a better name for the option than `REDIRECT ERRORS`?
* What type gets reported in the system catalog for DLQ tables? Is it really a
table? Or is it a source or a subsource?
* Is it useful to report the `key` and the `value` in the DLQ relation? Most
of the time they will be unreadable bytes.
* Can the `REDIRECT ERRORS` option be added to an existing source (i.e., via
`ALTER SOURCE`)? If so, what is the behavior? Does it pick up for all new
messages?
* Does the `mz_timestamp` column make sense? Is there something better
to call this column? Do the semantics make sense? (Including an
`ingested_at` wall clock timestamp would be nice, but that wouldn't
be a deterministic function of reclocking.)

#### Upsert errors

While the DLQ is useful for alerting users to data quality issues, it doesn't
help users answer the question "does my upsert source currently have any keys
whose most recent value failed to decode?"

To solve this problem, we propose the addition of an `INLINE ERRORS` option
to `ENVELOPE UPSERT`:

```sql
CREATE SOURCE src FROM KAFKA CONNECTION kconn ...
ENVELOPE UPSERT (
    VALUE DECODING ERRORS = ({INLINE | PROPAGATE}, ...)
)
```

The default behavior, which matches today's behavior, is `PROPAGATE`. Value
decoding errors are propagated to the DLQ table or the source's *errs stream*,
whichever is active.

When the `INLINE` behavior is specified, the source's relation gains a nullable
column named `error` with a type of `record(description: text, code: text)`. If
the most recent value for a key has been successfully decoded, this column will
be `NULL`. If the most recent value for a key was not successfully decoded, this
column will contain details about the error. Additionally, the value is forced
into a single nullable column named `value` with a type reflecting the
configured value format—flattening of record-type values into top-level columns
does not occur.
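
As a concrete sketch under the proposed syntax (the connection, topic, formats,
and resulting column names here are illustrative assumptions):

```sql
CREATE SOURCE orders
FROM KAFKA CONNECTION kconn (TOPIC 'orders')
KEY FORMAT TEXT
VALUE FORMAT JSON
ENVELOPE UPSERT (
    VALUE DECODING ERRORS = (INLINE)
);

-- Resulting relation, roughly:
--   key    text
--   value  jsonb
--   error  record(description: text, code: text)   -- NULL when decoding succeeded
```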

When the `PROPAGATE` behavior is not specified (i.e., only `INLINE` is), value
decoding errors are essentially *not* treated as errors: they are forwarded
neither to the DLQ table nor to the source's *errs stream*.

When both `INLINE` and `PROPAGATE` are specified, the errors are both reported
inline in the source and propagated to the DLQ table or the source's *errs
stream*, whichever is active.

Even when using `VALUE DECODING ERRORS = INLINE`, users need to monitor the DLQ
table or the source's *errs stream* for errors, as key decoding errors are
still sent there. (There is no good way to represent key decoding errors inline
without breaking upsert semantics.)

The `INLINE` behavior allows users to discover upsert sources that presently
have bad data by querying each source and checking `count(error) > 0`. They can
discover the affected keys by running `SELECT key ... WHERE error IS NOT NULL`.
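
For example, against a source named `orders` created with `INLINE` as sketched
above (using Materialize's usual composite-field access syntax for the `error`
record column):

```sql
-- Does this upsert source currently have any keys whose latest value failed
-- to decode?
SELECT count(error) > 0 AS has_bad_keys FROM orders;

-- Which keys are affected, and why?
SELECT key, (error).code, (error).description
FROM orders
WHERE error IS NOT NULL;
```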

**Open questions:**

* Does the separation of the `REDIRECT ERRORS` and the `VALUE DECODING ERRORS`
options make sense?

#### Limitations

The above options do not generalize to handling decoding errors that occur
outside of the source decoding pipeline. For example, imagine a JSON source that
is further parsed into columns by a downstream materialized view:

```sql
CREATE SOURCE my_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'samsa')
FORMAT JSON;

CREATE MATERIALIZED VIEW my_view AS
SELECT
    (data->>'userid')::int AS userid
FROM my_source;
```

Bad data can cause either the initial JSON parsing or the cast to `int` in
`my_view` to fail. The `REDIRECT ERRORS` and `VALUE DECODING ERRORS` options
do not help with the second type of error. We'll likely need to additionally
pursue a solution for gracefully handling invalid function calls, like those
described in [#6367].
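
Until then, users can only guard against the second class of error defensively
in SQL, e.g. by validating the value before casting. A minimal sketch, in which
the regular expression is illustrative and only accepts simple integer strings:

```sql
CREATE MATERIALIZED VIEW my_view AS
SELECT
    CASE
        -- Only attempt the cast when the value looks like an integer.
        WHEN data->>'userid' ~ '^-?[0-9]+$'
        THEN (data->>'userid')::int
        ELSE NULL
    END AS userid
FROM my_source;
```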

### Implementation

TODO.

**Open questions:**

* How do we ensure consistency? The DLQ shard and data shard need to be
updated in lockstep.

* When using `VALUE DECODING ERRORS = INLINE`, do we correctly handle
retractions across versions if error text changes? (How do we handle this
today?)

### Future extensions

We expect to add other source types in the future that are prone to bad data.
Imagine S3 sources or Kinesis sources. We'll need to be comfortable extending
the DLQ concept to these new sources. That admittedly seems straightforward,
though each new source type will require its own source-specific DLQ relation
structure.

## Rejected alternative

> [!CAUTION]
> This section describes a design that was rejected due to concerns over its
> implications for correctness.

### Synopsis

We propose to introduce new table reference modifiers that control the
@@ -227,67 +403,6 @@ or become a `FlavoredGid`.

### Kafka source-specific dead letter queue

Only the Kafka source is particularly prone to bad data. With PostgreSQL and
MySQL sources, it is almost always a bug (either in the upstream system or in
Materialize) if data fails to decode, as the upstream systems enforce schemas.
But Kafka does not guarantee schema enforcement, so it is common for bad data to
slip into a Kafka topic.

Because Kafka sources are the most prone to bad data, we could instead choose to
pursue a Kafka source-specific solution. For example, imagine that we introduced
a dead letter queue (DLQ) for Kafka sources. Concretely, we could add a `DEAD
LETTERS` option (strawman name) to `CREATE SOURCE` that specifies the name of a
relation in which to emit undecodable messages:

```sql
CREATE SOURCE ksrc FROM KAFKA CONNECTION kconn ... WITH (
    DEAD LETTERS = kbad
)
```

The `kbad` relation would have a structure like:

Name | Type
--------------|--------
`key` | `bytea`
`value` | `bytea`
`error` | `text`
`error_code` | `text`


There are a few considerations to note.

First, we expect to add other source types in the future that are prone to bad
data. Imagine S3 sources or Kinesis sources. We'd need to be comfortable
extending the DLQ concept to these new sources—which seems admittedly
straightforward.

Second, the source DLQ does not generalize to handling decoding errors that
occur outside of the source decoding pipeline. For example, imagine a JSON
source that is further parsed into columns by a downstream materialized view:

```sql
CREATE SOURCE my_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'samsa')
FORMAT JSON;

CREATE MATERIALIZED VIEW my_view AS
SELECT
    (data->>'userid')::int AS userid
FROM my_source;
```

Bad data can cause either the initial JSON parsing or the cast to `int` in
`my_view` to fail. A source DLQ does not help with the second type of error.
We'd need to additionally pursue a solution for gracefully handling
invalid function calls, like those described in [#6367].

**Open questions:**

* Would this alternative be preferable to the proposed design?
* Is this alternative compatible with introducing `IGNORE ERRORS` and
`ONLY ERRORS` in the future?
* Can a dead letter queue be added to an existing source?

[^1]: Refer to the PostgreSQL documentation for [details on the behavior of `LATERAL`][lateral-docs].

@@ -296,6 +411,7 @@ invalid function calls, like those described in [#6367].
[table factor]: https://github.com/MaterializeInc/materialize/blob/2c7ca4fb6b2a63134f6c123557bc09c1565524b7/src/sql-parser/src/parser.rs#L6991
[error-handling-docs]: https://github.com/MaterializeInc/materialize/blob/2c7ca4fb6b2a63134f6c123557bc09c1565524b7/src/compute/src/render.rs#L12-L101
[@maddyblue]: https://github.com/maddyblue
[dlq]: https://aws.amazon.com/what-is/dead-letter-queue/

[#6367]: https://github.com/MaterializeInc/materialize/issues/6367
[#22430]: https://github.com/MaterializeInc/materialize/issues/22430
