From 97fa2f988a70056974c5e10bded25f9af1a8be12 Mon Sep 17 00:00:00 2001
From: lyogev
Date: Wed, 22 Apr 2020 18:19:10 +0300
Subject: [PATCH] feat(delta): add initial delta writer

---
 build.sbt                                     |   5 +-
 .../metorikku/configuration/job/Output.scala  |   3 +-
 .../configuration/job/output/Delta.scala      |   5 +
 .../configuration/metric/Output.scala         |   3 +-
 .../metorikku/output/WriterFactory.scala      |   1 +
 .../writers/file/DeltaOutputWriter.scala      | 119 ++++++++++++++++++
 6 files changed, 133 insertions(+), 3 deletions(-)
 create mode 100644 src/main/scala/com/yotpo/metorikku/configuration/job/output/Delta.scala
 create mode 100644 src/main/scala/com/yotpo/metorikku/output/writers/file/DeltaOutputWriter.scala

diff --git a/build.sbt b/build.sbt
index 5cd8b889e..81f91ea78 100644
--- a/build.sbt
+++ b/build.sbt
@@ -28,6 +28,8 @@ lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "a
 lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
 lazy val excludeFasterXML = ExclusionRule(organization = "com.fasterxml.jackson.module", name= "jackson-module-scala_2.12")
 lazy val excludeMetricsCore = ExclusionRule(organization = "io.dropwizard.metrics", name= "metrics-core")
+lazy val excludeJavaxJSON = ExclusionRule(organization = "org.glassfish", name= "javax.json")
+lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")
 
 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
@@ -65,7 +67,8 @@ libraryDependencies ++= Seq(
   "org.apache.hudi" %% "hudi-spark-bundle" % "0.5.1-incubating" % "provided" excludeAll excludeFasterXML,
   "org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
   "org.apache.avro" % "avro" % "1.8.2" % "provided",
-  "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll)
+  "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll, excludeLog4j),
+  "io.delta" %% "delta-core" % "0.5.0" % "provided" excludeAll(excludeJavaxJSON)
 )
 
 // Temporary fix for https://github.com/databricks/spark-redshift/issues/315#issuecomment-285294306
diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/Output.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/Output.scala
index 0a2d16f19..5ee7ae699 100644
--- a/src/main/scala/com/yotpo/metorikku/configuration/job/Output.scala
+++ b/src/main/scala/com/yotpo/metorikku/configuration/job/Output.scala
@@ -11,4 +11,5 @@ case class Output(cassandra: Option[Cassandra] = None,
                   file: Option[File] = None,
                   kafka: Option[Kafka] = None,
                   elasticsearch: Option[Elasticsearch] = None,
-                  hudi: Option[Hudi] = None)
+                  hudi: Option[Hudi] = None,
+                  delta: Option[Delta] = None)
diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/output/Delta.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/output/Delta.scala
new file mode 100644
index 000000000..c5faa8698
--- /dev/null
+++ b/src/main/scala/com/yotpo/metorikku/configuration/job/output/Delta.scala
@@ -0,0 +1,5 @@
+package com.yotpo.metorikku.configuration.job.output
+
+case class Delta(dir: String,
+                 hiveSync: Option[Boolean],
+                 options: Option[Map[String, String]]) { }
\ No newline at end of file
diff --git a/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala b/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala
index 2a0deeb0b..58653a178 100644
--- a/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala
+++ b/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala
@@ -26,7 +26,8 @@ object OutputType extends Enumeration {
     Elasticsearch,
     File,
     Kafka,
-    Hudi = Value
+    Hudi,
+    Delta = Value
 }
 
 class OutputTypeReference extends TypeReference[OutputType.type]
diff --git a/src/main/scala/com/yotpo/metorikku/output/WriterFactory.scala b/src/main/scala/com/yotpo/metorikku/output/WriterFactory.scala
index d2e392607..86054a443 100644
--- a/src/main/scala/com/yotpo/metorikku/output/WriterFactory.scala
+++ b/src/main/scala/com/yotpo/metorikku/output/WriterFactory.scala
@@ -33,6 +33,7 @@ object WriterFactory {
       case OutputType.JSON => new JSONOutputWriter(metricOutputOptions, output.file)
       case OutputType.Parquet => new ParquetOutputWriter(metricOutputOptions, output.file)
       case OutputType.Hudi => new HudiOutputWriter(metricOutputOptions, output.hudi)
+      case OutputType.Delta => new DeltaOutputWriter(metricOutputOptions, output.delta)
       case OutputType.Instrumentation => new InstrumentationOutputWriter(
         metricOutputOptions,
         outputConfig.dataFrameName, metricName, job.instrumentationFactory)
diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/file/DeltaOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/file/DeltaOutputWriter.scala
new file mode 100644
index 000000000..ef30d3298
--- /dev/null
+++ b/src/main/scala/com/yotpo/metorikku/output/writers/file/DeltaOutputWriter.scala
@@ -0,0 +1,119 @@
+package com.yotpo.metorikku.output.writers.file
+
+import com.yotpo.metorikku.configuration.job.output.Delta
+import com.yotpo.metorikku.exceptions.MetorikkuException
+import com.yotpo.metorikku.output.Writer
+import io.delta.tables.DeltaTable
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.max
+
+import scala.collection.immutable.Map
+
+class DeltaOutputWriter(props: Map[String, Object], deltaOutput: Option[Delta]) extends Writer {
+  case class DeltaOutputProperties(path: Option[String],
+                                   saveMode: Option[String],
+                                   keyColumn: String,
+                                   timeColumn: String,
+                                   tableName: Option[String],
+                                   retentionHours: Option[Double],
+                                   repartition: Option[Int],
+                                   extraOptions: Option[Map[String, String]])
+  val log = LogManager.getLogger(this.getClass)
+
+  val deltaOutputProperties = DeltaOutputProperties(
+    props.get("path").asInstanceOf[Option[String]],
+    props.get("saveMode").asInstanceOf[Option[String]],
+    props.get("keyColumn").asInstanceOf[Option[String]].get,
+    props.get("timeColumn").asInstanceOf[Option[String]].get,
+    props.get("tableName").asInstanceOf[Option[String]],
+    props.get("retentionHours").asInstanceOf[Option[Double]],
+    props.get("repartition").asInstanceOf[Option[Int]],
+    props.get("extraOptions").asInstanceOf[Option[Map[String, String]]])
+
+  override def write(dataFrame: DataFrame): Unit = {
+    val keyColumn = deltaOutputProperties.keyColumn
+    val timeColumn = deltaOutputProperties.timeColumn
+
+    val latestForEachKey = dataFrame
+      .selectExpr(s"${keyColumn} AS key_for_latest", s"struct(${timeColumn} as time_for_latest, *) as otherCols")
+      .groupBy("key_for_latest")
+      .agg(max("otherCols").as("latest"))
+      .selectExpr("latest.*").drop("time_for_latest")
+
+    val path: Option[String] = (deltaOutputProperties.path, deltaOutput) match {
+      case (Some(path), Some(file)) => Option(file.dir + "/" + path)
+      case (Some(path), None) => Option(path)
+      case _ => None
+    }
+
+    path match {
+      case Some(filePath) => {
+        if (DeltaTable.isDeltaTable(filePath)) {
+          val deltaTable = DeltaTable.forPath(filePath)
+          val columnMapping = latestForEachKey.columns.filter(c => c != "_delete").map(c => (c, s"s.${c}")).toMap
+
+          deltaTable.as("t")
+            .merge(
+              latestForEachKey.as("s"),
+              s"s.${keyColumn} = t.${keyColumn} AND s.${timeColumn} > t.${timeColumn}")
+            .whenMatched("s._delete = true")
+            .delete()
+            .whenMatched()
+            .updateExpr(columnMapping)
+            .whenNotMatched()
+            .insertExpr(columnMapping)
+            .execute()
+
+          deltaOutputProperties.retentionHours match {
+            case Some(retentionHours) => {
+              log.info("Vacuuming")
+              deltaTable.vacuum(retentionHours)
+            }
+            case _ =>
+          }
+
+          deltaOutputProperties.repartition match {
+            case Some(repartition) => dataFrame.sparkSession.read
+              .format("delta")
+              .load(filePath)
+              .repartition(repartition)
+              .write
+              .option("dataChange", "false")
+              .format("delta")
+              .mode("overwrite")
+              .save(filePath)
+            case _ =>
+          }
+        }
+        else {
+          val writer = latestForEachKey.drop("_delete").write.format("delta")
+          writer.option("path", filePath)
+          deltaOutputProperties.saveMode match {
+            case Some(saveMode) => writer.mode(saveMode)
+            case None =>
+          }
+
+          deltaOutput match {
+            case Some(outputOptions) => {
+              outputOptions.options match {
+                case Some(options) => writer.options(options)
+                case _ =>
+              }
+            }
+            case _ =>
+          }
+
+          deltaOutputProperties.extraOptions match {
+            case Some(options) => writer.options(options)
+            case _ =>
+          }
+          writer.save()
+        }
+      }
+      case None => throw MetorikkuException("Path is empty, please define a dir and a path")
+    }
+
+    // TODO: sync to hive
+  }
+}
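
Usage sketch (illustrative only, not part of the patch): the option keys below come from
DeltaOutputProperties, mirroring what WriterFactory does for OutputType.Delta, but the
paths, column names and the usersDf DataFrame are invented for the example.

    import com.yotpo.metorikku.configuration.job.output.Delta
    import com.yotpo.metorikku.output.writers.file.DeltaOutputWriter

    // keyColumn and timeColumn are required; the writer calls .get on them.
    val props: Map[String, Object] = Map(
      "path" -> "users",            // joined with Delta.dir => s3://warehouse/delta/users
      "keyColumn" -> "id",          // merge key
      "timeColumn" -> "updated_at"  // recency column used to keep the newest row per key
    )

    val writer = new DeltaOutputWriter(props, Some(Delta("s3://warehouse/delta", None, None)))
    writer.write(usersDf) // first run creates the table; later runs merge (upsert/delete)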
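
The deduplication step relies on a Spark trick worth spelling out: comparison of struct
values is field-by-field from the left, so max() over a struct whose first field is the
time column picks the newest complete row per key. A minimal standalone sketch, with
illustrative column names (id, updated_at, value):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.max

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10L, "old"), (1, 20L, "new"), (2, 5L, "only"))
      .toDF("id", "updated_at", "value")

    val latest = df
      .selectExpr("id AS key_for_latest", "struct(updated_at AS time_for_latest, *) AS otherCols")
      .groupBy("key_for_latest")
      .agg(max("otherCols").as("latest")) // struct ordering compares time_for_latest first
      .selectExpr("latest.*")
      .drop("time_for_latest")

    latest.show() // one row per id: (1, 20, "new") and (2, 5, "only")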
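
The repartition branch doubles as compaction: rewriting the table with dataChange=false
commits a pure file rearrangement, so streaming readers of the table are not forced to
reprocess the data. Note also that Delta's vacuum refuses retention periods under 168
hours unless spark.databricks.delta.retentionDurationCheck.enabled is set to false.
The compaction pass as a standalone job (spark, tablePath and numFiles are placeholders):

    spark.read
      .format("delta")
      .load(tablePath)
      .repartition(numFiles)
      .write
      .format("delta")
      .option("dataChange", "false")
      .mode("overwrite")
      .save(tablePath)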