Skip to content

Commit

Permalink
fix: avoid infinite loops by not overriding the output topic if it do…
Browse files Browse the repository at this point in the history
…esn't match the generateTopicMatchExpression (#79)

[skip ci]
  • Loading branch information
chgl authored Aug 1, 2023
1 parent eacf2a2 commit 9fd30bb
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,22 @@
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@Configuration
@ConditionalOnExpression(
"${services.kafka.enabled:true} and !T(org.springframework.util.StringUtils).isEmpty('${spring.cloud.stream.bindings.process-out-0.destination:}')")
public class KafkaProcessor extends BaseKafkaProcessor {

private String generateTopicMatchExpression;
private String generateTopicReplacement;

@Autowired
public KafkaProcessor(
ResourcePipeline pipeline,
@Value("${services.kafka.generate-output-topic.match-expression}")
Expand All @@ -37,7 +35,7 @@ public KafkaProcessor(
}

@Bean
public Function<Message<Resource>, Message<Bundle>> process() {
Function<Message<Resource>, Message<Bundle>> process() {
return message -> {
var processed = super.process(message);

Expand All @@ -51,8 +49,8 @@ public Function<Message<Resource>, Message<Bundle>> process() {
var inputTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC).toString();

var outputTopic = computeOutputTopicFromInputTopic(inputTopic);
// see https://github.com/spring-cloud/spring-cloud-stream/issues/1909 for details on
// "spring.cloud.stream.sendto.destination"
// see https://github.com/spring-cloud/spring-cloud-stream/issues/1909 for
// details on "spring.cloud.stream.sendto.destination"
outputTopic.ifPresent(
s -> messageBuilder.setHeader("spring.cloud.stream.sendto.destination", s));

Expand All @@ -73,6 +71,10 @@ private Optional<String> computeOutputTopicFromInputTopic(String inputTopic) {
kv("inputTopic", inputTopic),
kv("outputTopic", outputTopic));

if (inputTopic.equals(outputTopic)) {
return Optional.empty();
}

return Optional.of(outputTopic);
}

Expand Down

0 comments on commit 9fd30bb

Please sign in to comment.