Skip to content

Commit

Permalink
Fix concurrency issue in client that creates a deadlock (#25)
Browse files Browse the repository at this point in the history
Signed-off-by: Sven Trieflinger <[email protected]>
  • Loading branch information
strieflin authored May 9, 2022
1 parent c43d17c commit 04baa0a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.carbynestack.httpclient.CsHttpClient;
import io.carbynestack.httpclient.CsHttpClientException;
import io.carbynestack.httpclient.CsResponseEntity;
import io.vavr.concurrent.Future;
import io.vavr.control.Either;
import io.vavr.control.Option;
import java.io.File;
Expand All @@ -25,15 +24,6 @@
* A client for the Carbyne Stack Ephemeral service.
*
* <p>The client interacts with a single ephemeral backend service in order to execute MPC programs.
*
* <p>The methods of this client are all asynchronous and return a {@link Future}. This future
* completes normally with either the requested domain object or a http status code.
*
* <p>In case a low-level error occurs on the network or representation layer the future terminates
* exceptionally.
*
* <p>The public API of this class is defensive, i.e., arguments are checked for validity and an
* {@link IllegalArgumentException} is thrown in case of a contract violation.
*/
@Slf4j
public class EphemeralClient {
Expand Down Expand Up @@ -76,27 +66,25 @@ private EphemeralClient(
* Triggers a program execution.
*
* @param activation {@link Activation} configuration used as input for the program activation.
* @return A future completing normally either with a list of Amphora secret identifiers or an
* http error code if the execution failed on server side and exceptionally in case an error
* occurs on the network or representation layer
* @return Either an {@link ActivationResult} containing a list of identifiers of Amphora secrets
* generated by the execution or an http error code if the execution failed on server side.
* @throws CsHttpClientException In case an error occurs on the network or representation layer
*/
public Future<Either<ActivationError, ActivationResult>> execute(@NonNull Activation activation) {
return Future.of(
() -> {
CsResponseEntity<String, ActivationResult> responseEntity =
csHttpClient.postForEntity(
endpoint.getActivationUri(
activation.getCode() != null && !activation.getCode().isEmpty()),
bearerToken.map(BearerTokenUtils::createBearerToken).collect(Collectors.toList()),
activation,
ActivationResult.class);
return responseEntity
.getContent()
.mapLeft(
l ->
new ActivationError()
.setMessage(l)
.setResponseCode(responseEntity.getHttpStatus()));
});
public Either<ActivationError, ActivationResult> execute(@NonNull Activation activation)
throws CsHttpClientException {
CsResponseEntity<String, ActivationResult> responseEntity =
csHttpClient.postForEntity(
endpoint.getActivationUri(
activation.getCode() != null && !activation.getCode().isEmpty()),
bearerToken.map(BearerTokenUtils::createBearerToken).collect(Collectors.toList()),
activation,
ActivationResult.class);
return responseEntity
.getContent()
.mapLeft(
l ->
new ActivationError()
.setMessage(l)
.setResponseCode(responseEntity.getHttpStatus()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -39,9 +41,11 @@
public class EphemeralMultiClient {

private final List<EphemeralClient> clients;
private final Executor executor;

EphemeralMultiClient(final List<EphemeralClient> clients) {
this.clients = clients;
this.executor = Executors.newFixedThreadPool(clients.size());
}

/**
Expand Down Expand Up @@ -110,7 +114,7 @@ public Future<Either<ActivationError, List<ActivationResult>>> execute(
clients.get(t._2).getEndpoint(),
activation);
}
return t._1.execute(activation);
return Future.of(executor, () -> t._1.execute(activation));
});
return Future.sequence(invocations)
.andThen(a -> a.forEach(e -> log.debug("Results for game {} are {}", gameId, e.asJava())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
*/
package io.carbynestack.ephemeral.client;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
Expand All @@ -17,7 +19,6 @@
import io.carbynestack.httpclient.CsHttpClient;
import io.carbynestack.httpclient.CsHttpClientException;
import io.carbynestack.httpclient.CsResponseEntity;
import io.vavr.concurrent.Future;
import io.vavr.control.Either;
import io.vavr.control.Option;
import io.vavr.control.Try;
Expand Down Expand Up @@ -66,9 +67,8 @@ public void givenSuccessful_whenExecuteProgram_thenReturnResult() throws CsHttpC
activation,
ActivationResult.class))
.thenReturn(CsResponseEntity.success(200, result));
Future<Either<ActivationError, ActivationResult>> eitherFuture = client.execute(activation);
eitherFuture.await();
assertThat(eitherFuture.get().get(), equalTo(result));
Either<ActivationError, ActivationResult> response = client.execute(activation);
assertThat(response.get(), equalTo(result));
}

@Test
Expand All @@ -83,11 +83,9 @@ public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailur
activation,
ActivationResult.class))
.thenReturn(CsResponseEntity.failed(httpFailureCode, errMessage));
Future<Either<ActivationError, ActivationResult>> eitherFuture =
client.execute(Activation.builder().build());
eitherFuture.await();
assertThat(eitherFuture.get().getLeft().responseCode, equalTo(httpFailureCode));
assertThat(eitherFuture.get().getLeft().message, equalTo(errMessage));
Either<ActivationError, ActivationResult> result = client.execute(Activation.builder().build());
assertThat(result.getLeft().responseCode, equalTo(httpFailureCode));
assertThat(result.getLeft().message, equalTo(errMessage));
}

@Test
Expand All @@ -102,9 +100,7 @@ public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailur
ArgumentCaptor<List<Header>> headersCaptor = ArgumentCaptor.forClass(List.class);
when(specsHttpClientMock.postForEntity(any(), headersCaptor.capture(), any(), any()))
.thenReturn(CsResponseEntity.success(200, result));
Future<Either<ActivationError, ActivationResult>> eitherFuture =
clientWithToken.execute(activation);
eitherFuture.await();
clientWithToken.execute(activation);
List<Header> headers = headersCaptor.getValue();
assertThat("No header has been supplied", headers.size(), is(1));
Header header = headers.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.verifyNew;
import static org.powermock.api.mockito.PowerMockito.whenNew;

import io.carbynestack.httpclient.CsHttpClientException;
import io.vavr.concurrent.Future;
import io.vavr.control.Either;
import io.vavr.control.Option;
Expand All @@ -23,12 +32,19 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand Down Expand Up @@ -68,12 +84,10 @@ public void givenServiceUrlsIsNull_whenCreateClient_thenThrowException() {
}

@Test
public void givenSuccessful_whenExecuteProgram_thenReturnResult() {
public void givenSuccessful_whenExecuteProgram_thenReturnResult() throws CsHttpClientException {
ActivationResult result = new ActivationResult(Collections.singletonList(UUID.randomUUID()));
when(client1Mock.execute(any(Activation.class)))
.thenReturn(Future.successful(Either.right(result)));
when(client2Mock.execute(any(Activation.class)))
.thenReturn(Future.successful(Either.right(result)));
when(client1Mock.execute(any(Activation.class))).thenReturn(Either.right(result));
when(client2Mock.execute(any(Activation.class))).thenReturn(Either.right(result));
Future<Either<ActivationError, List<ActivationResult>>> results =
client.execute(activation.getCode(), inputObjects);
results.await();
Expand All @@ -87,15 +101,14 @@ public void givenSuccessful_whenExecuteProgram_thenReturnResult() {
}

@Test
public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailureCode() {
public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailureCode()
throws CsHttpClientException {
int httpFailureCode = 404;
String errMessage = "an unexpected error";
ActivationError activationError =
new ActivationError().setMessage(errMessage).setResponseCode(httpFailureCode);
when(client1Mock.execute(any(Activation.class)))
.thenReturn(Future.successful(Either.left(activationError)));
when(client2Mock.execute(any(Activation.class)))
.thenReturn(Future.successful(Either.left(activationError)));
when(client1Mock.execute(any(Activation.class))).thenReturn(Either.left(activationError));
when(client2Mock.execute(any(Activation.class))).thenReturn(Either.left(activationError));
Future<Either<ActivationError, List<ActivationResult>>> results =
client.execute(activation.getCode(), inputObjects);
results.await();
Expand All @@ -106,10 +119,11 @@ public void givenServiceRespondsUnsuccessful_whenExecuteProgram_thenReturnFailur
}

@Test
public void givenServiceCommunicationFails_whenExecuteProgram_thenFutureFails() {
Exception failure = new Exception("an unexpected error");
when(client1Mock.execute(any(Activation.class))).thenReturn(Future.failed(failure));
when(client2Mock.execute(any(Activation.class))).thenReturn(Future.failed(failure));
public void givenServiceCommunicationFails_whenExecuteProgram_thenCallFails()
throws CsHttpClientException {
CsHttpClientException failure = new CsHttpClientException("error");
when(client1Mock.execute(any(Activation.class))).thenThrow(failure);
when(client2Mock.execute(any(Activation.class))).thenThrow(failure);
Future<Either<ActivationError, List<ActivationResult>>> results =
client.execute(activation.getCode(), inputObjects);
results.await();
Expand All @@ -131,4 +145,33 @@ public void givenBearerTokenProvider_whenCreateClient_thenCorrectBearerTokensAre
verifyNew(EphemeralClient.class, times(2))
.withArguments(any(), any(), any(), eq(Option.some(token)), any());
}

@Test
public void givenMoreBackendsThanCores_whenExecuteProgram_thenReturnResult()
throws CsHttpClientException {
// This is the number of threads created by the thread pool backing the Vavr Future
// implementation
int cores = ForkJoinPool.commonPool().getParallelism();
int vcps = 2 * cores;
List<EphemeralClient> clientMocks =
IntStream.range(0, vcps)
.mapToObj(i -> Mockito.mock(EphemeralClient.class))
.collect(Collectors.toList());
CyclicBarrier barrier = new CyclicBarrier(vcps);
ActivationResult result = new ActivationResult(Collections.singletonList(UUID.randomUUID()));
for (EphemeralClient c : clientMocks) {
doAnswer(
(Answer<Either<ActivationError, ActivationResult>>)
invocation -> {
barrier.await();
return Either.right(result);
})
.when(c)
.execute(any(Activation.class));
}
EphemeralMultiClient client = new EphemeralMultiClient(clientMocks);
Future<Either<ActivationError, List<ActivationResult>>> results =
client.execute(activation.getCode(), inputObjects);
assertThat("activation timed out", results.await(5, TimeUnit.SECONDS).isSuccess());
}
}

0 comments on commit 04baa0a

Please sign in to comment.