Add OpenLineage reporting support for Spark connector
Parent: 307607c
Commit: 164f9ac
Showing 6 changed files with 264 additions and 4 deletions.
...n/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
org.apache.hadoop.hbase.spark.SparkHBaseLineageProvider
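This service file presumably lets the OpenLineage Spark integration discover the connector's provider through the standard java.util.ServiceLoader mechanism. A minimal sketch of that lookup, for illustration only:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import io.openlineage.spark.extension.OpenLineageExtensionProvider

// Lists every provider registered under META-INF/services; with this commit on the
// classpath the output should include org.apache.hadoop.hbase.spark.SparkHBaseLineageProvider.
ServiceLoader.load(classOf[OpenLineageExtensionProvider]).asScala
  .foreach(p => println(p.getClass.getName))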
.../hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/SparkHBaseLineageProvider.scala (12 changes: 12 additions & 0 deletions)
@@ -0,0 +1,12 @@
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.extension.OpenLineageExtensionProvider
import io.openlineage.spark.shade.extension.v1.lifecycle.plan.SparkOpenLineageExtensionVisitor

class SparkHBaseLineageProvider extends OpenLineageExtensionProvider {

  def shadedPackage(): String =
    "org.apache.hbase.thirdparty.io.openlineage.spark.shade"

  override def getVisitorClassName: String = classOf[SparkOpenLineageExtensionVisitor].getCanonicalName
}
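The provider is small: getVisitorClassName reports which visitor class the OpenLineage agent should use, and shadedPackage appears to point at the relocated copy of the OpenLineage extension classes bundled under HBase's org.apache.hbase.thirdparty prefix (the exact contract of shadedPackage is not shown in this diff). A quick way to see both values, as an illustrative sketch:

// Illustrative only: instantiate the provider and print the values it exposes.
val provider = new org.apache.hadoop.hbase.spark.SparkHBaseLineageProvider
println(provider.shadedPackage())     // org.apache.hbase.thirdparty.io.openlineage.spark.shade
println(provider.getVisitorClassName) // canonical name of SparkOpenLineageExtensionVisitor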
spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/OpenLineageSuite.scala (179 changes: 179 additions & 0 deletions)
@@ -0,0 +1,179 @@
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.agent.OpenLineageSparkListener
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.Matchers.convertToAnyShouldWrapper
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

import java.io.File
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

class OpenLineageSuite extends FunSuite with Eventually with BeforeAndAfterEach with BeforeAndAfterAll with Logging {
  @transient var sc: SparkSession = null
  var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility

  val t1TableName = "t1"
  val t2TableName = "t2"
  val columnFamily = "c"
  var sqlContext: SQLContext = null

  val timestamp = 1234567890000L
  val lineageFile = File.createTempFile(s"openlineage_test_${System.nanoTime()}", ".log")

  override def beforeAll() {

    TEST_UTIL.startMiniCluster

    logInfo(" - minicluster started")
    try
      TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
    catch {
      case e: Exception => logInfo(" - no table " + t1TableName + " found")
    }
    try
      TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
    catch {
      case e: Exception => logInfo(" - no table " + t2TableName + " found")
    }

    logInfo(" - creating table " + t1TableName)
    TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
    logInfo(" - created table")
    logInfo(" - creating table " + t2TableName)
    TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
    logInfo(" - created table")

    val sparkConf = new SparkConf
    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
    sparkConf.set("spark.extraListeners", classOf[OpenLineageSparkListener].getCanonicalName)
    sparkConf.set("spark.openlineage.transport.type", "file")
    sparkConf.set("spark.openlineage.transport.location", lineageFile.getAbsolutePath)

    sc = SparkSession.builder().master("local").appName("openlineage-test").config(sparkConf).getOrCreate()
    val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
    try {
      val t1Table = connection.getTable(TableName.valueOf(t1TableName))

      try {
        var put = new Put(Bytes.toBytes("get1"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
        t1Table.put(put)
        put = new Put(Bytes.toBytes("get2"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
        t1Table.put(put)
        put = new Put(Bytes.toBytes("get3"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
        t1Table.put(put)
        put = new Put(Bytes.toBytes("get4"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
        t1Table.put(put)
        put = new Put(Bytes.toBytes("get5"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
        t1Table.put(put)
      } finally {
        t1Table.close()
      }
    } finally {
      connection.close()
    }

    new HBaseContext(sc.sparkContext, TEST_UTIL.getConfiguration)
  }

  override def afterAll() {
    TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
logInfo("shuting down minicluster") | ||
    TEST_UTIL.shutdownMiniCluster()

    sc.stop()
  }

  override def beforeEach(): Unit = {
    DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
  }

  test("Test rowKey point only rowKey query") {
    val hbaseTable1Catalog =
      s"""{
         |"table":{"namespace":"default", "name":"t1"},
         |"rowkey":"key",
         |"columns":{
         |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
         |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
         |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
         |}
         |}""".stripMargin

    val hbaseTable2Catalog =
      s"""{
         |"table":{"namespace":"default", "name":"t2"},
         |"rowkey":"key",
         |"columns":{
         |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
         |"OUTPUT_A_FIELD":{"cf":"c", "col":"a", "type":"string"},
         |"OUTPUT_B_FIELD":{"cf":"c", "col":"b", "type":"string"}
         |}
         |}""".stripMargin

    val results = sc.read
      .options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable1Catalog))
      .format("org.apache.hadoop.hbase.spark")
      .load()

    results.createOrReplaceTempView("tempview")

    val outputDf = sc.sql("SELECT KEY_FIELD, A_FIELD AS OUTPUT_A_FIELD, B_FIELD AS OUTPUT_B_FIELD FROM tempview")

    outputDf.write
      .format("org.apache.hadoop.hbase.spark")
      .options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable2Catalog))
      .save()

    val events = eventually { val eventLog = parseEventLog(lineageFile); eventLog.size shouldBe 1; eventLog }

    val json = events.head
    assert(((json \\ "inputs")(0) \ "name") == JString("default.t1"))
    assert(((json \\ "inputs")(0) \ "namespace") == JString("hbase://127.0.0.1"))
    assert(((json \\ "outputs")(0) \ "name") == JString("default.t2"))
    assert(((json \\ "outputs")(0) \ "namespace") == JString("hbase://127.0.0.1"))
  }

  def parseEventLog(file: File): List[JValue] = {
    val source = Source.fromFile(file)
    val eventlist = ArrayBuffer.empty[JValue]
    for (line <- source.getLines()) {
      val event = parse(line)
      for {
        JObject(child) <- event
        JField("inputs", JArray(inputs)) <- child
        JField("outputs", JArray(outputs)) <- child
        JField("eventType", JString(eventType)) <- child
        if outputs.nonEmpty && inputs.nonEmpty && eventType == "COMPLETE"
      } yield eventlist += event
    }
    eventlist.toList
  }
}