start() {
+ build();
return Mono.defer(() -> new Microservices(this).start());
}
@@ -382,32 +440,61 @@ public Microservices startAwait() {
return start().block();
}
+ /**
+ * Adds service instance to microservice.
+ *
+ * WARNING This service will be ignored if custom {@link ServiceFactory} is
+ * installed. This method has been left for backward compatibility only and will be removed in
+ * future releases.
+ *
+ * @param services service info instance.
+ * @return builder
+ * @deprecated use {@link this#serviceFactory(ServiceFactory)}
+ */
+ @Deprecated
public Builder services(ServiceInfo... services) {
- serviceProviders.add(call -> Arrays.stream(services).collect(Collectors.toList()));
+ this.services.addAll(Arrays.asList(services));
return this;
}
/**
* Adds service instance to microservice.
*
+ *
WARNING This service will be ignored if custom {@link ServiceFactory} is
+ * installed. This method has been left for backward compatibility only and will be removed in
+ * future releases.
+ *
* @param services service instance.
* @return builder
+ * @deprecated use {@link this#serviceFactory(ServiceFactory)}
*/
+ @Deprecated
public Builder services(Object... services) {
- serviceProviders.add(
- call ->
- Arrays.stream(services)
- .map(
- s ->
- s instanceof ServiceInfo
- ? (ServiceInfo) s
- : ServiceInfo.fromServiceInstance(s).build())
- .collect(Collectors.toList()));
+ Stream.of(services)
+ .map(service -> ServiceInfo.fromServiceInstance(service).build())
+ .forEach(this.services::add);
return this;
}
+ /**
+ * Set up service provider.
+ *
+ *
WARNING This service will be ignored if custom {@link ServiceFactory} is
+ * installed. This method has been left for backward compatibility only and will be removed in
+ * future releases.
+ *
+ * @param serviceProvider - old service provider
+ * @return this
+ * @deprecated use {@link this#serviceFactory(ServiceFactory)}
+ */
+ @Deprecated
public Builder services(ServiceProvider serviceProvider) {
- serviceProviders.add(serviceProvider);
+ this.serviceProviders.add(serviceProvider);
+ return this;
+ }
+
+ public Builder serviceFactory(ServiceFactory serviceFactory) {
+ this.serviceFactory = serviceFactory;
return this;
}
@@ -553,30 +640,28 @@ private CompositeServiceDiscovery addOperator(UnaryOperator createInstance(
- Microservices microservices, ServiceDiscoveryOptions options) {
+ private Mono createInstance(Microservices microservices) {
this.microservices = microservices;
this.scheduler = Schedulers.newSingle("composite-discovery", true);
+ ServiceDiscoveryOptions discoveryOptions = new ServiceDiscoveryOptions();
for (UnaryOperator operator : this.optionOperators) {
-
- final ServiceDiscoveryOptions finalOptions = operator.apply(options);
+ final ServiceDiscoveryOptions finalOptions = operator.apply(discoveryOptions);
final String id = finalOptions.id();
- final ServiceEndpoint serviceEndpoint = finalOptions.serviceEndpoint();
final ServiceDiscovery serviceDiscovery =
- finalOptions.discoveryFactory().createServiceDiscovery(serviceEndpoint);
+ finalOptions.discoveryFactory().createServiceDiscovery(microservices.serviceEndpoint());
- discoveryInstances.put(id, serviceDiscovery);
+ this.discoveryInstances.put(id, serviceDiscovery);
- discoveryContexts.put(
+ this.discoveryContexts.put(
id,
ServiceDiscoveryContext.builder()
.id(id)
.address(Address.NULL_ADDRESS)
.discovery(serviceDiscovery)
.serviceRegistry(microservices.serviceRegistry)
- .scheduler(scheduler)
+ .scheduler(this.scheduler)
.build());
}
@@ -585,28 +670,28 @@ private Mono createInstance(
private Mono startListen() {
return start() // start composite discovery
- .doOnSubscribe(s -> LOGGER.info("[{}][startListen] Starting", microservices.id()))
- .doOnSuccess(avoid -> LOGGER.info("[{}][startListen] Started", microservices.id()))
+ .doOnSubscribe(s -> LOGGER.info("[{}][startListen] Starting", this.microservices.id()))
+ .doOnSuccess(avoid -> LOGGER.info("[{}][startListen] Started", this.microservices.id()))
.doOnError(
ex ->
LOGGER.error(
"[{}][startListen] Exception occurred: {}",
- microservices.id(),
+ this.microservices.id(),
ex.toString()));
}
@Override
public Flux listen() {
- return Flux.fromStream(microservices.serviceRegistry.listServiceEndpoints().stream())
+ return Flux.fromStream(this.microservices.serviceRegistry.listServiceEndpoints().stream())
.map(ServiceDiscoveryEvent::newEndpointAdded)
- .concatWith(subject)
- .subscribeOn(scheduler)
- .publishOn(scheduler);
+ .concatWith(this.subject)
+ .subscribeOn(this.scheduler)
+ .publishOn(this.scheduler);
}
@Override
public Mono start() {
- return Flux.fromIterable(discoveryInstances.entrySet())
+ return Flux.fromIterable(this.discoveryInstances.entrySet())
.flatMap(
entry -> {
final String id = entry.getKey();
@@ -689,20 +774,20 @@ private Mono start(Microservices microservices, GatewayOptions
s ->
LOGGER.info(
"[{}][gateway][{}][start] Starting",
- microservices.id(),
+ microservices.id,
gateway.id()))
.doOnSuccess(
gateway1 ->
LOGGER.info(
"[{}][gateway][{}][start] Started, address: {}",
- microservices.id(),
+ microservices.id,
gateway1.id(),
gateway1.address()))
.doOnError(
ex ->
LOGGER.error(
"[{}][gateway][{}][start] Exception occurred: {}",
- microservices.id(),
+ microservices.id,
gateway.id(),
ex.toString()));
})
@@ -746,14 +831,14 @@ public ServiceTransportBootstrap(Supplier transportSupplier) {
}
private Mono start(Microservices microservices) {
- if (transportSupplier == NULL_SUPPLIER
- || (serviceTransport = transportSupplier.get()) == null) {
+ if (this.transportSupplier == NULL_SUPPLIER
+ || (this.serviceTransport = this.transportSupplier.get()) == null) {
return Mono.just(NULL_INSTANCE);
}
- return serviceTransport
+ return this.serviceTransport
.start()
- .doOnSuccess(transport -> serviceTransport = transport) // reset self
+ .doOnSuccess(transport -> this.serviceTransport = transport) // reset self
.flatMap(
transport -> serviceTransport.serverTransport().bind(microservices.methodRegistry))
.doOnSuccess(transport -> serverTransport = transport)
@@ -767,31 +852,35 @@ private Mono start(Microservices microservices) {
return this;
})
.doOnSubscribe(
- s -> LOGGER.info("[{}][serviceTransport][start] Starting", microservices.id()))
+ s -> LOGGER.info("[{}][serviceTransport][start] Starting", microservices.id))
.doOnSuccess(
transport ->
LOGGER.info(
"[{}][serviceTransport][start] Started, address: {}",
- microservices.id(),
+ microservices.id,
this.transportAddress))
.doOnError(
ex ->
LOGGER.error(
"[{}][serviceTransport][start] Exception occurred: {}",
- microservices.id(),
+ microservices.id,
ex.toString()));
}
+ public Address address() {
+ return this.transportAddress;
+ }
+
private Mono shutdown() {
return Mono.defer(
() ->
Flux.concatDelayError(
- Optional.ofNullable(serverTransport)
- .map(ServerTransport::stop)
- .orElse(Mono.empty()),
- Optional.ofNullable(serviceTransport)
- .map(ServiceTransport::stop)
- .orElse(Mono.empty()))
+ Optional.ofNullable(this.serverTransport)
+ .map(ServerTransport::stop)
+ .orElse(Mono.empty()),
+ Optional.ofNullable(this.serviceTransport)
+ .map(ServiceTransport::stop)
+ .orElse(Mono.empty()))
.then());
}
}
@@ -859,7 +948,7 @@ private static String asString(ServiceMethodInvoker invoker) {
.add("methodInfo=" + asString(invoker.methodInfo()))
.add(
"serviceMethod="
- + invoker.service().getClass().getCanonicalName()
+ + invoker.service()
+ "."
+ invoker.methodInfo().methodName()
+ "("
@@ -882,4 +971,47 @@ private static String asString(ServiceInfo serviceInfo) {
.toString();
}
}
+
+ private static final class MicroservicesContextImpl implements MicroservicesContext {
+
+ private final CompositeServiceDiscovery serviceDiscovery;
+ private final Supplier serviceCallSupplier;
+ private final ServiceEndpoint serviceEndpoint;
+
+ private MicroservicesContextImpl(
+ CompositeServiceDiscovery serviceDiscovery,
+ Supplier serviceCallSupplier,
+ ServiceEndpoint serviceEndpoint) {
+ this.serviceDiscovery = serviceDiscovery;
+ this.serviceCallSupplier = serviceCallSupplier;
+ this.serviceEndpoint = serviceEndpoint;
+ }
+
+ @Override
+ public ServiceEndpoint serviceEndpoint() {
+ return this.serviceEndpoint;
+ }
+
+ @Override
+ public ServiceCall serviceCall() {
+ return this.serviceCallSupplier.get();
+ }
+
+ @Override
+ public Flux listenDiscovery() {
+ return this.serviceDiscovery.listen();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see Microservices.Builder#discovery(String, ServiceDiscoveryFactory)
+ */
+ @Override
+ public Flux listenDiscovery(String id) {
+ return Optional.ofNullable(this.serviceDiscovery.discoveryInstances.get(id))
+ .map(ServiceDiscovery::listen)
+ .orElseThrow(() -> new NoSuchElementException("[discovery] id: " + id));
+ }
+ }
}
diff --git a/services/src/main/java/io/scalecube/services/ScalecubeServiceFactory.java b/services/src/main/java/io/scalecube/services/ScalecubeServiceFactory.java
new file mode 100644
index 000000000..4f444f835
--- /dev/null
+++ b/services/src/main/java/io/scalecube/services/ScalecubeServiceFactory.java
@@ -0,0 +1,150 @@
+package io.scalecube.services;
+
+import io.scalecube.services.inject.Injector;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import reactor.core.publisher.Mono;
+
+public class ScalecubeServiceFactory implements ServiceFactory {
+
+ private final Supplier> serviceFactory;
+
+ // lazy init
+ private final AtomicReference> services = new AtomicReference<>();
+ private Supplier serviceCallSupplier;
+ // lazy init
+ private MicroservicesContext microserviceContext;
+
+ private ScalecubeServiceFactory(Collection serviceProviders) {
+ this.serviceFactory =
+ () -> {
+ final ServiceCall serviceCall = this.serviceCallSupplier.get();
+ return serviceProviders.stream()
+ .map(serviceProvider -> serviceProvider.provide(serviceCall))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ };
+ }
+
+ /**
+ * Create the instance from {@link ServiceProvider}.
+ *
+ * @param serviceProviders old service providers.
+ * @return default services factory.
+ * @deprecated use {@link this#fromInstances(Object...)}
+ */
+ public static ServiceFactory create(Collection serviceProviders) {
+ return new ScalecubeServiceFactory(serviceProviders);
+ }
+
+ /**
+ * Create the instance {@link ServiceFactory} with pre-installed services.
+ *
+ * @param services user's services
+ * @return service factory
+ */
+ public static ServiceFactory fromInstances(Object... services) {
+ ServiceProvider provider =
+ call ->
+ Stream.of(services)
+ .map(
+ service -> {
+ ServiceInfo.Builder builder;
+ if (service instanceof ServiceInfo) {
+ builder = ServiceInfo.from((ServiceInfo) service);
+ } else {
+ builder = ServiceInfo.fromServiceInstance(service);
+ }
+ return builder.build();
+ })
+ .collect(Collectors.toList());
+ return new ScalecubeServiceFactory(Collections.singleton(provider));
+ }
+
+ /**
+ * Since the service instance factory ({@link ServiceProvider}) we have to leave behind does not
+ * provide us with information about the types of services produced, there is nothing left for us
+ * to do but start creating all the services and then retrieve the type of service, previously
+ * saving it as a {@link ScalecubeServiceFactory} state.
+ *
+ * {@inheritDoc}
+ *
+ *
Use {@link io.scalecube.services.annotations.Inject} for inject {@link Microservices},
+ * {@link io.scalecube.services.ServiceCall}.
+ *
+ * @see ServiceInfo
+ * @see ServiceDefinition
+ * @return
+ */
+ @Override
+ public Collection getServiceDefinitions() {
+ return this.services().stream()
+ .map(
+ serviceInfo ->
+ new ServiceDefinition(serviceInfo.serviceInstance().getClass(), serviceInfo.tags()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Use {@link io.scalecube.services.annotations.AfterConstruct} for initialization service's
+ * instance.
+ *
+ * @param microservices microservices context
+ */
+ @Override
+ public Mono extends Collection> initializeServices(
+ MicroservicesContext microservices) {
+ return Mono.fromCallable(
+ () -> {
+ this.microserviceContext = microservices;
+ return this.services().stream()
+ .map(service -> Injector.inject(microservices, service))
+ .map(service -> Injector.processAfterConstruct(microservices, service))
+ .collect(Collectors.toList());
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Use {@link io.scalecube.services.annotations.BeforeDestroy} for finilization service's
+ * instance.
+ *
+ * @return
+ */
+ @Override
+ public Mono shutdownServices() {
+ return Mono.fromRunnable(() -> shutdown0(this.microserviceContext));
+ }
+
+ private void shutdown0(MicroservicesContext microservices) {
+ if (this.services.get() != null) {
+ this.services.get().forEach(service -> Injector.processBeforeDestroy(microservices, service));
+ }
+ }
+
+ private Collection services() {
+ return this.services.updateAndGet(
+ currentValue -> currentValue == null ? this.serviceFactory.get() : currentValue);
+ }
+
+ /**
+ * Setting serviceCall supplier.
+ *
+ * @param serviceCallSupplier lazy serviceCall initialization function
+ * @return current instance scalecube service factory
+ * @deprecated see reason in {@link ServiceProvider}
+ */
+ @Deprecated
+ ScalecubeServiceFactory setServiceCall(Supplier serviceCallSupplier) {
+ this.serviceCallSupplier = serviceCallSupplier;
+ return this;
+ }
+}
diff --git a/services/src/main/java/io/scalecube/services/Injector.java b/services/src/main/java/io/scalecube/services/inject/Injector.java
similarity index 54%
rename from services/src/main/java/io/scalecube/services/Injector.java
rename to services/src/main/java/io/scalecube/services/inject/Injector.java
index c7433dbfe..f4dbc5a8b 100644
--- a/services/src/main/java/io/scalecube/services/Injector.java
+++ b/services/src/main/java/io/scalecube/services/inject/Injector.java
@@ -1,5 +1,9 @@
-package io.scalecube.services;
+package io.scalecube.services.inject;
+import io.scalecube.services.MicroservicesContext;
+import io.scalecube.services.Reflect;
+import io.scalecube.services.ServiceCall;
+import io.scalecube.services.ServiceInfo;
import io.scalecube.services.annotations.AfterConstruct;
import io.scalecube.services.annotations.BeforeDestroy;
import io.scalecube.services.annotations.Inject;
@@ -8,40 +12,40 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
-import java.util.Collection;
import reactor.core.Exceptions;
/** Service Injector scan and injects beans to a given Microservices instance. */
-final class Injector {
+public final class Injector {
private Injector() {
// Do not instantiate
}
/**
- * Inject instances to the microservices instance. either Microservices or ServiceProxy. Scan all
+ * Inject instance to the microservices instance. either Microservices or ServiceProxy. Scan all
* local service instances and inject a service proxy.
*
* @param microservices microservices instance
- * @param services services set
- * @return microservices instance
+ * @param service service
+ * @return service instance
*/
- public static Microservices inject(Microservices microservices, Collection services) {
- services.forEach(
- service ->
- Arrays.stream(service.getClass().getDeclaredFields())
- .forEach(field -> injectField(microservices, field, service)));
- services.forEach(service -> processAfterConstruct(microservices, service));
- return microservices;
+ public static ServiceInfo inject(
+ MicroservicesContext microservices, ServiceInfo service) {
+ Object serviceInstance = service.serviceInstance();
+ Arrays.stream(serviceInstance.getClass().getDeclaredFields())
+ .forEach(field -> injectField(microservices, field, serviceInstance));
+ return service;
}
- private static void injectField(Microservices microservices, Field field, Object service) {
- if (field.isAnnotationPresent(Inject.class) && field.getType().equals(Microservices.class)) {
+ private static void injectField(
+ MicroservicesContext microservices, Field field, Object service) {
+ if (field.isAnnotationPresent(Inject.class)
+ && MicroservicesContext.class.isAssignableFrom(field.getType())) {
setField(field, service, microservices);
} else if (field.isAnnotationPresent(Inject.class) && Reflect.isService(field.getType())) {
Inject injection = field.getAnnotation(Inject.class);
Class extends Router> routerClass = injection.router();
- final ServiceCall call = microservices.call();
+ final ServiceCall call = microservices.serviceCall();
if (!routerClass.isInterface()) {
call.router(routerClass);
}
@@ -59,16 +63,26 @@ private static void setField(Field field, Object object, Object value) {
}
}
- private static void processAfterConstruct(Microservices microservices, Object targetInstance) {
- processMethodWithAnnotation(microservices, targetInstance, AfterConstruct.class);
+ /**
+ * Run methods with {@link AfterConstruct} annotations.
+ *
+ * @param microservices scale cube instance.
+ * @param serviceInfo service info.
+ * @return service info with modified service's instance.
+ */
+ public static ServiceInfo processAfterConstruct(
+ MicroservicesContext microservices, ServiceInfo serviceInfo) {
+ processMethodWithAnnotation(microservices, serviceInfo.serviceInstance(), AfterConstruct.class);
+ return serviceInfo;
}
- public static void processBeforeDestroy(Microservices microservices, Object targetInstance) {
- processMethodWithAnnotation(microservices, targetInstance, BeforeDestroy.class);
+ public static void processBeforeDestroy(
+ MicroservicesContext microservices, ServiceInfo serviceInfo) {
+ processMethodWithAnnotation(microservices, serviceInfo.serviceInstance(), BeforeDestroy.class);
}
private static void processMethodWithAnnotation(
- Microservices microservices, Object targetInstance, Class annotation) {
+ MicroservicesContext microservices, Object targetInstance, Class annotation) {
Method[] declaredMethods = targetInstance.getClass().getDeclaredMethods();
Arrays.stream(declaredMethods)
.filter(method -> method.isAnnotationPresent(annotation))
@@ -80,10 +94,10 @@ private static void processMethodWithAnnotation(
Arrays.stream(targetMethod.getParameters())
.map(
mapper -> {
- if (mapper.getType().equals(Microservices.class)) {
+ if (MicroservicesContext.class.isAssignableFrom(mapper.getType())) {
return microservices;
} else if (Reflect.isService(mapper.getType())) {
- return microservices.call().api(mapper.getType());
+ return microservices.serviceCall().api(mapper.getType());
} else {
return null;
}
diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java
index 91b94a8ba..eb63a578f 100644
--- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java
+++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java
@@ -37,7 +37,7 @@ public static void initNodes() {
new ScalecubeServiceDiscovery(endpoint)
.transport(cfg -> cfg.port(PORT.incrementAndGet())))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
final Address seedAddress = provider.discovery("provider").address();
@@ -64,7 +64,7 @@ public static void shutdownNodes() {
public void testCorruptedRequest() {
Publisher req =
consumer
- .call()
+ .serviceCall()
.requestOne(TestRequests.GREETING_CORRUPTED_PAYLOAD_REQUEST, GreetingResponse.class);
assertThrows(InternalServiceException.class, () -> from(req).block());
}
@@ -73,7 +73,7 @@ public void testCorruptedRequest() {
public void testNotAuthorized() {
Publisher req =
consumer
- .call()
+ .serviceCall()
.requestOne(TestRequests.GREETING_UNAUTHORIZED_REQUEST, GreetingResponse.class);
assertThrows(ForbiddenException.class, () -> from(req).block());
}
@@ -81,13 +81,15 @@ public void testNotAuthorized() {
@Test
public void testNullRequestPayload() {
Publisher req =
- consumer.call().requestOne(TestRequests.GREETING_NULL_PAYLOAD, GreetingResponse.class);
+ consumer
+ .serviceCall()
+ .requestOne(TestRequests.GREETING_NULL_PAYLOAD, GreetingResponse.class);
assertThrows(BadRequestException.class, () -> from(req).block());
}
@Test
public void testServiceUnavailable() {
- StepVerifier.create(consumer.call().requestOne(TestRequests.NOT_FOUND_REQ))
+ StepVerifier.create(consumer.serviceCall().requestOne(TestRequests.NOT_FOUND_REQ))
.expectError(ServiceUnavailableException.class)
.verify();
}
diff --git a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java
index 5376c73d5..37072970a 100644
--- a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java
@@ -98,7 +98,7 @@ static void afterAll() {
@DisplayName("Successful authentication")
void successfulAuthentication() {
SecuredService securedService =
- caller.call().credentials(CREDENTIALS).api(SecuredService.class);
+ caller.serviceCall().credentials(CREDENTIALS).api(SecuredService.class);
StepVerifier.create(securedService.helloWithRequest("Bob"))
.assertNext(response -> assertEquals("Hello, Bob", response))
@@ -127,7 +127,7 @@ void failedAuthenticationWhenAuthenticatorNotProvided() {
.startAwait();
SecuredService securedService =
- caller.call().credentials(CREDENTIALS).api(SecuredService.class);
+ caller.serviceCall().credentials(CREDENTIALS).api(SecuredService.class);
Consumer verifyError =
th -> {
@@ -153,7 +153,7 @@ void failedAuthenticationWhenAuthenticatorNotProvided() {
@Test
@DisplayName("Authentication failed with invalid or empty credentials")
void failedAuthenticationWithInvalidOrEmptyCredentials() {
- SecuredService securedService = caller.call().api(SecuredService.class);
+ SecuredService securedService = caller.serviceCall().api(SecuredService.class);
Consumer verifyError =
th -> {
@@ -189,7 +189,7 @@ void successfulAuthenticationOnPartiallySecuredService() {
.startAwait();
PartiallySecuredService proxy =
- caller.call().credentials(CREDENTIALS).api(PartiallySecuredService.class);
+ caller.serviceCall().credentials(CREDENTIALS).api(PartiallySecuredService.class);
StepVerifier.create(proxy.securedMethod("Alice"))
.assertNext(response -> assertEquals("Hello, Alice", response))
@@ -211,7 +211,8 @@ void successfulCallOfPublicMethodWithoutAuthentication() {
.build())
.startAwait();
- PartiallySecuredService proxy = caller.call().api(PartiallySecuredService.class);
+ PartiallySecuredService proxy =
+ caller.serviceCall().api(PartiallySecuredService.class);
StepVerifier.create(proxy.publicMethod("Alice"))
.assertNext(response -> assertEquals("Hello, Alice", response))
diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java
index 44f26a202..5c02e16ec 100644
--- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java
@@ -52,7 +52,7 @@ public static void tearDown() {
@Test
public void test_local_async_no_params() {
- ServiceCall serviceCall = provider.call().router(RoundRobinServiceRouter.class);
+ ServiceCall serviceCall = provider.serviceCall().router(RoundRobinServiceRouter.class);
// call the service.
Publisher future = serviceCall.requestOne(GREETING_NO_PARAMS_REQUEST);
@@ -67,27 +67,27 @@ private static Microservices serviceProvider() {
return Microservices.builder()
.discovery("serviceProvider", ScalecubeServiceDiscovery::new)
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
}
@Test
public void test_local_void_greeting() {
// WHEN
- provider.call().oneWay(GREETING_VOID_REQ).block(Duration.ofSeconds(TIMEOUT));
+ provider.serviceCall().oneWay(GREETING_VOID_REQ).block(Duration.ofSeconds(TIMEOUT));
}
@Test
public void test_local_failng_void_greeting() {
- StepVerifier.create(provider.call().oneWay(GREETING_FAILING_VOID_REQ))
+ StepVerifier.create(provider.serviceCall().oneWay(GREETING_FAILING_VOID_REQ))
.expectErrorMessage(GREETING_FAILING_VOID_REQ.data().toString())
.verify(Duration.ofSeconds(TIMEOUT));
}
@Test
public void test_local_throwing_void_greeting() {
- StepVerifier.create(provider.call().oneWay(GREETING_THROWING_VOID_REQ))
+ StepVerifier.create(provider.serviceCall().oneWay(GREETING_THROWING_VOID_REQ))
.expectErrorMessage(GREETING_THROWING_VOID_REQ.data().toString())
.verify(Duration.ofSeconds(TIMEOUT));
}
@@ -98,7 +98,7 @@ public void test_local_fail_greeting() {
Throwable exception =
assertThrows(
ServiceException.class,
- () -> Mono.from(provider.call().requestOne(GREETING_FAIL_REQ)).block(timeout));
+ () -> Mono.from(provider.serviceCall().requestOne(GREETING_FAIL_REQ)).block(timeout));
assertEquals("GreetingRequest{name='joe'}", exception.getMessage());
}
@@ -109,14 +109,15 @@ public void test_local_exception_greeting() {
Throwable exception =
assertThrows(
ServiceException.class,
- () -> Mono.from(provider.call().requestOne(GREETING_ERROR_REQ)).block(timeout));
+ () -> Mono.from(provider.serviceCall().requestOne(GREETING_ERROR_REQ)).block(timeout));
}
@Test
public void test_local_async_greeting_return_GreetingResponse() {
// When
- Publisher resultFuture = provider.call().requestOne(GREETING_REQUEST_REQ);
+ Publisher resultFuture =
+ provider.serviceCall().requestOne(GREETING_REQUEST_REQ);
// Then
ServiceMessage result = Mono.from(resultFuture).block(Duration.ofSeconds(TIMEOUT));
@@ -128,7 +129,7 @@ public void test_local_async_greeting_return_GreetingResponse() {
@Test
public void test_local_greeting_request_timeout_expires() {
- ServiceCall service = provider.call();
+ ServiceCall service = provider.serviceCall();
// call the service.
Publisher future = service.requestOne(GREETING_REQUEST_TIMEOUT_REQ);
@@ -141,7 +142,7 @@ public void test_local_greeting_request_timeout_expires() {
@Test
public void test_local_async_greeting_return_Message() {
- ServiceMessage result = provider.call().requestOne(GREETING_REQUEST_REQ).block(timeout);
+ ServiceMessage result = provider.serviceCall().requestOne(GREETING_REQUEST_REQ).block(timeout);
// print the greeting.
GreetingResponse responseData = result.data();
@@ -153,9 +154,9 @@ public void test_local_async_greeting_return_Message() {
@Test
public void test_remote_mono_empty_request_response_greeting_messsage() {
StepVerifier.create(
- provider
- .call()
- .requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class))
+ provider
+ .serviceCall()
+ .requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class))
.expectNextMatches(resp -> resp.data() instanceof EmptyGreetingResponse)
.expectComplete()
.verify(timeout);
@@ -164,7 +165,7 @@ public void test_remote_mono_empty_request_response_greeting_messsage() {
@Test
public void test_async_greeting_return_string_service_not_found_error_case() {
- ServiceCall service = provider.call();
+ ServiceCall service = provider.serviceCall();
try {
// call the service.
diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java
index 09bb1456b..e2ef767bc 100644
--- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java
@@ -72,14 +72,14 @@ private static Microservices serviceProvider(Object service) {
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(gateway.discovery("gateway").address())))
.transport(RSocketServiceTransport::new)
- .services(service)
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(service))
.startAwait();
}
@Test
public void test_remote_async_greeting_no_params() {
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
// call the service.
Publisher future =
@@ -93,13 +93,17 @@ public void test_remote_async_greeting_no_params() {
@Test
public void test_remote_void_greeting() {
// When
- StepVerifier.create(gateway.call().oneWay(GREETING_VOID_REQ)).expectComplete().verify(TIMEOUT);
+ StepVerifier.create(gateway.serviceCall().oneWay(GREETING_VOID_REQ))
+ .expectComplete()
+ .verify(TIMEOUT);
}
@Test
public void test_remote_mono_empty_request_response_greeting_messsage() {
StepVerifier.create(
- gateway.call().requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class))
+ gateway
+ .serviceCall()
+ .requestOne(GREETING_EMPTY_REQUEST_RESPONSE, EmptyGreetingResponse.class))
.expectNextMatches(resp -> resp.data() instanceof EmptyGreetingResponse)
.expectComplete()
.verify(TIMEOUT);
@@ -109,7 +113,7 @@ public void test_remote_mono_empty_request_response_greeting_messsage() {
public void test_remote_failing_void_greeting() {
// When
- StepVerifier.create(gateway.call().requestOne(GREETING_FAILING_VOID_REQ, Void.class))
+ StepVerifier.create(gateway.serviceCall().requestOne(GREETING_FAILING_VOID_REQ, Void.class))
.expectErrorMessage(GREETING_FAILING_VOID_REQ.data().toString())
.verify(TIMEOUT);
}
@@ -117,7 +121,7 @@ public void test_remote_failing_void_greeting() {
@Test
public void test_remote_throwing_void_greeting() {
// When
- StepVerifier.create(gateway.call().oneWay(GREETING_THROWING_VOID_REQ))
+ StepVerifier.create(gateway.serviceCall().oneWay(GREETING_THROWING_VOID_REQ))
.expectErrorMessage(GREETING_THROWING_VOID_REQ.data().toString())
.verify(TIMEOUT);
}
@@ -129,7 +133,8 @@ public void test_remote_fail_greeting() {
assertThrows(
ServiceException.class,
() ->
- Mono.from(gateway.call().requestOne(GREETING_FAIL_REQ, GreetingResponse.class))
+ Mono.from(
+ gateway.serviceCall().requestOne(GREETING_FAIL_REQ, GreetingResponse.class))
.block(TIMEOUT));
assertEquals("GreetingRequest{name='joe'}", exception.getMessage());
}
@@ -142,7 +147,10 @@ public void test_remote_exception_void() {
assertThrows(
ServiceException.class,
() ->
- Mono.from(gateway.call().requestOne(GREETING_ERROR_REQ, GreetingResponse.class))
+ Mono.from(
+ gateway
+ .serviceCall()
+ .requestOne(GREETING_ERROR_REQ, GreetingResponse.class))
.block(TIMEOUT));
assertEquals("GreetingRequest{name='joe'}", exception.getMessage());
}
@@ -150,7 +158,8 @@ public void test_remote_exception_void() {
@Test
public void test_remote_async_greeting_return_string() {
- Publisher resultFuture = gateway.call().requestOne(GREETING_REQ, String.class);
+ Publisher resultFuture =
+ gateway.serviceCall().requestOne(GREETING_REQ, String.class);
// Then
ServiceMessage result = Mono.from(resultFuture).block(TIMEOUT);
@@ -164,7 +173,7 @@ public void test_remote_async_greeting_return_GreetingResponse() {
// When
Publisher result =
- gateway.call().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class);
+ gateway.serviceCall().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class);
// Then
GreetingResponse greeting = Mono.from(result).block(TIMEOUT).data();
@@ -174,7 +183,7 @@ public void test_remote_async_greeting_return_GreetingResponse() {
@Test
public void test_remote_greeting_request_timeout_expires() {
- ServiceCall service = gateway.call();
+ ServiceCall service = gateway.serviceCall();
// call the service.
Publisher future = service.requestOne(GREETING_REQUEST_TIMEOUT_REQ);
@@ -186,7 +195,7 @@ public void test_remote_greeting_request_timeout_expires() {
// Since here and below tests were not reviewed [sergeyr]
@Test
public void test_remote_async_greeting_return_Message() {
- ServiceCall service = gateway.call();
+ ServiceCall service = gateway.serviceCall();
// call the service.
Publisher future = service.requestOne(GREETING_REQUEST_REQ);
@@ -206,7 +215,7 @@ public void test_remote_async_greeting_return_Message() {
public void test_remote_dispatcher_remote_greeting_request_completes_before_timeout() {
Publisher result =
- gateway.call().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class);
+ gateway.serviceCall().requestOne(GREETING_REQUEST_REQ, GreetingResponse.class);
GreetingResponse greetings = Mono.from(result).block(TIMEOUT).data();
System.out.println("greeting_request_completes_before_timeout : " + greetings.getResult());
@@ -218,7 +227,7 @@ public void test_service_address_lookup_occur_only_after_subscription() {
Flux quotes =
gateway
- .call()
+ .serviceCall()
.requestMany(
ServiceMessage.builder()
.qualifier(QuoteService.NAME, "onlyOneAndThenNever")
@@ -240,7 +249,7 @@ public void test_service_address_lookup_occur_only_after_subscription() {
@Disabled("https://github.com/scalecube/scalecube-services/issues/742")
public void test_many_stream_block_first() {
- ServiceCall call = gateway.call();
+ ServiceCall call = gateway.serviceCall();
ServiceMessage request = TestRequests.GREETING_MANY_STREAM_30;
diff --git a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java
index 4470e08e3..088603128 100644
--- a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java
@@ -9,8 +9,10 @@
import io.scalecube.services.sut.GreetingResponse;
import io.scalecube.services.sut.GreetingService;
import io.scalecube.services.sut.GreetingServiceImpl;
+
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
+
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -27,7 +29,10 @@ public class ServiceLocalTest extends BaseTest {
@BeforeEach
public void setUp() {
- microservices = Microservices.builder().services(new GreetingServiceImpl()).startAwait();
+ microservices =
+ Microservices.builder()
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
+ .startAwait();
}
@AfterEach
@@ -39,7 +44,7 @@ public void cleanUp() {
@Test
public void test_local_greeting_request_completes_before_timeout() {
- GreetingService service = microservices.call().api(GreetingService.class);
+ GreetingService service = microservices.serviceCall().api(GreetingService.class);
// call the service.
GreetingResponse result =
@@ -209,7 +214,7 @@ void test_local_greeting_message() {
.verify(timeout);
// using serviceCall directly
- ServiceCall serviceCall = microservices.call();
+ ServiceCall serviceCall = microservices.serviceCall();
StepVerifier.create(
serviceCall.requestOne(
@@ -305,6 +310,6 @@ public void test_local_bidi_greeting_expect_GreetingResponse() {
}
private GreetingService createProxy(Microservices gateway) {
- return gateway.call().api(GreetingService.class); // create proxy for GreetingService API
+ return gateway.serviceCall().api(GreetingService.class); // create proxy for GreetingService API
}
}
diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java
index 8a5c044d8..e36ad9d99 100644
--- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java
@@ -58,14 +58,14 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec)
Microservices.builder()
.discovery("ms1", defServiceDiscovery(seedAddress, metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
Microservices ms2 =
Microservices.builder()
.discovery("ms2", defServiceDiscovery(seedAddress, metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
StepVerifier.create(events)
@@ -92,7 +92,7 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
Microservices.builder()
.discovery("seed", defServiceDiscovery(metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new AnnotationServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new AnnotationServiceImpl()))
.startAwait();
cluster.add(seed);
@@ -107,7 +107,8 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
Microservices.builder()
.discovery("ms1", defServiceDiscovery(seedAddress, metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(
+ ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
cluster.add(ms1);
})
@@ -118,21 +119,25 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
Microservices.builder()
.discovery("ms2", defServiceDiscovery(seedAddress, metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(
+ ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
cluster.add(ms2);
})
.assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type()))
.then(() -> cluster.remove(2).shutdown().block(TIMEOUT))
.assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type()))
+ .thenAwait(TIMEOUT)
.assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type()))
.then(() -> cluster.remove(1).shutdown().block(TIMEOUT))
.assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type()))
+ .thenAwait(TIMEOUT)
.assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type()))
.thenCancel()
.verify(TIMEOUT);
- StepVerifier.create(seed.call().api(AnnotationService.class).serviceDiscoveryEventTypes())
+ StepVerifier.create(
+ seed.serviceCall().api(AnnotationService.class).serviceDiscoveryEventTypes())
.assertNext(type -> assertEquals(ENDPOINT_ADDED, type))
.assertNext(type -> assertEquals(ENDPOINT_ADDED, type))
.assertNext(type -> assertEquals(ENDPOINT_LEAVING, type))
@@ -157,7 +162,7 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
Microservices.builder()
.discovery("seed", defServiceDiscovery(metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
cluster.add(seed);
@@ -172,7 +177,9 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
Microservices.builder()
.discovery("ms1", defServiceDiscovery(seedAddress, metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl(), new AnnotationServiceImpl())
+ .serviceFactory(
+ ScalecubeServiceFactory.fromInstances(
+ new GreetingServiceImpl(), new AnnotationServiceImpl()))
.startAwait();
cluster.add(ms1);
})
@@ -183,7 +190,8 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
Microservices.builder()
.discovery("ms2", defServiceDiscovery(seedAddress, metadataCodec))
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(
+ ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
cluster.add(ms2);
})
@@ -191,7 +199,8 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
.thenCancel()
.verify(TIMEOUT);
- StepVerifier.create(seed.call().api(AnnotationService.class).serviceDiscoveryEventTypes())
+ StepVerifier.create(
+ seed.serviceCall().api(AnnotationService.class).serviceDiscoveryEventTypes())
.assertNext(type -> assertEquals(ENDPOINT_ADDED, type))
.assertNext(type -> assertEquals(ENDPOINT_ADDED, type))
.thenCancel()
diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java
index 1a51715f1..c3b4420c2 100644
--- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java
@@ -76,7 +76,7 @@ private static Microservices serviceProvider() {
return Microservices.builder()
.discovery("serviceProvider", ServiceRemoteTest::serviceDiscovery)
.transport(RSocketServiceTransport::new)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
}
@@ -84,7 +84,7 @@ private static Microservices serviceProvider() {
public void test_remote_greeting_request_completes_before_timeout() {
Duration duration = Duration.ofSeconds(1);
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
// call the service.
Mono result =
@@ -95,7 +95,7 @@ public void test_remote_greeting_request_completes_before_timeout() {
@Test
public void test_remote_void_greeting() throws Exception {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
// call the service.
service.greetingVoid(new GreetingRequest("joe")).block(Duration.ofSeconds(3));
@@ -108,7 +108,7 @@ public void test_remote_void_greeting() throws Exception {
@Test
public void test_remote_failing_void_greeting() {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
GreetingRequest request = new GreetingRequest("joe");
// call the service.
@@ -119,7 +119,7 @@ public void test_remote_failing_void_greeting() {
@Test
public void test_remote_throwing_void_greeting() {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
GreetingRequest request = new GreetingRequest("joe");
// call the service.
@@ -228,7 +228,7 @@ void test_remote_greeting_message() {
.verify(TIMEOUT);
// using serviceCall directly
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
StepVerifier.create(
serviceCall.requestOne(
@@ -272,11 +272,13 @@ public void test_remote_serviceA_calls_serviceB_using_setter() {
Microservices.builder()
.discovery("provider", ServiceRemoteTest::serviceDiscovery)
.transport(RSocketServiceTransport::new)
- .services(new CoarseGrainedServiceImpl()) // add service a and b
+ .serviceFactory(
+ ScalecubeServiceFactory.fromInstances(
+ new CoarseGrainedServiceImpl())) // add service a and b
.startAwait();
// Get a proxy to the service api.
- CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class);
+ CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class);
Publisher future = service.callGreeting("joe");
@@ -295,11 +297,11 @@ public void test_remote_serviceA_calls_serviceB() {
Microservices.builder()
.discovery("provider", ServiceRemoteTest::serviceDiscovery)
.transport(RSocketServiceTransport::new)
- .services(another)
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(another))
.startAwait();
// Get a proxy to the service api.
- CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class);
+ CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class);
Publisher future = service.callGreeting("joe");
assertEquals(" hello to: joe", Mono.from(future).block(Duration.ofSeconds(1)));
provider.shutdown().then(Mono.delay(TIMEOUT2)).block();
@@ -315,11 +317,11 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() {
Microservices.builder()
.discovery("ms", ServiceRemoteTest::serviceDiscovery)
.transport(RSocketServiceTransport::new)
- .services(another) // add service a and b
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(another)) // add service a and b
.startAwait();
// Get a proxy to the service api.
- CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class);
+ CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class);
InternalServiceException exception =
assertThrows(
InternalServiceException.class,
@@ -340,11 +342,11 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() {
Microservices.builder()
.discovery("provider", ServiceRemoteTest::serviceDiscovery)
.transport(RSocketServiceTransport::new)
- .services(another) // add service a and b
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(another)) // add service a and b
.startAwait();
// Get a proxy to the service api.
- CoarseGrainedService service = gateway.call().api(CoarseGrainedService.class);
+ CoarseGrainedService service = gateway.serviceCall().api(CoarseGrainedService.class);
String response = service.callGreetingWithDispatcher("joe").block(Duration.ofSeconds(5));
assertEquals(response, " hello to: joe");
@@ -425,7 +427,7 @@ public void test_services_contribute_to_cluster_metadata() {
.discovery("ms", ScalecubeServiceDiscovery::new)
.transport(RSocketServiceTransport::new)
.tags(tags)
- .services(new GreetingServiceImpl())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new GreetingServiceImpl()))
.startAwait();
assertTrue(ms.serviceEndpoint().tags().containsKey("HOSTNAME"));
@@ -433,7 +435,7 @@ public void test_services_contribute_to_cluster_metadata() {
@Test
public void test_remote_mono_empty_greeting() {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
// call the service.
StepVerifier.create(service.greetingMonoEmpty(new GreetingRequest("empty")))
@@ -443,7 +445,7 @@ public void test_remote_mono_empty_greeting() {
@Test
public void test_remote_mono_empty_request_response_greeting() {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
// call the service.
StepVerifier.create(service.emptyGreeting(new EmptyGreetingRequest()))
@@ -454,7 +456,7 @@ public void test_remote_mono_empty_request_response_greeting() {
@Test
public void test_remote_flux_empty_greeting() {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
// call the service.
StepVerifier.create(service.greetingFluxEmpty(new GreetingRequest("empty")))
@@ -464,7 +466,7 @@ public void test_remote_flux_empty_greeting() {
@Disabled("https://github.com/scalecube/scalecube-services/issues/742")
public void test_many_stream_block_first() {
- GreetingService service = gateway.call().api(GreetingService.class);
+ GreetingService service = gateway.serviceCall().api(GreetingService.class);
for (int i = 0; i < 100; i++) {
//noinspection ConstantConditions
@@ -474,7 +476,7 @@ public void test_many_stream_block_first() {
}
private GreetingService createProxy() {
- return gateway.call().api(GreetingService.class); // create proxy for GreetingService API
+ return gateway.serviceCall().api(GreetingService.class); // create proxy for GreetingService API
}
private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) {
diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java
index f694374be..e937d26fa 100644
--- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java
+++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java
@@ -45,7 +45,7 @@ public static void setup() {
.membership(cfg -> cfg.seedMembers(gatewayAddress)))
.transport(RSocketServiceTransport::new)
.defaultDataDecoder(ServiceMessageCodec::decodeData)
- .services(new SimpleQuoteService())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new SimpleQuoteService()))
.startAwait();
}
@@ -54,7 +54,7 @@ public void test_quotes() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Disposable sub =
- node.call()
+ node.serviceCall()
.api(QuoteService.class)
.quotes()
.subscribe(
@@ -70,7 +70,7 @@ public void test_quotes() throws InterruptedException {
@Test
public void test_local_quotes_service() {
- QuoteService service = node.call().api(QuoteService.class);
+ QuoteService service = node.serviceCall().api(QuoteService.class);
int expected = 3;
List list = service.quotes().take(Duration.ofMillis(3500)).collectList().block();
@@ -84,7 +84,7 @@ public void test_remote_quotes_service() throws InterruptedException {
CountDownLatch latch1 = new CountDownLatch(3);
CountDownLatch latch2 = new CountDownLatch(3);
- QuoteService service = gateway.call().api(QuoteService.class);
+ QuoteService service = gateway.serviceCall().api(QuoteService.class);
service.snapshot(3).subscribe(onNext -> latch1.countDown());
service.snapshot(3).subscribe(onNext -> latch2.countDown());
@@ -100,7 +100,7 @@ public void test_remote_quotes_service() throws InterruptedException {
public void test_quotes_batch() throws InterruptedException {
int streamBound = 1000;
- QuoteService service = gateway.call().api(QuoteService.class);
+ QuoteService service = gateway.serviceCall().api(QuoteService.class);
CountDownLatch latch1 = new CountDownLatch(streamBound);
final Disposable sub1 = service.snapshot(streamBound).subscribe(onNext -> latch1.countDown());
@@ -115,7 +115,7 @@ public void test_quotes_batch() throws InterruptedException {
public void test_call_quotes_snapshot() {
int batchSize = 1000;
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
ServiceMessage message =
ServiceMessage.builder().qualifier(QuoteService.NAME, "snapshot").data(batchSize).build();
@@ -128,14 +128,14 @@ public void test_call_quotes_snapshot() {
@Test
public void test_just_once() {
- QuoteService service = gateway.call().api(QuoteService.class);
+ QuoteService service = gateway.serviceCall().api(QuoteService.class);
assertEquals("1", service.justOne().block(Duration.ofSeconds(2)));
}
@Test
public void test_just_one_message() {
- ServiceCall service = gateway.call();
+ ServiceCall service = gateway.serviceCall();
ServiceMessage justOne =
ServiceMessage.builder().qualifier(QuoteService.NAME, "justOne").build();
@@ -149,7 +149,7 @@ public void test_just_one_message() {
@Test
public void test_scheduled_messages() {
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
ServiceMessage scheduled =
ServiceMessage.builder().qualifier(QuoteService.NAME, "scheduled").data(1000).build();
@@ -164,7 +164,7 @@ public void test_scheduled_messages() {
@Test
public void test_unknown_method() {
- ServiceCall service = gateway.call();
+ ServiceCall service = gateway.serviceCall();
ServiceMessage scheduled =
ServiceMessage.builder().qualifier(QuoteService.NAME, "unknonwn").build();
@@ -180,7 +180,7 @@ public void test_unknown_method() {
public void test_snapshot_completes() {
int batchSize = 1000;
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
ServiceMessage message =
ServiceMessage.builder().qualifier(QuoteService.NAME, "snapshot").data(batchSize).build();
diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java
index 6a138d1ee..5fd5d4a42 100644
--- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java
+++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java
@@ -11,8 +11,8 @@
import io.scalecube.net.Address;
import io.scalecube.services.BaseTest;
-import io.scalecube.services.Microservices;
import io.scalecube.services.Reflect;
+import io.scalecube.services.Microservices;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceInfo;
import io.scalecube.services.ServiceReference;
@@ -136,7 +136,7 @@ public void test_router_factory() {
@Test
public void test_round_robin() {
- ServiceCall service = gateway.call();
+ ServiceCall service = gateway.serviceCall();
// call the service.
GreetingResponse result1 =
@@ -158,7 +158,7 @@ public void test_remote_service_tags() throws Exception {
CanaryService service =
gateway
- .call()
+ .serviceCall()
.router(Routers.getRouter(WeightedRandomRouter.class))
.api(CanaryService.class);
@@ -185,7 +185,7 @@ public void test_remote_service_tags() throws Exception {
public void tesTagsFromAnnotation() {
ServiceCall serviceCall =
provider3
- .call()
+ .serviceCall()
.router(
(req, mes) -> {
ServiceReference tagServiceRef = req.listServiceReferences().get(0);
@@ -207,7 +207,7 @@ public void test_tag_selection_logic() {
ServiceCall service =
gateway
- .call()
+ .serviceCall()
.router(
(reg, msg) ->
reg.listServiceReferences().stream()
@@ -230,7 +230,7 @@ public void test_tag_request_selection_logic() {
ServiceCall service =
gateway
- .call()
+ .serviceCall()
.router(
(reg, msg) ->
reg.listServiceReferences().stream()
@@ -256,7 +256,7 @@ public void test_tag_request_selection_logic() {
public void test_service_tags() throws Exception {
TimeUnit.SECONDS.sleep(3);
- ServiceCall service = gateway.call().router(WeightedRandomRouter.class);
+ ServiceCall service = gateway.serviceCall().router(WeightedRandomRouter.class);
ServiceMessage req =
ServiceMessage.builder()
diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java
index 848e3929a..7a8f71c11 100644
--- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java
+++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java
@@ -57,7 +57,7 @@ public static void main(String[] args) {
.startAwait();
CanaryService service =
- gateway.call().router(WeightedRandomRouter.class).api(CanaryService.class);
+ gateway.serviceCall().router(WeightedRandomRouter.class).api(CanaryService.class);
for (int i = 0; i < 10; i++) {
Mono.from(service.greeting(new GreetingRequest("joe")))
diff --git a/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java
index 7de7c8747..a8b84cfe3 100644
--- a/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java
+++ b/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java
@@ -1,6 +1,6 @@
package io.scalecube.services.sut;
-import io.scalecube.services.Microservices;
+import io.scalecube.services.MicroservicesContext;
import io.scalecube.services.annotations.AfterConstruct;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import reactor.core.publisher.Flux;
@@ -11,7 +11,7 @@ public class AnnotationServiceImpl implements AnnotationService {
private ReplayProcessor serviceDiscoveryEvents;
@AfterConstruct
- void init(Microservices microservices) {
+ void init(MicroservicesContext microservices) {
this.serviceDiscoveryEvents = ReplayProcessor.create();
microservices.listenDiscovery().subscribe(serviceDiscoveryEvents);
}
diff --git a/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java
index be297b306..61bcf05d7 100644
--- a/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java
+++ b/services/src/test/java/io/scalecube/services/sut/CoarseGrainedServiceImpl.java
@@ -1,6 +1,6 @@
package io.scalecube.services.sut;
-import io.scalecube.services.Microservices;
+import io.scalecube.services.MicroservicesContext;
import io.scalecube.services.annotations.Inject;
import io.scalecube.services.api.ServiceMessage;
import java.time.Duration;
@@ -12,7 +12,7 @@ public class CoarseGrainedServiceImpl implements CoarseGrainedService {
@Inject private GreetingService greetingService;
- @Inject private Microservices microservices;
+ @Inject private MicroservicesContext microservices;
@Override
public Mono callGreeting(String name) {
@@ -32,7 +32,7 @@ public Mono callGreetingTimeout(String request) {
@Override
public Mono callGreetingWithDispatcher(String request) {
return microservices
- .call()
+ .serviceCall()
.requestOne(
ServiceMessage.builder()
.qualifier(GreetingService.SERVICE_NAME, "greeting")
diff --git a/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java
index 53425c72d..c990a30f1 100644
--- a/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java
+++ b/services/src/test/java/io/scalecube/services/sut/GreetingServiceImpl.java
@@ -1,6 +1,7 @@
package io.scalecube.services.sut;
import io.scalecube.services.Microservices;
+import io.scalecube.services.MicroservicesContext;
import io.scalecube.services.annotations.Inject;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ForbiddenException;
@@ -12,7 +13,8 @@
public final class GreetingServiceImpl implements GreetingService {
- @Inject Microservices ms;
+ @Inject
+ MicroservicesContext ms;
private int instanceId;
diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java
index 5bd9872ed..e00238f4d 100644
--- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java
+++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java
@@ -12,6 +12,8 @@
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.scalecube.services.ScalecubeServiceFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -42,11 +44,12 @@ public void setUp() {
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(gatewayAddress)))
.transport(RSocketServiceTransport::new)
- .services(new Facade())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new Facade()))
.startAwait();
final Address facadeAddress = facade.discovery("facade").address();
+ PingService pingService = () -> Mono.just(Thread.currentThread().getName());
this.ping =
Microservices.builder()
.discovery(
@@ -55,9 +58,10 @@ public void setUp() {
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(facadeAddress)))
.transport(RSocketServiceTransport::new)
- .services((PingService) () -> Mono.just(Thread.currentThread().getName()))
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(pingService))
.startAwait();
+ PongService pongService = () -> Mono.just(Thread.currentThread().getName());
this.pong =
Microservices.builder()
.discovery(
@@ -66,13 +70,13 @@ public void setUp() {
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(facadeAddress)))
.transport(RSocketServiceTransport::new)
- .services((PongService) () -> Mono.just(Thread.currentThread().getName()))
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(pongService))
.startAwait();
}
@Test
public void testColocatedEventLoopGroup() {
- ServiceCall call = gateway.call();
+ ServiceCall call = gateway.serviceCall();
FacadeService facade = call.api(FacadeService.class);
diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java
index e1a31afc2..1f67ebec1 100644
--- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java
+++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java
@@ -11,6 +11,7 @@
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.exceptions.ConnectionClosedException;
+import io.scalecube.services.ScalecubeServiceFactory;
import io.scalecube.services.sut.QuoteService;
import io.scalecube.services.sut.SimpleQuoteService;
import java.time.Duration;
@@ -54,7 +55,7 @@ public void setUp() {
new ScalecubeServiceDiscovery(serviceEndpoint)
.membership(cfg -> cfg.seedMembers(gatewayAddress)))
.transport(RSocketServiceTransport::new)
- .services(new SimpleQuoteService())
+ .serviceFactory(ScalecubeServiceFactory.fromInstances(new SimpleQuoteService()))
.startAwait();
}
@@ -78,7 +79,7 @@ public void test_remote_node_died_mono_never() throws Exception {
AtomicReference sub1 = new AtomicReference<>(null);
AtomicReference exceptionHolder = new AtomicReference<>(null);
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
sub1.set(serviceCall.requestOne(JUST_NEVER).doOnError(exceptionHolder::set).subscribe());
gateway
@@ -106,7 +107,7 @@ public void test_remote_node_died_many_never() throws Exception {
AtomicReference sub1 = new AtomicReference<>(null);
AtomicReference exceptionHolder = new AtomicReference<>(null);
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).doOnError(exceptionHolder::set).subscribe());
gateway
@@ -134,7 +135,7 @@ public void test_remote_node_died_many_then_never() throws Exception {
AtomicReference sub1 = new AtomicReference<>(null);
AtomicReference exceptionHolder = new AtomicReference<>(null);
- ServiceCall serviceCall = gateway.call();
+ ServiceCall serviceCall = gateway.serviceCall();
sub1.set(
serviceCall
.requestMany(ONLY_ONE_AND_THEN_NEVER)