Skip to content

Commit

Permalink
Add GlobalLock
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 23, 2024
1 parent 3b8033f commit a5de6d3
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;

Expand Down Expand Up @@ -118,7 +117,7 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem
if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
jobItemManager.persistProgress(jobItemContext);
}
LockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.PREPARE, jobConfig.getJobId());
LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.lock.global.GlobalLock;

/**
* Migration prepare lock.
*/
@RequiredArgsConstructor
public final class MigrationPrepareLock implements GlobalLock {

private final String jobId;

@Override
public String getName() {
return String.format("migration_prepare_%s", jobId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.globalclock.executor;

import org.apache.shardingsphere.mode.lock.global.GlobalLock;

/**
* Global clock lock.
*/
public final class GlobalClockLock implements GlobalLock {

@Override
public String getName() {
return "global_clock";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.shardingsphere.globalclock.rule.constant.GlobalClockOrder;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
Expand All @@ -41,7 +40,7 @@
*/
public final class GlobalClockTransactionHook implements TransactionHook<GlobalClockRule> {

private final LockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.GLOBAL_CLOCK);
private final LockDefinition lockDefinition = new GlobalLockDefinition(new GlobalClockLock());

@Override
public void beforeBegin(final GlobalClockRule rule, final DatabaseType databaseType, final TransactionConnectionContext transactionContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.lock.global;

/**
* Global lock.
*/
public interface GlobalLock {

/**
* Get lock name.
*
* @return lock name
*/
String getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ public final class GlobalLockDefinition implements LockDefinition {
public GlobalLockDefinition(final GlobalLockName lockName, final Object... lockParams) {
lockKey = String.format(KEY_PATTERN, String.format(lockName.getLockName(), lockParams));
}

public GlobalLockDefinition(final GlobalLock globalLock) {
lockKey = String.format(KEY_PATTERN, globalLock.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@
@Getter
public enum GlobalLockName {

CLUSTER_LOCK("cluster_lock"),

PREPARE("prepare_%s"),

GLOBAL_CLOCK("global_clock"),

STATISTICS("statistics");

private final String lockName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.metadata.persist.data.AlteredShardingSphereDatabaseData;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.refresher.lock.StatisticsLock;

import java.util.ArrayList;
import java.util.Map;
Expand Down Expand Up @@ -84,7 +84,7 @@ public void refresh() {
}

private void collectAndRefresh() {
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.STATISTICS);
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new StatisticsLock());
if (lockContext.tryLock(lockDefinition, 5000L)) {
try {
ShardingSphereStatistics statistics = contextManager.getMetaDataContexts().getStatistics();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.metadata.refresher.lock;

import org.apache.shardingsphere.mode.lock.global.GlobalLock;

/**
* Statistics lock.
*/
public final class StatisticsLock implements GlobalLock {

@Override
public String getName() {
return "statistics";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class GlobalLockDefinitionTest {

@Test
void assertGetLockKey() {
assertThat(new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK).getLockKey(), is("/lock/exclusive/locks/cluster_lock"));
GlobalLock globalLock = mock(GlobalLock.class);
when(globalLock.getName()).thenReturn("foo_lock");
assertThat(new GlobalLockDefinition(globalLock).getLockKey(), is("/lock/exclusive/locks/foo_lock"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.shardingsphere.mode.manager.cluster.lock;

import org.apache.shardingsphere.mode.lock.LockPersistService;
import org.apache.shardingsphere.mode.lock.global.GlobalLock;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -33,15 +33,20 @@
@ExtendWith(MockitoExtension.class)
class ClusterLockContextTest {

private final GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK);
@Mock
private GlobalLock globalLock;

@Mock
private LockPersistService<GlobalLockDefinition> lockPersistService;

private GlobalLockDefinition lockDefinition;

private ClusterLockContext lockContext;

@BeforeEach
void init() {
when(globalLock.getName()).thenReturn("foo_lock");
lockDefinition = new GlobalLockDefinition(globalLock);
lockContext = new ClusterLockContext(lockPersistService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.shardingsphere.mode.manager.cluster.persist.service;

import org.apache.shardingsphere.mode.lock.global.GlobalLock;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
Expand All @@ -33,20 +34,28 @@
@ExtendWith(MockitoExtension.class)
class GlobalLockPersistServiceTest {

@Mock
private GlobalLock globalLock;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ClusterPersistRepository repository;

@BeforeEach
void setUp() {
when(globalLock.getName()).thenReturn("foo_lock");
}

@Test
void assertTryLock() {
when(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/cluster_lock").tryLock(1000L)).thenReturn(true);
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK);
when(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/foo_lock").tryLock(1000L)).thenReturn(true);
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(globalLock);
assertTrue(new GlobalLockPersistService(repository).tryLock(lockDefinition, 1000L));
}

@Test
void assertUnlock() {
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK);
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(globalLock);
new GlobalLockPersistService(repository).unlock(lockDefinition);
verify(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/cluster_lock")).unlock();
verify(repository.getDistributedLockHolder().getDistributedLock("/lock/exclusive/locks/foo_lock")).unlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import org.apache.shardingsphere.infra.algorithm.core.exception.MissingRequiredAlgorithmException;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.sql.identifier.SQLExceptionIdentifier;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.exception.LockedClusterException;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock.ClusterLock;
import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;

/**
Expand All @@ -43,7 +43,7 @@ public void executeUpdate(final LockClusterStatement sqlStatement, final Context
checkState(contextManager);
checkAlgorithm(sqlStatement);
LockContext lockContext = contextManager.getComputeNodeInstanceContext().getLockContext();
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK);
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new ClusterLock());
if (lockContext.tryLock(lockDefinition, 3000L)) {
try {
checkState(contextManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.shardingsphere.distsql.handler.required.DistSQLExecutorClusterModeRequired;
import org.apache.shardingsphere.distsql.statement.ral.updatable.UnlockClusterStatement;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.mode.lock.global.GlobalLockName;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.exception.NotLockedClusterException;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock.ClusterLock;

/**
* Unlock cluster executor.
Expand All @@ -38,7 +38,7 @@ public final class UnlockClusterExecutor implements DistSQLUpdateExecutor<Unlock
public void executeUpdate(final UnlockClusterStatement sqlStatement, final ContextManager contextManager) {
checkState(contextManager);
LockContext lockContext = contextManager.getComputeNodeInstanceContext().getLockContext();
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockName.CLUSTER_LOCK);
GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new ClusterLock());
if (lockContext.tryLock(lockDefinition, 3000L)) {
try {
checkState(contextManager);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock;

import org.apache.shardingsphere.mode.lock.global.GlobalLock;

/**
* Cluster lock.
*/
public final class ClusterLock implements GlobalLock {

@Override
public String getName() {
return "cluster_lock";
}
}

0 comments on commit a5de6d3

Please sign in to comment.