diff --git a/.gitignore b/.gitignore
index 8530aa2..eb3af7f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ project/project
 project/target
 derby.log
 metastore_db
+*.sw[a-z]
diff --git a/build.sbt b/build.sbt
index 45e8245..bc715c3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -17,7 +17,7 @@ assemblyMergeStrategy in assembly := {
 lazy val root = (project in file("."))
   .settings(
     name := "Parquet-Dump",
-    version := "1.1.0",
+    version := "1.1.1",
     organization := "com.tune",
     scalaVersion := "2.11.8",
     parallelExecution in test := false,
diff --git a/src/main/scala/ParquetDumper.scala b/src/main/scala/ParquetDumper.scala
index 81904c1..9e49fec 100644
--- a/src/main/scala/ParquetDumper.scala
+++ b/src/main/scala/ParquetDumper.scala
@@ -1,28 +1,43 @@
 package parquetdump
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FileInputStream, DataInputStream, BufferedInputStream, PrintWriter, InputStream}
 import java.util.concurrent.ArrayBlockingQueue
 
 import org.apache.hadoop.fs.Path
-import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 object ParquetDumper {
   def main(args: Array[String]) {
-
+    val isCount = args.headOption.map(_ == "--counts").getOrElse(false)
     val wq = new ArrayBlockingQueue[SimpleRecord](1000)
     val in = new FileInputStream(new File("/dev/stdin"))
     try {
       val out = new PrintWriter(System.out, true)
       unpackInputStream(in)
-        .flatMap(filePath => {
-          val reader = ParquetReader.builder(new SimpleReadSupport(), new Path(filePath)).build()
-          Iterator.continually({
-            reader.read()
-          })
-          .takeWhile(_ != null)
-        })
-        .foreach(record => {
-          record.prettyPrintJson(out)
-          out.println()
+        .foreach(filePath => {
+          if (isCount) {
+            val f = HadoopInputFile.fromPath(new Path(s"file://$filePath"), new org.apache.hadoop.conf.Configuration)
+            val blocks = ParquetFileReader.readFooter(new org.apache.hadoop.conf.Configuration, new Path(s"file://$filePath"), ParquetMetadataConverter.NO_FILTER)
+              .getBlocks
+              .asScala
+              .foreach(b => {
+                out.println(b.getRowCount)
+              })
+          } else {
+            val reader = ParquetReader.builder(new SimpleReadSupport(), new Path(filePath)).build()
+            Iterator.continually({
+              reader.read()
+            })
+            .takeWhile(_ != null)
+            .foreach(record => {
+              record.prettyPrintJson(out)
+              out.println()
+            })
+          }
         })
     } catch {
       case e: Throwable => throw e
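
Note on the --counts path: ParquetFileReader.readFooter parses only the file footer, where each block entry describes one row group and already carries its row count, so counting rows is a metadata-only operation and no data pages are decoded. A minimal standalone sketch of the same technique, mirroring the readFooter call used in the patch and assuming parquet-hadoop on the classpath (the FooterCounts object and countRows method are illustrative names, not part of the patch):

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    object FooterCounts {
      // Sum the per-row-group counts recorded in the footer metadata;
      // NO_FILTER requests all block (row group) entries.
      def countRows(filePath: String): Long =
        ParquetFileReader
          .readFooter(new Configuration(), new Path(s"file://$filePath"), ParquetMetadataConverter.NO_FILTER)
          .getBlocks
          .asScala
          .map(_.getRowCount)
          .sum
    }

Unlike this sketch, the patch prints one count per row group rather than a single total. Since the tool reads its input from /dev/stdin, invoking the new flag would presumably look like

    java -jar Parquet-Dump-assembly-1.1.1.jar --counts < some-file.parquet

where the jar and input file names are assumptions based on the sbt assembly settings, not taken from the patch.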