From 47871776d4424c9ba0a8161ffd288ba4163c3637 Mon Sep 17 00:00:00 2001 From: stewartboyd119 Date: Mon, 23 Sep 2024 22:50:21 -0700 Subject: [PATCH] Updated --- config.go | 2 +- formatter.go | 27 ++++++++++++++++++--------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/config.go b/config.go index e78a5e3..538033b 100644 --- a/config.go +++ b/config.go @@ -235,7 +235,7 @@ type SerializationConfig struct { } type DeserializationConfig struct { - // Schema is used exclusively by the avro schema registry formatter today. Its necessary to provide proper schema evolution properties + // Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties // expected by typical use cases. Schema string } diff --git a/formatter.go b/formatter.go index d304beb..b8ba118 100644 --- a/formatter.go +++ b/formatter.go @@ -107,6 +107,10 @@ func newAvroSchemaRegistryFormatter(afmt avroFmt) (avroSchemaRegistryFormatter, }, nil } +// marshall looks a subject's schema (id) so that it can prefix the eventual message payload. +// A schema must be provided and hamba/avro is used in conjunction with this schema to marshall they payload. +// Structs generated using hamba/avro work best, since they provide avro tags which handles casing +// which can lead to errors otherwise. func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { if req.schema == "" { return nil, errors.New("avro schema is required for schema registry formatter") @@ -130,6 +134,15 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { return payload, nil } +// unmarshall looks up the schema based on the schemaID in the message payload (`dataSchema`). +// Additionally, a target schema is provided in the request (`targetSchema`). +// +// The 'targetSchema' and 'dataSchema' are resolved so that data written by the `dataSchema` may +// be read by the `targetSchema`. +// +// The avro.Deserializer has much of this functionality built in, other than being able to specify +// the `targetSchema` and instead infers the target schema from the target struct. This creates +// issues in some common use cases. func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { if req.schema == "" { return errors.New("avro schema is required for schema registry formatter") @@ -139,31 +152,27 @@ func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { return fmt.Errorf("failed to get schema from message payload: %w", err) } - inSchema, err := avro.Parse(inInfo.Schema) + // schema of data that exists on the wire, that is about to be marshalled into the schema of our target + dataSchema, err := avro.Parse(inInfo.Schema) if err != nil { return fmt.Errorf("failed to parse schema associated with message: %w", err) } - outSchema, err := avro.Parse(req.schema) + targetSchema, err := avro.Parse(req.schema) if err != nil { return fmt.Errorf("failed to parse schema : %w", err) } sc := avro.NewSchemaCompatibility() - finalSchema, err := sc.Resolve(inSchema, outSchema) + resolvedSchema, err := sc.Resolve(dataSchema, targetSchema) if err != nil { return fmt.Errorf("failed to get schema from payload: %w", err) } - err = avro.Unmarshal(finalSchema, req.data[5:], req.target) + err = avro.Unmarshal(resolvedSchema, req.data[5:], req.target) if err != nil { return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err) } return nil - //err := f.afmt.Deserialize(req.topic, req.data, req.target) - //if err != nil { - // return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err) - //} - //return nil } type protoSchemaRegistryFormatter struct {