Merge pull request #21 from exasol/feature/#18-avro-format-import-support

Add support for importing avro formatted files
morazow authored Mar 22, 2019
2 parents 519164a + 7f75a6c commit 98325ef
Showing 32 changed files with 726 additions and 293 deletions.
24 changes: 24 additions & 0 deletions README.md
@@ -14,6 +14,7 @@ can contact our support team.

* [Overview](#overview)
* [Getting started](#getting-started)
* [Data Storage Formats](#data-storage-formats)
* [Import](#import)
* [Export](#export)
* [Building from source](#building-from-source)
@@ -110,6 +111,28 @@ CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS
Please do not forget to change the bucket name or the jar version according
to your setup.

## Data Storage Formats

When performing an import or export, the default data format is [Apache
Parquet][parquet]. However, you can specify a different format using the
`DATA_FORMAT` configuration property.

For example, to import files in the [Apache Avro][avro] format:

```sql
IMPORT INTO MY_SALES_POSITIONS_TABLE
FROM SCRIPT ETL.IMPORT_PATH WITH
BUCKET_PATH = 's3a://exa-bucket/data/avro/retail/sales_positions/*'
DATA_FORMAT = 'AVRO'
...
PARALLELISM = 'nproc()';
```

### Supported storage formats

* [Apache Parquet][parquet]
* [Apache Avro][avro]: currently only import is supported
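
Internally, the `DATA_FORMAT` value selects which file reader processes the
files under the bucket path. The following is a minimal Scala sketch of such a
dispatch, for illustration only; the `readParquet` and `readAvro` helpers are
hypothetical stand-ins, not part of this codebase.

```scala
import com.exasol.cloudetl.data.Row

// A minimal sketch of DATA_FORMAT dispatch. The reader helpers below are
// hypothetical stand-ins for the real Parquet and Avro reader setup.
object DataFormatDispatchSketch {

  def rowIterator(dataFormat: String, path: String): Iterator[Row] =
    dataFormat.toUpperCase match {
      case "PARQUET" => readParquet(path)
      case "AVRO"    => readAvro(path)
      case other =>
        throw new IllegalArgumentException(s"Unsupported data format: $other")
    }

  // Stubs for illustration only.
  private def readParquet(path: String): Iterator[Row] = Iterator.empty
  private def readAvro(path: String): Iterator[Row] = Iterator.empty
}
```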

## IMPORT

Please follow the instructions below in order to import from different cloud
@@ -268,4 +291,5 @@ The packaged jar should be located at
[gcs]: https://cloud.google.com/storage/
[azure]: https://azure.microsoft.com/en-us/services/storage/blobs/
[parquet]: https://parquet.apache.org/
[avro]: https://avro.apache.org/
[jars]: https://github.com/exasol/cloud-storage-etl-udfs/releases
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.2.6
+sbt.version=1.2.8
6 changes: 3 additions & 3 deletions project/plugins.sbt
@@ -4,7 +4,7 @@ addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")

// Adds `wartremover`, a flexible Scala code linting tool
// http://github.com/puffnfresh/wartremover
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.3.7")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.1")

// Adds Contrib Warts
// http://github.com/wartremover/wartremover-contrib/
@@ -32,7 +32,7 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")

// Adds a `dependencyUpdates` task to check Maven repositories for dependency updates
// http://github.com/rtimush/sbt-updates
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.3.4")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.0")

// Adds a `scalafmt` task for automatic source code formatting
// https://github.com/lucidsoftware/neo-sbt-scalafmt
@@ -60,7 +60,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")

// Adds a `sbt-explicit-dependencies` plugin
// https://github.com/cb372/sbt-explicit-dependencies
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.6")
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.9")

// Setup this and project/project/plugins.sbt for formatting project/*.scala files with scalafmt
inThisBuild(
65 changes: 34 additions & 31 deletions sbtx
@@ -6,8 +6,8 @@

set -o pipefail

declare -r sbt_release_version="0.13.18"
declare -r sbt_unreleased_version="0.13.18"
declare -r sbt_release_version="1.2.8"
declare -r sbt_unreleased_version="1.2.8"

declare -r latest_213="2.13.0-M5"
declare -r latest_212="2.12.8"
@@ -43,11 +43,12 @@ declare -a extra_jvm_opts extra_sbt_opts

echoerr () { echo >&2 "$@"; }
vlog () { [[ -n "$verbose" ]] && echoerr "$@"; }
-die () { echo "Aborting: $@" ; exit 1; }
+die () { echo "Aborting: $*" ; exit 1; }

setTrapExit () {
# save stty and trap exit, to ensure echo is re-enabled if we are interrupted.
-export SBT_STTY="$(stty -g 2>/dev/null)"
+SBT_STTY="$(stty -g 2>/dev/null)"
+export SBT_STTY

# restore stty settings (echo in particular)
onSbtRunnerExit() {
@@ -67,16 +68,18 @@ get_script_path () {
local path="$1"
[[ -L "$path" ]] || { echo "$path" ; return; }

-local target="$(readlink "$path")"
+local -r target="$(readlink "$path")"
if [[ "${target:0:1}" == "/" ]]; then
echo "$target"
else
echo "${path%/*}/$target"
fi
}

-declare -r script_path="$(get_script_path "$BASH_SOURCE")"
-declare -r script_name="${script_path##*/}"
+script_path="$(get_script_path "${BASH_SOURCE[0]}")"
+declare -r script_path
+script_name="${script_path##*/}"
+declare -r script_name

init_default_option_file () {
local overriding_var="${!1}"
@@ -90,8 +93,8 @@ init_default_option_file () {
echo "$default_file"
}

-declare sbt_opts_file="$(init_default_option_file SBT_OPTS .sbtopts)"
-declare jvm_opts_file="$(init_default_option_file JVM_OPTS .jvmopts)"
+sbt_opts_file="$(init_default_option_file SBT_OPTS .sbtopts)"
+jvm_opts_file="$(init_default_option_file JVM_OPTS .jvmopts)"

build_props_sbt () {
[[ -r "$buildProps" ]] && \
@@ -142,9 +145,9 @@ addResidual () { vlog "[residual] arg = '$1'" ; residual_args+=("$1"); }
addResolver () { addSbt "set resolvers += $1"; }
addDebugger () { addJava "-Xdebug" ; addJava "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=$1"; }
setThisBuild () {
vlog "[addBuild] args = '$@'"
vlog "[addBuild] args = '$*'"
local key="$1" && shift
addSbt "set $key in ThisBuild := $@"
addSbt "set $key in ThisBuild := $*"
}
setScalaVersion () {
[[ "$1" == *"-SNAPSHOT" ]] && addResolver 'Resolver.sonatypeRepo("snapshots")'
@@ -159,7 +162,7 @@ setJavaHome () {
}

getJavaVersion() {
-local str=$("$1" -version 2>&1 | grep -E -e '(java|openjdk) version' | awk '{ print $3 }' | tr -d '"')
+local -r str=$("$1" -version 2>&1 | grep -E -e '(java|openjdk) version' | awk '{ print $3 }' | tr -d '"')

# java -version on java8 says 1.8.x
# but on 9 and 10 it's 9.x.y and 10.x.y.
@@ -191,14 +194,14 @@ checkJava() {
}

java_version () {
-local version=$(getJavaVersion "$java_cmd")
+local -r version=$(getJavaVersion "$java_cmd")
vlog "Detected Java version: $version"
echo "$version"
}

# MaxPermSize critical on pre-8 JVMs but incurs noisy warning on 8+
default_jvm_opts () {
-local v="$(java_version)"
+local -r v="$(java_version)"
if [[ $v -ge 8 ]]; then
echo "$default_jvm_opts_common"
else
@@ -240,11 +243,11 @@ execRunner () {

jar_url () { make_url "$1"; }

-is_cygwin () [[ "$(uname -a)" == "CYGWIN"* ]]
+is_cygwin () { [[ "$(uname -a)" == "CYGWIN"* ]]; }

jar_file () {
is_cygwin \
&& echo "$(cygpath -w $sbt_launch_dir/"$1"/sbt-launch.jar)" \
&& cygpath -w "$sbt_launch_dir/$1/sbt-launch.jar" \
|| echo "$sbt_launch_dir/$1/sbt-launch.jar"
}

@@ -420,7 +423,7 @@ process_args "$@"
readConfigFile() {
local end=false
until $end; do
-read || end=true
+read -r || end=true
[[ $REPLY =~ ^# ]] || [[ -z $REPLY ]] || echo "$REPLY"
done < "$1"
}
@@ -429,10 +432,10 @@ readConfigFile() {
# can supply args to this runner
if [[ -r "$sbt_opts_file" ]]; then
vlog "Using sbt options defined in file $sbt_opts_file"
-while read opt; do extra_sbt_opts+=("$opt"); done < <(readConfigFile "$sbt_opts_file")
+while read -r opt; do extra_sbt_opts+=("$opt"); done < <(readConfigFile "$sbt_opts_file")
elif [[ -n "$SBT_OPTS" && ! ("$SBT_OPTS" =~ ^@.*) ]]; then
vlog "Using sbt options defined in variable \$SBT_OPTS"
-extra_sbt_opts=( $SBT_OPTS )
+IFS=" " read -r -a extra_sbt_opts <<< "$SBT_OPTS"
else
vlog "No extra sbt options have been defined"
fi
@@ -452,18 +455,18 @@ checkJava
setTraceLevel() {
case "$sbt_version" in
"0.7."* | "0.10."* | "0.11."* ) echoerr "Cannot set trace level in sbt version $sbt_version" ;;
-*) setThisBuild traceLevel $trace_level ;;
+*) setThisBuild traceLevel "$trace_level" ;;
esac
}

# set scalacOptions if we were given any -S opts
-[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[@]}\""
+[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[*]}\""

[[ -n "$sbt_explicit_version" && -z "$sbt_new" ]] && addJava "-Dsbt.version=$sbt_explicit_version"
vlog "Detected sbt version $sbt_version"

if [[ -n "$sbt_script" ]]; then
-residual_args=( $sbt_script ${residual_args[@]} )
+residual_args=( "$sbt_script" "${residual_args[@]}" )
else
# no args - alert them there's stuff in here
(( argumentCount > 0 )) || {
@@ -484,6 +487,7 @@ EOM
}

# pick up completion if present; todo
+# shellcheck disable=SC1091
[[ -r .sbt_completion.sh ]] && source .sbt_completion.sh

# directory to store sbt launchers
@@ -518,13 +522,13 @@ fi

if [[ -r "$jvm_opts_file" ]]; then
vlog "Using jvm options defined in file $jvm_opts_file"
-while read opt; do extra_jvm_opts+=("$opt"); done < <(readConfigFile "$jvm_opts_file")
+while read -r opt; do extra_jvm_opts+=("$opt"); done < <(readConfigFile "$jvm_opts_file")
elif [[ -n "$JVM_OPTS" && ! ("$JVM_OPTS" =~ ^@.*) ]]; then
vlog "Using jvm options defined in \$JVM_OPTS variable"
-extra_jvm_opts=( $JVM_OPTS )
+IFS=" " read -r -a extra_jvm_opts <<< "$JVM_OPTS"
else
vlog "Using default jvm options"
-extra_jvm_opts=( $(default_jvm_opts) )
+IFS=" " read -r -a extra_jvm_opts <<< "$(default_jvm_opts)"
fi

# traceLevel is 0.12+
@@ -546,13 +550,12 @@ main () {
# we're not going to print those lines anyway. We strip that bit of
# line noise, but leave the other codes to preserve color.
mainFiltered () {
-local ansiOverwrite='\r\x1BM\x1B[2K'
-local excludeRegex=$(egrep -v '^#|^$' ~/.sbtignore | paste -sd'|' -)
+local -r excludeRegex=$(grep -E -v '^#|^$' ~/.sbtignore | paste -sd'|' -)

echoLine () {
local line="$1"
local line1="$(echo "$line" | sed 's/\r\x1BM\x1B\[2K//g')" # This strips the OverwriteLine code.
local line2="$(echo "$line1" | sed 's/\x1B\[[0-9;]*[JKmsu]//g')" # This strips all codes - we test regexes against this.
local -r line="$1"
local -r line1="${line//\r\x1BM\x1B\[2K//g}" # This strips the OverwriteLine code.
local -r line2="${line1//\x1B\[[0-9;]*[JKmsu]//g}" # This strips all codes - we test regexes against this.

if [[ $line2 =~ $excludeRegex ]]; then
[[ -n $debugUs ]] && echo "[X] $line1"
@@ -569,7 +572,7 @@ mainFiltered () {
# Obviously this is super ad hoc but I don't know how to improve on it. Testing whether
# stdin is a terminal is useless because most of my use cases for this filtering are
# exactly when I'm at a terminal, running sbt non-interactively.
-shouldFilter () { [[ -f ~/.sbtignore ]] && ! egrep -q '\b(shell|console|consoleProject)\b' <<<"${residual_args[@]}"; }
+shouldFilter () { [[ -f ~/.sbtignore ]] && ! grep -E -q '\b(shell|console|consoleProject)\b' <<<"${residual_args[@]}"; }

# run sbt
if shouldFilter; then mainFiltered; else main; fi
65 changes: 65 additions & 0 deletions src/main/scala/com/exasol/cloudetl/avro/AvroRowIterator.scala
@@ -0,0 +1,65 @@
package com.exasol.cloudetl.avro

import com.exasol.cloudetl.data.Row

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

/**
* An object that creates a [[com.exasol.cloudetl.data.Row]] iterator
* given the Avro [[org.apache.avro.file.DataFileReader]] with
* [[org.apache.avro.generic.GenericRecord]].
*
 * Each record read from the file is converted into an internal Row class.
*/
object AvroRowIterator {

def apply(reader: DataFileReader[GenericRecord]): Iterator[Row] =
new Iterator[Row] {
@SuppressWarnings(Array("org.wartremover.warts.Var"))
private[this] var isCompleted = false

override def hasNext: Boolean =
if (isCompleted) {
false
} else {
val hasNext = reader.hasNext
if (!hasNext) {
reader.close()
isCompleted = true
}
hasNext
}

override def next(): Row = {
if (!hasNext) {
throw new NoSuchElementException("Avro reader next on empty iterator")
}
val record = reader.next()
recordToRow(record)
}
}

private[this] def recordToRow(record: GenericRecord): Row = {
val size = record.getSchema.getFields.size
val values = Array.ofDim[Any](size)
for { index <- 0 until size } {
values.update(index, convertRecordValue(record.get(index)))
}
Row(values.toSeq)
}

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
private[this] def convertRecordValue(value: Any): Any = value match {
case _: GenericRecord =>
throw new IllegalArgumentException("Avro nested record type is not supported yet!")
case _: java.util.Collection[_] =>
throw new IllegalArgumentException("Avro collection type is not supported yet!")
case _: java.util.Map[_, _] =>
throw new IllegalArgumentException("Avro map type is not supported yet!")
case _: Utf8 => value.asInstanceOf[Utf8].toString
case primitiveType => primitiveType
}

}
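
As a usage illustration, the following sketch, assuming a local Avro file with
a flat (non-nested) schema, reads the file and prints each converted row. The
file name is a placeholder; in the UDF itself the reader is created from files
resolved via the bucket path. Nested records, collections, and maps raise an
`IllegalArgumentException`, as in `convertRecordValue` above.

```scala
import java.io.File

import com.exasol.cloudetl.avro.AvroRowIterator

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord

object AvroRowIteratorExample {
  def main(args: Array[String]): Unit = {
    // Placeholder path to an Avro data file with a flat schema.
    val file = new File("sales_positions.avro")
    val reader =
      new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]())

    // The iterator closes the underlying reader once it is exhausted.
    AvroRowIterator(reader).foreach(println)
  }
}
```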
40 changes: 40 additions & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
@@ -0,0 +1,40 @@
package com.exasol.cloudetl.bucket

import org.apache.hadoop.conf.Configuration

/** A [[Bucket]] implementation for Azure Blob Storage. */
final case class AzureBlobBucket(path: String, params: Map[String, String]) extends Bucket {

/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override def validate(): Unit =
Bucket.validate(params, Bucket.AZURE_PARAMETERS)

/**
* @inheritdoc
*
* Additionally validates that all required parameters are available
* in order to create a configuration.
*/
override def createConfiguration(): Configuration = {
validate()

val conf = new Configuration()
val accountName = Bucket.requiredParam(params, "AZURE_ACCOUNT_NAME")
val accountSecretKey = Bucket.requiredParam(params, "AZURE_SECRET_KEY")
conf.set("fs.azure", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
conf.set("fs.wasb.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
conf.set("fs.wasbs.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
conf.set("fs.AbstractFileSystem.wasb.impl", classOf[org.apache.hadoop.fs.azure.Wasb].getName)
conf.set(
"fs.AbstractFileSystem.wasbs.impl",
classOf[org.apache.hadoop.fs.azure.Wasbs].getName
)
conf.set(s"fs.azure.account.key.$accountName.blob.core.windows.net", accountSecretKey)

conf
}

}
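
For illustration, a minimal sketch of constructing this bucket and
materializing its Hadoop configuration; the container, account name, and
secret key values are placeholders.

```scala
import com.exasol.cloudetl.bucket.AzureBlobBucket

object AzureBlobBucketExample {
  def main(args: Array[String]): Unit = {
    // Placeholder path and credentials; supply your own values.
    val bucket = AzureBlobBucket(
      path = "wasbs://mycontainer@myaccount.blob.core.windows.net/data/avro/*",
      params = Map(
        "AZURE_ACCOUNT_NAME" -> "myaccount",
        "AZURE_SECRET_KEY" -> "mysecretkey"
      )
    )

    // Validates the required parameters, then returns a Hadoop configuration
    // with the wasb/wasbs file system implementations and account key set.
    val conf = bucket.createConfiguration()
    println(conf.get("fs.wasbs.impl"))
  }
}
```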