diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
index 94b4163c..aa49ed89 100644
--- a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
@@ -70,5 +70,21 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
       withResultSet(assertThat(_, matcher))
       ()
     }
+
+    def assertFails(errorMessageMatcher: Matcher[String]): Unit = {
+      paths.foreach(path => uploadFileToS3(bucketName, path))
+      val tableBuilder = schema
+        .createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH))
+      columns.foreach { case (colName, colType) =>
+        tableBuilder.column(colName, colType)
+      }
+
+      val table = tableBuilder.build()
+      val exception = intercept[IllegalStateException] {
+        importFromS3IntoExasol(schemaName, table, bucketName, s"$baseFileName*", dataFormat)
+      }
+      assertThat(exception.getCause().getMessage(), errorMessageMatcher)
+      ()
+    }
   }
 }
diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala b/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala
index 5163de97..b4df9ac7 100644
--- a/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala
@@ -53,7 +53,7 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLo
       getConnection().createStatement().execute(sql)
     } catch {
       case exception: Exception =>
-        throw new AssertionError(s"Failed executing SQL '$sql': ${exception.getMessage()}", exception)
+        throw new IllegalStateException(s"Failed executing SQL '$sql': ${exception.getMessage()}", exception)
     }
     ()
   }
diff --git a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
index 5aa892b3..61341cde 100644
--- a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
@@ -19,6 +19,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema._
+import org.hamcrest.Matchers
 
 class ParquetDataImporterIT extends BaseDataImporter {
 
@@ -494,6 +495,73 @@ class ParquetDataImporterIT extends BaseDataImporter {
     )
   }
+
+  test("imports from multiple files") {
+    MultiParquetChecker(
+      "required binary name (UTF8); required int32 age;",
+      Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
+      "multi_col"
+    )
+      .addParquetFile { case (writer, schema) =>
+        writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24))
+      }
+      .addParquetFile { case (writer, schema) =>
+        writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
+      }
+      .assertResultSet(
+        table("VARCHAR", "BIGINT")
+          .row("John", 24L)
+          .row("Jane", 22L)
+          .matches()
+      )
+  }
+
+  test("imports from file with missing field") {
+    MultiParquetChecker(
+      "required binary name (UTF8); required int32 age;",
+      Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
+      "multi_col"
+    )
+      .addParquetFile { case (writer, schema) =>
+        writer.write(new SimpleGroup(schema).append("name", "John"))
+      }
+      .addParquetFile { case (writer, schema) =>
+        writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
+      }
+      .assertResultSet(
+        table("VARCHAR", "BIGINT")
+          .row("John", null)
+          .row("Jane", 22L)
+          .matches()
+      )
+  }
+
+  test("importing from files with different schema fails") {
+    MultiParquetChecker(
+      "required binary name (UTF8);",
+      Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
+      "multi_col"
+    )
+      .addParquetFileWithSchema(
+        MessageTypeParser.parseMessageType("message test { required binary name (UTF8); required int32 age; }"),
+        { case (writer, schema) =>
+          writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24))
+        }
+      )
+      .addParquetFileWithSchema(
+        MessageTypeParser.parseMessageType("message test { required binary name (UTF8); }"),
+        { case (writer, schema) =>
+          writer.write(
+            new SimpleGroup(schema).append("name", "Jane")
+          )
+        }
+      )
+      .assertFails(
+        Matchers.containsString(
+          "ExaIterationException: E-UDF-CL-SL-JAVA-1107: emit() takes exactly 2 arguments (1 given)"
+        )
+      )
+  }
+
   case class ParquetChecker(parquetColumn: String, exaColumn: String, tableName: String)
       extends AbstractChecker(exaColumn, tableName)
       with ParquetTestDataWriter {
@@ -519,10 +587,16 @@ class ParquetDataImporterIT extends BaseDataImporter {
       with ParquetTestDataWriter {
     private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }")
 
-    def addParquetFile(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker = {
+    def addParquetFile(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker =
+      addParquetFileWithSchema(parquetSchema, block)
+
+    def addParquetFileWithSchema(
+        customParquetSchema: MessageType,
+        block: (ParquetWriter[Group], MessageType) => Unit
+    ): MultiParquetChecker = {
       val path = addFile()
-      val writer = getParquetWriter(path, parquetSchema, true)
-      block(writer, parquetSchema)
+      val writer = getParquetWriter(path, customParquetSchema, true)
+      block(writer, customParquetSchema)
       writer.close()
       this
     }
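
Note: assertFails accepts an arbitrary Hamcrest Matcher[String], so error assertions can be composed instead of being pinned to one exact message. A minimal sketch of a follow-up test in that style; the test name and the matcher composition are illustrative assumptions, not part of this change:

  // Sketch only: composes containsString fragments (both substrings of the
  // message asserted in the diff above) via Matchers.allOf. Writing a single
  // one-column file against a two-column table makes the UDF emit one value
  // where two are expected, producing the same emit() mismatch.
  test("reports the failing emit() call when file and table columns diverge") {
    MultiParquetChecker(
      "required binary name (UTF8);",
      Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
      "multi_col"
    )
      .addParquetFileWithSchema(
        MessageTypeParser.parseMessageType("message test { required binary name (UTF8); }"),
        { case (writer, schema) =>
          writer.write(new SimpleGroup(schema).append("name", "Jane"))
        }
      )
      .assertFails(
        Matchers.allOf(
          Matchers.containsString("ExaIterationException"),
          Matchers.containsString("emit()")
        )
      )
  }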