Add crawler and matcher scripts #66

Open · wants to merge 2 commits into development
4 changes: 2 additions & 2 deletions app.py
@@ -47,8 +47,8 @@
from models import RegistrationKey, SiteBase, SiteIndicator, User
from modules.reference import DEFAULTS, ENGINES, LANGUAGES, COUNTRIES, LANGUAGES_YANDEX, LANGUAGES_YAHOO, COUNTRIES_YAHOO, COUNTRY_LANGUAGE_DUCKDUCKGO, DOMAINS_GOOGLE, INDICATOR_METADATA, MATCH_VALUES_TO_IGNORE
# Import all your functions here
-from modules.crawler import crawl_one_or_more_urls, annotate_indicators
-from modules.matcher import find_matches
+from modules.crawl import crawl_one_or_more_urls, annotate_indicators
+from modules.match import find_matches
from modules.email_utils import send_results_email

app = init_app(os.getenv("CONFIG_MODE"))
85 changes: 85 additions & 0 deletions crawler.py
@@ -0,0 +1,85 @@
import os
import argparse
import logging
from pathlib import Path

import pandas as pd
from modules.crawl import crawl, get_domain_name



def write_domain_indicators(domain, indicators, output_file):
attribution_table = pd.DataFrame(
columns=["indicator_type", "indicator_content"],
data=(indicator.to_dict() for indicator in indicators),
)
attribution_table['domain_name'] = domain
    # Write each domain's indicators as soon as they are collected: a crash
    # partway through still leaves partial results on disk, and we avoid
    # accumulating one huge in-memory indicator list that would slow the run down.
if Path(output_file).exists():
attribution_table.to_csv(
output_file,
index=False,
mode="a",
encoding="utf-8",
header=False,
)
else:
attribution_table.to_csv(
output_file,
index=False,
mode="w",
encoding="utf-8",
header=True,
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Match indicators across sites.", add_help=False
)
parser.add_argument(
"-f",
"--input-file",
type=str,
help="file containing list of domains",
required=False,
default=os.path.join(".", "sites_of_concern.csv"),
)
parser.add_argument(
"-c", "--domain-column", type=str, required=False, default="Domain"
)
    # optionally submit each domain to urlscan (boolean flag, off by default)
    parser.add_argument("-u", "--run-urlscan", action="store_true")

parser.add_argument(
"-o",
"--output-file",
type=str,
help="file to save final list of match results",
required=False,
default=os.path.join(".", "indicators_output.csv"),
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler()
]
)

args = parser.parse_args()
domain_col = args.domain_column
output_file = args.output_file
run_urlscan = args.run_urlscan
input_data = pd.read_csv(args.input_file)
domains = input_data[domain_col]
for domain in domains:
try:
print(f"Processing {domain}")
domain_name = get_domain_name(domain)
indicators = crawl(domain, run_urlscan=run_urlscan)
write_domain_indicators(domain_name, indicators, output_file=output_file)
except Exception as e:
logging.error(f"Failing error on {domain}. See traceback below. Soldiering on...")
logging.error(e, exc_info=True)
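As context for the new script: below is a minimal, self-contained sketch of the append-or-create write pattern that write_domain_indicators relies on. The Indicator class here is a hypothetical stand-in; the real indicator objects come from modules.crawl and are only assumed to expose a to_dict() yielding indicator_type and indicator_content, as the DataFrame construction above implies.

```python
# Illustration only: hypothetical Indicator stand-in and the incremental
# append-or-create CSV write used by write_domain_indicators.
from dataclasses import dataclass
from pathlib import Path

import pandas as pd


@dataclass
class Indicator:
    indicator_type: str
    indicator_content: str

    def to_dict(self):
        return {
            "indicator_type": self.indicator_type,
            "indicator_content": self.indicator_content,
        }


def write_indicators(domain, indicators, output_file):
    table = pd.DataFrame(
        columns=["indicator_type", "indicator_content"],
        data=(indicator.to_dict() for indicator in indicators),
    )
    table["domain_name"] = domain
    # Append when the file already exists so results from earlier domains
    # survive a crash later in the run; write the header only once.
    exists = Path(output_file).exists()
    table.to_csv(
        output_file,
        index=False,
        mode="a" if exists else "w",
        encoding="utf-8",
        header=not exists,
    )


if __name__ == "__main__":
    fake_indicators = [
        Indicator("ip_address", "203.0.113.7"),  # example values only
        Indicator("meta_title", "Example News Network"),
    ]
    write_indicators("example.com", fake_indicators, "indicators_output.csv")
```

Run against a CSV of domains (by default ./sites_of_concern.csv with a Domain column), the full crawler.py script loops over each domain in this fashion and appends its indicators to ./indicators_output.csv as it goes.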
70 changes: 70 additions & 0 deletions matcher.py
@@ -0,0 +1,70 @@
import argparse
import logging
from pathlib import Path
import pandas as pd

from modules.match import find_matches

def define_output_filename(file1, file2=None):
if file2:
return f"{Path(file1).stem}_{Path(file2).stem}_results.csv"
return f"{Path(file1).stem}_results.csv"


def main(input_file, compare_file, output_file, comparison_type):
data1 = pd.read_csv(input_file)
if comparison_type == "compare" and compare_file:
data2 = pd.read_csv(compare_file)
matches: pd.DataFrame = find_matches(data1, data2)
else:
matches = find_matches(data1)
logging.info(f"Matches found: {matches.shape[0]}")
logging.info(
f"Summary of matches:\n{matches.groupby('match_type')['match_value'].count()}"
)
if not output_file:
output_file = define_output_filename(input_file, compare_file)
matches.to_csv(output_file, index=False)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Match indicators across sites.", add_help=False
)
parser.add_argument(
"-f", "--input-file", type=str, help="file of indicators to match",
default="./indicators_output.csv"
)
    parser.add_argument(
        "-o",
        "--output-file",
        type=str,
        help="file to save the final list of match results (default: derived from the input filenames)",
        required=False,
        default=None,
    )

    parser.add_argument(
        "-c",
        "--comparison-type",
        type=str,
        help="type of comparison to run: 'pairwise' (within the input file) or 'compare' (against a second indicators file)",
        required=False,
        default="pairwise",
        choices=["pairwise", "compare"],
    )
parser.add_argument(
"-cf",
"--compare-file",
type=str,
help="file of indicators to compare against",
required=False,
default="./comparison_indicators.csv",
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
args = parser.parse_args()

main(**vars(args))
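To illustrate the two pieces of behaviour that are easiest to miss when reading the script: how the fallback output filename is derived, and what the logged per-type summary looks like. The matches frame below is fabricated; the real one comes from modules.match.find_matches and is only assumed to carry match_type and match_value columns, as the groupby in main() implies.

```python
# Illustration only: fallback output naming and the match summary logged
# by matcher.py, using a fabricated matches DataFrame.
from pathlib import Path

import pandas as pd


def define_output_filename(file1, file2=None):
    if file2:
        return f"{Path(file1).stem}_{Path(file2).stem}_results.csv"
    return f"{Path(file1).stem}_results.csv"


print(define_output_filename("indicators_output.csv"))
# indicators_output_results.csv
print(define_output_filename("site_a_indicators.csv", "site_b_indicators.csv"))
# site_a_indicators_site_b_indicators_results.csv

# Hypothetical matches; column names mirror what main() groups on.
matches = pd.DataFrame(
    {
        "match_type": ["ip_address", "ip_address", "google_analytics_id"],
        "match_value": ["203.0.113.7", "203.0.113.8", "UA-000000-1"],
    }
)
print(matches.groupby("match_type")["match_value"].count())
# match_type
# google_analytics_id    1
# ip_address             2
# Name: match_value, dtype: int64
```

With --output-file left unset, a pairwise run on ./indicators_output.csv therefore writes its results to indicators_output_results.csv.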