A library to load data into Spark SQL DataFrames from Amazon Redshift, and write them back to Redshift tables. Amazon S3 is used to transfer data efficiently into and out of Redshift, and JDBC is used to trigger the appropriate COPY and UNLOAD commands on Redshift automatically.
Note: spark-redshift
requires Apache Spark version 1.4+ and Amazon Redshift version 1.0.963+ for
writing with Avro data.
You may use this library in your applications with the following dependency information:
groupId: com.databricks
artifactId: spark-redshift
version: 0.4.1
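For example, with SBT you might declare the dependency along these lines (a sketch; the exact artifact name and resolver depend on how the release is published for your Scala version):

```scala
// build.sbt -- illustrative; adjust to the artifact published for your Scala version.
libraryDependencies += "com.databricks" %% "spark-redshift" % "0.4.1"
```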
The project makes use of spark-avro, which is pulled in as a dependency; however, you'll need to provide the corresponding avro-mapred matching the Hadoop distribution that you plan to deploy to.
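For example, on a Hadoop 2.x cluster you might add something along these lines (illustrative only; the version and classifier must match your Hadoop distribution):

```scala
// build.sbt -- illustrative; pick the avro-mapred version and classifier
// (hadoop1 vs. hadoop2) that match the Hadoop distribution you deploy to.
libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2"
```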
Further, as Redshift is an AWS product, some AWS libraries will be required. This library expects that your deployment environment will include hadoop-aws, or whatever else is needed to access S3, credentials, etc. Check the dependencies with "provided" scope in build.sbt if you're at all unclear.
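For example, if your cluster does not already provide it, you could declare hadoop-aws yourself (the version below is illustrative and should track your Hadoop version):

```scala
// build.sbt -- illustrative; hadoop-aws supplies the S3 filesystem support and AWS SDK.
// Use "provided" if your deployment environment already ships these jars.
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.1" % "provided"
```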
You're also going to need a JDBC driver that is compatible with Redshift. Amazon recommends that you use their driver, although this library has also been successfully tested with the Postgres JDBC driver.
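For example, one option (an assumption about your setup, not a requirement) is to depend on the Postgres JDBC driver and point the `jdbcdriver` parameter (see the table below) at it:

```scala
// build.sbt -- illustrative; alternatively, put Amazon's Redshift JDBC driver jar on the
// classpath and keep the default jdbcdriver setting.
libraryDependencies += "org.postgresql" % "postgresql" % "9.4-1201-jdbc41"
```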
You can use spark-redshift
via the Data Sources API in Scala, Python or SQL, as follows:
import org.apache.spark.sql._
val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)
// Get some data from a Redshift table
val df: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable" -> "my_table")
.option("tempdir" -> "s3://path/for/temp/data")
.load()
// Apply some transformations to the data as per normal, then you can use the
// Data Source API to write the data back to another table
df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable" -> "my_table_copy")
.option("tempdir" -> "s3://path/for/temp/data")
.mode("error")
.save()
from pyspark.sql import SQLContext
sc = # existing SparkContext
sql_context = SQLContext(sc)
# Read data from a table
df = sql_context.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable" -> "my_table") \
.option("tempdir" -> "s3://path/for/temp/data") \
.load()
# Write back to a table
df.write \
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable" -> "my_table_copy") \
.option("tempdir" -> "s3://path/for/temp/data") \
.mode("error")
.save()
CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (dbtable 'my_table',
tempdir 's3://my_bucket/tmp',
url 'jdbc:redshift://host:port/db?user=username&password=pass');
The com.databricks.spark.redshift package has some shortcuts if you're working directly from a Scala application and don't want to use the Data Sources API:
import com.databricks.spark.redshift._
val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext.redshiftTable( ... )
dataFrame.saveAsRedshiftTable( ... )
The library contains a Hadoop input format for Redshift tables unloaded with the ESCAPE option, which you may make direct use of as follows:
Usage in Spark Core:
import com.databricks.spark.redshift.RedshiftInputFormat
val records = sc.newAPIHadoopFile(
path,
classOf[RedshiftInputFormat],
classOf[java.lang.Long],
classOf[Array[String]])
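As a quick illustration (a sketch, not part of the library's API; it assumes the file was unloaded from a two-column table with a name and an age, mirroring the Spark SQL example below), each record's value is the array of parsed column strings:

```scala
// A sketch: the value of each record is the array of parsed columns for one unloaded row.
// Person and the two-column layout are assumptions made for this example.
case class Person(name: String, age: Int)
val people = records.map { case (_, fields) => Person(fields(0), fields(1).toInt) }
```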
Usage in Spark SQL:
import com.databricks.spark.redshift._
// Call redshiftFile(), which returns a DataFrame with all string columns.
val records: DataFrame = sqlContext.redshiftFile(path, Seq("name", "age"))
// Call redshiftFile() with the table schema.
val records: DataFrame = sqlContext.redshiftFile(path, "name varchar(10) age integer")
The parameter map or OPTIONS provided in Spark SQL supports the following settings.
Parameter | Required | Default | Notes |
---|---|---|---|
dbtable | Yes | No default | The table to create or read from in Redshift. |
url | Yes | No default | A JDBC URL, of the format jdbc:subprotocol://host:port/database?user=username&password=password |
aws_access_key_id | No, unless also unavailable from environment | Default Provider Chain | AWS access key; must have write permissions to the S3 bucket. |
aws_secret_access_key | No, unless also unavailable from environment | Default Provider Chain | AWS secret access key corresponding to the provided access key. |
aws_security_token | No, unless using temporary IAM credentials | None | AWS security token corresponding to the provided access key. |
tempdir | Yes | No default | A writeable location in Amazon S3, used for unloaded data when reading and for Avro data to be loaded into Redshift when writing. If you're using `spark-redshift` as part of a regular ETL pipeline, it can be useful to set a Lifecycle Policy on a bucket and use that as a temp location for this data. |
jdbcdriver | No | com.amazon.redshift.jdbc4.Driver | The class name of the JDBC driver to load before JDBC operations. Must be on the classpath. |
overwrite | No | false | If true, drop any existing data before writing new content. Only applies when using the Scala `saveAsRedshiftTable` function directly, as `SaveMode` is preferred when using the Data Source API. See also usestagingtable. |
diststyle | No | EVEN | The Redshift Distribution Style to be used when creating a table. Can be one of EVEN, KEY or ALL (see Redshift docs). When using KEY, you must also set a distribution key with the distkey option. |
distkey | No, unless using DISTSTYLE KEY | No default | The name of a column in the table to use as the distribution key when creating a table. |
sortkeyspec | No | No default | A full Redshift Sort Key definition. Examples include SORTKEY (my_sort_column), COMPOUND SORTKEY (sort_col_1, sort_col_2) and INTERLEAVED SORTKEY (sort_col_1, sort_col_2). |
usestagingtable | No | true | When performing an overwrite of existing data, this setting can be used to stage the new data in a temporary table, so that the COPY must finish successfully before any changes are made to the existing table. This minimizes the amount of time that the target table is unavailable and allows the old data to be restored should the COPY fail. You may wish to disable this by setting the parameter to false if you can't spare the disk space in your Redshift cluster and/or don't have requirements to keep the table availability high. |
postactions | No | No default | A ;-separated list of SQL commands to be executed after a successful COPY when loading data. It may be useful to have some GRANT commands or similar run here when loading new data. If a command contains %s, the table name will be formatted in before execution (in case you're using a staging table). Be warned that if these commands fail, it is treated as an error and you'll get an exception. If using a staging table, the changes will be reverted and the backup table restored if the post actions fail. |
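For example, the table-creation options above can be combined on a write (a sketch; my_dist_col and my_sort_col are placeholder column names, not anything defined by the library):

```scala
// A sketch combining several creation-time options from the parameters table above.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("tempdir", "s3://path/for/temp/data")
  .option("diststyle", "KEY")
  .option("distkey", "my_dist_col")                // placeholder column name
  .option("sortkeyspec", "SORTKEY (my_sort_col)")  // placeholder sort key definition
  .mode("error")
  .save()
```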
Note that you can provide AWS credentials via the parameters above, via Hadoop fs.* configuration settings, or through the usual environment variables, system properties, IAM roles, etc. The credentials you provide will be used in the Redshift COPY and UNLOAD commands, which means they need write access to the S3 bucket you reference in your tempdir setting.
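For example, one common approach (a sketch; it assumes you reference your tempdir with an s3n:// URL, since the s3 and s3a schemes use differently named keys) is to set the credentials on the SparkContext's Hadoop configuration:

```scala
// A sketch: supply S3 credentials via Hadoop configuration. Per the note above,
// these are also picked up for the Redshift COPY and UNLOAD commands.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```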
Some breaking changes were made in version 0.3 of the Hadoop InputFormat. If you use the input format directly, make the following changes in your code to move to the 0.3+ versions:
- com.databricks.examples.redshift.input -> com.databricks.spark.redshift
- SchemaRDD -> DataFrame
- import com.databricks.examples.redshift.input.RedshiftInputFormat._ -> import com.databricks.spark.redshift._
Version 0.4+ adds the Data Sources API and JDBC support, which is an entirely new API, so although this won't break code that uses the InputFormat directly, you may wish to make use of the new functionality to avoid performing UNLOAD queries manually.