Skip to content

Commit

Permalink
Improved sendgrid advisory script
Browse files Browse the repository at this point in the history
  • Loading branch information
DGovEnterprise committed Apr 28, 2024
1 parent 18cdb73 commit 8b5997c
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 39 deletions.
157 changes: 125 additions & 32 deletions .github/scripts/send-advisory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
import sys, re, os
import json, sys, re, os, time
from sendgrid import SendGridAPIClient
from pathlib import Path
from markdown import markdown
Expand All @@ -8,10 +8,44 @@

added_files = check_output(["git", "diff", "--name-only", "-r", "--diff-filter=A", "HEAD^"]).decode("utf8").split("\n")
latest_advisory = max(Path("docs/advisories").glob("*.md"))
advisory_uid = latest_advisory.name[:11]
latest_advisory_uid = latest_advisory.name[:11]
advisory_uid = {}
advisory_mds = {}
base_url = "https://soc.cyber.wa.gov.au/advisories/"


def email_lookup (advisory_uid):

sg = SendGridAPIClient(os.environ.get('sendgrid_api'))

response = sg.client.marketing.singlesends.search.post(
request_body={"name":advisory_uid}
)
response_output = json.loads(response.body)
results = response_output["result"]
#print(results)

if len(results) > 0:
result_name = results[0]["name"]
result_send_time = results[0]["send_at"]
result_status = results[0]["status"]
result_id = results[0]["id"]
print(f"\nNOTE: The advisory '{result_name}' already exists and was sent at '{result_send_time}' with '{result_status}' status. Sengrid ID: {result_id}")
return True, result_name
else:
print("NOTE: No existing advisory found")
return False, ''

def email_delete (advisory_uid):

lookup = email_lookup(advisory_uid)
if lookup[0] is True:
#sg.client.marketing.singlesends._(result_id).delete() #Off by default
print(f"NOTE: The advisory {lookup[1]} was deleted")
else:
print(f"No existing advisory found. Cannot delete advisory {advisory_uid}")
sys.exit()

def email_campaign (uid, title, overview, url, content):

sg = SendGridAPIClient(os.environ.get('sendgrid_api'))
Expand Down Expand Up @@ -49,39 +83,98 @@ def email_campaign (uid, title, overview, url, content):
#print(response.body)
#print(response.headers)

def send_campaign (advisory_content):


for source, advisory_md in advisory_content.items():
if source.startswith(base_url):
url = source
print(f"\nCollecting advisory data from source: {source}")
if source.endswith(".md"):
url = base_url + source[:-3] + "/"
print(f"\nCollecting advisory data from source: {source}")
sections = advisory_md.split("##")
title, overview = None, None

for section in sections:
if not title and section.startswith("#"):
title = section[1:].strip()[:-14]
if section.strip().lower().startswith("overview"):
overview = section[9:].strip()
break

lookup = email_lookup(advisory_uid)
if lookup[0] is True:
# An Advisory already exists with this UID
print("\nERROR: Advisory already exists. Requested advisory will be skipped")
print("\n-------------------------")
else:
html_overview = markdown(overview)
html_filled = Template(Path("templates/tlp-clear-email-template.html").read_text()).substitute(title=title, overview=html_overview, url=url, uid=advisory_uid)
#print(html_filled) # Reviewing the filled email template for sendgrid
email_campaign(advisory_uid, title, html_overview, url, html_filled)


arguments = sys.argv[1:]

options = sys.argv[2:]

if len(arguments) < 1:
print(f"Error: at least 1 argument expected, {len(arguments)} arguments given. Add a SOC Advisory URL, Advisory Markdown file or type '-b', '--bulk'")
sys.exit()
if len(options) < 1 and arguments[0] not in ['-b', '--bulk', '-a', '--auto']: #or arguments[0] != '--auto':
print(f"Error: at least 1 option expected, {len(arguments)} options given. Choose the following options: ('-n', '--new', '-d', '--delete', '-u', '--update')")
sys.exit()



if sys.argv[1].startswith(base_url):
# If a url is provided, find the relevant advisory and send it
input_url = sys.argv[1]
advisory_id = re.search(r'(?<=advisories/)\d+(?=-)', input_url).group(0).strip()
advisory_mds[input_url] = max(Path(f"docs/advisories").glob(f"{advisory_id}-*.md")).read_text()
advisory_uid = re.search(r'(?<=advisories/)\d+(?=-)', input_url).group(0).strip()
advisory_mds[input_url] = max(Path(f"docs/advisories").glob(f"{advisory_uid}-*.md")).read_text()
elif sys.argv[1].endswith(".md"):
# If a markdown is provided, find the relevant content and send it
markdown_file = sys.argv[1]
advisory_uid = markdown_file[:11]
advisory_mds[markdown_file] = max(Path(f"docs/advisories").glob(f"{advisory_uid}-*.md")).read_text()
elif sys.argv[1] == '-b' or sys.argv[1] == '--bulk':
number_of_file = input("To bulk create sendgrid advisories, please enter the 'number' of latest advisories: ")
all_files = Path("docs/advisories").glob("*.md")
sorted_files = sorted(all_files, key=lambda item: item.name)
requested_files = (sorted_files)[-int(number_of_file):]
for file in requested_files:
#print(f"{file}")
advisory_uid = file.name[:11]
content = {file.name:max(Path(f"docs/advisories").glob(f"{file.name}")).read_text()}
send_campaign(content)
elif sys.argv[1] == '-a' or sys.argv[1] == '--auto':
look_back_time = time.time() - 1 * 60 * 60 #1 hr look back
all_files = Path("docs/advisories").glob("*.md")
sorted_files = sorted(all_files, key=lambda item: item.name)
for file in sorted_files:
if os.path.getctime(file) > look_back_time:
advisory_uid = file.name[:11]
print(f"Advisory File Number {advisory_uid}: {file} was created less than a hour ago")
content = {file.name:max(Path(f"docs/advisories").glob(f"{file.name}")).read_text()}
send_campaign(content)
else:
print(f"input {sys.argv[1]} doesn't start with {base_url}")
# Figure out if file is in latest set of added files, and send if so
for file in added_files:
print(file)
if file.startswith(f"docs/advisories/{advisory_uid}"):
file = Path(file)
url = base_url + file.stem
advisory_mds[url] = file.read_text()

print("Preparing to send below advisories: ")

for url, advisory_md in advisory_mds.items():
print(url)
sections = advisory_md.split("##")
title, overview = None, None

for section in sections:
if not title and section.startswith("#"):
title = section[1:].strip()[:-14]
if section.strip().lower().startswith("overview"):
overview = section[9:].strip()
break

html_overview = markdown(overview)
html_filled = Template(Path("templates/tlp-clear-email-template.html").read_text()).substitute(title=title, overview=html_overview, url=url, uid=advisory_uid)

email_campaign(advisory_uid, title, html_overview, url, html_filled)
print(f"ERROR: Input {sys.argv[1]} doesn't start with {base_url} or is a markdown file or is contains type '-a', '--auto'")
sys.exit()

#print()
if options:
for option in options:
if option not in ('-n', '--new', '-d', '--delete', '-u', '--update'):
print(f"ERROR: {option} not available! Exiting Program")
sys.exit()
for option in options:
if option == '-n' or option == '--new':
print(f"New Option Selected - Manually creating advisory {advisory_uid}")
send_campaign(advisory_mds)
if option == '-u' or option == '--update':
print(f"Update Option Selected - Deleting old advisory {advisory_uid} and then creating new {advisory_uid}")
email_delete(advisory_uid)
send_campaign(advisory_mds)
if option == '-d' or option == '--delete':
print(f"Delete Option selected - Deleting advisory {advisory_uid}")
email_delete(advisory_uid)
12 changes: 5 additions & 7 deletions .github/workflows/send-advisory.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ name: Send advisory
on:
push:
branches: [main]
workflow_dispatch:
inputs:
advisory_url:
type: string
pull_request:
branches: [main]


env: # sendgrid api token
Expand All @@ -17,7 +15,7 @@ permissions:

jobs:
send-modified-files:
name: Send advisory files added today, or send advisory from input url
name: Send advisory files created in last hour
runs-on: ubuntu-latest
steps:
- name: Harden Runner
Expand All @@ -33,7 +31,7 @@ jobs:
with:
python-version: '3.11'
cache: 'pip' # caching pip dependencies
- name: install mkdocs
- name: install requirements
run: pip install -r requirements.txt
- name: Send advisories
run: ./.github/scripts/send-advisory.py "${{ inputs.advisory_url }}"
run: ./.github/scripts/send-advisory.py --auto

0 comments on commit 8b5997c

Please sign in to comment.