diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index 58c9b3ff717..983c7f2a3c0 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -243,7 +243,7 @@ namespace parquetembed auto dataset = std::make_shared(table); StringBuffer basename_template; - basename_template.appendf("part{i}_%d.parquet", tablesProcessed++); + basename_template.appendf("part{i}_%ld.parquet", tablesProcessed++); write_options.basename_template = basename_template.str(); { @@ -299,7 +299,7 @@ namespace parquetembed * @brief Divide row groups being read from a parquet file among any number of thor workers. If running hthor all row groups are assigned to it. This function * will handle all cases where the number of groups is greater than, less than or divisible by the number of thor workers. */ - void divide_row_groups(const IThorActivityContext *activityCtx, int total_row_groups, int &num_row_groups, int &start_row_group) + void divide_row_groups(const IThorActivityContext *activityCtx, int64_t total_row_groups, int64_t &num_row_groups, int64_t &start_row_group) { int workers = activityCtx->numSlaves(); int strands = activityCtx->numStrands(); @@ -355,6 +355,16 @@ namespace parquetembed } } + void ParquetHelper::chunkTable(std::shared_ptr &table) + { + auto columns = table->columns(); + parquet_table.clear(); + for (int i = 0; i < columns.size(); i++) + { + parquet_table.insert(std::make_pair(table->field(i)->name(), columns[i]->chunk(0))); + } + } + /** * @brief Sets the parquet_table member to the output of what is read from the given * parquet file. 
@@ -374,8 +384,10 @@ namespace parquetembed divide_row_groups(activityCtx, std::ceil(total_rows / batch_size), tableCount, start_row_group); if (tableCount != 0) { - PARQUET_ASSIGN_OR_THROW(parquet_table, queryRows()); - rowsCount = parquet_table->num_rows(); + std::shared_ptr table; + PARQUET_ASSIGN_OR_THROW(table, queryRows()); + rowsCount = table->num_rows(); + chunkTable(table); tablesProcessed++; } else @@ -387,12 +399,13 @@ namespace parquetembed { int total_row_groups = parquet_read->num_row_groups(); divide_row_groups(activityCtx, total_row_groups, tableCount, start_row_group); - rowsProcessed = 0; if (tableCount != 0) { - reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&parquet_table)); - rowsCount = parquet_table->num_rows(); + std::shared_ptr table; + reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&table)); + rowsCount = table->num_rows(); + chunkTable(table); tablesProcessed++; } else @@ -418,7 +431,7 @@ namespace parquetembed * * @return int Maximum size of the row group. 
*/ - int ParquetHelper::getMaxRowSize() + int64_t ParquetHelper::getMaxRowSize() { return row_size; } @@ -446,7 +459,7 @@ namespace parquetembed return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount); } - int &ParquetHelper::getRowsProcessed() + int64_t &ParquetHelper::getRowsProcessed() { return rowsProcessed; } @@ -498,7 +511,7 @@ namespace parquetembed return std::move(arrow::Table::FromRecordBatches(std::move(to_table))); } - std::shared_ptr *ParquetHelper::next() + std::map> *ParquetHelper::next() { if (rowsProcessed == rowsCount) { @@ -507,15 +520,19 @@ namespace parquetembed // rowsProcessed starts at zero and we read in batches until it is equal to rowsCount rowsProcessed = 0; tablesProcessed++; - PARQUET_ASSIGN_OR_THROW(parquet_table, queryRows()); - rowsCount = parquet_table->num_rows(); + std::shared_ptr table; + PARQUET_ASSIGN_OR_THROW(table, queryRows()); + rowsCount = table->num_rows(); + chunkTable(table); } else { - reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&parquet_table)); + std::shared_ptr table; + reportIfFailure(parquet_read->RowGroup(tablesProcessed + start_row_group)->ReadTable(&table)); rowsProcessed = 0; tablesProcessed++; - rowsCount = parquet_table->num_rows(); + rowsCount = table->num_rows(); + chunkTable(table); } } return &parquet_table; @@ -745,7 +762,7 @@ namespace parquetembed auto table = s_parquet->next(); m_currentRow++; - if ((*table).get()) + if (table) { ParquetRowBuilder pRowBuilder(table, s_parquet->getRowsProcessed()++, &array_visitor); @@ -806,7 +823,7 @@ namespace parquetembed */ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -830,7 +847,7 @@ namespace parquetembed */ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, void *&result) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 
0) { @@ -859,7 +876,7 @@ namespace parquetembed */ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -912,7 +929,7 @@ namespace parquetembed */ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -935,7 +952,7 @@ namespace parquetembed */ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -960,7 +977,7 @@ namespace parquetembed */ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &chars, char *&result) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -990,7 +1007,7 @@ namespace parquetembed */ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char *&result) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -1021,7 +1038,7 @@ namespace parquetembed */ void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar *&result) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -1051,7 +1068,7 @@ namespace parquetembed */ void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &value) { - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 0) { @@ -1083,7 +1100,7 @@ namespace parquetembed void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll) { isAll = false; // ALL not supported - reportIfFailure(nextField(field)); + nextField(field); if ((*array_visitor)->type == 8) { @@ -1133,7 +1150,7 @@ namespace parquetembed { if (strncmp(xpath, "", 5) != 0) { - reportIfFailure(nextField(field)); + nextField(field); if 
((*array_visitor)->type == 9) { m_pathStack.push_back(PathTracker(field->name, (*array_visitor)->struct_arr, CPNTScalar)); @@ -1218,46 +1235,29 @@ namespace parquetembed } } - arrow::Status ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field) + void ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field) { auto structPtr = m_pathStack.back().structPtr; reportIfFailure(structPtr->Accept((*array_visitor).get())); if (m_pathStack.back().nodeType == CPNTScalar) { auto child = (*array_visitor)->struct_arr->GetFieldByName(field->name); - return child->Accept((*array_visitor).get()); + reportIfFailure(child->Accept((*array_visitor).get())); } else if (m_pathStack.back().nodeType == CPNTSet) { auto child = (*array_visitor)->list_arr->value_slice(currentRow); - return child->Accept((*array_visitor).get()); - } - else - { - return arrow::Status::TypeError("Invalid nested type."); + reportIfFailure(child->Accept((*array_visitor).get())); } } - const std::shared_ptr getChunk(std::shared_ptr column, int ¤tRow) - { - int offset = 0; - int chunk_idx = 0; - std::shared_ptr chunk; - while (offset <= currentRow) - { - chunk = column->chunk(chunk_idx++); - offset += chunk->length(); - } - return chunk; - } - /** * @brief Gets the next field and processes it. * * @param field Information about the context of the next field. * @return const char* Result of building field. 
*/ - arrow::Status ParquetRowBuilder::nextField(const RtlFieldInfo *field) + void ParquetRowBuilder::nextField(const RtlFieldInfo *field) { if (!field->name) { @@ -1265,10 +1265,15 @@ namespace parquetembed } if (m_pathStack.size() > 0) { - return nextFromStruct(field); + nextFromStruct(field); + return; + } + auto column = result_rows->find(field->name); + if (column != result_rows->end()) + { + reportIfFailure(column->second->Accept((*array_visitor).get())); + return; } - auto chunk = getChunk((*result_rows)->GetColumnByName(field->name), currentRow); - return chunk->Accept((*array_visitor).get()); } unsigned ParquetRecordBinder::checkNextParam(const RtlFieldInfo *field) { @@ -1582,11 +1587,11 @@ namespace parquetembed : logctx(_logctx), m_NextRow(), m_nextParam(0), m_numParams(0), m_scriptFlags(_flags) { // Option Variables - const char *option = ""; // Read(read), Read Parition(readpartition), Write(write), Write Partition(writepartition) - const char *location = ""; // file name and location of where to write parquet file - const char *destination = ""; // file name and location of where to read parquet file from - int rowsize = 20000; // Size of the row groups when writing to parquet files - int batchSize = 20000; // Size of the batches when converting parquet columns to rows + const char *option = ""; // Read(read), Read Partition(readpartition), Write(write), Write Partition(writepartition) + const char *location = ""; // file name and location of where to write parquet file + const char *destination = ""; // file name and location of where to read parquet file from + int64_t rowsize = 2000000; // Size of the row groups when writing to parquet files + int64_t batchSize = 2000000; // Size of the batches when converting parquet columns to rows // Iterate through user options and save them StringArray inputOptions; inputOptions.appendList(options, ","); diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index ab2b7026119..5ae425fbe0a
100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -704,21 +704,21 @@ namespace parquetembed arrow::Status openReadFile(); arrow::Status writePartition(std::shared_ptr table); std::unique_ptr *write(); + void chunkTable(std::shared_ptr &table); void read(); rapidjson::Value *doc(); void update_row(); std::vector *record_batch(); bool partSetting(); - int getMaxRowSize(); + int64_t getMaxRowSize(); char options(); bool shouldRead(); - int &getRowsProcessed(); + int64_t &getRowsProcessed(); arrow::Result> ConvertToRecordBatch(const std::vector &rows, std::shared_ptr schema); - std::shared_ptr *next(); + std::map> *next(); arrow::Result> queryRows(); int64_t num_rows(); std::shared_ptr makeChildRecord(const RtlFieldInfo *field); - arrow::Status TypeToNode(const RtlTypeInfo *field, std::shared_ptr arrow_field); arrow::Status FieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector> &arrow_fields); int countFields(const RtlTypeInfo *typeInfo); arrow::Status fieldsToSchema(const RtlTypeInfo *typeInfo); @@ -727,12 +727,12 @@ namespace parquetembed void end_row(const char *name); private: - int current_row; - int row_size; // The maximum size of each parquet row group. - int tablesProcessed; // Current RowGroup that has been read from the input file. - int rowsProcessed; // Current Row that has been read from the RowGroup - int start_row_group; // The beginning RowGroup that is read by a worker - int tableCount; // The number of RowGroups to be read by the worker from the file that was opened for reading. + int64_t current_row; + int64_t row_size; // The maximum size of each parquet row group. + int64_t tablesProcessed; // Current RowGroup that has been read from the input file. 
+ int64_t rowsProcessed; // Current Row that has been read from the RowGroup + int64_t start_row_group; // The beginning RowGroup that is read by a worker + int64_t tableCount; // The number of RowGroups to be read by the worker from the file that was opened for reading. int64_t rowsCount; // The number of result rows in a given RowGroup read from the parquet file. size_t batch_size; // batch_size for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once. bool partition; // Boolean variable to track whether we are writing partitioned files or not. @@ -749,7 +749,7 @@ namespace parquetembed std::shared_ptr rbatch_reader = nullptr; arrow::RecordBatchReader::RecordBatchReaderIterator rbatch_itr; std::unique_ptr parquet_read = nullptr; // FileReader for reading from parquet files. - std::shared_ptr parquet_table = nullptr; // Table for creating the iterator for outputing result rows. + std::map> parquet_table; arrow::MemoryPool *pool; }; @@ -783,8 +783,8 @@ namespace parquetembed class ParquetRowBuilder : public CInterfaceOf { public: - ParquetRowBuilder(std::shared_ptr *_result_rows, int _currentRow, std::shared_ptr *_array_visitor) - : result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor) {} + ParquetRowBuilder(std::map> *_result_rows, int64_t _currentRow, std::shared_ptr *_array_visitor) + : result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor), currentChunk(0), chunkOffset(0) {} virtual ~ParquetRowBuilder() = default; @@ -807,14 +807,17 @@ namespace parquetembed virtual void processEndRow(const RtlFieldInfo *field); protected: - arrow::Status nextField(const RtlFieldInfo *field); - arrow::Status nextFromStruct(const RtlFieldInfo *field); + const std::shared_ptr &getChunk(std::shared_ptr *column); + void nextField(const RtlFieldInfo *field); + void nextFromStruct(const RtlFieldInfo *field); void 
xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const; private: - int currentRow; + int64_t currentRow; + int64_t currentChunk; + int64_t chunkOffset; TokenDeserializer m_tokenDeserializer; - std::shared_ptr *result_rows; + std::map> *result_rows; std::vector m_pathStack; std::shared_ptr *array_visitor; };