Merge pull request #18555 from jackdelv/HPCC-31642
HPCC-31642 ParquetReader::readColumns not reading nested columns properly

Reviewed-By: Dan S. Camper <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
ghalliday authored Apr 26, 2024
2 parents c6fb14c + 71ad011 commit f7b5d42
Showing 3 changed files with 46 additions and 44 deletions.
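For context on the fix, a minimal sketch (not part of the commit; the schema text, function name, and variable names below are illustrative) of the Parquet C++ schema lookups involved. SchemaDescriptor::ColumnIndex() resolves leaf-column dot-paths, so for a nested group field there is no leaf named after the top-level field and the lookup presumably comes back negative, whereas FieldIndex() on the root group node resolves top-level fields directly, which is the call readColumns switches to below. Top-level scalar fields resolve either way, which would explain why flat layouts were unaffected.

#include <memory>
#include <parquet/metadata.h>
#include <parquet/schema.h>

// Assumed schema for illustration:
//   message root { required group person { required binary name (UTF8); } }
void compareSchemaLookups(const std::shared_ptr<parquet::FileMetaData> &metadata)
{
    const parquet::SchemaDescriptor *schema = metadata->schema();

    // Old style: ColumnIndex() matches leaf-column dot-paths, so the top-level name of a
    // nested group is not a leaf and the result is negative.
    int topLevelMiss = schema->ColumnIndex("person");     // negative: "person" is a group, not a leaf
    int leafHit = schema->ColumnIndex("person.name");     // resolves the leaf column

    // New style (as in ParquetReader::readColumns below): FieldIndex() on the root group
    // node matches top-level fields, so the nested group resolves by its field name.
    int fieldHit = schema->group_node()->FieldIndex("person");

    (void)topLevelMiss; (void)leafHit; (void)fieldHit;
}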
common/thorhelper/thorread.cpp (2 changes: 1 addition & 1 deletion)

@@ -1767,7 +1767,7 @@ bool ParquetDiskRowReader::matches(const char * _format, bool _streamRemote, IDi
 bool ParquetDiskRowReader::setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
 {
     DBGLOG(0, "Opening File: %s", localFilename);
-    parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, &mapping->queryExpectedMeta()->queryRecordAccessor(true));
+    parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, mapping->queryExpectedMeta()->queryTypeInfo());
     auto st = parquetFileReader->processReadFile();
     if (!st.ok())
         throw MakeStringException(0, "%s: %s.", st.CodeAsString().c_str(), st.message().c_str());
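The call-site change above swaps which accessor on the expected output metadata is handed to the reader. A minimal sketch of the difference, assuming queryExpectedMeta() yields an IOutputMetaData; the helper function name and log text are illustrative, not from the commit:

// Illustrative only: contrasting the two layout accessors involved in this change.
void describeExpectedLayout(IOutputMetaData &meta)
{
    // Previously passed to ParquetReader: the record accessor, addressed by field
    // position and plain field name.
    const RtlRecord &record = meta.queryRecordAccessor(true);
    for (unsigned i = 0; i < record.getNumFields(); i++)
        DBGLOG(0, "record accessor field %u: %s", i, record.queryName(i));

    // Now passed instead: the type info tree, whose RtlFieldInfo entries carry the
    // xpath/name metadata consumed by the new xpathOrName() helper added below.
    const RtlTypeInfo *typeInfo = meta.queryTypeInfo();
    unsigned numFields = getNumFields(typeInfo);
    for (unsigned i = 0; i < numFields; i++)
        DBGLOG(0, "type info field %u: %s", i, typeInfo->queryFields()[i]->name);
}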
plugins/parquet/parquetembed.cpp (83 changes: 43 additions & 40 deletions)
@@ -95,6 +95,42 @@ extern void fail(const char *message)
     rtlFail(0, msg.str());
 }
 
+/**
+ * @brief Utility function for getting the xpath or field name from an RtlFieldInfo object.
+ *
+ * @param outXPath The buffer for storing output.
+ * @param field RtlFieldInfo object storing metadata for field.
+ */
+void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field)
+{
+    outXPath.clear();
+
+    if (field->xpath)
+    {
+        if (field->xpath[0] == xpathCompoundSeparatorChar)
+        {
+            outXPath.append(field->xpath + 1);
+        }
+        else
+        {
+            const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar);
+
+            if (!sep)
+            {
+                outXPath.append(field->xpath);
+            }
+            else
+            {
+                outXPath.append(field->xpath, 0, static_cast<size32_t>(sep - field->xpath));
+            }
+        }
+    }
+    else
+    {
+        outXPath.append(field->name);
+    }
+}
+
 /**
  * @brief Contructs a ParquetReader for a specific file location.
  *
@@ -107,7 +143,7 @@ ParquetReader::ParquetReader(const char *option, const char *_location, int _max
     : ParquetReader(option, _location, _maxRowCountInTable, _partitionFields, _activityCtx, nullptr) {}
 
 // Constructs a ParquetReader with the expected record layout of the Parquet file
-ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord)
+ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlTypeInfo * _expectedRecord)
     : partOption(option), location(_location), expectedRecord(_expectedRecord)
 {
     maxRowCountInTable = _maxRowCountInTable;
@@ -274,14 +310,17 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow
 __int64 ParquetReader::readColumns(__int64 currTable)
 {
     auto rowGroupReader = queryCurrentTable(currTable); // Sets currentTableMetadata
-    for (int i = 0; i < expectedRecord->getNumFields(); i++)
+    int numFields = getNumFields(expectedRecord);
+    for (int i = 0; i < numFields; i++)
     {
-        int columnIndex = currentTableMetadata->schema()->ColumnIndex(expectedRecord->queryName(i));
+        StringBuffer fieldName;
+        xpathOrName(fieldName, expectedRecord->queryFields()[i]);
+        int columnIndex = currentTableMetadata->schema()->group_node()->FieldIndex(fieldName.str());
         if (columnIndex >= 0)
         {
             std::shared_ptr<arrow::ChunkedArray> column;
             reportIfFailure(rowGroupReader->Column(columnIndex)->Read(&column));
-            parquetTable.insert(std::make_pair(expectedRecord->queryName(i), column->chunk(0)));
+            parquetTable.insert(std::make_pair(fieldName.str(), column->chunk(0)));
         }
     }
 
@@ -1013,42 +1052,6 @@ void ParquetRowStream::stop()
     shouldRead = false;
 }
 
-/**
- * @brief Utility function for getting the xpath or field name from an RtlFieldInfo object.
- *
- * @param outXPath The buffer for storing output.
- * @param field RtlFieldInfo object storing metadata for field.
- */
-void ParquetRowBuilder::xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const
-{
-    outXPath.clear();
-
-    if (field->xpath)
-    {
-        if (field->xpath[0] == xpathCompoundSeparatorChar)
-        {
-            outXPath.append(field->xpath + 1);
-        }
-        else
-        {
-            const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar);
-
-            if (!sep)
-            {
-                outXPath.append(field->xpath);
-            }
-            else
-            {
-                outXPath.append(field->xpath, 0, static_cast<size32_t>(sep - field->xpath));
-            }
-        }
-    }
-    else
-    {
-        outXPath.append(field->name);
-    }
-}
-
 /**
  * @brief Gets the current array index taking into account the nested status of the row.
  *
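The xpathOrName() helper moved to file scope above is what readColumns now uses to pick the lookup name for each field. A worked illustration of its resolution rules; the wrapper function is hypothetical, and <SEP> stands for xpathCompoundSeparatorChar (its actual character value is not relied on here):

// Hypothetical wrapper, shown only to spell out what xpathOrName() returns.
void showResolvedName(const RtlFieldInfo *field)
{
    StringBuffer resolved;
    xpathOrName(resolved, field);
    // field->xpath == nullptr, field->name == "id"  ->  resolved == "id"
    // field->xpath == "person"                      ->  resolved == "person"
    // field->xpath == "person<SEP>name"             ->  resolved == "person"  (text before the first separator)
    // field->xpath == "<SEP>name"                   ->  resolved == "name"    (leading separator skipped)
    DBGLOG(0, "resolved field name: %s", resolved.str());
}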
plugins/parquet/parquetembed.hpp (5 changes: 2 additions & 3 deletions)
@@ -363,7 +363,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
 {
 public:
     ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx);
-    ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord);
+    ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlTypeInfo *_expectedRecord);
     ~ParquetReader();
 
     arrow::Status processReadFile();
@@ -399,7 +399,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
     size_t maxRowCountInTable = 0; // Max table size set by user.
     std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'.
     std::string location; // Full path to location for reading parquet files. Can be a filename or directory.
-    const RtlRecord *expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin.
+    const RtlTypeInfo * expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin.
     const IThorActivityContext *activityCtx = nullptr; // Context about the thor worker configuration.
     std::shared_ptr<arrow::dataset::Scanner> scanner = nullptr; // Scanner for reading through partitioned files.
     std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr.
@@ -515,7 +515,6 @@ class PARQUETEMBED_PLUGIN_API ParquetRowBuilder : public CInterfaceOf<IFieldSour
     double getCurrRealValue(const RtlFieldInfo *field);
     void nextField(const RtlFieldInfo *field);
     void nextFromStruct(const RtlFieldInfo *field);
-    void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const;
     int64_t currArrayIndex();
 
 private:
