Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple object types in the same queue with @SqsListener #886

Open
jndietz opened this issue Sep 14, 2023 · 11 comments
Open

Support multiple object types in the same queue with @SqsListener #886

jndietz opened this issue Sep 14, 2023 · 11 comments
Labels
component: sqs SQS integration related issue status: ideal-for-contribution We agree it's nice to have but it is not team priority type: enhancement Smaller enhancement in existing integration

Comments

@jndietz
Copy link

jndietz commented Sep 14, 2023

Type: Feature

Is your feature request related to a problem? Please describe.
I'd like for my @SqsListener methods to perform logic based on an object type.

Describe the solution you'd like

@SqsListener("some-queue")
public void processSomeMessage(SomeMessage mesage) {
     // ...do stuff
}

@SqsListener("some-queue")
public void processAnotherMessage(AnotherMessage mesage) {
     // ...do something else
}

Describe alternatives you've considered
I have tried to use ObjectMapper.readValue() to first deserialize, then perform logic using instanceof, however, readValue always succeeds, even if the JSON string doesn't match the target object type.

Additional context
Add any other context or screenshots about the feature request here.

@dbouclier
Copy link

dbouclier commented Sep 14, 2023

@jndietz I'm also in trouble with a similar solution, I had deserialization issue and then we cannot use multiple @SqsListener with different types on same queue.

I'm in a publish/subscribe scenario, so I'm sending "event" to SNS Topic then I subscribe an SQS queue, each topics is receiving different type of events like "create", "update"...

image

If you are in the same scenario (I mean using SNS), I fixed the de-serialization issue you have by using the topic attribute "RawMessageDelivery=True" (see: https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html)

Then I realized, if you have two @SqsListener on the same queue, even with different type, the last one will catch all the SQS message.

Today I'm doing the type detection manually (by sending the type in the message header) then I'm doing the de-serialization myself, here my workaround :

@Service
@RequiredArgsConstructor
public class LeaveEventListener {

   private final ObjectMapper objectMapper;

   @SqsListener(queueNames = "Leaves.fifo")
   public void onLeaveEvent(final Message<String> event) throws ClassNotFoundException, JsonProcessingException {
       LOG.debug("Leave event received: {}", event);

       // TODO: find a way to avoid this manual step
       final Event payload = (Event) objectMapper.readValue(event.getPayload(), Class.forName(event.getHeaders().get("payloadType", String.class)));

       switch (payload.getType()) {
           case LEAVE_CREATED -> onLeaveCreatedEvent((LeaveCreatedEvent) payload);
           case LEAVE_UPDATED -> onLeaveUpdateEvent((LeaveUpdatedEvent) payload);
           default -> LOG.warn("Unsupported event received: {}", event.getPayload());
       }
   }

   private void onLeaveCreatedEvent(final LeaveCreatedEvent event) {
       LOG.debug("Leave created event received: {}", event);
   }

   private void onLeaveUpdateEvent(final LeaveUpdatedEvent event) {
       LOG.debug("Leave updated event received: {}", event);
   }

}

it's working but It's not working nicely as what I did in a previous project with @RabbitListener(example: https://engineering.celonis.com/blog/patterns-with-spring-and-rabbitmq-part-1-rabbithandler-and-its-usages/)
or with @JmsListner + AzureServiceBus 🤷‍♂️

I will be happy to know if there is a better solution wtih AWS SNS/SQS

@tomazfernandes
Copy link
Contributor

@jndietz, if I understand correctly, you want to deserialize the message based on some particular metadata in the message.

You should be able to achieve that by configuring a message converter

See this example for how to use an interface or super type in the listener, and receiving the correct deserialized payload.

Please let me know if that works for you.

Thanks.

@dbouclier
Copy link

@tomazfernandes message converters are working well for custom deserialization but how to "route" the message base on the type to different @SqsEventListener using the same queue?

Sorry for hijacking the issue of @jndietz should I create a specific issue for this ?

@jndietz
Copy link
Author

jndietz commented Sep 15, 2023

No worries at all @dbouclier - I think we're on the same page here. Ideally, I'd like a drop-in replacement for @RabbitHandler. I think I just realized how spring-rabbit does it, it must serialize the class name into the JSON when publishing the message, and then when a consumer/listener picks it up, it has the type already specified.

Thanks for the information @tomazfernandes , I will dig into this and get back to you.

@jndietz
Copy link
Author

jndietz commented Sep 18, 2023

Alright, I finally got some things "working". Creating and configuring a SqsMessageListenerContainerFactory and a subsequent SqsMessagingMessageConverter work okay. I'd prefer to use the fully qualified class name, but this might work as a stop-gap for a while. However, this has it's own set of problems/bugs that I will post in a separate thread (deserialization fails at random with Jackson errors, and the @SqsListener doesn't always pick up messages from the queue). Anywho...

The docs located here state "By default, the framework looks for a MessageHeader named Sqs_MA_JavaType containing the fully qualified class name (FQCN) for which the payload should be deserialized to. If such header is found, the message is automatically deserialized to the provided class.". However, I provided the aforementioned header with a fully qualified class name, and it still would go to the wrong @SqsListener method. Given the example in my original post, a value of com.someapp.model.SomeMessage would go to

@SqsListener("some-queue")
                                 // 👇 note the type
public void processAnotherMessage(AnotherMessage mesage) {
     // ...do stuff
}

@tomazfernandes
Copy link
Contributor

Hi @jndietz and @dbouclier

I think what you're looking for is something similar to the @KafkaHandler feature in Spring Kafka.

Is this understanding correct?

If so, we do not currently support a similar feature in Spring Cloud AWS, but I'd be happy to review a PR if you'd like to contribute one.

For now, you're really not supposed to have multiple @SqsListener annotations listening to the same queue and any behavior you see would not be deterministic.

Makes sense? Thanks.

@jndietz
Copy link
Author

jndietz commented Sep 18, 2023

Yes - that is correct, or similar to how @RabbitHandler works.

I have to say that I'm surprised at your comment that we're not supposed to use multiple @SqsListener in a project. Especially so, since it appears someone went through great lengths to document how you might deserialize to different types and configuring the various classes in spring-cloud-aws documentation.

It sounds like we'll have to work around this issue for now. We can proceed with a single @SqsListener annotation, and within that method we will pull the appropriate value from the Message headers, and then use ObjectMapper to convert to the correct type. I was really hoping there would be Spring way of doing this, but it doesn't sound like there is.

The code here sort of works, but it fails to deserialize correctly at random.

@jndietz jndietz closed this as completed Sep 18, 2023
@dbouclier
Copy link

@tomazfernandes thanks for your response.
this limitation on the @SqsListener should mentioned in the project documentation, since it's an implementation of "spring-messaging" like @RabbitListener, @KafkaListener or @JmsListener I was expecting same features. this drives me in a wrong direction and I was trying to build thing not supported.
I guess I should consider using amazon-mq instead of SNS+SQS for a pub/sub scenario.

@tomazfernandes
Copy link
Contributor

It sounds like we'll have to work around this issue for now. We can proceed with a single @SqsListener annotation, and within that method we will pull the appropriate value from the Message headers, and then use ObjectMapper to convert to the correct type. I was really hoping there would be Spring way of doing this, but it doesn't sound like there is.

I think there are a few ways to handle this - for one, you can configure the deserialization to convert to the proper type, then receive a Message<Object> in the @SqsListener method, or an interface / super type. Then based on the type you can call the appropriate service to handle the payload.

I have to say that I'm surprised at your comment that we're not supposed to use multiple @SqsListener in a project.

That's definitely not what I said, and I'm sorry if it sounded harsh. You can use as many listeners in a project as you'd like, just not for the same queue.

I was expecting same features. this drives me in a wrong direction and I was trying to build thing not supported.

I can see your frustration and I'm sorry you spent time going in the wrong direction. Perhaps you'd like to open a PR adding this information to the docs?

I guess I should consider using amazon-mq instead of SNS+SQS for a pub/sub scenario.

I think we've covered some alternatives already, but if you're willing to make architectural changes, perhaps a good approach would be using SNS filters so that only one type of message gets to a queue.

All in all, let's keep in mind this is a community-based project, so feel free to contribute with a PR if you'd like the feature to be included. I'd be happy to offer some guidance and review the PR.

I'll leave the issue open in case anyone else would like to contribute a PR for this.

@tomazfernandes tomazfernandes added component: sqs SQS integration related issue status: ideal-for-contribution We agree it's nice to have but it is not team priority labels Sep 18, 2023
@tomazfernandes tomazfernandes changed the title Support multiple object types with @SqsListener Support multiple object types in the same queue with @SqsListener Jan 5, 2024
@tomazfernandes tomazfernandes added the type: enhancement Smaller enhancement in existing integration label Jan 5, 2024
@shehrooz-sattar
Copy link

shehrooz-sattar commented Jan 12, 2024

Yeah, this would have been a very convenient feature.

@jndietz Were you able to figure out why it fails at random?

The code that you have pprovide at

https://github.com/jndietz/sqs-listener/blob/main/src/main/java/com/example/listener/config/MessageConverterConfig.java

@tomazfernandes
Copy link
Contributor

Hey @shehrooz-10p - this is really not supported by the framework at the moment, and for now there's now way for the listener to disambiguate the listener methods by payload type for the same queue.

Spring Kafka on the other hand has such a mechanism - if you'd like to take a look at its implementation and open a PR with the changes I'd be happy to review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component: sqs SQS integration related issue status: ideal-for-contribution We agree it's nice to have but it is not team priority type: enhancement Smaller enhancement in existing integration
Projects
None yet
Development

No branches or pull requests

4 participants