Skip to content

Commit

Permalink
Add implementation details for inline errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Jun 18, 2024
1 parent 13bbf8a commit 8f2ab7a
Showing 1 changed file with 54 additions and 1 deletion.
55 changes: 54 additions & 1 deletion doc/developer/design/20240609_error_handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,60 @@ described in [#6367].

### Implementation

TODO.
#### Inline errors

When `VALUE DECODING ERRORS = INLINE` is set:

[`KafkaSourceConnection` `metadata_columns`](/src/storage-types/src/sources/kafka.rs#L59)
will include a new `error` column in [`plan_create_source`](/src/sql/src/plan/statement/ddl.rs#L605).

[`UpsertStyle`](/src/storage-types/src/sources/envelope.rs#L142) will be extended with a new
`ValueErrInline` enum value to indicate the inline style should be used.
```
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UpsertStyle {
/// `ENVELOPE UPSERT`, where the key shape depends on the independent
/// `KeyEnvelope`
Default(KeyEnvelope),
/// `ENVELOPE DEBEZIUM UPSERT`
Debezium { after_idx: usize },
/// `ENVELOPE UPSERT`, where any decoded value will get packed into a ScalarType::Record
/// named `value`, and any decode errors will get serialized into a ScalarType::Record
/// named `error`. The error will be propagated to the error stream if `propagate_errors`
/// is set. The key shape depends on the independent `KeyEnvelope`.
ValueErrInline {
key_envelope: KeyEnvelope,
propagate_errors: bool,
},
}
```
and this will be set in [`plan_create_source`](/src/sql/src/plan/statement/ddl.rs#L605) based
on the value of the `VALUE DECODING ERRORS` option. If `PROPAGATE` is also included in the
option value, `propagate_errors` will be set to true.

[`UnplannedSourceEnvelope::desc`](/src/storage-types/src/sources/envelope.rs#L79) will be
updated to handle the new `UpsertStyle::ValueErrInline` value, with the same logic as
`UpsertStyle::Default` to determine the column-key and key_desc but returning a
`ScalarType::Record` in a `value` column rather than merging the value desc into the
top-level desc.

The source rendering [`upsert_commands`](/src/storage/src/render/sources.rs#L520) method
will be updated to handle the new `UpsertStyle::ValueErrInline` style. If it receives a
`DecodeError` row it will serialize the error into a `record(description: text, code: text)`
and include that in the `error` column, and if it receives a valid value Row it will insert
the value row into a `record` datum for the `value` column.

If `propagate_errors` is set to `true`, it will continue to produce an additional row with the
`UpsertValueError` error. This will require switching the `map` collection operator to a `flat_map`.

At this point the downstream upsert operators should not require any additional changes, as they
will continue to operate on the same
`Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff>` input collection
received from `upsert_commands` as before.

#### Dead-letter queue (DLQ)

TODO

**Open questions:**

Expand Down

0 comments on commit 8f2ab7a

Please sign in to comment.