[spark-connector] Add option to fail read when there are invalid segments (apache#13080)
cbalci authored May 22, 2024
1 parent 4eb0690 commit 3c4fe72
Showing 14 changed files with 145 additions and 46 deletions.
@@ -138,4 +138,5 @@ val df = spark.read
| segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 |
| pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
| useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
| failOnInvalidSegments | Fail the read operation if response metadata indicates invalid segments | No | false |
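
The new `failOnInvalidSegments` option above is passed like any other read option. A minimal sketch of enabling it, assuming an existing `SparkSession` and the `pinot` format name used by this connector's README examples; the table name and controller address are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pinot-read-example").getOrCreate()

val df = spark.read
  .format("pinot")
  .option("table", "airlineStats")          // illustrative table name
  .option("tableType", "offline")
  .option("controller", "localhost:9000")
  .option("failOnInvalidSegments", "true")  // fail the read if invalid segments were pruned
  .load()

df.show()
```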
@@ -28,11 +28,12 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConverters._
import scala.util.Try

/**
* Helper methods for spark-pinot conversions
* Helper class for extracting Spark rows from Pinot DataTable.
*/
private[datasource] object TypeConverter {
private[datasource] object DataExtractor {

/** Convert a Pinot schema to Spark schema. */
def pinotSchemaToSparkSchema(schema: Schema): StructType = {
Expand Down Expand Up @@ -64,11 +65,19 @@ private[datasource] object TypeConverter {
/** Convert Pinot DataTable to Seq of InternalRow */
def pinotDataTableToInternalRows(
dataTable: DataTable,
sparkSchema: StructType): Seq[InternalRow] = {
sparkSchema: StructType,
failOnInvalidSegments: Boolean): Seq[InternalRow] = {
val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
val nullRowIdsByColumn = (0 until dataTable.getDataSchema.size()).map{ col =>
dataTable.getNullRowIds(col)
}
val numSegmentsPrunedInvalid = dataTable.getMetadata.getOrDefault(
DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName, "0")
if (Try(numSegmentsPrunedInvalid.toInt).getOrElse(0) > 0) {
if (failOnInvalidSegments) {
throw PinotException(s"${numSegmentsPrunedInvalid} segments were pruned as invalid." +
s" Failing read operation.") }
}
(0 until dataTable.getNumberOfRows).map { rowIndex =>
// spark schema is used to ensure columns order
val columns = sparkSchema.fields.map { field =>
@@ -55,7 +55,7 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
currentSchema = userSchema.getOrElse {
val pinotTableSchema =
PinotClusterClient.getTableSchema(readParameters.controller, readParameters.tableName)
TypeConverter.pinotSchemaToSparkSchema(pinotTableSchema)
DataExtractor.pinotSchemaToSparkSchema(pinotTableSchema)
}
}
currentSchema
@@ -38,7 +38,8 @@ class PinotInputPartition(
override def _partitionId: Int = partitionId
override def _pinotSplit: PinotSplit = pinotSplit
override def _dataSourceOptions: PinotDataSourceReadOptions = dataSourceOptions
override def _translator: DataTable => Seq[InternalRow] = TypeConverter.pinotDataTableToInternalRows(_, schema)
override def _dataExtractor: DataTable => Seq[InternalRow] =
DataExtractor.pinotDataTableToInternalRows(_, schema, failOnInvalidSegments = false)
}
}
}
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.connector.spark.datasource

import org.apache.pinot.common.datatable.DataTableFactory
import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
import org.apache.pinot.common.utils.DataSchema
import org.apache.pinot.common.utils.DataSchema.ColumnDataType
import org.apache.pinot.connector.spark.common.PinotException
@@ -35,7 +35,7 @@ import scala.io.Source
/**
* Test pinot/spark conversions like schema, data table etc.
*/
class TypeConverterTest extends BaseTest {
class DataExtractorTest extends BaseTest {

test("Pinot DataTable should be converted to Spark InternalRows") {
val columnNames = Array(
@@ -115,7 +115,7 @@ class TypeConverterTest extends BaseTest {
)
)

val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
val result = DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
result.getArray(0) shouldEqual ArrayData.toArrayData(Seq(1, 2, 0))
result.getInt(1) shouldEqual 5
result.getArray(2) shouldEqual ArrayData.toArrayData(Seq(0d, 10.3d))
@@ -156,7 +156,7 @@ class TypeConverterTest extends BaseTest {
)

val exception = intercept[PinotException] {
TypeConverter.pinotDataTableToInternalRows(dataTable, schema)
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false)
}

exception.getMessage shouldEqual s"'longCol' not found in Pinot server response"
@@ -187,15 +187,49 @@ class TypeConverterTest extends BaseTest {
)
)

val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
val result = DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
result.get(0, StringType) shouldEqual null
}

test("Pinot schema should be converted to spark schema") {
val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString
val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
val resultSchema = DataExtractor.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
val sparkSchemaAsString = Source.fromResource("schema/spark-schema.json").mkString
val sparkSchema = DataType.fromJson(sparkSchemaAsString).asInstanceOf[StructType]
resultSchema.fields should contain theSameElementsAs sparkSchema.fields
}

test("Should fail if configured when metadata indicates invalid segments") {
val columnNames = Array(
"strCol"
)
val columnTypes = Array(
ColumnDataType.STRING,
)
val dataSchema = new DataSchema(columnNames, columnTypes)

val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
dataTableBuilder.startRow()
dataTableBuilder.setColumn(0, "strValue")
dataTableBuilder.finishRow()
val dataTable = dataTableBuilder.build()

val schema = StructType(
Seq(
StructField("strCol", StringType),
)
)

// Simulate invalid segments and expect exception
dataTable.getMetadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName, "1")
val exception = intercept[PinotException] {
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = true)
}
exception.getMessage shouldEqual "1 segments were pruned as invalid. Failing read operation."

// don't fail if configured not to
val result =
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
result.getString(0) shouldEqual "strValue"
}
}
@@ -128,14 +128,15 @@ val df = spark.read
**Note: A limit is added to every query because generated queries are converted to the Pinot `BrokerRequest` class, which sets the limit to `10` automatically. To prevent this, `LIMIT` is set to `Int.MaxValue`.**

### Connector Read Parameters
| Configuration | Description | Required | Default Value |
| ------------- | ------------- | ------------- | ------------- |
| table | Pinot table name without table type | Yes | - |
| tableType | Pinot table type(`realtime`, `offline` or `hybrid`) | Yes | - |
| controller | Pinot controller url and port. Input should be `url:port` format without schema. Connector does not support `https` schema for now. | No | localhost:9000 |
| broker | Pinot broker url and port. Input should be `url:port` format without schema. If not specified, connector will find broker instances of table automatically. Connector does not support `https` schema for now | No | Fetch broker instances of table from Pinot Controller |
| usePushDownFilters | Push filters to pinot servers or not. If true, data exchange between pinot server and spark will be minimized. | No | true |
| segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 |
| pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
| useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
| Configuration | Description | Required | Default Value |
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------- |-------------------------------------------------------|
| table | Pinot table name without table type | Yes | - |
| tableType | Pinot table type(`realtime`, `offline` or `hybrid`) | Yes | - |
| controller | Pinot controller url and port. Input should be `url:port` format without schema. Connector does not support `https` schema for now. | No | localhost:9000 |
| broker | Pinot broker url and port. Input should be `url:port` format without schema. If not specified, connector will find broker instances of table automatically. Connector does not support `https` schema for now | No | Fetch broker instances of table from Pinot Controller |
| usePushDownFilters | Push filters to pinot servers or not. If true, data exchange between pinot server and spark will be minimized. | No | true |
| segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 |
| pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
| useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
| failOnInvalidSegments | Fail the read operation if response metadata indicates invalid segments | No | false |
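
When `failOnInvalidSegments` is enabled and the response metadata reports a non-zero `numSegmentsPrunedInvalid`, the connector throws a `PinotException` (see the `DataExtractor` change below). How that error reaches the driver is an assumption here: it is raised while rows are extracted on the executors, so Spark is expected to surface it as a task/job failure once an action runs. A hedged sketch of guarding an action, assuming an existing `SparkSession` and an illustrative table name:

```scala
import scala.util.{Failure, Success, Try}

val strictDf = spark.read
  .format("pinot")
  .option("table", "airlineStats")                    // illustrative table name
  .option("tableType", "offline")
  .option("queryOptions", "enableNullHandling=true")  // other options combine as usual
  .option("failOnInvalidSegments", "true")
  .load()

// The exception is thrown during row extraction on the executors,
// so it only shows up once an action is triggered.
Try(strictDf.count()) match {
  case Success(rows) => println(s"Read $rows rows")
  case Failure(err)  => println(s"Read failed, possibly due to invalid segments: ${err.getMessage}")
}
```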
@@ -28,11 +28,12 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConverters._
import scala.util.Try

/**
* Helper methods for spark-pinot conversions
* Helper class for extracting Spark rows from Pinot DataTable.
*/
private[pinot] object TypeConverter {
private[pinot] object DataExtractor {

/** Convert a Pinot schema to Spark schema. */
def pinotSchemaToSparkSchema(schema: Schema): StructType = {
Expand Down Expand Up @@ -64,11 +65,21 @@ private[pinot] object TypeConverter {
/** Convert Pinot DataTable to Seq of InternalRow */
def pinotDataTableToInternalRows(
dataTable: DataTable,
sparkSchema: StructType): Seq[InternalRow] = {
sparkSchema: StructType,
failOnInvalidSegments: Boolean): Seq[InternalRow] = {
val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
val nullRowIdsByColumn = (0 until dataTable.getDataSchema.size()).map{ col =>
dataTable.getNullRowIds(col)
}
val numSegmentsPrunedInvalid = dataTable.getMetadata.getOrDefault(
DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName, "0")
if (Try(numSegmentsPrunedInvalid.toInt).getOrElse(0) > 0) {
if (failOnInvalidSegments) {
throw PinotException(s"${numSegmentsPrunedInvalid} segments were pruned as invalid." +
s" Failing read operation.")
}
}

(0 until dataTable.getNumberOfRows).map { rowIndex =>
// spark schema is used to ensure columns order
val columns = sparkSchema.fields.map { field =>
@@ -41,7 +41,7 @@ class PinotDataSource extends TableProvider with DataSourceRegister {

val pinotTableSchema =
PinotClusterClient.getTableSchema(controller, tableName)
TypeConverter.pinotSchemaToSparkSchema(pinotTableSchema)
DataExtractor.pinotSchemaToSparkSchema(pinotTableSchema)
}

override def getTable(schema: StructType,
@@ -86,8 +86,8 @@ class PinotScan(
override def _partitionId: Int = p.partitionId
override def _pinotSplit: PinotSplit = p.pinotSplit
override def _dataSourceOptions: PinotDataSourceReadOptions = p.dataSourceOptions
override def _translator: DataTable => Seq[InternalRow] =
TypeConverter.pinotDataTableToInternalRows(_, _schema)
override def _dataExtractor: DataTable => Seq[InternalRow] =
DataExtractor.pinotDataTableToInternalRows(_, _schema, p.dataSourceOptions.failOnInvalidSegments)
}
case _ =>
throw new Exception("Unknown InputPartition type. Expecting PinotInputPartition")
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.connector.spark.v3.datasource

import org.apache.pinot.common.datatable.DataTableFactory
import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
import org.apache.pinot.common.utils.DataSchema
import org.apache.pinot.common.utils.DataSchema.ColumnDataType
import org.apache.pinot.connector.spark.common.PinotException
@@ -35,7 +35,7 @@ import scala.io.Source
/**
* Test pinot/spark conversions like schema, data table etc.
*/
class TypeConverterTest extends BaseTest {
class DataExtractorTest extends BaseTest {

test("Pinot DataTable should be converted to Spark InternalRows") {
val columnNames = Array(
@@ -115,7 +115,7 @@ class TypeConverterTest extends BaseTest {
)
)

val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
val result = DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
result.getArray(0) shouldEqual ArrayData.toArrayData(Seq(1, 2, 0))
result.getInt(1) shouldEqual 5
result.getArray(2) shouldEqual ArrayData.toArrayData(Seq(0d, 10.3d))
@@ -156,7 +156,7 @@ class TypeConverterTest extends BaseTest {
)

val exception = intercept[PinotException] {
TypeConverter.pinotDataTableToInternalRows(dataTable, schema)
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false)
}

exception.getMessage shouldEqual s"'longCol' not found in Pinot server response"
@@ -189,15 +189,49 @@ class TypeConverterTest extends BaseTest {
)
)

val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
val result = DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
result.get(0, StringType) shouldEqual null
}

test("Pinot schema should be converted to spark schema") {
val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString
val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
val resultSchema = DataExtractor.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
val sparkSchemaAsString = Source.fromResource("schema/spark-schema.json").mkString
val sparkSchema = DataType.fromJson(sparkSchemaAsString).asInstanceOf[StructType]
resultSchema.fields should contain theSameElementsAs sparkSchema.fields
}

test("Should fail if configured when metadata indicates invalid segments") {
val columnNames = Array(
"strCol"
)
val columnTypes = Array(
ColumnDataType.STRING,
)
val dataSchema = new DataSchema(columnNames, columnTypes)

val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
dataTableBuilder.startRow()
dataTableBuilder.setColumn(0, "strValue")
dataTableBuilder.finishRow()
val dataTable = dataTableBuilder.build()

val schema = StructType(
Seq(
StructField("strCol", StringType),
)
)

// Simulate invalid segments and expect exception
dataTable.getMetadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName, "1")
val exception = intercept[PinotException] {
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = true)
}
exception.getMessage shouldEqual "1 segments were pruned as invalid. Failing read operation."

// don't fail if configured not to
val result =
DataExtractor.pinotDataTableToInternalRows(dataTable, schema, failOnInvalidSegments = false).head
result.getString(0) shouldEqual "strValue"
}
}