From 4da80173b48b5000e1ad167540a4a919ffef7bac Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 12 Apr 2021 21:22:05 +0300 Subject: [PATCH 1/9] Added Operators.onDiscard --- .../services/discovery/ScalecubeServiceDiscovery.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 9ff1e094e..bf09515c8 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -35,6 +35,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; @@ -44,6 +45,10 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); + static { + Operators.enableOnDiscard(null, obj -> LOGGER.warn("[onDiscard] obj = {}", obj)); + } + private final ServiceEndpoint serviceEndpoint; private ClusterConfig clusterConfig; From f426b8d3de80c541c658bf8ddd4a7f11566334bf Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 12 Apr 2021 21:23:30 +0300 Subject: [PATCH 2/9] Added Operators.onDiscard --- .../scalecube/services/discovery/ScalecubeServiceDiscovery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index bf09515c8..1ed61b841 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -46,7 +46,7 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); static { - Operators.enableOnDiscard(null, obj -> LOGGER.warn("[onDiscard] obj = {}", obj)); + Operators.enableOnDiscard(null, o -> LOGGER.warn("[onDiscard] element = {}", o)); } private final ServiceEndpoint serviceEndpoint; From 8a6255401ad97e2446ae6f3cb3267770badb219a Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 12 Apr 2021 21:27:46 +0300 Subject: [PATCH 3/9] Added log at RetryEmiFailureHnadler --- .../scalecube/services/discovery/ScalecubeServiceDiscovery.java | 1 + 1 file changed, 1 insertion(+) diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 1ed61b841..bbbb750b4 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -301,6 +301,7 @@ private static class RetryEmitFailureHandler implements EmitFailureHandler { @Override public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { + LOGGER.warn("[onEmitFailure] signalType={}, emitResult={}", signalType, emitResult); return emitResult == FAIL_NON_SERIALIZED; } } From 45ff8dc833c4abb6256289bde1cf26c74548a40c Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 12 Apr 2021 23:17:37 +0300 Subject: [PATCH 4/9] Set back directProcessor in CompositeDiscovery, ScaldecubeServiceDsicovery --- .../discovery/ScalecubeServiceDiscovery.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index bbbb750b4..74794db58 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -33,11 +33,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; +import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; -import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.EmitResult; @@ -55,8 +56,8 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private Cluster cluster; // Sink - private final Sinks.Many sink = - Sinks.many().multicast().directBestEffort(); + private final DirectProcessor subject = DirectProcessor.create(); + private final FluxSink sink = subject.sink(); /** * Constructor. @@ -170,7 +171,7 @@ public void onMembershipEvent(MembershipEvent event) { @Override public Flux listen() { - return sink.asFlux().onBackpressureBuffer(); + return subject.onBackpressureBuffer(); } @Override @@ -178,13 +179,11 @@ public Mono shutdown() { return Mono.defer( () -> { if (cluster == null) { - sink.emitComplete(RetryEmitFailureHandler.INSTANCE); + sink.complete(); return Mono.empty(); } cluster.shutdown(); - return cluster - .onShutdown() - .doFinally(s -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE)); + return cluster.onShutdown().doFinally(s -> sink.complete()); }); } @@ -201,7 +200,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) { if (discoveryEvent != null) { LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent); - sink.emitNext(discoveryEvent, RetryEmitFailureHandler.INSTANCE); + sink.next(discoveryEvent); } } From c761b8f9c245842cf63b5cd898ea2392da927f19 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Tue, 13 Apr 2021 01:17:18 +0300 Subject: [PATCH 5/9] Set back directProcessor in CompositeDiscovery, ScaldecubeServiceDsicovery --- .../java/io/scalecube/services/Microservices.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index e0088bb39..29a1f5caa 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -57,7 +57,9 @@ import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.Exceptions; +import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; @@ -555,9 +557,9 @@ private static class CompositeServiceDiscovery implements ServiceDiscovery { private final Map discoveryContexts = new ConcurrentHashMap<>(); - // Sink - private final Sinks.Many sink = - Sinks.many().multicast().directBestEffort(); + // Subject + private final DirectProcessor subject = DirectProcessor.create(); + private final FluxSink sink = subject.sink(); private final Disposable.Composite disposables = Disposables.composite(); private Scheduler scheduler; @@ -614,7 +616,7 @@ private Mono startListen() { public Flux listen() { return Flux.fromStream(microservices.serviceRegistry.listServiceEndpoints().stream()) .map(ServiceDiscoveryEvent::newEndpointAdded) - .concatWith(sink.asFlux().onBackpressureBuffer()) + .concatWith(subject) .subscribeOn(scheduler) .publishOn(scheduler); } @@ -650,7 +652,7 @@ private Mono start0(String id, ServiceDiscovery discovery) { .subscribeOn(scheduler) .publishOn(scheduler) .doOnNext(event -> onDiscoveryEvent(microservices, event)) - .doOnNext(event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE)) + .doOnNext(sink::next) .subscribe()); return Mono.deferContextual(context -> discovery.start()) From a18785be1da819dca0ac1c2eee50aa064ec4d26b Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Tue, 13 Apr 2021 10:27:12 +0300 Subject: [PATCH 6/9] Fixing CI --- .../discovery/ScalecubeServiceDiscovery.java | 23 ++++++++----------- .../io/scalecube/services/Microservices.java | 11 ++++----- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 74794db58..9ff1e094e 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -33,12 +33,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.EmitResult; @@ -46,18 +44,14 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); - static { - Operators.enableOnDiscard(null, o -> LOGGER.warn("[onDiscard] element = {}", o)); - } - private final ServiceEndpoint serviceEndpoint; private ClusterConfig clusterConfig; private Cluster cluster; // Sink - private final DirectProcessor subject = DirectProcessor.create(); - private final FluxSink sink = subject.sink(); + private final Sinks.Many sink = + Sinks.many().multicast().directBestEffort(); /** * Constructor. @@ -171,7 +165,7 @@ public void onMembershipEvent(MembershipEvent event) { @Override public Flux listen() { - return subject.onBackpressureBuffer(); + return sink.asFlux().onBackpressureBuffer(); } @Override @@ -179,11 +173,13 @@ public Mono shutdown() { return Mono.defer( () -> { if (cluster == null) { - sink.complete(); + sink.emitComplete(RetryEmitFailureHandler.INSTANCE); return Mono.empty(); } cluster.shutdown(); - return cluster.onShutdown().doFinally(s -> sink.complete()); + return cluster + .onShutdown() + .doFinally(s -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE)); }); } @@ -200,7 +196,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) { if (discoveryEvent != null) { LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent); - sink.next(discoveryEvent); + sink.emitNext(discoveryEvent, RetryEmitFailureHandler.INSTANCE); } } @@ -300,7 +296,6 @@ private static class RetryEmitFailureHandler implements EmitFailureHandler { @Override public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { - LOGGER.warn("[onEmitFailure] signalType={}, emitResult={}", signalType, emitResult); return emitResult == FAIL_NON_SERIALIZED; } } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 29a1f5caa..03abc6c27 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -57,9 +57,7 @@ import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; @@ -558,8 +556,8 @@ private static class CompositeServiceDiscovery implements ServiceDiscovery { new ConcurrentHashMap<>(); // Subject - private final DirectProcessor subject = DirectProcessor.create(); - private final FluxSink sink = subject.sink(); + private final Sinks.Many sink = + Sinks.many().multicast().directBestEffort(); private final Disposable.Composite disposables = Disposables.composite(); private Scheduler scheduler; @@ -616,7 +614,7 @@ private Mono startListen() { public Flux listen() { return Flux.fromStream(microservices.serviceRegistry.listServiceEndpoints().stream()) .map(ServiceDiscoveryEvent::newEndpointAdded) - .concatWith(subject) + .concatWith(sink.asFlux().onBackpressureBuffer()) .subscribeOn(scheduler) .publishOn(scheduler); } @@ -652,7 +650,7 @@ private Mono start0(String id, ServiceDiscovery discovery) { .subscribeOn(scheduler) .publishOn(scheduler) .doOnNext(event -> onDiscoveryEvent(microservices, event)) - .doOnNext(sink::next) + .doOnNext(event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE)) .subscribe()); return Mono.deferContextual(context -> discovery.start()) @@ -676,6 +674,7 @@ public Mono shutdown() { return Mono.defer( () -> { disposables.dispose(); + sink.emitComplete(RetryEmitFailureHandler.INSTANCE); return Mono.whenDelayError( discoveryInstances.values().stream() .map(ServiceDiscovery::shutdown) From cd0eba58ee246145acfc23e07a3cc25358dd67ad Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Wed, 14 Apr 2021 16:01:36 +0300 Subject: [PATCH 7/9] Updated scalecube-cluster, updated workflows files --- .github/workflows/branch-ci.yml | 4 +++- .github/workflows/pre-release-ci.yml | 1 + .github/workflows/release-ci.yml | 4 +++- pom.xml | 6 +++--- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/.github/workflows/branch-ci.yml b/.github/workflows/branch-ci.yml index b63996889..603791e5a 100644 --- a/.github/workflows/branch-ci.yml +++ b/.github/workflows/branch-ci.yml @@ -33,4 +33,6 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Maven Verify - run: mvn verify -B + run: | + sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts + mvn verify -B diff --git a/.github/workflows/pre-release-ci.yml b/.github/workflows/pre-release-ci.yml index a7f358871..4948f5bc2 100644 --- a/.github/workflows/pre-release-ci.yml +++ b/.github/workflows/pre-release-ci.yml @@ -25,6 +25,7 @@ jobs: server-password: GITHUB_TOKEN - name: Deploy pre-release version to GitHub Packages run: | + sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts pre_release_version=${{ github.event.release.tag_name }} echo Pre-release version $pre_release_version mvn versions:set -DnewVersion=$pre_release_version -DgenerateBackupPoms=false diff --git a/.github/workflows/release-ci.yml b/.github/workflows/release-ci.yml index f4c7b083e..9cc975bcf 100644 --- a/.github/workflows/release-ci.yml +++ b/.github/workflows/release-ci.yml @@ -31,7 +31,9 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Maven Verify - run: mvn verify -B + run: | + sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts + mvn verify -B - name: Configure git run: | git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com" diff --git a/pom.xml b/pom.xml index dadc65309..c55037936 100644 --- a/pom.xml +++ b/pom.xml @@ -57,18 +57,18 @@ - 2.6.8 + 2.6.9-test-rc1 1.0.13 1.0.19 - 2020.0.5 + 2020.0.6 2.11.0 1.0.4 1.6.0 1.7.30 2.13.2 3.4.2 - 4.1.60.Final + 4.1.63.Final 1.26 3.0.2 From d39a4794bd18be207cf86466521aa14003059e0a Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Wed, 14 Apr 2021 16:03:00 +0300 Subject: [PATCH 8/9] Minor --- services/src/main/java/io/scalecube/services/Microservices.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 03abc6c27..aa33a92b6 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -555,7 +555,7 @@ private static class CompositeServiceDiscovery implements ServiceDiscovery { private final Map discoveryContexts = new ConcurrentHashMap<>(); - // Subject + // Sink private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); From 4dfad4fb8a5255984e64a795ee53ee1fc4025b0a Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Wed, 14 Apr 2021 17:53:01 +0300 Subject: [PATCH 9/9] Updated scalecube-services to 2.6.9 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c55037936..aea144836 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 2.6.9-test-rc1 + 2.6.9 1.0.13 1.0.19