Skip to content

Commit

Permalink
Fix kafka container host address in VM-VM case
Browse files Browse the repository at this point in the history
  • Loading branch information
Shmuma committed Oct 10, 2023
1 parent 4c85ff4 commit 014a64d
Showing 1 changed file with 42 additions and 26 deletions.
68 changes: 42 additions & 26 deletions src/test/java/com/exasol/cloudetl/kafka/KafkaTestSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;


class KafkaContainerTweaked extends org.testcontainers.containers.KafkaContainer {
public KafkaContainerTweaked(DockerImageName dockerImageName) {
super(dockerImageName);
}

@Override
public String getHost() {
return "172.17.0.1";
}
}

class KafkaTestSetup implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(KafkaTestSetup.class.getName());
Expand Down Expand Up @@ -40,33 +52,37 @@ public String getTopicName() {
static KafkaTestSetup create() throws ExecutionException, InterruptedException {

final DockerImageName dockerImageName = DockerImageName.parse("confluentinc/cp-kafka:latest");
// @SuppressWarnings("resource")
// final GenericContainer<?> zookeeperContainer = new GenericContainer<>("confluentinc/cp-zookeeper:7.4.1")
// // .withNetwork(kafka.getNetwork())
// // .withNetworkAliases("zookeeper")
// // .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
// .withExposedPorts(ZOOKEEPER_PORT)// ;
// .withEnv("ZOOKEEPER_CLIENT_PORT", Integer.toString(ZOOKEEPER_PORT));
// // c.withReuse(true)
// zookeeperContainer.start();

// final String zookeeperConnString = "172.17.0.1" + ":" + zookeeperContainer.getMappedPort(ZOOKEEPER_PORT);
// @SuppressWarnings("resource") // Container will be stopped in close() method
// final org.testcontainers.containers.KafkaContainer kafkaContainer = new KafkaContainer(
// DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))//
// .withExternalZookeeper(zookeeperConnString)
// // .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:$ZOOKEEPER_PORT")//
// .withEnv("KAFKA_BROKER_ID", "0")//
// // .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT")//
// // .withEnv("KAFKA_LISTENERS", "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:" + KAFKA_EXTERNAL_PORT)
// // .withEnv("KAFKA_ADVERTISED_LISTENERS",
// // "INTERNAL://kafka01:9092,EXTERNAL://127.0.0.1:" + KAFKA_EXTERNAL_PORT)
// // .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")//
// .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")//
// .withExtraHost("kafka01", "127.0.0.1")//
// .dependsOn(zookeeperContainer);// .withEmbeddedZookeeper();//
//
// LOG.info("zookeeper: " + zookeeperConnString); // .withKraft();
@SuppressWarnings("resource")
final GenericContainer<?> zookeeperContainer = new GenericContainer<>("confluentinc/cp-zookeeper:7.4.1")
// .withNetwork(kafka.getNetwork())
// .withNetworkAliases("zookeeper")
// .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
.withExposedPorts(ZOOKEEPER_PORT)// ;
.withEnv("ZOOKEEPER_CLIENT_PORT", Integer.toString(ZOOKEEPER_PORT));
// c.withReuse(true)
zookeeperContainer.start();

final String zookeeperConnString = "172.17.0.1" + ":" + zookeeperContainer.getMappedPort(ZOOKEEPER_PORT);
@SuppressWarnings("resource") // Container will be stopped in close() method
final org.testcontainers.containers.KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))//
.withExternalZookeeper(zookeeperConnString)
// .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:$ZOOKEEPER_PORT")//
.withEnv("KAFKA_BROKER_ID", "0")//
// .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT")//
// .withEnv("KAFKA_LISTENERS", "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:" + KAFKA_EXTERNAL_PORT)
// .withEnv("KAFKA_ADVERTISED_LISTENERS",
// "INTERNAL://kafka01:9092,EXTERNAL://127.0.0.1:" + KAFKA_EXTERNAL_PORT)
// .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")//
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")//
.withExtraHost("kafka01", "127.0.0.1")//
.dependsOn(zookeeperContainer);// .withEmbeddedZookeeper();//

LOG.info("zookeeper: " + zookeeperConnString); // .withKraft();
final org.testcontainers.containers.KafkaContainer kafkaContainer = new KafkaContainerTweaked(
DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))
.withReuse(true);
kafkaContainer.start();
// return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093));
// container.start();
Expand Down

0 comments on commit 014a64d

Please sign in to comment.