From a01e7ef37bf991870ac54ca159fd77a3d3b7b10c Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Tue, 24 Dec 2024 07:44:02 +0800 Subject: [PATCH] Add GlobalLock (#34131) * Add GlobalLock * Add GlobalLock --- .../preparer/MigrationJobPreparer.java | 3 +- .../preparer/MigrationPrepareLock.java | 35 +++++++++++++++++++ .../globalclock/executor/GlobalClockLock.java | 31 ++++++++++++++++ .../executor/GlobalClockTransactionHook.java | 3 +- .../mode/lock/global/GlobalLock.java | 31 ++++++++++++++++ .../lock/global/GlobalLockDefinition.java | 4 +++ .../mode/lock/global/GlobalLockName.java | 6 ---- ...ShardingSphereStatisticsRefreshEngine.java | 4 +-- .../refresher/lock/StatisticsLock.java | 31 ++++++++++++++++ .../lock/global/GlobalLockDefinitionTest.java | 6 +++- .../cluster/lock/ClusterLockContextTest.java | 9 +++-- .../service/GlobalLockPersistServiceTest.java | 19 +++++++--- .../ral/updatable/LockClusterExecutor.java | 4 +-- .../ral/updatable/UnlockClusterExecutor.java | 4 +-- .../ral/updatable/lock/ClusterLock.java | 31 ++++++++++++++++ 15 files changed, 197 insertions(+), 24 deletions(-) create mode 100644 kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java create mode 100644 kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockLock.java create mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLock.java create mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/lock/StatisticsLock.java create mode 100644 proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/ClusterLock.java diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index 0bed58cbd4cfe..1d2314aa29e43 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -70,7 +70,6 @@ import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.parser.SQLParserEngine; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.parser.rule.SQLParserRule; @@ -118,7 +117,7 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) { jobItemManager.persistProgress(jobItemContext); } - LockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.PREPARE, jobConfig.getJobId()); + LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId())); long startTimeMillis = System.currentTimeMillis(); if (lockContext.tryLock(lockDefinition, 600 * 1000L)) { log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java new file mode 100644 index 0000000000000..958045d301fc0 --- /dev/null +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.mode.lock.global.GlobalLock; + +/** + * Migration prepare lock. + */ +@RequiredArgsConstructor +public final class MigrationPrepareLock implements GlobalLock { + + private final String jobId; + + @Override + public String getName() { + return String.format("migration_prepare_%s", jobId); + } +} diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockLock.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockLock.java new file mode 100644 index 0000000000000..589b13b4e8138 --- /dev/null +++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockLock.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.globalclock.executor; + +import org.apache.shardingsphere.mode.lock.global.GlobalLock; + +/** + * Global clock lock. + */ +public final class GlobalClockLock implements GlobalLock { + + @Override + public String getName() { + return "global_clock"; + } +} diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java index 3a36bf69dca92..c6093c1d03ea6 100644 --- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java +++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java @@ -23,7 +23,6 @@ import org.apache.shardingsphere.globalclock.rule.constant.GlobalClockOrder; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.apache.shardingsphere.infra.lock.LockContext; import org.apache.shardingsphere.infra.lock.LockDefinition; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; @@ -41,7 +40,7 @@ */ public final class GlobalClockTransactionHook implements TransactionHook { - private final LockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.GLOBAL_CLOCK); + private final LockDefinition lockDefinition = new GlobalLockDefinition(new GlobalClockLock()); @Override public void beforeBegin(final GlobalClockRule rule, final DatabaseType databaseType, final TransactionConnectionContext transactionContext) { diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLock.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLock.java new file mode 100644 index 0000000000000..c4c54b55c36f5 --- /dev/null +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLock.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.lock.global; + +/** + * Global lock. + */ +public interface GlobalLock { + + /** + * Get lock name. + * + * @return lock name + */ + String getName(); +} diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinition.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinition.java index bee425df49d7b..a886d715dc86d 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinition.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinition.java @@ -33,4 +33,8 @@ public final class GlobalLockDefinition implements LockDefinition { public GlobalLockDefinition(final GlobalLockName lockName, final Object... lockParams) { lockKey = String.format(KEY_PATTERN, String.format(lockName.getLockName(), lockParams)); } + + public GlobalLockDefinition(final GlobalLock globalLock) { + lockKey = String.format(KEY_PATTERN, globalLock.getName()); + } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockName.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockName.java index 782c0818ce0c5..b05d10e0d760c 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockName.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/global/GlobalLockName.java @@ -27,12 +27,6 @@ @Getter public enum GlobalLockName { - CLUSTER_LOCK("cluster_lock"), - - PREPARE("prepare_%s"), - - GLOBAL_CLOCK("global_clock"), - STATISTICS("statistics"); private final String lockName; diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java index d3add96827bf4..61fe8413cb779 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java @@ -36,8 +36,8 @@ import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper; import org.apache.shardingsphere.metadata.persist.data.AlteredShardingSphereDatabaseData; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.metadata.refresher.lock.StatisticsLock; import java.util.ArrayList; import java.util.Map; @@ -84,7 +84,7 @@ public void refresh() { } private void collectAndRefresh() { - GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.STATISTICS); + GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new StatisticsLock()); if (lockContext.tryLock(lockDefinition, 5000L)) { try { ShardingSphereStatistics statistics = contextManager.getMetaDataContexts().getStatistics(); diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/lock/StatisticsLock.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/lock/StatisticsLock.java new file mode 100644 index 0000000000000..db5d495102780 --- /dev/null +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/lock/StatisticsLock.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.metadata.refresher.lock; + +import org.apache.shardingsphere.mode.lock.global.GlobalLock; + +/** + * Statistics lock. + */ +public final class StatisticsLock implements GlobalLock { + + @Override + public String getName() { + return "statistics"; + } +} diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinitionTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinitionTest.java index 19732c7a989f3..82ee55f24a609 100644 --- a/mode/core/src/test/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinitionTest.java +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/lock/global/GlobalLockDefinitionTest.java @@ -21,11 +21,15 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class GlobalLockDefinitionTest { @Test void assertGetLockKey() { - assertThat(new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK).getLockKey(), is("/lock/exclusive/locks/cluster_lock")); + GlobalLock globalLock = mock(GlobalLock.class); + when(globalLock.getName()).thenReturn("foo_lock"); + assertThat(new GlobalLockDefinition(globalLock).getLockKey(), is("/lock/exclusive/locks/foo_lock")); } } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/ClusterLockContextTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/ClusterLockContextTest.java index 08d857be142c3..8b168bad457df 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/ClusterLockContextTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/ClusterLockContextTest.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.mode.manager.cluster.lock; import org.apache.shardingsphere.mode.lock.LockPersistService; +import org.apache.shardingsphere.mode.lock.global.GlobalLock; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -33,15 +33,20 @@ @ExtendWith(MockitoExtension.class) class ClusterLockContextTest { - private final GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK); + @Mock + private GlobalLock globalLock; @Mock private LockPersistService lockPersistService; + private GlobalLockDefinition lockDefinition; + private ClusterLockContext lockContext; @BeforeEach void init() { + when(globalLock.getName()).thenReturn("foo_lock"); + lockDefinition = new GlobalLockDefinition(globalLock); lockContext = new ClusterLockContext(lockPersistService); } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/GlobalLockPersistServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/GlobalLockPersistServiceTest.java index 77ea847d5e003..6a34617d5b2bf 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/GlobalLockPersistServiceTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/GlobalLockPersistServiceTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.mode.manager.cluster.persist.service; +import org.apache.shardingsphere.mode.lock.global.GlobalLock; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; @@ -33,20 +34,28 @@ @ExtendWith(MockitoExtension.class) class GlobalLockPersistServiceTest { + @Mock + private GlobalLock globalLock; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private ClusterPersistRepository repository; + @BeforeEach + void setUp() { + when(globalLock.getName()).thenReturn("foo_lock"); + } + @Test void assertTryLock() { - when(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/cluster_lock").tryLock(1000L)).thenReturn(true); - GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK); + when(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/foo_lock").tryLock(1000L)).thenReturn(true); + GlobalLockDefinition lockDefinition = new GlobalLockDefinition(globalLock); assertTrue(new GlobalLockPersistService(repository).tryLock(lockDefinition, 1000L)); } @Test void assertUnlock() { - GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK); + GlobalLockDefinition lockDefinition = new GlobalLockDefinition(globalLock); new GlobalLockPersistService(repository).unlock(lockDefinition); - verify(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/cluster_lock")).unlock(); + verify(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/foo_lock")).unlock(); } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutor.java index ebaf98e816db9..73d2bb59f26da 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutor.java @@ -23,13 +23,13 @@ import org.apache.shardingsphere.infra.algorithm.core.exception.MissingRequiredAlgorithmException; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.core.external.sql.identifier.SQLExceptionIdentifier; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.apache.shardingsphere.infra.lock.LockContext; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.state.cluster.ClusterState; import org.apache.shardingsphere.mode.exception.LockedClusterException; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock.ClusterLock; import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy; /** @@ -43,7 +43,7 @@ public void executeUpdate(final LockClusterStatement sqlStatement, final Context checkState(contextManager); checkAlgorithm(sqlStatement); LockContext lockContext = contextManager.getComputeNodeInstanceContext().getLockContext(); - GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK); + GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new ClusterLock()); if (lockContext.tryLock(lockDefinition, 3000L)) { try { checkState(contextManager); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java index 701e1c0eee3f0..d9d28b883f875 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java @@ -21,12 +21,12 @@ import org.apache.shardingsphere.distsql.handler.required.DistSQLExecutorClusterModeRequired; import org.apache.shardingsphere.distsql.statement.ral.updatable.UnlockClusterStatement; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; -import org.apache.shardingsphere.mode.lock.global.GlobalLockName; import org.apache.shardingsphere.infra.lock.LockContext; import org.apache.shardingsphere.infra.state.cluster.ClusterState; import org.apache.shardingsphere.mode.exception.NotLockedClusterException; import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock.ClusterLock; /** * Unlock cluster executor. @@ -38,7 +38,7 @@ public final class UnlockClusterExecutor implements DistSQLUpdateExecutor