Skip to content

Commit

Permalink
Cache chunks rather than reading them in every time we build a field.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdelv committed Aug 9, 2023
1 parent 881c215 commit 816a53b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 72 deletions.
115 changes: 60 additions & 55 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ namespace parquetembed
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(table);

StringBuffer basename_template;
basename_template.appendf("part{i}_%d.parquet", tablesProcessed++);
basename_template.appendf("part{i}_%ld.parquet", tablesProcessed++);
write_options.basename_template = basename_template.str();

{
Expand Down Expand Up @@ -299,7 +299,7 @@ namespace parquetembed
* @brief Divide row groups being read from a parquet file among any number of thor workers. If running hthor all row groups are assigned to it. This function
* will handle all cases where the number of groups is greater than, less than or divisible by the number of thor workers.
*/
void divide_row_groups(const IThorActivityContext *activityCtx, int total_row_groups, int &num_row_groups, int &start_row_group)
void divide_row_groups(const IThorActivityContext *activityCtx, int64_t total_row_groups, int64_t &num_row_groups, int64_t &start_row_group)
{
int workers = activityCtx->numSlaves();
int strands = activityCtx->numStrands();
Expand Down Expand Up @@ -355,6 +355,16 @@ namespace parquetembed
}
}

/**
 * @brief Caches each column of the table in the parquet_table map, keyed by
 * field name, so every field lookup while building rows hits the cache instead
 * of re-reading the table.
 *
 * @param table Arrow table whose columns are to be cached. May be replaced by
 * an equivalent single-chunk table as a side effect of combining chunks.
 */
void ParquetHelper::chunkTable(std::shared_ptr<arrow::Table> &table)
{
    // A column read from a parquet file is a ChunkedArray and can hold zero
    // or several chunks. Caching only chunk(0) would silently drop every row
    // past the first chunk (and crash on an empty column), so collapse each
    // column to exactly one chunk before caching.
    bool singleChunked = true;
    for (int i = 0; i < table->num_columns(); i++)
    {
        if (table->column(i)->num_chunks() != 1)
        {
            singleChunked = false;
            break;
        }
    }
    if (!singleChunked)
    {
        PARQUET_ASSIGN_OR_THROW(table, table->CombineChunks(pool));
    }

    auto columns = table->columns();
    parquet_table.clear();
    for (std::size_t i = 0; i < columns.size(); i++)
    {
        parquet_table.insert(std::make_pair(table->field(i)->name(), columns[i]->chunk(0)));
    }
}

/**
* @brief Sets the parquet_table member to the output of what is read from the given
* parquet file.
Expand All @@ -374,8 +384,10 @@ namespace parquetembed
divide_row_groups(activityCtx, std::ceil(total_rows / batch_size), tableCount, start_row_group);
if (tableCount != 0)
{
PARQUET_ASSIGN_OR_THROW(parquet_table, queryRows());
rowsCount = parquet_table->num_rows();
std::shared_ptr<arrow::Table> table;
PARQUET_ASSIGN_OR_THROW(table, queryRows());
rowsCount = table->num_rows();
chunkTable(table);
tablesProcessed++;
}
else
Expand All @@ -387,12 +399,13 @@ namespace parquetembed
{
int total_row_groups = parquet_read->num_row_groups();
divide_row_groups(activityCtx, total_row_groups, tableCount, start_row_group);

rowsProcessed = 0;
if (tableCount != 0)
{
reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&parquet_table));
rowsCount = parquet_table->num_rows();
std::shared_ptr<arrow::Table> table;
reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&table));
rowsCount = table->num_rows();
chunkTable(table);
tablesProcessed++;
}
else
Expand All @@ -418,7 +431,7 @@ namespace parquetembed
*
* @return int Maximum size of the row group.
*/
int ParquetHelper::getMaxRowSize()
int64_t ParquetHelper::getMaxRowSize()
{
return row_size;
}
Expand Down Expand Up @@ -446,7 +459,7 @@ namespace parquetembed
return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount);
}

int &ParquetHelper::getRowsProcessed()
int64_t &ParquetHelper::getRowsProcessed()
{
return rowsProcessed;
}
Expand Down Expand Up @@ -498,7 +511,7 @@ namespace parquetembed
return std::move(arrow::Table::FromRecordBatches(std::move(to_table)));
}

std::shared_ptr<arrow::Table> *ParquetHelper::next()
std::map<std::string, std::shared_ptr<arrow::Array>> *ParquetHelper::next()
{
if (rowsProcessed == rowsCount)
{
Expand All @@ -507,15 +520,19 @@ namespace parquetembed
// rowsProcessed starts at zero and we read in batches until it is equal to rowsCount
rowsProcessed = 0;
tablesProcessed++;
PARQUET_ASSIGN_OR_THROW(parquet_table, queryRows());
rowsCount = parquet_table->num_rows();
std::shared_ptr<arrow::Table> table;
PARQUET_ASSIGN_OR_THROW(table, queryRows());
rowsCount = table->num_rows();
chunkTable(table);
}
else
{
reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&parquet_table));
std::shared_ptr<arrow::Table> table;
reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&table));
rowsProcessed = 0;
tablesProcessed++;
rowsCount = parquet_table->num_rows();
rowsCount = table->num_rows();
chunkTable(table);
}
}
return &parquet_table;
Expand Down Expand Up @@ -745,7 +762,7 @@ namespace parquetembed
auto table = s_parquet->next();
m_currentRow++;

if ((*table).get())
if (table)
{
ParquetRowBuilder pRowBuilder(table, s_parquet->getRowsProcessed()++, &array_visitor);

Expand Down Expand Up @@ -806,7 +823,7 @@ namespace parquetembed
*/
bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand All @@ -830,7 +847,7 @@ namespace parquetembed
*/
void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, void *&result)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand Down Expand Up @@ -859,7 +876,7 @@ namespace parquetembed
*/
double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand Down Expand Up @@ -912,7 +929,7 @@ namespace parquetembed
*/
__int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand All @@ -935,7 +952,7 @@ namespace parquetembed
*/
unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand All @@ -960,7 +977,7 @@ namespace parquetembed
*/
void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &chars, char *&result)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand Down Expand Up @@ -990,7 +1007,7 @@ namespace parquetembed
*/
void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char *&result)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand Down Expand Up @@ -1021,7 +1038,7 @@ namespace parquetembed
*/
void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar *&result)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand Down Expand Up @@ -1051,7 +1068,7 @@ namespace parquetembed
*/
void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &value)
{
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 0)
{
Expand Down Expand Up @@ -1083,7 +1100,7 @@ namespace parquetembed
void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll)
{
isAll = false; // ALL not supported
reportIfFailure(nextField(field));
nextField(field);

if ((*array_visitor)->type == 8)
{
Expand Down Expand Up @@ -1133,7 +1150,7 @@ namespace parquetembed
{
if (strncmp(xpath, "<row>", 5) != 0)
{
reportIfFailure(nextField(field));
nextField(field);
if ((*array_visitor)->type == 9)
{
m_pathStack.push_back(PathTracker(field->name, (*array_visitor)->struct_arr, CPNTScalar));
Expand Down Expand Up @@ -1218,57 +1235,45 @@ namespace parquetembed
}
}

arrow::Status ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field)
void ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field)
{
auto structPtr = m_pathStack.back().structPtr;
reportIfFailure(structPtr->Accept((*array_visitor).get()));
if (m_pathStack.back().nodeType == CPNTScalar)
{
auto child = (*array_visitor)->struct_arr->GetFieldByName(field->name);
return child->Accept((*array_visitor).get());
reportIfFailure(child->Accept((*array_visitor).get()));
}
else if (m_pathStack.back().nodeType == CPNTSet)
{
auto child = (*array_visitor)->list_arr->value_slice(currentRow);
return child->Accept((*array_visitor).get());
}
else
{
return arrow::Status::TypeError("Invalid nested type.");
reportIfFailure(child->Accept((*array_visitor).get()));
}
}

/**
 * @brief Walks the chunks of a column until the chunk containing currentRow
 * is found and returns it.
 *
 * @param column Chunked column to search.
 * @param currentRow Global row index whose containing chunk is wanted.
 * @return const std::shared_ptr<arrow::Array> The chunk covering currentRow,
 * or the column's last chunk (possibly null for an empty column) when
 * currentRow lies beyond the column.
 */
const std::shared_ptr<arrow::Array> getChunk(std::shared_ptr<arrow::ChunkedArray> column, int &currentRow)
{
    int offset = 0;
    int chunk_idx = 0;
    std::shared_ptr<arrow::Array> chunk;
    // Bound the walk by num_chunks(): a zero-length chunk, or a currentRow at
    // or past the column's total length, would otherwise drive chunk_idx past
    // the end of the column.
    while (offset <= currentRow && chunk_idx < column->num_chunks())
    {
        chunk = column->chunk(chunk_idx++);
        offset += chunk->length();
    }
    return chunk;
}

/**
* @brief Gets the next field and processes it.
*
* @param field Information about the context of the next field.
* @return const char* Result of building field.
*/
arrow::Status ParquetRowBuilder::nextField(const RtlFieldInfo *field)
void ParquetRowBuilder::nextField(const RtlFieldInfo *field)
{
if (!field->name)
{
failx("Field name is empty.");
}
if (m_pathStack.size() > 0)
{
return nextFromStruct(field);
nextFromStruct(field);
return;
}
auto column = result_rows->find(field->name);
if (column != result_rows->end())
{
reportIfFailure(column->second->Accept((*array_visitor).get()));
return;
}
auto chunk = getChunk((*result_rows)->GetColumnByName(field->name), currentRow);
return chunk->Accept((*array_visitor).get());
}

unsigned ParquetRecordBinder::checkNextParam(const RtlFieldInfo *field)
Expand Down Expand Up @@ -1582,11 +1587,11 @@ namespace parquetembed
: logctx(_logctx), m_NextRow(), m_nextParam(0), m_numParams(0), m_scriptFlags(_flags)
{
// Option Variables
const char *option = ""; // Read(read), Read Partition(readpartition), Write(write), Write Partition(writepartition)
const char *location = ""; // file name and location of where to write parquet file
const char *destination = ""; // file name and location of where to read parquet file from
int rowsize = 20000; // Size of the row groups when writing to parquet files
int batchSize = 20000; // Size of the batches when converting parquet columns to rows
const char *option = ""; // Read(read), Read Partition(readpartition), Write(write), Write Partition(writepartition)
const char *location = ""; // file name and location of where to write parquet file
const char *destination = ""; // file name and location of where to read parquet file from
int64_t rowsize = 2000000; // Size of the row groups when writing to parquet files
int64_t batchSize = 2000000; // Size of the batches when converting parquet columns to rows
// Iterate through user options and save them
StringArray inputOptions;
inputOptions.appendList(options, ",");
Expand Down
37 changes: 20 additions & 17 deletions plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,21 +704,21 @@ namespace parquetembed
arrow::Status openReadFile();
arrow::Status writePartition(std::shared_ptr<arrow::Table> table);
std::unique_ptr<parquet::arrow::FileWriter> *write();
void chunkTable(std::shared_ptr<arrow::Table> &table);
void read();
rapidjson::Value *doc();
void update_row();
std::vector<rapidjson::Document> *record_batch();
bool partSetting();
int getMaxRowSize();
int64_t getMaxRowSize();
char options();
bool shouldRead();
int &getRowsProcessed();
int64_t &getRowsProcessed();
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertToRecordBatch(const std::vector<rapidjson::Document> &rows, std::shared_ptr<arrow::Schema> schema);
std::shared_ptr<arrow::Table> *next();
std::map<std::string, std::shared_ptr<arrow::Array>> *next();
arrow::Result<std::shared_ptr<arrow::Table>> queryRows();
int64_t num_rows();
std::shared_ptr<arrow::NestedType> makeChildRecord(const RtlFieldInfo *field);
arrow::Status TypeToNode(const RtlTypeInfo *field, std::shared_ptr<arrow::Field> arrow_field);
arrow::Status FieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector<std::shared_ptr<arrow::Field>> &arrow_fields);
int countFields(const RtlTypeInfo *typeInfo);
arrow::Status fieldsToSchema(const RtlTypeInfo *typeInfo);
Expand All @@ -727,12 +727,12 @@ namespace parquetembed
void end_row(const char *name);

private:
int current_row;
int row_size; // The maximum size of each parquet row group.
int tablesProcessed; // Current RowGroup that has been read from the input file.
int rowsProcessed; // Current Row that has been read from the RowGroup
int start_row_group; // The beginning RowGroup that is read by a worker
int tableCount; // The number of RowGroups to be read by the worker from the file that was opened for reading.
int64_t current_row;
int64_t row_size; // The maximum size of each parquet row group.
int64_t tablesProcessed; // Current RowGroup that has been read from the input file.
int64_t rowsProcessed; // Current Row that has been read from the RowGroup
int64_t start_row_group; // The beginning RowGroup that is read by a worker
int64_t tableCount; // The number of RowGroups to be read by the worker from the file that was opened for reading.
int64_t rowsCount; // The number of result rows in a given RowGroup read from the parquet file.
size_t batch_size; // batch_size for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once.
bool partition; // Boolean variable to track whether we are writing partitioned files or not.
Expand All @@ -749,7 +749,7 @@ namespace parquetembed
std::shared_ptr<arrow::RecordBatchReader> rbatch_reader = nullptr;
arrow::RecordBatchReader::RecordBatchReaderIterator rbatch_itr;
std::unique_ptr<parquet::arrow::FileReader> parquet_read = nullptr; // FileReader for reading from parquet files.
std::shared_ptr<arrow::Table> parquet_table = nullptr; // Table for creating the iterator for outputing result rows.
std::map<std::string, std::shared_ptr<arrow::Array>> parquet_table;
arrow::MemoryPool *pool;
};

Expand Down Expand Up @@ -783,8 +783,8 @@ namespace parquetembed
class ParquetRowBuilder : public CInterfaceOf<IFieldSource>
{
public:
ParquetRowBuilder(std::shared_ptr<arrow::Table> *_result_rows, int _currentRow, std::shared_ptr<ParquetArrayVisitor> *_array_visitor)
: result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor) {}
ParquetRowBuilder(std::map<std::string, std::shared_ptr<arrow::Array>> *_result_rows, int64_t _currentRow, std::shared_ptr<ParquetArrayVisitor> *_array_visitor)
: result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor), currentChunk(0), chunkOffset(0) {}

virtual ~ParquetRowBuilder() = default;

Expand All @@ -807,14 +807,17 @@ namespace parquetembed
virtual void processEndRow(const RtlFieldInfo *field);

protected:
arrow::Status nextField(const RtlFieldInfo *field);
arrow::Status nextFromStruct(const RtlFieldInfo *field);
const std::shared_ptr<arrow::Array> &getChunk(std::shared_ptr<arrow::ChunkedArray> *column);
void nextField(const RtlFieldInfo *field);
void nextFromStruct(const RtlFieldInfo *field);
void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const;

private:
int currentRow;
int64_t currentRow;
int64_t currentChunk;
int64_t chunkOffset;
TokenDeserializer m_tokenDeserializer;
std::shared_ptr<arrow::Table> *result_rows;
std::map<std::string, std::shared_ptr<arrow::Array>> *result_rows;
std::vector<PathTracker> m_pathStack;
std::shared_ptr<ParquetArrayVisitor> *array_visitor;
};
Expand Down

0 comments on commit 816a53b

Please sign in to comment.