Skip to content

Commit

Permalink
Fix row level bug when composing outcome (awslabs#594)
Browse files Browse the repository at this point in the history
* Fix row level bug when composing outcome

- When a check fails due to a precondition failure, the row level results are not evaluated correctly.
- For example, let's say a check has a completeness constraint which passes, and a minimum constraint which fails due to a precondition failure.
- The row level results will be the results for just the completeness constraint. There will be no results generated for the minimum constraint, and therefore the row level results will be incorrect.
- We fix this by defaulting the row level outcome to false whenever the analyzer does not provide a row level result column (e.g. because a precondition failed), so every constraint always contributes an outcome.

* Added similar logic to RowLevelConstraint as well

Skipped RowLevelGroupedConstraint because only UniqueValueRatio/Uniqueness use it, and they don't use preconditions.
  • Loading branch information
rdsharma26 authored Dec 17, 2024
1 parent 4544616 commit 82537bd
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/com/amazon/deequ/VerificationResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

import java.util.UUID
Expand Down Expand Up @@ -144,9 +145,9 @@ object VerificationResult {
val constraint = constraintResult.constraint
constraint match {
case asserted: RowLevelAssertedConstraint =>
constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_))
constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false)))
case _: RowLevelConstraint =>
constraintResult.metric.flatMap(metricToColumn)
constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false)))
case _: RowLevelGroupedConstraint =>
constraintResult.metric.flatMap(metricToColumn)
case _ => None
Expand All @@ -160,7 +161,6 @@ object VerificationResult {
}
}


private[this] def getSimplifiedCheckResultOutput(
verificationResult: VerificationResult)
: Seq[SimpleCheckResultOutput] = {
Expand Down
56 changes: 56 additions & 0 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,62 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
}

"Verification Suite's Row Level Results" should {
// Regression test for outcome composition when a constraint's precondition fails:
// a numeric min check on a string-typed column, and a pattern match on an
// integer-typed column, must yield all-false row level outcomes instead of
// silently producing no column (which previously corrupted the composed results).
"yield correct results for invalid column type" in withSparkSession { sparkSession =>
  import sparkSession.implicits._
  // "id" is string-typed (invalid for hasMin); "id2" is integer-typed
  // (invalid for hasPattern); "color" is unused padding.
  val df = Seq(
    ("1", 1, "blue"),
    ("2", 2, "green"),
    ("3", 3, "blue"),
    ("4", 4, "red"),
    ("5", 5, "purple")
  ).toDF("id", "id2", "color")

  val idColumn = "id"
  val id2Column = "id2"

  // Descriptions double as the row level result column names in the output DataFrame.
  val minCheckOnInvalidColumnDescription = s"min check on $idColumn"
  val minCheckOnValidColumnDescription = s"min check on $id2Column"
  val patternMatchCheckOnInvalidColumnDescription = s"pattern check on $id2Column"
  val patternMatchCheckOnValidColumnDescription = s"pattern check on $idColumn"

  // hasMin on the string column fails its numeric-type precondition; the paired
  // isComplete passes, which is exactly the scenario where composed row level
  // outcomes used to be wrong.
  val minCheckOnInvalidColumn = Check(CheckLevel.Error, minCheckOnInvalidColumnDescription)
    .hasMin(idColumn, _ >= 3)
    .isComplete(idColumn)
  val minCheckOnValidColumn = Check(CheckLevel.Error, minCheckOnValidColumnDescription)
    .hasMin(id2Column, _ >= 3)
    .isComplete(id2Column)

  // hasPattern on the integer column fails its string-type precondition.
  val patternMatchCheckOnInvalidColumn = Check(CheckLevel.Error, patternMatchCheckOnInvalidColumnDescription)
    .hasPattern(id2Column, "[0-3]+".r)
  val patternMatchCheckOnValidColumn = Check(CheckLevel.Error, patternMatchCheckOnValidColumnDescription)
    .hasPattern(idColumn, "[0-3]+".r)

  val checks = Seq(
    minCheckOnInvalidColumn,
    minCheckOnValidColumn,
    patternMatchCheckOnInvalidColumn,
    patternMatchCheckOnValidColumn
  )

  val verificationResult = VerificationSuite().onData(df).addChecks(checks).run()
  val rowLevelResultsDF = VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df)
  val rowLevelResults = rowLevelResultsDF.collect()

  // One boolean column per check, keyed by the check description.
  val minCheckOnInvalidColumnRowLevelResults =
    rowLevelResults.map(_.getAs[Boolean](minCheckOnInvalidColumnDescription))
  val minCheckOnValidColumnRowLevelResults =
    rowLevelResults.map(_.getAs[Boolean](minCheckOnValidColumnDescription))
  val patternMatchCheckOnInvalidColumnRowLevelResults =
    rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnInvalidColumnDescription))
  val patternMatchCheckOnValidColumnRowLevelResults =
    rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnValidColumnDescription))

  // Precondition-failing checks must default to false for every row; the valid
  // checks keep their genuine per-row outcomes (min >= 3, pattern [0-3]+).
  minCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false)
  minCheckOnValidColumnRowLevelResults shouldBe Seq(false, false, true, true, true)
  patternMatchCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false)
  patternMatchCheckOnValidColumnRowLevelResults shouldBe Seq(true, true, true, false, false)
}

"yield correct results for satisfies check" in withSparkSession { sparkSession =>
import sparkSession.implicits._
val df = Seq(
Expand Down

0 comments on commit 82537bd

Please sign in to comment.