Шина данных на базе Apache Kafka, состоящая из отдельных компонентов-микросервисов
- Данные поступают во входной топик
Kafka
, обработанные данные считываются из выходного топикаKafka
- Данные обрабатываются 3-мя сервисами: фильтрация (
Filtering
), дедубликация (Deduplication
), обогащение (Enrichment
) - Правила для сервисов поступают из соответствующих таблиц базы данных
PostgreSQL
, правила вычитываются раз в заданный интервал (конфигурация:updateIntervalSec
) - Правила конфигурируются с помощью сервиса-менеджера (
Management
) черезSwagger UI
(либо же через эндпоинты напрямую) - Сервис дедубликации использует
Redis
для хранения состояния дублей - Сервис обогащения использует
MongoDB
для хранения обогащающих данных - Все сервисы конфигурируются через переменные среды (возможна конфигурация через
application.conf
напрямую)
Description:
Фильтрует данные по заданным правилам
Rules:
filter_id
: id фильтраrule_id
: id правилаfield_name
: json-поле сообщения, по которому выполняется фильтрацияfilter_function_name
: название функции фильтрации (equals, contains, not_equals, not_contains)filter_value
: значение для сравнения
Configuration:
DB_URL
: JDBC-URL базы данных с правилами фильтрацииDB_USER
: имя пользователя базы данныхDB_PASSWORD
: пароль базы данныхKAFKA_CONSUMER_BOOTSTRAP_SERVERS
: URL для подключения к KafkaKAFKA_CONSUMER_CLIENT_ID
: id клиента-консьюмера KafkaKAFKA_CONSUMER_GROUP_ID
: id группы консьюмеров KafkaKAFKA_CONSUMER_AUTO_OFFSET_RESET
: сдвиг для чтения из топика KafkaKAFKA_CONSUMER_TOPIC
: входной топик Kafka для фильтрацииKAFKA_PRODUCER_BOOTSTRAP_SERVERS
: URL для подключения к KafkaKAFKA_PRODUCER_CLIENT_ID
: id клиента-продюсера KafkaKAFKA_PRODUCER_ACKS
: фактор подтверждения отправки от брокеров KafkaKAFKA_PRODUCER_TOPIC
: выходной топик Kafka после фильтрацииUPDATE_INTERVAL_SEC
: интервал чтения правил фильтрации из базы данных
Description:
Очищает данные от дубликатов, используя для хранения ключей дедубликации Redis
(несколько правил фильтрации объединяются в один ключ дедубликации)
Rules:
deduplication_id
: id сервиса дедубликацииrule_id
: id правилаfield_name
: json-поле сообщения, по которому выполняется дедубликацияtime_to_live_sec
: время жизни ключа в Redisis_active
: вкл/выкл правила
Configuration:
DB_URL
: JDBC-URL базы данных с правилами дедубликацииDB_USER
: имя пользователя базы данныхDB_PASSWORD
: пароль базы данныхKAFKA_CONSUMER_BOOTSTRAP_SERVERS
: URL для подключения к KafkaKAFKA_CONSUMER_CLIENT_ID
: id клиента-консьюмера KafkaKAFKA_CONSUMER_GROUP_ID
: id группы консьюмеров KafkaKAFKA_CONSUMER_AUTO_OFFSET_RESET
: сдвиг для чтения из топика KafkaKAFKA_CONSUMER_TOPIC
: входной топик Kafka для дедубликацииKAFKA_PRODUCER_BOOTSTRAP_SERVERS
: URL для подключения к KafkaKAFKA_PRODUCER_CLIENT_ID
: id клиента-продюсера KafkaKAFKA_PRODUCER_ACKS
: фактор подтверждения отправки от брокеров KafkaKAFKA_PRODUCER_TOPIC
: выходной топик Kafka после дедубликацииREDIS_HOST
: хост для подключения к RedisREDIS_PORT
: порт для подключения к RedisUPDATE_INTERVAL_SEC
: интервал чтения правил дедубликации из базы данных
Description:
Обогащает данные дополнительной информацией, использует для обогащения документы из MongoDB
. Несколько правил обогащения объединяются и применяются для одного сообщения. Если два правила обогащают одно и то же поле разными документами, то актуальным правилом является то правило, чей rule_id
больше. Если одному правилу соответствует несколько документов, то актуальным является тот документ, чей _id
больше (максимальный из всех). Если по актуальному правилу документа в MongoDB
нет, то поле обогащается значением по умолчанию из правила. Если правил нет, сообщение не обогащается и отправляется в том виде, в котором есть в выходной топик.
Rules:
enricher_id
: id обогатителяrule_id
: id правилаfield_name
: json-поле сообщения, которое нужно обогатитьfield_name_enrichment
: название поля в коллекции MongoDB для обогащенияfield_value
: поле сообщения, из которого берется значение поля field_name_enrichment, по которому нужно найти документ в коллекции MongoDBfield_default_value
: значение по умолчанию, если значение для обогащения не найдено в MongoDB
Configuration:
DB_URL
: JDBC-URL базы данных с правилами обогащенияDB_USER
: имя пользователя базы данныхDB_PASSWORD
: пароль базы данныхKAFKA_CONSUMER_BOOTSTRAP_SERVERS
: URL для подключения к KafkaKAFKA_CONSUMER_CLIENT_ID
: id клиента-консьюмера KafkaKAFKA_CONSUMER_GROUP_ID
: id группы консьюмеров KafkaKAFKA_CONSUMER_AUTO_OFFSET_RESET
: сдвиг для чтения из топика KafkaKAFKA_CONSUMER_TOPIC
: входной топик Kafka для обогащенияKAFKA_PRODUCER_BOOTSTRAP_SERVERS
: URL для подключения к KafkaKAFKA_PRODUCER_CLIENT_ID
: id клиента-продюсера KafkaKAFKA_PRODUCER_ACKS
: фактор подтверждения отправки от брокеров KafkaKAFKA_PRODUCER_TOPIC
: выходной топик Kafka после обогащенияMONGO_CONNECTION_STRING
: строка подключения к MongoDBMONGO_DATABASE
: название базы данных в MongoDBMONGO_COLLECTION
: название коллекции с данными обогащенияENRICHMENT_ID
: id сервиса-обогатителяUPDATE_INTERVAL_SEC
: интервал чтения правил обогащения из базы данных
Description:
Позволяет конфигурировать правила фильтрации, дедубликации, обогащения. Валидирует вводимые правила. Имеет несколько метрик.
Endpoints:
/filter
:/findAll
: получить информацию о всех фильтрах в БД/findAll/{id}
: получить информацию о всех фильтрах в БД по filter_id/find/{filterId}/{ruleId}
: получить информацию о фильтре по filter_id и rule_id/delete
: удалить информацию о всех фильтрах/delete/{filterId}/{ruleId}
: удалить информацию по конкретному фильтру filter_id и rule_id/save
: создать фильтр
/deduplication
:/findAll
: получить информацию о всех правилах дедубликации в БД/findAll/{id}
: получить информацию о всех правилах дедубликации в БД по deduplication_id/find/{filterId}/{ruleId}
: получить информацию о правиле дедубликации по deduplication_id и rule_id/delete
: удалить информацию о всех правилах дедубликации/delete/{filterId}/{ruleId}
: удалить информацию по конкретному правилу дедубликации с deduplication_id и rule_id/save
: создать правило дедубликации
/enrichment
:/findAll
: получить информацию о всех правилах обогащения в БД/findAll/{id}
: получить информацию о всех правилах обогащения в БД по enrichment_id/find/{filterId}/{ruleId}
: получить информацию о правиле обогащения по enrichment_id и rule_id/delete
: удалить информацию о всех правилах обогащения/delete/{filterId}/{ruleId}
: удалить информацию по конкретному правилу обогащения с enrichment_id и rule_id/save
: создать правило обогащения
Metrics:
countFilters
: количество правил фильтрацииcountDeduplications
: количество правил дедубликацииcountEnrichments
: количество правил обогащения (URL по умолчанию: /actuator/metrics)
Configuration:
DB_URL
: JDBC-URL базы данных с правиламиDB_USER
: имя пользователя базы данныхDB_PASSWORD
: пароль базы данных