Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GraalVM Reachability Metadata and corresponding nativeTest for Consul integration #29588

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 9 additions & 24 deletions mode/type/cluster/repository/provider/consul/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,9 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>${consul.api.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
<version>${consul-client.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand All @@ -54,25 +43,21 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import com.google.common.base.Strings;
import com.orbitz.consul.Consul;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import lombok.Getter;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
Expand All @@ -39,212 +32,150 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Registry repository of Consul.
* Before JDK 18 implemented in JEP 400, the return value of `{@link java.nio.charset.Charset}.defaultCharset()` on the
* Windows platform was usually not `{@link java.nio.charset.StandardCharsets}.UTF_8`.
* This explains the series of settings this class has on CharSet.
*/
public final class ConsulRepository implements ClusterPersistRepository {

private static final ScheduledThreadPoolExecutor SESSION_FLUSH_EXECUTOR = new ScheduledThreadPoolExecutor(2);
private final Map<String, KVCache> caches = new ConcurrentHashMap<>();

private ShardingSphereConsulClient consulClient;

private ConsulProperties consulProps;
private Consul consulClient;

@Getter
private DistributedLockHolder distributedLockHolder;

private Map<String, Collection<String>> watchKeyMap;
private final Set<String> ephemeralKeySet = new HashSet<>();

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
consulProps = new ConsulProperties(config.getProps());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
ConsulProperties consulProps = new ConsulProperties(config.getProps());
consulClient = createConsulClient(config.getServerLists(), consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS));
distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
}

@Override
public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
if (null == response) {
return null;
/**
* Set ReadTimeoutMillis to avoid `java.lang.IllegalArgumentException: Cache watchInterval=10sec >= networkClientReadTimeout=10000ms. It can cause issues`.
*
* @param serverLists serverUrl.
* @param blockQueryTimeToSeconds blockQueryTimeToSeconds for Mode Config.
* @return Consul client.
* @throws RuntimeException MalformedURLException.
*/
@SuppressWarnings("HttpUrlsUsage")
private Consul createConsulClient(final String serverLists, final long blockQueryTimeToSeconds) {
Consul.Builder builder = Consul.builder().withReadTimeoutMillis(blockQueryTimeToSeconds * 1000);
if (Strings.isNullOrEmpty(serverLists)) {
return builder.build();
}
URL serverUrl;
try {
serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
GetValue value = response.getValue();
return null == value ? null : value.getValue();
return builder.withUrl(serverUrl).build();
}

@Override
public List<String> getChildrenKeys(final String key) {
Response<List<String>> response = consulClient.getKVKeysOnly(key);
if (null == response) {
return Collections.emptyList();
}
List<String> value = response.getValue();
return null == value ? Collections.emptyList() : value;
return consulClient.keyValueClient()
.getKeys(key)
.stream()
.filter(s -> !s.equals(key))
.collect(Collectors.toList());
}

@Override
public boolean isExisted(final String key) {
return null != consulClient.getKVValue(key).getValue();
public void persist(final String key, final String value) {
consulClient.keyValueClient().putValue(key, value, StandardCharsets.UTF_8);
}

@Override
public void persist(final String key, final String value) {
consulClient.setKVValue(key, value);
public void update(final String key, final String value) {
consulClient.keyValueClient().putValue(key, value, StandardCharsets.UTF_8);
}

@Override
public void update(final String key, final String value) {
consulClient.setKVValue(key, value);
public String getDirectly(final String key) {
return consulClient.keyValueClient().getValueAsString(key, StandardCharsets.UTF_8).orElse(null);
}

@Override
public void delete(final String key) {
consulClient.deleteKVValue(key);
public boolean isExisted(final String key) {
return !consulClient.keyValueClient().getValuesAsString(key, StandardCharsets.UTF_8).isEmpty();
}

/**
* {@link ConsulRawClient} is a wrapper of blocking HTTP client and does not have a close method.
* Using such a Client does not necessarily conform to the implementation of the relevant SPI. ShardingSphere needs to
* consider solutions similar to <a href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
* Persist Ephemeral by flushing session by update TTL.
*
* @see ConsulRawClient
* @param key key of data
* @param value value of data
*/
@Override
public void close() {
}

@Override
public void persistEphemeral(final String key, final String value) {
Response<String> response = consulClient.sessionCreate(createNewSession(key), QueryParams.DEFAULT);
String sessionId = response.getValue();
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
generatorFlushSessionTtlTask(consulClient, sessionId);
verifyConsulAgentRunning();
}

@SuppressWarnings("HttpUrlsUsage")
private ConsulRawClient createConsulRawClient(final String serverLists) {
if (Strings.isNullOrEmpty(serverLists)) {
return new ConsulRawClient();
}
URL serverUrl;
try {
serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
if (isExisted(key)) {
consulClient.keyValueClient().deleteKeys(key);
ephemeralKeySet.remove(key);
}
if (-1 == serverUrl.getPort()) {
return new ConsulRawClient(serverUrl.getHost());
}
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
}

private NewSession createNewSession(final String key) {
NewSession result = new NewSession();
result.setName(key);
result.setBehavior(Session.Behavior.DELETE);
result.setTtl(consulProps.getValue(ConsulPropertyKey.TIME_TO_LIVE_SECONDS));
return result;
consulClient.keyValueClient().putValue(key, value, StandardCharsets.UTF_8);
ephemeralKeySet.add(key);
}

@Override
public void persistExclusiveEphemeral(final String key, final String value) {
persistEphemeral(key, value);
consulClient.keyValueClient().putValue(key, value, StandardCharsets.UTF_8);
ephemeralKeySet.add(key);
}

@Override
public void watch(final String key, final DataChangedEventListener listener) {
Thread watchThread = new Thread(() -> watchChildKeyChangeEvent(key, listener));
watchThread.setDaemon(true);
watchThread.start();
}

private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
AtomicBoolean running = new AtomicBoolean(true);
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
List<GetValue> value = response.getValue();
if (null == value) {
continue;
}
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
currentIndex = index;
if (!watchKeyMap.containsKey(key)) {
watchKeyMap.put(key, new HashSet<>());
}
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : value) {
watchKeys.add(each.getKey());
}
continue;
}
if (null != index && index > currentIndex) {
currentIndex = index;
Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : value) {
newKeys.add(each.getKey());
if (!watchKeys.contains(each.getKey())) {
watchKeys.add(each.getKey());
fireDataChangeEvent(each, listener, DataChangedEvent.Type.ADDED);
} else if (watchKeys.contains(each.getKey()) && each.getModifyIndex() >= currentIndex) {
fireDataChangeEvent(each, listener, DataChangedEvent.Type.UPDATED);
}
}
for (String each : watchKeys) {
if (!newKeys.contains(each)) {
GetValue getValue = new GetValue();
getValue.setKey(each);
fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.DELETED);
}
}
watchKeyMap.put(key, newKeys);
} else if (null != index && index < currentIndex) {
currentIndex = 0;
}
public void delete(final String key) {
if (isExisted(key)) {
consulClient.keyValueClient().deleteKeys(key);
}
}

private void fireDataChangeEvent(final GetValue getValue, final DataChangedEventListener listener, final DataChangedEvent.Type type) {
listener.onChange(new DataChangedEvent(getValue.getKey(), getValue.getValue(), type));
}

/**
* Flush session by update TTL.
* Consul doesn't tell clients what key changed when performing a watch. we best bet is to do a comparison with the previous set of values.
* This is a bit troublesome in ShardingSphere context implementation.
*
* @param consulClient consul client
* @param sessionId session id
* @param key key of data
* @param listener data changed event listener
*/
public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
@Override
public void watch(final String key, final DataChangedEventListener listener) {
KVCache cache = caches.get(key);
if (null == cache) {
cache = KVCache.newCache(consulClient.keyValueClient(), key);
caches.put(key, cache);
}
cache.addListener(newValues -> {
Optional<Value> newValue = newValues.values().stream().filter(value -> value.getKey().equals(key)).findAny();
newValue.ifPresent(value -> {
Optional<String> decodedValue = newValue.get().getValueAsString();
decodedValue.ifPresent(v -> listener.onChange(new DataChangedEvent(key, v, DataChangedEvent.Type.ADDED)));
});
});
cache.start();
}

/**
* See <a href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status">Status HTTP API</a> .
*
* @throws RuntimeException Unable to connect to Consul Agent.
*/
private void verifyConsulAgentRunning() {
HttpResponse httpResponse = consulClient.getRawClient().makeGetRequest("/v1/status/leader");
if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
throw new RuntimeException("Unable to connect to Consul Agent and StatusCode is " + httpResponse.getStatusCode() + ".");
}
@Override
public void close() {
caches.values().forEach(KVCache::close);
ephemeralKeySet.forEach(s -> consulClient.keyValueClient().deleteKey(s));
ephemeralKeySet.clear();
consulClient.destroy();
}

@Override
Expand Down
Loading
Loading