Skip to content

Commit

Permalink
Bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
itadventurer committed Jun 18, 2024
1 parent eb4d091 commit baa7408
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 10 deletions.
2 changes: 1 addition & 1 deletion csp1_transformer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies {
// Logging ausgaben aktivieren
implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.32'
// Kafka Client-Bibliothek
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.3.2'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.0'
// GSON für JSON-Serialisierung
implementation 'com.google.code.gson:gson:2.9.0'
}
Expand Down
2 changes: 1 addition & 1 deletion csp1_transformer/gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
13 changes: 6 additions & 7 deletions csp1_transformer/src/main/java/charging/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final Properties consumerProps = new Properties();
final Properties producerProps = new Properties();
String configFile = "csp1_transformer.properties";
if(args.length == 1) {
if (args.length == 1) {
configFile = args[0];
}
consumerProps.load(new FileReader(configFile));
Expand Down Expand Up @@ -53,10 +53,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce
while (true) {
ConsumerRecords<String, CSP1Transaction> records = consumer.poll(Duration.ofMillis(100));

producer.beginTransaction();
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

for (ConsumerRecord<String, CSP1Transaction> record : records) {
producer.beginTransaction();
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
CSP1Transaction csp1Transaction = record.value();
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
Expand All @@ -72,12 +71,12 @@ public static void main(final String[] args) throws IOException, InterruptedExce
ProducerRecord<String, Transaction> transactionRecord =
new ProducerRecord<>(OUTPUT_TOPIC, transaction.customerId, transaction);
producer.send(transactionRecord);
if(logInfos) {
if (logInfos) {
System.out.println("Processed message for customer " + transaction.customerId);
}
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
}
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
}
}
}
Expand Down
1 change: 0 additions & 1 deletion kubernetes/csp1_transformer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ data:
bootstrap.servers=my-cluster-kafka-bootstrap:9092
csp1.topic=csp1_transactions
output.topic=transactions
producer.msgs.per.sec=1
group.id=csp1_transformer
transactional.id=transactions-transformer-app
processing.time.ms=1000
Expand Down

0 comments on commit baa7408

Please sign in to comment.