Skip to content

Commit

Permalink
Fix late initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Dec 6, 2024
1 parent a55aca0 commit 98508d2
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,13 @@ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Arr
useKudo: Boolean)
extends Serializer with Serializable {

private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))
private lazy val kudo = {
if (useKudo && dataTypes.nonEmpty) {
Some(new KudoSerializer(GpuColumnVector.from(dataTypes)))
} else {
None
}
}

override def newInstance(): SerializerInstance = {
if (useKudo) {
Expand Down Expand Up @@ -342,7 +348,7 @@ object SerializedTableColumn {
private class KudoSerializerInstance(
val metrics: Map[String, GpuMetric],
val dataTypes: Array[DataType],
val kudo: KudoSerializer
val kudo: Option[KudoSerializer]
) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
Expand Down Expand Up @@ -387,7 +393,9 @@ private class KudoSerializerInstance(
}

withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ =>
val writeMetric = kudo.writeToStreamWithMetrics(columns, dOut, startRow, numRows)
val writeMetric = kudo
.getOrElse(throw new IllegalStateException("Kudo serializer not initialized."))
.writeToStreamWithMetrics(columns, dOut, startRow, numRows)

dataSize += writeMetric.getWrittenBytes
serCalcHeaderTime += writeMetric.getCalcHeaderTime
Expand Down

0 comments on commit 98508d2

Please sign in to comment.