diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
index 82f368936a7..c9d4fbe0ba7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
@@ -132,9 +132,12 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
 class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType],
     useKudo: Boolean)
   extends Serializer with Serializable {
+
+  private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))
+
   override def newInstance(): SerializerInstance = {
     if (useKudo) {
-      new KudoSerializerInstance(metrics, dataTypes)
+      new KudoSerializerInstance(metrics, dataTypes, kudo)
     } else {
       new GpuColumnarBatchSerializerInstance(metrics)
     }
@@ -158,7 +161,7 @@ private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]
       val batch = value.asInstanceOf[ColumnarBatch]
       val numColumns = batch.numCols()
       val columns: Array[HostColumnVector] = new Array(numColumns)
-      val toClose = new ArrayBuffer[AutoCloseable]()
+      val toClose = new ArrayBuffer[AutoCloseable](numColumns)
       try {
         var startRow = 0
         val numRows = batch.numRows()
@@ -338,7 +341,9 @@ object SerializedTableColumn {
  */
 private class KudoSerializerInstance(
     val metrics: Map[String, GpuMetric],
-    val dataTypes: Array[DataType]) extends SerializerInstance {
+    val dataTypes: Array[DataType],
+    val kudo: KudoSerializer
+) extends SerializerInstance {
   private val dataSize = metrics(METRIC_DATA_SIZE)
   private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
   private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME)
@@ -346,8 +351,6 @@ private class KudoSerializerInstance(
   private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME)
   private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)
 
-  private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))
-
   override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))
@@ -356,7 +359,7 @@ private class KudoSerializerInstance(
       val batch = value.asInstanceOf[ColumnarBatch]
       val numColumns = batch.numCols()
       val columns: Array[HostColumnVector] = new Array(numColumns)
-      withResource(new ArrayBuffer[AutoCloseable]()) { toClose =>
+      withResource(new ArrayBuffer[AutoCloseable](numColumns)) { toClose =>
         var startRow = 0
         val numRows = batch.numRows()
         if (batch.numCols() > 0) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala
index c33c19cdd8a..62b8e36be65 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala
@@ -237,6 +237,7 @@ class KudoTableOperator(
     kudoMergeHeaderTime: GpuMetric,
     kudoMergeBufferTime: GpuMetric) extends SerializedTableOperator[KudoSerializedTableColumn] {
   require(kudo != null, "kudo serializer should not be null")
+  private val kudoTables = new util.ArrayList[KudoTable]()
 
   override def getDataLen(column: KudoSerializedTableColumn): Long = column.kudoTable.getHeader
     .getTotalDataLen
@@ -251,7 +252,8 @@ class KudoTableOperator(
       val totalRowsNum = columns.map(getNumRows).sum
       RowCountOnlyMergeResult(totalRowsNum)
     } else {
-      val kudoTables = new util.ArrayList[KudoTable](columns.length)
+      kudoTables.clear()
+      kudoTables.ensureCapacity(columns.length)
       columns.foreach { column =>
         kudoTables.add(column.kudoTable)
       }
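
For context, here is a minimal self-contained sketch of the two allocation patterns this diff applies: hoisting an expensive helper out of `newInstance()` so every `SerializerInstance` shares one copy, and reusing a preallocated `ArrayList` across merge calls instead of allocating a fresh one per call. All names below (`ExpensiveHelper`, `SharingSerializer`, `MergingOperator`) are illustrative stand-ins, not the plugin's actual API.

```scala
import java.util

// Stand-in for KudoSerializer: costly to construct, immutable once built.
final class ExpensiveHelper(schema: Array[String])

// Pattern 1: build the helper once per Serializer (lazily) and hand the
// same instance to every SerializerInstance, instead of rebuilding it
// inside each newInstance() call.
final class SharingSerializer(schema: Array[String]) extends Serializable {
  private lazy val helper = new ExpensiveHelper(schema)

  def newInstance(): SharingInstance = new SharingInstance(helper)
}

final class SharingInstance(val helper: ExpensiveHelper)

// Pattern 2: keep one ArrayList for the operator's lifetime and recycle it
// per call via clear() + ensureCapacity(), mirroring the kudoTables change
// in KudoTableOperator.
final class MergingOperator {
  private val tables = new util.ArrayList[AnyRef]()

  def merge(columns: Array[AnyRef]): Int = {
    tables.clear()                        // drop the previous batch's entries
    tables.ensureCapacity(columns.length) // grow the backing array at most once
    columns.foreach(c => tables.add(c))
    tables.size()
  }
}
```

Sharing the helper assumes, as the diff does for `KudoSerializer`, that it is safe to use from every instance the serializer creates; the `ArrayBuffer[AutoCloseable](numColumns)` changes follow the same theme by presizing buffers to a known capacity to avoid incremental regrowth.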