Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30583 Fix parquet column reads #17946

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugins/parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ The Parquet Plugin offers the following main functions:

The Read function allows ECL programmers to create an ECL dataset from both regular and partitioned Parquet files. It leverages the Apache Arrow interface for Parquet to efficiently stream data from ECL to the plugin, ensuring optimized data transfer.

In order to read a Parquet file it is necessary to define the record structure of the file you intend to read with the names of the fields as stored in the Parquet file and the type that you wish to read them as. It is possible for a Parquet file to have field names that contain characters that are incompatible with the ECL field name definition. For example, ECL field names are case insensitive causing an issue when trying to read Parquet fields with uppercase letters. To read field names of this type an XPATH can be passed as seen in the following example:

```
layout := RECORD
STRING name;
STRING job_id {XPATH('jobID')};
END;

dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');
```

Expand Down
13 changes: 7 additions & 6 deletions plugins/parquet/examples/taxi_data.ecl
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
IMPORT PARQUET;

EXPORT Layout := RECORD
STRING VendorID;
STRING VendorID {XPATH('VendorID')};
STRING tpep_pickup_datetime;
STRING tpep_dropoff_datetime;
STRING passenger_count;
STRING trip_distance;
STRING RatecodeID;
STRING RatecodeID {XPATH('RatecodeID')};
STRING store_and_fwd_flag;
STRING PULocationID;
STRING DOLocationID;
STRING PULocationID {XPATH('PULocationID')};
STRING DOLocationID {XPATH('DOLocationID')};
STRING payment_type;
STRING fare_amount;
STRING extra;
Expand All @@ -18,9 +18,10 @@ EXPORT Layout := RECORD
STRING tolls_amount;
STRING improvement_surcharge;
STRING total_amount;
STRING congestion_surcharge;
STRING airport_fee;
END;

tripData := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet';
tripData := '/datadrive/dev/test_data/yellow_tripdata_2023-01.parquet';
read_in := ParquetIO.Read(Layout, tripData);
COUNT(read_in);
OUTPUT(CHOOSEN(read_in, 100));
4 changes: 2 additions & 2 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ ParquetRowStream::ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::s
: m_resultAllocator(_resultAllocator), s_parquet(_parquet)
{
rowsCount = _parquet->queryRowsCount();
array_visitor = std::make_shared<ParquetArrayVisitor>();
}

const void *ParquetRowStream::nextRow()
Expand Down Expand Up @@ -1337,7 +1336,8 @@ void ParquetRowBuilder::nextField(const RtlFieldInfo *field)
nextFromStruct(field);
return;
}
auto column = result_rows->find(field->name);
(*array_visitor) = std::make_shared<ParquetArrayVisitor>();
auto column = result_rows->find(field->xpath ? field->xpath : field->name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not new/future/question:

for records with many fields, this linear search by field name per column into an arrow::Array could be quite expensive by the looks of it.
I assume their positions in the array are invariant between calls to nextField for a given ParquetRowBuilder instance?
And are probably also invariant from row to row?

Can you prepopulate a map before ParquetRowStream::nextRow() is called with a table from RtlFieldInfo -> to position in the arrow::Array, to avoid these find's ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you mean by a table from RtlFieldInfo? The result_rows object is a prepopulated map with the field name of the column and an array containing the values for that column. You are correct in that the positions are invariant from row to row and between calls to nextField.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misread. I thought the code was performing a find in arrow::Array per field name.
I see now that it's a find in 'result_rows' which is an unordered_map, whose rhs is a arrow::Array.

if (column != result_rows->end())
{
reportIfFailure(column->second->Accept((*array_visitor).get()));
Expand Down
Loading