Building microservices around Apache Kafka is your job, and life is great. One day, you hear community members talking about some neat Apache Pulsar features, and get you intrigued. I mean, we all love Kafka, but you can't avoid wondering if migrating one of your projects to Pulsar is a good idea. Then it happens. You find Pulsar supports Kafka clients natively via a protocol handler called KoP: Kafka-On-Pulsar.
This gets you pumped. Is that it? Can I go ahead and simply point my microservices to Pulsar and be a hero with this migration? But you must be responsible; and history says you shouldn't believe migrations like this are refactoring free. This is why you may get interested in what this repository offers. To prove that KoP is worth your time, use it in scenarios that would really put the technology to stress, going way beyond the bread-and-buttter of producing and consuming messages. Follow the instructions below to test KoP in the following scenarios:
-
Microservice built for Apache Kafka: microservice written in Spring Boot, that connects to a "Kafka Cluster" using two endpoints exposed by Apache Pulsar with KoP enabled.
-
CDC using Debezium for MySQL: Kafka Connect with a connector from Debezium to capture in near real-time changes made to a MySQL database. Kafka Connect uses an endpoint exposed by Apache Pulsar with KoP enabled.
-
Stream processing using ksqlDB: Stream processing pipeline using ksqlDB to flat out events produced to Kafka. ksqlDB uses an endpoint exposed by Apache Pulsar with KoP enabled.
💡 This repository was created as part of the session entitled Is Using KoP (Kafka-On-Pulsar) a Good Idea? delivered in the Pulsar Summit 2022 conference, in San Francisco, CA. Click in the image below to access slides and recording of this session.
- Docker 4.11+
- Java 17+
- Maven 3.8.6+
- Apache Pulsar 2.10.1+
- Confluent Platform 7.0.5+
Before jumping into any of the scenarios, you must start the shared infrastructure all of them will use. This includes one Zookeeper instance, two Bookkeepers, and two Pulsar brokers.
1️⃣ Start the persistence layer
sh start-persistence.sh
👀 You must wait until the containers zookeeper
, persistence-bookie-1
, and persistence-bookie-2
are healthy to proceed with the next step.
2️⃣ Start the Pulsar brokers with KoP enabled
sh start-brokers.sh
👀 You must wait until the containers kafka-1
and kafka-2
are healthy to proceed with any next step.
This scenario checks two things. First, if KoP provides a truly Kafka-compatible API where third-party frameworks such as Spring Boot can connect with without deployment problems. Second, to check whether KoP is capable of mimic the distributed protocol from Kafka. Kafka is not just a one-directional typical client-server protocol. Instead, it is a bi-directional protocol where messages are exchanged from both parties. A good example is when a producer connects to the Kafka cluster using one bootstrap server endpoint, and the cluster keeps periodically updating that list back to the producer with metadata about the new cluster formation. Same for the consumer, which after joining a group, may be eventually removed by the cluster for the absence of valid heartbeats.
To validate this scenario, two Apache Pulsar brokers with KoP enabled will be executed, and the microservice will use the endpoint of only one broker to bootstrap the cluster. When everything is up-and-running and working as expected, the broker being used by the microservice will be killed, and the assumption is that the microservice should fallback to the other available broker, and continue its execution. If that ever happens, it means that the bootstrap worked as expected, giving the specifications of how Kafka manages clusters and sends this information to its clients.
1️⃣ Run the Spring Boot microservice
sh microservice-with-kafka/run-microservice.sh
👀 You must wait until the microservice connects with the brokers and start producing and consuming messages like this:
org.summit.pulsar.demo.FiveSecondsTom : Hi, I'm Tom 😄
2️⃣ Find out which broker is the leader of the partition
$PULSAR_HOME/bin/pulsar-admin --admin-url http://localhost:8081 topics lookup persistent://public/default/fiveSecondsTomTopic
👀 Take a note of which broker shows up in this lookup.
3️⃣ Forcing a fail-over situation by killing the leader
sh kill-broker.sh <BROKER_CONTAINER_NAME_FROM_STEP_TWO>
👀 Observe the microservice for a couple minutes. It must continue its processing.
#️⃣ stop all containers if you're done for the day.
sh stop-everything.sh
One of the most popular use cases for Apache Kafka is using it to replicate data from relational databases to purpose-built systems, while Kafka acts as a persistent layer for the data streams. To enable this use case, Kafka Connect, which is an integration technology from the Kafka ecosystem, is usually used with connectors capable of pulling data from relational databases either using a polling query or via CDC (Change-Data Capture).
This scenario check if an Apache Pulsar broker with KoP enabled can be used as the data stream layer for Kafka Connect, which uses a connector from Debezium to stream data changes made in near real-time in a MySQL database. The validation process is very simple: you just need to set this up as you would using Apache Kafka — but use Apache Pulsar with KoP enabled instead. Everything must work as advertised.
1️⃣ Start the containers for this scenario
sh start-cdc-with-debezium.sh
👀 You must wait until the containers mysql
and connect
are healthy to proceed with the next step.
2️⃣ Connect with the MySQL database and check the data
docker compose -f cdc-with-debezium/docker-compose.yml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD cdc_with_debezium'
Then, in the MySQL shell, execute the following command:
select * from customers;
You should see an output like this:
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | [email protected] |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
| 1004 | Anne | Kretchmar | [email protected] |
+------+------------+-----------+-----------------------+
3️⃣ Deploy the Debezium connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @cdc-with-debezium/mysql-connector.json
To verify if the connector has been sucessfully deployed, execute the following command:
curl -X GET http://localhost:8083/connectors/cdc-with-debezium
You should see an output like this:
{"name":"cdc-with-debezium","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka-1:9092,kafka-2:9093","database.history.kafka.topic":"dbhistory.cdc_with_debezium","name":"cdc-with-debezium","database.server.name":"mysql","database.port":"3306","database.include.list":"cdc_with_debezium"},"tasks":[{"connector":"cdc-with-debezium","task":0}],"type":"source"}
4️⃣ Use the kafka-console-consumer
to monitor data streams
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql.cdc_with_debezium.customers
👀 Leave this console open so you can see the data coming in.
5️⃣ Insert a new record into the customers
table.
docker compose -f cdc-with-debezium/docker-compose.yml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD cdc_with_debezium'
Then, in the MySQL shell, execute the following command:
insert into customers values (1006, "Ricardo", "Ferreira", "[email protected]");
6️⃣ Look to the kafka-console-consumer
output. You should see an output like this:
Struct{after=Struct{id=1006,first_name=Ricardo,last_name=Ferreira,[email protected]},source=Struct{version=1.9.3.Final,connector=mysql,name=mysql,ts_ms=1660957053000,db=cdc_with_debezium,table=customers,server_id=1,file=binlog.000002,pos=406,row=0,thread=36},op=c,ts_ms=1660957053478}
#️⃣ Stop the containers from this scenario.
sh stop-cdc-with-debezium.sh
Alternatively, you can also stop all containers if you're done for the day.
sh stop-everything.sh
🧑🏻💻 https://github.com/debezium/debezium-examples/tree/main/tutorial#using-mysql
Another popular use case for Apache Kafka is using it as a persistent layer for stream processing pipelines. The idea is to have events being stored into Kafka, and stream processing technologies such as Kafka Streams, ksqlDB, Apache Flink be able to execute event processing pipelines that compute near real-time results as new events arrive.
This scenario check if an Apache Pulsar broker with KoP enabled can be used as the data stream layer for ksqlDB, which implements a stream processing pipeline that flattens out events with a complex nested layout and also changes the data format from JSON to Protobuf. The validation process is very simple: you just need to set this up as you would using Apache Kafka — but use Apache Pulsar with KoP enabled instead. Everything must work as advertised.
1️⃣ Start the containers for this scenario
sh start-stream-processing.sh
👀 You must wait until the containers schema-registry
and ksqldb-server
are healthy to proceed with the next step.
2️⃣ Connect with the ksqlDB server via CLI:
$KAFKA_HOME/bin/ksql http://localhost:8088
Then, in the ksqlDB shell, execute the following command:
RUN SCRIPT 'stream-processing/statements.sql';
Once this command finishes, you should have two streams created. Before moving further, it is a good idea to check if they were really created by executing the following command:
SHOW STREAMS;
You should see an output like this:
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
----------------------------------------------------------------------------
FLATTENED_ORDERS | FLATTENED_ORDERS | KAFKA | PROTOBUF | false
ORDERS | ORDERS | KAFKA | JSON | false
----------------------------------------------------------------------------
3️⃣ Execute a continuous query
SELECT
ORDER_ID,
ORDER_TS,
ORDER_AMOUNT,
CUST_FIRST_NAME,
CUST_LAST_NAME,
CUST_PHONE_NUMBER,
CUST_ADDR_STREET,
CUST_ADDR_NUMBER,
CUST_ADDR_ZIPCODE,
CUST_ADDR_CITY,
CUST_ADDR_STATE,
PROD_SKU,
PROD_NAME,
PROD_VENDOR_NAME,
PROD_VENDOR_COUNTRY
FROM FLATTENED_ORDERS
EMIT CHANGES;
👀 This query never stops, unless you press Cmd+C to interrupt its execution. Leave this console open so you can see the data coming in.
4️⃣ Ingest data into the input topic
$KAFKA_HOME/bin/kafka-console-producer --bootstrap-server localhost:9092 --topic ORDERS
Copy-and-paste the events below to the kafka-console-producer
CLI:
{"id": "1", "timestamp": "2020-01-18 01:12:05", "amount": 84.02, "customer": {"firstName": "Ricardo", "lastName": "Ferreira", "phoneNumber": "1234567899", "address": {"street": "Street SDF", "number": "8602", "zipcode": "27640", "city": "Raleigh", "state": "NC"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
{"id": "2", "timestamp": "2020-01-18 01:35:12", "amount": 84.02, "customer": {"firstName": "Tim", "lastName": "Berglund", "phoneNumber": "9987654321", "address": {"street": "Street UOI", "number": "1124", "zipcode": "85756", "city": "Littletown", "state": "CO"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
{"id": "3", "timestamp": "2020-01-18 01:58:55", "amount": 84.02, "customer": {"firstName": "Robin", "lastName": "Moffatt", "phoneNumber": "4412356789", "address": {"street": "Street YUP", "number": "9066", "zipcode": "BD111NE", "city": "Leeds", "state": "YS"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
{"id": "4", "timestamp": "2020-01-18 02:31:43", "amount": 84.02, "customer": {"firstName": "Viktor", "lastName": "Gamov", "phoneNumber": "9874563210", "address": {"street": "Street SHT", "number": "12450", "zipcode": "07003", "city": "New Jersey", "state": "NJ"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
5️⃣ Verify if the pipeline executed correctly
Go back to the continuous query that you started on step 3️⃣. With new events arriving, you should see that the query generated an output similar to this:
+-----------+--------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------+----------------------+
|ORDER_ID |ORDER_AMOUNT |CUST_FIRST_NAME |CUST_LAST_NAME |CUST_PHONE_NUMBER|CUST_ADDR_STREET |CUST_ADDR_NUMBER |PROD_SKU |PROD_NAME |
+-----------+--------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------+----------------------+
|1 |84.02 |Ricardo |Ferreira |1234567899 |Street SDF |8602 |P12345 |Highly Durable Glue |
|2 |84.02 |Tim |Berglund |9987654321 |Street UOI |1124 |P12345 |Highly Durable Glue |
|3 |84.02 |Robin |Moffatt |4412356789 |Street YUP |9066 |P12345 |Highly Durable Glue |
|4 |84.02 |Viktor |Gamov |9874563210 |Street SHT |12450 |P12345 |Highly Durable Glue |
💡 The actual output includes more columns that what is shown above.
#️⃣ Stop the containers from this scenario
sh stop-stream-processing.sh
Alternatively, you can also stop all containers if you're done for the day.
sh stop-everything.sh
🧑🏻💻 https://developer.confluent.io/tutorials/flatten-nested-data/ksql.html
This project is licensed under the Apache 2.0 License.