diff --git a/.travis.yml b/.travis.yml
index b64055b5..b8e8872e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,7 +55,8 @@ deploy:
     hF48soVocY1xus5AkKiMxrW6d93Th2XTGoyRzJbwm4iXPY1UIKndlkEjFq3RsZRIPND9iURmp/qcwvlIdB29SsczYbH
     p3QOQn/NTC6SZbmgAW4xZpCRUUZwfOXP4RacIcDKHlsUjqBZwmSmxK/vJ6KRNR4yxBn7cVlm060cD5l3TmpuUC6X9JI
     EPAkYJyNJ1CtRUkYDbgBn+Eof5X3jOZqo8pI51YBKdnz0E=
-  file: target/scala-2.12/cloud-storage-etl-udfs-${TRAVIS_TAG}.jar
+  file: ./target/scala-2.12/cloud-storage-etl-udfs-${TRAVIS_TAG}.jar
+  skip_cleanup: true
   on:
     repo: exasol/cloud-storage-etl-udfs
     tags: true
diff --git a/README.md b/README.md
index f915e502..81245759 100644
--- a/README.md
+++ b/README.md
@@ -33,7 +33,7 @@ Please follow the steps described below in order to setup the UDFs.
 Download the latest jar file from [releases][jars].
 
 Additionally, you can also build it from the source by following the [build from
-source](#building-from-source) step.
+source](#building-from-source) steps.
 
 ### Upload the JAR file to Exasol BucketFS
 
@@ -52,25 +52,29 @@ Please change required parameters.
 CREATE SCHEMA ETL;
 OPEN SCHEMA ETL;
 
-CREATE OR REPLACE JAVA SET SCRIPT IMPORT_S3_PATH(...) EMITS (...) AS
-%scriptclass com.exasol.cloudetl.scriptclasses.ImportS3Path;
+CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
+%scriptclass com.exasol.cloudetl.scriptclasses.ImportPath;
 %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
 /
 
-CREATE OR REPLACE JAVA SET SCRIPT IMPORT_S3_FILES(...) EMITS (...) AS
+CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS
 %env LD_LIBRARY_PATH=/tmp/;
-%scriptclass com.exasol.cloudetl.scriptclasses.ImportS3Files;
+%scriptclass com.exasol.cloudetl.scriptclasses.ImportFiles;
 %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
 /
 
-CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_S3_METADATA(...)
-EMITS (s3_filename VARCHAR(200), partition_index VARCHAR(100)) AS
-%scriptclass com.exasol.cloudetl.scriptclasses.ImportS3Metadata;
+CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...)
+EMITS (filename VARCHAR(200), partition_index VARCHAR(100)) AS
+%scriptclass com.exasol.cloudetl.scriptclasses.ImportMetadata;
 %jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
 /
 ```
 
-### Import data from cloud storage
+### Import data from cloud storages
+
+Please follow the steps below in order to import from cloud storages.
+
+#### Create an Exasol schema and table
 
 ```sql
 CREATE SCHEMA TEST;
@@ -87,12 +91,16 @@ CREATE TABLE SALES_POSITIONS (
   VOUCHER_ID SMALLINT,
   CANCELED BOOLEAN
 );
+```
+
+#### Import from AWS S3
+
+```sql
 -- ALTER SESSION SET SCRIPT_OUTPUT_ADDRESS='10.0.2.162:3000';
 
 IMPORT INTO SALES_POSITIONS
-FROM SCRIPT ETL.IMPORT_S3_PATH WITH
-  S3_BUCKET_PATH = 's3a://exa-mo-frankfurt/test/retail/sales_positions/*'
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 's3a://exa-mo-frankfurt/test/retail/sales_positions/*'
   S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
   S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
   S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
@@ -103,6 +111,33 @@ FROM SCRIPT ETL.IMPORT_S3_PATH WITH
 SELECT * FROM SALES_POSITIONS LIMIT 10;
 ```
 
+#### Import from Google GCS
+
+In order to read data from [Google GCS][gcs], you need to provide a service
+account key file. This should be uploaded to a secure Exasol bucket in advance.
+
+For example,
+
+```bash
+curl \
+  -X PUT \
+  -T path/to/project-id-service-keyfile.json \
+  http://w:MY-PASSWORD@EXA-NODE-ID:2580/bucket1/project-id-service-keyfile.json
+```
+
+And then run the import,
+
+```sql
+IMPORT INTO SALES_POSITIONS
+FROM SCRIPT ETL.IMPORT_PATH WITH
+  BUCKET_PATH = 'gs://exa-test-bucket/data/parquet/sales_positions/*'
+  GCS_PROJECT_ID = 'MY_GCS_PROJECT_ID'
+  GCS_KEYFILE_PATH = 'MY_BUCKETFS_PATH/project-id-service-keyfile.json'
+  PARALLELISM = 'nproc()*10';
+
+SELECT * FROM SALES_POSITIONS LIMIT 10;
+```
+
 ## Building from Source
 
 Clone the repository,
diff --git a/project/Compilation.scala b/project/Compilation.scala
index d78c1c06..ab971072 100644
--- a/project/Compilation.scala
+++ b/project/Compilation.scala
@@ -103,7 +103,7 @@ object Compilation {
     contribWart("SealedCaseClass"),
     contribWart("SomeApply"),
     contribWart("SymbolicName"),
-    // contribWart("UnsafeInheritance"),
+    contribWart("UnsafeInheritance"),
     ExtraWart.EnumerationPartial,
     ExtraWart.FutureObject,
     ExtraWart.GenMapLikePartial,
@@ -124,9 +124,8 @@ object Compilation {
   )
 
   val WartremoverTestFlags: Seq[Wart] = ExtraWartremoverFlags ++ Warts.allBut(
-    Wart.Null,
-    Wart.Throw,
-    Wart.While
+    Wart.NonUnitStatements,
+    Wart.Null
   )
 
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e56474fb..5e83190c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,7 +17,8 @@ object Dependencies {
     "org.apache.hadoop" % "hadoop-aws" % "2.8.4",
     "org.apache.hadoop" % "hadoop-common" % "2.8.4" exclude ("org.slf4j", "slf4j-log4j12"),
     "org.apache.hadoop" % "hadoop-hdfs" % "2.8.4",
-    "org.apache.parquet" % "parquet-avro" % "1.8.1"
+    "org.apache.parquet" % "parquet-avro" % "1.8.1",
+    "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-1.9.10"
   )
 
   /** Test dependencies only required in `test` */
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
new file mode 100644
index 00000000..6067013b
--- /dev/null
+++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -0,0 +1,138 @@
+package com.exasol.cloudetl.bucket
+
+import java.net.URI
+
+import com.exasol.cloudetl.util.FsUtil
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+abstract class Bucket {
+
+  val bucketPath: String
+
+  def validate(): Unit
+
+  def createConfiguration(): Configuration
+
+  lazy val fs: FileSystem =
+    FileSystem.get(new URI(bucketPath), createConfiguration())
+
+  final def getPaths(): Seq[Path] =
+    FsUtil.globWithPattern(bucketPath, fs)
+
+}
+
+final case class S3Bucket(path: String, params: Map[String, String]) extends Bucket {
+
+  override val bucketPath: String = path
+
+  override def validate(): Unit =
+    Bucket.validate(params, Bucket.S3_PARAMETERS)
+
+  override def createConfiguration(): Configuration = {
+    validate()
+
+    val conf = new Configuration()
+    conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName)
+    conf.set("fs.s3a.endpoint", Bucket.requiredParam(params, "S3_ENDPOINT"))
+    conf.set("fs.s3a.access.key", Bucket.requiredParam(params, "S3_ACCESS_KEY"))
+    conf.set("fs.s3a.secret.key", Bucket.requiredParam(params, "S3_SECRET_KEY"))
+
+    conf
+  }
+
+}
+
+final case class GCSBucket(path: String, params: Map[String, String]) extends Bucket {
+
+  override val bucketPath: String = path
+
+  override def validate(): Unit =
+    Bucket.validate(params, Bucket.GCS_PARAMETERS)
+
+
override def createConfiguration(): Configuration = { + validate() + + val conf = new Configuration() + conf.set("fs.gs.impl", classOf[com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem].getName) + conf.setBoolean("fs.gs.auth.service.account.enable", true) + conf.set("fs.gs.project.id", Bucket.requiredParam(params, "GCS_PROJECT_ID")) + conf.set( + "fs.gs.auth.service.account.json.keyfile", + Bucket.requiredParam(params, "GCS_KEYFILE_PATH") + ) + + conf + } + +} + +final case class LocalBucket(path: String, params: Map[String, String]) extends Bucket { + + override val bucketPath: String = path + + override def validate(): Unit = () + + override def createConfiguration(): Configuration = + new Configuration() + +} + +object Bucket extends LazyLogging { + + def apply(params: Map[String, String]): Bucket = { + val path = requiredParam(params, BUCKET_PATH) + val scheme = getScheme(path) + + scheme match { + case "s3a" => S3Bucket(path, params) + case "gs" => GCSBucket(path, params) + case "file" => LocalBucket(path, params) + case _ => throw new IllegalArgumentException(s"Unknown path scheme $scheme") + } + } + + def getScheme(path: String): String = + new URI(path).getScheme + + def validate(params: Map[String, String], keys: Seq[String]): Unit = + keys.foreach { key => + requiredParam(params, key) + } + + def requiredParam(params: Map[String, String], key: String): String = { + val opt = params.get(key) + opt.fold { + throw new IllegalArgumentException(s"The required parameter $key is not defined!") + }(identity) + } + + def optionalParam(params: Map[String, String], key: String, defaultValue: String): String = + params.get(key).fold(defaultValue)(identity) + + def mapToStr(params: Map[String, String]): String = { + val selectedParams = (params -- Seq("PARALLELISM")) + selectedParams.map { case (k, v) => s"$k=$v" }.mkString(";") + } + + def strToMap(str: String): Map[String, String] = + str + .split(";") + .map { word => + val kv = word.split("=") + kv(0) -> kv(1) + } + .toMap + + final val BUCKET_PATH: String = "BUCKET_PATH" + + final val S3_PARAMETERS: Seq[String] = + Seq("S3_ENDPOINT", "S3_ACCESS_KEY", "S3_SECRET_KEY") + + final val GCS_PARAMETERS: Seq[String] = + Seq("GCS_PROJECT_ID", "GCS_KEYFILE_PATH") + +} diff --git a/src/main/scala/com/exasol/cloudetl/row/Row.scala b/src/main/scala/com/exasol/cloudetl/row/Row.scala index 5ecc7809..a7ec53be 100644 --- a/src/main/scala/com/exasol/cloudetl/row/Row.scala +++ b/src/main/scala/com/exasol/cloudetl/row/Row.scala @@ -14,6 +14,7 @@ import org.apache.parquet.schema.Type final case class Row(val values: Seq[Any]) +@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance")) class RowReadSupport extends ReadSupport[Row] { override def prepareForRead( @@ -35,6 +36,7 @@ class RowReadSupport extends ReadSupport[Row] { } } +@SuppressWarnings(Array("org.wartremover.contrib.warts.UnsafeInheritance")) class RowRecordMaterializer(messageType: MessageType, readContext: ReadContext) extends RecordMaterializer[Row] { @@ -43,7 +45,9 @@ class RowRecordMaterializer(messageType: MessageType, readContext: ReadContext) override def getCurrentRecord: Row = Row(getRootConverter.currentResult.toSeq) } -@SuppressWarnings(Array("org.wartremover.warts.Var")) +@SuppressWarnings( + Array("org.wartremover.warts.Var", "org.wartremover.contrib.warts.UnsafeInheritance") +) class RowRootConverter(schema: GroupType) extends GroupConverter { private val size = schema.getFieldCount private var values: Array[Any] = Array.ofDim[Any](size) diff --git 
a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Files.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala similarity index 56% rename from src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Files.scala rename to src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index 5d9fff30..b1a6fc32 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Files.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -1,49 +1,36 @@ package com.exasol.cloudetl.scriptclasses -import java.net.URI - import scala.collection.mutable.ListBuffer import com.exasol.ExaIterator import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket._ import com.exasol.cloudetl.source.ParquetSource +import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path -object ImportS3Files { +object ImportFiles extends LazyLogging { def run(meta: ExaMetadata, iter: ExaIterator): Unit = { - val s3Bucket = iter.getString(0) - val s3Endpoint = iter.getString(1) - val s3AccessKey = iter.getString(2) - val s3SecretKey = iter.getString(3) - val files = groupFiles(iter, 4) + val bucketPath = iter.getString(0) - val conf: Configuration = new Configuration() - conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName) - conf.set("fs.s3a.endpoint", s3Endpoint) - conf.set("fs.s3a.access.key", s3AccessKey) - conf.set("fs.s3a.secret.key", s3SecretKey) + val rest = iter.getString(1) + val params = Bucket.strToMap(rest) - val fs: FileSystem = FileSystem.get(new URI(s3Bucket), conf) + val bucket = Bucket(params) - val paths = files.map(f => new Path(f)) + val files = groupFiles(iter, 2) + + logger.info(s"Reading file = ${files.take(5).mkString(",")} from bucket = $bucketPath") - val source = ParquetSource(paths, fs, conf) + val source = createNewSource(files, bucket.fs, bucket.createConfiguration()) readAndEmit(source, iter) } - def readAndEmit(src: ParquetSource, ctx: ExaIterator): Unit = - src.stream.foreach { iter => - iter.foreach { row => - val columns: Seq[Object] = row.values.map(_.asInstanceOf[AnyRef]) - ctx.emit(columns: _*) - } - } - @SuppressWarnings(Array("org.wartremover.warts.MutableDataStructures")) private[this] def groupFiles(iter: ExaIterator, iterIndex: Int): Seq[String] = { val files = ListBuffer[String]() @@ -55,4 +42,21 @@ object ImportS3Files { files.toSeq } + private[this] def createNewSource( + files: Seq[String], + fs: FileSystem, + conf: Configuration + ): ParquetSource = { + val paths = files.map(f => new Path(f)) + ParquetSource(paths, fs, conf) + } + + private[this] def readAndEmit(src: ParquetSource, ctx: ExaIterator): Unit = + src.stream.foreach { iter => + iter.foreach { row => + val columns: Seq[Object] = row.values.map(_.asInstanceOf[AnyRef]) + ctx.emit(columns: _*) + } + } + } diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala new file mode 100644 index 00000000..9ad71fb5 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala @@ -0,0 +1,32 @@ +package com.exasol.cloudetl.scriptclasses + +import com.exasol.ExaIterator +import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket._ + +import com.typesafe.scalalogging.LazyLogging + +object ImportMetadata extends LazyLogging { + + def run(meta: ExaMetadata, iter: ExaIterator): Unit = { + val bucketPath = 
iter.getString(0) + val parallelism = iter.getInteger(2) + + logger.info( + s"Reading metadata from bucket path = $bucketPath with parallelism = ${parallelism.toString}" + ) + + val rest = iter.getString(1) + val params = Bucket.strToMap(rest) + + val bucket = Bucket(params) + + val paths = bucket.getPaths() + + paths.zipWithIndex.foreach { + case (filename, idx) => + iter.emit(filename.toString, s"${idx % parallelism}") + } + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala new file mode 100644 index 00000000..062b1ae1 --- /dev/null +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportPath.scala @@ -0,0 +1,40 @@ +package com.exasol.cloudetl.scriptclasses + +import scala.collection.JavaConverters._ + +import com.exasol.ExaImportSpecification +import com.exasol.ExaMetadata +import com.exasol.cloudetl.bucket._ + +object ImportPath { + + def generateSqlForImportSpec(exaMeta: ExaMetadata, exaSpec: ExaImportSpecification): String = { + val params = exaSpec.getParameters.asScala.toMap + + val bucket = Bucket(params) + + bucket.validate() + + val bucketPath = bucket.bucketPath + val parallelism = Bucket.optionalParam(params, "PARALLELISM", "nproc()") + + val rest = Bucket.mapToStr(params) + + val scriptSchema = exaMeta.getScriptSchema + + s""" + |SELECT + | $scriptSchema.IMPORT_FILES( + | '$bucketPath', '$rest', filename + |) + |FROM ( + | SELECT $scriptSchema.IMPORT_METADATA( + | '$bucketPath', '$rest', $parallelism + | ) + |) + |GROUP BY + | partition_index; + """.stripMargin + } + +} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Metadata.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Metadata.scala deleted file mode 100644 index cffded83..00000000 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Metadata.scala +++ /dev/null @@ -1,37 +0,0 @@ -package com.exasol.cloudetl.scriptclasses - -import java.net.URI - -import com.exasol.ExaIterator -import com.exasol.ExaMetadata -import com.exasol.cloudetl.util.FsUtil - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem - -object ImportS3Metadata { - - def run(meta: ExaMetadata, iter: ExaIterator): Unit = { - val s3Bucket = iter.getString(0) - val s3Endpoint = iter.getString(1) - val s3AccessKey = iter.getString(2) - val s3SecretKey = iter.getString(3) - val parallelism = iter.getInteger(4) - - val conf: Configuration = new Configuration() - conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName) - conf.set("fs.s3a.endpoint", s3Endpoint) - conf.set("fs.s3a.access.key", s3AccessKey) - conf.set("fs.s3a.secret.key", s3SecretKey) - - val fs: FileSystem = FileSystem.get(new URI(s3Bucket), conf) - - val paths = FsUtil.globWithPattern(s3Bucket, fs) - - paths.zipWithIndex.foreach { - case (filename, idx) => - iter.emit(filename.toString, s"${idx % parallelism}") - } - } - -} diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Path.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Path.scala deleted file mode 100644 index e358a394..00000000 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportS3Path.scala +++ /dev/null @@ -1,53 +0,0 @@ -package com.exasol.cloudetl.scriptclasses - -import java.util.{Map => JMap} - -import com.exasol.ExaImportSpecification -import com.exasol.ExaMetadata - -object ImportS3Path { - - def generateSqlForImportSpec(exaMeta: ExaMetadata, exaSpec: ExaImportSpecification): 
String = { - val params = exaSpec.getParameters - val s3Bucket = requiredParam(params, "S3_BUCKET_PATH") - val s3Endpoint = requiredParam(params, "S3_ENDPOINT") - val s3AccessKey = requiredParam(params, "S3_ACCESS_KEY") - val s3SecretKey = requiredParam(params, "S3_SECRET_KEY") - val parallelism = optionalParam(params, "PARALLELISM", "nproc()") - - val scriptSchema = exaMeta.getScriptSchema - - s""" - |SELECT - | $scriptSchema.IMPORT_S3_FILES( - | '$s3Bucket', '$s3Endpoint', '$s3AccessKey', '$s3SecretKey', s3_filename - |) - |FROM ( - | SELECT $scriptSchema.IMPORT_S3_METADATA( - | '$s3Bucket', '$s3Endpoint', '$s3AccessKey', '$s3SecretKey', $parallelism - | ) - |) - |GROUP BY - | partition_index; - """.stripMargin - } - - private[this] def requiredParam(params: JMap[String, String], key: String): String = - if (!params.containsKey(key)) { - throw new RuntimeException(s"The required parameter $key is not defined!") - } else { - params.get(key) - } - - private[this] def optionalParam( - params: JMap[String, String], - key: String, - defaultValue: String - ): String = - if (!params.containsKey(key)) { - defaultValue - } else { - params.get(key) - } - -} diff --git a/src/test/resources/parquet/sales_positions_small.snappy.parquet b/src/test/resources/parquet/sales_positions_small.snappy.parquet new file mode 100644 index 00000000..5d13213f Binary files /dev/null and b/src/test/resources/parquet/sales_positions_small.snappy.parquet differ diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala new file mode 100644 index 00000000..9f65afe7 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/BaseImportSuite.scala @@ -0,0 +1,45 @@ +package com.exasol.cloudetl.scriptclasses + +import java.nio.file.Path +import java.nio.file.Paths + +import com.exasol.ExaIterator +import com.exasol.cloudetl.bucket.Bucket + +import org.mockito.Mockito.when +import org.scalatest.FunSuite +import org.scalatest.Matchers +import org.scalatest.mockito.MockitoSugar + +trait BaseImportSuite extends FunSuite with Matchers with MockitoSugar { + + val testSchema = "my_schema" + + val s3BucketPath = "s3a://my_bucket/folder1/*" + val s3Endpoint = "s3.eu-central-1.com" + val s3AccessKey = "s3_access_key" + val s3SecretKey = "s3_secret_key" + + val params: Map[String, String] = Map( + "BUCKET_PATH" -> s3BucketPath, + "S3_ENDPOINT" -> s3Endpoint, + "S3_ACCESS_KEY" -> s3AccessKey, + "S3_SECRET_KEY" -> s3SecretKey + ) + + val resourcePath: String = norm(Paths.get(getClass.getResource("/parquet").toURI)) + val resourceBucket: String = s"$resourcePath/*.parquet" + + final def norm(path: Path): String = + path.toUri.toString.replaceAll("/$", "").replaceAll("///", "/") + + final def commonExaIterator(bucket: String): ExaIterator = { + val mockIter = mock[ExaIterator] + val newParams = params + (Bucket.BUCKET_PATH -> bucket) + + when(mockIter.getString(0)).thenReturn(bucket) + when(mockIter.getString(1)).thenReturn(Bucket.mapToStr(newParams)) + + mockIter + } +} diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala new file mode 100644 index 00000000..1125f84f --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportFilesSuite.scala @@ -0,0 +1,69 @@ +package com.exasol.cloudetl.scriptclasses + +import com.exasol.ExaMetadata + +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito._ + 
+@SuppressWarnings(Array("org.wartremover.warts.Any"))
+class ImportFilesSuite extends BaseImportSuite {
+
+  test("`run` should emit total number of records") {
+    val file1 = s"$resourcePath/sales_positions1.snappy.parquet"
+    val file2 = s"$resourcePath/sales_positions2.snappy.parquet"
+
+    val exaIter = commonExaIterator(resourceBucket)
+
+    when(exaIter.next()).thenReturn(true, false)
+    when(exaIter.getString(2)).thenReturn(file1, file2)
+
+    ImportFiles.run(mock[ExaMetadata], exaIter)
+
+    val totalRecords = 1000
+
+    verify(exaIter, times(totalRecords)).emit(Seq(any[Object]): _*)
+  }
+
+  /**
+   *
+   * +---------+-----------+----------+------+-----+----------+--------+
+   * |sales_id |position_id|article_id|amount|price|voucher_id|canceled|
+   * +---------+-----------+----------+------+-----+----------+--------+
+   * |582244536|2          |96982     |1     |0.56 |null      |null    |
+   * |582177839|6          |96982     |2     |0.56 |null      |null    |
+   * |582370207|0          |96982     |1     |0.56 |null      |null    |
+   * |582344312|0          |96982     |5     |0.56 |null      |null    |
+   * |582344274|1          |96982     |1     |0.56 |null      |null    |
+   * +---------+-----------+----------+------+-----+----------+--------+
+   *
+   */
+  test("`run` should emit correct sequence of records") {
+    val file = s"$resourcePath/sales_positions_small.snappy.parquet"
+
+    val exaIter = commonExaIterator(resourceBucket)
+
+    when(exaIter.next()).thenReturn(false)
+    when(exaIter.getString(2)).thenReturn(file)
+
+    ImportFiles.run(mock[ExaMetadata], exaIter)
+
+    val totalRecords = 5
+
+    val records: Seq[Seq[Object]] = Seq(
+      Seq(582244536L, 2, 96982, 1, 0.56, null, null),
+      Seq(582177839L, 6, 96982, 2, 0.56, null, null),
+      Seq(582370207L, 0, 96982, 1, 0.56, null, null),
+      Seq(582344312L, 0, 96982, 5, 0.56, null, null),
+      Seq(582344274L, 1, 96982, 1, 0.56, null, null)
+    ).map { seq =>
+      seq.map(_.asInstanceOf[AnyRef])
+    }
+
+    verify(exaIter, times(totalRecords)).emit(Seq(any[Object]): _*)
+    records.foreach {
+      case rows =>
+        verify(exaIter, times(1)).emit(rows: _*)
+    }
+  }
+
+}
diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala
new file mode 100644
index 00000000..7082ef39
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportMetadataSuite.scala
@@ -0,0 +1,22 @@
+package com.exasol.cloudetl.scriptclasses
+
+import com.exasol.ExaMetadata
+
+import org.mockito.ArgumentMatchers.anyString
+import org.mockito.Mockito._
+
+class ImportMetadataSuite extends BaseImportSuite {
+
+  test("`run` should create a list of file names") {
+    val exaIter = commonExaIterator(resourceBucket)
+    when(exaIter.getInteger(2)).thenReturn(2)
+
+    ImportMetadata.run(mock[ExaMetadata], exaIter)
+
+    verify(exaIter, times(3)).emit(anyString(), anyString())
+    verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions1.snappy.parquet", "0")
+    verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions2.snappy.parquet", "1")
+    verify(exaIter, times(1)).emit(s"$resourcePath/sales_positions_small.snappy.parquet", "0")
+  }
+
+}
diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
new file mode 100644
index 00000000..0a8fe23e
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ImportPathSuite.scala
@@ -0,0 +1,60 @@
+package com.exasol.cloudetl.scriptclasses
+
+import scala.collection.JavaConverters._
+
+import com.exasol.ExaImportSpecification
+import com.exasol.ExaMetadata
+
+import org.mockito.Mockito._
+ +class ImportPathSuite extends BaseImportSuite { + + test("`generateSqlForImportSpec` should create a sql statement") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaImportSpecification] + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(params.asJava) + + val rest = + s"""BUCKET_PATH=$s3BucketPath;S3_ENDPOINT=$s3Endpoint;""" + + s"""S3_ACCESS_KEY=$s3AccessKey;S3_SECRET_KEY=$s3SecretKey""" + + val sqlExpected = + s""" + |SELECT + | $testSchema.IMPORT_FILES( + | '$s3BucketPath', '$rest', filename + |) + |FROM ( + | SELECT $testSchema.IMPORT_METADATA( + | '$s3BucketPath', '$rest', nproc() + | ) + |) + |GROUP BY + | partition_index; + """.stripMargin + + assert(ImportPath.generateSqlForImportSpec(exaMeta, exaSpec).trim === sqlExpected.trim) + verify(exaMeta, atLeastOnce).getScriptSchema + verify(exaSpec, times(1)).getParameters + } + + test("`generateSqlForImportSpec` should throw an exception if any required param is missing") { + val exaMeta = mock[ExaMetadata] + val exaSpec = mock[ExaImportSpecification] + + val newParams = params - ("S3_ACCESS_KEY") + + when(exaMeta.getScriptSchema()).thenReturn(testSchema) + when(exaSpec.getParameters()).thenReturn(newParams.asJava) + + val thrown = intercept[IllegalArgumentException] { + ImportPath.generateSqlForImportSpec(exaMeta, exaSpec) + } + + assert(thrown.getMessage === "The required parameter S3_ACCESS_KEY is not defined!") + verify(exaSpec, times(1)).getParameters + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala b/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala index f58fd7f5..1966f21a 100644 --- a/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/source/ParquetSourceSuite.scala @@ -29,7 +29,7 @@ class ParquetSourceSuite extends FunSuite with Matchers { val pattern = s"${resourcesDir.toUri.toString}/*.parquet" val data = ParquetSource(pattern, fs, conf) val iters = data.stream - assert(iters.map(_.size).sum === 1000) + assert(iters.map(_.size).sum === 1005) } }