Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix json inputs for drill windowing tests #15148

Merged
merged 10 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion extensions-core/parquet-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<parquet.version>1.12.0</parquet.version>
<parquet.version>1.13.1</parquet.version>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -162,6 +162,11 @@
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>com.github.rvesse</groupId>
<artifactId>airline</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.github.rvesse.airline.Cli;
import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.builder.CliBuilder;
import org.apache.druid.data.input.parquet.simple.ParquetGroupConverter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -29,41 +34,74 @@
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.File;
import java.util.List;
import java.util.concurrent.Callable;

/**
* Converts parquet files into new-deliminated JSON object files. Takes a single argument (an input directory)
* and processes all files that end with a ".parquet" extension. Writes out a new file in the same directory named
* by appending ".json" to the old file name. Will overwrite any output file that already exists.
* Converts parquet files into newline-delimited JSON object files. Takes a single
* argument (an input directory) and processes all files that end with a
* ".parquet" extension. Writes out a new file in the same directory named by
* appending ".json" to the old file name. Will overwrite any output file that
* already exists.
*/
public class ParquetToJson
@Command(name = "ParquetToJson")
public class ParquetToJson implements Callable<Void>
{

@Option(name = "--convert-corrupt-dates")
public boolean convertCorruptDates = false;

@Arguments(description = "directory")
public List<String> directories;


public static void main(String[] args) throws Exception
{
if (args.length != 1) {
throw new IAE("Usage: directory");
CliBuilder<Callable> builder = Cli.builder("ParquetToJson");
builder.withDefaultCommand(ParquetToJson.class);
builder.build().parse(args).call();
}

private File[] getInputFiles()
{
if (directories == null || directories.size() != 1) {
throw new IAE("Only one directory argument is supported!");
}

ParquetGroupConverter converter = new ParquetGroupConverter(true);
File dir = new File(directories.get(0));
if (!dir.isDirectory()) {
throw new IAE("Not a directory [%s]", dir);
}
File[] inputFiles = dir.listFiles(
pathname -> pathname.getName().endsWith(".parquet"));
if (inputFiles == null || inputFiles.length == 0) {
throw new IAE("No parquet files in directory [%s]", dir);
}
return inputFiles;
}

@Override
public Void call() throws Exception
{
ObjectMapper mapper = new DefaultObjectMapper();

File[] inputFiles = new File(args[0]).listFiles(
pathname -> pathname.getName().endsWith(".parquet")
);
File[] inputFiles = getInputFiles();

for (File inputFile : inputFiles) {
File outputFile = new File(inputFile.getAbsolutePath() + ".json");

try (
final org.apache.parquet.hadoop.ParquetReader<Group> reader = org.apache.parquet.hadoop.ParquetReader
.builder(new GroupReadSupport(), new Path(inputFile.toURI()))
.build();
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)
) {
final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)) {
ParquetGroupConverter converter = new ParquetGroupConverter(true, convertCorruptDates);
Group group;
while ((group = reader.read()) != null) {
writer.write(converter.convertGroup(group));
}
}
}
return null;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a design nit: with the change to remove the static label from the methods, I would expect them to start coming after the constructor. That is, we tend to follow the code flow of static first, then constructor then class methods. I noticed that it was inverted because I kept searching for the constructor at the top of the file and didn't see it, and then realized it was at the bottom and that's because the methods used to be static but now are not.

The current structure reads really nicely for the diff though, I hope that the whitespace change that I'm nit picking doesn't screw up the diff...

Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,55 @@ public class ParquetGroupConverter
private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

/**
* See {@link ParquetGroupConverter#convertField(Group, String)}
* https://github.com/apache/drill/blob/2ab46a9411a52f12a0f9acb1144a318059439bc4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java#L89
*/
public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_EPOCH_OFFSET_DAYS;

private final boolean binaryAsString;
private final boolean convertCorruptDates;

/**
 * Creates a converter configured with the two supplied flags.
 *
 * @param binaryAsString      stored on the instance for the conversion methods
 *                            (presumably controls whether binary values are
 *                            rendered as strings; the use site is not visible here)
 * @param convertCorruptDates when {@code true}, DATE values are shifted by
 *                            {@link #CORRECT_CORRUPT_DATE_SHIFT} before being
 *                            converted to milliseconds
 */
public ParquetGroupConverter(final boolean binaryAsString, final boolean convertCorruptDates)
{
  this.convertCorruptDates = convertCorruptDates;
  this.binaryAsString = binaryAsString;
}

/**
 * Recursively converts a group into a native Java {@link Map}. Every field declared
 * by the group's type becomes one entry, keyed by field name and holding the result
 * of {@link #convertField(Group, String)}; entries keep the declaration order.
 *
 * @param g the group
 * @return the native Java object (a {@link LinkedHashMap} of field name to value)
 */
public Object convertGroup(Group g)
{
  final Map<String, Object> converted = new LinkedHashMap<>();
  final List<Type> declaredFields = g.getType().getFields();

  declaredFields.forEach(declared -> {
    final String name = declared.getName();
    converted.put(name, convertField(g, name));
  });

  return converted;
}

/**
 * Unwraps a list element: a {@link Group} is passed through
 * {@code convertListElement}; any other object is returned unchanged.
 *
 * @param o the candidate list element
 * @return the converted element if {@code o} is a {@link Group}, otherwise {@code o} itself
 */
Object unwrapListElement(Object o)
{
  if (!(o instanceof Group)) {
    return o;
  }
  return convertListElement((Group) o);
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
* {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*/
@Nullable
private static Object convertField(Group g, String fieldName, boolean binaryAsString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double checking, this binaryAsString argument. That was being passed around but never actually used?

Any changes you want to make to ParquetToJson are good, that code is just for our own dev purposes, but these changes in ParquetGroupConverter are changing the code that our current parquet-based file ingestions end up using. So, I just want to double check that this cleanup is truly a redundant argument and not just something that wasn't needed for ParquetToJson but used for the other production code paths.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

binaryAsString was passed around in private static methods in a class on which an instance method was called first... so I've chosen to remove them (and use the implicit class access to get it where needed), instead of adding another boolean to every static method

Object convertField(Group g, String fieldName)
{
if (!g.getType().containsField(fieldName)) {
return null;
Expand All @@ -76,22 +121,22 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
int repeated = g.getFieldRepetitionCount(fieldIndex);
List<Object> vals = new ArrayList<>();
for (int i = 0; i < repeated; i++) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
}
return vals;
}
return convertPrimitiveField(g, fieldIndex, binaryAsString);
return convertPrimitiveField(g, fieldIndex);
} else {
if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
return convertRepeatedFieldToList(g, fieldIndex, binaryAsString);
return convertRepeatedFieldToList(g, fieldIndex);
}

if (isLogicalMapType(fieldType)) {
return convertLogicalMap(g.getGroup(fieldIndex, 0), binaryAsString);
return convertLogicalMap(g.getGroup(fieldIndex, 0));
}

if (isLogicalListType(fieldType)) {
return convertLogicalList(g.getGroup(fieldIndex, 0), binaryAsString);
return convertLogicalList(g.getGroup(fieldIndex, 0));
}

// not a list, but not a primitive, return the nested group type
Expand All @@ -102,7 +147,7 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
/**
* convert a repeated field into a list of primitives or groups
*/
private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex, boolean binaryAsString)
private List<Object> convertRepeatedFieldToList(Group g, int fieldIndex)
{

Type t = g.getType().getFields().get(fieldIndex);
Expand All @@ -111,7 +156,7 @@ private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex,
List<Object> vals = new ArrayList<>();
for (int i = 0; i < repeated; i++) {
if (t.isPrimitive()) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
} else {
vals.add(g.getGroup(fieldIndex, i));
}
Expand All @@ -134,7 +179,7 @@ private static boolean isLogicalListType(Type listType)
/**
* convert a parquet 'list' logical type {@link Group} to a java list of primitives or groups
*/
private static List<Object> convertLogicalList(Group g, boolean binaryAsString)
private List<Object> convertLogicalList(Group g)
{
/*
// List<Integer> (nullable list, non-null elements)
Expand Down Expand Up @@ -181,16 +226,16 @@ optional group my_list (LIST) {

for (int i = 0; i < repeated; i++) {
if (isListItemPrimitive) {
vals.add(convertPrimitiveField(g, 0, i, binaryAsString));
vals.add(convertPrimitiveField(g, 0, i));
} else {
Group listItem = g.getGroup(0, i);
vals.add(convertListElement(listItem, binaryAsString));
vals.add(convertListElement(listItem));
}
}
return vals;
}

private static Object convertListElement(Group listItem, boolean binaryAsString)
private Object convertListElement(Group listItem)
{
if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
Expand All @@ -199,7 +244,7 @@ private static Object convertListElement(Group listItem, boolean binaryAsString)
listItem.getType().getFields().get(0).isPrimitive()
) {
// nullable primitive list elements can have a repeating wrapper element, peel it off
return convertPrimitiveField(listItem, 0, binaryAsString);
return convertPrimitiveField(listItem, 0);
} else if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
listItem.getType().getFieldCount() == 1 &&
Expand Down Expand Up @@ -244,7 +289,7 @@ private static boolean isLogicalMapType(Type groupType)
/**
* Convert a parquet 'map' logical type {@link Group} to a java map of string keys to groups/lists/primitive values
*/
private static Map<String, Object> convertLogicalMap(Group g, boolean binaryAsString)
private Map<String, Object> convertLogicalMap(Group g)
{
/*
// Map<String, Integer> (nullable map, non-null values)
Expand All @@ -268,8 +313,8 @@ optional group my_map (MAP_KEY_VALUE) {(
Map<String, Object> converted = new HashMap<>();
for (int i = 0; i < mapEntries; i++) {
Group mapEntry = g.getGroup(0, i);
String key = convertPrimitiveField(mapEntry, 0, binaryAsString).toString();
Object value = convertField(mapEntry, "value", binaryAsString);
String key = convertPrimitiveField(mapEntry, 0).toString();
Object value = convertField(mapEntry, "value");
converted.put(key, value);
}
return converted;
Expand All @@ -281,17 +326,17 @@ optional group my_map (MAP_KEY_VALUE) {(
* @return "ingestion ready" java object, or null
*/
@Nullable
private static Object convertPrimitiveField(Group g, int fieldIndex, boolean binaryAsString)
private Object convertPrimitiveField(Group g, int fieldIndex)
{
PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
if (pt.isRepetition(Type.Repetition.REPEATED) && g.getFieldRepetitionCount(fieldIndex) > 1) {
List<Object> vals = new ArrayList<>();
for (int i = 0; i < g.getFieldRepetitionCount(fieldIndex); i++) {
vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
vals.add(convertPrimitiveField(g, fieldIndex, i));
}
return vals;
}
return convertPrimitiveField(g, fieldIndex, 0, binaryAsString);
return convertPrimitiveField(g, fieldIndex, 0);
}

/**
Expand All @@ -300,7 +345,7 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, boolean bin
* @return "ingestion ready" java object, or null
*/
@Nullable
private static Object convertPrimitiveField(Group g, int fieldIndex, int index, boolean binaryAsString)
private Object convertPrimitiveField(Group g, int fieldIndex, int index)
{
PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
OriginalType ot = pt.getOriginalType();
Expand All @@ -310,7 +355,7 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
// convert logical types
switch (ot) {
case DATE:
long ts = g.getInteger(fieldIndex, index) * MILLIS_IN_DAY;
long ts = convertDateToMillis(g.getInteger(fieldIndex, index));
return ts;
case TIME_MICROS:
return g.getLong(fieldIndex, index);
Expand Down Expand Up @@ -443,6 +488,14 @@ private static Object convertPrimitiveField(Group g, int fieldIndex, int index,
}
}

/**
 * Converts the raw integer read for a DATE field into milliseconds by multiplying
 * it by {@code MILLIS_IN_DAY}. When {@code convertCorruptDates} is set, the value
 * is first reduced by {@link #CORRECT_CORRUPT_DATE_SHIFT}.
 *
 * @param value the raw DATE integer read from the group
 * @return the (optionally shift-corrected) value scaled by {@code MILLIS_IN_DAY}
 */
private long convertDateToMillis(int value)
{
  // Widen up front: a compound assignment of the long shift into an int would
  // narrow silently, and the multiply below must be done in 64 bits whatever
  // type MILLIS_IN_DAY is declared with.
  long days = value;
  if (convertCorruptDates) {
    days -= CORRECT_CORRUPT_DATE_SHIFT;
  }
  return days * MILLIS_IN_DAY;
}

/**
* convert deprecated parquet int96 nanosecond timestamp to a long, based on
* https://github.com/prestodb/presto/blob/master/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTimestampUtils.java#L44
Expand Down Expand Up @@ -490,51 +543,4 @@ private static BigDecimal convertBinaryToDecimal(Binary value, int precision, in
return new BigDecimal(new BigInteger(value.getBytes()), scale);
}
}

private final boolean binaryAsString;

public ParquetGroupConverter(boolean binaryAsString)
{
this.binaryAsString = binaryAsString;
}

/**
* Recursively converts a group into native Java Map
*
* @param g the group
* @return the native Java object
*/
public Object convertGroup(Group g)
{
Map<String, Object> retVal = new LinkedHashMap<>();

for (Type field : g.getType().getFields()) {
final String fieldName = field.getName();
retVal.put(fieldName, convertField(g, fieldName));
}

return retVal;
}

/**
* Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
* into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
* {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*/
@Nullable
Object convertField(Group g, String fieldName)
{
return convertField(g, fieldName, binaryAsString);
}

Object unwrapListElement(Object o)
{
if (o instanceof Group) {
Group g = (Group) o;
return convertListElement(g, binaryAsString);
}
return o;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMak

public ParquetGroupFlattenerMaker(boolean binaryAsString, boolean discoverNestedFields)
{
this.converter = new ParquetGroupConverter(binaryAsString);
this.converter = new ParquetGroupConverter(binaryAsString, false);
this.parquetJsonProvider = new ParquetGroupJsonProvider(converter);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(parquetJsonProvider)
Expand Down
Loading