Skip to content

Commit

Permalink
Fix bug for not able to get sourceTables from metadata (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#883)

* add logs

Signed-off-by: Sean Kao <[email protected]>

* match Array when reading sourceTables

Signed-off-by: Sean Kao <[email protected]>

* add test cases

Signed-off-by: Sean Kao <[email protected]>

* use ArrayList only

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Nov 12, 2024
1 parent 98e1c03 commit dd9c0cf
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
Expand Down Expand Up @@ -141,9 +141,9 @@ object FlintSparkIndexFactory extends Logging {
}

private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = {
val sourceTables = getArrayString(metadata.properties, "sourceTables")
val sourceTables = getSourceTablesFromMetadata(metadata)
if (sourceTables.isEmpty) {
FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source)
FlintSparkMaterializedView.extractSourceTablesFromQuery(spark, metadata.source)
} else {
sourceTables
}
Expand All @@ -161,12 +161,4 @@ object FlintSparkIndexFactory extends Logging {
Some(value.asInstanceOf[String])
}
}

private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
map.get(key) match {
case list: java.util.ArrayList[_] =>
list.toArray.map(_.toString)
case _ => Array.empty[String]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser

/**
Expand Down Expand Up @@ -61,12 +61,7 @@ object FlintMetadataCache {
None
}
val sourceTables = metadata.kind match {
case MV_INDEX_TYPE =>
metadata.properties.get("sourceTables") match {
case list: java.util.ArrayList[_] =>
list.toArray.map(_.toString)
case _ => Array.empty[String]
}
case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
case _ => Array(metadata.source)
}
val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
.isInstanceOf[FlintOpenSearchIndexMetadataService]

override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
logInfo(s"Updating metadata cache for $indexName");
logInfo(s"Updating metadata cache for $indexName with $metadata");
val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
var client: IRestHighLevelClient = null
try {
client = OpenSearchClientUtils.createClient(options)
val request = new PutMappingRequest(osIndexName)
request.source(serialize(metadata), XContentType.JSON)
val serialized = serialize(metadata)
logInfo(s"Serialized: $serialized")
request.source(serialized, XContentType.JSON)
client.updateIndexMapping(request, RequestOptions.DEFAULT)
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.mv

import java.util.Locale

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.common.metadata.FlintMetadata
Expand All @@ -18,6 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
Expand Down Expand Up @@ -64,10 +65,14 @@ case class FlintSparkMaterializedView(
}.toArray
val schema = generateSchema(outputSchema).asJava

// Convert Scala Array to Java ArrayList for consistency with OpenSearch JSON parsing.
// OpenSearch uses Jackson, which deserializes JSON arrays into ArrayLists.
val sourceTablesProperty = new java.util.ArrayList[String](sourceTables.toSeq.asJava)

metadataBuilder(this)
.name(mvName)
.source(query)
.addProperty("sourceTables", sourceTables)
.addProperty("sourceTables", sourceTablesProperty)
.indexedColumns(indexColumnMaps)
.schema(schema)
.build()
Expand Down Expand Up @@ -153,7 +158,7 @@ case class FlintSparkMaterializedView(
}
}

object FlintSparkMaterializedView {
object FlintSparkMaterializedView extends Logging {

/** MV index type name */
val MV_INDEX_TYPE = "mv"
Expand Down Expand Up @@ -185,13 +190,40 @@ object FlintSparkMaterializedView {
* @return
* source table names
*/
def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = {
spark.sessionState.sqlParser
def extractSourceTablesFromQuery(spark: SparkSession, query: String): Array[String] = {
logInfo(s"Extracting source tables from query $query")
val sourceTables = spark.sessionState.sqlParser
.parsePlan(query)
.collect { case relation: UnresolvedRelation =>
qualifyTableName(spark, relation.tableName)
}
.toArray
logInfo(s"Extracted tables: [${sourceTables.mkString(", ")}]")
sourceTables
}

/**
* Get source tables from Flint metadata properties field.
*
* @param metadata
* Flint metadata
* @return
* source table names
*/
def getSourceTablesFromMetadata(metadata: FlintMetadata): Array[String] = {
logInfo(s"Getting source tables from metadata $metadata")
val sourceTables = metadata.properties.get("sourceTables")
sourceTables match {
case list: java.util.ArrayList[_] =>
logInfo(s"sourceTables is [${list.asScala.mkString(", ")}]")
list.toArray.map(_.toString)
case null =>
logInfo("sourceTables property does not exist")
Array.empty[String]
case _ =>
logInfo(s"sourceTables has unexpected type: ${sourceTables.getClass.getName}")
Array.empty[String]
}
}

/** Builder class for MV build */
Expand Down Expand Up @@ -223,7 +255,7 @@ object FlintSparkMaterializedView {
*/
def query(query: String): Builder = {
this.query = query
this.sourceTables = extractSourceTableNames(flint.spark, query)
this.sourceTables = extractSourceTablesFromQuery(flint.spark, query)
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
metadata.kind shouldBe MV_INDEX_TYPE
metadata.source shouldBe "SELECT 1"
metadata.properties should contain key "sourceTables"
metadata.properties.get("sourceTables").asInstanceOf[Array[String]] should have size 0
metadata.properties
.get("sourceTables")
.asInstanceOf[java.util.ArrayList[String]] should have size 0
metadata.indexedColumns shouldBe Array(
Map("columnName" -> "test_col", "columnType" -> "integer").asJava)
metadata.schema shouldBe Map("test_col" -> Map("type" -> "integer").asJava).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTableNames, getFlintIndexName}
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName, getSourceTablesFromMetadata, MV_INDEX_TYPE}
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
import org.scalatest.matchers.must.Matchers._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
Expand Down Expand Up @@ -65,14 +65,76 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| FROM spark_catalog.default.`table/3`
| INNER JOIN spark_catalog.default.`table.4`
|""".stripMargin
extractSourceTableNames(flint.spark, testComplexQuery) should contain theSameElementsAs
extractSourceTablesFromQuery(flint.spark, testComplexQuery) should contain theSameElementsAs
Array(
"spark_catalog.default.table1",
"spark_catalog.default.table2",
"spark_catalog.default.`table/3`",
"spark_catalog.default.`table.4`")

extractSourceTableNames(flint.spark, "SELECT 1") should have size 0
extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0
}

test("get source table names from index metadata successfully") {
val mv = FlintSparkMaterializedView(
"spark_catalog.default.mv",
s"SELECT 1 FROM $testTable",
Array(testTable),
Map("1" -> "integer"))
val metadata = mv.metadata()
getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
}

test("get source table names from deserialized metadata successfully") {
val metadata = FlintOpenSearchIndexMetadataService.deserialize(s""" {
| "_meta": {
| "kind": "$MV_INDEX_TYPE",
| "properties": {
| "sourceTables": [
| "$testTable"
| ]
| }
| },
| "properties": {
| "age": {
| "type": "integer"
| }
| }
| }
|""".stripMargin)
getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
}

test("get empty source tables from invalid field in metadata") {
val metadataWrongType = FlintOpenSearchIndexMetadataService.deserialize(s""" {
| "_meta": {
| "kind": "$MV_INDEX_TYPE",
| "properties": {
| "sourceTables": "$testTable"
| }
| },
| "properties": {
| "age": {
| "type": "integer"
| }
| }
| }
|""".stripMargin)
val metadataMissingField = FlintOpenSearchIndexMetadataService.deserialize(s""" {
| "_meta": {
| "kind": "$MV_INDEX_TYPE",
| "properties": { }
| },
| "properties": {
| "age": {
| "type": "integer"
| }
| }
| }
|""".stripMargin)

getSourceTablesFromMetadata(metadataWrongType) shouldBe empty
getSourceTablesFromMetadata(metadataMissingField) shouldBe empty
}

test("create materialized view with metadata successfully") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService}
import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.scalatest.Entry
Expand Down Expand Up @@ -161,12 +162,29 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(testTable)
.asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(
testTable)
}
}

test(s"write metadata cache to materialized view index mappings with source tables") {
test("write metadata cache with source tables from index metadata") {
val mv = FlintSparkMaterializedView(
"spark_catalog.default.mv",
s"SELECT 1 FROM $testTable",
Array(testTable),
Map("1" -> "integer"))
val metadata = mv.metadata().copy(latestLogEntry = Some(flintMetadataLogEntry))

flintClient.createIndex(testFlintIndex, metadata)
flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)

val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
properties
.get("sourceTables")
.asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(testTable)
}

test("write metadata cache with source tables from deserialized metadata") {
val testTable2 = "spark_catalog.default.metadatacache_test2"
val content =
s""" {
Expand Down Expand Up @@ -194,8 +212,9 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
properties
.get("sourceTables")
.asInstanceOf[List[String]]
.toArray should contain theSameElementsAs Array(testTable, testTable2)
.asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(
testTable,
testTable2)
}

test("write metadata cache to index mappings with refresh interval") {
Expand Down

0 comments on commit dd9c0cf

Please sign in to comment.