diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java new file mode 100644 index 0000000000..3870d92470 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java @@ -0,0 +1,79 @@ +package org.apache.helix.gateway.util; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * A per-key blocking executor that ensures that only one event is running for a given key at a time. + */ +public class PerKeyBlockingExecutor { + private final ThreadPoolExecutor _executor; + private final Map> _pendingBlockedEvents; + private final Set _runningEvents; + private final Lock _queueLock; + + public PerKeyBlockingExecutor(int maxWorkers) { + this._executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxWorkers); + this._pendingBlockedEvents = new HashMap<>(); + this._queueLock = new ReentrantLock(); + this._runningEvents = new HashSet<>(); + } + + /** + * Offer an event to be executed. If an event is already running for the given key, the event will be queued. + * @param key + * @param event + */ + public void offerEvent(String key, Runnable event) { + _queueLock.lock(); + try { + if (!_runningEvents.contains(key)) { + _executor.execute(() -> runEvent(key, event)); + } else { + _pendingBlockedEvents.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()); + _pendingBlockedEvents.get(key).offer(event); + } + } finally { + _queueLock.unlock(); + } + } + + private void runEvent(String key, Runnable event) { + try { + _runningEvents.add(key); + event.run(); + } finally { + _queueLock.lock(); + try { + _runningEvents.remove(key); + processQueue(key); + } finally { + _queueLock.unlock(); + } + } + } + + private void processQueue(String key) { + if (!_pendingBlockedEvents.containsKey(key)) { + return; + } + Runnable event = _pendingBlockedEvents.get(key).poll(); + if (event != null) { + _executor.execute(() -> runEvent(key, event)); + } + } + + public void shutdown() { + _executor.shutdown(); + } + +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java new file mode 100644 index 0000000000..5bfc3c9491 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java @@ -0,0 +1,51 @@ +package org.apache.helix.gateway.util; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A registry that manages locks per key. + */ +public class PerKeyLockRegistry { + private final ConcurrentHashMap lockMap = new ConcurrentHashMap<>(); + + public void lock(String key) { + ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); + lock.lock(); + } + + public void unlock(String key) { + ReentrantLock lock = lockMap.get(key); + if (lock != null) { + lock.unlock(); + } + } + + /** + * Execute the action with the lock on the key + * @param key + * @param action + */ + public void withLock(String key, Runnable action) { + lock(key); + try { + action.run(); + } finally { + unlock(key); + } + } + + /** + * Remove the lock if it is not being used. + * it must be called after the lock is required + * @param key + */ + public boolean removeLock(String key) { + ReentrantLock lock = lockMap.get(key); + if (lock != null && lock.isHeldByCurrentThread() && !lock.hasQueuedThreads()) { + lockMap.remove(key, lock); + return true; + } + return false; + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java new file mode 100644 index 0000000000..457002cbd6 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java @@ -0,0 +1,44 @@ +package org.apache.helix.gateway; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.helix.gateway.util.PerKeyBlockingExecutor; +import org.testng.annotations.Test; +import org.testng.Assert; + + +public class TestPerKeyBlockingExecutor { + @Test + public void testEventNotAddedIfPending() throws InterruptedException { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + + PerKeyBlockingExecutor perKeyBlockingExecutor = new PerKeyBlockingExecutor(3); + + perKeyBlockingExecutor.offerEvent("key1", () -> { + try { + latch1.await(); // Wait for the test to release this latch + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + perKeyBlockingExecutor.offerEvent("key1", () -> { + latch2.countDown(); + }); + + Thread.sleep(100); // Give time for the second event to be potentially processed + + Assert.assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); // Event 2 should not run yet + latch1.countDown(); // Release the first latch + Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); // Event 2 should run now + + perKeyBlockingExecutor.offerEvent("key1", () -> { + latch3.countDown(); + }); + + Assert.assertTrue(latch3.await(1, TimeUnit.SECONDS)); // Event 3 should run after Event 2 + perKeyBlockingExecutor.shutdown(); + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java new file mode 100644 index 0000000000..89d2e0bc19 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java @@ -0,0 +1,55 @@ +package org.apache.helix.gateway; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.helix.gateway.util.PerKeyLockRegistry; +import org.testng.annotations.Test; +import org.testng.Assert; + + +public class TestPerKeyLockRegistry { + @Test + public void testConcurrentAccess() { + PerKeyLockRegistry lockRegistry = new PerKeyLockRegistry(); + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(2); + + lockRegistry.withLock("key1", () -> { + counter.incrementAndGet(); + // try to acquir the lock for another key + lockRegistry.withLock("key2", () -> { + counter.incrementAndGet(); + }); + }); + + // counter should be 2 + Assert.assertEquals(2, counter.get()); + + // acquire the lock for key + ExecutorService executor = Executors.newFixedThreadPool(2); + lockRegistry.lock("key1"); + executor.submit(() -> { + lockRegistry.withLock("key1", () -> { + //try remove lock + Assert.assertFalse(lockRegistry.removeLock("key1")); + }); + }); + lockRegistry.unlock("key1"); + executor.submit(() -> { + lockRegistry.withLock("key2", () -> { + //try remove lock, should fail because key1 is not locked + Assert.assertFalse(lockRegistry.removeLock("key1")); + }); + }); + executor.submit(() -> { + lockRegistry.withLock("key1", () -> { + //try remove lock, only this tiem it succeded + Assert.assertFalse(lockRegistry.removeLock("key1")); + }); + }); + executor.shutdown(); + } +}