Merge pull request #18514 from jackdelv/ParquetWithRtlRecordInfo
HPCC-31584 Add ECL Record information to Parquet

Reviewed-by: Dan S. Camper <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
ghalliday authored Apr 11, 2024
2 parents a782d9a + 174f8ea commit d32574d
Showing 3 changed files with 50 additions and 9 deletions.
2 changes: 1 addition & 1 deletion common/thorhelper/thorread.cpp
@@ -1767,7 +1767,7 @@ bool ParquetDiskRowReader::matches(const char * _format, bool _streamRemote, IDi
 bool ParquetDiskRowReader::setInputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, offset_t baseOffset, const IPropertyTree * inputOptions, const FieldFilterArray & expectedFilter)
 {
     DBGLOG(0, "Opening File: %s", localFilename);
-    parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx);
+    parquetFileReader = new parquetembed::ParquetReader("read", localFilename, 50000, nullptr, parquetActivityCtx, &mapping->queryExpectedMeta()->queryRecordAccessor(true));
     auto st = parquetFileReader->processReadFile();
     if (!st.ok())
         throw MakeStringException(0, "%s: %s.", st.CodeAsString().c_str(), st.message().c_str());
42 changes: 38 additions & 4 deletions plugins/parquet/parquetembed.cpp
@@ -20,6 +20,7 @@
#include "rtlembed.hpp"
#include "rtlds_imp.hpp"
#include "jfile.hpp"
#include "rtlrecord.hpp"

static constexpr const char *MODULE_NAME = "parquet";
static constexpr const char *MODULE_DESCRIPTION = "Parquet Embed Helper";
@@ -103,7 +104,11 @@ extern void fail(const char *message)
  * @param _activityCtx Additional context about the thor workers running.
  */
 ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx)
-    : partOption(option), location(_location)
+    : ParquetReader(option, _location, _maxRowCountInTable, _partitionFields, _activityCtx, nullptr) {}
+
+// Constructs a ParquetReader with the expected record layout of the Parquet file
+ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord)
+    : partOption(option), location(_location), expectedRecord(_expectedRecord)
 {
     maxRowCountInTable = _maxRowCountInTable;
     activityCtx = _activityCtx;
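
The original five-argument constructor now delegates to the new six-argument overload with a null layout, so existing plugin call sites compile unchanged while platform callers (see the thorread.cpp change above) pass the expected RtlRecord. A minimal, self-contained sketch of that delegation pattern, with stand-in names (Reader, RecordLayout) since the real class depends on HPCC internals:

#include <string>

struct RecordLayout; // opaque stand-in for RtlRecord

class Reader
{
public:
    // Old signature: forwards a null layout, preserving plugin behaviour.
    Reader(const std::string &location) : Reader(location, nullptr) {}
    // New overload: callers that know the expected layout supply it here.
    Reader(const std::string &location, const RecordLayout *expected)
        : location(location), expected(expected) {}

private:
    std::string location;
    const RecordLayout *expected; // nullptr means "read every column"
};

int main()
{
    Reader pluginPath("/data/example.parquet");            // embed/plugin caller
    Reader platformPath("/data/example.parquet", nullptr); // platform would pass a real layout here
}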
@@ -260,6 +265,29 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow
     }
 }
 
+/**
+ * @brief Reads selected columns from the current Table in the Parquet file.
+ *
+ * @param currTable The index of the Table to read columns from.
+ * @return __int64 The number of rows in the current Table.
+ */
+__int64 ParquetReader::readColumns(__int64 currTable)
+{
+    auto rowGroupReader = queryCurrentTable(currTable); // Sets currentTableMetadata
+    for (int i = 0; i < expectedRecord->getNumFields(); i++)
+    {
+        int columnIndex = currentTableMetadata->schema()->ColumnIndex(expectedRecord->queryName(i));
+        if (columnIndex >= 0)
+        {
+            std::shared_ptr<arrow::ChunkedArray> column;
+            reportIfFailure(rowGroupReader->Column(columnIndex)->Read(&column));
+            parquetTable.insert(std::make_pair(expectedRecord->queryName(i), column->chunk(0)));
+        }
+    }
+
+    return currentTableMetadata->num_rows();
+}
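
For readers less familiar with the Arrow C++ API, the pattern readColumns() uses — look each expected field name up in the Parquet schema, skip names the file lacks, and pull one ChunkedArray per matching column out of a row group — can be reproduced stand-alone. A hedged sketch; ReadSelectedColumns, the file path, and the field names are placeholders, and ColumnIndex() matches leaf-node paths, which coincide with field names for flat schemas:

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Read only the named columns from row group 0 of a Parquet file,
// mirroring the projection readColumns() performs above.
arrow::Status ReadSelectedColumns(const std::string &path,
                                  const std::vector<std::string> &fields)
{
    ARROW_ASSIGN_OR_RAISE(auto infile, arrow::io::ReadableFile::Open(path));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

    auto metadata = reader->parquet_reader()->metadata();
    auto rowGroup = reader->RowGroup(0);
    for (const auto &name : fields)
    {
        int columnIndex = metadata->schema()->ColumnIndex(name);
        if (columnIndex < 0)
            continue; // field absent from the file; readColumns() skips it the same way
        std::shared_ptr<arrow::ChunkedArray> column;
        ARROW_RETURN_NOT_OK(rowGroup->Column(columnIndex)->Read(&column));
        std::cout << name << ": " << column->length() << " values\n";
    }
    return arrow::Status::OK();
}

One assumption the commit shares with this sketch: parquetTable stores column->chunk(0), the first chunk of each ChunkedArray, and a column read from a single row group normally arrives as exactly one chunk.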

 /**
  * @brief Splits an arrow table into an unordered map with the left side containing the
  * column names and the right side containing an Array of the column values.
@@ -291,6 +319,7 @@ std::shared_ptr<parquet::arrow::RowGroupReader> ParquetReader::queryCurrentTable
         tables += fileTableCounts[i];
         if (currTable < tables)
         {
+            currentTableMetadata = parquetFileReaders[i]->parquet_reader()->metadata()->Subset({static_cast<int>(currTable - offset)});
             return parquetFileReaders[i]->RowGroup(currTable - offset);
         }
         offset = tables;
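
queryCurrentTable() now also caches metadata for just the row group it returns. parquet::FileMetaData::Subset() produces metadata describing only the listed row groups, which is what lets readColumns() report num_rows() without materializing any column data. A small sketch under the assumption of an already-open parquet::arrow::FileReader (RowsInGroup is a hypothetical helper):

#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>

#include <cstdint>
#include <memory>

// Row count of one row group, from metadata alone: Subset({rg}) narrows
// the file-level metadata to that single row group.
int64_t RowsInGroup(parquet::arrow::FileReader &reader, int rg)
{
    std::shared_ptr<parquet::FileMetaData> subset =
        reader.parquet_reader()->metadata()->Subset({rg});
    return subset->num_rows();
}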
@@ -397,11 +426,16 @@ __int64 ParquetReader::next(TableColumns *&nextTable)
         }
         else
         {
-            reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table));
+            if (expectedRecord)
+                rowsCount = readColumns(tablesProcessed + startRowGroup);
+            else
+            {
+                reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table));
+                rowsCount = table->num_rows();
+                splitTable(table);
+            }
         }
         tablesProcessed++;
-        rowsCount = table->num_rows();
-        splitTable(table);
     }
     nextTable = &parquetTable;
     totalRowsProcessed++;
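
Worth noting as a design choice: Arrow can also project at the Table level — parquet::arrow::RowGroupReader::ReadTable() accepts explicit column indices — but the commit instead reads column-by-column into the existing name-to-array map, so the projected path never assembles an arrow::Table at all, which is why rowsCount and splitTable() moved inside the else branch. A hedged sketch of that alternative (ProjectedReadTable and the index list are hypothetical):

#include <arrow/api.h>
#include <parquet/arrow/reader.h>

#include <memory>
#include <vector>

// Alternative Arrow offers (not what this commit does): ReadTable with
// explicit column indices projects, but still builds an arrow::Table.
arrow::Status ProjectedReadTable(parquet::arrow::FileReader &reader,
                                 const std::vector<int> &indices,
                                 std::shared_ptr<arrow::Table> *out)
{
    return reader.RowGroup(0)->ReadTable(indices, out);
}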
15 changes: 11 additions & 4 deletions plugins/parquet/parquetembed.hpp
@@ -361,18 +361,23 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
 {
 public:
     ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx);
+    ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx, const RtlRecord *_expectedRecord);
     ~ParquetReader();
-    arrow::Status openReadFile();
 
     arrow::Status processReadFile();
-    void splitTable(std::shared_ptr<arrow::Table> &table);
     bool shouldRead();
     __int64 next(TableColumns *&nextTable);
-    std::shared_ptr<parquet::arrow::RowGroupReader> queryCurrentTable(__int64 currTable);
-    arrow::Result<std::shared_ptr<arrow::Table>> queryRows();
 
     bool getCursor(MemoryBuffer & cursor);
     void setCursor(MemoryBuffer & cursor);
+
+private:
+    arrow::Status openReadFile();
+    __int64 readColumns(__int64 currTable);
+    void splitTable(std::shared_ptr<arrow::Table> &table);
+    std::shared_ptr<parquet::arrow::RowGroupReader> queryCurrentTable(__int64 currTable);
+    arrow::Result<std::shared_ptr<arrow::Table>> queryRows();
 
 private:
     // Count of processed rows and tables for both partitioned and regular files.
     __int64 tablesProcessed = 0; // The number of tables processed when reading parquet files.
@@ -392,12 +397,14 @@ class PARQUETEMBED_PLUGIN_API ParquetReader
     size_t maxRowCountInTable = 0; // Max table size set by user.
     std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'.
     std::string location; // Full path to location for reading parquet files. Can be a filename or directory.
+    const RtlRecord *expectedRecord = nullptr; // Expected record layout of Parquet file. Only available when used in the platform i.e. not available when used as a plugin.
     const IThorActivityContext *activityCtx = nullptr; // Context about the thor worker configuration.
     std::shared_ptr<arrow::dataset::Scanner> scanner = nullptr; // Scanner for reading through partitioned files.
     std::shared_ptr<arrow::RecordBatchReader> rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr.
     arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr; // Iterator of RecordBatches when reading a partitioned dataset.
     std::vector<__int64> fileTableCounts; // Count of RowGroups in each open file to get the correct row group when reading specific parts of the file.
     std::vector<std::shared_ptr<parquet::arrow::FileReader>> parquetFileReaders; // Vector of FileReaders that match the target file name. data0.parquet, data1.parquet, etc.
+    std::shared_ptr<parquet::FileMetaData> currentTableMetadata = nullptr; // Parquet metadata for the current table.
     TableColumns parquetTable; // The current table being read broken up into columns. Unordered map where the left side is a string of the field name and the right side is an array of the values.
     std::vector<std::string> partitionFields; // The partitioning schema for reading Directory Partitioned files.
     arrow::MemoryPool *pool = nullptr; // Memory pool for reading parquet files.
