diff --git a/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralClient.java b/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralClient.java index 38fc2a26..1c7077da 100644 --- a/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralClient.java +++ b/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralClient.java @@ -10,7 +10,6 @@ import io.carbynestack.httpclient.CsHttpClient; import io.carbynestack.httpclient.CsHttpClientException; import io.carbynestack.httpclient.CsResponseEntity; -import io.vavr.concurrent.Future; import io.vavr.control.Either; import io.vavr.control.Option; import java.io.File; @@ -25,15 +24,6 @@ * A client for the Carbyne Stack Ephemeral service. * *

The client interacts with a single ephemeral backend service in order to execute MPC programs. - * - *

The methods of this client are all asynchronous and return a {@link Future}. This future - * completes normally with either the requested domain object or a http status code. - * - *

In case a low-level error occurs on the network or representation layer the future terminates - * exceptionally. - * - *

The public API of this class is defensive, i.e., arguments are checked for validity and an - * {@link IllegalArgumentException} is thrown in case of a contract violation. */ @Slf4j public class EphemeralClient { @@ -76,27 +66,25 @@ private EphemeralClient( * Triggers a program execution. * * @param activation {@link Activation} configuration used as input for the program activation. - * @return A future completing normally either with a list of Amphora secret identifiers or an - * http error code if the execution failed on server side and exceptionally in case an error - * occurs on the network or representation layer + * @return Either an {@link ActivationResult} containing a list of identifiers of Amphora secrets + * generated by the execution or an http error code if the execution failed on server side. + * @throws CsHttpClientException In case an error occurs on the network or representation layer */ - public Future> execute(@NonNull Activation activation) { - return Future.of( - () -> { - CsResponseEntity responseEntity = - csHttpClient.postForEntity( - endpoint.getActivationUri( - activation.getCode() != null && !activation.getCode().isEmpty()), - bearerToken.map(BearerTokenUtils::createBearerToken).collect(Collectors.toList()), - activation, - ActivationResult.class); - return responseEntity - .getContent() - .mapLeft( - l -> - new ActivationError() - .setMessage(l) - .setResponseCode(responseEntity.getHttpStatus())); - }); + public Either execute(@NonNull Activation activation) + throws CsHttpClientException { + CsResponseEntity responseEntity = + csHttpClient.postForEntity( + endpoint.getActivationUri( + activation.getCode() != null && !activation.getCode().isEmpty()), + bearerToken.map(BearerTokenUtils::createBearerToken).collect(Collectors.toList()), + activation, + ActivationResult.class); + return responseEntity + .getContent() + .mapLeft( + l -> + new ActivationError() + .setMessage(l) + .setResponseCode(responseEntity.getHttpStatus())); } } diff --git a/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralMultiClient.java b/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralMultiClient.java index 606e5244..c0862a5e 100644 --- a/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralMultiClient.java +++ b/ephemeral-java-client/src/main/java/io/carbynestack/ephemeral/client/EphemeralMultiClient.java @@ -18,6 +18,8 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -39,9 +41,11 @@ public class EphemeralMultiClient { private final List clients; + private final Executor executor; EphemeralMultiClient(final List clients) { this.clients = clients; + this.executor = Executors.newFixedThreadPool(clients.size()); } /** @@ -110,7 +114,7 @@ public Future>> execute( clients.get(t._2).getEndpoint(), activation); } - return t._1.execute(activation); + return Future.of(executor, () -> t._1.execute(activation)); }); return Future.sequence(invocations) .andThen(a -> a.forEach(e -> log.debug("Results for game {} are {}", gameId, e.asJava()))) diff --git a/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralClientTest.java b/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralClientTest.java index 36717e7c..3d527b0f 100644 --- a/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralClientTest.java +++ b/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralClientTest.java @@ -6,7 +6,9 @@ */ package io.carbynestack.ephemeral.client; -import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -17,7 +19,6 @@ import io.carbynestack.httpclient.CsHttpClient; import io.carbynestack.httpclient.CsHttpClientException; import io.carbynestack.httpclient.CsResponseEntity; -import io.vavr.concurrent.Future; import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; @@ -66,9 +67,8 @@ public void givenSuccessful_whenExecuteProgram_thenReturnResult() throws CsHttpC activation, ActivationResult.class)) .thenReturn(CsResponseEntity.success(200, result)); - Future> eitherFuture = client.execute(activation); - eitherFuture.await(); - assertThat(eitherFuture.get().get(), equalTo(result)); + Either response = client.execute(activation); + assertThat(response.get(), equalTo(result)); } @Test @@ -83,11 +83,9 @@ public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailur activation, ActivationResult.class)) .thenReturn(CsResponseEntity.failed(httpFailureCode, errMessage)); - Future> eitherFuture = - client.execute(Activation.builder().build()); - eitherFuture.await(); - assertThat(eitherFuture.get().getLeft().responseCode, equalTo(httpFailureCode)); - assertThat(eitherFuture.get().getLeft().message, equalTo(errMessage)); + Either result = client.execute(Activation.builder().build()); + assertThat(result.getLeft().responseCode, equalTo(httpFailureCode)); + assertThat(result.getLeft().message, equalTo(errMessage)); } @Test @@ -102,9 +100,7 @@ public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailur ArgumentCaptor> headersCaptor = ArgumentCaptor.forClass(List.class); when(specsHttpClientMock.postForEntity(any(), headersCaptor.capture(), any(), any())) .thenReturn(CsResponseEntity.success(200, result)); - Future> eitherFuture = - clientWithToken.execute(activation); - eitherFuture.await(); + clientWithToken.execute(activation); List

headers = headersCaptor.getValue(); assertThat("No header has been supplied", headers.size(), is(1)); Header header = headers.get(0); diff --git a/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralMultiClientTest.java b/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralMultiClientTest.java index bd953192..90fe5197 100644 --- a/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralMultiClientTest.java +++ b/ephemeral-java-client/src/test/java/io/carbynestack/ephemeral/client/EphemeralMultiClientTest.java @@ -9,12 +9,21 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.verifyNew; import static org.powermock.api.mockito.PowerMockito.whenNew; +import io.carbynestack.httpclient.CsHttpClientException; import io.vavr.concurrent.Future; import io.vavr.control.Either; import io.vavr.control.Option; @@ -23,12 +32,19 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -68,12 +84,10 @@ public void givenServiceUrlsIsNull_whenCreateClient_thenThrowException() { } @Test - public void givenSuccessful_whenExecuteProgram_thenReturnResult() { + public void givenSuccessful_whenExecuteProgram_thenReturnResult() throws CsHttpClientException { ActivationResult result = new ActivationResult(Collections.singletonList(UUID.randomUUID())); - when(client1Mock.execute(any(Activation.class))) - .thenReturn(Future.successful(Either.right(result))); - when(client2Mock.execute(any(Activation.class))) - .thenReturn(Future.successful(Either.right(result))); + when(client1Mock.execute(any(Activation.class))).thenReturn(Either.right(result)); + when(client2Mock.execute(any(Activation.class))).thenReturn(Either.right(result)); Future>> results = client.execute(activation.getCode(), inputObjects); results.await(); @@ -87,15 +101,14 @@ public void givenSuccessful_whenExecuteProgram_thenReturnResult() { } @Test - public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailureCode() { + public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailureCode() + throws CsHttpClientException { int httpFailureCode = 404; String errMessage = "an unexpected error"; ActivationError activationError = new ActivationError().setMessage(errMessage).setResponseCode(httpFailureCode); - when(client1Mock.execute(any(Activation.class))) - .thenReturn(Future.successful(Either.left(activationError))); - when(client2Mock.execute(any(Activation.class))) - .thenReturn(Future.successful(Either.left(activationError))); + when(client1Mock.execute(any(Activation.class))).thenReturn(Either.left(activationError)); + when(client2Mock.execute(any(Activation.class))).thenReturn(Either.left(activationError)); Future>> results = client.execute(activation.getCode(), inputObjects); results.await(); @@ -106,10 +119,11 @@ public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailur } @Test - public void givenServiceCommunicationFails_whenExecuteProgram_thenFutureFails() { - Exception failure = new Exception("an unexpected error"); - when(client1Mock.execute(any(Activation.class))).thenReturn(Future.failed(failure)); - when(client2Mock.execute(any(Activation.class))).thenReturn(Future.failed(failure)); + public void givenServiceCommunicationFails_whenExecuteProgram_thenCallFails() + throws CsHttpClientException { + CsHttpClientException failure = new CsHttpClientException("error"); + when(client1Mock.execute(any(Activation.class))).thenThrow(failure); + when(client2Mock.execute(any(Activation.class))).thenThrow(failure); Future>> results = client.execute(activation.getCode(), inputObjects); results.await(); @@ -131,4 +145,33 @@ public void givenBearerTokenProvider_whenCreateClient_thenCorrectBearerTokensAre verifyNew(EphemeralClient.class, times(2)) .withArguments(any(), any(), any(), eq(Option.some(token)), any()); } + + @Test + public void givenMoreBackendsThanCores_whenExecuteProgram_thenReturnResult() + throws CsHttpClientException { + // This is the number of threads created by the thread pool backing the Vavr Future + // implementation + int cores = ForkJoinPool.commonPool().getParallelism(); + int vcps = 2 * cores; + List clientMocks = + IntStream.range(0, vcps) + .mapToObj(i -> Mockito.mock(EphemeralClient.class)) + .collect(Collectors.toList()); + CyclicBarrier barrier = new CyclicBarrier(vcps); + ActivationResult result = new ActivationResult(Collections.singletonList(UUID.randomUUID())); + for (EphemeralClient c : clientMocks) { + doAnswer( + (Answer>) + invocation -> { + barrier.await(); + return Either.right(result); + }) + .when(c) + .execute(any(Activation.class)); + } + EphemeralMultiClient client = new EphemeralMultiClient(clientMocks); + Future>> results = + client.execute(activation.getCode(), inputObjects); + assertThat("activation timed out", results.await(5, TimeUnit.SECONDS).isSuccess()); + } }