Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests to check compatibility with fastparquet #9366

Merged
merged 23 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
74d31f1
WIP: Initial stab at fastparquet tests.
mythrocks Sep 29, 2023
d3394dc
Date tests. Plus minor refactor.
mythrocks Sep 29, 2023
27ab6d9
Date/Time tests.
mythrocks Sep 29, 2023
b309918
Added tests for reading data written with fastparquet.
mythrocks Oct 2, 2023
45000f9
Tests for reading GPU-written files.
mythrocks Oct 2, 2023
cad94bb
Added failing tests for arrays, struct.
mythrocks Oct 5, 2023
725c316
Clarification of failure conditions.
mythrocks Oct 5, 2023
1b141c2
Workaround tests for timestamps.
mythrocks Oct 5, 2023
641fa14
Workaround tests for dates.
mythrocks Oct 5, 2023
ef9c5f1
Miscellaneous fixes:
mythrocks Oct 5, 2023
782e076
Test descriptions.
mythrocks Oct 5, 2023
411612e
Workaround tests for STRUCT, ARRAY, etc.
mythrocks Oct 5, 2023
3624fac
Added xfails for struct/array.
mythrocks Oct 5, 2023
6bcff7a
Updated with concrete fastparquet version.
mythrocks Oct 5, 2023
4a67ef0
Fixed up some xfail messages.
mythrocks Oct 6, 2023
b6b7d19
Fixed another xfail message.
mythrocks Oct 9, 2023
c9f7e2c
Extend date/time margins to Pandas.Timestamp.min and Pandas.Timestamp…
mythrocks Oct 9, 2023
fceafc2
Added dependency to CI scripts, Docker images.
mythrocks Oct 9, 2023
b10a321
Change in tack: Install fastparquet explicitly.
mythrocks Oct 9, 2023
f34eec7
Per #8789, reverted change for Centos Dockerfile.
mythrocks Oct 10, 2023
126b9f4
Removed fastparquet from UDF tests.
mythrocks Oct 10, 2023
fa356f8
Optionally skips fastparquet tests.
mythrocks Oct 10, 2023
c072dc5
Merge remote-tracking branch 'origin/branch-23.12' into fastparquet-c…
mythrocks Oct 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ pandas
pyarrow
pytest-xdist >= 2.0.0
findspark
fastparquet
fastparquet == 2023.8.0
147 changes: 127 additions & 20 deletions integration_tests/src/main/python/fastparquet_compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from datetime import date, datetime, timezone
from data_gen import *
from pyspark.sql.types import *
from spark_session import with_cpu_session, with_gpu_session
import fastparquet
from spark_session import spark_version, with_cpu_session, with_gpu_session

rebase_write_corrected_conf = {
'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
Expand All @@ -40,14 +39,15 @@ def read_with_fastparquet_or_plugin(spark):
if plugin_enabled:
return spark.read.parquet(data_path)
else:
import fastparquet
df = fastparquet.ParquetFile(data_path).to_pandas()
return spark.createDataFrame(df)

return read_with_fastparquet_or_plugin


@pytest.mark.parametrize('corrected_conf', [rebase_write_corrected_conf])
@pytest.mark.skipif(condition=spark_version() < "3.4.1",
reason="spark_df.to_pandas() is not reliable on prior versions of Spark. 3.4.1 and above seem to "
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
"work more stably.")
@pytest.mark.parametrize('data_gen', [
ByteGen(nullable=False),
ShortGen(nullable=False),
Expand Down Expand Up @@ -76,25 +76,38 @@ def read_with_fastparquet_or_plugin(spark):
start=datetime(1, 1, 1, tzinfo=timezone.utc),
end=datetime(1899, 12, 31, tzinfo=timezone.utc)),
marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")),
# TODO: Array gen type deduction borked when converting from Pandas to Spark dataframe.
# ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False),
# TODO: Struct rows seem to be correct, but are failing comparison because of differences in Row representation.
# StructGen(children=[("first", IntegerGen(nullable=False)),
# ("second", FloatGen(nullable=False))], nullable=False)
pytest.param(
ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False),
marks=pytest.mark.xfail(reason="Conversion from Pandas dataframe to Spark dataframe fails: "
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
"\"Unable to infer the type of the field a\".")),
pytest.param(
StructGen(children=[("first", IntegerGen(nullable=False)),
("second", FloatGen(nullable=False))], nullable=False),
marks=pytest.mark.xfail(reason="Values are correct, but struct row representations differ between "
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
"fastparquet and Spark."))
], ids=idfn)
def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, corrected_conf):
def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
"""
This test writes data_gen output to Parquet via Apache Spark, then verifies that fastparquet and the RAPIDS
plugin read the data identically.
There are xfails here because of limitations in converting Spark dataframes to Pandas, if they contain nulls,
as well as limitations in fastparquet's handling of Dates, Timestamps, Decimals, etc.
"""
data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"
gen = StructGen([('a', data_gen)], nullable=False)
# Write data with CPU session.
with_cpu_session(
# Single output file, to avoid differences in order of file reads.
lambda spark: gen_df(spark, gen, 3).repartition(1).write.mode('overwrite').parquet(data_path),
conf=corrected_conf
lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path),
conf=rebase_write_corrected_conf
)
# Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records.
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=corrected_conf)
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path))


@pytest.mark.skipif(condition=spark_version() < "3.4.1",
reason="spark_df.to_pandas() is not reliable on prior versions of Spark. 3.4.1 and above seem to "
"work more stably.")
@pytest.mark.parametrize('column_gen', [
ByteGen(nullable=False),
ShortGen(nullable=False),
Expand Down Expand Up @@ -125,6 +138,11 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, correct
marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")),
], ids=idfn)
def test_reading_file_written_with_gpu(spark_tmp_path, column_gen):
"""
This test writes the data-gen output to file via the RAPIDS plugin, then checks that the data is read identically
via fastparquet and Spark.
There are xfails here because of fastparquet limitations in handling Decimal, Timestamps, Dates, etc.
"""
data_path = spark_tmp_path + "/FASTPARQUET_TEST_GPU_WRITE_PATH"

gen = StructGen([('a', column_gen),
Expand Down Expand Up @@ -171,30 +189,39 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen):
start=date(year=2000, month=1, day=1),
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
end=date(year=2020, month=12, day=31)),
marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Dates generated in Spark can't be written "
"with fastparquet, because the dtype/encoding cannot be deduced.")),
"with fastparquet, because the dtype/encoding cannot be deduced. "
"This test has a workaround in test_reading_file_rewritten_with_fastparquet.")),
pytest.param(
TimestampGen(nullable=False),
marks=pytest.mark.xfail(reason="Timestamps exceeding year=2300 are out of bounds for Pandas.")),
marks=pytest.mark.xfail(reason="Old timestamps are out of bounds for Pandas. E.g.: "
"\"pandas._libs.tslibs.np_datetime.OutOfBoundsDatetime: Out of bounds "
"nanosecond timestamp: 740-07-19 18:09:56\"."
"This test has a workaround in test_reading_file_rewritten_with_fastparquet.")),
pytest.param(
TimestampGen(nullable=False,
start=datetime(2000, 1, 1, tzinfo=timezone.utc),
end=datetime(2200, 12, 31, tzinfo=timezone.utc)),
marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Timestamps in Spark can't be "
"converted to pandas, because of type errors. The error states: "
"\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. "
"Pass e.g. 'datetime64[ns]' instead.\"")),
"Pass e.g. 'datetime64[ns]' instead.\" This test has a workaround in "
"test_reading_file_rewritten_with_fastparquet.")),
pytest.param(
ArrayGen(IntegerGen(nullable=False), nullable=False),
marks.pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. "
marks=pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. "
"The test then fails with the same problem as with String columns. "
"See https://github.com/NVIDIA/spark-rapids/issues/9387.")),
"See https://github.com/NVIDIA/spark-rapids/issues/9387."
"This test has a workaround in test_reading_file_rewritten_with_fastparquet.")),
], ids=idfn)
def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path):
"""
This test writes data-gen output with fastparquet, and checks that both Apache Spark and the RAPIDS plugin
read the written data correctly.
"""
data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH"

def write_with_fastparquet(spark, data_gen):
# TODO: (future) Compression settings?
import fastparquet
dataframe = gen_df(spark, data_gen, 2048)
fastparquet.write(data_path, dataframe.toPandas())

Expand All @@ -209,3 +236,83 @@ def write_with_fastparquet(spark, data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
rebase_write_corrected_conf)


@pytest.mark.parametrize('column_gen, time_format', [
    pytest.param(
        DateGen(nullable=False,
                start=date(year=2000, month=1, day=1),
                end=date(year=2020, month=12, day=31)), 'int64',
        marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading dates written via "
                                       "fastparquet, if written in int64: "
                                       "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")),
    pytest.param(
        DateGen(nullable=False), 'int96',
        marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before "
                                       "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. "
                                       "This messes up reads from Apache Spark and the plugin.")),
    (DateGen(nullable=False,
             start=date(year=2000, month=1, day=1),
             end=date(year=2020, month=12, day=31)), 'int96'),
    (DateGen(nullable=True,
             start=date(year=2000, month=1, day=1),
             end=date(year=2020, month=12, day=31)), 'int96'),
    pytest.param(
        TimestampGen(nullable=False,
                     start=datetime(2000, 1, 1, tzinfo=timezone.utc),
                     end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int64',
        marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading timestamps written via "
                                       "fastparquet, if written in int64: "
                                       "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")),
    (TimestampGen(nullable=False,
                  start=datetime(2000, 1, 1, tzinfo=timezone.utc),
                  end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int96'),
    (TimestampGen(nullable=True,
                  start=datetime(2000, 1, 1, tzinfo=timezone.utc),
                  end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int96'),
    pytest.param(
        TimestampGen(nullable=False), 'int96',
        marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before "
                                       "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. "
                                       "This messes up reads from Apache Spark and the plugin.")),
    pytest.param(
        ArrayGen(nullable=False, child_gen=IntegerGen(nullable=False)), 'int96',
        marks=pytest.mark.xfail(reason="fastparquet fails to serialize array elements with any available encoding. "
                                       "E.g. \"Error converting column 'a' to bytes using encoding JSON. "
                                       "Original error: Object of type int32 is not JSON serializable\".")),
    (StructGen(nullable=False, children=[('first', IntegerGen(nullable=False))]), 'int96'),
    pytest.param(
        StructGen(nullable=True, children=[('first', IntegerGen(nullable=False))]), 'int96',
        marks=pytest.mark.xfail(reason="fastparquet fails to read nullable Struct columns written from Apache Spark. "
                                       "It fails the rewrite to parquet, thereby failing the test.")),
], ids=idfn)
def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_tmp_path):
    """
    This test is a workaround to test data-types that have problems being converted
    from Spark dataframes to Pandas dataframes.
    For instance, sparkDF.toPandas() incorrectly converts ARRAY<INT> columns into
    STRING columns.
    This test writes the Spark dataframe into a temporary file, and then uses
    `fastparquet` to read and write the file again, to the final destination.
    The final file should be in the correct format, with the right datatypes.
    This is then checked for read-accuracy, via CPU and GPU.

    :param column_gen: data generator for the single data column 'a' under test
    :param time_format: fastparquet `times` encoding ('int64' or 'int96') used on rewrite
    :param spark_tmp_path: pytest fixture providing a per-test temporary directory
    """
    data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH"

    def rewrite_with_fastparquet(spark, data_gen):
        """Write data_gen output with Spark, then round-trip it through fastparquet
        (read tmp file, write final file) so the final file carries fastparquet's
        encoding of the data types."""
        # Import locally (not at module level), consistent with the other tests in
        # this file, so that fastparquet is only required when these tests run.
        import fastparquet
        tmp_data_path = data_path + "_tmp"
        spark_df = gen_df(spark, data_gen, 2048)
        # Single output file, so the fastparquet read below sees one deterministic file.
        spark_df.repartition(1).write.mode("overwrite").parquet(tmp_data_path)
        pandas_df = fastparquet.ParquetFile(tmp_data_path).to_pandas()
        fastparquet.write(data_path, pandas_df, times=time_format)

    # 'part' provides a second, always-valid column alongside the column under test.
    gen = StructGen([('a', column_gen),
                     ('part', IntegerGen(nullable=False))], nullable=False)
    # Write data with CPU session.
    with_cpu_session(
        lambda spark: rewrite_with_fastparquet(spark, gen)
    )
    # Read Parquet with CPU (Apache Spark) and GPU (plugin), and compare records.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        rebase_write_corrected_conf)
Loading