From 5f23f4a290577e0d8820fb62edd52abbb70f052f Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 11 Dec 2024 13:47:36 -0800
Subject: [PATCH 1/5] Async write support for ORC writer

Signed-off-by: Jihoon Son
---
 .../src/main/python/orc_write_test.py        | 17 +++++++++++++
 .../spark/sql/rapids/GpuOrcFileFormat.scala  | 24 +++++++++++++
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py
index 103cae474a3..8a884526c01 100644
--- a/integration_tests/src/main/python/orc_write_test.py
+++ b/integration_tests/src/main/python/orc_write_test.py
@@ -360,6 +360,23 @@ def create_empty_df(spark, path):
         conf={'spark.rapids.sql.format.orc.write.enabled': True})


+hold_gpu_configs = [True, False]
+@pytest.mark.parametrize('hold_gpu', hold_gpu_configs, ids=idfn)
+def test_async_writer(spark_tmp_path, hold_gpu):
+    data_path = spark_tmp_path + '/ORC_DATA'
+    num_rows = 2048
+    num_cols = 10
+    orc_gen = [int_gen for _ in range(num_cols)]
+    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gen)]
+    assert_gpu_and_cpu_writes_are_equal_collect(
+        lambda spark, path: gen_df(spark, gen_list, length=num_rows).coalesce(1).write.orc(path),
+        lambda spark, path: spark.read.orc(path),
+        data_path,
+        conf={"spark.rapids.sql.asyncWrite.queryOutput.enabled": "true",
+              "spark.rapids.sql.batchSizeBytes": 4 * num_cols * 100, # 100 rows per batch
+              "spark.rapids.sql.queryOutput.holdGpuInTask": hold_gpu})
+
+
 @ignore_order
 @pytest.mark.skipif(is_before_spark_320(), reason="is only supported in Spark 320+")
 def test_concurrent_writer(spark_tmp_path):
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
index 5ac2aa1fe98..2875d92dba5 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
@@ -169,7 +169,8 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
                             options: Map[String, String],
                             dataSchema: StructType): ColumnarOutputWriterFactory = {

-    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+    val sqlConf = sparkSession.sessionState.conf
+    val orcOptions = new OrcOptions(options, sqlConf)

     val conf = job.getConfiguration

@@ -180,12 +181,17 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
     conf.asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

+    val asyncOutputWriteEnabled = RapidsConf.ENABLE_ASYNC_OUTPUT_WRITE.get(sqlConf)
+    // holdGpuBetweenBatches is on by default if asyncOutputWriteEnabled is on
+    val holdGpuBetweenBatches = RapidsConf.ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK.get(sqlConf)
+      .getOrElse(asyncOutputWriteEnabled)
+
     new ColumnarOutputWriterFactory {
       override def newInstance(path: String,
                                dataSchema: StructType,
                                context: TaskAttemptContext,
                                debugOutputPath: Option[String]): ColumnarOutputWriter = {
-        new GpuOrcWriter(path, dataSchema, context, debugOutputPath)
+        new GpuOrcWriter(path, dataSchema, context, holdGpuBetweenBatches, asyncOutputWriteEnabled)
       }

       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -204,11 +210,15 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
   }
 }

-class GpuOrcWriter(override val path: String,
-                   dataSchema: StructType,
-                   context: TaskAttemptContext,
-                   debugOutputPath: Option[String])
-  extends ColumnarOutputWriter(context, dataSchema, "ORC", true, debugOutputPath) {
+class GpuOrcWriter(
+    override val path: String,
+    dataSchema: StructType,
+    context: TaskAttemptContext,
+    debugOutputPath: Option[String],
+    holdGpuBetweenBatches: Boolean,
+    useAsyncWrite: Boolean)
+  extends ColumnarOutputWriter(context, dataSchema, "ORC", true, debugOutputPath,
+    holdGpuBetweenBatches, useAsyncWrite) {

   override val tableWriter: TableWriter = {
     val builder = SchemaUtils
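Note (review context, not part of the patches): the new behavior is driven entirely by configuration. Below is a minimal, hypothetical PySpark sketch of how the async ORC write could be enabled once this series is applied. The config keys come from the test above; the app name, output path, and generated data are illustrative only, and a cluster with the RAPIDS Accelerator plugin already installed is assumed.

# Review-context sketch (not part of the patch series).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("async-orc-write-sketch")
         # Internal switch, previously Parquet-only, extended to ORC by this series.
         .config("spark.rapids.sql.asyncWrite.queryOutput.enabled", "true")
         # Optional: whether the task keeps holding the GPU between batches while the
         # host-side write proceeds; defaults to the async-write setting when unset.
         .config("spark.rapids.sql.queryOutput.holdGpuInTask", "true")
         .getOrCreate())

# Write a small DataFrame as ORC; with the configs above, the copy from host
# memory to storage is performed asynchronously by the writer.
df = spark.range(2048).selectExpr(*["CAST(id AS INT) AS _c%d" % i for i in range(10)])
df.write.mode("overwrite").orc("/tmp/async_orc_write_example")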
From a4b1f65a23cc34d54048c52a68497a312764443b Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 13 Dec 2024 17:23:34 -0800
Subject: [PATCH 2/5] doc change

---
 .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 2 +-
 .../org/apache/spark/sql/rapids/GpuOrcFileFormat.scala      | 3 ++-
 tools/generated_files/351/operatorsScore.csv                | 2 ++
 tools/generated_files/351/supportedExprs.csv                | 6 ++++++
 4 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index c4199e3ea75..aecf3dad2b3 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -2453,7 +2453,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
     .doc("Option to turn on the async query output write. During the final output write, the " +
       "task first copies the output to the host memory, and then writes it into the storage. " +
       "When this option is enabled, the task will asynchronously write the output in the host " +
-      "memory to the storage. Only the Parquet format is supported currently.")
+      "memory to the storage. Only the Parquet and ORC formats are supported currently.")
     .internal()
     .booleanConf
     .createWithDefault(false)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
index 2875d92dba5..422f6c2337e 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
@@ -191,7 +191,8 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
                                dataSchema: StructType,
                                context: TaskAttemptContext,
                                debugOutputPath: Option[String]): ColumnarOutputWriter = {
-        new GpuOrcWriter(path, dataSchema, context, holdGpuBetweenBatches, asyncOutputWriteEnabled)
+        new GpuOrcWriter(path, dataSchema, context, debugOutputPath, holdGpuBetweenBatches,
+          asyncOutputWriteEnabled)
       }

       override def getFileExtension(context: TaskAttemptContext): String = {
diff --git a/tools/generated_files/351/operatorsScore.csv b/tools/generated_files/351/operatorsScore.csv
index 3b0b82d58bf..cfb5b486942 100644
--- a/tools/generated_files/351/operatorsScore.csv
+++ b/tools/generated_files/351/operatorsScore.csv
@@ -279,6 +279,8 @@ ToUTCTimestamp,4
 ToUnixTimestamp,4
 TransformKeys,4
 TransformValues,4
+TruncDate,4
+TruncTimestamp,4
 UnaryMinus,4
 UnaryPositive,4
 UnboundedFollowing$,4
diff --git a/tools/generated_files/351/supportedExprs.csv b/tools/generated_files/351/supportedExprs.csv
index 3c7ee2a51eb..09cea891f97 100644
--- a/tools/generated_files/351/supportedExprs.csv
+++ b/tools/generated_files/351/supportedExprs.csv
@@ -637,6 +637,12 @@ TransformKeys,S,`transform_keys`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,
 TransformValues,S,`transform_values`,None,project,argument,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA
 TransformValues,S,`transform_values`,None,project,function,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
 TransformValues,S,`transform_values`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA
+TruncDate,S,`trunc`,None,project,date,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+TruncDate,S,`trunc`,None,project,format,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+TruncDate,S,`trunc`,None,project,result,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+TruncTimestamp,S,`date_trunc`,None,project,format,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+TruncTimestamp,S,`date_trunc`,None,project,date,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
+TruncTimestamp,S,`date_trunc`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
 UnaryMinus,S,`negative`,None,project,input,NA,S,S,S,S,S,S,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,S,S
 UnaryMinus,S,`negative`,None,project,result,NA,S,S,S,S,S,S,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,S,S
 UnaryMinus,S,`negative`,None,AST,input,NA,NS,NS,S,S,S,S,NA,NA,NA,NS,NA,NA,NS,NA,NA,NA,NA,NS,NS
From 9a9cc48e68e32858825d2d1cecc7e82129737a6c Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 18 Dec 2024 17:04:30 -0800
Subject: [PATCH 3/5] remove unnecessary coalesce in the tests

---
 integration_tests/src/main/python/orc_write_test.py     | 2 +-
 integration_tests/src/main/python/parquet_write_test.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py
index 8a884526c01..3317f48aadf 100644
--- a/integration_tests/src/main/python/orc_write_test.py
+++ b/integration_tests/src/main/python/orc_write_test.py
@@ -369,7 +369,7 @@ def test_async_writer(spark_tmp_path, hold_gpu):
     orc_gen = [int_gen for _ in range(num_cols)]
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gen)]
     assert_gpu_and_cpu_writes_are_equal_collect(
-        lambda spark, path: gen_df(spark, gen_list, length=num_rows).coalesce(1).write.orc(path),
+        lambda spark, path: gen_df(spark, gen_list, length=num_rows).write.orc(path),
         lambda spark, path: spark.read.orc(path),
         data_path,
         conf={"spark.rapids.sql.asyncWrite.queryOutput.enabled": "true",
diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py
index e5719d267b4..8b0a5031a39 100644
--- a/integration_tests/src/main/python/parquet_write_test.py
+++ b/integration_tests/src/main/python/parquet_write_test.py
@@ -705,7 +705,7 @@ def test_async_writer(spark_tmp_path, hold_gpu):
     parquet_gen = [int_gen for _ in range(num_cols)]
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gen)]
     assert_gpu_and_cpu_writes_are_equal_collect(
-        lambda spark, path: gen_df(spark, gen_list, length=num_rows).coalesce(1).write.parquet(path),
+        lambda spark, path: gen_df(spark, gen_list, length=num_rows).write.parquet(path),
         lambda spark, path: spark.read.parquet(path),
         data_path,
         copy_and_update(
From ed3cdc09c807a585b7cde5848e87d873fbf3a2b2 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 18 Dec 2024 17:07:11 -0800
Subject: [PATCH 4/5] revert unrelated change

---
 tools/generated_files/351/operatorsScore.csv | 2 --
 tools/generated_files/351/supportedExprs.csv | 6 ------
 2 files changed, 8 deletions(-)

diff --git a/tools/generated_files/351/operatorsScore.csv b/tools/generated_files/351/operatorsScore.csv
index cfb5b486942..3b0b82d58bf 100644
--- a/tools/generated_files/351/operatorsScore.csv
+++ b/tools/generated_files/351/operatorsScore.csv
@@ -279,8 +279,6 @@ ToUTCTimestamp,4
 ToUnixTimestamp,4
 TransformKeys,4
 TransformValues,4
-TruncDate,4
-TruncTimestamp,4
 UnaryMinus,4
 UnaryPositive,4
 UnboundedFollowing$,4
diff --git a/tools/generated_files/351/supportedExprs.csv b/tools/generated_files/351/supportedExprs.csv
index 09cea891f97..3c7ee2a51eb 100644
--- a/tools/generated_files/351/supportedExprs.csv
+++ b/tools/generated_files/351/supportedExprs.csv
@@ -637,12 +637,6 @@ TransformKeys,S,`transform_keys`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,
 TransformValues,S,`transform_values`,None,project,argument,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA
 TransformValues,S,`transform_values`,None,project,function,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
 TransformValues,S,`transform_values`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA
-TruncDate,S,`trunc`,None,project,date,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-TruncDate,S,`trunc`,None,project,format,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-TruncDate,S,`trunc`,None,project,result,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-TruncTimestamp,S,`date_trunc`,None,project,format,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-TruncTimestamp,S,`date_trunc`,None,project,date,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-TruncTimestamp,S,`date_trunc`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
 UnaryMinus,S,`negative`,None,project,input,NA,S,S,S,S,S,S,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,S,S
 UnaryMinus,S,`negative`,None,project,result,NA,S,S,S,S,S,S,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,S,S
 UnaryMinus,S,`negative`,None,AST,input,NA,NS,NS,S,S,S,S,NA,NA,NA,NS,NA,NA,NS,NA,NA,NA,NA,NS,NS
From 942821f482cfe8428846d579a6f347bd63566ca0 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 19 Dec 2024 10:38:30 -0800
Subject: [PATCH 5/5] sort results

---
 integration_tests/src/main/python/orc_write_test.py     | 2 +-
 integration_tests/src/main/python/parquet_write_test.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py
index 3317f48aadf..b3c82675869 100644
--- a/integration_tests/src/main/python/orc_write_test.py
+++ b/integration_tests/src/main/python/orc_write_test.py
@@ -370,7 +370,7 @@ def test_async_writer(spark_tmp_path, hold_gpu):
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gen)]
     assert_gpu_and_cpu_writes_are_equal_collect(
         lambda spark, path: gen_df(spark, gen_list, length=num_rows).write.orc(path),
-        lambda spark, path: spark.read.orc(path),
+        lambda spark, path: spark.read.orc(path).orderBy([('_c' + str(i)) for i in range(num_cols)]),
         data_path,
         conf={"spark.rapids.sql.asyncWrite.queryOutput.enabled": "true",
               "spark.rapids.sql.batchSizeBytes": 4 * num_cols * 100, # 100 rows per batch
               "spark.rapids.sql.queryOutput.holdGpuInTask": hold_gpu})
diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py
index 8b0a5031a39..9b43fabd26d 100644
--- a/integration_tests/src/main/python/parquet_write_test.py
+++ b/integration_tests/src/main/python/parquet_write_test.py
@@ -706,7 +706,7 @@ def test_async_writer(spark_tmp_path, hold_gpu):
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gen)]
     assert_gpu_and_cpu_writes_are_equal_collect(
         lambda spark, path: gen_df(spark, gen_list, length=num_rows).write.parquet(path),
-        lambda spark, path: spark.read.parquet(path),
+        lambda spark, path: spark.read.parquet(path).orderBy([('_c' + str(i)) for i in range(num_cols)]),
         data_path,
         copy_and_update(
             writer_confs,