From 0573b390536c5c0f9cee334790e89d489c13c503 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Tue, 24 Sep 2024 21:15:38 +0300 Subject: [PATCH] Get rid of commons (#849) * Removed RetryNonSerializedEmitFailureHandler * Removed dependency on commons * Added `Address` (copied from commons) --- pom.xml | 14 +- services-api/pom.xml | 13 +- .../java/io/scalecube/services/Address.java | 139 ++++++++++++++++++ .../scalecube/services/ServiceEndpoint.java | 1 - .../scalecube/services/ServiceReference.java | 1 - .../discovery/api/ServiceDiscovery.java | 2 +- .../scalecube/services/gateway/Gateway.java | 2 +- .../transport/api/ServerTransport.java | 2 +- .../discovery/ScalecubeServiceDiscovery.java | 11 +- .../ScalecubeServiceDiscoveryTest.java | 6 +- .../auth/CompositeProfileAuthExample.java | 2 +- .../auth/PrincipalMapperAuthExample.java | 2 +- .../auth/ServiceTransportAuthExample.java | 2 +- .../services/examples/codecs/Example1.java | 4 +- .../exceptions/ExceptionMapperExample.java | 4 +- .../examples/gateway/HttpGatewayExample.java | 2 +- .../gateway/WebsocketGatewayExample.java | 2 +- .../examples/helloworld/Example1.java | 4 +- .../examples/helloworld/Example2.java | 4 +- .../examples/helloworld/Example3.java | 4 +- .../services/examples/services/Example1.java | 6 +- .../services/examples/services/Example2.java | 6 +- .../services/gateway/http/HttpGateway.java | 2 +- .../transport/GatewayClientSettings.java | 2 +- .../transport/StaticAddressRouter.java | 2 +- .../transport/http/HttpGatewayClient.java | 7 +- .../websocket/WebsocketGatewayClient.java | 6 +- .../WebsocketGatewayClientSession.java | 15 +- .../services/gateway/ws/WebsocketGateway.java | 2 +- .../gateway/AbstractGatewayExtension.java | 4 +- .../AbstractLocalGatewayExtension.java | 2 +- .../http/HttpClientConnectionTest.java | 5 +- .../WebsocketClientConnectionTest.java | 5 +- .../websocket/WebsocketClientTest.java | 5 +- .../websocket/WebsocketServerTest.java | 2 +- .../transport/rsocket/ConnectionSetup.java | 9 -- .../rsocket/RSocketClientTransport.java | 9 +- .../RSocketClientTransportFactory.java | 2 +- .../rsocket/RSocketServerTransport.java | 2 +- .../io/scalecube/services/Microservices.java | 12 +- .../io/scalecube/services/ErrorFlowTest.java | 2 +- .../services/ServiceAuthRemoteTest.java | 6 +- .../services/ServiceCallRemoteTest.java | 2 +- .../services/ServiceRegistryTest.java | 3 +- .../scalecube/services/ServiceRemoteTest.java | 3 +- .../services/StreamingServiceTest.java | 3 +- .../services/routings/RoutersTest.java | 8 +- .../services/routings/ServiceTagsExample.java | 6 +- ...ocketNettyColocatedEventLoopGroupTest.java | 8 +- .../rsocket/RSocketServiceTransportTest.java | 4 +- 50 files changed, 241 insertions(+), 130 deletions(-) create mode 100644 services-api/src/main/java/io/scalecube/services/Address.java diff --git a/pom.xml b/pom.xml index 6dc4f0f7c..d09049960 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -57,8 +59,7 @@ - 2.6.17 - 1.0.24 + 2.6.18.rc1 1.0.32 2020.0.32 @@ -93,13 +94,6 @@ - - - io.scalecube - scalecube-commons - ${scalecube-commons.version} - - io.scalecube diff --git a/services-api/pom.xml b/services-api/pom.xml index 2762a2590..ada0a3ab5 100644 --- a/services-api/pom.xml +++ b/services-api/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -11,20 +13,11 @@ scalecube-services-api - - UTF-8 - - - - io.scalecube - scalecube-commons - io.projectreactor reactor-core - org.slf4j slf4j-api diff --git a/services-api/src/main/java/io/scalecube/services/Address.java b/services-api/src/main/java/io/scalecube/services/Address.java new file mode 100644 index 000000000..72cf0884e --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/Address.java @@ -0,0 +1,139 @@ +package io.scalecube.services; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Address { + + public static final Address NULL_ADDRESS = Address.create("nullhost", 0); + + public static final Pattern ADDRESS_FORMAT = Pattern.compile("(?^.*):(?\\d+$)"); + + private String host; + private int port; + + Address() {} + + private Address(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * Parses given host:port string to create Address instance. + * + * @param hostandport must come in form {@code host:port} + */ + public static Address from(String hostandport) { + if (hostandport == null || hostandport.isEmpty()) { + throw new IllegalArgumentException("host-and-port string must be present"); + } + + Matcher matcher = ADDRESS_FORMAT.matcher(hostandport); + if (!matcher.find()) { + throw new IllegalArgumentException("can't parse host-and-port string from: " + hostandport); + } + + String host = matcher.group(1); + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException("can't parse host from: " + hostandport); + } + + int port; + try { + port = Integer.parseInt(matcher.group(2)); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("can't parse port from: " + hostandport, ex); + } + + return new Address(host, port); + } + + /** + * Create address from host and port. + * + * @param host host + * @param port port + * @return address + */ + public static Address create(String host, int port) { + return new Address(host, port); + } + + /** + * Getting local IP address by the address of local host. NOTE: returned IP address is + * expected to be a publicly visible IP address. + * + * @throws RuntimeException wrapped {@link UnknownHostException} + */ + public static InetAddress getLocalIpAddress() { + try { + return InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + /** + * Returns host. + * + * @return host + */ + public String host() { + return host; + } + + /** + * Returns port. + * + * @return port + */ + public int port() { + return port; + } + + /** + * Returns new address instance with the specified port. + * + * @param port port + * @return address instance + */ + public Address port(int port) { + return Address.create(host, port); + } + + /** + * Returns new address instance with applied port offset. + * + * @param portOffset portOffset + * @return address instance + */ + public Address withPortOffset(int portOffset) { + return Address.create(host, port + portOffset); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + Address that = (Address) other; + return Objects.equals(host, that.host) && Objects.equals(port, that.port); + } + + @Override + public int hashCode() { + return Objects.hash(host, port); + } + + @Override + public String toString() { + return host + ":" + port; + } +} diff --git a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java index ea7a1d4d0..d5714d253 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java @@ -1,6 +1,5 @@ package io.scalecube.services; -import io.scalecube.net.Address; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; diff --git a/services-api/src/main/java/io/scalecube/services/ServiceReference.java b/services-api/src/main/java/io/scalecube/services/ServiceReference.java index 57238479e..c9c7b7e14 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceReference.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceReference.java @@ -1,6 +1,5 @@ package io.scalecube.services; -import io.scalecube.net.Address; import io.scalecube.services.api.Qualifier; import java.util.Collections; import java.util.HashMap; diff --git a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java index 42acacb28..a1b804c11 100644 --- a/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java +++ b/services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java @@ -1,6 +1,6 @@ package io.scalecube.services.discovery.api; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import reactor.core.publisher.Flux; public interface ServiceDiscovery { diff --git a/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java b/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java index 95b7b55ca..cc0ed2d29 100644 --- a/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java +++ b/services-api/src/main/java/io/scalecube/services/gateway/Gateway.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import reactor.core.publisher.Mono; public interface Gateway { diff --git a/services-api/src/main/java/io/scalecube/services/transport/api/ServerTransport.java b/services-api/src/main/java/io/scalecube/services/transport/api/ServerTransport.java index de7427858..26a9a79ca 100644 --- a/services-api/src/main/java/io/scalecube/services/transport/api/ServerTransport.java +++ b/services-api/src/main/java/io/scalecube/services/transport/api/ServerTransport.java @@ -1,6 +1,6 @@ package io.scalecube.services.transport.api; -import io.scalecube.net.Address; +import io.scalecube.services.Address; public interface ServerTransport { diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index 6b060b881..4874b99ec 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -1,9 +1,9 @@ package io.scalecube.services.discovery; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.scalecube.cluster.Cluster; import io.scalecube.cluster.ClusterConfig; @@ -14,11 +14,12 @@ import io.scalecube.cluster.membership.MembershipConfig; import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.StringJoiner; import java.util.function.UnaryOperator; import org.slf4j.Logger; @@ -89,7 +90,7 @@ public void onMembershipEvent(MembershipEvent event) { @Override public Address address() { - return cluster != null ? cluster.address() : null; + return cluster != null ? Address.from(cluster.address()) : null; } @Override @@ -99,7 +100,7 @@ public Flux listen() { @Override public void shutdown() { - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); if (cluster != null) { cluster.shutdown(); } @@ -117,7 +118,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) { } LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent); - sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED); + sink.emitNext(discoveryEvent, busyLooping(Duration.ofSeconds(3))); } private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membershipEvent) { diff --git a/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java b/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java index 824d66125..1f83277dd 100644 --- a/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java +++ b/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java @@ -12,7 +12,7 @@ import io.scalecube.cluster.membership.MembershipConfig; import io.scalecube.cluster.metadata.JdkMetadataCodec; import io.scalecube.cluster.metadata.MetadataCodec; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceMethodDefinition; import io.scalecube.services.ServiceRegistration; @@ -234,13 +234,13 @@ private Mono newServiceDiscovery( .gossip(cfg -> GOSSIP_CONFIG) .failureDetector(cfg -> FAILURE_DETECTOR_CONFIG) .membership(cfg -> MEMBERSHIP_CONFIG) - .membership(cfg -> cfg.seedMembers(seedAddress))); + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))); } private void startSeed(MetadataCodec metadataCodec) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) - .membership(opts -> opts.seedMembers(SEED_ADDRESS)) + .membership(opts -> opts.seedMembers(SEED_ADDRESS.toString())) .options(opts -> opts.metadata(newServiceEndpoint())) .options(opts -> opts.metadataCodec(metadataCodec)) .gossip(cfg -> GOSSIP_CONFIG) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java index e9d4e6734..8d7aba427 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/CompositeProfileAuthExample.java @@ -108,6 +108,6 @@ private static ScalecubeServiceDiscovery discovery( return new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(opts -> opts.seedMembers(service.discoveryAddress())); + .membership(opts -> opts.seedMembers(service.discoveryAddress().toString())); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java index 4221a8676..5c97ac099 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java @@ -160,6 +160,6 @@ private static ScalecubeServiceDiscovery discovery( return new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(opts -> opts.seedMembers(service.discoveryAddress())); + .membership(opts -> opts.seedMembers(service.discoveryAddress().toString())); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java index 02e0b7a2b..23f5d0cb0 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java @@ -80,6 +80,6 @@ private static ScalecubeServiceDiscovery discovery( return new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(opts -> opts.seedMembers(service.discoveryAddress())); + .membership(opts -> opts.seedMembers(service.discoveryAddress().toString())); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java index 187713509..0c03f973a 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.codecs; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; @@ -42,7 +42,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress))) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java b/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java index 18c9a5f02..37f03aeb1 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.exceptions; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -43,7 +43,7 @@ public static void main(String[] args) throws InterruptedException { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(address1))) + .membership(cfg -> cfg.seedMembers(address1.toString()))) .transport(RSocketServiceTransport::new) .services( call -> { diff --git a/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java b/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java index 6e50cf75d..3002dd941 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/gateway/HttpGatewayExample.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.gateway; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayOptions; import java.net.InetSocketAddress; diff --git a/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java b/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java index 82b6dcd3f..e067af37f 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/gateway/WebsocketGatewayExample.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.gateway; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayOptions; import java.net.InetSocketAddress; diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java index 45a94e109..e20249ff9 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.helloworld; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; @@ -46,7 +46,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress))) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java index 43edb1153..7bd9aed60 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.helloworld; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.api.ServiceMessage; @@ -53,7 +53,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress))) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java index 4a912e1c2..7fc0b7a8a 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.helloworld; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.examples.helloworld.service.BidiGreetingImpl; @@ -47,7 +47,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress))) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new BidiGreetingImpl()) .startAwait(); diff --git a/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java index d4b59c4d2..1d61f73e5 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.services; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -35,7 +35,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new Service2Impl()) .startAwait(); @@ -47,7 +47,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new Service1Impl()) .startAwait(); diff --git a/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java b/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java index f6dcdd963..670372245 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java @@ -1,6 +1,6 @@ package io.scalecube.services.examples.services; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; @@ -35,7 +35,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new Service2Impl()) .startAwait(); @@ -47,7 +47,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new Service1Impl()) .startAwait(); diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java index e0516599f..d1c9a630b 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java @@ -4,7 +4,7 @@ import io.netty.handler.codec.http.cors.CorsConfig; import io.netty.handler.codec.http.cors.CorsConfigBuilder; import io.netty.handler.codec.http.cors.CorsHandler; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java index c02acbc58..013a9e910 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceClientErrorMapper; import java.time.Duration; diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java index 4979ff48c..43ba4654a 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/StaticAddressRouter.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.ServiceEndpoint; import io.scalecube.services.ServiceMethodDefinition; import io.scalecube.services.ServiceReference; diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java index 3363a7691..5f827604b 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport.http; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.scalecube.services.api.ServiceMessage; @@ -8,6 +8,7 @@ import io.scalecube.services.gateway.transport.GatewayClient; import io.scalecube.services.gateway.transport.GatewayClientCodec; import io.scalecube.services.gateway.transport.GatewayClientSettings; +import java.time.Duration; import java.util.function.BiFunction; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -87,7 +88,7 @@ private HttpGatewayClient( close .asMono() .then(doClose()) - .doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onClose.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources")) .subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex)); } @@ -128,7 +129,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.emitEmpty(RETRY_NON_SERIALIZED); + close.emitEmpty(busyLooping(Duration.ofSeconds(3))); } @Override diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java index 8f93d6418..ff6afab2f 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport.websocket; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; @@ -101,7 +101,7 @@ private WebsocketGatewayClient( close .asMono() .then(doClose()) - .doFinally(s -> onClose.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onClose.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .doOnTerminate(() -> LOGGER.info("Closed client")) .subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: " + ex)); } @@ -143,7 +143,7 @@ public Flux requestChannel(Flux requests) { @Override public void close() { - close.emitEmpty(RETRY_NON_SERIALIZED); + close.emitEmpty(busyLooping(Duration.ofSeconds(3))); } @Override diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java index eb0da1dd3..f75a7c9e6 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway.transport.websocket; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; @@ -9,6 +9,7 @@ import io.scalecube.services.gateway.ReferenceCountUtil; import io.scalecube.services.gateway.transport.GatewayClientCodec; import java.nio.channels.ClosedChannelException; +import java.time.Duration; import java.util.Map; import java.util.StringJoiner; import org.jctools.maps.NonBlockingHashMapLong; @@ -187,29 +188,29 @@ private void handleResponse(ServiceMessage response, Object processor) { private static void emitNext(Object processor, ServiceMessage message) { if (processor instanceof One) { //noinspection unchecked - ((One) processor).emitValue(message, RETRY_NON_SERIALIZED); + ((One) processor).emitValue(message, busyLooping(Duration.ofSeconds(3))); } if (processor instanceof Many) { //noinspection unchecked - ((Many) processor).emitNext(message, RETRY_NON_SERIALIZED); + ((Many) processor).emitNext(message, busyLooping(Duration.ofSeconds(3))); } } private static void emitComplete(Object processor) { if (processor instanceof One) { - ((One) processor).emitEmpty(RETRY_NON_SERIALIZED); + ((One) processor).emitEmpty(busyLooping(Duration.ofSeconds(3))); } if (processor instanceof Many) { - ((Many) processor).emitComplete(RETRY_NON_SERIALIZED); + ((Many) processor).emitComplete(busyLooping(Duration.ofSeconds(3))); } } private static void emitError(Object processor, Exception e) { if (processor instanceof One) { - ((One) processor).emitError(e, RETRY_NON_SERIALIZED); + ((One) processor).emitError(e, busyLooping(Duration.ofSeconds(3))); } if (processor instanceof Many) { - ((Many) processor).emitError(e, RETRY_NON_SERIALIZED); + ((Many) processor).emitError(e, busyLooping(Duration.ofSeconds(3))); } } diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java index 6d2b322b5..25e465702 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java @@ -1,7 +1,7 @@ package io.scalecube.services.gateway.ws; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java index 835a0e4af..7d5e3f1ac 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceEndpoint; @@ -108,7 +108,7 @@ private ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { return new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discoveryAddress())); + .membership(opts -> opts.seedMembers(gateway.discoveryAddress().toString())); } public void shutdownServices() { diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractLocalGatewayExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractLocalGatewayExtension.java index 83c02b3c6..a09d3fbbb 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractLocalGatewayExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/AbstractLocalGatewayExtension.java @@ -1,6 +1,6 @@ package io.scalecube.services.gateway; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceInfo; diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java index 52c0ec3b2..f100ce12b 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java @@ -1,7 +1,7 @@ package io.scalecube.services.gateway.http; import io.netty.buffer.ByteBuf; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.annotations.Service; @@ -61,7 +61,8 @@ void beforEach() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl()) .startAwait(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index 2732ebbc6..a68ca78bd 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -7,7 +7,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -80,7 +80,8 @@ void beforEach() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl(onCloseCounter::incrementAndGet)) .startAwait(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java index a6815da56..5309ba7a9 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java @@ -1,7 +1,7 @@ package io.scalecube.services.gateway.websocket; import io.netty.buffer.ByteBuf; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.annotations.Service; @@ -66,7 +66,8 @@ static void beforeAll() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) + .membership( + opts -> opts.seedMembers(gateway.discoveryAddress().toString()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl()) .startAwait(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java index 27f1699d8..8b0b5bcc4 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java @@ -1,7 +1,7 @@ package io.scalecube.services.gateway.websocket; import io.netty.buffer.ByteBuf; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; import io.scalecube.services.annotations.Service; diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java index d8e251e9a..fdc614cdc 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/ConnectionSetup.java @@ -1,6 +1,5 @@ package io.scalecube.services.transport.rsocket; -import io.scalecube.utils.MaskUtil; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -10,7 +9,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.StringJoiner; public final class ConnectionSetup implements Externalizable { @@ -45,13 +43,6 @@ public boolean hasCredentials() { return !credentials.isEmpty(); } - @Override - public String toString() { - return new StringJoiner(", ", ConnectionSetup.class.getSimpleName() + "[", "]") - .add("credentials=" + MaskUtil.mask(credentials)) - .toString(); - } - @Override public void writeExternal(ObjectOutput out) throws IOException { // credentials diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java index 99e97f95d..b97a6847f 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java @@ -8,7 +8,7 @@ import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.util.ByteBufPayload; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.ServiceReference; import io.scalecube.services.auth.CredentialsSupplier; import io.scalecube.services.exceptions.MessageCodecException; @@ -18,7 +18,6 @@ import io.scalecube.services.transport.api.ClientTransport; import io.scalecube.services.transport.api.DataCodec; import io.scalecube.services.transport.api.HeadersCodec; -import io.scalecube.utils.MaskUtil; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -86,12 +85,6 @@ private Mono> getCredentials(ServiceReference serviceReferen return credentialsSupplier .apply(serviceReference) .switchIfEmpty(Mono.just(Collections.emptyMap())) - .doOnSuccess( - creds -> - LOGGER.debug( - "[credentialsSupplier] Got credentials ({}) for service: {}", - MaskUtil.mask(creds), - serviceReference)) .doOnError( ex -> LOGGER.error( diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransportFactory.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransportFactory.java index 3bf983bc9..2ee4f5656 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransportFactory.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransportFactory.java @@ -4,7 +4,7 @@ import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import java.util.function.Function; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java index e14bf5350..76f4ed39f 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java @@ -3,7 +3,7 @@ import io.rsocket.core.RSocketServer; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.transport.api.DataCodec; diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index ce808145e..4ed4a959d 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -1,8 +1,7 @@ package io.scalecube.services; -import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; -import io.scalecube.net.Address; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.auth.PrincipalMapper; @@ -26,6 +25,7 @@ import io.scalecube.services.transport.api.ServiceTransport; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -155,7 +155,7 @@ private Microservices(Builder builder) { shutdown .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED)) + .doFinally(s -> onShutdown.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString())); } @@ -313,7 +313,7 @@ public Flux listenDiscovery() { public Mono shutdown() { return Mono.defer( () -> { - shutdown.emitEmpty(RETRY_NON_SERIALIZED); + shutdown.emitEmpty(busyLooping(Duration.ofSeconds(3))); return onShutdown.asMono(); }); } @@ -564,7 +564,7 @@ private Mono startListen() { .subscribeOn(scheduler) .publishOn(scheduler) .doOnNext(event -> onDiscoveryEvent(microservices, event)) - .doOnNext(event -> sink.emitNext(event, RETRY_NON_SERIALIZED)) + .doOnNext(event -> sink.emitNext(event, busyLooping(Duration.ofSeconds(3)))) .subscribe()); return Mono.fromRunnable(serviceDiscovery::start) @@ -601,7 +601,7 @@ private void onDiscoveryEvent(Microservices microservices, ServiceDiscoveryEvent public void close() { disposables.dispose(); - sink.emitComplete(RETRY_NON_SERIALIZED); + sink.emitComplete(busyLooping(Duration.ofSeconds(3))); try { if (serviceDiscovery != null) { diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index f17d67b7d..7189e2174 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -46,7 +46,7 @@ public static void initNodes() throws InterruptedException { .discovery( endpoint -> new ScalecubeServiceDiscovery() - .membership(cfg -> cfg.seedMembers(provider.discoveryAddress())) + .membership(cfg -> cfg.seedMembers(provider.discoveryAddress().toString())) .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .transport(cfg -> cfg.port(PORT.incrementAndGet())) .options(opts -> opts.metadata(endpoint))) diff --git a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java index 7f295a7cd..3bcae7948 100644 --- a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java @@ -318,8 +318,8 @@ private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { .membership( opts -> opts.seedMembers( - service.discoveryAddress(), - serviceWithoutAuthenticator.discoveryAddress(), - partiallySecuredService.discoveryAddress())); + service.discoveryAddress().toString(), + serviceWithoutAuthenticator.discoveryAddress().toString(), + partiallySecuredService.discoveryAddress().toString())); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 668722b7f..55563b1af 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -79,7 +79,7 @@ private static Microservices serviceProvider(Object service) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gateway.discoveryAddress()))) + .membership(cfg -> cfg.seedMembers(gateway.discoveryAddress().toString()))) .transport(RSocketServiceTransport::new) .services(service) .startAwait(); diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java index 9a9bdf283..d4da76f70 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java @@ -8,7 +8,6 @@ import io.scalecube.cluster.codec.jackson.JacksonMetadataCodec; import io.scalecube.cluster.metadata.JdkMetadataCodec; import io.scalecube.cluster.metadata.MetadataCodec; -import io.scalecube.net.Address; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; import io.scalecube.services.discovery.api.ServiceDiscoveryFactory; @@ -217,6 +216,6 @@ private static ServiceDiscoveryFactory defServiceDiscovery( .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) .options(cfg -> cfg.metadataCodec(metadataCodec)) - .membership(cfg -> cfg.seedMembers(address)); + .membership(cfg -> cfg.seedMembers(address.toString())); } } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index 1b2b02da7..630ac3b52 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -5,7 +5,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST; -import io.scalecube.net.Address; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscovery; @@ -569,6 +568,6 @@ private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { return new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress)); + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString())); } } diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 890927d4c..025053d1a 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -5,7 +5,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import io.scalecube.net.Address; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.sut.QuoteService; @@ -48,7 +47,7 @@ public static void setup() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .defaultDataDecoder(ServiceMessageCodec::decodeData) .services(new SimpleQuoteService()) diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 2b0fdfa2e..05ed4d079 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -9,7 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; import io.scalecube.services.Reflect; @@ -75,7 +75,7 @@ public static void setup() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(1)) @@ -95,7 +95,7 @@ public static void setup() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImpl(2)) @@ -115,7 +115,7 @@ public static void setup() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(tagService) diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 9334bacb5..d11f07e7a 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -1,6 +1,6 @@ package io.scalecube.services.routings; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; @@ -40,7 +40,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress))) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplA()) @@ -55,7 +55,7 @@ public static void main(String[] args) { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(seedAddress))) + .membership(cfg -> cfg.seedMembers(seedAddress.toString()))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new GreetingServiceImplB()) diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java index 6786d258a..71aee1a36 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java @@ -2,7 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; @@ -46,7 +46,7 @@ public void setUp() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new Facade()) .startAwait(); @@ -60,7 +60,7 @@ public void setUp() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(facadeAddress))) + .membership(cfg -> cfg.seedMembers(facadeAddress.toString()))) .transport(RSocketServiceTransport::new) .services((PingService) () -> Mono.just(Thread.currentThread().getName())) .startAwait(); @@ -72,7 +72,7 @@ public void setUp() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(endpoint)) - .membership(cfg -> cfg.seedMembers(facadeAddress))) + .membership(cfg -> cfg.seedMembers(facadeAddress.toString()))) .transport(RSocketServiceTransport::new) .services((PongService) () -> Mono.just(Thread.currentThread().getName())) .startAwait(); diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 6838a4595..37fa6bc5f 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -4,7 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import io.scalecube.net.Address; +import io.scalecube.services.Address; import io.scalecube.services.BaseTest; import io.scalecube.services.Microservices; import io.scalecube.services.ServiceCall; @@ -60,7 +60,7 @@ public void setUp() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(cfg -> cfg.seedMembers(gatewayAddress))) + .membership(cfg -> cfg.seedMembers(gatewayAddress.toString()))) .transport(RSocketServiceTransport::new) .services(new SimpleQuoteService()) .startAwait();