Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-31642 ParquetReader::readColumns not reading nested columns properly #18555

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1767,7 +1767,7 @@ bool ParquetDiskRowReader::matches(const char * _format, bool _streamRemote, IDi
bool ParquetDiskRowReader::setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
{
DBGLOG(0, "Opening File: %s", localFilename);
parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, &mapping->queryExpectedMeta()->queryRecordAccessor(true));
parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, mapping->queryExpectedMeta()->queryTypeInfo());
auto st = parquetFileReader->processReadFile();
if (!st.ok())
throw MakeStringException(0, "%s: %s.", st.CodeAsString().c_str(), st.message().c_str());
Expand Down
83 changes: 43 additions & 40 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,42 @@ extern void fail(const char *message)
rtlFail(0, msg.str());
}

/**
* @brief Utility function for getting the xpath or field name from an RtlFieldInfo object.
*
* @param outXPath The buffer for storing output.
* @param field RtlFieldInfo object storing metadata for field.
*/
void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field)
{
outXPath.clear();

if (field->xpath)
{
if (field->xpath[0] == xpathCompoundSeparatorChar)
{
outXPath.append(field->xpath + 1);
}
else
{
const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar);

if (!sep)
{
outXPath.append(field->xpath);
}
else
{
outXPath.append(field->xpath, 0, static_cast<size32_t>(sep - field->xpath));
}
}
}
else
{
outXPath.append(field->name);
}
}

/**
* @brief Contructs a ParquetReader for a specific file location.
*
Expand All @@ -107,7 +143,7 @@ ParquetReader::ParquetReader(const char *option, const char *_location, int _max
: ParquetReader(option, _location, _maxRowCountInTable, _partitionFields, _activityCtx, nullptr) {}

// Constructs a ParquetReader with the expected record layout of the Parquet file
ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord)
ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlTypeInfo * _expectedRecord)
: partOption(option), location(_location), expectedRecord(_expectedRecord)
{
maxRowCountInTable = _maxRowCountInTable;
Expand Down Expand Up @@ -274,14 +310,17 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow
__int64 ParquetReader::readColumns(__int64 currTable)
{
auto rowGroupReader = queryCurrentTable(currTable); // Sets currentTableMetadata
for (int i = 0; i < expectedRecord->getNumFields(); i++)
int numFields = getNumFields(expectedRecord);
for (int i = 0; i < numFields; i++)
{
int columnIndex = currentTableMetadata->schema()->ColumnIndex(expectedRecord->queryName(i));
StringBuffer fieldName;
xpathOrName(fieldName, expectedRecord->queryFields()[i]);
int columnIndex = currentTableMetadata->schema()->group_node()->FieldIndex(fieldName.str());
if (columnIndex >= 0)
{
std::shared_ptr<arrow::ChunkedArray> column;
reportIfFailure(rowGroupReader->Column(columnIndex)->Read(&column));
parquetTable.insert(std::make_pair(expectedRecord->queryName(i), column->chunk(0)));
parquetTable.insert(std::make_pair(fieldName.str(), column->chunk(0)));
}
}

Expand Down Expand Up @@ -1013,42 +1052,6 @@ void ParquetRowStream::stop()
shouldRead = false;
}

/**
* @brief Utility function for getting the xpath or field name from an RtlFieldInfo object.
*
* @param outXPath The buffer for storing output.
* @param field RtlFieldInfo object storing metadata for field.
*/
void ParquetRowBuilder::xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const
{
outXPath.clear();

if (field->xpath)
{
if (field->xpath[0] == xpathCompoundSeparatorChar)
{
outXPath.append(field->xpath + 1);
}
else
{
const char *sep = strchr(field->xpath, xpathCompoundSeparatorChar);

if (!sep)
{
outXPath.append(field->xpath);
}
else
{
outXPath.append(field->xpath, 0, static_cast<size32_t>(sep - field->xpath));
}
}
}
else
{
outXPath.append(field->name);
}
}

/**
* @brief Gets the current array index taking into account the nested status of the row.
*
Expand Down
5 changes: 2 additions & 3 deletions plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
{
public:
ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx);
ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord);
ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlTypeInfo *_expectedRecord);
~ParquetReader();

arrow::Status processReadFile();
Expand Down Expand Up @@ -397,7 +397,7 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
size_t maxRowCountInTable = 0; // Max table size set by user.
std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'.
std::string location; // Full path to location for reading parquet files. Can be a filename or directory.
const RtlRecord *expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin.
const RtlTypeInfo * expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin.
const IThorActivityContext *activityCtx = nullptr; // Context about the thor worker configuration.
std::shared_ptr<arrow::dataset::Scanner> scanner = nullptr; // Scanner for reading through partitioned files.
std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr.
Expand Down Expand Up @@ -513,7 +513,6 @@ class PARQUETEMBED_PLUGIN_API ParquetRowBuilder : public CInterfaceOf<IFieldSour
double getCurrRealValue(const RtlFieldInfo *field);
void nextField(const RtlFieldInfo *field);
void nextFromStruct(const RtlFieldInfo *field);
void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const;
int64_t currArrayIndex();

private:
Expand Down
Loading