Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Key-Value Transactions Impl #198

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions src/main/java/com/ecwid/consul/v1/ConsulClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
import com.ecwid.consul.v1.session.model.Session;
import com.ecwid.consul.v1.status.StatusClient;
import com.ecwid.consul.v1.status.StatusConsulClient;
import com.ecwid.consul.v1.transactions.ParamBuilder;
import com.ecwid.consul.v1.transactions.TransactionsClient;
import com.ecwid.consul.v1.transactions.TransactionsConsulClient;
import com.ecwid.consul.v1.transactions.model.TxnResult;

import java.util.List;
import java.util.Map;
Expand All @@ -62,7 +66,8 @@ public class ConsulClient implements
KeyValueClient,
QueryClient,
SessionClient,
StatusClient {
StatusClient,
TransactionsClient {

private final AclClient aclClient;
private final AgentClient agentClient;
Expand All @@ -74,6 +79,7 @@ public class ConsulClient implements
private final QueryClient queryClient;
private final SessionClient sessionClient;
private final StatusClient statusClient;
private final TransactionsClient transactionsClient;

public ConsulClient(ConsulRawClient rawClient) {
aclClient = new AclConsulClient(rawClient);
Expand All @@ -86,6 +92,7 @@ public ConsulClient(ConsulRawClient rawClient) {
queryClient = new QueryConsulClient(rawClient);
sessionClient = new SessionConsulClient(rawClient);
statusClient = new StatusConsulClient(rawClient);
transactionsClient = new TransactionsConsulClient(rawClient);
}

/**
Expand Down Expand Up @@ -209,7 +216,7 @@ public Response<List<Member>> getAgentMembers() {
public Response<Self> getAgentSelf() {
return agentClient.getAgentSelf();
}

@Override
public Response<Self> getAgentSelf(String token) {
return agentClient.getAgentSelf(token);
Expand Down Expand Up @@ -868,4 +875,23 @@ public Response<String> getStatusLeader() {
public Response<List<String>> getStatusPeers() {
return statusClient.getStatusPeers();
}

/**
* Submit transaction
*
* @param token token
* @param builder parameters builder
* @return process result report
*/
@Override
public Response<TxnResult> commit(String token, ParamBuilder builder) { return transactionsClient.commit(token,builder);}

/**
* Submit transaction
*
* @param builder parameters builder
* @return process result report
*/
@Override
public Response<TxnResult> commit(ParamBuilder builder) { return transactionsClient.commit(builder);}
}
231 changes: 231 additions & 0 deletions src/main/java/com/ecwid/consul/v1/transactions/ParamBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package com.ecwid.consul.v1.transactions;

import com.ecwid.consul.v1.transactions.model.Operate;
import com.ecwid.consul.v1.transactions.model.TxnReqItem;
import com.ecwid.consul.v1.transactions.model.Verb;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedList;
import java.util.List;

/**
* Build consul Transaction parameters
* <p>
* https://www.consul.io/api/txn.html#tables-of-operations
*
* @author Trisia Cliven ([email protected])
* @since 2019-11-11 12:21:52
*/
public class ParamBuilder {

private List<TxnReqItem> operates;

private ParamBuilder() {
operates = new LinkedList<>();
}

public static ParamBuilder getInstance() {
return new ParamBuilder();
}

/**
* Sets the Key to the given Value
*
* @param key key
* @param value value (do not base64)
* @param flag [option]
* @return this
*/
public ParamBuilder kvSet(String key, String value, int... flag) {
Operate op = new Operate(Verb.set)
.setKey(key)
.setValue(Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)));
if (flag != null && flag.length > 0) {
op.setFlags(flag[0]);
}
operates.add(new TxnReqItem(op));
return this;
}

/**
* Sets, but with CAS semantics
*
* @param key key
* @param value value
* @param index index
* @param flag [option]
* @return this
*/
public ParamBuilder kvCas(String key, String value, int index, int... flag) {
Operate op = new Operate(Verb.cas)
.setKey(key)
.setValue(Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)))
.setIndex(index);
if (flag != null && flag.length > 0) {
op.setFlags(flag[0]);
}
operates.add(new TxnReqItem(op));
return this;
}

/**
* Lock with the given Session
*
* @param key key
* @param value value
* @param session session id
* @param flag [option]
* @return this
*/
public ParamBuilder kvLock(String key, String value, String session, int... flag) {
Operate op = new Operate(Verb.lock)
.setKey(key)
.setValue(Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)))
.setSession(session);
if (flag != null && flag.length > 0) {
op.setFlags(flag[0]);
}
operates.add(new TxnReqItem(op));
return this;
}


/**
* Unlock with the given Session
*
* @param key key
* @param value value
* @param session session id
* @param flag [option]
* @return this
*/
public ParamBuilder kvUnlock(String key, String value, String session, int... flag) {
Operate op = new Operate(Verb.unlock)
.setKey(key)
.setValue(Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)))
.setSession(session);
if (flag != null && flag.length > 0) {
op.setFlags(flag[0]);
}
operates.add(new TxnReqItem(op));
return this;
}

/**
* Get the key, fails if it does not exist
*
* @param key key
* @return this
*/
public ParamBuilder kvGet(String key) {
Operate op = new Operate(Verb.get)
.setKey(key);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Gets all keys with the prefix
*
* @param key key
* @return this
*/
public ParamBuilder kvGetTree(String key) {
Operate op = new Operate(Verb.get_tree)
.setKey(key);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Fail if modify index != index
*
* @param key key
* @param index index
* @return this
*/
public ParamBuilder kvCheckIndex(String key, int index) {
Operate op = new Operate(Verb.check_index)
.setKey(key)
.setIndex(index);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Fail if not locked by session
*
* @param key key
* @param session session id
* @return this
*/
public ParamBuilder kvCheckSession(String key, String session) {
Operate op = new Operate(Verb.check_session)
.setKey(key)
.setSession(session);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Fail if key exists
*
* @param key key
* @return this
*/
public ParamBuilder kvCheckNotExists(String key) {
Operate op = new Operate(Verb.check_not_exists)
.setKey(key);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Delete the key
*
* @param key key
* @return this
*/
public ParamBuilder kvDelete(String key) {
Operate op = new Operate(Verb.delete)
.setKey(key);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Delete all keys with a prefix
*
* @param key key
* @return this
*/
public ParamBuilder kvDeleteTree(String key) {
Operate op = new Operate(Verb.delete_tree)
.setKey(key);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Delete, but with CAS semantics
*
* @param key key
* @return this
*/
public ParamBuilder kvDeleteCas(String key) {
Operate op = new Operate(Verb.delete_cas)
.setKey(key);
operates.add(new TxnReqItem(op));
return this;
}

/**
* Build Request parameters
*
* @return parameters
*/
public List<TxnReqItem> build() {
return this.operates;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.ecwid.consul.v1.transactions;


import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.transactions.model.TxnResult;

/**
* Transactions Client
*
* @author Trisia Cliven ([email protected])
* @since 2019-11-11 10:24:49
*/
public interface TransactionsClient {
/**
* Submit transaction
*
* @param token token
* @param builder parameters builder
* @return process result report
*/
public Response<TxnResult> commit(String token, ParamBuilder builder);
/**
* Submit transaction
*
* @param builder parameters builder
* @return process result report
*/
public Response<TxnResult> commit(ParamBuilder builder);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.ecwid.consul.v1.transactions;

import com.ecwid.consul.SingleUrlParameters;
import com.ecwid.consul.UrlParameters;
import com.ecwid.consul.json.GsonFactory;
import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.OperationException;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.transactions.model.TxnResult;

/**
* Transactions Client for consul
*
* @author Trisia Cliven ([email protected])
*/
public class TransactionsConsulClient implements TransactionsClient {
private final ConsulRawClient rawClient;

public TransactionsConsulClient(ConsulRawClient rawClient) {
this.rawClient = rawClient;
}

public TransactionsConsulClient() {
this(new ConsulRawClient());
}

/**
* Submit transaction
*
* @param token token
* @param builder parameters builder
* @return process result report
*/
@Override
public Response<TxnResult> commit(String token, ParamBuilder builder) {
UrlParameters tokenParams = token != null ? new SingleUrlParameters("token", token) : null;
String json = GsonFactory.getGson().toJson(builder.build());
HttpResponse httpResponse = rawClient.makePutRequest("/v1/txn", json, tokenParams);

if (httpResponse.getStatusCode() == 200) {
TxnResult value = GsonFactory.getGson().fromJson(httpResponse.getContent(), TxnResult.class);
return new Response<>(value, httpResponse);
} else {
throw new OperationException(httpResponse);
}
}

/**
* Submit transaction
*
* @param builder parameters builder
* @return process result report
*/
@Override
public Response<TxnResult> commit(ParamBuilder builder) {
return this.commit(null, builder);
}
}
Loading