Skip to content

Commit

Permalink
Introduce lock service for easy distributed locking
Browse files Browse the repository at this point in the history
  • Loading branch information
beikov committed May 29, 2020
1 parent 0c6414e commit 9b7e42a
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 5 deletions.
12 changes: 12 additions & 0 deletions clustering/wildfly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@
<version>${version.wildfly}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wildfly</groupId>
<artifactId>wildfly-clustering-api</artifactId>
<version>${version.wildfly}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wildfly</groupId>
<artifactId>wildfly-clustering-jgroups-api</artifactId>
<version>${version.wildfly}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>blaze-apt-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018 - 2020 Blazebit.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.blazebit.actor.clustering.wildfly;

import com.blazebit.actor.spi.LockService;
import org.jgroups.Channel;

import java.util.concurrent.locks.Lock;

/**
* A lock service implementation based on JGroups LockService.
*
* @author Christian Beikov
* @since 1.0.0
*/
public class JGroupsLockService implements LockService {

private final org.jgroups.blocks.locking.LockService lockService;

/**
* Creates a lock service.
*
* @param channel The JGroups channel to use for the lock service
*/
public JGroupsLockService(Channel channel) {
this.lockService = new org.jgroups.blocks.locking.LockService(channel);
}

@Override
public Lock getLock(String name) {
return lockService.getLock(name);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.blazebit.actor.spi.ClusterNodeInfo;
import com.blazebit.actor.spi.ClusterStateListener;
import com.blazebit.actor.spi.ClusterStateManager;
import com.blazebit.actor.spi.LockService;
import com.blazebit.actor.spi.StateReturningEvent;
import org.wildfly.clustering.dispatcher.Command;
import org.wildfly.clustering.dispatcher.CommandDispatcher;
Expand Down Expand Up @@ -72,10 +73,11 @@ public int compare(Node o1, Node o2) {
};

// @Resource(lookup = "java:jboss/clustering/group/ee")
private Group channelGroup;

private final Group channelGroup;
// @Resource(lookup = "java:jboss/clustering/dispatcher/ee")
private CommandDispatcherFactory factory;
private final CommandDispatcherFactory factory;
// @Resource(lookup = "java:jboss/jgroups/channel/ee")
private final LockService lockService;

private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<Class<?>, List<java.util.function.Consumer<Serializable>>> listeners = new ConcurrentHashMap<>();
Expand All @@ -88,10 +90,12 @@ public int compare(Node o1, Node o2) {
*
* @param channelGroup The channel group
* @param factory The dispatcher factory
* @param lockService The cluster lock service
*/
public WildflyClusterStateManager(Group channelGroup, CommandDispatcherFactory factory) {
public WildflyClusterStateManager(Group channelGroup, CommandDispatcherFactory factory, LockService lockService) {
this.channelGroup = channelGroup;
this.factory = factory;
this.lockService = lockService;
this.localNode = new Node[]{channelGroup.getLocalNode()};
}

Expand Down Expand Up @@ -231,6 +235,11 @@ private <T> Map<ClusterNodeInfo, Future<T>> fireEvent(StateReturningEvent<T> eve
}
}

@Override
public LockService getLockService() {
return lockService;
}

@Override
public boolean isStandalone() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ public interface ClusterStateManager {
*/
public <T> Map<ClusterNodeInfo, Future<T>> fireEventExcludeSelf(StateReturningEvent<T> event);

/**
* Returns the lock service for the cluster.
*
* @return The lock service
*/
public LockService getLockService();

/**
* Returns <code>true</code> if this is a standalone instance without real clustering support, otherwise <code>false</code>.
*
Expand Down
37 changes: 37 additions & 0 deletions core/api/src/main/java/com/blazebit/actor/spi/LockService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2018 - 2020 Blazebit.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.blazebit.actor.spi;

import java.util.concurrent.locks.Lock;

/**
* A cluster lock service.
*
* @author Christian Beikov
* @since 1.0.0
*/
public interface LockService {

/**
* Returns a lock with the given name.
*
* @param name The lock name
* @return The lock
*/
public Lock getLock(String name);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import com.blazebit.actor.spi.ClusterStateManager;
import com.blazebit.actor.spi.Consumer;
import com.blazebit.actor.spi.ConsumerListenerFactory;
import com.blazebit.actor.spi.LockService;
import com.blazebit.actor.spi.Scheduler;
import com.blazebit.actor.spi.SchedulerFactory;
import com.blazebit.actor.spi.StateReturningEvent;

import java.io.Serializable;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -49,6 +52,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author Christian Beikov
Expand Down Expand Up @@ -391,6 +396,7 @@ public void stop(long timeout, TimeUnit unit) throws InterruptedException {
private static class NoClusterStateManager implements ClusterStateManager, ClusterNodeInfo {

private final Map<Class<?>, List<java.util.function.Consumer<Serializable>>> listeners = new ConcurrentHashMap<>();
private final LockService lockService = new LocalLockService();

@Override
public ClusterNodeInfo getCurrentNodeInfo() {
Expand Down Expand Up @@ -438,7 +444,12 @@ public <T> Map<ClusterNodeInfo, Future<T>> fireEvent(StateReturningEvent<T> even
@Override
public <T> Map<ClusterNodeInfo, Future<T>> fireEventExcludeSelf(StateReturningEvent<T> event) {
// Noop because there is no cluster
return null;
return Collections.emptyMap();
}

@Override
public LockService getLockService() {
return lockService;
}

@Override
Expand Down Expand Up @@ -478,6 +489,53 @@ public int getClusterSize() {
}
}

/**
* @author Christian Beikov
* @since 1.0.0
*/
private static class LocalLockService implements LockService {

private final ReferenceQueue<Lock> referenceQueue = new ReferenceQueue<>();
private final Map<String, WeakLockReference> locks = new ConcurrentHashMap<>();

@Override
public Lock getLock(String name) {
WeakLockReference ref;
while ((ref = (WeakLockReference) referenceQueue.poll()) != null) {
locks.remove(ref.name);
}
WeakLockReference weakReference = locks.computeIfAbsent(name, k -> new WeakLockReference(new ReentrantLock(), referenceQueue, k));
Lock l = weakReference.get();
while (l == null) {
WeakLockReference old = weakReference;
weakReference = locks.compute(name, (k, v) -> {
if (v == old) {
return new WeakLockReference(new ReentrantLock(), referenceQueue, k);
} else {
return v;
}
});
l = weakReference.get();
}

return l;
}
}

/**
* @author Christian Beikov
* @since 1.0.0
*/
private static class WeakLockReference extends WeakReference<Lock> {

final String name;

public WeakLockReference(Lock referent, ReferenceQueue<? super Lock> q, String name) {
super(referent, q);
this.name = name;
}
}

/**
* @author Christian Beikov
* @since 1.0.0
Expand Down

0 comments on commit 9b7e42a

Please sign in to comment.