diff --git a/docker-compose.yml b/docker-compose.yml index 7578bb2c..2a36067d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,24 +1,14 @@ # Network and IPs are hard coded otherwise a broker might be confused on restarting from a different IP services: - zookeeper: - build: - context: ./docker/kafka - command: "zookeeper-server-start.sh config/zookeeper.properties" - environment: - KAFKA_HEAP_OPTS: "-Xmx64M -Xms32M" - networks: - aiokafka-test-network: - ipv4_address: 172.16.23.10 - kafka1: build: context: ./docker/kafka command: "start-broker.sh" environment: BROKER_ID: "1" - ZOOKEEPER: "zookeeper:2181" KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M" + CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093" networks: aiokafka-test-network: ipv4_address: 172.16.23.11 @@ -27,8 +17,6 @@ services: test: [ "CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1" ] retries: 30 interval: 1s - depends_on: - - zookeeper kafka2: extends: diff --git a/docker/kafka/Dockerfile b/docker/kafka/Dockerfile index 57db6205..f38be719 100644 --- a/docker/kafka/Dockerfile +++ b/docker/kafka/Dockerfile @@ -7,11 +7,15 @@ RUN apt-get update && \ ENV PATH="/opt/kafka/bin:${PATH}" WORKDIR /opt/kafka -COPY start-broker.sh /opt/kafka/bin/ - +# APIs like CreateTopics are redirected automatically to the controller starting from 2.8.0 +# https://issues.apache.org/jira/browse/KAFKA-10181 +# To reproduce the error, we must use a previous version of Kafka ARG SCALA_VERSION=2.13 -ARG KAFKA_VERSION=2.8.1 +ARG KAFKA_VERSION=3.3.0 RUN wget -q -O kafka.tgz "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \ && tar xfvz kafka.tgz --strip 1 \ && rm -rf kafka.tgz site-docs + +COPY start-broker.sh /opt/kafka/bin/ +COPY base-server.properties /opt/kafka/config \ No newline at end of file diff --git a/docker/kafka/base-server.properties b/docker/kafka/base-server.properties new file mode 100644 index 
00000000..8a617425 --- /dev/null +++ b/docker/kafka/base-server.properties @@ -0,0 +1,13 @@ +num.network.threads=3 +num.io.threads=8 +socket.send.buffer.bytes=102400 +socket.receive.buffer.bytes=102400 +socket.request.max.bytes=104857600 +log.dirs=/tmp/kafka-logs +num.partitions=1 +num.recovery.threads.per.data.dir=1 +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 +log.retention.hours=168 +log.retention.check.interval.ms=300000 diff --git a/docker/kafka/start-broker.sh b/docker/kafka/start-broker.sh index 4636c09e..7ba06ba5 100755 --- a/docker/kafka/start-broker.sh +++ b/docker/kafka/start-broker.sh @@ -1,9 +1,21 @@ #!/usr/bin/env bash -exec kafka-server-start.sh config/server.properties \ - --override "broker.id=${BROKER_ID:-0}" \ - --override "zookeeper.connect=${ZOOKEEPER}" \ - --override "listeners=PLAINTEXT://:9092" \ - --override "advertised.listeners=PLAINTEXT://$(hostname -i):9092" \ - --override "listener.security.protocol.map=PLAINTEXT:PLAINTEXT" \ - --override "offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1}" + +echo " +broker.id=${BROKER_ID:-0} +process.roles=broker,controller +listeners=PLAINTEXT://:9092,CONTROLLER://:9093 +advertised.listeners=PLAINTEXT://$(hostname -i):9092 +listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT +controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS} +controller.listener.names=CONTROLLER +offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1} +" > config/runtime.properties + +cat config/base-server.properties config/runtime.properties > config/server.properties + +if [ ! 
-e "/tmp/kafka-logs/meta.properties" ]; then + kafka-storage.sh format --config config/server.properties --cluster-id "YPKJRKEhT06jEqGlBQar5A" +fi + +exec kafka-server-start.sh config/server.properties diff --git a/tests/manual/topic_management.py b/tests/manual/topic_management.py index 3a13351d..801342e2 100644 --- a/tests/manual/topic_management.py +++ b/tests/manual/topic_management.py @@ -14,10 +14,10 @@ async def main() -> None: await client.create_topics( [NewTopic(name=topic, num_partitions=3, replication_factor=2)] ) - await asyncio.sleep(5) + await asyncio.sleep(1) print("Deleting topic:", topic) await client.delete_topics([topic]) - await asyncio.sleep(5) + await asyncio.sleep(1) finally: await client.close()