diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index e8d466d18af3..1195111a1ad8 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -169,6 +169,7 @@ type avroMetadata map[string]interface{} type avroEnvelopeOpts struct { beforeField, afterField, recordField bool updatedField, resolvedField bool + mvccTimestampField bool } // avroEnvelopeRecord is an `avroRecord` that wraps a changed SQL row and some @@ -954,6 +955,14 @@ func envelopeToAvroSchema( } schema.Fields = append(schema.Fields, updatedField) } + if opts.mvccTimestampField { + mvccTimestampField := &avroSchemaField{ + SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString}, + Name: `mvcc_timestamp`, + Default: nil, + } + schema.Fields = append(schema.Fields, mvccTimestampField) + } if opts.resolvedField { resolvedField := &avroSchemaField{ SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString}, @@ -1037,6 +1046,20 @@ func (r *avroEnvelopeRecord) BinaryFromRow( native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } + + if r.opts.mvccTimestampField { + native[`mvcc_timestamp`] = nil + if u, ok := meta[`mvcc_timestamp`]; ok { + delete(meta, `mvcc_timestamp`) + ts, ok := u.(hlc.Timestamp) + if !ok { + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) + } + native[`mvcc_timestamp`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) + } + } + if r.opts.resolvedField { native[`resolved`] = nil if u, ok := meta[`resolved`]; ok { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index a4812069b310..775947dc19dd 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -998,6 +998,34 @@ func TestChangefeedMVCCTimestamps(t *testing.T) { cdcTest(t, testFn) } +func TestChangefeedMVCCTimestampsAvro(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE mvcc_timestamp_test_table (id UUID PRIMARY KEY DEFAULT gen_random_uuid())`) + + const rowCount = 5 + expectedPayloads := make([]string, rowCount) + for i := 0; i < rowCount; i++ { + row := sqlDB.QueryRow(t, `INSERT INTO mvcc_timestamp_test_table VALUES (DEFAULT) RETURNING id, cluster_logical_timestamp()`) + + var id string + var mvccTimestamp string + row.Scan(&id, &mvccTimestamp) + expectedPayloads[i] = fmt.Sprintf(`mvcc_timestamp_test_table: {"id":{"string":"%[1]s"}}->{"after":{"mvcc_timestamp_test_table":{"id":{"string":"%[1]s"}}},"mvcc_timestamp":{"string":"%[2]s"}}`, + id, mvccTimestamp) + } + + changeFeed := feed(t, f, `CREATE CHANGEFEED FOR mvcc_timestamp_test_table WITH mvcc_timestamp, format='avro'`) + defer closeFeed(t, changeFeed) + assertPayloads(t, changeFeed, expectedPayloads) + } + + cdcTest(t, testFn, feedTestForceSink(`kafka`)) +} + func TestChangefeedResolvedFrequency(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/encoder_avro.go b/pkg/ccl/changefeedccl/encoder_avro.go index d6cb74f3f424..0346e2311cad 100644 --- a/pkg/ccl/changefeedccl/encoder_avro.go +++ b/pkg/ccl/changefeedccl/encoder_avro.go @@ -34,6 +34,7 @@ type confluentAvroEncoder struct { schemaRegistry schemaRegistry schemaPrefix string updatedField, beforeField bool + mvccTimestampField bool virtualColumnVisibility changefeedbase.VirtualColumnVisibility targets changefeedbase.Targets envelopeType changefeedbase.EnvelopeType @@ -90,6 +91,7 @@ func newConfluentAvroEncoder( e.updatedField = opts.UpdatedTimestamps e.beforeField = opts.Diff e.customKeyColumn = opts.CustomKeyColumn + e.mvccTimestampField = opts.MVCCTimestamps // TODO: Implement this. if opts.KeyInValue { @@ -257,10 +259,10 @@ func (e *confluentAvroEncoder) EncodeValue( // This means metadata can safely go at the top level as there are never arbitrary column names // for it to conflict with. if e.envelopeType == changefeedbase.OptEnvelopeWrapped { - opts = avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField} + opts = avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField, mvccTimestampField: e.mvccTimestampField} afterDataSchema = currentSchema } else { - opts = avroEnvelopeOpts{recordField: true, updatedField: e.updatedField} + opts = avroEnvelopeOpts{recordField: true, updatedField: e.updatedField, mvccTimestampField: e.mvccTimestampField} recordDataSchema = currentSchema } @@ -284,11 +286,12 @@ func (e *confluentAvroEncoder) EncodeValue( e.valueCache.Add(cacheKey, registered) } - var meta avroMetadata + meta := avroMetadata{} if registered.schema.opts.updatedField { - meta = map[string]interface{}{ - `updated`: evCtx.updated, - } + meta[`updated`] = evCtx.updated + } + if registered.schema.opts.mvccTimestampField { + meta[`mvcc_timestamp`] = evCtx.mvcc } // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format