diff --git a/Makefile b/Makefile index 1be335e3..70117c7e 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,10 @@ ci-test-unit: ci-test-all: pytest -s -v --log-format="%(asctime)s %(levelname)s %(message)s" --log-level DEBUG --cov aiokafka --cov-report xml --color=yes --docker-image $(DOCKER_IMAGE) $(FLAGS) tests +.PHONY: manual-test +manual-test: + docker compose up --build --exit-code-from aiokafka --attach aiokafka + coverage.xml: .coverage coverage xml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..7578bb2c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,76 @@ +# Network and IPs are hard coded otherwise a broker might be confused on restarting from a different IP + +services: + zookeeper: + build: + context: ./docker/kafka + command: "zookeeper-server-start.sh config/zookeeper.properties" + environment: + KAFKA_HEAP_OPTS: "-Xmx64M -Xms32M" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.10 + + kafka1: + build: + context: ./docker/kafka + command: "start-broker.sh" + environment: + BROKER_ID: "1" + ZOOKEEPER: "zookeeper:2181" + KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.11 + stop_grace_period: 30s + healthcheck: + test: [ "CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1" ] + retries: 30 + interval: 1s + depends_on: + - zookeeper + + kafka2: + extends: + service: kafka1 + environment: + BROKER_ID: "2" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.12 + + kafka3: + extends: + service: kafka1 + environment: + BROKER_ID: "3" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.13 + + + aiokafka: + build: + context: . + dockerfile: docker/aiokafka/Dockerfile + command: [ "python", "-u", "-m", "tests.manual.topic_management" ] + environment: + BOOTSTRAP_SERVERS: "kafka3:9092,kafka2:9092,kafka1:9092" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.100 + depends_on: + kafka1: + condition: service_healthy + kafka2: + condition: service_healthy + kafka3: + condition: service_healthy + +networks: + aiokafka-test-network: + ipam: + driver: default + config: + - subnet: 172.16.23.0/24 + ip_range: 172.28.23.0/24 diff --git a/docker/aiokafka/Dockerfile b/docker/aiokafka/Dockerfile new file mode 100644 index 00000000..e7071d2c --- /dev/null +++ b/docker/aiokafka/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.12 + +WORKDIR /opt/project + +COPY setup.py pyproject.toml requirements-* /opt/project/ + +RUN pip install -r requirements-ci.txt + +COPY aiokafka /opt/project/aiokafka +COPY tests /opt/project/tests diff --git a/docker/kafka/Dockerfile b/docker/kafka/Dockerfile new file mode 100644 index 00000000..57db6205 --- /dev/null +++ b/docker/kafka/Dockerfile @@ -0,0 +1,17 @@ +FROM ubuntu:22.04 + +RUN apt-get update && \ + apt-get install -y --no-install-recommends default-jre wget && \ + rm -rf /var/lib/apt/lists/* + +ENV PATH="/opt/kafka/bin:${PATH}" +WORKDIR /opt/kafka + +COPY start-broker.sh /opt/kafka/bin/ + +ARG SCALA_VERSION=2.13 +ARG KAFKA_VERSION=2.8.1 + +RUN wget -q -O kafka.tgz "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \ + && tar xfvz kafka.tgz --strip 1 \ + && rm -rf kafka.tgz site-docs diff --git a/docker/kafka/start-broker.sh b/docker/kafka/start-broker.sh new file mode 100755 index 00000000..4636c09e --- /dev/null +++ b/docker/kafka/start-broker.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +exec kafka-server-start.sh config/server.properties \ + --override "broker.id=${BROKER_ID:-0}" \ + --override "zookeeper.connect=${ZOOKEEPER}" \ + --override "listeners=PLAINTEXT://:9092" \ + --override "advertised.listeners=PLAINTEXT://$(hostname -i):9092" \ + --override "listener.security.protocol.map=PLAINTEXT:PLAINTEXT" \ + --override "offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1}" diff --git a/tests/manual/__init__.py b/tests/manual/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/manual/topic_management.py b/tests/manual/topic_management.py new file mode 100644 index 00000000..3a13351d --- /dev/null +++ b/tests/manual/topic_management.py @@ -0,0 +1,27 @@ +import asyncio +import os + +from aiokafka.admin import AIOKafkaAdminClient, NewTopic + + +async def main() -> None: + client = AIOKafkaAdminClient(bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"]) + await client.start() + try: + for i in range(20): + topic = f"test-{i}" + print("Creating topic:", topic) + await client.create_topics( + [NewTopic(name=topic, num_partitions=3, replication_factor=2)] + ) + await asyncio.sleep(5) + print("Deleting topic:", topic) + await client.delete_topics([topic]) + await asyncio.sleep(5) + finally: + await client.close() + + +if __name__ == "__main__": + # Start the asyncio loop by running the main function + asyncio.run(main())