diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index 4c6e5d9..6588aa6 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -2,9 +2,7 @@ name: Docker Image CI on: push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] + branches: [ "main", "dev" ] jobs: diff --git a/.gitignore b/.gitignore index 17b4d7e..28b581e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /logs/ /.venv/ -/__pycache__/ \ No newline at end of file +**/__pycache__/ +config.yaml \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..8a42757 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,650 @@ +[MAIN] + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint +# in a server-like mode. +clear-cache-post-run=no + +# Load and enable all available extensions. Use --list-extensions to see a list +# all available extensions. +#enable-all-extensions= + +# In error mode, messages with a category besides ERROR or FATAL are +# suppressed, and no reports are done by default. Error mode is compatible with +# disabling specific errors. +#errors-only= + +# Always return a 0 (non-error) status code, even if lint errors are found. +# This is primarily useful in continuous integration scripts. +#exit-zero= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +extension-pkg-whitelist= + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Specify a score threshold under which the program will exit with error. +fail-under=10 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +#from-stdin= + +# Files or directories to be skipped. They should be base names, not paths. +ignore=CVS + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, +# it can't be used as an escape character. +ignore-paths= + +# Files or directories matching the regular expression patterns are skipped. +# The regex matches against base names, not paths. The default value ignores +# Emacs file locks +ignore-patterns=^\.# + +# List of module names for which member attributes should not be checked and +# will not be imported (useful for modules/projects where namespaces are +# manipulated during runtime and thus existing member attributes cannot be +# deduced by static analysis). It supports qualified module names, as well as +# Unix pattern matching. +ignored-modules= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Resolve imports to .pyi stubs if available. May reduce no-member messages and +# increase not-an-iterable messages. +prefer-stubs=no + +# Minimum Python version to use for version dependent checks. Will default to +# the version used to run pylint. +py-version=3.12 + +# Discover python modules and packages in the file system subtree. +recursive=no + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +source-roots= + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# In verbose mode, extra non-checker-related info will be displayed. +#verbose= + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. If left empty, argument names will be checked with the set +# naming style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +#class-attribute-rgx= + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +#class-const-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. If left empty, class names will be checked with the set naming style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. If left empty, function names will be checked with the set +# naming style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. +good-names=i, + j, + k, + ex, + Run, + _ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Regular expression matching correct type alias names. If left empty, type +# alias names will be checked with the set naming style. +#typealias-rgx= + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +#typevar-rgx= + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. If left empty, variable names will be checked with the set +# naming style. +#variable-rgx= + + +[CLASSES] + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + asyncSetUp, + __post_init__ + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make,os._exit + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +exclude-too-few-public-methods= + +# List of qualified class names to ignore when counting class parents (see +# R0901) +ignored-parents= + +# Maximum number of arguments for function / method. +max-args=5 + +# Maximum number of attributes for a class (see R0902). +max-attributes=7 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=5 + +# Maximum number of branch for function / method body. +max-branches=12 + +# Maximum number of locals for function / method body. +max-locals=15 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of positional arguments for function / method. +max-positional-arguments=5 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body. +max-returns=6 + +# Maximum number of statements in function / method body. +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when caught. +overgeneral-exceptions=builtins.BaseException,builtins.Exception + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. +max-line-length=100 + +# Maximum number of lines in a module. +max-module-lines=1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow explicit reexports by alias from a package __init__. +allow-reexport-from-package=no + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules= + +# Output a graph (.gv or any supported image format) of external dependencies +# to the given file (report RP0402 must not be disabled). +ext-import-graph= + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be +# disabled). +import-graph= + +# Output a graph (.gv or any supported image format) of internal dependencies +# to the given file (report RP0402 must not be disabled). +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[LOGGING] + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, +# UNDEFINED. +confidence=HIGH, + CONTROL_FLOW, + INFERENCE, + INFERENCE_FAILURE, + UNDEFINED + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=raw-checker-failed, + bad-inline-option, + locally-disabled, + file-ignored, + suppressed-message, + useless-suppression, + deprecated-pragma, + use-symbolic-message-instead, + use-implicit-booleaness-not-comparison-to-string, + use-implicit-booleaness-not-comparison-to-zero + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + + +[METHOD_ARGS] + +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. 'requests.api.get,requests.api.post' +timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + +# Regular expression of note tags to take in consideration. +notes-rgx= + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + +# Let 'consider-using-join' be raised when the separator to join on would be +# non-empty (resulting in expected fixes of the type: ``"- " + " - +# ".join(items)``) +suggest-join-with-non-empty-separator=yes + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each +# category, as well as 'statement' which is the total number of statements +# analyzed. This score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +msg-template= + +# Set the output format. Available formats are: text, parseable, colorized, +# json2 (improved json format), json (old json format) and msvs (visual +# studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +#output-format= + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. +score=yes + + +[SIMILARITIES] + +# Comments are removed from the similarity computation +ignore-comments=yes + +# Docstrings are removed from the similarity computation +ignore-docstrings=yes + +# Imports are removed from the similarity computation +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. No available dictionaries : You need to install +# both the python package and the system dependency for enchant to work. +spelling-dict= + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins=no-member, + not-async-context-manager, + not-context-manager, + attribute-defined-outside-init + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx=.*[Mm]ixin + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io + +[MESSAGES CONTROL] +disable=trailing-whitespace diff --git a/.vscode/launch.json b/.vscode/launch.json index 7f2ec9b..bc7ab6e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,17 +2,16 @@ "version": "0.2.0", "configurations": [ { - "name": "Python Debugger: Current Program", + "name": "Python Debugger: Frigate Event Processor", "type": "debugpy", "request": "launch", - "program": "mqtt_processor.py", - "console": "integratedTerminal" - }, - { - "name": "Python Debugger: Python File", - "type": "debugpy", - "request": "launch", - "program": "${file}" + "module": "src.frigate_event_processor.main", + "cwd": "${workspaceFolder}", + "console": "integratedTerminal", + "justMyCode": true, + "env": { + "PYTHONPATH": "${workspaceFolder}/src" + }, } ] } \ No newline at end of file diff --git a/AppConfiguration.py b/AppConfiguration.py deleted file mode 100644 index 6fdf5da..0000000 --- a/AppConfiguration.py +++ /dev/null @@ -1,277 +0,0 @@ -import yaml -import logging -import time -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler -from pathlib import Path -import re -from typing import List - -# Define the classes to map the structure -logger = logging.getLogger(__name__) - -class MqttConfig: - def __init__(self): - self.host = "localhost" - self.port = 1883 - self.username = None - self.password = None - self.listen_topic = "#" - self.alert_topic = "alerts/camera_system" - - def __repr__(self): - return f"Mqtt(host={self.host}, username={self.username}, password={self.password}, listen_topic={self.listen_topic}, alert_topic={self.alert_topic})" - -class FrigateConfig: - def __init__(self): - self.host = "localhost" - self.port = 5000 - self.use_ssl = False - - @property - def api_base_url(self): - protocol = "https" if self.use_ssl else "http" - return f"{protocol}://{self.host}:{self.port}/api" - - def __repr__(self): - return f"Frigate(url={self.api_base_url})" - -class AlertConfig: - def __init__(self, camera): - self.camera = camera - self.labels = [] - self.enabled = True - self.zones = ZonesConfig() - - def __repr__(self): - return f"Alert(camera={self.camera}, objects={self.labels}, enabled={self.enabled}, zones={self.zones})" - -class ZoneAndLabelsConfig: - def __init__(self): - self.zone = "" - self.labels = [] - - def __repr__(self): - return f"ZoneAndLabel(zone={self.zone}, labels={self.labels})" - - -class ZonesConfig: - def __init__(self): - self.ignore_zones = [] - self.require_zones = [] - - def __repr__(self): - return f"Zones(ignored={self.ignore_zones}, required={self.require_zones})" - - @staticmethod - def check_zone_match(zone_configs: list[ZoneAndLabelsConfig], active_zones: list[str], label: str, default: bool) -> bool: - if zone_configs is None or len(zone_configs) == 0: - return default - - for config in zone_configs: - # Check if the zone is in active_zones and the label is in the labels of the object - if config.zone in active_zones: - # if the label doesn't exist, the zone is enough - otherwise, if the rule has a * or matches the label - if label is None or "*" in config.labels or label in config.labels: - return True - return False - - @staticmethod - def parse_zones(data): - if data is None: - return [] - - config = [] - for item in data: - zone = ZoneAndLabelsConfig() - zone.zone = item.get('zone') - zone.labels = item.get('labels') - config.append(zone) - - return config - - - -class CooldownConfig: - def __init__(self): - self.camera_duration_seconds = 0 - self.label_duration_seconds = 0 - - def __repr__(self): - return f"Cooldown(camera={self.camera_duration_seconds}, object={self.label_duration_seconds})" - -class AlertRulesConfig: - def __init__(self): - self.minimum_duration_seconds = 0 - self.maximum_duration_seconds = 0 - self.require_snapshot = False - self.require_video = False - self.cooldown = CooldownConfig() - - def __repr__(self): - return f"AlertRules(min_dur={self.minimum_duration_seconds}s, snapshots={self.require_snapshot}, video={self.require_video}, cooldown={self.cooldown})" - -class ObjectTrackingConfig: - def __init__(self): - self.enabled = True - - def __repr__(self): - return f"ObjectTracking(enabled={self.enabled})" - -class LoggingConfig: - def __init__(self): - self.level = logging.INFO - self.path = None - self.rotate = False - self.max_keep = 10 - -class AppConfig: - def __init__(self): - self.mqtt = MqttConfig() - self.frigate = FrigateConfig() - self.alerts = [] - self.alert_rules = AlertRulesConfig() - self.object_tracking = ObjectTrackingConfig() - self.logging = LoggingConfig() - - def apply_from_dict(self, data): - # Parse mqtt - self.load_mqtt_config(data) - - # Parse frigate - self.load_frigate_config(data) - - # Parse alerts/cameras - self.load_alerts_config(data) - - # Parse alert_rules - self.load_rules_config(data) - - # Parse object tracking - self.load_tracking_config(data) - - # Parse logger - self.load_logging_config(data) - - def load_logging_config(self, data): - logging = data.get('logging') - if logging is not None: - self.logging.level = logging.get('level') - self.logging.path = logging.get('path') - self.logging.rotate = logging.get('rotate') - self.logging.max_keep = logging.get('max_keep') - - def load_tracking_config(self, data): - tracking = data.get('object_tracking') - if tracking is not None: - self.object_tracking.enabled = tracking.get('enabled') - else: - self.object_tracking.enabled = True - - def load_rules_config(self, data): - rules = data.get('alert_rules') - if rules is not None: - self.alert_rules.minimum_duration_seconds = self.parse_duration(rules.get('min_event_duration', "0s")) - self.alert_rules.maximum_duration_seconds = self.parse_duration(rules.get('max_event_duration', "0s")) - self.alert_rules.require_snapshot = rules.get('snapshot', False) - self.alert_rules.require_video = rules.get('video', False) - - cooldown = rules.get('cooldown') - if cooldown is not None: - self.alert_rules.cooldown.camera_duration_seconds = self.parse_duration(cooldown.get('camera', "0s")) - self.alert_rules.cooldown.label_duration_seconds = self.parse_duration(cooldown.get('label', "0s")) - else: - self.alert_rules.cooldown.camera_duration_seconds = 0 - self.alert_rules.cooldown.label_duration_seconds = 0 - - def load_alerts_config(self, data): - alerts = data.get('alerts') - self.alerts.clear() - for alert in alerts: - new_alert = AlertConfig(alert.get('camera')) - new_alert.enabled = alert.get('enabled') or True - new_alert.labels = alert.get('labels') or [] - - zones = alert.get('zones') - if zones is not None: - new_alert.zones.ignore_zones = ZonesConfig.parse_zones(zones.get('ignore')) - new_alert.zones.require_zones = ZonesConfig.parse_zones(zones.get('require')) - self.alerts.append(new_alert) - - def load_frigate_config(self, data): - frigate = data.get('frigate') - if frigate is not None: - self.frigate.host = frigate.get('host') or "localhost" - self.frigate.port = frigate.get('port') or 5000 - self.frigate.use_ssl = frigate.get('ssl') or False - - def load_mqtt_config(self, data): - mqtt = data.get('mqtt') - if mqtt is not None: - self.mqtt.host = mqtt.get('host') or "localhost" - self.mqtt.port = mqtt.get('port') or 1883 - self.mqtt.listen_topic = mqtt.get('listen_topic') or "#" - self.mqtt.alert_topic = mqtt.get('alert_topic') or "alerts/camera_system" - self.mqtt.username = mqtt.get('username') - self.mqtt.password = mqtt.get('password') - - def __repr__(self): - return (f"Config(mqtt={self.mqtt}, alerts={self.alerts}, cooldown={self.cooldown}, " - f"snapshots={self.snapshots}, object_tracking={self.object_tracking})") - - def parse_duration(self, duration_str): - # Update regex pattern to capture float or integer and unit (s = seconds, m = minutes, h = hours) - pattern = r'(\d*\.?\d+)([smh])' - match = re.match(pattern, duration_str) - - if not match: - raise ValueError(f"Invalid duration format: {duration_str}") - - value, unit = match.groups() - value = float(value) # Convert value to float to handle both integers and floats - - if unit == 's': # seconds - return value - elif unit == 'm': # minutes to seconds - return value * 60 - elif unit == 'h': # hours to seconds - return value * 3600 - else: - raise ValueError(f"Unsupported time unit: {unit}") - - -class FileBasedAppConfig(AppConfig): - def __init__(self, config_file, watch_for_changes = True): - super().__init__() - self.file_path = Path(config_file).resolve() - self.reload_function() - if watch_for_changes: - self.enable_watchdog() - - def reload_function(self): - logger.info(f"Loading app configuration from {self.file_path}") - with open(self.file_path, 'r') as file: - data = yaml.safe_load(file) - self.apply_from_dict(data) - - def enable_watchdog(self): - # Set up the event handler and observer - file_to_watch = self.file_path - event_handler = FileChangeHandler(str(file_to_watch), self.reload_function) - observer = Observer() - observer.schedule(event_handler, path=str(file_to_watch.parent), recursive=False) - - # Start the observer - observer.start() - logger.info(f"Watching configuration file {file_to_watch} for changes...") - - -class FileChangeHandler(FileSystemEventHandler): - def __init__(self, file_path, reload_function): - self.file_path = file_path - self.reload_function = reload_function - - def on_modified(self, event): - if event.src_path == self.file_path: - logger.info(f"{self.file_path} has been modified, reloading...") - self.reload_function() diff --git a/Dockerfile b/Dockerfile index 0a06c04..69edc60 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,8 +9,16 @@ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Copy the Python application into the container -COPY *.py . +COPY src ./src + +WORKDIR /app/src + +# Add health check instruction +ENV DOCKER_HEALTH_ENABLED=true +ENV DOCKER_HEALTH_HOST=127.0.0.1 +ENV DOCKER_HEALTH_PORT=59123 +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 CMD ["python", "/app/src/health_checker.py"] # Set the default command to run the app -CMD ["python", "mqtt_processor.py"] +CMD ["python", "-m", "frigate_event_processor.main"] diff --git a/MqttEventReceiver.py b/MqttEventReceiver.py deleted file mode 100644 index 2c68336..0000000 --- a/MqttEventReceiver.py +++ /dev/null @@ -1,110 +0,0 @@ -import paho.mqtt.client as mqtt -import json -import time -import logging -from datetime import datetime -from FrigateEventProcessor import FrigateEventProcessor - -from AppConfiguration import AppConfig - -logger = logging.getLogger(__name__) - -class MqttEventReceiver: - def __init__(self, config:AppConfig): - self.config = config - self.processor = FrigateEventProcessor(config, self.publish_message) - self.mqtt_client = None - - # Callback when the client receives a message from the server. - def on_message(self, client, userdata, msg): - try: - # Decode the message payload - message = msg.payload.decode('utf-8') - - # Parse the message as JSON - data = json.loads(message) - - # Extract the "after" node if it exists - self.processor.process_event(data) - - except json.JSONDecodeError: - logger.warning(f"Failed to decode message as JSON from topic {msg.topic}: {message}") - - def on_connect(self, client, userdata, flags, rc, properties): - logger.info(f"MQTT session is connected: {rc}") - - # Subscribe to the topic for events - topic = self.config.mqtt.listen_topic - logger.info(f"Subscribing to topic {topic}") - client.subscribe(topic) - - # Publish "online" message when successfully connected - client.publish(self.config.mqtt.alert_topic + "/status", "online", retain=True) - - def on_disconnect(self, client, userdata, flags, rc, properties): - if rc != 0: - logger.warning(f"MQTT session is disconnected: {rc}") - - - def publish_message(self, topic, value): - client = self.mqtt_client - client.publish(topic, value) - - # Function to connect to the broker and subscribe to the topic - def connect_and_loop(self): - client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) - client.will_set(self.config.mqtt.alert_topic + "/status", "offline", retain=True) - client.on_message = self.on_message - client.on_connect = self.on_connect - client.on_disconnect = self.on_disconnect - - broker = self.config.mqtt.host - port = self.config.mqtt.port - - logger.info(f"Connecting to broker {broker}:{port}") - try: - client.connect(broker, port, 60) - except Exception as e: - logger.error(f"Unable to connect to server: {e}") - raise - - self.mqtt_client = client - - # Starts processing the loop on another thread - client.loop_start() - - loop = True - skipInput = False - while loop: - # get user input and respond - try: - if skipInput: - time.sleep(1) - else: - command = input("") - if command.lower() == "p": - self.processor.print_ongoing_events() - elif command.lower() == "q": - loop = False - elif command.lower().startswith("a "): - self.processor.generate_alert_for_event_id(command[2:]) - elif command.lower().startswith("i "): - self.processor.log_info_event_id(command[2:]) - else: - logger.info("Unrecognized command. Expected: [p, q, a , i ]") - except EOFError: - logger.info("App received an EOF from stdin - disabling interactive mode") - skipInput = True - - except KeyboardInterrupt: - logger.info("App received signal to shudown.") - loop = False - - logger.info("Shutting down...") - client.publish(self.config.mqtt.alert_topic + "/status", "offline", retain=True) - - client.loop_stop() - client.disconnect() - self.processor.clear_pending_notifications() - - logger.info("Disconnected.") diff --git a/config_example.yaml b/config_example.yaml new file mode 100644 index 0000000..c528ae2 --- /dev/null +++ b/config_example.yaml @@ -0,0 +1,86 @@ +mqtt: + host: homeassistant.lan + port: 1883 + listen_topic: frigate/events # Topic to listen for events on, published by Frigate + alert_topic: alerts/camera_system # Topic where qualified alerts are published for notifications + +frigate: + host: homeassistant.lan # Address for the Frigate API, use the unauthenticated address + port: 5000 + ssl: false + +# record events into MQTT sensors that can be displayed in various dashboards or with Home Assistant +event_tracking: + enabled: true # Enable reporting event details to MQTT + mqtt_topic: frigate_events/cameras # Topic to publish event details on + home_assistant: true # Enable Home Assistant discovery for camera events + discovery_base_topic: homeassistant # Base topic for Home Assistant discovery + home_assistant_url: http://homeassistant.lan # Used for generating image URLs that load using the Frigate API integration + + +alerts: + - camera: yard + labels: + - person + - camera: front_door + labels: + - person + - package + - car + - fedex + - amazon + - ups + - package + zones: + ignore: + - zone: parked_cars + labels: ["car"] # ignore the label car in the parked_cars zone +# require: +# - zone: driveway +# labels: ["*"] + + - camera: backyard + labels: + - person + - package + - car + +alert_rules: + # Minimum duration of time an event is active before a notification is fired, 0 to disable + # Note this will delay processing of all alert notifications for at least this duration of + # time to make sure the event doesn't end first. Recommend to keep this to a low value to + # ensure timely delivery of alerts + min_event_duration: 1.1s + + # maximum time since the event was created that will still generate an alert (this can be used + # to prevent alerts for parked cars and other items that are detected for a long time) + max_event_duration: 1m + + # Require that a snapshot is avaialble before a notificaiton is fired + snapshot: false + + # Require that a video is avaialble before a notification is fired + video: false + + cooldown: + # Amount of time that must elapse before a notification is fired again for the same camera + camera: 0s # 30s or 5m or 1h + + # Amount of time that must elapse before a notification is fired again for the same label on a camera + label: 1m + +logging: + level: INFO # DEBUG, INFO, WARNING, ERROR, CRITICAL + path: "./logs/frigate-processor.log" + max_keep: 10 # number of log files to keep + +ai: + enabled: true # enable AI processing with Google Gemini API + api_key: your-api-key-here + ai_model: gemini-1.5-flash + prompt: > + Very briefly describe what you see in this image from my security camera. Your + message needs to be short to fit in a phone notification. Don't describe + stationary objects or buildings. Focus on people, animals, or actions. + Start your response with the name of the camera. + inject_detection: true # inject the camera name and labels into the prompt diff --git a/mqtt_processor.py b/mqtt_processor.py deleted file mode 100644 index 407242a..0000000 --- a/mqtt_processor.py +++ /dev/null @@ -1,19 +0,0 @@ -import logging -import os - -from MqttEventReceiver import MqttEventReceiver -from AppConfiguration import FileBasedAppConfig - -logger = logging.getLogger(__name__) - -# Main function -def main(): - path = os.getenv('CONFIG_FILE', './config.yaml') - logger.info(f"Reading configuration from {path}") - - config = FileBasedAppConfig(path, True) - receiver = MqttEventReceiver(config) - receiver.connect_and_loop() - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a9f609d..812d45c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,7 @@ paho-mqtt==2.1.0 PyYAML==6.0.2 watchdog==5.0.2 prettytable==3.11.0 -wcwidth==0.2.13 \ No newline at end of file +wcwidth==0.2.13 +google-generativeai==0.8.3 +httpx==0.28.1 +Flask==3.1.0 diff --git a/src/frigate_event_processor/__init__.py b/src/frigate_event_processor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/frigate_event_processor/app_config_utils.py b/src/frigate_event_processor/app_config_utils.py new file mode 100644 index 0000000..0fbc707 --- /dev/null +++ b/src/frigate_event_processor/app_config_utils.py @@ -0,0 +1,109 @@ +""" +AppConfigurationUtils module +This module provides classes and functions to manage the configuration of an application. It includes support for loading configuration from a YAML file and watching for changes to the configuration file using the watchdog library. +Classes: + ParserUtilities: Utility functions for parsing configuration values. + BaseAppConfig: Abstract base class for application configuration. + FileBasedAppConfig: App configuration that is loaded from a file. + FileChangeHandler: Event handler for file changes. +Functions: + ParserUtilities.parse_duration(duration_str): Parse a duration string into seconds. + FileBasedAppConfig.reload_function(): Reload the configuration from the file. + FileBasedAppConfig.enable_watchdog(): Enable the watchdog to watch for changes to the configuration file. +""" + +import logging +from abc import ABC, abstractmethod +from pathlib import Path +import re +import yaml +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +from typing import Generic, TypeVar + +# Define the classes to map the structure +logger = logging.getLogger(__name__) + + +class ParserUtilities: + """Utility functions for parsing configuration values""" + + @staticmethod + def parse_duration(duration_str: str) -> float: + """Parse a duration string into seconds""" + + if duration_str is None: + return 0.0 + + # Update regex pattern to capture float or integer and unit (s = seconds, m = minutes, h = hours) + pattern = r'(\d*\.?\d+)([smh])' + match = re.match(pattern, duration_str) + + if not match: + raise ValueError(f"Invalid duration format: {duration_str}") + + value, unit = match.groups() + value = float(value) # Convert value to float to handle both integers and floats + + if unit == 's': # seconds + return value + if unit == 'm': # minutes to seconds + return value * 60 + if unit == 'h': # hours to seconds + return value * 3600 + + raise ValueError(f"Unsupported time unit: {unit}") + +class BaseAppConfig(ABC): + @abstractmethod + def apply_from_dict(self, data): + """Load settings from a dictionary""" + pass + +class FileBasedAppConfig: + """App configuration that is loaded from a file""" + def __init__(self, config_object: BaseAppConfig, config_file: str, watch_for_changes = True): + super().__init__() + + self.__config = config_object + self.config_file_path = Path(config_file).resolve() + self.__reload_function() + if watch_for_changes: + self.__enable_watchdog() + + + + @property + def config(self): + return self.__config + + def __reload_function(self): + """Reload the configuration from the file""" + logger.info("Loading app configuration from %s", self.config_file_path) + with open(self.config_file_path, 'r', encoding='utf-8') as file: + data = yaml.safe_load(file) + self.__config.apply_from_dict(data) + + def __enable_watchdog(self): + """Enable the watchdog to watch for changes to the configuration file""" + # Set up the event handler and observer + file_to_watch = self.config_file_path + event_handler = FileChangeHandler(str(file_to_watch), self.__reload_function) + observer = Observer() + observer.schedule(event_handler, path=str(file_to_watch.parent), recursive=False) + + # Start the observer + observer.start() + logger.info("Watching configuration file %s for changes...", file_to_watch) + +class FileChangeHandler(FileSystemEventHandler): + """Event handler for file changes""" + def __init__(self, file_to_watch, reload_function): + self.file_to_watch = file_to_watch + self.reload_function = reload_function + + def on_modified(self, event): + if event.src_path == self.file_to_watch: + logger.info("%s has been modified, reloading...", self.file_to_watch) + self.reload_function() + diff --git a/src/frigate_event_processor/app_configuration.py b/src/frigate_event_processor/app_configuration.py new file mode 100644 index 0000000..37197c2 --- /dev/null +++ b/src/frigate_event_processor/app_configuration.py @@ -0,0 +1,364 @@ +""" +AppConfiguration module +This module provides classes and functions to manage the configuration of an application that integrates with an MQTT broker, Frigate API, and other components. It includes support for loading configuration from a YAML file and watching for changes to the configuration file using the watchdog library. +Classes: + MqttConfig: Configuration for the MQTT broker. + FrigateConfig: Configuration for the Frigate API. + AlertConfig: Configuration for alerts. + ZoneAndLabelsConfig: Configuration for zones and labels. + ZonesConfig: Configuration for zones. + CooldownConfig: Configuration for cooldowns. + AlertRulesConfig: Configuration for alerting rules. + ObjectTrackingConfig: Configuration for object tracking. + LoggingConfig: Configuration for the logger. + AIConfig: Configuration for the AI model. + AppConfig: Configuration for the application. + FileBasedAppConfig: App configuration that is loaded from a file. + FileChangeHandler: Event handler for file changes. +Functions: + AppConfig.apply_from_dict(data): Load settings from a dictionary. + AppConfig.load_logging_config(data): Load the logging settings. + AppConfig.load_tracking_config(data): Load object tracking settings. + AppConfig.load_rules_config(data): Load alerting rules. + AppConfig.load_alerts_config(data): Load alerts configuration. + AppConfig.load_frigate_config(data): Load frigate configuration. + AppConfig.load_mqtt_config(data): Load saved configuration for the MQTT. + AppConfig.load_ai_config(data): Load AI configuration. + AppConfig.parse_duration(duration_str): Parse a duration string into seconds. + FileBasedAppConfig.reload_function(): Reload the configuration from the file. + FileBasedAppConfig.enable_watchdog(): Enable the watchdog to watch for changes to the configuration file. +""" +import logging +from pathlib import Path +from .app_config_utils import BaseAppConfig, ParserUtilities + +# Define the classes to map the structure +logger = logging.getLogger(__name__) + +class MqttConfig: + """Configuration for the MQTT broker""" + def __init__(self): + self.host = None + self.port = None + self.username = None + self.password = None + self.listen_topic = None + self.alert_topic = None + self.load_default() + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load saved configuration for the MQTT""" + self.host = data.get('host') or "localhost" + self.port = data.get('port') or 1883 + self.listen_topic = data.get('listen_topic') or "#" + self.alert_topic = data.get('alert_topic') or "alerts/camera_system" + self.username = data.get('username') + self.password = data.get('password') + + def __repr__(self): + return f"Mqtt(host={self.host}, username={self.username}, password={self.password}, listen_topic={self.listen_topic}, alert_topic={self.alert_topic})" + +class FrigateConfig: + """Configuration for the Frigate API""" + def __init__(self): + self.host = None + self.port = None + self.use_ssl = None + self.load_default() + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load frigate configuration from a JSON object""" + self.host = data.get('host') or "localhost" + self.port = data.get('port') or 5000 + self.use_ssl = data.get('ssl') or False + + @property + def api_base_url(self): + """Get the base URL for the Frigate API""" + protocol = "https" if self.use_ssl else "http" + return f"{protocol}://{self.host}:{self.port}/api" + + def __repr__(self): + return f"Frigate(url={self.api_base_url})" + +class AlertConfig: + """Configuration for alerts""" + def __init__(self): + self.camera = None + self.labels = [] + self.enabled = True + self.zones = ZonesConfig() + self.load_default() + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load alert configuration from a JSON object""" + self.camera = data.get('camera') + self.enabled = data.get('enabled') or True + self.labels = data.get('labels') or [] + + zones = data.get('zones') + if zones is not None: + self.zones.ignore_zones = ZonesConfig.parse_zones(zones.get('ignore')) + self.zones.require_zones = ZonesConfig.parse_zones(zones.get('require')) + + def __repr__(self): + return f"Alert(camera={self.camera}, objects={self.labels}, enabled={self.enabled}, zones={self.zones})" + +class ZoneAndLabelsConfig: + """Configuration for zones and labels""" + def __init__(self): + self.zone = "" + self.labels = [] + + def __repr__(self): + return f"ZoneAndLabel(zone={self.zone}, labels={self.labels})" + +class ZonesConfig: + """Configuration for zones""" + def __init__(self): + self.ignore_zones = [] + self.require_zones = [] + + def __repr__(self): + return f"Zones(ignored={self.ignore_zones}, required={self.require_zones})" + + @staticmethod + def check_zone_match(zone_configs: list[ZoneAndLabelsConfig], active_zones: list[str], label: str, default: bool) -> bool: + """Check if the zone matches the active zones and labels""" + if zone_configs is None or len(zone_configs) == 0: + return default + + for config in zone_configs: + # Check if the zone is in active_zones and the label is in the labels of the object + if config.zone in active_zones: + # if the label doesn't exist, the zone is enough - otherwise, if the rule has a * or matches the label + if label is None or "*" in config.labels or label in config.labels: + return True + return False + + @staticmethod + def parse_zones(data): + """Parse the zones from the configuration""" + if data is None: + return [] + + config = [] + for item in data: + zone = ZoneAndLabelsConfig() + zone.zone = item.get('zone') + zone.labels = item.get('labels') + config.append(zone) + + return config + +class CooldownConfig: + """Configuration for cooldowns""" + def __init__(self): + self.camera_duration_seconds = None + self.label_duration_seconds = None + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load the cooldown configuration from a JSON object""" + self.camera_duration_seconds = data.get('camera') or 0 + self.label_duration_seconds = data.get('label') or 0 + + def __repr__(self): + return f"Cooldown(camera={self.camera_duration_seconds}, object={self.label_duration_seconds})" + +class AlertRulesConfig: + """Configuration for alerting rules""" + def __init__(self): + self.minimum_duration_seconds = None + self.maximum_duration_seconds = None + self.require_snapshot = None + self.require_video = None + self.cooldown = CooldownConfig() + self.load_default() + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load the alerting rules from a JSON object""" + self.minimum_duration_seconds = ParserUtilities.parse_duration(data.get('min_event_duration')) + self.maximum_duration_seconds = ParserUtilities.parse_duration(data.get('max_event_duration')) + self.require_snapshot = data.get('snapshot') or False + self.require_video = data.get('video') or False + + cooldown = data.get('cooldown') + if cooldown is not None: + self.cooldown.load_json(cooldown) + else: + self.cooldown.load_default() + + def __repr__(self): + return f"AlertRules(min_dur={self.minimum_duration_seconds}s, snapshots={self.require_snapshot}, video={self.require_video}, cooldown={self.cooldown})" + +class EventTrackingConfig: + """Configuration for event tracking""" + def __init__(self): + self.enabled = None + self.mqtt_topic = None + self.home_assistant = None + self.discovery_base_topic = None + self.home_assistant_url = None + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load the event tracking configuration from a JSON object""" + self.enabled = data.get('enabled') or False + self.mqtt_topic = data.get('mqtt_topic') + self.home_assistant = data.get('home_assistant') or False + self.discovery_base_topic = data.get('discovery_base_topic') or "homeassistant" + self.home_assistant_url = data.get('home_assistant_url') + + def __repr__(self): + return (f"EventTracking(enabled={self.enabled}, mqtt_topic={self.mqtt_topic}, " + f"home_assistant={self.home_assistant}, discovery_base_topic={self.discovery_base_topic}, " + f"home_assistant_url={self.home_assistant_url})") + +class LoggingConfig: + """Configuration for the logger""" + def __init__(self): + self.level = None + self.path = None + self.rotate = None + self.max_keep = None + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load the logging configuration from a JSON object""" + self.level = data.get('level') or logging.INFO + self.path = data.get('path') or None + self.rotate = data.get('rotate') or False + self.max_keep = data.get('max_keep') or 10 + +class AIConfig: + """Configuration for the AI model""" + def __init__(self): + self.enabled = None + self.engine = None + self.api_key = None + self.ai_model = None + self.snapshot_format = None + self.prompt = None + self.inject_detection = None + self.service_url = None + + def load_default(self): + self.load_json({}) + + def load_json(self, data): + """Load the AI configuration from a JSON object""" + self.enabled = data.get('enabled') or False + self.engine = data.get('engine') or "google" + self.api_key = data.get('api_key') or None + self.ai_model = data.get('ai_model') or "gemini-1.5-flash" + self.snapshot_format = data.get('snapshot_format') or "image/jpeg" + self.prompt = data.get('prompt') or None + self.inject_detection = data.get('inject_detection') or True + self.service_url = data.get('service_url') or None + + def __repr__(self): + return f"AIConfig(enabled={self.enabled}, engine={self.engine}, api_key={self.api_key}, ai_model={self.ai_model}, snapshot_format={self.snapshot_format}, prompt={self.prompt}, inject_detection={self.inject_detection}, service_url={self.service_url})" + +class AppConfig(BaseAppConfig): + + """Configuration for the application""" + def __init__(self): + self.mqtt = MqttConfig() + self.frigate = FrigateConfig() + self.alerts = [] + self.alert_rules = AlertRulesConfig() + self.event_tracking = EventTrackingConfig() + self.logging = LoggingConfig() + self.ai = AIConfig() + + def apply_from_dict(self, data): + """Load settings from a dictionary""" + self.__load_mqtt_config(data) + self.__load_frigate_config(data) + self.__load_alerts_config(data) + self.__load_rules_config(data) + self.__load_tracking_config(data) + self.__load_logging_config(data) + self.__load_ai_config(data) + + def __load_logging_config(self, data): + """Load the logging settings""" + config = data.get('logging') + if config is not None: + self.logging.load_json(config) + else: + self.logging.load_default() + + def __load_tracking_config(self, data): + """Load object tracking settings""" + tracking = data.get('event_tracking') + if tracking is not None: + self.event_tracking.load_json(tracking) + else: + self.event_tracking.load_default() + + def __load_rules_config(self, data): + """Load alerting rules""" + rules = data.get('alert_rules') + if rules is not None: + self.alert_rules.load_json(rules) + else: + self.alert_rules.load_default() + + def __load_alerts_config(self, data): + """Load alerts configuration""" + alerts = data.get('alerts') + self.alerts.clear() + for alert in alerts: + new_alert = AlertConfig() + new_alert.load_json(alert) + self.alerts.append(new_alert) + + def __load_frigate_config(self, data): + """Load frigate configuration""" + frigate = data.get('frigate') + if frigate is not None: + self.frigate.load_json(frigate) + else: + self.frigate.load_default() + + def __load_mqtt_config(self, data): + """Load saved configuration for the MQTT""" + mqtt = data.get('mqtt') + if mqtt is not None: + self.mqtt.load_json(mqtt) + else: + self.mqtt.load_default() + + def __load_ai_config(self, data): + """Load AI configuration""" + ai = data.get('ai') + if ai is not None: + self.ai.load_json(ai) + else: + self.ai.load_default() + + def __repr__(self): + return (f"AppConfig(mqtt={self.mqtt}, frigate={self.frigate}, alerts={self.alerts}, ", + f"alert_rules={self.alert_rules}, event_tracking={self.event_tracking}, ", + f"logging={self.logging}, ai={self.ai})") + diff --git a/src/frigate_event_processor/docker_health.py b/src/frigate_event_processor/docker_health.py new file mode 100644 index 0000000..25cac4d --- /dev/null +++ b/src/frigate_event_processor/docker_health.py @@ -0,0 +1,65 @@ +import logging +from threading import Thread +from flask import Flask, jsonify +from abc import ABC, abstractmethod +import os + + +logger = logging.getLogger(__name__) +app = Flask(__name__) + +class BaseHealthCheck(ABC): + @abstractmethod + def is_healthy(self) -> bool: + pass + +class DockerHealthCheck: + + @staticmethod + def health_check_enabled(): + """Checks to see if the Docker health check is enabled.""" + return os.getenv('DOCKER_HEALTH_ENABLED') is not None + + def __init__(self, reference: BaseHealthCheck): + """ + Initialize the DockerHealthCheck with a custom health check function. + + :param health_check_func: A function that returns a boolean indicating health status + """ + self.__reference = reference + # self.__add_routes() + self.enabled = DockerHealthCheck.health_check_enabled() + self.host = os.getenv('DOCKER_HEALTH_HOST', '127.0.0.1') + self.port = os.getenv('DOCKER_HEALTH_PORT', 54123) + self.__flask_thread = None + self.__add_routes() + + def run_flask(self): + app.run(host=self.host, port=self.port) + + def start(self): + self.__flask_thread = Thread(target=self.run_flask, daemon=True) + self.__flask_thread.start() + + def stop(self): + self.__flask_thread.stop() + + def __add_routes(self): + """ + Add the Flask route for health checks. + """ + app.add_url_rule('/health', 'health_check', self.health_check, methods=['GET']) + + def health_check(self): + """ + Flask route to check the application's health status. + """ + try: + is_healthy = self.__reference.is_healthy() + if is_healthy: + return jsonify({"status": "healthy"}), 200 + else: + return jsonify({"status": "unhealthy"}), 500 + except Exception as e: + logger.error(f"Health check failed: {e}") + return jsonify({"status": "unhealthy", "error": str(e)}), 500 diff --git a/FrigateEventProcessor.py b/src/frigate_event_processor/frigate_event_processor.py similarity index 62% rename from FrigateEventProcessor.py rename to src/frigate_event_processor/frigate_event_processor.py index 51f8ae6..f02eddc 100644 --- a/FrigateEventProcessor.py +++ b/src/frigate_event_processor/frigate_event_processor.py @@ -1,14 +1,17 @@ import logging import json import threading -from logging.handlers import RotatingFileHandler -from AppConfiguration import AppConfig, ZonesConfig from datetime import datetime, timedelta +from logging.handlers import RotatingFileHandler from prettytable import PrettyTable +from .app_configuration import AppConfig, ZonesConfig +from .vision_processor import BaseVisionProcessor logger = logging.getLogger(__name__) class FrigateEventProcessor: + """Main class for processing events from Frigate via MQTT""" + def __init__(self, config: AppConfig, alert_publish_func): self.ongoing_events = dict() self.config = config @@ -18,32 +21,29 @@ def __init__(self, config: AppConfig, alert_publish_func): self.label_notification_history = dict() self.event_processing_queue = dict() self.alert_publish_func = alert_publish_func - + self.ai_processor = BaseVisionProcessor.get_vision_engine(config.ai) def process_event(self, event): - type = event.get('type') + """ Main loop for processing events """ + event_type = event.get('type') before = event.get('before') after = event.get('after') - if type == "new" or type == "update": - self.process_event_data(after, type.upper()) - elif type == "end": + if event_type == "new" or event_type == "update": + self.process_event_data(after, event_type.upper()) + elif event_type == "end": self.process_end_event(before) - """ - Cancel any pending timers queued - """ def clear_pending_notifications(self): - for index, (key, value) in enumerate(self.event_processing_queue.items()): + """ Cancel any pending timers queued """ + for _index, (_key, value) in enumerate(self.event_processing_queue.items()): value.cancel() self.event_processing_queue.clear() - """ - Indicates a new event has started - """ def process_event_data(self, data, tag): + """ Indicates a new event has started """ event = EventData(data) - logger.info(f"{tag}: {event.id}, camera={event.camera}, label={event.label}, score={event.score}") + logger.info("%s: %s, camera=%s, label=%s, score=%s", tag, event.id, event.camera, event.label, event.score) # if we need to delay processing this event, queue the event if self.should_queue_event(event): @@ -52,10 +52,8 @@ def process_event_data(self, data, tag): previous = self.ongoing_events.get(event.id) self.process_event_for_alert(event, previous) - """ - Check to see if an event needs to be queued bsased on the start_time of the event - """ def should_queue_event(self, event): + """ Check to see if an event needs to be queued bsased on the start_time of the event """ # Check to see if there is a minimum event duration before we process events if self.config.alert_rules.minimum_duration_seconds > 0: event_start_time = datetime.fromtimestamp(event.start_time) @@ -66,19 +64,16 @@ def should_queue_event(self, event): # If this event ID is already queued, don't break the queue if self.event_processing_queue.get(event.id) is not None: return True - return False - - """ - Handles queuing an event for the required duration of time and/or adding an event to an existing queue - """ def queue_event_processing(self, event): + """ Handles queuing an event for the required duration of time and/or adding an event to an existing queue """ + elapsed_time = datetime.now() - datetime.fromtimestamp(event.start_time) remaining_time = self.config.alert_rules.minimum_duration_seconds - elapsed_time.total_seconds() - if remaining_time < 0: remaining_time = 0 + remaining_time = max(remaining_time, 0) - logger.info(F"Queuing event {event.id} for remaining minimum duration: {remaining_time}") + logger.info("Queuing event %s for remaining minimum duration: %s", event.id, remaining_time) existing_queue = self.event_processing_queue.get(event.id) if existing_queue is None: existing_queue = EventProcessingQueue(event) @@ -89,154 +84,177 @@ def queue_event_processing(self, event): else: existing_queue.add_to_queue(event) - """ - Loops over the events stored into the event's queue and processes them in order to make sure - we perform all the necessary notifications - """ def process_event_queue(self, event_queue): + """ + Loops over the events stored into the event's queue and processes them in order to make sure + we perform all the necessary notifications + """ + del self.event_processing_queue[event_queue.id] previous = None for event in event_queue.queue: self.process_event_for_alert(event, previous) previous = event - """ - Evalautes an event to determine if it should be elevated to an alert - """ def process_event_for_alert(self, event, previous): - logger.info(F"Processing {event.id}...") + """ + Evalautes an event to determine if it should be elevated to an alert + """ + logger.info("Event %s: Processing new alert", event.id) self.ongoing_events[event.id] = event if self.evaluate_alert(previous, event): self.publish_event_to_mqtt(event) - - """ - Publish an alert to the MQTT alerting topic - """ + def publish_event_to_mqtt(self, event): + """ + Publish an alert to the MQTT alerting topic + """ alert = self.generate_notification(event) self.camera_notification_history[event.camera] = alert self.label_notification_history[self.camera_and_label_key(event)] = alert alert_payload = json.dumps(alert.to_dict()) - logger.info(f"ALERT: {alert_payload}") + logger.info("ALERT: %s", alert_payload) self.alert_publish_func(self.config.mqtt.alert_topic + "/alert", alert_payload) - """ - Generate the alert content based on an event ID. Used for manually triggering alerts. - """ + if self.config.event_tracking.enabled: + self.publish_event_tracking(alert) + + def publish_event_tracking(self, alert): + """ Publish the event to the event tracking MQTT topic """ + camera = alert.camera + state_topic = f"{self.config.event_tracking.mqtt_topic}/{camera}" + payload = json.dumps({ + "event_id": alert.event_id, + "image_url": f"{self.config.event_tracking.home_assistant_url}/api/frigate/notifications/{alert.event_id}/snapshot.jpg", + "message": alert.message + }) + + self.alert_publish_func(state_topic, payload) + def generate_alert_for_event_id(self, event_id): - logger.info(F"Manually processing {event_id} for alert") + """ Generate the alert content based on an event ID. Used for manually triggering alerts. """ + logger.info("Manually processing %s for alert", event_id) event = self.ongoing_events.get(event_id) if event is None: - logger.warning(f"Event {event_id} no longer available. Nothing generated.") + logger.warning("Event %s no longer available. Nothing generated.", event_id) return self.publish_event_to_mqtt(event) - """ - Write information about a particular event to the log - """ + def get_ongoing_event(self, event_id): + """ Get the ongoing event by ID """ + return self.ongoing_events.get(event_id) + def log_info_event_id(self, event_id): + """ Write information about a particular event to the log """ event = self.ongoing_events.get(event_id) if event is None: - logger.warning(f"Event {event_id} no longer available.") + logger.warning("Event %s no longer available.", event_id) return - logger.info(f"Event {event_id}: {event}") - - """ - Indicates that the event has ended and the object - is no longer detected in the video - """ + logger.info("Event %s: %s", event_id, event) + def process_end_event(self, data): - id = data.get('id') - logger.info(f"END: Event {id} ended") + """ Indicates that the event has ended and the object + is no longer detected in the video """ + event_id = data.get('id') + logger.info("END: Event %s ended", event_id) - existing_queue = self.event_processing_queue.get(id) + existing_queue = self.event_processing_queue.get(event_id) if existing_queue: existing_queue.timer.cancel() del self.event_processing_queue[existing_queue.id] - logger.info(F"Canceled processing {id} since it ended before the min_duration") + logger.info("Canceled processing %s since it ended before the min_duration", event_id) try: del self.ongoing_events[id] except KeyError: pass - - """ - Compare events to see if we should create - a new notification for this event - """ def evaluate_alert(self, before, after): - + """ + Compare events to see if we should create a new notification for this event + """ # check to see if this is a significant change from the previous event is_significant = True + reason = "" if before is not None: - is_significant = (before.label != after.label or - before.sub_label != after.sub_label or - before.current_zones != after.current_zones or - before.entered_zones != after.entered_zones or - (before.has_clip != after.has_clip and after.has_clip == True) or - (before.has_snapshot != after.has_snapshot and after.has_snapshot == True)) + if before.label != after.label: + reason = "label" + elif before.sub_label != after.sub_label: + reason = "sub_label" + elif before.current_zones != after.current_zones: + reason = "current_zones" + elif before.entered_zones != after.entered_zones: + reason = "entered_zones" + elif before.has_clip != after.has_clip and bool(after.has_clip): + reason = "has_clip" + elif before.has_snapshot != after.has_snapshot and bool(after.has_snapshot): + reason = "has_snapshot" + else: + reason = "end of statements" + is_significant = False + else: + reason = "new event - before was none" if not is_significant: - logger.info(f"Event update for {before.id} was not significant and was discarded.") + logger.info("Event %s: not significant change.", before.id) return False + + logger.info("Event %s: was significant due to %s", after.id, reason) # check for max_duration if self.config.alert_rules.maximum_duration_seconds > 0: event_too_old = datetime.fromtimestamp(after.start_time) + timedelta(seconds=self.config.alert_rules.maximum_duration_seconds) < datetime.now() if event_too_old: - logger.info(f"Event {after.id} was too old and discarded.") + logger.info("Event %s: too long duration for alert.", after.id) return False - # check to see if this event meets the configuration criteria for this camera alert_config = self.config_for_camera(after.camera) if alert_config is None: - logger.info(f"No configuration for camera {after.camera}") + logger.info("Event %s: no configuration for camera %s", after.id, after.camera) return True # is the alert enabled or disabled - if alert_config.enabled == False: - logger.info(f"Event {after.id} (camera={after.camera}) was disabled in configuration") + if not alert_config.enabled: + logger.info("Event %s: configuration disabled for camera %s", after.id, after.camera) return False # is the alert for an expected object type (label) if not after.label in alert_config.labels: - logger.info(f"Event {after.id} (camera={after.camera}, label={after.label}) was not included in configuration") + logger.info("Event %s: configuration missing for camera %s and label %s", after.id, after.camera, after.label) return False # is the event including a required zone? required_zones = alert_config.zones.require_zones if not ZonesConfig.check_zone_match(required_zones, after.current_zones, after.label, True): - logger.info(f"Event {after.id} (camera={after.camera}, label={after.label}, current_zones={after.current_zones}) was not in a required zone") + logger.info("Event %s: not in a required zone (camera=%s, label=%s, current_zones=%s)", after.id, after.camera, after.label, after.current_zones) return False # is the event in an ignored zone? ignored_zones = alert_config.zones.ignore_zones if ZonesConfig.check_zone_match(ignored_zones, after.current_zones, after.label, False): - logger.info(f"Event {after.id} (camera={after.camera}, label={after.label}, current_zones={after.current_zones}) was in an ignored zone") + logger.info("Event %s: in ignored zone (camera=%s, label=%s, current_zones=%s)", after.id, after.camera, after.label, after.current_zones) return False # does the event have required parameters - if self.config.alert_rules.require_snapshot and after.has_snapshot == False: - logger.info(f"Event {after.id} (camera={after.camera}, label={after.label}) has no snapshot and was dropped") + if self.config.alert_rules.require_snapshot and not after.has_snapshot: + logger.info("Event %s: no snapshot", after.id) return False - if self.config.alert_rules.require_video and after.has_video == False: - logger.info(f"Event {after.id} (camera={after.camera}, label={after.label}) has no video clip and was dropped") + if self.config.alert_rules.require_video and not after.has_video: + logger.info("Event %s: no video clip", after.id) return False # check to see if we're still in the event cooldown for the camera - if not self.is_event_past_cooldown(after): - logger.info(f"Event {after.id} (camera={after.camera}, label={after.label}) was still in cooldown and was skipped") + if not before and not self.is_event_past_cooldown(after): + logger.info("Event %s: was still in cooldown time", after.id) return False return True - """ - Check to see if this event meets the required cooldown time in the configuration - """ + def is_event_past_cooldown(self, event): + """ Check to see if this event meets the required cooldown time in the configuration """ cooldown = self.config.alert_rules.cooldown # If both camera and label cooldowns are 0, always return True @@ -261,11 +279,10 @@ def is_past_cooldown(previous_notification, duration_seconds): return False return True - - """ - Generate the location string for this event based on the camera name and current zones - """ + def generate_location_string(self, event): + """ Generate the location string for this event based on the camera name and current zones """ + camera = event.camera.replace("_", " ").title() if event.current_zones is not None and len(event.current_zones) > 0: zones = ", ".join(event.current_zones).replace("_", " ").title() @@ -273,26 +290,54 @@ def generate_location_string(self, event): else: return camera - """ - Returns a JSON string representing the alert notification for this event - """ + def generate_notification(self, event): + """ Returns a JSON string representing the alert notification for this event """ + if event is None: + logger.warning("generate_notification called with no event") + return None + + logger.debug("Event %s: Generating notification for event", event.id) + detection = self.generate_detection_string(event) location = self.generate_location_string(event) notification = Notification(event) - notification.message = f"{detection} was detected at {location}" - if event.has_snapshot: - notification.image = self.config.frigate.api_base_url + f"/events/{event.id}/thumbnail.jpg" - if event.has_clip: - notification.video = self.config.frigate.api_base_url + f"/events/{event.id}/clip.mp4" + if getattr(self.ai_processor, 'enabled', False): + logger.debug("Event %s: Processing with AI model", event.id) + notification.message = self.ai_processor.process_event(detection, location, self.get_snapshot_url(event), event) + else: + logger.debug("Event %s: AI Model is disabled", event.id) + + if notification.message is None: + logger.debug("Event %s: Generating default message", event.id) + notification.message = f"{detection} was detected at {location}" + + notification.image = self.get_thumbnail_url(event) + notification.video = self.get_video_url(event) return notification + def get_snapshot_url(self, event): + """ Get the snapshot URL for this event """ + return self.config.frigate.api_base_url + f"/events/{event.id}/snapshot.jpg" + + def get_thumbnail_url(self, event): + """ Get the thumbnail URL for this event """ + return self.config.frigate.api_base_url + f"/events/{event.id}/thumbnail.jpg" + + def get_video_url(self, event): + """ Get the video URL for this event """ + if event.has_clip: + return self.config.frigate.api_base_url + f"/events/{event.id}/clip.mp4" + return None + def camera_and_label_key(self, event): + """ Generate a unique key for this event based on the camera and label """ return f"{event.camera}__{event.label}" def generate_detection_string(self, event): + """ Generate the detection string for this event """ output = event.label.replace("_", " ").title() # "Person" if event.sub_label is not None: sub_labels = ', '.join([item['subLabel'].title() for item in event.sub_label]) @@ -300,17 +345,17 @@ def generate_detection_string(self, event): return output def config_for_camera(self, camera): + """ Get the configuration for the camera """ return self.cameras.get(camera) def configure_logging(self): - + """ Configure logging for the class """ level = logging.INFO if self.config.logging.level.upper() == "DEBUG": level = logging.DEBUG if self.config.logging.level.upper() == "WARNING": level = logging.WARNING - # enable logging logging.basicConfig( level=level, @@ -325,17 +370,20 @@ def configure_logging(self): handler.setFormatter(formatter) logging.getLogger().addHandler(handler) - """ - Print a table of ongoing events to the console - """ + def print_ongoing_events(self): + """ Print a table of ongoing events to the console """ table = PrettyTable() table.field_names = ["ID", "Camera", "Zones", "Label", "SubLabel", "Score", "Duration"] - for index, (key, event) in enumerate(self.ongoing_events.items()): + for _index, (key, event) in enumerate(self.ongoing_events.items()): table.add_row([key, event.camera, ", ".join(event.current_zones), event.label, event.sub_label, "{:.2f}".format(event.score), event.duration]) - logger.info("\n"+str(table)) + logger.info("\n%s", str(table)) + + + + class EventProcessingQueue: @@ -347,9 +395,6 @@ def __init__(self, event): def add_to_queue(self, event): self.queue.append(event) - - - class EventData: def __init__(self, data): self.id = data.get('id') diff --git a/src/frigate_event_processor/google_vision_processor.py b/src/frigate_event_processor/google_vision_processor.py new file mode 100644 index 0000000..bd851cb --- /dev/null +++ b/src/frigate_event_processor/google_vision_processor.py @@ -0,0 +1,62 @@ +"""Module to process images using Google Vision API.""" + +import logging +import base64 +import google.generativeai as genai +import httpx + +from .vision_processor import BaseVisionProcessor +from .app_configuration import AIConfig + +logger = logging.getLogger(__name__) + +class GoogleVision(BaseVisionProcessor): + """Class to process images using Google Vision API.""" + def __init__(self, ai_config: AIConfig): + self.config = ai_config + if ai_config.enabled: + logger.debug("Initializing Google AI with API_KEY: %s", ai_config.api_key) + genai.configure(api_key=ai_config.api_key) + ai_model = ai_config.ai_model + logger.debug("Specified model: %s", ai_model) + self.model = genai.GenerativeModel(model_name=ai_model) + logger.info("Google AI model initialized: %s", ai_model) + + @property + def enabled(self): + """Returns True if the AI processor is enabled.""" + return self.config.enabled + + def process_event(self, detection, location, snapshot_url, event): + """Processes an event using the AI model.""" + if not self.config.enabled: + logger.warning("AI processor is not enabled but was invoked.") + return None + + logger.info("Event %s: processing with AI model: %s", event.id, self.config.ai_model) + + logger.debug("Event %s: fetching image from URL: %s", event.id, snapshot_url) + try: + image_response = httpx.get(snapshot_url) + if image_response.status_code != 200: + logger.info("Event %s: failed to fetch image from URL: %s", event.id, snapshot_url) + return None + except httpx.RequestError as exc: + logger.error("Event %s: failed to fetch image from URL: %s: %s", event.id, snapshot_url, exc) + return None + + image_data = image_response.content + prompt = self.config.prompt or """Describe this image""" + + if self.config.inject_detection: + prompt += f" Camera name was '{location}'. This image was labeled with '{detection}'." + + request = [{'mime_type': self.config.snapshot_format, 'data': base64.b64encode(image_data).decode('utf-8')}, prompt] + logger.debug("API request parameters: %s", request) + try: + response = self.model.generate_content(request) + logger.debug("API response: %s", response) + return response.text + except Exception as exc: + logger.error("Failed to process event %s with AI model: %s: %s", event.id, self.config.ai_model, exc) + return None diff --git a/src/frigate_event_processor/hass_discovery.py b/src/frigate_event_processor/hass_discovery.py new file mode 100644 index 0000000..6b949de --- /dev/null +++ b/src/frigate_event_processor/hass_discovery.py @@ -0,0 +1,285 @@ +# MIT License +# Copyright (c) 2025 Ryan Gregg +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +Handles MQTT topic registration for Home Assistant MQTT discovery +""" + +import logging +import json +from enum import Enum +import paho.mqtt.client as mqtt +from .app_configuration import AppConfig + +logger = logging.getLogger(__name__) + +class DiscoverableDevice: + """Class to represent a device that can be discovered by Home Assistant""" + def __init__(self, name:str, identifiers:list, manufacturer:str, model:str, sw_version:str, hw_version:str): + self.name = name + self.identifiers = identifiers + self.manufacturer = manufacturer + self.model = model + self.sw_version = sw_version + self.hw_version = hw_version + + @staticmethod + def empty_device(): + """Returns an empty device object""" + return DiscoverableDevice("", [], "", "", "", "") + +class Availability: + """Class to represent the availability of a sensor""" + def __init__(self, topic:str): + self.topic = topic + self.payload_available = "online" + self.payload_not_available = "offline" + self.value_template = None + + def to_dict(self) -> dict: + """Convert the object to a dictionary for JSON serialization""" + base_dict = { + "topic": self.topic, + "payload_available": self.payload_available, + "payload_not_available": self.payload_not_available, + "value_template": self.value_template + } + + #remove any keys with None values + return {key: value for key, value in base_dict.items() if value is not None} + + +class SensorType(Enum): + """Enumeration of sensor types""" + SENSOR = "sensor" + # BINARY_SENSOR = "binary_sensor" + # SWITCH = "switch" + # FAN = "fan" + # LIGHT = "light" + # COVER = "cover" + # CLIMATE = "climate" + # VACUUM = "vacuum" + # CAMERA = "camera" + IMAGE = "image" + # LOCK = "lock" + # DEVICE_TRACKER = "device_tracker" + # HUMIDIFIER = "humidifier" + # AIR_QUALITY = "air_quality" + # WATER_HEATER = "water_heater" + # WATER_LEVEL = "water_level" + # WATER_QUALITY = "water_quality" + # WINDOW = "window" + # WINDOW_COVERING = "window_covering" + # ZONE = "zone" + TEXT = "text" + +class DeviceClass(Enum): + """Enumeration of device classes""" + ENUM = "enum" + BATTERY = "battery" + CONNECTIVITY = "connectivity" + CURRENT = "current" + ENERGY = "energy" + HUMIDITY = "humidity" + ILLUMINANCE = "illuminance" + POWER = "power" + PRESSURE = "pressure" + SIGNAL_STRENGTH = "signal_strength" + TEMPERATURE = "temperature" + TIMESTAMP = "timestamp" + VOLTAGE = "voltage" + +class StateClass(Enum): + """Enumeration of state classes""" + DEFAULT = None + MEASUREMENT = "measurement" + TOTAL_INCREASING = "total_increasing" + TOTAL = "total" + + +class DiscoverableEntityBase: + """Class to represent common entity components for Home Assistant""" + + def __init__(self, unique_id:str, name:str): + self._sensor_type = None + self.name = name + self.icon = None + self.enabled_by_default = True + self._availability = None + self.device = DiscoverableDevice.empty_device() + self.unique_id = unique_id + + @property + def sensor_type(self): + return self._sensor_type + + @sensor_type.setter + def sensor_type(self, value): + if not isinstance(value, SensorType): + raise ValueError("sensor_type must be a SensorType") + self._sensor_type = value + + @property + def availability(self): + return self._availability + + @availability.setter + def availability(self, value): + if value is None: + pass + elif not isinstance(value, Availability): + raise ValueError("availability must be an Availability") + self._availability = value + + def to_dict(self): + """Convert the object to a dictionary for JSON serialization""" + base_dict = { + "name": self.name, + "icon": self.icon, + "enabled_by_default": self.enabled_by_default, + "availability": self.availability.to_dict() if self.availability else None, + "device": { + "name": self.device.name, + "identifiers": self.device.identifiers, + "manufacturer": self.device.manufacturer, + "model": self.device.model, + "sw_version": self.device.sw_version, + "hw_version": self.device.hw_version, + }, + "unique_id": self.unique_id, + } + + # Remove keys with None values + return self.remove_none_values(base_dict) + + def remove_none_values(self, dictionary: dict): + """Remove keys with None values from a dictionary""" + return {key: value for key, value in dictionary.items() if value is not None} + +class DiscoverableSensor(DiscoverableEntityBase): + """Class to represent a sensor that can be discovered by Home Assistant""" + + def __init__(self, unique_id:str, name:str): + super().__init__(unique_id, name) + self._sensor_type = SensorType.SENSOR + self._state_class = None + self._device_class = None + self.unit_of_measurement = None + self.value_template = None + self.state_topic = None + self.options = None + + @property + def device_class(self) -> DeviceClass: + return self._device_class + + @device_class.setter + def device_class(self, value): + if not isinstance(value, DeviceClass): + raise ValueError("device_class must be a DeviceClass") + self._device_class = value + + @property + def state_class(self) -> StateClass: + return self._state_class + + @state_class.setter + def state_class(self, value): + if not isinstance(value, StateClass): + raise ValueError("state_class must be a StateClass") + self._state_class = value + + def to_dict(self): + """Convert the object to a dictionary for JSON serialization""" + base_dict = { + "device_class": self._device_class.value if self._device_class else None, + "state_class": self._state_class.value if self._state_class else None, + "unit_of_measurement": self.unit_of_measurement, + "value_template": self.value_template, + "state_topic": self.state_topic, + "options": self.options, + } + base_dict.update(super().to_dict()) + return self.remove_none_values(base_dict) + +class DiscoverableText(DiscoverableEntityBase): + """Class to represent a text_input sensor that can be discovered by Home Assistant""" + + def __init__(self, unique_id:str, name:str): + super().__init__(unique_id, name) + self._sensor_type = SensorType.TEXT + self.value_template = None + self.state_topic = None + self.command_topic = None + self.command_template = None + + def to_dict(self): + """Convert the object to a dictionary for JSON serialization""" + base_dict = { + "value_template": self.value_template, + "state_topic": self.state_topic, + "command_topic": self.command_topic, + "command_template": self.command_template, + } + base_dict.update(super().to_dict()) + return self.remove_none_values(base_dict) + + + +class DiscoverableImage(DiscoverableEntityBase): + """Class to represent an image sensor that can be discovered by Home Assistant""" + + def __init__(self, unique_id:str, name:str): + super().__init__(unique_id, name) + self._sensor_type = SensorType.IMAGE + self.url_topic = None + self.url_template = None + + def to_dict(self): + + base_dict = { + "url_topic": self.url_topic, + "url_template": self.url_template + } + base_dict.update(super().to_dict()) + + return super().remove_none_values(base_dict) + + + + +class HomeAssistantDiscovery: + """Class to handle Home Assistant MQTT discovery""" + def __init__(self, config:AppConfig): + self.config = config + + def publish_sensor(self, sensor:DiscoverableSensor, mqtt_client: mqtt.Client): + """Publishes HASS discovery information for registered devices""" + + discovery_topic = f"{self.config.event_tracking.discovery_base_topic}/{sensor.sensor_type.value}/{self.clean_key_name(sensor.device.identifiers[0])}/{self.clean_key_name(sensor.unique_id)}/config" + payload = json.dumps(sensor.to_dict()) + logger.debug("Publishing discovery for %s: %s", discovery_topic, payload) + + mqtt_client.publish(discovery_topic, payload, retain=True) + + def clean_key_name(self, value:str): + """Removes invalid characters from HASS topic names""" + return value.replace(".", "_").replace(" ", "_").replace(",", "_").replace(":", "_").replace("-", "_") diff --git a/src/frigate_event_processor/main.py b/src/frigate_event_processor/main.py new file mode 100644 index 0000000..b479631 --- /dev/null +++ b/src/frigate_event_processor/main.py @@ -0,0 +1,71 @@ +""" +mqtt_processor.py + +This module sets up and runs an MQTT event processor. It reads configuration +from a specified file, initializes the application configuration, and starts +an MQTT event receiver to process incoming events. + +Modules: + logging: Provides logging capabilities. + os: Provides a way of using operating system dependent functionality. + MqttEventReceiver: Handles receiving events from an MQTT broker. + AppConfiguration: Manages application configuration from a file. + +Functions: + main: Entry point for the application. Reads configuration, initializes + the MQTT event receiver, and starts the event loop. +""" +import logging +import os + +from .mqtt_event_receiver import MqttEventReceiver +from .app_configuration import AppConfig +from .app_config_utils import FileBasedAppConfig +from .docker_health import DockerHealthCheck + +logger = logging.getLogger(__name__) + +health_check = None +mqtt_receiver = None + +# Main function +def main(): + """Entry point for app""" + path = os.getenv('CONFIG_FILE', './config.yaml') + logger.info("Reading configuration from %s", path) + + config = AppConfig() + file_config = FileBasedAppConfig(config, path, True) + logger.debug("Configuration: %s", file_config.config) + + mqtt_receiver = MqttEventReceiver(file_config.config) + + # Start the health check if enabled + if DockerHealthCheck.health_check_enabled(): + health_check = DockerHealthCheck(mqtt_receiver) + health_check.start() + + # Start the MQTT event receiver (blocking) + try: + mqtt_receiver.connect_and_loop() + except KeyboardInterrupt: + logger.info("Shutting down MQTT event receiver") + mqtt_receiver.disconnect() + if health_check is not None: + health_check.stop() + + + +def health_check_func() -> bool: + """ + Custom health check function for the DockerHealthCheck. + + :return: True if the application is healthy, False otherwise + """ + if mqtt_receiver is None or not mqtt_receiver.is_connected: + return False + + return True + +if __name__ == '__main__': + main() diff --git a/src/frigate_event_processor/mqtt_event_receiver.py b/src/frigate_event_processor/mqtt_event_receiver.py new file mode 100644 index 0000000..aae9a19 --- /dev/null +++ b/src/frigate_event_processor/mqtt_event_receiver.py @@ -0,0 +1,217 @@ +""" +This module defines the MqttEventReceiver class, which is responsible for receiving and processing MQTT messages. +It connects to an MQTT broker, subscribes to a specified topic, and processes incoming messages using the +FrigateEventProcessor class. The module also handles publishing messages to the MQTT broker and provides an +interactive command-line interface for managing ongoing events. +Classes: + MqttEventReceiver: A class that handles MQTT message reception, processing, and publishing. +Functions: + on_message: Callback when the client receives a message from the server. + on_connect: Callback when the client connects to the server. + on_disconnect: Callback when the client disconnects from the server. + publish_message: Publishes a message to the MQTT broker. + connect_and_loop: Connects to the MQTT broker and starts the event loop. +""" +import json +import time +import logging +import paho.mqtt.client as mqtt +from .frigate_event_processor import FrigateEventProcessor +from .app_configuration import AppConfig +from .hass_discovery import HomeAssistantDiscovery, DiscoverableSensor, DiscoverableImage, DiscoverableDevice, DeviceClass +from .docker_health import BaseHealthCheck + +logger = logging.getLogger(__name__) + +class MqttEventReceiver(BaseHealthCheck): + """A class that handles MQTT message reception, processing, and publishing.""" + + def __init__(self, config:AppConfig): + self.config = config + self.processor = FrigateEventProcessor(config, self.publish_message) + self.mqtt_client = None + + @property + def is_connected(self): + """Returns True if the MQTT client is connected.""" + return self.mqtt_client.is_connected() + + def is_healthy(self): + if self.mqtt_client is None: + return False + if not self.mqtt_client.is_connected(): + return False + return True + + # Callback when the client receives a message from the server. + def on_message(self, _client, _userdata, msg): + """Callback when the client receives a message from the server.""" + try: + # Decode the message payload + message = msg.payload.decode('utf-8') + + # Parse the message as JSON + data = json.loads(message) + + # Extract the "after" node if it exists + self.processor.process_event(data) + + except json.JSONDecodeError: + logger.warning("Failed to decode message as JSON from topic %s: %s", msg.topic, message) + + def on_connect(self, client, _userdata, _flags, rc, _properties): + """Callback when the client connects to the server.""" + logger.info("MQTT session is connected: %s", rc) + + # Subscribe to the topic for events + topic = self.config.mqtt.listen_topic + logger.info("Subscribing to topic %s", topic) + client.subscribe(topic) + + # Publish "online" message when successfully connected + client.publish(self.config.mqtt.alert_topic + "/status", "online", retain=True) + + def on_disconnect(self, _client, _userdata, _flags, rc, _properties): + """Callback when the client disconnects from the server.""" + if rc != 0: + logger.warning("MQTT session is disconnected: %s", rc) + + + def publish_message(self, topic, value): + """Publishes a message to the MQTT broker.""" + client = self.mqtt_client + client.publish(topic, value) + + def connect_and_loop(self): + """Connects to the MQTT broker and starts the event loop.""" + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + client.will_set(self.config.mqtt.alert_topic + "/status", "offline", retain=True) + client.on_message = self.on_message + client.on_connect = self.on_connect + client.on_disconnect = self.on_disconnect + + broker = self.config.mqtt.host + port = self.config.mqtt.port + + logger.info("Connecting to broker %s:%s", broker, port) + try: + client.connect(broker, port, 60) + except Exception as e: + logger.error("Unable to connect to server: %s", e) + raise + + self.mqtt_client = client + + self.register_home_assistant_discovery() + + # Starts processing the loop on another thread + client.loop_start() + + loop = True + skip_input = False + while loop: + # get user input and respond + try: + if skip_input: + time.sleep(1) + else: + command = input("") + if command.lower() == "p": + self.processor.print_ongoing_events() + elif command.lower() == "q": + loop = False + elif command.lower().startswith("a "): + self.processor.generate_alert_for_event_id(command[2:]) + elif command.lower().startswith("i "): + self.processor.log_info_event_id(command[2:]) + elif command.lower().startswith("n "): + event = self.processor.get_ongoing_event(command[2:]) + message = self.processor.generate_notification(event) + output = json.dumps(message.to_dict(), indent=2) + logger.info("Notification:\n%s", output) + elif command.lower().startswith("t "): + event = self.processor.get_ongoing_event(command[2:]) + url = self.processor.get_snapshot_url(event) + logger.info("Snapshot URL: %s", url) + else: + option_text = ("p: Print ongoing events\n", + "q: Quit\n", + "a : Generate alert for event ID\n", + "i : Log info for event ID\n", + "n : Generate notification for event ID\n", + "t : Get snapshot URL for event ID\n") + logger.info("Unrecognized command. Expected:\n%s", option_text) + except EOFError: + logger.info("App received an EOF from stdin - disabling interactive mode") + skip_input = True + + except KeyboardInterrupt: + logger.info("App received signal to shudown.") + loop = False + + logger.info("Shutting down...") + client.publish(self.config.mqtt.alert_topic + "/status", "offline", retain=True) + + client.loop_stop() + client.disconnect() + self.processor.clear_pending_notifications() + + logger.info("Disconnected.") + + + def disconnect(self): + """Disconnects the MQTT client.""" + if self.mqtt_client: + self.mqtt_client.loop_stop() + self.mqtt_client.disconnect() + if self.processor: + self.processor.clear_pending_notifications() + + def register_home_assistant_discovery(self): + """ Register the Home Assistant discovery for this service """ + if not self.config.event_tracking.home_assistant: + return + + hass_discovery = HomeAssistantDiscovery(self.config) + + processor_device = DiscoverableDevice("Frigate Event Processor", ["frigate_event_processor"], "Frigate Event Processor", "frigate-event-processor", "1.0", "1.0") + processor_available = DiscoverableSensor("processor_available", "Processor Running") + processor_available.state_topic = self.config.mqtt.alert_topic + "/status" + processor_available.icon = "mdi:server" + processor_available.device = processor_device + processor_available.device_class = DeviceClass.ENUM + processor_available.options = ["online", "offline"] + hass_discovery.publish_sensor(processor_available, self.mqtt_client) + + # Register the MQTT discovery for the event tracking + camera_names = [alert.camera for alert in self.config.alerts] + for camera in camera_names: + + device = DiscoverableDevice(f"Event Processor {camera.title()} ", [f"frigate_event_processor_{camera}"], "Frigate Event Processor", "frigate-event-processor", "1.0", "1.0") + + sensor_event_id = DiscoverableSensor(f"{camera}_event_id", + "Last Event ID") + sensor_event_id.value_template = "{{ value_json.event_id }}" + sensor_event_id.state_topic = f"{self.config.event_tracking.mqtt_topic}/{camera}" + sensor_event_id.icon = "mdi:star-box" + sensor_event_id.device = device + hass_discovery.publish_sensor(sensor_event_id, self.mqtt_client) + + sensor_event_image = DiscoverableImage(f"{camera}_event_image", + "Last Snapshot") + sensor_event_image.icon = "mdi:image-area" + sensor_event_image.url_template = "{{ value_json.image_url }}" + sensor_event_image.device = device + sensor_event_image.url_topic = f"{self.config.event_tracking.mqtt_topic}/{camera}" + hass_discovery.publish_sensor(sensor_event_image, self.mqtt_client) + + sensor_message = DiscoverableSensor(f"{camera}_message", + "Last Event Description") + sensor_message.value_template = "{{ value_json.message }}" + sensor_message.state_topic = f"{self.config.event_tracking.mqtt_topic}/{camera}" + sensor_message.device = device + hass_discovery.publish_sensor(sensor_message, self.mqtt_client) + + logger.info("Home Assistant Discovery registration complete.") + + diff --git a/src/frigate_event_processor/ollama_vision_processor.py b/src/frigate_event_processor/ollama_vision_processor.py new file mode 100644 index 0000000..b088c3b --- /dev/null +++ b/src/frigate_event_processor/ollama_vision_processor.py @@ -0,0 +1,48 @@ +""" Vision processor that uses a local Olama model to generate image descriptions. """ +import logging +import base64 +import httpx + +from .vision_processor import BaseVisionProcessor +from .app_configuration import AIConfig + +logger = logging.getLogger(__name__) + +class OlamaVision(BaseVisionProcessor): + """Class to process images using the local Olama model.""" + def __init__(self, ai_config: AIConfig): + self.config = ai_config + + @property + def enabled(self): + """Returns True if the AI processor is enabled.""" + return self.config.enabled + + def process_event(self, detection, location, snapshot_url, event): + """Processes an event using the AI model.""" + if not self.config.enabled: + logger.warning("AI processor is not enabled but was invoked.") + return None + + logger.info("Event %s: processing with AI model: %s", event.id, self.config.ai_model) + + image_data = super()._fetch_image_base64(snapshot_url) + if image_data is None: + return None + + prompt = super()._prepare_prompt(detection, location) + + request = { + "model": self.config.ai_model, + "prompt": prompt, + "stream": False, + "images": [image_data] + } + + try: + response = httpx.post(self.config.service_url, json=request) + logger.debug("API response: %s", response) + return response.json().get('response') + except Exception as exc: + logger.error("Event %s: failed to process image with Olama model: %s", event.id, exc) + return None \ No newline at end of file diff --git a/src/frigate_event_processor/vision_processor.py b/src/frigate_event_processor/vision_processor.py new file mode 100644 index 0000000..a94797a --- /dev/null +++ b/src/frigate_event_processor/vision_processor.py @@ -0,0 +1,64 @@ +"""Provides an abstract class for AI generated vision processors.""" + +import httpx +import base64 +import logging +from abc import ABC, abstractmethod +from .app_configuration import AIConfig + +logger = logging.getLogger(__name__) + +class BaseVisionProcessor(ABC): + """Base class for AI vision processors.""" + def __init__(self, ai_config): + self.config = ai_config + + @property + def enabled(self): + """Returns True if the AI processor is enabled.""" + return self.config.enabled + + @abstractmethod + def process_event(self, detection, location, snapshot_url, event): + """Processes an event using the AI model.""" + raise NotImplementedError("Subclasses must implement this method") + + def _fetch_image_base64(self, image_url) -> str: + """Fetches an image from a URL and returns it as a base64 encoded string.""" + logger.debug("Fetching image from URL: %s", image_url) + + try: + image_response = httpx.get(image_url) + if image_response.status_code != 200: + logger.info("Failed to fetch image from URL [%s]: %s", image_response.status_code, image_url) + return None + except httpx.RequestError as exc: + logger.error("Failed to fetch image from URL: %s: %s", image_url, exc) + return None + + image_data = image_response.content + return base64.b64encode(image_data).decode('utf-8') + + def _prepare_prompt(self, detection, location): + """Prepares the prompt for the AI model.""" + prompt = self.config.prompt or """Describe this image""" + + if self.config.inject_detection: + prompt += f"\n\nCamera name was '{location}'. This image was labeled with '{detection}'." + + return prompt + + @staticmethod + def get_vision_engine(config: AIConfig) -> 'BaseVisionProcessor': + """Returns a vision processor based on the engine.""" + if config.engine == 'ollama': + logger.info("Using Olama Vision processor.") + from .ollama_vision_processor import OlamaVision + return OlamaVision(config) + elif config.engine == 'google': + logger.info("Using Google Gemini Vision processor.") + from .google_vision_processor import GoogleVision + return GoogleVision(config) + else: + logger.warning(f"Unsupported vision engine: {config.engine}") + return None \ No newline at end of file diff --git a/src/health_checker.py b/src/health_checker.py new file mode 100644 index 0000000..998cae1 --- /dev/null +++ b/src/health_checker.py @@ -0,0 +1,16 @@ +import httpx +import os + +if __name__ == "__main__": + + is_enabled = os.getenv('DOCKER_HEALTH_ENABLED') is not None + host = os.getenv('DOCKER_HEALTH_HOST', 'localhost') + port = os.getenv('DOCKER_HEALTH_PORT', 8000) + + if is_enabled: + response = httpx.get(f"http://{host}:{port}/health") + if response.status_code != 200: + # exit with failure + exit(1) + + exit(0)