From eed6b4ec5aa4ef779d387b10644edfa755ca0382 Mon Sep 17 00:00:00 2001 From: Eduardas Kazakas Date: Fri, 13 Dec 2024 14:19:12 +0200 Subject: [PATCH 1/3] feat: implement fine grained serialization and deserialization control --- .../avro_generic_consumer_example.go | 2 +- .../avro_generic_producer_example.go | 2 +- .../avro_specific_consumer_example.go | 2 +- .../avro_specific_producer_example.go | 2 +- .../avrov2_consumer_encryption_example.go | 2 +- .../avrov2_consumer_example.go | 2 +- .../avrov2_consumer_migration_example.go | 10 +- .../avrov2_producer_encryption_example.go | 8 +- .../avrov2_producer_example.go | 2 +- .../avrov2_producer_migration_example.go | 13 +- .../confluent_cloud_example.go | 4 +- examples/docker_aws_lambda_example/go.sum | 1 + .../json_consumer_example.go | 2 +- .../json_producer_encryption_example.go | 7 +- .../json_producer_example.go | 2 +- .../protobuf_consumer_encryption_example.go | 2 +- .../protobuf_consumer_example.go | 2 +- .../protobuf_producer_encryption_example.go | 7 +- .../protobuf_producer_example.go | 2 +- schemaregistry/serde/avro/avro_generic.go | 8 +- .../serde/avro/avro_generic_test.go | 24 +- schemaregistry/serde/avro/avro_specific.go | 8 +- .../serde/avro/avro_specific_test.go | 24 +- schemaregistry/serde/avrov2/avro.go | 23 +- schemaregistry/serde/avrov2/avro_test.go | 358 ++++++++++-------- schemaregistry/serde/config.go | 50 ++- .../serde/jsonschema/json_schema.go | 23 +- .../serde/jsonschema/json_schema_test.go | 245 +++++++----- schemaregistry/serde/protobuf/protobuf.go | 23 +- .../serde/protobuf/protobuf_test.go | 196 +++++----- schemaregistry/serde/serde.go | 29 +- 31 files changed, 619 insertions(+), 466 deletions(-) diff --git a/examples/avro_generic_consumer_example/avro_generic_consumer_example.go b/examples/avro_generic_consumer_example/avro_generic_consumer_example.go index 6cb98d3af..4ade8b6a7 100644 --- a/examples/avro_generic_consumer_example/avro_generic_consumer_example.go +++ b/examples/avro_generic_consumer_example/avro_generic_consumer_example.go @@ -97,7 +97,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: value := User{} - err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) + err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/avro_generic_producer_example/avro_generic_producer_example.go b/examples/avro_generic_producer_example/avro_generic_producer_example.go index 9d9b45462..7d16c875a 100644 --- a/examples/avro_generic_producer_example/avro_generic_producer_example.go +++ b/examples/avro_generic_producer_example/avro_generic_producer_example.go @@ -71,7 +71,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serde.NewSerializeHint()) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/avro_specific_consumer_example/avro_specific_consumer_example.go b/examples/avro_specific_consumer_example/avro_specific_consumer_example.go index 87f2bf36f..1d574d720 100644 --- a/examples/avro_specific_consumer_example/avro_specific_consumer_example.go +++ b/examples/avro_specific_consumer_example/avro_specific_consumer_example.go @@ -97,7 +97,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: value := User{} - err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) + err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/avro_specific_producer_example/avro_specific_producer_example.go b/examples/avro_specific_producer_example/avro_specific_producer_example.go index df40f642c..f486e29f0 100644 --- a/examples/avro_specific_producer_example/avro_specific_producer_example.go +++ b/examples/avro_specific_producer_example/avro_specific_producer_example.go @@ -71,7 +71,7 @@ func main() { Favorite_number: 42, Favorite_color: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serde.NewSerializeHint()) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/avrov2_consumer_encryption_example/avrov2_consumer_encryption_example.go b/examples/avrov2_consumer_encryption_example/avrov2_consumer_encryption_example.go index 8fe589379..d20b4ef5f 100644 --- a/examples/avrov2_consumer_encryption_example/avrov2_consumer_encryption_example.go +++ b/examples/avrov2_consumer_encryption_example/avrov2_consumer_encryption_example.go @@ -111,7 +111,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: value := User{} - err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) + err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/avrov2_consumer_example/avrov2_consumer_example.go b/examples/avrov2_consumer_example/avrov2_consumer_example.go index 588e5030b..e51cf345c 100644 --- a/examples/avrov2_consumer_example/avrov2_consumer_example.go +++ b/examples/avrov2_consumer_example/avrov2_consumer_example.go @@ -97,7 +97,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: value := User{} - err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) + err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/avrov2_consumer_migration_example/avrov2_consumer_migration_example.go b/examples/avrov2_consumer_migration_example/avrov2_consumer_migration_example.go index 8ceef2d43..149d57ee4 100644 --- a/examples/avrov2_consumer_migration_example/avrov2_consumer_migration_example.go +++ b/examples/avrov2_consumer_migration_example/avrov2_consumer_migration_example.go @@ -71,13 +71,13 @@ func main() { os.Exit(1) } - deserConfig := avrov2.NewDeserializerConfig() - deserConfig.UseLatestWithMetadata = map[string]string{ + deser, err := avrov2.NewDeserializer(client, serde.ValueSerde, avrov2.NewDeserializerConfig()) + + deserializeHint := serde.NewDeserializeHint() + deserializeHint.UseLatestWithMetadata = map[string]string{ "application.major.version": "2", } - deser, err := avrov2.NewDeserializer(client, serde.ValueSerde, deserConfig) - if err != nil { fmt.Printf("Failed to create deserializer: %s\n", err) os.Exit(1) @@ -106,7 +106,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: value := User{} - err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) + err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value, deserializeHint) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/avrov2_producer_encryption_example/avrov2_producer_encryption_example.go b/examples/avrov2_producer_encryption_example/avrov2_producer_encryption_example.go index 774d19e5e..25bf1535c 100644 --- a/examples/avrov2_producer_encryption_example/avrov2_producer_encryption_example.go +++ b/examples/avrov2_producer_encryption_example/avrov2_producer_encryption_example.go @@ -114,7 +114,6 @@ func main() { serConfig := avrov2.NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true // KMS properties can be passed as follows //serConfig.RuleConfig = map[string]string{ // "secret.access.key": "xxx", @@ -123,6 +122,9 @@ func main() { ser, err := avrov2.NewSerializer(client, serde.ValueSerde, serConfig) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + if err != nil { fmt.Printf("Failed to create serializer: %s\n", err) os.Exit(1) @@ -137,7 +139,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serializeHint) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) @@ -158,7 +160,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err = ser.Serialize(topic, &value) + payload, err = ser.Serialize(topic, &value, serializeHint) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/avrov2_producer_example/avrov2_producer_example.go b/examples/avrov2_producer_example/avrov2_producer_example.go index 23d1340b3..fc13d6a91 100644 --- a/examples/avrov2_producer_example/avrov2_producer_example.go +++ b/examples/avrov2_producer_example/avrov2_producer_example.go @@ -71,7 +71,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serde.NewSerializeHint()) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/avrov2_producer_migration_example/avrov2_producer_migration_example.go b/examples/avrov2_producer_migration_example/avrov2_producer_migration_example.go index 6e594dd5d..a7577909e 100644 --- a/examples/avrov2_producer_migration_example/avrov2_producer_migration_example.go +++ b/examples/avrov2_producer_migration_example/avrov2_producer_migration_example.go @@ -134,17 +134,18 @@ func main() { serConfig := avrov2.NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestWithMetadata = map[string]string{ - "application.major.version": "1", - } ser, err := avrov2.NewSerializer(client, serde.ValueSerde, serConfig) - if err != nil { fmt.Printf("Failed to create serializer: %s\n", err) os.Exit(1) } + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestWithMetadata = map[string]string{ + "application.major.version": "1", + } + // Optional delivery channel, if not specified the Producer object's // .Events channel is used. deliveryChan := make(chan kafka.Event) @@ -154,7 +155,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serializeHint) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) @@ -175,7 +176,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err = ser.Serialize(topic, &value) + payload, err = ser.Serialize(topic, &value, serializeHint) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/confluent_cloud_example/confluent_cloud_example.go b/examples/confluent_cloud_example/confluent_cloud_example.go index 1d5974bae..9b2f23a3d 100644 --- a/examples/confluent_cloud_example/confluent_cloud_example.go +++ b/examples/confluent_cloud_example/confluent_cloud_example.go @@ -118,7 +118,7 @@ func main() { FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serde.NewSerializeHint()) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) @@ -171,7 +171,7 @@ func main() { message, err := consumer.ReadMessage(100 * time.Millisecond) if err == nil { received := User{} - err := deser.DeserializeInto(*message.TopicPartition.Topic, message.Value, &received) + err := deser.DeserializeInto(*message.TopicPartition.Topic, message.Value, &received, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/docker_aws_lambda_example/go.sum b/examples/docker_aws_lambda_example/go.sum index 9be073518..860c4d375 100644 --- a/examples/docker_aws_lambda_example/go.sum +++ b/examples/docker_aws_lambda_example/go.sum @@ -53,6 +53,7 @@ github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 h1:NbOku86JJlsRJPJKE0snNsz6 github.com/confluentinc/confluent-kafka-go/v2 v2.4.0/go.mod h1:E1dEQy50ZLfqs7T9luxz0rLxaeFZJZE92XvApJOr/Rk= github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y= github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0= +github.com/confluentinc/confluent-kafka-go/v2 v2.6.1/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves= github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= github.com/containerd/containerd v1.7.12 h1:+KQsnv4VnzyxWcfO9mlxxELaoztsDEjOuCMPAuPqgU0= diff --git a/examples/json_consumer_example/json_consumer_example.go b/examples/json_consumer_example/json_consumer_example.go index 369693ba5..18c3010d3 100644 --- a/examples/json_consumer_example/json_consumer_example.go +++ b/examples/json_consumer_example/json_consumer_example.go @@ -97,7 +97,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: value := User{} - err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value) + err := deser.DeserializeInto(*e.TopicPartition.Topic, e.Value, &value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/json_producer_encryption_example/json_producer_encryption_example.go b/examples/json_producer_encryption_example/json_producer_encryption_example.go index 87fe88b15..6409cd30d 100644 --- a/examples/json_producer_encryption_example/json_producer_encryption_example.go +++ b/examples/json_producer_encryption_example/json_producer_encryption_example.go @@ -116,7 +116,6 @@ func main() { serConfig := jsonschema.NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true // KMS properties can be passed as follows //serConfig.RuleConfig = map[string]string{ // "secret.access.key": "xxx", @@ -124,12 +123,14 @@ func main() { //} ser, err := jsonschema.NewSerializer(client, serde.ValueSerde, serConfig) - if err != nil { fmt.Printf("Failed to create serializer: %s\n", err) os.Exit(1) } + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + // Optional delivery channel, if not specified the Producer object's // .Events channel is used. deliveryChan := make(chan kafka.Event) @@ -139,7 +140,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serializeHint) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/json_producer_example/json_producer_example.go b/examples/json_producer_example/json_producer_example.go index 474562b4e..b4ae06b37 100644 --- a/examples/json_producer_example/json_producer_example.go +++ b/examples/json_producer_example/json_producer_example.go @@ -71,7 +71,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serde.NewSerializeHint()) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/protobuf_consumer_encryption_example/protobuf_consumer_encryption_example.go b/examples/protobuf_consumer_encryption_example/protobuf_consumer_encryption_example.go index e8a741e2a..54813a77a 100644 --- a/examples/protobuf_consumer_encryption_example/protobuf_consumer_encryption_example.go +++ b/examples/protobuf_consumer_encryption_example/protobuf_consumer_encryption_example.go @@ -116,7 +116,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: - value, err := deser.Deserialize(*e.TopicPartition.Topic, e.Value) + value, err := deser.Deserialize(*e.TopicPartition.Topic, e.Value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/protobuf_consumer_example/protobuf_consumer_example.go b/examples/protobuf_consumer_example/protobuf_consumer_example.go index a919005c2..62d80d742 100644 --- a/examples/protobuf_consumer_example/protobuf_consumer_example.go +++ b/examples/protobuf_consumer_example/protobuf_consumer_example.go @@ -101,7 +101,7 @@ func main() { switch e := ev.(type) { case *kafka.Message: - value, err := deser.Deserialize(*e.TopicPartition.Topic, e.Value) + value, err := deser.Deserialize(*e.TopicPartition.Topic, e.Value, serde.NewDeserializeHint()) if err != nil { fmt.Printf("Failed to deserialize payload: %s\n", err) } else { diff --git a/examples/protobuf_producer_encryption_example/protobuf_producer_encryption_example.go b/examples/protobuf_producer_encryption_example/protobuf_producer_encryption_example.go index afe92feff..61a7d153a 100644 --- a/examples/protobuf_producer_encryption_example/protobuf_producer_encryption_example.go +++ b/examples/protobuf_producer_encryption_example/protobuf_producer_encryption_example.go @@ -117,7 +117,6 @@ message User { serConfig := protobuf.NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true // KMS properties can be passed as follows //serConfig.RuleConfig = map[string]string{ // "secret.access.key": "xxx", @@ -125,12 +124,14 @@ message User { //} ser, err := protobuf.NewSerializer(client, serde.ValueSerde, serConfig) - if err != nil { fmt.Printf("Failed to create serializer: %s\n", err) os.Exit(1) } + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + // Optional delivery channel, if not specified the Producer object's // .Events channel is used. deliveryChan := make(chan kafka.Event) @@ -141,7 +142,7 @@ message User { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serializeHint) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/examples/protobuf_producer_example/protobuf_producer_example.go b/examples/protobuf_producer_example/protobuf_producer_example.go index c6688c021..ef2304eeb 100644 --- a/examples/protobuf_producer_example/protobuf_producer_example.go +++ b/examples/protobuf_producer_example/protobuf_producer_example.go @@ -71,7 +71,7 @@ func main() { FavoriteNumber: 42, FavoriteColor: "blue", } - payload, err := ser.Serialize(topic, &value) + payload, err := ser.Serialize(topic, &value, serde.NewSerializeHint()) if err != nil { fmt.Printf("Failed to serialize payload: %s\n", err) os.Exit(1) diff --git a/schemaregistry/serde/avro/avro_generic.go b/schemaregistry/serde/avro/avro_generic.go index 8199fb50a..edd8abe68 100644 --- a/schemaregistry/serde/avro/avro_generic.go +++ b/schemaregistry/serde/avro/avro_generic.go @@ -51,7 +51,7 @@ func NewGenericSerializer(client schemaregistry.Client, serdeType serde.Type, co } // Serialize implements serialization of generic Avro data -func (s *GenericSerializer) Serialize(topic string, msg interface{}) ([]byte, error) { +func (s *GenericSerializer) Serialize(topic string, msg interface{}, hint *serde.SerializeHint) ([]byte, error) { if msg == nil { return nil, nil } @@ -67,7 +67,7 @@ func (s *GenericSerializer) Serialize(topic string, msg interface{}) ([]byte, er info := schemaregistry.SchemaInfo{ Schema: avroType.String(), } - id, err := s.GetID(topic, msg, &info) + id, err := s.GetID(topic, msg, &info, hint) if err != nil { return nil, err } @@ -93,7 +93,7 @@ func NewGenericDeserializer(client schemaregistry.Client, serdeType serde.Type, } // Deserialize implements deserialization of generic Avro data -func (s *GenericDeserializer) Deserialize(topic string, payload []byte) (interface{}, error) { +func (s *GenericDeserializer) Deserialize(topic string, payload []byte, _ *serde.DeserializeHint) (interface{}, error) { if payload == nil { return nil, nil } @@ -118,7 +118,7 @@ func (s *GenericDeserializer) Deserialize(topic string, payload []byte) (interfa } // DeserializeInto implements deserialization of generic Avro data to the given object -func (s *GenericDeserializer) DeserializeInto(topic string, payload []byte, msg interface{}) error { +func (s *GenericDeserializer) DeserializeInto(topic string, payload []byte, msg interface{}, _ *serde.DeserializeHint) error { if payload == nil { return nil } diff --git a/schemaregistry/serde/avro/avro_generic_test.go b/schemaregistry/serde/avro/avro_generic_test.go index 86e6e57cd..d39b7c70d 100644 --- a/schemaregistry/serde/avro/avro_generic_test.go +++ b/schemaregistry/serde/avro/avro_generic_test.go @@ -59,7 +59,7 @@ func TestGenericAvroSerdeWithSimple(t *testing.T) { obj.StringField = "hi" obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewGenericDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -67,11 +67,13 @@ func TestGenericAvroSerdeWithSimple(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactoryGeneric + deserializeHint := serde.NewDeserializeHint() + var newobj GenericDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -96,7 +98,7 @@ func TestGenericAvroSerdeWithNested(t *testing.T) { OtherField: nested, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewGenericDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -104,11 +106,13 @@ func TestGenericAvroSerdeWithNested(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactoryGeneric + deserializeHint := serde.NewDeserializeHint() + var newobj GenericNestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -131,7 +135,7 @@ func TestGenericAvroSerdeWithCycle(t *testing.T) { Next: &nested, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewGenericDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -139,11 +143,13 @@ func TestGenericAvroSerdeWithCycle(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactoryGeneric + deserializeHint := serde.NewDeserializeHint() + var newobj GenericLinkedList - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } diff --git a/schemaregistry/serde/avro/avro_specific.go b/schemaregistry/serde/avro/avro_specific.go index b6a50211e..28af5e737 100644 --- a/schemaregistry/serde/avro/avro_specific.go +++ b/schemaregistry/serde/avro/avro_specific.go @@ -61,7 +61,7 @@ func NewSpecificSerializer(client schemaregistry.Client, serdeType serde.Type, c } // Serialize implements serialization of specific Avro data -func (s *SpecificSerializer) Serialize(topic string, msg interface{}) ([]byte, error) { +func (s *SpecificSerializer) Serialize(topic string, msg interface{}, hint *serde.SerializeHint) ([]byte, error) { if msg == nil { return nil, nil } @@ -76,7 +76,7 @@ func (s *SpecificSerializer) Serialize(topic string, msg interface{}) ([]byte, e info := schemaregistry.SchemaInfo{ Schema: avroMsg.Schema(), } - id, err := s.GetID(topic, avroMsg, &info) + id, err := s.GetID(topic, avroMsg, &info, hint) if err != nil { return nil, err } @@ -103,7 +103,7 @@ func NewSpecificDeserializer(client schemaregistry.Client, serdeType serde.Type, } // Deserialize implements deserialization of specific Avro data -func (s *SpecificDeserializer) Deserialize(topic string, payload []byte) (interface{}, error) { +func (s *SpecificDeserializer) Deserialize(topic string, payload []byte, _ *serde.DeserializeHint) (interface{}, error) { if payload == nil { return nil, nil } @@ -147,7 +147,7 @@ func (s *SpecificDeserializer) Deserialize(topic string, payload []byte) (interf } // DeserializeInto implements deserialization of specific Avro data to the given object -func (s *SpecificDeserializer) DeserializeInto(topic string, payload []byte, msg interface{}) error { +func (s *SpecificDeserializer) DeserializeInto(topic string, payload []byte, msg interface{}, _ *serde.DeserializeHint) error { if payload == nil { return nil } diff --git a/schemaregistry/serde/avro/avro_specific_test.go b/schemaregistry/serde/avro/avro_specific_test.go index 4335e4f22..14ce7f6d2 100644 --- a/schemaregistry/serde/avro/avro_specific_test.go +++ b/schemaregistry/serde/avro/avro_specific_test.go @@ -59,7 +59,7 @@ func TestSpecificAvroSerdeWithSimple(t *testing.T) { obj.StringField = "hi" obj.BoolField = true obj.BytesField = []byte{0, 0, 0, 1} - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewSpecificDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -67,11 +67,13 @@ func TestSpecificAvroSerdeWithSimple(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactorySpecific + deserializeHint := serde.NewDeserializeHint() + var newobj test.DemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -101,7 +103,7 @@ func TestSpecificAvroSerdeWithNested(t *testing.T) { NumberField: number, OtherField: nested, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewSpecificDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -109,11 +111,13 @@ func TestSpecificAvroSerdeWithNested(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactorySpecific + deserializeHint := serde.NewDeserializeHint() + var newobj test.NestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -138,7 +142,7 @@ func TestSpecificAvroSerdeWithCycle(t *testing.T) { obj := test.RecursiveUnionTestRecord{ RecursiveField: &wrapper, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewSpecificDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -146,10 +150,12 @@ func TestSpecificAvroSerdeWithCycle(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactorySpecific + deserializeHint := serde.NewDeserializeHint() + var newobj test.RecursiveUnionTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } diff --git a/schemaregistry/serde/avrov2/avro.go b/schemaregistry/serde/avrov2/avro.go index 0ff8d4574..d43007b7e 100644 --- a/schemaregistry/serde/avrov2/avro.go +++ b/schemaregistry/serde/avrov2/avro.go @@ -84,7 +84,7 @@ func NewSerializer(client schemaregistry.Client, serdeType serde.Type, conf *Ser } // Serialize implements serialization of generic Avro data -func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { +func (s *Serializer) Serialize(topic string, msg interface{}, hint *serde.SerializeHint) ([]byte, error) { if msg == nil { return nil, nil } @@ -92,9 +92,10 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { var info schemaregistry.SchemaInfo var err error // Don't derive the schema if it is being looked up in the following ways - if s.Conf.UseSchemaID == -1 && - !s.Conf.UseLatestVersion && - len(s.Conf.UseLatestWithMetadata) == 0 { + if hint.UseSchemaID == -1 && + hint.UseSpecificVersion == -1 && + !hint.UseLatestVersion && + len(hint.UseLatestWithMetadata) == 0 { msgType := reflect.TypeOf(msg) if msgType.Kind() != reflect.Pointer { return nil, errors.New("input message must be a pointer") @@ -107,7 +108,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { Schema: avroSchema.String(), } } - id, err := s.GetID(topic, msg, &info) + id, err := s.GetID(topic, msg, &info, hint) if err != nil { return nil, err } @@ -166,17 +167,17 @@ func NewDeserializer(client schemaregistry.Client, serdeType serde.Type, conf *D } // Deserialize implements deserialization of generic Avro data -func (s *Deserializer) Deserialize(topic string, payload []byte) (interface{}, error) { - return s.deserialize(topic, payload, nil) +func (s *Deserializer) Deserialize(topic string, payload []byte, hint *serde.DeserializeHint) (interface{}, error) { + return s.deserialize(topic, payload, nil, hint) } // DeserializeInto implements deserialization of generic Avro data to the given object -func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interface{}) error { - _, err := s.deserialize(topic, payload, msg) +func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interface{}, hint *serde.DeserializeHint) error { + _, err := s.deserialize(topic, payload, msg, hint) return err } -func (s *Deserializer) deserialize(topic string, payload []byte, result interface{}) (interface{}, error) { +func (s *Deserializer) deserialize(topic string, payload []byte, result interface{}, hint *serde.DeserializeHint) (interface{}, error) { if len(payload) == 0 { return nil, nil } @@ -188,7 +189,7 @@ func (s *Deserializer) deserialize(topic string, payload []byte, result interfac if err != nil { return nil, err } - readerMeta, err := s.GetReaderSchema(subject) + readerMeta, err := s.GetReaderSchema(subject, hint) if err != nil { return nil, err } diff --git a/schemaregistry/serde/avrov2/avro_test.go b/schemaregistry/serde/avrov2/avro_test.go index 181742066..2eaeef2ea 100644 --- a/schemaregistry/serde/avrov2/avro_test.go +++ b/schemaregistry/serde/avrov2/avro_test.go @@ -300,7 +300,7 @@ func TestAvroSerdeWithSimple(t *testing.T) { obj.StringField = "hi" obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -308,11 +308,13 @@ func TestAvroSerdeWithSimple(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory + deserializeHint := serde.NewDeserializeHint() + var newobj DemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) // serialize second object @@ -322,10 +324,11 @@ func TestAvroSerdeWithSimple(t *testing.T) { obj.StringField = "bye" obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err = ser.Serialize("topic1", &obj) + + bytes, err = ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) - msg, err = deser.Deserialize("topic1", bytes) + msg, err = deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -339,7 +342,8 @@ func TestAvroSerdeWithSimpleMap(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -360,7 +364,7 @@ func TestAvroSerdeWithSimpleMap(t *testing.T) { obj["StringField"] = "hi" obj["BoolField"] = true obj["BytesField"] = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -369,7 +373,7 @@ func TestAvroSerdeWithSimpleMap(t *testing.T) { deser.MessageFactory = testMessageFactory var newobj map[string]interface{} - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) } @@ -385,7 +389,7 @@ func TestAvroSerdeWithPrimitive(t *testing.T) { serde.MaybeFail("Serializer configuration", err) obj := "Hello, World!" - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -394,7 +398,7 @@ func TestAvroSerdeWithPrimitive(t *testing.T) { deser.MessageFactory = testMessageFactory var newobj string - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) } @@ -419,7 +423,7 @@ func TestAvroSerdeWithNested(t *testing.T) { OtherField: nested, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -427,11 +431,13 @@ func TestAvroSerdeWithNested(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory + deserializeHint := serde.NewDeserializeHint() + var newobj NestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -446,7 +452,8 @@ func TestAvroSerdeWithReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -489,7 +496,7 @@ func TestAvroSerdeWithReferences(t *testing.T) { OtherField: nested, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -497,11 +504,13 @@ func TestAvroSerdeWithReferences(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory + deserializeHint := serde.NewDeserializeHint() + var newobj NestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj)) } @@ -516,7 +525,7 @@ func TestAvroSerdeUnionWithReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) _ = ser.RegisterTypeFromMessageFactory("DemoSchema", testMessageFactory) @@ -574,7 +583,10 @@ func TestAvroSerdeUnionWithReferences(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -594,14 +606,16 @@ func TestAvroSerdeUnionWithReferences(t *testing.T) { }, } + deserializeHint := serde.NewDeserializeHint() + // deserialize into map var newmap map[string]interface{} - err = deser.DeserializeInto("topic1", bytes, &newmap) + err = deser.DeserializeInto("topic1", bytes, &newmap, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newmap, oldmap)) // deserialize into interface{} var newany interface{} - err = deser.DeserializeInto("topic1", bytes, &newany) + err = deser.DeserializeInto("topic1", bytes, &newany, deserializeHint) var newobj = newany.(DemoSchema) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) } @@ -617,7 +631,7 @@ func TestAvroSchemaEvolution(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -635,7 +649,9 @@ func TestAvroSchemaEvolution(t *testing.T) { obj := SchemaEvolution1{} obj.FieldToDelete = "bye" - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) info = schemaregistry.SchemaInfo{ @@ -651,9 +667,7 @@ func TestAvroSchemaEvolution(t *testing.T) { client.ClearLatestCaches() - deserConfig := NewDeserializerConfig() - deserConfig.UseLatestVersion = true - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory @@ -661,11 +675,14 @@ func TestAvroSchemaEvolution(t *testing.T) { obj2 := SchemaEvolution2{} obj2.NewOptionalField = "optional" + deserializeHint := serde.NewDeserializeHint() + deserializeHint.UseLatestVersion = true + var newobj SchemaEvolution2 - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj2)) - msg, err := deser.Deserialize("topic1", bytes) + msg, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(msg, &obj2)) } @@ -680,7 +697,7 @@ func TestAvroSerdeWithCELCondition(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -714,16 +731,18 @@ func TestAvroSerdeWithCELCondition(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -738,7 +757,7 @@ func TestAvroSerdeWithCELConditionLogicalType(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -774,16 +793,18 @@ func TestAvroSerdeWithCELConditionLogicalType(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -798,7 +819,7 @@ func TestAvroSerdeWithCELConditionFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -832,7 +853,10 @@ func TestAvroSerdeWithCELConditionFail(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) var ruleErr serde.RuleConditionErr errors.As(err, &ruleErr) serde.MaybeFail("serialization", nil, serde.Expect(encRule, *ruleErr.Rule)) @@ -849,7 +873,7 @@ func TestAvroSerdeWithCELConditionIgnoreFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -884,16 +908,18 @@ func TestAvroSerdeWithCELConditionIgnoreFail(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -908,7 +934,7 @@ func TestAvroSerdeWithCELFieldTransformDisable(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -950,16 +976,18 @@ func TestAvroSerdeWithCELFieldTransformDisable(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -974,7 +1002,7 @@ func TestAvroSerdeWithCELFieldTransform(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -1008,7 +1036,10 @@ func TestAvroSerdeWithCELFieldTransform(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deserConfig := NewDeserializerConfig() @@ -1024,7 +1055,7 @@ func TestAvroSerdeWithCELFieldTransform(t *testing.T) { obj2.BoolField = true obj2.BytesField = []byte{1, 2} - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj2)) } @@ -1039,7 +1070,7 @@ func TestAvroSerdeWithCELFieldTransformComplex(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -1072,11 +1103,13 @@ func TestAvroSerdeWithCELFieldTransformComplex(t *testing.T) { obj.MapField = map[string]string{"key": "world"} obj.UnionField = &str - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory @@ -1087,7 +1120,7 @@ func TestAvroSerdeWithCELFieldTransformComplex(t *testing.T) { obj2.MapField = map[string]string{"key": "world-suffix"} obj2.UnionField = &str2 - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj2)) } @@ -1102,7 +1135,7 @@ func TestAvroSerdeWithCELFieldTransformComplexWithNil(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -1134,11 +1167,13 @@ func TestAvroSerdeWithCELFieldTransformComplexWithNil(t *testing.T) { obj.MapField = map[string]string{"key": "world"} obj.UnionField = nil - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory @@ -1148,7 +1183,7 @@ func TestAvroSerdeWithCELFieldTransformComplexWithNil(t *testing.T) { obj2.MapField = map[string]string{"key": "world-suffix"} obj2.UnionField = nil - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj2)) } @@ -1163,7 +1198,7 @@ func TestAvroSerdeWithCELFieldCondition(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -1197,16 +1232,18 @@ func TestAvroSerdeWithCELFieldCondition(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -1221,7 +1258,7 @@ func TestAvroSerdeWithCELFieldConditionFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -1255,7 +1292,10 @@ func TestAvroSerdeWithCELFieldConditionFail(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) var ruleErr serde.RuleConditionErr errors.As(err, &ruleErr) serde.MaybeFail("serialization", nil, serde.Expect(ruleErr, serde.RuleConditionErr{Rule: &encRule})) @@ -1286,7 +1326,7 @@ func TestAvroSerdeEncryption(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1329,7 +1369,10 @@ func TestAvroSerdeEncryption(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1345,7 +1388,7 @@ func TestAvroSerdeEncryption(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -1360,7 +1403,7 @@ func TestAvroSerdeEncryptionWithSimpleMap(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1403,7 +1446,10 @@ func TestAvroSerdeEncryptionWithSimpleMap(t *testing.T) { obj["BoolField"] = true obj["BytesField"] = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1420,7 +1466,7 @@ func TestAvroSerdeEncryptionWithSimpleMap(t *testing.T) { deser.MessageFactory = testMessageFactory var newobj map[string]interface{} - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization into", err, serde.Expect(newobj, obj)) } @@ -1438,7 +1484,7 @@ func TestAvroSerdeEncryptionDekRotation(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1482,7 +1528,10 @@ func TestAvroSerdeEncryptionDekRotation(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1497,7 +1546,7 @@ func TestAvroSerdeEncryptionDekRotation(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) dek, err := executor.Client.GetDekVersion( @@ -1514,13 +1563,13 @@ func TestAvroSerdeEncryptionDekRotation(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err = ser.Serialize("topic1", &obj) + bytes, err = ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field obj.StringField = "hi" - newobj, err = deser.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) dek, err = executor.Client.GetDekVersion( @@ -1537,13 +1586,13 @@ func TestAvroSerdeEncryptionDekRotation(t *testing.T) { obj.BoolField = true obj.BytesField = []byte{1, 2} - bytes, err = ser.Serialize("topic1", &obj) + bytes, err = ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field obj.StringField = "hi" - newobj, err = deser.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) dek, err = executor.Client.GetDekVersion( @@ -1613,7 +1662,7 @@ func TestAvroSerdeEncryptionF1Preserialized(t *testing.T) { serde.MaybeFail("Dek registration", err) bytes := []byte{0, 0, 0, 0, 1, 104, 122, 103, 121, 47, 106, 70, 78, 77, 86, 47, 101, 70, 105, 108, 97, 72, 114, 77, 121, 101, 66, 103, 100, 97, 86, 122, 114, 82, 48, 117, 100, 71, 101, 111, 116, 87, 56, 99, 65, 47, 74, 97, 108, 55, 117, 107, 114, 43, 77, 47, 121, 122} - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) executor.Client.Close() @@ -1680,7 +1729,7 @@ func TestAvroSerdeEncryptionDeterministicF1Preserialized(t *testing.T) { serde.MaybeFail("Dek registration", err) bytes := []byte{0, 0, 0, 0, 1, 72, 68, 54, 89, 116, 120, 114, 108, 66, 110, 107, 84, 87, 87, 57, 78, 54, 86, 98, 107, 51, 73, 73, 110, 106, 87, 72, 56, 49, 120, 109, 89, 104, 51, 107, 52, 100} - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) executor.Client.Close() @@ -1747,7 +1796,7 @@ func TestAvroSerdeEncryptionDekRotationF1Preserialized(t *testing.T) { serde.MaybeFail("Dek registration", err) bytes := []byte{0, 0, 0, 0, 1, 120, 65, 65, 65, 65, 65, 65, 71, 52, 72, 73, 54, 98, 49, 110, 88, 80, 88, 113, 76, 121, 71, 56, 99, 73, 73, 51, 53, 78, 72, 81, 115, 101, 113, 113, 85, 67, 100, 43, 73, 101, 76, 101, 70, 86, 65, 101, 78, 112, 83, 83, 51, 102, 120, 80, 110, 74, 51, 50, 65, 61} - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) executor.Client.Close() @@ -1764,7 +1813,6 @@ func TestAvroSerdeEncryptionWithReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1828,7 +1876,10 @@ func TestAvroSerdeEncryptionWithReferences(t *testing.T) { OtherField: nested, } - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1844,7 +1895,7 @@ func TestAvroSerdeEncryptionWithReferences(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -1869,7 +1920,6 @@ func TestAvroSerdeEncryptionWithPointerReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1924,7 +1974,10 @@ func TestAvroSerdeEncryptionWithPointerReferences(t *testing.T) { t.Errorf("Expected valid schema id, found %d", id) } - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1941,7 +1994,7 @@ func TestAvroSerdeEncryptionWithPointerReferences(t *testing.T) { deser.MessageFactory = testMessageFactory _ = deser.RegisterTypeFromMessageFactory("DemoSchema", testMessageFactory) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -1956,7 +2009,6 @@ func TestAvroSerdeEncryptionWithUnion(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -2001,7 +2053,10 @@ func TestAvroSerdeEncryptionWithUnion(t *testing.T) { obj.BoolField = true obj.BytesField = &b - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -2017,7 +2072,7 @@ func TestAvroSerdeEncryptionWithUnion(t *testing.T) { deser.Client = ser.Client deser.MessageFactory = testMessageFactory - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &obj)) } @@ -2117,23 +2172,20 @@ func TestAvroSerdeJSONataWithCEL(t *testing.T) { serConfig1 := NewSerializerConfig() serConfig1.AutoRegisterSchemas = false - serConfig1.UseLatestVersion = false - serConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } ser1, err := NewSerializer(client, serde.ValueSerde, serConfig1) serde.MaybeFail("Serializer configuration", err) - bytes, err := ser1.Serialize("topic1", &widget) - serde.MaybeFail("serialization", err) - - deserConfig2 := NewDeserializerConfig() - deserConfig2.UseLatestWithMetadata = map[string]string{ - "application.version": "v2", + serializeHint1 := serde.NewSerializeHint() + serializeHint1.UseLatestVersion = true + serializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", } - deser2, err := NewDeserializer(client, serde.ValueSerde, deserConfig2) + bytes, err := ser1.Serialize("topic1", &widget, serializeHint1) + serde.MaybeFail("serialization", err) + + deser2, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser2.Client = ser1.Client deser2.MessageFactory = testMessageFactory @@ -2144,7 +2196,12 @@ func TestAvroSerdeJSONataWithCEL(t *testing.T) { Version: 1, } - newobj, err := deser2.Deserialize("topic1", bytes) + deserializeHint2 := serde.NewDeserializeHint() + deserializeHint2.UseLatestWithMetadata = map[string]string{ + "application.version": "v2", + } + + newobj, err := deser2.Deserialize("topic1", bytes, deserializeHint2) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &newWidget2)) } @@ -2301,91 +2358,76 @@ func TestAvroSerdeJSONataFullyCompatible(t *testing.T) { t.Errorf("Expected valid schema id, found %d", id) } - serConfig1 := NewSerializerConfig() - serConfig1.AutoRegisterSchemas = false - serConfig1.UseLatestVersion = false - serConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } + serConfig := NewSerializerConfig() + serConfig.AutoRegisterSchemas = false - ser1, err := NewSerializer(client, serde.ValueSerde, serConfig1) + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) - bytes, err := ser1.Serialize("topic1", &widget) + serializeHint1 := serde.NewSerializeHint() + serializeHint1.UseLatestVersion = false + serializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", + } + + bytes, err := ser.Serialize("topic1", &widget, serializeHint1) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser1, bytes, widget, newWidget, newerWidget) + deserializeWithAllVersions(client, ser, bytes, widget, newWidget, newerWidget) - serConfig2 := NewSerializerConfig() - serConfig2.AutoRegisterSchemas = false - serConfig2.UseLatestVersion = false - serConfig2.UseLatestWithMetadata = map[string]string{ + serializeHint2 := serde.NewSerializeHint() + serializeHint2.UseLatestVersion = false + serializeHint2.UseLatestWithMetadata = map[string]string{ "application.version": "v2", } - ser2, err := NewSerializer(client, serde.ValueSerde, serConfig2) - serde.MaybeFail("Serializer configuration", err) - - bytes, err = ser2.Serialize("topic1", &newWidget) + bytes, err = ser.Serialize("topic1", &newWidget, serializeHint2) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser2, bytes, widget, newWidget, newerWidget) + deserializeWithAllVersions(client, ser, bytes, widget, newWidget, newerWidget) - serConfig3 := NewSerializerConfig() - serConfig3.AutoRegisterSchemas = false - serConfig3.UseLatestVersion = false - serConfig3.UseLatestWithMetadata = map[string]string{ + serializeHint3 := serde.NewSerializeHint() + serializeHint3.UseLatestVersion = false + serializeHint3.UseLatestWithMetadata = map[string]string{ "application.version": "v3", } - ser3, err := NewSerializer(client, serde.ValueSerde, serConfig3) - serde.MaybeFail("Serializer configuration", err) - - bytes, err = ser3.Serialize("topic1", &newerWidget) + bytes, err = ser.Serialize("topic1", &newerWidget, serializeHint3) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser3, bytes, widget, newWidget, newerWidget) + deserializeWithAllVersions(client, ser, bytes, widget, newWidget, newerWidget) } func deserializeWithAllVersions(client schemaregistry.Client, ser *Serializer, bytes []byte, widget OldWidget, newWidget NewWidget, newerWidget NewerWidget) { - deserConfig1 := NewDeserializerConfig() - deserConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } - deser1, err := NewDeserializer(client, serde.ValueSerde, deserConfig1) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) - deser1.Client = ser.Client - deser1.MessageFactory = testMessageFactory + deser.Client = ser.Client + deser.MessageFactory = testMessageFactory + + deserializeHint1 := serde.NewDeserializeHint() + deserializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", + } - newobj, err := deser1.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, deserializeHint1) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &widget)) - deserConfig2 := NewDeserializerConfig() - deserConfig2.UseLatestWithMetadata = map[string]string{ + deserializeHint2 := serde.NewDeserializeHint() + deserializeHint2.UseLatestWithMetadata = map[string]string{ "application.version": "v2", } - deser2, err := NewDeserializer(client, serde.ValueSerde, deserConfig2) - serde.MaybeFail("Deserializer configuration", err) - deser2.Client = ser.Client - deser2.MessageFactory = testMessageFactory - - newobj, err = deser2.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, deserializeHint2) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &newWidget)) - deserConfig3 := NewDeserializerConfig() - deserConfig3.UseLatestWithMetadata = map[string]string{ + deserializeHint3 := serde.NewDeserializeHint() + deserializeHint3.UseLatestWithMetadata = map[string]string{ "application.version": "v3", } - deser3, err := NewDeserializer(client, serde.ValueSerde, deserConfig3) - serde.MaybeFail("Deserializer configuration", err) - deser3.Client = ser.Client - deser3.MessageFactory = testMessageFactory - - newobj, err = deser3.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, deserializeHint3) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &newerWidget)) } diff --git a/schemaregistry/serde/config.go b/schemaregistry/serde/config.go index 234ecda6b..fe40122fc 100644 --- a/schemaregistry/serde/config.go +++ b/schemaregistry/serde/config.go @@ -20,12 +20,6 @@ package serde type SerializerConfig struct { // AutoRegisterSchemas determines whether to automatically register schemas during serialization AutoRegisterSchemas bool - // UseSchemaID specifies a schema ID to use during serialization - UseSchemaID int - // UseLatestVersion specifies whether to use the latest schema version during serialization - UseLatestVersion bool - // UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization - UseLatestWithMetadata map[string]string // NormalizeSchemas determines whether to normalize schemas during serialization NormalizeSchemas bool // RuleConfig specifies configuration options to the rules @@ -37,8 +31,6 @@ func NewSerializerConfig() *SerializerConfig { c := &SerializerConfig{} c.AutoRegisterSchemas = true - c.UseSchemaID = -1 - c.UseLatestVersion = false c.NormalizeSchemas = false return c @@ -46,10 +38,6 @@ func NewSerializerConfig() *SerializerConfig { // DeserializerConfig is used to pass multiple configuration options to the deserializers. type DeserializerConfig struct { - // UseLatestVersion specifies whether to use the latest schema version during deserialization - UseLatestVersion bool - // UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization - UseLatestWithMetadata map[string]string // RuleConfig specifies configuration options to the rules RuleConfig map[string]string } @@ -60,3 +48,41 @@ func NewDeserializerConfig() *DeserializerConfig { return c } + +type SerializeHint struct { + // UseSchemaID specifies a schema ID to use during serialization + UseSchemaID int + // UseVersion specifies a specific schema version to use during serialization + UseSpecificVersion int + // UseLatestVersion specifies whether to use the latest schema version during serialization + UseLatestVersion bool + // UseLatestWithMetadata specifies whether to use the latest schema with metadata during serialization + UseLatestWithMetadata map[string]string +} + +func NewSerializeHint() *SerializeHint { + c := &SerializeHint{ + UseSchemaID: -1, + UseSpecificVersion: -1, + UseLatestVersion: false, + UseLatestWithMetadata: make(map[string]string), + } + + return c +} + +type DeserializeHint struct { + // UseLatestVersion specifies whether to use the latest schema version during deserialization + UseLatestVersion bool + // UseLatestWithMetadata specifies whether to use the latest schema with metadata during deserialization + UseLatestWithMetadata map[string]string +} + +func NewDeserializeHint() *DeserializeHint { + c := &DeserializeHint{ + UseLatestVersion: false, + UseLatestWithMetadata: make(map[string]string), + } + + return c +} diff --git a/schemaregistry/serde/jsonschema/json_schema.go b/schemaregistry/serde/jsonschema/json_schema.go index 8b637974e..c97b85cd1 100644 --- a/schemaregistry/serde/jsonschema/json_schema.go +++ b/schemaregistry/serde/jsonschema/json_schema.go @@ -87,16 +87,17 @@ func NewSerializer(client schemaregistry.Client, serdeType serde.Type, conf *Ser } // Serialize implements serialization of generic data to JSON -func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { +func (s *Serializer) Serialize(topic string, msg interface{}, hint *serde.SerializeHint) ([]byte, error) { if msg == nil { return nil, nil } var info schemaregistry.SchemaInfo var err error // Don't derive the schema if it is being looked up in the following ways - if s.Conf.UseSchemaID == -1 && - !s.Conf.UseLatestVersion && - len(s.Conf.UseLatestWithMetadata) == 0 { + if hint.UseSchemaID == -1 && + hint.UseSpecificVersion == -1 && + !hint.UseLatestVersion && + len(hint.UseLatestWithMetadata) == 0 { jschema := jsonschema.Reflect(msg) raw, err := json.Marshal(jschema) if err != nil { @@ -107,7 +108,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { SchemaType: "JSON", } } - id, err := s.GetID(topic, msg, &info) + id, err := s.GetID(topic, msg, &info, hint) if err != nil { return nil, err } @@ -175,17 +176,17 @@ func NewDeserializer(client schemaregistry.Client, serdeType serde.Type, conf *D } // Deserialize implements deserialization of generic data from JSON -func (s *Deserializer) Deserialize(topic string, payload []byte) (interface{}, error) { - return s.deserialize(topic, payload, nil) +func (s *Deserializer) Deserialize(topic string, payload []byte, hint *serde.DeserializeHint) (interface{}, error) { + return s.deserialize(topic, payload, nil, hint) } // DeserializeInto implements deserialization of generic data from JSON to the given object -func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interface{}) error { - _, err := s.deserialize(topic, payload, msg) +func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interface{}, hint *serde.DeserializeHint) error { + _, err := s.deserialize(topic, payload, msg, hint) return err } -func (s *Deserializer) deserialize(topic string, payload []byte, result interface{}) (interface{}, error) { +func (s *Deserializer) deserialize(topic string, payload []byte, result interface{}, hint *serde.DeserializeHint) (interface{}, error) { if len(payload) == 0 { return nil, nil } @@ -213,7 +214,7 @@ func (s *Deserializer) deserialize(topic string, payload []byte, result interfac if err != nil { return nil, err } - readerMeta, err := s.GetReaderSchema(subject) + readerMeta, err := s.GetReaderSchema(subject, hint) if err != nil { return nil, err } diff --git a/schemaregistry/serde/jsonschema/json_schema_test.go b/schemaregistry/serde/jsonschema/json_schema_test.go index 487581e00..eb713bda6 100644 --- a/schemaregistry/serde/jsonschema/json_schema_test.go +++ b/schemaregistry/serde/jsonschema/json_schema_test.go @@ -247,15 +247,17 @@ func TestJSONSchemaSerdeWithSimple(t *testing.T) { obj.StringField = "hi" obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{0, 0, 0, 1}) - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client + deserializeHint := serde.NewDeserializeHint() + var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj)) // serialize second object @@ -265,10 +267,10 @@ func TestJSONSchemaSerdeWithSimple(t *testing.T) { obj.StringField = "bye" obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{0, 0, 0, 1}) - bytes, err = ser.Serialize("topic1", &obj) + bytes, err = ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj)) } @@ -289,7 +291,7 @@ func TestJSONSchemaSerdeWithSimpleMap(t *testing.T) { obj["StringField"] = "hi" obj["BoolField"] = true obj["BytesField"] = base64.StdEncoding.EncodeToString([]byte{0, 0, 0, 1}) - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -300,7 +302,7 @@ func TestJSONSchemaSerdeWithSimpleMap(t *testing.T) { obj["IntField"] = 123.0 var newobj map[string]interface{} - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj)) } @@ -324,7 +326,7 @@ func TestJSONSchemaSerdeWithNested(t *testing.T) { obj := JSONNestedTestRecord{ OtherField: nested, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -332,7 +334,7 @@ func TestJSONSchemaSerdeWithNested(t *testing.T) { deser.Client = ser.Client var newobj JSONNestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj)) } @@ -347,7 +349,6 @@ func TestJSONSchemaSerdeWithReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -390,16 +391,18 @@ func TestJSONSchemaSerdeWithReferences(t *testing.T) { obj := JSONNestedTestRecord{} obj.OtherField = nested - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client var newobj JSONNestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj)) } @@ -415,7 +418,7 @@ func TestFailingJSONSchemaValidationWithSimple(t *testing.T) { serConfig.EnableValidation = true // We don't want to risk registering one instead of using the already registered one serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -434,13 +437,16 @@ func TestFailingJSONSchemaValidationWithSimple(t *testing.T) { t.Errorf("Expected valid schema id, found %d", id) } - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) if err != nil { t.Errorf("Expected no validation error, found %s", err) } diffObj := DifferentJSONDemoSchema{} - _, err = ser.Serialize("topic1", &diffObj) + _, err = ser.Serialize("topic1", &diffObj, serializeHint) if err == nil || !strings.Contains(err.Error(), "jsonschema") { t.Errorf("Expected validation error, found %s", err) } @@ -457,7 +463,7 @@ func TestJSONSchemaSerdeWithCELCondition(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -491,16 +497,18 @@ func TestJSONSchemaSerdeWithCELCondition(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj)) } @@ -515,7 +523,7 @@ func TestJSONSchemaSerdeWithCELConditionFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -549,7 +557,10 @@ func TestJSONSchemaSerdeWithCELConditionFail(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) var ruleErr serde.RuleConditionErr errors.As(err, &ruleErr) serde.MaybeFail("serialization", nil, serde.Expect(encRule, *ruleErr.Rule)) @@ -566,7 +577,7 @@ func TestJSONSchemaSerdeWithCELFieldTransform(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -600,11 +611,13 @@ func TestJSONSchemaSerdeWithCELFieldTransform(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client @@ -616,7 +629,7 @@ func TestJSONSchemaSerdeWithCELFieldTransform(t *testing.T) { obj2.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2)) } @@ -631,7 +644,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithDef(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -666,11 +679,13 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithDef(t *testing.T) { obj.Name = "bob" obj.Address = addr - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client @@ -682,7 +697,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithDef(t *testing.T) { obj2.Address = addr2 var newobj JSONPerson - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2)) } @@ -697,7 +712,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithSimpleMap(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -730,7 +745,11 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithSimpleMap(t *testing.T) { obj["StringField"] = "hi" obj["BoolField"] = true obj["BytesField"] = base64.StdEncoding.EncodeToString([]byte{0, 0, 0, 1}) - bytes, err := ser.Serialize("topic1", &obj) + + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -746,7 +765,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithSimpleMap(t *testing.T) { obj2.BytesField = base64.StdEncoding.EncodeToString([]byte{0, 0, 0, 1}) var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2)) } @@ -761,7 +780,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithNestedMap(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -797,7 +816,10 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithNestedMap(t *testing.T) { obj := make(map[string]interface{}) obj["OtherField"] = nested - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -814,7 +836,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformWithNestedMap(t *testing.T) { obj2 := JSONNestedTestRecord{nested2} var newobj JSONNestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2)) } @@ -829,7 +851,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformComplex(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -862,11 +884,13 @@ func TestJSONSchemaSerdeWithCELFieldTransformComplex(t *testing.T) { obj.ObjectField = NestedSchema{StringField: "world"} obj.UnionField = &str - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client @@ -877,7 +901,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformComplex(t *testing.T) { obj2.UnionField = &str2 var newobj JSONComplexSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2)) } @@ -892,7 +916,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformComplexWithNil(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -924,11 +948,13 @@ func TestJSONSchemaSerdeWithCELFieldTransformComplexWithNil(t *testing.T) { obj.ObjectField = NestedSchema{StringField: "world"} obj.UnionField = nil - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client @@ -938,7 +964,7 @@ func TestJSONSchemaSerdeWithCELFieldTransformComplexWithNil(t *testing.T) { obj2.UnionField = nil var newobj JSONComplexSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj2)) } @@ -953,7 +979,7 @@ func TestJSONSchemaSerdeWithCELFieldCondition(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -987,16 +1013,18 @@ func TestJSONSchemaSerdeWithCELFieldCondition(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) - deserConfig := NewDeserializerConfig() - deser, err := NewDeserializer(client, serde.ValueSerde, deserConfig) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) deser.Client = ser.Client var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj)) } @@ -1011,7 +1039,7 @@ func TestJSONSchemaSerdeWithCELFieldConditionFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -1045,7 +1073,10 @@ func TestJSONSchemaSerdeWithCELFieldConditionFail(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) var ruleErr serde.RuleConditionErr errors.As(err, &ruleErr) serde.MaybeFail("serialization", nil, serde.Expect(ruleErr, serde.RuleConditionErr{Rule: &encRule})) @@ -1062,7 +1093,6 @@ func TestJSONSchemaSerdeEncryptionWithSimpleMap(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1105,7 +1135,10 @@ func TestJSONSchemaSerdeEncryptionWithSimpleMap(t *testing.T) { obj["BoolField"] = true obj["BytesField"] = base64.StdEncoding.EncodeToString([]byte{0, 0, 0, 1}) - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1124,7 +1157,7 @@ func TestJSONSchemaSerdeEncryptionWithSimpleMap(t *testing.T) { deser.Client = ser.Client var newobj map[string]interface{} - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj)) } @@ -1139,7 +1172,6 @@ func TestJSONSchemaSerdeEncryption(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1182,7 +1214,10 @@ func TestJSONSchemaSerdeEncryption(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1198,7 +1233,7 @@ func TestJSONSchemaSerdeEncryption(t *testing.T) { deser.Client = ser.Client var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj)) } @@ -1213,7 +1248,6 @@ func TestJSONSchemaSerdeEncryptionWithUnion(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1256,7 +1290,10 @@ func TestJSONSchemaSerdeEncryptionWithUnion(t *testing.T) { obj.BoolField = true obj.BytesField = base64.StdEncoding.EncodeToString([]byte{1, 2}) - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1272,7 +1309,7 @@ func TestJSONSchemaSerdeEncryptionWithUnion(t *testing.T) { deser.Client = ser.Client var newobj JSONDemoSchema - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj)) } @@ -1287,7 +1324,6 @@ func TestJSONSchemaSerdeEncryptionWithReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -1350,7 +1386,10 @@ func TestJSONSchemaSerdeEncryptionWithReferences(t *testing.T) { obj := JSONNestedTestRecord{} obj.OtherField = nested - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) // Reset encrypted field @@ -1366,7 +1405,7 @@ func TestJSONSchemaSerdeEncryptionWithReferences(t *testing.T) { deser.Client = ser.Client var newobj JSONNestedTestRecord - err = deser.DeserializeInto("topic1", bytes, &newobj) + err = deser.DeserializeInto("topic1", bytes, &newobj, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(&newobj, &obj)) } @@ -1516,69 +1555,66 @@ func TestJSONSchemaSerdeJSONataFullyCompatible(t *testing.T) { t.Errorf("Expected valid schema id, found %d", id) } - serConfig1 := NewSerializerConfig() - serConfig1.AutoRegisterSchemas = false - serConfig1.UseLatestVersion = false - serConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } + serConfig := NewSerializerConfig() + serConfig.AutoRegisterSchemas = false - ser1, err := NewSerializer(client, serde.ValueSerde, serConfig1) + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) - bytes, err := ser1.Serialize("topic1", &widget) + serializeHint1 := serde.NewSerializeHint() + serializeHint1.UseLatestVersion = false + serializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", + } + + bytes, err := ser.Serialize("topic1", &widget, serializeHint1) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser1, bytes, widget, newWidget, newerWidget) + deserializeWithAllVersions(client, ser, bytes, widget, newWidget, newerWidget) - serConfig2 := NewSerializerConfig() - serConfig2.AutoRegisterSchemas = false - serConfig2.UseLatestVersion = false - serConfig2.UseLatestWithMetadata = map[string]string{ + serializeHint2 := serde.NewSerializeHint() + serializeHint2.UseLatestVersion = false + serializeHint2.UseLatestWithMetadata = map[string]string{ "application.version": "v2", } - ser2, err := NewSerializer(client, serde.ValueSerde, serConfig2) - serde.MaybeFail("Serializer configuration", err) - - bytes, err = ser2.Serialize("topic1", &newWidget) + bytes, err = ser.Serialize("topic1", &newWidget, serializeHint2) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser2, bytes, widget, newWidget, newerWidget) + deserializeWithAllVersions(client, ser, bytes, widget, newWidget, newerWidget) - serConfig3 := NewSerializerConfig() - serConfig3.AutoRegisterSchemas = false - serConfig3.UseLatestVersion = false - serConfig3.UseLatestWithMetadata = map[string]string{ + serializeHint3 := serde.NewSerializeHint() + serializeHint3.UseLatestVersion = false + serializeHint3.UseLatestWithMetadata = map[string]string{ "application.version": "v3", } - ser3, err := NewSerializer(client, serde.ValueSerde, serConfig3) - serde.MaybeFail("Serializer configuration", err) - - bytes, err = ser3.Serialize("topic1", &newerWidget) + bytes, err = ser.Serialize("topic1", &newerWidget, serializeHint3) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser3, bytes, widget, newWidget, newerWidget) + deserializeWithAllVersions(client, ser, bytes, widget, newWidget, newerWidget) } func deserializeWithAllVersions(client schemaregistry.Client, ser *Serializer, bytes []byte, widget OldWidget, newWidget NewWidget, newerWidget NewerWidget) { - deserConfig1 := NewDeserializerConfig() - deserConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } + deserConfig1 := NewDeserializerConfig() deser1, err := NewDeserializer(client, serde.ValueSerde, deserConfig1) serde.MaybeFail("Deserializer configuration", err) deser1.Client = ser.Client deser1.MessageFactory = testMessageFactory1 - newobj, err := deser1.Deserialize("topic1", bytes) + deserializeHint1 := serde.NewDeserializeHint() + deserializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", + } + + newobj, err := deser1.Deserialize("topic1", bytes, deserializeHint1) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &widget)) deserConfig2 := NewDeserializerConfig() - deserConfig2.UseLatestWithMetadata = map[string]string{ + deserializeHint2 := serde.NewDeserializeHint() + deserializeHint2.UseLatestWithMetadata = map[string]string{ "application.version": "v2", } @@ -1587,11 +1623,12 @@ func deserializeWithAllVersions(client schemaregistry.Client, ser *Serializer, deser2.Client = ser.Client deser2.MessageFactory = testMessageFactory2 - newobj, err = deser2.Deserialize("topic1", bytes) + newobj, err = deser2.Deserialize("topic1", bytes, deserializeHint2) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &newWidget)) deserConfig3 := NewDeserializerConfig() - deserConfig3.UseLatestWithMetadata = map[string]string{ + deserializeHint3 := serde.NewDeserializeHint() + deserializeHint3.UseLatestWithMetadata = map[string]string{ "application.version": "v3", } @@ -1600,7 +1637,7 @@ func deserializeWithAllVersions(client schemaregistry.Client, ser *Serializer, deser3.Client = ser.Client deser3.MessageFactory = testMessageFactory3 - newobj, err = deser3.Deserialize("topic1", bytes) + newobj, err = deser3.Deserialize("topic1", bytes, deserializeHint3) serde.MaybeFail("deserialization", err, serde.Expect(newobj, &newerWidget)) } diff --git a/schemaregistry/serde/protobuf/protobuf.go b/schemaregistry/serde/protobuf/protobuf.go index a021e61d3..d5e245245 100644 --- a/schemaregistry/serde/protobuf/protobuf.go +++ b/schemaregistry/serde/protobuf/protobuf.go @@ -193,7 +193,7 @@ func (s *Deserializer) ConfigureDeserializer(client schemaregistry.Client, serde } // Serialize implements serialization of Protobuf data -func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { +func (s *Serializer) Serialize(topic string, msg interface{}, hint *serde.SerializeHint) ([]byte, error) { if msg == nil { return nil, nil } @@ -207,16 +207,17 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { var info schemaregistry.SchemaInfo var err error // Don't derive the schema if it is being looked up in the following ways - if s.Conf.UseSchemaID == -1 && - !s.Conf.UseLatestVersion && - len(s.Conf.UseLatestWithMetadata) == 0 { + if hint.UseSchemaID == -1 && + hint.UseSpecificVersion == -1 && + !hint.UseLatestVersion && + len(hint.UseLatestWithMetadata) == 0 { schemaInfo, err := s.getSchemaInfo(protoMsg) if err != nil { return nil, err } info = *schemaInfo } - id, err := s.GetID(topic, protoMsg, &info) + id, err := s.GetID(topic, protoMsg, &info, hint) if err != nil { return nil, err } @@ -513,13 +514,13 @@ func NewDeserializer(client schemaregistry.Client, serdeType serde.Type, conf *D } // Deserialize implements deserialization of Protobuf data -func (s *Deserializer) Deserialize(topic string, payload []byte) (interface{}, error) { - return s.deserialize(topic, payload, nil) +func (s *Deserializer) Deserialize(topic string, payload []byte, hint *serde.DeserializeHint) (interface{}, error) { + return s.deserialize(topic, payload, nil, hint) } // DeserializeInto implements deserialization of Protobuf data to the given object -func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interface{}) error { - result, err := s.deserialize(topic, payload, msg) +func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interface{}, hint *serde.DeserializeHint) error { + result, err := s.deserialize(topic, payload, msg, hint) // Copy the result into the target since we may have created a clone during transformations value := reflect.ValueOf(msg) if value.Kind() == reflect.Pointer { @@ -529,7 +530,7 @@ func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interfa return err } -func (s *Deserializer) deserialize(topic string, payload []byte, result interface{}) (interface{}, error) { +func (s *Deserializer) deserialize(topic string, payload []byte, result interface{}, hint *serde.DeserializeHint) (interface{}, error) { if len(payload) == 0 { return nil, nil } @@ -554,7 +555,7 @@ func (s *Deserializer) deserialize(topic string, payload []byte, result interfac if err != nil { return nil, err } - readerMeta, err := s.GetReaderSchema(subject) + readerMeta, err := s.GetReaderSchema(subject, hint) if err != nil { return nil, err } diff --git a/schemaregistry/serde/protobuf/protobuf_test.go b/schemaregistry/serde/protobuf/protobuf_test.go index d09ae423d..b2dc8a757 100644 --- a/schemaregistry/serde/protobuf/protobuf_test.go +++ b/schemaregistry/serde/protobuf/protobuf_test.go @@ -113,7 +113,7 @@ func TestProtobufSerdeWithSimple(t *testing.T) { Id: 123, Works: []string{"The Castle", "The Trial"}, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -123,7 +123,9 @@ func TestProtobufSerdeWithSimple(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + deserializeHint := serde.NewDeserializeHint() + + newobj, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) // serialize second object @@ -132,13 +134,13 @@ func TestProtobufSerdeWithSimple(t *testing.T) { Id: 123, Works: []string{"Fear And Trembling"}, } - bytes, err = ser.Serialize("topic1", &obj) + bytes, err = ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) - newobj, err = deser.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) - err = deser.DeserializeInto("topic1", bytes, newobj) + err = deser.DeserializeInto("topic1", bytes, newobj, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -157,7 +159,7 @@ func TestProtobufSerdeWithSecondMessage(t *testing.T) { Size: "Extra extra large", Toppings: []string{"anchovies", "mushrooms"}, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -167,7 +169,7 @@ func TestProtobufSerdeWithSecondMessage(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -185,7 +187,7 @@ func TestProtobufSerdeWithNestedMessage(t *testing.T) { obj := test.NestedMessage_InnerMessage{ Id: "inner", } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -195,7 +197,7 @@ func TestProtobufSerdeWithNestedMessage(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -231,7 +233,7 @@ func TestProtobufSerdeWithReference(t *testing.T) { IsActive: true, TestMesssage: &msg, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -241,7 +243,7 @@ func TestProtobufSerdeWithReference(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -263,7 +265,7 @@ func TestProtobufSerdeWithCycle(t *testing.T) { Value: 1, Next: &inner, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -273,7 +275,7 @@ func TestProtobufSerdeWithCycle(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -286,9 +288,11 @@ func TestProtobufSerdeEmptyMessage(t *testing.T) { deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) - _, err = deser.Deserialize("topic1", nil) + deserializeHint := serde.NewDeserializeHint() + + _, err = deser.Deserialize("topic1", nil, deserializeHint) serde.MaybeFail("deserialization", err) - _, err = deser.Deserialize("topic1", []byte{}) + _, err = deser.Deserialize("topic1", []byte{}, deserializeHint) serde.MaybeFail("deserialization", err) } @@ -303,7 +307,7 @@ func TestProtobufSerdeWithCELCondition(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -337,7 +341,10 @@ func TestProtobufSerdeWithCELCondition(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deserConfig := NewDeserializerConfig() @@ -348,7 +355,7 @@ func TestProtobufSerdeWithCELCondition(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -363,7 +370,7 @@ func TestProtobufSerdeWithCELConditionFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -397,7 +404,10 @@ func TestProtobufSerdeWithCELConditionFail(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) var ruleErr serde.RuleConditionErr errors.As(err, &ruleErr) serde.MaybeFail("serialization", nil, serde.Expect(encRule, *ruleErr.Rule)) @@ -414,7 +424,7 @@ func TestProtobufSerdeWithCELFieldTransform(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -448,7 +458,10 @@ func TestProtobufSerdeWithCELFieldTransform(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deserConfig := NewDeserializerConfig() @@ -466,7 +479,7 @@ func TestProtobufSerdeWithCELFieldTransform(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj2.ProtoReflect())) } @@ -481,7 +494,7 @@ func TestProtobufSerdeWithCELFieldCondition(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -515,7 +528,10 @@ func TestProtobufSerdeWithCELFieldCondition(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deserConfig := NewDeserializerConfig() @@ -526,7 +542,7 @@ func TestProtobufSerdeWithCELFieldCondition(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + newobj, err := deser.Deserialize("topic1", bytes, serde.NewDeserializeHint()) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), obj.ProtoReflect())) } @@ -541,7 +557,7 @@ func TestProtobufSerdeWithCELFieldConditionFail(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -575,7 +591,10 @@ func TestProtobufSerdeWithCELFieldConditionFail(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - _, err = ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + _, err = ser.Serialize("topic1", &obj, serializeHint) var ruleErr serde.RuleConditionErr errors.As(err, &ruleErr) serde.MaybeFail("serialization", nil, serde.Expect(ruleErr, serde.RuleConditionErr{Rule: &encRule})) @@ -592,7 +611,6 @@ func TestProtobufSerdeEncryption(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serConfig.UseLatestVersion = true serConfig.RuleConfig = map[string]string{ "secret": "mysecret", } @@ -635,7 +653,10 @@ func TestProtobufSerdeEncryption(t *testing.T) { Works: []string{"The Castle", "The Trial"}, } - bytes, err := ser.Serialize("topic1", &obj) + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) deserConfig := NewDeserializerConfig() @@ -649,10 +670,12 @@ func TestProtobufSerdeEncryption(t *testing.T) { err = deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser.Deserialize("topic1", bytes) + deserializeHint := serde.NewDeserializeHint() + + newobj, err := deser.Deserialize("topic1", bytes, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(*test.Author).Name, obj.Name)) - err = deser.DeserializeInto("topic1", bytes, newobj) + err = deser.DeserializeInto("topic1", bytes, newobj, deserializeHint) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(*test.Author).Name, obj.Name)) } @@ -802,94 +825,83 @@ func TestProtobufSerdeJSONataFullyCompatible(t *testing.T) { t.Errorf("Expected valid schema id, found %d", id) } - serConfig1 := NewSerializerConfig() - serConfig1.AutoRegisterSchemas = false - serConfig1.UseLatestVersion = false - serConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } + serConfig := NewSerializerConfig() + serConfig.AutoRegisterSchemas = false - ser1, err := NewSerializer(client, serde.ValueSerde, serConfig1) + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) - bytes, err := ser1.Serialize("topic1", &widget) + serializeHint1 := serde.NewSerializeHint() + serializeHint1.UseLatestVersion = false + serializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", + } + + bytes, err := ser.Serialize("topic1", &widget, serializeHint1) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser1, bytes, &widget, &newWidget, &newerWidget) + deserializeWithAllVersions(client, ser, bytes, &widget, &newWidget, &newerWidget) - serConfig2 := NewSerializerConfig() - serConfig2.AutoRegisterSchemas = false - serConfig2.UseLatestVersion = false - serConfig2.UseLatestWithMetadata = map[string]string{ + serializeHint2 := serde.NewSerializeHint() + serializeHint2.UseLatestVersion = false + serializeHint2.UseLatestWithMetadata = map[string]string{ "application.version": "v2", } - ser2, err := NewSerializer(client, serde.ValueSerde, serConfig2) - serde.MaybeFail("Serializer configuration", err) - - bytes, err = ser2.Serialize("topic1", &newWidget) + bytes, err = ser.Serialize("topic1", &newWidget, serializeHint2) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser2, bytes, &widget, &newWidget, &newerWidget) + deserializeWithAllVersions(client, ser, bytes, &widget, &newWidget, &newerWidget) - serConfig3 := NewSerializerConfig() - serConfig3.AutoRegisterSchemas = false - serConfig3.UseLatestVersion = false - serConfig3.UseLatestWithMetadata = map[string]string{ + serializeHint3 := serde.NewSerializeHint() + serializeHint3.UseLatestVersion = false + serializeHint3.UseLatestWithMetadata = map[string]string{ "application.version": "v3", } - ser3, err := NewSerializer(client, serde.ValueSerde, serConfig3) - serde.MaybeFail("Serializer configuration", err) - - bytes, err = ser3.Serialize("topic1", &newerWidget) + bytes, err = ser.Serialize("topic1", &newerWidget, serializeHint3) serde.MaybeFail("serialization", err) - deserializeWithAllVersions(client, ser3, bytes, &widget, &newWidget, &newerWidget) + deserializeWithAllVersions(client, ser, bytes, &widget, &newWidget, &newerWidget) } func deserializeWithAllVersions(client schemaregistry.Client, ser *Serializer, bytes []byte, widget *test.Widget, newWidget *test.NewWidget, newerWidget *test.NewerWidget) { - deserConfig1 := NewDeserializerConfig() - deserConfig1.UseLatestWithMetadata = map[string]string{ - "application.version": "v1", - } - deser1, err := NewDeserializer(client, serde.ValueSerde, deserConfig1) + deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) serde.MaybeFail("Deserializer configuration", err) - deser1.Client = ser.Client - err = deser1.ProtoRegistry.RegisterMessage(widget.ProtoReflect().Type()) + deser.Client = ser.Client + err = deser.ProtoRegistry.RegisterMessage(widget.ProtoReflect().Type()) + serde.MaybeFail("register message", err) + err = deser.ProtoRegistry.RegisterMessage(newWidget.ProtoReflect().Type()) + serde.MaybeFail("register message", err) + err = deser.ProtoRegistry.RegisterMessage(newerWidget.ProtoReflect().Type()) serde.MaybeFail("register message", err) - newobj, err := deser1.Deserialize("topic1", bytes) + deserializeHint1 := serde.NewDeserializeHint() + deserializeHint1.UseLatestWithMetadata = map[string]string{ + "application.version": "v1", + } + + newobj, err := deser.Deserialize("topic1", bytes, deserializeHint1) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), widget.ProtoReflect())) - deserConfig2 := NewDeserializerConfig() - deserConfig2.UseLatestWithMetadata = map[string]string{ + deserializeHint2 := serde.NewDeserializeHint() + deserializeHint2.UseLatestWithMetadata = map[string]string{ "application.version": "v2", } - deser2, err := NewDeserializer(client, serde.ValueSerde, deserConfig2) - serde.MaybeFail("Deserializer configuration", err) - deser2.Client = ser.Client - err = deser2.ProtoRegistry.RegisterMessage(newWidget.ProtoReflect().Type()) - serde.MaybeFail("register message", err) - - newobj, err = deser2.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, deserializeHint2) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), newWidget.ProtoReflect())) - deserConfig3 := NewDeserializerConfig() - deserConfig3.UseLatestWithMetadata = map[string]string{ + serde.MaybeFail("register message", err) + + deserializeHint3 := serde.NewDeserializeHint() + deserializeHint3.UseLatestWithMetadata = map[string]string{ "application.version": "v3", } - deser3, err := NewDeserializer(client, serde.ValueSerde, deserConfig3) - serde.MaybeFail("Deserializer configuration", err) - deser3.Client = ser.Client - err = deser3.ProtoRegistry.RegisterMessage(newerWidget.ProtoReflect().Type()) - serde.MaybeFail("register message", err) - - newobj, err = deser3.Deserialize("topic1", bytes) + newobj, err = deser.Deserialize("topic1", bytes, deserializeHint3) serde.MaybeFail("deserialization", err, serde.Expect(newobj.(proto.Message).ProtoReflect(), newerWidget.ProtoReflect())) } @@ -926,9 +938,11 @@ func BenchmarkProtobufSerWithReference(b *testing.B) { TestMesssage: &msg, } + serializeHint := serde.NewSerializeHint() + b.ResetTimer() for i := 0; i < b.N; i++ { - ser.Serialize("topic1", &obj) + ser.Serialize("topic1", &obj, serializeHint) } } @@ -967,9 +981,11 @@ func BenchmarkProtobufSerWithReferenceCached(b *testing.B) { TestMesssage: &msg, } + serializeHint := serde.NewSerializeHint() + b.ResetTimer() for i := 0; i < b.N; i++ { - ser.Serialize("topic1", &obj) + ser.Serialize("topic1", &obj, serializeHint) } } @@ -1005,7 +1021,7 @@ func BenchmarkProtobufDeserWithReference(b *testing.B) { IsActive: true, TestMesssage: &msg, } - bytes, err := ser.Serialize("topic1", &obj) + bytes, err := ser.Serialize("topic1", &obj, serde.NewSerializeHint()) serde.MaybeFail("serialization", err) deser, err := NewDeserializer(client, serde.ValueSerde, NewDeserializerConfig()) @@ -1014,8 +1030,10 @@ func BenchmarkProtobufDeserWithReference(b *testing.B) { deser.ProtoRegistry.RegisterMessage(obj.ProtoReflect().Type()) + deserializeHint := serde.NewDeserializeHint() + b.ResetTimer() for i := 0; i < b.N; i++ { - deser.Deserialize("topic1", bytes) + deser.Deserialize("topic1", bytes, deserializeHint) } } diff --git a/schemaregistry/serde/serde.go b/schemaregistry/serde/serde.go index ddccefe72..3418ec482 100644 --- a/schemaregistry/serde/serde.go +++ b/schemaregistry/serde/serde.go @@ -60,7 +60,7 @@ type Serializer interface { conf *SerializerConfig) error // Serialize will serialize the given message, which should be a pointer. // For example, in Protobuf, messages are always a pointer to a struct and never just a struct. - Serialize(topic string, msg interface{}) ([]byte, error) + Serialize(topic string, msg interface{}, hint *SerializeHint) ([]byte, error) Close() error } @@ -70,9 +70,9 @@ type Deserializer interface { conf *DeserializerConfig) error // Deserialize will call the MessageFactory to create an object // into which we will unmarshal data. - Deserialize(topic string, payload []byte) (interface{}, error) + Deserialize(topic string, payload []byte, hint *DeserializeHint) (interface{}, error) // DeserializeInto will unmarshal data into the given object. - DeserializeInto(topic string, payload []byte, msg interface{}) error + DeserializeInto(topic string, payload []byte, msg interface{}, hint *DeserializeHint) error Close() error } @@ -405,11 +405,12 @@ func TopicNameStrategy(topic string, serdeType Type, schema schemaregistry.Schem } // GetID returns a schema ID for the given schema -func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo) (int, error) { +func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo, hint *SerializeHint) (int, error) { autoRegister := s.Conf.AutoRegisterSchemas - useSchemaID := s.Conf.UseSchemaID - useLatestWithMetadata := s.Conf.UseLatestWithMetadata - useLatest := s.Conf.UseLatestVersion + useSchemaID := hint.UseSchemaID + useSpecificVersion := hint.UseSpecificVersion + useLatestWithMetadata := hint.UseLatestWithMetadata + useLatest := hint.UseLatestVersion normalizeSchema := s.Conf.NormalizeSchemas var id = -1 @@ -428,6 +429,13 @@ func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregist return -1, err } id = useSchemaID + } else if useSpecificVersion >= 0 { + metadata, err := s.Client.GetSchemaMetadata(subject, useSpecificVersion) + if err != nil { + return -1, err + } + *info = metadata.SchemaInfo + id = metadata.ID } else if len(useLatestWithMetadata) != 0 { metadata, err := s.Client.GetLatestWithMetadata(subject, useLatestWithMetadata, true) if err != nil { @@ -785,9 +793,10 @@ func (s *BaseDeserializer) GetSchema(topic string, payload []byte) (schemaregist } // GetReaderSchema returns a schema for reading -func (s *BaseDeserializer) GetReaderSchema(subject string) (*schemaregistry.SchemaMetadata, error) { - useLatestWithMetadata := s.Conf.UseLatestWithMetadata - useLatest := s.Conf.UseLatestVersion +func (s *BaseDeserializer) GetReaderSchema(subject string, hint *DeserializeHint) (*schemaregistry.SchemaMetadata, error) { + useLatestWithMetadata := hint.UseLatestWithMetadata + useLatest := hint.UseLatestVersion + if len(useLatestWithMetadata) != 0 { meta, err := s.Client.GetLatestWithMetadata(subject, useLatestWithMetadata, true) if err != nil { From b88dbd1cc0c6ebcac91aee8e1bcb870d71dc1f47 Mon Sep 17 00:00:00 2001 From: Eduardas Kazakas Date: Mon, 16 Dec 2024 07:11:55 +0200 Subject: [PATCH 2/3] Organize code --- schemaregistry/serde/avrov2/avro_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/schemaregistry/serde/avrov2/avro_test.go b/schemaregistry/serde/avrov2/avro_test.go index aa5fcd90d..031c55826 100644 --- a/schemaregistry/serde/avrov2/avro_test.go +++ b/schemaregistry/serde/avrov2/avro_test.go @@ -342,8 +342,7 @@ func TestAvroSerdeWithSimpleMap(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serializeHint := serde.NewSerializeHint() - serializeHint.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -364,6 +363,10 @@ func TestAvroSerdeWithSimpleMap(t *testing.T) { obj["StringField"] = "hi" obj["BoolField"] = true obj["BytesField"] = []byte{1, 2} + + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) @@ -452,8 +455,7 @@ func TestAvroSerdeWithReferences(t *testing.T) { serConfig := NewSerializerConfig() serConfig.AutoRegisterSchemas = false - serializeHint := serde.NewSerializeHint() - serializeHint.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) serde.MaybeFail("Serializer configuration", err) @@ -496,6 +498,9 @@ func TestAvroSerdeWithReferences(t *testing.T) { OtherField: nested, } + serializeHint := serde.NewSerializeHint() + serializeHint.UseLatestVersion = true + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err) From 0f283ae448f9bba09311e0cbec9c8f959abc446e Mon Sep 17 00:00:00 2001 From: Eduardas Kazakas Date: Mon, 16 Dec 2024 07:17:11 +0200 Subject: [PATCH 3/3] cleanup --- schemaregistry/serde/avrov2/avro_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/schemaregistry/serde/avrov2/avro_test.go b/schemaregistry/serde/avrov2/avro_test.go index 031c55826..3c503bf04 100644 --- a/schemaregistry/serde/avrov2/avro_test.go +++ b/schemaregistry/serde/avrov2/avro_test.go @@ -656,6 +656,7 @@ func TestAvroSchemaEvolution(t *testing.T) { serializeHint := serde.NewSerializeHint() serializeHint.UseLatestVersion = true + bytes, err := ser.Serialize("topic1", &obj, serializeHint) serde.MaybeFail("serialization", err)