diff --git a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java index d87f4d3903d150..280ded9e16d864 100644 --- a/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java +++ b/mode/type/cluster/repository/provider/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java @@ -29,6 +29,8 @@ import com.google.common.base.Strings; import lombok.Getter; import org.apache.http.HttpStatus; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; @@ -68,7 +70,7 @@ public final class ConsulRepository implements ClusterPersistRepository { @Override public void init(final ClusterPersistRepositoryConfiguration config) { consulProps = new ConsulProperties(config.getProps()); - ConsulRawClient rawClient = createConsulRawClient(config.getServerLists()); + ConsulRawClient rawClient = createConsulRawClient(config.getServerLists(), consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS)); consulClient = new ShardingSphereConsulClient(rawClient); distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps); watchKeyMap = new HashMap<>(6, 1F); @@ -137,9 +139,10 @@ public void persistEphemeral(final String key, final String value) { } @SuppressWarnings("HttpUrlsUsage") - private ConsulRawClient createConsulRawClient(final String serverLists) { + private ConsulRawClient createConsulRawClient(final String serverLists, final long blockQueryTimeToSeconds) { + CloseableHttpClient httpClient = HttpClientBuilder.create().setConnectionTimeToLive(blockQueryTimeToSeconds, TimeUnit.SECONDS).build(); if (Strings.isNullOrEmpty(serverLists)) { - return new ConsulRawClient(); + return new ConsulRawClient(httpClient); } URL serverUrl; try { @@ -148,9 +151,9 @@ private ConsulRawClient createConsulRawClient(final String serverLists) { throw new RuntimeException(e); } if (-1 == serverUrl.getPort()) { - return new ConsulRawClient(serverUrl.getHost()); + return new ConsulRawClient(serverUrl.getHost(), httpClient); } - return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort()); + return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort(), httpClient); } private NewSession createNewSession(final String key) { @@ -177,13 +180,11 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi AtomicBoolean running = new AtomicBoolean(true); long currentIndex = 0; while (running.get()) { - Response> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex)); + QueryParams queryParams = QueryParams.Builder.builder().setIndex(currentIndex).build(); + Response> response = consulClient.getKVValues(key, queryParams); List value = response.getValue(); - if (null == value) { - continue; - } Long index = response.getConsulIndex(); - if (null != index && 0 == currentIndex) { + if (null != index && 0 == currentIndex && null != value) { currentIndex = index; if (!watchKeyMap.containsKey(key)) { watchKeyMap.put(key, new HashSet<>()); @@ -194,7 +195,7 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi } continue; } - if (null != index && index > currentIndex) { + if (null != index && index > currentIndex && null != value) { currentIndex = index; Collection newKeys = new HashSet<>(value.size(), 1F); Collection watchKeys = watchKeyMap.get(key); @@ -215,7 +216,7 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi } } watchKeyMap.put(key, newKeys); - } else if (null != index && index < currentIndex) { + } else if (null != index && index < currentIndex && null != value) { currentIndex = 0; } } diff --git a/test/native/pom.xml b/test/native/pom.xml index 96d87fb8d2b6f0..5a8ac40fb1ae88 100644 --- a/test/native/pom.xml +++ b/test/native/pom.xml @@ -45,6 +45,12 @@ ${project.version} test + + org.apache.shardingsphere + shardingsphere-cluster-mode-repository-consul + ${project.version} + test + org.awaitility diff --git a/test/native/src/test/java/org/apache/shardingsphere/test/natived/jdbc/commons/testcontainers/ShardingSphereConsulContainer.java b/test/native/src/test/java/org/apache/shardingsphere/test/natived/jdbc/commons/testcontainers/ShardingSphereConsulContainer.java new file mode 100644 index 00000000000000..e853fea3cdd227 --- /dev/null +++ b/test/native/src/test/java/org/apache/shardingsphere/test/natived/jdbc/commons/testcontainers/ShardingSphereConsulContainer.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.test.natived.jdbc.commons.testcontainers; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.Capability; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +@SuppressWarnings({"FieldMayBeFinal", "FieldCanBeLocal", "resource", "DataFlowIssue", "unused"}) +public class ShardingSphereConsulContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_OLD_IMAGE_NAME = DockerImageName.parse("consul"); + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("hashicorp/consul"); + + private static final int CONSUL_HTTP_PORT = 8500; + + private static final int CONSUL_GRPC_PORT = 8502; + + private List initCommands = new ArrayList<>(); + + private String[] startConsulCmd = new String[]{"agent", "-dev", "-client", "0.0.0.0"}; + + /** + * Manually specify the Port for ShardingSphere's nativeTest. + * @param dockerImageName docker image name + */ + public ShardingSphereConsulContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_OLD_IMAGE_NAME, DEFAULT_IMAGE_NAME); + setWaitStrategy(Wait.forHttp("/v1/status/leader").forPort(CONSUL_HTTP_PORT).forStatusCode(200)); + withCreateContainerCmdModifier(cmd -> { + cmd.getHostConfig().withCapAdd(Capability.IPC_LOCK); + cmd.withHostConfig(new HostConfig().withPortBindings(new PortBinding(Ports.Binding.bindPort(62391), new ExposedPort(CONSUL_HTTP_PORT)))); + }); + withEnv("CONSUL_ADDR", "http://0.0.0.0:" + CONSUL_HTTP_PORT); + withCommand(startConsulCmd); + } + + @Override + protected void containerIsStarted(final InspectContainerResponse containerInfo) { + if (!initCommands.isEmpty()) { + String commands = initCommands.stream().map(command -> "consul " + command).collect(Collectors.joining(" && ")); + try { + ExecResult execResult = this.execInContainer("/bin/sh", "-c", commands); + if (0 != execResult.getExitCode()) { + logger().error( + "Failed to execute these init commands {}. Exit code {}. Stdout {}. Stderr {}", + initCommands, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + } + } catch (IOException | InterruptedException e) { + logger().error( + "Failed to execute these init commands {}. Exception message: {}", + initCommands, + e.getMessage()); + } + } + } + + /** + * work with Consul Command. + * @param commands The commands to send to the consul cli + * @return this + */ + public ShardingSphereConsulContainer withConsulCommand(final String... commands) { + initCommands.addAll(Arrays.asList(commands)); + return self(); + } +} diff --git a/test/native/src/test/java/org/apache/shardingsphere/test/natived/jdbc/mode/cluster/ConsulTest.java b/test/native/src/test/java/org/apache/shardingsphere/test/natived/jdbc/mode/cluster/ConsulTest.java new file mode 100644 index 00000000000000..7502ea0fd9c0f1 --- /dev/null +++ b/test/native/src/test/java/org/apache/shardingsphere/test/natived/jdbc/mode/cluster/ConsulTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.test.natived.jdbc.mode.cluster; + +import com.ecwid.consul.transport.HttpResponse; +import com.ecwid.consul.v1.ConsulRawClient; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory; +import org.apache.shardingsphere.test.natived.jdbc.commons.FileTestUtils; +import org.apache.shardingsphere.test.natived.jdbc.commons.TestShardingService; +import org.apache.shardingsphere.test.natived.jdbc.commons.testcontainers.ShardingSphereConsulContainer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledInNativeImage; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +import javax.sql.DataSource; +import java.io.IOException; +import java.sql.SQLException; +import java.time.Duration; + +public class ConsulTest { + + private static final int CONSUL_HOST_HTTP_PORT = 62391; + + private TestShardingService testShardingService; + + @Test + @EnabledInNativeImage + void assertShardingInLocalTransactions() throws SQLException, IOException { + try ( + GenericContainer consulContainer = new ShardingSphereConsulContainer(DockerImageName.parse("hashicorp/consul:1.10.12"))) { + consulContainer.start(); + beforeAll(); + DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(FileTestUtils.readFromFileURLString("test-native/yaml/mode/cluster/consul.yaml")); + testShardingService = new TestShardingService(dataSource); + initEnvironment(); + Awaitility.await().atMost(Duration.ofSeconds(30L)).ignoreExceptions().until(() -> { + dataSource.getConnection().close(); + return true; + }); + testShardingService.processSuccess(); + testShardingService.cleanEnvironment(); + } + } + + private void initEnvironment() throws SQLException { + testShardingService.getOrderRepository().createTableIfNotExistsInMySQL(); + testShardingService.getOrderItemRepository().createTableIfNotExistsInMySQL(); + testShardingService.getAddressRepository().createTableIfNotExists(); + testShardingService.getOrderRepository().truncateTable(); + testShardingService.getOrderItemRepository().truncateTable(); + testShardingService.getAddressRepository().truncateTable(); + } + + private void beforeAll() { + Awaitility.await().atMost(Duration.ofSeconds(30L)).ignoreExceptions().until(this::verifyConsulAgentRunning); + } + + private boolean verifyConsulAgentRunning() { + boolean flag = false; + HttpResponse httpResponse = new ConsulRawClient("http://localhost", CONSUL_HOST_HTTP_PORT).makeGetRequest("/v1/status/leader"); + if (HttpStatus.SC_OK == httpResponse.getStatusCode()) { + flag = true; + } + return flag; + } +} diff --git a/test/native/src/test/resources/META-INF/native-image/shardingsphere-test-native-test-metadata/resource-config.json b/test/native/src/test/resources/META-INF/native-image/shardingsphere-test-native-test-metadata/resource-config.json index c904b01b547963..20bee2b5bf392c 100644 --- a/test/native/src/test/resources/META-INF/native-image/shardingsphere-test-native-test-metadata/resource-config.json +++ b/test/native/src/test/resources/META-INF/native-image/shardingsphere-test-native-test-metadata/resource-config.json @@ -33,6 +33,9 @@ }, { "condition":{"typeReachable":"org.apache.shardingsphere.test.natived.jdbc.mode.cluster.ZookeeperTest"}, "pattern":"\\Qtest-native/yaml/mode/cluster/zookeeper.yaml\\E" + }, { + "condition":{"typeReachable":"org.apache.shardingsphere.test.natived.jdbc.mode.cluster.ConsulTest"}, + "pattern":"\\Qtest-native/yaml/mode/cluster/consul.yaml\\E" }]}, "bundles":[] } diff --git a/test/native/src/test/resources/test-native/yaml/mode/cluster/consul.yaml b/test/native/src/test/resources/test-native/yaml/mode/cluster/consul.yaml new file mode 100644 index 00000000000000..4c90d21bfa7b8b --- /dev/null +++ b/test/native/src/test/resources/test-native/yaml/mode/cluster/consul.yaml @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +mode: + type: Cluster + repository: + type: Consul + props: + namespace: governance-consul-data-source + server-lists: localhost:62391 + +dataSources: + ds_0: + dataSourceClassName: com.zaxxer.hikari.HikariDataSource + driverClassName: org.h2.Driver + jdbcUrl: jdbc:h2:mem:cluster_consul_ds_0;MODE=MYSQL;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE + username: root + password: 123456 + ds_1: + dataSourceClassName: com.zaxxer.hikari.HikariDataSource + driverClassName: org.h2.Driver + jdbcUrl: jdbc:h2:mem:cluster_consul_ds_1;MODE=MYSQL;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE + username: root + password: 123456 + ds_2: + dataSourceClassName: com.zaxxer.hikari.HikariDataSource + driverClassName: org.h2.Driver + jdbcUrl: jdbc:h2:mem:cluster_consul_ds_2;MODE=MYSQL;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE + username: root + password: 123456 + +rules: + - !SHARDING + tables: + t_order: + actualDataNodes: + keyGenerateStrategy: + column: order_id + keyGeneratorName: snowflake + t_order_item: + actualDataNodes: + keyGenerateStrategy: + column: order_item_id + keyGeneratorName: snowflake + defaultDatabaseStrategy: + standard: + shardingColumn: user_id + shardingAlgorithmName: inline + shardingAlgorithms: + inline: + type: CLASS_BASED + props: + strategy: STANDARD + algorithmClassName: org.apache.shardingsphere.test.natived.jdbc.commons.algorithm.ClassBasedInlineShardingAlgorithmFixture + keyGenerators: + snowflake: + type: SNOWFLAKE + auditors: + sharding_key_required_auditor: + type: DML_SHARDING_CONDITIONS + + - !BROADCAST + tables: + - t_address + +props: + sql-show: false diff --git a/test/native/src/test/resources/test-native/yaml/mode/cluster/zookeeper.yaml b/test/native/src/test/resources/test-native/yaml/mode/cluster/zookeeper.yaml index 6df3f7c47e8829..fbf8d05b50aab9 100644 --- a/test/native/src/test/resources/test-native/yaml/mode/cluster/zookeeper.yaml +++ b/test/native/src/test/resources/test-native/yaml/mode/cluster/zookeeper.yaml @@ -20,7 +20,7 @@ mode: repository: type: ZooKeeper props: - namespace: governance + namespace: governance-zookeeper-data-source server-lists: localhost:62372 dataSources: