-
Notifications
You must be signed in to change notification settings - Fork 229
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
229 additions
and
0 deletions.
There are no files selected for viewing
79 changes: 79 additions & 0 deletions
79
helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package org.apache.helix.gateway.util; | ||
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Queue; | ||
import java.util.Set; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
|
||
/** | ||
* A per-key blocking executor that ensures that only one event is running for a given key at a time. | ||
*/ | ||
public class PerKeyBlockingExecutor { | ||
private final ThreadPoolExecutor _executor; | ||
private final Map<String, Queue<Runnable>> _pendingBlockedEvents; | ||
private final Set<String> _runningEvents; | ||
private final Lock _queueLock; | ||
|
||
public PerKeyBlockingExecutor(int maxWorkers) { | ||
this._executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxWorkers); | ||
this._pendingBlockedEvents = new HashMap<>(); | ||
this._queueLock = new ReentrantLock(); | ||
this._runningEvents = new HashSet<>(); | ||
} | ||
|
||
/** | ||
* Offer an event to be executed. If an event is already running for the given key, the event will be queued. | ||
* @param key | ||
* @param event | ||
*/ | ||
public void offerEvent(String key, Runnable event) { | ||
_queueLock.lock(); | ||
try { | ||
if (!_runningEvents.contains(key)) { | ||
_executor.execute(() -> runEvent(key, event)); | ||
} else { | ||
_pendingBlockedEvents.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()); | ||
_pendingBlockedEvents.get(key).offer(event); | ||
} | ||
} finally { | ||
_queueLock.unlock(); | ||
} | ||
} | ||
|
||
private void runEvent(String key, Runnable event) { | ||
try { | ||
_runningEvents.add(key); | ||
event.run(); | ||
} finally { | ||
_queueLock.lock(); | ||
try { | ||
_runningEvents.remove(key); | ||
processQueue(key); | ||
} finally { | ||
_queueLock.unlock(); | ||
} | ||
} | ||
} | ||
|
||
private void processQueue(String key) { | ||
if (!_pendingBlockedEvents.containsKey(key)) { | ||
return; | ||
} | ||
Runnable event = _pendingBlockedEvents.get(key).poll(); | ||
if (event != null) { | ||
_executor.execute(() -> runEvent(key, event)); | ||
} | ||
} | ||
|
||
public void shutdown() { | ||
_executor.shutdown(); | ||
} | ||
|
||
} |
51 changes: 51 additions & 0 deletions
51
helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package org.apache.helix.gateway.util; | ||
|
||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
/** | ||
* A registry that manages locks per key. | ||
*/ | ||
public class PerKeyLockRegistry { | ||
private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>(); | ||
|
||
public void lock(String key) { | ||
ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); | ||
lock.lock(); | ||
} | ||
|
||
public void unlock(String key) { | ||
ReentrantLock lock = lockMap.get(key); | ||
if (lock != null) { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Execute the action with the lock on the key | ||
* @param key | ||
* @param action | ||
*/ | ||
public void withLock(String key, Runnable action) { | ||
lock(key); | ||
try { | ||
action.run(); | ||
} finally { | ||
unlock(key); | ||
} | ||
} | ||
|
||
/** | ||
* Remove the lock if it is not being used. | ||
* it must be called after the lock is required | ||
* @param key | ||
*/ | ||
public boolean removeLock(String key) { | ||
ReentrantLock lock = lockMap.get(key); | ||
if (lock != null && lock.isHeldByCurrentThread() && !lock.hasQueuedThreads()) { | ||
lockMap.remove(key, lock); | ||
return true; | ||
} | ||
return false; | ||
} | ||
} |
44 changes: 44 additions & 0 deletions
44
helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package org.apache.helix.gateway; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.helix.gateway.util.PerKeyBlockingExecutor; | ||
import org.testng.annotations.Test; | ||
import org.testng.Assert; | ||
|
||
|
||
public class TestPerKeyBlockingExecutor { | ||
@Test | ||
public void testEventNotAddedIfPending() throws InterruptedException { | ||
CountDownLatch latch1 = new CountDownLatch(1); | ||
CountDownLatch latch2 = new CountDownLatch(1); | ||
CountDownLatch latch3 = new CountDownLatch(1); | ||
|
||
PerKeyBlockingExecutor perKeyBlockingExecutor = new PerKeyBlockingExecutor(3); | ||
|
||
perKeyBlockingExecutor.offerEvent("key1", () -> { | ||
try { | ||
latch1.await(); // Wait for the test to release this latch | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
}); | ||
|
||
perKeyBlockingExecutor.offerEvent("key1", () -> { | ||
latch2.countDown(); | ||
}); | ||
|
||
Thread.sleep(100); // Give time for the second event to be potentially processed | ||
|
||
Assert.assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); // Event 2 should not run yet | ||
latch1.countDown(); // Release the first latch | ||
Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); // Event 2 should run now | ||
|
||
perKeyBlockingExecutor.offerEvent("key1", () -> { | ||
latch3.countDown(); | ||
}); | ||
|
||
Assert.assertTrue(latch3.await(1, TimeUnit.SECONDS)); // Event 3 should run after Event 2 | ||
perKeyBlockingExecutor.shutdown(); | ||
} | ||
} |
55 changes: 55 additions & 0 deletions
55
helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package org.apache.helix.gateway; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import org.apache.helix.gateway.util.PerKeyLockRegistry; | ||
import org.testng.annotations.Test; | ||
import org.testng.Assert; | ||
|
||
|
||
public class TestPerKeyLockRegistry { | ||
@Test | ||
public void testConcurrentAccess() { | ||
PerKeyLockRegistry lockRegistry = new PerKeyLockRegistry(); | ||
final AtomicInteger counter = new AtomicInteger(0); | ||
final CountDownLatch startLatch = new CountDownLatch(1); | ||
final CountDownLatch doneLatch = new CountDownLatch(2); | ||
|
||
lockRegistry.withLock("key1", () -> { | ||
counter.incrementAndGet(); | ||
// try to acquir the lock for another key | ||
lockRegistry.withLock("key2", () -> { | ||
counter.incrementAndGet(); | ||
}); | ||
}); | ||
|
||
// counter should be 2 | ||
Assert.assertEquals(2, counter.get()); | ||
|
||
// acquire the lock for key | ||
ExecutorService executor = Executors.newFixedThreadPool(2); | ||
lockRegistry.lock("key1"); | ||
executor.submit(() -> { | ||
lockRegistry.withLock("key1", () -> { | ||
//try remove lock | ||
Assert.assertFalse(lockRegistry.removeLock("key1")); | ||
}); | ||
}); | ||
lockRegistry.unlock("key1"); | ||
executor.submit(() -> { | ||
lockRegistry.withLock("key2", () -> { | ||
//try remove lock, should fail because key1 is not locked | ||
Assert.assertFalse(lockRegistry.removeLock("key1")); | ||
}); | ||
}); | ||
executor.submit(() -> { | ||
lockRegistry.withLock("key1", () -> { | ||
//try remove lock, only this tiem it succeded | ||
Assert.assertFalse(lockRegistry.removeLock("key1")); | ||
}); | ||
}); | ||
executor.shutdown(); | ||
} | ||
} |