From 71ad01105dbb2126ace374926ac671c8d2e47ff5 Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Thu, 18 Apr 2024 16:23:04 +0000 Subject: [PATCH] HPCC-31642 ParquetReader::readColumns not reading nested columns properly --- common/thorhelper/thorread.cpp | 2 +- plugins/parquet/parquetembed.cpp | 83 +++++++++++++++++--------------- plugins/parquet/parquetembed.hpp | 5 +- 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/common/thorhelper/thorread.cpp b/common/thorhelper/thorread.cpp index d1dd5169a69..ab7386a7719 100644 --- a/common/thorhelper/thorread.cpp +++ b/common/thorhelper/thorread.cpp @@ -1767,7 +1767,7 @@ bool ParquetDiskRowReader::matches(const char * _format, bool _streamRemote, IDi bool ParquetDiskRowReader::setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter) { DBGLOG(0, "Opening File: %s", localFilename); - parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, &mapping->queryExpectedMeta()->queryRecordAccessor(true)); + parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, mapping->queryExpectedMeta()->queryTypeInfo()); auto st = parquetFileReader->processReadFile(); if (!st.ok()) throw MakeStringException(0, "%s: %s.", st.CodeAsString().c_str(), st.message().c_str()); diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index e247c3fb07c..e8e91fdbed9 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -95,6 +95,42 @@ extern void fail(const char *message) rtlFail(0, msg.str()); } +/** + * @brief Utility function for getting the xpath or field name from an RtlFieldInfo object. + * + * @param outXPath The buffer for storing output. + * @param field RtlFieldInfo object storing metadata for field. + */ +void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) +{ + outXPath.clear(); + + if (field->xpath) + { + if (field->xpath[0] == xpathCompoundSeparatorChar) + { + outXPath.append(field->xpath + 1); + } + else + { + const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar); + + if (!sep) + { + outXPath.append(field->xpath); + } + else + { + outXPath.append(field->xpath, 0, static_cast(sep - field->xpath)); + } + } + } + else + { + outXPath.append(field->name); + } +} + /** * @brief Contructs a ParquetReader for a specific file location. * @@ -107,7 +143,7 @@ ParquetReader::ParquetReader(const char *option, const char *_location, int _max : ParquetReader(option, _location, _maxRowCountInTable, _partitionFields, _activityCtx, nullptr) {} // Constructs a ParquetReader with the expected record layout of the Parquet file -ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord) +ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlTypeInfo * _expectedRecord) : partOption(option), location(_location), expectedRecord(_expectedRecord) { maxRowCountInTable = _maxRowCountInTable; @@ -274,14 +310,17 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow __int64 ParquetReader::readColumns(__int64 currTable) { auto rowGroupReader = queryCurrentTable(currTable); // Sets currentTableMetadata - for (int i = 0; i < expectedRecord->getNumFields(); i++) + int numFields = getNumFields(expectedRecord); + for (int i = 0; i < numFields; i++) { - int columnIndex = currentTableMetadata->schema()->ColumnIndex(expectedRecord->queryName(i)); + StringBuffer fieldName; + xpathOrName(fieldName, expectedRecord->queryFields()[i]); + int columnIndex = currentTableMetadata->schema()->group_node()->FieldIndex(fieldName.str()); if (columnIndex >= 0) { std::shared_ptr column; reportIfFailure(rowGroupReader->Column(columnIndex)->Read(&column)); - parquetTable.insert(std::make_pair(expectedRecord->queryName(i), column->chunk(0))); + parquetTable.insert(std::make_pair(fieldName.str(), column->chunk(0))); } } @@ -1013,42 +1052,6 @@ void ParquetRowStream::stop() shouldRead = false; } -/** - * @brief Utility function for getting the xpath or field name from an RtlFieldInfo object. - * - * @param outXPath The buffer for storing output. - * @param field RtlFieldInfo object storing metadata for field. - */ -void ParquetRowBuilder::xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const -{ - outXPath.clear(); - - if (field->xpath) - { - if (field->xpath[0] == xpathCompoundSeparatorChar) - { - outXPath.append(field->xpath + 1); - } - else - { - const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar); - - if (!sep) - { - outXPath.append(field->xpath); - } - else - { - outXPath.append(field->xpath, 0, static_cast(sep - field->xpath)); - } - } - } - else - { - outXPath.append(field->name); - } -} - /** * @brief Gets the current array index taking into account the nested status of the row. * diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index 39a3e3c5aef..277c5385c26 100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -361,7 +361,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader { public: ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx); - ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord); + ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlTypeInfo *_expectedRecord); ~ParquetReader(); arrow::Status processReadFile(); @@ -397,7 +397,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader size_t maxRowCountInTable = 0; // Max table size set by user. std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'. std::string location; // Full path to location for reading parquet files. Can be a filename or directory. - const RtlRecord *expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin. + const RtlTypeInfo * expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin. const IThorActivityContext *activityCtx = nullptr; // Context about the thor worker configuration. std::shared_ptr scanner = nullptr; // Scanner for reading through partitioned files. std::shared_ptr rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr. @@ -513,7 +513,6 @@ class PARQUETEMBED_PLUGIN_API ParquetRowBuilder : public CInterfaceOf