Skip to content

Commit

Permalink
Improve use of kafkit.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed Mar 6, 2024
1 parent 068788b commit 47c30bf
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions python/lsst/consdb/hinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import aiokafka
import astropy.time
import httpx
import kafkit
import kafkit.registry
import kakfit.registry.httpx
import kafkit.ssl
import yaml
from astro_metadata_translator import ObservationInfo
from lsst.resources import ResourcePath
Expand Down Expand Up @@ -290,15 +292,23 @@ async def main() -> None:
global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic

async with httpx.AsyncClient() as client:
schema_registry = kafkit.registry.RegistryApi(client=client, url=schema_url)
schema_registry = kafkit.registry.httpx.RegistryApi(http_client=client, url=schema_url)
deserializer = kafkit.registry.Deserializer(registry=schema_registry)

ssl_context = kafkit.ssl.create_ssl_context(
cluster_ca_path=broker_ca_path,
client_cert_path=client_cert_path,
client_key_path=client_key_path,
)
consumer = aiokafka.AIOKafkaConsumer(
topic,
bootstrap_servers=kafka_bootstrap,
ssl_context=ssl_context,
security_protocol="SSL",
group_id=kafka_group_id,
auto_offset_reset="earliest",
)

await consumer.start()
try:
async for msg in consumer:
Expand Down

0 comments on commit 47c30bf

Please sign in to comment.