diff --git a/extensions-core/parquet-extensions/pom.xml b/extensions-core/parquet-extensions/pom.xml
index 4951c1c49e8d..b72afc7d8902 100644
--- a/extensions-core/parquet-extensions/pom.xml
+++ b/extensions-core/parquet-extensions/pom.xml
@@ -33,7 +33,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <properties>
-        <parquet.version>1.12.0</parquet.version>
+        <parquet.version>1.13.1</parquet.version>
     </properties>
@@ -162,6 +162,11 @@
             <artifactId>avro</artifactId>
             <version>${avro.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.rvesse</groupId>
+            <artifactId>airline</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetToJson.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetToJson.java
index 5cc40df45c2c..e75b17960ac4 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetToJson.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetToJson.java
@@ -21,6 +21,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
+import com.github.rvesse.airline.Cli;
+import com.github.rvesse.airline.annotations.Arguments;
+import com.github.rvesse.airline.annotations.Command;
+import com.github.rvesse.airline.annotations.Option;
+import com.github.rvesse.airline.builder.CliBuilder;
import org.apache.druid.data.input.parquet.simple.ParquetGroupConverter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
@@ -29,27 +34,59 @@
import org.apache.parquet.hadoop.example.GroupReadSupport;
import java.io.File;
+import java.util.List;
+import java.util.concurrent.Callable;
/**
- * Converts parquet files into new-deliminated JSON object files. Takes a single argument (an input directory)
- * and processes all files that end with a ".parquet" extension. Writes out a new file in the same directory named
- * by appending ".json" to the old file name. Will overwrite any output file that already exists.
+ * Converts parquet files into newline-delimited JSON object files. Takes a single
+ * argument (an input directory) and processes all files that end with a
+ * ".parquet" extension. Writes out a new file in the same directory, named by
+ * appending ".json" to the old file name. Will overwrite any output file that
+ * already exists.
*/
-public class ParquetToJson
+@Command(name = "ParquetToJson")
+public class ParquetToJson implements Callable<Void>
{
+  @Option(name = "--convert-corrupt-dates")
+  public boolean convertCorruptDates = false;
+
+  @Arguments(description = "directory")
+  public List<String> directories;
+
+
   public static void main(String[] args) throws Exception
   {
-    if (args.length != 1) {
-      throw new IAE("Usage: directory");
+    CliBuilder<ParquetToJson> builder = Cli.builder("ParquetToJson");
+    builder.withDefaultCommand(ParquetToJson.class);
+    builder.build().parse(args).call();
+  }
+
+  private File[] getInputFiles()
+  {
+    if (directories == null || directories.size() != 1) {
+      throw new IAE("Only one directory argument is supported!");
     }
-    ParquetGroupConverter converter = new ParquetGroupConverter(true);
+    File dir = new File(directories.get(0));
+    if (!dir.isDirectory()) {
+      throw new IAE("Not a directory [%s]", dir);
+    }
+    File[] inputFiles = dir.listFiles(
+        pathname -> pathname.getName().endsWith(".parquet"));
+    if (inputFiles == null || inputFiles.length == 0) {
+      throw new IAE("No parquet files in directory [%s]", dir);
+    }
+    return inputFiles;
+  }
+
+  @Override
+  public Void call() throws Exception
+  {
     ObjectMapper mapper = new DefaultObjectMapper();
-    File[] inputFiles = new File(args[0]).listFiles(
-        pathname -> pathname.getName().endsWith(".parquet")
-    );
+    File[] inputFiles = getInputFiles();
+
     for (File inputFile : inputFiles) {
       File outputFile = new File(inputFile.getAbsolutePath() + ".json");
@@ -57,13 +94,14 @@ public static void main(String[] args) throws Exception
           final org.apache.parquet.hadoop.ParquetReader<Group> reader = org.apache.parquet.hadoop.ParquetReader
               .builder(new GroupReadSupport(), new Path(inputFile.toURI()))
               .build();
-          final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)
-      ) {
+          final SequenceWriter writer = mapper.writer().withRootValueSeparator("\n").writeValues(outputFile)) {
+        ParquetGroupConverter converter = new ParquetGroupConverter(true, convertCorruptDates);
         Group group;
         while ((group = reader.read()) != null) {
           writer.write(converter.convertGroup(group));
         }
       }
     }
+    return null;
   }
 }
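For context, a minimal sketch of how the airline wiring above is exercised end to end. The wrapper class name and the directory path are hypothetical; only the builder/parse/call flow mirrors the patch:

```java
import com.github.rvesse.airline.Cli;
import com.github.rvesse.airline.builder.CliBuilder;

import org.apache.druid.data.input.parquet.ParquetToJson;

public class ParquetToJsonUsage
{
  public static void main(String[] args) throws Exception
  {
    // Build a single-command CLI whose default command is ParquetToJson,
    // exactly as ParquetToJson.main() does.
    CliBuilder<ParquetToJson> builder = Cli.builder("ParquetToJson");
    builder.withDefaultCommand(ParquetToJson.class);

    // Equivalent to: ParquetToJson --convert-corrupt-dates /tmp/parquet-input
    // airline instantiates the command and injects the @Option and @Arguments fields.
    ParquetToJson command = builder.build().parse("--convert-corrupt-dates", "/tmp/parquet-input");
    command.call(); // writes <name>.parquet.json next to every .parquet file in the directory
  }
}
```

Note that `--convert-corrupt-dates` defaults to false, so existing invocations of the tool keep their old behavior.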
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java
index d2bd643304a3..4d1bb5d6f110 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java
@@ -52,10 +52,55 @@ public class ParquetGroupConverter
   private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
 
   /**
-   * See {@link ParquetGroupConverter#convertField(Group, String)}
+   * https://github.com/apache/drill/blob/2ab46a9411a52f12a0f9acb1144a318059439bc4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java#L89
+   */
+  private static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_EPOCH_OFFSET_DAYS;
+
+  private final boolean binaryAsString;
+  private final boolean convertCorruptDates;
+
+  public ParquetGroupConverter(boolean binaryAsString, boolean convertCorruptDates)
+  {
+    this.binaryAsString = binaryAsString;
+    this.convertCorruptDates = convertCorruptDates;
+  }
+
+  /**
+   * Recursively converts a group into a native Java Map
+   *
+   * @param g the group
+   * @return the native Java object
+   */
+  public Object convertGroup(Group g)
+  {
+    Map<String, Object> retVal = new LinkedHashMap<>();
+
+    for (Type field : g.getType().getFields()) {
+      final String fieldName = field.getName();
+      retVal.put(fieldName, convertField(g, fieldName));
+    }
+
+    return retVal;
+  }
+
+  Object unwrapListElement(Object o)
+  {
+    if (o instanceof Group) {
+      Group g = (Group) o;
+      return convertListElement(g);
+    }
+    return o;
+  }
+
+  /**
+   * Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
+   * into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
+   * {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
+   * primitive types will be extracted into an ingestion-friendly state (e.g. 'int' and 'long'). Finally,
+   * if a field is not present, this method will return null.
    */
   @Nullable
-  private static Object convertField(Group g, String fieldName, boolean binaryAsString)
+  Object convertField(Group g, String fieldName)
   {
     if (!g.getType().containsField(fieldName)) {
       return null;
@@ -76,22 +121,22 @@ private static Object convertField(Group g, String fieldName, boolean binaryAsSt
int repeated = g.getFieldRepetitionCount(fieldIndex);
List
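The Drill link above documents the "corrupt date" problem this flag addresses: affected writers stored Parquet DATE values shifted by the Julian day number of the Unix epoch twice, so a reader recovers the real date by subtracting `2 * JULIAN_EPOCH_OFFSET_DAYS`. A minimal sketch of that arithmetic, assuming Drill's constant of 2440588; the standalone class, helper, and example values are illustrative, not part of the patch:

```java
public class CorruptDateExample
{
  // Julian day number of the Unix epoch (1970-01-01), as in Drill's ParquetReaderUtility.
  private static final long JULIAN_EPOCH_OFFSET_DAYS = 2_440_588L;
  // Affected writers added the offset twice, so subtracting it twice recovers the date.
  private static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_EPOCH_OFFSET_DAYS;

  static long correctDate(long daysSinceEpoch, boolean convertCorruptDates)
  {
    return convertCorruptDates ? daysSinceEpoch - CORRECT_CORRUPT_DATE_SHIFT : daysSinceEpoch;
  }

  public static void main(String[] args)
  {
    // 2017-01-01 is 17167 days after 1970-01-01; a corrupt file stores 17167 + 4881176 = 4898343.
    long corrupt = 17167L + CORRECT_CORRUPT_DATE_SHIFT;
    System.out.println(correctDate(corrupt, true)); // prints 17167
  }
}
```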