SQS Listener for SQS FIFO Queue is sending all messages concurrently, FIFO not followed. AWSPRING Cloud version 3.0.2 #892

Closed
nikhilznr opened this issue Sep 20, 2023 · 5 comments
Labels
component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer


@nikhilznr

Type: Bug

Component:
SQS

Describe the bug
The SQS listener for an SQS FIFO queue delivers all messages concurrently instead of one at a time per messageGroupId.

Sample

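```java
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import io.awspring.cloud.sqs.operations.SqsAsyncOperations;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.TemplateAcknowledgementMode;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Import(SqsBootstrapConfiguration.class)
@Configuration
@Slf4j
@ConditionalOnProperty(name = "outboundservice.sqs.enable", havingValue = "true", matchIfMissing = true)
public class SQSConfiguration {

    @Value("${outboundservice.sqs.async.client.maxConcurrency:100}")
    private int maxConcurrency;

    @PostConstruct
    public void logInitialization() {
        log.info("SQSConfiguration is initialized " +
                "and turned on due to 'outboundservice.sqs.enable' being set to true.");
    }

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options
                        // MANUAL: the listener itself must call acknowledgement.acknowledge()
                        .acknowledgementMode(AcknowledgementMode.MANUAL)
                        .acknowledgementOrdering(AcknowledgementOrdering.ORDERED_BY_GROUP)
                )
                .sqsAsyncClient(sqsAsyncClient())
                .build();
    }

    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .httpClientBuilder(
                        NettyNioAsyncHttpClient.builder()
                                .maxConcurrency(maxConcurrency) // Adjust this value as needed
                )
                .build();
    }

    @Bean
    public SqsAsyncOperations sqsDelayQueueAsyncTemplate(SqsAsyncClient sqsAsyncClient) {
        return SqsTemplate.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .configure(options -> options
                        .acknowledgementMode(TemplateAcknowledgementMode.MANUAL))
                .buildAsyncTemplate();
    }
}
```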
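```java
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(value="outboundservice.sqs.listner.outbound.enable", havingValue = "true", matchIfMissing = true)
public class OutboundSqsEventListener {

    private final LoggingService loggingService;
    private final SqsInternalMessageProcessor internalMessageProcessor;
    private final SqsExternalMessageProcessor externalMessageProcessor;
    private final SQSUtils sqsUtils;

    @PostConstruct
    public void logInitialization() {
        log.info("OutboundSqsEventListener is initialized " +
                "and turned on due to 'outboundservice.sqs.listner.outbound.enable' being set to true.");
    }

    @SqsListener(value = "${outboundservice.sqs.outbound.queue.name}", maxConcurrentMessages = "${outboundservice.sqs.outbound.queue.maxConcurrentMessages}")
    public void processMessage(Message message, Acknowledgement acknowledgement) {
        // These declarations were omitted from the original report. The attribute name "type"
        // and the Mono<Void> return type of the processors are assumptions (hypothetical).
        String messageId = message.messageId();
        MessageAttributeValue typeAttribute = message.messageAttributes().get("type");
        Mono<Void> processingMono;

        if (typeAttribute != null && "internal".equals(typeAttribute.stringValue())) {
            // Process internal Message
            processingMono = internalMessageProcessor.process(message, acknowledgement);
        } else {
            // Process external Message
            processingMono = externalMessageProcessor.process(message, acknowledgement);
        }

        // Note: once the reactive chain hops to another thread (e.g. on I/O),
        // this method returns before the message has been fully processed.
        processingMono.subscribe(
                null,
                error -> log.error("Error processing message with ID {}: {}", messageId, error.getMessage()),
                () -> log.debug("Finished processing for message with ID {}", messageId)
        );
    }
}
```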
@nikhilznr nikhilznr changed the title from "SQS Fifo not working with AWSPRING Cloud version 3.0.2" to "SQS Listener for SQS FIFO Queue is sending all messages concurrently, FIFO not followed. AWSPRING Cloud version 3.0.2" on Sep 20, 2023
@tomazfernandes
Contributor

Hi @nikhilznr.

You can add a breakpoint to FifoSqsComponentFactory.java and MessageGroupingSinkAdapter.java if you'd like to debug this further.

Now, this use of Project Reactor is not compatible with this kind of listener method. When you subscribe to your reactive chain, the listener thread is released and the flow moves on to the other components, such as error handling, interceptors and back-pressure release, before the message has actually been processed. This can trigger a potentially unbounded polling loop that may lead to issues such as an OOM error.

We don't officially support Project Reactor in this project, but you could try creating an adapter that turns your Mono into a CompletableFuture<Void> and returning that from the listener method.
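The point above can be illustrated with a toy simulation that uses only `java.util.concurrent` (no Spring Cloud AWS, no Reactor). `ListenerReturnSketch`, `processAsync` and `deliverGroup` are hypothetical names, and the sketch assumes, as a simplification, that the container waits for the listener's result before handing over the next message from the same message group. When the listener returns an already-completed result (which is effectively what a `void` method that only calls `subscribe()` does), the three messages run concurrently and finish out of order; when it returns the future of the actual work, they finish in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy simulation only: a "container" delivers the messages of one message group one at a time
// and waits for the listener's returned future before delivering the next message.
class ListenerReturnSketch {

    // Stand-in for non-blocking processing that completes later on another thread.
    static CompletableFuture<Void> processAsync(String msg, long delayMs,
                                                ExecutorService pool, List<String> done) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.add(msg); // record the order in which processing actually finished
        }, pool);
    }

    // returnFuture=false mimics a void listener that subscribes and returns immediately;
    // returnFuture=true mimics a listener that returns CompletableFuture<Void> for its work.
    static List<String> deliverGroup(List<String> group, boolean returnFuture) {
        List<String> done = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(group.size());
        for (int i = 0; i < group.size(); i++) {
            long delayMs = (group.size() - i) * 100L; // earlier messages take longer to process
            CompletableFuture<Void> work = processAsync(group.get(i), delayMs, pool, done);
            CompletableFuture<Void> result =
                    returnFuture ? work : CompletableFuture.completedFuture(null);
            result.join(); // the container moves on only once the listener's result completes
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(done);
    }
}
```

Outside this toy, Reactor's `Mono#toFuture()` converts a `Mono<T>` into a `CompletableFuture<T>`, so a `Mono<Void>` becomes the `CompletableFuture<Void>` suggested above; since Reactor is not officially supported by the project, that adapter is something to verify in your own setup.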

Does this make sense?

If you'd like me to take a closer look, please provide a minimal sample project that reproduces the issue.

Thanks.

@tomazfernandes tomazfernandes added component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer labels Oct 4, 2023
@K2CO3

K2CO3 commented Oct 12, 2023

@tomazfernandes We observed the same behavior as OP without using Reactor (although OP uses it in a synchronous way). According to our experiments, it happens when maxMessagesPerPoll is much smaller than maxConcurrentMessages. E.g. when maxMessagesPerPoll=1 and maxConcurrentMessages=10 the FIFO order is violated. With maxMessagesPerPoll=10 and maxConcurrentMessages=10, it seemingly is not. The acknowledgement mode seems to have no influence on this behavior; we saw it in both manual and automatic modes.

@nikhilznr
Author

We fixed the issue by setting a custom, non-zero value for messageVisibilitySeconds in @SqsListener.

I think by default, 0 gets sent.

@tomazfernandes
Contributor

Hi @K2CO3, thanks for bringing this up, and thanks @nikhilznr for showing up again.

Just to get this out of the way: are you testing against AWS or LocalStack? LocalStack had a rather flaky FIFO implementation the last time I checked.

> E.g. when maxMessagesPerPoll=1 and maxConcurrentMessages=10 the FIFO order is violated.

This would be curious: since we're fetching one message per poll, FIFO itself guarantees that no further message from the same message group is delivered until the previously polled message from that group has been acknowledged or its visibility timeout has expired.
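The group-blocking rule described above can be sketched with a deliberately simplified in-memory model in plain Java (no AWS SDK). `ToyFifoQueue` and its methods are hypothetical; the model hands out one message per `receive` call (as with maxMessagesPerPoll=1), takes time as integer seconds from the caller, and ignores batching and deduplication. It also shows what a zero visibility timeout does in this model: the unacknowledged head of a group becomes receivable again immediately, so the same message can be handed out while its first copy is still being processed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of SQS FIFO message-group blocking (not the AWS implementation).
// Time is an integer number of seconds supplied by the caller, which keeps the model deterministic.
final class ToyFifoQueue {
    // Pending messages per group, in send order; LinkedHashMap scans groups in creation order.
    private final Map<String, Deque<String>> groups = new LinkedHashMap<>();
    // For each group with a message in flight: the time at which that message becomes visible again.
    private final Map<String, Long> invisibleUntil = new HashMap<>();

    void send(String group, String body) {
        groups.computeIfAbsent(group, g -> new ArrayDeque<>()).addLast(body);
    }

    // Returns the head of the first group that is not blocked, or null if none is available.
    // The head stays in the queue until delete() is called, like an unacknowledged SQS message.
    String receive(long now, long visibilitySeconds) {
        for (Map.Entry<String, Deque<String>> entry : groups.entrySet()) {
            Long until = invisibleUntil.get(entry.getKey());
            boolean blocked = until != null && now < until;
            if (!blocked && !entry.getValue().isEmpty()) {
                invisibleUntil.put(entry.getKey(), now + visibilitySeconds);
                return entry.getValue().peekFirst();
            }
        }
        return null;
    }

    // Acknowledges (deletes) the in-flight head of a group, which unblocks that group.
    void delete(String group) {
        Deque<String> pending = groups.get(group);
        if (pending != null) {
            pending.pollFirst();
        }
        invisibleUntil.remove(group);
    }
}
```

In real SQS a single ReceiveMessage call can return several messages from the same group, and a receive request can override the queue's visibility timeout; the model keeps only the blocking behavior relevant to this thread.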

> I think by default, 0 gets sent.

If that were the case, and we were overriding the queue's default visibility timeout with zero, it might indeed cause this issue. However, in all of my tests, when the annotation does not provide a value and none is configured in the ContainerOptions, the framework correctly leaves it as null and does not override the visibility timeout.

> (although OP uses it in a synchronous way)

I don't think we can affirm that without looking at the rest of the code. Unless I'm missing something, as soon as the logic encounters its first I/O operation, it will hop to a different thread and release the subscribing thread.

If you could provide a sample application that consistently reproduces the issue against AWS, it would help me take a closer look.

Thanks.

@tomazfernandes
Contributor

Closing due to lack of feedback. We can reopen if necessary.

3 participants