Skip to content

Commit

Permalink
Merge pull request #37 from salute-developers/feature/fix_kafka_messa…
Browse files Browse the repository at this point in the history
…ge_key

DPNLPF-1729: add recovery of Kafka message key, refactor check_message_key, reformatting
  • Loading branch information
dangerink authored Aug 19, 2022
2 parents ff30653 + e33e569 commit fd6b2f6
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 117 deletions.
7 changes: 5 additions & 2 deletions core/mq/kafka/kafka_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import time
from typing import Union

from confluent_kafka import Producer

Expand All @@ -21,11 +22,13 @@ def __init__(self, config):
if internal_log_path:
debug_logger = logging.getLogger("debug_publisher")
timestamp = time.strftime("_%d%m%Y_")
debug_logger.addHandler(logging.FileHandler("{}/kafka_publisher_debug{}{}.log".format(internal_log_path, timestamp, os.getpid())))
debug_logger.addHandler(logging.FileHandler(
"{}/kafka_publisher_debug{}{}.log".format(internal_log_path, timestamp, os.getpid())
))
conf["logger"] = debug_logger
self._producer = Producer(**conf)

def send(self, value, key=None, topic_key=None, headers=None):
def send(self, value: Union[str, bytes], key=None, topic_key=None, headers=None):
try:
topic = self._config["topic"]
if topic_key is not None:
Expand Down
2 changes: 1 addition & 1 deletion core/request/kafka_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def send(self, data, publisher: KafkaPublisher, source_mq_message):
}
log("KafkaRequest: got no topic and no topic_key", params=log_params, level="ERROR")

def run(self, data, params=None):
def run(self, data, params):
publishers = params["publishers"]
publisher = publishers[self.kafka_key]
self.send(data=data, publisher=publisher, source_mq_message=params["mq_message"])
Expand Down
Loading

0 comments on commit fd6b2f6

Please sign in to comment.