
Commit

#39: Fixed bug related to consuming all offsets given empty records (#40)

Fixes #39

Co-authored-by: Johan Wärlander <[email protected]>
morazow and jwarlander authored Jun 29, 2021
1 parent 7fc77ad commit 32744af
Showing 16 changed files with 783 additions and 41 deletions.
7 changes: 5 additions & 2 deletions .travis.yml
@@ -15,10 +15,13 @@ jdk:
 scala:
 - 2.13.6
 
+env:
+- EXASOL_DOCKER_VERSION="6.2.15-d1"
+- EXASOL_DOCKER_VERSION="7.0.10"
+
 before_install:
 - git fetch --tags
-- docker pull exasol/docker-db:latest
-- docker pull exasol/docker-db:6.2.14-d1
+- docker pull "exasol/docker-db:$EXASOL_DOCKER_VERSION"
 
 script:
 - ./scripts/ci.sh
2 changes: 1 addition & 1 deletion build.sbt
@@ -17,7 +17,7 @@ lazy val root =
   project
     .in(file("."))
     .settings(moduleName := "exasol-kafka-connector-extension")
-    .settings(version := "1.1.0")
+    .settings(version := "1.2.0")
     .settings(orgSettings)
     .settings(buildSettings)
     .settings(Settings.projectSettings(scalaVersion))
1 change: 1 addition & 0 deletions doc/changes/changelog.md
@@ -1,5 +1,6 @@
 # Releases
 
+* [1.2.0](changes_1.2.0.md)
 * [1.1.0](changes_1.1.0.md)
 * [1.0.0](changes_1.0.0.md)
 * [0.2.1](changes_0.2.1.md)
29 changes: 29 additions & 0 deletions doc/changes/changes_1.2.0.md
@@ -0,0 +1,29 @@
# Kafka Connector Extension 1.2.0, released 2021-06-29

Code name: Fixed bug in consuming all offsets

## Summary

In this release, we fixed a bug where the consumer could enter an infinite waiting loop when the records polled from Kafka were empty.

## Bugfixes

* #39: Fixed bug related to consuming all offsets when already at end of partition

### Runtime Dependency Updates

* Updated `io.confluent:kafka-avro-serializer:6.1.1` to `6.2.0`

### Test Dependency Updates

* Added `com.exasol:test-db-builder-java:3.2.0`
* Added `com.exasol:exasol-testcontainers:3.5.3`
* Added `com.exasol:hamcrest-resultset-matcher:1.4.0`
* Added `org.testcontainers:kafka:1.15.3`
* Updated `org.mockito:mockito-core:3.11.0` to `3.11.2`
* Updated `io.github.embeddedkafka:embedded-kafka-schema-registry:6.1.1` to `6.2.0`

### Plugin Updates

* Updated `org.scoverage:sbt-coveralls:1.2.7` to `1.3.1`
* Updated `net.bzzt:sbt-reproducible-builds:0.25` to `0.28`
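
For context, before this fix the consumer kept polling even when it was already at the end of a partition, so an empty poll result meant waiting forever. Below is a minimal sketch of the guard idea, not the actual patch: `consumeUntilEnd`, the 500 ms poll timeout, and the `println` stand-in for emitting rows to Exasol are all illustrative assumptions.

```scala
import java.time.Duration

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object EmptyPollGuardSketch {
  // Hypothetical sketch: poll only while the consumer's position is still
  // behind the partition's end offset, so being already at the end (or an
  // empty poll once the end is reached) can no longer spin indefinitely.
  def consumeUntilEnd(
    consumer: KafkaConsumer[String, String],
    partition: TopicPartition,
    endOffset: Long
  ): Unit =
    while (consumer.position(partition) < endOffset) {
      val records = consumer.poll(Duration.ofMillis(500)).records(partition)
      // Stand-in for emitting rows to Exasol in the real import UDF.
      records.forEach(record => println(record.value()))
    }
}
```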
16 changes: 12 additions & 4 deletions project/Dependencies.scala
@@ -9,14 +9,18 @@ object Dependencies {
   // Runtime dependencies versions
   private val ImportExportUDFVersion = "0.2.0"
   private val KafkaClientsVersion = "2.8.0"
-  private val KafkaAvroSerializerVersion = "6.1.1"
+  private val KafkaAvroSerializerVersion = "6.2.0"
   private val ScalaCollectionCompatVersion = "2.4.4"
 
   // Test dependencies versions
   private val ScalaTestVersion = "3.2.9"
   private val ScalaTestPlusVersion = "1.0.0-M2"
-  private val MockitoCoreVersion = "3.11.0"
-  private val KafkaSchemaRegistryVersion = "6.1.1"
+  private val MockitoCoreVersion = "3.11.2"
+  private val KafkaSchemaRegistryVersion = "6.2.0"
+  private val ExasolTestDBBuilderVersion = "3.2.0"
+  private val ExasolTestContainersVersion = "3.5.3"
+  private val ExasolHamcrestMatcherVersion = "1.4.0"
+  private val TestContainersVersion = "1.15.3"
 
   val Resolvers: Seq[Resolver] = Seq(
     "jitpack.io" at "https://jitpack.io",
@@ -45,7 +49,11 @@
     "io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % KafkaSchemaRegistryVersion
       exclude ("com.fasterxml.jackson.core", "jackson-annotations")
       exclude ("com.fasterxml.jackson.core", "jackson-core")
-      exclude ("com.fasterxml.jackson.core", "jackson-databind")
+      exclude ("com.fasterxml.jackson.core", "jackson-databind"),
+    "com.exasol" % "exasol-testcontainers" % ExasolTestContainersVersion,
+    "com.exasol" % "test-db-builder-java" % ExasolTestDBBuilderVersion,
+    "com.exasol" % "hamcrest-resultset-matcher" % ExasolHamcrestMatcherVersion,
+    "org.testcontainers" % "kafka" % TestContainersVersion
   ).map(_ % Test)
 
 lazy val AllDependencies: Seq[ModuleID] = RuntimeDependencies ++ TestDependencies
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -22,7 +22,7 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.8.2")
 // Adds SBT Coveralls plugin for uploading Scala code coverage to
 // https://coveralls.io
 // https://github.com/scoverage/sbt-coveralls
-addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")
+addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1")
 
 // Adds a `dependencyUpdates` task to check Maven repositories for
 // dependency updates
@@ -51,7 +51,7 @@ addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.16")
 
 // Adds a `sbt-reproducible-builds` plugin
 // https://github.com/raboof/sbt-reproducible-builds
-addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.25")
+addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.28")
 
 // Setup this and project/project/plugins.sbt for formatting
 // project/*.scala files with scalafmt
@@ -2,7 +2,7 @@ package com.exasol.cloudetl.kafka
 
 import scala.util.Random
 import com.exasol.ExaIterator
-import net.manub.embeddedkafka.schemaregistry.EmbeddedKafka
+import io.github.embeddedkafka.schemaregistry.EmbeddedKafka
 import org.mockito.ArgumentMatchers.argThat
 import org.mockito.Mockito.when
 import org.scalatest.BeforeAndAfterAll
@@ -0,0 +1,129 @@
package com.exasol.cloudetl.kafka

import java.io.File
import java.nio.file.Paths
import java.sql.Connection

import com.exasol.containers.ExasolContainer
import com.exasol.dbbuilder.dialects.Column
import com.exasol.dbbuilder.dialects.exasol.ExasolObjectFactory
import com.exasol.dbbuilder.dialects.exasol.ExasolSchema
import com.exasol.dbbuilder.dialects.exasol.udf.UdfScript

import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

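// Base trait for Docker-based integration tests: it starts an Exasol
// container, uploads the assembled connector JAR into BucketFS and creates
// the Kafka import UDF deployment scripts.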
trait BaseDockerIntegrationTest extends AnyFunSuite with BeforeAndAfterAll {
private[this] val JAR_DIRECTORY_PATTERN = "scala-"
private[this] val JAR_NAME_PATTERN = "exasol-kafka-connector-extension-"
private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.0.10"

val network = DockerNamedNetwork("kafka-it-tests", true)
val exasolContainer = {
val c: ExasolContainer[_] = new ExasolContainer(getExasolDockerImageVersion())
c.withNetwork(network)
c.withReuse(true)
c
}
var factory: ExasolObjectFactory = _
var schema: ExasolSchema = _
var connection: Connection = _
val assembledJarName = getAssembledJarName()

override def beforeAll(): Unit = {
exasolContainer.start()
connection = getConnection()
}

override def afterAll(): Unit = {
connection.close()
exasolContainer.stop()
}

def installKafkaConnector(schemaName: String): Unit = {
executeStmt(s"DROP SCHEMA IF EXISTS $schemaName CASCADE;")
factory = new ExasolObjectFactory(getConnection())
schema = factory.createSchema(schemaName)
createKafkaImportDeploymentScripts()
uploadJarToBucket()
}

def executeStmt(sql: String): Unit = {
connection.createStatement().execute(sql)
()
}

def executeQuery(sql: String): java.sql.ResultSet =
connection.createStatement().executeQuery(sql)

private[this] def getConnection(): java.sql.Connection =
exasolContainer.createConnection("")

private[this] def getAssembledJarName(): String = {
val jarDir = findFileOrDirectory("target", JAR_DIRECTORY_PATTERN)
findFileOrDirectory("target/" + jarDir, JAR_NAME_PATTERN)
}

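  // Registers the three connector UDF scripts (query generator, topic data
  // importer and topic metadata reader), all backed by the JAR in BucketFS.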
private[this] def createKafkaImportDeploymentScripts(): Unit = {
val jarPath = s"/buckets/bfsdefault/default/$assembledJarName"
schema
.createUdfBuilder("KAFKA_CONSUMER")
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.emits()
.bucketFsContent("com.exasol.cloudetl.kafka.KafkaConsumerQueryGenerator", jarPath)
.build()
schema
.createUdfBuilder("KAFKA_IMPORT")
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.emits()
.bucketFsContent("com.exasol.cloudetl.kafka.KafkaTopicDataImporter", jarPath)
.build()
schema
.createUdfBuilder("KAFKA_METADATA")
.language(UdfScript.Language.JAVA)
.inputType(UdfScript.InputType.SET)
.parameter("params", "VARCHAR(2000)")
.parameter("kafka_partition", "DECIMAL(18, 0)")
.parameter("kafka_offset", "DECIMAL(36, 0)")
.emits(
new Column("partition_index", "DECIMAL(18, 0)"),
new Column("max_offset", "DECIMAL(36, 0)")
)
.bucketFsContent("com.exasol.cloudetl.kafka.KafkaTopicMetadataReader", jarPath)
.build()
()
}

private[this] def uploadJarToBucket(): Unit = {
val jarDir = findFileOrDirectory("target", JAR_DIRECTORY_PATTERN)
val jarPath = Paths.get("target", jarDir, assembledJarName)
exasolContainer.getDefaultBucket.uploadFile(jarPath, assembledJarName)
}

private[this] def findFileOrDirectory(searchDirectory: String, name: String): String = {
val files = listDirectoryFiles(searchDirectory)
val jarFile = files.find(_.getName.contains(name))
jarFile match {
case Some(jarFilename) => jarFilename.getName
case None =>
throw new IllegalArgumentException(
s"Cannot find a file or a directory with pattern '$name' in '$searchDirectory'"
)
}
}

private[this] def listDirectoryFiles(directoryName: String): List[File] = {
val directory = new File(directoryName)
if (directory.exists && directory.isDirectory) {
directory.listFiles.toList
} else {
List.empty[File]
}
}

private[this] def getExasolDockerImageVersion(): String =
System.getProperty("EXASOL_DOCKER_VERSION", DEFAULT_EXASOL_DOCKER_IMAGE)

}
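
A hypothetical test built on this trait might look as follows; the class name, the `KAFKA_SCHEMA` schema name and the assertions are illustrative, not from the repository, and `EXA_ALL_SCRIPTS` is Exasol's system view of deployed scripts.

```scala
package com.exasol.cloudetl.kafka

// Illustrative usage of BaseDockerIntegrationTest; all names are assumptions.
class KafkaConnectorDeploymentIT extends BaseDockerIntegrationTest {

  override def beforeAll(): Unit = {
    super.beforeAll()
    installKafkaConnector("KAFKA_SCHEMA")
  }

  test("creates the Kafka import UDF scripts") {
    val resultSet = executeQuery(
      "SELECT SCRIPT_NAME FROM EXA_ALL_SCRIPTS WHERE SCRIPT_SCHEMA = 'KAFKA_SCHEMA'"
    )
    var scriptNames = List.empty[String]
    while (resultSet.next()) {
      scriptNames = resultSet.getString("SCRIPT_NAME") :: scriptNames
    }
    assert(scriptNames.contains("KAFKA_CONSUMER"))
    assert(scriptNames.contains("KAFKA_IMPORT"))
    assert(scriptNames.contains("KAFKA_METADATA"))
  }
}
```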