From d8a5e5e400854abee6d60055f937d18503eb270f Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 28 Sep 2023 13:34:15 -0700 Subject: [PATCH 1/6] Add GPU version of ToPrettyString [databricks] (#9221) * Added a GpuToPrettyString * Added unit tests because integration tests don't cover `DF.show()` --------- Signed-off-by: Raza Jafri --- .../src/main/python/cast_test.py | 7 +- .../com/nvidia/spark/rapids/GpuCast.scala | 170 +++++++++++------- .../com/nvidia/spark/rapids/TypeChecks.scala | 17 +- .../spark/rapids/shims/GpuCastShims.scala | 2 +- .../spark/rapids/shims/GpuCastShims.scala | 4 +- .../rapids/shims/GpuToPrettyString.scala | 67 +++++++ .../spark/rapids/shims/SparkShims.scala | 16 +- .../spark/rapids/GpuBatchUtilsSuite.scala | 101 ++++++----- .../spark/rapids/ToPrettyStringSuite.scala | 116 ++++++++++++ 9 files changed, 380 insertions(+), 120 deletions(-) create mode 100644 sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuToPrettyString.scala create mode 100644 tests/src/test/spark350/scala/com/nvidia/spark/rapids/ToPrettyStringSuite.scala diff --git a/integration_tests/src/main/python/cast_test.py b/integration_tests/src/main/python/cast_test.py index 496002d931d..16c946b2811 100644 --- a/integration_tests/src/main/python/cast_test.py +++ b/integration_tests/src/main/python/cast_test.py @@ -16,8 +16,8 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect, assert_spark_exception from data_gen import * -from spark_session import is_before_spark_320, is_before_spark_330, is_spark_340_or_later, is_spark_350_or_later, \ - is_databricks113_or_later, with_gpu_session +from spark_session import is_before_spark_320, is_before_spark_330, is_spark_340_or_later, \ + is_databricks113_or_later from marks import allow_non_gpu, approximate_float from pyspark.sql.types import * from spark_init_internal import spark_version @@ -297,7 +297,6 @@ def _assert_cast_to_string_equal (data_gen, conf): @pytest.mark.parametrize('data_gen', all_array_gens_for_cast_to_string, ids=idfn) @pytest.mark.parametrize('legacy', ['true', 'false']) -@pytest.mark.xfail(condition=is_spark_350_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9065') def test_cast_array_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, @@ -317,7 +316,6 @@ def test_cast_array_with_unmatched_element_to_string(data_gen, legacy): @pytest.mark.parametrize('data_gen', basic_map_gens_for_cast_to_string, ids=idfn) @pytest.mark.parametrize('legacy', ['true', 'false']) -@pytest.mark.xfail(condition=is_spark_350_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9065') def test_cast_map_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, @@ -337,7 +335,6 @@ def test_cast_map_with_unmatched_element_to_string(data_gen, legacy): @pytest.mark.parametrize('data_gen', [StructGen([[str(i), gen] for i, gen in enumerate(basic_array_struct_gens_for_cast_to_string)] + [["map", MapGen(ByteGen(nullable=False), null_gen)]])], ids=idfn) @pytest.mark.parametrize('legacy', ['true', 'false']) -@pytest.mark.xfail(condition=is_spark_350_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9065') def test_cast_struct_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 7a4bdb592b4..d9dded085c5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -50,24 +50,46 @@ final class CastExprMeta[INPUT <: UnaryExpression with TimeZoneAwareExpression w // We do not want to create a shim class for this small change stringToAnsiDate: Boolean, toTypeOverride: Option[DataType] = None) - extends UnaryExprMeta[INPUT](cast, conf, parent, rule) { + extends CastExprMetaBase(cast, conf, parent, rule, doFloatToIntCheck) { + + val legacyCastComplexTypesToString: Boolean = + SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING) + override val toType: DataType = toTypeOverride.getOrElse(cast.dataType) + + override def tagExprForGpu(): Unit = { + if (evalMode == GpuEvalMode.TRY) { + willNotWorkOnGpu("try_cast is not supported on the GPU") + } + recursiveTagExprForGpuCheck() + } def withToTypeOverride(newToType: DecimalType): CastExprMeta[INPUT] = new CastExprMeta[INPUT](cast, evalMode, conf, parent, rule, doFloatToIntCheck, stringToAnsiDate, Some(newToType)) + override def convertToGpu(child: Expression): GpuExpression = + GpuCast(child, toType, evalMode == GpuEvalMode.ANSI, cast.timeZoneId, + legacyCastComplexTypesToString, stringToAnsiDate) + +} + +/** Meta-data for cast, ansi_cast and ToPrettyString */ +abstract class CastExprMetaBase[INPUT <: UnaryExpression with TimeZoneAwareExpression]( + cast: INPUT, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule, + doFloatToIntCheck: Boolean = false) + extends UnaryExprMeta[INPUT](cast, conf, parent, rule) { + val fromType: DataType = cast.child.dataType - val toType: DataType = toTypeOverride.getOrElse(cast.dataType) - val legacyCastToString: Boolean = SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING) + val toType: DataType = cast.dataType override def tagExprForGpu(): Unit = { - if (evalMode == GpuEvalMode.TRY) { - willNotWorkOnGpu("try_cast is not supported on the GPU") - } recursiveTagExprForGpuCheck() } - private def recursiveTagExprForGpuCheck( + protected def recursiveTagExprForGpuCheck( fromDataType: DataType = fromType, toDataType: DataType = toType, depth: Int = 0): Unit = { @@ -155,10 +177,6 @@ final class CastExprMeta[INPUT <: UnaryExpression with TimeZoneAwareExpression w s"${entry.doc}. To enable this operation on the GPU, set ${entry.key} to true." } - override def convertToGpu(child: Expression): GpuExpression = - GpuCast(child, toType, evalMode == GpuEvalMode.ANSI, cast.timeZoneId, legacyCastToString, - stringToAnsiDate) - // timezone tagging in type checks is good enough, so always false override protected val needTimezoneTagging: Boolean = false } @@ -166,9 +184,22 @@ final class CastExprMeta[INPUT <: UnaryExpression with TimeZoneAwareExpression w object CastOptions { val DEFAULT_CAST_OPTIONS = new CastOptions(false, false, false) val ARITH_ANSI_OPTIONS = new CastOptions(false, true, false) + val TO_PRETTY_STRING_OPTIONS = ToPrettyStringOptions def getArithmeticCastOptions(failOnError: Boolean): CastOptions = if (failOnError) ARITH_ANSI_OPTIONS else DEFAULT_CAST_OPTIONS + + object ToPrettyStringOptions extends CastOptions(false, false, false) { + override val leftBracket: String = "{" + + override val rightBracket: String = "}" + + override val nullString: String = "NULL" + + override val useDecimalPlainString: Boolean = true + + override val useHexFormatForBinary: Boolean = true + } } /** @@ -281,8 +312,12 @@ object GpuCast { case (DateType, BooleanType | _: NumericType) => // casts from date type to numerics are always null GpuColumnVector.columnVectorFromNull(input.getRowCount.toInt, toDataType) - case (DateType, StringType) => - input.asStrings("%Y-%m-%d") + + // Cast to String + case (DateType | TimestampType | FloatType | DoubleType | BinaryType | + _: DecimalType | _: ArrayType | _: MapType | _: StructType, StringType) => + castToString(input, fromDataType, options) + case (TimestampType, FloatType | DoubleType) => withResource(input.castTo(DType.INT64)) { asLongs => @@ -301,13 +336,13 @@ object GpuCast { toDataType match { case IntegerType => assertValuesInRange[Long](cv, Int.MinValue.toLong, - Int.MaxValue.toLong, errorMsg = GpuCast.OVERFLOW_MESSAGE) + Int.MaxValue.toLong, errorMsg = OVERFLOW_MESSAGE) case ShortType => assertValuesInRange[Long](cv, Short.MinValue.toLong, - Short.MaxValue.toLong, errorMsg = GpuCast.OVERFLOW_MESSAGE) + Short.MaxValue.toLong, errorMsg = OVERFLOW_MESSAGE) case ByteType => assertValuesInRange[Long](cv, Byte.MinValue.toLong, - Byte.MaxValue.toLong, errorMsg = GpuCast.OVERFLOW_MESSAGE) + Byte.MaxValue.toLong, errorMsg = OVERFLOW_MESSAGE) } } cv.castTo(GpuColumnVector.getNonNestedRapidsType(toDataType)) @@ -320,12 +355,6 @@ object GpuCast { asLongs.floorDiv(microsPerSec, GpuColumnVector.getNonNestedRapidsType(toDataType)) } } - case (TimestampType, StringType) => - castTimestampToString(input) - - case (StructType(fields), StringType) => - castStructToString(input, fields, options) - // ansi cast from larger-than-long integral-like types, to long case (dt: DecimalType, LongType) if ansiMode => // This is a work around for https://github.com/rapidsai/cudf/issues/9282 @@ -338,7 +367,7 @@ object GpuCast { withResource(input.max()) { maxInput => if (minInput.isValid && minInput.getBigDecimal().compareTo(min) == -1 || maxInput.isValid && maxInput.getBigDecimal().compareTo(max) == 1) { - throw new ArithmeticException(GpuCast.OVERFLOW_MESSAGE) + throw new ArithmeticException(OVERFLOW_MESSAGE) } } } @@ -501,9 +530,7 @@ object GpuCast { withResource(FloatUtils.nanToZero(input)) { inputWithNansToZero => inputWithNansToZero.castTo(GpuColumnVector.getNonNestedRapidsType(toDataType)) } - case (FloatType | DoubleType, StringType) => - castFloatingTypeToString(input) - case (StringType, ByteType | ShortType | IntegerType | LongType ) => + case (StringType, ByteType | ShortType | IntegerType | LongType) => CastStrings.toInteger(input, ansiMode, GpuColumnVector.getNonNestedRapidsType(toDataType)) case (StringType, FloatType | DoubleType) => @@ -533,12 +560,6 @@ object GpuCast { case (ShortType | IntegerType | LongType | ByteType | StringType, BinaryType) => input.asByteList(true) - case (BinaryType, StringType) => - castBinToString(input) - - case (_: DecimalType, StringType) => - GpuCastShims.CastDecimalToString(input, ansiMode) - case (ArrayType(nestedFrom, _), ArrayType(nestedTo, _)) => withResource(input.getChildColumnView(0)) { childView => withResource(doCast(childView, nestedFrom, nestedTo, options)) { childColumnVector => @@ -546,18 +567,12 @@ object GpuCast { } } - case (ArrayType(elementType, _), StringType) => - castArrayToString(input, elementType, options) - case (from: StructType, to: StructType) => castStructToStruct(from, to, input, options) case (from: MapType, to: MapType) => castMapToMap(from, to, input, options) - case (from: MapType, _: StringType) => - castMapToString(input, from, options) - case (dayTime: DataType, _: StringType) if GpuTypeShims.isSupportedDayTimeType(dayTime) => GpuIntervalUtils.toDayTimeIntervalString(input, dayTime) @@ -618,7 +633,7 @@ object GpuCast { maxValue: T, inclusiveMin: Boolean = true, inclusiveMax: Boolean = true, - errorMsg:String = GpuCast.OVERFLOW_MESSAGE) + errorMsg: String = OVERFLOW_MESSAGE) (implicit ord: Ordering[T]): Unit = { def throwIfAnyNan(): Unit = { @@ -705,6 +720,25 @@ object GpuCast { } } + def castToString( + input: ColumnView, + fromDataType: DataType, options: CastOptions): ColumnVector = fromDataType match { + case StringType => input.copyToColumnVector() + case DateType => input.asStrings("%Y-%m-%d") + case TimestampType => castTimestampToString(input) + case FloatType | DoubleType => castFloatingTypeToString(input) + case BinaryType => castBinToString(input, options) + case _: DecimalType => GpuCastShims.CastDecimalToString(input, options.useDecimalPlainString) + case StructType(fields) => castStructToString(input, fields, options) + + case ArrayType(elementType, _) => + castArrayToString(input, elementType, options) + case from: MapType => + castMapToString(input, from, options) + case _ => + input.castTo(GpuColumnVector.getNonNestedRapidsType(StringType)) + } + private def castTimestampToString(input: ColumnView): ColumnVector = { // the complexity in this function is due to Spark's rules for truncating // the fractional part of the timestamp string. Any trailing decimal place @@ -816,8 +850,8 @@ object GpuCast { private def castArrayToString(input: ColumnView, elementType: DataType, - options: CastOptions): ColumnVector = { - + options: CastOptions, + castingBinaryData: Boolean = false): ColumnVector = { // We use square brackets for arrays regardless val (leftStr, rightStr) = ("[", "]") val emptyStr = "" @@ -833,7 +867,7 @@ object GpuCast { val concatenated = withResource(strChildContainsNull) { _ => withResource(input.replaceListChild(strChildContainsNull)) { - concatenateStringArrayElements(_, options) + concatenateStringArrayElements(_, options, castingBinaryData) } } @@ -863,13 +897,11 @@ object GpuCast { // cast the key column and value column to string columns val (strKey, strValue) = withResource(input.getChildColumnView(0)) { kvStructColumn => val strKey = withResource(kvStructColumn.getChildColumnView(0)) { keyColumn => - doCast( - keyColumn, from.keyType, StringType, options) + castToString(keyColumn, from.keyType, options) } - val strValue = closeOnExcept(strKey) {_ => + val strValue = closeOnExcept(strKey) { _ => withResource(kvStructColumn.getChildColumnView(1)) { valueColumn => - doCast( - valueColumn, from.valueType, StringType, options) + castToString(valueColumn, from.valueType, options) } } (strKey, strValue) @@ -950,7 +982,7 @@ object GpuCast { // 3.1+: {firstCol columns += leftColumn.incRefCount() withResource(input.getChildColumnView(0)) { firstColumnView => - columns += doCast(firstColumnView, inputSchema.head.dataType, StringType, options) + columns += castToString(firstColumnView, inputSchema.head.dataType, options) } for (nonFirstIndex <- 1 until numInputColumns) { withResource(input.getChildColumnView(nonFirstIndex)) { nonFirstColumnView => @@ -1024,7 +1056,7 @@ object GpuCast { if (ansiEnabled) { withResource(validBools.all()) { isAllBool => if (isAllBool.isValid && !isAllBool.getBoolean) { - throw new IllegalStateException(GpuCast.INVALID_INPUT_MESSAGE) + throw new IllegalStateException(INVALID_INPUT_MESSAGE) } } } @@ -1065,8 +1097,8 @@ object GpuCast { } } - /** This method does not close the `input` ColumnVector. */ - def convertDateOr( + /** This method does not close the `input` ColumnVector. */ + def convertDateOr( input: ColumnVector, regex: String, cudfFormat: String, @@ -1335,17 +1367,27 @@ object GpuCast { } } - private def castBinToString(input: ColumnView): ColumnVector = { - // Spark interprets the binary as UTF-8 bytes. So the layout of the - // binary and the layout of the string are the same. We just need to play some games with - // the CPU side metadata to make CUDF think it is a String. - // Sadly there is no simple CUDF API to do this, so for now we pull it apart and put - // it back together again - withResource(input.getChildColumnView(0)) { dataCol => - withResource(new ColumnView(DType.STRING, input.getRowCount, - Optional.of[java.lang.Long](input.getNullCount), - dataCol.getData, input.getValid, input.getOffsets)) { cv => - cv.copyToColumnVector() + private def castBinToString(input: ColumnView, options: CastOptions): ColumnVector = { + if (options.useHexFormatForBinary) { + withResource(input.getChildColumnView(0)) { dataCol => + withResource(dataCol.toHex()) { stringCol => + withResource(input.replaceListChild(stringCol)) { cv => + castArrayToString(cv, DataTypes.StringType, options, true) + } + } + } + } else { + // Spark interprets the binary as UTF-8 bytes. So the layout of the + // binary and the layout of the string are the same. We just need to play some games with + // the CPU side metadata to make CUDF think it is a String. + // Sadly there is no simple CUDF API to do this, so for now we pull it apart and put + // it back together again + withResource(input.getChildColumnView(0)) { dataCol => + withResource(new ColumnView(DType.STRING, input.getRowCount, + Optional.of[java.lang.Long](input.getNullCount), + dataCol.getData, input.getValid, input.getOffsets)) { cv => + cv.copyToColumnVector() + } } } } @@ -1433,7 +1475,7 @@ object GpuCast { if (ansiMode) { withResource(outOfBounds.any()) { isAny => if (isAny.isValid && isAny.getBoolean) { - throw RapidsErrorUtils.arithmeticOverflowError(GpuCast.OVERFLOW_MESSAGE) + throw RapidsErrorUtils.arithmeticOverflowError(OVERFLOW_MESSAGE) } } input.copyToColumnVector() @@ -1534,7 +1576,7 @@ object GpuCast { val cv = withResource(updatedMaxRet) { updatedMax => withResource(Seq(minSeconds, Long.MinValue).safeMap(Scalar.fromLong)) { case Seq(minSecondsS, longMinS) => - withResource(longInput.lessThan(minSecondsS)){ + withResource(longInput.lessThan(minSecondsS)) { _.ifElse(longMinS, updatedMax) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 049c0c5d230..b50029ab344 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -1476,7 +1476,7 @@ class CastChecks extends ExprChecks { } } - private[this] def tagBase(meta: RapidsMeta[_, _, _], willNotWork: String => Unit): Unit = { + protected def tagBase(meta: RapidsMeta[_, _, _], willNotWork: String => Unit): Unit = { val cast = meta.wrapped.asInstanceOf[UnaryExpression] val from = cast.child.dataType val to = cast.dataType @@ -1506,6 +1506,21 @@ class CastChecks extends ExprChecks { } } +/** + * This class is just restricting the 'to' dataType to a StringType in the CastChecks class + */ +class ToPrettyStringChecks extends CastChecks { + + override protected def tagBase(meta: RapidsMeta[_, _, _], willNotWork: String => Unit): Unit = { + val cast = meta.wrapped.asInstanceOf[UnaryExpression] + val from = cast.child.dataType + val to = StringType + if (!gpuCanCast(from, to)) { + willNotWork(s"${meta.wrapped.getClass.getSimpleName} from $from to $to is not supported") + } + } +} + object ExprChecks { /** * A check for an expression that only supports project. diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala index 3de41de76c2..ffff1d9e2c1 100644 --- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala @@ -38,7 +38,7 @@ import ai.rapids.cudf.{ColumnVector, ColumnView} import com.nvidia.spark.rapids.jni.CastStrings object GpuCastShims { - def CastDecimalToString(decimalInput: ColumnView, ansiMode: Boolean): ColumnVector = { + def CastDecimalToString(decimalInput: ColumnView, usePlainString: Boolean): ColumnVector = { CastStrings.fromDecimal(decimalInput) } } diff --git a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala index 60fdd1e5e72..2493d81d9de 100644 --- a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala +++ b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuCastShims.scala @@ -38,8 +38,8 @@ object GpuCastShims { case _ => throw new IllegalArgumentException(s"Unsupported type in cast $t") } - def CastDecimalToString(decimalInput: ColumnView, ansiMode: Boolean): ColumnVector = { - if (ansiMode) { + def CastDecimalToString(decimalInput: ColumnView, usePlainString: Boolean): ColumnVector = { + if (usePlainString) { // This is equivalent to // https://docs.oracle.com/javase/8/docs/api/java/math/BigDecimal.html#toPlainString-- // except there are a few corner cases, but they are really rare diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuToPrettyString.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuToPrettyString.scala new file mode 100644 index 00000000000..521e2e326b8 --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuToPrettyString.scala @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import ai.rapids.cudf.Scalar +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm._ +import com.nvidia.spark.rapids.RapidsPluginImplicits._ + +import org.apache.spark.sql.catalyst.expressions.{Expression, TimeZoneAwareExpression} +import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class GpuToPrettyString(child: Expression, timeZoneId: Option[String] = None) + extends ShimUnaryExpression with GpuExpression with TimeZoneAwareExpression { + + override lazy val resolved: Boolean = childrenResolved + + override def dataType: DataType = StringType + + override def nullable: Boolean = false + + override def withTimeZone(timeZoneId: String): GpuToPrettyString = + copy(timeZoneId = Some(timeZoneId)) + + override def withNewChildInternal(newChild: Expression): Expression = + copy(child = newChild) + + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + withResource(child.columnarEval(batch)) { evaluatedCol => + withResource(GpuCast.doCast( + evaluatedCol.getBase, + evaluatedCol.dataType(), + StringType, + CastOptions.TO_PRETTY_STRING_OPTIONS)) { possibleStringResult => + if (possibleStringResult.hasNulls) { + withResource(possibleStringResult.isNull) { isNull => + val stringColWithNulls = possibleStringResult + withResource(Scalar.fromString(CastOptions.TO_PRETTY_STRING_OPTIONS.nullString)) { + nullString => + GpuColumnVector.from(isNull.ifElse(nullString, stringColWithNulls), StringType) + } + } + } else { + GpuColumnVector.from(possibleStringResult.incRefCount(), StringType) + } + } + } + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 0f4a4bf66d5..75a13143a94 100644 --- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -21,13 +21,27 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ -import org.apache.spark.sql.catalyst.expressions.{Expression, PythonUDAF} +import org.apache.spark.sql.catalyst.expressions.{Expression, PythonUDAF, ToPrettyString} import org.apache.spark.sql.rapids.execution.python.GpuPythonUDAF +import org.apache.spark.sql.types.StringType object SparkShimImpl extends Spark340PlusShims { override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { val shimExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( + GpuOverrides.expr[ToPrettyString]("An internal expressions which is used to " + + "generate pretty string for all kinds of values", + new ToPrettyStringChecks(), + (toPrettyString, conf, p, r) => { + new CastExprMetaBase[ToPrettyString](toPrettyString, conf, p, r) { + + override val toType: StringType.type = StringType + + override def convertToGpu(child: Expression): GpuExpression = { + GpuToPrettyString(child) + } + } + }), GpuOverrides.expr[PythonUDAF]( "UDF run in an external python process. Does not actually run on the GPU, but " + "the transfer of data to/from it can be accelerated", diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala index 5190f4434a4..87b73e1cd10 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericRow} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -207,7 +208,12 @@ object GpuBatchUtilsSuite { externalRows.toArray } - private def createValueForType(i: Int, r: Random, dt: DataType, nullable: Boolean): Any = { + private def createValueForType( + i: Int, + r: Random, + dt: DataType, + nullable: Boolean, + isInternal: Boolean = true): Any = { dt match { case DataTypes.BooleanType => maybeNull(nullable, i, r.nextBoolean()) case DataTypes.ByteType => maybeNull(nullable, i, r.nextInt().toByte) @@ -225,30 +231,43 @@ object GpuBatchUtilsSuite { case dataType: DecimalType => val upperBound = (0 until dataType.precision).foldLeft(1L)((x, _) => x * 10) val unScaledValue = r.nextLong() % upperBound - maybeNull(nullable, i, Decimal(unScaledValue, dataType.precision, dataType.scale)) + val d = maybeNull(nullable, i, Decimal(unScaledValue, dataType.precision, dataType.scale)) + if (d != null && !isInternal) { + d.asInstanceOf[Decimal].toJavaBigDecimal + } else { + d + } case dataType@DataTypes.StringType => + val length = if (nullable) dataType.defaultSize * 2 else dataType.defaultSize + val utf8String = createUTF8String(length) + val string = if (!isInternal) utf8String.toString() else utf8String + if (nullable) { // since we want a deterministic test that compares the estimate with actual // usage we need to make sure the average length of strings is `dataType.defaultSize` if (i % 2 == 0) { null } else { - createUTF8String(dataType.defaultSize * 2) + string } } else { - createUTF8String(dataType.defaultSize) + string } case dataType@DataTypes.BinaryType => + val length = if (nullable) dataType.defaultSize * 2 else dataType.defaultSize + val bytes = r.nextString(length).getBytes + val binary = if (!isInternal) bytes.toSeq else bytes + if (nullable) { // since we want a deterministic test that compares the estimate with actual usage we // need to make sure the average length of binary values is `dataType.defaultSize` if (i % 2 == 0) { null } else { - r.nextString(dataType.defaultSize * 2).getBytes + binary } } else { - r.nextString(dataType.defaultSize).getBytes + binary } case ArrayType(elementType, containsNull) => if (nullable && i % 2 == 0) { @@ -256,33 +275,48 @@ object GpuBatchUtilsSuite { } else { val arrayValues = new mutable.ArrayBuffer[Any]() for (_ <- 0 to r.nextInt(10)) { - arrayValues.append(createValueForType(i, r, elementType, containsNull)) + arrayValues.append(createValueForType(i, r, elementType, containsNull, isInternal)) + } + val array = ArrayData.toArrayData(arrayValues) + if (!isInternal && array != null) { + array.toSeq(elementType) + } else { + array } - arrayValues.toArray.toSeq } - case MapType(_, _, valueContainsNull) => + case MapType(keyType, valueType, valueContainsNull) => if (nullable && i % 2 == 0) { null } else { - // TODO: add other types - val map = mutable.Map[String, String]() - for ( j <- 0 until 10) { + val map = mutable.Map[Any, Any]() + for (j <- 0 until 10) { if (valueContainsNull && j % 2 == 0) { - map += (createUTF8String(10).toString -> null) + map += (createValueForType(i, r, keyType, nullable = false, isInternal) -> null) } else { - map += (createUTF8String(10).toString -> createUTF8String(10).toString) + map += (createValueForType(i, r, keyType, nullable = false, isInternal) -> + createValueForType(i, r, valueType, nullable = false, isInternal)) } } - map + val mapData = ArrayBasedMapData(map) + if (mapData != null && !isInternal) { + ArrayBasedMapData.toScalaMap(mapData) + } else { + mapData + } } case StructType(fields) => - new GenericRow(fields.map(f => createValueForType(i, r, f.dataType, nullable))) - case unknown => throw new UnsupportedOperationException( + if (!isInternal) { + new GenericRow(fields.map(f => + createValueForType(i, r, f.dataType, nullable = f.nullable, isInternal = false))) + } else { + InternalRow(fields.map(f => createValueForType(i, r, f.dataType, nullable)): _*) + } + + case unknown => throw new UnsupportedOperationException( s"Type $unknown not supported") } } - private def createRowValues(i: Int, r: Random, fields: Array[StructField]) = { val values: Array[Any] = fields.map(field => { createValueForType(i, r, field.dataType, field.nullable) @@ -291,34 +325,9 @@ object GpuBatchUtilsSuite { } private def createExternalRowValues(i: Int, r: Random, fields: Array[StructField]): Array[Any] = { - val values: Array[Any] = fields.map(field => { - field.dataType match { - // Since it's using the createUTF8String method for InternalRow case, need to convert to - // String for Row case. - case StringType => - val utf8StringOrNull = createValueForType(i, r, field.dataType, field.nullable) - if (utf8StringOrNull != null) { - utf8StringOrNull.asInstanceOf[UTF8String].toString - } else { - utf8StringOrNull - } - case BinaryType => - val b = createValueForType(i, r, field.dataType, field.nullable) - if (b != null) { - b.asInstanceOf[Array[Byte]].toSeq - } else { - b - } - case DecimalType() => - val d = createValueForType(i, r, field.dataType, field.nullable) - if (d != null) { - d.asInstanceOf[Decimal].toJavaBigDecimal - } else { - d - } - case _ => createValueForType(i, r, field.dataType, field.nullable) - } - }) + val values: Array[Any] = fields.map { field => + createValueForType(i, r, field.dataType, field.nullable, isInternal = false) + } values } diff --git a/tests/src/test/spark350/scala/com/nvidia/spark/rapids/ToPrettyStringSuite.scala b/tests/src/test/spark350/scala/com/nvidia/spark/rapids/ToPrettyStringSuite.scala new file mode 100644 index 00000000000..b13ebaa8297 --- /dev/null +++ b/tests/src/test/spark350/scala/com/nvidia/spark/rapids/ToPrettyStringSuite.scala @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids + +import ai.rapids.cudf.ColumnVector +import com.nvidia.spark.rapids.Arm._ +import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder +import com.nvidia.spark.rapids.shims.GpuToPrettyString +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.sql.catalyst.expressions.{BoundReference, NamedExpression, ToPrettyString} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, MapType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +class ToPrettyStringSuite extends GpuUnitTests { + + private def testDataType(dataType: DataType): Unit = { + val schema = (new StructType) + .add(StructField("a", dataType, true)) + val numRows = 100 + val inputRows = GpuBatchUtilsSuite.createRows(schema, numRows) + val cpuOutput: Array[String] = inputRows.map { + input => + ToPrettyString(BoundReference(0, dataType, true), Some("UTC")) + .eval(input).asInstanceOf[UTF8String].toString() + } + val child = GpuBoundReference(0, dataType, true)(NamedExpression.newExprId, "arg") + val gpuToPrettyStr = GpuToPrettyString(child, Some("UTC")) + + withResource(new GpuColumnarBatchBuilder(schema, numRows)) { batchBuilder => + val r2cConverter = new GpuRowToColumnConverter(schema) + inputRows.foreach(r2cConverter.convert(_, batchBuilder)) + withResource(batchBuilder.build(numRows)) { columnarBatch => + withResource(GpuColumnVector.from(ColumnVector.fromStrings(cpuOutput: _*), + DataTypes.StringType)) { expected => + checkEvaluation(gpuToPrettyStr, expected, columnarBatch) + } + } + } + } + + test("test show() on booleans") { + testDataType(DataTypes.BooleanType) + } + + test("test show() on bytes") { + testDataType(DataTypes.ByteType) + } + + test("test show() on shorts") { + testDataType(DataTypes.ShortType) + } + + test("test show() on ints") { + testDataType(DataTypes.IntegerType) + } + + test("test show() on longs") { + testDataType(DataTypes.LongType) + } + + test("test show() on floats") { + // This test is expected to fail until https://github.com/NVIDIA/spark-rapids/issues/4204 + // is resolved + assertThrows[TestFailedException](testDataType(DataTypes.FloatType)) + } + + test("test show() on doubles") { + // This test is expected to fail until https://github.com/NVIDIA/spark-rapids/issues/4204 + // is resolved + assertThrows[TestFailedException](testDataType(DataTypes.DoubleType)) + } + + test("test show() on strings") { + testDataType(DataTypes.StringType) + } + + test("test show() on decimals") { + testDataType(DecimalType(8,2)) + } + + test("test show() on binary") { + testDataType(DataTypes.BinaryType) + } + + test("test show() on array") { + testDataType(ArrayType(DataTypes.IntegerType)) + } + + test("test show() on map") { + testDataType(MapType(DataTypes.IntegerType, DataTypes.IntegerType)) + } + + test("test show() on struct") { + testDataType(StructType(Seq(StructField("a", DataTypes.IntegerType), + StructField("b", DataTypes.IntegerType), + StructField("c", DataTypes.IntegerType)))) + } +} From 6d8785a771fa0c4365186864c3708d6dc6f71ba6 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Fri, 29 Sep 2023 05:09:31 +0800 Subject: [PATCH 2/6] Update authorized user in blossom-ci whitelist [skip ci] (#9318) Signed-off-by: Cheng Xu --- .github/workflows/blossom-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/blossom-ci.yml b/.github/workflows/blossom-ci.yml index 83e102fca21..0320c686046 100644 --- a/.github/workflows/blossom-ci.yml +++ b/.github/workflows/blossom-ci.yml @@ -68,7 +68,7 @@ jobs: YanxuanLiu,\ cindyyuanjiang,\ thirtiseven,\ - winningsixnv,\ + winningsix,\ ', format('{0},', github.actor)) && github.event.comment.body == 'build' steps: - name: Check if comment is issued by authorized person From 1399d80889e71906e3bb71e3be9034ee13b3e003 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 28 Sep 2023 17:54:29 -0600 Subject: [PATCH 3/6] Temporarily skip failing tests (#9335) Signed-off-by: Andy Grove --- integration_tests/src/main/python/csv_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 5e5234d7033..f07f1213cd1 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -561,6 +561,7 @@ def test_csv_read_count(spark_tmp_path): conf = {'spark.rapids.sql.explain': 'ALL'}) @allow_non_gpu('FileSourceScanExec', 'ProjectExec', 'CollectLimitExec', 'DeserializeToObjectExec') +@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9325") @pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+') @pytest.mark.parametrize('date_format', csv_supported_date_formats) @pytest.mark.parametrize('ts_part', csv_supported_ts_parts) @@ -569,6 +570,7 @@ def test_csv_infer_schema_timestamp_ntz_v1(spark_tmp_path, date_format, ts_part, csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, 'csv', 'FileSourceScanExec') @allow_non_gpu('BatchScanExec', 'FileSourceScanExec', 'ProjectExec', 'CollectLimitExec', 'DeserializeToObjectExec') +@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9325") @pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+') @pytest.mark.parametrize('date_format', csv_supported_date_formats) @pytest.mark.parametrize('ts_part', csv_supported_ts_parts) From 7bffb165559b2a7ce20ed1b94d3fd5f1ba06c997 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Fri, 29 Sep 2023 09:35:53 +0800 Subject: [PATCH 4/6] Support `format_number` (#9281) * wip * wip * support format_number for integral and decimal type Signed-off-by: Haoyang Li * support double/float normal cases * support scientific notation double/float with positive exp * support scientific notation double/float with negative exp * bug fixed and clean up * refactor and memory leak fix * Handle resource pair as a whole * fix more memory leak * address some comments * add a config to control float/double enabling * fixed a bug in neg exp get parts * fixed another bug and add float scala test * add some comments and use lstrip to remove neg sign * fix memory leaks Signed-off-by: Haoyang Li * minor changes Signed-off-by: Haoyang Li * fallback decimal with high scale Signed-off-by: Haoyang Li * Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala Co-authored-by: Liangcai Li --------- Signed-off-by: Haoyang Li Co-authored-by: Liangcai Li --- .../advanced_configs.md | 2 + docs/compatibility.md | 4 + docs/supported_ops.md | 150 ++-- .../src/main/python/string_test.py | 54 ++ .../nvidia/spark/rapids/GpuOverrides.scala | 29 + .../com/nvidia/spark/rapids/RapidsConf.scala | 8 + .../spark/sql/rapids/stringFunctions.scala | 665 +++++++++++++++++- .../spark/rapids/StringFunctionSuite.scala | 29 + tools/generated_files/operatorsScore.csv | 1 + tools/generated_files/supportedExprs.csv | 3 + 10 files changed, 899 insertions(+), 46 deletions(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 1b2216c59e7..1b3ac623bbd 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -109,6 +109,7 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.format.parquet.reader.type|Sets the Parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO|Runtime spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true|Runtime spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true|Runtime +spark.rapids.sql.formatNumberFloat.enabled|format_number with floating point types on the GPU returns results that have a different precision than the default results of Spark.|false|Runtime spark.rapids.sql.hasExtendedYearValues|Spark 3.2.0+ extended parsing of years in dates and timestamps to support the full range of possible values. Prior to this it was limited to a positive 4 digit year. The Accelerator does not support the extended range yet. This config indicates if your data includes this extended range or not, or if you don't care about getting the correct values on values with the extended range.|true|Runtime spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false|Runtime spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows.|true|Runtime @@ -234,6 +235,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None| spark.rapids.sql.expression.Flatten|`flatten`|Creates a single array from an array of arrays|true|None| spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None| +spark.rapids.sql.expression.FormatNumber|`format_number`|Formats the number x like '#,###,###.##', rounded to d decimal places.|true|None| spark.rapids.sql.expression.FromUTCTimestamp|`from_utc_timestamp`|Render the input UTC timestamp in the input timezone|true|None| spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None| spark.rapids.sql.expression.GetArrayItem| |Gets the field at `ordinal` in the Array|true|None| diff --git a/docs/compatibility.md b/docs/compatibility.md index e72415b634f..de4ee77496e 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -664,6 +664,10 @@ The GPU will use different precision than Java's toString method when converting types to strings. The GPU uses a lowercase `e` prefix for an exponent while Spark uses uppercase `E`. As a result the computed string can differ from the default behavior in Spark. +The `format_number` function will retain 10 digits of precision for the GPU when the input is a floating +point number, but Spark will retain up to 17 digits of precision, i.e. `format_number(1234567890.1234567890, 5)` +will return `1,234,567,890.00000` on the GPU and `1,234,567,890.12346` on the CPU. To enable this on the GPU, set [`spark.rapids.sql.formatNumberFloat.enabled`](additional-functionality/advanced_configs.md#sql.formatNumberFloat.enabled) to `true`. + Starting from 22.06 this conf is enabled by default, to disable this operation on the GPU, set [`spark.rapids.sql.castFloatToString.enabled`](additional-functionality/advanced_configs.md#sql.castFloatToString.enabled) to `false`. diff --git a/docs/supported_ops.md b/docs/supported_ops.md index ee22d2de6c3..4e2be930b49 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -6461,23 +6461,23 @@ are limited. -FromUTCTimestamp -`from_utc_timestamp` -Render the input UTC timestamp in the input timezone +FormatNumber +`format_number` +Formats the number x like '#,###,###.##', rounded to d decimal places. None project -timestamp - - - - - - +x +S +S +S +S +S +S -PS
UTC is only supported TZ for TIMESTAMP
+S @@ -6487,17 +6487,17 @@ are limited. -timezone - +d +PS
Literal value only
-PS
Only timezones equivalent to UTC are supported
+NS @@ -6517,8 +6517,8 @@ are limited. -PS
UTC is only supported TZ for TIMESTAMP
+S @@ -6555,6 +6555,74 @@ are limited. UDT +FromUTCTimestamp +`from_utc_timestamp` +Render the input UTC timestamp in the input timezone +None +project +timestamp + + + + + + + + +PS
UTC is only supported TZ for TIMESTAMP
+ + + + + + + + + + + +timezone + + + + + + + + + +PS
Only timezones equivalent to UTC are supported
+ + + + + + + + + + +result + + + + + + + + +PS
UTC is only supported TZ for TIMESTAMP
+ + + + + + + + + + + FromUnixTime `from_unixtime` Get the string from a unix timestamp @@ -6874,6 +6942,32 @@ are limited. NS +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + GetStructField Gets the named field of the struct @@ -6921,32 +7015,6 @@ are limited. NS -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - GetTimestamp Gets timestamps from strings using given pattern. diff --git a/integration_tests/src/main/python/string_test.py b/integration_tests/src/main/python/string_test.py index 87fc168928c..316a427db94 100644 --- a/integration_tests/src/main/python/string_test.py +++ b/integration_tests/src/main/python/string_test.py @@ -797,3 +797,57 @@ def test_conv_dec_to_from_hex(from_base, to_base, pattern): lambda spark: unary_op_df(spark, gen).select('a', f.conv(f.col('a'), from_base, to_base)), conf={'spark.rapids.sql.expression.Conv': True} ) + +format_number_gens = integral_gens + [DecimalGen(precision=7, scale=7), DecimalGen(precision=18, scale=0), + DecimalGen(precision=18, scale=3), DecimalGen(precision=36, scale=5), + DecimalGen(precision=36, scale=-5), DecimalGen(precision=38, scale=10), + DecimalGen(precision=38, scale=-10), + DecimalGen(precision=38, scale=30, special_cases=[Decimal('0.000125')]), + DecimalGen(precision=38, scale=32, special_cases=[Decimal('0.000125')])] + +@pytest.mark.parametrize('data_gen', format_number_gens, ids=idfn) +def test_format_number_supported(data_gen): + gen = data_gen + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, gen).selectExpr( + 'format_number(a, -2)', + 'format_number(a, 0)', + 'format_number(a, 1)', + 'format_number(a, 5)', + 'format_number(a, 10)', + 'format_number(a, 100)') + ) + +float_format_number_conf = {'spark.rapids.sql.formatNumberFloat.enabled': 'true'} +format_number_float_gens = [DoubleGen(min_exp=-300, max_exp=15)] + +@pytest.mark.parametrize('data_gen', format_number_float_gens, ids=idfn) +def test_format_number_float_limited(data_gen): + gen = data_gen + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, gen).selectExpr( + 'format_number(a, 5)'), + conf = float_format_number_conf + ) + +# format_number for float/double is disabled by default due to compatibility issue +# GPU will generate result with less precision than CPU +@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) +def test_format_number_float_fallback(data_gen): + assert_gpu_fallback_collect( + lambda spark: unary_op_df(spark, data_gen).selectExpr( + 'format_number(a, 5)'), + 'FormatNumber' + ) + +# fallback due to https://github.com/NVIDIA/spark-rapids/issues/9309 +@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) +def test_format_number_decimal_big_scale_fallback(data_gen): + data_gen = DecimalGen(precision=38, scale=37) + assert_gpu_fallback_collect( + lambda spark: unary_op_df(spark, data_gen).selectExpr( + 'format_number(a, 5)'), + 'FormatNumber' + ) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a5bda29670f..bb8668cbb9b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3085,6 +3085,35 @@ object GpuOverrides extends Logging { |For instance decimal strings not longer than 18 characters / hexadecimal strings |not longer than 15 characters disregarding the sign cannot cause an overflow. """.stripMargin.replaceAll("\n", " ")), + expr[FormatNumber]( + "Formats the number x like '#,###,###.##', rounded to d decimal places.", + ExprChecks.binaryProject(TypeSig.STRING, TypeSig.STRING, + ("x", TypeSig.gpuNumeric, TypeSig.cpuNumeric), + ("d", TypeSig.lit(TypeEnum.INT), TypeSig.INT+TypeSig.STRING)), + (in, conf, p, r) => new BinaryExprMeta[FormatNumber](in, conf, p, r) { + override def tagExprForGpu(): Unit = { + in.children.head.dataType match { + case _: FloatType | DoubleType => { + if (!conf.isFloatFormatNumberEnabled) { + willNotWorkOnGpu("format_number with floating point types on the GPU returns " + + "results that have a different precision than the default results of Spark. " + + "To enable this operation on the GPU, set" + + s" ${RapidsConf.ENABLE_FLOAT_FORMAT_NUMBER} to true.") + } + } + case dt: DecimalType => { + if (dt.scale > 32) { + willNotWorkOnGpu("format_number will generate results mismatched from Spark " + + "when the scale is larger than 32.") + } + } + case _ => + } + } + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = + GpuFormatNumber(lhs, rhs) + } + ), expr[MapConcat]( "Returns the union of all the given maps", ExprChecks.projectOnly(TypeSig.MAP.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index adddeecdc61..f3edc99a53f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -716,6 +716,12 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val ENABLE_FLOAT_FORMAT_NUMBER = conf("spark.rapids.sql.formatNumberFloat.enabled") + .doc("format_number with floating point types on the GPU returns results that have " + + "a different precision than the default results of Spark.") + .booleanConf + .createWithDefault(false) + val ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES = conf("spark.rapids.sql.castFloatToIntegralTypes.enabled") .doc("Casting from floating point types to integral types on the GPU supports a " + @@ -2332,6 +2338,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCastFloatToStringEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_STRING) + lazy val isFloatFormatNumberEnabled: Boolean = get(ENABLE_FLOAT_FORMAT_NUMBER) + lazy val isCastStringToTimestampEnabled: Boolean = get(ENABLE_CAST_STRING_TO_TIMESTAMP) lazy val hasExtendedYearValues: Boolean = get(HAS_EXTENDED_YEAR_VALUES) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala index e2c55b32647..febbf75ba58 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.rapids import java.nio.charset.Charset -import java.util.Optional +import java.text.DecimalFormatSymbols +import java.util.{Locale, Optional} import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{BinaryOp, BinaryOperable, CaptureGroups, ColumnVector, ColumnView, DType, PadSide, RegexProgram, Scalar, Table} +import ai.rapids.cudf.{BinaryOp, BinaryOperable, CaptureGroups, ColumnVector, ColumnView, DType, PadSide, RegexProgram, RoundMode, Scalar, Table} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -1989,8 +1990,6 @@ case class GpuStringInstr(str: Expression, substr: Expression) } } - - class GpuConvMeta( expr: Conv, conf: RapidsConf, @@ -2079,4 +2078,660 @@ case class GpuConv(num: Expression, fromBase: Expression, toBase: Expression) override def third: Expression = toBase override def dataType: DataType = StringType -} \ No newline at end of file +} + +case class GpuFormatNumber(x: Expression, d: Expression) + extends GpuBinaryExpression with ExpectsInputTypes with NullIntolerant { + + override def left: Expression = x + override def right: Expression = d + override def dataType: DataType = StringType + override def nullable: Boolean = true + override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, IntegerType) + + private def removeNegSign(cv: ColumnVector): ColumnVector = { + withResource(Scalar.fromString("-")) { negativeSign => + cv.lstrip(negativeSign) + } + } + + private def getZeroCv(size: Int): ColumnVector = { + withResource(Scalar.fromString("0")) { zero => + ColumnVector.fromScalar(zero, size) + } + } + + private def handleDoublePosExp(cv: ColumnVector, intPart: ColumnVector, decPart: ColumnVector, + exp: ColumnVector, d: Int): (ColumnVector, ColumnVector) = { + // handle cases when exp is positive + // append "0" * zerosNum after end of strings, zerosNum = exp - decLen + d + val expSubDecLen = withResource(decPart.getCharLengths) { decLen => + exp.sub(decLen) + } + val zerosNum = withResource(expSubDecLen) { _ => + withResource(Scalar.fromInt(d)) { dScalar => + expSubDecLen.add(dScalar) + } + } + val zeroCv = withResource(Scalar.fromString("0")) { zero => + ColumnVector.fromScalar(zero, cv.getRowCount.toInt) + } + val zeros = withResource(zerosNum) { _ => + withResource(zeroCv) { _ => + zeroCv.repeatStrings(zerosNum) + } + } + + val intAndDecParts = withResource(zeros) { _ => + ColumnVector.stringConcatenate(Array(intPart, decPart, zeros)) + } + // split intAndDecParts to intPart and decPart with substrings, start = len(intAndDecParts) - d + closeOnExcept(ArrayBuffer.empty[ColumnVector]) { resourceArray => + val (intPartPosExp, decPartPosExpTemp) = withResource(intAndDecParts) { _ => + val (start, end) = withResource(intAndDecParts.getCharLengths) { partsLength => + (withResource(Scalar.fromInt(d)) { d => + partsLength.sub(d) + }, partsLength.incRefCount()) + } + withResource(start) { _ => + withResource(end) { _ => + val zeroIntCv = withResource(Scalar.fromInt(0)) { zero => + ColumnVector.fromScalar(zero, cv.getRowCount.toInt) + } + val intPart = withResource(zeroIntCv) { _ => + intAndDecParts.substring(zeroIntCv, start) + } + val decPart = closeOnExcept(intPart) { _ => + intAndDecParts.substring(start, end) + } + (intPart, decPart) + } + } + } + resourceArray += intPartPosExp + // if decLen - exp > d, convert to float/double, round, convert back to string + // decLen's max value is 9, abs(expPart)'s min value is 7, so it is possible only when d < 2 + // because d is small, we can use double to do the rounding + val decPartPosExp = if (0 < d && d < 2) { + val pointCv = closeOnExcept(decPartPosExpTemp) { _ => + withResource(Scalar.fromString(".")) { point => + ColumnVector.fromScalar(point, cv.getRowCount.toInt) + } + } + val withPoint = withResource(decPartPosExpTemp) { _ => + withResource(pointCv) { pointCv => + ColumnVector.stringConcatenate(Array(pointCv, decPartPosExpTemp)) + } + } + val decimalTypeRounding = DType.create(DType.DTypeEnum.DECIMAL128, -9) + val withPointDecimal = withResource(withPoint) { _ => + withResource(withPoint.castTo(decimalTypeRounding)) { decimal => + decimal.round(d, RoundMode.HALF_EVEN) + } + } + val roundedString = withResource(withPointDecimal) { _ => + withPointDecimal.castTo(DType.STRING) + } + withResource(roundedString) { _ => + withResource(roundedString.stringSplit(".", 2)) { splited => + splited.getColumn(1).incRefCount() + } + } + } else { + decPartPosExpTemp + } + (intPartPosExp, decPartPosExp) + } + } + + private def handleDoubleNegExp(cv: ColumnVector, intPart: ColumnVector, decPart: ColumnVector, + exp: ColumnVector, d: Int): (ColumnVector, ColumnVector) = { + // handle cases when exp is negative + // "0." + (- exp - 1) * "0" + intPart + decPart + // if -1 - d <= exp and decLen - exp > d, need to rounding + val cond1 = withResource(Scalar.fromInt(-1 - d)) { negOneSubD => + exp.greaterOrEqualTo(negOneSubD) + } + val cond2 = closeOnExcept(cond1) { _ => + val decLenSubExp = withResource(decPart.getCharLengths) { decLen => + decLen.sub(exp) + } + withResource(decLenSubExp) { _ => + withResource(Scalar.fromInt(d)) { d => + decLenSubExp.greaterThan(d) + } + } + } + val needRounding = withResource(cond1) { _ => + withResource(cond2) { _ => + cond1.and(cond2) + } + } + val anyNeedRounding = withResource(needRounding) { _ => + withResource(needRounding.any()) { any => + any.isValid && any.getBoolean + } + } + anyNeedRounding match { + case false => + // a shortcut when no need to rounding + // "0." + (- exp - 1) * "0" + intPart + decPart + withResource(getZeroCv(cv.getRowCount.toInt)) { zeroCv => + val expSubOne = withResource(Scalar.fromInt(-1)) { negOne => + negOne.sub(exp) + } + val addingZeros = withResource(expSubOne) { _ => + zeroCv.repeatStrings(expSubOne) + } + val decPartNegExp = withResource(addingZeros) { _ => + ColumnVector.stringConcatenate(Array(addingZeros, intPart, decPart)) + } + val decPartNegSubstr = withResource(decPartNegExp) { _ => + decPartNegExp.substring(0, d) + } + (zeroCv.incRefCount(), decPartNegSubstr) + } + case true => + // if -exp <= d + 1 && -exp + decLen + 1 > d, need to rounding + // dec will be round to (d + exp + 1) digits + val dExpOne = withResource(Scalar.fromInt(d + 1)) { dExpOne => + exp.add(dExpOne) + } + // To do a dataframe operation, add some zeros before + // (intPat + decPart) and round them to 10 + // zerosNumRounding = (10 - (d + exp + 1)) . max(0) + val tenSubDExpOne = withResource(dExpOne) { _ => + withResource(Scalar.fromInt(10)) { ten => + ten.sub(dExpOne) + } + } + val zerosNumRounding = withResource(tenSubDExpOne) { _ => + withResource(Scalar.fromInt(0)) { zero => + withResource(tenSubDExpOne.lessThan(zero)) { lessThanZero => + lessThanZero.ifElse(zero, tenSubDExpOne) + } + } + } + val leadingZeros = withResource(zerosNumRounding) { _ => + withResource(getZeroCv(cv.getRowCount.toInt)) { zeroCv => + zeroCv.repeatStrings(zerosNumRounding) + } + } + val numberToRoundStr = withResource(leadingZeros) { _ => + val zeroPointCv = withResource(Scalar.fromString("0.")) { point => + ColumnVector.fromScalar(point, cv.getRowCount.toInt) + } + withResource(zeroPointCv) { _ => + ColumnVector.stringConcatenate(Array(zeroPointCv, leadingZeros, intPart, decPart)) + } + } + // use a decimal type to round, set scale to -20 to keep all digits + val decimalTypeRounding = DType.create(DType.DTypeEnum.DECIMAL128, -20) + val numberToRound = withResource(numberToRoundStr) { _ => + numberToRoundStr.castTo(decimalTypeRounding) + } + // rounding 10 digits + val rounded = withResource(numberToRound) { _ => + numberToRound.round(10, RoundMode.HALF_EVEN) + } + val roundedStr = withResource(rounded) { _ => + rounded.castTo(DType.STRING) + } + // substr 2 to remove "0." + val roundedDecPart = withResource(roundedStr) { _ => + roundedStr.substring(2) + } + val decPartStriped = withResource(roundedDecPart) { _ => + withResource(Scalar.fromString("0")) { zero => + roundedDecPart.lstrip(zero) + } + } + val decPartNegExp = withResource(decPartStriped) { _ => + decPartStriped.pad(d, PadSide.LEFT, "0") + } + closeOnExcept(decPartNegExp) { _ => + (getZeroCv(cv.getRowCount.toInt), decPartNegExp) + } + } + } + + private def normalDoubleSplit(cv: ColumnVector, d: Int): (ColumnVector, ColumnVector) = { + val roundingScale = d.min(10) // cuDF will keep at most 9 digits after decimal point + val roundedStr = withResource(cv.round(roundingScale, RoundMode.HALF_EVEN)) { rounded => + rounded.castTo(DType.STRING) + } + val (intPart, decPart) = withResource(roundedStr) { _ => + withResource(roundedStr.stringSplit(".", 2)) { intAndDec => + (intAndDec.getColumn(0).incRefCount(), intAndDec.getColumn(1).incRefCount()) + } + } + val intPartNoNeg = closeOnExcept(decPart) { _ => + withResource(intPart) { _ => + removeNegSign(intPart) + } + } + val decPartPad = closeOnExcept(intPartNoNeg) { _ => + withResource(decPart) { _ => + decPart.pad(d, PadSide.RIGHT, "0") + } + } + // a workaround for cuDF float to string, e.g. 12.3 => "12.30000019" instead of "12.3" + val decPartSubstr = closeOnExcept(intPartNoNeg) { _ => + withResource(decPartPad) { _ => + decPartPad.substring(0, d) + } + } + (intPartNoNeg, decPartSubstr) + } + + private def expDoubleSplit(cv: ColumnVector, d: Int): (ColumnVector, ColumnVector) = { + // handle special case: 1.234567e+7 or 1.234567e-6 + // get three parts first: + val replaceDelimToE = withResource(Scalar.fromString("e")) { e => + withResource(Scalar.fromString(".")) { p => + cv.stringReplace(e, p) + } + } + // get three parts: 1.234567e+7 -> 1, 234567, +7 + val (intPartSign, decPart, expPart) = withResource(replaceDelimToE) { _ => + withResource(replaceDelimToE.stringSplit(".", 3)) { intDecExp => + (intDecExp.getColumn(0).incRefCount(), + intDecExp.getColumn(1).incRefCount(), + intDecExp.getColumn(2).incRefCount()) + } + } + // sign will be handled later, use string-based solution instead abs to avoid overfolw + val intPart = closeOnExcept(decPart) { _ => + closeOnExcept(expPart) { _ => + withResource(intPartSign) { _ => + removeNegSign(intPartSign) + } + } + } + val exp = closeOnExcept(decPart) { _ => + closeOnExcept(intPart) { _ => + withResource(expPart) { _ => + expPart.castTo(DType.INT32) + } + } + } + // handle positive and negative exp separately + val (intPartPosExp, decPartPosExp) = closeOnExcept(intPart) { _ => + closeOnExcept(decPart) { _ => + closeOnExcept(exp) { _ => + handleDoublePosExp(cv, intPart, decPart, exp, d) + } + } + } + withResource(ArrayBuffer.empty[ColumnVector]) { resourceArray => + val (intPartNegExp, decPartNegExp) = withResource(intPart) { _ => + withResource(decPart) { _ => + closeOnExcept(exp) { _ => + handleDoubleNegExp(cv, intPart, decPart, exp, d) + } + } + } + resourceArray += intPartNegExp + resourceArray += decPartNegExp + val expPos = withResource(exp) { _ => + withResource(Scalar.fromInt(0)) { zero => + exp.greaterOrEqualTo(zero) + } + } + // combine results + withResource(expPos) { _ => + val intPartExp = withResource(intPartPosExp) { _ => + expPos.ifElse(intPartPosExp, intPartNegExp) + } + val decPartExp = closeOnExcept(intPartExp) { _ => + withResource(decPartPosExp) { _ => + expPos.ifElse(decPartPosExp, decPartNegExp) + } + } + (intPartExp, decPartExp) + } + } + } + + private def getPartsFromDouble(cv: ColumnVector, d: Int): (ColumnVector, ColumnVector) = { + // handle normal case: 1234.567 + closeOnExcept(ArrayBuffer.empty[ColumnVector]) { resourceArray => + val (normalInt, normalDec) = normalDoubleSplit(cv, d) + resourceArray += normalInt + resourceArray += normalDec + // first check special case + val cvStr = withResource(cv.castTo(DType.STRING)) { cvStr => + cvStr.incRefCount() + } + val containsE = closeOnExcept(cvStr) { _ => + withResource(Scalar.fromString("e")) { e => + cvStr.stringContains(e) + } + } + withResource(containsE) { _ => + // if no special case, return normal case directly + val anyExp = closeOnExcept(cvStr) { _ => + withResource(containsE.any()) { any => + any.isValid && any.getBoolean + } + } + anyExp match { + case false => { + cvStr.safeClose() + (normalInt, normalDec) + } + case true => { + val noEReplaced = withResource(cvStr) { _ => + // replace normal case with 0e0 to avoid error + withResource(Scalar.fromString("0.0e0")) { default => + containsE.ifElse(cvStr, default) + } + } + // handle scientific notation case: + val (expInt, expDec) = withResource(noEReplaced) { _ => + expDoubleSplit(noEReplaced, d) + } + // combine results + // remove normalInt from resourceArray + resourceArray.remove(0) + val intPart = closeOnExcept(expDec) { _ => + withResource(expInt) { _ => + withResource(normalInt) { _ => + containsE.ifElse(expInt, normalInt) + } + } + } + resourceArray.clear() + resourceArray += intPart + val decPart = withResource(expDec) { _ => + withResource(normalDec) { _ => + containsE.ifElse(expDec, normalDec) + } + } + (intPart, decPart) + } + } + } + } + } + + private def getPartsFromDecimal(cv: ColumnVector, d: Int, scale: Int): + (ColumnVector, ColumnVector) = { + // prevent d too large to fit in decimalType + val roundingScale = scale.min(d) + // append zeros to the end of decPart, zerosNum = d - scale + // if d <= scale, no need to append zeros, if scale < 0, append d zeros + val appendZeroNum = (d - scale).max(0).min(d) + val (intPart, decTemp) = if (roundingScale <= 0) { + withResource(ArrayBuffer.empty[ColumnVector]) { resourceArray => + val intPart = withResource(cv.round(roundingScale, RoundMode.HALF_EVEN)) { rounded => + rounded.castTo(DType.STRING) + } + resourceArray += intPart + // if intString starts with 0, it must be "00000...", replace it with "0" + val (isZero, zeroCv) = withResource(Scalar.fromString("0")) { zero => + withResource(intPart.startsWith(zero)) { isZero => + (isZero.incRefCount(), ColumnVector.fromScalar(zero, cv.getRowCount.toInt)) + } + } + val intPartZeroHandled = withResource(isZero) { isZero => + withResource(zeroCv) { zeroCv => + isZero.ifElse(zeroCv, intPart) + } + } + resourceArray += intPartZeroHandled + // a temp decPart is empty before appending zeros + val decPart = withResource(Scalar.fromString("")) { emptyString => + ColumnVector.fromScalar(emptyString, cv.getRowCount.toInt) + } + resourceArray += decPart + (intPartZeroHandled.incRefCount(), decPart.incRefCount()) + } + } else { + withResource(cv.round(roundingScale, RoundMode.HALF_EVEN)) { rounded => + withResource(rounded.castTo(DType.STRING)) { roundedStr => + withResource(roundedStr.stringSplit(".", 2)) { intAndDec => + (intAndDec.getColumn(0).incRefCount(), intAndDec.getColumn(1).incRefCount()) + } + } + } + } + closeOnExcept(ArrayBuffer.empty[ColumnVector]) { resourceArray => + // remove negative sign from intPart, sign will be handled later + val intPartPos = closeOnExcept(decTemp) { _ => + withResource(intPart) { _ => + removeNegSign(intPart) + } + } + resourceArray += intPartPos + // append zeros + val appendZeros = "0" * appendZeroNum + val appendZerosCv = closeOnExcept(decTemp) { _ => + withResource(Scalar.fromString(appendZeros)) { zeroString => + ColumnVector.fromScalar(zeroString, cv.getRowCount.toInt) + } + } + val decPart = withResource(decTemp) { _ => + withResource(appendZerosCv) { _ => + ColumnVector.stringConcatenate(Array(decTemp, appendZerosCv)) + } + } + (intPartPos, decPart) + } + } + + private def getParts(cv: ColumnVector, d: Int): (ColumnVector, ColumnVector) = { + // get int part and dec part from a column vector, int part will be set to positive + x.dataType match { + case FloatType | DoubleType => { + getPartsFromDouble(cv, d) + } + case DecimalType.Fixed(_, scale) => { + getPartsFromDecimal(cv, d, scale) + } + case IntegerType | LongType | ShortType | ByteType => { + val intPartPos = withResource(cv.castTo(DType.STRING)) { intPart => + removeNegSign(intPart) + } + // dec part is all zeros + val dzeros = "0" * d + val decPart = closeOnExcept(intPartPos) { _ => + withResource(Scalar.fromString(dzeros)) { zeroString => + ColumnVector.fromScalar(zeroString, cv.getRowCount.toInt) + } + } + (intPartPos, decPart) + } + case _ => { + throw new UnsupportedOperationException(s"format_number doesn't support type ${x.dataType}") + } + } + } + + private def negativeCheck(cv: ColumnVector): ColumnVector = { + withResource(cv.castTo(DType.STRING)) { cvStr => + withResource(Scalar.fromString("-")) { negativeSign => + cvStr.startsWith(negativeSign) + } + } + } + + private def removeExtraCommas(str: ColumnVector): ColumnVector = { + withResource(Scalar.fromString(",")) { comma => + str.rstrip(comma) + } + } + + private def addCommas(str: ColumnVector): ColumnVector = { + val maxstrlen = withResource(str.getCharLengths()) { strlen => + withResource(strlen.max()) { maxlen => + maxlen.isValid match { + case true => maxlen.getInt + case false => 0 + } + } + } + val sepCol = withResource(Scalar.fromString(",")) { sep => + ColumnVector.fromScalar(sep, str.getRowCount.toInt) + } + val substrs = closeOnExcept(sepCol) { _ => + (0 until maxstrlen by 3).safeMap { i => + str.substring(i, i + 3).asInstanceOf[ColumnView] + }.toArray + } + withResource(substrs) { _ => + withResource(sepCol) { _ => + withResource(ColumnVector.stringConcatenate(substrs, sepCol)) { res => + removeExtraCommas(res) + } + } + } + } + + private def handleInfAndNan(cv: ColumnVector, res: ColumnVector): ColumnVector = { + // replace inf and nan with infSymbol and nanSymbol in res according to cv + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + val nanSymbol = symbols.getNaN + val infSymbol = symbols.getInfinity + val negInfSymbol = "-" + infSymbol + val handleNan = withResource(cv.isNan()) { isNan => + withResource(Scalar.fromString(nanSymbol)) { nan => + isNan.ifElse(nan, res) + } + } + val isInf = closeOnExcept(handleNan) { _ => + x.dataType match { + case DoubleType => { + withResource(Scalar.fromDouble(Double.PositiveInfinity)) { inf => + cv.equalTo(inf) + } + } + case FloatType => { + withResource(Scalar.fromFloat(Float.PositiveInfinity)) { inf => + cv.equalTo(inf) + } + } + } + } + val handleInf = withResource(isInf) { _ => + withResource(handleNan) { _ => + withResource(Scalar.fromString(infSymbol)) { inf => + isInf.ifElse(inf, handleNan) + } + } + } + val isNegInf = closeOnExcept(handleInf) { _ => + x.dataType match { + case DoubleType => { + withResource(Scalar.fromDouble(Double.NegativeInfinity)) { negInf => + cv.equalTo(negInf) + } + } + case FloatType => { + withResource(Scalar.fromFloat(Float.NegativeInfinity)) { negInf => + cv.equalTo(negInf) + } + } + } + } + val handleNegInf = withResource(isNegInf) { _ => + withResource(Scalar.fromString(negInfSymbol)) { negInf => + withResource(handleInf) { _ => + isNegInf.ifElse(negInf, handleInf) + } + } + } + handleNegInf + } + + override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { + // get int d from rhs + if (!rhs.isValid || rhs.getValue.asInstanceOf[Int] < 0) { + return GpuColumnVector.columnVectorFromNull(lhs.getRowCount.toInt, StringType) + } + val d = rhs.getValue.asInstanceOf[Int] + val (integerPart, decimalPart) = getParts(lhs.getBase, d) + // reverse integer part for adding commas + val resWithDecimalPart = withResource(decimalPart) { _ => + val reversedIntegerPart = withResource(integerPart) { intPart => + intPart.reverseStringsOrLists() + } + val reversedIntegerPartWithCommas = withResource(reversedIntegerPart) { _ => + addCommas(reversedIntegerPart) + } + // reverse result back + val reverseBack = withResource(reversedIntegerPartWithCommas) { r => + r.reverseStringsOrLists() + } + d match { + case 0 => { + // d == 0, only return integer part + reverseBack + } + case _ => { + // d > 0, append decimal part to result + withResource(reverseBack) { _ => + withResource(Scalar.fromString(".")) { point => + withResource(Scalar.fromString("")) { empty => + ColumnVector.stringConcatenate(point, empty, Array(reverseBack, decimalPart)) + } + } + } + } + } + } + // add negative sign back + val negCv = withResource(Scalar.fromString("-")) { negativeSign => + ColumnVector.fromScalar(negativeSign, lhs.getRowCount.toInt) + } + val formated = withResource(resWithDecimalPart) { _ => + val resWithNeg = withResource(negCv) { _ => + ColumnVector.stringConcatenate(Array(negCv, resWithDecimalPart)) + } + withResource(negativeCheck(lhs.getBase)) { isNegative => + withResource(resWithNeg) { _ => + isNegative.ifElse(resWithNeg, resWithDecimalPart) + } + } + } + // handle null case + val anyNull = closeOnExcept(formated) { _ => + lhs.getBase.getNullCount > 0 + } + val formatedWithNull = anyNull match { + case true => { + withResource(formated) { _ => + withResource(lhs.getBase.isNull) { isNull => + withResource(Scalar.fromNull(DType.STRING)) { nullScalar => + isNull.ifElse(nullScalar, formated) + } + } + } + } + case false => formated + } + // handle inf and nan + x.dataType match { + case FloatType | DoubleType => { + withResource(formatedWithNull) { _ => + handleInfAndNan(lhs.getBase, formatedWithNull) + } + } + case _ => formatedWithNull + } + } + + override def doColumnar(lhs: GpuScalar, rhs: GpuColumnVector): ColumnVector = { + throw new UnsupportedOperationException() + } + + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = { + throw new UnsupportedOperationException() + } + + override def doColumnar(numRows: Int, lhs: GpuScalar, rhs: GpuScalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, dataType)) { col => + doColumnar(col, rhs) + } + } +} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala index 3c3933946c5..4f64839acfd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/StringFunctionSuite.scala @@ -220,6 +220,35 @@ class RegExpUtilsSuite extends AnyFunSuite { } } +class FormatNumberSuite extends SparkQueryCompareTestSuite { + def testFormatNumberDf(session: SparkSession): DataFrame = { + import session.sqlContext.implicits._ + Seq[java.lang.Float]( + -0.0f, + 0.0f, + Float.PositiveInfinity, + Float.NegativeInfinity, + Float.NaN, + 1.0f, + 1.2345f, + 123456789.0f, + 123456789.123456789f, + 0.00123456789f, + 0.0000000123456789f, + 1.0000000123456789f + ).toDF("doubles") + } + + testSparkResultsAreEqual("Test format_number float", + testFormatNumberDf, + conf = new SparkConf().set("spark.rapids.sql.formatNumberFloat.enabled", "true")) { + frame => frame.selectExpr("format_number(doubles, -1)", + "format_number(doubles, 0)", + "format_number(doubles, 1)", + "format_number(doubles, 5)") + } +} + /* * This isn't actually a test. It's just useful to help visualize what's going on when there are * differences present. diff --git a/tools/generated_files/operatorsScore.csv b/tools/generated_files/operatorsScore.csv index 9582404b8e0..0c583853490 100644 --- a/tools/generated_files/operatorsScore.csv +++ b/tools/generated_files/operatorsScore.csv @@ -110,6 +110,7 @@ Expm1,4 First,4 Flatten,4 Floor,4 +FormatNumber,4 FromUTCTimestamp,4 FromUnixTime,4 GetArrayItem,4 diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index f5a99f7197e..5b16293d07a 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -208,6 +208,9 @@ Flatten,S,`flatten`,None,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Flatten,S,`flatten`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA Floor,S,`floor`,None,project,input,NA,NA,NA,NA,S,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA Floor,S,`floor`,None,project,result,NA,NA,NA,NA,S,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA +FormatNumber,S,`format_number`,None,project,x,NA,S,S,S,S,S,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA +FormatNumber,S,`format_number`,None,project,d,NA,NA,NA,PS,NA,NA,NA,NA,NA,NS,NA,NA,NA,NA,NA,NA,NA,NA +FormatNumber,S,`format_number`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA FromUTCTimestamp,S,`from_utc_timestamp`,None,project,timestamp,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA FromUTCTimestamp,S,`from_utc_timestamp`,None,project,timezone,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA FromUTCTimestamp,S,`from_utc_timestamp`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA From d22848a846c5a9e73e2550052a8b29d430fe2629 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Fri, 29 Sep 2023 10:05:03 -0500 Subject: [PATCH 5/6] Add kuhushukla to blossom ci yml (#9329) Signed-off-by: Kuhu Shukla --- .github/workflows/blossom-ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/blossom-ci.yml b/.github/workflows/blossom-ci.yml index 0320c686046..7c22332eb9d 100644 --- a/.github/workflows/blossom-ci.yml +++ b/.github/workflows/blossom-ci.yml @@ -42,6 +42,7 @@ jobs: jbrennan333, \ jlowe,\ krajendrannv,\ + kuhushukla,\ mythrocks,\ nartal1,\ nvdbaranec,\ From 199b35ce81ffc3f44bb4e672394cb11dba64c359 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Fri, 29 Sep 2023 10:12:25 -0500 Subject: [PATCH 6/6] Building different Cuda versions section profile does not take effect [skip ci] (#9328) * Building different Cuda versions section profile does not take effect Signed-off-by: Kuhu Shukla * Address review comments Signed-off-by: Kuhu Shukla --------- Signed-off-by: Kuhu Shukla --- CONTRIBUTING.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5d6c0933f1f..b4d7cc040cf 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -144,8 +144,9 @@ specifying the environment variable `BUILD_PARALLEL=`. ### Building against different CUDA Toolkit versions -You can build against different versions of the CUDA Toolkit by using one of the following profiles: -* `-Pcuda11` (CUDA 11.0/11.1/11.2, default) +You can build against different versions of the CUDA Toolkit by modifying the variable `cuda.version`: +* `-Dcuda.version=cuda11` (CUDA 11.x, default) +* `-Dcuda.version=cuda12` (CUDA 12.x) ### Building a Distribution for a Single Spark Release