Skip to content

Commit

Permalink
Incorrect parsing of aggregates in DB queries (#790)
Browse files Browse the repository at this point in the history
* Incorrect parsing of aggregates in DB queries

Fixes #786

DB uses `finalmerge_` as a prefix for final merge while Spark uses an empty prefix
This PR is to replace the prefixes as follows:
`finalmerge_`, `partial_`, `merge_`

---------

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
  • Loading branch information
amahussein authored Feb 15, 2024
1 parent 649a356 commit ec5239e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,14 @@ object SQLPlanParser extends Logging {

val windowFunctionPattern = """(\w+)\(""".r

val aggregatePrefixes = Set(
"finalmerge_", // DB specific prefix for final merge agg functions
"partial_", // used for partials
"merge_" // Used for partial merge
)

val ignoreExpressions = Set("any", "cast", "ansi_cast", "decimal", "decimaltype", "every",
"some", "merge_max", "merge_min", "merge_sum", "merge_count", "merge_avg", "merge_first",
"some",
"list",
// some ops turn into literals and they should not cause any fallbacks
"current_database", "current_user", "current_timestamp",
Expand Down Expand Up @@ -625,11 +631,30 @@ object SQLPlanParser extends Logging {
}

private def getAllFunctionNames(regPattern: Regex, expr: String,
groupInd: Int = 1): Set[String] = {
groupInd: Int = 1, isAggr: Boolean = true): Set[String] = {
// Returns all matches in an expression. This can be used when the SQL expression is not
// tokenized.
val newExpr = processSpecialFunctions(expr)
regPattern.findAllMatchIn(newExpr).map(_.group(groupInd)).toSet.filterNot(ignoreExpression(_))

// first get all the functionNames
val exprss =
regPattern.findAllMatchIn(newExpr).map(_.group(groupInd)).toSet

// For aggregate expressions we want to process the results to remove the prefix
// DB: remove the "^partial_" and "^finalmerge_" prefixes
// TODO:
// for performance sake, we can turn off the aggregate processing by enabling it only
// when needed. However, for now, we always do this processing until we are confident we know
// the correct place to turn on/off that flag.we can use the argument isAgg only when needed
val results = if (isAggr) {
exprss.collect {
case func =>
aggregatePrefixes.find(func.startsWith(_)).map(func.replaceFirst(_, "")).getOrElse(func)
}
} else {
exprss
}
results.filterNot(ignoreExpression(_))
}

def parseProjectExpressions(exprStr: String): Array[String] = {
Expand Down Expand Up @@ -665,10 +690,8 @@ object SQLPlanParser extends Logging {
val group_value = m.group(group_ind)
if (patternMap.getOrElse(group_value, false)) {
val clauseExpr = m.group(group_ind + 1)
// Here "partial_" and "merge_" is removed and only function name is preserved.
val processedExpr = clauseExpr.replaceAll("partial_", "").replaceAll("merge_", "")
// No need to split the expr any further because we are only interested in function names
val used_functions = getAllFunctionNames(functionPrefixPattern, processedExpr)
val used_functions = getAllFunctionNames(functionPrefixPattern, clauseExpr)
parsedExpressions ++= used_functions
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,9 +974,8 @@ object QualOutputWriter {
sumInfo: QualificationSummaryInfo): Set[ExecInfo] = {
sumInfo.planInfo.map(_.execInfo).collect {
case execInfos =>
// No need to flatten the execs because by definition wholeCodeGen execs should not be part
// of that list
execInfos.filter(exec => exec.stages.isEmpty && !exec.isSupported)
val allExecs = flattenedExecs(execInfos)
allExecs.filter(exec => exec.stages.isEmpty && !exec.isSupported)
}.flatten.toSet
}

Expand Down

0 comments on commit ec5239e

Please sign in to comment.