Skip to content

Commit

Permalink
Use a KRaft cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Maurin committed Feb 2, 2024
1 parent 0b43947 commit cc60fbe
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 25 deletions.
14 changes: 1 addition & 13 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
# Network and IPs are hard coded otherwise a broker might be confused on restarting from a different IP

services:
zookeeper:
build:
context: ./docker/kafka
command: "zookeeper-server-start.sh config/zookeeper.properties"
environment:
KAFKA_HEAP_OPTS: "-Xmx64M -Xms32M"
networks:
aiokafka-test-network:
ipv4_address: 172.16.23.10

kafka1:
build:
context: ./docker/kafka
command: "start-broker.sh"
environment:
BROKER_ID: "1"
ZOOKEEPER: "zookeeper:2181"
KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
networks:
aiokafka-test-network:
ipv4_address: 172.16.23.11
Expand All @@ -27,8 +17,6 @@ services:
test: [ "CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1" ]
retries: 30
interval: 1s
depends_on:
- zookeeper

kafka2:
extends:
Expand Down
10 changes: 7 additions & 3 deletions docker/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ RUN apt-get update && \
ENV PATH="/opt/kafka/bin:${PATH}"
WORKDIR /opt/kafka

COPY start-broker.sh /opt/kafka/bin/

# API like CreateTopics are redirected automatically to the controller starting to 2.8.0
# https://issues.apache.org/jira/browse/KAFKA-10181
# To reproduce the error, we must use a previous version of kafka
ARG SCALA_VERSION=2.13
ARG KAFKA_VERSION=2.8.1
ARG KAFKA_VERSION=3.3.0

RUN wget -q -O kafka.tgz "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
&& tar xfvz kafka.tgz --strip 1 \
&& rm -rf kafka.tgz site-docs

COPY start-broker.sh /opt/kafka/bin/
COPY base-server.properties /opt/kafka/config
13 changes: 13 additions & 0 deletions docker/kafka/base-server.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
26 changes: 19 additions & 7 deletions docker/kafka/start-broker.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
#!/usr/bin/env bash

exec kafka-server-start.sh config/server.properties \
--override "broker.id=${BROKER_ID:-0}" \
--override "zookeeper.connect=${ZOOKEEPER}" \
--override "listeners=PLAINTEXT://:9092" \
--override "advertised.listeners=PLAINTEXT://$(hostname -i):9092" \
--override "listener.security.protocol.map=PLAINTEXT:PLAINTEXT" \
--override "offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1}"

echo "
broker.id=${BROKER_ID:-0}
process.roles=broker,controller
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://$(hostname -i):9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS}
controller.listener.names=CONTROLLER
offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1}
" > config/runtime.properties

cat config/base-server.properties config/runtime.properties > config/server.properties

if [ ! -e "/tmp/kafka-logs/meta.properties" ]; then
kafka-storage.sh format --config config/server.properties --cluster-id "YPKJRKEhT06jEqGlBQar5A"
fi

exec kafka-server-start.sh config/server.properties
4 changes: 2 additions & 2 deletions tests/manual/topic_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ async def main() -> None:
await client.create_topics(

Check warning on line 14 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L12-L14

Added lines #L12 - L14 were not covered by tests
[NewTopic(name=topic, num_partitions=3, replication_factor=2)]
)
await asyncio.sleep(5)
await asyncio.sleep(1)
print("Deleting topic:", topic)
await client.delete_topics([topic])
await asyncio.sleep(5)
await asyncio.sleep(1)

Check warning on line 20 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L17-L20

Added lines #L17 - L20 were not covered by tests
finally:
await client.close()

Check warning on line 22 in tests/manual/topic_management.py

View check run for this annotation

Codecov / codecov/patch

tests/manual/topic_management.py#L22

Added line #L22 was not covered by tests

Expand Down

0 comments on commit cc60fbe

Please sign in to comment.