diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
index 0fc7defd063..c0e20bfaebc 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
@@ -20,6 +20,7 @@ import scala.util.Random
 
 import ai.rapids.cudf.NvtxColor
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.GpuExpressionsUtils.NullVecCache
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -54,7 +55,9 @@ class GpuExpandExecMeta(
     val projections = gpuProjections.map(_.map(_.convertToGpu()))
     GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())(
       useTieredProject = conf.isTieredProjectEnabled,
-      preprojectEnabled = conf.isExpandPreprojectEnabled)
+      preprojectEnabled = conf.isExpandPreprojectEnabled,
+      cacheNullMaxCount = conf.expandCachingNullVecMaxCount,
+      coalesceAfter = conf.isCoalesceAfterExpandEnabled)
   }
 }
 
@@ -72,11 +75,17 @@ case class GpuExpandExec(
     output: Seq[Attribute],
     child: SparkPlan)(
     useTieredProject: Boolean = false,
-    preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
+    preprojectEnabled: Boolean = false,
+    cacheNullMaxCount: Int = 0,
+    override val coalesceAfter: Boolean = true
+) extends ShimUnaryExecNode with GpuExec {
 
   override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef](
     useTieredProject.asInstanceOf[java.lang.Boolean],
-    preprojectEnabled.asInstanceOf[java.lang.Boolean])
+    preprojectEnabled.asInstanceOf[java.lang.Boolean],
+    cacheNullMaxCount.asInstanceOf[java.lang.Integer],
+    coalesceAfter.asInstanceOf[java.lang.Boolean]
+  )
 
   private val PRE_PROJECT_TIME = "preprojectTime"
   override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
@@ -127,7 +136,7 @@ case class GpuExpandExec(
     }
 
     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
+      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it), cacheNullMaxCount)
     }
   }
 
@@ -191,7 +200,8 @@ case class GpuExpandExec(
 class GpuExpandIterator(
     boundProjections: Seq[GpuTieredProject],
     metrics: Map[String, GpuMetric],
-    it: Iterator[ColumnarBatch])
+    it: Iterator[ColumnarBatch],
+    cacheNullMaxCount: Int)
   extends Iterator[ColumnarBatch] {
 
   private var sb: Option[SpillableColumnarBatch] = None
@@ -206,9 +216,19 @@ class GpuExpandIterator(
     Option(TaskContext.get()).foreach { tc =>
       onTaskCompletion(tc) {
         sb.foreach(_.close())
+
+        if (cacheNullMaxCount > 0) {
+          import scala.collection.JavaConverters._
+          GpuExpressionsUtils.cachedNullVectors.get().values().asScala.foreach(_.close())
+          GpuExpressionsUtils.cachedNullVectors.get().clear()
+        }
       }
     }
 
+  if (cacheNullMaxCount > 0 && GpuExpressionsUtils.cachedNullVectors.get() == null) {
+    GpuExpressionsUtils.cachedNullVectors.set(new NullVecCache(cacheNullMaxCount))
+  }
+
   override def hasNext: Boolean = sb.isDefined || it.hasNext
 
   override def next(): ColumnarBatch = {
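The iterator wiring above relies on a task-scoped ThreadLocal: the cache is installed lazily when the iterator is constructed and torn down (every cached vector closed, then the map cleared) when the task completes. Below is a minimal, self-contained sketch of that lifecycle, not plugin code: `FakeVec`, `withTaskScopedCache`, and the `Int` keys are hypothetical stand-ins for `GpuColumnVector`, `onTaskCompletion`, and the real cache key.

```scala
import java.util

import scala.collection.JavaConverters._

object TaskScopedNullCacheSketch {
  // Hypothetical stand-in for GpuColumnVector; in the plugin, close() releases device memory.
  final class FakeVec(val rows: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closing cached vector of $rows rows")
  }

  private val cache = new ThreadLocal[util.HashMap[Int, FakeVec]]()

  // Install the cache lazily, run the task body, then close and drop every cached
  // vector, mirroring the onTaskCompletion callback in the diff above.
  def withTaskScopedCache[T](body: => T): T = {
    if (cache.get() == null) {
      cache.set(new util.HashMap[Int, FakeVec]())
    }
    try body
    finally {
      cache.get().values().asScala.foreach(_.close())
      cache.get().clear()
    }
  }

  def main(args: Array[String]): Unit = {
    withTaskScopedCache {
      cache.get().put(10, new FakeVec(10)) // cached once, reused within the task
    }
  }
}
```

Keeping the cache thread-local sidesteps synchronization entirely, since a task's columnar iterator runs on a single thread.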
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala
index c46862ab2aa..2400b364b5a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala
@@ -21,11 +21,12 @@
 import com.nvidia.spark.Retryable
 import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.{ShimBinaryExpression, ShimExpression, ShimTernaryExpression, ShimUnaryExpression}
+import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,40 @@ object GpuExpressionsUtils {
       "implemented and should have been disabled")
   }
 
+  // This is only for ExpandExec, which can generate a lot of null vectors
+  case class NullVecKey(d: DataType, n: Int)
+
+  class NullVecCache(private val maxNulls: Int)
+    extends util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {
+    private var totalNulls: Long = 0L
+
+    override def clear(): Unit = {
+      super.clear()
+      totalNulls = 0
+    }
+
+    override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
+      if (v.getRowCount > maxNulls) {
+        throw new IllegalStateException(s"spark.rapids.sql.expandCachingNullVec.maxNulls" +
+          s" ($maxNulls) is set too small to hold a single vector with ${v.getRowCount} rows.")
+      }
+      val iter = entrySet().iterator()
+      while (iter.hasNext && totalNulls > maxNulls - v.getRowCount) {
+        val entry = iter.next()
+        iter.remove()
+        totalNulls -= entry.getValue.getRowCount
+      }
+
+      val ret = super.put(key, v)
+      totalNulls += v.getRowCount
+      ret
+    }
+
+    override def remove(key: Any): GpuColumnVector = throw new UnsupportedOperationException()
+  }
+
+  val cachedNullVectors = new ThreadLocal[NullVecCache]()
+
   /**
    * Tries to resolve a `GpuColumnVector` from a Scala `Any`.
    *
@@ -73,7 +108,18 @@ object GpuExpressionsUtils {
   def resolveColumnVector(any: Any, numRows: Int): GpuColumnVector = {
     withResourceIfAllowed(any) {
       case c: GpuColumnVector => c.incRefCount()
-      case s: GpuScalar => GpuColumnVector.from(s, numRows, s.dataType)
+      case s: GpuScalar =>
+        if (!s.isValid && cachedNullVectors.get() != null) {
+          if (!cachedNullVectors.get().containsKey(NullVecKey(s.dataType, numRows))) {
+            cachedNullVectors.get().put(NullVecKey(s.dataType, numRows),
+              GpuColumnVector.from(s, numRows, s.dataType))
+          }
+
+          val ret = cachedNullVectors.get().get(NullVecKey(s.dataType, numRows))
+          ret.incRefCount()
+        } else {
+          GpuColumnVector.from(s, numRows, s.dataType)
+        }
       case other =>
         throw new IllegalArgumentException(s"Cannot resolve a ColumnVector from the value:" +
           s" $other. Please convert it to a GpuScalar or a GpuColumnVector before returning.")
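The interesting part of `NullVecCache` is the eviction rule: it is a `java.util.LinkedHashMap` constructed with `accessOrder = true`, so iterating `entrySet()` visits entries from least to most recently used, and `put` drops cold entries from the head until the new vector's rows fit under the budget. Here is a minimal sketch of the same pattern, with a hypothetical `Vec(rowCount)` and `String` keys standing in for `GpuColumnVector` and `NullVecKey`:

```scala
import java.util

// Hypothetical stand-in for GpuColumnVector: only the row count matters here.
final case class Vec(rowCount: Long)

// Access-ordered LinkedHashMap: entrySet() iterates from least to most recently
// used, so evicting from the head removes the coldest entries first.
class BudgetedLru(maxRows: Long)
  extends util.LinkedHashMap[String, Vec](16, 0.75f, true) {
  private var total = 0L

  override def put(key: String, v: Vec): Vec = {
    require(v.rowCount <= maxRows,
      s"a single entry of ${v.rowCount} rows exceeds the budget of $maxRows")
    // Evict least recently used entries until the new entry fits the budget.
    val it = entrySet().iterator()
    while (it.hasNext && total > maxRows - v.rowCount) {
      total -= it.next().getValue.rowCount
      it.remove()
    }
    total += v.rowCount
    super.put(key, v)
  }
}

object BudgetedLruDemo {
  def main(args: Array[String]): Unit = {
    val lru = new BudgetedLru(100)
    lru.put("a", Vec(40))
    lru.put("b", Vec(40))
    lru.get("a")          // touch "a"; "b" is now the eviction candidate
    lru.put("c", Vec(40)) // over budget: evicts "b", keeps "a"
    println(lru.keySet()) // prints [a, c]
  }
}
```

The third constructor argument is what turns the map into an LRU; with the default insertion order, `get` would not reorder entries and the eviction loop would drop the oldest insertions instead of the coldest ones.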
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index 48f9de5a61a..eef083bb93d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -360,6 +360,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
       case _: GpuDataSourceScanExec => true
       case _: DataSourceV2ScanExecBase => true
      case _: RDDScanExec => true // just in case an RDD was reading in data
+      case _: ExpandExec => true
       case _ => false
     }
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 1f9380ddbd0..11456d42a78 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1219,6 +1219,20 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .booleanConf
     .createWithDefault(true)
 
+  val ENABLE_COALESCE_AFTER_EXPAND = conf("spark.rapids.sql.coalesceAfterExpand.enabled")
+    .doc("When set to false, disables coalescing the output batches after GPU Expand.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
+  val EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT =
+    conf("spark.rapids.sql.expandCachingNullVec.maxNulls")
+      .doc("Max number of null scalars in the null vectors cached for GPU Expand. " +
+        "If the number of null scalars exceeds this value, the null vectors will not " +
+        "be cached. The value has to be positive for caching to be enabled.")
+      .internal().integerConf
+      .createWithDefault(0)
+
 val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
   conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
     .doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " +
@@ -2736,6 +2750,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isExpandPreprojectEnabled: Boolean = get(ENABLE_EXPAND_PREPROJECT)
 
+  lazy val isCoalesceAfterExpandEnabled: Boolean = get(ENABLE_COALESCE_AFTER_EXPAND)
+
+  lazy val expandCachingNullVecMaxCount: Int = get(EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT)
+
   lazy val multiThreadReadNumThreads: Int = {
     // Use the largest value set among all the options.
     val deprecatedConfs = Seq(
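Both confs are internal, and the null-vector caching stays off at the default of 0, so it has to be enabled explicitly. A hypothetical way to exercise the feature from a Spark session, with made-up values:

```scala
// Hypothetical usage only: these confs are internal, and the budget below is
// made up for illustration. Caching engages only when maxNulls is positive.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("expand-null-vec-cache-demo")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.expandCachingNullVec.maxNulls", "100000") // made-up budget
  .config("spark.rapids.sql.coalesceAfterExpand.enabled", "false")
  .getOrCreate()
```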