Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test and modification to bestCandidate Calculation #765

Merged
merged 8 commits into from
Mar 19, 2021
38 changes: 26 additions & 12 deletions core/src/main/scala/com/yahoo/maha/core/query/QueryPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -450,22 +450,36 @@ trait QueryPipelineFactory {

}

/**
 * A single rollup candidate considered when picking the best fact for a request.
 *
 * @param name         name of the candidate fact
 * @param engine       engine the candidate runs on
 * @param costEstimate estimated cost of the candidate
 * @param level        level of the candidate fact
 * @param cardinality  cardinality value used when comparing candidates on different engines
 */
case class RollupTuple(name: String, engine: Engine, costEstimate: Long, level: Int, cardinality: Int) {

  /** All five fields, in declaration order, as a plain tuple. */
  def getTuple: (String, Engine, Long, Int, Int) =
    (name, engine, costEstimate, level, cardinality)
}

object DefaultQueryPipelineFactory extends Logging {

val druidMultiQueryEngineList: Seq[Engine] = List(OracleEngine, PostgresEngine)

private[this] def aLessThanBByLevelAndCostAndCardinality(a: (String, Engine, Long, Int, Int), b: (String, Engine, Long, Int, Int)): Boolean = {
if (a._2 == b._2) {
if (a._4 == b._4) {
a._3 < b._3
/**
 * Supplies the "a sorts before b" predicate used to order [[RollupTuple]] candidates.
 *
 * @param f predicate to use; when omitted, `aLessThanBByLevelAndCostAndCardinality` is used
 * @return the predicate `f`, unchanged
 */
def rollupComparator(
  f: (RollupTuple, RollupTuple) => Boolean = aLessThanBByLevelAndCostAndCardinality
): (RollupTuple, RollupTuple) => Boolean = f

/**
 * "a sorts before b" predicate over [[RollupTuple]] candidates.
 *
 * Reading only the lines that use named fields:
 *  - same engine: lower costEstimate first; equal cost falls back to lower level.
 *  - different engines: lower cardinality first; equal cardinality falls back to lower
 *    costEstimate, then to lower level.
 *
 * NOTE(review): this listing looks like a rendered diff whose +/- markers were lost. The lines
 * that still use tuple accessors (`_3`, `_4`, `_5`) appear to be removed pre-change lines and
 * are presumably not part of the merged function -- confirm against the actual source file.
 *
 * NOTE(review): because the comparison keys switch on engine equality, the relation may not be
 * transitive for mixed-engine inputs (e.g. two same-engine candidates ordered by cost while a
 * third, other-engine candidate sits between them by cardinality) -- verify `sortWith` is safe
 * with such inputs.
 */
private[this] def aLessThanBByLevelAndCostAndCardinality(a: RollupTuple, b: RollupTuple): Boolean = {
if (a.engine == b.engine) {
// Same engine: cost decides; level only breaks cost ties.
if (a.costEstimate == b.costEstimate) {
a.level < b.level
} else {
// NOTE(review): presumably a removed pre-change line, superseded by the line after it.
a._4 < b._4
a.costEstimate < b.costEstimate
}
} else {
// NOTE(review): the next two lines are presumably removed pre-change lines (tuple accessors).
if (a._5 == b._5) {
a._3 < b._3
// Different engines: cardinality decides; ties fall back to cost, then level.
if (a.cardinality == b.cardinality) {
if(a.costEstimate == b.costEstimate){
a.level < b.level
} else {
a.costEstimate < b.costEstimate
}
} else {
// NOTE(review): presumably a removed pre-change line, superseded by the line after it.
a._5 < b._5
a.cardinality < b.cardinality
}
}
}
Expand Down Expand Up @@ -529,7 +543,7 @@ object DefaultQueryPipelineFactory extends Logging {
if (requestModel.isDebugEnabled) {
info(s"disqualifySet = $disqualifySet")
}
val result: IndexedSeq[(String, Engine, Long, Int, Int)] = requestModel.factCost.toIndexedSeq.collect {
val result: IndexedSeq[RollupTuple] = requestModel.factCost.toIndexedSeq.collect {
case ((fn, engine), rowcost) if (!queryGeneratorRegistry.getDefaultGenerator(engine).isDefined || queryGeneratorRegistry.getDefaultGenerator(engine).get.validateEngineConstraints(requestModel)) && (forceEngine.contains(engine) || (!disqualifySet(engine) && forceEngine.isEmpty)) =>
val fact = requestModel.bestCandidates.get.facts(fn).fact
val level = fact.level
Expand All @@ -538,11 +552,11 @@ object DefaultQueryPipelineFactory extends Logging {
if (requestModel.isDebugEnabled) {
info(s"fn=$fn engine=$engine cost=${rowcost.costEstimate} level=$level cardinalityPreference=$dimCardinalityPreference")
}
(fn, engine, rowcost.costEstimate, level, dimCardinalityPreference)
}.sortWith(aLessThanBByLevelAndCostAndCardinality)
RollupTuple(fn, engine, rowcost.costEstimate, level, dimCardinalityPreference)
}.sortWith(rollupComparator())
require(result.nonEmpty,
s"Failed to find best candidate, forceEngine=$forceEngine, engine disqualifyingSet=$disqualifySet, candidates=${requestModel.bestCandidates.get.facts.mapValues(_.fact.engine).toSet}")
requestModel.bestCandidates.get.getFactBestCandidate(result.head._1, requestModel)
requestModel.bestCandidates.get.getFactBestCandidate(result.head.name, requestModel)
}
}

Expand Down
18 changes: 17 additions & 1 deletion core/src/test/scala/com/yahoo/maha/core/ColumnTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
package com.yahoo.maha.core

import com.yahoo.maha.core.dimension.{ConstDimCol, DimCol, HivePartDimCol}
import com.yahoo.maha.core.fact.{ConstFactCol, DruidConstDerFactCol, HiveDerFactCol, NoopRollup}
import com.yahoo.maha.core.fact.{ConstFactCol, DruidConstDerFactCol, HiveDerFactCol, NoopRollup, PostgresDerFactCol}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -69,4 +69,20 @@ class ColumnTest extends AnyFunSuite with Matchers {
}
}
}

// Builds a Postgres derived fact column, then copies it into two fresh column contexts,
// once with the boolean flag set and once with it cleared. The test passes as long as
// neither copyWith call throws.
test("PostgresDerFactCol test") {
  ColumnContext.withColumnContext { implicit cc: ColumnContext =>
    import PostgresExpression._

    DimCol("dimCol", IntType())
    val derivedCol =
      PostgresDerFactCol("postgres_ctr_copywith_test", DecType(), "{clicks}" /- "{impressions}" * "1000")

    // Inner contexts reuse the name `cc` on purpose so they shadow the outer implicit.
    ColumnContext.withColumnContext { implicit cc: ColumnContext =>
      derivedCol.copyWith(cc, Map.empty, true)
    }
    ColumnContext.withColumnContext { implicit cc: ColumnContext =>
      derivedCol.copyWith(cc, Map.empty, false)
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,63 @@ class QueryPipelineWithFallbackTest extends AnyFunSuite with Matchers with Befor
pipeline = builder._1.toOption.get.build()
assert(pipeline.fallbackQueryChainOption.isEmpty, s"No fallback query expected: $pipeline")
}

// Sorts the same eight candidates with the default comparator and with a local copy of the
// previous ordering, and pins the resulting order of each. All candidates share the same cost
// and cardinality, so only engine and level differ between them.
test("Demonstrate the fix in the new result comparator.") {

  // Previous ordering, kept here for comparison only:
  // same engine -> level, then cost; different engines -> cardinality, then cost.
  def legacyComparator(a: RollupTuple, b: RollupTuple): Boolean =
    if (a.engine == b.engine) {
      if (a.level == b.level) a.costEstimate < b.costEstimate
      else a.level < b.level
    } else {
      if (a.cardinality == b.cardinality) a.costEstimate < b.costEstimate
      else a.cardinality < b.cardinality
    }

  // RollupTuple(name, engine, costEstimate, level, cardinality)
  val t1 = RollupTuple("dr_stats_hourly", DruidEngine, 1600L, 9993, 8675309)
  val t2 = RollupTuple("dr_ad_stats_hourly", DruidEngine, 1600L, 9995, 8675309)
  val t3 = RollupTuple("oracle_stats", OracleEngine, 1600L, 9992, 8675309)
  val t4 = RollupTuple("oracle_ad_stats", OracleEngine, 1600L, 9994, 8675309)
  val t5 = RollupTuple("dr_teacher_ad_stats", DruidEngine, 1600L, 9998, 8675309)
  val t6 = RollupTuple("oracle_teacher_stats", OracleEngine, 1600L, 9991, 8675309)
  val t7 = RollupTuple("dr_teacher_stats_hourly", DruidEngine, 1600L, 9992, 8675309)
  val t8 = RollupTuple("dr_teacher_ad_stats_hourly", DruidEngine, 1600L, 9997, 8675309)

  val candidates = Vector(t1, t2, t3, t4, t5, t6, t7, t8)

  val newResult = candidates.sortWith(DefaultQueryPipelineFactory.rollupComparator())
  val oldResult = candidates.sortWith(DefaultQueryPipelineFactory.rollupComparator(legacyComparator))

  // Element-wise comparison so a failure reports which position differs,
  // instead of a bare "false" from String.equals on one joined string.
  oldResult.map(_.getTuple.toString) shouldBe Vector(
    "(dr_stats_hourly,Druid,1600,9993,8675309)",
    "(dr_ad_stats_hourly,Druid,1600,9995,8675309)",
    "(oracle_stats,Oracle,1600,9992,8675309)",
    "(oracle_ad_stats,Oracle,1600,9994,8675309)",
    "(dr_teacher_ad_stats,Druid,1600,9998,8675309)",
    "(oracle_teacher_stats,Oracle,1600,9991,8675309)",
    "(dr_teacher_stats_hourly,Druid,1600,9992,8675309)",
    "(dr_teacher_ad_stats_hourly,Druid,1600,9997,8675309)"
  )

  newResult.map(_.getTuple.toString) shouldBe Vector(
    "(oracle_teacher_stats,Oracle,1600,9991,8675309)",
    "(oracle_stats,Oracle,1600,9992,8675309)",
    "(dr_teacher_stats_hourly,Druid,1600,9992,8675309)",
    "(dr_stats_hourly,Druid,1600,9993,8675309)",
    "(oracle_ad_stats,Oracle,1600,9994,8675309)",
    "(dr_ad_stats_hourly,Druid,1600,9995,8675309)",
    "(dr_teacher_ad_stats_hourly,Druid,1600,9997,8675309)",
    "(dr_teacher_ad_stats,Druid,1600,9998,8675309)"
  )
}
}