Skip to content

Python-Tools/kafkaproxy

Repository files navigation

kafkaproxy

kafka生产者和消费者的代理工具.

代理对象用于推迟初始化.我们可以在需要的地方用代理对象的全局变量直接编写逻辑,避免被代理的对象来回在函数间传递.

特性

  • 支持代理kafka-python,aiokafkaconfluent-kafka的生产者消费者对象.
  • 提供统一通用的生产者消费者接口包装

安装

  • 只安装本项目不安装被代理对象的依赖: pip install kafkaproxy
  • 安装本项目同时确定要代理的对象包为kafka-python: pip install kafkaproxy[kafka]
  • 安装本项目同时确定要代理的对象包为aiokafka: pip install kafkaproxy[aio]
  • 安装本项目同时确定要代理的对象包为confluent-kafka: pip install kafkaproxy[confluent]

使用

本项目支持代理3种kafka模块中的对应模块,使用枚举KafkaType中的取值在调用方法initialize_from_addresses初始化时指定. 代理对象除了原样代理对象外还提供了生产者和消费者的统一通用接口包装. 由于对应的方法是动态绑定的,因此如果需要他们的typehints可以用typing.cast将代理对象转化为对应的协议对象

  • 同步接口生产者使用ProducerProtocol
  • 异步接口生产者使用AioProducerProtocol
  • 同步接消费产者使用ConsumerProtocol
  • 异步接消费产者使用AioConsumerProtocol

代理kafka-pythonconfluent-kafka生产者

from kafkaproxy import ProducerProxy, KafkaType, ProducerProtocol
from typing import cast
import time
kafkap = ProducerProxy()


def run() -> None:
    p = cast(ProducerProtocol, kafkap)
    with p.mount() as cli:
        for i in range(10):
            cli.publish("topic1", f"send {i}")
            time.sleep(0.1)


# kafkap.initialize_from_addresses("localhost:9094", kafka_type=KafkaType.ConfluentKafka, acks="all")
kafkap.initialize_from_addresses("localhost:9094", kafka_type=KafkaType.Kafka)
try:
    print("start publishing")
    run()
finally:
    print("stoped")

代理kafka-pythonconfluent-kafka消费者

from kafkaproxy import ConsumerProxy, KafkaType, ConsumerProtocol
from typing import cast

kafkac = ConsumerProxy()


def run() -> None:
    c = cast(ConsumerProtocol, kafkac)
    with c.watch() as g:
        for record in g:
            print(record.value)


# kafkac.initialize_from_addresses("localhost:9094", "topic1", group_id="test2", kafka_type=KafkaType.Kafka)
kafkac.initialize_from_addresses("localhost:9094", "topic1", group_id="test2", kafka_type=KafkaType.ConfluentKafka)
try:
    print("start watching")
    run()
finally:
    print("stoped")

代理aiokafka生产者

import asyncio
from kafkaproxy import ProducerProxy, KafkaType, AioProducerProtocol
from typing import cast

kafkap = ProducerProxy()


async def run() -> None:
    p = cast(AioProducerProtocol, kafkap)
    async with p.mount() as cli:
        for i in range(10):
            await cli.publish("topic1", f"send {i}")
            await asyncio.sleep(0.1)


async def main() -> None:
    kafkap.initialize_from_addresses("localhost:9094", kafka_type=KafkaType.AioKafka, acks="all")
    await run()


try:
    print("start watching")
    asyncio.run(main())
finally:
    print("stoped")

代理aiokafka消费者

import asyncio
from kafkaproxy import ConsumerProxy, KafkaAutoOffsetReset, KafkaType, AioConsumerProtocol
from typing import cast

kafkac = ConsumerProxy()


async def run() -> None:
    c = cast(AioConsumerProtocol, kafkac)
    async with c.watch() as g:
        async for record in g:
            print(record.value)


async def main() -> None:
    kafkac.initialize_from_addresses("localhost:9094", "topic1", group_id="test2", kafka_type=KafkaType.AioKafka, auto_offset_reset=KafkaAutoOffsetReset.earliest)
    await run()


try:
    print("start watching")
    asyncio.run(main())
finally:
    print("stoped")