Skip to content

Commit

Permalink
Add tests for importing multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
kaklakariada committed Oct 9, 2023
1 parent 073f3ea commit e4847f2
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 4 deletions.
16 changes: 16 additions & 0 deletions src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,21 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
withResultSet(assertThat(_, matcher))
()
}

/**
 * Uploads all registered files to S3, creates the target table and asserts that the
 * IMPORT statement fails with an [[IllegalStateException]] whose cause's message
 * satisfies the given matcher.
 *
 * @param errorMessageMatcher matcher applied to the message of the exception's cause
 */
def assertFails(errorMessageMatcher: Matcher[String]): Unit = {
  // Stage every prepared file before running the import.
  for (path <- paths) uploadFileToS3(bucketName, path)
  // Build the target table; the builder is mutated in place, column order follows `columns`.
  val builder = schema.createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH))
  for ((columnName, columnType) <- columns) builder.column(columnName, columnType)
  val builtTable = builder.build()
  // The import of all files matching the base-name glob is expected to fail.
  val thrown = intercept[IllegalStateException] {
    importFromS3IntoExasol(schemaName, builtTable, bucketName, s"$baseFileName*", dataFormat)
  }
  assertThat(thrown.getCause().getMessage(), errorMessageMatcher)
  ()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLo
getConnection().createStatement().execute(sql)
} catch {
case exception: Exception =>
throw new AssertionError(s"Failed executing SQL '$sql': ${exception.getMessage()}", exception)
throw new IllegalStateException(s"Failed executing SQL '$sql': ${exception.getMessage()}", exception)
}
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema._
import org.hamcrest.Matchers

class ParquetDataImporterIT extends BaseDataImporter {

Expand Down Expand Up @@ -494,6 +495,73 @@ class ParquetDataImporterIT extends BaseDataImporter {
)
}

// Verifies that IMPORT reads and merges rows from two Parquet files that share
// the same schema (one row per file).
test("imports from multiple files") {
  MultiParquetChecker(
    "required binary name (UTF8); required int32 age;",
    Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
    "multi_col"
  )
    // First file contains a single row.
    .addParquetFile { case (writer, schema) =>
      writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24))
    }
    // Second file contains a second, distinct row.
    .addParquetFile { case (writer, schema) =>
      writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
    }
    // Rows from both files must appear in the imported table.
    .assertResultSet(
      table("VARCHAR", "BIGINT")
        .row("John", 24L)
        .row("Jane", 22L)
        .matches()
    )
}

// Verifies that a row missing a declared field imports as NULL for that column
// instead of failing the whole import.
test("imports from file with missing field") {
  MultiParquetChecker(
    "required binary name (UTF8); required int32 age;",
    Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
    "multi_col"
  )
    // First file: the "age" field is never appended for this row.
    .addParquetFile { case (writer, schema) =>
      writer.write(new SimpleGroup(schema).append("name", "John"))
    }
    // Second file: complete row with both fields.
    .addParquetFile { case (writer, schema) =>
      writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
    }
    .assertResultSet(
      table("VARCHAR", "BIGINT")
        // Missing field surfaces as NULL in the target column.
        .row("John", null)
        .row("Jane", 22L)
        .matches()
    )
}

// Verifies that importing files whose Parquet schemas disagree (two columns vs.
// one) fails rather than silently producing inconsistent rows.
test("importing from files with different schema fails") {
  MultiParquetChecker(
    "required binary name (UTF8);",
    Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
    "multi_col"
  )
    // First file uses a two-column schema (name + age).
    .addParquetFileWithSchema(
      MessageTypeParser.parseMessageType("message test { required binary name (UTF8); required int32 age; }"),
      { case (writer, schema) =>
        writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24))
      }
    )
    // Second file uses a one-column schema (name only) — incompatible with the first.
    .addParquetFileWithSchema(
      MessageTypeParser.parseMessageType("message test { required binary name (UTF8); }"),
      { case (writer, schema) =>
        writer.write(
          new SimpleGroup(schema).append("name", "Jane")
        )
      }
    )
    // The UDF emits one value for the single-column file while the table expects
    // two, so the Java UDF framework rejects the emit() call with this message.
    // NOTE(review): asserting on the exact engine error text is brittle across
    // Exasol versions — confirm the message is stable before relying on it.
    .assertFails(
      Matchers.containsString(
        "ExaIterationException: E-UDF-CL-SL-JAVA-1107: emit() takes exactly 2 arguments (1 given)"
      )
    )
}

case class ParquetChecker(parquetColumn: String, exaColumn: String, tableName: String)
extends AbstractChecker(exaColumn, tableName)
with ParquetTestDataWriter {
Expand All @@ -519,10 +587,16 @@ class ParquetDataImporterIT extends BaseDataImporter {
with ParquetTestDataWriter {
private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }")

/**
 * Adds a new Parquet file written with the checker's default schema.
 *
 * @param block callback receiving the open writer and the schema; it writes the file's rows
 * @return this checker for fluent chaining
 */
def addParquetFile(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker =
  addParquetFileWithSchema(parquetSchema, block)

/**
 * Adds a new Parquet file written with an explicitly given schema, allowing tests
 * to create files whose schemas differ from the checker's default.
 *
 * @param customParquetSchema schema used for this file only
 * @param block               callback receiving the open writer and the schema
 * @return this checker for fluent chaining
 */
def addParquetFileWithSchema(
    customParquetSchema: MessageType,
    block: (ParquetWriter[Group], MessageType) => Unit
): MultiParquetChecker = {
  val path = addFile()
  // The writer is created with the custom schema so the file on disk may differ
  // from `parquetSchema`; it is closed before returning to flush the file.
  val writer = getParquetWriter(path, customParquetSchema, true)
  block(writer, customParquetSchema)
  writer.close()
  this
}
Expand Down

0 comments on commit e4847f2

Please sign in to comment.