Skip to content

Commit

Permalink
Fix json inputs for drill windowing tests (apache#15148)
Browse files Browse the repository at this point in the history
This PR:

adds a flag to JsonToParquet to do the fix during conversion
updates the json files to more correct conents
some resultset mismatches were fixed by this
updates parquet to 1.13.1
  • Loading branch information
kgyrtkirk authored and ycp2 committed Nov 17, 2023
1 parent 469f6a8 commit 0a47c2c
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 445 deletions.
7 changes: 6 additions & 1 deletion extensions-core/parquet-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<parquet.version>1.12.0</parquet.version>
<parquet.version>1.13.1</parquet.version>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -162,6 +162,11 @@
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.github.rvesse</groupId>
<artifactId>airline</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.github.rvesse.airline.Cli;
import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.builder.CliBuilder;
import org.apache.druid.data.input.parquet.simple.ParquetGroupConverter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -29,41 +34,74 @@
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.File;
import java.util.List;
import java.util.concurrent.Callable;

/**
* Converts parquet files into new-deliminated JSON object files. Takes a single argument (an input directory)
* and processes all files that end with a ".parquet" extension. Writes out a new file in the same directory named
* by appending ".json" to the old file name. Will overwrite any output file that already exists.
* Converts parquet files into new-deliminated JSON object files. Takes a single
* argument (an input directory) and processes all files that end with a
* ".parquet" extension. Writes out a new file in the same directory named by
* appending ".json" to the old file name. Will overwrite any output file that
* already exists.
*/
public class ParquetToJson
@Command(name = "ParquetToJson")
public class ParquetToJson implements Callable<Void>
{

@Option(name = "--convert-corrupt-dates")
public boolean convertCorruptDates = false;

@Arguments(description = "directory")
public List<String> directories;


public static void main(String[] args) throws Exception
{
if (args.length != 1) {
throw new IAE("Usage: directory");
CliBuilder<Callable> builder = Cli.builder("ParquetToJson");
builder.withDefaultCommand(ParquetToJson.class);
builder.build().parse(args).call();
}

private File[] getInputFiles()
{
if (directories == null || directories.size() != 1) {
throw new IAE("Only one directory argument is supported!");
}

ParquetGroupConverter converter = new ParquetGroupConverter(true);
File dir = new File(directories.get(0));
if (!dir.isDirectory()) {
throw new IAE("Not a directory [%s]", dir);
}
File[] inputFiles = dir.listFiles(
pathname -> pathname.getName().endsWith(".parquet"));
if (inputFiles == null || inputFiles.length == 0) {
throw new IAE("No parquet files in directory [%s]", dir);
}
return inputFiles;
}

@Override
public Void call() throws Exception
{
ObjectMapper mapper = new DefaultObjectMapper();

File[] inputFiles = new File(args[0]).listFiles(
pathname -> pathname.getName().endsWith(".parquet")
);
File[] inputFiles = getInputFiles();

for (File inputFile : inputFiles) {
File outputFile = new File(inputFile.getAbsolutePath() + ".json");

try (
final org.apache.parquet.hadoop.ParquetReader<Group> reader = org.apache.parquet.hadoop.ParquetReader
.builder(new GroupReadSupport(), new Path(inputFile.toURI()))
.build();
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)
) {
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)) {
ParquetGroupConverter converter = new ParquetGroupConverter(true, convertCorruptDates);
Group group;
while ((group = reader.read()) != null) {
writer.write(converter.convertGroup(group));
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,55 @@ public class ParquetGroupConverter
private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

/**
* See {@link ParquetGroupConverter#convertField(Group, String)}
* https://github.com/apache/drill/blob/2ab46a9411a52f12a0f9acb1144a318059439bc4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java#L89
*/
public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_EPOCH_OFFSET_DAYS;

private final boolean binaryAsString;
private final boolean convertCorruptDates;

public ParquetGroupConverter(boolean binaryAsString, boolean convertCorruptDates)
{
this.binaryAsString = binaryAsString;
this.convertCorruptDates = convertCorruptDates;
}

/**
* Recursively converts a group into native Java Map
*
* @param g the group
* @return the native Java object
*/
public Object convertGroup(Group g)
{
Map<String, Object> retVal = new LinkedHashMap<>();

for (Type field : g.getType().getFields()) {
final String fieldName = field.getName();
retVal.put(fieldName, convertField(g, fieldName));
}

return retVal;
}

Object unwrapListElement(Object o)
{
if (o instanceof Group) {
Group g = (Group) o;
return convertListElement(g);
}
return o;
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
* {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*/
@Nullable
private static Object convertField(Group g, String fieldName, boolean binaryAsString)
Object convertField(Group g, String fieldName)
{
if (!g.getType().containsField(fieldName)) {
return null;
Expand All @@ -76,22 +121,22 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
int repeated = g.getFieldRepetitionCount(fieldIndex);
List<Object> vals = new ArrayList<>();
for (int i = 0; i < repeated; i++) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
}
return vals;
}
return convertPrimitiveField(g, fieldIndex, binaryAsString);
return convertPrimitiveField(g, fieldIndex);
} else {
if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
return convertRepeatedFieldToList(g, fieldIndex, binaryAsString);
return convertRepeatedFieldToList(g, fieldIndex);
}

if (isLogicalMapType(fieldType)) {
return convertLogicalMap(g.getGroup(fieldIndex, 0), binaryAsString);
return convertLogicalMap(g.getGroup(fieldIndex, 0));
}

if (isLogicalListType(fieldType)) {
return convertLogicalList(g.getGroup(fieldIndex, 0), binaryAsString);
return convertLogicalList(g.getGroup(fieldIndex, 0));
}

// not a list, but not a primitive, return the nested group type
Expand All @@ -102,7 +147,7 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
/**
* convert a repeated field into a list of primitives or groups
*/
private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex, boolean binaryAsString)
private List<Object> convertRepeatedFieldToList(Group g, int fieldIndex)
{

Type t = g.getType().getFields().get(fieldIndex);
Expand All @@ -111,7 +156,7 @@ private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex,
List<Object> vals = new ArrayList<>();
for (int i = 0; i < repeated; i++) {
if (t.isPrimitive()) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
} else {
vals.add(g.getGroup(fieldIndex, i));
}
Expand All @@ -134,7 +179,7 @@ private static boolean isLogicalListType(Type listType)
/**
* convert a parquet 'list' logical type {@link Group} to a java list of primitives or groups
*/
private static List<Object> convertLogicalList(Group g, boolean binaryAsString)
private List<Object> convertLogicalList(Group g)
{
/*
// List<Integer> (nullable list, non-null elements)
Expand Down Expand Up @@ -181,16 +226,16 @@ optional group my_list (LIST) {

for (int i = 0; i < repeated; i++) {
if (isListItemPrimitive) {
vals.add(convertPrimitiveField(g, 0, i, binaryAsString));
vals.add(convertPrimitiveField(g, 0, i));
} else {
Group listItem = g.getGroup(0, i);
vals.add(convertListElement(listItem, binaryAsString));
vals.add(convertListElement(listItem));
}
}
return vals;
}

private static Object convertListElement(Group listItem, boolean binaryAsString)
private Object convertListElement(Group listItem)
{
if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
Expand All @@ -199,7 +244,7 @@ private static Object convertListElement(Group listItem, boolean binaryAsString)
listItem.getType().getFields().get(0).isPrimitive()
) {
// nullable primitive list elements can have a repeating wrapper element, peel it off
return convertPrimitiveField(listItem, 0, binaryAsString);
return convertPrimitiveField(listItem, 0);
} else if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
listItem.getType().getFieldCount() == 1 &&
Expand Down Expand Up @@ -244,7 +289,7 @@ private static boolean isLogicalMapType(Type groupType)
/**
* Convert a parquet 'map' logical type {@link Group} to a java map of string keys to groups/lists/primitive values
*/
private static Map<String, Object> convertLogicalMap(Group g, boolean binaryAsString)
private Map<String, Object> convertLogicalMap(Group g)
{
/*
// Map<String, Integer> (nullable map, non-null values)
Expand All @@ -268,8 +313,8 @@ optional group my_map (MAP_KEY_VALUE) {(
Map<String, Object> converted = new HashMap<>();
for (int i = 0; i < mapEntries; i++) {
Group mapEntry = g.getGroup(0, i);
String key = convertPrimitiveField(mapEntry, 0, binaryAsString).toString();
Object value = convertField(mapEntry, "value", binaryAsString);
String key = convertPrimitiveField(mapEntry, 0).toString();
Object value = convertField(mapEntry, "value");
converted.put(key, value);
}
return converted;
Expand All @@ -281,17 +326,17 @@ optional group my_map (MAP_KEY_VALUE) {(
* @return "ingestion ready" java object, or null
*/
@Nullable
private static Object convertPrimitiveField(Group g, int fieldIndex, boolean binaryAsString)
private Object convertPrimitiveField(Group g, int fieldIndex)
{
PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
if (pt.isRepetition(Type.Repetition.REPEATED) && g.getFieldRepetitionCount(fieldIndex) > 1) {
List<Object> vals = new ArrayList<>();
for (int i = 0; i < g.getFieldRepetitionCount(fieldIndex); i++) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
}
return vals;
}
return convertPrimitiveField(g, fieldIndex, 0, binaryAsString);
return convertPrimitiveField(g, fieldIndex, 0);
}

/**
Expand All @@ -300,7 +345,7 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, boolean bin
* @return "ingestion ready" java object, or null
*/
@Nullable
private static Object convertPrimitiveField(Group g, int fieldIndex, int index, boolean binaryAsString)
private Object convertPrimitiveField(Group g, int fieldIndex, int index)
{
PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
OriginalType ot = pt.getOriginalType();
Expand All @@ -310,7 +355,7 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
// convert logical types
switch (ot) {
case DATE:
long ts = g.getInteger(fieldIndex, index) * MILLIS_IN_DAY;
long ts = convertDateToMillis(g.getInteger(fieldIndex, index));
return ts;
case TIME_MICROS:
return g.getLong(fieldIndex, index);
Expand Down Expand Up @@ -443,6 +488,14 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
}
}

private long convertDateToMillis(int value)
{
if (convertCorruptDates) {
value -= CORRECT_CORRUPT_DATE_SHIFT;
}
return value * MILLIS_IN_DAY;
}

/**
* convert deprecated parquet int96 nanosecond timestamp to a long, based on
* https://github.com/prestodb/presto/blob/master/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTimestampUtils.java#L44
Expand Down Expand Up @@ -490,51 +543,4 @@ private static BigDecimal convertBinaryToDecimal(Binary value, int precision, in
return new BigDecimal(new BigInteger(value.getBytes()), scale);
}
}

private final boolean binaryAsString;

public ParquetGroupConverter(boolean binaryAsString)
{
this.binaryAsString = binaryAsString;
}

/**
* Recursively converts a group into native Java Map
*
* @param g the group
* @return the native Java object
*/
public Object convertGroup(Group g)
{
Map<String, Object> retVal = new LinkedHashMap<>();

for (Type field : g.getType().getFields()) {
final String fieldName = field.getName();
retVal.put(fieldName, convertField(g, fieldName));
}

return retVal;
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
* {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*/
@Nullable
Object convertField(Group g, String fieldName)
{
return convertField(g, fieldName, binaryAsString);
}

Object unwrapListElement(Object o)
{
if (o instanceof Group) {
Group g = (Group) o;
return convertListElement(g, binaryAsString);
}
return o;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMak

public ParquetGroupFlattenerMaker(boolean binaryAsString, boolean discoverNestedFields)
{
this.converter = new ParquetGroupConverter(binaryAsString);
this.converter = new ParquetGroupConverter(binaryAsString, false);
this.parquetJsonProvider = new ParquetGroupJsonProvider(converter);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(parquetJsonProvider)
Expand Down
Loading

0 comments on commit 0a47c2c

Please sign in to comment.