feat(delta): add initial delta writer #306

Open · wants to merge 1 commit into master
build.sbt (5 changes: 4 additions & 1 deletion)
@@ -28,6 +28,8 @@
lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
lazy val excludeFasterXML = ExclusionRule(organization = "com.fasterxml.jackson.module", name= "jackson-module-scala_2.12")
lazy val excludeMetricsCore = ExclusionRule(organization = "io.dropwizard.metrics", name= "metrics-core")
+lazy val excludeJavaxJSON = ExclusionRule(organization = "org.glassfish", name= "javax.json")
+lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
@@ -65,7 +67,8 @@
"org.apache.hudi" %% "hudi-spark-bundle" % "0.5.1-incubating" % "provided" excludeAll excludeFasterXML,
"org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
"org.apache.avro" % "avro" % "1.8.2" % "provided",
"org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll)
"org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll, excludeLog4j),
"io.delta" %% "delta-core" % "0.5.0" % "provided" excludeAll(excludeJavaxJSON)
)

// Temporary fix for https://github.com/databricks/spark-redshift/issues/315#issuecomment-285294306
@@ -11,4 +11,5 @@ case class Output(cassandra: Option[Cassandra] = None,
file: Option[File] = None,
kafka: Option[Kafka] = None,
elasticsearch: Option[Elasticsearch] = None,
-hudi: Option[Hudi] = None)
+hudi: Option[Hudi] = None,
+delta: Option[Delta] = None)
@@ -0,0 +1,5 @@
package com.yotpo.metorikku.configuration.job.output

case class Delta(dir: String,
hiveSync: Option[Boolean],
options: Option[Map[String, String]]) { }
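For context, a hedged sketch of constructing this job-level config by hand; the field roles are inferred from the DeltaOutputWriter further down (dir is used as a base directory that the metric-level path is appended to, options are forwarded to the Spark DataFrameWriter, and hiveSync is not used yet since hive sync is still a TODO in the writer). All values are placeholders.

// Illustrative only; values are placeholders.
val deltaConfig = Delta(
  dir = "/tmp/metorikku/delta",   // base directory; the metric-level "path" is appended to it
  hiveSync = None,                // reserved; hive sync is still a TODO in the writer below
  options = None                  // optional extra options forwarded to the Spark DataFrameWriter
)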
@@ -26,7 +26,8 @@ object OutputType extends Enumeration {
Elasticsearch,
File,
Kafka,
-Hudi = Value
+Hudi,
+Delta = Value
}

class OutputTypeReference extends TypeReference[OutputType.type]
@@ -33,6 +33,7 @@ object WriterFactory {
case OutputType.JSON => new JSONOutputWriter(metricOutputOptions, output.file)
case OutputType.Parquet => new ParquetOutputWriter(metricOutputOptions, output.file)
case OutputType.Hudi => new HudiOutputWriter(metricOutputOptions, output.hudi)
+case OutputType.Delta => new DeltaOutputWriter(metricOutputOptions, output.delta)
case OutputType.Instrumentation => new InstrumentationOutputWriter(
metricOutputOptions,
outputConfig.dataFrameName, metricName, job.instrumentationFactory)
@@ -0,0 +1,119 @@
package com.yotpo.metorikku.output.writers.file

import com.yotpo.metorikku.configuration.job.output.Delta
import com.yotpo.metorikku.exceptions.MetorikkuException
import com.yotpo.metorikku.output.Writer
import io.delta.tables.DeltaTable
import org.apache.log4j.LogManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max

import scala.collection.immutable.Map

class DeltaOutputWriter(props: Map[String, Object], deltaOutput: Option[Delta]) extends Writer {
case class DeltaOutputProperties(path: Option[String],
saveMode: Option[String],
keyColumn: String,
timeColumn: String,
tableName: Option[String],
retentionHours: Option[Double],
repartition: Option[Int],
extraOptions: Option[Map[String, String]])
val log = LogManager.getLogger(this.getClass)

val deltaOutputProperties = DeltaOutputProperties(
props.get("path").asInstanceOf[Option[String]],
props.get("saveMode").asInstanceOf[Option[String]],
props.get("keyColumn").asInstanceOf[Option[String]].get,
props.get("timeColumn").asInstanceOf[Option[String]].get,
props.get("tableName").asInstanceOf[Option[String]],
props.get("retentionHours").asInstanceOf[Option[Double]],
props.get("repartition").asInstanceOf[Option[Int]],
props.get("extraOptions").asInstanceOf[Option[Map[String, String]]])

override def write(dataFrame: DataFrame): Unit = {
val keyColumn = deltaOutputProperties.keyColumn
val timeColumn = deltaOutputProperties.timeColumn

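// Deduplicate the batch: keep only the latest row per keyColumn value. Each row is packed
// into a struct whose first field is timeColumn, so max() over the struct picks the row
// with the highest timeColumn (structs compare field by field, left to right).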
val latestForEachKey = dataFrame
.selectExpr(s"${keyColumn} AS key_for_latest", s"struct(${timeColumn} as time_for_latest, *) as otherCols")
.groupBy("key_for_latest")
.agg(max("otherCols").as("latest"))
.selectExpr("latest.*").drop("time_for_latest")

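// Resolve the target location: when a job-level Delta output is configured, its dir is used
// as the base directory and the metric-level path is appended to it; otherwise the
// metric-level path is used as-is.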
val path: Option[String] = (deltaOutputProperties.path, deltaOutput) match {
case (Some(path), Some(file)) => Option(file.dir + "/" + path)
case (Some(path), None) => Option(path)
case _ => None
}

path match {
case Some(filePath) => {
if (DeltaTable.isDeltaTable(filePath)) {
val deltaTable = DeltaTable.forPath(filePath)
val columnMapping = latestForEachKey.columns.filter(c => c != "_delete").map(c => (c, s"s.${c}")).toMap

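// The location already holds a Delta table: merge the deduplicated batch into it. Matched
// rows (same key with a strictly newer timeColumn) are deleted when _delete = true and
// updated otherwise; source rows that do not satisfy the match condition are inserted.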
deltaTable.as("t")
.merge(
latestForEachKey.as("s"),
s"s.${keyColumn} = t.${keyColumn} AND s.${timeColumn} > t.${timeColumn}")
.whenMatched("s._delete = true")
.delete()
.whenMatched()
.updateExpr(columnMapping)
.whenNotMatched()
.insertExpr(columnMapping)
.execute()

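// Optionally vacuum the table, removing files no longer referenced by it that are older
// than the configured retention period (in hours).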
deltaOutputProperties.retentionHours match {
case Some(retentionHours) => {
log.info("Vacuuming")
deltaTable.vacuum(retentionHours)
}
case _ =>
}

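// Optional compaction: rewrite the table into the requested number of files. The
// "dataChange" = "false" option marks the rewrite as a layout-only change, so it is not
// treated as new data by downstream readers.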
deltaOutputProperties.repartition match {
case Some(repartition) => dataFrame.sparkSession.read
.format("delta")
.load(filePath)
.repartition(repartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(filePath)
case _ =>
}
}
else {
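// No Delta table exists at this location yet: create it with a plain Delta-format batch
// write, applying the configured save mode and any extra writer options.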
val writer = latestForEachKey.drop("_delete").write.format("delta")
writer.option("path", filePath)
deltaOutputProperties.saveMode match {
case Some(saveMode) => writer.mode(saveMode)
case None =>
}

deltaOutput match {
case Some(outputOptions) => {
outputOptions.options match {
case Some(options) => writer.options(options)
case _ =>
}
}
case _ =>
}

deltaOutputProperties.extraOptions match {
case Some(options) => writer.options(options)
case _ =>
}
writer.save()
}
}
case None => throw MetorikkuException("Path is empty, please define a dir and a path")
}

// TODO: sync to hive
}
}
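
As a rough, illustrative usage sketch (not taken from this PR): wiring the writer by hand, outside WriterFactory. The props keys mirror the DeltaOutputProperties fields above; the column names, paths, and input DataFrame are placeholders, and delta-core must be on the classpath.

import org.apache.spark.sql.SparkSession
import com.yotpo.metorikku.configuration.job.output.Delta
import com.yotpo.metorikku.output.writers.file.DeltaOutputWriter

val spark = SparkSession.builder().appName("delta-writer-sketch").getOrCreate()
val events = spark.read.parquet("/tmp/events")  // placeholder input with userId, updatedAt and _delete columns

val props: Map[String, Object] = Map(
  "path"       -> "events",     // appended to the Delta dir below
  "keyColumn"  -> "userId",     // placeholder merge key
  "timeColumn" -> "updatedAt",  // placeholder ordering column
  "saveMode"   -> "Append"
)
new DeltaOutputWriter(props, Some(Delta(dir = "/tmp/metorikku/delta", hiveSync = None, options = None)))
  .write(events)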