Commit

Update file reading structure to expect each thor worker to have its own parquet file.
jackdelv committed Aug 14, 2023
1 parent 9730e8d commit f1380b8
Showing 2 changed files with 38 additions and 32 deletions.
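In practical terms, the commit makes each Thor worker open and read its own "<name><workerId>.parquet" file (worker id from activityCtx->querySlave(), worker count from activityCtx->numSlaves()) and read every row group in that file, rather than all workers sharing one file and splitting its row groups. A minimal standalone sketch of the filename manipulation used below; the helper name and example paths are illustrative and not part of the plugin:

#include <iostream>
#include <string>

// Illustrative helper mirroring the insertion done with activityCtx->querySlave():
// with more than one worker, worker N gets "<name>N.parquet"; a single-node run
// keeps the original path. Assumes the path already ends in ".parquet".
std::string perWorkerPath(std::string location, int workerId, int numWorkers)
{
    if (numWorkers > 1)
        location.insert(location.find(".parquet"), std::to_string(workerId));
    return location;
}

int main()
{
    std::cout << perWorkerPath("/data/table.parquet", 0, 4) << '\n'; // /data/table0.parquet
    std::cout << perWorkerPath("/data/table.parquet", 3, 4) << '\n'; // /data/table3.parquet
    std::cout << perWorkerPath("/data/table.parquet", 0, 1) << '\n'; // /data/table.parquet
}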
32 changes: 20 additions & 12 deletions plugins/parquet/parquetembed.cpp
@@ -225,11 +225,17 @@ namespace parquetembed
}
else
{
+ // Currently we assume that every channel and worker is given a worker id and that,
+ // regardless of the configuration, all of them are counted in activityCtx->numSlaves()
+ if (activityCtx->numSlaves() > 1)
+ {
+     location.insert(location.find(".parquet"), std::to_string(activityCtx->querySlave()));
+ }
arrow::Status st;
auto reader_properties = parquet::ReaderProperties(pool);
auto arrow_reader_props = parquet::ArrowReaderProperties();
parquet::arrow::FileReaderBuilder reader_builder;
- ARROW_RETURN_NOT_OK(reader_builder.OpenFile(location, false, reader_properties));
+ reportIfFailure(reader_builder.OpenFile(location, false, reader_properties));
reader_builder.memory_pool(pool);
reader_builder.properties(arrow_reader_props);
ARROW_ASSIGN_OR_RAISE(parquet_read, reader_builder.Build());
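The switch from ARROW_RETURN_NOT_OK to reportIfFailure above (and again in the write path further down) moves error handling from Arrow-style status propagation to the plugin's own error reporting. reportIfFailure is defined elsewhere in the plugin; the following is only an assumed shape of what such a wrapper does, not its actual implementation:

#include <stdexcept>
#include <string>
#include <arrow/status.h>

// Assumed shape only: turn a failed arrow::Status into a thrown exception so the
// call can sit in functions that do not themselves return arrow::Status.
static void reportIfFailureSketch(const arrow::Status &status)
{
    if (!status.ok())
        throw std::runtime_error("parquet plugin: " + status.ToString());
}

// usage: reportIfFailureSketch(reader_builder.OpenFile(location, false, reader_properties));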
@@ -243,7 +249,7 @@ namespace parquetembed
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(table);

StringBuffer basename_template;
- basename_template.appendf("part{i}_%ld.parquet", tablesProcessed++);
+ basename_template.appendf("part{i}_%lld.parquet", tablesProcessed++);
write_options.basename_template = basename_template.str();

{
@@ -254,7 +260,7 @@ namespace parquetembed
ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

// Write partitioned files.
- ARROW_RETURN_NOT_OK(arrow::dataset::FileSystemDataset::Write(write_options, scanner));
+ reportIfFailure(arrow::dataset::FileSystemDataset::Write(write_options, scanner));
}

return arrow::Status::OK();
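A note on the basename_template change above: the "{i}" placeholder is expanded by Arrow's dataset writer itself, one integer per written fragment, while the appendf specifier fills in tablesProcessed, which is now an __int64, hence %lld instead of %ld. A small sketch of the resulting template strings, using long long in place of the plugin's __int64:

#include <cstdio>

int main()
{
    long long tablesProcessed = 0; // stands in for the plugin's __int64 counter
    char name[64];
    // Mirrors basename_template.appendf("part{i}_%lld.parquet", tablesProcessed++);
    // "{i}" is left in place for arrow::dataset::FileSystemDataset::Write to expand.
    std::snprintf(name, sizeof(name), "part{i}_%lld.parquet", tablesProcessed++);
    std::printf("%s\n", name); // part{i}_0.parquet
    std::snprintf(name, sizeof(name), "part{i}_%lld.parquet", tablesProcessed++);
    std::printf("%s\n", name); // part{i}_1.parquet
}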
@@ -299,7 +305,7 @@ namespace parquetembed
* @brief Divide row groups being read from a parquet file among any number of thor workers. If running hthor all row groups are assigned to it. This function
* will handle all cases where the number of groups is greater than, less than or divisible by the number of thor workers.
*/
- void divide_row_groups(const IThorActivityContext *activityCtx, int64_t total_row_groups, int64_t &num_row_groups, int64_t &start_row_group)
+ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 total_row_groups, __int64 &num_row_groups, __int64 &start_row_group)
{
int workers = activityCtx->numSlaves();
int strands = activityCtx->numStrands();
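The body of divide_row_groups is collapsed in this view. One straightforward way to meet the contract described in the comment, offered only as a sketch and not as the plugin's exact arithmetic, is to give every worker floor(total/workers) row groups and hand the remainder to the first workers:

#include <cstdio>

// Sketch only: the real divide_row_groups also takes the activity context and
// consults numSlaves()/numStrands(); here worker count and id are passed directly.
// long long stands in for the plugin's __int64.
void divideRowGroupsSketch(int workers, int worker, long long totalRowGroups,
                           long long &numRowGroups, long long &startRowGroup)
{
    long long base = totalRowGroups / workers;
    long long remainder = totalRowGroups % workers;
    numRowGroups = base + (worker < remainder ? 1 : 0);
    startRowGroup = worker * base + (worker < remainder ? worker : remainder);
}

int main()
{
    // 10 row groups across 3 workers -> 4 starting at 0, 3 starting at 4, 3 starting at 7.
    for (int w = 0; w < 3; ++w)
    {
        long long count = 0, start = 0;
        divideRowGroupsSketch(3, w, 10, count, start);
        std::printf("worker %d: %lld row groups starting at %lld\n", w, count, start);
    }
}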
@@ -397,8 +403,10 @@ namespace parquetembed
}
else
{
- int total_row_groups = parquet_read->num_row_groups();
- divide_row_groups(activityCtx, total_row_groups, tableCount, start_row_group);
+ // int total_row_groups = parquet_read->num_row_groups();
+ // divide_row_groups(activityCtx, total_row_groups, tableCount, start_row_group);
+ tableCount = parquet_read->num_row_groups();
+ start_row_group = 0;
rowsProcessed = 0;
if (tableCount != 0)
{
@@ -431,7 +439,7 @@ namespace parquetembed
*
* @return int Maximum size of the row group.
*/
- int64_t ParquetHelper::getMaxRowSize()
+ __int64 ParquetHelper::getMaxRowSize()
{
return row_size;
}
@@ -459,7 +467,7 @@ namespace parquetembed
return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount);
}

- int64_t &ParquetHelper::getRowsProcessed()
+ __int64 &ParquetHelper::getRowsProcessed()
{
return rowsProcessed;
}
@@ -511,7 +519,7 @@ namespace parquetembed
return std::move(arrow::Table::FromRecordBatches(std::move(to_table)));
}

- std::map<std::string, std::shared_ptr<arrow::Array>> *ParquetHelper::next()
+ std::unordered_map<std::string, std::shared_ptr<arrow::Array>> *ParquetHelper::next()
{
if (rowsProcessed == rowsCount)
{
@@ -538,7 +546,7 @@ namespace parquetembed
return &parquet_table;
}

- int64_t ParquetHelper::num_rows()
+ __int64 ParquetHelper::num_rows()
{
return rowsCount;
}
@@ -1590,8 +1598,8 @@ namespace parquetembed
const char *option = ""; // Read(read), Read Partition(readpartition), Write(write), Write Partition(writepartition)
const char *location = ""; // file name and location of where to write parquet file
const char *destination = ""; // file name and location of where to read parquet file from
- int64_t rowsize = 2000000; // Size of the row groups when writing to parquet files
- int64_t batchSize = 2000000; // Size of the batches when converting parquet columns to rows
+ __int64 rowsize = 2000000; // Size of the row groups when writing to parquet files
+ __int64 batchSize = 2000000; // Size of the batches when converting parquet columns to rows
// Iterate through user options and save them
StringArray inputOptions;
inputOptions.appendList(options, ",");
38 changes: 18 additions & 20 deletions plugins/parquet/parquetembed.hpp
@@ -623,7 +623,7 @@ namespace parquetembed
ARROW_RETURN_NOT_OK(child_converter.Convert(*child_field.get(), child_builder.get()));
}

- // Make null bitmap
+ // Make null bitunordered_map
for (const auto &maybe_value : FieldValues())
{
ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
@@ -710,14 +710,14 @@ namespace parquetembed
void update_row();
std::vector<rapidjson::Document> *record_batch();
bool partSetting();
- int64_t getMaxRowSize();
+ __int64 getMaxRowSize();
char options();
bool shouldRead();
- int64_t &getRowsProcessed();
+ __int64 &getRowsProcessed();
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertToRecordBatch(const std::vector<rapidjson::Document> &rows, std::shared_ptr<arrow::Schema> schema);
- std::map<std::string, std::shared_ptr<arrow::Array>> *next();
+ std::unordered_map<std::string, std::shared_ptr<arrow::Array>> *next();
arrow::Result<std::shared_ptr<arrow::Table>> queryRows();
- int64_t num_rows();
+ __int64 num_rows();
std::shared_ptr<arrow::NestedType> makeChildRecord(const RtlFieldInfo *field);
arrow::Status FieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector<std::shared_ptr<arrow::Field>> &arrow_fields);
int countFields(const RtlTypeInfo *typeInfo);
@@ -727,13 +727,13 @@ namespace parquetembed
void end_row(const char *name);

private:
- int64_t current_row;
- int64_t row_size; // The maximum size of each parquet row group.
- int64_t tablesProcessed; // Current RowGroup that has been read from the input file.
- int64_t rowsProcessed; // Current Row that has been read from the RowGroup
- int64_t start_row_group; // The beginning RowGroup that is read by a worker
- int64_t tableCount; // The number of RowGroups to be read by the worker from the file that was opened for reading.
- int64_t rowsCount; // The number of result rows in a given RowGroup read from the parquet file.
+ __int64 current_row;
+ __int64 row_size; // The maximum size of each parquet row group.
+ __int64 tablesProcessed; // Current RowGroup that has been read from the input file.
+ __int64 rowsProcessed; // Current Row that has been read from the RowGroup
+ __int64 start_row_group; // The beginning RowGroup that is read by a worker
+ __int64 tableCount; // The number of RowGroups to be read by the worker from the file that was opened for reading.
+ __int64 rowsCount; // The number of result rows in a given RowGroup read from the parquet file.
size_t batch_size; // batch_size for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once.
bool partition; // Boolean variable to track whether we are writing partitioned files or not.
std::string p_option; // Read, r, Write, w, option for specifying parquet operation.
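On the recurring int64_t to __int64 switch in these members: __int64 is the 64-bit integer alias used throughout the HPCC platform code (the platform headers appear to define it as long long on non-Windows builds), so the change keeps the plugin consistent with the rest of the codebase and matches the %lld format used earlier. A compilable illustration under that assumption:

#include <cstdio>

// Assumption: on non-MSVC builds the HPCC platform headers supply something like:
#ifndef _MSC_VER
typedef long long __int64;
#endif

int main()
{
    __int64 rowsProcessed = 0;
    __int64 rowsCount = 3;
    while (rowsProcessed < rowsCount)
        ++rowsProcessed;
    std::printf("%lld of %lld rows processed\n", rowsProcessed, rowsCount); // 3 of 3 rows processed
}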
@@ -749,7 +749,7 @@ namespace parquetembed
std::shared_ptr<arrow::RecordBatchReader> rbatch_reader = nullptr;
arrow::RecordBatchReader::RecordBatchReaderIterator rbatch_itr;
std::unique_ptr<parquet::arrow::FileReader> parquet_read = nullptr; // FileReader for reading from parquet files.
- std::map<std::string, std::shared_ptr<arrow::Array>> parquet_table;
+ std::unordered_map<std::string, std::shared_ptr<arrow::Array>> parquet_table;
arrow::MemoryPool *pool;
};

@@ -771,7 +771,7 @@ namespace parquetembed
Linked<IEngineRowAllocator> m_resultAllocator; //! Pointer to allocator used when building result rows.
bool m_shouldRead; //! If true, we should continue trying to read more messages.
__int64 m_currentRow; //! Current result row.
- int64_t rowsCount; //! Number of result rows read from parquet file.
+ __int64 rowsCount; //! Number of result rows read from parquet file.
std::shared_ptr<ParquetArrayVisitor> array_visitor;
std::shared_ptr<ParquetHelper> s_parquet; //! Shared pointer to ParquetHelper class for the stream class.
};
@@ -783,8 +783,8 @@ namespace parquetembed
class ParquetRowBuilder : public CInterfaceOf<IFieldSource>
{
public:
- ParquetRowBuilder(std::map<std::string, std::shared_ptr<arrow::Array>> *_result_rows, int64_t _currentRow, std::shared_ptr<ParquetArrayVisitor> *_array_visitor)
-     : result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor), currentChunk(0), chunkOffset(0) {}
+ ParquetRowBuilder(std::unordered_map<std::string, std::shared_ptr<arrow::Array>> *_result_rows, int64_t _currentRow, std::shared_ptr<ParquetArrayVisitor> *_array_visitor)
+     : result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor) {}

virtual ~ParquetRowBuilder() = default;

@@ -813,11 +813,9 @@ namespace parquetembed
void xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const;

private:
- int64_t currentRow;
- int64_t currentChunk;
- int64_t chunkOffset;
+ __int64 currentRow;
TokenDeserializer m_tokenDeserializer;
- std::map<std::string, std::shared_ptr<arrow::Array>> *result_rows;
+ std::unordered_map<std::string, std::shared_ptr<arrow::Array>> *result_rows;
std::vector<PathTracker> m_pathStack;
std::shared_ptr<ParquetArrayVisitor> *array_visitor;
};
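A closing note on the std::map to std::unordered_map switch that runs through both files: parquet_table and result_rows map column names to Arrow arrays and appear to be queried only by column name while building result rows, so sorted iteration buys nothing and a hash map is the more natural fit. This reading of the motivation is an inference from the diff, not something stated in the commit. A self-contained sketch of the lookup pattern, with a placeholder type standing in for arrow::Array so it compiles without Arrow headers:

#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Placeholder standing in for arrow::Array so the sketch builds without Arrow.
struct ColumnStub { long long length = 0; };

int main()
{
    // Column name -> column data, looked up per field while building result rows;
    // no ordering is needed, so an unordered_map's average O(1) find is enough.
    std::unordered_map<std::string, std::shared_ptr<ColumnStub>> parquetTable;
    parquetTable["id"] = std::make_shared<ColumnStub>();
    parquetTable["name"] = std::make_shared<ColumnStub>();

    auto it = parquetTable.find("id");
    std::cout << (it != parquetTable.end() ? "found column id" : "missing column id") << '\n';
}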
