Skip to content

Commit

Permalink
initial support for list lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
timvw committed Nov 7, 2023
1 parent eadf0c9 commit bf25a02
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package be.icteam.adobe.analytics.datafeed

import com.univocity.parsers.tsv.{TsvParser, TsvParserSettings}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.rocksdb.{Options, RocksDB}

import java.io.{ByteArrayInputStream, File, FileInputStream}
import java.nio.file.Files

import scala.collection.JavaConverters._

/** Contributes lookup-resolved list columns (e.g. `post_event_list`) to rows read
  * from an Adobe Analytics clickstream datafeed.
  *
  * For each configured rule, the raw comma-separated id list in the physical source
  * column is split and every id is resolved against a RocksDB database built from
  * the corresponding lookup TSV file; the result is exposed as an array column.
  *
  * Lifecycle: databases are created lazily in [[getContributor]] and released in
  * [[close]]; callers must close this contributor when done.
  *
  * @param lookupFilesByName lookup TSV files (key TAB value per line) keyed by logical name
  * @param sourceSchema      schema of the raw datafeed (physical columns)
  */
case class ListLookupValuesContributor(lookupFilesByName: Map[String, File], sourceSchema: StructType) extends ValuesContributor with AutoCloseable {

  /** Binds a lookup file to the physical source column it decodes and the field it produces. */
  private case class ListLookupRule(lookupFileName: String, physicalColumnName: String, resultSchemaField: StructField)

  // Currently only the event list is supported; add further rules here as needed.
  private val listLookupRules = List(
    ListLookupRule(LookupFile.Names.event, "post_event_list", StructField("post_event_list", ArrayType(StringType))))

  override def getFieldsWhichCanBeContributed(): List[StructField] = rulesWhichCanContribute.map(_.resultSchemaField)

  /** Builds the per-row contribution function for every requested list-lookup field.
    *
    * Side effect: opens the RocksDB databases backing the contributing rules
    * (released via [[close]]).
    */
  override def getContributor(alreadyContributedFields: List[StructField], requestedSchema: StructType): Contributor = {

    val contributingLookupRules = getContributingRules(requestedSchema)
    buildLookupDatabases(contributingLookupRules)
    val contributedFields = contributingLookupRules.map(_.resultSchemaField)

    val contributeFunctions = contributingLookupRules.map(rule => {
      // Resolve indices and the database once, outside the per-row closure.
      val physicalFieldIndex = sourceSchema.fieldIndex(rule.physicalColumnName)
      val requestedFieldIndex = requestedSchema.fieldIndex(rule.resultSchemaField.name)
      val lookupDatabase = lookupDatabasesByName(rule.lookupFileName)
      (row: GenericInternalRow, columns: Array[String]) => {
        val parsedValue = columns(physicalFieldIndex)
        // A null source value yields a null array; ids without a lookup entry yield null elements.
        val value = if (parsedValue == null) null else {
          val listValues = parsedValue.split(",")
          val lookedupValues = listValues.map(x => lookupDatabase.get(x.getBytes))
          ArrayData.toArrayData(lookedupValues.map(x => if (x == null) null else UTF8String.fromBytes(x)))
        }
        row.update(requestedFieldIndex, value)
      }
    })

    val contributeFunction = (row: GenericInternalRow, parsedValues: Array[String]) => {
      contributeFunctions.foreach(x => x(row, parsedValues))
    }

    Contributor(contributedFields, contributeFunction)
  }

  // Rules usable with the given inputs: both the lookup file and the physical column must exist.
  private val rulesWhichCanContribute = {
    def fileExistsForLookupRule(listLookupRule: ListLookupRule): Boolean = lookupFilesByName.contains(listLookupRule.lookupFileName)

    def sourceFieldExistsForLookupRule(listLookupRule: ListLookupRule): Boolean = sourceSchema.fieldNames.contains(listLookupRule.physicalColumnName)

    listLookupRules
      .filter(fileExistsForLookupRule)
      .filter(sourceFieldExistsForLookupRule)
  }

  /** Restricts the usable rules to those whose result field was actually requested. */
  private def getContributingRules(requestedSchema: StructType): List[ListLookupRule] = {
    def lookupFieldIsRequested(listLookupRule: ListLookupRule): Boolean = requestedSchema.fieldNames.contains(listLookupRule.resultSchemaField.name)

    rulesWhichCanContribute.filter(lookupFieldIsRequested)
  }

  // Populated by buildLookupDatabases; null until getContributor has run.
  var lookupDatabasesByName: Map[String, RocksDB] = _

  /** Releases the RocksDB handles opened by [[getContributor]]; safe to call when none were opened. */
  override def close(): Unit = {
    Option(lookupDatabasesByName).foreach(_.values.foreach(_.close()))
  }

  /** Loads one lookup TSV file into a fresh RocksDB database in a temp directory.
    *
    * Malformed rows (fewer than two columns, or null key/value) are skipped rather
    * than failing the whole load.
    */
  private def buildLookupFileDatabase(lookupFileName: String): RocksDB = {
    RocksDB.loadLibrary()

    val options = new Options()
      .setCreateIfMissing(true)
      .setUseDirectReads(true)

    val lookupFileDbDir = Files.createTempDirectory(s"lookups-${lookupFileName}")
    val lookupFileDb = try {
      RocksDB.open(options, lookupFileDbDir.toString)
    } finally {
      // RocksDB copies the options natively at open time, so the Java wrapper
      // can (and should) be released here to avoid leaking the native handle.
      options.close()
    }

    val lookupFile = lookupFilesByName(lookupFileName)
    val lookupStream = new FileInputStream(lookupFile)
    try {
      val tsvParserSettings = new TsvParserSettings
      tsvParserSettings.setMaxColumns(10)
      val tokenizer = new TsvParser(tsvParserSettings)
      tokenizer.iterate(lookupStream).iterator().asScala.foreach(columns => {
        // Guard against short/null rows which would otherwise NPE on getBytes.
        if (columns != null && columns.length >= 2 && columns(0) != null && columns(1) != null) {
          lookupFileDb.put(columns(0).getBytes, columns(1).getBytes)
        }
      })
    } finally {
      lookupStream.close()
    }
    lookupFileDb
  }

  /** Opens one database per distinct lookup file referenced by the contributing rules. */
  private def buildLookupDatabases(contributingLookupRules: Seq[ListLookupRule]): Unit = {
    val contributingLookupFiles = contributingLookupRules
      .map(_.lookupFileName)
      .toSet

    lookupDatabasesByName = contributingLookupFiles
      .map(x => (x, buildLookupFileDatabase(x)))
      .toMap
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package be.icteam.adobe.analytics.datafeed

import com.univocity.parsers.tsv.{TsvParser, TsvParserSettings}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.rocksdb.{Options, RocksDB}

import java.io.{File, FileInputStream}
import java.nio.file.Files

case class SimpleLookupValuesContributor(lookupFilesByName: Map[String, File], sourceSchema: StructType) extends ValuesContributor with AutoCloseable {


case class SimpleLookupValuesContributor(lookupFilesByName: Map[String, File], sourceSchema: StructType) extends ValuesContributor with AutoCloseable {

override def getFieldsWhichCanBeContributed(): List[StructField] = rulesWhichCanContribute.map(_.resultSchemaField)

Expand All @@ -21,10 +23,10 @@ case class SimpleLookupValuesContributor(lookupFilesByName: Map[String, File], s
val contributeFunctions = contributingLookupRules.map(simpleLookupRule => {
val physicalFieldIndex = sourceSchema.fieldIndex(simpleLookupRule.phyiscalColumnName)
val requestedFieldIndex = requestedSchema.fieldIndex(simpleLookupRule.resultSchemaField.name)
val lookupDatabase = lookupDatabasesByName(simpleLookupRule.lookupfileName)
(row: GenericInternalRow, columns: Array[String]) => {
val parsedValue = columns(physicalFieldIndex)
val value = if (parsedValue == null) null else {
val lookupDatabase = lookupDatabasesByName(simpleLookupRule.lookupfileName)
val foundValue = lookupDatabase.get(parsedValue.getBytes)
UTF8String.fromBytes(foundValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ trait ValuesContributor {
case class Contributor(contributedFields: List[StructField], contributeFunction: (GenericInternalRow, Array[String]) => Unit)

case class CompositeValuesContributor(valuesContributors: List[ValuesContributor]) extends ValuesContributor {
override def getFieldsWhichCanBeContributed(): List[StructField] = valuesContributors.foldLeft(Set.empty[StructField])((acc, x) => acc ++ x.getFieldsWhichCanBeContributed()).toList
override def getFieldsWhichCanBeContributed(): List[StructField] = {
valuesContributors.foldLeft(List.empty[StructField])((acc, x) => {
acc ++ x.getFieldsWhichCanBeContributed().filter(x => !acc.exists(_.name == x.name))
})
}
override def getContributor(alreadyContributedFields: List[StructField], requestedSchema: StructType): Contributor = {
// update thingie..
val compositeValuesContributorParts = valuesContributors.foldLeft((alreadyContributedFields, List.empty[(GenericInternalRow, Array[String]) => Unit]))((acc, valuesContributor) => {
val contributor = valuesContributor.getContributor(acc._1, requestedSchema)
val contributedValues = alreadyContributedFields ++ contributor.contributedFields
val contributedValues = acc._1 ++ contributor.contributedFields
val contributorActions = acc._2 ++ List(contributor.contributeFunction)
(contributedValues, contributorActions)
})
Expand All @@ -48,7 +51,10 @@ case class CompositeValuesContributor(valuesContributors: List[ValuesContributor
object ValuesContributor {
def apply(enableLookups: Boolean, lookupFilesByName: Map[String, File], sourceSchema: StructType): ValuesContributor = {
val contributors = if(enableLookups) {
List(SimpleLookupValuesContributor(lookupFilesByName, sourceSchema), SimpleSourceValuesContributor(sourceSchema))
List(
ListLookupValuesContributor(lookupFilesByName, sourceSchema),
SimpleLookupValuesContributor(lookupFilesByName, sourceSchema),
SimpleSourceValuesContributor(sourceSchema))
} else {
List(SimpleSourceValuesContributor(sourceSchema))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,23 @@ class DefaultSourceTest extends AnyFunSuite {

spark.stop()
}

// Smoke test for the list-lookup path: reads the feed and selects the
// lookup-resolved post_event_list column.
// NOTE(review): no assertion yet — this only verifies that read/select/show
// complete without error; add an expected-value assertion once the lookup
// fixture contents are confirmed (see the commented-out lines below).
test("read datafeed with list") {
val spark = TestUtil.getSparkSession()

// "datafeed" is the custom source under test; feedPath is a shared fixture.
val df = spark.read
.format("datafeed")
.load(feedPath)
.select(col("post_event_list"))
//.filter(col("post_event_list").isNotNull)

df.printSchema()

//val os = df.take(1)(0).getString(0)
//assert(os == "1550374905")
df.show(10, false)
//df.printSchema()

spark.stop()
}
}

0 comments on commit bf25a02

Please sign in to comment.