Skip to content

Commit

Permalink
Merge pull request #28 from salute-developers/feature/kafka_replyTopic
Browse files Browse the repository at this point in the history
DPNLPF-1692 Reply to topic "kafka_replyTopic" if it exists in message headers
  • Loading branch information
SyrexMinus authored Aug 11, 2022
2 parents d488d6b + 0ed18b5 commit f9e91c6
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 22 deletions.
1 change: 1 addition & 0 deletions core/configs/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
CALLBACK_ID_HEADER = "app_callback_id"
LINK_BEHAVIOR_FLAG = "link_previous_behavior"
KAFKA = "kafka"
KAFKA_REPLY_TOPIC = "kafka_replyTopic"

ORIGINAL_INTENT = "original_intent"
NEW_SESSION = "new_session"
Expand Down
3 changes: 3 additions & 0 deletions core/message/from_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def __init__(self, data):
def __getitem__(self, item):
return self.raw[item].decode()

def __contains__(self, item):
return item in self.raw

def get(self, key, default=None, encoding="utf-8"):
res = self.raw.get(key)
if res is None:
Expand Down
5 changes: 4 additions & 1 deletion core/model/base_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class BaseUser(Model):

counters: Counters
variables: Variables
private_vars: Variables
local_vars: Variables

def __init__(self, id, message, values, descriptions, load_error=False):
Expand All @@ -46,6 +47,7 @@ def fields(self):
Field("last_messages_ids", LimitedQueuedHashableObjects, LimitedQueuedHashableObjectsDescription(None)),
Field("local_vars", Variables, None, False),
Field("variables", Variables),
Field("private_vars", Variables),
]

@property
Expand All @@ -54,12 +56,13 @@ def raw_str(self):
raw = json.dumps(self.raw, default=lambda o: f"<non-serializable: {type(o).__qualname__}>")
log("%(class_name)s.raw USER %(uid)s SAVE db_version = %(db_version)s. "
"Saving User %(uid)s. Serialized utf8 json length is %(user_length)s symbols.", self,
{"db_version": str(self.variables.get(self.USER_DB_VERSION)),
{"db_version": str(self.private_vars.get(self.USER_DB_VERSION)),
"uid": str(self.id), "user_length": len(raw),
KEY_NAME: "user_save"})
return raw

def expire(self):
self.counters.expire()
self.variables.expire()
self.private_vars.expire()
self.local_vars.expire()
41 changes: 34 additions & 7 deletions core/request/kafka_request.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,60 @@
from typing import Dict

from confluent_kafka.cimpl import Message as KafkaMessage
from core.mq.kafka.kafka_publisher import KafkaPublisher
from core.request.base_request import BaseRequest
from core.logging.logger_utils import log
import core.logging.logger_constants as log_const


class KafkaRequest(BaseRequest):
TOPIC_KEY = "topic_key"
KAFKA_KEY = "kafka_key"
TOPIC = "topic"

def __init__(self, items, id=None):
super(KafkaRequest, self).__init__(items)
items = items or {}
self.topic_key = items.get(self.TOPIC_KEY)
self.kafka_key = items.get(self.KAFKA_KEY)
# topic_key has priority over topic
self.topic = items.get(self.TOPIC)

def update_empty_items(self, items):
self.topic_key = self.topic_key or items["topic_key"]
self.kafka_key = self.kafka_key or items["kafka_key"]
def update_empty_items(self, items: Dict[str, str]):
self.topic_key = self.topic_key or items.get(self.TOPIC_KEY)
self.kafka_key = self.kafka_key or items.get(self.KAFKA_KEY)
self.topic = self.topic or items.get(self.TOPIC)

@property
def group_key(self):
return "{}_{}".format(self.kafka_key, self.topic_key) if (self.topic_key and self.kafka_key) else None

def _get_new_headers(self, source_mq_message):
def _get_new_headers(self, source_mq_message: KafkaMessage):
headers = source_mq_message.headers() or []
return headers

def send(self, data, publisher: KafkaPublisher, source_mq_message):
headers = self._get_new_headers(source_mq_message)
if self.topic_key is not None:
publisher.send(data, source_mq_message.key(), self.topic_key, headers=headers)
elif self.topic is not None:
publisher.send_to_topic(data, source_mq_message.key(), self.topic, headers=headers)
else:
log_params = {
"data": str(data),
log_const.KEY_NAME: log_const.EXCEPTION_VALUE
}
log("KafkaRequest: got no topic and no topic_key", params=log_params, level="ERROR")

def run(self, data, params=None):
publishers = params["publishers"]
publisher = publishers[self.kafka_key]
source_mq_message = params["mq_message"]
publisher.send(data, source_mq_message.key(), self.topic_key, headers=self._get_new_headers(source_mq_message))
self.send(data=data, publisher=publisher, source_mq_message=params["mq_message"])

def __str__(self):
return f"KafkaRequest: topic_key={self.topic_key} kafka_key={self.kafka_key}"
if self.topic_key is not None:
return f"KafkaRequest: topic_key={self.topic_key} kafka_key={self.kafka_key}"
elif self.topic is not None:
return f"KafkaRequest: topic={self.topic} kafka_key={self.kafka_key}"
else:
return f"KafkaRequest: kafka_key={self.kafka_key}"
4 changes: 2 additions & 2 deletions scenarios/user/user_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def __init__(self, id, message, db_data, settings, descriptions, parametrizer_cl
self.initial_db_data = db_data

def _initialize(self):
db_version = self.variables.get(self.USER_DB_VERSION, default=0)
self.variables.set(self.USER_DB_VERSION, db_version + 1)
db_version = self.private_vars.get(self.USER_DB_VERSION, default=0)
self.private_vars.set(self.USER_DB_VERSION, db_version + 1)
log("%(class_name)s.__init__ USER %(uid)s LOAD db_version = %(db_version)s.", self,
params={"db_version": str(db_version),
"uid": str(self.id), log_const.KEY_NAME: "user_load"})
Expand Down
6 changes: 4 additions & 2 deletions smart_kit/request/kafka_request.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from core.configs.global_constants import CALLBACK_ID_HEADER
from typing import Dict

from core.configs.global_constants import CALLBACK_ID_HEADER, KAFKA_REPLY_TOPIC
from core.request.kafka_request import KafkaRequest


class SmartKitKafkaRequest(KafkaRequest):
KAFKA_REPLY_TOPIC = "kafka_replyTopic"
KAFKA_REPLY_TOPIC = KAFKA_REPLY_TOPIC
KAFKA_EXTRA_HEADERS = "kafka_extraHeaders"

def __init__(self, items, id=None):
Expand Down
41 changes: 32 additions & 9 deletions smart_kit/start_points/main_loop_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import time
from collections import namedtuple
from functools import lru_cache
from typing import Optional

from confluent_kafka.cimpl import KafkaException
from confluent_kafka.cimpl import KafkaException, Message as KafkaMessage
from lazy import lazy

import scenarios.logging.logger_constants as log_const
from core.basic_models.actions.push_action import PUSH_NOTIFY
from core.configs.global_constants import KAFKA_REPLY_TOPIC
from core.logging.logger_utils import log, UID_STR, MESSAGE_ID_STR
from smart_kit.names.message_names import ANSWER_TO_USER, RUN_APP, MESSAGE_TO_SKILL, SERVER_ACTION, CLOSE_APP

from core.message.from_message import SmartAppFromMessage
from core.model.heapq.heapq_storage import HeapqKV
Expand Down Expand Up @@ -119,7 +121,12 @@ def _generate_answers(self, user, commands, message, **kwargs):

for command in commands:
request = SmartKitKafkaRequest(id=None, items=command.request_data)
request.update_empty_items({"topic_key": topic_key, "kafka_key": kafka_key})
request.update_empty_items({
"kafka_key": kafka_key,
"topic_key": topic_key,
"topic": user.private_vars.get(KAFKA_REPLY_TOPIC) if command.name == ANSWER_TO_USER else None
})

to_message = get_to_message(command.name)
answer = to_message(command=command, message=message, request=request,
masking_fields=self.masking_fields,
Expand Down Expand Up @@ -177,7 +184,7 @@ def iterate_behavior_timeouts(self):
"message_key": mq_message.key(),
"kafka_key": kafka_key,
"uid": user.id,
"db_version": str(user.variables.get(user.USER_DB_VERSION))},
"db_version": str(user.private_vars.get(user.USER_DB_VERSION))},
level="WARNING")

continue
Expand All @@ -192,7 +199,7 @@ def iterate_behavior_timeouts(self):
"message_partition": mq_message.partition(),
"kafka_key": kafka_key,
"uid": user.id,
"db_version": str(user.variables.get(user.USER_DB_VERSION))},
"db_version": str(user.private_vars.get(user.USER_DB_VERSION))},
level="WARNING")

monitoring.counter_save_collision_tries_left(self.app_name)
Expand All @@ -209,7 +216,7 @@ def _get_topic_key(self, mq_message, kafka_key):
topic_names_2_key = self._topic_names_2_key(kafka_key)
return self.default_topic_key(kafka_key) or topic_names_2_key[mq_message.topic()]

def process_message(self, mq_message, consumer, kafka_key, stats):
def process_message(self, mq_message: KafkaMessage, consumer, kafka_key, stats):
topic_key = self._get_topic_key(mq_message, kafka_key)

save_tries = 0
Expand Down Expand Up @@ -258,6 +265,22 @@ def process_message(self, mq_message, consumer, kafka_key, stats):
user = self.load_user(db_uid, message)
monitoring.sampling_load_time(self.app_name, load_timer.secs)
stats += "Loading time: {} msecs\n".format(load_timer.msecs)

if KAFKA_REPLY_TOPIC in message.headers and \
message.message_name in [RUN_APP, MESSAGE_TO_SKILL, SERVER_ACTION, CLOSE_APP]:
if user.private_vars.get(KAFKA_REPLY_TOPIC):
log("MainLoop.iterate: kafka_replyTopic collision",
params={log_const.KEY_NAME: "ignite_collision",
"db_uid": db_uid,
"message_key": mq_message.key(),
"message_partition": mq_message.partition(),
"kafka_key": kafka_key,
"uid": user.id,
"saved_topic": user.private_vars.get(KAFKA_REPLY_TOPIC),
"current_topic": message.headers[KAFKA_REPLY_TOPIC]},
user=user, level="WARNING")
user.private_vars.set(KAFKA_REPLY_TOPIC, message.headers[KAFKA_REPLY_TOPIC])

with StatsTimer() as script_timer:
commands = self.model.answer(message, user)

Expand All @@ -282,7 +305,7 @@ def process_message(self, mq_message, consumer, kafka_key, stats):
"message_partition": mq_message.partition(),
"kafka_key": kafka_key,
"uid": user.id,
"db_version": str(user.variables.get(user.USER_DB_VERSION))},
"db_version": str(user.private_vars.get(user.USER_DB_VERSION))},
level="WARNING")
continue

Expand Down Expand Up @@ -318,15 +341,15 @@ def process_message(self, mq_message, consumer, kafka_key, stats):
"message_partition": mq_message.partition(),
"kafka_key": kafka_key,
"uid": user.id,
"db_version": str(user.variables.get(user.USER_DB_VERSION))},
"db_version": str(user.private_vars.get(user.USER_DB_VERSION))},
level="WARNING")
self.postprocessor.postprocess(user, message)
monitoring.counter_save_collision_tries_left(self.app_name)
consumer.commit_offset(mq_message)

def iterate(self, kafka_key):
consumer = self.consumers[kafka_key]
mq_message = None
mq_message: Optional[KafkaMessage] = None
message_value = None
try:
mq_message = None
Expand Down
6 changes: 6 additions & 0 deletions tests/smart_kit_tests/request/test_kafka_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def setUp(self):
"app_callback_id": 22, "kafka_key": "54321"} # пусть
self.test_items2 = {"topic_key": "12345", "timeout": 10} # пусть
self.test_items3 = {"topic_key": "12345", "kafka_replyTopic": "any theme", "timeout": 10} # пусть
self.test_items4 = {"topic_key": "12345", "kafka_replyTopic": "any theme", "timeout": 10, "topic": "topic123"} # пусть
self.test_source_mq_message = Mock('source_mq_message')
self.test_source_mq_message.headers = lambda: [("any header 1", 1)]

Expand All @@ -24,6 +25,11 @@ def test_smart_kafka_request_init(self):
self.assertTrue(obj1._callback_id == 22)
self.assertTrue(obj1._kafka_replyTopic == "any theme")

def test_smart_kafka_request_topic(self):
obj1 = kafka_request.SmartKitKafkaRequest(self.test_items4)
self.assertTrue(obj1.topic_key == "12345")
self.assertTrue(obj1.topic == "topic123")

def test_smart_kafka_request_callback_id_header_name(self):
obj1 = kafka_request.SmartKitKafkaRequest(self.test_items1)
self.assertTrue(obj1._callback_id_header_name == "app_callback_id")
Expand Down
2 changes: 1 addition & 1 deletion tests/smart_kit_tests/user/test_user_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_smart_app_user_init(self):
def test_smart_app_user_fields(self):
obj1 = user_model.User(self.test_id, self.test_message, None, self.test_values, self.test_descriptions,
self.test_parametrizer_cls)
self.assertTrue(len(obj1.fields) == 13)
self.assertTrue(len(obj1.fields) == 14)
self.assertTrue(isinstance(obj1.fields[0], Field))

def test_smart_app_user_parametrizer(self):
Expand Down

0 comments on commit f9e91c6

Please sign in to comment.