From 8603164cc646dcb4bc2a6ead51cc9ad37441fda7 Mon Sep 17 00:00:00 2001 From: Christopher Peck <27231838+itschrispeck@users.noreply.github.com> Date: Fri, 26 Jul 2024 14:04:13 -0700 Subject: [PATCH] Improve realtime Lucene text index freshness/cpu/disk io usage (#13503) * enable parallel background refresh for lucene text index * lint * address comments --- .../mutable/MutableSegmentImpl.java | 35 --- .../LuceneNRTCachingMergePolicy.java | 64 +++++ ...ealtimeLuceneIndexReaderRefreshThread.java | 153 ---------- .../RealtimeLuceneIndexRefreshManager.java | 268 ++++++++++++++++++ .../RealtimeLuceneIndexRefreshState.java | 139 --------- .../RealtimeLuceneTextIndex.java | 28 +- .../impl/text/LuceneTextIndexCreator.java | 13 +- .../RealtimeSegmentConverterTest.java | 2 + .../LuceneMutableTextIndexTest.java | 19 +- .../NativeAndLuceneMutableTextIndexTest.java | 3 +- ...RealtimeLuceneIndexRefreshManagerTest.java | 160 +++++++++++ .../starter/helix/BaseServerStarter.java | 19 +- .../pinot/spi/utils/CommonConstants.java | 5 + 13 files changed, 554 insertions(+), 354 deletions(-) create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java delete mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 58d02af9627d..847f589f2a4d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -51,8 +51,6 @@ import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary; -import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState; -import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; import org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector; import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource; import org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource; @@ -159,8 +157,6 @@ public class MutableSegmentImpl implements MutableSegment { private volatile long _lastIndexedTimeMs = Long.MIN_VALUE; private volatile long _latestIngestionTimeMs = Long.MIN_VALUE; - private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders; - private final PartitionDedupMetadataManager _partitionDedupMetadataManager; private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager; @@ -347,17 +343,6 @@ public boolean isMutableSegment() { } } - // TODO - this logic is in the wrong place and belongs in a Lucene-specific submodule, - // it is beyond the scope of realtime index pluggability to do this refactoring, so realtime - // text indexes remain statically defined. Revisit this after this refactoring has been done. - MutableIndex textIndex = mutableIndexes.get(StandardIndexes.text()); - if (textIndex instanceof RealtimeLuceneTextIndex) { - if (_realtimeLuceneReaders == null) { - _realtimeLuceneReaders = new RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName); - } - _realtimeLuceneReaders.addReader((RealtimeLuceneTextIndex) textIndex); - } - Pair columnAggregatorPair = metricsAggregators.getOrDefault(column, Pair.of(column, null)); String sourceColumn = columnAggregatorPair.getLeft(); @@ -368,13 +353,6 @@ public boolean isMutableSegment() { nullValueVector, sourceColumn, valueAggregator)); } - // TODO separate concerns: this logic does not belong here - if (_realtimeLuceneReaders != null) { - // add the realtime lucene index readers to the global queue for refresh task to pick up - RealtimeLuceneIndexRefreshState realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance(); - realtimeLuceneIndexRefreshState.addRealtimeReadersToQueue(_realtimeLuceneReaders); - } - _partitionDedupMetadataManager = config.getPartitionDedupMetadataManager(); _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager(); @@ -993,19 +971,6 @@ public void destroy() { } } - // Stop the text index refresh before closing the indexes - if (_realtimeLuceneReaders != null) { - // set this to true as a way of signalling the refresh task thread to - // not attempt refresh on this segment here onwards - _realtimeLuceneReaders.getLock().lock(); - try { - _realtimeLuceneReaders.setSegmentDestroyed(); - _realtimeLuceneReaders.clearRealtimeReaderList(); - } finally { - _realtimeLuceneReaders.getLock().unlock(); - } - } - // Close the indexes for (IndexContainer indexContainer : _indexContainerMap.values()) { indexContainer.close(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java new file mode 100644 index 000000000000..ab811d85933a --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.TieredMergePolicy; +import org.apache.lucene.store.NRTCachingDirectory; + + +/** + * LuceneNRTCachingMergePolicy is a best-effort policy to generate merges for segments that are fully in memory, + * at the time of SegmentInfo selection. It does not consider segments that have been flushed to disk eligible + * for merging. + *

+ * Each refresh creates a small Lucene segment. Increasing the frequency of refreshes to reduce indexing lag results + * in a large number of small segments, and high disk IO ops for merging them. By using this best-effort merge policy + * the small ops can be avoided since the segments are in memory when merged. + */ +public class LuceneNRTCachingMergePolicy extends TieredMergePolicy { + private final NRTCachingDirectory _nrtCachingDirectory; + + public LuceneNRTCachingMergePolicy(NRTCachingDirectory nrtCachingDirectory) { + _nrtCachingDirectory = nrtCachingDirectory; + } + + @Override + public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) + throws IOException { + SegmentInfos inMemorySegmentInfos = new SegmentInfos(segmentInfos.getIndexCreatedVersionMajor()); + // Collect all segment commit infos that still have all files in memory + Set cachedFiles = new HashSet<>(List.of(_nrtCachingDirectory.listCachedFiles())); + for (SegmentCommitInfo info : segmentInfos) { + for (String file : info.files()) { + if (!cachedFiles.contains(file)) { + break; + } + } + inMemorySegmentInfos.add(info); + } + return super.findMerges(mergeTrigger, inMemorySegmentInfos, mergeContext); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java deleted file mode 100644 index 13110b4f324f..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.segment.local.realtime.impl.invertedindex; - -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import org.apache.lucene.search.SearcherManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Background thread to refresh the realtime lucene index readers for supporting - * near-realtime text search. The task maintains a queue of realtime segments. - * This queue is global (across all realtime segments of all realtime/hybrid tables). - * - * Each element in the queue is of type {@link RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders}. - * It encapsulates a lock and all the realtime lucene readers for the particular realtime segment. - * Since text index is also create on a per column basis, there will be as many realtime lucene - * readers as the number of columns with text search enabled. - * - * Between each successive execution of the task, there is a fixed delay (regardless of how long - * each execution took). When the task wakes up, it pick the RealtimeLuceneReadersForRealtimeSegment - * from the head of queue, refresh it's readers and adds this at the tail of queue. - */ -public class RealtimeLuceneIndexReaderRefreshThread implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeLuceneIndexReaderRefreshThread.class); - // TODO: make this configurable and choose a higher default value - private static final int DELAY_BETWEEN_SUCCESSIVE_EXECUTION_MS_DEFAULT = 10; - - private final ConcurrentLinkedQueue _luceneRealtimeReaders; - private final Lock _mutex; - private final Condition _conditionVariable; - - private volatile boolean _stopped = false; - - RealtimeLuceneIndexReaderRefreshThread( - ConcurrentLinkedQueue luceneRealtimeReaders, Lock mutex, - Condition conditionVariable) { - _luceneRealtimeReaders = luceneRealtimeReaders; - _mutex = mutex; - _conditionVariable = conditionVariable; - } - - void setStopped() { - _stopped = true; - } - - @Override - public void run() { - while (!_stopped) { - _mutex.lock(); - try { - // During instantiation of a given MutableSegmentImpl, we will signal on this condition variable once - // one or more realtime lucene readers (one per column) belonging to the MutableSegment - // are added to the global queue managed by this thread. The thread that signals will - // grab this mutex and signal on the condition variable. - // - // This refresh thread will be woken up (and grab the mutex automatically as per the - // implementation of await) and check if the queue is non-empty. It will then proceed to - // poll the queue and refresh the realtime index readers for the polled segment. - // - // The mutex and condition-variable semantics take care of the scenario when on - // a given Pinot server, there is no realtime segment with text index enabled. In such - // cases, there is no need for this thread to wake up simply after every few seconds/minutes - // only to find that there is nothing to be refreshed. The thread should simply be - // off CPU until signalled specifically. This also covers the situation where initially - // there were few realtime segments of a table with text index. Later if they got - // moved to another server as part of rebalance, then again there is no need for this thread - // to do anything until some realtime segment is created with text index enabled. - while (_luceneRealtimeReaders.isEmpty()) { - _conditionVariable.await(); - } - } catch (InterruptedException e) { - LOGGER.warn("Realtime lucene reader refresh thread got interrupted while waiting on condition variable: ", e); - Thread.currentThread().interrupt(); - } finally { - _mutex.unlock(); - } - - // check if shutdown has been initiated - if (_stopped) { - // exit - break; - } - - // remove the realtime segment from the front of queue - RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders realtimeReadersForSegment = _luceneRealtimeReaders.poll(); - if (realtimeReadersForSegment != null) { - String segmentName = realtimeReadersForSegment.getSegmentName(); - // take the lock to prevent the realtime segment from being concurrently destroyed - // and thus closing the realtime readers while this thread attempts to refresh them - realtimeReadersForSegment.getLock().lock(); - try { - if (!realtimeReadersForSegment.isSegmentDestroyed()) { - // if the segment hasn't yet been destroyed, refresh each - // realtime reader (one per column with text index enabled) - // for this segment. - List realtimeLuceneReaders = - realtimeReadersForSegment.getRealtimeLuceneReaders(); - for (RealtimeLuceneTextIndex realtimeReader : realtimeLuceneReaders) { - if (_stopped) { - // exit - break; - } - SearcherManager searcherManager = realtimeReader.getSearcherManager(); - try { - searcherManager.maybeRefresh(); - } catch (Exception e) { - // we should never be here since the locking semantics between MutableSegmentImpl::destroy() - // and this code along with volatile state "isSegmentDestroyed" protect against the cases - // where this thread might attempt to refresh a realtime lucene reader after it has already - // been closed duing segment destroy. - LOGGER.warn("Caught exception {} while refreshing realtime lucene reader for segment: {}", e, - segmentName); - } - } - } - } finally { - if (!realtimeReadersForSegment.isSegmentDestroyed()) { - _luceneRealtimeReaders.offer(realtimeReadersForSegment); - } - realtimeReadersForSegment.getLock().unlock(); - } - } - - try { - Thread.sleep(DELAY_BETWEEN_SUCCESSIVE_EXECUTION_MS_DEFAULT); - } catch (Exception e) { - LOGGER.warn("Realtime lucene reader refresh thread got interrupted while sleeping: ", e); - Thread.currentThread().interrupt(); - } - } // end while - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java new file mode 100644 index 000000000000..84e2e982fcbb --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.common.utils.ScalingThreadPoolExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class manages the refreshing of all realtime Lucene index readers. It uses an auto-scaling pool of threads, + * which expands up to a configurable size, to refresh the readers. + *

+ * During instantiation of a RealtimeLuceneTextIndex the corresponding SearcherManager is registered with this class. + * When the RealtimeLuceneTextIndex is closed, the flag set by the RealtimeLuceneTextIndex is checked before attempting + * a refresh, and SearcherManagerHolder instance previously registered to this class is dropped from the queue of + * readers to be refreshed. + */ +public class RealtimeLuceneIndexRefreshManager { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeLuceneIndexRefreshManager.class); + // max number of parallel refresh threads + private final int _maxParallelism; + // delay between refresh iterations + private int _delayMs; + // partitioned lists of SearcherManagerHolders, each gets its own thread for refreshing. SearcherManagerHolders + // are added to the list with the smallest size to roughly balance the load across threads + private final List> _partitionedListsOfSearchers; + private static RealtimeLuceneIndexRefreshManager _singletonInstance; + private static ExecutorService _executorService; + + private RealtimeLuceneIndexRefreshManager(int maxParallelism, int delayMs) { + _maxParallelism = maxParallelism; + _delayMs = delayMs; + // Set min pool size to 0, scale up/down as needed. Set keep alive time to 0, as threads are generally long-lived + _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, _maxParallelism, 0L); + _partitionedListsOfSearchers = new ArrayList<>(); + } + + public static RealtimeLuceneIndexRefreshManager getInstance() { + Preconditions.checkArgument(_singletonInstance != null, + "RealtimeLuceneIndexRefreshManager.init() must be called first"); + return _singletonInstance; + } + + /** + * Initializes the RealtimeLuceneIndexRefreshManager with the given maxParallelism and delayMs. This is + * intended to be called only once at the beginning of the server lifecycle. + * @param maxParallelism maximum number of refresh threads to use + * @param delayMs minimum delay between refreshes + */ + public static RealtimeLuceneIndexRefreshManager init(int maxParallelism, int delayMs) { + _singletonInstance = new RealtimeLuceneIndexRefreshManager(maxParallelism, delayMs); + return _singletonInstance; + } + + @VisibleForTesting + public void reset() { + _partitionedListsOfSearchers.clear(); + _executorService.shutdownNow(); + _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, _maxParallelism, 0L); + } + + @VisibleForTesting + public void setDelayMs(int delayMs) { + _delayMs = delayMs; + } + + /** + * Add a new SearcherManagerHolder and submit it to the executor service for refreshing. + *

+ * If the _partitionedListsOfSearchers has less than _maxParallelism lists, a new list is created and submitted to + * the executor service to begin refreshing. If there are already _maxParallelism lists, the SearcherManagerHolder + * will be added to the list with the smallest size. If the smallest list is empty, it will be submitted to the + * executor as the old one was stopped. + *

+ * The RealtimeLuceneRefreshRunnable will drop closed indexes from the list, and if all indexes are closed, the + * empty list will ensure the Runnable finishes. ScalingThreadPoolExecutor will then scale down the number of + * threads. This ensures that we do not leave any threads if there are no tables with text index, or text indices + * are removed on the server through actions such as config update or re-balance. + */ + public synchronized void addSearcherManagerHolder(SearcherManagerHolder searcherManagerHolder) { + if (_partitionedListsOfSearchers.size() < _maxParallelism) { + List searcherManagers = Collections.synchronizedList(new ArrayList<>()); + searcherManagers.add(searcherManagerHolder); + _partitionedListsOfSearchers.add(searcherManagers); + _executorService.submit(new RealtimeLuceneRefreshRunnable(searcherManagers, _delayMs)); + return; + } + + List smallestList = null; + for (List list : _partitionedListsOfSearchers) { + if (smallestList == null || list.size() < smallestList.size()) { + smallestList = list; + } + } + assert smallestList != null; + smallestList.add(searcherManagerHolder); + + // If the list was empty before adding the SearcherManagerHolder, the runnable containing the list + // has exited or will soon exit. Therefore, we need to submit a new runnable to the executor for the list. + if (smallestList.size() == 1) { + _executorService.submit(new RealtimeLuceneRefreshRunnable(smallestList, _delayMs)); + } + } + + /** + * Blocks for up to 45 seconds waiting for refreshes of realtime Lucene index readers to complete. + * If all segments were previously closed, it should return immediately. + */ + public boolean awaitTermination() { + // Interrupts will be handled by the RealtimeLuceneRefreshRunnable refresh loop. In general, all + // indexes should be marked closed before this method is called, and _executorService should + // shutdown immediately as there are no active threads. If for some reason an index did not close correctly, + // SearcherManager.maybeRefresh() should be on the order of seconds in the worst case and this should + // return shortly after. + _executorService.shutdownNow(); + boolean terminated = false; + try { + terminated = _executorService.awaitTermination(45, TimeUnit.SECONDS); + if (!terminated) { + LOGGER.warn("Realtime Lucene index refresh pool did not terminate in 45 seconds."); + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for realtime Lucene index refresh to shutdown."); + } + return terminated; + } + + @VisibleForTesting + public int getPoolSize() { + return ((ThreadPoolExecutor) _executorService).getPoolSize(); + } + + @VisibleForTesting + public List getListSizes() { + return _partitionedListsOfSearchers.stream().map(List::size).sorted().collect(Collectors.toList()); + } + + /** + * SearcherManagerHolder is a class that holds a SearcherManager instance for a segment and column. Instances + * of this class should be registered with the RealtimeLuceneIndexRefreshManager class to manage refreshing of + * the SearcherManager instance it holds. + */ + public static class SearcherManagerHolder { + private final String _segmentName; + private final String _columnName; + private final Lock _lock; + private volatile boolean _indexClosed; + private final SearcherManager _searcherManager; + + public SearcherManagerHolder(String segmentName, String columnName, SearcherManager searcherManager) { + _segmentName = segmentName; + _columnName = columnName; + _lock = new ReentrantLock(); + _indexClosed = false; + _searcherManager = searcherManager; + } + + public void setIndexClosed() { + _indexClosed = true; + } + + public Lock getLock() { + return _lock; + } + + public String getSegmentName() { + return _segmentName; + } + + public String getColumnName() { + return _columnName; + } + + public SearcherManager getSearcherManager() { + return _searcherManager; + } + + public boolean isIndexClosed() { + return _indexClosed; + } + } + + /** + * Runnable that refreshes a list of SearcherManagerHolder instances. This class is responsible for refreshing + * each SearcherManagerHolder in the list, and re-adding it to the list if it has not been closed. If every + * instance has been closed, the thread will terminate as the list size will be empty. + */ + private static class RealtimeLuceneRefreshRunnable implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeLuceneRefreshRunnable.class); + private final int _delayMs; + private final List _searchers; + + public RealtimeLuceneRefreshRunnable(List searchers, int delayMs) { + _searchers = searchers; + _delayMs = delayMs; + } + + @Override + public void run() { + int i = 0; // current index in _searchers + while (!_searchers.isEmpty() && i <= _searchers.size() && !Thread.interrupted()) { + if (i == _searchers.size()) { + i = 0; // reset cursor to the beginning if we've reached the end + } + SearcherManagerHolder searcherManagerHolder = _searchers.get(i); + assert searcherManagerHolder != null; + searcherManagerHolder.getLock().lock(); + try { + if (searcherManagerHolder.isIndexClosed()) { + _searchers.remove(i); + continue; // do not increment i, as the remaining elements in the list have been shifted + } + + if (!searcherManagerHolder.isIndexClosed()) { + try { + searcherManagerHolder.getSearcherManager().maybeRefresh(); + } catch (Exception e) { + // we should never be here since the locking semantics between RealtimeLuceneTextIndex.close() + // and this code along with volatile state isIndexClosed protect against the cases where this thread + // might attempt to refresh a realtime lucene reader after it has already been closed during + // RealtimeLuceneTextIndex.commit() + LOGGER.warn("Caught exception {} while refreshing realtime lucene reader for segment: {} and column: {}", + e, searcherManagerHolder.getSegmentName(), searcherManagerHolder.getColumnName()); + } + i++; + } + } finally { + searcherManagerHolder.getLock().unlock(); + } + + try { + Thread.sleep(_delayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java deleted file mode 100644 index e2330a6550ce..000000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.segment.local.realtime.impl.invertedindex; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * This class manages the realtime lucene index readers. Creates a global - * queue with all the realtime segment lucene index readers across - * all tables and manages their refresh using {@link RealtimeLuceneIndexReaderRefreshThread} - * - * TODO: eventually we should explore partitioning this queue on per table basis - */ -public class RealtimeLuceneIndexRefreshState { - private static RealtimeLuceneIndexRefreshState _singletonInstance; - private static RealtimeLuceneIndexReaderRefreshThread _realtimeRefreshThread; - private final Lock _mutex; - private final Condition _conditionVariable; - private static ConcurrentLinkedQueue _luceneRealtimeReaders; - - private RealtimeLuceneIndexRefreshState() { - _mutex = new ReentrantLock(); - _conditionVariable = _mutex.newCondition(); - _luceneRealtimeReaders = new ConcurrentLinkedQueue<>(); - } - - /** - * Used by HelixServerStarter during bootstrap to create the singleton - * instance of this class and start the realtime reader refresh thread. - */ - public void start() { - _realtimeRefreshThread = - new RealtimeLuceneIndexReaderRefreshThread(_luceneRealtimeReaders, _mutex, _conditionVariable); - Thread t = new Thread(_realtimeRefreshThread); - t.start(); - } - - /** - * Used by HelixServerStarter during shutdown. This sets the volatile - * "stopped" variable to indicate the shutdown to refresh thread. - * Since refresh thread might be suspended waiting on the condition variable, - * we signal the condition variable for the refresh thread to wake up, - * check that shutdown has been initiated and exit. - */ - public void stop() { - _realtimeRefreshThread.setStopped(); - _mutex.lock(); - _conditionVariable.signal(); - _mutex.unlock(); - } - - public static RealtimeLuceneIndexRefreshState getInstance() { - if (_singletonInstance == null) { - synchronized (RealtimeLuceneIndexRefreshState.class) { - if (_singletonInstance == null) { - _singletonInstance = new RealtimeLuceneIndexRefreshState(); - } - } - } - return _singletonInstance; - } - - public void addRealtimeReadersToQueue(RealtimeLuceneReaders readersForRealtimeSegment) { - _mutex.lock(); - _luceneRealtimeReaders.offer(readersForRealtimeSegment); - _conditionVariable.signal(); - _mutex.unlock(); - } - - /** - * Since the text index is maintained per TEXT column (similar to other Pinot indexes), - * there could be multiple lucene indexes for a given segment and therefore there can be - * multiple realtime lucene readers (one for each index/column) for the particular - * realtime segment. - */ - public static class RealtimeLuceneReaders { - private final String _segmentName; - private final Lock _lock; - private boolean _segmentDestroyed; - private final List _realtimeLuceneReaders; - - public RealtimeLuceneReaders(String segmentName) { - _segmentName = segmentName; - _lock = new ReentrantLock(); - _segmentDestroyed = false; - _realtimeLuceneReaders = new LinkedList<>(); - } - - public void addReader(RealtimeLuceneTextIndex realtimeLuceneTextIndexReader) { - _realtimeLuceneReaders.add(realtimeLuceneTextIndexReader); - } - - public void setSegmentDestroyed() { - _segmentDestroyed = true; - } - - public Lock getLock() { - return _lock; - } - - public String getSegmentName() { - return _segmentName; - } - - public List getRealtimeLuceneReaders() { - return _realtimeLuceneReaders; - } - - public void clearRealtimeReaderList() { - _realtimeLuceneReaders.clear(); - } - - boolean isSegmentDestroyed() { - return _segmentDestroyed; - } - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java index 0175bc51096b..961f02862959 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java @@ -58,6 +58,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { private final boolean _reuseMutableIndex; private boolean _enablePrefixSuffixMatchingInPhraseQueries = false; private final RealtimeLuceneRefreshListener _refreshListener; + private final RealtimeLuceneIndexRefreshManager.SearcherManagerHolder _searcherManagerHolder; /** * Created by {@link MutableSegmentImpl} @@ -92,6 +93,11 @@ public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segme _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer(); _enablePrefixSuffixMatchingInPhraseQueries = config.isEnablePrefixSuffixMatchingInPhraseQueries(); _reuseMutableIndex = config.isReuseMutableIndex(); + + // Submit the searcher manager to the global pool for refreshing + _searcherManagerHolder = + new RealtimeLuceneIndexRefreshManager.SearcherManagerHolder(segmentName, column, _searcherManager); + RealtimeLuceneIndexRefreshManager.getInstance().addSearcherManagerHolder(_searcherManagerHolder); } catch (Exception e) { LOGGER.error("Failed to instantiate realtime Lucene index reader for column {}, exception {}", column, e.getMessage()); @@ -179,7 +185,7 @@ private MutableRoaringBitmap getPinotDocIds(IndexSearcher indexSearcher, Mutable while (luceneDocIDIterator.hasNext()) { int luceneDocId = luceneDocIDIterator.next(); Document document = indexSearcher.doc(luceneDocId); - int pinotDocId = Integer.valueOf(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME)); + int pinotDocId = Integer.parseInt(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME)); actualDocIDs.add(pinotDocId); } } catch (Exception e) { @@ -196,6 +202,18 @@ public void commit() { } try { _indexCreator.getIndexWriter().commit(); + // Set the SearcherManagerHolder.indexClosed() flag to stop generating refreshed readers + _searcherManagerHolder.getLock().lock(); + try { + _searcherManagerHolder.setIndexClosed(); + // Block for one final refresh, to ensure queries are fully up to date while segment is being converted + _searcherManager.maybeRefreshBlocking(); + } finally { + _searcherManagerHolder.getLock().unlock(); + } + // It is OK to close the index writer as we are done indexing, and no more refreshes will take place + // The SearcherManager will still provide an up-to-date reader via .acquire() + _indexCreator.getIndexWriter().close(); } catch (Exception e) { LOGGER.error("Failed to commit the realtime lucene text index for column {}, exception {}", _column, e.getMessage()); @@ -206,6 +224,14 @@ public void commit() { @Override public void close() { try { + // Set the SearcherManagerHolder.indexClosed() flag to stop generating refreshed readers. If completionMode is + // set as DOWNLOAD, then commit() will not be called the flag must be set here. + _searcherManagerHolder.getLock().lock(); + try { + _searcherManagerHolder.setIndexClosed(); + } finally { + _searcherManagerHolder.getLock().unlock(); + } _searcherManager.close(); _searcherManager = null; _refreshListener.close(); // clean up metrics prior to closing _indexCreator, as they contain a reference to it diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java index fc2a9ae15104..a5262b489fc5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java @@ -36,11 +36,11 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergeScheduler; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.NRTCachingDirectory; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.LuceneNRTCachingMergePolicy; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator; import org.apache.pinot.segment.local.segment.index.text.AbstractTextIndexCreator; @@ -132,19 +132,12 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi indexWriterConfig.setCommitOnClose(commit); indexWriterConfig.setUseCompoundFile(config.isLuceneUseCompoundFile()); - // For the realtime segment, to reuse mutable index, we should set the two write configs below. - // The realtime segment will call .commit() on the IndexWriter when segment conversion occurs. - // By default, Lucene will sometimes choose to merge segments in the background, which is problematic because - // the lucene index directory's contents is copied to create the immutable segment. If a background merge - // occurs during this copy, a FileNotFoundException will be triggered and segment build will fail. - // // Also, for the realtime segment, we set the OpenMode to CREATE to ensure that any existing artifacts // will be overwritten. This is necessary because the realtime segment can be created multiple times // during a server crash and restart scenario. If the existing artifacts are appended to, the realtime // query results will be accurate, but after segment conversion the mapping file generated will be loaded // for only the first numDocs lucene docIds, which can cause IndexOutOfBounds errors. if (!_commitOnClose && config.isReuseMutableIndex()) { - indexWriterConfig.setMergeScheduler(NoMergeScheduler.INSTANCE); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); } @@ -166,7 +159,9 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi LOGGER.info( "Using NRTCachingDirectory for realtime lucene index for segment {} and column {} with buffer size: {}MB", segmentIndexDir, column, bufSize); - _indexDirectory = new NRTCachingDirectory(FSDirectory.open(_indexFile.toPath()), bufSize, bufSize); + NRTCachingDirectory dir = new NRTCachingDirectory(FSDirectory.open(_indexFile.toPath()), bufSize, bufSize); + indexWriterConfig.setMergePolicy(new LuceneNRTCachingMergePolicy(dir)); + _indexDirectory = dir; } else { _indexDirectory = FSDirectory.open(_indexFile.toPath()); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java index 390a2d158e18..84254060b32a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java @@ -38,6 +38,7 @@ import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool; import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; @@ -495,6 +496,7 @@ public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, Strin // create mutable segment impl RealtimeLuceneTextIndexSearcherPool.init(1); + RealtimeLuceneIndexRefreshManager.init(1, 10); MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null); List rows = generateTestDataForReusePath(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java index 324cc1fa5d4d..cc46486606b7 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java @@ -19,14 +19,15 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.io.File; +import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; -import org.apache.lucene.search.SearcherManager; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.util.TestUtils; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterClass; @@ -46,6 +47,7 @@ public class LuceneMutableTextIndexTest { private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; public LuceneMutableTextIndexTest() { + RealtimeLuceneIndexRefreshManager.init(1, 10); ServerMetrics.register(mock(ServerMetrics.class)); } @@ -80,13 +82,6 @@ public void setUp() _realtimeLuceneTextIndex.add(row); } } - - SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager(); - try { - searcherManager.maybeRefresh(); - } catch (Exception e) { - throw new RuntimeException(e); - } } @AfterClass @@ -96,6 +91,14 @@ public void tearDown() { @Test public void testQueries() { + TestUtils.waitForCondition(aVoid -> { + try { + return _realtimeLuceneTextIndex.getSearcherManager().isSearcherCurrent(); + } catch (IOException e) { + return false; + } + }, 10000, + "Background pool did not refresh the searcher manager in time"); assertEquals(_realtimeLuceneTextIndex.getDocIds("stream"), ImmutableRoaringBitmap.bitmapOf(0)); assertEquals(_realtimeLuceneTextIndex.getDocIds("/.*house.*/"), ImmutableRoaringBitmap.bitmapOf(1)); assertEquals(_realtimeLuceneTextIndex.getDocIds("invalid"), ImmutableRoaringBitmap.bitmapOf()); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java index 932c851be053..84765353b092 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java @@ -73,6 +73,7 @@ private String[][] getMVTextData() { @BeforeClass public void setUp() throws Exception { + RealtimeLuceneIndexRefreshManager.init(1, 10); ServerMetrics.register(mock(ServerMetrics.class)); TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0); @@ -101,7 +102,7 @@ public void setUp() searcherManagers.add(_realtimeLuceneMVTextIndex.getSearcherManager()); try { for (SearcherManager searcherManager : searcherManagers) { - searcherManager.maybeRefresh(); + searcherManager.maybeRefreshBlocking(); } } catch (Exception e) { throw new RuntimeException(e); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java new file mode 100644 index 000000000000..351ea336bbaf --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.IOException; +import java.util.List; +import org.apache.lucene.search.SearcherManager; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class RealtimeLuceneIndexRefreshManagerTest { + + @BeforeClass + public void init() { + RealtimeLuceneIndexRefreshManager.init(2, 0); + } + + @BeforeMethod + public void setup() { + RealtimeLuceneIndexRefreshManager.getInstance().reset(); + } + + @Test + public void testSingleSearcherManager() { + RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager = + RealtimeLuceneIndexRefreshManager.getInstance(); + + assertEquals(realtimeLuceneIndexRefreshManager.getPoolSize(), 0, "Initial pool size should be 0"); + + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder = getSearcherManagerHolder(1); + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 1, 5000, + "Timed out waiting for thead pool to scale up for the initial searcher manager holder"); + + searcherManagerHolder.setIndexClosed(); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 0, 5000, + "Timed out waiting for thread pool to scale down"); + } + + @Test + public void testManySearcherManagers() { + RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager = + RealtimeLuceneIndexRefreshManager.getInstance(); + + assertEquals(realtimeLuceneIndexRefreshManager.getPoolSize(), 0, "Initial pool size should be 0"); + + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder1 = getSearcherManagerHolder(1); + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder2 = getSearcherManagerHolder(2); + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder3 = getSearcherManagerHolder(3); + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder4 = getSearcherManagerHolder(4); + + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder1); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 1, 5000, + "Timed out waiting for thead pool to scale up for the initial searcher manager holder"); + + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder2); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 2, 5000, + "Timed out waiting for thead pool to scale up for the second searcher manager holder"); + + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder3); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getListSizes().equals(List.of(1, 2)), 5000, + "Timed out waiting for the searcher manager holder to be added to another queue"); + + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder4); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getListSizes().equals(List.of(2, 2)), 5000, + "Timed out waiting for the searcher manager holder to be added to the smallest queue"); + + searcherManagerHolder1.setIndexClosed(); + searcherManagerHolder2.setIndexClosed(); + searcherManagerHolder3.setIndexClosed(); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 1, 5000, + "Timed out waiting for thead pool to scale down as only one searcher manager holder is left"); + + searcherManagerHolder4.setIndexClosed(); + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 0, 5000, + "Timed out waiting for thead pool to scale down as all searcher manager holders have been closed"); + } + + @Test + public void testDelayMs() + throws Exception { + RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager = + RealtimeLuceneIndexRefreshManager.getInstance(); + realtimeLuceneIndexRefreshManager.setDelayMs(501); + + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder = getSearcherManagerHolder(1); + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder); + + Thread.sleep(1500); + searcherManagerHolder.setIndexClosed(); + + SearcherManager searcherManager = searcherManagerHolder.getSearcherManager(); + verify(searcherManager, times(3)).maybeRefresh(); + } + + @Test + public void testTerminationForEmptyPool() { + assertTrue(RealtimeLuceneIndexRefreshManager.getInstance().awaitTermination()); + } + + @Test + public void testTerminationWhileActive() { + RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager = + RealtimeLuceneIndexRefreshManager.getInstance(); + + assertEquals(realtimeLuceneIndexRefreshManager.getPoolSize(), 0, "Initial pool size should be 0"); + + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder1 = getSearcherManagerHolder(1); + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder2 = getSearcherManagerHolder(2); + RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder3 = getSearcherManagerHolder(3); + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder1); + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder2); + realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder3); + + TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 2, 10000, + "Timed out waiting for thread pool to scale up"); + + searcherManagerHolder1.setIndexClosed(); + searcherManagerHolder2.setIndexClosed(); + searcherManagerHolder3.setIndexClosed(); + assertTrue(RealtimeLuceneIndexRefreshManager.getInstance().awaitTermination()); + } + + private RealtimeLuceneIndexRefreshManager.SearcherManagerHolder getSearcherManagerHolder(int id) { + SearcherManager searcherManager = mock(SearcherManager.class); + try { + when(searcherManager.maybeRefresh()).thenReturn(true); + } catch (IOException e) { + Assert.fail("Mocked searcher manager should not throw exception"); + } + return new RealtimeLuceneIndexRefreshManager.SearcherManagerHolder("segment" + id, "col", searcherManager); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index b961582a9e27..54e118634f8a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -72,7 +72,7 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.core.util.ListenerConfigUtil; -import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.server.access.AccessControlFactory; @@ -142,8 +142,8 @@ public abstract class BaseServerStarter implements ServiceStartable { protected AccessControlFactory _accessControlFactory; protected AdminApiApplication _adminApiApplication; protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker; - protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState; protected RealtimeLuceneTextIndexSearcherPool _realtimeLuceneTextIndexSearcherPool; + protected RealtimeLuceneIndexRefreshManager _realtimeLuceneTextIndexRefreshManager; protected PinotEnvironmentProvider _pinotEnvironmentProvider; protected volatile boolean _isServerReadyToServeQueries = false; @@ -587,6 +587,15 @@ public void start() _serverConf.getProperty(ResourceManager.QUERY_WORKER_CONFIG_KEY, ResourceManager.DEFAULT_QUERY_WORKER_THREADS); _realtimeLuceneTextIndexSearcherPool = RealtimeLuceneTextIndexSearcherPool.init(queryWorkerThreads); + // Initialize RealtimeLuceneIndexRefreshManager with max refresh threads and min refresh interval configs + LOGGER.info("Initializing lucene refresh manager"); + int luceneMaxRefreshThreads = + _serverConf.getProperty(Server.LUCENE_MAX_REFRESH_THREADS, Server.DEFAULT_LUCENE_MAX_REFRESH_THREADS); + int luceneMinRefreshIntervalDuration = + _serverConf.getProperty(Server.LUCENE_MIN_REFRESH_INTERVAL_MS, Server.DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS); + _realtimeLuceneTextIndexRefreshManager = + RealtimeLuceneIndexRefreshManager.init(luceneMaxRefreshThreads, luceneMinRefreshIntervalDuration); + LOGGER.info("Initializing server instance and registering state model factory"); Utils.logVersions(); ControllerLeaderLocator.create(_helixManager); @@ -684,9 +693,6 @@ public void start() _serverQueriesDisabledTracker = new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, _helixManager, serverMetrics); _serverQueriesDisabledTracker.start(); - - _realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance(); - _realtimeLuceneIndexRefreshState.start(); } /** @@ -719,9 +725,6 @@ public void stop() { if (_serverQueriesDisabledTracker != null) { _serverQueriesDisabledTracker.stop(); } - if (_realtimeLuceneIndexRefreshState != null) { - _realtimeLuceneIndexRefreshState.stop(); - } try { // Close PinotFS after all data managers are shutdown. Otherwise, segments which are being committed will not // be uploaded to the deep-store. diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index b6fec05b3149..6abdec73e0b4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -726,6 +726,11 @@ public static class Server { public static final String CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX = "consumer.client.id.suffix"; + public static final String LUCENE_MAX_REFRESH_THREADS = "pinot.server.lucene.max.refresh.threads"; + public static final int DEFAULT_LUCENE_MAX_REFRESH_THREADS = 1; + public static final String LUCENE_MIN_REFRESH_INTERVAL_MS = "pinot.server.lucene.min.refresh.interval.ms"; + public static final int DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS = 10; + public static class SegmentCompletionProtocol { public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "pinot.server.segment.uploader";