From 8a77fa266fe9e61e63abe98d554879b577aa11ad Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sat, 5 Oct 2024 18:34:21 +0300 Subject: [PATCH] Enhanced test to run faster --- .../services/gateway/http/HttpGateway.java | 17 +++++------------ .../gateway/websocket/WebsocketGateway.java | 15 ++++----------- .../rsocket/RSocketServiceTransport.java | 18 +++--------------- .../services/ServiceCallLocalTest.java | 2 +- .../services/ServiceCallRemoteTest.java | 2 +- .../scalecube/services/ServiceLocalTest.java | 6 +++--- .../scalecube/services/ServiceRemoteTest.java | 5 +---- .../services/StreamingServiceTest.java | 10 ++++------ .../services/routings/RoutersTest.java | 9 ++------- .../services/sut/SimpleQuoteService.java | 4 ++-- .../rsocket/RSocketServiceTransportTest.java | 6 ------ 11 files changed, 26 insertions(+), 68 deletions(-) diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java index b992ee153..d7c196da3 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java @@ -41,7 +41,9 @@ public String id() { public Gateway start() { HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(options.call(), errorMapper); - loopResources = LoopResources.create(options.id() + ":" + options.port()); + loopResources = + LoopResources.create( + options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true); try { prepareHttpServer(loopResources, options.port()) @@ -89,21 +91,12 @@ public void stop() { private void shutdownServer(DisposableServer server) { if (server != null) { server.dispose(); - try { - server.onDispose().toFuture().get(); - } catch (Exception e) { - // TODO: log it - } } } private void shutdownLoopResources(LoopResources loopResources) { if (loopResources != null) { - try { - loopResources.disposeLater().toFuture().get(); - } catch (Exception e) { - // TODO: log it - } + loopResources.dispose(); } } @@ -169,7 +162,7 @@ public Builder corsConfigBuilder(CorsConfigBuilder corsConfigBuilder) { } public Builder corsConfigBuilder(Consumer consumer) { - consumer.accept(this.corsConfigBuilder); + consumer.accept(corsConfigBuilder); return this; } diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java index 8a226c5cb..07e449dfa 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/websocket/WebsocketGateway.java @@ -47,7 +47,9 @@ public Gateway start() { WebsocketGatewayAcceptor gatewayAcceptor = new WebsocketGatewayAcceptor(options.call(), gatewayHandler, errorMapper); - loopResources = LoopResources.create(options.id() + ":" + options.port()); + loopResources = + LoopResources.create( + options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true); try { prepareHttpServer(loopResources, options.port()) @@ -91,21 +93,12 @@ public void stop() { private void shutdownServer(DisposableServer server) { if (server != null) { server.dispose(); - try { - server.onDispose().toFuture().get(); - } catch (Exception e) { - // TODO: log it - } } } private void shutdownLoopResources(LoopResources loopResources) { if (loopResources != null) { - try { - loopResources.disposeLater().toFuture().get(); - } catch (Exception e) { - // TODO: log it - } + loopResources.dispose(); } } diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java index a1aac61ef..04f01e9b1 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java @@ -5,7 +5,6 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.Future; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.registry.api.ServiceRegistry; @@ -17,13 +16,11 @@ import java.util.Collection; import java.util.StringJoiner; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; -import reactor.core.publisher.Mono; -import reactor.netty.FutureMono; import reactor.netty.channel.AbortedException; import reactor.netty.resources.LoopResources; @@ -214,17 +211,8 @@ public ServiceTransport start() { @Override public void stop() { - try { - //noinspection unchecked,rawtypes - Flux.concatDelayError( - Mono.defer(() -> serverLoopResources.disposeLater()), - Mono.defer(() -> FutureMono.from((Future) eventLoopGroup.shutdownGracefully()))) - .then() - .toFuture() - .get(); - } catch (Exception e) { - throw new RuntimeException(e); - } + serverLoopResources.dispose(); + eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } private EventLoopGroup newEventLoopGroup() { diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java index d1ccf9255..e836e8f07 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java @@ -148,7 +148,7 @@ public void test_local_greeting_request_timeout_expires() { Publisher future = service.requestOne(GREETING_REQUEST_TIMEOUT_REQ); Throwable exception = - assertThrows(RuntimeException.class, () -> Mono.from(future).block(Duration.ofSeconds(1))); + assertThrows(RuntimeException.class, () -> Mono.from(future).block(Duration.ofMillis(500))); assertTrue(exception.getMessage().contains("Timeout on blocking read")); } diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 1fc679b19..e168c82a4 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -189,7 +189,7 @@ public void test_remote_greeting_request_timeout_expires() { // call the service. Publisher future = service.requestOne(GREETING_REQUEST_TIMEOUT_REQ); Throwable exception = - assertThrows(RuntimeException.class, () -> Mono.from(future).block(Duration.ofSeconds(1))); + assertThrows(RuntimeException.class, () -> Mono.from(future).block(Duration.ofMillis(500))); assertTrue(exception.getMessage().contains("Timeout on blocking read")); } diff --git a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java index eb3cd4bbe..d20ccf96a 100644 --- a/services/src/test/java/io/scalecube/services/ServiceLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceLocalTest.java @@ -46,8 +46,8 @@ public void test_local_greeting_request_completes_before_timeout() { // call the service. GreetingResponse result = service - .greetingRequestTimeout(new GreetingRequest("joe", timeout)) - .block(timeout.plusSeconds(1)); + .greetingRequestTimeout(new GreetingRequest("joe", Duration.ofMillis(500))) + .block(timeout.plusMillis(500)); // print the greeting. System.out.println("2. greeting_request_completes_before_timeout : " + result.getResult()); @@ -156,7 +156,7 @@ public void test_local_greeting_request_timeout_expires() { RuntimeException.class, () -> Mono.from(service.greetingRequestTimeout(new GreetingRequest("joe", timeout))) - .timeout(Duration.ofSeconds(1)) + .timeout(Duration.ofMillis(500)) .block()); assertTrue( exception.getCause().getMessage().contains("Did not observe any item or terminal signal")); diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index e32915728..ca2433924 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -88,7 +88,7 @@ private static Microservices serviceProvider() { @Test public void test_remote_greeting_request_completes_before_timeout() { - Duration duration = Duration.ofSeconds(1); + Duration duration = Duration.ofMillis(500); GreetingService service = gateway.call().api(GreetingService.class); @@ -107,8 +107,6 @@ public void test_remote_void_greeting() throws Exception { service.greetingVoid(new GreetingRequest("joe")).block(Duration.ofSeconds(3)); System.out.println("test_remote_void_greeting done."); - - Thread.sleep(1000); } @Test @@ -333,7 +331,6 @@ public void test_remote_serviceA_calls_serviceB_with_timeout() { InternalServiceException.class, () -> Mono.from(service.callGreetingTimeout("joe")).block()); assertTrue(exception.getMessage().contains("Did not observe any item or terminal signal")); - System.out.println("done"); ms.close(); } diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index 87c8c4673..d6e7774eb 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -78,10 +78,9 @@ public void test_local_quotes_service() { QuoteService service = node.call().api(QuoteService.class); - int expected = 3; - List list = service.quotes().take(Duration.ofMillis(3500)).collectList().block(); + List list = service.quotes().take(Duration.ofMillis(500)).collectList().block(); - assertEquals(expected, list.size()); + assertTrue(list.size() > 1, "list.size"); } @Test @@ -160,11 +159,10 @@ public void test_scheduled_messages() { ServiceMessage scheduled = ServiceMessage.builder().qualifier(QuoteService.NAME, "scheduled").data(1000).build(); - int expected = 3; List list = - serviceCall.requestMany(scheduled).take(Duration.ofSeconds(4)).collectList().block(); + serviceCall.requestMany(scheduled).take(Duration.ofMillis(500)).collectList().block(); - assertEquals(expected, list.size()); + assertTrue(list.size() > 1, "list.size"); } @Test diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 073ce51c8..17591573f 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -36,7 +36,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -163,7 +162,7 @@ public void test_round_robin() { } @Test - public void test_remote_service_tags() throws Exception { + public void test_remote_service_tags() { CanaryService service = gateway @@ -171,8 +170,6 @@ public void test_remote_service_tags() throws Exception { .router(Routers.getRouter(WeightedRandomRouter.class)) .api(CanaryService.class); - Thread.sleep(1000); - AtomicInteger serviceBCount = new AtomicInteger(0); int n = (int) 1e3; @@ -262,9 +259,7 @@ public void test_tag_request_selection_logic() { } @Test - public void test_service_tags() throws Exception { - - TimeUnit.SECONDS.sleep(3); + public void test_service_tags() { ServiceCall service = gateway.call().router(WeightedRandomRouter.class); ServiceMessage req = diff --git a/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java b/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java index 7c418f801..ed4fa33b9 100644 --- a/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java +++ b/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java @@ -19,12 +19,12 @@ public Mono justOne() { @Override public Flux scheduled(int interval) { - return Flux.interval(Duration.ofSeconds(1)).map(s -> "quote : " + counter.incrementAndGet()); + return Flux.interval(Duration.ofMillis(100)).map(s -> "quote : " + counter.incrementAndGet()); } @Override public Flux quotes() { - return Flux.interval(Duration.ofSeconds(1)).map(s -> "quote : " + counter.incrementAndGet()); + return Flux.interval(Duration.ofMillis(100)).map(s -> "quote : " + counter.incrementAndGet()); } @Override diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 8c2419212..a80249361 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -122,15 +122,12 @@ public void test_remote_node_died_many_never() throws Exception { .subscribe(onNext -> latch.countDown(), System.err::println); // service node goes down - TimeUnit.SECONDS.sleep(3); serviceNode.close(); if (!latch.await(20, TimeUnit.SECONDS)) { fail("latch.await"); } - TimeUnit.MILLISECONDS.sleep(1000); - assertEquals(0, latch.getCount()); assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); @@ -157,15 +154,12 @@ public void test_remote_node_died_many_then_never() throws Exception { .subscribe(onNext -> latch.countDown(), System.err::println); // service node goes down - TimeUnit.SECONDS.sleep(3); serviceNode.close(); if (!latch.await(20, TimeUnit.SECONDS)) { fail("latch.await"); } - TimeUnit.MILLISECONDS.sleep(1000); - assertEquals(0, latch.getCount()); assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed());