[24.02] test_dpp_bypass / test_dpp_via_aggregate_subquery failures in CI Databricks 13.3

Describe the bug
FAILED ../../src/main/python/dpp_test.py::test_dpp_bypass[true-5-parquet][DATAGEN_SEED=1709607357, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
FAILED ../../src/main/python/dpp_test.py::test_dpp_bypass[true-5-orc][DATAGEN_SEED=1709607357, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
FAILED ../../src/main/python/dpp_test.py::test_dpp_via_aggregate_subquery[true-5-parquet][DATAGEN_SEED=1709607357, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
FAILED ../../src/main/python/dpp_test.py::test_dpp_via_aggregate_subquery[true-5-orc][DATAGEN_SEED=1709607357, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
-----------------------------------------------------------------------------------------------------------------------------------
----------------------------- Captured stdout call -----------------------------
### CPU RUN ###
### GPU RUN ###
_________________ test_dpp_via_aggregate_subquery[true-5-orc] __________________
[gw1] linux -- Python 3.10.12 /usr/bin/python
spark_tmp_table_factory = <conftest.TmpTableFactory object at 0x7ff1e5f22d40>
store_format = 'orc', s_index = 5, aqe_enabled = 'true'
@ignore_order
@pytest.mark.parametrize('store_format', ['parquet', 'orc'], ids=idfn)
@pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn)
@pytest.mark.parametrize('aqe_enabled', [
    'false',
    pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(),
                 reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled'))
], ids=idfn)
def test_dpp_via_aggregate_subquery(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
    fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
    create_fact_table(fact_table, store_format)
    filter_val = create_dim_table(dim_table, store_format)
    statement = _statements[s_index].format(fact_table, dim_table, filter_val)
>   assert_cpu_and_gpu_are_equal_collect_with_capture(
        lambda spark: spark.sql(statement),
        # SubqueryExec appears if we plan extra subquery for DPP
        exist_classes='DynamicPruningExpression,SubqueryExec',
        conf=dict(_no_exchange_reuse_conf + [('spark.sql.adaptive.enabled', aqe_enabled)]))
../../src/main/python/dpp_test.py:255:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../src/main/python/asserts.py:410: in assert_cpu_and_gpu_are_equal_collect_with_capture
jvm.org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(gpu_df._jdf, clz)
/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322: in __call__
return_value = get_return_value(
/databricks/spark/python/pyspark/errors/exceptions/captured.py:188: in deco
return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
answer = 'xro342590'
gateway_client = <py4j.clientserver.JavaClient object at 0x7ff2000e5b70>
target_id = 'z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback'
name = 'assertContains'
def get_return_value(answer, gateway_client, target_id=None, name=None):
    """Converts an answer received from the Java gateway into a Python object.
    For example, string representation of integers are converted to Python
    integer, string representation of objects are converted to JavaObject
    instances, etc.
    :param answer: the string returned by the Java gateway
    :param gateway_client: the gateway client used to communicate with the Java
        Gateway. Only necessary if the answer is a reference (e.g., object,
        list, map)
    :param target_id: the name of the object from which the answer comes from
        (e.g., *object1* in `object1.hello()`). Optional.
    :param name: the name of the member from which the answer comes from
        (e.g., *hello* in `object1.hello()`). Optional.
    """
    if is_error(answer)[0]:
        if len(answer) > 1:
            type = answer[1]
            value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
            if answer[1] == REFERENCE_TYPE:
>               raise Py4JJavaError(
                    "An error occurred while calling {0}{1}{2}.\n".
                    format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
E : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
E AdaptiveSparkPlan isFinalPlan=true
E +- == Final Plan ==
E ResultQueryStage 4, Statistics(sizeInBytes=0.0 B, ColumnStat: N/A)
E +- LocalTableScan <empty>, [key#119441, max(value)#119457L]
E +- == Initial Plan ==
E Sort [key#119441 ASC NULLS FIRST, max(value)#119457L ASC NULLS FIRST], true, 0
E +- Exchange rangepartitioning(key#119441 ASC NULLS FIRST, max(value)#119457L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=208239]
E +- HashAggregate(keys=[key#119441], functions=[finalmerge_max(merge max#119507L) AS max(value#119442L)#119456L], output=[key#119441, max(value)#119457L])
E +- Exchange hashpartitioning(key#119441, 4), ENSURE_REQUIREMENTS, [plan_id=208237]
E +- HashAggregate(keys=[key#119441], functions=[partial_max(value#119442L) AS max#119507L], output=[key#119441, max#119507L])
E +- Union
E :- Project [key#119350 AS key#119441, value#119446L AS value#119442L]
E : +- BroadcastHashJoin [key#119350], [key#119352], Inner, BuildRight, false
E : :- HashAggregate(keys=[key#119350], functions=[finalmerge_sum(merge sum#119509L) AS sum(value#119349)#119450L], output=[key#119350, value#119446L])
E : : +- Exchange hashpartitioning(key#119350, 4), ENSURE_REQUIREMENTS, [plan_id=208223]
E : : +- HashAggregate(keys=[key#119350], functions=[partial_sum(value#119349) AS sum#119509L], output=[key#119350, sum#119509L])
E : : +- Project [value#119349, key#119350]
E : : +- Filter (isnotnull(value#119349) AND (value#119349 > 0))
E : : +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_0[value#119349,key#119350,skey#119351] Batched: true, DataFilters: [isnotnull(value#119349), (value#119349 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-20240..., PartitionFilters: [isnotnull(key#119350), dynamicpruning#119503 119502], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
E : +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=208202]
E : +- Project [key#119352]
E : +- Filter ((((isnotnull(ex_key#119354) AND isnotnull(filter#119356)) AND (ex_key#119354 = 3)) AND (filter#119356 = 451)) AND isnotnull(key#119352))
E : +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_1[key#119352,ex_key#119354,filter#119356] Batched: true, DataFilters: [isnotnull(ex_key#119354), isnotnull(filter#119356), (ex_key#119354 = 3), (filter#119356 = 451), ..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-202403..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,451), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
E +- Project [key#119490, value#119493L]
E +- BroadcastHashJoin [key#119490], [key#119494], Inner, BuildRight, false
E :- HashAggregate(keys=[key#119490], functions=[finalmerge_sum(merge sum#119511L) AS sum(value#119489)#119450L], output=[key#119490, value#119493L])
E : +- Exchange hashpartitioning(key#119490, 4), ENSURE_REQUIREMENTS, [plan_id=208231]
E : +- HashAggregate(keys=[key#119490], functions=[partial_sum(value#119489) AS sum#119511L], output=[key#119490, sum#119511L])
E : +- Project [value#119489, key#119490]
E : +- Filter (isnotnull(value#119489) AND (value#119489 > 0))
E : +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_0[value#119489,key#119490,skey#119491] Batched: true, DataFilters: [isnotnull(value#119489), (value#119489 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-20240..., PartitionFilters: [isnotnull(key#119490), dynamicpruning#119505 119504], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
E +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=208208]
E +- Project [key#119494]
E +- Filter ((((isnotnull(ex_key#119496) AND isnotnull(filter#119498)) AND (ex_key#119496 = 3)) AND (filter#119498 = 451)) AND isnotnull(key#119494))
E +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_1[key#119494,ex_key#119496,filter#119498] Batched: true, DataFilters: [isnotnull(ex_key#119496), isnotnull(filter#119498), (ex_key#119496 = 3), (filter#119498 = 451), ..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-202403..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,451), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
E
E at scala.Predef$.assert(Predef.scala:223)
E at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:170)
E at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:175)
E at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertContains(ExecutionPlanCaptureCallback.scala:76)
E at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(ExecutionPlanCaptureCallback.scala)
E at sun.reflect.GeneratedMethodAccessor422.invoke(Unknown Source)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
E at py4j.Gateway.invoke(Gateway.java:306)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
E at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
E at java.lang.Thread.run(Thread.java:750)
/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326: Py4JJavaError
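Note that in the captured plan above the final AQE plan is just a `ResultQueryStage` over `LocalTableScan <empty>`, which appears to be why the callback cannot find a `DynamicPruningExpression` node even though the initial plan carries `dynamicpruning#...` partition filters.

For local triage, here is a minimal standalone sketch (not the test harness itself) of the kind of check that `assert_cpu_and_gpu_are_equal_collect_with_capture` performs: run a join that should be eligible for dynamic partition pruning with AQE enabled, then look for `DynamicPruningExpression` in the executed plan string. The table names, schemas, and query are hypothetical stand-ins for what `create_fact_table`/`create_dim_table` and the `_statements` entries generate in dpp_test.py, and the snippet runs on plain Spark (with or without the plugin on the classpath):

```python
# Standalone sketch: table names, schemas, and the query below are hypothetical
# stand-ins, not the generated tmp tables or statements from dpp_test.py.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("dpp-plan-check")
         # The failing parametrizations run with AQE enabled; AQE + DPP together
         # require Spark 3.2.0+ (or Databricks), per the skipif in the test.
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
         .getOrCreate())

# A partitioned fact table and a small dimension table, loosely modeled on
# create_fact_table / create_dim_table.
spark.range(0, 1000) \
    .selectExpr("id AS value", "CAST(id % 50 AS INT) AS key") \
    .write.partitionBy("key").mode("overwrite").format("orc").saveAsTable("fact_tbl")
spark.range(0, 50) \
    .selectExpr("CAST(id AS INT) AS key", "CAST(id % 5 AS INT) AS ex_key", "451 AS `filter`") \
    .write.mode("overwrite").format("orc").saveAsTable("dim_tbl")

# A join plus aggregation filtered on the dimension table, so the planner may
# add a DPP subquery on the fact table's partition column.
query = """
    SELECT f.key, MAX(f.value) AS max_value
    FROM fact_tbl f
    JOIN dim_tbl d ON f.key = d.key
    WHERE d.ex_key = 3 AND d.`filter` = 451
    GROUP BY f.key
"""

df = spark.sql(query)
df.collect()

# Inspect the executed plan string, which is roughly what the
# ExecutionPlanCaptureCallback assertion walks on the JVM side.
plan_str = df._jdf.queryExecution().executedPlan().toString()
print("DynamicPruningExpression present:", "DynamicPruningExpression" in plan_str)
```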