Skip to content

Commit

Permalink
add feature flag for recent TextIndex optimizations (apache#13577)
Browse files Browse the repository at this point in the history
* add feature flag for recent TextIndex optimizations

* expand the reuseParams test data

* add cache size config for NRT cache
  • Loading branch information
klsince authored Jul 11, 2024
1 parent c11390e commit 4cb27af
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex {
private Analyzer _analyzer;
private final String _column;
private final String _segmentName;
private final boolean _reuseMutableIndex;
private boolean _enablePrefixSuffixMatchingInPhraseQueries = false;
private final RealtimeLuceneRefreshListener _refreshListener;

Expand Down Expand Up @@ -90,6 +91,7 @@ public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segme
_searcherManager.addListener(_refreshListener);
_analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
_enablePrefixSuffixMatchingInPhraseQueries = config.isEnablePrefixSuffixMatchingInPhraseQueries();
_reuseMutableIndex = config.isReuseMutableIndex();
} catch (Exception e) {
LOGGER.error("Failed to instantiate realtime Lucene index reader for column {}, exception {}", column,
e.getMessage());
Expand Down Expand Up @@ -152,8 +154,7 @@ public MutableRoaringBitmap getDocIds(String searchQuery) {
}
}
};
Future<MutableRoaringBitmap> searchFuture =
SEARCHER_POOL.getExecutorService().submit(searchCallable);
Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit(searchCallable);
try {
return searchFuture.get();
} catch (InterruptedException e) {
Expand All @@ -163,7 +164,7 @@ public MutableRoaringBitmap getDocIds(String searchQuery) {
throw new RuntimeException("TEXT_MATCH query interrupted while querying the consuming segment");
} catch (Exception e) {
LOGGER.error("Failed while searching the realtime text index for segment {}, column {}, search query {},"
+ " exception {}", _segmentName, _column, searchQuery, e.getMessage());
+ " exception {}", _segmentName, _column, searchQuery, e.getMessage());
throw new RuntimeException(e);
}
}
Expand All @@ -190,6 +191,9 @@ private MutableRoaringBitmap getPinotDocIds(IndexSearcher indexSearcher, Mutable

@Override
public void commit() {
if (!_reuseMutableIndex) {
return;
}
try {
_indexCreator.getIndexWriter().commit();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi
_textColumn = column;
_commitOnClose = commit;

// to reuse the mutable index, it must be (1) not the realtime index, i.e. commit is set to false
// and (2) happens during realtime segment conversion
_reuseMutableIndex = commit && realtimeConversion;
String luceneAnalyzerClass = config.getLuceneAnalyzerClass();
try {
// segment generation is always in V1 and later we convert (as part of post creation processing)
Expand All @@ -135,38 +132,43 @@ public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commi
indexWriterConfig.setCommitOnClose(commit);
indexWriterConfig.setUseCompoundFile(config.isLuceneUseCompoundFile());

// For the realtime segment, prevent background merging. The realtime segment will call .commit()
// on the IndexWriter when segment conversion occurs. By default, Lucene will sometimes choose to
// merge segments in the background, which is problematic because the lucene index directory's
// contents is copied to create the immutable segment. If a background merge occurs during this
// copy, a FileNotFoundException will be triggered and segment build will fail.
// For the realtime segment, to reuse the mutable index, we should set the two writer configs below.
// The realtime segment will call .commit() on the IndexWriter when segment conversion occurs.
// By default, Lucene will sometimes choose to merge segments in the background, which is problematic because
// the Lucene index directory's contents are copied to create the immutable segment. If a background merge
// occurs during this copy, a FileNotFoundException will be triggered and the segment build will fail.
//
// Also, for the realtime segment, we set the OpenMode to CREATE to ensure that any existing artifacts
// will be overwritten. This is necessary because the realtime segment can be created multiple times
// during a server crash and restart scenario. If the existing artifacts are appended to, the realtime
// query results will be accurate, but after segment conversion the mapping file generated will be loaded
// for only the first numDocs lucene docIds, which can cause IndexOutOfBounds errors.
if (!_commitOnClose) {
if (!_commitOnClose && config.isReuseMutableIndex()) {
indexWriterConfig.setMergeScheduler(NoMergeScheduler.INSTANCE);
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
}

// To reuse the mutable index, the reuse flag must be enabled in the config and this creator must (1) not be
// building the realtime index, i.e. commit is set to true, and (2) be running during realtime segment conversion.
_reuseMutableIndex = config.isReuseMutableIndex() && commit && realtimeConversion;
if (_reuseMutableIndex) {
LOGGER.info("Reusing the realtime lucene index for segment {} and column {}", segmentIndexDir, column);
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
convertMutableSegment(segmentIndexDir, consumerDir, immutableToMutableIdMap, indexWriterConfig);
return;
}

if (_commitOnClose) {
_indexDirectory = FSDirectory.open(_indexFile.toPath());
} else {
if (!_commitOnClose && config.getLuceneNRTCachingDirectoryMaxBufferSizeMB() > 0) {
// For realtime index, use NRTCachingDirectory to reduce the number of open files. This buffers the
// flushes triggered by the near real-time refresh and writes them to disk when the buffer is full,
// reducing the number of small writes.
_indexDirectory =
new NRTCachingDirectory(FSDirectory.open(_indexFile.toPath()), config.getLuceneMaxBufferSizeMB(),
config.getLuceneMaxBufferSizeMB());
int bufSize = config.getLuceneNRTCachingDirectoryMaxBufferSizeMB();
LOGGER.info(
"Using NRTCachingDirectory for realtime lucene index for segment {} and column {} with buffer size: {}MB",
segmentIndexDir, column, bufSize);
_indexDirectory = new NRTCachingDirectory(FSDirectory.open(_indexFile.toPath()), bufSize, bufSize);
} else {
_indexDirectory = FSDirectory.open(_indexFile.toPath());
}
_indexWriter = new IndexWriter(_indexDirectory, indexWriterConfig);
} catch (ReflectiveOperationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,17 +447,21 @@ private void testSegment(List<GenericRow> rows, File indexDir,
@DataProvider
public static Object[][] reuseParams() {
List<Boolean> enabledColumnMajorSegmentBuildParams = Arrays.asList(false, true);
String[] sortedColumnParams = new String[]{null, STRING_COLUMN1};

return enabledColumnMajorSegmentBuildParams.stream().flatMap(
columnMajor -> Arrays.stream(sortedColumnParams).map(sortedColumn -> new Object[]{columnMajor,
sortedColumn}))
List<String> sortedColumnParams = Arrays.asList(null, STRING_COLUMN1);
List<Boolean> reuseMutableIndex = Arrays.asList(true, false);
List<Integer> luceneNRTCachingDirectoryMaxBufferSizeMB = Arrays.asList(0, 5);

return enabledColumnMajorSegmentBuildParams.stream().flatMap(columnMajor -> sortedColumnParams.stream().flatMap(
sortedColumn -> reuseMutableIndex.stream().flatMap(
reuseIndex -> luceneNRTCachingDirectoryMaxBufferSizeMB.stream()
.map(cacheSize -> new Object[]{columnMajor, sortedColumn, reuseIndex, cacheSize}))))
.toArray(Object[][]::new);
}

// Test the realtime segment conversion of a table with an index that reuses mutable index artifacts during conversion
@Test(dataProvider = "reuseParams")
public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, String sortedColumn)
public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, String sortedColumn,
boolean reuseMutableIndex, int luceneNRTCachingDirectoryMaxBufferSizeMB)
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
FieldConfig textIndexFieldConfig =
Expand All @@ -477,7 +481,7 @@ public void testSegmentBuilderWithReuse(boolean columnMajorSegmentBuilder, Strin
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
TextIndexConfig textIndexConfig =
new TextIndexConfig(false, null, null, false, false, Collections.emptyList(), Collections.emptyList(), false,
500, null, false);
500, null, false, reuseMutableIndex, luceneNRTCachingDirectoryMaxBufferSizeMB);

RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private String[][] getRepeatedData() {
public void setUp()
throws Exception {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false);
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0);
_realtimeLuceneTextIndex =
new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "table__0__1__20240602T0014Z", config);
String[][] documents = getTextData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void setUp()
throws Exception {
ServerMetrics.register(mock(ServerMetrics.class));
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false);
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0);
_realtimeLuceneTextIndex =
new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "table__0__1__20240602T0014Z", config);
_nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void nativeTextIndexIsDeleted()
public void testRemoveTextIndices()
throws IOException {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false);
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0);
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap);
LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null,
config);
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testRemoveTextIndices()
public void testGetColumnIndices()
throws IOException {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false);
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0);
// Write something to buffers and flush them to index files on disk
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap);
LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null,
Expand All @@ -293,17 +293,15 @@ public void testGetColumnIndices()
}

// Need segmentMetadata to tell the full set of columns in this segment.
when(_segmentMetadata.getAllColumns())
.thenReturn(new TreeSet<>(Arrays.asList("col1", "col2", "col3", "col4", "col5", "foo", "bar")));
when(_segmentMetadata.getAllColumns()).thenReturn(
new TreeSet<>(Arrays.asList("col1", "col2", "col3", "col4", "col5", "foo", "bar")));
try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) {
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.forward()),
new HashSet<>(Arrays.asList("col1", "col3")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.forward()), new HashSet<>(Arrays.asList("col1", "col3")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.dictionary()),
new HashSet<>(Collections.singletonList("col2")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.inverted()),
new HashSet<>(Collections.singletonList("col4")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.h3()),
new HashSet<>(Collections.singletonList("col5")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.h3()), new HashSet<>(Collections.singletonList("col5")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.text()), new HashSet<>(Arrays.asList("foo", "bar")));

fpi.removeIndex("col1", StandardIndexes.forward());
Expand All @@ -318,8 +316,7 @@ public void testGetColumnIndices()
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.inverted()),
new HashSet<>(Collections.singletonList("col4")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.h3()), new HashSet<>(Collections.emptySet()));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.text()),
new HashSet<>(Collections.singletonList("bar")));
assertEquals(fpi.getColumnsWithIndex(StandardIndexes.text()), new HashSet<>(Collections.singletonList("bar")));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void testCleanupRemovedIndices()
public void testRemoveTextIndices()
throws IOException, ConfigurationException {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false);
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0);
try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap);
LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null,
config);
Expand Down Expand Up @@ -343,7 +343,7 @@ public void testPersistIndexMaps() {
public void testGetColumnIndices()
throws Exception {
TextIndexConfig config =
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false);
new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, false, false, 0);
try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap);
LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true, false, null, null,
config);
Expand Down
Loading

0 comments on commit 4cb27af

Please sign in to comment.