Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

match retry policy with go etcd client #1393

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public CompletableFuture<CampaignResponse> campaign(ByteSequence electionName, l
execute(
() -> stubWithLeader().campaign(request),
CampaignResponse::new,
Errors::isRetryable));
Errors::isSafeRetryMutableRPC));
}

@Override
Expand All @@ -111,7 +111,7 @@ public CompletableFuture<ProclaimResponse> proclaim(LeaderKey leaderKey, ByteSeq
execute(
() -> stubWithLeader().proclaim(request),
ProclaimResponse::new,
Errors::isRetryable));
Errors::isSafeRetryMutableRPC));
}

@Override
Expand All @@ -126,7 +126,7 @@ public CompletableFuture<LeaderResponse> leader(ByteSequence electionName) {
execute(
() -> stubWithLeader().leader(request),
response -> new LeaderResponse(response, namespace),
Errors::isRetryable));
Errors::isSafeRetryImmutableRPC));
}

@Override
Expand Down Expand Up @@ -162,7 +162,7 @@ public CompletableFuture<ResignResponse> resign(LeaderKey leaderKey) {
execute(
() -> stubWithLeader().resign(request),
ResignResponse::new,
Errors::isRetryable));
Errors::isSafeRetryMutableRPC));
}

private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {
Expand Down
4 changes: 2 additions & 2 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ protected <S> CompletableFuture<S> completable(
protected <S, T> CompletableFuture<T> execute(
Supplier<Future<S>> supplier,
Function<S, T> resultConvert) {

return execute(supplier, resultConvert, Errors::isRetryable);
// TODO: in go etcd client lease operations are 'repeatable'
return execute(supplier, resultConvert, Errors::isSafeRetryMutableRPC);
}

/**
Expand Down
10 changes: 5 additions & 5 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/KVImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public CompletableFuture<PutResponse> put(ByteSequence key, ByteSequence value,
return execute(
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
response -> new PutResponse(response, namespace),
Errors::isRetryable);
Errors::isSafeRetryMutableRPC);
}

@Override
Expand All @@ -81,7 +81,7 @@ public CompletableFuture<GetResponse> get(ByteSequence key, GetOption option) {
return execute(
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
response -> new GetResponse(response, namespace),
Errors::isRetryable);
Errors::isSafeRetryImmutableRPC);
}

@Override
Expand All @@ -97,7 +97,7 @@ public CompletableFuture<DeleteResponse> delete(ByteSequence key, DeleteOption o
return execute(
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
response -> new DeleteResponse(response, namespace),
Errors::isRetryable);
Errors::isSafeRetryMutableRPC);
}

@Override
Expand All @@ -116,15 +116,15 @@ public CompletableFuture<CompactResponse> compact(long rev, CompactOption option
return execute(
() -> stub.compact(request),
CompactResponse::new,
Errors::isRetryable);
Errors::isSafeRetryMutableRPC);
}

@Override
public Txn txn() {
return TxnImpl.newTxn(
request -> execute(
() -> stub.txn(request),
response -> new TxnResponse(response, namespace), Errors::isRetryable),
response -> new TxnResponse(response, namespace), Errors::isSafeRetryMutableRPC),
namespace);
}
}
4 changes: 2 additions & 2 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public CompletableFuture<LockResponse> lock(ByteSequence name, long leaseId) {
return execute(
() -> stubWithLeader().lock(request),
response -> new LockResponse(response, namespace),
Errors::isRetryable);
Errors::isSafeRetryMutableRPC);
}

@Override
Expand All @@ -83,6 +83,6 @@ public CompletableFuture<UnlockResponse> unlock(ByteSequence lockKey) {
return execute(
() -> stubWithLeader().unlock(request),
UnlockResponse::new,
Errors::isRetryable);
Errors::isSafeRetryMutableRPC);
}
}
21 changes: 20 additions & 1 deletion jetcd-core/src/main/java/io/etcd/jetcd/support/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,30 @@ public final class Errors {
private Errors() {
}

public static boolean isRetryable(Status status) {
public static boolean isSafeRetryImmutableRPC(Status status) {
// similar to go etcd client isSafeRetryImmutableRPC

return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isInvalidTokenError(status)
|| isAuthStoreExpired(status);
}

public static boolean isSafeRetryMutableRPC(Status status) {
// similar to go etcd client isSafeRetryMutableRPC

if (isInvalidTokenError(status) || isAuthStoreExpired(status)) {
return true;
}
if (!Status.UNAVAILABLE.getCode().equals(status.getCode())) {
return false;
}
String desc = status.getDescription();
if (desc == null) {
return false;
}

return desc.equals("there is no address available") || desc.equals("there is no connection available");
}

public static boolean isInvalidTokenError(Throwable e) {
Status status = Status.fromThrowable(e);
return isInvalidTokenError(status);
Expand Down
21 changes: 17 additions & 4 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/UtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,29 @@ public void testAuthStoreExpired() {
}

@Test
public void testAuthErrorIsNotRetryable() {
public void testAuthErrorIsSafeRetryImmutableRPC() {
Status authErrorStatus = Status.UNAUTHENTICATED
.withDescription("etcdserver: invalid auth token");
Status status = Status.fromThrowable(new StatusException(authErrorStatus));
assertThat(Errors.isRetryable(status)).isTrue();
assertThat(Errors.isSafeRetryImmutableRPC(status)).isTrue();
}

@Test
public void testUnavailableErrorIsRetryable() {
public void testUnavailableErrorIsSafeRetryImmutableRPC() {
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
assertThat(Errors.isRetryable(status)).isTrue();
assertThat(Errors.isSafeRetryImmutableRPC(status)).isTrue();
}

@Test
public void testUnavailableErrorIsSafeRetryMutableRPC() {
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
assertThat(Errors.isSafeRetryMutableRPC(status)).isFalse();
}

@Test
public void testNoAddressAvailableIsSafeRetryMutableRPC() {
Status status = Status.UNAVAILABLE
.withDescription("there is no address available");
assertThat(Errors.isSafeRetryMutableRPC(status)).isTrue();
}
}
Loading