Skip to content

Commit

Permalink
fix buildDictLateMatPatch on full empty dict
Browse files Browse the repository at this point in the history
  • Loading branch information
sperlingxx authored and wjxiz1992 committed Apr 15, 2024
1 parent 1dec981 commit 0c2328c
Showing 1 changed file with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ object BinaryColumnMetaUtils extends Logging {
var sizeInRows = 0L
rowGroups.foreach { rowGroup =>
val pageReader = rowGroup.getPageReader(descriptor)
val (encoded, bs, rs) = extractPageMeta(pageReader)
isAllDictEncoded = isAllDictEncoded && encoded
val (pages, dctEcdPages, bs, rs) = extractPageMeta(pageReader)
logInfo(s"Column(${descriptor.getPath.mkString(".")}) " +
s"RowGroup size($rs rows/$bs bytes) and Dict-encoded pages($dctEcdPages/$pages)")
isAllDictEncoded = isAllDictEncoded && pages == dctEcdPages
sizeInBytes = sizeInBytes + bs
sizeInRows = sizeInRows + rs
if (isAllDictEncoded) {
Expand All @@ -73,7 +75,7 @@ object BinaryColumnMetaUtils extends Logging {
if (!isAllDictEncoded) None else Some(dictPages.toArray))
}

private def extractPageMeta(pageReader: PageReader): (Boolean, Long, Long) = {
private def extractPageMeta(pageReader: PageReader): (Int, Int, Long, Long) = {
require(ccPageReader.isInstance(pageReader),
"Only supports org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader")

Expand All @@ -93,20 +95,22 @@ object BinaryColumnMetaUtils extends Logging {
s"Get unknown type ${pages.getClass} from ColumnChunkPageReader::compressedPages")
}

val (dctEncoded, bs, rs) = pageIterator.asScala.foldLeft((true, 0L, 0L)) {
case ((encoded, byteSize, rowSize), p: DataPage) => p match {
val (pageNum, dctEcdPageNum, bs, rs) = pageIterator.asScala.foldLeft((0, 0, 0L, 0L)) {
case ((pages, dctEcdPages, byteSize, rowSize), p: DataPage) => p match {
case p: DataPageV1 =>
(encoded && p.getValueEncoding.usesDictionary(),
(pages + 1,
if (p.getValueEncoding.usesDictionary()) dctEcdPages + 1 else dctEcdPages,
byteSize + p.getUncompressedSize,
rowSize + p.getValueCount)
case p: DataPageV2 =>
(encoded && p.getDataEncoding.usesDictionary(),
(pages + 1,
if (p.getDataEncoding.usesDictionary()) dctEcdPages + 1 else dctEcdPages,
byteSize + p.getUncompressedSize,
rowSize + p.getRowCount)
}
}

(dctEncoded, bs, rs)
(pageNum, dctEcdPageNum, bs, rs)
}

def buildDictLateMatPatch(dictPages: Seq[DictionaryPage],
Expand All @@ -132,8 +136,9 @@ object BinaryColumnMetaUtils extends Logging {
i += 1
}
}

val charBuf = HostMemoryBuffer.allocate(charNum)
// A dictionary may contain no non-empty string values, leaving charNum at 0. To avoid a
// null-pointer error during copyFromHostToDevice, allocate at least 1 byte for the char buffer.
val charBuf = HostMemoryBuffer.allocate(charNum max 1)
i = 0
dictionaries.foreach { dict =>
(0 to dict.getMaxId).foreach { j =>
Expand All @@ -145,6 +150,9 @@ object BinaryColumnMetaUtils extends Logging {

val dictVector = new HostColumnVector(DType.STRING, rowNum, Optional.of(0L),
charBuf, null, offsetBuf, new util.ArrayList[HostColumnVectorCore]())
logInfo(s"Built the HostDictVector for Column(${descriptor.getPath.mkString(".")}): " +
s"${dictVector.getRowCount}rows/${charBuf.getLength}bytes")

DictLatMatPatch(dictVector, pageOffsets.toArray)
}

Expand Down

0 comments on commit 0c2328c

Please sign in to comment.