
MSK Java producer/consumer with both key and value in AVRO and using glue schema registry #234

Closed
pc-akothapeta opened this issue Nov 11, 2022 · 12 comments


pc-akothapeta commented Nov 11, 2022

I am trying to use the MSK Connect MySQL CDC connector with both the key and value as Avro schemas, using the Glue Schema Registry (GSR).

When I did this with the Confluent Schema Registry, the schema names for the key and value were something like serverName.schemaName.tableName-key and serverName.schemaName.tableName-value.

But when I use GSR, both the key and value schemas come out as serverName.schemaName.tableName, so they try to overwrite each other and registration fails.

The workaround I figured out was to use two different registries for the key and the value, and that works. I was also able to read the topics using the JDBC sink connector.

However, I was not able to figure out how to write a simple Java producer/consumer that writes/reads a topic where both the key and value are Avro and the key and value schemas live in two different registries.

I looked at the code in the GSR GitHub repo, but there is only one registry name and one schema name in the AWSSchemaRegistryConstants class, so I am not sure how to pass two different registries and schemas. I would appreciate any examples. https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java

    /**
     * Registry Name.
     */
    public static final String REGISTRY_NAME = "registry.name";

    /**
     * Schema name.
     */
    public static final String SCHEMA_NAME = "schemaName";
pc-akothapeta changed the title from "MskConnect with debezium cdc source connector with GSR" to "MSK Java producer/consumer with both key and value in AVRO and using glue schema registry" on Nov 22, 2022
@OneCricketeer

Duplicates #93

@pc-akothapeta

@OneCricketeer, I guess my issue is slightly different. There are two asks:

  1. The ability to have different key and value schema names, which is "Key and value serializers for the same topic use the same Glue schema" #93.
  2. The ability to specify two different registries for the key and value schemas, which is what I am asking for here. #93 above kind of removes the need for two registries, but I am sure someone will run into that use case in the future.

OneCricketeer commented Nov 29, 2022

Can you please explain why you think you'll need two completely separate registries? Topics exist in one Kafka cluster, and only one registry should need to be associated with that cluster.

(Note: nothing would prevent you from setting key.converter.schema.registry.url and value.converter.schema.registry.url to completely different values, though those are Confluent settings. I'm not sure about Glue, but try replacing schema.registry.url with registry.name on each converter.)

My point was that you can already set different schema names in one cluster/registry.
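
Something like this might work (an untested sketch; it assumes the Glue converter accepts a per-converter registryName property, and the registry names here are placeholders):

    key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    key.converter.registryName=my-key-registry
    value.converter.registryName=my-value-registry
    key.converter.region=us-east-1
    value.converter.region=us-east-1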

@pc-akothapeta

I won't need two different registries if the schema names are fixed.

@OneCricketeer

Clarify what you mean by "fixed". They are "fixed" (to static values) if you use schemaName, as mentioned in the other issue.

Only Confluent defaults to the -key / -value suffixes. That's not a standard followed by every Registry implementation...

@pc-akothapeta

OK, I thought #93 was still not resolved.
With CDC connectors one can't specify a schema name per table. CDC connectors create a topic for each table, so if you specify a single schema name for all of the tables, the connector tries to register every table's schema under that same name, which fails because the schemas are all different and they try to overwrite each other.
The other option is to do what @laxgoalie392 did and create a subclass. Is that the final resolution? If so, where do I set the schemaNameGenerationClass property?

OneCricketeer commented Nov 29, 2022

It is; schemaName can be set, just like you'd suggested for the registry name.

> CDC connectors create a topic for each table

That is true, but you can set a whitelist for a single table at a time. Deploy multiple connector configs rather than capture all tables (that pattern will actually scale better because errors are limited to one table at a time, and Kafka producers in each connector only need to write to one topic).
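
For example, one connector per table might look roughly like this (a sketch assuming the Debezium MySQL connector; hostnames, credentials, and database/table names are placeholders):

    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=mysql.example.com
    database.port=3306
    database.user=cdc_user
    database.password=********
    database.server.id=1001
    database.server.name=serverName
    database.include.list=schemaName
    table.include.list=schemaName.tableName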

Otherwise, yeah, subclassing the subject naming strategy is what you want. As mentioned before, you'd set those properties under the value.converter and key.converter prefixes, but maybe also look at #126.

@pc-akothapeta

Thanks @OneCricketeer

I would imagine that a CDC connector per table is going to be inefficient, as every connector has to read the same binlog.

So overriding the class works for the schema name, by specifying the key.converter.schemaNameGenerationClass and value.converter.schemaNameGenerationClass parameters.

However, how would you read this topic through a Java consumer? This goes back to the original question:
I looked at the code in the GSR GitHub repo, but there is only one registry name and one schema name in the AWSSchemaRegistryConstants class, so I am not sure how to pass two different registries or schemas. I would appreciate any examples. https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java

    /**
     * Registry Name.
     */
    public static final String REGISTRY_NAME = "registry.name";

    /**
     * Schema name.
     */
    public static final String SCHEMA_NAME = "schemaName";

@OneCricketeer

You'd need to use SCHEMA_NAMING_GENERATION_CLASS in your consumer config.

Copy the class/dependency between the Connect workers and the consumers.
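
For example (a sketch; the class name below is a placeholder for whatever custom naming class you deployed on the Connect side):

    props.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
            "com.example.MyNamingStrategy");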

@pc-akothapeta

OK, that's in AWSSchemaRegistryConstants.
I will give it a try and see how that goes.

pc-akothapeta commented Nov 29, 2022

@OneCricketeer this works great. Thanks again for helping me out here.

For reference, this is what I did.

In the common project, create a custom class:

    package com.amazonaws.services.schemaregistry.common;

    // Custom naming strategy that appends -key / -value, mirroring Confluent's
    // naming convention. The class has to be packaged onto both the Connect
    // plugin classpath and the consumer classpath.
    public class MySchemaNamingStrategy implements AWSSchemaNamingStrategy {

        @Override
        public String getSchemaName(String transportName, Object data, boolean isKey) {
            return transportName + (isKey ? "-key" : "-value");
        }

        @Override
        public String getSchemaName(String transportName) {
            return transportName + "-value";
        }
    }

On MSK Connect I set the following parameters:

    key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    key.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
    value.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
    key.converter.registryName=MyRegistry
    value.converter.registryName=MyRegistry
    key.converter.region=us-east-1
    value.converter.region=us-east-1
    key.converter.schemaAutoRegistrationEnabled=true
    value.converter.schemaAutoRegistrationEnabled=true
    key.converter.avroRecordType=GENERIC_RECORD
    value.converter.avroRecordType=GENERIC_RECORD

This will create a topic for each table, with the key and value schemas registered as <topic>-key and <topic>-value.

Then the Java consumer will look similar to this:

    Properties props = new Properties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

    props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
            "com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy");

    try {
        final KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));

        while (true) {
            final ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("Received messages: count = " + records.count());
            for (final ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                final String key = (record.key() == null ? "NULL_KEY" : record.key().toString());
                final String value = (record.value() == null ? "NULL_VALUE" : record.value().toString());
                System.out.println("Received message: key = " + key);
                System.out.println("Received message: value = " + value);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

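For completeness, a matching producer would look something like this (a rough sketch only; the topic name, registry name, and the inline Avro schemas are placeholders, and the naming strategy is the custom class above):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());

    props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
            "com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy");

    // Placeholder Avro schemas for the key and the value
    final Schema keySchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyKey\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    final Schema valueSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyValue\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

    final GenericRecord key = new GenericData.Record(keySchema);
    key.put("id", 1);
    final GenericRecord value = new GenericData.Record(valueSchema);
    value.put("name", "test");

    try (KafkaProducer<GenericRecord, GenericRecord> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>(topicName, key, value));
        producer.flush();
    }
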
@OneCricketeer

Looks great. Feel free to close the issue.
