From 5af68d1095766fe4c55c450ff94c0932ebe244ef Mon Sep 17 00:00:00 2001 From: segabriel Date: Wed, 12 Sep 2018 22:55:18 +0300 Subject: [PATCH 1/4] Simplified service discovery and refactoring of registry events --- .../services/ServicesBenchmarksState.java | 4 +- .../scalecube/examples/BootstrapExample.java | 6 +-- .../examples/helloworld/Example1.java | 2 +- .../examples/helloworld/Example2.java | 2 +- .../examples/orderbook/Example1.java | 2 +- .../discovery/api/DiscoveryEvent.java | 53 ------------------- .../discovery/api/ServiceDiscovery.java | 3 -- .../registry/api/EndpointRegistryEvent.java | 48 +++++++++++++++++ .../registry/api/ReferenceRegistryEvent.java | 48 +++++++++++++++++ .../services/registry/api/RegistryEvent.java | 53 ------------------- .../registry/api/RegistryEventType.java | 6 +++ .../registry/api/ServiceRegistry.java | 4 +- .../discovery/ScalecubeServiceDiscovery.java | 38 ++----------- .../io/scalecube/services/Microservices.java | 8 ++- .../registry/ServiceRegistryImpl.java | 44 +++++++++++---- .../io/scalecube/services/ErrorFlowTest.java | 2 +- .../services/ServiceCallRemoteTest.java | 2 +- .../services/ServiceRegistryEventsTest.java | 22 ++++---- .../scalecube/services/ServiceRemoteTest.java | 12 ++--- .../services/ServiceTransportTest.java | 22 ++++---- .../services/StreamingServiceTest.java | 2 +- .../services/routings/RoutersTest.java | 4 +- .../services/routings/ServiceTagsExample.java | 4 +- 23 files changed, 192 insertions(+), 199 deletions(-) delete mode 100644 services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java create mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java create mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java delete mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java create mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java diff --git a/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java b/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java index ae8445fd1..a0b736453 100644 --- a/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java +++ b/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java @@ -32,13 +32,13 @@ public void beforeAll() { node = Microservices.builder() .metrics(registry()) - .seeds(seed.discovery().address()) + .seeds(seed.address()) .services(services) .startAwait(); LOGGER.info( "Seed address: " - + seed.discovery().address() + + seed.address() + ", services address: " + node.serviceAddress() + ", seed serviceRegistry: " diff --git a/examples/src/main/java/io/scalecube/examples/BootstrapExample.java b/examples/src/main/java/io/scalecube/examples/BootstrapExample.java index 6cc6afdf0..f6dd2aee8 100644 --- a/examples/src/main/java/io/scalecube/examples/BootstrapExample.java +++ b/examples/src/main/java/io/scalecube/examples/BootstrapExample.java @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception { System.out.println("Start HelloWorldService with BusinessLogicFacade"); final Microservices node1 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services( call -> Collections.singletonList( @@ -57,14 +57,14 @@ public static void main(String[] args) throws Exception { System.out.println("Start ServiceHello"); final Microservices node2 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new ServiceHelloImpl()) .startAwait(); System.out.println("Start ServiceWorld"); final Microservices node3 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new ServiceWorldImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java b/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java index 601bbfc62..2b14f980a 100644 --- a/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java +++ b/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java @@ -26,7 +26,7 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() - .seeds(seed.discovery().address()) + .seeds(seed.address()) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java b/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java index 4de889d34..829e9fa86 100644 --- a/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java +++ b/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java @@ -34,7 +34,7 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() - .seeds(seed.discovery().address()) + .seeds(seed.address()) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java b/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java index 3a9f92355..019c0a52f 100644 --- a/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java +++ b/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java @@ -33,7 +33,7 @@ public static void main(String[] args) throws InterruptedException { Microservices ms = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new DefaultMarketDataService()) .startAwait(); diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java b/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java deleted file mode 100644 index a6b475cd1..000000000 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.scalecube.services.discovery.api; - -import io.scalecube.services.ServiceEndpoint; -import io.scalecube.services.registry.api.ServiceRegistry; - -/** - * Service registration event. This event is being fired when {@link ServiceEndpoint} is being added - * (or removed from) to (from) {@link ServiceRegistry}. - */ -public class DiscoveryEvent { - - public enum Type { - REGISTERED, // service endpoint added - UNREGISTERED // service endpoint removed - } - - private final ServiceEndpoint serviceEndpoint; - private final Type type; - - private DiscoveryEvent(Type type, ServiceEndpoint serviceEndpoint) { - this.serviceEndpoint = serviceEndpoint; - this.type = type; - } - - public static DiscoveryEvent registered(ServiceEndpoint serviceEndpoint) { - return new DiscoveryEvent(Type.REGISTERED, serviceEndpoint); - } - - public static DiscoveryEvent unregistered(ServiceEndpoint serviceEndpoint) { - return new DiscoveryEvent(Type.UNREGISTERED, serviceEndpoint); - } - - public ServiceEndpoint serviceEndpoint() { - return this.serviceEndpoint; - } - - public Type type() { - return this.type; - } - - @Override - public String toString() { - return "RegistrationEvent [serviceEndpoint=" + serviceEndpoint + ", type=" + type + "]"; - } - - public boolean isRegistered() { - return Type.REGISTERED.equals(this.type); - } - - public boolean isUnregistered() { - return Type.UNREGISTERED.equals(this.type); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java index af9956012..9b11d3af5 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java @@ -4,7 +4,6 @@ import io.scalecube.services.ServiceLoaderUtil; import io.scalecube.transport.Address; import java.util.ServiceLoader; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface ServiceDiscovery { @@ -26,6 +25,4 @@ static ServiceDiscovery getDiscovery() { Mono start(DiscoveryConfig discoveryConfig); Mono shutdown(); - - Flux listen(); } diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java new file mode 100644 index 000000000..e56403559 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java @@ -0,0 +1,48 @@ +package io.scalecube.services.registry.api; + +import io.scalecube.services.ServiceEndpoint; +import io.scalecube.transport.Address; + +public class EndpointRegistryEvent { + + private ServiceEndpoint serviceEndpoint; + private RegistryEventType type; + + public static EndpointRegistryEvent createAdded(ServiceEndpoint serviceEndpoint) { + return new EndpointRegistryEvent(RegistryEventType.ADDED, serviceEndpoint); + } + + public static EndpointRegistryEvent createRemoved(ServiceEndpoint serviceEndpoint) { + return new EndpointRegistryEvent(RegistryEventType.REMOVED, serviceEndpoint); + } + + private EndpointRegistryEvent(RegistryEventType type, ServiceEndpoint serviceEndpoint) { + this.serviceEndpoint = serviceEndpoint; + this.type = type; + } + + private EndpointRegistryEvent(EndpointRegistryEvent e) { + this.serviceEndpoint = e.serviceEndpoint; + this.type = e.type; + } + + public ServiceEndpoint serviceEndpoint() { + return this.serviceEndpoint; + } + + public boolean isAdded() { + return RegistryEventType.ADDED.equals(type); + } + + public boolean isRemoved() { + return RegistryEventType.REMOVED.equals(type); + } + + public RegistryEventType type() { + return this.type; + } + + public Address address() { + return Address.create(this.serviceEndpoint.host(), this.serviceEndpoint.port()); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java new file mode 100644 index 000000000..1662db678 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java @@ -0,0 +1,48 @@ +package io.scalecube.services.registry.api; + +import io.scalecube.services.ServiceReference; +import io.scalecube.transport.Address; + +public class ReferenceRegistryEvent { + + private ServiceReference serviceReference; + private RegistryEventType type; + + public static ReferenceRegistryEvent createAdded(ServiceReference serviceReference) { + return new ReferenceRegistryEvent(RegistryEventType.ADDED, serviceReference); + } + + public static ReferenceRegistryEvent createRemoved(ServiceReference serviceReference) { + return new ReferenceRegistryEvent(RegistryEventType.REMOVED, serviceReference); + } + + private ReferenceRegistryEvent(RegistryEventType type, ServiceReference serviceReference) { + this.serviceReference = serviceReference; + this.type = type; + } + + private ReferenceRegistryEvent(ReferenceRegistryEvent e) { + this.serviceReference = e.serviceReference; + this.type = e.type; + } + + public ServiceReference serviceReference() { + return this.serviceReference; + } + + public boolean isAdded() { + return RegistryEventType.ADDED.equals(type); + } + + public boolean isRemoved() { + return RegistryEventType.REMOVED.equals(type); + } + + public RegistryEventType type() { + return this.type; + } + + public Address address() { + return Address.create(this.serviceReference.host(), this.serviceReference.port()); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java deleted file mode 100644 index 420c9bb7d..000000000 --- a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.scalecube.services.registry.api; - -import io.scalecube.services.ServiceReference; -import io.scalecube.transport.Address; - -public class RegistryEvent { - - public enum Type { - ADDED, - REMOVED; - } - - private ServiceReference serviceReference; - private Type type; - - public static RegistryEvent createAdded(ServiceReference serviceReference) { - return new RegistryEvent(Type.ADDED, serviceReference); - } - - public static RegistryEvent createRemoved(ServiceReference serviceReference) { - return new RegistryEvent(Type.REMOVED, serviceReference); - } - - private RegistryEvent(Type type, ServiceReference serviceReference) { - this.serviceReference = serviceReference; - this.type = type; - } - - private RegistryEvent(RegistryEvent e) { - this.serviceReference = e.serviceReference; - this.type = e.type; - } - - public ServiceReference serviceReference() { - return this.serviceReference; - } - - public boolean isAdded() { - return Type.ADDED.equals(type); - } - - public boolean isRemoved() { - return Type.REMOVED.equals(type); - } - - public Type type() { - return this.type; - } - - public Address address() { - return Address.create(this.serviceReference.host(), this.serviceReference.port()); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java new file mode 100644 index 000000000..e97fe98d1 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java @@ -0,0 +1,6 @@ +package io.scalecube.services.registry.api; + +public enum RegistryEventType { + ADDED, + REMOVED; +} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java index c248ea18f..5e62477ab 100644 --- a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java +++ b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java @@ -23,7 +23,9 @@ public interface ServiceRegistry { ServiceEndpoint unregisterService(String endpointId); - Flux listen(); + Flux listenReferenceEvents(); + + Flux listenEndpointEvents(); Mono close(); } diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 7c2f3b5b6..ba74c8f35 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -7,7 +7,6 @@ import io.scalecube.cluster.Member; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.discovery.api.DiscoveryConfig; -import io.scalecube.services.discovery.api.DiscoveryEvent; import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.transport.Address; @@ -16,9 +15,6 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; public class ScalecubeServiceDiscovery implements ServiceDiscovery { @@ -35,9 +31,6 @@ public class ScalecubeServiceDiscovery implements ServiceDiscovery { private ServiceEndpoint endpoint; - private final DirectProcessor subject = DirectProcessor.create(); - private final FluxSink sink = subject.serialize().sink(); - private enum DiscoveryType { ADDED, REMOVED, @@ -78,22 +71,12 @@ public Mono start(DiscoveryConfig config) { return Mono.fromFuture(promise).map(mapper -> this); } - @Override - public Flux listen() { - return Flux.fromIterable(serviceRegistry.listServiceEndpoints()) - .map(DiscoveryEvent::registered) - .concatWith(subject); - } - @Override public Mono shutdown() { return Mono.defer( - () -> { - sink.complete(); - return Optional.ofNullable(cluster) - .map(cluster1 -> Mono.fromFuture(cluster1.shutdown())) - .orElse(Mono.empty()); - }); + () -> Optional.ofNullable(cluster) + .map(cluster1 -> Mono.fromFuture(cluster1.shutdown())) + .orElse(Mono.empty())); } private void configure(DiscoveryConfig config) { @@ -135,9 +118,7 @@ private void loadClusterServices(Cluster cluster) { cluster .otherMembers() .forEach( - member -> { - loadMemberServices(DiscoveryType.DISCOVERED, member); - }); + member -> loadMemberServices(DiscoveryType.DISCOVERED, member)); } private void loadMemberServices(DiscoveryType type, Member member) { @@ -156,27 +137,16 @@ private void loadMemberServices(DiscoveryType type, Member member) { LOGGER.debug("Member: {} is {} : {}", member, type, serviceEndpoint); if ((type.equals(DiscoveryType.ADDED) || type.equals(DiscoveryType.DISCOVERED)) && (this.serviceRegistry.registerService(serviceEndpoint))) { - LOGGER.info( "Service Reference was ADDED since new Member has joined the cluster {} : {}", member, serviceEndpoint); - - DiscoveryEvent registrationEvent = DiscoveryEvent.registered(serviceEndpoint); - LOGGER.debug("Publish registered: " + registrationEvent); - sink.next(registrationEvent); - } else if (type.equals(DiscoveryType.REMOVED) && (this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null)) { - LOGGER.info( "Service Reference was REMOVED since Member have left the cluster {} : {}", member, serviceEndpoint); - - DiscoveryEvent registrationEvent = DiscoveryEvent.unregistered(serviceEndpoint); - LOGGER.debug("Publish unregistered: " + registrationEvent); - sink.next(registrationEvent); } }); } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 5eed8fb11..0e62143a4 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -393,8 +393,12 @@ public Map gatewayAddresses() { return gatewayBootstrap.gatewayAddresses(); } - public ServiceDiscovery discovery() { - return this.discovery; + public Address address() { + return this.discovery.address(); + } + + public ServiceEndpoint serviceEndpoint() { + return this.discovery.endpoint(); } /** diff --git a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java index 1a7e98c88..1b071e0e0 100644 --- a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java +++ b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java @@ -3,7 +3,8 @@ import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.registry.api.RegistryEvent; +import io.scalecube.services.registry.api.EndpointRegistryEvent; +import io.scalecube.services.registry.api.ReferenceRegistryEvent; import io.scalecube.services.registry.api.ServiceRegistry; import java.util.ArrayList; import java.util.Collection; @@ -28,9 +29,15 @@ public class ServiceRegistryImpl implements ServiceRegistry { private final Map> referencesByQualifier = new NonBlockingHashMap<>(); - private final FluxProcessor events = DirectProcessor.create(); + private final FluxProcessor referenceEvents = + DirectProcessor.create(); + private final FluxSink referenceEventSink = + referenceEvents.serialize().sink(); - private final FluxSink eventSink = events.serialize().sink(); + private final FluxProcessor endpointEvents = + DirectProcessor.create(); + private final FluxSink endpointEventSink = + endpointEvents.serialize().sink(); @Override public List listServiceEndpoints() { @@ -77,7 +84,9 @@ public boolean registerService(ServiceEndpoint serviceEndpoint) { .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) .add(sr)); - serviceReferences.forEach(sr -> eventSink.next(RegistryEvent.createAdded(sr))); + serviceReferences.forEach( + sr -> referenceEventSink.next(ReferenceRegistryEvent.createAdded(sr))); + endpointEventSink.next(EndpointRegistryEvent.createAdded(serviceEndpoint)); } return success; } @@ -107,7 +116,8 @@ public ServiceEndpoint unregisterService(String endpointId) { serviceReferencesOfEndpoint .values() - .forEach(sr -> eventSink.next(RegistryEvent.createRemoved(sr))); + .forEach(sr -> referenceEventSink.next(ReferenceRegistryEvent.createRemoved(sr))); + endpointEventSink.next(EndpointRegistryEvent.createRemoved(serviceEndpoint)); } return serviceEndpoint; @@ -118,22 +128,36 @@ Stream serviceReferenceStream() { } /** - * Listen on service registry events. + * Listen on service reference registry events. * * @return flux object */ - public Flux listen() { + @Override + public Flux listenReferenceEvents() { return Flux.fromIterable(referencesByQualifier.values()) .flatMap(Flux::fromIterable) - .map(RegistryEvent::createAdded) - .concatWith(events); + .map(ReferenceRegistryEvent::createAdded) + .concatWith(referenceEvents); + } + + /** + * Listen on service endpoint registry events. + * + * @return flux object + */ + @Override + public Flux listenEndpointEvents() { + return Flux.fromIterable(serviceEndpoints.values()) + .map(EndpointRegistryEvent::createAdded) + .concatWith(endpointEvents); } @Override public Mono close() { return Mono.create( sink -> { - eventSink.complete(); + referenceEventSink.complete(); + endpointEventSink.complete(); sink.success(); }); } diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index 6e86e6511..99bcc69b4 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -33,7 +33,7 @@ public static void initNodes() { consumer = Microservices.builder() .discoveryPort(port.incrementAndGet()) - .seeds(provider.discovery().address()) + .seeds(provider.address()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index a969094dc..0197669bd 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -57,7 +57,7 @@ public static void tearDown() { private static Microservices serviceProvider() { return Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new GreetingServiceImpl()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java index 80e97e6b2..cc71582d3 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java @@ -1,10 +1,10 @@ package io.scalecube.services; -import static io.scalecube.services.discovery.api.DiscoveryEvent.Type.REGISTERED; -import static io.scalecube.services.discovery.api.DiscoveryEvent.Type.UNREGISTERED; +import static io.scalecube.services.registry.api.RegistryEventType.ADDED; +import static io.scalecube.services.registry.api.RegistryEventType.REMOVED; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.scalecube.services.discovery.api.DiscoveryEvent; +import io.scalecube.services.registry.api.EndpointRegistryEvent; import io.scalecube.services.sut.GreetingServiceImpl; import java.time.Duration; import java.util.ArrayList; @@ -17,31 +17,31 @@ public class ServiceRegistryEventsTest { @Test public void test_added_removed_registration_events() { - List events = new ArrayList<>(); + List events = new ArrayList<>(); Microservices seed = Microservices.builder().startAwait(); - seed.discovery().listen().subscribe(events::add); + seed.serviceRegistry().listenEndpointEvents().subscribe(events::add); Microservices ms1 = Microservices.builder() - .seeds(seed.discovery().address()) + .seeds(seed.address()) .services(new GreetingServiceImpl()) .startAwait(); Microservices ms2 = Microservices.builder() - .seeds(seed.discovery().address()) + .seeds(seed.address()) .services(new GreetingServiceImpl()) .startAwait(); Mono.when(ms1.shutdown(), ms2.shutdown()).block(Duration.ofSeconds(6)); assertEquals(4, events.size()); - assertEquals(REGISTERED, events.get(0).type()); - assertEquals(REGISTERED, events.get(1).type()); - assertEquals(UNREGISTERED, events.get(2).type()); - assertEquals(UNREGISTERED, events.get(3).type()); + assertEquals(ADDED, events.get(0).type()); + assertEquals(ADDED, events.get(1).type()); + assertEquals(REMOVED, events.get(2).type()); + assertEquals(REMOVED, events.get(3).type()); seed.shutdown().block(Duration.ofSeconds(6)); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index 18805353b..b634f9b8a 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -56,7 +56,7 @@ private static Microservices gateway() { private static Microservices serviceProvider() { return Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new GreetingServiceImpl()) .startAwait(); } @@ -187,7 +187,7 @@ public void test_remote_serviceA_calls_serviceB_using_setter() { // noinspection unused Microservices provider = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new CoarseGrainedServiceImpl()) // add service a and b .startAwait(); @@ -208,7 +208,7 @@ public void test_remote_serviceA_calls_serviceB() { // Create microservices instance cluster. // noinspection unused Microservices provider = - Microservices.builder().seeds(gateway.discovery().address()).services(another).startAwait(); + Microservices.builder().seeds(gateway.address()).services(another).startAwait(); // Get a proxy to the service api. CoarseGrainedService service = gateway.call().create().api(CoarseGrainedService.class); @@ -225,7 +225,7 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { // Create microservices instance cluster. Microservices ms = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(another) // add service a and b .startAwait(); @@ -249,7 +249,7 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() throws Excepti // Create microservices instance cluster. Microservices provider = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(another) // add service a and b .startAwait(); @@ -332,7 +332,7 @@ public void test_services_contribute_to_cluster_metadata() { Microservices ms = Microservices.builder().tags(tags).services(new GreetingServiceImpl()).startAwait(); - assertTrue(ms.discovery().endpoint().tags().containsKey("HOSTNAME")); + assertTrue(ms.serviceEndpoint().tags().containsKey("HOSTNAME")); } @Test diff --git a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java index 6ec4e3a4c..9c50e3874 100644 --- a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java @@ -4,8 +4,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.discovery.api.DiscoveryEvent; import io.scalecube.services.exceptions.ConnectionClosedException; +import io.scalecube.services.registry.api.EndpointRegistryEvent; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import java.time.Duration; @@ -39,7 +39,7 @@ public void setUp() { serviceNode = Microservices.builder() .discoveryPort(port.incrementAndGet()) - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new SimpleQuoteService()) .startAwait(); } @@ -72,9 +72,9 @@ public void test_remote_node_died_mono_never() throws Exception { sub1.set(serviceCall.requestOne(JUST_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .discovery() - .listen() - .filter(DiscoveryEvent::isUnregistered) + .serviceRegistry() + .listenEndpointEvents() + .filter(EndpointRegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -101,9 +101,9 @@ public void test_remote_node_died_many_never() throws Exception { sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .discovery() - .listen() - .filter(DiscoveryEvent::isUnregistered) + .serviceRegistry() + .listenEndpointEvents() + .filter(EndpointRegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -134,9 +134,9 @@ public void test_remote_node_died_many_then_never() throws Exception { .subscribe()); gateway - .discovery() - .listen() - .filter(DiscoveryEvent::isUnregistered) + .serviceRegistry() + .listenEndpointEvents() + .filter(EndpointRegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 204296377..73209ab57 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -27,7 +27,7 @@ public static void setup() { node = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services(new SimpleQuoteService()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index a9b7f810c..39fefd21a 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -47,7 +47,7 @@ public static void setup() { // Create microservices instance cluster. provider1 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) .tag("ONLYFOR", "joe") @@ -61,7 +61,7 @@ public static void setup() { // Create microservices instance cluster. provider2 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) .tag("ONLYFOR", "fransin") diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 70d340e65..48a15219d 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -16,7 +16,7 @@ public static void main(String[] args) { Microservices services1 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) .tag("Weight", "0.3") @@ -25,7 +25,7 @@ public static void main(String[] args) { Microservices services2 = Microservices.builder() - .seeds(gateway.discovery().address()) + .seeds(gateway.address()) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) .tag("Weight", "0.7") From 5a7901dddb6efb6a5922139714aa26fa933518fc Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 16 Sep 2018 16:44:17 +0300 Subject: [PATCH 2/4] Fixed merging issues --- .../services/ServicesBenchmarksState.java | 4 ++-- .../scalecube/examples/BootstrapExample.java | 6 ++--- .../examples/helloworld/Example1.java | 2 +- .../examples/helloworld/Example2.java | 2 +- .../examples/orderbook/Example1.java | 2 +- .../io/scalecube/services/Microservices.java | 1 + .../io/scalecube/services/ErrorFlowTest.java | 2 +- .../services/ServiceCallRemoteTest.java | 2 +- .../services/ServiceRegistryEventsTest.java | 22 ++++++++--------- .../scalecube/services/ServiceRemoteTest.java | 12 +++++----- .../services/ServiceTransportTest.java | 24 +++++++++---------- .../services/StreamingServiceTest.java | 2 +- .../services/routings/RoutersTest.java | 4 ++-- .../services/routings/ServiceTagsExample.java | 4 ++-- 14 files changed, 44 insertions(+), 45 deletions(-) diff --git a/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java b/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java index fa365269c..c6e6c0758 100644 --- a/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java +++ b/benchmarks/src/main/java/io/scalecube/services/benchmarks/services/ServicesBenchmarksState.java @@ -32,13 +32,13 @@ public void beforeAll() { node = Microservices.builder() .metrics(registry()) - .discovery(options -> options.seeds(seed.discovery().address())) + .discovery(options -> options.seeds(seed.address())) .services(services) .startAwait(); LOGGER.info( "Seed address: " - + seed.discovery().address() + + seed.address() + ", services address: " + node.serviceAddress() + ", seed serviceRegistry: " diff --git a/examples/src/main/java/io/scalecube/examples/BootstrapExample.java b/examples/src/main/java/io/scalecube/examples/BootstrapExample.java index db1203a00..83e35d288 100644 --- a/examples/src/main/java/io/scalecube/examples/BootstrapExample.java +++ b/examples/src/main/java/io/scalecube/examples/BootstrapExample.java @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception { System.out.println("Start HelloWorldService with BusinessLogicFacade"); final Microservices node1 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services( call -> Collections.singletonList( @@ -57,14 +57,14 @@ public static void main(String[] args) throws Exception { System.out.println("Start ServiceHello"); final Microservices node2 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new ServiceHelloImpl()) .startAwait(); System.out.println("Start ServiceWorld"); final Microservices node3 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new ServiceWorldImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java b/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java index 8496cc112..d09251b41 100644 --- a/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java +++ b/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java @@ -26,7 +26,7 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() - .discovery(options -> options.seeds(seed.discovery().address())) + .discovery(options -> options.seeds(seed.address())) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java b/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java index bb7bd69c9..004bab791 100644 --- a/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java +++ b/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java @@ -34,7 +34,7 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() - .discovery(options -> options.seeds(seed.discovery().address())) + .discovery(options -> options.seeds(seed.address())) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java b/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java index 9ce072fe8..27fb64bae 100644 --- a/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java +++ b/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java @@ -33,7 +33,7 @@ public static void main(String[] args) throws InterruptedException { Microservices ms = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new DefaultMarketDataService()) .startAwait(); diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index c85794e22..f9e40c1ef 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -17,6 +17,7 @@ import io.scalecube.services.transport.api.ServerTransport; import io.scalecube.services.transport.api.ServiceTransport; import io.scalecube.services.transport.api.WorkerThreadChooser; +import io.scalecube.transport.Address; import io.scalecube.transport.Addressing; import java.net.InetSocketAddress; import java.util.ArrayList; diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index 8909237c9..de26a32b9 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -35,7 +35,7 @@ public static void initNodes() { Microservices.builder() .discovery( options -> - options.seeds(provider.discovery().address()).port(port.incrementAndGet())) + options.seeds(provider.address()).port(port.incrementAndGet())) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index e6539f2ed..59c450edd 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -61,7 +61,7 @@ public static void tearDown() { private static Microservices serviceProvider() { return Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new GreetingServiceImpl()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java index 51a62c63a..993426df3 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java @@ -1,10 +1,10 @@ package io.scalecube.services; -import static io.scalecube.services.discovery.api.DiscoveryEvent.Type.REGISTERED; -import static io.scalecube.services.discovery.api.DiscoveryEvent.Type.UNREGISTERED; +import static io.scalecube.services.registry.api.RegistryEventType.ADDED; +import static io.scalecube.services.registry.api.RegistryEventType.REMOVED; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.scalecube.services.discovery.api.DiscoveryEvent; +import io.scalecube.services.registry.api.EndpointRegistryEvent; import io.scalecube.services.sut.GreetingServiceImpl; import java.time.Duration; import java.util.ArrayList; @@ -17,31 +17,31 @@ public class ServiceRegistryEventsTest { @Test public void test_added_removed_registration_events() { - List events = new ArrayList<>(); + List events = new ArrayList<>(); Microservices seed = Microservices.builder().startAwait(); - seed.discovery().listen().subscribe(events::add); + seed.serviceRegistry().listenEndpointEvents().subscribe(events::add); Microservices ms1 = Microservices.builder() - .discovery(options -> options.seeds(seed.discovery().address())) + .discovery(options -> options.seeds(seed.address())) .services(new GreetingServiceImpl()) .startAwait(); Microservices ms2 = Microservices.builder() - .discovery(options -> options.seeds(seed.discovery().address())) + .discovery(options -> options.seeds(seed.address())) .services(new GreetingServiceImpl()) .startAwait(); Mono.when(ms1.shutdown(), ms2.shutdown()).block(Duration.ofSeconds(6)); assertEquals(4, events.size()); - assertEquals(REGISTERED, events.get(0).type()); - assertEquals(REGISTERED, events.get(1).type()); - assertEquals(UNREGISTERED, events.get(2).type()); - assertEquals(UNREGISTERED, events.get(3).type()); + assertEquals(ADDED, events.get(0).type()); + assertEquals(ADDED, events.get(1).type()); + assertEquals(REMOVED, events.get(2).type()); + assertEquals(REMOVED, events.get(3).type()); seed.shutdown().block(Duration.ofSeconds(6)); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index e236e1e5e..a19707f47 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -60,7 +60,7 @@ private static Microservices gateway() { private static Microservices serviceProvider() { return Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new GreetingServiceImpl()) .startAwait(); } @@ -191,7 +191,7 @@ public void test_remote_serviceA_calls_serviceB_using_setter() { // noinspection unused Microservices provider = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new CoarseGrainedServiceImpl()) // add service a and b .startAwait(); @@ -213,7 +213,7 @@ public void test_remote_serviceA_calls_serviceB() { // noinspection unused Microservices provider = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(another) .startAwait(); @@ -232,7 +232,7 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { // Create microservices instance cluster. Microservices ms = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(another) // add service a and b .startAwait(); @@ -256,7 +256,7 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() throws Excepti // Create microservices instance cluster. Microservices provider = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(another) // add service a and b .startAwait(); @@ -340,7 +340,7 @@ public void test_services_contribute_to_cluster_metadata() { Microservices ms = Microservices.builder().tags(tags).services(new GreetingServiceImpl()).startAwait(); - assertTrue(ms.discovery().endpoint().tags().containsKey("HOSTNAME")); + assertTrue(ms.serviceEndpoint().tags().containsKey("HOSTNAME")); } @Test diff --git a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java index 1ce839695..8e257ecfe 100644 --- a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java @@ -4,8 +4,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.discovery.api.DiscoveryEvent; import io.scalecube.services.exceptions.ConnectionClosedException; +import io.scalecube.services.registry.api.EndpointRegistryEvent; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import java.time.Duration; @@ -42,9 +42,7 @@ public void setUp() { serviceNode = Microservices.builder() - .discovery( - options -> - options.seeds(gateway.discovery().address()).port(port.incrementAndGet())) + .discovery(options -> options.seeds(gateway.address()).port(port.incrementAndGet())) .services(new SimpleQuoteService()) .startAwait(); } @@ -80,9 +78,9 @@ public void test_remote_node_died_mono_never() throws Exception { sub1.set(serviceCall.requestOne(JUST_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .discovery() - .listen() - .filter(DiscoveryEvent::isUnregistered) + .serviceRegistry() + .listenEndpointEvents() + .filter(EndpointRegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -109,9 +107,9 @@ public void test_remote_node_died_many_never() throws Exception { sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .discovery() - .listen() - .filter(DiscoveryEvent::isUnregistered) + .serviceRegistry() + .listenEndpointEvents() + .filter(EndpointRegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -142,9 +140,9 @@ public void test_remote_node_died_many_then_never() throws Exception { .subscribe()); gateway - .discovery() - .listen() - .filter(DiscoveryEvent::isUnregistered) + .serviceRegistry() + .listenEndpointEvents() + .filter(EndpointRegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 03fc888bc..97733ca2f 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -28,7 +28,7 @@ public static void setup() { node = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services(new SimpleQuoteService()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 86a220a72..f4d093fdd 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -48,7 +48,7 @@ public static void setup() { // Create microservices instance cluster. provider1 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) .tag("ONLYFOR", "joe") @@ -62,7 +62,7 @@ public static void setup() { // Create microservices instance cluster. provider2 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) .tag("ONLYFOR", "fransin") diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 7ee1503e9..5206aee9d 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -21,7 +21,7 @@ public static void main(String[] args) { Microservices services1 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) .tag("Weight", "0.3") @@ -30,7 +30,7 @@ public static void main(String[] args) { Microservices services2 = Microservices.builder() - .discovery(options -> options.seeds(gateway.discovery().address())) + .discovery(options -> options.seeds(gateway.address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) .tag("Weight", "0.7") From d3575421b1c9c072ae2f98a981f715964e65a0fd Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 16 Sep 2018 22:51:06 +0300 Subject: [PATCH 3/4] Fixed review issues --- .../registry/api/EndpointRegistryEvent.java | 48 ------------ .../registry/api/ReferenceRegistryEvent.java | 48 ------------ .../services/registry/api/RegistryEvent.java | 56 ++++++++++++++ .../registry/api/RegistryEventType.java | 6 -- .../registry/api/ServiceRegistry.java | 5 +- .../registry/ServiceRegistryImpl.java | 76 ++++++++----------- .../services/ServiceRegistryEventsTest.java | 10 +-- .../services/ServiceTransportTest.java | 14 ++-- 8 files changed, 102 insertions(+), 161 deletions(-) delete mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java delete mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java create mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java delete mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java deleted file mode 100644 index e56403559..000000000 --- a/services-api/src/main/java/io/scalecube/services/registry/api/EndpointRegistryEvent.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.scalecube.services.registry.api; - -import io.scalecube.services.ServiceEndpoint; -import io.scalecube.transport.Address; - -public class EndpointRegistryEvent { - - private ServiceEndpoint serviceEndpoint; - private RegistryEventType type; - - public static EndpointRegistryEvent createAdded(ServiceEndpoint serviceEndpoint) { - return new EndpointRegistryEvent(RegistryEventType.ADDED, serviceEndpoint); - } - - public static EndpointRegistryEvent createRemoved(ServiceEndpoint serviceEndpoint) { - return new EndpointRegistryEvent(RegistryEventType.REMOVED, serviceEndpoint); - } - - private EndpointRegistryEvent(RegistryEventType type, ServiceEndpoint serviceEndpoint) { - this.serviceEndpoint = serviceEndpoint; - this.type = type; - } - - private EndpointRegistryEvent(EndpointRegistryEvent e) { - this.serviceEndpoint = e.serviceEndpoint; - this.type = e.type; - } - - public ServiceEndpoint serviceEndpoint() { - return this.serviceEndpoint; - } - - public boolean isAdded() { - return RegistryEventType.ADDED.equals(type); - } - - public boolean isRemoved() { - return RegistryEventType.REMOVED.equals(type); - } - - public RegistryEventType type() { - return this.type; - } - - public Address address() { - return Address.create(this.serviceEndpoint.host(), this.serviceEndpoint.port()); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java deleted file mode 100644 index 1662db678..000000000 --- a/services-api/src/main/java/io/scalecube/services/registry/api/ReferenceRegistryEvent.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.scalecube.services.registry.api; - -import io.scalecube.services.ServiceReference; -import io.scalecube.transport.Address; - -public class ReferenceRegistryEvent { - - private ServiceReference serviceReference; - private RegistryEventType type; - - public static ReferenceRegistryEvent createAdded(ServiceReference serviceReference) { - return new ReferenceRegistryEvent(RegistryEventType.ADDED, serviceReference); - } - - public static ReferenceRegistryEvent createRemoved(ServiceReference serviceReference) { - return new ReferenceRegistryEvent(RegistryEventType.REMOVED, serviceReference); - } - - private ReferenceRegistryEvent(RegistryEventType type, ServiceReference serviceReference) { - this.serviceReference = serviceReference; - this.type = type; - } - - private ReferenceRegistryEvent(ReferenceRegistryEvent e) { - this.serviceReference = e.serviceReference; - this.type = e.type; - } - - public ServiceReference serviceReference() { - return this.serviceReference; - } - - public boolean isAdded() { - return RegistryEventType.ADDED.equals(type); - } - - public boolean isRemoved() { - return RegistryEventType.REMOVED.equals(type); - } - - public RegistryEventType type() { - return this.type; - } - - public Address address() { - return Address.create(this.serviceReference.host(), this.serviceReference.port()); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java new file mode 100644 index 000000000..ded06d1e5 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java @@ -0,0 +1,56 @@ +package io.scalecube.services.registry.api; + +import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceReference; +import java.util.function.Predicate; + +public interface RegistryEvent { + + enum Type { + ADDED, + REMOVED; + } + + static Predicate> asReference() { + return registryEvent -> + ((RegistryEvent) registryEvent).value().getClass().equals(ServiceReference.class); + } + + static Predicate> asEndpoint() { + return registryEvent -> + ((RegistryEvent) registryEvent).value().getClass().equals(ServiceEndpoint.class); + } + + /** + * Creates a registry event instance by the given args. + * + * @param type registry event type {@link Type} + * @param value value + * @return registry event instance + */ + static RegistryEvent create(RegistryEvent.Type type, T value) { + return new RegistryEvent() { + @Override + public RegistryEvent.Type type() { + return type; + } + + @Override + public T value() { + return value; + } + }; + } + + RegistryEvent.Type type(); + + T value(); + + default boolean isAdded() { + return RegistryEvent.Type.ADDED.equals(type()); + } + + default boolean isRemoved() { + return RegistryEvent.Type.REMOVED.equals(type()); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java deleted file mode 100644 index e97fe98d1..000000000 --- a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEventType.java +++ /dev/null @@ -1,6 +0,0 @@ -package io.scalecube.services.registry.api; - -public enum RegistryEventType { - ADDED, - REMOVED; -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java index 5e62477ab..1cf59891b 100644 --- a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java +++ b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java @@ -4,6 +4,7 @@ import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; import java.util.List; +import java.util.function.Predicate; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -23,9 +24,7 @@ public interface ServiceRegistry { ServiceEndpoint unregisterService(String endpointId); - Flux listenReferenceEvents(); - - Flux listenEndpointEvents(); + Flux> listen(Predicate> predicate); Mono close(); } diff --git a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java index 1b071e0e0..2780c01f8 100644 --- a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java +++ b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java @@ -1,10 +1,12 @@ package io.scalecube.services.registry; +import static io.scalecube.services.registry.api.RegistryEvent.Type.ADDED; +import static io.scalecube.services.registry.api.RegistryEvent.Type.REMOVED; + import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.registry.api.EndpointRegistryEvent; -import io.scalecube.services.registry.api.ReferenceRegistryEvent; +import io.scalecube.services.registry.api.RegistryEvent; import io.scalecube.services.registry.api.ServiceRegistry; import java.util.ArrayList; import java.util.Collection; @@ -13,6 +15,7 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.jctools.maps.NonBlockingHashMap; @@ -29,15 +32,9 @@ public class ServiceRegistryImpl implements ServiceRegistry { private final Map> referencesByQualifier = new NonBlockingHashMap<>(); - private final FluxProcessor referenceEvents = - DirectProcessor.create(); - private final FluxSink referenceEventSink = - referenceEvents.serialize().sink(); - - private final FluxProcessor endpointEvents = + private final FluxProcessor registryEvents = DirectProcessor.create(); - private final FluxSink endpointEventSink = - endpointEvents.serialize().sink(); + private final FluxSink registryEventSink = registryEvents.serialize().sink(); @Override public List listServiceEndpoints() { @@ -47,7 +44,11 @@ public List listServiceEndpoints() { @Override public List listServiceReferences() { - return serviceReferenceStream().collect(Collectors.toList()); + return referencesByQualifier + .values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); } @Override @@ -84,9 +85,8 @@ public boolean registerService(ServiceEndpoint serviceEndpoint) { .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) .add(sr)); - serviceReferences.forEach( - sr -> referenceEventSink.next(ReferenceRegistryEvent.createAdded(sr))); - endpointEventSink.next(EndpointRegistryEvent.createAdded(serviceEndpoint)); + serviceReferences.forEach(sr -> registryEventSink.next(RegistryEvent.create(ADDED, sr))); + registryEventSink.next(RegistryEvent.create(ADDED, serviceEndpoint)); } return success; } @@ -116,48 +116,36 @@ public ServiceEndpoint unregisterService(String endpointId) { serviceReferencesOfEndpoint .values() - .forEach(sr -> referenceEventSink.next(ReferenceRegistryEvent.createRemoved(sr))); - endpointEventSink.next(EndpointRegistryEvent.createRemoved(serviceEndpoint)); + .forEach(sr -> registryEventSink.next(RegistryEvent.create(REMOVED, sr))); + registryEventSink.next(RegistryEvent.create(REMOVED, serviceEndpoint)); } return serviceEndpoint; } - Stream serviceReferenceStream() { - return referencesByQualifier.values().stream().flatMap(Collection::stream); - } - - /** - * Listen on service reference registry events. - * - * @return flux object - */ - @Override - public Flux listenReferenceEvents() { - return Flux.fromIterable(referencesByQualifier.values()) - .flatMap(Flux::fromIterable) - .map(ReferenceRegistryEvent::createAdded) - .concatWith(referenceEvents); - } - - /** - * Listen on service endpoint registry events. - * - * @return flux object - */ @Override - public Flux listenEndpointEvents() { - return Flux.fromIterable(serviceEndpoints.values()) - .map(EndpointRegistryEvent::createAdded) - .concatWith(endpointEvents); + public Flux> listen(Predicate> predicate) { + //noinspection unchecked + return Flux.fromStream( + Stream.concat( + referencesByQualifier + .values() + .stream() + .flatMap(Collection::stream) + .map(sr -> (RegistryEvent) RegistryEvent.create(ADDED, sr)), + serviceEndpoints + .values() + .stream() + .map(se -> (RegistryEvent) RegistryEvent.create(ADDED, se)))) + .concatWith(registryEvents) + .filter((Predicate) predicate); } @Override public Mono close() { return Mono.create( sink -> { - referenceEventSink.complete(); - endpointEventSink.complete(); + registryEventSink.complete(); sink.success(); }); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java index 993426df3..b082d3b68 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java @@ -1,10 +1,10 @@ package io.scalecube.services; -import static io.scalecube.services.registry.api.RegistryEventType.ADDED; -import static io.scalecube.services.registry.api.RegistryEventType.REMOVED; +import static io.scalecube.services.registry.api.RegistryEvent.Type.ADDED; +import static io.scalecube.services.registry.api.RegistryEvent.Type.REMOVED; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.scalecube.services.registry.api.EndpointRegistryEvent; +import io.scalecube.services.registry.api.RegistryEvent; import io.scalecube.services.sut.GreetingServiceImpl; import java.time.Duration; import java.util.ArrayList; @@ -17,11 +17,11 @@ public class ServiceRegistryEventsTest { @Test public void test_added_removed_registration_events() { - List events = new ArrayList<>(); + List> events = new ArrayList<>(); Microservices seed = Microservices.builder().startAwait(); - seed.serviceRegistry().listenEndpointEvents().subscribe(events::add); + seed.serviceRegistry().listen(RegistryEvent.asEndpoint()).subscribe(events::add); Microservices ms1 = Microservices.builder() diff --git a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java index 8e257ecfe..402b98eeb 100644 --- a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java @@ -5,7 +5,7 @@ import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.exceptions.ConnectionClosedException; -import io.scalecube.services.registry.api.EndpointRegistryEvent; +import io.scalecube.services.registry.api.RegistryEvent; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import java.time.Duration; @@ -79,8 +79,8 @@ public void test_remote_node_died_mono_never() throws Exception { gateway .serviceRegistry() - .listenEndpointEvents() - .filter(EndpointRegistryEvent::isRemoved) + .listen(RegistryEvent.asEndpoint()) + .filter(RegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -108,8 +108,8 @@ public void test_remote_node_died_many_never() throws Exception { gateway .serviceRegistry() - .listenEndpointEvents() - .filter(EndpointRegistryEvent::isRemoved) + .listen(RegistryEvent.asEndpoint()) + .filter(RegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -141,8 +141,8 @@ public void test_remote_node_died_many_then_never() throws Exception { gateway .serviceRegistry() - .listenEndpointEvents() - .filter(EndpointRegistryEvent::isRemoved) + .listen(RegistryEvent.asEndpoint()) + .filter(RegistryEvent::isRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down From af1b82ffd4d1977d9b32dcb60292a8af730b6f13 Mon Sep 17 00:00:00 2001 From: artem-v Date: Tue, 18 Sep 2018 18:35:27 +0300 Subject: [PATCH 4/4] CR2, cleanup; reviewed with Sergei G --- .../service/BenchmarkServiceState.java | 8 +- .../scalecube/examples/BootstrapExample.java | 6 +- .../examples/helloworld/Example1.java | 2 +- .../examples/helloworld/Example2.java | 2 +- .../examples/orderbook/Example1.java | 2 +- .../scalecube/services/ServiceEndpoint.java | 18 +++++ .../discovery/api/ServiceDiscovery.java | 5 +- ...onfig.java => ServiceDiscoveryConfig.java} | 23 +++++- .../discovery/api/ServiceDiscoveryEvent.java | 57 ++++++++++++++ .../services/registry/api/RegistryEvent.java | 56 -------------- .../registry/api/ServiceRegistry.java | 7 -- .../discovery/ScalecubeServiceDiscovery.java | 55 +++++++++----- .../io/scalecube/services/Microservices.java | 30 +++----- .../registry/ServiceRegistryImpl.java | 75 ++----------------- .../io/scalecube/services/ErrorFlowTest.java | 2 +- .../services/ServiceCallRemoteTest.java | 2 +- .../services/ServiceRegistryEventsTest.java | 21 +++--- .../scalecube/services/ServiceRemoteTest.java | 12 +-- .../services/ServiceTransportTest.java | 24 +++--- .../services/StreamingServiceTest.java | 2 +- .../services/routings/RoutersTest.java | 4 +- .../services/routings/ServiceTagsExample.java | 4 +- 22 files changed, 198 insertions(+), 219 deletions(-) rename services-api/src/main/java/io/scalecube/services/discovery/api/{DiscoveryConfig.java => ServiceDiscoveryConfig.java} (79%) create mode 100644 services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java delete mode 100644 services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java diff --git a/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java b/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java index 4874f7680..c7008ed01 100644 --- a/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java +++ b/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java @@ -32,17 +32,15 @@ public void beforeAll() { node = Microservices.builder() .metrics(registry()) - .discovery(options -> options.seeds(seed.address())) + .discovery(options -> options.seeds(seed.discovery().address())) .services(services) .startAwait(); LOGGER.info( "Seed address: " - + seed.address() + + seed.discovery().address() + ", services address: " - + node.serviceAddress() - + ", seed serviceRegistry: " - + seed.serviceRegistry().listServiceReferences()); + + node.serviceAddress()); } @Override diff --git a/examples/src/main/java/io/scalecube/examples/BootstrapExample.java b/examples/src/main/java/io/scalecube/examples/BootstrapExample.java index 83e35d288..db1203a00 100644 --- a/examples/src/main/java/io/scalecube/examples/BootstrapExample.java +++ b/examples/src/main/java/io/scalecube/examples/BootstrapExample.java @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception { System.out.println("Start HelloWorldService with BusinessLogicFacade"); final Microservices node1 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services( call -> Collections.singletonList( @@ -57,14 +57,14 @@ public static void main(String[] args) throws Exception { System.out.println("Start ServiceHello"); final Microservices node2 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new ServiceHelloImpl()) .startAwait(); System.out.println("Start ServiceWorld"); final Microservices node3 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new ServiceWorldImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java b/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java index d09251b41..8496cc112 100644 --- a/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java +++ b/examples/src/main/java/io/scalecube/examples/helloworld/Example1.java @@ -26,7 +26,7 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() - .discovery(options -> options.seeds(seed.address())) + .discovery(options -> options.seeds(seed.discovery().address())) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java b/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java index 004bab791..bb7bd69c9 100644 --- a/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java +++ b/examples/src/main/java/io/scalecube/examples/helloworld/Example2.java @@ -34,7 +34,7 @@ public static void main(String[] args) { // Construct a ScaleCube node which joins the cluster hosting the Greeting Service Microservices microservices = Microservices.builder() - .discovery(options -> options.seeds(seed.address())) + .discovery(options -> options.seeds(seed.discovery().address())) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java b/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java index 27fb64bae..9ce072fe8 100644 --- a/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java +++ b/examples/src/main/java/io/scalecube/examples/orderbook/Example1.java @@ -33,7 +33,7 @@ public static void main(String[] args) throws InterruptedException { Microservices ms = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new DefaultMarketDataService()) .startAwait(); diff --git a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java index 3068e372b..7627eab05 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java @@ -3,6 +3,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class ServiceEndpoint { @@ -64,10 +65,27 @@ public Map tags() { return tags; } + /** + * Return collection of service registratrions. + * + * @return collection of {@link ServiceRegistration} + */ public Collection serviceRegistrations() { return serviceRegistrations; } + /** + * Creates collection of service references from this service endpoint. + * + * @return collection of {@link ServiceReference} + */ + public Collection serviceReferences() { + return serviceRegistrations + .stream() + .flatMap(sr -> sr.methods().stream().map(sm -> new ServiceReference(sm, sr, this))) + .collect(Collectors.toList()); + } + @Override public String toString() { return "ServiceEndpoint{" diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java index 9b11d3af5..1d7e8695e 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java @@ -4,6 +4,7 @@ import io.scalecube.services.ServiceLoaderUtil; import io.scalecube.transport.Address; import java.util.ServiceLoader; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface ServiceDiscovery { @@ -22,7 +23,9 @@ static ServiceDiscovery getDiscovery() { .orElseThrow(() -> new IllegalStateException("ServiceDiscovery not configured")); } - Mono start(DiscoveryConfig discoveryConfig); + Mono start(ServiceDiscoveryConfig config); Mono shutdown(); + + Flux listen(); } diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryConfig.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryConfig.java similarity index 79% rename from services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryConfig.java rename to services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryConfig.java index 0984c83f4..c530a68ad 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryConfig.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryConfig.java @@ -3,12 +3,13 @@ import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.transport.Address; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -public class DiscoveryConfig { +public class ServiceDiscoveryConfig { private Integer port; private Address[] seeds; @@ -18,7 +19,7 @@ public class DiscoveryConfig { private String memberHost; private Integer memberPort; - private DiscoveryConfig(Builder builder) { + private ServiceDiscoveryConfig(Builder builder) { this.seeds = builder.seeds; this.serviceRegistry = builder.serviceRegistry; this.port = builder.port; @@ -99,8 +100,8 @@ public Builder serviceRegistry(ServiceRegistry serviceRegistry) { return this; } - public DiscoveryConfig build() { - return new DiscoveryConfig(this); + public ServiceDiscoveryConfig build() { + return new ServiceDiscoveryConfig(this); } public Builder tags(Map tags) { @@ -123,4 +124,18 @@ public Builder memberPort(Integer memberPort) { return this; } } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ServiceDiscoveryConfig{"); + sb.append("port=").append(port); + sb.append(", seeds=").append(Arrays.toString(seeds)); + sb.append(", serviceRegistry=").append(serviceRegistry); + sb.append(", tags=").append(tags); + sb.append(", endpoint=").append(endpoint); + sb.append(", memberHost='").append(memberHost).append('\''); + sb.append(", memberPort=").append(memberPort); + sb.append('}'); + return sb.toString(); + } } diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java new file mode 100644 index 000000000..470f083c5 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java @@ -0,0 +1,57 @@ +package io.scalecube.services.discovery.api; + +import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.registry.api.ServiceRegistry; + +/** + * Service registration event. This event is being fired when {@link ServiceEndpoint} is being added + * (or removed from) to (from) {@link ServiceRegistry}. + */ +public class ServiceDiscoveryEvent { + + public enum Type { + REGISTERED, // service endpoint added + UNREGISTERED // service endpoint removed + } + + private final ServiceEndpoint serviceEndpoint; + private final Type type; + + private ServiceDiscoveryEvent(ServiceEndpoint serviceEndpoint, Type type) { + this.serviceEndpoint = serviceEndpoint; + this.type = type; + } + + public static ServiceDiscoveryEvent registered(ServiceEndpoint serviceEndpoint) { + return new ServiceDiscoveryEvent(serviceEndpoint, Type.REGISTERED); + } + + public static ServiceDiscoveryEvent unregistered(ServiceEndpoint serviceEndpoint) { + return new ServiceDiscoveryEvent(serviceEndpoint, Type.UNREGISTERED); + } + + public ServiceEndpoint serviceEndpoint() { + return this.serviceEndpoint; + } + + public Type type() { + return this.type; + } + + public boolean isRegistered() { + return Type.REGISTERED.equals(this.type); + } + + public boolean isUnregistered() { + return Type.UNREGISTERED.equals(this.type); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ServiceDiscoveryEvent{"); + sb.append("serviceEndpoint=").append(serviceEndpoint); + sb.append(", type=").append(type); + sb.append('}'); + return sb.toString(); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java deleted file mode 100644 index ded06d1e5..000000000 --- a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java +++ /dev/null @@ -1,56 +0,0 @@ -package io.scalecube.services.registry.api; - -import io.scalecube.services.ServiceEndpoint; -import io.scalecube.services.ServiceReference; -import java.util.function.Predicate; - -public interface RegistryEvent { - - enum Type { - ADDED, - REMOVED; - } - - static Predicate> asReference() { - return registryEvent -> - ((RegistryEvent) registryEvent).value().getClass().equals(ServiceReference.class); - } - - static Predicate> asEndpoint() { - return registryEvent -> - ((RegistryEvent) registryEvent).value().getClass().equals(ServiceEndpoint.class); - } - - /** - * Creates a registry event instance by the given args. - * - * @param type registry event type {@link Type} - * @param value value - * @return registry event instance - */ - static RegistryEvent create(RegistryEvent.Type type, T value) { - return new RegistryEvent() { - @Override - public RegistryEvent.Type type() { - return type; - } - - @Override - public T value() { - return value; - } - }; - } - - RegistryEvent.Type type(); - - T value(); - - default boolean isAdded() { - return RegistryEvent.Type.ADDED.equals(type()); - } - - default boolean isRemoved() { - return RegistryEvent.Type.REMOVED.equals(type()); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java index 1cf59891b..6f4086fc7 100644 --- a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java +++ b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java @@ -4,9 +4,6 @@ import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; import java.util.List; -import java.util.function.Predicate; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * Service registry interface provides API to register/unregister services in the system and make @@ -23,8 +20,4 @@ public interface ServiceRegistry { boolean registerService(ServiceEndpoint serviceEndpoint); ServiceEndpoint unregisterService(String endpointId); - - Flux> listen(Predicate> predicate); - - Mono close(); } diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 6d872e977..198a6f910 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -7,8 +7,9 @@ import io.scalecube.cluster.ClusterConfig.Builder; import io.scalecube.cluster.Member; import io.scalecube.services.ServiceEndpoint; -import io.scalecube.services.discovery.api.DiscoveryConfig; import io.scalecube.services.discovery.api.ServiceDiscovery; +import io.scalecube.services.discovery.api.ServiceDiscoveryConfig; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.transport.Address; import java.util.Optional; @@ -16,20 +17,24 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; public class ScalecubeServiceDiscovery implements ServiceDiscovery { - public static final String SERVICE_METADATA = "service"; - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); - private ServiceRegistry serviceRegistry; + public static final String SERVICE_METADATA = "service"; + private ServiceRegistry serviceRegistry; private Cluster cluster; - private ServiceEndpoint endpoint; + private final DirectProcessor subject = DirectProcessor.create(); + private final FluxSink sink = subject.serialize().sink(); + private enum DiscoveryType { ADDED, REMOVED, @@ -47,12 +52,12 @@ public ServiceEndpoint endpoint() { } @Override - public Mono start(DiscoveryConfig discoveryConfig) { - this.serviceRegistry = discoveryConfig.serviceRegistry(); - this.endpoint = discoveryConfig.endpoint(); + public Mono start(ServiceDiscoveryConfig config) { + this.serviceRegistry = config.serviceRegistry(); + this.endpoint = config.endpoint(); ClusterConfig clusterConfig = - clusterConfigBuilder(discoveryConfig) + clusterConfigBuilder(config) .addMetadata( this.serviceRegistry .listServiceEndpoints() @@ -77,15 +82,25 @@ public Mono start(DiscoveryConfig discoveryConfig) { return Mono.fromFuture(promise).map(mapper -> this); } + @Override + public Flux listen() { + return Flux.fromIterable(serviceRegistry.listServiceEndpoints()) + .map(ServiceDiscoveryEvent::registered) + .concatWith(subject); + } + @Override public Mono shutdown() { return Mono.defer( - () -> Optional.ofNullable(cluster) - .map(cluster1 -> Mono.fromFuture(cluster1.shutdown())) - .orElse(Mono.empty())); + () -> { + sink.complete(); + return Optional.ofNullable(cluster) + .map(cluster1 -> Mono.fromFuture(cluster1.shutdown())) + .orElse(Mono.empty()); + }); } - private ClusterConfig.Builder clusterConfigBuilder(DiscoveryConfig config) { + private ClusterConfig.Builder clusterConfigBuilder(ServiceDiscoveryConfig config) { Builder builder = ClusterConfig.builder(); if (config.seeds() != null) { builder.seedMembers(config.seeds()); @@ -124,10 +139,7 @@ private void listenCluster(Cluster cluster) { } private void loadClusterServices(Cluster cluster) { - cluster - .otherMembers() - .forEach( - member -> loadMemberServices(DiscoveryType.DISCOVERED, member)); + cluster.otherMembers().forEach(member -> loadMemberServices(DiscoveryType.DISCOVERED, member)); } private void loadMemberServices(DiscoveryType type, Member member) { @@ -150,12 +162,21 @@ private void loadMemberServices(DiscoveryType type, Member member) { "Service Reference was ADDED since new Member has joined the cluster {} : {}", member, serviceEndpoint); + + ServiceDiscoveryEvent event = ServiceDiscoveryEvent.registered(serviceEndpoint); + LOGGER.debug("Publish registered: " + event); + sink.next(event); + } else if (type.equals(DiscoveryType.REMOVED) && (this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null)) { LOGGER.info( "Service Reference was REMOVED since Member have left the cluster {} : {}", member, serviceEndpoint); + + ServiceDiscoveryEvent event = ServiceDiscoveryEvent.unregistered(serviceEndpoint); + LOGGER.debug("Publish unregistered: " + event); + sink.next(event); } }); } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 8d785c547..72aa4fba3 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -4,8 +4,8 @@ import io.scalecube.cluster.membership.IdGenerator; import io.scalecube.services.ServiceCall.Call; import io.scalecube.services.discovery.ServiceScanner; -import io.scalecube.services.discovery.api.DiscoveryConfig; import io.scalecube.services.discovery.api.ServiceDiscovery; +import io.scalecube.services.discovery.api.ServiceDiscoveryConfig; import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayConfig; import io.scalecube.services.methods.ServiceMethodRegistry; @@ -17,7 +17,6 @@ import io.scalecube.services.transport.api.ServerTransport; import io.scalecube.services.transport.api.ServiceTransport; import io.scalecube.services.transport.api.WorkerThreadChooser; -import io.scalecube.transport.Address; import io.scalecube.transport.Addressing; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -112,7 +111,7 @@ public class Microservices { private final ServiceTransportBootstrap transportBootstrap; private final GatewayBootstrap gatewayBootstrap; private final ServiceDiscovery discovery; - private final Consumer discoveryOptions; + private final Consumer discoveryOptions; private Microservices(Builder builder) { this.id = IdGenerator.generateId(); @@ -159,8 +158,8 @@ private Mono start() { } // configure discovery and publish to the cluster - DiscoveryConfig discoveryConfig = - DiscoveryConfig.builder(discoveryOptions) + ServiceDiscoveryConfig discoveryConfig = + ServiceDiscoveryConfig.builder(discoveryOptions) .serviceRegistry(serviceRegistry) .endpoint(endpoint) .build(); @@ -212,7 +211,7 @@ public static final class Builder { private ServiceRegistry serviceRegistry = new ServiceRegistryImpl(); private ServiceMethodRegistry methodRegistry = new ServiceMethodRegistryImpl(); private ServiceDiscovery discovery = ServiceDiscovery.getDiscovery(); - private Consumer discoveryOptions; + private Consumer discoveryOptions; private ServiceTransportBootstrap transportBootstrap = new ServiceTransportBootstrap(); private GatewayBootstrap gatewayBootstrap = new GatewayBootstrap(); @@ -249,8 +248,8 @@ public Builder discovery(ServiceDiscovery discovery) { return this; } - public Builder discovery(Consumer options) { - this.discoveryOptions = options; + public Builder discovery(Consumer discoveryOptions) { + this.discoveryOptions = discoveryOptions; return this; } @@ -360,10 +359,6 @@ public static Builder builder() { return new Builder(); } - public ServiceRegistry serviceRegistry() { - return serviceRegistry; - } - public InetSocketAddress serviceAddress() { return transportBootstrap.listenAddress(); } @@ -380,12 +375,8 @@ public Map gatewayAddresses() { return gatewayBootstrap.gatewayAddresses(); } - public Address address() { - return this.discovery.address(); - } - - public ServiceEndpoint serviceEndpoint() { - return this.discovery.endpoint(); + public ServiceDiscovery discovery() { + return this.discovery; } /** @@ -397,9 +388,6 @@ public Mono shutdown() { return Mono.defer( () -> Mono.when( - Optional.ofNullable(serviceRegistry) - .map(ServiceRegistry::close) - .orElse(Mono.empty()), Optional.ofNullable(discovery).map(ServiceDiscovery::shutdown).orElse(Mono.empty()), Optional.ofNullable(gatewayBootstrap) .map(GatewayBootstrap::shutdown) diff --git a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java index 2780c01f8..dc4b749a7 100644 --- a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java +++ b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java @@ -1,12 +1,8 @@ package io.scalecube.services.registry; -import static io.scalecube.services.registry.api.RegistryEvent.Type.ADDED; -import static io.scalecube.services.registry.api.RegistryEvent.Type.REMOVED; - import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.registry.api.RegistryEvent; import io.scalecube.services.registry.api.ServiceRegistry; import java.util.ArrayList; import java.util.Collection; @@ -15,15 +11,8 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.jctools.maps.NonBlockingHashMap; -import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxProcessor; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; public class ServiceRegistryImpl implements ServiceRegistry { @@ -32,10 +21,6 @@ public class ServiceRegistryImpl implements ServiceRegistry { private final Map> referencesByQualifier = new NonBlockingHashMap<>(); - private final FluxProcessor registryEvents = - DirectProcessor.create(); - private final FluxSink registryEventSink = registryEvents.serialize().sink(); - @Override public List listServiceEndpoints() { // todo how to collect tags correctly? @@ -68,25 +53,13 @@ public List lookupService(ServiceMessage request) { public boolean registerService(ServiceEndpoint serviceEndpoint) { boolean success = serviceEndpoints.putIfAbsent(serviceEndpoint.id(), serviceEndpoint) == null; if (success) { - List serviceReferences = - serviceEndpoint - .serviceRegistrations() - .stream() - .flatMap( - sr -> - sr.methods() - .stream() - .map(sm -> new ServiceReference(sm, sr, serviceEndpoint))) - .collect(Collectors.toList()); - - serviceReferences.forEach( - sr -> - referencesByQualifier - .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) - .add(sr)); - - serviceReferences.forEach(sr -> registryEventSink.next(RegistryEvent.create(ADDED, sr))); - registryEventSink.next(RegistryEvent.create(ADDED, serviceEndpoint)); + serviceEndpoint + .serviceReferences() + .forEach( + sr -> + referencesByQualifier + .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) + .add(sr)); } return success; } @@ -95,6 +68,7 @@ public boolean registerService(ServiceEndpoint serviceEndpoint) { public ServiceEndpoint unregisterService(String endpointId) { ServiceEndpoint serviceEndpoint = serviceEndpoints.remove(endpointId); if (serviceEndpoint != null) { + Map serviceReferencesOfEndpoint = referencesByQualifier .values() @@ -113,40 +87,7 @@ public ServiceEndpoint unregisterService(String endpointId) { return !list.isEmpty() ? list : null; }); }); - - serviceReferencesOfEndpoint - .values() - .forEach(sr -> registryEventSink.next(RegistryEvent.create(REMOVED, sr))); - registryEventSink.next(RegistryEvent.create(REMOVED, serviceEndpoint)); } - return serviceEndpoint; } - - @Override - public Flux> listen(Predicate> predicate) { - //noinspection unchecked - return Flux.fromStream( - Stream.concat( - referencesByQualifier - .values() - .stream() - .flatMap(Collection::stream) - .map(sr -> (RegistryEvent) RegistryEvent.create(ADDED, sr)), - serviceEndpoints - .values() - .stream() - .map(se -> (RegistryEvent) RegistryEvent.create(ADDED, se)))) - .concatWith(registryEvents) - .filter((Predicate) predicate); - } - - @Override - public Mono close() { - return Mono.create( - sink -> { - registryEventSink.complete(); - sink.success(); - }); - } } diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index de26a32b9..8909237c9 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -35,7 +35,7 @@ public static void initNodes() { Microservices.builder() .discovery( options -> - options.seeds(provider.address()).port(port.incrementAndGet())) + options.seeds(provider.discovery().address()).port(port.incrementAndGet())) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 59c450edd..e6539f2ed 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -61,7 +61,7 @@ public static void tearDown() { private static Microservices serviceProvider() { return Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new GreetingServiceImpl()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java index b082d3b68..81465d35d 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java @@ -1,10 +1,9 @@ package io.scalecube.services; -import static io.scalecube.services.registry.api.RegistryEvent.Type.ADDED; -import static io.scalecube.services.registry.api.RegistryEvent.Type.REMOVED; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.scalecube.services.registry.api.RegistryEvent; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent.Type; import io.scalecube.services.sut.GreetingServiceImpl; import java.time.Duration; import java.util.ArrayList; @@ -17,31 +16,31 @@ public class ServiceRegistryEventsTest { @Test public void test_added_removed_registration_events() { - List> events = new ArrayList<>(); + List events = new ArrayList<>(); Microservices seed = Microservices.builder().startAwait(); - seed.serviceRegistry().listen(RegistryEvent.asEndpoint()).subscribe(events::add); + seed.discovery().listen().subscribe(events::add); Microservices ms1 = Microservices.builder() - .discovery(options -> options.seeds(seed.address())) + .discovery(options -> options.seeds(seed.discovery().address())) .services(new GreetingServiceImpl()) .startAwait(); Microservices ms2 = Microservices.builder() - .discovery(options -> options.seeds(seed.address())) + .discovery(options -> options.seeds(seed.discovery().address())) .services(new GreetingServiceImpl()) .startAwait(); Mono.when(ms1.shutdown(), ms2.shutdown()).block(Duration.ofSeconds(6)); assertEquals(4, events.size()); - assertEquals(ADDED, events.get(0).type()); - assertEquals(ADDED, events.get(1).type()); - assertEquals(REMOVED, events.get(2).type()); - assertEquals(REMOVED, events.get(3).type()); + assertEquals(Type.REGISTERED, events.get(0).type()); + assertEquals(Type.REGISTERED, events.get(1).type()); + assertEquals(Type.UNREGISTERED, events.get(2).type()); + assertEquals(Type.UNREGISTERED, events.get(3).type()); seed.shutdown().block(Duration.ofSeconds(6)); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index a19707f47..e236e1e5e 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -60,7 +60,7 @@ private static Microservices gateway() { private static Microservices serviceProvider() { return Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new GreetingServiceImpl()) .startAwait(); } @@ -191,7 +191,7 @@ public void test_remote_serviceA_calls_serviceB_using_setter() { // noinspection unused Microservices provider = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new CoarseGrainedServiceImpl()) // add service a and b .startAwait(); @@ -213,7 +213,7 @@ public void test_remote_serviceA_calls_serviceB() { // noinspection unused Microservices provider = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(another) .startAwait(); @@ -232,7 +232,7 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { // Create microservices instance cluster. Microservices ms = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(another) // add service a and b .startAwait(); @@ -256,7 +256,7 @@ public void test_remote_serviceA_calls_serviceB_with_dispatcher() throws Excepti // Create microservices instance cluster. Microservices provider = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(another) // add service a and b .startAwait(); @@ -340,7 +340,7 @@ public void test_services_contribute_to_cluster_metadata() { Microservices ms = Microservices.builder().tags(tags).services(new GreetingServiceImpl()).startAwait(); - assertTrue(ms.serviceEndpoint().tags().containsKey("HOSTNAME")); + assertTrue(ms.discovery().endpoint().tags().containsKey("HOSTNAME")); } @Test diff --git a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java index 402b98eeb..41d247c0b 100644 --- a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java @@ -4,8 +4,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.exceptions.ConnectionClosedException; -import io.scalecube.services.registry.api.RegistryEvent; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import java.time.Duration; @@ -42,7 +42,9 @@ public void setUp() { serviceNode = Microservices.builder() - .discovery(options -> options.seeds(gateway.address()).port(port.incrementAndGet())) + .discovery( + options -> + options.seeds(gateway.discovery().address()).port(port.incrementAndGet())) .services(new SimpleQuoteService()) .startAwait(); } @@ -78,9 +80,9 @@ public void test_remote_node_died_mono_never() throws Exception { sub1.set(serviceCall.requestOne(JUST_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .serviceRegistry() - .listen(RegistryEvent.asEndpoint()) - .filter(RegistryEvent::isRemoved) + .discovery() + .listen() + .filter(ServiceDiscoveryEvent::isUnregistered) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -107,9 +109,9 @@ public void test_remote_node_died_many_never() throws Exception { sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .serviceRegistry() - .listen(RegistryEvent.asEndpoint()) - .filter(RegistryEvent::isRemoved) + .discovery() + .listen() + .filter(ServiceDiscoveryEvent::isUnregistered) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -140,9 +142,9 @@ public void test_remote_node_died_many_then_never() throws Exception { .subscribe()); gateway - .serviceRegistry() - .listen(RegistryEvent.asEndpoint()) - .filter(RegistryEvent::isRemoved) + .discovery() + .listen() + .filter(ServiceDiscoveryEvent::isUnregistered) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 97733ca2f..03fc888bc 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -28,7 +28,7 @@ public static void setup() { node = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services(new SimpleQuoteService()) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index f4d093fdd..86a220a72 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -48,7 +48,7 @@ public static void setup() { // Create microservices instance cluster. provider1 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) .tag("ONLYFOR", "joe") @@ -62,7 +62,7 @@ public static void setup() { // Create microservices instance cluster. provider2 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) .tag("ONLYFOR", "fransin") diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 5206aee9d..7ee1503e9 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -21,7 +21,7 @@ public static void main(String[] args) { Microservices services1 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) .tag("Weight", "0.3") @@ -30,7 +30,7 @@ public static void main(String[] args) { Microservices services2 = Microservices.builder() - .discovery(options -> options.seeds(gateway.address())) + .discovery(options -> options.seeds(gateway.discovery().address())) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) .tag("Weight", "0.7")