Skip to content

Commit

Permalink
Introduction of a way for end-user to bring special rules.
Browse files Browse the repository at this point in the history
This patch (partially) fixes #409.

Indeed, before this patch, end-user couldn't bring their own
rules. Therefore, through this patch, we expose part of our
internal through a "user-friendly" configuration key.

There a room for improvement, but this first version should handle
most of the cases.
  • Loading branch information
funilrys committed Dec 28, 2024
1 parent 296a613 commit 06426c7
Show file tree
Hide file tree
Showing 10 changed files with 612 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ omit =
PyFunceble/dataset/mariadb_base.py
PyFunceble/dataset/*/mariadb.py
PyFunceble/dataset/*/mysql.py
PyFunceble/checker/availability/extra_rules.py
PyFunceble/checker/availability/extras/*
PyFunceble/__init__.py
PyFunceble/logger.py

Expand Down
2 changes: 2 additions & 0 deletions PyFunceble/checker/availability/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from PyFunceble.checker.availability.extras.base import ExtraRuleHandlerBase
from PyFunceble.checker.availability.extras.dns import DNSRulesHandler
from PyFunceble.checker.availability.extras.etoxic import EToxicHandler
from PyFunceble.checker.availability.extras.external import ExternalRulesHandler
from PyFunceble.checker.availability.extras.rules import ExtraRulesHandler
from PyFunceble.checker.availability.extras.subject_switch import (
SubjectSwitchRulesHandler,
Expand Down Expand Up @@ -174,6 +175,7 @@ def __init__(
DNSRulesHandler(),
EToxicHandler(),
ExtraRulesHandler(),
ExternalRulesHandler(rulesets=PyFunceble.storage.SPECIAL_RULES),
]
self.db_session = db_session

Expand Down
2 changes: 1 addition & 1 deletion PyFunceble/checker/availability/extras/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def do_on_header_match(
def handle_regex_match_mode(_req: requests.Response):
matches2search_result = {}

for header, loc_matches in matches:
for header, loc_matches in matches.items():
matches2search_result[header] = False

if header not in _req.headers:
Expand Down
341 changes: 341 additions & 0 deletions PyFunceble/checker/availability/extras/external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
"""
The tool to check the availability or syntax of domain, IP or URL.
::
██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝
██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗
██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the extra rules handler based on some DNS records.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Special thanks:
https://pyfunceble.github.io/#/special-thanks
Contributors:
https://pyfunceble.github.io/#/contributors
Project link:
https://github.com/funilrys/PyFunceble
Project documentation:
https://docs.pyfunceble.com
Project homepage:
https://pyfunceble.github.io/
License:
::
Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024 Nissar Chababy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# pylint: disable=line-too-long

from typing import Optional

from PyFunceble.checker.availability.extras.base import ExtraRuleHandlerBase
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus


class ExternalRulesHandler(ExtraRuleHandlerBase):
"""
Provides the external rules handler that is used to handle the external
provided rules.
Through this handler, end-user can provide their own rules to handle
the availability status of a subject.
:param status:
The previously gathered status.
:type status:
:class:`~PyFunceble.checker.availability.status.AvailabilityCheckerStatus`
"""

rulesets: list = []
"""
The rulesets to process.
If you want to switch from the status code, you should provide a dict
with the following structure:
{
"subject_pattern": ".*", // The pattern the subject should match.
"validation_type": "status_code", // Type of validation (status_code, headers, body, etc.)
"state_transition": "up", // "up" -> ACTIVE, "down" -> INACTIVE
"required_status_code": [404], // Status code to match.
}
If you want to switch from the headers, you should provide a dict
{
"subject_pattern": ".*", // The pattern the subject should match.
"validation_type": "headers", // Type of validation (status_code, headers, body, etc.)
"state_transition": "up", // "up" -> ACTIVE, "down" -> INACTIVE
"required_headers_patterns": { // Required, the headers to match.
"header_name": ["possible", "values"]
},
}
If you want to switch from the body, you should provide a dict
{
"subject_pattern": ".*", // The pattern the subject should match.
"validation_type": "body", // Type of validation (status_code, headers, body, etc.)
"state_transition": "up", // "up" -> ACTIVE, "down" -> INACTIVE
"required_body_patterns": ["regex1", "regex2"] // Required, the body patterns to match.
}
If you want to switch from a combination of headers and body, you should provide a dict
{
"subject_pattern": ".*", // The pattern the subject should match.
"validation_type": "headers+body", // Type of validation (status_code, headers, body, etc.)
"state_transition": "up", // "up" -> ACTIVE, "down" -> INACTIVE
"required_headers_patterns": { // Required, the headers to match.
"header_name": ["possible", "values"]
},
"required_body_patterns": ["regex1", "regex2"] // Required, the body patterns to match.
}
If you want to switch from a combination of all, you should provide a dict
{
"subject_pattern": ".*", // The pattern the subject should match.
"validation_type": "all", // Type of validation (status_code, headers, body, etc.)
"state_transition": "up", // "up" -> ACTIVE, "down" -> INACTIVE
"required_status_code": [404], // Optional, Status code to match.
"required_headers_patterns": { // Optional, the headers to match.
"header_name": ["possible", "values"]
},
"required_body_patterns": ["regex1", "regex2"] // Optional, the body patterns to match.
}
"""

def __init__(
self,
status: Optional[AvailabilityCheckerStatus] = None,
*,
rulesets: list = None
) -> None:
if rulesets is not None:
self.rulesets = rulesets

super().__init__(status)

def switch_from_status_code_rule(self, rule: dict) -> "ExternalRulesHandler":
"""
Switch from the status code rule.
:param rule:
The rule to switch from.
:type rule: dict
"""

required_keys = ["validation_type", "required_status_code"]

if any(x not in rule for x in required_keys):
return self

if rule["validation_type"] != "status_code":
return self

if all(
self.status.http_status_code != int(x) for x in rule["required_status_code"]
):
return self

if rule["state_transition"] == "up":
return self.switch_to_up()

if rule["state_transition"] == "down":
return self.switch_to_down()

return self

def switch_from_headers_rule(self, rule: dict) -> "ExternalRulesHandler":
"""
Switch from the headers rule.
:param rule:
The rule to switch from.
:type rule: dict
"""

required_keys = ["validation_type", "required_headers_patterns"]

if any(x not in rule for x in required_keys):
return self

if rule["validation_type"] != "headers":
return self

if rule["state_transition"] == "up":
switch_method = self.switch_to_up

if rule["state_transition"] == "down":
switch_method = self.switch_to_down

if "required_headers_patterns" in rule and rule["required_headers_patterns"]:
# pylint: disable=possibly-used-before-assignment
self.do_on_header_match(
self.req_url,
rule["required_headers_patterns"],
method=switch_method,
strict=False,
allow_redirects=False,
)

return self

def switch_from_body_rule(self, rule: dict) -> "ExternalRulesHandler":
"""
Switch from the body rule.
:param rule:
The rule to switch from.
:type rule: dict
"""

required_keys = ["validation_type", "required_body_patterns"]

if any(x not in rule for x in required_keys):
return self

if rule["validation_type"] != "body":
return self

if rule["state_transition"] == "up":
switch_method = self.switch_to_up

if rule["state_transition"] == "down":
switch_method = self.switch_to_down

if "required_body_patterns" in rule and rule["required_body_patterns"]:
# pylint: disable=possibly-used-before-assignment
self.do_on_body_match(
self.req_url,
rule["required_body_patterns"],
method=switch_method,
strict=False,
allow_redirects=False,
)

return self

def switch_from_all_rule(self, rule: dict) -> "ExternalRulesHandler":
"""
Switch from the all rule.
:param rule:
The rule to switch from.
:type rule: dict
"""

required_keys = [
"validation_type",
]

if any(x not in rule for x in required_keys):
return self

if rule["validation_type"] != "all":
return self

if rule["state_transition"] == "up":
switch_method = self.switch_to_up

if rule["state_transition"] == "down":
switch_method = self.switch_to_down

if (
"required_status_code" in rule
and rule["required_status_code"]
and any(
self.status.http_status_code == int(x)
for x in rule["required_status_code"]
)
):
# pylint: disable=possibly-used-before-assignment
switch_method()

if "required_headers_patterns" in rule and rule["required_headers_patterns"]:
self.do_on_header_match(
self.req_url,
rule["required_headers_patterns"],
method=switch_method,
strict=False,
allow_redirects=False,
)

if "required_body_patterns" in rule and rule["required_body_patterns"]:
self.do_on_body_match(
self.req_url,
rule["required_body_patterns"],
method=switch_method,
strict=False,
allow_redirects=False,
)

return self

@ExtraRuleHandlerBase.ensure_status_is_given
@ExtraRuleHandlerBase.setup_status_before
@ExtraRuleHandlerBase.setup_status_after
def start(self) -> "ExternalRulesHandler":
"""
Process the check and handling of the external rules for the given subject.
"""

required_keys = ["subject_pattern", "validation_type", "state_transition"]

for rule in self.rulesets:
if any(x not in rule for x in required_keys):
continue

if not self.regex_helper.set_regex(rule["subject_pattern"]).match(
self.status.netloc, return_match=False
):
continue

if rule["state_transition"] not in ["up", "down"]:
continue

if self.status.status_after_extra_rules:
# We already switched the status.
break

if rule["validation_type"] == "status_code":
self.switch_from_status_code_rule(rule)
elif rule["validation_type"] == "headers":
self.switch_from_headers_rule(rule)
elif rule["validation_type"] == "body":
self.switch_from_body_rule(rule)
elif rule["validation_type"] == "headers+body":
self.switch_from_headers_rule(rule)
self.switch_from_body_rule(rule)
elif rule["validation_type"] == "all":
self.switch_from_all_rule(rule)

return self
4 changes: 4 additions & 0 deletions PyFunceble/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,9 @@ def start(self) -> "ConfigLoader":
if "proxy" in config and config["proxy"]:
PyFunceble.storage.PROXY = Box(config["proxy"])

if "special_rules" in config and config["special_rules"]:
PyFunceble.storage.SPECIAL_RULES = config["special_rules"]

# Early load user agents to allow usage of defined user agents.
UserAgentDataset().get_latest()

Expand All @@ -676,6 +679,7 @@ def destroy(self, keep_custom: bool = False) -> "ConfigLoader":
PyFunceble.storage.PLATFORM = Box({})
PyFunceble.storage.LINKS = Box({})
PyFunceble.storage.PROXY = Box({})
PyFunceble.storage.SPECIAL_RULES = Box({})
except (AttributeError, TypeError): # pragma: no cover ## Safety.
pass

Expand Down
Loading

0 comments on commit 06426c7

Please sign in to comment.