generated from cds-snc/project-template
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use Notify API endpoint to automatically revoke a key (#316)
* Changing logic to use Notify's endpoint * Adding new notify integration * Addng notify tests * Adding additional unti tests * Writing additional unit tests * Adding additional secrets to the github actions * Adding a timeout to the post request * Fixing up a typo * Removing the api key in the message * Adding service_id to the message plus posting to the right notifications channel * Adding the Notify ops channel id to the unit tests env.varialbes
- Loading branch information
1 parent
cd00068
commit d05102d
Showing
7 changed files
with
306 additions
and
97 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import os | ||
import jwt | ||
import time | ||
import calendar | ||
import logging | ||
import requests | ||
import json | ||
|
||
|
||
# generate the epoch seconds for the jwt token | ||
def epoch_seconds(): | ||
return calendar.timegm(time.gmtime()) | ||
|
||
|
||
def create_jwt_token(secret, client_id): | ||
""" | ||
Generate a JWT Token for the Notify API | ||
Tokens have a header consisting of: | ||
{ | ||
"typ": "JWT", | ||
"alg": "HS256" | ||
} | ||
Parameters: | ||
secret: Application signing secret | ||
client_id: Identifier for the client | ||
Claims are: | ||
iss: identifier for the client | ||
iat: epoch seconds for the token (UTC) | ||
Returns a JWT token for this request | ||
""" | ||
assert secret, "Missing secret key" | ||
assert client_id, "Missing client id" | ||
|
||
headers = {"typ": "JWT", "alg": "HS256"} | ||
|
||
claims = {"iss": client_id, "iat": epoch_seconds()} | ||
t = jwt.encode(payload=claims, key=secret, headers=headers) | ||
if isinstance(t, str): | ||
return t | ||
else: | ||
return t.decode() | ||
|
||
|
||
# Function to create the authorization header for the Notify API | ||
def create_authorization_header(): | ||
# get the client_id and secret from the environment variables | ||
client_id = os.getenv("NOTIFY_SRE_USER_NAME") | ||
secret = os.getenv("NOTIFY_SRE_CLIENT_SECRET") | ||
|
||
# If the client_id or secret is missing, raise an assertion error | ||
assert client_id, "NOTIFY_SRE_USER_NAME is missing" | ||
assert secret, "NOTIFY_SRE_CLIENT_SECRET is missing" | ||
|
||
# Create the jwt token and return the authorization header | ||
token = create_jwt_token(secret=secret, client_id=client_id) | ||
return "Authorization", "Bearer {}".format(token) | ||
|
||
|
||
# Function to post an api call to Notify | ||
def post_event(url, payload): | ||
# Create the authorization headers | ||
header_key, header_value = create_authorization_header() | ||
header = {header_key: header_value, "Content-Type": "application/json"} | ||
|
||
# Post the response | ||
response = requests.post(url, data=json.dumps(payload), headers=header, timeout=60) | ||
return response | ||
|
||
|
||
# Function to revoke an api key by calling Notify's revoke api endpoint | ||
def revoke_api_key(api_key, api_type, github_repo, source): | ||
# get the url and jwt_token | ||
url = os.getenv("NOTIFY_API_URL") | ||
|
||
if url is None: | ||
logging.error("NOTIFY_API_URL is missing") | ||
return False | ||
|
||
# append the revoke-endpoint to the url | ||
url = url + "/sre-tools/api-key-revoke" | ||
|
||
# generate the payload | ||
payload = { | ||
"token": api_key, | ||
"type": api_type, | ||
"url": github_repo, | ||
"source": source, | ||
} | ||
|
||
# post the event (ie call the api) | ||
response = post_event(url, payload) | ||
# A successful response has a status code of 201 | ||
if response.status_code == 201: | ||
logging.info(f"API key {api_key} has been successfully revoked") | ||
return True | ||
else: | ||
logging.error( | ||
f"API key {api_key} could not be revoked. Response code: {response.status_code}" | ||
) | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
black==22.12.0 | ||
coverage==6.5.0 | ||
flake8==6.1.0 | ||
freezegun==1.2.2 | ||
pytest==7.4.3 | ||
pytest-asyncio==0.21.1 | ||
pytest-env==0.8.2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import jwt | ||
import os | ||
import pytest | ||
from integrations import notify | ||
from unittest.mock import patch | ||
from freezegun import freeze_time | ||
|
||
|
||
# helper function to decode the token for testing | ||
def decode_token(token, secret): | ||
return jwt.decode( | ||
token, key=secret, options={"verify_signature": True}, algorithms=["HS256"] | ||
) | ||
|
||
|
||
# Test that an exception is raised if the secret is missing | ||
def test_create_jwt_token_secret_missing(): | ||
with pytest.raises(AssertionError) as err: | ||
notify.create_jwt_token(None, "client_id") | ||
assert str(err.value) == "Missing secret key" | ||
|
||
|
||
# Test that an exception is raised if the client_id is missing | ||
def test_create_jwt_token_client_id_missing(): | ||
with pytest.raises(AssertionError) as err: | ||
notify.create_jwt_token("secret", None) | ||
assert str(err.value) == "Missing client id" | ||
|
||
|
||
# Test that the token is created correctly and the type and alg headers are set correctly | ||
def test_create_jwt_token_contains_correct_headers(): | ||
token = notify.create_jwt_token("secret", "client_id") | ||
headers = jwt.get_unverified_header(token) | ||
assert headers["typ"] == "JWT" | ||
assert headers["alg"] == "HS256" | ||
|
||
|
||
# Test that the claims headers are set correctly | ||
def test_create_jwt_token_contains_correct_claims_headers(): | ||
token = notify.create_jwt_token("secret", "client_id") | ||
decoded_token = decode_token(token, "secret") | ||
assert decoded_token["iss"] == "client_id" | ||
assert "iat" in decoded_token | ||
assert "req" not in decoded_token | ||
assert "pay" not in decoded_token | ||
|
||
|
||
# Test that the correct iat time in epoch seconds is set correctly | ||
@freeze_time("2020-01-01 00:00:00") | ||
def test_token_contains_correct_iat(): | ||
token = notify.create_jwt_token("secret", "client_id") | ||
decoded_token = decode_token(token, "secret") | ||
assert decoded_token["iat"] == 1577836800 | ||
|
||
|
||
# Test that an assertion error is raised if the NOTIFY_SRE_USER_NAME is missing | ||
@patch.dict(os.environ, {"NOTIFY_SRE_USER_NAME": "", "NOTIFY_SRE_CLIENT_SECRET": "foo"}) | ||
@patch("integrations.notify.create_jwt_token") | ||
def test_authorization_header_missing_client_id(jwt_token_mock): | ||
with pytest.raises(AssertionError) as err: | ||
notify.create_authorization_header() | ||
assert str(err.value) == "NOTIFY_SRE_USER_NAME is missing" | ||
|
||
|
||
# Test that an assertion error is raised if the NOTIFY_SRE_CLIENT_SECRET is missing | ||
@patch.dict(os.environ, {"NOTIFY_SRE_USER_NAME": "foo", "NOTIFY_SRE_CLIENT_SECRET": ""}) | ||
@patch("integrations.notify.create_jwt_token") | ||
def test_authorization_header_missing_secret(jwt_token_mock): | ||
with pytest.raises(AssertionError) as err: | ||
notify.create_authorization_header() | ||
assert str(err.value) == "NOTIFY_SRE_CLIENT_SECRET is missing" | ||
|
||
|
||
# Test that the authorization header is created correctly and the correct header is generated | ||
@patch("integrations.notify.create_jwt_token") | ||
def test_successful_creation_of_header(mock_jwt_token): | ||
mock_jwt_token.return_value = "mocked_jwt_token" | ||
header_key, header_value = notify.create_authorization_header() | ||
|
||
assert header_key == "Authorization" | ||
assert header_value == "Bearer mocked_jwt_token" |
Oops, something went wrong.