Skip to content

Commit

Permalink
[Backport] Fix json inputs for drill windowing tests (#15148) (#15211)
Browse files Browse the repository at this point in the history
This PR:

adds a flag to JsonToParquet to do the fix during conversion
updates the json files to more correct conents
some resultset mismatches were fixed by this
updates parquet to 1.13.1

(cherry picked from commit 9fb0dbf)
  • Loading branch information
kgyrtkirk authored Oct 20, 2023
1 parent 13c4d90 commit 4882fb0
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 445 deletions.
7 changes: 6 additions & 1 deletion extensions-core/parquet-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<parquet.version>1.12.0</parquet.version>
<parquet.version>1.13.1</parquet.version>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -162,6 +162,11 @@
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.github.rvesse</groupId>
<artifactId>airline</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.github.rvesse.airline.Cli;
import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.builder.CliBuilder;
import org.apache.druid.data.input.parquet.simple.ParquetGroupConverter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -29,41 +34,74 @@
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.File;
import java.util.List;
import java.util.concurrent.Callable;

/**
* Converts parquet files into new-deliminated JSON object files. Takes a single argument (an input directory)
* and processes all files that end with a ".parquet" extension. Writes out a new file in the same directory named
* by appending ".json" to the old file name. Will overwrite any output file that already exists.
* Converts parquet files into new-deliminated JSON object files. Takes a single
* argument (an input directory) and processes all files that end with a
* ".parquet" extension. Writes out a new file in the same directory named by
* appending ".json" to the old file name. Will overwrite any output file that
* already exists.
*/
public class ParquetToJson
@Command(name = "ParquetToJson")
public class ParquetToJson implements Callable<Void>
{

@Option(name = "--convert-corrupt-dates")
public boolean convertCorruptDates = false;

@Arguments(description = "directory")
public List<String> directories;


public static void main(String[] args) throws Exception
{
if (args.length != 1) {
throw new IAE("Usage: directory");
CliBuilder<Callable> builder = Cli.builder("ParquetToJson");
builder.withDefaultCommand(ParquetToJson.class);
builder.build().parse(args).call();
}

private File[] getInputFiles()
{
if (directories == null || directories.size() != 1) {
throw new IAE("Only one directory argument is supported!");
}

ParquetGroupConverter converter = new ParquetGroupConverter(true);
File dir = new File(directories.get(0));
if (!dir.isDirectory()) {
throw new IAE("Not a directory [%s]", dir);
}
File[] inputFiles = dir.listFiles(
pathname -> pathname.getName().endsWith(".parquet"));
if (inputFiles == null || inputFiles.length == 0) {
throw new IAE("No parquet files in directory [%s]", dir);
}
return inputFiles;
}

@Override
public Void call() throws Exception
{
ObjectMapper mapper = new DefaultObjectMapper();

File[] inputFiles = new File(args[0]).listFiles(
pathname -> pathname.getName().endsWith(".parquet")
);
File[] inputFiles = getInputFiles();

for (File inputFile : inputFiles) {
File outputFile = new File(inputFile.getAbsolutePath() + ".json");

try (
final org.apache.parquet.hadoop.ParquetReader<Group> reader = org.apache.parquet.hadoop.ParquetReader
.builder(new GroupReadSupport(), new Path(inputFile.toURI()))
.build();
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)
) {
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)) {
ParquetGroupConverter converter = new ParquetGroupConverter(true, convertCorruptDates);
Group group;
while ((group = reader.read()) != null) {
writer.write(converter.convertGroup(group));
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,55 @@ public class ParquetGroupConverter
private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

/**
* See {@link ParquetGroupConverter#convertField(Group, String)}
* https://github.com/apache/drill/blob/2ab46a9411a52f12a0f9acb1144a318059439bc4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java#L89
*/
public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_EPOCH_OFFSET_DAYS;

private final boolean binaryAsString;
private final boolean convertCorruptDates;

public ParquetGroupConverter(boolean binaryAsString, boolean convertCorruptDates)
{
this.binaryAsString = binaryAsString;
this.convertCorruptDates = convertCorruptDates;
}

/**
* Recursively converts a group into native Java Map
*
* @param g the group
* @return the native Java object
*/
public Object convertGroup(Group g)
{
Map<String, Object> retVal = new LinkedHashMap<>();

for (Type field : g.getType().getFields()) {
final String fieldName = field.getName();
retVal.put(fieldName, convertField(g, fieldName));
}

return retVal;
}

Object unwrapListElement(Object o)
{
if (o instanceof Group) {
Group g = (Group) o;
return convertListElement(g);
}
return o;
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
* {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*/
@Nullable
private static Object convertField(Group g, String fieldName, boolean binaryAsString)
Object convertField(Group g, String fieldName)
{
if (!g.getType().containsField(fieldName)) {
return null;
Expand All @@ -76,22 +121,22 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
int repeated = g.getFieldRepetitionCount(fieldIndex);
List<Object> vals = new ArrayList<>();
for (int i = 0; i < repeated; i++) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
}
return vals;
}
return convertPrimitiveField(g, fieldIndex, binaryAsString);
return convertPrimitiveField(g, fieldIndex);
} else {
if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
return convertRepeatedFieldToList(g, fieldIndex, binaryAsString);
return convertRepeatedFieldToList(g, fieldIndex);
}

if (isLogicalMapType(fieldType)) {
return convertLogicalMap(g.getGroup(fieldIndex, 0), binaryAsString);
return convertLogicalMap(g.getGroup(fieldIndex, 0));
}

if (isLogicalListType(fieldType)) {
return convertLogicalList(g.getGroup(fieldIndex, 0), binaryAsString);
return convertLogicalList(g.getGroup(fieldIndex, 0));
}

// not a list, but not a primitive, return the nested group type
Expand All @@ -102,7 +147,7 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
/**
* convert a repeated field into a list of primitives or groups
*/
private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex, boolean binaryAsString)
private List<Object> convertRepeatedFieldToList(Group g, int fieldIndex)
{

Type t = g.getType().getFields().get(fieldIndex);
Expand All @@ -111,7 +156,7 @@ private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex,
List<Object> vals = new ArrayList<>();
for (int i = 0; i < repeated; i++) {
if (t.isPrimitive()) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
} else {
vals.add(g.getGroup(fieldIndex, i));
}
Expand All @@ -134,7 +179,7 @@ private static boolean isLogicalListType(Type listType)
/**
* convert a parquet 'list' logical type {@link Group} to a java list of primitives or groups
*/
private static List<Object> convertLogicalList(Group g, boolean binaryAsString)
private List<Object> convertLogicalList(Group g)
{
/*
// List<Integer> (nullable list, non-null elements)
Expand Down Expand Up @@ -181,16 +226,16 @@ optional group my_list (LIST) {

for (int i = 0; i < repeated; i++) {
if (isListItemPrimitive) {
vals.add(convertPrimitiveField(g, 0, i, binaryAsString));
vals.add(convertPrimitiveField(g, 0, i));
} else {
Group listItem = g.getGroup(0, i);
vals.add(convertListElement(listItem, binaryAsString));
vals.add(convertListElement(listItem));
}
}
return vals;
}

private static Object convertListElement(Group listItem, boolean binaryAsString)
private Object convertListElement(Group listItem)
{
if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
Expand All @@ -199,7 +244,7 @@ private static Object convertListElement(Group listItem, boolean binaryAsString)
listItem.getType().getFields().get(0).isPrimitive()
) {
// nullable primitive list elements can have a repeating wrapper element, peel it off
return convertPrimitiveField(listItem, 0, binaryAsString);
return convertPrimitiveField(listItem, 0);
} else if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
listItem.getType().getFieldCount() == 1 &&
Expand Down Expand Up @@ -244,7 +289,7 @@ private static boolean isLogicalMapType(Type groupType)
/**
* Convert a parquet 'map' logical type {@link Group} to a java map of string keys to groups/lists/primitive values
*/
private static Map<String, Object> convertLogicalMap(Group g, boolean binaryAsString)
private Map<String, Object> convertLogicalMap(Group g)
{
/*
// Map<String, Integer> (nullable map, non-null values)
Expand All @@ -268,8 +313,8 @@ optional group my_map (MAP_KEY_VALUE) {(
Map<String, Object> converted = new HashMap<>();
for (int i = 0; i < mapEntries; i++) {
Group mapEntry = g.getGroup(0, i);
String key = convertPrimitiveField(mapEntry, 0, binaryAsString).toString();
Object value = convertField(mapEntry, "value", binaryAsString);
String key = convertPrimitiveField(mapEntry, 0).toString();
Object value = convertField(mapEntry, "value");
converted.put(key, value);
}
return converted;
Expand All @@ -281,17 +326,17 @@ optional group my_map (MAP_KEY_VALUE) {(
* @return "ingestion ready" java object, or null
*/
@Nullable
private static Object convertPrimitiveField(Group g, int fieldIndex, boolean binaryAsString)
private Object convertPrimitiveField(Group g, int fieldIndex)
{
PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
if (pt.isRepetition(Type.Repetition.REPEATED) && g.getFieldRepetitionCount(fieldIndex) > 1) {
List<Object> vals = new ArrayList<>();
for (int i = 0; i < g.getFieldRepetitionCount(fieldIndex); i++) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
}
return vals;
}
return convertPrimitiveField(g, fieldIndex, 0, binaryAsString);
return convertPrimitiveField(g, fieldIndex, 0);
}

/**
Expand All @@ -300,7 +345,7 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, boolean bin
* @return "ingestion ready" java object, or null
*/
@Nullable
private static Object convertPrimitiveField(Group g, int fieldIndex, int index, boolean binaryAsString)
private Object convertPrimitiveField(Group g, int fieldIndex, int index)
{
PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
OriginalType ot = pt.getOriginalType();
Expand All @@ -310,7 +355,7 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
// convert logical types
switch (ot) {
case DATE:
long ts = g.getInteger(fieldIndex, index) * MILLIS_IN_DAY;
long ts = convertDateToMillis(g.getInteger(fieldIndex, index));
return ts;
case TIME_MICROS:
return g.getLong(fieldIndex, index);
Expand Down Expand Up @@ -443,6 +488,14 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
}
}

private long convertDateToMillis(int value)
{
if (convertCorruptDates) {
value -= CORRECT_CORRUPT_DATE_SHIFT;
}
return value * MILLIS_IN_DAY;
}

/**
* convert deprecated parquet int96 nanosecond timestamp to a long, based on
* https://github.com/prestodb/presto/blob/master/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTimestampUtils.java#L44
Expand Down Expand Up @@ -490,51 +543,4 @@ private static BigDecimal convertBinaryToDecimal(Binary value, int precision, in
return new BigDecimal(new BigInteger(value.getBytes()), scale);
}
}

private final boolean binaryAsString;

public ParquetGroupConverter(boolean binaryAsString)
{
this.binaryAsString = binaryAsString;
}

/**
* Recursively converts a group into native Java Map
*
* @param g the group
* @return the native Java object
*/
public Object convertGroup(Group g)
{
Map<String, Object> retVal = new LinkedHashMap<>();

for (Type field : g.getType().getFields()) {
final String fieldName = field.getName();
retVal.put(fieldName, convertField(g, fieldName));
}

return retVal;
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
* {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*/
@Nullable
Object convertField(Group g, String fieldName)
{
return convertField(g, fieldName, binaryAsString);
}

Object unwrapListElement(Object o)
{
if (o instanceof Group) {
Group g = (Group) o;
return convertListElement(g, binaryAsString);
}
return o;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMak

public ParquetGroupFlattenerMaker(boolean binaryAsString, boolean discoverNestedFields)
{
this.converter = new ParquetGroupConverter(binaryAsString);
this.converter = new ParquetGroupConverter(binaryAsString, false);
this.parquetJsonProvider = new ParquetGroupJsonProvider(converter);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(parquetJsonProvider)
Expand Down
Loading

0 comments on commit 4882fb0

Please sign in to comment.