Allow AxonServerEventStore to utilize FilteringEventStorageEngine #322

Open
bsanders1979 opened this issue Nov 17, 2023 · 9 comments

@bsanders1979

Enhancement Description

I would like to be able to prevent some events from being published to the event store. Previously, our application was not connected to an event store, so this was not a concern.

Current Behaviour

All events are published to the event store.

Wanted Behaviour

I would like to be able to prevent certain events from being published.

I suggest the following enhancement for AxonServerEventStore.Builder

public Builder eventMessageFilter(Predicate<? super EventMessage<?>> filter) {
  ...
}

private void buildStorageEngine() {
  ...
      EventStorageEngine storageEngine = AxonIQEventStorageEngine.builder()
                          .snapshotSerializer(snapshotSerializer.get())
                          .upcasterChain(upcasterChain)
                          .snapshotFilter(snapshotFilter)
                          .eventSerializer(eventSerializer.get())
                          .configuration(configuration)
                          .eventStoreClient(axonServerConnectionManager)
                          .converter(new GrpcMetaDataConverter(eventSerializer.get()))
                          .build();
                          
      if (eventMessageFilter != null)
        storageEngine = new FilteringEventStorageEngine(storageEngine, eventMessageFilter);
        
      super.storageEngine(storageEngine);
}
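
To make the proposal concrete, here is a minimal, self-contained sketch of the wrapping idea. It does not use Axon's classes: `SketchStorageEngine`, `RecordingEngine`, `FilteringEngine`, and `SketchEventStoreBuilder` are hypothetical stand-ins, and the sketch assumes the predicate selects the events to keep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a storage engine; NOT Axon's EventStorageEngine.
interface SketchStorageEngine {
    void appendEvents(List<Object> events);
}

// Records whatever would be sent on to the (metered) event store.
final class RecordingEngine implements SketchStorageEngine {
    final List<Object> stored = new ArrayList<>();

    @Override
    public void appendEvents(List<Object> events) {
        stored.addAll(events);
    }
}

// Decorator that forwards only the events accepted by the predicate.
final class FilteringEngine implements SketchStorageEngine {
    private final SketchStorageEngine delegate;
    private final Predicate<Object> filter;

    FilteringEngine(SketchStorageEngine delegate, Predicate<Object> filter) {
        this.delegate = delegate;
        this.filter = filter;
    }

    @Override
    public void appendEvents(List<Object> events) {
        List<Object> accepted = new ArrayList<>();
        for (Object event : events) {
            if (filter.test(event)) {
                accepted.add(event);
            }
        }
        delegate.appendEvents(accepted);
    }
}

// Hypothetical builder: wraps the base engine only when a filter was configured.
final class SketchEventStoreBuilder {
    private Predicate<Object> eventMessageFilter; // null means "store everything"

    SketchEventStoreBuilder eventMessageFilter(Predicate<Object> filter) {
        this.eventMessageFilter = filter;
        return this;
    }

    SketchStorageEngine build(SketchStorageEngine base) {
        return eventMessageFilter == null
                ? base
                : new FilteringEngine(base, eventMessageFilter);
    }
}
```

With a filter such as `e -> !(e instanceof Integer)`, appending `"a"`, `1`, `"b"` through the built engine leaves only the two strings in the `RecordingEngine`. Without a filter, the builder returns the base engine unchanged, so existing behaviour is preserved.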

Possible Workarounds

No known workarounds

@smcvb
Contributor

smcvb commented Nov 17, 2023

Thanks for filing this issue with us, @bsanders1979!
Although the FilteringEventStorageEngine works to filter out events you're not interested in, you can resolve this with an EventUpcaster too.

The behavior would essentially be the same.
An event stream is retrieved containing everything, but a predicate removes events from the stream before they're consumed.

I whipped up the following in an IntelliJ scratch file, but haven't run it.
Nonetheless, I think it should do the trick:

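import java.util.List;
import java.util.stream.Stream;

import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation;

// Drops events whose serialized type name is listed in eventsToSkip while the stream is read.
class FilteringEventUpcaster implements EventUpcaster {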
    
    private final List<String> eventsToSkip;

    FilteringEventUpcaster(List<String> eventsToSkip) {
        this.eventsToSkip = eventsToSkip;
    }

    @Override
    public Stream<IntermediateEventRepresentation> upcast(
            Stream<IntermediateEventRepresentation> intermediateRepresentations
    ) {
        return intermediateRepresentations.filter(
                representation -> !eventsToSkip.contains(representation.getType().getName())
        );
    }
}

Would you mind giving this a try as a solution?

@bsanders1979
Author

bsanders1979 commented Nov 17, 2023

Hey Steven,
I will certainly give that a shot and report back. Thanks for the quick response! Been a fan since 2017 and I was finally given the green light to break some ground this year.

Thanks,
Brian

@bsanders1979
Author

bsanders1979 commented Nov 18, 2023

Hey Steven,
So, I started looking into this and I'm not sure it will yield the results I want, as it sounds like it's more of an inbound filter, whereas (I think) I'm looking for an outbound filter. If that's not the case, then I will keep looking into it. But, a little backstory. A few years ago, I wrote some code that publishes some events, and I have a few listeners spread out because I didn't want to group a bunch of random code together simply because they're interested in the same event. We are using the cloud event store, which has a quota, and I don't want to waste it on these messages. The listeners that react to the events are subscribing. I plan on fixing those bits, but if I could just keep these messages from being published in the meantime, I feel like it would buy me some time to make sure I fix it properly.

Have a good day!

PS: I just wanted to clarify that I'm using 4.5.15 of axon-server-connector

@smcvb
Contributor

smcvb commented Nov 20, 2023

You are entirely right, @bsanders1979, my apologies.
I should've opened the FilteringEventStorageEngine before typing my upcaster suggestion, as it would've been apparent immediately that it was a filter for publishing, not for consumption.
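
The difference between the two directions can be sketched with a small, self-contained example. Everything here is a hypothetical stand-in rather than Axon code: `MeteredStore` plays the role of the quota-limited event store, the event names are made up, and `KEEP` assumes that events prefixed with `Internal` are the ones that should not consume quota.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for a metered event store; NOT an Axon class.
final class MeteredStore {
    private final List<String> stored = new ArrayList<>();

    void append(List<String> events) {
        stored.addAll(events);
    }

    List<String> read() {
        return new ArrayList<>(stored);
    }

    int storedCount() {
        return stored.size();
    }
}

final class FilterDirectionDemo {
    // Assumption: events named "Internal..." should not be stored.
    static final Predicate<String> KEEP = name -> !name.startsWith("Internal");

    // Consumption-side filter (the upcaster idea): everything is stored,
    // and the filter only affects what readers see.
    // Returns {stored count, consumed count}.
    static int[] readSideFilter(List<String> published) {
        MeteredStore store = new MeteredStore();
        store.append(published);
        List<String> consumed = store.read().stream()
                .filter(KEEP)
                .collect(Collectors.toList());
        return new int[] {store.storedCount(), consumed.size()};
    }

    // Publishing-side filter (the requested behaviour): rejected events
    // never reach the store. Returns {stored count, consumed count}.
    static int[] writeSideFilter(List<String> published) {
        MeteredStore store = new MeteredStore();
        store.append(published.stream().filter(KEEP).collect(Collectors.toList()));
        return new int[] {store.storedCount(), store.read().size()};
    }

    public static void main(String[] args) {
        List<String> published =
                Arrays.asList("InvoiceCreated", "InternalCacheRefreshed", "SupplierUpdated");
        System.out.println("read-side stored:  " + readSideFilter(published)[0]);
        System.out.println("write-side stored: " + writeSideFilter(published)[0]);
    }
}
```

For the three sample events, the read-side variant stores all three and hides one from consumers, while the write-side variant stores only two. Only the latter reduces what counts against a storage quota.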

This makes your request a pretty viable addition, as there's no straightforward workaround apart from making your own build of Axon Framework.

That said, I think we are dealing with a feature for the Axon Framework repository instead of the Axon Server Connector Java project.
From there, I would envision it as an adjustment to the AxonServerEventStore's builder so that a Predicate can be given there.
That way, Axon Framework can do the wrapping into the FilteringEventStorageEngine for you so that we can keep the AxonIQEventStorageEngine a private inner class to the AxonServerEventStore.

Does that make sense to you, @bsanders1979?

Now, let me quote two sentences from you in my reply:

PS: I just wanted to clarify that I'm using 4.5.15 of axon-server-connector

I assume this means you're also on an old version of Axon Framework, right?

Thanks for the quick response! Been a fan since 2017 and I was finally given the green light to break some ground this year.

That's awesome news! Happy to have you use Axon's stuff :-)

@bsanders1979
Author

np. Can't say I'm not guilty of doing the same thing. 😆 Yeah, our monolith has to use 4.5 as trying to go any higher causes some errors. I want to upgrade the stack for the monolith, but it's kind of a tall order, so it will require more effort in terms of coding and testing. I'm including my config class for reference.

public class AxonConfiguration implements BeanFactoryAware {
	
	private BeanFactory beanFactory;
	
	@Override
	public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
		this.beanFactory = beanFactory;
	}
	
	@Bean(destroyMethod="shutdown")
	public Configuration serverConfiguration(AxonServerConfiguration axonServerConfiguration) {
		return DefaultConfigurer.defaultConfiguration()			
			.registerComponent(AxonServerConfiguration.class, c -> axonServerConfiguration)
			.registerComponent(TokenStore.class, this::configureTokenStore)
			.eventProcessing(this::configureEventProcessing) 
			.configureSerializer(c -> JacksonSerializer.defaultSerializer())
			.start();
	}

	@Bean
	public AxonServerConfiguration axonServerConfiguration(Properties applicationConfig) {
		AxonServerConfiguration config = AxonServerConfiguration.builder()
			.componentName(applicationConfig.getProperty("axon.axonserver.componentName", "PM"))
			.servers(applicationConfig.getProperty("axon.axonserver.servers", "axonserver.cloud.axoniq.io:443"))
			.token(applicationConfig.getProperty("axon.axonserver.token", ""))
			.context(applicationConfig.getProperty("axon.axonserver.context", ""))
			.build();
		
		config.setSslEnabled(Boolean.parseBoolean(applicationConfig.getProperty("axon.axonserver.ssl-enabled", "true")));
		config.setKeepAliveTime(Long.parseLong(applicationConfig.getProperty("axon.axonserver.keep-alive-time", "0")));
		
		return config;
	}
	
	public TokenStore configureTokenStore(Configuration c) {
		return JdbcTokenStore.builder()
			.serializer(c.serializer())
			.schema(TokenSchema.builder()
				.setTokenTable("TOKEN_ENTRY")
				.setProcessorNameColumn("PROCESSOR_NAME")
				.setTokenTypeColumn("TOKEN_TYPE")
				.build())
			.connectionProvider(new DataSourceConnectionProvider(beanFactory.getBean(DataSource.class)))
			.build();
	}
	
	public void configureEventProcessing(EventProcessingConfigurer eventProcessingConfigurer) {
		eventProcessingConfigurer
			.usingSubscribingEventProcessors()
			.registerEventHandler(c -> beanFactory.getBean(SafetyDataSheetEventListener.class))
			.registerEventHandler(c -> beanFactory.getBean(SpecificationEventListener.class))
			.registerEventHandler(c -> beanFactory.getBean(SupplierEventListener.class))
			.registerEventHandler(c -> beanFactory.getBean(InvoiceEventListener.class))
			.registerEventHandler(c -> beanFactory.getBean(SupplierContactEventListener.class))
			.registerEventHandler(c -> beanFactory.getBean(InspectionEventListener.class));
	}
	
	@Bean
	public EventBus eventBus(Configuration c) {
		return c.getComponent(EventBus.class);
	}
	
	@Bean
	public EventGateway eventGateway(Configuration c) {
		return c.getComponent(EventGateway.class);
	}
}

@smcvb
Contributor

smcvb commented Nov 22, 2023

The fact that you're on 4.5 makes it hard to get you this enhancement ASAP.
We tend to backport bug fixes about three minor releases. But firstly, that would only reach back to 4.6 of Axon Framework, and secondly, we're not dealing with a bug fix, but an enhancement. :-[

So, as proposed before, I can move this issue to the Axon Framework repository, where we can change how the AxonServerEventStore is constructed, and start on a fix there, but that's about it.
If you need a hand with upgrading to 4.9 of AF (current version), be sure to reach out on (e.g.) our forum!

For the time being, if you need the filtering going forward, your best bet is to copy the AxonServerEventStore from the Axon Framework version you're on and add the filtering manually, I'm afraid.

@bsanders1979
Author

Alright, I really appreciate you looking into this. As luck would have it, I was told yesterday that I can start looking into upgrading our stack very soon. I was able to compile 4.5 from the CLI, so I can probably slap a simple fix in there in the meantime. Have a good one. ✌️

@smcvb
Contributor

smcvb commented Nov 24, 2023

That's great news, @bsanders1979!
Just doing a final check from my end: do you agree this is an enhancement for the Axon Framework repository instead of the Axon Server Connector Java repository? ;-)

@bsanders1979
Author

Hey Steven,
Sorry for the delay. Personally, I'm not sure my use-case is applicable to other types of non-metered event stores, but you never know... However, if you think that it's better suited at a higher level, then I will take your word for it.
