diff --git a/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java b/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java index f97149f7b..c7008ed01 100644 --- a/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java +++ b/benchmarks/src/main/java/io/scalecube/services/benchmarks/service/BenchmarkServiceState.java @@ -40,9 +40,7 @@ public void beforeAll() { "Seed address: " + seed.discovery().address() + ", services address: " - + node.serviceAddress() - + ", seed serviceRegistry: " - + seed.serviceRegistry().listServiceReferences()); + + node.serviceAddress()); } @Override diff --git a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java index 3068e372b..7627eab05 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java @@ -3,6 +3,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class ServiceEndpoint { @@ -64,10 +65,27 @@ public Map tags() { return tags; } + /** + * Return collection of service registratrions. + * + * @return collection of {@link ServiceRegistration} + */ public Collection serviceRegistrations() { return serviceRegistrations; } + /** + * Creates collection of service references from this service endpoint. + * + * @return collection of {@link ServiceReference} + */ + public Collection serviceReferences() { + return serviceRegistrations + .stream() + .flatMap(sr -> sr.methods().stream().map(sm -> new ServiceReference(sm, sr, this))) + .collect(Collectors.toList()); + } + @Override public String toString() { return "ServiceEndpoint{" diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java index af9956012..1d7e8695e 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java @@ -23,9 +23,9 @@ static ServiceDiscovery getDiscovery() { .orElseThrow(() -> new IllegalStateException("ServiceDiscovery not configured")); } - Mono start(DiscoveryConfig discoveryConfig); + Mono start(ServiceDiscoveryConfig config); Mono shutdown(); - Flux listen(); + Flux listen(); } diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryConfig.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryConfig.java similarity index 79% rename from services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryConfig.java rename to services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryConfig.java index 0984c83f4..c530a68ad 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryConfig.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryConfig.java @@ -3,12 +3,13 @@ import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.transport.Address; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -public class DiscoveryConfig { +public class ServiceDiscoveryConfig { private Integer port; private Address[] seeds; @@ -18,7 +19,7 @@ public class DiscoveryConfig { private String memberHost; private Integer memberPort; - private DiscoveryConfig(Builder builder) { + private ServiceDiscoveryConfig(Builder builder) { this.seeds = builder.seeds; this.serviceRegistry = builder.serviceRegistry; this.port = builder.port; @@ -99,8 +100,8 @@ public Builder serviceRegistry(ServiceRegistry serviceRegistry) { return this; } - public DiscoveryConfig build() { - return new DiscoveryConfig(this); + public ServiceDiscoveryConfig build() { + return new ServiceDiscoveryConfig(this); } public Builder tags(Map tags) { @@ -123,4 +124,18 @@ public Builder memberPort(Integer memberPort) { return this; } } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ServiceDiscoveryConfig{"); + sb.append("port=").append(port); + sb.append(", seeds=").append(Arrays.toString(seeds)); + sb.append(", serviceRegistry=").append(serviceRegistry); + sb.append(", tags=").append(tags); + sb.append(", endpoint=").append(endpoint); + sb.append(", memberHost='").append(memberHost).append('\''); + sb.append(", memberPort=").append(memberPort); + sb.append('}'); + return sb.toString(); + } } diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java similarity index 59% rename from services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java rename to services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java index a6b475cd1..470f083c5 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/DiscoveryEvent.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryEvent.java @@ -7,7 +7,7 @@ * Service registration event. This event is being fired when {@link ServiceEndpoint} is being added * (or removed from) to (from) {@link ServiceRegistry}. */ -public class DiscoveryEvent { +public class ServiceDiscoveryEvent { public enum Type { REGISTERED, // service endpoint added @@ -17,17 +17,17 @@ public enum Type { private final ServiceEndpoint serviceEndpoint; private final Type type; - private DiscoveryEvent(Type type, ServiceEndpoint serviceEndpoint) { + private ServiceDiscoveryEvent(ServiceEndpoint serviceEndpoint, Type type) { this.serviceEndpoint = serviceEndpoint; this.type = type; } - public static DiscoveryEvent registered(ServiceEndpoint serviceEndpoint) { - return new DiscoveryEvent(Type.REGISTERED, serviceEndpoint); + public static ServiceDiscoveryEvent registered(ServiceEndpoint serviceEndpoint) { + return new ServiceDiscoveryEvent(serviceEndpoint, Type.REGISTERED); } - public static DiscoveryEvent unregistered(ServiceEndpoint serviceEndpoint) { - return new DiscoveryEvent(Type.UNREGISTERED, serviceEndpoint); + public static ServiceDiscoveryEvent unregistered(ServiceEndpoint serviceEndpoint) { + return new ServiceDiscoveryEvent(serviceEndpoint, Type.UNREGISTERED); } public ServiceEndpoint serviceEndpoint() { @@ -38,11 +38,6 @@ public Type type() { return this.type; } - @Override - public String toString() { - return "RegistrationEvent [serviceEndpoint=" + serviceEndpoint + ", type=" + type + "]"; - } - public boolean isRegistered() { return Type.REGISTERED.equals(this.type); } @@ -50,4 +45,13 @@ public boolean isRegistered() { public boolean isUnregistered() { return Type.UNREGISTERED.equals(this.type); } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ServiceDiscoveryEvent{"); + sb.append("serviceEndpoint=").append(serviceEndpoint); + sb.append(", type=").append(type); + sb.append('}'); + return sb.toString(); + } } diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java b/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java deleted file mode 100644 index 420c9bb7d..000000000 --- a/services-api/src/main/java/io/scalecube/services/registry/api/RegistryEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.scalecube.services.registry.api; - -import io.scalecube.services.ServiceReference; -import io.scalecube.transport.Address; - -public class RegistryEvent { - - public enum Type { - ADDED, - REMOVED; - } - - private ServiceReference serviceReference; - private Type type; - - public static RegistryEvent createAdded(ServiceReference serviceReference) { - return new RegistryEvent(Type.ADDED, serviceReference); - } - - public static RegistryEvent createRemoved(ServiceReference serviceReference) { - return new RegistryEvent(Type.REMOVED, serviceReference); - } - - private RegistryEvent(Type type, ServiceReference serviceReference) { - this.serviceReference = serviceReference; - this.type = type; - } - - private RegistryEvent(RegistryEvent e) { - this.serviceReference = e.serviceReference; - this.type = e.type; - } - - public ServiceReference serviceReference() { - return this.serviceReference; - } - - public boolean isAdded() { - return Type.ADDED.equals(type); - } - - public boolean isRemoved() { - return Type.REMOVED.equals(type); - } - - public Type type() { - return this.type; - } - - public Address address() { - return Address.create(this.serviceReference.host(), this.serviceReference.port()); - } -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java index c248ea18f..6f4086fc7 100644 --- a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java +++ b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java @@ -4,8 +4,6 @@ import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; import java.util.List; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * Service registry interface provides API to register/unregister services in the system and make @@ -22,8 +20,4 @@ public interface ServiceRegistry { boolean registerService(ServiceEndpoint serviceEndpoint); ServiceEndpoint unregisterService(String endpointId); - - Flux listen(); - - Mono close(); } diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index f27433686..198a6f910 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -7,9 +7,9 @@ import io.scalecube.cluster.ClusterConfig.Builder; import io.scalecube.cluster.Member; import io.scalecube.services.ServiceEndpoint; -import io.scalecube.services.discovery.api.DiscoveryConfig; -import io.scalecube.services.discovery.api.DiscoveryEvent; import io.scalecube.services.discovery.api.ServiceDiscovery; +import io.scalecube.services.discovery.api.ServiceDiscoveryConfig; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.transport.Address; import java.util.Optional; @@ -24,18 +24,16 @@ public class ScalecubeServiceDiscovery implements ServiceDiscovery { - public static final String SERVICE_METADATA = "service"; - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); - private ServiceRegistry serviceRegistry; + public static final String SERVICE_METADATA = "service"; + private ServiceRegistry serviceRegistry; private Cluster cluster; - private ServiceEndpoint endpoint; - private final DirectProcessor subject = DirectProcessor.create(); - private final FluxSink sink = subject.serialize().sink(); + private final DirectProcessor subject = DirectProcessor.create(); + private final FluxSink sink = subject.serialize().sink(); private enum DiscoveryType { ADDED, @@ -54,12 +52,12 @@ public ServiceEndpoint endpoint() { } @Override - public Mono start(DiscoveryConfig discoveryConfig) { - this.serviceRegistry = discoveryConfig.serviceRegistry(); - this.endpoint = discoveryConfig.endpoint(); + public Mono start(ServiceDiscoveryConfig config) { + this.serviceRegistry = config.serviceRegistry(); + this.endpoint = config.endpoint(); ClusterConfig clusterConfig = - clusterConfigBuilder(discoveryConfig) + clusterConfigBuilder(config) .addMetadata( this.serviceRegistry .listServiceEndpoints() @@ -85,9 +83,9 @@ public Mono start(DiscoveryConfig discoveryConfig) { } @Override - public Flux listen() { + public Flux listen() { return Flux.fromIterable(serviceRegistry.listServiceEndpoints()) - .map(DiscoveryEvent::registered) + .map(ServiceDiscoveryEvent::registered) .concatWith(subject); } @@ -102,7 +100,7 @@ public Mono shutdown() { }); } - private ClusterConfig.Builder clusterConfigBuilder(DiscoveryConfig config) { + private ClusterConfig.Builder clusterConfigBuilder(ServiceDiscoveryConfig config) { Builder builder = ClusterConfig.builder(); if (config.seeds() != null) { builder.seedMembers(config.seeds()); @@ -141,12 +139,7 @@ private void listenCluster(Cluster cluster) { } private void loadClusterServices(Cluster cluster) { - cluster - .otherMembers() - .forEach( - member -> { - loadMemberServices(DiscoveryType.DISCOVERED, member); - }); + cluster.otherMembers().forEach(member -> loadMemberServices(DiscoveryType.DISCOVERED, member)); } private void loadMemberServices(DiscoveryType type, Member member) { @@ -165,27 +158,25 @@ private void loadMemberServices(DiscoveryType type, Member member) { LOGGER.debug("Member: {} is {} : {}", member, type, serviceEndpoint); if ((type.equals(DiscoveryType.ADDED) || type.equals(DiscoveryType.DISCOVERED)) && (this.serviceRegistry.registerService(serviceEndpoint))) { - LOGGER.info( "Service Reference was ADDED since new Member has joined the cluster {} : {}", member, serviceEndpoint); - DiscoveryEvent registrationEvent = DiscoveryEvent.registered(serviceEndpoint); - LOGGER.debug("Publish registered: " + registrationEvent); - sink.next(registrationEvent); + ServiceDiscoveryEvent event = ServiceDiscoveryEvent.registered(serviceEndpoint); + LOGGER.debug("Publish registered: " + event); + sink.next(event); } else if (type.equals(DiscoveryType.REMOVED) && (this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null)) { - LOGGER.info( "Service Reference was REMOVED since Member have left the cluster {} : {}", member, serviceEndpoint); - DiscoveryEvent registrationEvent = DiscoveryEvent.unregistered(serviceEndpoint); - LOGGER.debug("Publish unregistered: " + registrationEvent); - sink.next(registrationEvent); + ServiceDiscoveryEvent event = ServiceDiscoveryEvent.unregistered(serviceEndpoint); + LOGGER.debug("Publish unregistered: " + event); + sink.next(event); } }); } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 10e50b8c9..72aa4fba3 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -4,8 +4,8 @@ import io.scalecube.cluster.membership.IdGenerator; import io.scalecube.services.ServiceCall.Call; import io.scalecube.services.discovery.ServiceScanner; -import io.scalecube.services.discovery.api.DiscoveryConfig; import io.scalecube.services.discovery.api.ServiceDiscovery; +import io.scalecube.services.discovery.api.ServiceDiscoveryConfig; import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayConfig; import io.scalecube.services.methods.ServiceMethodRegistry; @@ -111,7 +111,7 @@ public class Microservices { private final ServiceTransportBootstrap transportBootstrap; private final GatewayBootstrap gatewayBootstrap; private final ServiceDiscovery discovery; - private final Consumer discoveryOptions; + private final Consumer discoveryOptions; private Microservices(Builder builder) { this.id = IdGenerator.generateId(); @@ -158,8 +158,8 @@ private Mono start() { } // configure discovery and publish to the cluster - DiscoveryConfig discoveryConfig = - DiscoveryConfig.builder(discoveryOptions) + ServiceDiscoveryConfig discoveryConfig = + ServiceDiscoveryConfig.builder(discoveryOptions) .serviceRegistry(serviceRegistry) .endpoint(endpoint) .build(); @@ -211,7 +211,7 @@ public static final class Builder { private ServiceRegistry serviceRegistry = new ServiceRegistryImpl(); private ServiceMethodRegistry methodRegistry = new ServiceMethodRegistryImpl(); private ServiceDiscovery discovery = ServiceDiscovery.getDiscovery(); - private Consumer discoveryOptions; + private Consumer discoveryOptions; private ServiceTransportBootstrap transportBootstrap = new ServiceTransportBootstrap(); private GatewayBootstrap gatewayBootstrap = new GatewayBootstrap(); @@ -248,8 +248,8 @@ public Builder discovery(ServiceDiscovery discovery) { return this; } - public Builder discovery(Consumer options) { - this.discoveryOptions = options; + public Builder discovery(Consumer discoveryOptions) { + this.discoveryOptions = discoveryOptions; return this; } @@ -359,10 +359,6 @@ public static Builder builder() { return new Builder(); } - public ServiceRegistry serviceRegistry() { - return serviceRegistry; - } - public InetSocketAddress serviceAddress() { return transportBootstrap.listenAddress(); } @@ -392,9 +388,6 @@ public Mono shutdown() { return Mono.defer( () -> Mono.when( - Optional.ofNullable(serviceRegistry) - .map(ServiceRegistry::close) - .orElse(Mono.empty()), Optional.ofNullable(discovery).map(ServiceDiscovery::shutdown).orElse(Mono.empty()), Optional.ofNullable(gatewayBootstrap) .map(GatewayBootstrap::shutdown) diff --git a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java index 1a7e98c88..dc4b749a7 100644 --- a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java +++ b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java @@ -3,7 +3,6 @@ import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.registry.api.RegistryEvent; import io.scalecube.services.registry.api.ServiceRegistry; import java.util.ArrayList; import java.util.Collection; @@ -13,13 +12,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.jctools.maps.NonBlockingHashMap; -import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxProcessor; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; public class ServiceRegistryImpl implements ServiceRegistry { @@ -28,10 +21,6 @@ public class ServiceRegistryImpl implements ServiceRegistry { private final Map> referencesByQualifier = new NonBlockingHashMap<>(); - private final FluxProcessor events = DirectProcessor.create(); - - private final FluxSink eventSink = events.serialize().sink(); - @Override public List listServiceEndpoints() { // todo how to collect tags correctly? @@ -40,7 +29,11 @@ public List listServiceEndpoints() { @Override public List listServiceReferences() { - return serviceReferenceStream().collect(Collectors.toList()); + return referencesByQualifier + .values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); } @Override @@ -60,24 +53,13 @@ public List lookupService(ServiceMessage request) { public boolean registerService(ServiceEndpoint serviceEndpoint) { boolean success = serviceEndpoints.putIfAbsent(serviceEndpoint.id(), serviceEndpoint) == null; if (success) { - List serviceReferences = - serviceEndpoint - .serviceRegistrations() - .stream() - .flatMap( - sr -> - sr.methods() - .stream() - .map(sm -> new ServiceReference(sm, sr, serviceEndpoint))) - .collect(Collectors.toList()); - - serviceReferences.forEach( - sr -> - referencesByQualifier - .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) - .add(sr)); - - serviceReferences.forEach(sr -> eventSink.next(RegistryEvent.createAdded(sr))); + serviceEndpoint + .serviceReferences() + .forEach( + sr -> + referencesByQualifier + .computeIfAbsent(sr.qualifier(), key -> new CopyOnWriteArrayList<>()) + .add(sr)); } return success; } @@ -86,6 +68,7 @@ public boolean registerService(ServiceEndpoint serviceEndpoint) { public ServiceEndpoint unregisterService(String endpointId) { ServiceEndpoint serviceEndpoint = serviceEndpoints.remove(endpointId); if (serviceEndpoint != null) { + Map serviceReferencesOfEndpoint = referencesByQualifier .values() @@ -104,37 +87,7 @@ public ServiceEndpoint unregisterService(String endpointId) { return !list.isEmpty() ? list : null; }); }); - - serviceReferencesOfEndpoint - .values() - .forEach(sr -> eventSink.next(RegistryEvent.createRemoved(sr))); } - return serviceEndpoint; } - - Stream serviceReferenceStream() { - return referencesByQualifier.values().stream().flatMap(Collection::stream); - } - - /** - * Listen on service registry events. - * - * @return flux object - */ - public Flux listen() { - return Flux.fromIterable(referencesByQualifier.values()) - .flatMap(Flux::fromIterable) - .map(RegistryEvent::createAdded) - .concatWith(events); - } - - @Override - public Mono close() { - return Mono.create( - sink -> { - eventSink.complete(); - sink.success(); - }); - } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java index 51a62c63a..81465d35d 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryEventsTest.java @@ -1,10 +1,9 @@ package io.scalecube.services; -import static io.scalecube.services.discovery.api.DiscoveryEvent.Type.REGISTERED; -import static io.scalecube.services.discovery.api.DiscoveryEvent.Type.UNREGISTERED; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.scalecube.services.discovery.api.DiscoveryEvent; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent.Type; import io.scalecube.services.sut.GreetingServiceImpl; import java.time.Duration; import java.util.ArrayList; @@ -17,7 +16,7 @@ public class ServiceRegistryEventsTest { @Test public void test_added_removed_registration_events() { - List events = new ArrayList<>(); + List events = new ArrayList<>(); Microservices seed = Microservices.builder().startAwait(); @@ -38,10 +37,10 @@ public void test_added_removed_registration_events() { Mono.when(ms1.shutdown(), ms2.shutdown()).block(Duration.ofSeconds(6)); assertEquals(4, events.size()); - assertEquals(REGISTERED, events.get(0).type()); - assertEquals(REGISTERED, events.get(1).type()); - assertEquals(UNREGISTERED, events.get(2).type()); - assertEquals(UNREGISTERED, events.get(3).type()); + assertEquals(Type.REGISTERED, events.get(0).type()); + assertEquals(Type.REGISTERED, events.get(1).type()); + assertEquals(Type.UNREGISTERED, events.get(2).type()); + assertEquals(Type.UNREGISTERED, events.get(3).type()); seed.shutdown().block(Duration.ofSeconds(6)); } diff --git a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java index 1ce839695..41d247c0b 100644 --- a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java @@ -4,7 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.discovery.api.DiscoveryEvent; +import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; @@ -82,7 +82,7 @@ public void test_remote_node_died_mono_never() throws Exception { gateway .discovery() .listen() - .filter(DiscoveryEvent::isUnregistered) + .filter(ServiceDiscoveryEvent::isUnregistered) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -111,7 +111,7 @@ public void test_remote_node_died_many_never() throws Exception { gateway .discovery() .listen() - .filter(DiscoveryEvent::isUnregistered) + .filter(ServiceDiscoveryEvent::isUnregistered) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down @@ -144,7 +144,7 @@ public void test_remote_node_died_many_then_never() throws Exception { gateway .discovery() .listen() - .filter(DiscoveryEvent::isUnregistered) + .filter(ServiceDiscoveryEvent::isUnregistered) .subscribe(onNext -> latch1.countDown(), System.err::println); // service node goes down