Merge pull request #4 from firestarman/pandas-test-mg
Add integration tests for Pandas UDF
firestarman authored Sep 8, 2020
2 parents 69b1ec0 + 6435eaa commit c33b41c
Showing 6 changed files with 290 additions and 5 deletions.
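The new tests are gated behind a pytest "udf" marker, and the premerge builds now exclude that marker through pytest.TEST_TAGS. As a rough sketch of how the marker can be selected or excluded with a plain pytest invocation (the project's own integration-test launcher and Spark setup may differ):

    # run only the Pandas UDF tests (they need a GPU plus cudf, pandas and pyarrow)
    pytest -m udf integration_tests/src/main/python/udf_cudf_test.py

    # skip them, as the premerge builds now do via -Dpytest.TEST_TAGS='not udf'
    pytest -m 'not udf' integration_tests/src/main/python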
1 change: 1 addition & 0 deletions integration_tests/pytest.ini
@@ -20,4 +20,5 @@ markers =
incompat: Enable incompat operators
limit(num_rows): Limit the number of rows that will be checked in a result
qarun: Mark qa test
udf: Mark udf test

1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
@@ -20,3 +20,4 @@
incompat = pytest.mark.incompat
limit = pytest.mark.limit
qarun = pytest.mark.qarun
udf = pytest.mark.udf
283 changes: 283 additions & 0 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -0,0 +1,283 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import pandas as pd
import time
from typing import Iterator
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from spark_session import with_cpu_session, with_gpu_session
from marks import udf, allow_non_gpu

_conf = {
    'spark.rapids.sql.exec.ArrowEvalPythonExec': 'true',
    'spark.rapids.sql.exec.MapInPandasExec': 'true',
    'spark.rapids.sql.exec.FlatMapGroupsInPandasExec': 'true',
    'spark.rapids.sql.exec.AggregateInPandasExec': 'true',
    'spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec': 'true',
    'spark.rapids.sql.exec.WindowInPandasExec': 'true',
    'spark.rapids.python.gpu.enabled': 'true'
}

def _create_df(spark):
    return spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v")
    )

def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False):
    print('### CPU RUN ###')
    cpu_start = time.time()
    cpu_ret = with_cpu_session(cpu_func, conf=cpu_conf)
    cpu_end = time.time()
    print('### GPU RUN ###')
    gpu_start = time.time()
    gpu_ret = with_gpu_session(gpu_func, conf=gpu_conf)
    gpu_end = time.time()
    print('### WRITE: GPU TOOK {} CPU TOOK {} ###'.format(
        gpu_end - gpu_start, cpu_end - cpu_start))
    print('### CPU RETURN ###')
    print(cpu_ret)
    print('### GPU RETURN ###')
    print(gpu_ret)
    if is_sort:
        # sort both collected results so row-ordering differences do not cause false mismatches
        assert sorted(cpu_ret) == sorted(gpu_ret)
    else:
        assert cpu_ret == gpu_ret


@pandas_udf('int')
def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
    return v + 1

@pandas_udf('int')
def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
    # move the pandas input onto the GPU with cudf, compute, then convert back for Spark
    import cudf
    gpu_series = cudf.Series(v)
    gpu_series = gpu_series + 1
    return gpu_series.to_pandas()

@allow_non_gpu(any=True)
@udf
def test_with_column():
    def cpu_run(spark):
        df = _create_df(spark)
        return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)

@allow_non_gpu(any=True)
@udf
def test_sql():
    def cpu_run(spark):
        _ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
        return spark.sql("SELECT add_one_cpu(id) FROM range(3)").collect()

    def gpu_run(spark):
        _ = spark.udf.register("add_one_gpu", _plus_one_gpu_func)
        return spark.sql("SELECT add_one_gpu(id) FROM range(3)").collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


@pandas_udf("long")
def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for s in iterator:
        yield s + 1

@pandas_udf("long")
def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    import cudf
    for s in iterator:
        gpu_series = cudf.Series(s)
        gpu_series = gpu_series + 1
        yield gpu_series.to_pandas()

@allow_non_gpu(any=True)
@udf
def test_select():
    def cpu_run(spark):
        df = _create_df(spark)
        return df.select(_plus_one_cpu_iter_func(df.v)).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        return df.select(_plus_one_gpu_iter_func(df.v)).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


@allow_non_gpu(any=True)
@udf
def test_map_in_pandas():
    def cpu_run(spark):
        df = _create_df(spark)
        def _filter_cpu_func(iterator):
            for pdf in iterator:
                yield pdf[pdf.id == 1]
        return df.mapInPandas(_filter_cpu_func, df.schema).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        def _filter_gpu_func(iterator):
            import cudf
            for pdf in iterator:
                gdf = cudf.from_pandas(pdf)
                yield gdf[gdf.id == 1].to_pandas()
        return df.mapInPandas(_filter_gpu_func, df.schema).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# groupby().apply() raises "Invalid udf: the udf argument must be a pandas_udf of
# type GROUPED_MAP", so the UDF type has to be given explicitly here.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_cpu_func(df):
    v = df.v
    return df.assign(v=(v - v.mean()) / v.std())

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_gpu_func(df):
    import cudf
    gdf = cudf.from_pandas(df)
    v = gdf.v
    return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()

@allow_non_gpu(any=True)
@udf
def test_group_apply():
    def cpu_run(spark):
        df = _create_df(spark)
        return df.groupby("id").apply(_normalize_cpu_func).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        return df.groupby("id").apply(_normalize_gpu_func).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@allow_non_gpu(any=True)
@udf
def test_group_apply_in_pandas():
    def cpu_run(spark):
        df = _create_df(spark)
        def _normalize_cpu_in_pandas_func(df):
            v = df.v
            return df.assign(v=(v - v.mean()) / v.std())
        return df.groupby("id").applyInPandas(_normalize_cpu_in_pandas_func, df.schema).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        def _normalize_gpu_in_pandas_func(df):
            import cudf
            gdf = cudf.from_pandas(df)
            v = gdf.v
            return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
        return df.groupby("id").applyInPandas(_normalize_gpu_in_pandas_func, df.schema).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pandas_udf("int")
def _sum_cpu_func(v: pd.Series) -> int:
    return v.sum()

@pandas_udf("integer")
def _sum_gpu_func(v: pd.Series) -> int:
    import cudf
    gpu_series = cudf.Series(v)
    return gpu_series.sum()

@allow_non_gpu(any=True)
@udf
def test_group_agg():
    def cpu_run(spark):
        df = _create_df(spark)
        return df.groupby("id").agg(_sum_cpu_func(df.v)).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        return df.groupby("id").agg(_sum_gpu_func(df.v)).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@allow_non_gpu(any=True)
@udf
def test_sql_group():
    def cpu_run(spark):
        _ = spark.udf.register("sum_cpu_udf", _sum_cpu_func)
        q = "SELECT sum_cpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
        return spark.sql(q).collect()

    def gpu_run(spark):
        _ = spark.udf.register("sum_gpu_udf", _sum_gpu_func)
        q = "SELECT sum_gpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
        return spark.sql(q).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@allow_non_gpu(any=True)
@udf
def test_window():
    def cpu_run(spark):
        df = _create_df(spark)
        w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        return df.withColumn('sum_v', _sum_cpu_func('v').over(w)).collect()

    def gpu_run(spark):
        df = _create_df(spark)
        w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        return df.withColumn('sum_v', _sum_gpu_func('v').over(w)).collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@allow_non_gpu(any=True)
@udf
def test_cogroup():
    def cpu_run(spark):
        df1 = spark.createDataFrame(
            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
            ("time", "id", "v1"))
        df2 = spark.createDataFrame(
            [(20000101, 1, "x"), (20000101, 2, "y")],
            ("time", "id", "v2"))
        def _cpu_join_func(l, r):
            return pd.merge(l, r, on="time")
        return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

    def gpu_run(spark):
        df1 = spark.createDataFrame(
            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
            ("time", "id", "v1"))
        df2 = spark.createDataFrame(
            [(20000101, 1, "x"), (20000101, 2, "y")],
            ("time", "id", "v2"))
        def _gpu_join_func(l, r):
            import cudf
            gl = cudf.from_pandas(l)
            gr = cudf.from_pandas(r)
            return gl.merge(gr, on="time").to_pandas()
        return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

    _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


2 changes: 1 addition & 1 deletion jenkins/Dockerfile.ubuntu16
@@ -35,7 +35,7 @@ RUN add-apt-repository ppa:deadsnakes/ppa && \
openjdk-8-jdk python3.6 python3-pip tzdata git

RUN ln -s /usr/bin/python3.6 /usr/bin/python
RUN python -m pip install pytest sre_yield requests
RUN python -m pip install pytest sre_yield requests pandas numpy pyarrow

RUN adduser --uid 26576 --gid 30 --shell /bin/bash svcngcc
USER svcngcc
6 changes: 3 additions & 3 deletions jenkins/spark-premerge-build.sh
@@ -37,10 +37,10 @@ export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"
tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \
rm -f $SPARK_HOME.tgz

mvn -U -B $MVN_URM_MIRROR '-Pinclude-databricks,!snapshot-shims' clean verify -Dpytest.TEST_TAGS=''
mvn -U -B $MVN_URM_MIRROR '-Pinclude-databricks,!snapshot-shims' clean verify -Dpytest.TEST_TAGS='not udf'
# Run the unit tests for other Spark versions but don't run full python integration tests
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS=''
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS=''
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS='not udf'
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS='not udf'

# The jacoco coverage should have been collected, but because of how the shade plugin
# works and jacoco we need to clean some things up so jacoco will only report for the
2 changes: 1 addition & 1 deletion pom.xml
@@ -164,7 +164,7 @@
<rapids.shuffle.manager.override>false</rapids.shuffle.manager.override>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.sourceEncoding>UTF-8</project.reporting.sourceEncoding>
<pytest.TEST_TAGS>not qarun</pytest.TEST_TAGS>
<pytest.TEST_TAGS>not qarun and not udf</pytest.TEST_TAGS>
<rat.consoleOutput>false</rat.consoleOutput>
<slf4j.version>1.7.30</slf4j.version>
<spark300.version>3.0.0</spark300.version>
