Skip to content

Commit

Permalink
feat: converted IP&URI plugin to new base class
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Nov 25, 2024
1 parent b7c4bac commit dda840c
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 176 deletions.
169 changes: 91 additions & 78 deletions src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,118 @@
from __future__ import annotations

import logging
from contextlib import suppress
from itertools import product
from pathlib import Path
from re import search
from typing import TYPE_CHECKING, List, Optional

import geoip2.database
from common_analysis_ip_and_uri_finder import CommonAnalysisIPAndURIFinder
from geoip2.errors import AddressNotFoundError
from maxminddb.errors import InvalidDatabaseError
from pydantic import BaseModel

from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin

from analysis.PluginBase import AnalysisBasePlugin
if TYPE_CHECKING:
from io import FileIO

# Path to the MaxMind GeoLite2 city database used to geolocate found IP addresses
GEOIP_DATABASE_PATH = Path(__file__).parent.parent / 'bin/GeoLite2-City/GeoLite2-City.mmdb'

# Regex blacklists applied with re.search() to drop uninteresting matches.
# NOTE(review): the dots are unescaped regex wildcards — presumably intentional shorthand; confirm.
IP_V4_BLACKLIST = [r'127.0.[0-9]+.1', r'255.[0-9]+.[0-9]+.[0-9]+']  # localhost addresses / subnet masks
IP_V6_BLACKLIST = [r'^[0-9A-Za-z]::$', r'^::[0-9A-Za-z]$', r'^[0-9A-Za-z]::[0-9A-Za-z]$', r'^::$']  # trivial addresses


class AnalysisPlugin(AnalysisBasePlugin):
NAME = 'ip_and_uri_finder'
DEPENDENCIES = [] # noqa: RUF012
MIME_WHITELIST = [ # noqa: RUF012
'text/plain',
'application/octet-stream',
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
'application/x-dosexec',
]
DESCRIPTION = 'Search file for IP addresses and URIs based on regular expressions.'
VERSION = '0.4.2'
FILE = __file__

def additional_setup(self):
class IpAddress(BaseModel):
    """An IP address found in the analyzed file, optionally with its geolocation."""

    # the address string exactly as matched in the file (IPv4 or IPv6 notation)
    address: str
    # resolved coordinates; None when the GeoIP lookup failed or no database is loaded
    location: Optional[Location]


class Location(BaseModel):
    """A geographic coordinate (longitude/latitude as floats) of an IP address."""

    longitude: float
    latitude: float


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
    """Search files for IP addresses and URIs based on regular expressions."""

    class Schema(BaseModel):
        """Analysis result: deduplicated, blacklist-filtered IPs (v4/v6) and URIs."""

        ips_v4: List[IpAddress]
        ips_v6: List[IpAddress]
        uris: List[str]

def __init__(self):
    """Set up the IP/URI finder library and, if available, the GeoIP database reader."""
    self.ip_and_uri_finder = CommonAnalysisIPAndURIFinder()
    try:
        self.reader = geoip2.database.Reader(str(GEOIP_DATABASE_PATH))
    except FileNotFoundError:
        # geolocation is optional: with reader=None, lookups simply yield no location
        logging.error('could not load GeoIP database')
        self.reader = None

def process_object(self, file_object):
    """Run the IP/URI search on a file object and attach the results (legacy plugin API).

    Deduplicates all findings, filters blacklisted IPs, adds geolocation data,
    and stores the augmented result in file_object.processed_analysis.
    """
    findings = self.ip_and_uri_finder.analyze_file(file_object.file_path, separate_ipv6=True)
    findings['uris'] = self._remove_duplicates(findings['uris'])
    for key, blacklist in (('ips_v4', IP_V4_BLACKLIST), ('ips_v6', IP_V6_BLACKLIST)):
        findings[key] = self._remove_blacklisted(self._remove_duplicates(findings[key]), blacklist)
    file_object.processed_analysis[self.NAME] = self._get_augmented_result(self.add_geo_uri_to_ip(findings))
    return file_object

def _get_augmented_result(self, result):
result['summary'] = self._get_summary(result)
result['system_version'] = self.ip_and_uri_finder.system_version
return result

def add_geo_uri_to_ip(self, result):
    """Replace the plain IP lists in the result with (ip, geo location) pairs."""
    result['ips_v4'] = self.link_ips_with_geo_location(result['ips_v4'])
    result['ips_v6'] = self.link_ips_with_geo_location(result['ips_v6'])
    return result

def find_geo_location(self, ip_address):
    """Return 'latitude, longitude' for an IP address using the GeoIP database.

    Propagates lookup errors (e.g. unknown or malformed addresses) to the caller.
    """
    response = self.reader.city(ip_address)
    coordinates = (response.location.latitude, response.location.longitude)
    return '{}, {}'.format(*coordinates)

def link_ips_with_geo_location(self, ip_addresses):
    """Pair each IP address with its geo location string ('' when the lookup fails)."""
    linked = []
    for address in ip_addresses:
        try:
            geo_info = self.find_geo_location(address)
        except (
            AttributeError,
            AddressNotFoundError,
            FileNotFoundError,
            ValueError,
            InvalidDatabaseError,
        ) as exception:
            # lookup failures are expected (e.g. private addresses) — log and continue
            logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True)
            geo_info = ''
        linked.append((address, geo_info))
    return linked

@staticmethod
def _get_summary(results):
summary = []
summary.extend(results['uris'])
for key in ['ips_v4', 'ips_v6']:
for ip, *_ in results[key]: # IP results come in tuples (ip, latitude, longitude)
summary.append(ip)
super().__init__(
metadata=self.MetaData(
name='ip_and_uri_finder',
description='Search file for IP addresses and URIs based on regular expressions.',
version='1.0.0',
Schema=self.Schema,
mime_whitelist=[
'text/plain',
'application/octet-stream',
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
'application/x-dosexec',
],
system_version=self.ip_and_uri_finder.system_version,
),
)

def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema:
    """Search the file for URIs and IPv4/IPv6 addresses and geolocate the IPs."""
    del virtual_file_path, analyses  # unused; required by the plugin interface
    raw_result = self.ip_and_uri_finder.analyze_file(file_handle.name, separate_ipv6=True)

    def _filtered(key: str, blacklist: list[str]) -> list[str]:
        # deduplicate first so the blacklist filter runs on a minimal list
        return _remove_blacklisted(_remove_duplicates(raw_result[key]), blacklist)

    def _with_location(address: str) -> IpAddress:
        return IpAddress(address=address, location=self.find_geo_location(address))

    return self.Schema(
        ips_v4=[_with_location(ip) for ip in _filtered('ips_v4', IP_V4_BLACKLIST)],
        ips_v6=[_with_location(ip) for ip in _filtered('ips_v6', IP_V6_BLACKLIST)],
        uris=_remove_duplicates(raw_result['uris']),
    )

def find_geo_location(self, ip_address: str) -> Location | None:
    """Resolve an IP address to a Location via the GeoIP database.

    Returns None when no database reader is available or when the lookup fails
    for an expected reason (unknown address, malformed input, broken database).
    """
    if self.reader is None:
        # no GeoIP database was loaded during initialization
        return None
    try:
        city_data = self.reader.city(ip_address)
        return Location(
            latitude=float(city_data.location.latitude),
            longitude=float(city_data.location.longitude),
        )
    except (
        AttributeError,
        AddressNotFoundError,
        FileNotFoundError,
        ValueError,
        InvalidDatabaseError,
    ) as exception:
        # NOTE(review): self.NAME presumably provided by the compat mixin — confirm
        logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True)
        return None

def summarize(self, result: Schema) -> list:
    """List all findings (URIs first, then IPv4/IPv6 addresses) for the summary."""
    addresses = [ip.address for ip_list in (result.ips_v4, result.ips_v6) for ip in ip_list]
    return [*result.uris, *addresses]

@staticmethod
def _remove_duplicates(input_list):
return list(set(input_list))

@staticmethod
def _remove_blacklisted(ip_list, blacklist):
for ip, blacklist_entry in product(ip_list, blacklist):
if search(blacklist_entry, ip):
with suppress(ValueError):
ip_list.remove(ip)
return ip_list

def _remove_duplicates(input_list: list[str]) -> list[str]:
return list(set(input_list))


def _remove_blacklisted(ip_list: list[str], blacklist: list[str]) -> list[str]:
for ip, blacklist_entry in product(ip_list, blacklist):
if search(blacklist_entry, ip):
with suppress(ValueError):
ip_list.remove(ip)
return ip_list
129 changes: 45 additions & 84 deletions src/plugins/analysis/ip_and_uri_finder/test/test_ip_and_uri_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import tempfile
from collections import namedtuple
from pathlib import Path

import pytest
from geoip2.errors import AddressNotFoundError

from objects.file import FileObject

from ..code.ip_and_uri_finder import AnalysisPlugin
from ..code.ip_and_uri_finder import AnalysisPlugin, _remove_blacklisted

MockResponse = namedtuple('MockResponse', ['location'])
MockLocation = namedtuple('MockLocation', ['latitude', 'longitude'])
Expand Down Expand Up @@ -51,97 +50,59 @@ def ip_and_uri_finder_plugin(analysis_plugin):
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginIpAndUriFinder:
def test_process_object_ips(self, ip_and_uri_finder_plugin):
with tempfile.NamedTemporaryFile() as tmp:
with open(tmp.name, 'w') as fp: # noqa: PTH123
fp.write(
'1.2.3.4 abc 1.1.1.1234 abc 3. 3. 3. 3 abc 1255.255.255.255 1234:1234:abcd:abcd:1234:1234:abcd:abc'
'd xyz 2001:db8::8d3:: xyz 2001:db8:0:0:8d3::'
)
tmp_fo = FileObject(file_path=tmp.name)
processed_object = ip_and_uri_finder_plugin.process_object(tmp_fo)
results = processed_object.processed_analysis[ip_and_uri_finder_plugin.NAME]
assert results['uris'] == []
assert {
('1.2.3.4', '47.913, -122.3042'),
('1.1.1.123', '-37.7, 145.1833'),
} == set(results['ips_v4'])
assert len(
[
('1.2.3.4', '47.913, -122.3042'),
('1.1.1.123', '-37.7, 145.1833'),
]
) == len(results['ips_v4'])
assert {
('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1'),
('2001:db8:0:0:8d3::', '3.1, 3.1'),
} == set(results['ips_v6'])
assert len(
[
('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1'),
('2001:db8:0:0:8d3::', '3.1, 3.1'),
]
) == len(results['ips_v6'])
with tempfile.NamedTemporaryFile() as tmp, Path(tmp.name).open('w') as fp:
fp.write(
'1.2.3.4 abc 1.1.1.1234 abc 3. 3. 3. 3 abc 1255.255.255.255 1234:1234:abcd:abcd:1234:1234:abcd:abc'
'd xyz 2001:db8::8d3:: xyz 2001:db8:0:0:8d3::'
)
fp.seek(0)
results = ip_and_uri_finder_plugin.analyze(fp, {}, {})
assert results.uris == []
assert len(results.ips_v4) == 2
ip_v4_addresses = {ipa.address: f'{ipa.location.latitude}, {ipa.location.longitude}' for ipa in results.ips_v4}
assert ip_v4_addresses == {
'1.2.3.4': '47.913, -122.3042',
'1.1.1.123': '-37.7, 145.1833',
}
assert len(results.ips_v6) == 2
ip_v6_addresses = {ipa.address: f'{ipa.location.latitude}, {ipa.location.longitude}' for ipa in results.ips_v6}
assert ip_v6_addresses == {
'1234:1234:abcd:abcd:1234:1234:abcd:abcd': '2.1, 2.1',
'2001:db8:0:0:8d3::': '3.1, 3.1',
}

assert set(ip_and_uri_finder_plugin.summarize(results)) == {*ip_v4_addresses, *ip_v6_addresses}

def test_process_object_uris(self, ip_and_uri_finder_plugin):
with tempfile.NamedTemporaryFile() as tmp:
with open(tmp.name, 'w') as fp: # noqa: PTH123
fp.write(
'http://www.google.de https://www.test.de/test/?x=y&1=2 ftp://ftp.is.co.za/rfc/rfc1808.txt '
'telnet://192.0.2.16:80/'
)
tmp_fo = FileObject(file_path=tmp.name)
processed_object = ip_and_uri_finder_plugin.process_object(tmp_fo)
results = processed_object.processed_analysis[ip_and_uri_finder_plugin.NAME]
assert {
with tempfile.NamedTemporaryFile() as tmp, Path(tmp.name).open('w') as fp:
fp.write(
'http://www.google.de https://www.test.de/test/?x=y&1=2 ftp://ftp.is.co.za/rfc/rfc1808.txt '
'telnet://192.0.2.16:80/'
)
fp.seek(0)
results = ip_and_uri_finder_plugin.analyze(fp, {}, {})
assert set(results.uris) == {
'http://www.google.de',
'https://www.test.de/test/',
'ftp://ftp.is.co.za/rfc/rfc1808.txt',
'telnet://192.0.2.16:80/',
} == set(results['uris'])
assert len(
[
'http://www.google.de',
'https://www.test.de/test/',
'ftp://ftp.is.co.za/rfc/rfc1808.txt',
'telnet://192.0.2.16:80/',
]
) == len(results['uris'])

def test_add_geo_uri_to_ip(self, ip_and_uri_finder_plugin):
test_data = {
'ips_v4': ['128.101.101.101', '255.255.255.255'],
'ips_v6': ['1234:1234:abcd:abcd:1234:1234:abcd:abcd'],
'uris': 'http://www.google.de',
}
results = ip_and_uri_finder_plugin.add_geo_uri_to_ip(test_data)
assert results['uris'] == 'http://www.google.de'
assert [('128.101.101.101', '44.9759, -93.2166'), ('255.255.255.255', '0.0, 0.0')] == results['ips_v4']
assert [('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1')] == results['ips_v6']
assert len(results.uris) == 4

assert set(ip_and_uri_finder_plugin.summarize(results)) == set(results.uris).union({'192.0.2.16'})

def test_find_geo_location(self, ip_and_uri_finder_plugin):
assert ip_and_uri_finder_plugin.find_geo_location('128.101.101.101') == '44.9759, -93.2166'
assert ip_and_uri_finder_plugin.find_geo_location('127.101.101.101') == '4.1, 4.1'

with pytest.raises(AddressNotFoundError):
ip_and_uri_finder_plugin.find_geo_location('1.1.2.345')
with pytest.raises(ValueError): # noqa: PT011
ip_and_uri_finder_plugin.find_geo_location('aaa')

def test_link_ips_with_geo_location(self, ip_and_uri_finder_plugin):
ip_addresses = ['128.101.101.101', '255.255.255.255']
expected_results = [('128.101.101.101', '44.9759, -93.2166'), ('255.255.255.255', '0.0, 0.0')]
assert ip_and_uri_finder_plugin.link_ips_with_geo_location(ip_addresses) == expected_results

def test_get_summary(self):
results = {
'uris': ['http://www.google.de'],
'ips_v4': [('128.101.101.101', '44.9759, -93.2166')],
'ips_v6': [('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1')],
}
expected_results = ['http://www.google.de', '128.101.101.101', '1234:1234:abcd:abcd:1234:1234:abcd:abcd']
assert AnalysisPlugin._get_summary(results), expected_results
location = ip_and_uri_finder_plugin.find_geo_location('128.101.101.101')
assert location.latitude == 44.9759
assert location.longitude == -93.2166
location = ip_and_uri_finder_plugin.find_geo_location('127.101.101.101')
assert location.latitude == 4.1
assert location.longitude == 4.1

assert ip_and_uri_finder_plugin.find_geo_location('1.1.2.345') is None
assert ip_and_uri_finder_plugin.find_geo_location('aaa') is None

def test_remove_blacklisted(self, ip_and_uri_finder_plugin):
input_list = ['1.1.1.1', 'blah', '0.0.0.0']
blacklist = [r'[0-9].{4}', r'x.y']
assert ip_and_uri_finder_plugin._remove_blacklisted(input_list, blacklist) == ['blah']
assert _remove_blacklisted(input_list, blacklist) == ['blah']
14 changes: 4 additions & 10 deletions src/plugins/analysis/ip_and_uri_finder/view/ip_and_uri_finder.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,23 @@
<td>
{% if key == "ips_v4" %}IPv4
{% elif key == "ips_v6" %}IPv6
{% elif key == "ips" %}IP
{% else %}URI{% endif %}
</td>
<td class="p-0">
<ul class="list-group p-0 mb-0" style="width: 100%;">
{% for item in value %}
{% if key == "ips_v6" or key == "ips_v4" %}
<li class="list-group-item d-flex justify-content-between align-items-center rounded-0">
{{ item[0] }}
{% if item[1] %}
<a href="https://www.google.de/maps/place/{{ item[1] }}">
{{ item.address }}
{% if item.location %}
<a href="https://www.google.de/maps/place/{{ item.location.latitude }},{{ item.location.longitude }}">
<i class="fas fa-map-marker-alt"></i>
</a>
{% endif %}
</li>
{% else %}
<li class="list-group-item rounded-0">
{% if key != 'ips' %}
<a href="{{ item }}">{{ item }}</a>
{% else %}
{{ item }}
{% endif %}
<a href="{{ item }}">{{ item }}</a>
</li>
{% endif %}
{% endfor %}
Expand All @@ -40,4 +35,3 @@
{% endfor %}

{% endblock %}

Loading

0 comments on commit dda840c

Please sign in to comment.