Skip to content

Commit

Permalink
Updated
Browse files Browse the repository at this point in the history
  • Loading branch information
stewartboyd119 committed Sep 24, 2024
1 parent 741a7e7 commit 4787177
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ type SerializationConfig struct {
}

type DeserializationConfig struct {
// Schema is used exclusively by the avro schema registry formatter today. Its necessary to provide proper schema evolution properties
// Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties
// expected by typical use cases.
Schema string
}
Expand Down
27 changes: 18 additions & 9 deletions formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func newAvroSchemaRegistryFormatter(afmt avroFmt) (avroSchemaRegistryFormatter,
}, nil
}

// marshall looks a subject's schema (id) so that it can prefix the eventual message payload.
// A schema must be provided and hamba/avro is used in conjunction with this schema to marshall they payload.
// Structs generated using hamba/avro work best, since they provide avro tags which handles casing
// which can lead to errors otherwise.
func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) {
if req.schema == "" {
return nil, errors.New("avro schema is required for schema registry formatter")
Expand All @@ -130,6 +134,15 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) {
return payload, nil
}

// unmarshall looks up the schema based on the schemaID in the message payload (`dataSchema`).
// Additionally, a target schema is provided in the request (`targetSchema`).
//
// The 'targetSchema' and 'dataSchema' are resolved so that data written by the `dataSchema` may
// be read by the `targetSchema`.
//
// The avro.Deserializer has much of this functionality built in, other than being able to specify
// the `targetSchema` and instead infers the target schema from the target struct. This creates
// issues in some common use cases.
func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error {
if req.schema == "" {
return errors.New("avro schema is required for schema registry formatter")
Expand All @@ -139,31 +152,27 @@ func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error {
return fmt.Errorf("failed to get schema from message payload: %w", err)
}

inSchema, err := avro.Parse(inInfo.Schema)
// schema of data that exists on the wire, that is about to be marshalled into the schema of our target
dataSchema, err := avro.Parse(inInfo.Schema)
if err != nil {
return fmt.Errorf("failed to parse schema associated with message: %w", err)
}

outSchema, err := avro.Parse(req.schema)
targetSchema, err := avro.Parse(req.schema)
if err != nil {
return fmt.Errorf("failed to parse schema : %w", err)
}
sc := avro.NewSchemaCompatibility()
finalSchema, err := sc.Resolve(inSchema, outSchema)
resolvedSchema, err := sc.Resolve(dataSchema, targetSchema)
if err != nil {
return fmt.Errorf("failed to get schema from payload: %w", err)
}

err = avro.Unmarshal(finalSchema, req.data[5:], req.target)
err = avro.Unmarshal(resolvedSchema, req.data[5:], req.target)
if err != nil {
return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err)
}
return nil
//err := f.afmt.Deserialize(req.topic, req.data, req.target)
//if err != nil {
// return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err)
//}
//return nil
}

type protoSchemaRegistryFormatter struct {
Expand Down

0 comments on commit 4787177

Please sign in to comment.