Skip to content

Commit

Permalink
HPCC-30583 Fix parquet column reads
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdelv committed Oct 27, 2023
1 parent 7d8b69a commit dbbd748
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
7 changes: 7 additions & 0 deletions plugins/parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ The Parquet Plugin offers the following main functions:

The Read function allows ECL programmers to create an ECL dataset from both regular and partitioned Parquet files. It leverages the Apache Arrow interface for Parquet to efficiently stream data from ECL to the plugin, ensuring optimized data transfer.

In order to read a Parquet file it is necessary to define the record structure of the file you intend to read with the names of the fields as stored in the Parquet file and the type that you wish to read them as. It is possible for a Parquet file to have field names that contain characters that are incompatible with the ECL field name definition. For example, ECL field names are case insensitive causing an issue when trying to read Parquet fields with uppercase letters. To read field names of this type an XPATH can be passed as seen in the following example:

```
layout := RECORD
STRING name;
STRING job_id {XPATH('jobID')};
END;
dataset := ParquetIO.Read(layout, '/source/directory/data.parquet');
```

Expand Down
13 changes: 7 additions & 6 deletions plugins/parquet/examples/taxi_data.ecl
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
IMPORT PARQUET;

EXPORT Layout := RECORD
STRING VendorID;
STRING VendorID {XPATH('VendorID')};
STRING tpep_pickup_datetime;
STRING tpep_dropoff_datetime;
STRING passenger_count;
STRING trip_distance;
STRING RatecodeID;
STRING RatecodeID {XPATH('RatecodeID')};
STRING store_and_fwd_flag;
STRING PULocationID;
STRING DOLocationID;
STRING PULocationID {XPATH('PULocationID')};
STRING DOLocationID {XPATH('DOLocationID')};
STRING payment_type;
STRING fare_amount;
STRING extra;
Expand All @@ -18,9 +18,10 @@ EXPORT Layout := RECORD
STRING tolls_amount;
STRING improvement_surcharge;
STRING total_amount;
STRING congestion_surcharge;
STRING airport_fee;
END;

tripData := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet';
tripData := '/datadrive/dev/test_data/yellow_tripdata_2023-01.parquet';
read_in := ParquetIO.Read(Layout, tripData);
COUNT(read_in);
OUTPUT(CHOOSEN(read_in, 100));
4 changes: 2 additions & 2 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ ParquetRowStream::ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::s
: m_resultAllocator(_resultAllocator), s_parquet(_parquet)
{
rowsCount = _parquet->queryRowsCount();
array_visitor = std::make_shared<ParquetArrayVisitor>();
}

const void *ParquetRowStream::nextRow()
Expand Down Expand Up @@ -1337,7 +1336,8 @@ void ParquetRowBuilder::nextField(const RtlFieldInfo *field)
nextFromStruct(field);
return;
}
auto column = result_rows->find(field->name);
(*array_visitor) = std::make_shared<ParquetArrayVisitor>();
auto column = result_rows->find(field->xpath ? field->xpath : field->name);
if (column != result_rows->end())
{
reportIfFailure(column->second->Accept((*array_visitor).get()));
Expand Down

0 comments on commit dbbd748

Please sign in to comment.