Skip to content

Commit

Permalink
fix 'emit() takes exactly 4 arguments (3 given)'
Browse files Browse the repository at this point in the history
  • Loading branch information
pj-spoelders committed Oct 11, 2023
1 parent 743d8c9 commit 107498a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions src/test/java/com/exasol/cloudetl/kafka/ExtensionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,22 @@ private void verifyImportWorks() {

final Table targetTable = schema.createTableBuilder("TARGET")

.column("SENSOR_ID", "INTEGER").column("STATUS", "VARCHAR(10)") //
.column("KAFKA_PARTITION", "DECIMAL(18, 0)").column("KAFKA_OFFSET", "DECIMAL(36, 0)").build();
//.column("SENSOR_ID", "INTEGER")//
.column("STATUS", "VARCHAR(10)") //
.column("KAFKA_PARTITION", "DECIMAL(18, 0)")//
.column("KAFKA_OFFSET", "DECIMAL(36, 0)")//
.build();
// CREATE CONNECTION (optional, see
// https://github.com/exasol/kafka-connector-extension/blob/main/doc/user_guide/user_guide.md#importing-records)

executeKafkaImport(targetTable, kafkaSetup);

assertQueryResult(
"select sensor_id, status, kafka_partition,kafka_offset from " + targetTable.getFullyQualifiedName()
+ " order by sensor_id",
table("BIGINT", "VARCHAR", "DECIMAL", "DECIMAL") //
.row(1L, "OK", 1.0, 1.0) //
.row(2L, "WARN", 1.0, 1.0) //
"select status, kafka_partition, kafka_offset from " + targetTable.getFullyQualifiedName()
+ " order by status",
table("VARCHAR", "DECIMAL", "DECIMAL") //
.row( "OK", 0L, 0L) //
.row( "WARN", 0L, 1L) //
.matches());
} finally {
schema.drop();
Expand Down
36 changes: 3 additions & 33 deletions src/test/java/com/exasol/cloudetl/kafka/KafkaTestSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaFuture;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;


class KafkaContainerTweaked extends org.testcontainers.containers.KafkaContainer {
public KafkaContainerTweaked(DockerImageName dockerImageName) {
public KafkaContainerTweaked(final DockerImageName dockerImageName) {
super(dockerImageName);
}

Expand Down Expand Up @@ -51,37 +48,10 @@ public String getTopicName() {

static KafkaTestSetup create() throws ExecutionException, InterruptedException {

final DockerImageName dockerImageName = DockerImageName.parse("confluentinc/cp-kafka:latest");
// @SuppressWarnings("resource")
// final GenericContainer<?> zookeeperContainer = new GenericContainer<>("confluentinc/cp-zookeeper:7.4.1")
// // .withNetwork(kafka.getNetwork())
// // .withNetworkAliases("zookeeper")
// // .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
// .withExposedPorts(ZOOKEEPER_PORT)// ;
// .withEnv("ZOOKEEPER_CLIENT_PORT", Integer.toString(ZOOKEEPER_PORT));
// // c.withReuse(true)
// zookeeperContainer.start();

// final String zookeeperConnString = "172.17.0.1" + ":" + zookeeperContainer.getMappedPort(ZOOKEEPER_PORT);
// @SuppressWarnings("resource") // Container will be stopped in close() method
// final org.testcontainers.containers.KafkaContainer kafkaContainer = new KafkaContainer(
// DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))//
// .withExternalZookeeper(zookeeperConnString)
// // .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:$ZOOKEEPER_PORT")//
// .withEnv("KAFKA_BROKER_ID", "0")//
// // .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT")//
// // .withEnv("KAFKA_LISTENERS", "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:" + KAFKA_EXTERNAL_PORT)
// // .withEnv("KAFKA_ADVERTISED_LISTENERS",
// // "INTERNAL://kafka01:9092,EXTERNAL://127.0.0.1:" + KAFKA_EXTERNAL_PORT)
// // .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")//
// .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")//
// .withExtraHost("kafka01", "127.0.0.1")//
// .dependsOn(zookeeperContainer);// .withEmbeddedZookeeper();//
//
// LOG.info("zookeeper: " + zookeeperConnString); // .withKraft();
@SuppressWarnings("resource")
final org.testcontainers.containers.KafkaContainer kafkaContainer = new KafkaContainerTweaked(
DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))
DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))//
.withEmbeddedZookeeper()//
.withReuse(true);
kafkaContainer.start();
// return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093));
Expand Down

0 comments on commit 107498a

Please sign in to comment.