diff --git a/pom.xml b/pom.xml index ee16efc..6f1c80d 100644 --- a/pom.xml +++ b/pom.xml @@ -380,6 +380,10 @@ CVE-2023-4586 CVE-2020-36641 + CVE-2023-40167 + CVE-2023-36479 + CVE-2023-43642 + diff --git a/src/test/java/com/exasol/cloudetl/kafka/ExtensionIT.java b/src/test/java/com/exasol/cloudetl/kafka/ExtensionIT.java index 72d34fa..50a0429 100644 --- a/src/test/java/com/exasol/cloudetl/kafka/ExtensionIT.java +++ b/src/test/java/com/exasol/cloudetl/kafka/ExtensionIT.java @@ -281,7 +281,8 @@ private void executeKafkaImport(final Table targetTable, final KafkaTestSetup ka // topics. Avro is set as default record value format. " TOPIC_NAME = '" + kafkaSetup.getTopicName() + "'\n" + // " TABLE_NAME = '" + targetTable.getFullyQualifiedName() + "'\n" + // - " GROUP_ID = 'exasol-kafka-udf-consumers'\n"; + " GROUP_ID = 'exaudf' \n" + // + " CONSUME_ALL_OFFSETS = 'true' \n"; LOGGER.info("Executing query '" + sql + "'"); executeStatement(sql); } diff --git a/src/test/java/com/exasol/cloudetl/kafka/KafkaTestSetup.java b/src/test/java/com/exasol/cloudetl/kafka/KafkaTestSetup.java index 7933670..e8c0288 100644 --- a/src/test/java/com/exasol/cloudetl/kafka/KafkaTestSetup.java +++ b/src/test/java/com/exasol/cloudetl/kafka/KafkaTestSetup.java @@ -1,13 +1,14 @@ package com.exasol.cloudetl.kafka; -import java.util.Collections; -import java.util.Properties; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.KafkaFuture; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; class KafkaTestSetup implements AutoCloseable { @@ -31,17 +32,47 @@ public String getTopicName() { return this.topicName; } + private static final int ZOOKEEPER_PORT = 2181; + private static final int KAFKA_EXTERNAL_PORT = 29092; + private static final int SCHEMA_REGISTRY_PORT = 8081; + private static final int ADMIN_TIMEOUT_MILLIS = 5000; + static KafkaTestSetup create() throws ExecutionException, InterruptedException { + + final DockerImageName dockerImageName = DockerImageName.parse("confluentinc/cp-kafka:latest"); + @SuppressWarnings("resource") + final GenericContainer zookeeperContainer = new GenericContainer<>("confluentinc/cp-zookeeper:7.4.1") + // .withNetwork(kafka.getNetwork()) + // .withNetworkAliases("zookeeper") + // .withEnv("ZOOKEEPER_CLIENT_PORT", "2181"); + .withExposedPorts(ZOOKEEPER_PORT)// ; + .withEnv("ZOOKEEPER_CLIENT_PORT", Integer.toString(ZOOKEEPER_PORT)); + // c.withReuse(true) + zookeeperContainer.start(); + + final String zookeeperConnString = "172.17.0.1" + ":" + zookeeperContainer.getMappedPort(ZOOKEEPER_PORT); @SuppressWarnings("resource") // Container will be stopped in close() method - final DockerImageName dockerImageName = DockerImageName.parse("confluentinc/cp-kafka:6.2.1"); - final org.testcontainers.containers.KafkaContainer container = new org.testcontainers.containers.KafkaContainer( - dockerImageName); - container.start(); + final org.testcontainers.containers.KafkaContainer kafkaContainer = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))// + .withExternalZookeeper(zookeeperConnString) + // .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:$ZOOKEEPER_PORT")// + .withEnv("KAFKA_BROKER_ID", "0")// + // .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT")// + // .withEnv("KAFKA_LISTENERS", "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:" + KAFKA_EXTERNAL_PORT) + // .withEnv("KAFKA_ADVERTISED_LISTENERS", + // "INTERNAL://kafka01:9092,EXTERNAL://127.0.0.1:" + KAFKA_EXTERNAL_PORT) + // .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")// + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")// + .withExtraHost("kafka01", "127.0.0.1")// + .dependsOn(zookeeperContainer);// .withEmbeddedZookeeper();// + + LOG.info("zookeeper: " + zookeeperConnString); // .withKraft(); + kafkaContainer.start(); // return String.format("PLAINTEXT://%s:%s", this.getHost(), this.getMappedPort(9093)); // container.start(); final Properties properties = new Properties(); - properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, container.getBootstrapServers()); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); final String topicName = "testTopic"; LOG.info("Topicname: " + topicName); // CREATE TOPIC AND WAIT @@ -53,32 +84,39 @@ static KafkaTestSetup create() throws ExecutionException, InterruptedException { final CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic)); final KafkaFuture future = result.values().get(topicName); - final var createTopicsResult = future.get(); + future.get(); + + final Set topicNames = admin.listTopics().names().get(); + + LOG.info("Succesfully created topic"); } catch (final Exception ex) { LOG.warning("Exception occurred during Kafka topic creation: '" + ex.getMessage() + "'"); } + // PRODUCE + final Properties producerProps = new Properties(); + producerProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + producerProps.put("acks", "all"); + producerProps.put("retries", 0); + producerProps.put("batch.size", 16384); + producerProps.put("linger.ms", 1); + producerProps.put("buffer.memory", 33554432); + producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + final Producer producer = new KafkaProducer<>(producerProps); try { - // PRODUCE - final Properties producerProps = new Properties(); - producerProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, container.getBootstrapServers()); - producerProps.put("acks", "all"); - producerProps.put("retries", 0); - producerProps.put("batch.size", 16384); - producerProps.put("linger.ms", 1); - producerProps.put("buffer.memory", 33554432); - producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - final Producer producer = new KafkaProducer<>(producerProps); + // for (int i = 0; i < 100; i++) producer.send(new ProducerRecord(topicName, Integer.toString(1), "OK")); producer.send(new ProducerRecord(topicName, Integer.toString(2), "WARN")); - producer.close(); + } catch (final Exception ex) { LOG.warning("Exception occurred producing Kafka records: '" + ex.getMessage() + "'"); + } finally { + producer.close(); } // RETURN kafkaTestSetup object - return new KafkaTestSetup(container, topicName); + return new KafkaTestSetup(kafkaContainer, topicName); } @Override