feat(core): Add open/close support for Tantivy Index
Commit 1 of adding Tantivy index support. This PR covers a small subset of the overall Tantivy logic to allow for easier partial review. The index code is not usable end to end until all parts are committed.

This adds the basic Rust project skeleton and supports opening the index, applying the schema, and closing the index. Many methods are unimplemented and will be added in follow-up PRs. End-to-end testing is not available in this PR because the index test suite requires ingestion and query support for verification.
rfairfax committed Jul 23, 2024
1 parent 9779a31 commit 338b3dc
Showing 19 changed files with 2,001 additions and 11 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/scala.yml
@@ -2,7 +2,7 @@ name: CI

on:
  push:
-    branches: [ develop ]
+    branches: [ develop, rfairfax/build_test ]
  pull_request:
    branches: [ develop, integration, main, feat-index-rust ]

@@ -19,6 +19,11 @@ jobs:
        with:
          java-version: '11'
          distribution: 'adopt'
+      - name: Install Rust
+        uses: actions-rust-lang/setup-rust-toolchain@v1
+        with:
+          components: rustfmt, clippy
+          target: x86_64-apple-darwin, aarch64-apple-darwin, aarch64-unknown-linux-gnu
      - name: Run tests
        run: .github/workflows/runtests.sh
      - name: Coverage Reports
3 changes: 3 additions & 0 deletions .gitignore
@@ -32,4 +32,7 @@ metastore_db/
**/kafka/src/test/scala/filodb/kafka/shard*
*lib*

+# Allow Rust's lib.rs since we're otherwise blocking *lib* above
+!lib.rs
+
coordinator/src/test/resources/
5 changes: 5 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -800,6 +800,11 @@ filodb {
    # Whether caching on the index is disabled. The underlying Lucene index uses an LRU cache that is
    # enabled by default; this flag lets us disable that feature.
    disable-index-caching = false
+
+   # The Part Key index implementation to use. Supported values:
+   # lucene - Lucene based index (default)
+   # tantivy - Tantivy based index
+   part-key-index-type = lucene
  }

  # for standalone worker cluster configuration, see akka-bootstrapper
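TimeSeriesShard reads this setting as memstore.part-key-index-type (see its diff below), so a deployment could opt in with a small override. This is an illustrative sketch, assuming the setting sits under filodb.memstore as in the defaults file above:

filodb {
  memstore {
    # Switch from the default Lucene implementation to Tantivy
    part-key-index-type = tantivy
  }
}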
5 changes: 5 additions & 0 deletions core/src/main/scala/filodb.core/memstore/PartKeyIndex.scala
@@ -64,6 +64,11 @@ abstract class PartKeyIndexRaw(ref: DatasetRef,
                               protected val lifecycleManager: Option[IndexMetadataStore] = None)
  extends StrictLogging {

+  protected val startTimeLookupLatency = Kamon.histogram("index-startTimes-for-odp-lookup-latency",
+    MeasurementUnit.time.nanoseconds)
+    .withTag("dataset", ref.dataset)
+    .withTag("shard", shardNum)
+
  protected val queryIndexLookupLatency = Kamon.histogram("index-partition-lookup-latency",
    MeasurementUnit.time.nanoseconds)
    .withTag("dataset", ref.dataset)
5 changes: 0 additions & 5 deletions core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala

@@ -79,11 +79,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
  import PartKeyLuceneIndex._
  import PartKeyIndexRaw._

-  val startTimeLookupLatency = Kamon.histogram("index-startTimes-for-odp-lookup-latency",
-    MeasurementUnit.time.nanoseconds)
-    .withTag("dataset", ref.dataset)
-    .withTag("shard", shardNum)
-
  val partIdFromPartKeyLookupLatency = Kamon.histogram("index-ingestion-partId-lookup-latency",
    MeasurementUnit.time.nanoseconds)
    .withTag("dataset", ref.dataset)
213 changes: 213 additions & 0 deletions core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
@@ -0,0 +1,213 @@
package filodb.core.memstore

import java.io.File
import java.nio.file.Files

import debox.Buffer
import org.apache.commons.lang3.SystemUtils
import org.apache.lucene.util.BytesRef

import filodb.core.DatasetRef
import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
import filodb.core.metadata.PartitionSchema
import filodb.core.query.ColumnFilter

class PartKeyTantivyIndex(ref: DatasetRef,
                          schema: PartitionSchema,
                          shardNum: Int,
                          retentionMillis: Long, // only used to calculate fallback startTime
                          diskLocation: Option[File] = None,
                          lifecycleManager: Option[IndexMetadataStore] = None
                         ) extends PartKeyIndexRaw(ref, shardNum, schema, diskLocation, lifecycleManager) {

  // Compute field names for native schema code
  private val schemaFields = schema.columns.filter { c =>
    c.columnType == StringColumn
  }.map { c =>
    c.name
  }.toArray

  private val schemaMapFields = schema.columns.filter { c =>
    c.columnType == MapColumn
  }.map { c =>
    c.name
  }.toArray

  private val schemaMultiColumnFacets = schema.options.multiColumnFacets.keys.toArray

  // Native handle for cross JNI operations
  private var indexHandle: Long = loadIndexData(() => TantivyNativeMethods.newIndexHandle(indexDiskLocation.toString,
    schemaFields, schemaMapFields, schemaMultiColumnFacets))

  logger.info(s"Created tantivy index for dataset=$ref shard=$shardNum at $indexDiskLocation")

  override def reset(): Unit = {
    TantivyNativeMethods.reset(indexHandle)
  }

  override def startFlushThread(flushDelayMinSeconds: Int, flushDelayMaxSeconds: Int): Unit = {
    ???
  }

  override def partIdsEndedBefore(endedBefore: Long): Buffer[Int] = {
    ???
  }

  override def removePartitionsEndedBefore(endedBefore: Long, returnApproxDeletedCount: Boolean): Int = {
    ???
  }

  override def removePartKeys(partIds: Buffer[Int]): Unit = {
    ???
  }

  override def indexRamBytes: Long = {
    ???
  }

  override def indexNumEntries: Long = {
    ???
  }

  override def closeIndex(): Unit = {
    logger.info(s"Closing index on dataset=$ref shard=$shardNum")

    commit()
    TantivyNativeMethods.freeIndexHandle(indexHandle)
    indexHandle = 0
  }

  override def indexNames(limit: Int): Seq[String] = {
    ???
  }

  override def indexValues(fieldName: String, topK: Int): Seq[TermInfo] = {
    ???
  }

  override def labelNamesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): Seq[String] = {
    ???
  }

  override def labelValuesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long,
                                    colName: String, limit: Int): Seq[String] = {
    ???
  }

  override def addPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long,
                          partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = {
    ???
  }

  override def upsertPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long,
                             partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = {
    ???
  }

  override def partKeyFromPartId(partId: Int): Option[BytesRef] = {
    ???
  }

  override def startTimeFromPartId(partId: Int): Long = {
    ???
  }

  override def endTimeFromPartId(partId: Int): Long = {
    ???
  }

  override def startTimeFromPartIds(partIds: Iterator[Int]): debox.Map[Int, Long] = {
    ???
  }

  override def commit(): Unit = {
    TantivyNativeMethods.commit(indexHandle)
  }

  override def updatePartKeyWithEndTime(partKeyOnHeapBytes: Array[Byte], partId: Int, endTime: Long,
                                        partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = {
    ???
  }

  override def refreshReadersBlocking(): Unit = {
    ???
  }

  override def partIdsFromFilters(columnFilters: Seq[ColumnFilter], startTime: Long, endTime: Long,
                                  limit: Int): Buffer[Int] = {
    ???
  }

  override def partKeyRecordsFromFilters(columnFilters: Seq[ColumnFilter], startTime: Long, endTime: Long,
                                         limit: Int): Seq[PartKeyLuceneIndexRecord] = {
    ???
  }

  override def partIdFromPartKeySlow(partKeyBase: Any, partKeyOffset: Long): Option[Int] = {
    ???
  }

  override def singlePartKeyFromFilters(columnFilters: Seq[ColumnFilter], startTime: Long,
                                        endTime: Long): Option[Array[Byte]] = {
    ???
  }

  override protected def addIndexedField(key: String, value: String): Unit = {
    ???
  }

  protected def addIndexedMapField(mapColumn: String, key: String, value: String): Unit = {
    ???
  }

  protected override def addMultiColumnFacet(key: String, value: String): Unit = {
    ???
  }
}

// JNI methods
protected object TantivyNativeMethods {
  // Load native library from jar
  private def loadLibrary(): Unit = {
    val tempDir = Files.createTempDirectory("filodb-native-")

    val lib = System.mapLibraryName("filodb_core")

    val arch = SystemUtils.OS_ARCH
    val kernel = if (SystemUtils.IS_OS_LINUX) {
      "linux"
    } else if (SystemUtils.IS_OS_MAC) {
      "darwin"
    } else if (SystemUtils.IS_OS_WINDOWS) {
      "windows"
    } else {
      sys.error(s"Unhandled platform ${SystemUtils.OS_NAME}")
    }

    val resourcePath: String = "/native/" + kernel + "/" + arch + "/" + lib
    val resourceStream = Option(TantivyNativeMethods.getClass.getResourceAsStream(resourcePath)).get

    val finalPath = tempDir.resolve(lib)
    Files.copy(resourceStream, finalPath)

    System.load(finalPath.toAbsolutePath.toString)
  }

  loadLibrary()

  @native
  def newIndexHandle(diskLocation: String, schemaFields: Array[String],
                     schemaMapFields: Array[String], schemaMultiColumnFacets: Array[String]): Long

  // Free memory used by an index handle
  @native
  def freeIndexHandle(handle: Long): Unit

  // Reset index data (delete all docs)
  @native
  def reset(handle: Long): Unit

  // Commit changes to the index
  @native
  def commit(handle: Long): Unit
}
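To show how the pieces added in this commit fit together, here is a rough lifecycle sketch covering only the open, commit, and close paths implemented so far. The dataset name, schema placeholder, and disk location are hypothetical, not values from this PR:

import java.io.File

import filodb.core.DatasetRef
import filodb.core.memstore.PartKeyTantivyIndex
import filodb.core.metadata.PartitionSchema

// Hypothetical wiring: real callers get these from dataset setup (see TimeSeriesShard below)
val ref = DatasetRef("example-dataset")
val partSchema: PartitionSchema = ??? // partition schema of a loaded dataset

// Opening creates the native handle and applies the schema to the on-disk index
val index = new PartKeyTantivyIndex(ref, partSchema, shardNum = 0,
  retentionMillis = 72 * 60 * 60 * 1000L,
  diskLocation = Some(new File("/tmp/filodb-index")))

index.commit()     // flush pending changes through the native commit
index.closeIndex() // commits, then frees the native handle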
12 changes: 9 additions & 3 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -285,6 +285,7 @@ class TimeSeriesShard(val ref: DatasetRef,
  private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels")
  private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism")
  private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching")
+  private val partKeyIndexType = filodbConfig.getString("memstore.part-key-index-type")


  /////// END CONFIGURATION FIELDS ///////////////////

@@ -311,9 +312,14 @@
   * Used to answer queries not involving the full partition key.
   * Maintained using a high-performance bitmap index.
   */
-  private[memstore] final val partKeyIndex: PartKeyIndexRaw = new PartKeyLuceneIndex(ref, schemas.part,
-    indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum,
-    storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching)
+  private[memstore] final val partKeyIndex: PartKeyIndexRaw = partKeyIndexType match {
+    case "lucene" => new PartKeyLuceneIndex(ref, schemas.part,
+      indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum,
+      storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching)
+    case "tantivy" => new PartKeyTantivyIndex(ref, schemas.part,
+      shardNum, storeConfig.diskTTLSeconds * 1000)
+    case x => sys.error(s"Unsupported part key index type: '$x'")
+  }

  private val cardTracker: CardinalityTracker = initCardTracker()
