Skip to content

Commit

Permalink
Switching to injectable factories per bootique/bootique#345
Browse files Browse the repository at this point in the history
  • Loading branch information
andrus committed Dec 10, 2023
1 parent 392ad50 commit 4eb8ad8
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import io.bootique.ModuleCrate;
import io.bootique.config.ConfigurationFactory;
import io.bootique.di.Binder;
import io.bootique.di.Injector;
import io.bootique.di.Provides;
import io.bootique.rabbitmq.client.channel.RmqChannelManager;
import io.bootique.rabbitmq.client.connection.RmqConnectionManager;
import io.bootique.rabbitmq.client.topology.RmqTopologyManager;
import io.bootique.shutdown.ShutdownManager;

import javax.inject.Singleton;

Expand Down Expand Up @@ -56,11 +54,8 @@ RmqObjectsFactory provideRmqObjectsFactory(ConfigurationFactory configFactory) {

@Singleton
@Provides
RmqConnectionManager provideConnectionManager(
RmqObjectsFactory factory,
ShutdownManager shutdownManager,
Injector injector) {
return factory.createConnectionManager(injector, shutdownManager);
RmqConnectionManager provideConnectionManager(RmqObjectsFactory factory) {
return factory.createConnectionManager();
}

@Singleton
Expand All @@ -80,8 +75,7 @@ RmqTopologyManager provideTopologyManager(RmqObjectsFactory factory) {
RmqEndpoints provideEndpoints(
RmqObjectsFactory factory,
RmqChannelManager channelManager,
RmqTopologyManager topologyManager,
ShutdownManager shutdownManager) {
return factory.createEndpoints(channelManager, topologyManager, shutdownManager);
RmqTopologyManager topologyManager) {
return factory.createEndpoints(channelManager, topologyManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.rabbitmq.client.ConnectionFactory;
import io.bootique.annotation.BQConfig;
import io.bootique.annotation.BQConfigProperty;
import io.bootique.di.Injector;
import io.bootique.rabbitmq.client.channel.PoolingChannelManager;
import io.bootique.rabbitmq.client.channel.RmqChannelManager;
import io.bootique.rabbitmq.client.channel.SimpleChannelManager;
Expand All @@ -35,6 +34,7 @@
import io.bootique.rabbitmq.client.topology.*;
import io.bootique.shutdown.ShutdownManager;

import javax.inject.Inject;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -47,13 +47,27 @@
@BQConfig
public class RmqObjectsFactory {

private final ShutdownManager shutdownManager;

private Map<String, ConnectionFactoryFactory> connections;
private Map<String, RmqExchangeConfigFactory> exchanges;
private Map<String, RmqQueueConfigFactory> queues;
private Map<String, RmqPubEndpointFactory> pub;
private Map<String, RmqSubEndpointFactory> sub;
private int channelPoolCapacity;

@Inject
public RmqObjectsFactory(ShutdownManager shutdownManager) {
this.shutdownManager = shutdownManager;
}

public RmqConnectionManager createConnectionManager() {
Map<String, ConnectionFactory> factories = createConnectionFactories();
return shutdownManager.onShutdown(
new RmqConnectionManager(factories),
RmqConnectionManager::shutdown);
}

/**
* @since 3.0
*/
Expand All @@ -73,17 +87,10 @@ public RmqTopologyManager createTopologyManager() {
createQueueConfigs());
}

public RmqEndpoints createEndpoints(RmqChannelManager channelManager, RmqTopologyManager topologyManager, ShutdownManager shutdownManager) {
public RmqEndpoints createEndpoints(RmqChannelManager channelManager, RmqTopologyManager topologyManager) {
return new RmqEndpoints(
createPubEndpoints(channelManager, topologyManager),
createSubEndpoints(channelManager, topologyManager, shutdownManager));
}

public RmqConnectionManager createConnectionManager(Injector injector, ShutdownManager shutdownManager) {
Map<String, ConnectionFactory> factories = createConnectionFactories(injector);
return shutdownManager.onShutdown(
new RmqConnectionManager(factories),
RmqConnectionManager::shutdown);
createSubEndpoints(channelManager, topologyManager));
}

protected Map<String, RmqQueueConfig> createQueueConfigs() {
Expand All @@ -106,13 +113,13 @@ protected Map<String, RmqExchangeConfig> createExchangeConfigs() {
return map;
}

protected Map<String, ConnectionFactory> createConnectionFactories(Injector injector) {
protected Map<String, ConnectionFactory> createConnectionFactories() {
if (connections == null || connections.isEmpty()) {
return Collections.emptyMap();
}

Map<String, ConnectionFactory> map = new HashMap<>();
connections.forEach((k, v) -> map.put(k, v.createConnectionFactory(k, injector)));
connections.forEach((k, v) -> map.put(k, v.createConnectionFactory(k)));
return map;
}

Expand All @@ -129,17 +136,14 @@ protected Map<String, RmqPubEndpoint> createPubEndpoints(
return map;
}

protected Map<String, RmqSubEndpoint> createSubEndpoints(
RmqChannelManager channelManager,
RmqTopologyManager topologyManager,
ShutdownManager shutdownManager) {
protected Map<String, RmqSubEndpoint> createSubEndpoints(RmqChannelManager channelManager, RmqTopologyManager topologyManager) {

if (sub == null || sub.isEmpty()) {
return Collections.emptyMap();
}

Map<String, RmqSubEndpoint> map = new HashMap<>();
sub.forEach((k, v) -> map.put(k, v.create(channelManager, topologyManager, shutdownManager)));
sub.forEach((k, v) -> map.put(k, v.create(channelManager, topologyManager)));
return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.rabbitmq.client.ConnectionFactory;
import io.bootique.annotation.BQConfig;
import io.bootique.annotation.BQConfigProperty;
import io.bootique.di.Injector;

@BQConfig
@JsonTypeName("amqp")
Expand All @@ -36,7 +35,7 @@ public class AMQPConnectionFactoryFactory extends ConnectionFactoryFactory {
private int port = -1;

@Override
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName, Injector injector) {
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName) {
if (username != null) {
factory.setUsername(username);
}
Expand All @@ -55,7 +54,7 @@ protected ConnectionFactory configureFactory(ConnectionFactory factory, String c

factory.setPort(port);

return super.configureFactory(factory, connectionName, injector);
return super.configureFactory(factory, connectionName);
}

@BQConfigProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.bootique.annotation.BQConfig;
import io.bootique.annotation.BQConfigProperty;
import io.bootique.config.PolymorphicConfiguration;
import io.bootique.di.Injector;

@BQConfig
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = AMQPConnectionFactoryFactory.class)
Expand All @@ -40,11 +39,11 @@ public abstract class ConnectionFactoryFactory implements PolymorphicConfigurati
private Boolean topologyRecovery;
private Long networkRecoveryInterval;

public ConnectionFactory createConnectionFactory(String connectionName, Injector injector) {
return configureFactory(new ConnectionFactory(), connectionName, injector);
public ConnectionFactory createConnectionFactory(String connectionName) {
return configureFactory(new ConnectionFactory(), connectionName);
}

protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName, Injector injector) {
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName) {

// let's preserve RMQ defaults if parameters are not setup explicitly
if (requestedChannelMax != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.rabbitmq.client.ConnectionFactory;
import io.bootique.annotation.BQConfig;
import io.bootique.annotation.BQConfigProperty;
import io.bootique.di.Injector;

@BQConfig
@JsonTypeName("uri")
Expand All @@ -32,13 +31,13 @@ public class URIConnectionFactoryFactory extends ConnectionFactoryFactory {
private String uri;

@Override
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName, Injector injector) {
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName) {
try {
factory.setUri(uri);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize RabbitMQ URI connection factory", e);
}
return super.configureFactory(factory, connectionName, injector);
return super.configureFactory(factory, connectionName);
}

@BQConfigProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.bootique.rabbitmq.client.topology.*;
import io.bootique.shutdown.ShutdownManager;

import javax.inject.Inject;
import java.util.Objects;

/**
Expand All @@ -32,6 +33,8 @@
@BQConfig
public class RmqSubEndpointFactory {

private final ShutdownManager shutdownManager;

private String connection;
private String exchangeConfig;
private String exchangeName;
Expand All @@ -40,10 +43,12 @@ public class RmqSubEndpointFactory {
private String routingKey;
private boolean autoAck = true;

public RmqSubEndpoint create(
RmqChannelManager channelManager,
RmqTopologyManager topologyManager,
ShutdownManager shutdownManager) {
@Inject
public RmqSubEndpointFactory(ShutdownManager shutdownManager) {
this.shutdownManager = shutdownManager;
}

public RmqSubEndpoint create(RmqChannelManager channelManager, RmqTopologyManager topologyManager) {

Objects.requireNonNull(connection, "Subscriber connection name is undefined");
RmqEndpointDriver driver = new RmqEndpointDriver(channelManager, connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
*/
public class ConnectionPropertyBuilder {

private String propertyPrefix;
private BQCoreModuleExtender extender;
private final String propertyPrefix;
private final BQCoreModuleExtender extender;

protected ConnectionPropertyBuilder(String propertyPrefix, BQCoreModuleExtender extender) {
this.propertyPrefix = propertyPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;

@JsonTypeName("bqrmqtest")
// must be able to deserialize over the existing configs, so instruct Jackson to be lenient
@JsonIgnoreProperties(ignoreUnknown = true)
public class TestConnectionFactoryFactory extends ConnectionFactoryFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(TestConnectionFactoryFactory.class);

private final Injector injector;

@Inject
public TestConnectionFactoryFactory(Injector injector) {
this.injector = injector;
}

@Override
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName, Injector injector) {
protected ConnectionFactory configureFactory(ConnectionFactory factory, String connectionName) {

RmqTester tester = injector.getInstance(Key.get(RmqTester.class, connectionName));
String url = tester.getAmqpUrl();
Expand All @@ -49,6 +58,6 @@ protected ConnectionFactory configureFactory(ConnectionFactory factory, String c
throw new RuntimeException("Failed to initialize RabbitMQ URI connection factory", e);
}

return super.configureFactory(factory, connectionName, injector);
return super.configureFactory(factory, connectionName);
}
}

0 comments on commit 4eb8ad8

Please sign in to comment.