From 55ba002e243ed7bae317dabc489a35596f7f4aa7 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Thu, 18 Jan 2024 15:50:42 +0100 Subject: [PATCH] Add avro generated classes --- pom.xml | 23 +++++ .../kafka/ConsumedCustomersResource.java | 13 ++- .../probe/persistence/CustomerEntity.java | 8 ++ src/main/resources/avro/customer.avsc | 95 +++++++++++++++++++ src/main/resources/avro/source.avsc | 84 ++++++++++++++++ 5 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 src/main/resources/avro/customer.avsc create mode 100644 src/main/resources/avro/source.avsc diff --git a/pom.xml b/pom.xml index 8a0bf2d..e9e6110 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ 2.5.8.Final 2.0.6 3.2.9.Final + 1.11.3 @@ -140,6 +141,28 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + String + + ${project.basedir}/src/main/resources/avro/source.avsc + ${project.basedir}/src/main/resources/avro/customer.avsc + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/target/generated-sources + + + + diff --git a/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java b/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java index 553c8c0..536bf2b 100644 --- a/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java +++ b/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java @@ -11,6 +11,8 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import server1.inventory.customers.Envelope; +import server1.inventory.customers.Value; import java.util.concurrent.CompletionStage; @@ -22,11 +24,16 @@ public class ConsumedCustomersResource { @Incoming("customers-from-kafka") @Transactional @Blocking - public CompletionStage consume(Message customerMessage) { - final CustomerEntity customer = customerMessage.getPayload(); + public CompletionStage consume(Message customerMessage) { + final Value customer = customerMessage.getPayload().getAfter(); try { log.info("Deleting customer with email: {}", customer.getEmail()); - customer.delete(); + CustomerEntity customerEntity = new CustomerEntity(); + customerEntity.setId((long) customer.getId()); + customerEntity.setEmail(customer.getEmail()); + customerEntity.setFirstName(customer.getFirstName()); + customerEntity.setLastName(customer.getLastName()); + customerEntity.delete(); } catch (Exception e) { log.error("Exception detected in the Probe application: {}", e.getCause(), e); return customerMessage.nack(e); diff --git a/src/main/java/io/apicurio/registry/probe/persistence/CustomerEntity.java b/src/main/java/io/apicurio/registry/probe/persistence/CustomerEntity.java index d278f48..b59b1f2 100644 --- a/src/main/java/io/apicurio/registry/probe/persistence/CustomerEntity.java +++ b/src/main/java/io/apicurio/registry/probe/persistence/CustomerEntity.java @@ -26,6 +26,14 @@ public class CustomerEntity extends PanacheEntityBase { @Column public String email; + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + public String getFirstName() { return firstName; } diff --git a/src/main/resources/avro/customer.avsc b/src/main/resources/avro/customer.avsc new file mode 100644 index 0000000..0b33414 --- /dev/null +++ b/src/main/resources/avro/customer.avsc @@ -0,0 +1,95 @@ +{ + "type": "record", + "name": "Envelope", + "namespace": "server1.inventory.customers", + "fields": [ + { + "name": "before", + "type": [ + "null", + { + "type": "record", + "name": "Value", + "fields": [ + { + "name": "id", + "type": { + "type": "int", + "connect.default": 0 + }, + "default": 0 + }, + { + "name": "first_name", + "type": "string" + }, + { + "name": "last_name", + "type": "string" + }, + { + "name": "email", + "type": "string" + } + ], + "connect.name": "server1.inventory.customers.Value" + } + ], + "default": null + }, + { + "name": "after", + "type": [ + "null", + "Value" + ], + "default": null + }, + { + "name": "source", + "type": "io.debezium.connector.postgresql.Source" + }, + { + "name": "op", + "type": "string" + }, + { + "name": "ts_ms", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "transaction", + "type": [ + "null", + { + "type": "record", + "name": "block", + "namespace": "event", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "total_order", + "type": "long" + }, + { + "name": "data_collection_order", + "type": "long" + } + ], + "connect.version": 1, + "connect.name": "event.block" + } + ], + "default": null + } + ], + "connect.version": 1, + "connect.name": "server1.inventory.customers.Envelope" +} \ No newline at end of file diff --git a/src/main/resources/avro/source.avsc b/src/main/resources/avro/source.avsc new file mode 100644 index 0000000..1f3e91a --- /dev/null +++ b/src/main/resources/avro/source.avsc @@ -0,0 +1,84 @@ +{ + "type": "record", + "name": "Source", + "namespace": "io.debezium.connector.postgresql", + "fields": [ + { + "name": "version", + "type": "string" + }, + { + "name": "connector", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "ts_ms", + "type": "long" + }, + { + "name": "snapshot", + "type": [ + { + "type": "string", + "connect.version": 1, + "connect.parameters": { + "allowed": "true,last,false,incremental" + }, + "connect.default": "false", + "connect.name": "io.debezium.data.Enum" + }, + "null" + ], + "default": "false" + }, + { + "name": "db", + "type": "string" + }, + { + "name": "sequence", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "schema", + "type": "string" + }, + { + "name": "table", + "type": "string" + }, + { + "name": "txId", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "lsn", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "xmin", + "type": [ + "null", + "long" + ], + "default": null + } + ], + "connect.name": "io.debezium.connector.postgresql.Source" +} \ No newline at end of file