From f739fedb3244589399ebd2fd97f362583d2d9a1f Mon Sep 17 00:00:00 2001
From: rdsharma26 <65777064+rdsharma26@users.noreply.github.com>
Date: Fri, 26 Apr 2024 15:50:12 -0400
Subject: [PATCH] Upgraded the Deequ version to 2.0.7 (#200)

- A new Deequ version, 2.0.7, was released for Spark versions 3.1 to 3.5. As
  part of this change, we upgrade the Deequ dependencies to 2.0.7.
- The new Deequ version contains updated APIs, so we change the PyDeequ code
  to conform to them. For now, we pass in default values for the new
  parameters; a future release will expose these parameters in PyDeequ's
  public interface as well.
- Support for the older Spark versions 2.4 and 3.0 is removed, and the
  GitHub workflow is updated accordingly.
---
 .github/workflows/base.yml |  2 +-
 pydeequ/analyzers.py       | 99 +++++++++++++++++---------
 pydeequ/checks.py          | 50 ++++++++-----------
 pydeequ/configs.py         |  8 ++-
 4 files changed, 66 insertions(+), 93 deletions(-)
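Note: every call site below follows the same pattern. The Deequ 2.0.7
constructors and Check methods take an extra trailing Scala Option (and, for
Compliance/satisfies, an extra Scala Seq), which PyDeequ now fills with
scala.Option.apply(None) so behaviour stays at the Deequ defaults. A minimal
sketch of that pattern over py4j, assuming `spark` is an active SparkSession
with the Deequ 2.0.7 jar on its classpath and "review_id" is a hypothetical
column name:

    # Sketch only -- illustrates the call pattern used throughout this patch.
    jvm = spark.sparkContext._jvm

    # Scala's None via py4j; passing it leaves a new 2.0.7 parameter at its default.
    none_option = jvm.scala.Option.apply(None)

    # Deequ 2.0.4-era call (two arguments):
    #   jvm.com.amazon.deequ.analyzers.Completeness(column, where_option)
    # Deequ 2.0.7 call (extra trailing Option, defaulted here):
    completeness = jvm.com.amazon.deequ.analyzers.Completeness(
        "review_id",                   # column under analysis (hypothetical)
        jvm.scala.Option.apply(None),  # optional where-clause filter: none
        none_option,                   # new 2.0.7 parameter, left at default
    )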
diff --git a/.github/workflows/base.yml b/.github/workflows/base.yml
index ea5f8f0..67c6268 100644
--- a/.github/workflows/base.yml
+++ b/.github/workflows/base.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        PYSPARK_VERSION: ["3.0", "3.1.3", "3.2", "3.3"]
+        PYSPARK_VERSION: ["3.1.3", "3.2", "3.3"]
 
     steps:
       - uses: actions/checkout@v3
diff --git a/pydeequ/analyzers.py b/pydeequ/analyzers.py
index 4289094..3952c93 100644
--- a/pydeequ/analyzers.py
+++ b/pydeequ/analyzers.py
@@ -271,7 +271,11 @@ def _analyzer_jvm(self):
 
         :return self: access the value of the Completeness analyzer.
         """
-        return self._deequAnalyzers.Completeness(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Completeness(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Compliance(_AnalyzerObject):
@@ -303,19 +307,13 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Compliance(
+            self.instance,
+            self.predicate,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Correlation(_AnalyzerObject):
@@ -469,22 +467,14 @@ def _analyzer_jvm(self):
         """
         if not self.maxDetailBins:
             self.maxDetailBins = getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$3")()
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
-            )
-        else:
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Histogram(
+            self.column,
+            self._jvm.scala.Option.apply(self.binningUdf),
+            self.maxDetailBins,
+            self._jvm.scala.Option.apply(self.where),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
+        )
 
 
 class KLLParameters:
@@ -553,7 +543,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        return self._deequAnalyzers.Maximum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Maximum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MaxLength(_AnalyzerObject):
@@ -575,17 +567,11 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MaxLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Mean(_AnalyzerObject):
@@ -628,7 +614,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        return self._deequAnalyzers.Minimum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Minimum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MinLength(_AnalyzerObject):
@@ -651,17 +639,11 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MinLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class MutualInformation(_AnalyzerObject):
@@ -725,6 +707,7 @@ def _analyzer_jvm(self):
             # TODO: revisit bc scala constructor does some weird implicit type casting from python str -> java list
             # if we don't cast it to str()
             self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -814,7 +797,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.Uniqueness(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -839,7 +824,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.UniqueValueRatio(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 class DataTypeInstances(Enum):
diff --git a/pydeequ/checks.py b/pydeequ/checks.py
index a95c178..749f74d 100644
--- a/pydeequ/checks.py
+++ b/pydeequ/checks.py
@@ -154,7 +154,7 @@ def isComplete(self, column, hint=None):
         :return: isComplete self:A Check.scala object that asserts on a column completion.
""" hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.isComplete(column, hint) + self._Check = self._Check.isComplete(column, hint, self._jvm.scala.Option.apply(None)) return self def hasCompleteness(self, column, assertion, hint=None): @@ -170,7 +170,7 @@ def hasCompleteness(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.hasCompleteness(column, assertion_func, hint) + self._Check = self._Check.hasCompleteness(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def areComplete(self, columns, hint=None): @@ -234,7 +234,7 @@ def isUnique(self, column, hint=None): :return: isUnique self: A Check.scala object that asserts uniqueness in the column. """ hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.isUnique(column, hint) + self._Check = self._Check.isUnique(column, hint, self._jvm.scala.Option.apply(None)) return self def isPrimaryKey(self, column, *columns, hint=None): @@ -297,7 +297,7 @@ def hasUniqueValueRatio(self, columns, assertion, hint=None): assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) columns_seq = to_scala_seq(self._jvm, columns) - self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint) + self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasNumberOfDistinctValues(self, column, assertion, binningUdf, maxBins, hint=None): @@ -418,11 +418,7 @@ def hasMinLength(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - if SPARK_VERSION == "3.3": - self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) - else: - self._Check = self._Check.hasMinLength(column, assertion_func, hint) - + self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMaxLength(self, column, assertion, hint=None): @@ -437,10 +433,7 @@ def hasMaxLength(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - if SPARK_VERSION == "3.3": - self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) - else: - self._Check = self._Check.hasMaxLength(column, assertion_func, hint) + self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMin(self, column, assertion, hint=None): @@ -456,7 +449,7 @@ def hasMin(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.hasMin(column, assertion_func, hint) + self._Check = self._Check.hasMin(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMax(self, column, assertion, hint=None): @@ -472,7 +465,7 @@ def hasMax(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.hasMax(column, assertion_func, hint) + self._Check = 
+        self._Check = self._Check.hasMax(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMean(self, column, assertion, hint=None):
@@ -565,21 +558,14 @@ def satisfies(self, columnCondition, constraintName, assertion=None, hint=None):
             else getattr(self._Check, "satisfies$default$3")()
         )
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.satisfies(
-                columnCondition,
-                constraintName,
-                assertion_func,
-                hint,
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            self._Check = self._Check.satisfies(
-                columnCondition,
-                constraintName,
-                assertion_func,
-                hint
-            )
+        self._Check = self._Check.satisfies(
+            columnCondition,
+            constraintName,
+            assertion_func,
+            hint,
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
         return self
 
     def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
@@ -602,7 +588,9 @@ def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
         name = self._jvm.scala.Option.apply(name)
         hint = self._jvm.scala.Option.apply(hint)
         pattern_regex = self._jvm.scala.util.matching.Regex(pattern, None)
-        self._Check = self._Check.hasPattern(column, pattern_regex, assertion_func, name, hint)
+        self._Check = self._Check.hasPattern(
+            column, pattern_regex, assertion_func, name, hint, self._jvm.scala.Option.apply(None)
+        )
         return self
 
     def containsCreditCardNumber(self, column, assertion=None, hint=None):
diff --git a/pydeequ/configs.py b/pydeequ/configs.py
index 49cb277..ea4d0e8 100644
--- a/pydeequ/configs.py
+++ b/pydeequ/configs.py
@@ -5,11 +5,9 @@
 
 
 SPARK_TO_DEEQU_COORD_MAPPING = {
-    "3.3": "com.amazon.deequ:deequ:2.0.4-spark-3.3",
-    "3.2": "com.amazon.deequ:deequ:2.0.1-spark-3.2",
-    "3.1": "com.amazon.deequ:deequ:2.0.0-spark-3.1",
-    "3.0": "com.amazon.deequ:deequ:1.2.2-spark-3.0",
-    "2.4": "com.amazon.deequ:deequ:1.1.0_spark-2.4-scala-2.11",
+    "3.3": "com.amazon.deequ:deequ:2.0.7-spark-3.3",
+    "3.2": "com.amazon.deequ:deequ:2.0.7-spark-3.2",
+    "3.1": "com.amazon.deequ:deequ:2.0.7-spark-3.1"
 }
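
Note: with this mapping, every supported Spark line (3.1 through 3.3) now
resolves to a Deequ 2.0.7 artifact. A minimal sketch of how the resolved
coordinate is typically consumed when building a session, following the setup
documented in PyDeequ's README (the deequ_maven_coord and f2j_maven_coord
names are pydeequ package-level exports; treat the exact snippet as
illustrative):

    from pyspark.sql import SparkSession

    import pydeequ

    # pydeequ.deequ_maven_coord is looked up in SPARK_TO_DEEQU_COORD_MAPPING
    # from the installed PySpark version, e.g. on Spark 3.3 it is now
    # "com.amazon.deequ:deequ:2.0.7-spark-3.3".
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", pydeequ.deequ_maven_coord)
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
        .getOrCreate()
    )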