Upgraded the Deequ version to 2.0.7
- A new Deequ version, 2.0.7, was released for Spark versions 3.1 to 3.5. As part of this change, we upgrade the Deequ dependencies to 2.0.7.
- The new Deequ version contains updated APIs, so we change the PyDeequ code to conform to them. For now, we pass in default values for the new parameters (see the usage sketch below); in a future release, we will expose these parameters in PyDeequ's public interface as well.
- Support for the older Spark versions 2.4 and 3.0 is removed. The GitHub workflow is updated accordingly.
rdsharma26 committed Apr 26, 2024
1 parent 59274a5 commit 451eec6
Showing 4 changed files with 66 additions and 93 deletions.
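As the commit message notes, existing PyDeequ call sites should keep working because the new Deequ 2.0.7 constructor arguments are filled in with defaults inside the library rather than by the caller. A minimal usage sketch for reference (the DataFrame and column names are illustrative, and it assumes a PyDeequ setup where the installed PySpark version maps to a supported Deequ coordinate):

    from pyspark.sql import SparkSession
    import pydeequ
    from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, Completeness, Maximum

    # Attach the Deequ jar matching the installed Spark version (resolved via configs.py below).
    spark = (SparkSession.builder
             .config("spark.jars.packages", pydeequ.deequ_maven_coord)
             .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
             .getOrCreate())

    df = spark.createDataFrame([(1, "a"), (2, None)], ["id", "name"])

    # The public API is unchanged: the new optional constructor arguments
    # default to a Scala None internally (see the analyzers.py diff below).
    result = (AnalysisRunner(spark)
              .onData(df)
              .addAnalyzer(Completeness("name"))
              .addAnalyzer(Maximum("id"))
              .run())

    AnalyzerContext.successMetricsAsDataFrame(spark, result).show()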
2 changes: 1 addition & 1 deletion .github/workflows/base.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        PYSPARK_VERSION: ["3.0", "3.1.3", "3.2", "3.3"]
+        PYSPARK_VERSION: ["3.1.3", "3.2", "3.3"]
 
     steps:
       - uses: actions/checkout@v3
99 changes: 43 additions & 56 deletions pydeequ/analyzers.py
@@ -271,7 +271,11 @@ def _analyzer_jvm(self):
         :return self: access the value of the Completeness analyzer.
         """
-        return self._deequAnalyzers.Completeness(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Completeness(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Compliance(_AnalyzerObject):
@@ -303,19 +307,13 @@ def _analyzer_jvm(self):
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Compliance(
+            self.instance,
+            self.predicate,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Correlation(_AnalyzerObject):
@@ -469,22 +467,14 @@ def _analyzer_jvm(self):
         """
         if not self.maxDetailBins:
             self.maxDetailBins = getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$3")()
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
-            )
-        else:
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Histogram(
+            self.column,
+            self._jvm.scala.Option.apply(self.binningUdf),
+            self.maxDetailBins,
+            self._jvm.scala.Option.apply(self.where),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
+        )
 
 
 class KLLParameters:
@@ -553,7 +543,9 @@ def _analyzer_jvm(self):
         :return self
         """
-        return self._deequAnalyzers.Maximum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Maximum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MaxLength(_AnalyzerObject):
@@ -575,17 +567,11 @@ def _analyzer_jvm(self):
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MaxLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Mean(_AnalyzerObject):
@@ -628,7 +614,9 @@ def _analyzer_jvm(self):
         :return self
         """
-        return self._deequAnalyzers.Minimum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Minimum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MinLength(_AnalyzerObject):
@@ -651,17 +639,11 @@ def _analyzer_jvm(self):
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MinLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class MutualInformation(_AnalyzerObject):
@@ -725,6 +707,7 @@ def _analyzer_jvm(self):
             # TODO: revisit bc scala constructor does some weird implicit type casting from python str -> java list
             # if we don't cast it to str()
             self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -814,7 +797,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.Uniqueness(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -839,7 +824,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.UniqueValueRatio(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
        )
 
 class DataTypeInstances(Enum):
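A note on the recurring pattern above: every call site now appends self._jvm.scala.Option.apply(None), which materializes a Scala None over the py4j gateway as the default for the new optional constructor arguments. A standalone sketch of the mechanism (assuming an active SparkSession; the filter string is illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    jvm = spark.sparkContext._jvm

    # Python None -> Scala None; this is the default passed for the new parameters.
    none_opt = jvm.scala.Option.apply(None)
    # A populated value -> Scala Some(...), as used when a `where` filter is supplied.
    where_opt = jvm.scala.Option.apply("id > 0")

    print(none_opt.isDefined())   # False
    print(where_opt.get())        # id > 0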
50 changes: 19 additions & 31 deletions pydeequ/checks.py
@@ -154,7 +154,7 @@ def isComplete(self, column, hint=None):
         :return: isComplete self:A Check.scala object that asserts on a column completion.
         """
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.isComplete(column, hint)
+        self._Check = self._Check.isComplete(column, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasCompleteness(self, column, assertion, hint=None):
@@ -170,7 +170,7 @@ def hasCompleteness(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.hasCompleteness(column, assertion_func, hint)
+        self._Check = self._Check.hasCompleteness(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def areComplete(self, columns, hint=None):
@@ -234,7 +234,7 @@ def isUnique(self, column, hint=None):
         :return: isUnique self: A Check.scala object that asserts uniqueness in the column.
         """
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.isUnique(column, hint)
+        self._Check = self._Check.isUnique(column, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def isPrimaryKey(self, column, *columns, hint=None):
@@ -297,7 +297,7 @@ def hasUniqueValueRatio(self, columns, assertion, hint=None):
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
         columns_seq = to_scala_seq(self._jvm, columns)
-        self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint)
+        self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasNumberOfDistinctValues(self, column, assertion, binningUdf, maxBins, hint=None):
@@ -418,11 +418,7 @@ def hasMinLength(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
-        else:
-            self._Check = self._Check.hasMinLength(column, assertion_func, hint)
-
+        self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMaxLength(self, column, assertion, hint=None):
@@ -437,10 +433,7 @@ def hasMaxLength(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
-        else:
-            self._Check = self._Check.hasMaxLength(column, assertion_func, hint)
+        self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMin(self, column, assertion, hint=None):
@@ -456,7 +449,7 @@ def hasMin(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.hasMin(column, assertion_func, hint)
+        self._Check = self._Check.hasMin(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMax(self, column, assertion, hint=None):
@@ -472,7 +465,7 @@ def hasMax(self, column, assertion, hint=None):
         """
         assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
         hint = self._jvm.scala.Option.apply(hint)
-        self._Check = self._Check.hasMax(column, assertion_func, hint)
+        self._Check = self._Check.hasMax(column, assertion_func, hint, self._jvm.scala.Option.apply(None))
         return self
 
     def hasMean(self, column, assertion, hint=None):
@@ -565,21 +558,14 @@ def satisfies(self, columnCondition, constraintName, assertion=None, hint=None):
             else getattr(self._Check, "satisfies$default$3")()
         )
         hint = self._jvm.scala.Option.apply(hint)
-        if SPARK_VERSION == "3.3":
-            self._Check = self._Check.satisfies(
-                columnCondition,
-                constraintName,
-                assertion_func,
-                hint,
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            self._Check = self._Check.satisfies(
-                columnCondition,
-                constraintName,
-                assertion_func,
-                hint
-            )
+        self._Check = self._Check.satisfies(
+            columnCondition,
+            constraintName,
+            assertion_func,
+            hint,
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
         return self
 
     def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
@@ -602,7 +588,9 @@ def hasPattern(self, column, pattern, assertion=None, name=None, hint=None):
         name = self._jvm.scala.Option.apply(name)
         hint = self._jvm.scala.Option.apply(hint)
         pattern_regex = self._jvm.scala.util.matching.Regex(pattern, None)
-        self._Check = self._Check.hasPattern(column, pattern_regex, assertion_func, name, hint)
+        self._Check = self._Check.hasPattern(
+            column, pattern_regex, assertion_func, name, hint, self._jvm.scala.Option.apply(None)
+        )
         return self
 
     def containsCreditCardNumber(self, column, assertion=None, hint=None):
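The same default-argument treatment applies to the Check builder: callers chain the methods exactly as before, and the extra Option.apply(None) is appended internally. A hedged sketch reusing the spark session and df from the earlier example (the check name, columns, and thresholds are illustrative):

    from pydeequ.checks import Check, CheckLevel
    from pydeequ.verification import VerificationSuite, VerificationResult

    check = (Check(spark, CheckLevel.Error, "review checks")
             .isComplete("id")
             .isUnique("id")
             .hasMaxLength("name", lambda max_len: max_len <= 32.0))

    result = VerificationSuite(spark).onData(df).addCheck(check).run()
    VerificationResult.checkResultsAsDataFrame(spark, result).show()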
8 changes: 3 additions & 5 deletions pydeequ/configs.py
@@ -5,11 +5,9 @@
 
 
 SPARK_TO_DEEQU_COORD_MAPPING = {
-    "3.3": "com.amazon.deequ:deequ:2.0.4-spark-3.3",
-    "3.2": "com.amazon.deequ:deequ:2.0.1-spark-3.2",
-    "3.1": "com.amazon.deequ:deequ:2.0.0-spark-3.1",
-    "3.0": "com.amazon.deequ:deequ:1.2.2-spark-3.0",
-    "2.4": "com.amazon.deequ:deequ:1.1.0_spark-2.4-scala-2.11",
+    "3.3": "com.amazon.deequ:deequ:2.0.7-spark-3.3",
+    "3.2": "com.amazon.deequ:deequ:2.0.7-spark-3.2",
+    "3.1": "com.amazon.deequ:deequ:2.0.7-spark-3.1"
 }
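With the 2.4 and 3.0 coordinates gone, only Spark 3.1 through 3.3 resolve to a Deequ jar. A hypothetical lookup helper illustrating how the mapping might be consumed (deequ_coord_for is not part of configs.py; the module's real resolution logic lives elsewhere):

    import re

    def deequ_coord_for(spark_version: str) -> str:
        # Reduce a full version such as "3.2.1" to the "3.2" mapping key;
        # versions no longer in the mapping (e.g. 2.4, 3.0) now fail fast.
        match = re.match(r"(\d+)\.(\d+)", spark_version)
        key = f"{match.group(1)}.{match.group(2)}"
        try:
            return SPARK_TO_DEEQU_COORD_MAPPING[key]
        except KeyError:
            raise RuntimeError(f"Found incompatible Spark version {spark_version}; "
                               f"supported versions: {sorted(SPARK_TO_DEEQU_COORD_MAPPING)}")

    print(deequ_coord_for("3.2.1"))  # com.amazon.deequ:deequ:2.0.7-spark-3.2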

