From dbbd748e40af9a0036b4998a7e8a21a6dc182825 Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Fri, 27 Oct 2023 12:44:10 +0000 Subject: [PATCH] HPCC-30583 Fix parquet column reads --- plugins/parquet/README.md | 7 +++++++ plugins/parquet/examples/taxi_data.ecl | 13 +++++++------ plugins/parquet/parquetembed.cpp | 4 ++-- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/plugins/parquet/README.md b/plugins/parquet/README.md index 241d89d7a51..68b47109065 100644 --- a/plugins/parquet/README.md +++ b/plugins/parquet/README.md @@ -32,7 +32,14 @@ The Parquet Plugin offers the following main functions: The Read function allows ECL programmers to create an ECL dataset from both regular and partitioned Parquet files. It leverages the Apache Arrow interface for Parquet to efficiently stream data from ECL to the plugin, ensuring optimized data transfer. +In order to read a Parquet file, it is necessary to define the record structure of the file you intend to read, with the names of the fields as stored in the Parquet file and the types that you wish to read them as. It is possible for a Parquet file to have field names that contain characters that are incompatible with the ECL field name definition. For example, ECL field names are case insensitive, which causes an issue when trying to read Parquet fields with uppercase letters. 
To read field names of this type an XPATH can be passed as seen in the following example: + ``` +layout := RECORD + STRING name; + STRING job_id {XPATH('jobID')}; +END; + dataset := ParquetIO.Read(layout, '/source/directory/data.parquet'); ``` diff --git a/plugins/parquet/examples/taxi_data.ecl b/plugins/parquet/examples/taxi_data.ecl index b2ed9deef06..65b78b389c9 100644 --- a/plugins/parquet/examples/taxi_data.ecl +++ b/plugins/parquet/examples/taxi_data.ecl @@ -1,15 +1,15 @@ IMPORT PARQUET; EXPORT Layout := RECORD - STRING VendorID; + STRING VendorID {XPATH('VendorID')}; STRING tpep_pickup_datetime; STRING tpep_dropoff_datetime; STRING passenger_count; STRING trip_distance; - STRING RatecodeID; + STRING RatecodeID {XPATH('RatecodeID')}; STRING store_and_fwd_flag; - STRING PULocationID; - STRING DOLocationID; + STRING PULocationID {XPATH('PULocationID')}; + STRING DOLocationID {XPATH('DOLocationID')}; STRING payment_type; STRING fare_amount; STRING extra; @@ -18,9 +18,10 @@ EXPORT Layout := RECORD STRING tolls_amount; STRING improvement_surcharge; STRING total_amount; + STRING congestion_surcharge; + STRING airport_fee; END; -tripData := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet'; +tripData := '/datadrive/dev/test_data/yellow_tripdata_2023-01.parquet'; read_in := ParquetIO.Read(Layout, tripData); -COUNT(read_in); OUTPUT(CHOOSEN(read_in, 100)); \ No newline at end of file diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index 749b14d35b6..8f27a9dd75b 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -770,7 +770,6 @@ ParquetRowStream::ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::s : m_resultAllocator(_resultAllocator), s_parquet(_parquet) { rowsCount = _parquet->queryRowsCount(); - array_visitor = std::make_shared(); } const void *ParquetRowStream::nextRow() @@ -1337,7 +1336,8 @@ void ParquetRowBuilder::nextField(const RtlFieldInfo *field) nextFromStruct(field); 
return; } - auto column = result_rows->find(field->name); + (*array_visitor) = std::make_shared(); + auto column = result_rows->find(field->xpath ? field->xpath : field->name); if (column != result_rows->end()) { reportIfFailure(column->second->Accept((*array_visitor).get()));