From 9b7e42a1e601e51e83917017c7e4e199864cb8db Mon Sep 17 00:00:00 2001 From: Christian Beikov Date: Fri, 29 May 2020 11:26:58 +0200 Subject: [PATCH] Introduce lock service for easy distributed locking --- clustering/wildfly/pom.xml | 12 ++++ .../wildfly/JGroupsLockService.java | 48 +++++++++++++++ .../wildfly/WildflyClusterStateManager.java | 17 ++++-- .../actor/spi/ClusterStateManager.java | 7 +++ .../com/blazebit/actor/spi/LockService.java | 37 ++++++++++++ .../actor/impl/ActorContextBuilderImpl.java | 60 ++++++++++++++++++- 6 files changed, 176 insertions(+), 5 deletions(-) create mode 100644 clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/JGroupsLockService.java create mode 100644 core/api/src/main/java/com/blazebit/actor/spi/LockService.java diff --git a/clustering/wildfly/pom.xml b/clustering/wildfly/pom.xml index c71a5a8..859efd5 100644 --- a/clustering/wildfly/pom.xml +++ b/clustering/wildfly/pom.xml @@ -25,6 +25,18 @@ ${version.wildfly} provided + + org.wildfly + wildfly-clustering-api + ${version.wildfly} + provided + + + org.wildfly + wildfly-clustering-jgroups-api + ${version.wildfly} + provided + ${project.groupId} blaze-apt-utils diff --git a/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/JGroupsLockService.java b/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/JGroupsLockService.java new file mode 100644 index 0000000..13afd53 --- /dev/null +++ b/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/JGroupsLockService.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 - 2020 Blazebit. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.blazebit.actor.clustering.wildfly; + +import com.blazebit.actor.spi.LockService; +import org.jgroups.Channel; + +import java.util.concurrent.locks.Lock; + +/** + * A lock service implementation based on JGroups LockService. + * + * @author Christian Beikov + * @since 1.0.0 + */ +public class JGroupsLockService implements LockService { + + private final org.jgroups.blocks.locking.LockService lockService; + + /** + * Creates a lock service. + * + * @param channel The JGroups channel to use for the lock service + */ + public JGroupsLockService(Channel channel) { + this.lockService = new org.jgroups.blocks.locking.LockService(channel); + } + + @Override + public Lock getLock(String name) { + return lockService.getLock(name); + } + +} diff --git a/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/WildflyClusterStateManager.java b/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/WildflyClusterStateManager.java index c20d78d..da48f23 100644 --- a/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/WildflyClusterStateManager.java +++ b/clustering/wildfly/src/main/java/com/blazebit/actor/clustering/wildfly/WildflyClusterStateManager.java @@ -18,6 +18,7 @@ import com.blazebit.actor.spi.ClusterNodeInfo; import com.blazebit.actor.spi.ClusterStateListener; import com.blazebit.actor.spi.ClusterStateManager; +import com.blazebit.actor.spi.LockService; import com.blazebit.actor.spi.StateReturningEvent; import org.wildfly.clustering.dispatcher.Command; import org.wildfly.clustering.dispatcher.CommandDispatcher; @@ -72,10 +73,11 @@ public int compare(Node o1, Node o2) { }; // @Resource(lookup = "java:jboss/clustering/group/ee") - private Group channelGroup; - + private final Group channelGroup; // @Resource(lookup = "java:jboss/clustering/dispatcher/ee") - private CommandDispatcherFactory factory; + private final CommandDispatcherFactory factory; +// @Resource(lookup = "java:jboss/jgroups/channel/ee") + private final LockService lockService; private final List clusterStateListeners = new CopyOnWriteArrayList<>(); private final Map, List>> listeners = new ConcurrentHashMap<>(); @@ -88,10 +90,12 @@ public int compare(Node o1, Node o2) { * * @param channelGroup The channel group * @param factory The dispatcher factory + * @param lockService The cluster lock service */ - public WildflyClusterStateManager(Group channelGroup, CommandDispatcherFactory factory) { + public WildflyClusterStateManager(Group channelGroup, CommandDispatcherFactory factory, LockService lockService) { this.channelGroup = channelGroup; this.factory = factory; + this.lockService = lockService; this.localNode = new Node[]{channelGroup.getLocalNode()}; } @@ -231,6 +235,11 @@ private Map> fireEvent(StateReturningEvent eve } } + @Override + public LockService getLockService() { + return lockService; + } + @Override public boolean isStandalone() { return false; diff --git a/core/api/src/main/java/com/blazebit/actor/spi/ClusterStateManager.java b/core/api/src/main/java/com/blazebit/actor/spi/ClusterStateManager.java index 843c5d3..2c95a9f 100644 --- a/core/api/src/main/java/com/blazebit/actor/spi/ClusterStateManager.java +++ b/core/api/src/main/java/com/blazebit/actor/spi/ClusterStateManager.java @@ -85,6 +85,13 @@ public interface ClusterStateManager { */ public Map> fireEventExcludeSelf(StateReturningEvent event); + /** + * Returns the lock service for the cluster. + * + * @return The lock service + */ + public LockService getLockService(); + /** * Returns true if this is a standalone instance without real clustering support, otherwise false. * diff --git a/core/api/src/main/java/com/blazebit/actor/spi/LockService.java b/core/api/src/main/java/com/blazebit/actor/spi/LockService.java new file mode 100644 index 0000000..1d2de2e --- /dev/null +++ b/core/api/src/main/java/com/blazebit/actor/spi/LockService.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 - 2020 Blazebit. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.blazebit.actor.spi; + +import java.util.concurrent.locks.Lock; + +/** + * A cluster lock service. + * + * @author Christian Beikov + * @since 1.0.0 + */ +public interface LockService { + + /** + * Returns a lock with the given name. + * + * @param name The lock name + * @return The lock + */ + public Lock getLock(String name); + +} diff --git a/core/impl/src/main/java/com/blazebit/actor/impl/ActorContextBuilderImpl.java b/core/impl/src/main/java/com/blazebit/actor/impl/ActorContextBuilderImpl.java index 58e9647..b7364e5 100644 --- a/core/impl/src/main/java/com/blazebit/actor/impl/ActorContextBuilderImpl.java +++ b/core/impl/src/main/java/com/blazebit/actor/impl/ActorContextBuilderImpl.java @@ -27,11 +27,14 @@ import com.blazebit.actor.spi.ClusterStateManager; import com.blazebit.actor.spi.Consumer; import com.blazebit.actor.spi.ConsumerListenerFactory; +import com.blazebit.actor.spi.LockService; import com.blazebit.actor.spi.Scheduler; import com.blazebit.actor.spi.SchedulerFactory; import com.blazebit.actor.spi.StateReturningEvent; import java.io.Serializable; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -49,6 +52,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @author Christian Beikov @@ -391,6 +396,7 @@ public void stop(long timeout, TimeUnit unit) throws InterruptedException { private static class NoClusterStateManager implements ClusterStateManager, ClusterNodeInfo { private final Map, List>> listeners = new ConcurrentHashMap<>(); + private final LockService lockService = new LocalLockService(); @Override public ClusterNodeInfo getCurrentNodeInfo() { @@ -438,7 +444,12 @@ public Map> fireEvent(StateReturningEvent even @Override public Map> fireEventExcludeSelf(StateReturningEvent event) { // Noop because there is no cluster - return null; + return Collections.emptyMap(); + } + + @Override + public LockService getLockService() { + return lockService; } @Override @@ -478,6 +489,53 @@ public int getClusterSize() { } } + /** + * @author Christian Beikov + * @since 1.0.0 + */ + private static class LocalLockService implements LockService { + + private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + private final Map locks = new ConcurrentHashMap<>(); + + @Override + public Lock getLock(String name) { + WeakLockReference ref; + while ((ref = (WeakLockReference) referenceQueue.poll()) != null) { + locks.remove(ref.name); + } + WeakLockReference weakReference = locks.computeIfAbsent(name, k -> new WeakLockReference(new ReentrantLock(), referenceQueue, k)); + Lock l = weakReference.get(); + while (l == null) { + WeakLockReference old = weakReference; + weakReference = locks.compute(name, (k, v) -> { + if (v == old) { + return new WeakLockReference(new ReentrantLock(), referenceQueue, k); + } else { + return v; + } + }); + l = weakReference.get(); + } + + return l; + } + } + + /** + * @author Christian Beikov + * @since 1.0.0 + */ + private static class WeakLockReference extends WeakReference { + + final String name; + + public WeakLockReference(Lock referent, ReferenceQueue q, String name) { + super(referent, q); + this.name = name; + } + } + /** * @author Christian Beikov * @since 1.0.0