From 029039429b42bc4430169588366ddf2189a8a670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 29 Jul 2024 12:46:52 +0000 Subject: [PATCH] Add Spark 4.0 support --- .github/workflows/data/clickhouse/matrix.yml | 19 ++-- .github/workflows/data/core/matrix.yml | 17 ++-- .github/workflows/data/greenplum/matrix.yml | 10 +-- .github/workflows/data/hdfs/matrix.yml | 18 ++-- .github/workflows/data/hive/matrix.yml | 17 ++-- .github/workflows/data/kafka/matrix.yml | 20 +++-- .github/workflows/data/local-fs/matrix.yml | 32 +++---- .github/workflows/data/mongodb/matrix.yml | 20 +++-- .github/workflows/data/mssql/matrix.yml | 18 ++-- .github/workflows/data/mysql/matrix.yml | 18 ++-- .github/workflows/data/oracle/matrix.yml | 20 +++-- .github/workflows/data/postgres/matrix.yml | 18 ++-- .github/workflows/data/s3/matrix.yml | 20 +++-- .github/workflows/data/samba/matrix.yml | 2 +- .github/workflows/data/teradata/matrix.yml | 6 +- README.rst | 34 +++---- docs/changelog/next_release/297.feature.rst | 1 + .../clickhouse/prerequisites.rst | 4 +- .../db_connection/hive/prerequisites.rst | 4 +- .../db_connection/kafka/prerequisites.rst | 4 +- .../db_connection/mongodb/prerequisites.rst | 4 +- .../db_connection/mssql/prerequisites.rst | 4 +- .../db_connection/mysql/prerequisites.rst | 4 +- .../db_connection/oracle/prerequisites.rst | 4 +- .../db_connection/postgres/prerequisites.rst | 4 +- onetl/_util/scala.py | 6 +- .../db_connection/db_connection/dialect.py | 2 +- .../db_connection/greenplum/connection.py | 2 +- .../jdbc_connection/connection.py | 8 +- .../db_connection/jdbc_mixin/connection.py | 47 +++++++--- .../db_connection/kafka/connection.py | 7 +- .../file_df_connection/spark_s3/connection.py | 7 +- onetl/file/format/avro.py | 7 +- onetl/file/format/xml.py | 20 ++++- onetl/hwm/store/hwm_class_registry.py | 90 ++++++++++++------- requirements/tests/spark-4.0.0.txt | 5 ++ tests/fixtures/processing/hive.py | 1 + tests/fixtures/processing/mysql.py | 14 +++ tests/fixtures/spark.py | 2 +- .../test_strategy_increment_hive.py | 2 +- tests/util/assert_df.py | 11 ++- tests/util/to_pandas.py | 21 +++-- 42 files changed, 392 insertions(+), 182 deletions(-) create mode 100644 docs/changelog/next_release/297.feature.rst create mode 100644 requirements/tests/spark-4.0.0.txt diff --git a/.github/workflows/data/clickhouse/matrix.yml b/.github/workflows/data/clickhouse/matrix.yml index 075d98a01..259997450 100644 --- a/.github/workflows/data/clickhouse/matrix.yml +++ b/.github/workflows/data/clickhouse/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x # Clickhouse version with proper DateTime > DateTime64 comparison clickhouse-image: yandex/clickhouse-server clickhouse-version: '21.1' @@ -8,7 +8,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x clickhouse-image: clickhouse/clickhouse-server clickhouse-version: 24.8.2.3-alpine spark-version: 3.5.3 @@ -17,6 +17,15 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + clickhouse-image: clickhouse/clickhouse-server + clickhouse-version: 24.6.3.70-alpine + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest clickhouse-image: clickhouse/clickhouse-server clickhouse-version: latest-alpine @@ -27,6 +36,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *max, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/core/matrix.yml b/.github/workflows/data/core/matrix.yml index 67d32ce3f..a4d8f2404 100644 --- a/.github/workflows/data/core/matrix.yml +++ b/.github/workflows/data/core/matrix.yml @@ -1,17 +1,24 @@ -min: &min +2x: &2x spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +3x: &3x spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 os: ubuntu-latest +4x: &4x + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest spark-version: latest pydantic-version: latest @@ -20,6 +27,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *max, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/greenplum/matrix.yml b/.github/workflows/data/greenplum/matrix.yml index 0935a821e..63c0aaa80 100644 --- a/.github/workflows/data/greenplum/matrix.yml +++ b/.github/workflows/data/greenplum/matrix.yml @@ -1,4 +1,4 @@ -min: &min +23: &23 greenplum-version: 6.23.1 package-version: 2.2.0 # Spark 2.3.0 does not support passing ivysettings.xml @@ -8,7 +8,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +32: &32 greenplum-version: 7.0.0 package-version: 2.3.1 # Greenplum connector does not support Spark 3.3+ @@ -29,6 +29,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *max, *latest] + small: [*32] + full: [*23, *32] + nightly: [*23, *latest] diff --git a/.github/workflows/data/hdfs/matrix.yml b/.github/workflows/data/hdfs/matrix.yml index bb913214d..5a491c7f3 100644 --- a/.github/workflows/data/hdfs/matrix.yml +++ b/.github/workflows/data/hdfs/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x hadoop-version: hadoop2-hdfs spark-version: 2.3.1 pydantic-version: 1 @@ -6,7 +6,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x hadoop-version: hadoop3-hdfs spark-version: 3.5.3 pydantic-version: 2 @@ -14,6 +14,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + hadoop-version: hadoop3-hdfs + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest hadoop-version: hadoop3-hdfs spark-version: latest @@ -23,6 +31,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *max, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/hive/matrix.yml b/.github/workflows/data/hive/matrix.yml index 6bb53edf3..a4d8f2404 100644 --- a/.github/workflows/data/hive/matrix.yml +++ b/.github/workflows/data/hive/matrix.yml @@ -1,17 +1,24 @@ -min: &min +2x: &2x spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +3x: &3x spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 os: ubuntu-latest +4x: &4x + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest spark-version: latest pydantic-version: latest @@ -20,6 +27,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/kafka/matrix.yml b/.github/workflows/data/kafka/matrix.yml index 309c0caef..c22980109 100644 --- a/.github/workflows/data/kafka/matrix.yml +++ b/.github/workflows/data/kafka/matrix.yml @@ -1,5 +1,5 @@ -min: &min - # Headers are supported only since 2.x. +2x: &2x + # Headers are supported only since Kafka 2x. # Images before 3.2.3 are not creating kafka_jaas.conf properly, and failing to start # https://github.com/bitnami/containers/blob/9db9064668365cac89bff58259f63eb78bb97e79/bitnami/kafka/README.md?plain=1#L933 kafka-version: 3.2.3 @@ -9,7 +9,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x kafka-version: 3.7.1 pydantic-version: 2 spark-version: 3.5.3 @@ -17,6 +17,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + kafka-version: 3.7.1 + pydantic-version: 2 + spark-version: 4.0.0 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest kafka-version: latest pydantic-version: latest @@ -26,6 +34,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *max, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/local-fs/matrix.yml b/.github/workflows/data/local-fs/matrix.yml index 365d1d3c9..aaa42bcc2 100644 --- a/.github/workflows/data/local-fs/matrix.yml +++ b/.github/workflows/data/local-fs/matrix.yml @@ -1,25 +1,27 @@ -min: &min +23: &23 spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -min_avro: &min_avro +24: &24 + # Avro supported only since Spark 2.4 spark-version: 2.4.8 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -min_excel: &min_excel +32: &32 + # Excel supported only since Spark 3.2 spark-version: 3.2.4 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +35: &35 # Excel pagkage currently supports Spark 3.5.1 max spark-version: 3.5.1 pydantic-version: 2 @@ -27,6 +29,13 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest spark-version: latest pydantic-version: latest @@ -35,15 +44,6 @@ latest: &latest os: ubuntu-latest matrix: - small: - - <<: *max - full: - - <<: *min - - <<: *min_avro - - <<: *min_excel - - <<: *max - nightly: - - <<: *min - - <<: *min_avro - - <<: *min_excel - - <<: *latest + small: [*35] + full: [*23, *24, *32, *35, *4x] + nightly: [*23, *24, *32, *35, *latest] diff --git a/.github/workflows/data/mongodb/matrix.yml b/.github/workflows/data/mongodb/matrix.yml index 6df9c853e..ccce1e527 100644 --- a/.github/workflows/data/mongodb/matrix.yml +++ b/.github/workflows/data/mongodb/matrix.yml @@ -1,13 +1,13 @@ -min: &min +32: &32 mongodb-version: 4.0.0 - # MongoDB connector does not support Spark 2.x + # MongoDB connector does not support Spark 2x spark-version: 3.2.4 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +35: &35 mongodb-version: 7.0.14 spark-version: 3.5.3 pydantic-version: 2 @@ -15,6 +15,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + mongodb-version: 7.0.12 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 20 + os: ubuntu-latest + latest: &latest mongodb-version: latest spark-version: latest @@ -24,6 +32,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*35] + full: [*32, *35, *4x] + nightly: [*32, *35, *latest] diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index 978faa17b..4638c901d 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x mssql-version: 2017-latest spark-version: 2.3.1 pydantic-version: 1 @@ -6,7 +6,7 @@ min: &min java-version: 8 os: ubuntu-20.04 -max: &max +3x: &3x mssql-version: 2022-latest spark-version: 3.5.3 pydantic-version: 2 @@ -14,6 +14,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + mssql-version: 2022-CU14-ubuntu-22.04 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest mssql-version: latest spark-version: latest @@ -23,6 +31,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/mysql/matrix.yml b/.github/workflows/data/mysql/matrix.yml index b98c985cc..0cf59553b 100644 --- a/.github/workflows/data/mysql/matrix.yml +++ b/.github/workflows/data/mysql/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x # Tags 5.7.6-5.6.12 cannot be downloaded since Docker v26: # "Docker Image Format v1 and Docker Image manifest version 2, schema 1 support is disabled by default" mysql-version: 5.7.13 @@ -8,7 +8,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x mysql-version: 9.0.1 spark-version: 3.5.3 pydantic-version: 2 @@ -16,6 +16,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + mysql-version: 9.0.1 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest mysql-version: latest spark-version: latest @@ -25,6 +33,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/oracle/matrix.yml b/.github/workflows/data/oracle/matrix.yml index 051f0df9c..1a6753494 100644 --- a/.github/workflows/data/oracle/matrix.yml +++ b/.github/workflows/data/oracle/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x oracle-image: gvenzl/oracle-xe oracle-version: 11.2.0.2-slim-faststart db-name: XE @@ -8,7 +8,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x oracle-image: gvenzl/oracle-free oracle-version: 23.4-slim-faststart db-name: FREEPDB1 @@ -18,6 +18,16 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + oracle-image: gvenzl/oracle-free + oracle-version: 23.4-slim-faststart + db-name: FREEPDB1 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest oracle-image: gvenzl/oracle-free oracle-version: slim-faststart @@ -29,6 +39,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/postgres/matrix.yml b/.github/workflows/data/postgres/matrix.yml index 43b914e99..f9c1c6d5c 100644 --- a/.github/workflows/data/postgres/matrix.yml +++ b/.github/workflows/data/postgres/matrix.yml @@ -1,4 +1,4 @@ -min: &min +2x: &2x # Min supported version by JDBC driver is 8.2, but it is too ancient to be used by anyone in real life postgres-version: 9.4.26-alpine spark-version: 2.3.1 @@ -7,7 +7,7 @@ min: &min java-version: 8 os: ubuntu-latest -max: &max +3x: &3x postgres-version: 16.4-alpine spark-version: 3.5.3 pydantic-version: 2 @@ -15,6 +15,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + postgres-version: 16.3-alpine + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest postgres-version: alpine spark-version: latest @@ -24,6 +32,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*3x] + full: [*2x, *3x, *4x] + nightly: [*2x, *3x, *latest] diff --git a/.github/workflows/data/s3/matrix.yml b/.github/workflows/data/s3/matrix.yml index 5a4083733..f9e47efe6 100644 --- a/.github/workflows/data/s3/matrix.yml +++ b/.github/workflows/data/s3/matrix.yml @@ -1,14 +1,14 @@ -min: &min +32: &32 # prior image versions returns empty content of bucket root, some kind of bug minio-version: 2021.3.17 - # Minimal Spark version with Hadoop 3.x support + # Minimal Spark version with Hadoop 3x support spark-version: 3.2.4 pydantic-version: 1 python-version: '3.7' java-version: 8 os: ubuntu-latest -max: &max +35: &35 minio-version: 2024.8.29 spark-version: 3.5.3 pydantic-version: 2 @@ -16,6 +16,14 @@ max: &max java-version: 20 os: ubuntu-latest +4x: &4x + minio-version: 2024.7.26 + spark-version: 4.0.0 + pydantic-version: 2 + python-version: '3.12' + java-version: 22 + os: ubuntu-latest + latest: &latest minio-version: latest spark-version: latest @@ -25,6 +33,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*min, *max] - nightly: [*min, *latest] + small: [*35] + full: [*32, *35, *4x] + nightly: [*32, *35, *latest] diff --git a/.github/workflows/data/samba/matrix.yml b/.github/workflows/data/samba/matrix.yml index 045a093ba..19254d231 100644 --- a/.github/workflows/data/samba/matrix.yml +++ b/.github/workflows/data/samba/matrix.yml @@ -1,5 +1,5 @@ min: &min - # elswork/samba image versions does not correlate with smbd version, it is always 4.x + # elswork/samba image versions does not correlate with smbd version, it is always 4x server-version: latest pydantic-version: 1 python-version: '3.7' diff --git a/.github/workflows/data/teradata/matrix.yml b/.github/workflows/data/teradata/matrix.yml index e9e4b5fa1..08ab2b0df 100644 --- a/.github/workflows/data/teradata/matrix.yml +++ b/.github/workflows/data/teradata/matrix.yml @@ -1,4 +1,4 @@ -max: &max +3x: &3x spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' @@ -13,6 +13,6 @@ latest: &latest os: ubuntu-latest matrix: - small: [*max] - full: [*max] + small: [*3x] + full: [*3x] nightly: [*latest] diff --git a/README.rst b/README.rst index ac5f550a7..0b870171c 100644 --- a/README.rst +++ b/README.rst @@ -66,7 +66,7 @@ Requirements ------------ * **Python 3.7 - 3.12** -* PySpark 2.3.x - 3.5.x (depends on used connector) +* PySpark 2.3.x - 4.0.x (depends on used connector) * Java 8+ (required by Spark, see below) * Kerberos libs & GCC (required by ``Hive``, ``HDFS`` and ``SparkHDFS`` connectors) @@ -182,21 +182,23 @@ Firstly, you should install JDK. The exact installation instruction depends on y Compatibility matrix ^^^^^^^^^^^^^^^^^^^^ -+--------------------------------------------------------------+-------------+-------------+-------+ -| Spark | Python | Java | Scala | -+==============================================================+=============+=============+=======+ -| `2.3.x `_ | 3.7 only | 8 only | 2.11 | -+--------------------------------------------------------------+-------------+-------------+-------+ -| `2.4.x `_ | 3.7 only | 8 only | 2.11 | -+--------------------------------------------------------------+-------------+-------------+-------+ -| `3.2.x `_ | 3.7 - 3.10 | 8u201 - 11 | 2.12 | -+--------------------------------------------------------------+-------------+-------------+-------+ -| `3.3.x `_ | 3.7 - 3.10 | 8u201 - 17 | 2.12 | -+--------------------------------------------------------------+-------------+-------------+-------+ -| `3.4.x `_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 | -+--------------------------------------------------------------+-------------+-------------+-------+ -| `3.5.x `_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 | -+--------------------------------------------------------------+-------------+-------------+-------+ ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| Spark | Python | Java | Scala | ++=======================================================================+=============+=============+=======+ +| `2.3.x `_ | 3.7 only | 8 only | 2.11 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| `2.4.x `_ | 3.7 only | 8 only | 2.11 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| `3.2.x `_ | 3.7 - 3.10 | 8u201 - 11 | 2.12 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| `3.3.x `_ | 3.7 - 3.10 | 8u201 - 17 | 2.12 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| `3.4.x `_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| `3.5.x `_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ +| `4.0.x `_ | 3.8 - 3.12 | 17 - 22 | 2.13 | ++-----------------------------------------------------------------------+-------------+-------------+-------+ .. _pyspark-install: diff --git a/docs/changelog/next_release/297.feature.rst b/docs/changelog/next_release/297.feature.rst new file mode 100644 index 000000000..fe398226a --- /dev/null +++ b/docs/changelog/next_release/297.feature.rst @@ -0,0 +1 @@ +Add Spark 4.0 support. diff --git a/docs/connection/db_connection/clickhouse/prerequisites.rst b/docs/connection/db_connection/clickhouse/prerequisites.rst index c66be6357..fe97946e5 100644 --- a/docs/connection/db_connection/clickhouse/prerequisites.rst +++ b/docs/connection/db_connection/clickhouse/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * Clickhouse server versions: * Officially declared: 22.8 or higher * Actually tested: 21.1, 24.8 -* Spark versions: 2.3.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 2.3.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/docs/connection/db_connection/hive/prerequisites.rst b/docs/connection/db_connection/hive/prerequisites.rst index 0f56e7ba7..1e62f480f 100644 --- a/docs/connection/db_connection/hive/prerequisites.rst +++ b/docs/connection/db_connection/hive/prerequisites.rst @@ -17,8 +17,8 @@ Version Compatibility * Hive Metastore version: * Officially declared: 0.12 - 3.1.3 (may require to add proper .jar file explicitly) * Actually tested: 1.2.100, 2.3.10, 3.1.3 -* Spark versions: 2.3.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 2.3.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/docs/connection/db_connection/kafka/prerequisites.rst b/docs/connection/db_connection/kafka/prerequisites.rst index db2598d15..572c2b9cf 100644 --- a/docs/connection/db_connection/kafka/prerequisites.rst +++ b/docs/connection/db_connection/kafka/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * Kafka server versions: * Officially declared: 0.10 or higher * Actually tested: 3.2.3, 3.7.1 (only 3.x supports message headers) -* Spark versions: 2.4.x - 3.5.x -* Java versions: 8 - 17 +* Spark versions: 2.4.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/docs/connection/db_connection/mongodb/prerequisites.rst b/docs/connection/db_connection/mongodb/prerequisites.rst index 8a01d675a..3419cecb9 100644 --- a/docs/connection/db_connection/mongodb/prerequisites.rst +++ b/docs/connection/db_connection/mongodb/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * MongoDB server versions: * Officially declared: 4.0 or higher * Actually tested: 4.0, 7.0 -* Spark versions: 3.2.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 3.2.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/docs/connection/db_connection/mssql/prerequisites.rst b/docs/connection/db_connection/mssql/prerequisites.rst index 4e9fd263b..c29909a85 100644 --- a/docs/connection/db_connection/mssql/prerequisites.rst +++ b/docs/connection/db_connection/mssql/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * SQL Server versions: * Officially declared: 2016 - 2022 * Actually tested: 2014, 2022 -* Spark versions: 2.3.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 2.3.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_ and `official compatibility matrix `_. diff --git a/docs/connection/db_connection/mysql/prerequisites.rst b/docs/connection/db_connection/mysql/prerequisites.rst index 15b7c574c..5b73e84c7 100644 --- a/docs/connection/db_connection/mysql/prerequisites.rst +++ b/docs/connection/db_connection/mysql/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * MySQL server versions: * Officially declared: 8.0 - 9.0 * Actually tested: 5.7, 9.0 -* Spark versions: 2.3.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 2.3.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/docs/connection/db_connection/oracle/prerequisites.rst b/docs/connection/db_connection/oracle/prerequisites.rst index 35dd75690..ec2e0a4be 100644 --- a/docs/connection/db_connection/oracle/prerequisites.rst +++ b/docs/connection/db_connection/oracle/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * Oracle Server versions: * Officially declared: 19 - 23 * Actually tested: 11.2, 23 -* Spark versions: 2.3.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 2.3.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/docs/connection/db_connection/postgres/prerequisites.rst b/docs/connection/db_connection/postgres/prerequisites.rst index b1961b0dc..3fc6ed280 100644 --- a/docs/connection/db_connection/postgres/prerequisites.rst +++ b/docs/connection/db_connection/postgres/prerequisites.rst @@ -9,8 +9,8 @@ Version Compatibility * PostgreSQL server versions: * Officially declared: 8.2 - 16 * Actually tested: 9.4, 16 -* Spark versions: 2.3.x - 3.5.x -* Java versions: 8 - 20 +* Spark versions: 2.3.x - 4.0.x +* Java versions: 8 - 22 See `official documentation `_. diff --git a/onetl/_util/scala.py b/onetl/_util/scala.py index 5d472f2f7..a0d7f17a6 100644 --- a/onetl/_util/scala.py +++ b/onetl/_util/scala.py @@ -9,9 +9,11 @@ def get_default_scala_version(spark_version: Version) -> Version: """ Get default Scala version for specific Spark version """ - if spark_version.major < 3: + if spark_version.major == 2: return Version("2.11") - return Version("2.12") + if spark_version.major == 3: + return Version("2.12") + return Version("2.13") def scala_seq_to_python_list(seq) -> list: diff --git a/onetl/connection/db_connection/db_connection/dialect.py b/onetl/connection/db_connection/db_connection/dialect.py index 7080e3244..e722c4241 100644 --- a/onetl/connection/db_connection/db_connection/dialect.py +++ b/onetl/connection/db_connection/db_connection/dialect.py @@ -17,7 +17,7 @@ class DBDialect(BaseDBDialect): def detect_hwm_class(self, field: StructField) -> type[HWM] | None: - return SparkTypeToHWM.get(field.dataType.typeName()) # type: ignore + return SparkTypeToHWM.get(field.dataType) # type: ignore def get_sql_query( self, diff --git a/onetl/connection/db_connection/greenplum/connection.py b/onetl/connection/db_connection/greenplum/connection.py index 7f7e89611..95d2c5b7b 100644 --- a/onetl/connection/db_connection/greenplum/connection.py +++ b/onetl/connection/db_connection/greenplum/connection.py @@ -357,7 +357,7 @@ def get_df_schema( columns: list[str] | None = None, options: JDBCReadOptions | None = None, ) -> StructType: - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) log.info("|%s| Fetching schema of table %r ...", self.__class__.__name__, source) query = self.dialect.get_sql_query(source, columns=columns, limit=0, compact=True) diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py index 5bb5811ca..e4df3888c 100644 --- a/onetl/connection/db_connection/jdbc_connection/connection.py +++ b/onetl/connection/db_connection/jdbc_connection/connection.py @@ -90,7 +90,7 @@ def sql( query = clear_statement(query) - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) log.info("|%s| Executing SQL query (on executor):", self.__class__.__name__) log_lines(log, query) @@ -159,7 +159,7 @@ def read_source_as_df( limit=limit, ) - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) log.info("|%s| Executing SQL query (on executor):", self.__class__.__name__) log_lines(log, query) @@ -187,7 +187,7 @@ def write_df_to_target( else write_options.if_exists.value ) log.info("|%s| Saving data to a table %r", self.__class__.__name__, target) - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) df.write.format("jdbc").mode(mode).options(dbtable=target, **jdbc_properties).save() log.info("|%s| Table %r successfully written", self.__class__.__name__, target) @@ -198,7 +198,7 @@ def get_df_schema( columns: list[str] | None = None, options: JDBCReadOptions | None = None, ) -> StructType: - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) log.info("|%s| Fetching schema of table %r ...", self.__class__.__name__, source) query = self.dialect.get_sql_query(source, columns=columns, limit=0, compact=True) diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index a2f1d192d..ec4f4b788 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -205,7 +205,7 @@ def fetch( query = clear_statement(query) - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) log.info("|%s| Executing SQL query (on driver):", self.__class__.__name__) log_lines(log, query) @@ -278,7 +278,7 @@ def execute( statement = clear_statement(statement) - log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_class_name()) log.info("|%s| Executing statement (on driver):", self.__class__.__name__) log_lines(log, statement) @@ -419,12 +419,16 @@ def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions): self._last_connection_and_options.data = (new_connection, options) return new_connection - def _get_spark_dialect_name(self) -> str: + def _get_spark_dialect_class_name(self) -> str: """ Returns the name of the JDBC dialect associated with the connection URL. """ - dialect = self._get_spark_dialect().toString() - return dialect.split("$")[0] if "$" in dialect else dialect + from py4j.java_gateway import JavaObject + + dialect = self._get_spark_dialect() + if isinstance(dialect, JavaObject): + dialect = dialect.getClass() + return dialect.getCanonicalName().split("$")[0] def _get_spark_dialect(self): jdbc_dialects_package = self.spark._jvm.org.apache.spark.sql.jdbc @@ -466,10 +470,11 @@ def _execute_on_driver( statement_args = self._get_statement_args() jdbc_statement = self._build_statement(statement, statement_type, jdbc_connection, statement_args) - return self._execute_statement(jdbc_statement, statement, options, callback, read_only) + return self._execute_statement(jdbc_connection, jdbc_statement, statement, options, callback, read_only) def _execute_statement( self, + jdbc_connection, jdbc_statement, statement: str, options: JDBCFetchOptions | JDBCExecuteOptions, @@ -507,7 +512,7 @@ def _execute_statement( else: jdbc_statement.executeUpdate(statement) - return callback(jdbc_statement) + return callback(jdbc_connection, jdbc_statement) @staticmethod def _build_statement( @@ -536,11 +541,11 @@ def _build_statement( return jdbc_connection.createStatement(*statement_args) - def _statement_to_dataframe(self, jdbc_statement) -> DataFrame: + def _statement_to_dataframe(self, jdbc_connection, jdbc_statement) -> DataFrame: result_set = jdbc_statement.getResultSet() - return self._resultset_to_dataframe(result_set) + return self._resultset_to_dataframe(jdbc_connection, result_set) - def _statement_to_optional_dataframe(self, jdbc_statement) -> DataFrame | None: + def _statement_to_optional_dataframe(self, jdbc_connection, jdbc_statement) -> DataFrame | None: """ Returns ``org.apache.spark.sql.DataFrame`` or ``None``, if ResultSet is does not contain any columns. @@ -557,9 +562,9 @@ def _statement_to_optional_dataframe(self, jdbc_statement) -> DataFrame | None: if not result_column_count: return None - return self._resultset_to_dataframe(result_set) + return self._resultset_to_dataframe(jdbc_connection, result_set) - def _resultset_to_dataframe(self, result_set) -> DataFrame: + def _resultset_to_dataframe(self, jdbc_connection, result_set) -> DataFrame: """ Converts ``java.sql.ResultSet`` to ``org.apache.spark.sql.DataFrame`` using Spark's internal methods. @@ -578,13 +583,27 @@ def _resultset_to_dataframe(self, result_set) -> DataFrame: java_converters = self.spark._jvm.scala.collection.JavaConverters # type: ignore - if get_spark_version(self.spark) >= Version("3.4"): + spark_version = get_spark_version(self.spark) + + if spark_version >= Version("4.0"): + result_schema = jdbc_utils.getSchema( + jdbc_connection, + result_set, + jdbc_dialect, + False, # noqa: WPS425 + False, # noqa: WPS425 + ) + elif spark_version >= Version("3.4"): # https://github.com/apache/spark/commit/2349175e1b81b0a61e1ed90c2d051c01cf78de9b result_schema = jdbc_utils.getSchema(result_set, jdbc_dialect, False, False) # noqa: WPS425 else: result_schema = jdbc_utils.getSchema(result_set, jdbc_dialect, False) # noqa: WPS425 - result_iterator = jdbc_utils.resultSetToRows(result_set, result_schema) + if spark_version.major >= 4: + result_iterator = jdbc_utils.resultSetToRows(result_set, result_schema, jdbc_dialect) + else: + result_iterator = jdbc_utils.resultSetToRows(result_set, result_schema) + result_list = java_converters.seqAsJavaListConverter(result_iterator.toSeq()).asJava() jdf = self.spark._jsparkSession.createDataFrame(result_list, result_schema) # type: ignore diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 7765936c5..a4ed2e7a9 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -432,8 +432,13 @@ def get_packages( raise ValueError(f"Spark version must be at least 2.4, got {spark_ver}") scala_ver = Version(scala_version).min_digits(2) if scala_version else get_default_scala_version(spark_ver) + + if spark_ver.major < 4: + version = spark_ver.format("{0}.{1}.{2}") + else: + version = "4.0.0-preview1" return [ - f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}", + f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{version}", ] def __enter__(self): diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py index 2044ebb60..27598929c 100644 --- a/onetl/connection/file_df_connection/spark_s3/connection.py +++ b/onetl/connection/file_df_connection/spark_s3/connection.py @@ -246,9 +246,14 @@ def get_packages( # https://issues.apache.org/jira/browse/SPARK-23977 raise ValueError(f"Spark version must be at least 3.x, got {spark_ver}") + if spark_ver.major < 4: + version = spark_ver.format("{0}.{1}.{2}") + else: + version = "4.0.0-preview1" + scala_ver = Version(scala_version).min_digits(2) if scala_version else get_default_scala_version(spark_ver) # https://mvnrepository.com/artifact/org.apache.spark/spark-hadoop-cloud - return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"] + return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{version}"] @slot def path_from_string(self, path: os.PathLike | str) -> RemotePath: diff --git a/onetl/file/format/avro.py b/onetl/file/format/avro.py index 426eb30f8..93154e72c 100644 --- a/onetl/file/format/avro.py +++ b/onetl/file/format/avro.py @@ -163,7 +163,12 @@ def get_packages( if scala_ver < Version("2.11"): raise ValueError(f"Scala version should be at least 2.11, got {scala_ver.format('{0}.{1}')}") - return [f"org.apache.spark:spark-avro_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"] + if spark_ver.major < 4: + version = spark_ver.format("{0}.{1}.{2}") + else: + version = "4.0.0-preview1" + + return [f"org.apache.spark:spark-avro_{scala_ver.format('{0}.{1}')}:{version}"] @slot def check_if_supported(self, spark: SparkSession) -> None: diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index 508ec829c..d2066ccbe 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -193,6 +193,10 @@ def get_packages( # noqa: WPS231 ) """ + spark_ver = Version(spark_version) + if spark_ver.major >= 4: + # since Spark 4.0, XML is bundled with Spark + return [] if package_version: version = Version(package_version).min_digits(3) @@ -202,7 +206,6 @@ def get_packages( # noqa: WPS231 else: version = Version("0.18.0") - spark_ver = Version(spark_version) scala_ver = Version(scala_version).min_digits(2) if scala_version else get_default_scala_version(spark_ver) # Ensure compatibility with Spark and Scala versions @@ -216,8 +219,12 @@ def get_packages( # noqa: WPS231 @slot def check_if_supported(self, spark: SparkSession) -> None: - java_class = "com.databricks.spark.xml.XmlReader" + version = get_spark_version(spark) + if version.major >= 4: + # since Spark 4.0, XML is bundled with Spark + return + java_class = "com.databricks.spark.xml.XmlReader" try: try_import_java_class(spark, java_class) except Exception as e: @@ -332,12 +339,12 @@ def parse_column(self, column: str | Column, schema: StructType) -> Column: | |-- name: string (nullable = true) | |-- age: integer (nullable = true) """ + from pyspark import __version__ as spark_version from pyspark.sql import Column, SparkSession # noqa: WPS442 spark = SparkSession._instantiatedSession # noqa: WPS437 self.check_if_supported(spark) - from pyspark.sql.column import _to_java_column # noqa: WPS450 from pyspark.sql.functions import col if isinstance(column, Column): @@ -345,6 +352,13 @@ def parse_column(self, column: str | Column, schema: StructType) -> Column: else: column_name, column = column, col(column).cast("string") + if spark_version > "4": + from pyspark.sql.functions import from_xml # noqa: WPS450 + + return from_xml(column, schema, self.dict()).alias(column_name) + + from pyspark.sql.column import _to_java_column # noqa: WPS450 + java_column = _to_java_column(column) java_schema = spark._jsparkSession.parseDataType(schema.json()) # noqa: WPS437 scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap( # noqa: WPS219, WPS437 diff --git a/onetl/hwm/store/hwm_class_registry.py b/onetl/hwm/store/hwm_class_registry.py index b15b77aff..04086810a 100644 --- a/onetl/hwm/store/hwm_class_registry.py +++ b/onetl/hwm/store/hwm_class_registry.py @@ -2,10 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar from etl_entities.hwm import HWM, ColumnDateHWM, ColumnDateTimeHWM, ColumnIntHWM +if TYPE_CHECKING: + from pyspark.sql.types import DataType + class SparkTypeToHWM: """Registry class for HWM types @@ -14,43 +17,67 @@ class SparkTypeToHWM: -------- >>> from etl_entities.hwm import ColumnIntHWM, ColumnDateHWM + >>> from pyspark.sql.types import IntegerType, ShortType, DateType, StringType >>> from onetl.hwm.store import SparkTypeToHWM - >>> SparkTypeToHWM.get("integer") + >>> SparkTypeToHWM.get(IntegerType()) >>> # multiple type names are supported - >>> SparkTypeToHWM.get("short") + >>> SparkTypeToHWM.get(ShortType()) - >>> SparkTypeToHWM.get("date") + >>> SparkTypeToHWM.get(DateType()) - >>> SparkTypeToHWM.get("unknown") + >>> SparkTypeToHWM.get(StringType()) """ - _mapping: ClassVar[dict[str, type[HWM]]] = { - "byte": ColumnIntHWM, - "integer": ColumnIntHWM, - "short": ColumnIntHWM, - "long": ColumnIntHWM, - "date": ColumnDateHWM, - "timestamp": ColumnDateTimeHWM, - # for Oracle which does not differ between int and float/double - everything is Decimal - "float": ColumnIntHWM, - "double": ColumnIntHWM, - "fractional": ColumnIntHWM, - "decimal": ColumnIntHWM, - "numeric": ColumnIntHWM, - } + _mapping: ClassVar[dict[DataType | type[DataType], type[HWM]]] = {} @classmethod - def get(cls, type_name: str) -> type[HWM] | None: - return cls._mapping.get(type_name) + def get(cls, spark_type: DataType) -> type[HWM] | None: + # avoid importing pyspark in the module + from pyspark.sql.types import ( # noqa: WPS235 + ByteType, + DateType, + DecimalType, + DoubleType, + FloatType, + FractionalType, + IntegerType, + LongType, + NumericType, + ShortType, + TimestampType, + ) + + default_mapping: dict[type[DataType], type[HWM]] = { + ByteType: ColumnIntHWM, + IntegerType: ColumnIntHWM, + ShortType: ColumnIntHWM, + LongType: ColumnIntHWM, + DateType: ColumnDateHWM, + TimestampType: ColumnDateTimeHWM, + # for Oracle which does not differ between int and float/double - everything is Decimal + FloatType: ColumnIntHWM, + DoubleType: ColumnIntHWM, + DecimalType: ColumnIntHWM, + FractionalType: ColumnIntHWM, + NumericType: ColumnIntHWM, + } + + return ( + cls._mapping.get(spark_type) + or cls._mapping.get(spark_type.__class__) + or default_mapping.get(spark_type.__class__) + ) @classmethod - def add(cls, type_name: str, klass: type[HWM]) -> None: - cls._mapping[type_name] = klass + def add(cls, spark_type: DataType | type[DataType], klass: type[HWM]) -> None: + cls._mapping[spark_type] = klass -def register_spark_type_to_hwm_type_mapping(*type_names: str): - """Decorator for registering some HWM class with a type name or names +def register_spark_type_to_hwm_type_mapping(*spark_types: DataType | type[DataType]): + """Decorator for registering mapping between Spark data type and HWM type. + + Accepts both data type class and instance. Examples -------- @@ -58,17 +85,20 @@ def register_spark_type_to_hwm_type_mapping(*type_names: str): >>> from etl_entities.hwm import ColumnHWM >>> from onetl.hwm.store import SparkTypeToHWM >>> from onetl.hwm.store import SparkTypeToHWM, register_spark_type_to_hwm_type_mapping - >>> @register_spark_type_to_hwm_type_mapping("somename", "anothername") + >>> from pyspark.sql.types import IntegerType, DecimalType + >>> @register_spark_type_to_hwm_type_mapping(IntegerType, DecimalType(38, 0)) ... class MyHWM(ColumnHWM): ... - >>> SparkTypeToHWM.get("somename") + >>> SparkTypeToHWM.get(IntegerType()) - >>> SparkTypeToHWM.get("anothername") + >>> SparkTypeToHWM.get(DecimalType(38, 0)) + >>> SparkTypeToHWM.get(DecimalType(38, 10)) + """ def wrapper(cls: type[HWM]): - for type_name in type_names: - SparkTypeToHWM.add(type_name, cls) + for spark_type in spark_types: + SparkTypeToHWM.add(spark_type, cls) return cls return wrapper diff --git a/requirements/tests/spark-4.0.0.txt b/requirements/tests/spark-4.0.0.txt new file mode 100644 index 000000000..a9edc6474 --- /dev/null +++ b/requirements/tests/spark-4.0.0.txt @@ -0,0 +1,5 @@ +numpy>=1.16 +pandas>=1.0 +pyarrow>=1.0 +pyspark==4.0.0.dev2 +sqlalchemy diff --git a/tests/fixtures/processing/hive.py b/tests/fixtures/processing/hive.py index 58d830dd0..08330b61c 100644 --- a/tests/fixtures/processing/hive.py +++ b/tests/fixtures/processing/hive.py @@ -83,6 +83,7 @@ def insert_data( ) -> None: df = self.connection.createDataFrame(values) df.write.mode("append").insertInto(f"{schema}.{table}") + self.connection.sql(f"REFRESH TABLE {schema}.{table}") def get_expected_dataframe( self, diff --git a/tests/fixtures/processing/mysql.py b/tests/fixtures/processing/mysql.py index 480a4b227..fe69f3dd4 100644 --- a/tests/fixtures/processing/mysql.py +++ b/tests/fixtures/processing/mysql.py @@ -146,3 +146,17 @@ def get_expected_dataframe( self.get_expected_dataframe_ddl(schema, table, order_by) + ";", con=self.url, ) + + def fix_pandas_df( + self, + df: pandas.DataFrame, + ) -> pandas.DataFrame: + df = super().fix_pandas_df(df) + + for column in df.columns: + if "float" in column: + # Spark 4.0 returns float32 instead float64 + # https://github.com/apache/spark/pull/45666/files + df[column] = df[column].astype("float32") + + return df diff --git a/tests/fixtures/spark.py b/tests/fixtures/spark.py index 7a9b812a4..34d2973fa 100644 --- a/tests/fixtures/spark.py +++ b/tests/fixtures/spark.py @@ -103,7 +103,7 @@ def maven_packages(request): # There is no MongoDB connector for Spark less than 3.2 packages.extend(MongoDB.get_packages(spark_version=str(pyspark_version))) - if "excel" in markers: + if "excel" in markers and pyspark_version.major < 4: # There is no Excel files support for Spark less than 3.2 packages.extend(Excel.get_packages(spark_version=str(pyspark_version))) diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py index 6cc860f56..40ff79b06 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py @@ -200,7 +200,7 @@ def test_hive_strategy_incremental_nothing_to_read(spark, processing, prepare_sc [ ("float_value", ValueError, "Expression 'float_value' returned values"), ("text_string", RuntimeError, "Cannot detect HWM type for"), - ("unknown_column", Exception, r"column .* cannot be resolved|cannot resolve .* given input columns"), + ("unknown_column", Exception, r".*cannot.*resolve.*"), ], ) def test_hive_strategy_incremental_wrong_hwm( diff --git a/tests/util/assert_df.py b/tests/util/assert_df.py index e36c9af73..331388e83 100644 --- a/tests/util/assert_df.py +++ b/tests/util/assert_df.py @@ -66,5 +66,12 @@ def assert_subset_df( columns = [column.lower() for column in columns] for column in columns: # noqa: WPS528 - difference = ~small_pdf[column].isin(large_pdf[column]) - assert not difference.all(), small_pdf[difference] + small_column = small_pdf[column] + large_column = large_pdf[column] + different_indices = ~small_column.isin(large_column) + assert not different_indices.all(), ( + column, + small_column.dtype, + large_column.dtype, + small_pdf[different_indices], + ) diff --git a/tests/util/to_pandas.py b/tests/util/to_pandas.py index 142a024d6..b72aa2d22 100644 --- a/tests/util/to_pandas.py +++ b/tests/util/to_pandas.py @@ -5,6 +5,9 @@ import pandas +from onetl._util.spark import get_pyspark_version +from onetl._util.version import Version + if TYPE_CHECKING: from pyspark.sql import DataFrame as SparkDataFrame @@ -16,16 +19,20 @@ def fix_pyspark_df(df: SparkDataFrame) -> SparkDataFrame: """ Fix Spark DataFrame column types before converting it to Pandas DataFrame. - Using ``df.toPandas()`` on Spark 3.x with Pandas 2.x raises the following exception: + On Spark 4.0, it returns dataframe as-is, see https://issues.apache.org/jira/browse/SPARK-43194. + + On Spark 3.x with Pandas 2.x, ``df.toPandas()`` raises the following exception: .. code:: TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead. This method converts dates and timestamps to strings, to convert them back to original type later. - - TODO: remove after https://issues.apache.org/jira/browse/SPARK-43194 """ + if get_pyspark_version() >= Version("4"): + return df + + # https://issues.apache.org/jira/browse/SPARK-43194 from pyspark.sql.functions import date_format from pyspark.sql.types import DateType, TimestampType @@ -84,9 +91,13 @@ def fix_pandas_df( column_name = column.lower() if "datetime" in column_name: - df[column] = parse_datetime(df[column]) + if df.dtypes[column] == "object": + df[column] = parse_datetime(df[column]) elif "date" in column_name: - df[column] = parse_date(df[column]).dt.date + if df.dtypes[column] == "object": + df[column] = parse_date(df[column]).dt.date + else: + df[column] = df[column].dt.date return df