Skip to content

Commit

Permalink
Merge branch-24.08
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Gao committed Jun 25, 2024
2 parents 7c43e69 + 7a8690f commit fdc4dd9
Show file tree
Hide file tree
Showing 50 changed files with 4,151 additions and 950 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/blossom-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
repository: ${{ fromJson(needs.Authorization.outputs.args).repo }}
ref: ${{ fromJson(needs.Authorization.outputs.args).ref }}
lfs: 'true'

# repo specific steps
- name: Setup java
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: adopt
java-version: 8
Expand Down
4 changes: 4 additions & 0 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
<pattern>com.google.flatbuffers</pattern>
<shadedPattern>${rapids.shade.package}.com.google.flatbuffers</shadedPattern>
</relocation>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>${rapids.shade.package}.org.roaringbitmap</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import ai.rapids.cudf.{ColumnVector => CudfColumnVector, Scalar, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuColumnVector
import org.roaringbitmap.longlong.{PeekableLongIterator, Roaring64Bitmap}

import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}


/**
 * Helpers that fill the Delta metadata columns (row index and "row deleted" flag) of
 * batches read from a parquet file on the GPU.
 */
object GpuDeltaParquetFileFormatUtils {
  /**
   * Row number of the row in the file. When used with [[FILE_PATH_COL]] together, it can be used
   * as unique id of a row in file. Currently to correctly calculate this, the caller needs to
   * set both [[isSplitable]] to false, and [[RapidsConf.PARQUET_READER_TYPE]] to "PERFILE".
   */
  val METADATA_ROW_IDX_COL: String = "__metadata_row_index"
  val METADATA_ROW_IDX_FIELD: StructField = StructField(METADATA_ROW_IDX_COL, LongType,
    nullable = false)

  /**
   * Boolean column that is true for every row whose row index is contained in the deletion
   * vector handed to [[addMetadataColumnToIterator]], and false otherwise.
   */
  val METADATA_ROW_DEL_COL: String = "__metadata_row_del"
  val METADATA_ROW_DEL_FIELD: StructField = StructField(METADATA_ROW_DEL_COL, BooleanType,
    nullable = false)

  /**
   * File path of the file that the row came from.
   */
  val FILE_PATH_COL: String = "_metadata_file_path"
  val FILE_PATH_FIELD: StructField = StructField(FILE_PATH_COL, StringType, nullable = false)

  /**
   * Replace the metadata columns of every batch produced by `input`. Both
   * [[METADATA_ROW_IDX_COL]] and [[METADATA_ROW_DEL_COL]] are supported; a column is only
   * replaced when `schema` contains a field with the matching name. When neither is present
   * `input` is returned untouched.
   *
   * Row indexes start at 0 for the first batch and keep counting across batches, so the
   * iterator must cover exactly one file read from its beginning. Each input batch is closed
   * once its replacement has been built.
   *
   * @param schema       schema of the batches in `input`
   * @param delVector    row indexes to flag in [[METADATA_ROW_DEL_COL]]; must be defined when
   *                     `schema` contains that column
   * @param input        batches to decorate
   * @param maxBatchSize maximum number of row positions scattered to the GPU in one call
   * @return an iterator of new batches that own their columns
   */
  def addMetadataColumnToIterator(
      schema: StructType,
      delVector: Option[Roaring64Bitmap],
      input: Iterator[ColumnarBatch],
      maxBatchSize: Int): Iterator[ColumnarBatch] = {
    // indexOf yields -1 when the field is missing; turn that into None once, up front.
    val rowIdxPos = Some(schema.fieldNames.indexOf(METADATA_ROW_IDX_COL)).filter(_ >= 0)
    val delRowPos = Some(schema.fieldNames.indexOf(METADATA_ROW_DEL_COL)).filter(_ >= 0)

    if (rowIdxPos.isEmpty && delRowPos.isEmpty) {
      input
    } else {
      // Index (within the file) of the first row of the next batch.
      var rowIndex = 0L
      input.map { batch =>
        withResource(batch) { _ =>
          val newBatch = addMetadataColumns(rowIdxPos, delRowPos, delVector, maxBatchSize,
            rowIndex, batch)
          rowIndex += batch.numRows()
          newBatch
        }
      }
    }
  }

  /**
   * Build a new batch that shares the data columns of `batch` (ref counts incremented) and
   * carries freshly computed metadata columns at `rowIdxPos` / `delRowIdx`.
   */
  private def addMetadataColumns(
      rowIdxPos: Option[Int],
      delRowIdx: Option[Int],
      delVec: Option[Roaring64Bitmap],
      maxBatchSize: Int,
      rowIdxStart: Long,
      batch: ColumnarBatch): ColumnarBatch = {
    // Fail before touching GPU memory, and with a message that names the problem, instead of
    // hitting a bare Option.get half way through assembling the output columns.
    if (delRowIdx.isDefined && delVec.isEmpty) {
      throw new IllegalStateException(
        s"Column $METADATA_ROW_DEL_COL was requested but no deletion vector was provided")
    }

    val numRows = batch.numRows()
    val rowIdxCol = rowIdxPos.map { _ =>
      withResource(Scalar.fromLong(rowIdxStart)) { start =>
        GpuColumnVector.from(CudfColumnVector.sequence(start, numRows),
          METADATA_ROW_IDX_FIELD.dataType)
      }
    }

    closeOnExcept(rowIdxCol) { _ =>
      // Only materialize the flag column when the schema has a slot for it; a column that is
      // built but never handed to the output batch would never be closed.
      val delVecCol = for {
        _ <- delRowIdx
        bitmap <- delVec
      } yield buildDeletedFlagColumn(bitmap, maxBatchSize, rowIdxStart, numRows)

      closeOnExcept(delVecCol) { _ =>
        // Metadata column to put at position i, if any. Row index takes precedence.
        def replacementFor(i: Int): Option[ColumnVector] = {
          rowIdxCol.filter(_ => rowIdxPos.contains(i))
            .orElse(delVecCol.filter(_ => delRowIdx.contains(i)))
        }

        val columns = new Array[ColumnVector](batch.numCols())
        for (i <- 0 until batch.numCols()) {
          columns(i) = replacementFor(i).getOrElse {
            // The input batch is closed by the caller, so shared GPU columns need their own
            // reference to stay alive in the new batch.
            batch.column(i) match {
              case gpuCol: GpuColumnVector => gpuCol.incRefCount()
              case col => col
            }
          }
        }

        new ColumnarBatch(columns, numRows)
      }
    }
  }

  /**
   * Create a boolean column with `numRows` rows that is true at every position `p` for which
   * `rowIdxStart + p` is contained in `delVec`.
   *
   * Positions are pushed to the GPU in chunks of at most `maxBatchSize` entries.
   */
  private def buildDeletedFlagColumn(
      delVec: Roaring64Bitmap,
      maxBatchSize: Int,
      rowIdxStart: Long,
      numRows: Int): GpuColumnVector = {
    require(maxBatchSize > 0, s"maxBatchSize must be positive, got $maxBatchSize")
    // Iterator.grouped rejects a size of 0, which is what an empty batch would produce.
    val chunkSize = Math.max(1, Math.min(maxBatchSize, numRows))
    val positions = new RoaringBitmapIterator(
      delVec.getLongIteratorFrom(rowIdxStart),
      rowIdxStart,
      rowIdxStart + numRows).grouped(chunkSize)

    withResource(Scalar.fromBool(false)) { notDeleted =>
      withResource(CudfColumnVector.fromScalar(notDeleted, numRows)) { allFalse =>
        withResource(Scalar.fromBool(true)) { deleted =>
          var table = new Table(allFalse)
          try {
            for (chunk <- positions) {
              withResource(CudfColumnVector.fromLongs(chunk: _*)) { scatterMap =>
                val previous = table
                // Reassign before closing so the finally block always sees the live table.
                table = Table.scatter(Array(deleted), scatterMap, previous)
                previous.close()
              }
            }
            GpuColumnVector.from(table.getColumn(0).incRefCount(),
              METADATA_ROW_DEL_FIELD.dataType)
          } finally {
            // Runs on success and on failure, so the table cannot leak when a chunk upload
            // or scatter throws.
            table.close()
          }
        }
      }
    }
  }
}

/**
 * Adapts a [[PeekableLongIterator]] to a Scala [[Iterator]] that stops before the first value
 * that is not smaller than `end`, and reports every value relative to `start`.
 *
 * @param inner source of values
 * @param start offset subtracted from every value that is returned
 * @param end   exclusive upper bound, expressed in the same space as the values of `inner`
 */
class RoaringBitmapIterator(val inner: PeekableLongIterator, val start: Long, val end: Long)
  extends Iterator[Long] {

  /** True while `inner` has a next value and that value is below `end`. */
  override def hasNext: Boolean = {
    if (inner.hasNext) {
      inner.peekNext() < end
    } else {
      false
    }
  }

  /** Consumes the next value of `inner` and returns it shifted by `start`. */
  override def next(): Long = {
    val absolute = inner.next()
    absolute - start
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,19 @@

package com.nvidia.spark.rapids.delta

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import ai.rapids.cudf.{ColumnVector, Scalar, Table}
import ai.rapids.cudf.Table.DuplicateKeepOption
import com.nvidia.spark.RapidsUDF
import com.nvidia.spark.rapids.Arm.withResource
import org.roaringbitmap.longlong.Roaring64Bitmap

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.types.{BinaryType, DataType, SQLUserDefinedType, UserDefinedType}
import org.apache.spark.util.AccumulatorV2

class GpuDeltaRecordTouchedFileNameUDF(accum: AccumulatorV2[String, java.util.Set[String]])
Expand Down Expand Up @@ -73,3 +80,77 @@ class GpuDeltaMetricUpdateUDF(metric: SQLMetric)
}
}
}

/**
 * Identity UDF on booleans: the row-wise version returns its argument and the columnar
 * version returns its single input column with an extra reference.
 */
class GpuDeltaNoopUDF extends Function1[Boolean, Boolean] with RapidsUDF with Serializable {
  /** Returns the input value unchanged. */
  override def apply(v1: Boolean): Boolean = {
    v1
  }

  /**
   * Returns the only input column. The reference count is incremented, so the result has to
   * be closed independently of the argument.
   */
  override def evaluateColumnar(numRows: Int, args: ColumnVector*): ColumnVector = {
    val argCount = args.length
    require(argCount == 1)
    val onlyColumn = args(0)
    onlyColumn.incRefCount()
  }
}

/**
 * Wraps a [[Roaring64Bitmap]] so that it can be used as a Spark SQL value through
 * [[RoaringBitmapUDT]].
 */
@SQLUserDefinedType(udt = classOf[RoaringBitmapUDT])
case class RoaringBitmapWrapper(inner: Roaring64Bitmap) {
  /** Serializes the wrapped bitmap with `Roaring64Bitmap.serialize` and returns the bytes. */
  def serializeToBytes(): Array[Byte] = {
    withResource(new ByteArrayOutputStream()) { buffer =>
      // The data stream is closed before the bytes are read back from the buffer.
      withResource(new DataOutputStream(buffer)) { out =>
        inner.serialize(out)
      }
      val bytes = buffer.toByteArray
      bytes
    }
  }
}

object RoaringBitmapWrapper {
  /**
   * Rebuilds a wrapper from bytes by feeding them to `Roaring64Bitmap.deserialize`.
   *
   * @param bytes serialized bitmap, e.g. the result of `serializeToBytes()`
   */
  def deserializeFromBytes(bytes: Array[Byte]): RoaringBitmapWrapper = {
    withResource(new ByteArrayInputStream(bytes)) { source =>
      withResource(new DataInputStream(source)) { in =>
        val bitmap = new Roaring64Bitmap
        val wrapper = RoaringBitmapWrapper(bitmap)
        bitmap.deserialize(in)
        wrapper
      }
    }
  }
}

/**
 * Spark user-defined type that stores a [[RoaringBitmapWrapper]] as binary data.
 */
class RoaringBitmapUDT extends UserDefinedType[RoaringBitmapWrapper] {

  /** The bitmap is stored as its serialized bytes. */
  override def sqlType: DataType = BinaryType

  /** Converts the wrapper to the byte array produced by `serializeToBytes()`. */
  override def serialize(obj: RoaringBitmapWrapper): Any = {
    obj.serializeToBytes()
  }

  /**
   * Rebuilds the wrapper from its serialized bytes.
   *
   * @throws IllegalArgumentException if `datum` is null or is not an `Array[Byte]`
   */
  override def deserialize(datum: Any): RoaringBitmapWrapper = {
    datum match {
      case b: Array[Byte] => RoaringBitmapWrapper.deserializeFromBytes(b)
      // Handled separately: calling getClass on null would raise a NullPointerException and
      // hide the real problem.
      case null =>
        throw new IllegalArgumentException(
          s"$typeName can only be deserialized from Array[Byte], but got null")
      case t =>
        throw new IllegalArgumentException(
          s"$typeName can only be deserialized from Array[Byte], but got ${t.getClass.getName}")
    }
  }

  override def userClass: Class[RoaringBitmapWrapper] = classOf[RoaringBitmapWrapper]

  override def typeName: String = "RoaringBitmap"
}

/**
 * Aggregator that collects long values into a [[Roaring64Bitmap]] wrapped in a
 * [[RoaringBitmapWrapper]].
 */
object RoaringBitmapUDAF extends Aggregator[Long, RoaringBitmapWrapper, RoaringBitmapWrapper] {
  /** A fresh, empty bitmap for every new aggregation buffer. */
  override def zero: RoaringBitmapWrapper = {
    val empty = new Roaring64Bitmap()
    RoaringBitmapWrapper(empty)
  }

  /** Adds `a` to the buffer in place and returns the same buffer. */
  override def reduce(b: RoaringBitmapWrapper, a: Long): RoaringBitmapWrapper = {
    val bitmap = b.inner
    bitmap.addLong(a)
    b
  }

  /** Returns the union of both buffers in a new wrapper; neither input is modified. */
  override def merge(b1: RoaringBitmapWrapper, b2: RoaringBitmapWrapper): RoaringBitmapWrapper = {
    val union = b1.inner.clone()
    union.or(b2.inner)
    RoaringBitmapWrapper(union)
  }

  /** The buffer already is the final result. */
  override def finish(reduction: RoaringBitmapWrapper): RoaringBitmapWrapper = {
    reduction
  }

  override def bufferEncoder: Encoder[RoaringBitmapWrapper] = {
    ExpressionEncoder()
  }

  override def outputEncoder: Encoder[RoaringBitmapWrapper] = {
    ExpressionEncoder()
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,7 +74,8 @@ object Delta24xProvider extends DeltaIOProvider {

/**
 * Builds the GPU read format from the given CPU file format, which is cast to
 * [[DeltaParquetFileFormat]].
 */
override def getReadFileFormat(format: FileFormat): FileFormat = {
val cpuFormat = format.asInstanceOf[DeltaParquetFileFormat]
// NOTE(review): this two-argument call looks like the pre-change line of the diff; its result
// is discarded when followed by the four-argument call below. Confirm that only the
// four-argument call remains in the actual file.
GpuDelta24xParquetFileFormat(cpuFormat.metadata, cpuFormat.isSplittable)
// Forwards the metadata, splittable flag, push-down switch and broadcast deletion-vector map
// of the CPU format.
GpuDelta24xParquetFileFormat(cpuFormat.metadata, cpuFormat.isSplittable,
cpuFormat.disablePushDowns, cpuFormat.broadcastDvMap)
}

override def convertToGpu(
Expand Down
Loading

0 comments on commit fdc4dd9

Please sign in to comment.