#190: Added Spark filter pushdown for S3 variant (#191)
Fixes #190
morazow authored Aug 1, 2023
1 parent 93df67d commit 0a34451
Showing 15 changed files with 406 additions and 419 deletions.
35 changes: 19 additions & 16 deletions dependencies.md

1 change: 1 addition & 0 deletions doc/changes/changelog.md

37 changes: 37 additions & 0 deletions doc/changes/changes_2.1.0.md
@@ -0,0 +1,37 @@
# Spark Connector 2.1.0, released 2023-08-02

Code name: Added filter pushdown and column selection for `S3` variant

## Summary

This release adds predicate pushdown and column selection for the `S3` variant. It also adopts the latest API changes from the `spark-connector-common-java` library.

## Features

* #190: Added predicate pushdown and column selection for `S3` variant

## Dependency Updates

### Spark Exasol Connector With JDBC

#### Compile Dependency Updates

* Updated `com.exasol:spark-connector-common-java:1.1.1` to `2.0.0`

#### Plugin Dependency Updates

* Updated `com.diffplug.spotless:spotless-maven-plugin:2.37.0` to `2.38.0`

### Spark Exasol Connector With S3

#### Compile Dependency Updates

* Updated `com.exasol:spark-connector-common-java:1.1.1` to `2.0.0`
* Updated `software.amazon.awssdk:s3:2.20.103` to `2.20.115`

#### Test Dependency Updates

* Updated `com.amazonaws:aws-java-sdk-s3:1.12.506` to `1.12.518`
* Added `com.exasol:java-util-logging-testing:2.0.3`
* Updated `org.junit.jupiter:junit-jupiter-api:5.9.3` to `5.10.0`
* Updated `org.junit.jupiter:junit-jupiter:5.9.3` to `5.10.0`
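To make the release notes concrete: with column selection and predicate pushdown, a projection and a supported filter on a DataFrame backed by the `S3` variant are translated into the `SELECT` list and `WHERE` clause of the query that Exasol executes, so only the matching columns and rows are staged and transferred. A minimal usage sketch in Scala; the format name and option keys are illustrative assumptions, not taken from this commit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-sketch").getOrCreate()

// Hypothetical options: connection and S3 staging settings are elided.
val df = spark.read
  .format("exasol-s3")
  .option("table", "RETAIL.SALES")
  .load()

// With this release, the projection and the filter are pushed down,
// so Exasol evaluates roughly:
//   SELECT "CUSTOMER_ID" FROM "RETAIL"."SALES" WHERE ("AMOUNT" > 100)
df.filter(df("AMOUNT") > 100)
  .select("CUSTOMER_ID")
  .show()
```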
6 changes: 5 additions & 1 deletion exasol-jdbc/pom.xml
@@ -23,6 +23,10 @@
             <groupId>com.exasol</groupId>
             <artifactId>exasol-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.exasol</groupId>
+            <artifactId>spark-connector-common-java</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.exasol</groupId>
             <artifactId>sql-statement-builder-java8</artifactId>
@@ -362,7 +366,7 @@
             <plugin>
                 <groupId>com.diffplug.spotless</groupId>
                 <artifactId>spotless-maven-plugin</artifactId>
-                <version>2.37.0</version>
+                <version>2.38.0</version>
                 <configuration>
                     <scala>
                         <scalafmt>

This file was deleted.

47 changes: 31 additions & 16 deletions exasol-jdbc/src/main/scala/com/exasol/spark/ExasolRelation.scala
@@ -1,16 +1,20 @@
 package com.exasol.spark
 
+import java.util.Optional
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
+import com.exasol.spark.common.FilterConverter
+import com.exasol.spark.common.StatementGeneratorFactory
 import com.exasol.spark.rdd.ExasolRDD
 import com.exasol.spark.util.ExasolConnectionManager
-import com.exasol.spark.util.Filters
 import com.exasol.spark.util.Types
+import com.exasol.sql.expression.BooleanExpression
 
 /**
  * The Exasol specific implementation of Spark
@@ -58,20 +62,21 @@ class ExasolRelation(
   override def buildScan(requiredColumns: Array[String]): RDD[Row] =
     buildScan(requiredColumns, Array.empty)
 
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val predicate = new FilterConverter().convert(filters)
     if (requiredColumns.isEmpty) {
-      makeEmptyRDD(filters)
+      makeEmptyRDD(predicate)
     } else {
-      new ExasolRDD(
-        sqlContext.sparkContext,
-        getEnrichedQuery(requiredColumns, filters),
-        Types.selectColumns(requiredColumns, schema),
-        manager
-      )
+      val query = getEnrichedQuery(requiredColumns, predicate)
+      logInfo("Creating Spark RDD from Exasol query '" + query + "'.")
+      new ExasolRDD(sqlContext.sparkContext, query, Types.selectColumns(requiredColumns, schema), manager)
     }
+  }
 
-  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
-    filters.filterNot(Filters.filterToBooleanExpression(_).isDefined)
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    val filterConverter = new FilterConverter()
+    filters.filter(!filterConverter.isFilterSupported(_))
+  }
 
   /**
    * When a count action is run from Spark dataframe we do not have to read the
@@ -86,13 +91,23 @@
    * @return An RDD of empty Row-s which has as many elements as count(*) from
    *         enriched query
    */
-  private[this] def makeEmptyRDD(filters: Array[Filter]): RDD[Row] = {
-    val cntQuery = getEnrichedQuery(Array.empty[String], filters)
-    val cnt = manager.withCountQuery(cntQuery)
+  private[this] def makeEmptyRDD(predicate: Optional[BooleanExpression]): RDD[Row] = {
+    val stmtGenerator = StatementGeneratorFactory.countStarFrom(s"($queryString)")
+    if (predicate.isPresent()) {
+      stmtGenerator.where(predicate.get())
+    }
+    val countStarQuery = stmtGenerator.render()
+    logInfo("Running count star query '" + countStarQuery + "'.")
+    val cnt = manager.withCountQuery(countStarQuery)
     sqlContext.sparkContext.parallelize(1L to cnt, 4).map(_ => Row.empty)
   }
 
-  private[this] def getEnrichedQuery(columns: Array[String], filters: Array[Filter]): String =
-    ExasolQueryEnricher(queryString).enrichQuery(columns, filters)
+  private[this] def getEnrichedQuery(columns: Array[String], predicate: Optional[BooleanExpression]): String = {
+    val stmtGenerator = StatementGeneratorFactory.selectFrom(s"($queryString)").columns(columns: _*)
+    if (predicate.isPresent()) {
+      stmtGenerator.where(predicate.get())
+    }
+    return stmtGenerator.render()
+  }
 
 }
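Taken together, the rewritten `ExasolRelation` delegates filter handling to the common library: `buildScan` converts Spark's pushed filters into an `Optional[BooleanExpression]`, `getEnrichedQuery` and `makeEmptyRDD` render that predicate into a `SELECT` or `COUNT(*)` statement via `StatementGeneratorFactory`, and `unhandledFilters` tells Spark which filters the converter cannot express so Spark re-applies them itself. The sketch below traces that flow using only the APIs visible in this diff; the table name and filter values are made up:

```scala
import java.util.Optional

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

import com.exasol.spark.common.FilterConverter
import com.exasol.spark.common.StatementGeneratorFactory
import com.exasol.sql.expression.BooleanExpression

object PushdownFlowSketch extends App {
  // Filters as Spark would pass them to buildScan, e.g. produced by
  // df.filter(col("CITY") === "Berlin" && col("AMOUNT") > 100)
  val filters: Array[Filter] = Array(EqualTo("CITY", "Berlin"), GreaterThan("AMOUNT", 100))

  val converter = new FilterConverter()

  // Supported filters are folded into one optional predicate ...
  val predicate: Optional[BooleanExpression] = converter.convert(filters)

  // ... and unsupported ones are reported back for Spark to evaluate itself.
  val stillInSpark = filters.filterNot(f => converter.isFilterSupported(f))

  // The predicate becomes the WHERE clause of the pushed-down query.
  val generator = StatementGeneratorFactory
    .selectFrom("(SELECT * FROM RETAIL.SALES)")
    .columns("CITY", "AMOUNT")
  if (predicate.isPresent()) {
    generator.where(predicate.get())
  }
  println(generator.render())
  // A count action builds the statement with countStarFrom(...) instead,
  // so Exasol answers COUNT(*) without shipping any rows.
}
```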