From 9fd30bb1ba5be7f6d6f1a795790605749eaa465f Mon Sep 17 00:00:00 2001 From: chgl Date: Tue, 1 Aug 2023 15:23:41 +0200 Subject: [PATCH] fix: avoid infinite loops by not overriding the output topic if it doesn't match the generateTopicMatchExpression (#79) [skip ci] --- .../fhirgateway/processors/KafkaProcessor.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/miracum/etl/fhirgateway/processors/KafkaProcessor.java b/src/main/java/org/miracum/etl/fhirgateway/processors/KafkaProcessor.java index 5d03644..4e4fffd 100644 --- a/src/main/java/org/miracum/etl/fhirgateway/processors/KafkaProcessor.java +++ b/src/main/java/org/miracum/etl/fhirgateway/processors/KafkaProcessor.java @@ -7,16 +7,15 @@ import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.Resource; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.integration.support.MessageBuilder; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; -import org.springframework.stereotype.Component; -@Component +@Configuration @ConditionalOnExpression( "${services.kafka.enabled:true} and !T(org.springframework.util.StringUtils).isEmpty('${spring.cloud.stream.bindings.process-out-0.destination:}')") public class KafkaProcessor extends BaseKafkaProcessor { @@ -24,7 +23,6 @@ public class KafkaProcessor extends BaseKafkaProcessor { private String generateTopicMatchExpression; private String generateTopicReplacement; - @Autowired public KafkaProcessor( ResourcePipeline pipeline, @Value("${services.kafka.generate-output-topic.match-expression}") @@ -37,7 +35,7 @@ public KafkaProcessor( } @Bean - public Function, Message> process() { + Function, Message> process() { return message -> { var processed = super.process(message); @@ -51,8 +49,8 @@ public Function, Message> process() { var inputTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC).toString(); var outputTopic = computeOutputTopicFromInputTopic(inputTopic); - // see https://github.com/spring-cloud/spring-cloud-stream/issues/1909 for details on - // "spring.cloud.stream.sendto.destination" + // see https://github.com/spring-cloud/spring-cloud-stream/issues/1909 for + // details on "spring.cloud.stream.sendto.destination" outputTopic.ifPresent( s -> messageBuilder.setHeader("spring.cloud.stream.sendto.destination", s)); @@ -73,6 +71,10 @@ private Optional computeOutputTopicFromInputTopic(String inputTopic) { kv("inputTopic", inputTopic), kv("outputTopic", outputTopic)); + if (inputTopic.equals(outputTopic)) { + return Optional.empty(); + } + return Optional.of(outputTopic); }