Merge pull request #9 from morazow/feature-gcs
Add Initial GCS Import Functionality
morazow authored Dec 5, 2018
2 parents be783f5 + d1af010 commit 91e0e6c
Showing 17 changed files with 493 additions and 133 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -55,7 +55,8 @@ deploy:
hF48soVocY1xus5AkKiMxrW6d93Th2XTGoyRzJbwm4iXPY1UIKndlkEjFq3RsZRIPND9iURmp/qcwvlIdB29SsczYbH
p3QOQn/NTC6SZbmgAW4xZpCRUUZwfOXP4RacIcDKHlsUjqBZwmSmxK/vJ6KRNR4yxBn7cVlm060cD5l3TmpuUC6X9JI
EPAkYJyNJ1CtRUkYDbgBn+Eof5X3jOZqo8pI51YBKdnz0E=
file: target/scala-2.12/cloud-storage-etl-udfs-${TRAVIS_TAG}.jar
file: ./target/scala-2.12/cloud-storage-etl-udfs-${TRAVIS_TAG}.jar
skip_cleanup: true
on:
repo: exasol/cloud-storage-etl-udfs
tags: true
57 changes: 46 additions & 11 deletions README.md
@@ -33,7 +33,7 @@ Please follow the steps described below in order to setup the UDFs.
Download the latest jar file from [releases][jars].

Additionally, you can also build it from the source by following the [build from
source](#building-from-source) step.
source](#building-from-source) steps.

### Upload the JAR file to Exasol BucketFS

@@ -52,25 +52,29 @@ Please change required parameters.
CREATE SCHEMA ETL;
OPEN SCHEMA ETL;

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_S3_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportS3Path;
CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportPath;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_S3_FILES(...) EMITS (...) AS
CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS
%env LD_LIBRARY_PATH=/tmp/;
%scriptclass com.exasol.cloudetl.scriptclasses.ImportS3Files;
%scriptclass com.exasol.cloudetl.scriptclasses.ImportFiles;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
/

CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_S3_METADATA(...)
EMITS (s3_filename VARCHAR(200), partition_index VARCHAR(100)) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportS3Metadata;
CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...)
EMITS (filename VARCHAR(200), partition_index VARCHAR(100)) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportMetadata;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
/
```

### Import data from cloud storage
### Import data from cloud storages

Please follow the steps below in order to import from cloud storage.

#### Create an Exasol schema and table

```sql
CREATE SCHEMA TEST;
@@ -87,12 +91,16 @@ CREATE TABLE SALES_POSITIONS (
VOUCHER_ID SMALLINT,
CANCELED BOOLEAN
);
```

#### Import from AWS S3

```sql
-- ALTER SESSION SET SCRIPT_OUTPUT_ADDRESS='10.0.2.162:3000';

IMPORT INTO SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_S3_PATH WITH
S3_BUCKET_PATH = 's3a://exa-mo-frankfurt/test/retail/sales_positions/*'
FROM SCRIPT ETL.IMPORT_PATH WITH
BUCKET_PATH = 's3a://exa-mo-frankfurt/test/retail/sales_positions/*'
S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
@@ -103,6 +111,33 @@ FROM SCRIPT ETL.IMPORT_S3_PATH WITH
SELECT * FROM SALES_POSITIONS LIMIT 10;
```

#### Import from Google GCS

In order to read data from [Google Cloud Storage][gcs], you need to provide a
service account key file. The key file should be uploaded to a secure Exasol
bucket in advance.

For example,

```bash
curl \
-X PUT \
-T path/to/project-id-service-keyfile.json \
http://w:MY-PASSWORD@EXA-NODE-ID:2580/bucket1/project-id-service-keyfile.json
```

Then run the import:

```sql
IMPORT INTO SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_PATH WITH
BUCKET_PATH = 'gs://exa-test-bucket/data/parquet/sales_positions/*'
GCS_PROJECT_ID = 'MY_GCS_PROJECT_ID'
GCS_KEYFILE_PATH = 'MY_BUCKETFS_PATH/project-id-service-keyfile.json'
PARALLELISM = 'nproc()*10';

SELECT * FROM SALES_POSITIONS LIMIT 10;
```

## Building from Source

Clone the repository,
7 changes: 3 additions & 4 deletions project/Compilation.scala
@@ -103,7 +103,7 @@ object Compilation {
contribWart("SealedCaseClass"),
contribWart("SomeApply"),
contribWart("SymbolicName"),
// contribWart("UnsafeInheritance"),
contribWart("UnsafeInheritance"),
ExtraWart.EnumerationPartial,
ExtraWart.FutureObject,
ExtraWart.GenMapLikePartial,
@@ -124,9 +124,8 @@
)

val WartremoverTestFlags: Seq[Wart] = ExtraWartremoverFlags ++ Warts.allBut(
Wart.Null,
Wart.Throw,
Wart.While
Wart.NonUnitStatements,
Wart.Null
)

}
3 changes: 2 additions & 1 deletion project/Dependencies.scala
@@ -17,7 +17,8 @@ object Dependencies {
"org.apache.hadoop" % "hadoop-aws" % "2.8.4",
"org.apache.hadoop" % "hadoop-common" % "2.8.4" exclude ("org.slf4j", "slf4j-log4j12"),
"org.apache.hadoop" % "hadoop-hdfs" % "2.8.4",
"org.apache.parquet" % "parquet-avro" % "1.8.1"
"org.apache.parquet" % "parquet-avro" % "1.8.1",
"com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-1.9.10"
)

/** Test dependencies only required in `test` */
138 changes: 138 additions & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -0,0 +1,138 @@
package com.exasol.cloudetl.bucket

import java.net.URI

import com.exasol.cloudetl.util.FsUtil

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

abstract class Bucket {

val bucketPath: String

def validate(): Unit

def createConfiguration(): Configuration

lazy val fs: FileSystem =
FileSystem.get(new URI(bucketPath), createConfiguration())

final def getPaths(): Seq[Path] =
FsUtil.globWithPattern(bucketPath, fs)

}

final case class S3Bucket(path: String, params: Map[String, String]) extends Bucket {

override val bucketPath: String = path

override def validate(): Unit =
Bucket.validate(params, Bucket.S3_PARAMETERS)

override def createConfiguration(): Configuration = {
validate()

val conf = new Configuration()
conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName)
conf.set("fs.s3a.endpoint", Bucket.requiredParam(params, "S3_ENDPOINT"))
conf.set("fs.s3a.access.key", Bucket.requiredParam(params, "S3_ACCESS_KEY"))
conf.set("fs.s3a.secret.key", Bucket.requiredParam(params, "S3_SECRET_KEY"))

conf
}

}

final case class GCSBucket(path: String, params: Map[String, String]) extends Bucket {

override val bucketPath: String = path

override def validate(): Unit =
Bucket.validate(params, Bucket.GCS_PARAMETERS)

override def createConfiguration(): Configuration = {
validate()

val conf = new Configuration()
conf.set("fs.gs.impl", classOf[com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem].getName)
conf.setBoolean("fs.gs.auth.service.account.enable", true)
conf.set("fs.gs.project.id", Bucket.requiredParam(params, "GCS_PROJECT_ID"))
conf.set(
"fs.gs.auth.service.account.json.keyfile",
Bucket.requiredParam(params, "GCS_KEYFILE_PATH")
)

conf
}

}

final case class LocalBucket(path: String, params: Map[String, String]) extends Bucket {

override val bucketPath: String = path

override def validate(): Unit = ()

override def createConfiguration(): Configuration =
new Configuration()

}

object Bucket extends LazyLogging {

def apply(params: Map[String, String]): Bucket = {
val path = requiredParam(params, BUCKET_PATH)
val scheme = getScheme(path)

scheme match {
case "s3a" => S3Bucket(path, params)
case "gs" => GCSBucket(path, params)
case "file" => LocalBucket(path, params)
case _ => throw new IllegalArgumentException(s"Unknown path scheme $scheme")
}
}

def getScheme(path: String): String =
new URI(path).getScheme

def validate(params: Map[String, String], keys: Seq[String]): Unit =
keys.foreach { key =>
requiredParam(params, key)
}

def requiredParam(params: Map[String, String], key: String): String = {
val opt = params.get(key)
opt.fold {
throw new IllegalArgumentException(s"The required parameter $key is not defined!")
}(identity)
}

def optionalParam(params: Map[String, String], key: String, defaultValue: String): String =
params.get(key).fold(defaultValue)(identity)

def mapToStr(params: Map[String, String]): String = {
val selectedParams = (params -- Seq("PARALLELISM"))
selectedParams.map { case (k, v) => s"$k=$v" }.mkString(";")
}

def strToMap(str: String): Map[String, String] =
str
.split(";")
.map { word =>
val kv = word.split("=")
kv(0) -> kv(1)
}
.toMap

final val BUCKET_PATH: String = "BUCKET_PATH"

final val S3_PARAMETERS: Seq[String] =
Seq("S3_ENDPOINT", "S3_ACCESS_KEY", "S3_SECRET_KEY")

final val GCS_PARAMETERS: Seq[String] =
Seq("GCS_PROJECT_ID", "GCS_KEYFILE_PATH")

}
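
The new `Bucket` hierarchy is the core abstraction added in this file: `Bucket.apply` dispatches on the `BUCKET_PATH` URI scheme (`s3a`, `gs`, or `file`), and `mapToStr`/`strToMap` carry the user parameters between UDF stages as a single `K=V;K=V` string. A minimal sketch of how these pieces fit together, using placeholder parameter values borrowed from the README example (the sketch itself is not part of this PR):

```scala
import com.exasol.cloudetl.bucket._

object BucketDispatchSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder parameters, as a user would pass them in the IMPORT statement.
    val params = Map(
      "BUCKET_PATH"      -> "gs://exa-test-bucket/data/parquet/sales_positions/*",
      "GCS_PROJECT_ID"   -> "MY_GCS_PROJECT_ID",
      "GCS_KEYFILE_PATH" -> "/buckets/bfsdefault/bucket1/project-id-service-keyfile.json",
      "PARALLELISM"      -> "nproc()*10"
    )

    // Scheme-based dispatch: "gs" yields a GCSBucket ("s3a" -> S3Bucket, "file" -> LocalBucket).
    val bucket = Bucket(params)

    // createConfiguration() validates the required GCS_* parameters and fails fast if one is missing.
    val conf = bucket.createConfiguration()
    println(conf.get("fs.gs.project.id")) // MY_GCS_PROJECT_ID

    // Parameters travel between UDF stages as one "K=V;K=V" string; PARALLELISM is dropped.
    val serialized = Bucket.mapToStr(params)
    assert(Bucket.strToMap(serialized) == (params - "PARALLELISM"))
  }
}
```

Keeping the Hadoop `Configuration` setup inside each `Bucket` subclass is what lets the refactored `ImportFiles` script class below stay storage-agnostic.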
6 changes: 5 additions & 1 deletion src/main/scala/com/exasol/cloudetl/row/Row.scala
@@ -14,6 +14,7 @@ import org.apache.parquet.schema.Type

final case class Row(val values: Seq[Any])

@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance"))
class RowReadSupport extends ReadSupport[Row] {

override def prepareForRead(
@@ -35,6 +36,7 @@ class RowReadSupport extends ReadSupport[Row] {
}
}

@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance"))
class RowRecordMaterializer(messageType: MessageType, readContext: ReadContext)
extends RecordMaterializer[Row] {

@@ -43,7 +45,9 @@ class RowRecordMaterializer(messageType: MessageType, readContext: ReadContext)
override def getCurrentRecord: Row = Row(getRootConverter.currentResult.toSeq)
}

@SuppressWarnings(Array("org.wartremover.warts.Var"))
@SuppressWarnings(
Array("org.wartremover.warts.Var", "org.wartremover.contrib.warts.UnsafeInheritance")
)
class RowRootConverter(schema: GroupType) extends GroupConverter {
private val size = schema.getFieldCount
private var values: Array[Any] = Array.ofDim[Any](size)
@@ -1,49 +1,36 @@
package com.exasol.cloudetl.scriptclasses

import java.net.URI

import scala.collection.mutable.ListBuffer

import com.exasol.ExaIterator
import com.exasol.ExaMetadata
import com.exasol.cloudetl.bucket._
import com.exasol.cloudetl.source.ParquetSource

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object ImportS3Files {
object ImportFiles extends LazyLogging {

def run(meta: ExaMetadata, iter: ExaIterator): Unit = {
val s3Bucket = iter.getString(0)
val s3Endpoint = iter.getString(1)
val s3AccessKey = iter.getString(2)
val s3SecretKey = iter.getString(3)
val files = groupFiles(iter, 4)
val bucketPath = iter.getString(0)

val conf: Configuration = new Configuration()
conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName)
conf.set("fs.s3a.endpoint", s3Endpoint)
conf.set("fs.s3a.access.key", s3AccessKey)
conf.set("fs.s3a.secret.key", s3SecretKey)
val rest = iter.getString(1)
val params = Bucket.strToMap(rest)

val fs: FileSystem = FileSystem.get(new URI(s3Bucket), conf)
val bucket = Bucket(params)

val paths = files.map(f => new Path(f))
val files = groupFiles(iter, 2)

logger.info(s"Reading file = ${files.take(5).mkString(",")} from bucket = $bucketPath")

val source = ParquetSource(paths, fs, conf)
val source = createNewSource(files, bucket.fs, bucket.createConfiguration())

readAndEmit(source, iter)
}

def readAndEmit(src: ParquetSource, ctx: ExaIterator): Unit =
src.stream.foreach { iter =>
iter.foreach { row =>
val columns: Seq[Object] = row.values.map(_.asInstanceOf[AnyRef])
ctx.emit(columns: _*)
}
}

@SuppressWarnings(Array("org.wartremover.warts.MutableDataStructures"))
private[this] def groupFiles(iter: ExaIterator, iterIndex: Int): Seq[String] = {
val files = ListBuffer[String]()
@@ -55,4 +42,21 @@ object ImportS3Files {
files.toSeq
}

private[this] def createNewSource(
files: Seq[String],
fs: FileSystem,
conf: Configuration
): ParquetSource = {
val paths = files.map(f => new Path(f))
ParquetSource(paths, fs, conf)
}

private[this] def readAndEmit(src: ParquetSource, ctx: ExaIterator): Unit =
src.stream.foreach { iter =>
iter.foreach { row =>
val columns: Seq[Object] = row.values.map(_.asInstanceOf[AnyRef])
ctx.emit(columns: _*)
}
}

}
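
With this refactoring, `ImportFiles` no longer hard-codes an S3 configuration: it reads the bucket path and the serialized parameter string from the iterator, rebuilds the `Bucket`, and streams the grouped files through `ParquetSource`. The same read path can be exercised outside the UDF framework; the following rough sketch uses a hypothetical local directory of Parquet files (the path is made up, and `println` stands in for `ctx.emit`):

```scala
import org.apache.hadoop.fs.Path

import com.exasol.cloudetl.bucket._
import com.exasol.cloudetl.source.ParquetSource

object LocalParquetReadSketch {
  def main(args: Array[String]): Unit = {
    // In the UDF these values arrive via ExaIterator; here they are hard-coded placeholders.
    val params = Map("BUCKET_PATH" -> "file:///tmp/sales_positions/*.parquet")
    val bucket = Bucket(params)

    // Glob-expand the path into concrete Parquet files.
    val paths: Seq[Path] = bucket.getPaths()

    // Same call pattern as createNewSource above.
    val source = ParquetSource(paths, bucket.fs, bucket.createConfiguration())

    // Mirrors readAndEmit: each element of the stream is an iterator of rows.
    source.stream.foreach { rows =>
      rows.foreach(row => println(row.values.mkString(",")))
    }
  }
}
```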