diff --git a/core/mq/kafka/kafka_publisher.py b/core/mq/kafka/kafka_publisher.py index 51ba44d7..9098a641 100644 --- a/core/mq/kafka/kafka_publisher.py +++ b/core/mq/kafka/kafka_publisher.py @@ -2,6 +2,7 @@ import logging import os import time +from typing import Union from confluent_kafka import Producer @@ -21,11 +22,13 @@ def __init__(self, config): if internal_log_path: debug_logger = logging.getLogger("debug_publisher") timestamp = time.strftime("_%d%m%Y_") - debug_logger.addHandler(logging.FileHandler("{}/kafka_publisher_debug{}{}.log".format(internal_log_path, timestamp, os.getpid()))) + debug_logger.addHandler(logging.FileHandler( + "{}/kafka_publisher_debug{}{}.log".format(internal_log_path, timestamp, os.getpid()) + )) conf["logger"] = debug_logger self._producer = Producer(**conf) - def send(self, value, key=None, topic_key=None, headers=None): + def send(self, value: Union[str, bytes], key=None, topic_key=None, headers=None): try: topic = self._config["topic"] if topic_key is not None: diff --git a/core/request/kafka_request.py b/core/request/kafka_request.py index 05e4b502..4ca83c12 100644 --- a/core/request/kafka_request.py +++ b/core/request/kafka_request.py @@ -46,7 +46,7 @@ def send(self, data, publisher: KafkaPublisher, source_mq_message): } log("KafkaRequest: got no topic and no topic_key", params=log_params, level="ERROR") - def run(self, data, params=None): + def run(self, data, params): publishers = params["publishers"] publisher = publishers[self.kafka_key] self.send(data=data, publisher=publisher, source_mq_message=params["mq_message"]) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index 28a375dc..b151a087 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -3,6 +3,7 @@ import time from collections import namedtuple from functools import lru_cache +from typing import Union from typing import Optional from confluent_kafka.cimpl import KafkaException, Message as KafkaMessage @@ -14,6 +15,7 @@ from smart_kit.names.message_names import ANSWER_TO_USER, RUN_APP, MESSAGE_TO_SKILL, SERVER_ACTION, CLOSE_APP from core.message.from_message import SmartAppFromMessage +from core.model.base_user import BaseUser from core.model.heapq.heapq_storage import HeapqKV from core.mq.kafka.kafka_consumer import KafkaConsumer from core.mq.kafka.kafka_publisher import KafkaPublisher @@ -52,20 +54,16 @@ def __init__(self, *args, **kwargs): consumers = {} publishers = {} - log( - "%(class_name)s START CONSUMERS/PUBLISHERS CREATE", - params={"class_name": self.__class__.__name__}, level="WARNING" - ) + log("%(class_name)s START CONSUMERS/PUBLISHERS CREATE", + params={"class_name": self.__class__.__name__}, level="WARNING") kafka_config_copy = pickle_deepcopy(kafka_config) for key, config in kafka_config_copy.items(): if config.get("consumer"): consumers.update({key: KafkaConsumer(config)}) if config.get("publisher"): publishers.update({key: KafkaPublisher(config)}) - log( - "%(class_name)s FINISHED CONSUMERS/PUBLISHERS CREATE", - params={"class_name": self.__class__.__name__}, level="WARNING" - ) + log("%(class_name)s FINISHED CONSUMERS/PUBLISHERS CREATE", + params={"class_name": self.__class__.__name__}, level="WARNING") self.app_name = self.settings.app_name self.consumers = consumers @@ -157,6 +155,8 @@ def iterate_behavior_timeouts(self): save_tries = 0 user_save_no_collisions = False user = None + timeout_from_message = None + answers = None while save_tries < self.user_save_collisions_tries and not user_save_no_collisions: save_tries += 1 @@ -176,8 +176,8 @@ def iterate_behavior_timeouts(self): user_save_no_collisions = self.save_user(db_uid, user, mq_message) if user and not user_save_no_collisions: - log( - "MainLoop.iterate_behavior_timeouts: save user got collision on uid %(uid)s db_version %(db_version)s.", + log("MainLoop.iterate_behavior_timeouts: save user got collision on uid %(uid)s db_version " + "%(db_version)s.", user=user, params={log_const.KEY_NAME: "ignite_collision", "db_uid": db_uid, @@ -190,8 +190,8 @@ def iterate_behavior_timeouts(self): continue if not user_save_no_collisions: - log( - "MainLoop.iterate_behavior_timeouts: db_save collision all tries left on uid %(uid)s db_version %(db_version)s.", + log("MainLoop.iterate_behavior_timeouts: db_save collision all tries left on uid %(uid)s " + "db_version %(db_version)s.", user=user, params={log_const.KEY_NAME: "ignite_collision", "db_uid": db_uid, @@ -212,7 +212,7 @@ def iterate_behavior_timeouts(self): log_const.REQUEST_VALUE: str(mq_message.value())}, level="ERROR", exc_info=True) - def _get_topic_key(self, mq_message, kafka_key): + def _get_topic_key(self, mq_message: KafkaMessage, kafka_key): topic_names_2_key = self._topic_names_2_key(kafka_key) return self.default_topic_key(kafka_key) or topic_names_2_key[mq_message.topic()] @@ -232,7 +232,6 @@ def process_message(self, mq_message: KafkaMessage, consumer, kafka_key, stats): masking_fields=self.masking_fields, creation_time=consumer.get_msg_create_time(mq_message)) - # TODO вернуть проверку ключа!!! if message.validate(): waiting_message_time = 0 if message.creation_time: @@ -242,84 +241,105 @@ def process_message(self, mq_message: KafkaMessage, consumer, kafka_key, stats): stats += "Mid: {}\n".format(message.incremental_id) monitoring.sampling_mq_waiting_time(self.app_name, waiting_message_time / 1000) - self.check_message_key(message, mq_message.key(), user) - log( - "INCOMING FROM TOPIC: %(topic)s partition %(message_partition)s HEADERS: %(headers)s DATA: %(incoming_data)s", - params={log_const.KEY_NAME: "incoming_message", - "topic": mq_message.topic(), - "message_partition": mq_message.partition(), - "message_key": (mq_message.key() or b"").decode('utf-8', 'backslashreplace'), - "message_id": message.incremental_id, - "kafka_key": kafka_key, - "incoming_data": str(message.masked_value), - "length": len(message.value), - "headers": str(mq_message.headers()), - "waiting_message": waiting_message_time, - "surface": message.device.surface, - MESSAGE_ID_STR: message.incremental_id}, - user=user - ) - - db_uid = message.db_uid - with StatsTimer() as load_timer: - user = self.load_user(db_uid, message) - monitoring.sampling_load_time(self.app_name, load_timer.secs) - stats += "Loading time: {} msecs\n".format(load_timer.msecs) - - if KAFKA_REPLY_TOPIC in message.headers and \ - message.message_name in [RUN_APP, MESSAGE_TO_SKILL, SERVER_ACTION, CLOSE_APP]: - if user.private_vars.get(KAFKA_REPLY_TOPIC): - log("MainLoop.iterate: kafka_replyTopic collision", + # check_message_key + message_key = self._get_str_message_key(mq_message.key(), incremental_id=message.incremental_id, + uid=message.uid, user=user) + valid_key = self._get_valid_message_key(message) + if message_key != valid_key: + log(f"%(class_name)s.process_message: Failed to check Kafka message key {message_key} " + f"!= {valid_key}", + params={ + log_const.KEY_NAME: "check_kafka_key_validation", + MESSAGE_ID_STR: message.incremental_id, + UID_STR: message.uid, + "class_name": self.__class__.__name__ + }, user=user, + level="WARNING") + self.publishers[kafka_key].send_to_topic(message_value, valid_key, mq_message.topic(), + mq_message.headers()) + log(f"%(class_name)s.process_message: Kafka message with invalid Kafka message key '{message_key}' " + f"resent again with a valid key: '{valid_key}'", + params={log_const.KEY_NAME: "kafka_message_key_recovery", + MESSAGE_ID_STR: message.incremental_id, + UID_STR: message.uid, + "class_name": self.__class__.__name__}, + user=user, + level="WARNING") + + else: + log("INCOMING FROM TOPIC: %(topic)s partition %(message_partition)s HEADERS: %(headers)s DATA: " + "%(incoming_data)s", + params={log_const.KEY_NAME: "incoming_message", + "topic": mq_message.topic(), + "message_partition": mq_message.partition(), + "message_key": (mq_message.key() or b"").decode('utf-8', 'backslashreplace'), + "message_id": message.incremental_id, + "kafka_key": kafka_key, + "incoming_data": str(message.masked_value), + "length": len(message.value), + "headers": str(mq_message.headers()), + "waiting_message": waiting_message_time, + "surface": message.device.surface, + MESSAGE_ID_STR: message.incremental_id}, + user=user) + + db_uid = message.db_uid + with StatsTimer() as load_timer: + user = self.load_user(db_uid, message) + monitoring.sampling_load_time(self.app_name, load_timer.secs) + stats += "Loading time: {} msecs\n".format(load_timer.msecs) + if KAFKA_REPLY_TOPIC in message.headers and \ + message.message_name in [RUN_APP, MESSAGE_TO_SKILL, SERVER_ACTION, CLOSE_APP]: + if user.private_vars.get(KAFKA_REPLY_TOPIC): + log("MainLoop.iterate: kafka_replyTopic collision", + params={log_const.KEY_NAME: "ignite_collision", + "db_uid": db_uid, + "message_key": mq_message.key(), + "message_partition": mq_message.partition(), + "kafka_key": kafka_key, + "uid": user.id, + "saved_topic": user.private_vars.get(KAFKA_REPLY_TOPIC), + "current_topic": message.headers[KAFKA_REPLY_TOPIC]}, + user=user, level="WARNING") + user.private_vars.set(KAFKA_REPLY_TOPIC, message.headers[KAFKA_REPLY_TOPIC]) + with StatsTimer() as script_timer: + commands = self.model.answer(message, user) + + answers = self._generate_answers(user=user, commands=commands, message=message, + topic_key=topic_key, + kafka_key=kafka_key) + monitoring.sampling_script_time(self.app_name, script_timer.secs) + stats += "Script time: {} msecs\n".format(script_timer.msecs) + + with StatsTimer() as save_timer: + user_save_no_collisions = self.save_user(db_uid, user, message) + + monitoring.sampling_save_time(self.app_name, save_timer.secs) + stats += "Saving time: {} msecs\n".format(save_timer.msecs) + if not user_save_no_collisions: + log("MainLoop.iterate: save user got collision on uid %(uid)s db_version %(db_version)s.", + user=user, params={log_const.KEY_NAME: "ignite_collision", "db_uid": db_uid, "message_key": mq_message.key(), "message_partition": mq_message.partition(), "kafka_key": kafka_key, "uid": user.id, - "saved_topic": user.private_vars.get(KAFKA_REPLY_TOPIC), - "current_topic": message.headers[KAFKA_REPLY_TOPIC]}, - user=user, level="WARNING") - user.private_vars.set(KAFKA_REPLY_TOPIC, message.headers[KAFKA_REPLY_TOPIC]) - - with StatsTimer() as script_timer: - commands = self.model.answer(message, user) - - answers = self._generate_answers(user=user, commands=commands, message=message, - topic_key=topic_key, - kafka_key=kafka_key) - monitoring.sampling_script_time(self.app_name, script_timer.secs) - stats += "Script time: {} msecs\n".format(script_timer.msecs) - - with StatsTimer() as save_timer: - user_save_no_collisions = self.save_user(db_uid, user, message) - - monitoring.sampling_save_time(self.app_name, save_timer.secs) - stats += "Saving time: {} msecs\n".format(save_timer.msecs) - if not user_save_no_collisions: - log( - "MainLoop.iterate: save user got collision on uid %(uid)s db_version %(db_version)s.", - user=user, - params={log_const.KEY_NAME: "ignite_collision", - "db_uid": db_uid, - "message_key": mq_message.key(), - "message_partition": mq_message.partition(), - "kafka_key": kafka_key, - "uid": user.id, - "db_version": str(user.private_vars.get(user.USER_DB_VERSION))}, - level="WARNING") - continue + "db_version": str(user.private_vars.get(user.USER_DB_VERSION))}, + level="WARNING") + continue - self.save_behavior_timeouts(user, mq_message, kafka_key) + self.save_behavior_timeouts(user, mq_message, kafka_key) - if mq_message.headers() is None: - mq_message.set_headers([]) + if mq_message.headers() is None: + mq_message.set_headers([]) - if answers: - for answer in answers: - with StatsTimer() as publish_timer: - self._send_request(user, answer, mq_message) - stats += "Publishing time: {} msecs".format(publish_timer.msecs) - log(stats, user=user) + if answers: + for answer in answers: + with StatsTimer() as publish_timer: + self._send_request(user, answer, mq_message) + stats += "Publishing time: {} msecs".format(publish_timer.msecs) + log(stats, user=user) else: try: data = message.masked_value @@ -332,8 +352,7 @@ def process_message(self, mq_message: KafkaMessage, consumer, kafka_key, stats): level="ERROR") monitoring.counter_invalid_message(self.app_name) if user and not user_save_no_collisions: - log( - "MainLoop.iterate: db_save collision all tries left on uid %(uid)s db_version %(db_version)s.", + log("MainLoop.iterate: db_save collision all tries left on uid %(uid)s db_version %(db_version)s.", user=user, params={log_const.KEY_NAME: "ignite_collision", "db_uid": db_uid, @@ -379,36 +398,24 @@ def iterate(self, kafka_key): log("Error handling worker fail exception.", level="ERROR", exc_info=True) - def check_message_key(self, from_message, message_key, user): - sub = from_message.sub - channel = from_message.channel - uid = from_message.uid + def _get_valid_message_key(self, from_message: SmartAppFromMessage): + return "_".join([i for i in [from_message.channel, from_message.sub, from_message.uid] if i]) + + def _get_str_message_key(self, message_key: Union[str, bytes], incremental_id: int, uid: str, user: BaseUser): message_key = message_key or b"" try: - params = [channel, sub, uid] - valid_key = "" - for value in params: - if value: - valid_key = "{}{}{}".format(valid_key, "_", value) if valid_key else "{}".format(value) - key_str = message_key.decode() - - message_key_is_valid = key_str == valid_key - if not message_key_is_valid: - log(f"Failed to check Kafka message key {message_key} != {valid_key}", - params={ - log_const.KEY_NAME: "check_kafka_key_validation", - MESSAGE_ID_STR: from_message.incremental_id, - UID_STR: uid - }, user=user, - level="WARNING") - except: - log(f"Exception to check Kafka message key {message_key}", + if isinstance(message_key, bytes): + message_key = message_key.decode() + except UnicodeDecodeError: + log(f"MainLoop._get_str_message_key: Decode error of Kafka message key {message_key}", params={log_const.KEY_NAME: "check_kafka_key_error", - MESSAGE_ID_STR: from_message.incremental_id, - UID_STR: uid - }, user=user, level="ERROR") + MESSAGE_ID_STR: incremental_id, + UID_STR: uid}, + user=user, level="ERROR") + + return message_key - def _send_request(self, user, answer, mq_message): + def _send_request(self, user: BaseUser, answer: SmartAppToMessage, mq_message: KafkaMessage): kafka_broker_settings = self.settings["template_settings"].get( "route_kafka_broker" ) or [] @@ -452,12 +459,11 @@ def default_topic_key(self, kafka_key): def masking_fields(self): return self.settings["template_settings"].get("masking_fields") - def save_behavior_timeouts(self, user, mq_message, kafka_key): + def save_behavior_timeouts(self, user, mq_message: KafkaMessage, kafka_key): for i, (expire_time_us, callback_id) in enumerate(user.behaviors.get_behavior_timeouts()): # two behaviors can be created in one query, so we need add some salt to make theirs key unique unique_key = expire_time_us + i * 1e-5 - log( - "%(class_name)s: adding local_timeout on callback %(callback_id)s with timeout on %(unique_key)s", + log("%(class_name)s: adding local_timeout on callback %(callback_id)s with timeout on %(unique_key)s", params={log_const.KEY_NAME: "adding_local_timeout", "class_name": self.__class__.__name__, "callback_id": callback_id,