diff --git a/services-api/src/main/java/io/scalecube/services/annotations/ExecuteOn.java b/services-api/src/main/java/io/scalecube/services/annotations/ExecuteOn.java index bdc7fec66..832511742 100644 --- a/services-api/src/main/java/io/scalecube/services/annotations/ExecuteOn.java +++ b/services-api/src/main/java/io/scalecube/services/annotations/ExecuteOn.java @@ -18,10 +18,9 @@ public @interface ExecuteOn { /** - * Returns {@link reactor.core.scheduler.Scheduler} spec, in the format: {@code - * :} + * Returns scheduler name. * - * @return scheduler spec + * @return scheduler name */ String value(); } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 99f08b6f6..c807d3119 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -133,8 +134,9 @@ private Microservices(Microservices.Context context) { } public static Microservices start(Microservices.Context context) { - final Microservices microservices = new Microservices(context.conclude()); + Microservices microservices = null; try { + microservices = new Microservices(context.conclude()); LOGGER.log(Level.INFO, "[{0}] Starting {1}", microservices.instanceId, microservices); microservices.startTransport(context.transportSupplier, context.serviceRegistry); microservices.createServiceEndpoint(); @@ -144,7 +146,10 @@ public static Microservices start(Microservices.Context context) { microservices.startListen(); LOGGER.log(Level.INFO, "[{0}] Started {1}", microservices.instanceId, microservices); } catch (Exception ex) { - microservices.close(); + if (microservices != null) { + microservices.close(); + } + context.close(); throw Exceptions.propagate(ex); } return microservices; @@ -415,6 +420,7 @@ public void close() { closeDiscovery(); closeGateways(); closeTransport(); + context.close(); LOGGER.log(Level.INFO, "[{0}] Closed {1}", instanceId, this); } @@ -511,7 +517,7 @@ public static final class Context { private final AtomicBoolean isConcluded = new AtomicBoolean(); private Map tags; - private List serviceProviders = new ArrayList<>(); + private final List serviceProviders = new ArrayList<>(); private ServiceRegistry serviceRegistry; private Authenticator defaultAuthenticator; private PrincipalMapper defaultPrincipalMapper; @@ -521,7 +527,9 @@ public static final class Context { private Integer externalPort; private ServiceDiscoveryFactory discoveryFactory; private Supplier transportSupplier; - private List> gatewaySuppliers = new ArrayList<>(); + private final List> gatewaySuppliers = new ArrayList<>(); + private final Map> schedulerSuppliers = new HashMap<>(); + private final Map schedulers = new ConcurrentHashMap<>(); public Context() {} @@ -638,7 +646,7 @@ public Context transport(Supplier transportSupplier) { /** * Adds {@link Gateway} supplier to the list of gateway suppliers. * - * @param gatewaySupplier gatewaySupplier + * @param gatewaySupplier {@link Gateway} supplier * @return this */ public Context gateway(Supplier gatewaySupplier) { @@ -699,6 +707,18 @@ public Context defaultPrincipalMapper( return this; } + /** + * Adds {@link Scheduler} supplier to the list of scheduler suppliers. + * + * @param name scheduler name + * @param schedulerSupplier {@link Scheduler} supplier + * @return this + */ + public Context scheduler(String name, Supplier schedulerSupplier) { + schedulerSuppliers.put(name, schedulerSupplier); + return this; + } + private Context conclude() { if (!isConcluded.compareAndSet(false, true)) { throw new IllegalStateException("Context is already concluded"); @@ -722,15 +742,14 @@ private Context conclude() { tags = new HashMap<>(); } - if (serviceProviders == null) { - serviceProviders = new ArrayList<>(); - } - - if (gatewaySuppliers == null) { - gatewaySuppliers = new ArrayList<>(); - } + schedulerSuppliers.forEach((s, supplier) -> schedulers.put(s, supplier.get())); return this; } + + private void close() { + schedulers.values().forEach(Scheduler::dispose); + schedulers.clear(); + } } }