From 9af640a64846ba7f6680cbe6d69fca6d786e2bc1 Mon Sep 17 00:00:00 2001 From: SimonGurney Date: Tue, 10 Sep 2024 17:06:07 +0100 Subject: [PATCH] feat:async-runtime * feat: initial working labda * chore: black * fix: just return array * chore: black * feat: Attempt to async the code * feat: Make async actually work * feat: working * chore: format * lambda-req * feat: speed improvements * chore: black * feat: abstract resolver * chore: black * testing: custom resolver * chore:black * feat: mx * chore: black * feat: fix tests * chore: black * fix: fixed mocks * chore: black * feat: bypass useragent filtering * feat: add retry logic and fix sigs * feat: optimise nx_domain * feat: bump lambda * chore: black * feat: store results in dynamo * feat: Tidy up codebase and async whois (#184) * feat: Tidy up codebase and async whois * feat: Run black * feat: Address concerns over mocks. * Update test action to install main requirements * Cap parallel tests * Make python verison tests more aggressive --------- Co-authored-by: Brandon Hall * Some provider data/error improvements * Explain why SSL validation is disabled * Formatting * Clean-up some code smells * Clean-up lambda * Fix lambda dockerfile path * Fix cdockerfile lambda stage * Reorder dockerfile? --------- Co-authored-by: Alex Brozych Co-authored-by: alexbrozych <120374931+alexbrozych@users.noreply.github.com> Co-authored-by: Brandon Hall --- .dockerignore | 166 +++++++++- .github/actions/pytest/action.yml | 48 +-- .github/workflows/build_preview.yml | 2 +- .gitignore | 277 +++++++++-------- .terraform.lock.hcl | 63 ++++ Dockerfile | 17 +- argparsing.py | 4 +- dev/docker/bind/db.punksecurity.io | 5 +- domain.py | 128 ++++---- finding.py | 3 - lambda-requirements.txt | 2 + lambda.py | 171 +++++++++++ lambda.tf | 136 ++++++++ main.py | 93 +++--- providers/bind.py | 4 +- providers/cloudflare.py | 2 +- providers/file.py | 6 +- providers/godaddy.py | 216 ++++++------- providers/projectdiscovery.py | 3 +- providers/securitytrails.py | 2 +- providers/single.py | 2 +- requirements.txt | 3 +- resolver.py | 290 ++++++++++++++++++ scan.py | 17 +- .../_generic_cname_found_but_404_http.py | 4 +- .../_generic_cname_found_but_404_https.py | 4 +- .../_generic_cname_found_but_unregistered.py | 5 +- .../_generic_cname_found_doesnt_resolve.py | 5 +- signatures/_generic_zone_missing_on_ns.py | 4 +- signatures/checks/CNAME.py | 8 +- signatures/checks/NS.py | 16 +- signatures/checks/WEB.py | 28 +- signatures/launchrock_cname.py | 6 +- signatures/shopify.py | 2 +- signatures/simplebooklet.py | 16 +- signatures/surveysparrow.py | 14 +- signatures/templates/base.py | 2 +- .../templates/cname_found_but_NX_DOMAIN.py | 4 +- .../templates/cname_found_but_status_code.py | 6 +- .../cname_found_but_string_in_body.py | 6 +- .../cname_or_ip_found_but_string_in_body.py | 6 +- .../templates/ip_found_but_string_in_body.py | 6 +- signatures/templates/ns_found_but_no_SOA.py | 4 +- signatures/wix.py | 11 + test-requirements.txt | 12 +- tests/__init__.py | 0 tests/mocks.py | 42 ++- tests/signatures_tests/__init__.py | 0 tests/signatures_tests/checks/__init__.py | 0 tests/signatures_tests/checks/test_A.py | 22 +- tests/signatures_tests/checks/test_AAAA.py | 22 +- tests/signatures_tests/checks/test_CNAME.py | 129 ++++---- .../signatures_tests/checks/test_COMBINED.py | 43 ++- tests/signatures_tests/checks/test_NS.py | 51 +-- tests/signatures_tests/checks/test_WEB.py | 79 +++-- tests/signatures_tests/templates/__init__.py | 0 .../test_cname_found_but_NX_DOMAIN.py | 27 +- .../test_cname_found_but_status_code.py | 27 +- .../test_cname_found_but_string_in_body.py | 27 +- ...st_cname_or_ip_found_but_string_in_body.py | 39 ++- .../test_ip_found_but_string_in_body.py | 30 +- .../templates/test_ns_found_but_no_SOA.py | 11 +- .../test_generic_cname_found_but_404_http.py | 24 +- .../test_generic_cname_found_but_404_https.py | 24 +- ...st_generic_cname_found_but_unregistered.py | 41 ++- ...test_generic_cname_found_doesnt_resolve.py | 37 ++- .../test_generic_zone_missing_on_ns.py | 13 +- 67 files changed, 1781 insertions(+), 736 deletions(-) create mode 100644 .terraform.lock.hcl create mode 100644 lambda-requirements.txt create mode 100644 lambda.py create mode 100644 lambda.tf create mode 100644 resolver.py create mode 100644 signatures/wix.py create mode 100644 tests/__init__.py create mode 100644 tests/signatures_tests/__init__.py create mode 100644 tests/signatures_tests/checks/__init__.py create mode 100644 tests/signatures_tests/templates/__init__.py diff --git a/.dockerignore b/.dockerignore index 81fc670..4ba6ee9 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,160 @@ -.github -dev -docs -venv -tests -__pycache__/ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Exclude Output +results.csv + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Pycharm +.idea + +/build + +.terraform + +.terraform.lock.hcl + +terraform.tfstate +.terraform.tfstate.lock.info +*.zip + +pdkey + +# exclude state +terraform.tfstate +terraform.tfstate.backup + +#exclude other stuff +.github +.vscode +__pycache__ +.git +dev +docs +#tests diff --git a/.github/actions/pytest/action.yml b/.github/actions/pytest/action.yml index fb905da..c1d96c5 100644 --- a/.github/actions/pytest/action.yml +++ b/.github/actions/pytest/action.yml @@ -1,24 +1,24 @@ -name: 'Python runtime test' -description: 'Test pwnSpoof against a given Python version' -inputs: - python-version: - description: 'Python version' - required: true -runs: - using: composite - steps: - - uses: actions/checkout@v2 - - name: Set up Python ${{ inputs.python-version }} - uses: actions/setup-python@v1 - with: - python-version: ${{ inputs.python-version }} - - name: Python version - id: python-version - run: python --version - shell: bash - - name: install requirements - run: python -m pip install -r test-requirements.txt - shell: bash - - name: output tests - run: python -m pytest -v - shell: bash +name: 'Python runtime test' +description: 'Test pwnSpoof against a given Python version' +inputs: + python-version: + description: 'Python version' + required: true +runs: + using: composite + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ inputs.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ inputs.python-version }} + - name: Python version + id: python-version + run: python --version + shell: bash + - name: install requirements + run: python -m pip install -r requirements.txt -r test-requirements.txt + shell: bash + - name: output tests + run: python -m pytest -v + shell: bash diff --git a/.github/workflows/build_preview.yml b/.github/workflows/build_preview.yml index 0cc9fb1..c754da4 100644 --- a/.github/workflows/build_preview.yml +++ b/.github/workflows/build_preview.yml @@ -11,6 +11,7 @@ on: jobs: pytest: strategy: + max-parallel: 1 matrix: python-version: ["3.9", "3.10", "3.11", "3.12"] runs-on: ubuntu-latest @@ -34,4 +35,3 @@ jobs: VERSION: ${{ github.run_id }} with: targets: "preview" - diff --git a/.gitignore b/.gitignore index 875eee6..af784bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,135 +1,142 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# Exclude Output -results.csv - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# Pycharm -.idea +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Exclude Output +results.csv + +# exclude pdkey +pdkey + +# exclude state +terraform.tfstate +terraform.tfstate.backup + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Pycharm +.idea diff --git a/.terraform.lock.hcl b/.terraform.lock.hcl new file mode 100644 index 0000000..51eddfc --- /dev/null +++ b/.terraform.lock.hcl @@ -0,0 +1,63 @@ +# This file is maintained automatically by "terraform init". +# Manual edits may be lost in future updates. + +provider "registry.terraform.io/hashicorp/aws" { + version = "5.40.0" + constraints = ">= 5.16.0, < 6.0.0" + hashes = [ + "h1:M28qqKLKQrRTC99lWmytVdDyAk0FgTIgH3TI9yPYhjY=", + "zh:11f177a2385703740bd26d0652d3dba08575101d7639f386ce5637bdb0e29a13", + "zh:203fc43e69634f1bd487a9dc24b01944dfd568beac78e491f26677d103d343ed", + "zh:3697ebad4929da30ea98276a85d4ce5ebfc48508f4dd149e17e1dcdc7f306c6e", + "zh:421e0799756587e728f75a9024b8d4e38707cd6d65cf0710cb8d189062c85a58", + "zh:4be2adcd4c32a66159c532908f0d425d793c814b3686832e9af549b1515ae032", + "zh:55778b32470212ce6bbfd402529c88e7ea6ba34b0882f85d6ea001ff5c6255a5", + "zh:689a4c1fd1e1d5dab7b169759389c76f25e366f19a470971674321d6fca09791", + "zh:68a23eda608573a053e8738894457bd0c11766bc243e68826c78ab6b5a144710", + "zh:9b12af85486a96aedd8d7984b0ff811a4b42e3d88dad1a3fb4c0b580d04fa425", + "zh:a1580115c22564e5752e569dc40482503de6cced44da3e9431885cd9d4bf18ea", + "zh:b127756d7ee513691e76c211570580c10eaa2f7a7e4fd27c3566a48ec214991c", + "zh:b7ccea7a759940c8dcf8726272eed6653eed0b31f7223f71e829a344627afd39", + "zh:bb130fc50494fd45406e04b44d242da9a8f138a4a43feb65cf9e86d13aa13629", + "zh:cf1c972c90d5f22c9705274a33792275e284a0a3fcac12ce4083b5a4480463f4", + "zh:ebe60d3887b23703ca6a4c65b15c6d7b8d93ba27a028d996d17882fe6e98d5c0", + ] +} + +provider "registry.terraform.io/hashicorp/null" { + version = "3.2.2" + hashes = [ + "h1:JViWrgF7Ks2GqB6UfcLDUbusXeSfhfhFymo4c0N5e+I=", + "zh:3248aae6a2198f3ec8394218d05bd5e42be59f43a3a7c0b71c66ec0df08b69e7", + "zh:32b1aaa1c3013d33c245493f4a65465eab9436b454d250102729321a44c8ab9a", + "zh:38eff7e470acb48f66380a73a5c7cdd76cc9b9c9ba9a7249c7991488abe22fe3", + "zh:4c2f1faee67af104f5f9e711c4574ff4d298afaa8a420680b0cb55d7bbc65606", + "zh:544b33b757c0b954dbb87db83a5ad921edd61f02f1dc86c6186a5ea86465b546", + "zh:696cf785090e1e8cf1587499516b0494f47413b43cb99877ad97f5d0de3dc539", + "zh:6e301f34757b5d265ae44467d95306d61bef5e41930be1365f5a8dcf80f59452", + "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", + "zh:913a929070c819e59e94bb37a2a253c228f83921136ff4a7aa1a178c7cce5422", + "zh:aa9015926cd152425dbf86d1abdbc74bfe0e1ba3d26b3db35051d7b9ca9f72ae", + "zh:bb04798b016e1e1d49bcc76d62c53b56c88c63d6f2dfe38821afef17c416a0e1", + "zh:c23084e1b23577de22603cff752e59128d83cfecc2e6819edadd8cf7a10af11e", + ] +} + +provider "registry.terraform.io/hashicorp/time" { + version = "0.10.0" + hashes = [ + "h1:XiRMsGFEe6VTWGL0O32l8viW2fI8wXyJFRJYfdQR8os=", + "zh:0ab31efe760cc86c9eef9e8eb070ae9e15c52c617243bbd9041632d44ea70781", + "zh:0ee4e906e28f23c598632eeac297ab098d6d6a90629d15516814ab90ad42aec8", + "zh:3bbb3e9da728b82428c6f18533b5b7c014e8ff1b8d9b2587107c966b985e5bcc", + "zh:6771c72db4e4486f2c2603c81dfddd9e28b6554d1ded2996b4cb37f887b467de", + "zh:78d5eefdd9e494defcb3c68d282b8f96630502cac21d1ea161f53cfe9bb483b3", + "zh:833c636d86c2c8f23296a7da5d492bdfd7260e22899fc8af8cc3937eb41a7391", + "zh:c545f1497ae0978ffc979645e594b57ff06c30b4144486f4f362d686366e2e42", + "zh:def83c6a85db611b8f1d996d32869f59397c23b8b78e39a978c8a2296b0588b2", + "zh:df9579b72cc8e5fac6efee20c7d0a8b72d3d859b50828b1c473d620ab939e2c7", + "zh:e281a8ecbb33c185e2d0976dc526c93b7359e3ffdc8130df7422863f4952c00e", + "zh:ecb1af3ae67ac7933b5630606672c94ec1f54b119bf77d3091f16d55ab634461", + "zh:f8109f13e07a741e1e8a52134f84583f97a819e33600be44623a21f6424d6593", + ] +} diff --git a/Dockerfile b/Dockerfile index 0a10026..22bdd3a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,13 +3,25 @@ RUN apk add gcc libffi musl-dev libffi-dev # Create app directory RUN python -m venv /opt/venv -ENV PATH="/opt/venv/bin:$PATH" +ENV PATH "/opt/venv/bin:$PATH" COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt +# Prepares a zipped version of the application suitable for running on lambda +FROM amazon/aws-lambda-python:3.11 AS lambda + +COPY requirements.txt /app/ +COPY lambda-requirements.txt /app/ +WORKDIR /app +RUN pip install -r requirements.txt -r lambda-requirements.txt --target . +COPY . /app + +RUN yum install zip -y && zip -r9 /packaged_app.zip . + +# Main application target FROM python:3.11-alpine COPY --from=builder /opt/venv /opt/venv -ENV PATH="/opt/venv/bin:$PATH" +ENV PATH "/opt/venv/bin:$PATH" RUN mkdir -p /app/results WORKDIR /app @@ -19,3 +31,4 @@ COPY . . # Exports ENV SM_COMMAND "docker run punksecurity/dnsreaper --" ENTRYPOINT [ "python3", "/app/main.py" ] + diff --git a/argparsing.py b/argparsing.py index e08b739..971734e 100644 --- a/argparsing.py +++ b/argparsing.py @@ -117,8 +117,8 @@ def error(self, message): parser.add_argument( "--resolver", type=str, - default="", - help="Provide a custom DNS resolver (or multiple seperated by commas)", + default="8.8.8.8,8.8.4.4,1.1.1.1,1.0.0.1,208.67.222.2,208.67.220.2", + help="Provide a custom DNS resolver (or multiple seperated by commas), or '' to use system resolver. We loadbalance a public resolver list by default", ) diff --git a/dev/docker/bind/db.punksecurity.io b/dev/docker/bind/db.punksecurity.io index d4c6d24..f2f95eb 100644 --- a/dev/docker/bind/db.punksecurity.io +++ b/dev/docker/bind/db.punksecurity.io @@ -1,5 +1,6 @@ -punksecurity.io. 3600 IN SOA punksecurity.io. root.punksecurity.io. 2041230773 7200 3600 86400 3600 +$ORIGIN punksecurity.io. +@ 3600 IN SOA punksecurity.io. root.punksecurity.io. 2041230773 7200 3600 86400 3600 IN NS ns1.localhost.net. ;; NS Records -vulnerable.punksecurity.io. 1 IN NS ns-40.awsdns-05.com. +vulnerable 1 IN NS ns-40.awsdns-05.com. diff --git a/domain.py b/domain.py index f653309..9cf72c3 100644 --- a/domain.py +++ b/domain.py @@ -1,56 +1,53 @@ import ipaddress -import socket from collections import namedtuple -import dns.resolver -from functools import lru_cache +from typing import Optional -import requests -import logging - -import urllib3 +import dns.asyncresolver +import logging import collections collections.Iterable = collections.abc.Iterable collections.Mapping = collections.abc.Mapping -import whois + +import asyncwhois +import aiohttp +import ssl + +from resolver import Resolver class Domain: + resolver = Resolver() + @property - @lru_cache - def SOA(self): - return self.query("SOA") + async def SOA(self): + return await self.query("SOA") @property - @lru_cache - def NX_DOMAIN(self): - record_types = ["A", "AAAA", "CNAME", "TXT", "MX", "NS"] - for record_type in record_types: - if self.query(record_type): - return False - return True - - def query(self, type): + async def NX_DOMAIN(self): + return (await self.resolver.resolve(self.domain, "A"))["NX_DOMAIN"] + + async def query(self, type): try: - resp = self.resolver.resolve(self.domain, type) - return [record.to_text().rstrip(".") for record in resp] + resp = await self.resolver.resolve(self.domain, type) + return resp[type] except: return [] - def fetch_std_records(self): + async def fetch_std_records(self): # TODO: is this recursive? - self.CNAME = self.query("CNAME") - self.A = self.query("A") - self.AAAA = self.query("AAAA") - if self.CNAME: + self.CNAME = await self.query("CNAME") + self.A = await self.query("A") + self.AAAA = await self.query("AAAA") + if self.CNAME or self.A or self.AAAA: # return early if we get a CNAME otherwise we get records for the cname aswell # this is actually desirable for A/AAAA but not NS as the next zone # will be queried based on the CNAME value, not the original domain return - self.NS = self.query("NS") + self.NS = await self.query("NS") - def fetch_external_records(self): + async def fetch_external_records(self): for cname in self.CNAME: split_cname = cname.split(".", 1) if len(split_cname) == 1: @@ -58,14 +55,14 @@ def fetch_external_records(self): if self.base_domain == split_cname[1]: continue # Same zone, dont fetch d = Domain(cname) - d.fetch_std_records() + await d.fetch_std_records() self.A += d.A self.AAAA += d.AAAA self.CNAME += d.CNAME for ns in self.NS: try: d = Domain(self.domain) - d.set_custom_NS(ns=ns) + await d.set_resolver_nameserver(ns) self.A += d.A self.AAAA += d.AAAA except: @@ -73,18 +70,37 @@ def fetch_external_records(self): f"We could not resolve the provided NS record '{ns}' to an ip" ) - def set_custom_NS(self, ns: str): + async def set_resolver_nameserver(self, ns: Optional[str] = None): + if ns is None: + self.resolver = dns.asyncresolver + self.resolver.timeout = 1 + + return + if type(ns) != str: logging.error(f"Cannot set custom NS as {ns} not a string") - self.resolver = dns.resolver.Resolver() + + raise RuntimeError(f"Invalid NS type - expected str got {type(ns)}") + + self.resolver = dns.asyncresolver.Resolver() + self.resolver.timeout = 1 try: ipaddress.ip_address(ns) self.resolver.nameservers = [ns] + return except ValueError: + # if ns isn't a valid IP address, attempt to resolve it try: - self.resolver.nameservers = [socket.gethostbyname(ns.rstrip("."))] + nameservers = list( + map( + lambda rr: rr.address, + (await self.resolver.resolve(ns.rstrip("."))).rrset, + ) + ) + self.resolver.nameservers = nameservers except: + # TODO: document why this gets set to an empty list self.resolver.nameservers = [] def set_base_domain(self): @@ -94,47 +110,47 @@ def set_base_domain(self): else: self.base_domain = "." - def __init__(self, domain, fetch_standard_records=True, ns=None): + def __init__(self, domain, fetch_standard_records=True): self.domain = domain.rstrip(".") self.NS = [] self.A = [] self.AAAA = [] self.CNAME = [] self.set_base_domain() - self.requests = requests - if ns == None: - self.resolver = dns.resolver - else: - self.set_custom_NS(ns) - self.resolver.timeout = 1 self.should_fetch_std_records = fetch_standard_records + self.base_domain = None - @lru_cache - def fetch_web(self, uri="", https=True, params={}): - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + def get_session(self): + return aiohttp.ClientSession() + + async def fetch_web(self, uri="", https=True): protocol = "https" if https else "http" url = f"{protocol}://{self.domain}/{uri}" + + # We must disable SSL validation because vulnerable domains probably won't have a valid cert on the other end + # e.g. github + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" + } try: - resp = self.requests.get(url, timeout=5, verify=False, params=params) - web_status = resp.status_code - web_body = resp.content.decode() + async with self.get_session() as session: + resp = await session.get(url, ssl=ssl_context, headers=headers) + web_status = resp.status + web_body = await resp.text() except: web_status = 0 web_body = "" return namedtuple("web_response", ["status_code", "body"])(web_status, web_body) @property - @lru_cache - def is_registered(self): + async def is_registered(self): try: - whois.whois(self.domain) + await asyncwhois.aio_whois(self.domain) return True - except whois.parser.PywhoisError as e: - if e.args[0] == "No whois server is known for this kind of object.": - # This is the only case of a potentially registered domain - # triggering a PywhoisError - # https://github.com/richardpenman/whois/blob/56dc7e41d134e6d4343ad80a48533681bd887ff2/whois/parser.py#L201 - return True + except asyncwhois.NotFoundError: return False except Exception: return True diff --git a/finding.py b/finding.py index 954eed1..79dc51d 100644 --- a/finding.py +++ b/finding.py @@ -1,6 +1,3 @@ -from os import linesep - - class Finding(object): def __init__(self, domain, signature, info, confidence, more_info_url): self.domain = domain.domain diff --git a/lambda-requirements.txt b/lambda-requirements.txt new file mode 100644 index 0000000..465f897 --- /dev/null +++ b/lambda-requirements.txt @@ -0,0 +1,2 @@ +fastapi==0.87.0 +mangum==0.17 diff --git a/lambda.py b/lambda.py new file mode 100644 index 0000000..35f3bb9 --- /dev/null +++ b/lambda.py @@ -0,0 +1,171 @@ +import asyncio +import datetime +import logging +import os +import random +import re +import sys +import time +import urllib.parse +import uuid +from functools import partial +from sys import stdout + +import boto3 + +import detection_enums +import output +import signatures +from domain import Domain +from providers import projectdiscovery +from resolver import Resolver +from scan import scan_domain + +sys.path.append(os.getcwd()) + +logger = logging.getLogger() +logger.setLevel("INFO") + +from fastapi import FastAPI +from mangum import Mangum + +app = FastAPI() + +###### signatures + +signatures = [getattr(signatures, signature) for signature in signatures.__all__] + +# replace name for each signature +for signature in signatures: + signature.__name__ = signature.__name__.replace("signatures.", "") + +signatures = [ + s for s in signatures if s.test.CONFIDENCE != detection_enums.CONFIDENCE.UNLIKELY +] + + +@app.get("/") +async def root(): + return {"message": "Hello Punk!"} + + +@app.get("/check") +async def check(domain: str): + try: + logging.warning(f"Received: {domain}") + domain = domain.replace(" ", "") + domains = domain.split(",") + return (await process_domains(domains))["findings"] + except Exception as e: + logging.error(f"Caught exception when checking: {e}") + + return {"error": True} + + +@app.get("/scan") +async def scan(domain: str): + try: + dynamodb = boto3.resource("dynamodb") + table = dynamodb.Table(DYNAMO_TABLE) + logging.warning(f"Received: {domain}") + domain = domain.replace(" ", "") + domains = domain.split(",") + results = await process_domains(domains) + guid = str(uuid.uuid4()) + table.put_item(Item={"guid": guid, "results": results}) + return {"results": guid} + except Exception as e: + logging.error(e) + return {"error": True} + + +@app.get("/result") +async def result(id: str): + try: + dynamodb = boto3.resource("dynamodb") + table = dynamodb.Table(DYNAMO_TABLE) + logging.warning(f"Received id: {id}") + return table.get_item(Key={"guid": id})["Item"]["results"] + except Exception as e: + logging.error(e) + return {"error": True} + + +###### scanning + +PD_API_KEY = os.environ.get("PD_API_KEY", None) +DYNAMO_TABLE = os.environ.get("DYNAMO_TABLE", None) +SCAN_DOMAIN_LIMIT = os.environ.get("SCAN_DOMAIN_LIMIT", 2000) + + +async def process_domains(domains: list[str]): + Domain.resolver = Resolver( + nameservers=[ + "8.8.8.8", + "8.8.4.4", + "1.1.1.1", + "1.0.0.1", + "208.67.222.2", + "208.67.220.2", + ], + parallelism=4000, + ) + findings = [] + domain_objs = [Domain(domain) for domain in domains] + if len(domains) == 1: + # Project Discovery! + + primary_domain = domains[0].strip() + + if "://" not in primary_domain: + # If no scheme is present, add a prefix so urlsplit treats it as absolute + primary_domain = f"//{primary_domain}" + + # Use urlsplit to remove any URL gubbins + primary_domain = urllib.parse.urlsplit(primary_domain).hostname + + # urlsplit doesn't do validation. Double check that the hostname doesn't have any undesired chars + invalid_char_match = re.search("[^a-zA-Z0-9.-]", primary_domain) + + if invalid_char_match is not None: + return {"error": True, "message": "Invalid domain"} + + # Remove any remaining nonsense characters + pd_domains = projectdiscovery.fetch_domains(PD_API_KEY, primary_domain) + + logging.warning(f"Got {len(pd_domains)} domains from PD") + domain_objs += pd_domains + + random.shuffle(domain_objs) + domain_objs = domain_objs[:SCAN_DOMAIN_LIMIT] + logging.warning(domain_objs) + start_time = time.time() + with output.Output("json", stdout) as o: + scan = partial( + scan_domain, + signatures=signatures, + output_handler=o, + findings=findings, + ) + await asyncio.wait( + [asyncio.create_task(scan(domain)) for domain in domain_objs], + timeout=20, + return_when=asyncio.ALL_COMPLETED, + ) + return { + "domains": domains, + "findings": [f.__dict__ for f in findings], + "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "count_scanned_domains": len(domain_objs), + "count_signatures": len(signatures), + "execution_time": str(round(time.time() - start_time, 2)), + } + + +def handler(event, context): + asgi_handler = Mangum(app) + response = asgi_handler( + event, context + ) # Call the instance with the event arguments + + return response diff --git a/lambda.tf b/lambda.tf new file mode 100644 index 0000000..354cd13 --- /dev/null +++ b/lambda.tf @@ -0,0 +1,136 @@ + terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">=5.16, <6.0" + } + } + } + + +### Gets caller's ID +data "aws_caller_identity" "current" { + # Retrieves information about the AWS account corresponding to the + # access key being used to run Terraform, which we need to populate + # the "source_account" on the permission resource. +} + +data "aws_region" "region" {} + +resource "aws_dynamodb_table" "results" { + name = "dnsreaper-results" + billing_mode = "PAY_PER_REQUEST" + + attribute { + name = "guid" + type = "S" + } + hash_key = "guid" +} + +resource "time_static" "src" { + triggers = { + src : sha1(join("", [for f in fileset("${path.root}/","*.py"): filesha1(f)])) + } +} + +resource "null_resource" "build_venv" { + triggers = { + id = time_static.src.unix + } + + provisioner "local-exec" { + command = "docker build -t ${time_static.src.unix} ${path.root} -f ${path.root}/Dockerfile --target lambda && docker create --name ${time_static.src.unix} ${time_static.src.unix} && docker cp ${time_static.src.unix}:/packaged_app.zip ${path.root}/${time_static.src.unix}.zip && docker rm -v ${time_static.src.unix}" + } +} + +### Logs +resource "aws_cloudwatch_log_group" "logs" { + name = "/aws/lambda/dnsReaper-public-lambda" + retention_in_days = 14 +} + +resource "aws_lambda_function" "serverless-dnsreaper" { + depends_on = [ "null_resource.build_venv" ] + description = time_static.src.unix + filename = "${path.root}/${time_static.src.unix}.zip" + function_name = "dnsReaper-public-lambda" + role = aws_iam_role.iam_for_lambda.arn + handler = "lambda.handler" + + runtime = "python3.11" + + timeout = 30 + memory_size = 256 + + environment { + variables = { + PD_API_KEY = file("./pdkey"), + DYNAMO_TABLE = aws_dynamodb_table.results.name + } + } +} + +data "aws_iam_policy_document" "assume_role" { + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + + actions = ["sts:AssumeRole"] + } +} + +data "aws_iam_policy_document" "execution" { + statement { + effect = "Allow" + actions = [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface" + ] + resources = ["*"] + } + statement { + actions = [ + "logs:CreateLogGroup", + ] + resources = ["arn:aws:logs:${data.aws_region.region.name}:${data.aws_caller_identity.current.account_id}:*"] + } + statement { + actions = [ + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["${aws_cloudwatch_log_group.logs.arn}:*"] + } + + statement { + actions = [ + "dynamodb:PutItem", + "dynamodb:GetItem", + "dynamodb:UpdateItem", + "dynamodb:DeleteItem", + "dynamodb:BatchGetItem", + "dynamodb:BatchWriteItem", + "dynamodb:Scan", + "dynamodb:Query" + ] + resources = [aws_dynamodb_table.results.arn] + } + +} + +resource "aws_iam_role" "iam_for_lambda" { + name = "iam_for_lambda" + assume_role_policy = data.aws_iam_policy_document.assume_role.json + inline_policy { + name = "execution_policy" + policy = data.aws_iam_policy_document.execution.json + } +} + + diff --git a/main.py b/main.py index 5b55e24..5d26cde 100644 --- a/main.py +++ b/main.py @@ -4,9 +4,9 @@ import detection_enums import providers from os import linesep +from domain import Domain +from resolver import Resolver -from multiprocessing.pool import ThreadPool -import threading from functools import partial import logging @@ -18,6 +18,10 @@ import time +import asyncio + +import dns.resolver + start_time = time.time() if "--nocolour" in argv: @@ -44,14 +48,6 @@ for module in ["boto", "requests"]: logger = logging.getLogger(module) logger.setLevel(logging.CRITICAL) -###### domain ingestion - -provider = getattr(providers, args.provider) -domains = list(provider.fetch_domains(**args.__dict__)) - -if len(domains) == 0: - logging.error("ERROR: No domains to scan") - exit(-1) ###### signatures @@ -87,43 +83,52 @@ ###### scanning findings = [] -lock = threading.Lock() if "--out" not in argv: # using default out location, need to append our format args.out = f"{args.out}.{args.out_format}" -with output.Output(args.out_format, args.out) as o: - scan = partial( - scan_domain, - signatures=signatures, - output_handler=o, - lock=lock, - findings=findings, - name_servers=args.resolver.replace(" ", "").split(","), + +async def main(): + ###### domain ingestion + nameservers = ( + dns.resolver.Resolver().nameservers + if args.resolver == "" + else args.resolver.replace(" ", "").split(",") + ) + Domain.resolver = Resolver(nameservers=nameservers, parallelism=args.parallelism) + provider = getattr(providers, args.provider) + domains = list(provider.fetch_domains(**args.__dict__)) + + if len(domains) == 0: + logging.error("ERROR: No domains to scan") + exit(-1) + + with output.Output(args.out_format, args.out) as o: + scan = partial( + scan_domain, + signatures=signatures, + output_handler=o, + findings=findings, + ) + + await asyncio.gather(*[asyncio.create_task(scan(domain)) for domain in domains]) + + ###### exit + logging.warning(f"\n\nWe found {len(findings)} takeovers ☠️") + for finding in findings: + msg = f"-- DOMAIN '{finding.domain}' :: SIGNATURE '{finding.signature}' :: CONFIDENCE '{finding.confidence}'" + msg += f"{linesep}{finding.populated_records()}" + if args.nocolour == False: + msg = colorama.Fore.RED + msg + colorama.Fore.RESET + logging.warning(msg) + logging.warning( + f"\n⏱️ We completed in {round(time.time() - start_time, 2)} seconds" ) - pool = ThreadPool(processes=args.parallelism) - res = pool.map_async(scan, domains) - try: - while not res.ready(): - time.sleep(2) - except KeyboardInterrupt: - logging.warning("Caught KeyboardInterrupt, terminating early...") - lock.acquire() - else: - pool.close() - pool.join() - -###### exit -logging.warning(f"\n\nWe found {len(findings)} takeovers ☠️") -for finding in findings: - msg = f"-- DOMAIN '{finding.domain}' :: SIGNATURE '{finding.signature}' :: CONFIDENCE '{finding.confidence}'" - msg += f"{linesep}{finding.populated_records()}" - if args.nocolour == False: - msg = colorama.Fore.RED + msg + colorama.Fore.RESET - logging.warning(msg) -logging.warning(f"\n⏱️ We completed in {round(time.time() - start_time, 2)} seconds") -logging.warning(f"...Thats all folks!") -if args.pipeline: - logging.debug(f"Pipeline flag set - Exit code: {len(findings)}") - exit(len(findings)) + logging.warning(f"...Thats all folks!") + if args.pipeline: + logging.debug(f"Pipeline flag set - Exit code: {len(findings)}") + exit(len(findings)) + + +asyncio.run(main()) diff --git a/providers/bind.py b/providers/bind.py index 6db2ae9..ee87d87 100644 --- a/providers/bind.py +++ b/providers/bind.py @@ -32,7 +32,7 @@ def bind_file_to_domains(bind_zone_file): def fetch_domains(bind_zone_file, **args): if isfile(bind_zone_file): domains = bind_file_to_domains(bind_zone_file) - logging.warn(f"Read {len(domains)} domains from zone file") + logging.warning(f"Read {len(domains)} domains from zone file") else: domains = [] files = [ @@ -43,5 +43,5 @@ def fetch_domains(bind_zone_file, **args): for file in files: logging.debug("Reading file '{file}'") domains = [*domains, *bind_file_to_domains(file)] - logging.warn(f"Read {len(domains)} domains from zone file dir") + logging.warning(f"Read {len(domains)} domains from zone file dir") return domains diff --git a/providers/cloudflare.py b/providers/cloudflare.py index 0a3a8e5..94ec486 100644 --- a/providers/cloudflare.py +++ b/providers/cloudflare.py @@ -91,5 +91,5 @@ def fetch_domains(cloudflare_token, **args): ) for record in convert_records_to_domains(records): domains.append(record) - logging.warn(f"Got {len(domains)} records from cloudflare") + logging.warning(f"Got {len(domains)} records from cloudflare") return domains diff --git a/providers/file.py b/providers/file.py index 83c3f61..e2f2b04 100644 --- a/providers/file.py +++ b/providers/file.py @@ -11,7 +11,9 @@ def fetch_domains(filename, **args): with open(filename) as file: try: domains = file.readlines() - logging.warn(f"Ingested {len(domains)} domains from file '{filename}'") + logging.warning( + f"Ingested {len(domains)} domains from file '{filename}'" + ) except Exception as e: logging.error(f"Could not read any domains from file {filename} -- {e}") exit(-1) @@ -25,7 +27,7 @@ def fetch_domains(filename, **args): logging.debug(f"Ingested domains from file '{file}'") except: logging.debug(f"Could not read file '{file}'") - logging.warn(f"Ingested {len(domains)} domains from folder '{filename}'") + logging.warning(f"Ingested {len(domains)} domains from folder '{filename}'") return [Domain(domain.rstrip()) for domain in domains] diff --git a/providers/godaddy.py b/providers/godaddy.py index 4407dec..c49083e 100644 --- a/providers/godaddy.py +++ b/providers/godaddy.py @@ -1,107 +1,109 @@ -import requests - -from domain import Domain - -description = "Scan multiple domains by fetching them from GoDaddy" - - -class DomainNotFoundError(Exception): - def __init__(self, domain): - self.message = "Domain not found: " + domain - super().__init__(self.message) - - -class GDApi: - def __init__(self, api_key, api_secret): - self.session = requests.session() - self.session.headers.update( - { - "Content-Type": "application/json", - "Authorization": "sso-key " + api_key + ":" + api_secret, - } - ) - - @staticmethod - def check_response(response: requests.Response): - if response.status_code == 401: - raise ValueError("Invalid API key specified.") - - if response.status_code < 200 or response.status_code >= 300: - raise ValueError("Invalid response received from API: " + response.json()) - - return response - - def make_request(self, endpoint): - return self.session.prepare_request( - requests.Request("GET", "https://api.godaddy.com/v1/" + endpoint) - ) - - def list_domains(self): - req = self.make_request("domains") - - return self.check_response(self.session.send(req)) - - def get_records(self, domain): - req = self.make_request(f"domains/{domain}/records") - res = self.session.send(req) - - if 404 == res.status_code: - raise DomainNotFoundError(domain) - - return self.check_response(res) - - -def convert_records_to_domains(records, root_domain): - buf = {} - for record in records: - if "@" == record["name"]: - continue - - record_name = f"{record['name']}.{root_domain}" - - if record_name not in buf.keys(): - buf[record_name] = {} - - if record["type"] not in buf[record_name].keys(): - buf[record_name][record["type"]] = [] - - if "data" in record.keys(): - buf[record_name][record["type"]].append(record["data"]) - - def extract_records(desired_type): - return [r.rstrip(".") for r in buf[subdomain][desired_type]] - - for subdomain in buf.keys(): - domain = Domain(subdomain.rstrip("."), fetch_standard_records=False) - - if "A" in buf[subdomain].keys(): - domain.A = extract_records("A") - if "AAAA" in buf[subdomain].keys(): - domain.AAAA = extract_records("AAAA") - if "CNAME" in buf[subdomain].keys(): - domain.CNAME = extract_records("CNAME") - if "NS" in buf[subdomain].keys(): - domain.NS = extract_records("NS") - - yield domain - - -def fetch_domains(gd_api_key: str, gd_api_secret: str, gd_domains: str = None, **args): - root_domains = [] - domains = [] - api = GDApi(gd_api_key, gd_api_secret) - - if gd_domains is not None and len(gd_domains): - root_domains = [domain.strip(" ") for domain in gd_domains.split(",")] - else: - resp_data = api.list_domains().json() - root_domains = [domain["domain"] for domain in resp_data] - - for domain in root_domains: - if "" == domain or domain is None: - continue - - records = api.get_records(domain).json() - domains.extend(convert_records_to_domains(records, domain)) - - return domains +import requests + +from domain import Domain + +description = "Scan multiple domains by fetching them from GoDaddy" + + +class DomainNotFoundError(Exception): + def __init__(self, domain): + self.message = "Domain not found: " + domain + super().__init__(self.message) + + +class GDApi: + def __init__(self, api_key, api_secret): + self.session = requests.session() + self.session.headers.update( + { + "Content-Type": "application/json", + "Authorization": "sso-key " + api_key + ":" + api_secret, + } + ) + + @staticmethod + def check_response(response: requests.Response): + if response.status_code == 401: + raise ValueError("Invalid API key specified.") + + if response.status_code < 200 or response.status_code >= 300: + raise ValueError( + "Invalid response received from API: " + str(response.json()) + ) + + return response + + def make_request(self, endpoint): + return self.session.prepare_request( + requests.Request("GET", "https://api.godaddy.com/v1/" + endpoint) + ) + + def list_domains(self): + req = self.make_request("domains") + + return self.check_response(self.session.send(req)) + + def get_records(self, domain): + req = self.make_request(f"domains/{domain}/records") + res = self.session.send(req) + + if 404 == res.status_code: + raise DomainNotFoundError(domain) + + return self.check_response(res) + + +def convert_records_to_domains(records, root_domain): + buf = {} + for record in records: + if "@" == record["name"]: + continue + + record_name = f"{record['name']}.{root_domain}" + + if record_name not in buf.keys(): + buf[record_name] = {} + + if record["type"] not in buf[record_name].keys(): + buf[record_name][record["type"]] = [] + + if "data" in record.keys(): + buf[record_name][record["type"]].append(record["data"]) + + def extract_records(desired_type): + return [r.rstrip(".") for r in buf[subdomain][desired_type]] + + for subdomain in buf.keys(): + domain = Domain(subdomain.rstrip("."), fetch_standard_records=False) + + if "A" in buf[subdomain].keys(): + domain.A = extract_records("A") + if "AAAA" in buf[subdomain].keys(): + domain.AAAA = extract_records("AAAA") + if "CNAME" in buf[subdomain].keys(): + domain.CNAME = extract_records("CNAME") + if "NS" in buf[subdomain].keys(): + domain.NS = extract_records("NS") + + yield domain + + +def fetch_domains(gd_api_key: str, gd_api_secret: str, gd_domains: str = None, **args): + root_domains = [] + domains = [] + api = GDApi(gd_api_key, gd_api_secret) + + if gd_domains is not None and len(gd_domains): + root_domains = [domain.strip(" ") for domain in gd_domains.split(",")] + else: + resp_data = api.list_domains().json() + root_domains = [domain["domain"] for domain in resp_data] + + for domain in root_domains: + if "" == domain or domain is None: + continue + + records = api.get_records(domain).json() + domains.extend(convert_records_to_domains(records, domain)) + + return domains diff --git a/providers/projectdiscovery.py b/providers/projectdiscovery.py index 2c3ae6a..3bb8e20 100644 --- a/providers/projectdiscovery.py +++ b/providers/projectdiscovery.py @@ -39,6 +39,7 @@ def list_domains(self): return self.check_response(self.session.send(req)) def get_subdomains(self, domain): + domain = domain.lower() req = self.make_request(f"{domain}/subdomains") res = self.session.send(req) @@ -60,7 +61,7 @@ def fetch_domains(pd_api_key: str, pd_domains: str, **args): continue raw_domains = api.get_subdomains(domain).json() - logging.warn(f"Testing {len(raw_domains['subdomains'])} subdomains") + logging.warning(f"Testing {len(raw_domains['subdomains'])} subdomains") domains.extend( [ Domain(f"{sb}.{domain}") diff --git a/providers/securitytrails.py b/providers/securitytrails.py index e4ec690..7d262e9 100644 --- a/providers/securitytrails.py +++ b/providers/securitytrails.py @@ -63,7 +63,7 @@ def fetch_domains(st_api_key: str, st_domains: str, **args): raw_domains = api.get_subdomains(domain).json() - logging.warn(f"Testing {raw_domains['subdomain_count']} subdomains") + logging.warning(f"Testing {raw_domains['subdomain_count']} subdomains") domains.extend([Domain(f"{sb}.{domain}") for sb in raw_domains["subdomains"]]) return domains diff --git a/providers/single.py b/providers/single.py index f229e0d..562a312 100644 --- a/providers/single.py +++ b/providers/single.py @@ -5,5 +5,5 @@ def fetch_domains(domain, **args): - logging.warn(f"Domain '{domain}' provided on commandline") + logging.warning(f"Domain '{domain}' provided on commandline") return [Domain(domain)] diff --git a/requirements.txt b/requirements.txt index b33819f..23785af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ dnspython==2.2.1 requests==2.31.0 -python-whois==0.8.0 +asyncwhois==1.1.4 boto3==1.24.40 cloudflare==2.9.11 colorama==0.4.5 @@ -8,3 +8,4 @@ azure-mgmt-dns==8.0.0 azure-identity==1.10.0 msrestazure==0.6.4 google-cloud-dns==0.34.1 +aiohttp diff --git a/resolver.py b/resolver.py new file mode 100644 index 0000000..128e4d0 --- /dev/null +++ b/resolver.py @@ -0,0 +1,290 @@ +import dns.asyncresolver +import asyncio +import random +import time +import os + +import dns.resolver + +import socket +import struct + +import logging + +# DNS query types +TYPE_A = 1 +TYPE_NS = 2 +TYPE_CNAME = 5 +TYPE_SOA = 6 +TYPE_AAAA = 28 +TYPE_MX = 15 + +QUERY_TYPES = { + "A": TYPE_A, + "NS": TYPE_NS, + "CNAME": TYPE_CNAME, + "SOA": TYPE_SOA, + "AAAA": TYPE_AAAA, + "MX": TYPE_MX, +} + + +class DNSException(Exception): + pass + + +class NXDomainException(DNSException): + pass + + +class NoAnswerException(DNSException): + pass + + +def build_dns_query(domain, qtype): + # Transaction ID + transaction_id = 0x1234 + + # Flags: standard query with recursion + flags = 0x0100 + + # Questions + questions = 1 + + # Answer RRs + answer_rrs = 0 + + # Authority RRs + authority_rrs = 0 + + # Additional RRs + additional_rrs = 0 + + # Header + header = struct.pack( + ">HHHHHH", + transaction_id, + flags, + questions, + answer_rrs, + authority_rrs, + additional_rrs, + ) + + # Question + question = b"" + for part in domain.split("."): + question += struct.pack("B", len(part)) + part.encode() + + question += struct.pack("B", 0) # End of the domain name + + # Type and Class + question += struct.pack(">HH", qtype, 1) # QTYPE = qtype, QCLASS = IN + + return header + question + + +def parse_dns_response(response): + def decode_name(offset): + labels = [] + while True: + length = response[offset] + if length & 0xC0 == 0xC0: + pointer = struct.unpack_from("!H", response, offset)[0] + pointer &= 0x3FFF + labels.append(decode_name(pointer)[0]) + offset += 2 + break + elif length == 0: + offset += 1 + break + else: + offset += 1 + labels.append(response[offset : offset + length].decode()) + offset += length + return ".".join(labels), offset + + header = struct.unpack(">HHHHHH", response[:12]) + rcode = header[1] & 0x000F # Response code + question_count = header[2] + answer_count = header[3] + + if rcode == 3: # NXDOMAIN + raise NXDomainException("The domain does not exist (NXDOMAIN)") + + if answer_count == 0: + raise NoAnswerException("No answer found in the DNS response") + + offset = 12 + for _ in range(question_count): + while response[offset] != 0: + offset += response[offset] + 1 + offset += 5 # null byte + QTYPE + QCLASS + + records = { + "A": [], + "NS": [], + "CNAME": [], + "SOA": [], + "AAAA": [], + "MX": [], + "NX_DOMAIN": False, + } + + for _ in range(answer_count): + name, offset = decode_name(offset) + rtype, rclass, ttl, data_length = struct.unpack_from(">HHIH", response, offset) + offset += 10 + + if rtype == TYPE_A: + ip = socket.inet_ntoa(response[offset : offset + data_length]) + records["A"].append(ip) + elif rtype == TYPE_NS: + ns, _ = decode_name(offset) + records["NS"].append(ns) + elif rtype == TYPE_CNAME: + cname, _ = decode_name(offset) + records["CNAME"].append(cname) + elif rtype == TYPE_SOA: + mname, offset = decode_name(offset) + rname, offset = decode_name(offset) + serial, refresh, retry, expire, minimum = struct.unpack_from( + ">IIIII", response, offset + ) + soa = { + "mname": mname, + "rname": rname, + "serial": serial, + "refresh": refresh, + "retry": retry, + "expire": expire, + "minimum": minimum, + } + records["SOA"].append(soa) + offset += 20 + elif rtype == TYPE_AAAA: + ip = socket.inet_ntop( + socket.AF_INET6, response[offset : offset + data_length] + ) + records["AAAA"].append(ip) + elif rtype == TYPE_MX: + preference = struct.unpack_from(">H", response, offset)[0] + offset += 2 + exchange, _ = decode_name(offset) + records["MX"].append((preference, exchange)) + + offset += data_length + + return records + + +class DnsClientProtocol(asyncio.DatagramProtocol): + def __init__(self, query, future): + self.query = query + self.future = future + self.transport = None + + def connection_made(self, transport): + self.transport = transport + self.transport.sendto(self.query) + + def datagram_received(self, data, addr): + if not self.future.done(): + self.future.set_result(data) + self.transport.close() + + def error_received(self, exc): + if not self.future.done(): + self.future.set_exception(exc) + self.transport.close() + + def connection_lost(self, exc): + if not self.future.done(): + self.future.set_exception(exc) + + +async def resolve_dns(domain, qtype, server="8.8.8.8", port=53): + query = build_dns_query(domain, qtype) + loop = asyncio.get_running_loop() + future = loop.create_future() + transport, protocol = await loop.create_datagram_endpoint( + lambda: DnsClientProtocol(query, future), remote_addr=(server, port) + ) + + try: + response = await asyncio.wait_for(future, timeout=0.5) + records = parse_dns_response(response) + return records + finally: + transport.close() + + +class Resolver: + attempted_resolutions = 0 + resolutions = 0 + errors = 0 + no_records = 0 + nx_domains = 0 + + def __init__(self, parallelism=200, nameservers=None): + self.nameservers = nameservers if nameservers is not None else ["8.8.8.8"] + self.semaphore = asyncio.Semaphore(parallelism) + + async def resolve(self, fqdn, type=None, retry=3): + async with self.semaphore: + if self.attempted_resolutions == 0: + write_when_on_warn("\nDNS Resolving: (Feedback per 100 resolves)\n") + write_when_on_warn( + "[ . = success, + = non-existent domain, x = error (likely rate-limiting)]\n" + ) + self.attempted_resolutions += 1 + qtype = QUERY_TYPES[type] + start = time.time() + resp = { + "A": [], + "NS": [], + "CNAME": [], + "SOA": [], + "AAAA": [], + "MX": [], + "NX_DOMAIN": False, + } + try: + resp = await resolve_dns(fqdn, qtype, random.choice(self.nameservers)) + if self.resolutions % 100 == 99: + write_when_on_warn(".") + self.resolutions += 1 + except NoAnswerException: + if self.resolutions % 100 == 99: + write_when_on_warn(".") + self.resolutions += 1 + except NXDomainException: + if self.nx_domains % 100 == 99: + write_when_on_warn("+") + self.nx_domains += 1 + resp["NX_DOMAIN"] = True + except: + if self.errors % 100 == 99: + write_when_on_warn("x") + self.errors += 1 + if retry > 0: + await asyncio.sleep(0.1) + resp = await self.resolve(fqdn, type=type, retry=retry - 1) + time_delta = time.time() - start + if time_delta < 0.125: + await asyncio.sleep(0.125 - time_delta) + return resp + + @staticmethod + async def resolve_with_ns(fqdn, ns, type=None): + qtype = QUERY_TYPES[type] + try: + return await resolve_dns(fqdn, qtype, ns) + except: + return {"A": [], "NS": [], "CNAME": [], "SOA": [], "AAAA": [], "MX": []} + + +def write_when_on_warn(msg): + if logging.root.level == logging.WARN: + # Only in warning, as we need a clean output + os.write(2, msg.encode()) diff --git a/scan.py b/scan.py index 93b773b..e3003bb 100644 --- a/scan.py +++ b/scan.py @@ -2,16 +2,12 @@ import logging -import random - -def scan_domain(domain, signatures, lock, findings, output_handler, name_servers: list): - if name_servers and name_servers != [""]: - domain.set_custom_NS(random.choice(name_servers)) +async def scan_domain(domain, signatures, findings, output_handler): if domain.should_fetch_std_records: - domain.fetch_std_records() + await domain.fetch_std_records() else: - domain.fetch_external_records() + await domain.fetch_external_records() for signature in signatures: logging.debug( f"Testing domain '{domain.domain}' with signature '{signature.__name__}'" @@ -20,7 +16,7 @@ def scan_domain(domain, signatures, lock, findings, output_handler, name_servers logging.debug( f"Potential takeover found on DOMAIN '{domain}' using signature '{signature.__name__}'" ) - if signature.test.check(domain=domain): + if await signature.test.check(domain=domain): status = signature.test.CONFIDENCE.value logging.info( f"Takeover {status} on {domain} using signature '{signature.__name__}'" @@ -32,9 +28,8 @@ def scan_domain(domain, signatures, lock, findings, output_handler, name_servers confidence=signature.test.CONFIDENCE, more_info_url=signature.test.more_info_url, ) - with lock: - findings.append(finding) - output_handler.write(finding) + findings.append(finding) + output_handler.write(finding) else: logging.debug( f"Takeover not possible on DOMAIN '{domain}' using signature '{signature.__name__}'" diff --git a/signatures/_generic_cname_found_but_404_http.py b/signatures/_generic_cname_found_but_404_http.py index d4a0c45..4ed9bc4 100644 --- a/signatures/_generic_cname_found_but_404_http.py +++ b/signatures/_generic_cname_found_but_404_http.py @@ -14,8 +14,8 @@ def potential(domain: Domain, **kwargs) -> bool: return False -def check(domain: Domain, **kwargs) -> bool: - return checks.WEB.status_code_404(domain, False) +async def check(domain: Domain, **kwargs) -> bool: + return await checks.WEB.status_code_404(domain, False) INFO = """ diff --git a/signatures/_generic_cname_found_but_404_https.py b/signatures/_generic_cname_found_but_404_https.py index 064f442..ccee60e 100644 --- a/signatures/_generic_cname_found_but_404_https.py +++ b/signatures/_generic_cname_found_but_404_https.py @@ -14,8 +14,8 @@ def potential(domain: Domain, **kwargs) -> bool: return False -def check(domain: Domain, **kwargs) -> bool: - return checks.WEB.status_code_404(domain, True) +async def check(domain: Domain, **kwargs) -> bool: + return await checks.WEB.status_code_404(domain, True) INFO = """ diff --git a/signatures/_generic_cname_found_but_unregistered.py b/signatures/_generic_cname_found_but_unregistered.py index ad22cef..3da2e8c 100644 --- a/signatures/_generic_cname_found_but_unregistered.py +++ b/signatures/_generic_cname_found_but_unregistered.py @@ -1,6 +1,5 @@ from domain import Domain from . import checks -import detection_enums from .templates.base import Base @@ -14,8 +13,8 @@ def potential(domain: Domain, **kwargs) -> bool: return False -def check(domain: Domain, **kwargs) -> bool: - return checks.CNAME.is_unregistered(domain) +async def check(domain: Domain, **kwargs) -> bool: + return await checks.CNAME.is_unregistered(domain) INFO = """ diff --git a/signatures/_generic_cname_found_doesnt_resolve.py b/signatures/_generic_cname_found_doesnt_resolve.py index e97ff19..52856c2 100644 --- a/signatures/_generic_cname_found_doesnt_resolve.py +++ b/signatures/_generic_cname_found_doesnt_resolve.py @@ -10,6 +10,7 @@ ".cloudfront.net", ".oracle.com", ".invalid", + "online.lync.com", ] @@ -34,8 +35,8 @@ def potential(domain: Domain, **kwargs) -> bool: return False -def check(domain: Domain, **kwargs) -> bool: - return checks.CNAME.NX_DOMAIN_on_resolve(domain) +async def check(domain: Domain, **kwargs) -> bool: + return await checks.CNAME.NX_DOMAIN_on_resolve(domain) INFO = """ diff --git a/signatures/_generic_zone_missing_on_ns.py b/signatures/_generic_zone_missing_on_ns.py index c32a8c9..6be1582 100644 --- a/signatures/_generic_zone_missing_on_ns.py +++ b/signatures/_generic_zone_missing_on_ns.py @@ -14,8 +14,8 @@ def potential(domain: Domain, **kwargs) -> bool: return False -def check(domain: Domain, **kwargs) -> bool: - return checks.NS.no_SOA_detected(domain) +async def check(domain: Domain, **kwargs) -> bool: + return await checks.NS.no_SOA_detected(domain) INFO = """ diff --git a/signatures/checks/CNAME.py b/signatures/checks/CNAME.py index 6fe138f..05cd3e6 100644 --- a/signatures/checks/CNAME.py +++ b/signatures/checks/CNAME.py @@ -11,19 +11,19 @@ def match(domain: Domain, strings) -> str: return False -def NX_DOMAIN_on_resolve(domain: Domain) -> bool: +async def NX_DOMAIN_on_resolve(domain: Domain) -> bool: for cname in domain.CNAME: cname = Domain(cname, fetch_standard_records=False) - if cname.NX_DOMAIN: + if await cname.NX_DOMAIN: logging.info(f"NX_Domain for cname {cname}") return True return False -def is_unregistered(domain: Domain) -> bool: +async def is_unregistered(domain: Domain) -> bool: for cname in domain.CNAME: cname = Domain(cname, fetch_standard_records=False) - if not cname.is_registered: + if not await cname.is_registered: logging.info(f"The domain '{cname}' is NOT registered") return True return False diff --git a/signatures/checks/NS.py b/signatures/checks/NS.py index cc05531..10760f8 100644 --- a/signatures/checks/NS.py +++ b/signatures/checks/NS.py @@ -1,6 +1,7 @@ from domain import Domain from . import helpers import logging +import resolver as resolver def match(domain: Domain, strings) -> str: @@ -11,16 +12,19 @@ def match(domain: Domain, strings) -> str: return False -def no_SOA_detected(domain: Domain) -> bool: - takeover_possible = False +async def no_SOA_detected(domain: Domain) -> bool: for ns in domain.NS: - ns_ip = Domain(ns, fetch_standard_records=False).query("A") + ns_ip = await Domain(ns, fetch_standard_records=False).query("A") if ns_ip == []: logging.debug(f"Could not resolve NS '{ns}'") continue - if Domain(domain.domain, fetch_standard_records=False, ns=ns_ip[0]).SOA == []: + if ( + (await resolver.Resolver.resolve_with_ns(domain.domain, ns_ip[0], "SOA"))[ + "SOA" + ] + ) == []: logging.info(f"NAMESERVER at {ns} does not have this zone.") - takeover_possible = True + return True else: logging.debug(f"SOA record found on NAMESERVER '{ns}'") - return takeover_possible + return False # Never found the condition diff --git a/signatures/checks/WEB.py b/signatures/checks/WEB.py index afd4a27..0b3b807 100644 --- a/signatures/checks/WEB.py +++ b/signatures/checks/WEB.py @@ -1,28 +1,36 @@ from domain import Domain from math import floor import logging +import sys -def string_in_body( +async def string_in_body( domain: Domain, string: str, https: bool, custom_uri: str = "" ) -> bool: - if string in domain.fetch_web(https=https, uri=custom_uri).body: + if string in (await domain.fetch_web(https=https, uri=custom_uri)).body: logging.info(f"Message observed in response for '{domain}'") return True logging.debug(f"Message not found in response for '{domain}'") + # Uncomment to debug and identify a string match issue + if "pytest" in sys.modules: + logging.warning((await domain.fetch_web(https=https, uri=custom_uri)).body) return False -def string_in_body_http(domain: Domain, string: str, custom_uri: str = "") -> bool: - return string_in_body(domain, string, False, custom_uri) +async def string_in_body_http( + domain: Domain, string: str, custom_uri: str = "" +) -> bool: + return await string_in_body(domain, string, False, custom_uri) -def string_in_body_https(domain: Domain, string: str, custom_uri: str = "") -> bool: - return string_in_body(domain, string, True, custom_uri) +async def string_in_body_https( + domain: Domain, string: str, custom_uri: str = "" +) -> bool: + return await string_in_body(domain, string, True, custom_uri) -def status_code_match(domain: Domain, status_code: int, https: bool) -> bool: - response_code = domain.fetch_web(https=https).status_code +async def status_code_match(domain: Domain, status_code: int, https: bool) -> bool: + response_code = (await domain.fetch_web(https=https)).status_code if status_code < 10: # match the first int if floor(response_code / 100) == status_code: logging.info(f"Response code {response_code} observed for '{domain}'") @@ -35,5 +43,5 @@ def status_code_match(domain: Domain, status_code: int, https: bool) -> bool: return False -def status_code_404(domain: Domain, https: bool) -> bool: - return status_code_match(domain, 404, https) +async def status_code_404(domain: Domain, https: bool) -> bool: + return await status_code_match(domain, 404, https) diff --git a/signatures/launchrock_cname.py b/signatures/launchrock_cname.py index 74b0b57..f595a23 100644 --- a/signatures/launchrock_cname.py +++ b/signatures/launchrock_cname.py @@ -1,8 +1,8 @@ -from .templates.cname_found_but_status_code import cname_found_but_status_code +from .templates.cname_found_but_string_in_body import cname_found_but_string_in_body -test = cname_found_but_status_code( +test = cname_found_but_string_in_body( cname="host.launchrock.com", - code=0, + domain_not_configured_message="It looks like you may have taken a wrong turn somewhere", service="Launchrock", https=True, ) diff --git a/signatures/shopify.py b/signatures/shopify.py index 375853f..066f507 100644 --- a/signatures/shopify.py +++ b/signatures/shopify.py @@ -10,7 +10,7 @@ test = cname_or_ip_found_but_string_in_body( cname=cname, ips=ipv4 + ipv6, - domain_not_configured_message="Create an Ecommerce Website and Sell Online! Ecommerce Software by Shopify", + domain_not_configured_message="This domain points to Shopify but isn't configured properly", service="Shopify", ) diff --git a/signatures/simplebooklet.py b/signatures/simplebooklet.py index 922517f..c75187a 100644 --- a/signatures/simplebooklet.py +++ b/signatures/simplebooklet.py @@ -1,8 +1,8 @@ -from .templates.cname_found_but_string_in_body import cname_found_but_string_in_body - -test = cname_found_but_string_in_body( - cname="cname.simplebooklet.com", - domain_not_configured_message="you're looking for isn't here.", - service="simplebooklet.com", - https=True, -) +from .templates.cname_found_but_string_in_body import cname_found_but_string_in_body + +test = cname_found_but_string_in_body( + cname="cname.simplebooklet.com", + domain_not_configured_message="The link to this Simplebooklet may have changed", + service="simplebooklet.com", + https=True, +) diff --git a/signatures/surveysparrow.py b/signatures/surveysparrow.py index cd1c2b0..48750f1 100644 --- a/signatures/surveysparrow.py +++ b/signatures/surveysparrow.py @@ -1,7 +1,7 @@ -from .templates.cname_found_but_string_in_body import cname_found_but_string_in_body - -test = cname_found_but_string_in_body( - cname=".surveysparrow.com", - domain_not_configured_message="
Account not found.
", - service="survey sparrow", -) +from .templates.cname_found_but_string_in_body import cname_found_but_string_in_body + +test = cname_found_but_string_in_body( + cname=".surveysparrow.com", + domain_not_configured_message="DNS resolution error ", + service="survey sparrow", +) diff --git a/signatures/templates/base.py b/signatures/templates/base.py index 3d21ac1..6aef2c8 100644 --- a/signatures/templates/base.py +++ b/signatures/templates/base.py @@ -5,7 +5,7 @@ class Base: def potential(self, *args, **kwargs): raise NotImplementedError() - def check(self, *args, **kwargs): + async def check(self, *args, **kwargs): raise NotImplementedError() def __init__( diff --git a/signatures/templates/cname_found_but_NX_DOMAIN.py b/signatures/templates/cname_found_but_NX_DOMAIN.py index a15990c..729d564 100644 --- a/signatures/templates/cname_found_but_NX_DOMAIN.py +++ b/signatures/templates/cname_found_but_NX_DOMAIN.py @@ -14,8 +14,8 @@ class cname_found_but_NX_DOMAIN(base.Base): def potential(self, domain, **kwargs) -> bool: return signatures.checks.CNAME.match(domain, self.cname) - def check(self, domain, **kwargs) -> bool: - return signatures.checks.CNAME.NX_DOMAIN_on_resolve(domain) + async def check(self, domain, **kwargs) -> bool: + return await signatures.checks.CNAME.NX_DOMAIN_on_resolve(domain) def __init__(self, cname, service, info=None, **kwargs): self.cname = cname diff --git a/signatures/templates/cname_found_but_status_code.py b/signatures/templates/cname_found_but_status_code.py index d6dfe28..dbfc560 100644 --- a/signatures/templates/cname_found_but_status_code.py +++ b/signatures/templates/cname_found_but_status_code.py @@ -14,8 +14,10 @@ class cname_found_but_status_code(base.Base): def potential(self, domain, **kwargs) -> bool: return signatures.checks.CNAME.match(domain, self.cname) - def check(self, domain, **kwargs) -> bool: - return signatures.checks.WEB.status_code_match(domain, self.code, self.https) + async def check(self, domain, **kwargs) -> bool: + return await signatures.checks.WEB.status_code_match( + domain, self.code, self.https + ) def __init__(self, cname, code, service, info=None, https=False, **kwargs): self.cname = cname diff --git a/signatures/templates/cname_found_but_string_in_body.py b/signatures/templates/cname_found_but_string_in_body.py index 1380234..3cfac96 100644 --- a/signatures/templates/cname_found_but_string_in_body.py +++ b/signatures/templates/cname_found_but_string_in_body.py @@ -14,12 +14,12 @@ class cname_found_but_string_in_body(base.Base): def potential(self, domain, **kwargs) -> bool: return signatures.checks.CNAME.match(domain, self.cname) - def check(self, domain, **kwargs) -> bool: + async def check(self, domain, **kwargs) -> bool: if self.https: - return signatures.checks.WEB.string_in_body_https( + return await signatures.checks.WEB.string_in_body_https( domain, self.domain_not_configured_message, custom_uri=self.custom_uri ) - return signatures.checks.WEB.string_in_body_http( + return await signatures.checks.WEB.string_in_body_http( domain, self.domain_not_configured_message, custom_uri=self.custom_uri ) diff --git a/signatures/templates/cname_or_ip_found_but_string_in_body.py b/signatures/templates/cname_or_ip_found_but_string_in_body.py index ca9d29e..ed02548 100644 --- a/signatures/templates/cname_or_ip_found_but_string_in_body.py +++ b/signatures/templates/cname_or_ip_found_but_string_in_body.py @@ -16,12 +16,12 @@ def potential(self, domain, **kwargs) -> bool: domain, self.cname, self.ips ) - def check(self, domain, **kwargs) -> bool: + async def check(self, domain, **kwargs) -> bool: if self.https: - return signatures.checks.WEB.string_in_body_https( + return await signatures.checks.WEB.string_in_body_https( domain, self.domain_not_configured_message, custom_uri=self.custom_uri ) - return signatures.checks.WEB.string_in_body_http( + return await signatures.checks.WEB.string_in_body_http( domain, self.domain_not_configured_message, custom_uri=self.custom_uri ) diff --git a/signatures/templates/ip_found_but_string_in_body.py b/signatures/templates/ip_found_but_string_in_body.py index d3f9564..1d10c06 100644 --- a/signatures/templates/ip_found_but_string_in_body.py +++ b/signatures/templates/ip_found_but_string_in_body.py @@ -16,12 +16,12 @@ def potential(self, domain, **kwargs) -> bool: return True return signatures.checks.AAAA.match(domain, self.ips) - def check(self, domain, **kwargs) -> bool: + async def check(self, domain, **kwargs) -> bool: if self.https: - return signatures.checks.WEB.string_in_body_https( + return await signatures.checks.WEB.string_in_body_https( domain, self.domain_not_configured_message, custom_uri=self.custom_uri ) - return signatures.checks.WEB.string_in_body_http( + return await signatures.checks.WEB.string_in_body_http( domain, self.domain_not_configured_message, custom_uri=self.custom_uri ) diff --git a/signatures/templates/ns_found_but_no_SOA.py b/signatures/templates/ns_found_but_no_SOA.py index fb794f3..5466da9 100644 --- a/signatures/templates/ns_found_but_no_SOA.py +++ b/signatures/templates/ns_found_but_no_SOA.py @@ -13,8 +13,8 @@ class ns_found_but_no_SOA(base.Base): def potential(self, domain, **kwargs) -> bool: return signatures.checks.NS.match(domain, self.ns) - def check(self, domain, **kwargs) -> bool: - return signatures.checks.NS.no_SOA_detected(domain) + async def check(self, domain, **kwargs) -> bool: + return await signatures.checks.NS.no_SOA_detected(domain) def __init__(self, ns, service, sample_ns=None, info=None, **kwargs): self.ns = ns diff --git a/signatures/wix.py b/signatures/wix.py new file mode 100644 index 0000000..3bae2a5 --- /dev/null +++ b/signatures/wix.py @@ -0,0 +1,11 @@ +from detection_enums import CONFIDENCE +from .templates.ip_found_but_string_in_body import ip_found_but_string_in_body + + +test = ip_found_but_string_in_body( + ips=["23.236.62.147"], + domain_not_configured_message="ConnectYourDomain", + service="Wix", +) + +test.CONFIDENCE = CONFIDENCE.POTENTIAL diff --git a/test-requirements.txt b/test-requirements.txt index ccd7aec..a636261 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,10 +1,2 @@ -pytest==7.1.2 -dnspython==2.2.1 -requests==2.31.0 -python-whois==0.8.0 -boto3==1.24.40 -cloudflare==2.9.11 -colorama==0.4.5 -azure-mgmt-dns==8.0.0 -azure-identity==1.10.0 -msrestazure==0.6.4 \ No newline at end of file +pytest==8.3.2 +pytest-asyncio diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/mocks.py b/tests/mocks.py index 5ec5339..4aeb479 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -1,10 +1,13 @@ from collections import namedtuple +import socket from domain import Domain import requests import dns.resolver import ipaddress from uuid import uuid4 +import aiohttp + def random_string(): return f"a{uuid4().hex[:8]}" @@ -13,7 +16,7 @@ def random_string(): def mock_web_response_with_static_value( domain: Domain, body: str = "", status_code: int = 0 ) -> Domain: - def mock_fetch_web(**kwargs): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["body", "status_code"])(body, status_code) domain.fetch_web = mock_fetch_web @@ -92,3 +95,40 @@ def send(self, request, **kwargs): # overwrite the host header request.headers["Host"] = result.hostname return super(HostHeaderAdapter, self).send(request, **kwargs) + + +class CustomResolver: + def __init__(self, ip): + self.ip = ip + + async def resolve(self, host, port=0, family=socket.AF_INET): + return [ + { + "hostname": host, + "host": self.ip, + "port": port, + "family": family, + "proto": socket.IPPROTO_TCP, + "flags": 0, + } + ] + + +def generate_mock_aiohttp_session_with_forced_ip_resolution(ip): + def mock_aiohttp_session_with_forced_resolution(): + resolver = CustomResolver(ip) + conn = aiohttp.TCPConnector(resolver=resolver) + return aiohttp.ClientSession(connector=conn) + + return mock_aiohttp_session_with_forced_resolution + + +def generate_mock_aiohttp_session_with_forced_cname_resolution(cname): + ip = [record.to_text() for record in dns.resolver.resolve(cname)][0] + + def mock_aiohttp_session_with_forced_resolution(): + resolver = CustomResolver(ip) + conn = aiohttp.TCPConnector(resolver=resolver) + return aiohttp.ClientSession(connector=conn) + + return mock_aiohttp_session_with_forced_resolution diff --git a/tests/signatures_tests/__init__.py b/tests/signatures_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/signatures_tests/checks/__init__.py b/tests/signatures_tests/checks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/signatures_tests/checks/test_A.py b/tests/signatures_tests/checks/test_A.py index 0f98a33..dc5fb7f 100644 --- a/tests/signatures_tests/checks/test_A.py +++ b/tests/signatures_tests/checks/test_A.py @@ -1,44 +1,52 @@ from domain import Domain from signatures.checks import A +import pytest -def test_match_for_single_ip(): +@pytest.mark.asyncio +async def test_match_for_single_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1", "2.2.2.2"] assert A.match(domain, "1.1.1.1") == True -def test_match_for_multiple_ip(): +@pytest.mark.asyncio +async def test_match_for_multiple_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1", "2.2.2.2"] assert A.match(domain, ["1.1.1.1", "2.2.2.2"]) == True -def test_match_for_one_of_multiple_ip(): +@pytest.mark.asyncio +async def test_match_for_one_of_multiple_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1", "3.3.3.3"] assert A.match(domain, ["1.1.1.1", "2.2.2.2"]) == True -def test_match_for_none_of_multiple_ip(): +@pytest.mark.asyncio +async def test_match_for_none_of_multiple_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1", "2.2.2.2"] assert A.match(domain, ["3.3.3.3", "4.4.4.4"]) == False -def test_match_for_none_matching_ip(): +@pytest.mark.asyncio +async def test_match_for_none_matching_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1", "2.2.2.2"] assert A.match(domain, "3.3.3.3") == False -def test_match_with_no_A_records(): +@pytest.mark.asyncio +async def test_match_with_no_A_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] assert A.match(domain, "1.1.1.1") == False -def test_match_multiple_with_no_A_records(): +@pytest.mark.asyncio +async def test_match_multiple_with_no_A_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] assert A.match(domain, ["1.1.1.1", "2.2.2.2"]) == False diff --git a/tests/signatures_tests/checks/test_AAAA.py b/tests/signatures_tests/checks/test_AAAA.py index 118f590..e72dd3b 100644 --- a/tests/signatures_tests/checks/test_AAAA.py +++ b/tests/signatures_tests/checks/test_AAAA.py @@ -1,44 +1,52 @@ from domain import Domain from signatures.checks import AAAA +import pytest -def test_ipv6_in_AAAA_for_single_ip(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_for_single_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1", "::2"] assert AAAA.match(domain, "::1") == True -def test_ipv6_in_AAAA_for_multiple_ip(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_for_multiple_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1", "::2"] assert AAAA.match(domain, ["::1", "::2"]) == True -def test_ipv6_in_AAAA_for_one_of_multiple_ip(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_for_one_of_multiple_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1", "::3"] assert AAAA.match(domain, ["::1", "::2"]) == True -def test_ipv6_in_AAAA_for_none_of_multiple_ip(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_for_none_of_multiple_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1", "::2"] assert AAAA.match(domain, ["::3", "::4"]) == False -def test_ipv6_in_AAAA_for_none_matching_ip(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_for_none_matching_ip(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1", "::2"] assert AAAA.match(domain, "::3") == False -def test_ipv6_in_AAAA_with_no_AAAA_records(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_with_no_AAAA_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = [] assert AAAA.match(domain, "::1") == False -def test_ipv6_in_AAAA_multiple_with_no_AAAA_records(): +@pytest.mark.asyncio +async def test_ipv6_in_AAAA_multiple_with_no_AAAA_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = [] assert AAAA.match(domain, ["::1", "::2"]) == False diff --git a/tests/signatures_tests/checks/test_CNAME.py b/tests/signatures_tests/checks/test_CNAME.py index 7a9328b..61a26de 100644 --- a/tests/signatures_tests/checks/test_CNAME.py +++ b/tests/signatures_tests/checks/test_CNAME.py @@ -1,65 +1,76 @@ from domain import Domain from signatures.checks import CNAME -from unittest.mock import patch, PropertyMock -from whois.parser import PywhoisError -import mocks +from unittest.mock import patch, AsyncMock +from asyncwhois.errors import NotFoundError +from ... import mocks +import pytest -def test_match_for_single_string(): +@pytest.mark.asyncio +async def test_match_for_single_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "def"] assert CNAME.match(domain, "abc") == True -def test_match_for_single_substring(): +@pytest.mark.asyncio +async def test_match_for_single_substring(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "def"] assert CNAME.match(domain, "b") == True -def test_match_where_cname_is_subtring_of_string(): +@pytest.mark.asyncio +async def test_match_where_cname_is_subtring_of_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "def"] assert CNAME.match(domain, "abcd") == False -def test_match_for_multiple_string(): +@pytest.mark.asyncio +async def test_match_for_multiple_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "def"] assert CNAME.match(domain, ["abc", "def"]) == True -def test_match_for_one_of_multiple_string(): +@pytest.mark.asyncio +async def test_match_for_one_of_multiple_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "hij"] assert CNAME.match(domain, ["abc", "def"]) == True -def test_match_for_one_of_multiple_substring(): +@pytest.mark.asyncio +async def test_match_for_one_of_multiple_substring(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "hij"] assert CNAME.match(domain, ["def", "h"]) == True -def test_match_for_none_of_multiple_string(): +@pytest.mark.asyncio +async def test_match_for_none_of_multiple_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "def"] assert CNAME.match(domain, ["hij", "klm"]) == False -def test_match_for_none_matching_string(): +@pytest.mark.asyncio +async def test_match_for_none_matching_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["abc", "def"] assert CNAME.match(domain, "hij") == False -def test_match_with_no_CNAME_records(): +@pytest.mark.asyncio +async def test_match_with_no_CNAME_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = [] assert CNAME.match(domain, "abc") == False -def test_match_multiple_with_no_CNAME_records(): +@pytest.mark.asyncio +async def test_match_multiple_with_no_CNAME_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = [] assert CNAME.match(domain, ["abc", "def"]) == False @@ -71,82 +82,66 @@ def test_match_multiple_with_no_CNAME_records(): domain_with_cname.CNAME = ["cname"] -def test_NX_DOMAIN_on_resolve_success(): - with patch("domain.Domain.query", return_value=[]): - assert CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == True +@pytest.mark.asyncio +async def test_NX_DOMAIN_on_resolve_success(): + with patch( + "resolver.Resolver.resolve", + side_effect=AsyncMock(return_value={"NX_DOMAIN": True}), + ): + assert await CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == True -def test_NX_DOMAIN_on_resolve_failure_A_record_found(): - def query(self, type): - return "something" if type == "A" else [] - - with patch("domain.Domain.query", new=query): - assert CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == False - - -def test_NX_DOMAIN_on_resolve_failure_AAAA_record_found(): - def query(self, type): - return "something" if type == "AAAA" else [] - - with patch("domain.Domain.query", new=query): - assert CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == False - - -def test_NX_DOMAIN_on_resolve_failure_CNAME_record_found(): - def query(self, type): - return "something" if type == "CNAME" else [] - - with patch("domain.Domain.query", new=query): - assert CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == False - - -def test_NX_DOMAIN_on_resolve_failure_NS_record_found(): - def query(self, type): - return "something" if type == "NS" else [] - - with patch("domain.Domain.query", new=query): - assert CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == False - - -def test_NX_DOMAIN_on_resolve_failure_no_cname(): +@pytest.mark.asyncio +async def test_NX_DOMAIN_on_resolve_failure_no_cname(): domain = Domain("mock.local", fetch_standard_records=False) - with patch("domain.Domain.query", return_value=["something"]): - assert CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == False + with patch( + "resolver.Resolver.resolve", + side_effect=AsyncMock(return_value={"NX_DOMAIN": False}), + ): + assert await CNAME.NX_DOMAIN_on_resolve(domain_with_cname) == False -def test_is_unregistered_failure_no_cname(): +@pytest.mark.asyncio +async def test_is_unregistered_failure_no_cname(): domain = Domain("mock.local", fetch_standard_records=False) - assert CNAME.is_unregistered(domain) == False + assert await CNAME.is_unregistered(domain) == False -def test_is_unregistered_failure_cname_registered(): +@pytest.mark.asyncio +async def test_is_unregistered_failure_cname_registered(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["something"] - with patch("domain.whois.whois", return_value={"registrar": "something"}): - assert CNAME.is_unregistered(domain) == False + with patch( + "domain.asyncwhois.aio_whois", + side_effect=AsyncMock(return_value={"registrar": "something"}), + ): + assert await CNAME.is_unregistered(domain) == False -def test_is_unregistered_failure_whois_failure(): - def whois(domain): +@pytest.mark.asyncio +async def test_is_unregistered_failure_whois_failure(): + async def whois(domain): raise ValueError("BOOK") domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["something"] - with patch("domain.whois.whois", new=whois): - assert CNAME.is_unregistered(domain) == False + with patch("domain.asyncwhois.aio_whois", new=whois): + assert await CNAME.is_unregistered(domain) == False -def test_is_unregistered_success_cname_unregistered(): - def whois(domain): - raise PywhoisError("Not found") +@pytest.mark.asyncio +async def test_is_unregistered_success_cname_unregistered(): + async def whois(domain): + raise NotFoundError("Domain not found!") domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["something"] - with patch("domain.whois.whois", new=whois): - assert CNAME.is_unregistered(domain) == True + with patch("domain.asyncwhois.aio_whois", new=whois): + assert await CNAME.is_unregistered(domain) == True -def test_is_unregistered_success_cname_unregistered_ACTIVE(): +@pytest.mark.asyncio +async def test_is_unregistered_success_cname_unregistered_ACTIVE(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = [f"{mocks.random_string()}.com"] - assert CNAME.is_unregistered(domain) == True + assert await CNAME.is_unregistered(domain) == True diff --git a/tests/signatures_tests/checks/test_COMBINED.py b/tests/signatures_tests/checks/test_COMBINED.py index abfd15e..f676852 100644 --- a/tests/signatures_tests/checks/test_COMBINED.py +++ b/tests/signatures_tests/checks/test_COMBINED.py @@ -1,24 +1,28 @@ from xml import dom from domain import Domain from signatures.checks import COMBINED +import pytest ## matching_ipv4_or_ipv6 -def test_matching_ipv4_or_ipv6_ipv4_match(): +@pytest.mark.asyncio +async def test_matching_ipv4_or_ipv6_ipv4_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1"] domain.AAAA = [] assert COMBINED.matching_ipv4_or_ipv6(domain, "1.1.1.1", "::1") == True -def test_matching_ipv4_or_ipv6_ipv6_match(): +@pytest.mark.asyncio +async def test_matching_ipv4_or_ipv6_ipv6_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] domain.AAAA = ["::1"] assert COMBINED.matching_ipv4_or_ipv6(domain, "1.1.1.1", "::1") == True -def test_matching_ipv4_or_ipv6_no_match(): +@pytest.mark.asyncio +async def test_matching_ipv4_or_ipv6_no_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] domain.AAAA = [] @@ -26,49 +30,56 @@ def test_matching_ipv4_or_ipv6_no_match(): ## macthing_ip_or_cname -def test_matching_ip_or_cname_single_ipv4_match(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_single_ipv4_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1"] domain.CNAME = [] assert COMBINED.matching_ip_or_cname(domain, "", ips="1.1.1.1") == True -def test_matching_ip_or_cname_multiple_ipv4_match(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_multiple_ipv4_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["2.2.2.2"] domain.CNAME = [] assert COMBINED.matching_ip_or_cname(domain, "", ips=["1.1.1.1", "2.2.2.2"]) == True -def test_matching_ip_or_cname_single_ipv6_match(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_single_ipv6_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1"] domain.CNAME = [] assert COMBINED.matching_ip_or_cname(domain, "", ips="::1") == True -def test_matching_ip_or_cname_multiple_ipv6_match(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_multiple_ipv6_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::2"] domain.CNAME = [] assert COMBINED.matching_ip_or_cname(domain, "", ips=["::1", "::2"]) == True -def test_matching_ip_or_cname_multiple_ips_match_ipv6(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_multiple_ips_match_ipv6(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::2"] domain.CNAME = [] assert COMBINED.matching_ip_or_cname(domain, "", ips=["1.1.1.1", "::2"]) == True -def test_matching_ip_or_cname_multiple_ips_match_ipv4(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_multiple_ips_match_ipv4(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1"] domain.CNAME = [] assert COMBINED.matching_ip_or_cname(domain, "", ips=["1.1.1.1", "::2"]) == True -def test_matching_ip_or_cname_cname_match(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_cname_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] domain.CNAME = ["goose"] @@ -77,7 +88,8 @@ def test_matching_ip_or_cname_cname_match(): ) -def test_matching_ip_or_cname_no_match(): +@pytest.mark.asyncio +async def test_matching_ip_or_cname_no_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] domain.AAAA = [] @@ -88,21 +100,24 @@ def test_matching_ip_or_cname_no_match(): ## macthing_ipv4_or_cname -def test_matching_ipv4_or_cname_ipv4_match(): +@pytest.mark.asyncio +async def test_matching_ipv4_or_cname_ipv4_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1"] domain.CNAME = [] assert COMBINED.matching_ipv4_or_cname(domain, "1.1.1.1", "goose") == True -def test_matching_ipv4_or_cname_cname_match(): +@pytest.mark.asyncio +async def test_matching_ipv4_or_cname_cname_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] domain.CNAME = ["goose"] assert COMBINED.matching_ipv4_or_cname(domain, "1.1.1.1", "goose") == True -def test_matching_ipv4_or_cname_no_match(): +@pytest.mark.asyncio +async def test_matching_ipv4_or_cname_no_match(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = [] domain.AAAA = [] diff --git a/tests/signatures_tests/checks/test_NS.py b/tests/signatures_tests/checks/test_NS.py index fb8570a..8633fa7 100644 --- a/tests/signatures_tests/checks/test_NS.py +++ b/tests/signatures_tests/checks/test_NS.py @@ -1,87 +1,102 @@ from domain import Domain from signatures.checks import NS from unittest.mock import patch, PropertyMock +import pytest -def test_match_for_single_string(): +@pytest.mark.asyncio +async def test_match_for_single_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "def"] assert NS.match(domain, "abc") == True -def test_match_for_single_substring(): +@pytest.mark.asyncio +async def test_match_for_single_substring(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "def"] assert NS.match(domain, "b") == True -def test_match_where_ns_is_subtring_of_string(): +@pytest.mark.asyncio +async def test_match_where_ns_is_subtring_of_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "def"] assert NS.match(domain, "abcd") == False -def test_match_for_multiple_string(): +@pytest.mark.asyncio +async def test_match_for_multiple_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "def"] assert NS.match(domain, ["abc", "def"]) == True -def test_match_for_one_of_multiple_string(): +@pytest.mark.asyncio +async def test_match_for_one_of_multiple_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "hij"] assert NS.match(domain, ["abc", "def"]) == True -def test_match_for_one_of_multiple_substring(): +@pytest.mark.asyncio +async def test_match_for_one_of_multiple_substring(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "hij"] assert NS.match(domain, ["def", "h"]) == True -def test_match_for_none_of_multiple_string(): +@pytest.mark.asyncio +async def test_match_for_none_of_multiple_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "def"] assert NS.match(domain, ["hij", "klm"]) == False -def test_match_for_none_matching_string(): +@pytest.mark.asyncio +async def test_match_for_none_matching_string(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["abc", "def"] assert NS.match(domain, "hij") == False -def test_match_with_no_ns_records(): +@pytest.mark.asyncio +async def test_match_with_no_ns_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = [] assert NS.match(domain, "abc") == False -def test_match_multiple_with_no_ns_records(): +@pytest.mark.asyncio +async def test_match_multiple_with_no_ns_records(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = [] assert NS.match(domain, ["abc", "def"]) == False -def test_no_SOA_detected_on_NS_with_matching_nameservers(): +@pytest.mark.asyncio +async def test_no_SOA_detected_on_NS_with_matching_nameservers(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["ns"] - with patch("domain.Domain.SOA", return_value=[], new_callable=PropertyMock): + with patch("resolver.Resolver.resolve_with_ns", return_value={"SOA": []}): with patch("domain.Domain.query", return_value=["10.10.10.10"]): - assert NS.no_SOA_detected(domain) == True + assert (await NS.no_SOA_detected(domain)) == True -def test_no_SOA_detected_on_NS_with_no_matching_nameservers(): +@pytest.mark.asyncio +async def test_no_SOA_detected_on_NS_with_no_matching_nameservers(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["ns"] + with patch( - "domain.Domain.SOA", return_value=["SOA RECORD HERE"], new_callable=PropertyMock + "resolver.Resolver.resolve_with_ns", return_value={"SOA": ["SOA RECORD HERE"]} ): with patch("domain.Domain.query", return_value=["10.10.10.10"]): - assert NS.no_SOA_detected(domain) == False + assert (await NS.no_SOA_detected(domain)) == False -def test_no_SOA_detected_on_NS_with_no_nameservers(): +@pytest.mark.asyncio +async def test_no_SOA_detected_on_NS_with_no_nameservers(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = [] - assert NS.no_SOA_detected(domain) == False + assert (await NS.no_SOA_detected(domain)) == False diff --git a/tests/signatures_tests/checks/test_WEB.py b/tests/signatures_tests/checks/test_WEB.py index c679955..d30ac56 100644 --- a/tests/signatures_tests/checks/test_WEB.py +++ b/tests/signatures_tests/checks/test_WEB.py @@ -3,6 +3,8 @@ from collections import namedtuple +import pytest + ## string_in_body string_in_body_response = "This is a response" @@ -11,103 +13,114 @@ string_in_body_no_match = "goose" -def test_string_in_body_success_on_full_match(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_string_in_body_success_on_full_match(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["body"])(string_in_body_response) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.string_in_body(domain, string_in_body_full_match, False) == True + assert await WEB.string_in_body(domain, string_in_body_full_match, False) == True -def test_string_in_body_success_on_partial_match(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_string_in_body_success_on_partial_match(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["body"])(string_in_body_response) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.string_in_body(domain, string_in_body_partial_match, False) == True + assert await WEB.string_in_body(domain, string_in_body_partial_match, False) == True -def test_string_in_body_failure_on_no_match(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_string_in_body_failure_on_no_match(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["body"])(string_in_body_response) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.string_in_body(domain, string_in_body_no_match, False) == False + assert await WEB.string_in_body(domain, string_in_body_no_match, False) == False -def test_string_in_body_failure_on_no_body(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_string_in_body_failure_on_no_body(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["body"])("") domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.string_in_body(domain, string_in_body_full_match, False) == False + assert await WEB.string_in_body(domain, string_in_body_full_match, False) == False ## status_code_match -def test_status_code_match_success_specific(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_match_success_specific(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(200) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_match(domain, 200, False) == True + assert await WEB.status_code_match(domain, 200, False) == True -def test_status_code_match_success_partial(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_match_success_partial(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(302) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_match(domain, 3, False) == True + assert await WEB.status_code_match(domain, 3, False) == True -def test_status_code_match_fail_specific(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_match_fail_specific(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(302) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_match(domain, 401, False) == False + assert await WEB.status_code_match(domain, 401, False) == False -def test_status_code_match_fail_partial(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_match_fail_partial(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(404) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_match(domain, 3, False) == False + assert await WEB.status_code_match(domain, 3, False) == False -def test_status_code_match_fail_partial(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_match_fail_partial(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(404) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_match(domain, 3, False) == False + assert await WEB.status_code_match(domain, 3, False) == False -def test_status_code_404_success(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_404_success(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(404) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_404(domain, False) == True + assert await WEB.status_code_404(domain, False) == True -def test_status_code_404_failure(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_status_code_404_failure(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(200) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert WEB.status_code_404(domain, False) == False + assert await WEB.status_code_404(domain, False) == False diff --git a/tests/signatures_tests/templates/__init__.py b/tests/signatures_tests/templates/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/signatures_tests/templates/test_cname_found_but_NX_DOMAIN.py b/tests/signatures_tests/templates/test_cname_found_but_NX_DOMAIN.py index f7f5bd0..58ec41a 100644 --- a/tests/signatures_tests/templates/test_cname_found_but_NX_DOMAIN.py +++ b/tests/signatures_tests/templates/test_cname_found_but_NX_DOMAIN.py @@ -4,36 +4,40 @@ cname_found_but_NX_DOMAIN, ) -from tests import mocks +from ... import mocks from unittest.mock import patch import pytest test = cname_found_but_NX_DOMAIN("cname", "INFO") -def test_potential_success_with_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert test.potential(domain) == True -def test_potential_failure_no_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_failure_no_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) assert test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] - with patch("domain.Domain.query", return_value=[]): - assert test.check(domain) == True + with patch("resolver.Resolver.resolve", return_value={"NX_DOMAIN": True}): + assert await test.check(domain) == True -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] - with patch("domain.Domain.query", return_value=["something"]): - assert test.check(domain) == False + with patch("resolver.Resolver.resolve", return_value={"NX_DOMAIN": False}): + assert await test.check(domain) == False signatures = [getattr(signatures, signature) for signature in signatures.__all__] @@ -43,7 +47,8 @@ def test_check_failure(): "signature", [s for s in signatures if isinstance(s.test, cname_found_but_NX_DOMAIN)], ) -def test_check_success_ACTIVE(signature): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(signature): cnames = ( signature.test.cname if type(signature.test.cname) == list @@ -54,4 +59,4 @@ def test_check_success_ACTIVE(signature): domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) domain.CNAME = [test_cname] print(f"Testing cname {test_cname}") - assert signature.test.check(domain) == True + assert await signature.test.check(domain) == True diff --git a/tests/signatures_tests/templates/test_cname_found_but_status_code.py b/tests/signatures_tests/templates/test_cname_found_but_status_code.py index 8e45bf0..46014c0 100644 --- a/tests/signatures_tests/templates/test_cname_found_but_status_code.py +++ b/tests/signatures_tests/templates/test_cname_found_but_status_code.py @@ -5,32 +5,36 @@ ) import pytest -from tests import mocks +from ... import mocks test = cname_found_but_status_code("cname", 404, "mock") -def test_potential_success_with_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert test.potential(domain) == True -def test_potential_failure_no_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_failure_no_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) assert test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value(domain, status_code=test.code) - assert test.check(domain) == True + assert await test.check(domain) == True -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value(domain, status_code=200) - assert test.check(domain) == False + assert await test.check(domain) == False signatures = [getattr(signatures, signature) for signature in signatures.__all__] @@ -40,7 +44,8 @@ def test_check_failure(): "signature", [s for s in signatures if isinstance(s.test, cname_found_but_status_code)], ) -def test_check_success_ACTIVE(signature): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(signature): cnames = ( signature.test.cname if type(signature.test.cname) == list @@ -49,5 +54,7 @@ def test_check_success_ACTIVE(signature): for cname in cnames: test_cname = f"{mocks.random_string()}{cname}" if cname[0] == "." else cname domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) - mocks.mock_web_request_by_providing_static_host_resolution(domain, test_cname) - assert signature.test.check(domain) == True + domain.get_session = ( + mocks.generate_mock_aiohttp_session_with_forced_cname_resolution(test_cname) + ) + assert await signature.test.check(domain) == True diff --git a/tests/signatures_tests/templates/test_cname_found_but_string_in_body.py b/tests/signatures_tests/templates/test_cname_found_but_string_in_body.py index 0d3d64f..62b3a7f 100644 --- a/tests/signatures_tests/templates/test_cname_found_but_string_in_body.py +++ b/tests/signatures_tests/templates/test_cname_found_but_string_in_body.py @@ -4,35 +4,39 @@ cname_found_but_string_in_body, ) -from tests import mocks +from ... import mocks import pytest test = cname_found_but_string_in_body("cname", "No domain found here", "INFO") -def test_potential_success_with_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert test.potential(domain) == True -def test_potential_failure_no_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_failure_no_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) assert test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value( domain, test.domain_not_configured_message ) - assert test.check(domain) == True + assert await test.check(domain) == True -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value(domain, "Welcome to my site!") - assert test.check(domain) == False + assert await test.check(domain) == False signatures = [getattr(signatures, signature) for signature in signatures.__all__] @@ -42,7 +46,8 @@ def test_check_failure(): "signature", [s for s in signatures if isinstance(s.test, cname_found_but_string_in_body)], ) -def test_check_success_ACTIVE(signature): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(signature): cnames = ( signature.test.cname if type(signature.test.cname) == list @@ -51,5 +56,7 @@ def test_check_success_ACTIVE(signature): for cname in cnames: test_cname = f"{mocks.random_string()}{cname}" if cname[0] == "." else cname domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) - mocks.mock_web_request_by_providing_static_host_resolution(domain, test_cname) - assert signature.test.check(domain) == True + domain.get_session = ( + mocks.generate_mock_aiohttp_session_with_forced_cname_resolution(test_cname) + ) + assert await signature.test.check(domain) == True diff --git a/tests/signatures_tests/templates/test_cname_or_ip_found_but_string_in_body.py b/tests/signatures_tests/templates/test_cname_or_ip_found_but_string_in_body.py index f86a1f8..943ddf1 100644 --- a/tests/signatures_tests/templates/test_cname_or_ip_found_but_string_in_body.py +++ b/tests/signatures_tests/templates/test_cname_or_ip_found_but_string_in_body.py @@ -4,7 +4,7 @@ cname_or_ip_found_but_string_in_body, ) -from tests import mocks +from ... import mocks import pytest test = cname_or_ip_found_but_string_in_body( @@ -12,25 +12,29 @@ ) -def test_potential_success_with_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert test.potential(domain) == True -def test_potential_success_with_matching_ipv4(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_ipv4(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1"] assert test.potential(domain) == True -def test_potential_success_with_matching_ipv6(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_ipv6(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1"] assert test.potential(domain) == True -def test_potential_failure_no_matching(): +@pytest.mark.asyncio +async def test_potential_failure_no_matching(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["wrong"] domain.A = ["2.2.2.2"] @@ -38,18 +42,20 @@ def test_potential_failure_no_matching(): assert test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value( domain, test.domain_not_configured_message ) - assert test.check(domain) == True + assert await test.check(domain) == True -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value(domain, "Welcome to my site!") - assert test.check(domain) == False + assert await test.check(domain) == False signatures = [getattr(signatures, signature) for signature in signatures.__all__] @@ -59,7 +65,8 @@ def test_check_failure(): "signature", [s for s in signatures if isinstance(s.test, cname_or_ip_found_but_string_in_body)], ) -def test_check_success_ACTIVE(signature): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(signature): cnames = ( signature.test.cname if type(signature.test.cname) == list @@ -68,8 +75,10 @@ def test_check_success_ACTIVE(signature): for cname in cnames: test_cname = f"{mocks.random_string()}{cname}" if cname[0] == "." else cname domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) - mocks.mock_web_request_by_providing_static_host_resolution(domain, test_cname) - assert signature.test.check(domain) == True + domain.get_session = ( + mocks.generate_mock_aiohttp_session_with_forced_cname_resolution(test_cname) + ) + assert await signature.test.check(domain) == True ips = ( signature.test.ips if type(signature.test.ips) == list else [signature.test.ips] @@ -78,5 +87,7 @@ def test_check_success_ACTIVE(signature): if ":" in ip: continue # skip IPv6 domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) - mocks.mock_web_request_by_providing_static_host_resolution(domain, ip) - assert signature.test.check(domain) == True + domain.get_session = ( + mocks.generate_mock_aiohttp_session_with_forced_ip_resolution(ip) + ) + assert await signature.test.check(domain) == True diff --git a/tests/signatures_tests/templates/test_ip_found_but_string_in_body.py b/tests/signatures_tests/templates/test_ip_found_but_string_in_body.py index 4911712..e9f0a9a 100644 --- a/tests/signatures_tests/templates/test_ip_found_but_string_in_body.py +++ b/tests/signatures_tests/templates/test_ip_found_but_string_in_body.py @@ -3,41 +3,46 @@ from signatures.templates.ip_found_but_string_in_body import ( ip_found_but_string_in_body, ) -from tests import mocks +from ... import mocks import pytest test = ip_found_but_string_in_body(["::1", "1.1.1.1"], "No domain found here", "INFO") -def test_potential_success_with_matching_ipv4(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_ipv4(): domain = Domain("mock.local", fetch_standard_records=False) domain.A = ["1.1.1.1"] assert test.potential(domain) == True -def test_potential_success_with_matching_ipv6(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_ipv6(): domain = Domain("mock.local", fetch_standard_records=False) domain.AAAA = ["::1"] assert test.potential(domain) == True -def test_potential_failure_no_matching_CNAME(): +@pytest.mark.asyncio +async def test_potential_failure_no_matching_CNAME(): domain = Domain("mock.local", fetch_standard_records=False) assert test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value( domain, test.domain_not_configured_message ) - assert test.check(domain) == True + assert await test.check(domain) == True -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) mocks.mock_web_response_with_static_value(domain, "Welcome to my site!") - assert test.check(domain) == False + assert await test.check(domain) == False signatures = [getattr(signatures, signature) for signature in signatures.__all__] @@ -47,7 +52,8 @@ def test_check_failure(): "signature", [s for s in signatures if isinstance(s.test, ip_found_but_string_in_body)], ) -def test_check_success_ACTIVE(signature): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(signature): ips = ( signature.test.ips if type(signature.test.ips) == list else [signature.test.ips] ) @@ -55,5 +61,7 @@ def test_check_success_ACTIVE(signature): if ":" in ip: continue # skip IPv6 domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) - mocks.mock_web_request_by_providing_static_host_resolution(domain, ip) - assert signature.test.check(domain) == True + domain.get_session = ( + mocks.generate_mock_aiohttp_session_with_forced_ip_resolution(ip) + ) + assert await signature.test.check(domain) == True diff --git a/tests/signatures_tests/templates/test_ns_found_but_no_SOA.py b/tests/signatures_tests/templates/test_ns_found_but_no_SOA.py index e7c3533..697d4ea 100644 --- a/tests/signatures_tests/templates/test_ns_found_but_no_SOA.py +++ b/tests/signatures_tests/templates/test_ns_found_but_no_SOA.py @@ -9,13 +9,15 @@ test = ns_found_but_no_SOA("ns1", "mock") -def test_potential_success_with_matching_nameserver(): +@pytest.mark.asyncio +async def test_potential_success_with_matching_nameserver(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["ns1"] assert test.potential(domain) == True -def test_potential_failure(): +@pytest.mark.asyncio +async def test_potential_failure(): domain = Domain("mock.local", fetch_standard_records=False) assert test.potential(domain) == False @@ -23,11 +25,12 @@ def test_potential_failure(): signatures = [getattr(signatures, signature) for signature in signatures.__all__] +@pytest.mark.asyncio @pytest.mark.parametrize( "signature", [s for s in signatures if isinstance(s.test, ns_found_but_no_SOA)], ) -def test_check_success_ACTIVE(signature): +async def test_check_success_ACTIVE(signature): try: ns = signature.test.sample_ns except: @@ -36,4 +39,4 @@ def test_check_success_ACTIVE(signature): domain = Domain(f"mock.local", fetch_standard_records=False) for nameserver in ns: domain.NS = [nameserver] - assert signature.test.check(domain) == True + assert await signature.test.check(domain) == True diff --git a/tests/signatures_tests/test_generic_cname_found_but_404_http.py b/tests/signatures_tests/test_generic_cname_found_but_404_http.py index 6edadd3..cf73f89 100644 --- a/tests/signatures_tests/test_generic_cname_found_but_404_http.py +++ b/tests/signatures_tests/test_generic_cname_found_but_404_http.py @@ -2,38 +2,44 @@ from signatures import _generic_cname_found_but_404_http from collections import namedtuple +import pytest -def test_potential_success_with_a_cname(): +@pytest.mark.asyncio +async def test_potential_success_with_a_cname(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert _generic_cname_found_but_404_http.test.potential(domain) == True -def test_potential_failure(): +@pytest.mark.asyncio +async def test_potential_failure(): domain = Domain("mock.local", fetch_standard_records=False) assert _generic_cname_found_but_404_http.test.potential(domain) == False -def test_potential_failure_with_same_root_For_both_domain_and_cname(): +@pytest.mark.asyncio +async def test_potential_failure_with_same_root_For_both_domain_and_cname(): domain = Domain("foo.mock.local", fetch_standard_records=False) domain.CNAME = ["bar.mock.local"] assert _generic_cname_found_but_404_http.test.potential(domain) == False -def test_check_success(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_check_success(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(404) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert _generic_cname_found_but_404_http.test.check(domain) == True + assert await _generic_cname_found_but_404_http.test.check(domain) == True -def test_check_failure(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_check_failure(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(200) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert _generic_cname_found_but_404_http.test.check(domain) == False + assert await _generic_cname_found_but_404_http.test.check(domain) == False diff --git a/tests/signatures_tests/test_generic_cname_found_but_404_https.py b/tests/signatures_tests/test_generic_cname_found_but_404_https.py index 1bed914..3ae4680 100644 --- a/tests/signatures_tests/test_generic_cname_found_but_404_https.py +++ b/tests/signatures_tests/test_generic_cname_found_but_404_https.py @@ -2,38 +2,44 @@ from signatures import _generic_cname_found_but_404_https from collections import namedtuple +import pytest -def test_potential_success_with_a_cname(): +@pytest.mark.asyncio +async def test_potential_success_with_a_cname(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert _generic_cname_found_but_404_https.test.potential(domain) == True -def test_potential_failure(): +@pytest.mark.asyncio +async def test_potential_failure(): domain = Domain("mock.local", fetch_standard_records=False) assert _generic_cname_found_but_404_https.test.potential(domain) == False -def test_potential_failure_with_same_root_For_both_domain_and_cname(): +@pytest.mark.asyncio +async def test_potential_failure_with_same_root_For_both_domain_and_cname(): domain = Domain("foo.mock.local", fetch_standard_records=False) domain.CNAME = ["bar.mock.local"] assert _generic_cname_found_but_404_https.test.potential(domain) == False -def test_check_success(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_check_success(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(404) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert _generic_cname_found_but_404_https.test.check(domain) == True + assert await _generic_cname_found_but_404_https.test.check(domain) == True -def test_check_failure(): - def mock_fetch_web(**kwargs): +@pytest.mark.asyncio +async def test_check_failure(): + async def mock_fetch_web(**kwargs): return namedtuple("web_response", ["status_code"])(200) domain = Domain("mock.local", fetch_standard_records=False) domain.fetch_web = mock_fetch_web - assert _generic_cname_found_but_404_https.test.check(domain) == False + assert await _generic_cname_found_but_404_https.test.check(domain) == False diff --git a/tests/signatures_tests/test_generic_cname_found_but_unregistered.py b/tests/signatures_tests/test_generic_cname_found_but_unregistered.py index 1e4d7c1..c23b70b 100644 --- a/tests/signatures_tests/test_generic_cname_found_but_unregistered.py +++ b/tests/signatures_tests/test_generic_cname_found_but_unregistered.py @@ -1,47 +1,58 @@ from domain import Domain from signatures import _generic_cname_found_but_unregistered -from unittest.mock import patch, PropertyMock -from tests import mocks +from unittest.mock import patch, PropertyMock, AsyncMock +from .. import mocks +import pytest -def test_potential_success_with_a_cname_of_two_parts(): +@pytest.mark.asyncio +async def test_potential_success_with_a_cname_of_two_parts(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname.tld"] assert _generic_cname_found_but_unregistered.test.potential(domain) == True -def test_potential_failure(): +@pytest.mark.asyncio +async def test_potential_failure(): domain = Domain("mock.local", fetch_standard_records=False) assert _generic_cname_found_but_unregistered.test.potential(domain) == False -def test_potential_failure_with_three_part_domain(): +@pytest.mark.asyncio +async def test_potential_failure_with_three_part_domain(): domain = Domain("foo.mock.local", fetch_standard_records=False) domain.CNAME = ["subdomain.cname.tld"] assert _generic_cname_found_but_unregistered.test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname.tld"] with patch( - "domain.Domain.is_registered", return_value=False, new_callable=PropertyMock - ): - assert _generic_cname_found_but_unregistered.test.check(domain) == True + "domain.Domain.is_registered", + new=PropertyMock(spec=AsyncMock()(), return_value=False), + ) as mock: + assert await _generic_cname_found_but_unregistered.test.check(domain) == True + assert mock.await_count == 1 -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname.tld"] with patch( - "domain.Domain.is_registered", return_value=True, new_callable=PropertyMock - ): - assert _generic_cname_found_but_unregistered.test.check(domain) == False + "domain.Domain.is_registered", + new=PropertyMock(spec=AsyncMock()(), return_value=True), + ) as mock: + assert await _generic_cname_found_but_unregistered.test.check(domain) == False + assert mock.await_count == 1 -def test_check_success_ACTIVE(): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(): test_cname = f"{mocks.random_string()}.co.uk" domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) domain.CNAME = [test_cname] print(f"Testing cname {test_cname}") - assert _generic_cname_found_but_unregistered.test.check(domain) == True + assert await _generic_cname_found_but_unregistered.test.check(domain) == True diff --git a/tests/signatures_tests/test_generic_cname_found_doesnt_resolve.py b/tests/signatures_tests/test_generic_cname_found_doesnt_resolve.py index 2e653f6..6ce5722 100644 --- a/tests/signatures_tests/test_generic_cname_found_doesnt_resolve.py +++ b/tests/signatures_tests/test_generic_cname_found_doesnt_resolve.py @@ -1,56 +1,65 @@ from domain import Domain from signatures import _generic_cname_found_doesnt_resolve from unittest.mock import patch -from tests import mocks +from .. import mocks +import pytest -def test_potential_success_with_a_cname(): +@pytest.mark.asyncio +async def test_potential_success_with_a_cname(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] assert _generic_cname_found_doesnt_resolve.test.potential(domain) == True -def test_potential_failure(): +@pytest.mark.asyncio +async def test_potential_failure(): domain = Domain("mock.local", fetch_standard_records=False) assert _generic_cname_found_doesnt_resolve.test.potential(domain) == False -def test_potential_failure_with_same_root_For_both_domain_and_cname(): +@pytest.mark.asyncio +async def test_potential_failure_with_same_root_For_both_domain_and_cname(): domain = Domain("foo.mock.local", fetch_standard_records=False) domain.CNAME = ["bar.mock.local"] assert _generic_cname_found_doesnt_resolve.test.potential(domain) == False -def test_potential_failure_with_filtered_cname(): +@pytest.mark.asyncio +async def test_potential_failure_with_filtered_cname(): for cname in _generic_cname_found_doesnt_resolve.filtered_cname_substrings: domain = Domain("foo.mock.local", fetch_standard_records=False) domain.CNAME = [cname] assert _generic_cname_found_doesnt_resolve.test.potential(domain) == False -def test_potential_failure_with_domain_in_cname(): +@pytest.mark.asyncio +async def test_potential_failure_with_domain_in_cname(): domain = Domain("foo.mock.local", fetch_standard_records=False) domain.CNAME = ["foo.mock.local.cdn"] assert _generic_cname_found_doesnt_resolve.test.potential(domain) == False -def test_check_success(): +@pytest.mark.asyncio +async def test_check_success(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] - with patch("domain.Domain.query", return_value=[]): - assert _generic_cname_found_doesnt_resolve.test.check(domain) == True + with patch("resolver.Resolver.resolve", return_value={"NX_DOMAIN": True}): + assert (await _generic_cname_found_doesnt_resolve.test.check(domain)) == True -def test_check_failure(): +@pytest.mark.asyncio +async def test_check_failure(): domain = Domain("mock.local", fetch_standard_records=False) domain.CNAME = ["cname"] - with patch("domain.Domain.query", return_value=["something"]): - assert _generic_cname_found_doesnt_resolve.test.check(domain) == False + with patch("resolver.Resolver.resolve", return_value={"NX_DOMAIN": False}): + assert (await _generic_cname_found_doesnt_resolve.test.check(domain)) == False -def test_check_success_ACTIVE(): +@pytest.mark.asyncio +async def test_check_success_ACTIVE(): test_cname = f"{mocks.random_string()}.io" domain = Domain(f"{mocks.random_string()}.com", fetch_standard_records=False) domain.CNAME = [test_cname] print(f"Testing cname {test_cname}") - assert _generic_cname_found_doesnt_resolve.test.check(domain) == True + assert (await _generic_cname_found_doesnt_resolve.test.check(domain)) == True diff --git a/tests/signatures_tests/test_generic_zone_missing_on_ns.py b/tests/signatures_tests/test_generic_zone_missing_on_ns.py index 9948436..44f00ec 100644 --- a/tests/signatures_tests/test_generic_zone_missing_on_ns.py +++ b/tests/signatures_tests/test_generic_zone_missing_on_ns.py @@ -1,25 +1,30 @@ from domain import Domain from signatures import _generic_zone_missing_on_ns +import pytest -def test_potential_success_with_a_nameserver(): +@pytest.mark.asyncio +async def test_potential_success_with_a_nameserver(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["ns"] assert _generic_zone_missing_on_ns.test.potential(domain) == True -def test_potential_success_with_multiple_nameservers(): +@pytest.mark.asyncio +async def test_potential_success_with_multiple_nameservers(): domain = Domain("mock.local", fetch_standard_records=False) domain.NS = ["ns1", "ns2"] assert _generic_zone_missing_on_ns.test.potential(domain) == True -def test_potential_failure(): +@pytest.mark.asyncio +async def test_potential_failure(): domain = Domain("mock.local", fetch_standard_records=False) assert _generic_zone_missing_on_ns.test.potential(domain) == False -def test_potential_failure_with_same_root_for_both_domain_and_ns(): +@pytest.mark.asyncio +async def test_potential_failure_with_same_root_for_both_domain_and_ns(): domain = Domain("something.mock.local", fetch_standard_records=False) domain.NS = ["ns1.mock.local"] assert _generic_zone_missing_on_ns.test.potential(domain) == False