diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java index 2f2b161a30684..0de6f94362215 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java @@ -106,7 +106,39 @@ K getKey() { return localKey; } + /** + * Get the value associated with the key. Returns null if the key does not match the key. + * + * @param key the key to match + * @return the value associated with the key, or null if the value has already been recycled or the key does not + * match + */ V getValue(K key) { + return getValueInternal(key, false); + } + + /** + * Get the value associated with the Map.Entry's key and value. Exact instance of the key is required to match. + * @param entry the entry which contains the key and {@link EntryWrapper} value to get the value from + * @return the value associated with the key, or null if the value has already been recycled or the key does not + * exactly match the same instance + */ + static V getValueMatchingMapEntry(Map.Entry> entry) { + return entry.getValue().getValueInternal(entry.getKey(), true); + } + + /** + * Get the value associated with the key. Returns null if the key does not match the key associated with the + * value. + * + * @param key the key to match + * @param requireSameKeyInstance when true, the matching will be restricted to exactly the same instance of the + * key as the one stored in the wrapper. This is used to avoid any races + * when retrieving or removing the entries from the cache when the key and value + * instances are available. + * @return the value associated with the key, or null if the key does not match + */ + private V getValueInternal(K key, boolean requireSameKeyInstance) { long stamp = lock.tryOptimisticRead(); K localKey = this.key; V localValue = this.value; @@ -116,7 +148,11 @@ V getValue(K key) { localValue = this.value; lock.unlockRead(stamp); } - if (localKey != key) { + // check that the given key matches the key associated with the value in the entry + // this is used to detect if the entry has already been recycled and contains another key + // when requireSameKeyInstance is true, the key must be exactly the same instance as the one stored in the + // entry to match + if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) { return null; } return localValue; @@ -236,34 +272,45 @@ public boolean exists(Key key) { * The caller is responsible for releasing the reference. */ public Value get(Key key) { - return getValue(key, entries.get(key)); + return getValueFromWrapper(key, entries.get(key)); } - private Value getValue(Key key, EntryWrapper valueWrapper) { + private Value getValueFromWrapper(Key key, EntryWrapper valueWrapper) { if (valueWrapper == null) { return null; } else { Value value = valueWrapper.getValue(key); - if (value == null) { - // the wrapper has been recycled and contains another key - return null; - } - try { - value.retain(); - } catch (IllegalReferenceCountException e) { - // Value was already deallocated - return null; - } - // check that the value matches the key and that there's at least 2 references to it since - // the cache should be holding one reference and a new reference was just added in this method - if (value.refCnt() > 1 && value.matchesKey(key)) { - return value; - } else { - // Value or IdentityWrapper was recycled and already contains another value - // release the reference added in this method - value.release(); - return null; - } + return getRetainedValueMatchingKey(key, value); + } + } + + private Value getValueMatchingEntry(Map.Entry> entry) { + Value valueMatchingEntry = EntryWrapper.getValueMatchingMapEntry(entry); + return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry); + } + + // validates that the value matches the key and that the value has not been recycled + // which are possible due to the lack of exclusive locks in the cache and the use of reference counted objects + private Value getRetainedValueMatchingKey(Key key, Value value) { + if (value == null) { + // the wrapper has been recycled and contains another key + return null; + } + try { + value.retain(); + } catch (IllegalReferenceCountException e) { + // Value was already deallocated + return null; + } + // check that the value matches the key and that there's at least 2 references to it since + // the cache should be holding one reference and a new reference was just added in this method + if (value.refCnt() > 1 && value.matchesKey(key)) { + return value; + } else { + // Value or IdentityWrapper was recycled and already contains another value + // release the reference added in this method + value.release(); + return null; } } @@ -280,7 +327,7 @@ public Collection getRange(Key first, Key last) { // Return the values of the entries found in cache for (Map.Entry> entry : entries.subMap(first, true, last, true).entrySet()) { - Value value = getValue(entry.getKey(), entry.getValue()); + Value value = getValueMatchingEntry(entry); if (value != null) { values.add(value); } @@ -297,6 +344,9 @@ public Collection getRange(Key first, Key last) { * @return an pair of ints, containing the number of removed entries and the total size */ public Pair removeRange(Key first, Key last, boolean lastInclusive) { + if (log.isDebugEnabled()) { + log.debug("Removing entries in range [{}, {}], lastInclusive: {}", first, last, lastInclusive); + } RemovalCounters counters = RemovalCounters.create(); Map> subMap = entries.subMap(first, true, last, lastInclusive); for (Map.Entry> entry : subMap.entrySet()) { @@ -320,7 +370,7 @@ private RemoveEntryResult removeEntry(Map.Entry> e boolean skipInvalid, Predicate removeCondition) { Key key = entry.getKey(); EntryWrapper entryWrapper = entry.getValue(); - Value value = entryWrapper.getValue(key); + Value value = getValueMatchingEntry(entry); if (value == null) { // the wrapper has already been recycled and contains another key if (!skipInvalid) { @@ -404,6 +454,9 @@ private Pair handleRemovalResult(RemovalCounters counters) { * @return a pair containing the number of entries evicted and their total size */ public Pair evictLeastAccessedEntries(long minSize) { + if (log.isDebugEnabled()) { + log.debug("Evicting entries to reach a minimum size of {}", minSize); + } checkArgument(minSize > 0); RemovalCounters counters = RemovalCounters.create(); while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) { @@ -422,6 +475,9 @@ public Pair evictLeastAccessedEntries(long minSize) { * @return the tota */ public Pair evictLEntriesBeforeTimestamp(long maxTimestamp) { + if (log.isDebugEnabled()) { + log.debug("Evicting entries with timestamp <= {}", maxTimestamp); + } RemovalCounters counters = RemovalCounters.create(); while (!Thread.currentThread().isInterrupted()) { Map.Entry> entry = entries.firstEntry(); @@ -453,6 +509,9 @@ public long getSize() { * @return size of removed entries */ public Pair clear() { + if (log.isDebugEnabled()) { + log.debug("Clearing the cache with {} entries and size {}", entries.size(), size.get()); + } RemovalCounters counters = RemovalCounters.create(); while (!Thread.currentThread().isInterrupted()) { Map.Entry> entry = entries.firstEntry(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index e23937afea2c9..574ed2f325136 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -24,24 +24,28 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; - +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.Entry; @@ -53,18 +57,17 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import io.netty.buffer.ByteBuf; -import lombok.Cleanup; - @Slf4j public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { @@ -241,6 +244,108 @@ public void verifyConcurrentUsage() throws Exception { assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0); } + @Test + public void verifyAsyncReadEntryUsingCache() throws Exception { + ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); + + config.setMaxCacheSize(100 * 1024 * 1024); + config.setCacheEvictionTimeThresholdMillis(10000); + config.setCacheEvictionIntervalMs(10000); + + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config); + + ManagedLedgerConfig conf = new ManagedLedgerConfig(); + conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2) + .setRetentionSizeInMB(-1).setRetentionTime(-1, TimeUnit.MILLISECONDS); + final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger" + testName, conf); + + int NumProducers = 5; + int NumConsumers = 10; + + final AtomicBoolean done = new AtomicBoolean(); + final CyclicBarrier barrier = new CyclicBarrier(NumProducers + NumConsumers + 1); + + List> futures = new ArrayList(); + List positions = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < NumProducers; i++) { + futures.add(executor.submit(() -> { + try { + // wait for all threads to be ready to start at once + barrier.await(); + while (!done.get()) { + Position position = ledger.addEntry("entry".getBytes()); + positions.add(position); + Thread.sleep(1); + } + } catch (Exception e) { + e.printStackTrace(); + throw FutureUtil.wrapToCompletionException(e); + } + })); + } + + // create a dummy cursor since caching happens only when there are active consumers + ManagedCursor cursor = ledger.openCursor("dummy"); + + for (int i = 0; i < NumConsumers; i++) { + futures.add(executor.submit(() -> { + try { + // wait for all threads to be ready to start at once + barrier.await(); + while (!done.get()) { + if (positions.isEmpty()) { + Thread.sleep(1); + continue; + } + // Simulate a replay queue read pattern where individual entries are read + Position randomPosition = positions.get(ThreadLocalRandom.current().nextInt(positions.size())); + // Clone the original instance so that another instance is used in the asyncReadEntry call + // This is to test that keys are compared by .equals and not by reference under the covers + randomPosition = PositionFactory.create(randomPosition); + CompletableFuture future = new CompletableFuture<>(); + ledger.asyncReadEntry(randomPosition, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + entry.release(); + future.complete(null); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + future.get(); + Thread.sleep(2); + } + } catch (Exception e) { + e.printStackTrace(); + throw FutureUtil.wrapToCompletionException(e); + } + })); + } + + // trigger all worker threads at once to continue from the barrier + barrier.await(); + + int testDurationSeconds = 3; + Thread.sleep(testDurationSeconds * 1000); + + done.set(true); + for (Future future : futures) { + future.get(); + } + + factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS); + + assertTrue(factory.getMbean().getCacheHitsRate() > 0.0); + assertEquals(factory.getMbean().getCacheMissesRate(), 0.0); + assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0); + assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0); + } + @Test public void testSimple() throws Exception { @Cleanup("shutdown") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java index 4bcf2cc6c4e35..aa13d4b8e3488 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -338,4 +339,20 @@ public void testRemoveEntryWithInvalidMatchingKey() { cache.clear(); assertEquals(cache.getNumberOfEntries(), 0); } -} + + @Test + public void testGetKeyWithDifferentInstance() { + RangeCache cache = new RangeCache<>(); + Integer key = 129; + cache.put(key, new RefString("129")); + // create a different instance of the key + Integer key2 = Integer.valueOf(129); + // key and key2 are different instances but they are equal + assertNotSame(key, key2); + assertEquals(key, key2); + // get the value using key2 + RefString s = cache.get(key2); + // the value should be found + assertEquals(s.s, "129"); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java new file mode 100644 index 0000000000000..9fca95e2e8719 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; +import static org.assertj.core.api.SoftAssertions.assertSoftly; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.StickyKeyDispatcher; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.tests.KeySharedImplementationType; +import org.awaitility.Awaitility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class KeySharedSubscriptionBrokerCacheTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionBrokerCacheTest.class); + private static final String SUBSCRIPTION_NAME = "key_shared"; + private final KeySharedImplementationType implementationType; + + // Comment out the next line (Factory annotation) to run tests manually in IntelliJ, one-by-one + @Factory + public static Object[] createTestInstances() { + return KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionBrokerCacheTest::new); + } + + public KeySharedSubscriptionBrokerCacheTest() { + // set the default implementation type for manual running in IntelliJ + this(KeySharedImplementationType.PIP379); + } + + public KeySharedSubscriptionBrokerCacheTest(KeySharedImplementationType implementationType) { + this.implementationType = implementationType; + } + + @DataProvider(name = "currentImplementationType") + public Object[] currentImplementationType() { + return new Object[]{ implementationType }; + } + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setSubscriptionKeySharedUseClassicPersistentImplementation(implementationType.classic); + conf.setSubscriptionSharedUseClassicPersistentImplementation(implementationType.classic); + conf.setUnblockStuckSubscriptionEnabled(false); + conf.setSubscriptionKeySharedUseConsistentHashing(true); + conf.setManagedLedgerCacheSizeMB(100); + + // configure to evict entries after 30 seconds so that we can test retrieval from cache + conf.setManagedLedgerCacheEvictionTimeThresholdMillis(30000); + conf.setManagedLedgerCacheEvictionIntervalMs(30000); + + // Important: this is currently necessary to make use of cache for replay queue reads + conf.setCacheEvictionByMarkDeletedPosition(true); + + conf.setManagedLedgerMaxReadsInFlightSizeInMB(100); + conf.setDispatcherRetryBackoffInitialTimeInMs(0); + conf.setDispatcherRetryBackoffMaxTimeInMs(0); + conf.setKeySharedUnblockingIntervalMs(0); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @AfterMethod(alwaysRun = true) + public void resetAfterMethod() throws Exception { + List list = admin.namespaces().getTopics("public/default"); + for (String topicName : list){ + if (!pulsar.getBrokerService().isSystemTopic(topicName)) { + admin.topics().delete(topicName, false); + } + } + pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(null); + } + + // Use a fixed seed to make the tests using random values deterministic + // When a test fails, it's possible to re-run it to reproduce the issue + private static final Random random = new Random(1); + + private Producer createProducer(String topic, boolean enableBatch) throws PulsarClientException { + Producer producer = null; + if (enableBatch) { + producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(true) + .maxPendingMessages(2001) + .batcherBuilder(BatcherBuilder.KEY_BASED) + .create(); + } else { + producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .maxPendingMessages(2001) + .enableBatching(false) + .create(); + } + return producer; + } + + private StickyKeyConsumerSelector getSelector(String topic, String subscription) { + return getStickyKeyDispatcher(topic, subscription).getSelector(); + } + + @SneakyThrows + private StickyKeyDispatcher getStickyKeyDispatcher(String topic, String subscription) { + Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + PersistentSubscription sub = (PersistentSubscription) t.getSubscription(subscription); + StickyKeyDispatcher dispatcher = (StickyKeyDispatcher) sub.getDispatcher(); + return dispatcher; + } + + @Test(dataProvider = "currentImplementationType", invocationCount = 1) + public void testReplayQueueReadsGettingCached(KeySharedImplementationType impl) throws Exception { + String topic = newUniqueName("testReplayQueueReadsGettingCached"); + int numberOfKeys = 100; + long pauseTime = 100L; + + @Cleanup + Producer producer = createProducer(topic, false); + + // create a consumer and close it to create a subscription + pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe() + .close(); + + Set remainingMessageValues = Collections.synchronizedSet(new HashSet<>()); + BlockingQueue, Message>> unackedMessages = new LinkedBlockingQueue<>(); + AtomicBoolean c2MessagesShouldBeUnacked = new AtomicBoolean(true); + Set keysForC2 = new HashSet<>(); + AtomicLong lastMessageTimestamp = new AtomicLong(System.currentTimeMillis()); + + MessageListener messageHandler = (consumer, msg) -> { + lastMessageTimestamp.set(System.currentTimeMillis()); + synchronized (this) { + String key = msg.getKey(); + if (c2MessagesShouldBeUnacked.get() && keysForC2.contains(key)) { + unackedMessages.add(Pair.of(consumer, msg)); + return; + } + remainingMessageValues.remove(msg.getValue()); + consumer.acknowledgeAsync(msg); + } + }; + + pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((ledgerId, firstEntry, lastEntry, entries) -> { + log.error("Attempting to read from BK when cache should be used. {}:{} to {}:{}", ledgerId, firstEntry, + ledgerId, lastEntry); + return CompletableFuture.failedFuture( + new ManagedLedgerException.NonRecoverableLedgerException( + "Should not read from BK since cache should be used.")); + }); + + // Adding a new consumer. + @Cleanup + Consumer c1 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c1") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .subscribe(); + + @Cleanup + Consumer c2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c2") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .subscribe(); + + @Cleanup + Consumer c3 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c3") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .subscribe(); + + StickyKeyDispatcher dispatcher = getStickyKeyDispatcher(topic, SUBSCRIPTION_NAME); + StickyKeyConsumerSelector selector = dispatcher.getSelector(); + + // find keys that will be assigned to c2 + for (int i = 0; i < numberOfKeys; i++) { + String key = String.valueOf(i); + byte[] keyBytes = key.getBytes(UTF_8); + int hash = selector.makeStickyKeyHash(keyBytes); + if (selector.select(hash).consumerName().equals("c2")) { + keysForC2.add(key); + } + } + + // close c2 + c2.close(); + + // produce messages with random keys + for (int i = 0; i < 1000; i++) { + String key = String.valueOf(random.nextInt(numberOfKeys)); + //log.info("Producing message with key: {} value: {}", key, i); + remainingMessageValues.add(i); + producer.newMessage() + .key(key) + .value(i) + .send(); + } + + // reconnect c2 + c2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c2") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .startPaused(true) + .subscribe(); + + // ack the unacked messages to unblock c2 keys + c2MessagesShouldBeUnacked.set(false); + Pair, Message> consumerMessagePair; + while ((consumerMessagePair = unackedMessages.poll()) != null) { + messageHandler.received(consumerMessagePair.getLeft(), consumerMessagePair.getRight()); + } + + // produce more messages with random keys + for (int i = 0; i < 1000; i++) { + String key = String.valueOf(random.nextInt(numberOfKeys)); + //log.info("Producing message with key: {} value: {}", key, i); + remainingMessageValues.add(i); + producer.newMessage() + .key(key) + .value(i) + .send(); + } + + c2.resume(); + + Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> { + return remainingMessageValues.isEmpty() + || System.currentTimeMillis() - lastMessageTimestamp.get() > 50 * pauseTime; + }); + + try { + assertSoftly(softly -> { + softly.assertThat(remainingMessageValues).as("remainingMessageValues").isEmpty(); + ManagedLedgerFactoryMXBean cacheStats = pulsar.getDefaultManagedLedgerFactory().getCacheStats(); + softly.assertThat(cacheStats.getCacheHitsTotal()).as("cache hits").isGreaterThan(0); + softly.assertThat(cacheStats.getCacheMissesTotal()).as("cache misses").isEqualTo(0); + softly.assertThat(cacheStats.getNumberOfCacheEvictions()).as("cache evictions").isEqualTo(0); + }); + } finally { + logTopicStats(topic); + } + } +} diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index 7b3cd6a04fcca..b348a1d04b797 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -53,5 +53,14 @@ --> + + diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java index 3147279477843..f7c343d7421f3 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java @@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -127,6 +128,14 @@ public Object[][] mainProcessCasesProvider(){ }; } + @Override + protected ManagedLedgerFactoryConfig createManagedLedgerFactoryConfig() { + ManagedLedgerFactoryConfig managedLedgerFactoryConfig = super.createManagedLedgerFactoryConfig(); + // disable the broker cache so that assertAllByteBufHasBeenReleased can work correctly. + managedLedgerFactoryConfig.setMaxCacheSize(0); + return managedLedgerFactoryConfig; + } + /** * Tests all operations from write to callback, including these step: * 1. Write many data. diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java index ac5aa3bd8927e..e3e6945620c34 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java @@ -82,10 +82,14 @@ public void setUp(Method method) throws Exception { throw e; } - ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig(); + ManagedLedgerFactoryConfig conf = createManagedLedgerFactoryConfig(); factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, conf); } + protected ManagedLedgerFactoryConfig createManagedLedgerFactoryConfig() { + return new ManagedLedgerFactoryConfig(); + } + @AfterMethod(alwaysRun = true) public void tearDown(Method method) { try {