From 1a00a33d1ce3f0de6a9d1e95920d23d9c8abffce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 29 Jul 2024 12:46:52 +0000 Subject: [PATCH] Test Spark 4.0 --- .github/workflows/data/core/matrix.yml | 17 +++++++--- .github/workflows/data/hdfs/matrix.yml | 23 +++++++------ .github/workflows/data/hive/matrix.yml | 17 +++++++--- .github/workflows/data/kafka/matrix.yml | 25 ++++++++------- .github/workflows/data/local-fs/matrix.yml | 32 +++++++++---------- .github/workflows/data/mongodb/matrix.yml | 20 ++++++++---- .github/workflows/data/mssql/matrix.yml | 18 ++++++++--- .github/workflows/data/mysql/matrix.yml | 18 ++++++++--- .github/workflows/data/oracle/matrix.yml | 20 +++++++++--- .github/workflows/data/postgres/matrix.yml | 18 ++++++++--- .github/workflows/data/s3/matrix.yml | 20 ++++++++---- .github/workflows/data/samba/matrix.yml | 2 +- .github/workflows/data/teradata/matrix.yml | 6 ++-- onetl/_util/scala.py | 6 ++-- .../db_connection/jdbc_mixin/connection.py | 31 ++++++++++++------ .../db_connection/kafka/connection.py | 2 +- .../file_df_connection/spark_s3/connection.py | 2 +- onetl/file/format/avro.py | 2 +- onetl/file/format/xml.py | 18 +++++++++-- requirements/tests/spark-4.0.0.txt | 5 +++ tests/fixtures/spark.py | 2 +- 21 files changed, 203 insertions(+), 101 deletions(-) create mode 100644 requirements/tests/spark-4.0.0.txt diff --git a/.github/workflows/data/core/matrix.yml b/.github/workflows/data/core/matrix.yml index d20f074ab..7bf0caa05 100644 --- a/.github/workflows/data/core/matrix.yml +++ b/.github/workflows/data/core/matrix.yml @@ -1,17 +1,24 @@ -min: &min +2x: &2x spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +3x: &3x spark-version: 3.5.1 pydantic-version: 2 python-version: '3.12' java-version: 20 os: ubuntu-latest +4x: &4x + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest spark-version: latest pydantic-version: latest @@ -20,6 +27,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *max, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/hdfs/matrix.yml b/.github/workflows/data/hdfs/matrix.yml index 6d8156c50..85c713b21 100644 --- a/.github/workflows/data/hdfs/matrix.yml +++ b/.github/workflows/data/hdfs/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x hadoop-version: hadoop2-hdfs spark-version: 2.3.1 pydantic-version: 1 @@ -6,7 +6,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x hadoop-version: hadoop3-hdfs spark-version: 3.5.1 pydantic-version: 2 @@ -14,6 +14,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + hadoop-version: hadoop3-hdfs + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest hadoop-version: hadoop3-hdfs spark-version: latest @@ -23,11 +31,6 @@ latest: &latest os: ubuntu-latest matrix: - small: - - *max - full: - - *min - - *max - nightly: - - *min - - *latest + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/hive/matrix.yml b/.github/workflows/data/hive/matrix.yml index 6ce0d7a8e..7bf0caa05 100644 --- a/.github/workflows/data/hive/matrix.yml +++ b/.github/workflows/data/hive/matrix.yml @@ -1,17 +1,24 @@ -min: &min +2x: &2x spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +3x: &3x spark-version: 3.5.1 pydantic-version: 2 python-version: '3.12' java-version: 20 os: ubuntu-latest +4x: &4x + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest spark-version: latest pydantic-version: latest @@ -20,6 +27,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/kafka/matrix.yml b/.github/workflows/data/kafka/matrix.yml index c5242cb5a..f39032bc9 100644 --- a/.github/workflows/data/kafka/matrix.yml +++ b/.github/workflows/data/kafka/matrix.yml @@ -1,5 +1,5 @@ -min: &min - # Headers are supported only since 2.x. +2x: &2x + # Headers are supported only since Kafka 2x. # Images before 3.2.3 are not creating kafka_jaas.conf properly, and failing to start # https://github.com/bitnami/containers/blob/9db9064668365cac89bff58259f63eb78bb97e79/bitnami/kafka/README.md?plain=1#L933 kafka-version: 3.2.3 @@ -9,7 +9,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x kafka-version: 3.7.1 pydantic-version: 2 spark-version: 3.5.1 @@ -17,6 +17,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + kafka-version: 3.7.1 + pydantic-version: 2 + spark-version: 4.0.0 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest kafka-version: latest pydantic-version: latest @@ -26,11 +34,6 @@ latest: &latest os: ubuntu-latest matrix: - small: - - *max - full: - - *min - - *max - nightly: - - *min - - *latest + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/local-fs/matrix.yml b/.github/workflows/data/local-fs/matrix.yml index d1337291e..596104bc7 100644 --- a/.github/workflows/data/local-fs/matrix.yml +++ b/.github/workflows/data/local-fs/matrix.yml @@ -1,25 +1,27 @@ -min: &min +23: &23 spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -min_avro: &min_avro +24: &24 + # Avro supported only since Spark 2.4 spark-version: 2.4.8 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -min_excel: &min_excel +32: &32 + # Excel supported only since Spark 3.2 spark-version: 3.2.4 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +35: &35 # Excel package currently has no release for 3.5.1 spark-version: 3.5.0 pydantic-version: 2 @@ -27,6 +29,13 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest spark-version: latest pydantic-version: latest @@ -35,15 +44,6 @@ latest: &latest os: ubuntu-latest matrix: - small: - - <<: *max - full: - - <<: *min - - <<: *min_avro - - <<: *min_excel - - <<: *max - nightly: - - <<: *min - - <<: *min_avro - - <<: *min_excel - - <<: *latest + small: [*35] + full: [*23, *24, *32, *35, *4x] + nightly: [*23, *24, *32, *35, *latest] diff --git a/.github/workflows/data/mongodb/matrix.yml b/.github/workflows/data/mongodb/matrix.yml index 98e1fe971..f5ab88e09 100644 --- a/.github/workflows/data/mongodb/matrix.yml +++ b/.github/workflows/data/mongodb/matrix.yml @@ -1,13 +1,13 @@ -min: &min +32: &32 mongodb-version: 4.0.0 - # MongoDB connector does not support Spark 2.x + # MongoDB connector does not support Spark 2x spark-version: 3.2.4 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +35: &35 mongodb-version: 7.0.12 spark-version: 3.5.1 pydantic-version: 2 @@ -15,6 +15,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + mongodb-version: 7.0.12 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest mongodb-version: latest spark-version: latest @@ -24,6 +32,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*35] + full: [*32, *35, *4x] + nightly: [*32, *35, *latest] diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index fad2e738c..9fb35d398 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x mssql-version: 2017-GA-ubuntu spark-version: 2.3.1 pydantic-version: 1 @@ -6,7 +6,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x mssql-version: 2022-CU14-ubuntu-22.04 spark-version: 3.5.1 pydantic-version: 2 @@ -14,6 +14,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + mssql-version: 2022-CU14-ubuntu-22.04 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest mssql-version: latest spark-version: latest @@ -23,6 +31,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/mysql/matrix.yml b/.github/workflows/data/mysql/matrix.yml index d2e703143..a7b51640c 100644 --- a/.github/workflows/data/mysql/matrix.yml +++ b/.github/workflows/data/mysql/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x # Tags 5.7.6-5.6.12 cannot be downloaded since Docker v26: # "Docker Image Format v1 and Docker Image manifest version 2, schema 1 support is disabled by default" mysql-version: 5.7.13 @@ -8,7 +8,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x mysql-version: 9.0.1 spark-version: 3.5.1 pydantic-version: 2 @@ -16,6 +16,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + mysql-version: 9.0.1 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest mysql-version: latest spark-version: latest @@ -25,6 +33,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/oracle/matrix.yml b/.github/workflows/data/oracle/matrix.yml index 7a79c68a7..c8c379dab 100644 --- a/.github/workflows/data/oracle/matrix.yml +++ b/.github/workflows/data/oracle/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x oracle-image: gvenzl/oracle-xe oracle-version: 11.2.0.2-slim-faststart db-name: XE @@ -8,7 +8,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x oracle-image: gvenzl/oracle-free oracle-version: 23.4-slim-faststart db-name: FREEPDB1 @@ -18,6 +18,16 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + oracle-image: gvenzl/oracle-free + oracle-version: 23.4-slim-faststart + db-name: FREEPDB1 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest oracle-image: gvenzl/oracle-free oracle-version: slim-faststart @@ -29,6 +39,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/postgres/matrix.yml b/.github/workflows/data/postgres/matrix.yml index 4c5b5f4ef..3827ce844 100644 --- a/.github/workflows/data/postgres/matrix.yml +++ b/.github/workflows/data/postgres/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x # Min supported version by JDBC driver is 8.4, but it is too ancient to be used by anyone in real life postgres-version: 9.4.26-alpine spark-version: 2.3.1 @@ -7,7 +7,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x postgres-version: 16.3-alpine spark-version: 3.5.1 pydantic-version: 2 @@ -15,6 +15,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + postgres-version: 16.3-alpine + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest postgres-version: alpine spark-version: latest @@ -24,6 +32,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/s3/matrix.yml b/.github/workflows/data/s3/matrix.yml index 06d4f7489..974e50f31 100644 --- a/.github/workflows/data/s3/matrix.yml +++ b/.github/workflows/data/s3/matrix.yml @@ -1,14 +1,14 @@ -min: &min +32: &32 # prior image versions returns empty content of bucket root, some kind of bug minio-version: 2021.3.17 - # Minimal Spark version with Hadoop 3.x support + # Minimal Spark version with Hadoop 3x support spark-version: 3.2.4 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +35: &35 minio-version: 2024.7.26 spark-version: 3.5.1 pydantic-version: 2 @@ -16,6 +16,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + minio-version: 2024.7.26 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest minio-version: latest spark-version: latest @@ -25,6 +33,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*35] + full: [*32, *35, *4x] + nightly: [*32, *35, *latest] diff --git a/.github/workflows/data/samba/matrix.yml b/.github/workflows/data/samba/matrix.yml index 045a093ba..19254d231 100644 --- a/.github/workflows/data/samba/matrix.yml +++ b/.github/workflows/data/samba/matrix.yml @@ -1,5 +1,5 @@ min: &min - # elswork/samba image versions does not correlate with smbd version, it is always 4.x + # elswork/samba image versions does not correlate with smbd version, it is always 4x server-version: latest pydantic-version: 1 python-version: '3.7' diff --git a/.github/workflows/data/teradata/matrix.yml b/.github/workflows/data/teradata/matrix.yml index 6c2a55455..ec71ac865 100644 --- a/.github/workflows/data/teradata/matrix.yml +++ b/.github/workflows/data/teradata/matrix.yml @@ -1,4 +1,4 @@ -max: &max +3x: &3x spark-version: 3.5.1 pydantic-version: 2 python-version: '3.12' @@ -13,6 +13,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*max] + small: [*3x] + full: [*3x] nightly: [*latest] diff --git a/onetl/_util/scala.py b/onetl/_util/scala.py index 397a91576..c4069e5a3 100644 --- a/onetl/_util/scala.py +++ b/onetl/_util/scala.py @@ -9,6 +9,8 @@ def get_default_scala_version(spark_version: Version) -> Version: """ Get default Scala version for specific Spark version """ - if spark_version.major < 3: + if spark_version.major == 2: return Version("2.11") - return Version("2.12") + if spark_version.major == 3: + return Version("2.12") + return Version("2.13") diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index e8c19e38b..0ff6b5245 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -431,10 +431,11 @@ def _execute_on_driver( statement_args = self._get_statement_args() jdbc_statement = self._build_statement(statement, statement_type, jdbc_connection, statement_args) - return self._execute_statement(jdbc_statement, statement, options, callback, read_only) + return self._execute_statement(jdbc_connection, jdbc_statement, statement, options, callback, read_only) def _execute_statement( self, + jdbc_connection, jdbc_statement, statement: str, options: JDBCFetchOptions | JDBCExecuteOptions, @@ -472,7 +473,7 @@ def _execute_statement( else: jdbc_statement.executeUpdate(statement) - return callback(jdbc_statement) + return callback(jdbc_connection, jdbc_statement) @staticmethod def _build_statement( @@ -501,11 +502,11 @@ def _build_statement( return jdbc_connection.createStatement(*statement_args) - def _statement_to_dataframe(self, jdbc_statement) -> DataFrame: + def _statement_to_dataframe(self, jdbc_connection, jdbc_statement) -> DataFrame: result_set = jdbc_statement.getResultSet() - return self._resultset_to_dataframe(result_set) + return self._resultset_to_dataframe(jdbc_connection, result_set) - def _statement_to_optional_dataframe(self, jdbc_statement) -> DataFrame | None: + def _statement_to_optional_dataframe(self, jdbc_connection, jdbc_statement) -> DataFrame | None: """ Returns ``org.apache.spark.sql.DataFrame`` or ``None``, if ResultSet is does not contain any columns. @@ -522,9 +523,9 @@ def _statement_to_optional_dataframe(self, jdbc_statement) -> DataFrame | None: if not result_column_count: return None - return self._resultset_to_dataframe(result_set) + return self._resultset_to_dataframe(jdbc_connection, result_set) - def _resultset_to_dataframe(self, result_set) -> DataFrame: + def _resultset_to_dataframe(self, jdbc_connection, result_set) -> DataFrame: """ Converts ``java.sql.ResultSet`` to ``org.apache.spark.sql.DataFrame`` using Spark's internal methods. @@ -545,13 +546,25 @@ def _resultset_to_dataframe(self, result_set) -> DataFrame: java_converters = self.spark._jvm.scala.collection.JavaConverters # type: ignore - if get_spark_version(self.spark) >= Version("3.4"): + if get_spark_version(self.spark) >= Version("4.0"): + result_schema = jdbc_utils.getSchema( + jdbc_connection, + result_set, + jdbc_dialect, + False, # noqa: WPS425 + False, # noqa: WPS425 + ) + elif get_spark_version(self.spark) >= Version("3.4"): # https://github.com/apache/spark/commit/2349175e1b81b0a61e1ed90c2d051c01cf78de9b result_schema = jdbc_utils.getSchema(result_set, jdbc_dialect, False, False) # noqa: WPS425 else: result_schema = jdbc_utils.getSchema(result_set, jdbc_dialect, False) # noqa: WPS425 - result_iterator = jdbc_utils.resultSetToRows(result_set, result_schema) + if get_spark_version(self.spark) >= Version("4.0"): + result_iterator = jdbc_utils.resultSetToRows(result_set, result_schema, jdbc_dialect) + else: + result_iterator = jdbc_utils.resultSetToRows(result_set, result_schema) + result_list = java_converters.seqAsJavaListConverter(result_iterator.toSeq()).asJava() jdf = self.spark._jsparkSession.createDataFrame(result_list, result_schema) # type: ignore diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index ce3829e49..9bd3008e3 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -433,7 +433,7 @@ def get_packages( scala_ver = Version(scala_version).min_digits(2) if scala_version else get_default_scala_version(spark_ver) return [ - f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}", + f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{spark_ver}", ] def __enter__(self): diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py index 1efe39d4e..f04ea69b4 100644 --- a/onetl/connection/file_df_connection/spark_s3/connection.py +++ b/onetl/connection/file_df_connection/spark_s3/connection.py @@ -248,7 +248,7 @@ def get_packages( scala_ver = Version(scala_version).min_digits(2) if scala_version else get_default_scala_version(spark_ver) # https://mvnrepository.com/artifact/org.apache.spark/spark-hadoop-cloud - return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"] + return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{spark_ver}"] @slot def path_from_string(self, path: os.PathLike | str) -> RemotePath: diff --git a/onetl/file/format/avro.py b/onetl/file/format/avro.py index 3699620b0..4a7aafa04 100644 --- a/onetl/file/format/avro.py +++ b/onetl/file/format/avro.py @@ -163,7 +163,7 @@ def get_packages( if scala_ver < Version("2.11"): raise ValueError(f"Scala version should be at least 2.11, got {scala_ver.format('{0}.{1}')}") - return [f"org.apache.spark:spark-avro_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"] + return [f"org.apache.spark:spark-avro_{scala_ver.format('{0}.{1}')}:{spark_ver}"] @slot def check_if_supported(self, spark: SparkSession) -> None: diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index cc7cd4777..0f6ad7f83 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -193,6 +193,9 @@ def get_packages( # noqa: WPS231 ) """ + spark_ver = Version(spark_version) + if spark_ver.major >= 4: + return [] if package_version: version = Version(package_version).min_digits(3) @@ -202,7 +205,6 @@ def get_packages( # noqa: WPS231 else: version = Version("0.18.0").min_digits(3) - spark_ver = Version(spark_version) scala_ver = Version(scala_version).min_digits(2) if scala_version else get_default_scala_version(spark_ver) # Ensure compatibility with Spark and Scala versions @@ -216,8 +218,11 @@ def get_packages( # noqa: WPS231 @slot def check_if_supported(self, spark: SparkSession) -> None: - java_class = "com.databricks.spark.xml.XmlReader" + version = get_spark_version(spark) + if version.major >= 4: + return + java_class = "com.databricks.spark.xml.XmlReader" try: try_import_java_class(spark, java_class) except Exception as e: @@ -332,12 +337,12 @@ def parse_column(self, column: str | Column, schema: StructType) -> Column: | |-- name: string (nullable = true) | |-- age: integer (nullable = true) """ + from pyspark import __version__ as spark_version from pyspark.sql import Column, SparkSession # noqa: WPS442 spark = SparkSession._instantiatedSession # noqa: WPS437 self.check_if_supported(spark) - from pyspark.sql.column import _to_java_column # noqa: WPS450 from pyspark.sql.functions import col if isinstance(column, Column): @@ -345,6 +350,13 @@ def parse_column(self, column: str | Column, schema: StructType) -> Column: else: column_name, column = column, col(column).cast("string") + if spark_version > "4": + from pyspark.sql.functions import from_xml # noqa: WPS450 + + return from_xml(column, schema, self.dict()).alias(column_name) + + from pyspark.sql.column import _to_java_column # noqa: WPS450 + java_column = _to_java_column(column) java_schema = spark._jsparkSession.parseDataType(schema.json()) # noqa: WPS437 scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap( # noqa: WPS219, WPS437 diff --git a/requirements/tests/spark-4.0.0.txt b/requirements/tests/spark-4.0.0.txt new file mode 100644 index 000000000..038d47de7 --- /dev/null +++ b/requirements/tests/spark-4.0.0.txt @@ -0,0 +1,5 @@ +numpy>=1.16 +pandas>=1.0 +pyarrow>=1.0 +pyspark==4.0.0.dev1 +sqlalchemy diff --git a/tests/fixtures/spark.py b/tests/fixtures/spark.py index e7248e84f..e91eccf21 100644 --- a/tests/fixtures/spark.py +++ b/tests/fixtures/spark.py @@ -103,7 +103,7 @@ def maven_packages(request): # There is no MongoDB connector for Spark less than 3.2 packages.extend(MongoDB.get_packages(spark_version=str(pyspark_version))) - if "excel" in markers: + if "excel" in markers and pyspark_version < Version("4.0"): # There is no Excel files support for Spark less than 3.2 packages.extend(Excel.get_packages(spark_version=str(pyspark_version)))