diff --git a/.github/workflows/base.yml b/.github/workflows/base.yml
index ea5f8f0..67c6268 100644
--- a/.github/workflows/base.yml
+++ b/.github/workflows/base.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        PYSPARK_VERSION: ["3.0", "3.1.3", "3.2", "3.3"]
+        PYSPARK_VERSION: ["3.1.3", "3.2", "3.3"]
 
     steps:
       - uses: actions/checkout@v3
diff --git a/pydeequ/analyzers.py b/pydeequ/analyzers.py
index 4289094..3952c93 100644
--- a/pydeequ/analyzers.py
+++ b/pydeequ/analyzers.py
@@ -271,7 +271,11 @@ def _analyzer_jvm(self):
 
         :return self: access the value of the Completeness analyzer.
         """
-        return self._deequAnalyzers.Completeness(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Completeness(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Compliance(_AnalyzerObject):
@@ -303,19 +307,13 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.collection.Seq.empty()
-            )
-        else:
-            return self._deequAnalyzers.Compliance(
-                self.instance,
-                self.predicate,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Compliance(
+            self.instance,
+            self.predicate,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.collection.Seq.empty(),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Correlation(_AnalyzerObject):
@@ -469,22 +467,14 @@ def _analyzer_jvm(self):
         """
         if not self.maxDetailBins:
             self.maxDetailBins = getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$3")()
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
-                getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
-            )
-        else:
-            return self._deequAnalyzers.Histogram(
-                self.column,
-                self._jvm.scala.Option.apply(self.binningUdf),
-                self.maxDetailBins,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.Histogram(
+            self.column,
+            self._jvm.scala.Option.apply(self.binningUdf),
+            self.maxDetailBins,
+            self._jvm.scala.Option.apply(self.where),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$5")(),
+            getattr(self._jvm.com.amazon.deequ.analyzers.Histogram, "apply$default$6")()
+        )
 
 
 class KLLParameters:
@@ -553,7 +543,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        return self._deequAnalyzers.Maximum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Maximum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MaxLength(_AnalyzerObject):
@@ -575,17 +567,11 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MaxLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MaxLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class Mean(_AnalyzerObject):
@@ -628,7 +614,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        return self._deequAnalyzers.Minimum(self.column, self._jvm.scala.Option.apply(self.where))
+        return self._deequAnalyzers.Minimum(
+            self.column, self._jvm.scala.Option.apply(self.where), self._jvm.scala.Option.apply(None)
+        )
 
 
 class MinLength(_AnalyzerObject):
@@ -651,17 +639,11 @@ def _analyzer_jvm(self):
 
         :return self
         """
-        if SPARK_VERSION == "3.3":
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where),
-                self._jvm.scala.Option.apply(None)
-            )
-        else:
-            return self._deequAnalyzers.MinLength(
-                self.column,
-                self._jvm.scala.Option.apply(self.where)
-            )
+        return self._deequAnalyzers.MinLength(
+            self.column,
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
+        )
 
 
 class MutualInformation(_AnalyzerObject):
@@ -725,6 +707,7 @@ def _analyzer_jvm(self):
             # TODO: revisit bc scala constructor does some weird implicit type casting from python str -> java list
             # if we don't cast it to str()
             self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 
@@ -814,7 +797,9 @@ def _analyzer_jvm(self):
 
         :return self
         """
         return self._deequAnalyzers.Uniqueness(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
@@ -839,7 +824,9 @@ def _analyzer_jvm(self):
         :return self
         """
         return self._deequAnalyzers.UniqueValueRatio(
-            to_scala_seq(self._jvm, self.columns), self._jvm.scala.Option.apply(self.where)
+            to_scala_seq(self._jvm, self.columns),
+            self._jvm.scala.Option.apply(self.where),
+            self._jvm.scala.Option.apply(None)
         )
 
 class DataTypeInstances(Enum):
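With deequ pinned to 2.0.7 for every supported Spark line (see configs.py below), each analyzer constructor can pass the trailing scala.Option.apply(None) unconditionally instead of branching on SPARK_VERSION; this appears to correspond to the optional analyzerOptions argument the upstream deequ analyzers accept. The Python-facing API is unchanged. A minimal smoke-test sketch, assuming a live SparkSession named spark (with the deequ jar on its classpath) and a DataFrame df with a column "a" and a numeric column "b":

    from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, Completeness, Maximum

    # Exercises two of the patched constructors end to end; `spark` and `df`
    # are assumptions, not part of this change.
    result = (
        AnalysisRunner(spark)
        .onData(df)
        .addAnalyzer(Completeness("a"))
        .addAnalyzer(Maximum("b"))
        .run()
    )
    AnalyzerContext.successMetricsAsDataFrame(spark, result).show()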
""" hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.isUnique(column, hint) + self._Check = self._Check.isUnique(column, hint, self._jvm.scala.Option.apply(None)) return self def isPrimaryKey(self, column, *columns, hint=None): @@ -297,7 +297,7 @@ def hasUniqueValueRatio(self, columns, assertion, hint=None): assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) columns_seq = to_scala_seq(self._jvm, columns) - self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint) + self._Check = self._Check.hasUniqueValueRatio(columns_seq, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasNumberOfDistinctValues(self, column, assertion, binningUdf, maxBins, hint=None): @@ -418,11 +418,7 @@ def hasMinLength(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - if SPARK_VERSION == "3.3": - self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) - else: - self._Check = self._Check.hasMinLength(column, assertion_func, hint) - + self._Check = self._Check.hasMinLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMaxLength(self, column, assertion, hint=None): @@ -437,10 +433,7 @@ def hasMaxLength(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - if SPARK_VERSION == "3.3": - self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) - else: - self._Check = self._Check.hasMaxLength(column, assertion_func, hint) + self._Check = self._Check.hasMaxLength(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMin(self, column, assertion, hint=None): @@ -456,7 +449,7 @@ def hasMin(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.hasMin(column, assertion_func, hint) + self._Check = self._Check.hasMin(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMax(self, column, assertion, hint=None): @@ -472,7 +465,7 @@ def hasMax(self, column, assertion, hint=None): """ assertion_func = ScalaFunction1(self._spark_session.sparkContext._gateway, assertion) hint = self._jvm.scala.Option.apply(hint) - self._Check = self._Check.hasMax(column, assertion_func, hint) + self._Check = self._Check.hasMax(column, assertion_func, hint, self._jvm.scala.Option.apply(None)) return self def hasMean(self, column, assertion, hint=None): @@ -565,21 +558,14 @@ def satisfies(self, columnCondition, constraintName, assertion=None, hint=None): else getattr(self._Check, "satisfies$default$3")() ) hint = self._jvm.scala.Option.apply(hint) - if SPARK_VERSION == "3.3": - self._Check = self._Check.satisfies( - columnCondition, - constraintName, - assertion_func, - hint, - self._jvm.scala.collection.Seq.empty() - ) - else: - self._Check = self._Check.satisfies( - columnCondition, - constraintName, - assertion_func, - hint - ) + self._Check = self._Check.satisfies( + columnCondition, + constraintName, + assertion_func, + hint, + self._jvm.scala.collection.Seq.empty(), + self._jvm.scala.Option.apply(None) + ) return self def hasPattern(self, 
diff --git a/pydeequ/configs.py b/pydeequ/configs.py
index 49cb277..ea4d0e8 100644
--- a/pydeequ/configs.py
+++ b/pydeequ/configs.py
@@ -5,11 +5,9 @@
 
 
 SPARK_TO_DEEQU_COORD_MAPPING = {
-    "3.3": "com.amazon.deequ:deequ:2.0.4-spark-3.3",
-    "3.2": "com.amazon.deequ:deequ:2.0.1-spark-3.2",
-    "3.1": "com.amazon.deequ:deequ:2.0.0-spark-3.1",
-    "3.0": "com.amazon.deequ:deequ:1.2.2-spark-3.0",
-    "2.4": "com.amazon.deequ:deequ:1.1.0_spark-2.4-scala-2.11",
+    "3.3": "com.amazon.deequ:deequ:2.0.7-spark-3.3",
+    "3.2": "com.amazon.deequ:deequ:2.0.7-spark-3.2",
+    "3.1": "com.amazon.deequ:deequ:2.0.7-spark-3.1"
 }
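Pinning all three remaining Spark lines to deequ 2.0.7 is what makes the unconditional extra arguments above safe: every resolvable jar now exposes the same constructor arities. Spark 2.4 and 3.0 drop out of the mapping entirely (matching the CI matrix change), so coordinate lookup will fail on those runtimes rather than load an incompatible jar. Session setup is unchanged and follows the usual README pattern:

    from pyspark.sql import SparkSession
    import pydeequ

    # pydeequ resolves the coordinate for the installed PySpark version from
    # SPARK_TO_DEEQU_COORD_MAPPING, e.g. "com.amazon.deequ:deequ:2.0.7-spark-3.3".
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", pydeequ.deequ_maven_coord)
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
        .getOrCreate()
    )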