Avro with Kafka Connect sink connector #303

Open · OuesFa opened this issue Nov 17, 2021 · 8 comments

OuesFa commented Nov 17, 2021

Hello 👋
Is it possible to read Avro data from Kafka using the sink connector and automatically retrieve schemas from the Confluent Schema Registry? If so, which destination formats can be used to store the data in Pub/Sub?
Thanks !

kamalaboulhosn (Collaborator)

Can you explain a little more what you are trying to do? What is the format you want the data written in when sent to Pub/Sub? In what format is the data you've written to Kafka stored?

OuesFa commented Nov 17, 2021

Thanks @kamalaboulhosn for your help!

Yes, so I'm trying to sink data from Kafka topics that was written using kafka-avro-serializer. The topics therefore contain byte arrays, each consisting of the ID of a schema registered in the Confluent Schema Registry followed by the payload of the event itself.

I would like to copy the data from those topics to Pub/Sub, in JSON format for instance.

I would like the connector to read the data, deserialize it, convert it to JSON, and send it to the Pub/Sub topic.

That's what the GCS sink connector does, for instance, using the following config:

      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://localhost:8081",

kamalaboulhosn (Collaborator)

I believe the notion of a schema registry is a concept specific to Confluent Kafka and not part of the generic, open-source Kafka Connect infrastructure. The GCS connector you link to is provided by Confluent, whereas this one is not. At this time, we do not support any lookup of schemas in a schema registry via this connector.

OuesFa commented Nov 17, 2021

Ok, so for now the only way to read Avro from Kafka topics would be to provide Avro data with the schema embedded within every Kafka message?

kamalaboulhosn (Collaborator)

You can dump the Avro messages into Cloud Pub/Sub as-is, since they are just bytes. You'd then have to rely on your subscribers to decode the messages. If all messages on a topic use the same schema, then you could potentially take advantage of Pub/Sub's schema support.
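(As a sketch of what that subscriber-side decoding could look like, assuming the forwarded bytes keep the Confluent wire framing of magic byte + schema ID + Avro payload, here is a minimal Python example; fastavro and requests are assumed to be available, and the registry URL is a placeholder:)

      import io, json, struct

      import fastavro
      import requests

      REGISTRY_URL = "http://localhost:8081"  # placeholder
      schema_cache = {}

      def decode_confluent_avro(data: bytes) -> dict:
          # Confluent wire format: magic byte 0x00, then a 4-byte
          # big-endian schema ID, then the Avro binary-encoded record.
          magic, schema_id = struct.unpack(">bI", data[:5])
          if magic != 0:
              raise ValueError("not Confluent-framed Avro")
          schema = schema_cache.get(schema_id)
          if schema is None:
              # Fetch and cache the writer schema from the Schema Registry.
              resp = requests.get(f"{REGISTRY_URL}/schemas/ids/{schema_id}")
              resp.raise_for_status()
              schema = fastavro.parse_schema(json.loads(resp.json()["schema"]))
              schema_cache[schema_id] = schema
          return fastavro.schemaless_reader(io.BytesIO(data[5:]), schema)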

OuesFa commented Nov 17, 2021

Actually, we use Kafka topics with several schemas. For instance, UserDeleted and UserCreated messages have different schemas but need to be stored in the same topic partition to ensure ordering.

kamalaboulhosn (Collaborator)

Yeah, so in that case there is no way to convert the Avro into another format within the connector. You could store your schema in Pub/Sub and then manually attach its path as an attribute on your messages, so that subscribers can pull the schema and decode the messages, though this would require your Kafka publisher to include that metadata in the headers.
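(A rough Python sketch of that attribute-based approach on the subscriber side; the attribute name "schema_path" and the subscription path are hypothetical, not anything this connector sets for you:)

      import io, json

      import fastavro
      from google.cloud import pubsub_v1

      schema_client = pubsub_v1.SchemaServiceClient()
      schemas = {}

      def callback(message):
          # Hypothetical attribute carrying the Pub/Sub schema resource path,
          # e.g. "projects/my-project/schemas/UserCreated".
          path = message.attributes["schema_path"]
          schema = schemas.get(path)
          if schema is None:
              definition = schema_client.get_schema(name=path).definition
              schema = fastavro.parse_schema(json.loads(definition))
              schemas[path] = schema
          record = fastavro.schemaless_reader(io.BytesIO(message.data), schema)
          print(record)
          message.ack()

      subscriber = pubsub_v1.SubscriberClient()
      subscriber.subscribe("projects/my-project/subscriptions/my-sub", callback).result()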


xsajkha commented Apr 3, 2024

I have a topic that also has an Avro schema for the message key. I am configuring the key.converter for this connector, but I cannot see the messages being sunk into my Pub/Sub topic.

      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "<%= @schema_registry_url %>"

On the Pub/Sub side, I have created the topic and defined an Avro schema for it (for the message value, of course; I copied the Avro schema from the Confluent Schema Registry and created a new schema in GCP Pub/Sub for the topic). I don't see any errors in my logs explaining why messages are not being sunk into the Pub/Sub topic.
Is there any possibility to sink the Kafka message key, e.g. {"mykey":"mykeyvalue"}, with this Pub/Sub sink connector?
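(One thing to check from the subscriber side: a Pub/Sub topic schema validates only the message data, not attributes, so if the connector forwards the Kafka key at all, it would most likely surface as a message attribute. A minimal sketch; the attribute name "key" is an assumption to verify against the connector's README:)

      from google.cloud import pubsub_v1

      def callback(message):
          # "key" is an assumed attribute name; check the connector README
          # for how (and whether) the Kafka record key is forwarded.
          print(message.attributes.get("key"), message.data)
          message.ack()

      subscriber = pubsub_v1.SubscriberClient()
      subscriber.subscribe("projects/my-project/subscriptions/my-sub", callback).result()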
