Commit
[fix][broker] Fix bug in RangeCache where different instance of the key wouldn't ever match (apache#23903)
lhotari authored Jan 28, 2025
1 parent 9079262 commit b6cfecc
Showing 7 changed files with 543 additions and 32 deletions.
@@ -106,7 +106,39 @@ K getKey() {
return localKey;
}

/**
* Get the value associated with the key. Returns null if the key does not match the key associated with the value.
*
* @param key the key to match
* @return the value associated with the key, or null if the value has already been recycled or the key does not
* match
*/
V getValue(K key) {
return getValueInternal(key, false);
}

/**
* Get the value associated with the Map.Entry's key and value. The exact key instance is required for a match.
* @param entry the entry which contains the key and {@link EntryWrapper} value to get the value from
* @return the value associated with the key, or null if the value has already been recycled or the key does not
* exactly match the same instance
*/
static <K, V> V getValueMatchingMapEntry(Map.Entry<K, EntryWrapper<K, V>> entry) {
return entry.getValue().getValueInternal(entry.getKey(), true);
}

/**
* Get the value associated with the key. Returns null if the key does not match the key associated with the
* value.
*
* @param key the key to match
* @param requireSameKeyInstance when true, the matching will be restricted to exactly the same instance of the
* key as the one stored in the wrapper. This is used to avoid any races
* when retrieving or removing the entries from the cache when the key and value
* instances are available.
* @return the value associated with the key, or null if the key does not match
*/
private V getValueInternal(K key, boolean requireSameKeyInstance) {
long stamp = lock.tryOptimisticRead();
K localKey = this.key;
V localValue = this.value;
@@ -116,7 +148,11 @@ V getValue(K key) {
localValue = this.value;
lock.unlockRead(stamp);
}
if (localKey != key) {
// check that the given key matches the key associated with the value in the entry
// this is used to detect if the entry has already been recycled and contains another key
// when requireSameKeyInstance is true, the key must be exactly the same instance as the one stored in the
// entry to match
if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) {
return null;
}
return localValue;
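
For illustration, the condition above first checks reference equality and then falls back to equals(), unless an exact instance match is required. A minimal standalone sketch of the same rule (hypothetical class and method names, not the actual Pulsar code):

final class KeyMatchSketch {

    // Mirrors the check in getValueInternal: a different-but-equal key instance
    // matches unless requireSameKeyInstance is set.
    static <K> boolean matches(K storedKey, K lookupKey, boolean requireSameKeyInstance) {
        if (storedKey == lookupKey) {
            return true;
        }
        return !requireSameKeyInstance && storedKey != null && storedKey.equals(lookupKey);
    }

    public static void main(String[] args) {
        Integer stored = Integer.valueOf(129);
        Integer lookup = Integer.valueOf(129); // equal value, typically a distinct instance outside the Integer cache
        System.out.println(matches(stored, lookup, false)); // true  - equals() fallback
        System.out.println(matches(stored, lookup, true));  // false - exact instance required
    }
}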
@@ -236,34 +272,45 @@ public boolean exists(Key key) {
* The caller is responsible for releasing the reference.
*/
public Value get(Key key) {
return getValue(key, entries.get(key));
return getValueFromWrapper(key, entries.get(key));
}

private Value getValue(Key key, EntryWrapper<Key, Value> valueWrapper) {
private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value> valueWrapper) {
if (valueWrapper == null) {
return null;
} else {
Value value = valueWrapper.getValue(key);
if (value == null) {
// the wrapper has been recycled and contains another key
return null;
}
try {
value.retain();
} catch (IllegalReferenceCountException e) {
// Value was already deallocated
return null;
}
// check that the value matches the key and that there's at least 2 references to it since
// the cache should be holding one reference and a new reference was just added in this method
if (value.refCnt() > 1 && value.matchesKey(key)) {
return value;
} else {
// Value or IdentityWrapper was recycled and already contains another value
// release the reference added in this method
value.release();
return null;
}
return getRetainedValueMatchingKey(key, value);
}
}

private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry) {
Value valueMatchingEntry = EntryWrapper.getValueMatchingMapEntry(entry);
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
}

// validates that the value matches the key and that the value has not been recycled
// which are possible due to the lack of exclusive locks in the cache and the use of reference counted objects
private Value getRetainedValueMatchingKey(Key key, Value value) {
if (value == null) {
// the wrapper has been recycled and contains another key
return null;
}
try {
value.retain();
} catch (IllegalReferenceCountException e) {
// Value was already deallocated
return null;
}
// check that the value matches the key and that there's at least 2 references to it since
// the cache should be holding one reference and a new reference was just added in this method
if (value.refCnt() > 1 && value.matchesKey(key)) {
return value;
} else {
// Value or IdentityWrapper was recycled and already contains another value
// release the reference added in this method
value.release();
return null;
}
}
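
The retain-then-verify pattern above protects readers against racing with concurrent recycling of the reference-counted values. A simplified sketch of the same idea, assuming a Netty-style reference-counted value (the RefValue class below is hypothetical, not the Pulsar value type):

import io.netty.util.AbstractReferenceCounted;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;

final class RefValue extends AbstractReferenceCounted {
    private final String key;
    RefValue(String key) { this.key = key; }
    boolean matchesKey(String k) { return key.equals(k); }
    @Override protected void deallocate() { /* free buffers / return to pool */ }
    @Override public ReferenceCounted touch(Object hint) { return this; }
}

final class RetainSketch {
    // Returns the value with an extra reference held for the caller, or null if the
    // value was concurrently released/recycled or no longer matches the key.
    static RefValue getRetained(String key, RefValue value) {
        if (value == null) {
            return null;
        }
        try {
            value.retain();                      // throws if already fully released
        } catch (IllegalReferenceCountException e) {
            return null;
        }
        if (value.refCnt() > 1 && value.matchesKey(key)) {
            return value;                        // cache reference + caller reference both held
        }
        value.release();                         // undo the retain taken above
        return null;
    }
}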

@@ -280,7 +327,7 @@ public Collection<Value> getRange(Key first, Key last) {

// Return the values of the entries found in cache
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : entries.subMap(first, true, last, true).entrySet()) {
Value value = getValue(entry.getKey(), entry.getValue());
Value value = getValueMatchingEntry(entry);
if (value != null) {
values.add(value);
}
@@ -297,6 +344,9 @@ public Collection<Value> getRange(Key first, Key last) {
* @return a pair of ints, containing the number of removed entries and the total size
*/
public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusive) {
if (log.isDebugEnabled()) {
log.debug("Removing entries in range [{}, {}], lastInclusive: {}", first, last, lastInclusive);
}
RemovalCounters counters = RemovalCounters.create();
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first, true, last, lastInclusive);
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : subMap.entrySet()) {
@@ -320,7 +370,7 @@ private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> e
boolean skipInvalid, Predicate<Value> removeCondition) {
Key key = entry.getKey();
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
Value value = entryWrapper.getValue(key);
Value value = getValueMatchingEntry(entry);
if (value == null) {
// the wrapper has already been recycled and contains another key
if (!skipInvalid) {
@@ -404,6 +454,9 @@ private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
* @return a pair containing the number of entries evicted and their total size
*/
public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
if (log.isDebugEnabled()) {
log.debug("Evicting entries to reach a minimum size of {}", minSize);
}
checkArgument(minSize > 0);
RemovalCounters counters = RemovalCounters.create();
while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) {
@@ -422,6 +475,9 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
* @return a pair containing the number of entries evicted and their total size
*/
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
if (log.isDebugEnabled()) {
log.debug("Evicting entries with timestamp <= {}", maxTimestamp);
}
RemovalCounters counters = RemovalCounters.create();
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();
@@ -453,6 +509,9 @@ public long getSize() {
* @return size of removed entries
*/
public Pair<Integer, Long> clear() {
if (log.isDebugEnabled()) {
log.debug("Clearing the cache with {} entries and size {}", entries.size(), size.get());
}
RemovalCounters counters = RemovalCounters.create();
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();
@@ -24,24 +24,28 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -53,18 +57,17 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

@Slf4j
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

@@ -241,6 +244,108 @@ public void verifyConcurrentUsage() throws Exception {
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
}

@Test
public void verifyAsyncReadEntryUsingCache() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();

config.setMaxCacheSize(100 * 1024 * 1024);
config.setCacheEvictionTimeThresholdMillis(10000);
config.setCacheEvictionIntervalMs(10000);

@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);

ManagedLedgerConfig conf = new ManagedLedgerConfig();
conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2)
.setRetentionSizeInMB(-1).setRetentionTime(-1, TimeUnit.MILLISECONDS);
final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger" + testName, conf);

int NumProducers = 5;
int NumConsumers = 10;

final AtomicBoolean done = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(NumProducers + NumConsumers + 1);

List<Future<?>> futures = new ArrayList<>();
List<Position> positions = new CopyOnWriteArrayList<>();

for (int i = 0; i < NumProducers; i++) {
futures.add(executor.submit(() -> {
try {
// wait for all threads to be ready to start at once
barrier.await();
while (!done.get()) {
Position position = ledger.addEntry("entry".getBytes());
positions.add(position);
Thread.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
throw FutureUtil.wrapToCompletionException(e);
}
}));
}

// create a dummy cursor since caching happens only when there are active consumers
ManagedCursor cursor = ledger.openCursor("dummy");

for (int i = 0; i < NumConsumers; i++) {
futures.add(executor.submit(() -> {
try {
// wait for all threads to be ready to start at once
barrier.await();
while (!done.get()) {
if (positions.isEmpty()) {
Thread.sleep(1);
continue;
}
// Simulate a replay queue read pattern where individual entries are read
Position randomPosition = positions.get(ThreadLocalRandom.current().nextInt(positions.size()));
// Clone the original instance so that another instance is used in the asyncReadEntry call
// This is to test that keys are compared by .equals and not by reference under the covers
randomPosition = PositionFactory.create(randomPosition);
CompletableFuture<Void> future = new CompletableFuture<>();
ledger.asyncReadEntry(randomPosition, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entry.release();
future.complete(null);
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
future.get();
Thread.sleep(2);
}
} catch (Exception e) {
e.printStackTrace();
throw FutureUtil.wrapToCompletionException(e);
}
}));
}

// trigger all worker threads at once to continue from the barrier
barrier.await();

int testDurationSeconds = 3;
Thread.sleep(testDurationSeconds * 1000);

done.set(true);
for (Future<?> future : futures) {
future.get();
}

factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS);

assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
}
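
The PositionFactory.create(randomPosition) call above is the crux of the regression test: the read goes through a key instance that is equal to, but not the same object as, the one the entry was cached under. A sketch of the property the consumer loop relies on (assuming Position equality is by ledgerId/entryId, as in the managed ledger API):

Position original = positions.get(0);
Position copy = PositionFactory.create(original); // distinct instance with the same ledgerId/entryId
assertNotSame(copy, original);                    // different object identity
assertEquals(copy, original);                     // equal by value, so the cache lookup must still hit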

@Test
public void testSimple() throws Exception {
@Cleanup("shutdown")
@@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -338,4 +339,20 @@ public void testRemoveEntryWithInvalidMatchingKey() {
cache.clear();
assertEquals(cache.getNumberOfEntries(), 0);
}
}

@Test
public void testGetKeyWithDifferentInstance() {
RangeCache<Integer, RefString> cache = new RangeCache<>();
Integer key = 129;
cache.put(key, new RefString("129"));
// create a different instance of the key
Integer key2 = Integer.valueOf(129);
// key and key2 are different instances but they are equal
assertNotSame(key, key2);
assertEquals(key, key2);
// get the value using key2
RefString s = cache.get(key2);
// the value should be found
assertEquals(s.s, "129");
}
}
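
The test uses 129 because it lies outside the Integer cache: the JLS only mandates a shared instance for autoboxed values in -128..127, so with default JVM settings Integer.valueOf(129) returns a fresh object, which is what makes assertNotSame hold and exercises the equals()-based lookup. A quick illustration:

Integer small1 = Integer.valueOf(127);
Integer small2 = Integer.valueOf(127);
System.out.println(small1 == small2);   // true  - within the mandated cache range
Integer big1 = Integer.valueOf(129);
Integer big2 = Integer.valueOf(129);
System.out.println(big1 == big2);       // typically false - outside the cache with default settings
System.out.println(big1.equals(big2));  // true  - equal by value, so cache.get(key2) must still find the entry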