feat(histograms): Support backward compatibility for queries that span prom histograms and otel histograms for the same metric (filodb#1807)

* Updating schema for otel histograms

* Adding config to disable max/min column selection for specific workspaces
sandeep6189 authored Aug 2, 2024
1 parent 122f6ea commit 99134a2
Showing 17 changed files with 213 additions and 54 deletions.
25 changes: 14 additions & 11 deletions core/src/main/resources/filodb-defaults.conf
@@ -189,11 +189,11 @@ filodb {
columns = ["timestamp:ts",
"sum:double:detectDrops=true",
"count:double:detectDrops=true",
"h:hist:counter=true",
"min:double:detectDrops=true",
"max:double:detectDrops=true",
"h:hist:counter=true"]
"max:double:detectDrops=true"]
value-column = "h"
downsamplers = ["tTime(0)", "dLast(1)", "dLast(2)", "dMin(3)", "dMax(4)", "hLast(5)"]
downsamplers = ["tTime(0)", "dLast(1)", "dLast(2)", "hLast(3)", "dMin(4)", "dMax(5)"]
# Downsample periods are determined by counter dips of the count column
downsample-period-marker = "counter(2)"
downsample-schema = "otel-cumulative-histogram"
@@ -203,11 +203,11 @@ filodb {
columns = ["timestamp:ts",
"sum:double:{detectDrops=false,delta=true}",
"count:double:{detectDrops=false,delta=true}",
"h:hist:{counter=false,delta=true}",
"min:double:{detectDrops=false,delta=true}",
"max:double:{detectDrops=false,delta=true}",
"h:hist:{counter=false,delta=true}"]
"max:double:{detectDrops=false,delta=true}"]
value-column = "h"
downsamplers = ["tTime(0)", "dSum(1)", "dSum(2)", "dMin(3)", "dMax(4)", "hSum(5)"]
downsamplers = ["tTime(0)", "dSum(1)", "dSum(2)", "hSum(3)", "dMin(4)", "dMax(5)"]
downsample-period-marker = "time(0)"
downsample-schema = "otel-delta-histogram"
}
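Note on the reorder above: moving "h:hist" ahead of the min/max columns gives the otel histogram schemas the same leading columns as the existing prom/delta histogram schemas, which is what lets a single query read chunks written under either schema. A minimal sketch of the assumed alignment follows (column names taken from this config; the snippet itself is illustrative and not part of the commit):

// Assumed column orders after this change; the shared prefix is what the back compatibility relies on.
val promHistogramCols = Seq("timestamp", "sum", "count", "h")
val otelCumulativeHistogramCols = Seq("timestamp", "sum", "count", "h", "min", "max")

// The same leading column IDs now resolve to the same data in either schema.
assert(otelCumulativeHistogramCols.take(promHistogramCols.length) == promHistogramCols)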
@@ -245,7 +245,7 @@ filodb {
columns = ["timestamp:ts",
"sum:double:{detectDrops=false,delta=true}",
"count:double:{detectDrops=false,delta=true}",
"tscount:double:{detectDrops=false,delta=true}"
"tscount:double:{detectDrops=false,delta=true}",
"h:hist:{counter=false,delta=true}"]
value-column = "h"
downsample-period-marker = "time(0)"
@@ -256,13 +256,13 @@ filodb {
columns = ["timestamp:ts",
"sum:double:{detectDrops=false,delta=true}",
"count:double:{detectDrops=false,delta=true}",
"tscount:double:{detectDrops=false,delta=true}",
"h:hist:{counter=false,delta=true}",
"min:double:{detectDrops=false,delta=true}",
"max:double:{detectDrops=false,delta=true}",
"tscount:double:{detectDrops=false,delta=true}"
"h:hist:{counter=false,delta=true}"]
"max:double:{detectDrops=false,delta=true}"]
value-column = "h"
downsample-period-marker = "time(0)"
downsamplers = ["tTime(0)", "dSum(1)", "dSum(2)", "dMin(3)", "dMax(4)", "dSum(5)", "hSum(6)"]
downsamplers = ["tTime(0)", "dSum(1)", "dSum(2)", "dSum(3)", "hSum(4)", "dMin(5)", "dMax(6)"]
downsample-schema = "preagg-otel-delta-histogram"
}

@@ -396,6 +396,9 @@ filodb {
partitions-deny-list = ""
}

# Config to disable workspaces where we don't want max min columns to be selected for back compatibility
# If unspecified, all workspaces are disabled by default.
workspaces-disabled-max-min = ["disabled_ws"]

routing {
enable-remote-raw-exports = false
8 changes: 8 additions & 0 deletions core/src/main/scala/filodb.core/GlobalConfig.scala
@@ -2,6 +2,7 @@ package filodb.core

import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.StrictLogging
import scala.jdk.CollectionConverters._

/**
* Loads the overall configuration in a specific order:
@@ -41,4 +42,11 @@ object GlobalConfig extends StrictLogging {
|}
|""".stripMargin)

// Workspaces which are disabled for using min/max columns for otel histograms
val workspacesDisabledForMaxMin : Option[Set[String]] =
systemConfig.hasPath("filodb.query.workspaces-disabled-max-min") match {
case false => None
case true => Some(systemConfig.getStringList("filodb.query.workspaces-disabled-max-min").asScala.toSet)
}

}
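For readers unfamiliar with Typesafe Config, a self-contained sketch of how the new setting parses; the config string and workspace names below are hypothetical, and the real key lives under filodb.query in filodb-defaults.conf:

import com.typesafe.config.ConfigFactory
import scala.jdk.CollectionConverters._

object WorkspacesDisabledForMaxMinSketch extends App {
  // Hypothetical config for illustration only
  val conf = ConfigFactory.parseString(
    """filodb.query.workspaces-disabled-max-min = ["disabled_ws", "legacy_ws"]""")

  val key = "filodb.query.workspaces-disabled-max-min"
  // None when the key is absent (the feature stays off everywhere), Some(set) when present
  val disabled: Option[Set[String]] =
    if (conf.hasPath(key)) Some(conf.getStringList(key).asScala.toSet) else None

  println(disabled) // e.g. Some(Set(disabled_ws, legacy_ws))
}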
2 changes: 1 addition & 1 deletion core/src/main/scala/filodb.core/store/ChunkSource.scala
@@ -175,7 +175,7 @@ trait ChunkSource extends RawChunkSource with StrictLogging {
lookupRes.firstSchemaId match {
case Some(reqSchemaId) =>
scanPartitions(ref, lookupRes, columnIDs, querySession).filter { p =>
if (p.schema.schemaHash != reqSchemaId)
if (!p.doesSchemaMatchOrBackCompatibleHistograms(Schemas.global.schemaName(reqSchemaId), reqSchemaId))
throw SchemaMismatch(Schemas.global.schemaName(reqSchemaId), p.schema.name, getClass.getSimpleName)
p.hasChunks(lookupRes.chunkMethod)
}
16 changes: 16 additions & 0 deletions core/src/main/scala/filodb.core/store/ReadablePartition.scala
@@ -129,5 +129,21 @@ trait ReadablePartition extends FiloPartition {
countInfoIterator: CountingChunkInfoIterator): RangeVectorCursor =
new PartitionTimeRangeReader(this, method.startTime, method.endTime, countInfoIterator, columnIDs)

/**
* @param schemaNameToCheck the schema name to check against
* @param schemaHashToCheck the schema hash to check against
* @return true if the schema name and hash match or if the schemas are back compatible histograms
*/
final def doesSchemaMatchOrBackCompatibleHistograms(schemaNameToCheck : String, schemaHashToCheck : Int) : Boolean = {
if (schema.schemaHash == schemaHashToCheck) { true }
else {
val sortedSchemas = Seq(schema.name, schemaNameToCheck).sortBy(_.length)
val ret = if ((sortedSchemas(0) == "prom-histogram") && (sortedSchemas(1) == "otel-cumulative-histogram")) true
else if ((sortedSchemas(0) == "delta-histogram") && (sortedSchemas(1) == "otel-delta-histogram")) true
else false
ret
}
}

def publishInterval: Option[Long]
}
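A standalone restatement of the pairing rule above, kept outside the codebase, which may help when skimming the diff: the sort by length works only because each prom/delta schema name happens to be shorter than its otel counterpart.

// Two schema names are back compatible iff one is the prom/delta flavor and the
// other is its otel counterpart; sorting by length puts the shorter name first,
// so the check is order independent.
def backCompatibleHistogramPair(a: String, b: String): Boolean = {
  val sorted = Seq(a, b).sortBy(_.length)
  (sorted(0) == "prom-histogram" && sorted(1) == "otel-cumulative-histogram") ||
  (sorted(0) == "delta-histogram" && sorted(1) == "otel-delta-histogram")
}

backCompatibleHistogramPair("otel-delta-histogram", "delta-histogram") // true
backCompatibleHistogramPair("prom-histogram", "otel-delta-histogram")  // false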
1 change: 1 addition & 0 deletions core/src/test/resources/application_test.conf
@@ -133,6 +133,7 @@ filodb {
translate-prom-to-filodb-histogram = true
parser = "antlr"
enforce-result-byte-limit = true
workspaces-disabled-max-min = ["disabled_ws"]
container-size-overrides {
filodb-query-exec-aggregate-large-container = 65536
filodb-query-exec-metadataexec = 65536
16 changes: 9 additions & 7 deletions core/src/test/scala/filodb.core/TestData.scala
@@ -358,7 +358,7 @@ object MachineMetricsData {

var histBucketScheme: bv.HistogramBuckets = _
def linearHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8,
infBucket: Boolean = false):
infBucket: Boolean = false, ws: String = "demo"):
Stream[Seq[Any]] = {
val scheme = if (infBucket) {
// Custom geometric buckets, with top bucket being +Inf
@@ -379,7 +379,7 @@
buckets.sum.toLong,
bv.LongHistogram(scheme, buckets.map(x => x)),
"request-latency",
extraTags ++ Map("_ws_".utf8 -> "demo".utf8, "_ns_".utf8 -> "testapp".utf8, "dc".utf8 -> s"${n % numSeries}".utf8))
extraTags ++ Map("_ws_".utf8 -> ws.utf8, "_ns_".utf8 -> "testapp".utf8, "dc".utf8 -> s"${n % numSeries}".utf8))
}
}

@@ -393,8 +393,10 @@
val schemas2h = Schemas(schema2.partition,
Map(schema2.name -> schema2, "histogram" -> histDataset.schema))

val histMaxMinDS = Dataset("histmaxmin", Seq("metric:string", "tags:map"),
Seq("timestamp:ts", "count:long", "sum:long", "min:double", "max:double", "h:hist:counter=false"))
val histMaxMinDS = Dataset.make("histmaxmin",
Seq("metric:string", "tags:map"),
Seq("timestamp:ts", "count:long", "sum:long", "h:hist:counter=false", "min:double", "max:double"),
valueColumn = Some("h")).get

// Pass in the output of linearHistSeries here.
// Adds in the max and min column before h/hist
@@ -407,7 +409,7 @@
.lastOption.getOrElse(hist.numBuckets - 1)
val max = hist.bucketTop(lastBucketNum) * 0.8
val min = hist.bucketTop(lastBucketNum) * 0.1
((row take 3):+ min :+ max) ++ (row drop 3)
((row take 4):+ min :+ max) ++ (row drop 4)
}

val histKeyBuilder = new RecordBuilder(TestData.nativeMem, 2048)
@@ -436,7 +438,7 @@

private val histMaxBP = new WriteBufferPool(TestData.nativeMem, histMaxMinDS.schema.data, TestData.storeConf)

// Designed explicitly to work with histMax(linearHistSeries) records
// Designed explicitly to work with histMaxMin(linearHistSeries) records
def histMaxMinRV(startTS: Long, pubFreq: Long = 10000L, numSamples: Int = 100, numBuckets: Int = 8):
(Stream[Seq[Any]], RawDataRangeVector) = {
val histData = histMaxMin(linearHistSeries(startTS, 1, pubFreq.toInt, numBuckets)).take(numSamples)
@@ -447,7 +449,7 @@
// Now flush and ingest the rest to ensure two separate chunks
part.switchBuffers(histMaxMinBH, encode = true)
// Select timestamp, hist, max, min
(histData, RawDataRangeVector(null, part, AllChunkScan, Array(0, 5, 4, 3),
(histData, RawDataRangeVector(null, part, AllChunkScan, Array(0, 3, 5, 4),
new AtomicLong, Long.MaxValue, "query-id"))
}
}
77 changes: 77 additions & 0 deletions core/src/test/scala/filodb.core/store/ReadablePartitionSpec.scala
@@ -0,0 +1,77 @@
package filodb.core.store

import filodb.core.GlobalConfig
import filodb.core.NamesTestData.dataset
import filodb.core.downsample.OffHeapMemory
import filodb.core.memstore.{PagedReadablePartition, TimeSeriesPartition, TimeSeriesShardInfo, TimeSeriesShardStats}
import filodb.core.metadata.Schemas
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
class ReadablePartitionSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll {

it("doesSchemaMatchOrBackCompatibleHistograms should return true for delta and otel-delta") {
val rawData = RawPartData(Array.empty, Seq.empty)
val p1 = new PagedReadablePartition(Schemas.deltaHistogram, 0, -1, rawData, 10)
val p2 = new PagedReadablePartition(Schemas.otelDeltaHistogram, 0, -1, rawData, 10)
p1.doesSchemaMatchOrBackCompatibleHistograms(p2.schema.name, p2.schema.schemaHash) shouldEqual true
}

it("doesSchemaMatchOrBackCompatibleHistograms should return true for cumulative and otel-cumulative") {
val rawData = RawPartData(Array.empty, Seq.empty)
val p1 = new PagedReadablePartition(Schemas.promHistogram, 0, -1, rawData, 10)
val p2 = new PagedReadablePartition(Schemas.otelCumulativeHistogram, 0, -1, rawData, 10)
p1.doesSchemaMatchOrBackCompatibleHistograms(p2.schema.name, p2.schema.schemaHash) shouldEqual true
}

it("doesSchemaMatchOrBackCompatibleHistograms should return true if schema matches paged readble partition") {
val rawData = RawPartData(Array.empty, Seq.empty)
val p1 = new PagedReadablePartition(Schemas.deltaCounter, 0, -1, rawData, 10)
val p2 = new PagedReadablePartition(Schemas.deltaCounter, 0, -1, rawData, 10)
p1.doesSchemaMatchOrBackCompatibleHistograms(p2.schema.name, p2.schema.schemaHash) shouldEqual true

val p3 = new PagedReadablePartition(Schemas.gauge, 0, -1, rawData, 10)
val p4 = new PagedReadablePartition(Schemas.gauge, 0, -1, rawData, 10)
p3.doesSchemaMatchOrBackCompatibleHistograms(p4.schema.name, p4.schema.schemaHash) shouldEqual true

val p5 = new PagedReadablePartition(Schemas.promHistogram, 0, -1, rawData, 10)
val p6 = new PagedReadablePartition(Schemas.promHistogram, 0, -1, rawData, 10)
p5.doesSchemaMatchOrBackCompatibleHistograms(p6.schema.name, p6.schema.schemaHash) shouldEqual true

val p7 = new PagedReadablePartition(Schemas.promCounter, 0, -1, rawData, 10)
val p8 = new PagedReadablePartition(Schemas.promCounter, 0, -1, rawData, 10)
p7.doesSchemaMatchOrBackCompatibleHistograms(p8.schema.name, p8.schema.schemaHash) shouldEqual true
}

it("doesSchemaMatchOrBackCompatibleHistograms should return true if schema matches timeseries partition") {
val storeConfig = StoreConfig(GlobalConfig.defaultFiloConfig.getConfig("downsampler.downsample-store-config"))
val offheapMem = new OffHeapMemory(Seq(Schemas.gauge, Schemas.promCounter, Schemas.promHistogram, Schemas.untyped),
Map.empty, 100, storeConfig)
val shardInfo = TimeSeriesShardInfo(
0, new TimeSeriesShardStats(dataset.ref, 0), offheapMem.bufferPools, offheapMem.nativeMemoryManager)

val p1 = new TimeSeriesPartition(0, Schemas.deltaHistogram, 0, shardInfo, 1)
val p2 = new TimeSeriesPartition(0, Schemas.otelDeltaHistogram, 0, shardInfo, 1)
p1.doesSchemaMatchOrBackCompatibleHistograms(p2.schema.name, p2.schema.schemaHash) shouldEqual true

val p3 = new TimeSeriesPartition(0, Schemas.gauge, 0, shardInfo, 1)
val p4 = new TimeSeriesPartition(0, Schemas.gauge, 0, shardInfo, 1)
p3.doesSchemaMatchOrBackCompatibleHistograms(p4.schema.name, p4.schema.schemaHash) shouldEqual true
}

it("doesSchemaMatchOrBackCompatibleHistograms should return false if schema does not matches timeseries partition") {
val storeConfig = StoreConfig(GlobalConfig.defaultFiloConfig.getConfig("downsampler.downsample-store-config"))
val offheapMem = new OffHeapMemory(Seq(Schemas.gauge, Schemas.promCounter, Schemas.promHistogram, Schemas.untyped),
Map.empty, 100, storeConfig)
val shardInfo = TimeSeriesShardInfo(
0, new TimeSeriesShardStats(dataset.ref, 0), offheapMem.bufferPools, offheapMem.nativeMemoryManager)

val p1 = new TimeSeriesPartition(0, Schemas.deltaHistogram, 0, shardInfo, 1)
val p2 = new TimeSeriesPartition(0, Schemas.promHistogram, 0, shardInfo, 1)
p1.doesSchemaMatchOrBackCompatibleHistograms(p2.schema.name, p2.schema.schemaHash) shouldEqual false

val p3 = new TimeSeriesPartition(0, Schemas.promCounter, 0, shardInfo, 1)
val p4 = new TimeSeriesPartition(0, Schemas.deltaCounter, 0, shardInfo, 1)
p3.doesSchemaMatchOrBackCompatibleHistograms(p4.schema.name, p4.schema.schemaHash) shouldEqual false
}
}
@@ -127,9 +127,9 @@ object InputRecord {
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())
builder.addDouble(min)
builder.addDouble(max)
builder.addBlob(hist.serialize())

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
@@ -220,9 +220,9 @@ object InputRecord {
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())
builder.addDouble(min)
builder.addDouble(max)
builder.addBlob(hist.serialize())

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
@@ -249,7 +249,7 @@ object TestTimeseriesProducer extends StrictLogging {
if (histSchema == Schemas.otelDeltaHistogram || histSchema == Schemas.otelCumulativeHistogram) {
val minVal = buckets.min.toDouble
val maxVal = buckets.max.toDouble
new MetricTagInputRecord(Seq(timestamp, sum, count, minVal, maxVal, hist), metric, tags, histSchema)
new MetricTagInputRecord(Seq(timestamp, sum, count, hist, minVal, maxVal), metric, tags, histSchema)
}
else {
new MetricTagInputRecord(Seq(timestamp, sum, count, hist), metric, tags, histSchema)
@@ -72,9 +72,9 @@ class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
builder2.allContainers.head.iterate(Schemas.otelDeltaHistogram.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(3) shouldEqual min
row.getDouble(4) shouldEqual max
row.getHistogram(5) shouldEqual expected
row.getDouble(4) shouldEqual min
row.getDouble(5) shouldEqual max
row.getHistogram(3) shouldEqual expected
}
}

@@ -91,9 +91,9 @@
builder2.allContainers.head.iterate(Schemas.otelCumulativeHistogram.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(3) shouldEqual min
row.getDouble(4) shouldEqual max
row.getHistogram(5) shouldEqual expected
row.getDouble(4) shouldEqual min
row.getDouble(5) shouldEqual max
row.getHistogram(3) shouldEqual expected
}
}

@@ -3,12 +3,13 @@ package filodb.query.exec
import kamon.Kamon
import monix.execution.Scheduler

import filodb.core.DatasetRef
import filodb.core.{DatasetRef, GlobalConfig}
import filodb.core.metadata.Schemas
import filodb.core.query.{ColumnFilter, QueryConfig, QueryContext, QuerySession, QueryWarnings}
import filodb.core.query.Filter.Equals
import filodb.core.store._
import filodb.query.Query.qLogger
import filodb.query.TsCardinalities

final case class UnknownSchemaQueryErr(id: Int) extends
Exception(s"Unknown schema ID $id during query. This likely means a schema config change happened and " +
@@ -56,13 +57,28 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
(lookupRes, Some(columnName))
}

/**
* @param ws workspace name
* @return true if the config is defined AND ws is not in the list of disabled workspaces. false otherwise
*/
def isMaxMinEnabledForWorkspace(ws: Option[String]) : Boolean = {
ws.isDefined match {
// we are making sure that the config is defined to avoid any accidental "turn on" of the feature when not desired
case true => (GlobalConfig.workspacesDisabledForMaxMin.isDefined) &&
(!GlobalConfig.workspacesDisabledForMaxMin.get.contains(ws.get))
case false => false
}
}

// scalastyle:off method.length
private def finalizePlan(source: ChunkSource,
querySession: QuerySession): SelectRawPartitionsExec = {
val partMethod = FilteredPartitionScan(ShardSplit(shard), filters)
Kamon.currentSpan().mark("filtered-partition-scan")
var lookupRes = source.lookupPartitions(dataset, partMethod, chunkMethod, querySession)
val metricName = filters.find(_.column == metricColumn).map(_.filter.valuesStrings.head.toString)
val ws = filters.find(x => x.column == TsCardinalities.LABEL_WORKSPACE && x.filter.isInstanceOf[Equals])
.map(_.filter.valuesStrings.head.toString)
var newColName = colName

/*
@@ -99,11 +115,18 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
// Get exact column IDs needed, including max column as needed for histogram calculations.
// This code is responsible for putting exact IDs needed by any range functions.
val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers)
val colIDs = addIDsForHistMaxMin(sch, colIDs1)

val colIDs = isMaxMinEnabledForWorkspace(ws) match {
case true => addIDsForHistMaxMin(sch, colIDs1)
case _ => colIDs1
}

// Modify transformers as needed for histogram w/ max, downsample, other schemas
val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers)
val newxformers = newXFormersForHistMaxMin(sch, colIDs, newxformers1)
val newxformers = isMaxMinEnabledForWorkspace(ws) match {
case true => newXFormersForHistMaxMin(sch, colIDs, newxformers1)
case _ => newxformers1
}

val newPlan = SelectRawPartitionsExec(queryContext, dispatcher, dataset,
Some(sch), Some(lookupRes),
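To make the gating introduced above easier to scan, here is a minimal sketch of the three cases isMaxMinEnabledForWorkspace distinguishes; the workspace names are hypothetical:

// Max/min handling kicks in only when the query names a workspace, the config key
// is present, and that workspace is not in the disabled list.
def maxMinEnabled(ws: Option[String], disabledWorkspaces: Option[Set[String]]): Boolean =
  ws match {
    case Some(w) => disabledWorkspaces.exists(disabled => !disabled.contains(w))
    case None    => false // no workspace filter in the query, so keep the old behavior
  }

maxMinEnabled(Some("prod_ws"), Some(Set("disabled_ws")))     // true
maxMinEnabled(Some("disabled_ws"), Some(Set("disabled_ws"))) // false
maxMinEnabled(Some("prod_ws"), None)                         // false: key absent disables it everywhere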
(Diffs for the remaining changed files are not shown.)
