From 139674355a295db78c66609deba4737f0323c657 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 15 Jun 2020 20:25:05 +0300 Subject: [PATCH] Updated service-discovery api on Microservices part --- .../java/io/scalecube/services/Microservices.java | 12 ++++-------- .../io/scalecube/services/ServiceRegistryTest.java | 6 +++--- .../services/sut/AnnotationServiceImpl.java | 2 +- .../rsocket/RSocketServiceTransportTest.java | 9 +++------ 4 files changed, 11 insertions(+), 18 deletions(-) diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 60ca0a095..f4e0f7d6b 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -302,16 +302,12 @@ public ServiceDiscoveryContext discovery(String id) { } /** - * Returns composite service discovery context. + * Function to subscribe and listen on {@code ServiceDiscoveryEvent} events. * - * @return composite service discovery context + * @return stream of {@code ServiceDiscoveryEvent} events */ - public ServiceDiscoveryContext discovery() { - return ServiceDiscoveryContext.builder() - .id("composite-discovery") - .address(Address.NULL_ADDRESS) - .discovery(compositeDiscovery) - .build(); + public Flux listenDiscovery() { + return compositeDiscovery.listen(); } /** diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java index 8ac062094..8a5c044d8 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java @@ -50,7 +50,7 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec) .transport(RSocketServiceTransport::new) .startAwait(); - seed.discovery().listen().subscribe(events); + seed.listenDiscovery().subscribe(events); Address seedAddress = seed.discovery("seed").address(); @@ -96,7 +96,7 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) { .startAwait(); cluster.add(seed); - seed.discovery().listen().subscribe(processor); + seed.listenDiscovery().subscribe(processor); Address seedAddress = seed.discovery("seed").address(); @@ -161,7 +161,7 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) .startAwait(); cluster.add(seed); - seed.discovery().listen().subscribe(processor); + seed.listenDiscovery().subscribe(processor); Address seedAddress = seed.discovery("seed").address(); diff --git a/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java b/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java index 564d25705..7de7c8747 100644 --- a/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java +++ b/services/src/test/java/io/scalecube/services/sut/AnnotationServiceImpl.java @@ -13,7 +13,7 @@ public class AnnotationServiceImpl implements AnnotationService { @AfterConstruct void init(Microservices microservices) { this.serviceDiscoveryEvents = ReplayProcessor.create(); - microservices.discovery().listen().subscribe(serviceDiscoveryEvents); + microservices.listenDiscovery().subscribe(serviceDiscoveryEvents); } @Override diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 9ad58625c..e1a31afc2 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -82,8 +82,7 @@ public void test_remote_node_died_mono_never() throws Exception { sub1.set(serviceCall.requestOne(JUST_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .discovery() - .listen() + .listenDiscovery() .filter(ServiceDiscoveryEvent::isEndpointRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); @@ -111,8 +110,7 @@ public void test_remote_node_died_many_never() throws Exception { sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).doOnError(exceptionHolder::set).subscribe()); gateway - .discovery() - .listen() + .listenDiscovery() .filter(ServiceDiscoveryEvent::isEndpointRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println); @@ -144,8 +142,7 @@ public void test_remote_node_died_many_then_never() throws Exception { .subscribe()); gateway - .discovery() - .listen() + .listenDiscovery() .filter(ServiceDiscoveryEvent::isEndpointRemoved) .subscribe(onNext -> latch1.countDown(), System.err::println);