Skip to content

Commit

Permalink
HPCC-30458 Parquet plugin needs timestamp support
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdelv committed Oct 11, 2023
1 parent a799031 commit 24a2547
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 115 deletions.
26 changes: 26 additions & 0 deletions plugins/parquet/examples/taxi_data.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
IMPORT PARQUET;

// Layout of one row of the trip data file read below.
// Every column is declared as STRING in this example.
EXPORT Layout := RECORD
    STRING VendorID;
    STRING tpep_pickup_datetime;
    STRING tpep_dropoff_datetime;
    STRING passenger_count;
    STRING trip_distance;
    STRING RatecodeID;
    STRING store_and_fwd_flag;
    STRING PULocationID;
    STRING DOLocationID;
    STRING payment_type;
    STRING fare_amount;
    STRING extra;
    STRING mta_tax;
    STRING tip_amount;
    STRING tolls_amount;
    STRING improvement_surcharge;
    STRING total_amount;
END;

// Path of the Parquet file to read; adjust to where the test data lives locally.
tripDataPath := '/datadrive/dev/test_data/yellow_tripdata_2017-01.parquet';

// Read the file using the layout above.
trips := ParquetIO.Read(Layout, tripDataPath);

// Report the number of rows read, then output the first 100 of them.
COUNT(trips);
OUTPUT(CHOOSEN(trips, 100));
237 changes: 127 additions & 110 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,91 @@ int64_t ParquetRowBuilder::currArrayIndex()
return !m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet ? m_pathStack.back().childrenProcessed++ : currentRow;
}

/**
 * @brief Reads a signed integer element from the visitor's current array.
 *
 * The visitor's `size` field (8, 16, 32 or 64) selects which of the typed
 * signed arrays is read; the element is widened to __int64 on return.
 *
 * @param array_visitor Pointer to the shared visitor that holds the typed array pointers.
 * @param index Position of the element to read. Declared as a 64-bit value so that the
 *              int64_t produced by currArrayIndex() is passed through without narrowing.
 * @return The element at `index`, widened to a signed 64-bit integer.
 * @note Any other `size` is passed to failx() together with the offending value.
 */
__int64 getSigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int64_t index)
{
    ParquetArrayVisitor &visitor = **array_visitor;

    switch (visitor.size)
    {
        case 8:
            return visitor.int8_arr->Value(index);
        case 16:
            return visitor.int16_arr->Value(index);
        case 32:
            return visitor.int32_arr->Value(index);
        case 64:
            return visitor.int64_arr->Value(index);
        default:
            failx("getSigned: Invalid size %i", visitor.size);
    }
}

/**
 * @brief Reads an unsigned integer element from the visitor's current array.
 *
 * The visitor's `size` field (8, 16, 32 or 64) selects which of the typed
 * unsigned arrays is read; the element is widened to unsigned __int64 on return.
 *
 * @param array_visitor Pointer to the shared visitor that holds the typed array pointers.
 * @param index Position of the element to read. Declared as a 64-bit value so that the
 *              int64_t produced by currArrayIndex() is passed through without narrowing.
 * @return The element at `index`, widened to an unsigned 64-bit integer.
 * @note Any other `size` is passed to failx() together with the offending value.
 */
unsigned __int64 getUnsigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int64_t index)
{
    ParquetArrayVisitor &visitor = **array_visitor;

    switch (visitor.size)
    {
        case 8:
            return visitor.uint8_arr->Value(index);
        case 16:
            return visitor.uint16_arr->Value(index);
        case 32:
            return visitor.uint32_arr->Value(index);
        case 64:
            return visitor.uint64_arr->Value(index);
        default:
            failx("getUnsigned: Invalid size %i", visitor.size);
    }
}

std::string_view ParquetRowBuilder::getCurrView(const RtlFieldInfo *field)
{
serialized.clear();

switch((*array_visitor)->type)
{
case BoolType:
tokenSerializer.serialize((*array_visitor)->bool_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case BinaryType:
return (*array_visitor)->bin_arr->GetView(currArrayIndex());
case LargeBinaryType:
return (*array_visitor)->large_bin_arr->GetView(currArrayIndex());
case DoubleType:
tokenSerializer.serialize((*array_visitor)->double_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case HalfFloatType:
tokenSerializer.serialize((*array_visitor)->half_float_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case FloatType:
tokenSerializer.serialize((*array_visitor)->float_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case IntType:
tokenSerializer.serialize(getSigned(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case UIntType:
tokenSerializer.serialize(getUnsigned(array_visitor, currArrayIndex()), serialized);
return serialized.str();
case DateType:
tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->date32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->date64_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case TimestampType:
tokenSerializer.serialize((__int64) (*array_visitor)->timestamp_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case TimeType:
tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->time32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->time64_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case DurationType:
tokenSerializer.serialize((__int64) (*array_visitor)->duration_arr->Value(currArrayIndex()), serialized);
return serialized.str();
case StringType:
return (*array_visitor)->string_arr->GetView(currArrayIndex());
case LargeStringType:
return (*array_visitor)->large_string_arr->GetView(currArrayIndex());
case DecimalType:
return (*array_visitor)->size == 128 ? (*array_visitor)->dec_arr->GetView(currArrayIndex()) : (*array_visitor)->large_dec_arr->GetView(currArrayIndex());
default:
failx("Unimplemented Parquet type for field with name %s.", field->name);
}
}

/**
* @brief Gets a Boolean result for an ECL Row
*
Expand All @@ -853,11 +938,10 @@ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.boolResult;
}
if ((*array_visitor)->type != BoolType)
{
failx("Incorrect type for field %s.", field->name);
}
return (*array_visitor)->bool_arr->Value(currArrayIndex());
bool mybool;
auto scalar = getCurrView(field);
handleDeserializeOutcome(tokenDeserializer.deserialize(scalar.data(), mybool), "bool", scalar.data());
return mybool;
}

/**
Expand All @@ -877,16 +961,10 @@ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len,
rtlUtf8ToDataX(len, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == BinaryType)
{
auto view = (*array_visitor)->large_bin_arr->GetView(currArrayIndex());
rtlStrToDataX(len, result, view.size(), view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}

auto view = getCurrView(field);
rtlStrToDataX(len, result, view.size(), view.data());
return;
}

/**
Expand All @@ -904,45 +982,11 @@ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.doubleResult;
}
if ((*array_visitor)->type != DoubleType)
{
failx("Incorrect type for field %s.", field->name);
}
return (*array_visitor)->double_arr->Value(currArrayIndex());
}

__int64 getSigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->int8_arr->Value(index);
case 16:
return (*array_visitor)->int16_arr->Value(index);
case 32:
return (*array_visitor)->int32_arr->Value(index);
case 64:
return (*array_visitor)->int64_arr->Value(index);
default:
failx("getSigned: Invalid size %i", (*array_visitor)->size);
}
}

unsigned __int64 getUnsigned(std::shared_ptr<ParquetArrayVisitor> *array_visitor, int index)
{
switch ((*array_visitor)->size)
{
case 8:
return (*array_visitor)->uint8_arr->Value(index);
case 16:
return (*array_visitor)->uint16_arr->Value(index);
case 32:
return (*array_visitor)->uint32_arr->Value(index);
case 64:
return (*array_visitor)->uint64_arr->Value(index);
default:
failx("getUnsigned: Invalid size %i", (*array_visitor)->size);
}
double mydouble = 0.0;
auto value = getCurrView(field);
handleDeserializeOutcome(tokenDeserializer.deserialize(value.data(), mydouble), "real", value.data());
return mydouble;
}

/**
Expand All @@ -958,13 +1002,13 @@ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field)
if ((*array_visitor)->type == NullType)
{
NullFieldProcessor p(field);
return p.uintResult;
return p.intResult;
}
if ((*array_visitor)->type != IntType)
{
failx("Incorrect type for field %s.", field->name);
}
return getSigned(array_visitor, currArrayIndex());

__int64 myint64 = 0;
auto scalar = getCurrView(field);
handleDeserializeOutcome(tokenDeserializer.deserialize(scalar.data(), myint64), "signed", scalar.data());
return myint64;
}

/**
Expand All @@ -983,11 +1027,11 @@ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field)
NullFieldProcessor p(field);
return p.uintResult;
}
if ((*array_visitor)->type != UIntType)
{
failx("Incorrect type for field %s.", field->name);
}
return getUnsigned(array_visitor, currArrayIndex());

unsigned __int64 myuint64 = 0;
auto scalar = getCurrView(field);
handleDeserializeOutcome(tokenDeserializer.deserialize(scalar.data(), myuint64), "unsigned", scalar.data());
return myuint64;
}

/**
Expand All @@ -1007,17 +1051,10 @@ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &cha
rtlUtf8ToStrX(chars, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToStrX(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToStrX(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1037,17 +1074,10 @@ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars
rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUtf8X(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUtf8X(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1067,17 +1097,10 @@ void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &ch
rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto view = (*array_visitor)->string_arr->GetView(currArrayIndex());
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUnicodeX(chars, result, numchars, view.data());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}
auto view = getCurrView(field);
unsigned numchars = rtlUtf8Length(view.size(), view.data());
rtlUtf8ToUnicodeX(chars, result, numchars, view.data());
return;
}

/**
Expand All @@ -1096,18 +1119,12 @@ void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &val
value.set(p.decimalResult);
return;
}
if ((*array_visitor)->type == StringType)
{
auto dvalue = (*array_visitor)->string_arr->GetView(currArrayIndex());
value.setString(dvalue.size(), dvalue.data());
RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type;
value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
return;
}
else
{
failx("Incorrect type for field %s.", field->name);
}

auto dvalue = getCurrView(field);
value.setString(dvalue.size(), dvalue.data());
RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type;
value.setPrecision(dtype->getDecimalDigits(), dtype->getDecimalPrecision());
return;
}

/**
Expand All @@ -1129,7 +1146,7 @@ void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll)
}
else
{
failx("Incorrect type for field %s.", field->name);
failx("Error reading nested set with name %s.", field->name);
}
}

Expand Down
Loading

0 comments on commit 24a2547

Please sign in to comment.