Enhance CV and MV idempotency via deterministic ID generation (#946)
* Generate id column for CV and MV

Signed-off-by: Chen Dai <[email protected]>

* Add UT for CV and MV

Signed-off-by: Chen Dai <[email protected]>

* Update with doc and UT

Signed-off-by: Chen Dai <[email protected]>

* Handle struct type in tumble function

Signed-off-by: Chen Dai <[email protected]>

* Refactor UT and doc

Signed-off-by: Chen Dai <[email protected]>

* Add logging and more IT

Signed-off-by: Chen Dai <[email protected]>

* Fix id expression comment

Signed-off-by: Chen Dai <[email protected]>

* Refactor UT assertions

Signed-off-by: Chen Dai <[email protected]>

* Remove auto gen logic for MV

Signed-off-by: Chen Dai <[email protected]>

* Update user manual and scaladoc

Signed-off-by: Chen Dai <[email protected]>

* Refactor UT with idColumn function

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Jan 8, 2025
1 parent 1cb538f commit 5590939
Showing 12 changed files with 383 additions and 38 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
@@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `id_expression`: an expression string that generates an ID column to guarantee idempotency when an index refresh job restarts or retries. If an empty string is provided, no ID column will be generated.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive. Here is an example:
@@ -406,6 +407,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
id_expression = "sha1(concat_ws('\0',startTime,status))",
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
```
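(Editor's aside, not part of the commit.) The `id_expression` value documented above is an ordinary Spark SQL expression evaluated per row, so it can be sanity-checked in a local Spark session before being placed in the `WITH` clause. A minimal sketch, assuming the `startTime` and `status` columns from the example; the object name is made up for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws, sha1}

object IdExpressionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("id-expression-check").getOrCreate()
    import spark.implicits._

    // Two identical rows: a deterministic ID expression must map them to the same value,
    // which is what makes a restarted or retried refresh idempotent on the sink side.
    val sample = Seq(("2024-01-01T10:00:00Z", 200), ("2024-01-01T10:00:00Z", 200))
      .toDF("startTime", "status")

    sample
      .select(sha1(concat_ws("\u0000", col("startTime"), col("status"))).as("id"))
      .show(false)

    spark.stop()
  }
}
```

Building the expression from the full set of selected or grouped columns, as in the `sha1(concat_ws(...))` example, keeps the generated ID stable across refresh attempts.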
@@ -11,8 +11,10 @@ import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintJsonHelper._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StructType

/**
@@ -62,7 +64,7 @@ trait FlintSparkIndex {
def build(spark: SparkSession, df: Option[DataFrame]): DataFrame
}

object FlintSparkIndex {
object FlintSparkIndex extends Logging {

/**
* Interface indicates a Flint index has custom streaming refresh capability other than foreach
Expand Down Expand Up @@ -117,6 +119,25 @@ object FlintSparkIndex {
s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
}

/**
* Generate an ID column using the ID expression provided in the index options.
*
* @param df
* the DataFrame to add an ID column to
* @param options
* Flint index options
* @return
* the DataFrame with an ID column added, or the original DataFrame if no ID expression is configured
*/
def addIdColumn(df: DataFrame, options: FlintSparkIndexOptions): DataFrame = {
options.idExpression() match {
case Some(idExpr) if idExpr.nonEmpty =>
logInfo(s"Using user-provided ID expression: $idExpr")
df.withColumn(ID_COLUMN, expr(idExpr))
case _ => df
}
}

/**
* Populate environment variables to persist in Flint metadata.
*
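(Editor's aside.) For readers skimming the diff, here is a minimal, self-contained sketch in plain Spark, with no Flint dependency, that mirrors the `addIdColumn` logic added above. The `__id__` column name is a placeholder and not necessarily the real value of `FlintSparkIndex.ID_COLUMN`:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object AddIdColumnSketch {
  // Placeholder; the actual FlintSparkIndex.ID_COLUMN constant may differ.
  val IdColumn = "__id__"

  // Mirrors the diff: attach the evaluated expression as an ID column,
  // and pass the DataFrame through untouched when no (or an empty) expression is configured.
  def addIdColumn(df: DataFrame, idExpression: Option[String]): DataFrame =
    idExpression match {
      case Some(e) if e.nonEmpty => df.withColumn(IdColumn, expr(e))
      case _                     => df
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("add-id-column-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
    addIdColumn(df, Some("sha1(concat_ws('-', id, name))")).show(false) // gains an __id__ column
    addIdColumn(df, None).show(false)                                   // unchanged

    spark.stop()
  }
}
```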
@@ -10,7 +10,7 @@ import java.util.{Collections, UUID}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
@@ -96,6 +96,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)

/**
* An expression that generates a unique value to use as the index data row ID.
*
* @return
* ID expression
*/
def idExpression(): Option[String] = getOptionValue(ID_EXPRESSION)

/**
* Extra streaming source options that can be simply passed to DataStreamReader or
* Relation.options
@@ -187,6 +195,7 @@ object FlintSparkIndexOptions {
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
val ID_EXPRESSION: OptionName.Value = Value("id_expression")
val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
}

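(Editor's aside.) A hedged usage sketch of the new option surface, based only on the constructor, `empty`, and accessor visible in this diff and in the unit tests further down:

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

object IdExpressionOptionSketch extends App {
  // Option present: the raw expression string is returned as-is by idExpression().
  val withId = FlintSparkIndexOptions(Map("id_expression" -> "sha1(concat_ws('-', startTime, status))"))
  assert(withId.idExpression().contains("sha1(concat_ws('-', startTime, status))"))

  // Option absent: the accessor yields None, so addIdColumn leaves the DataFrame untouched.
  assert(FlintSparkIndexOptions.empty.idExpression().isEmpty)
}
```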
@@ -10,7 +10,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchema, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndex.{addIdColumn, flintIndexNamePrefix, generateSchema, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

@@ -71,10 +71,13 @@ case class FlintSparkCoveringIndex(
val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))

// Add optional filtering condition
filterCondition
.map(job.where)
.getOrElse(job)
.select(colNames.head, colNames.tail: _*)
val batchDf =
filterCondition
.map(job.where)
.getOrElse(job)
.select(colNames.head, colNames.tail: _*)

addIdColumn(batchDf, options)
}
}

@@ -13,7 +13,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala`
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchema, metadataBuilder, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndex.{addIdColumn, flintIndexNamePrefix, generateSchema, metadataBuilder, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
@@ -81,7 +81,8 @@ case class FlintSparkMaterializedView(
override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
require(df.isEmpty, "materialized view doesn't support reading from other data frame")

spark.sql(query)
val batchDf = spark.sql(query)
addIdColumn(batchDf, options)
}

override def buildStream(spark: SparkSession): DataFrame = {
@@ -99,7 +100,9 @@ case class FlintSparkMaterializedView(
case relation: UnresolvedRelation if !relation.isStreaming =>
relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation))
}
logicalPlanToDataFrame(spark, streamingPlan)

val streamingDf = logicalPlanToDataFrame(spark, streamingPlan)
addIdColumn(streamingDf, options)
}

private def watermark(timeCol: Attribute, child: LogicalPlan) = {
@@ -6,9 +6,12 @@
package org.apache.spark

import org.opensearch.flint.spark.FlintSparkExtensions
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.{Alias, CodegenObjectFactoryMode, Expression}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE}
import org.apache.spark.sql.internal.SQLConf
@@ -68,4 +71,27 @@ trait FlintSuite extends SharedSparkSession {
setFlintSparkConf(METADATA_CACHE_WRITE, "false")
}
}

/**
* Implicit class to extend DataFrame functionality with additional utilities.
*
* @param df
* the DataFrame to which the additional methods are added
*/
protected implicit class DataFrameExtensions(val df: DataFrame) {

/**
* Retrieves the ID column expression from the logical plan of the DataFrame, if it exists.
*
* @return
* an `Option` containing the `Expression` for the ID column if present, or `None` otherwise
*/
def idColumn(): Option[Expression] = {
df.queryExecution.logical.collectFirst { case Project(projectList, _) =>
projectList.collectFirst { case Alias(child, ID_COLUMN) =>
child
}
}.flatten
}
}
}
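(Editor's aside.) A hedged sketch of how a test could exercise the `idColumn()` helper defined above, following the same assertion style as `FlintSparkIndexSuite` below; the suite name is hypothetical:

```scala
package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.functions.expr

// Hypothetical suite, shown only to illustrate the DataFrameExtensions helper.
class IdColumnHelperSketchSuite extends FlintSuite with Matchers {

  test("idColumn() exposes the ID expression, or None when absent") {
    val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")

    // Adding an ID column the same way addIdColumn does (withColumn + expr) ...
    val withId = df.withColumn(ID_COLUMN, expr("id + 10"))
    withId.idColumn() shouldBe Some(Add(UnresolvedAttribute("id"), Literal(10)))

    // ... while a DataFrame without one yields None.
    df.idColumn() shouldBe None
  }
}
```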
@@ -6,7 +6,6 @@
package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
@@ -22,6 +21,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
WATERMARK_DELAY.toString shouldBe "watermark_delay"
OUTPUT_MODE.toString shouldBe "output_mode"
INDEX_SETTINGS.toString shouldBe "index_settings"
ID_EXPRESSION.toString shouldBe "id_expression"
EXTRA_OPTIONS.toString shouldBe "extra_options"
}

@@ -36,6 +36,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
"watermark_delay" -> "30 Seconds",
"output_mode" -> "complete",
"index_settings" -> """{"number_of_shards": 3}""",
"id_expression" -> """sha1(col("timestamp"))""",
"extra_options" ->
""" {
| "alb_logs": {
@@ -55,6 +56,7 @@
options.watermarkDelay() shouldBe Some("30 Seconds")
options.outputMode() shouldBe Some("complete")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
options.idExpression() shouldBe Some("""sha1(col("timestamp"))""")
options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3")
}
@@ -83,6 +85,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.watermarkDelay() shouldBe empty
options.outputMode() shouldBe empty
options.indexSettings() shouldBe empty
options.idExpression() shouldBe empty
options.extraSourceOptions("alb_logs") shouldBe empty
options.extraSinkOptions() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
@@ -0,0 +1,111 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndex.{addIdColumn, ID_COLUMN}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.{Add, ConcatWs, Literal, Sha1, StructsToJson}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

class FlintSparkIndexSuite extends QueryTest with FlintSuite with Matchers {

test("should add ID column if ID expression is provided") {
val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
val options = new FlintSparkIndexOptions(Map("id_expression" -> "id + 10"))

val resultDf = addIdColumn(df, options)
resultDf.idColumn() shouldBe Some(Add(UnresolvedAttribute("id"), Literal(10)))
checkAnswer(resultDf.select(ID_COLUMN), Seq(Row(11), Row(12)))
}

test("should not add ID column if ID expression is not provided") {
val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
val options = FlintSparkIndexOptions.empty

val resultDf = addIdColumn(df, options)
resultDf.columns should not contain ID_COLUMN
}

test("should not add ID column if ID expression is empty") {
val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
val options = FlintSparkIndexOptions(Map("id_expression" -> ""))

val resultDf = addIdColumn(df, options)
resultDf.columns should not contain ID_COLUMN
}

test("should generate ID column for various column types") {
val schema = StructType.fromDDL("""
boolean_col BOOLEAN,
string_col STRING,
long_col LONG,
int_col INT,
double_col DOUBLE,
float_col FLOAT,
timestamp_col TIMESTAMP,
date_col DATE,
struct_col STRUCT<subfield1: STRING, subfield2: INT>
""")
val data = Seq(
Row(
true,
"Alice",
100L,
10,
10.5,
3.14f,
java.sql.Timestamp.valueOf("2024-01-01 10:00:00"),
java.sql.Date.valueOf("2024-01-01"),
Row("sub1", 1)))

val aggregatedDf = spark
.createDataFrame(sparkContext.parallelize(data), schema)
.groupBy(
"boolean_col",
"string_col",
"long_col",
"int_col",
"double_col",
"float_col",
"timestamp_col",
"date_col",
"struct_col",
"struct_col.subfield2")
.count()
val options = FlintSparkIndexOptions(Map("id_expression" ->
"sha1(concat_ws('\0',boolean_col,string_col,long_col,int_col,double_col,float_col,timestamp_col,date_col,to_json(struct_col),struct_col.subfield2))"))

val resultDf = addIdColumn(aggregatedDf, options)
resultDf.idColumn() shouldBe Some(
UnresolvedFunction(
"sha1",
Seq(UnresolvedFunction(
"concat_ws",
Seq(
Literal(UTF8String.fromString("\0"), StringType),
UnresolvedAttribute(Seq("boolean_col")),
UnresolvedAttribute(Seq("string_col")),
UnresolvedAttribute(Seq("long_col")),
UnresolvedAttribute(Seq("int_col")),
UnresolvedAttribute(Seq("double_col")),
UnresolvedAttribute(Seq("float_col")),
UnresolvedAttribute(Seq("timestamp_col")),
UnresolvedAttribute(Seq("date_col")),
UnresolvedFunction(
"to_json",
Seq(UnresolvedAttribute(Seq("struct_col"))),
isDistinct = false),
UnresolvedAttribute(Seq("struct_col", "subfield2"))),
isDistinct = false)),
isDistinct = false))
resultDf.select(ID_COLUMN).distinct().count() shouldBe 1
}
}