diff --git a/doc/developer/design/20240609_error_handling.md b/doc/developer/design/20240609_error_handling.md index 0f874b37e7161..03b01fbcf9e72 100644 --- a/doc/developer/design/20240609_error_handling.md +++ b/doc/developer/design/20240609_error_handling.md @@ -187,7 +187,60 @@ described in [#6367]. ### Implementation -TODO. +#### Inline errors + +When `VALUE DECODING ERRORS = INLINE` is set: + +[`KafkaSourceConnection` `metadata_columns`](/src/storage-types/src/sources/kafka.rs#L59) +will include a new `error` column in [`plan_create_source`](/src/sql/src/plan/statement/ddl.rs#L605). + +[`UpsertStyle`](/src/storage-types/src/sources/envelope.rs#L142) will be extended with a new +`ValueErrInline` enum value to indicate the inline style should be used. +``` +#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum UpsertStyle { + /// `ENVELOPE UPSERT`, where the key shape depends on the independent + /// `KeyEnvelope` + Default(KeyEnvelope), + /// `ENVELOPE DEBEZIUM UPSERT` + Debezium { after_idx: usize }, + /// `ENVELOPE UPSERT`, where any decoded value will get packed into a ScalarType::Record + /// named `value`, and any decode errors will get serialized into a ScalarType::Record + /// named `error`. The error will be propagated to the error stream if `propagate_errors` + /// is set. The key shape depends on the independent `KeyEnvelope`. + ValueErrInline { + key_envelope: KeyEnvelope, + propagate_errors: bool, + }, +} +``` +and this will be set in [`plan_create_source`](/src/sql/src/plan/statement/ddl.rs#L605) based +on the value of the `VALUE DECODING ERRORS` option. If `PROPAGATE` is also included in the +option value, `propagate_errors` will be set to true. + +[`UnplannedSourceEnvelope::desc`](/src/storage-types/src/sources/envelope.rs#L79) will be +updated to handle the new `UpsertStyle::ValueErrInline` value, with the same logic as +`UpsertStyle::Default` to determine the column-key and key_desc but returning a +`ScalarType::Record` in a `value` column rather than merging the value desc into the +top-level desc. + +The source rendering [`upsert_commands`](/src/storage/src/render/sources.rs#L520) method +will be updated to handle the new `UpsertStyle::ValueErrInline` style. If it receives a +`DecodeError` row it will serialize the error into a `record(description: text, code: text)` +and include that in the `error` column, and if it receives a valid value Row it will insert +the value row into a `record` datum for the `value` column. + +If `propagate_errors` is set to `true`, it will continue to produce an additional row with the +`UpsertValueError` error. This will require switching the `map` collection operator to a `flat_map`. + +At this point the downstream upsert operators should not require any additional changes, as they +will continue to operate on the same +`Collection>, FromTime), Diff>` input collection +received from `upsert_commands` as before. + +#### Dead-letter queue (DLQ) + +TODO **Open questions:**