generated from cds-snc/project-template
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: move users related commands to own module
- Loading branch information
Showing
4 changed files
with
191 additions
and
112 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,16 +8,13 @@ | |
""" | ||
|
||
import json | ||
import os | ||
from slack_bolt import App | ||
|
||
from integrations.aws.organizations import get_account_id_by_name | ||
from integrations.aws import identity_store | ||
from integrations.slack import commands as slack_commands | ||
from integrations.slack import users as slack_users | ||
from modules.aws import aws_access_requests, aws_account_health, groups | ||
from modules.aws.identity_center import provision_aws_users | ||
from modules.permissions import handler as permissions | ||
from modules.aws import aws_access_requests, aws_account_health, groups, users | ||
|
||
PREFIX = os.environ.get("PREFIX", "") | ||
AWS_ADMIN_GROUPS = os.environ.get("AWS_ADMIN_GROUPS", "[email protected]").split(",") | ||
|
@@ -40,7 +37,7 @@ | |
""" | ||
|
||
|
||
def register(bot): | ||
def register(bot: App) -> None: | ||
"""AWS module registration. | ||
Args: | ||
|
@@ -82,8 +79,9 @@ def aws_command(ack, command, logger, respond, client, body) -> None: | |
aws_access_requests.request_access_modal(client, body) | ||
case "health": | ||
aws_account_health.request_health_modal(client, body) | ||
case "user": | ||
request_user_provisioning(client, body, respond, args, logger) | ||
case "users": | ||
users.command_handler(client, body, respond, args, logger) | ||
# request_user_provisioning(client, body, respond, args, logger) | ||
case "groups": | ||
groups.command_handler(client, body, respond, args, logger) | ||
case _: | ||
|
@@ -93,40 +91,6 @@ def aws_command(ack, command, logger, respond, client, body) -> None: | |
) | ||
|
||
|
||
def request_user_provisioning(client, body, respond, args, logger): | ||
"""Request AWS user provisioning. | ||
This function processes a request to provision or deprovision AWS users. | ||
Args: | ||
client (SlackClient): The Slack client instance. | ||
body (dict): The request body. | ||
respond (function): The function to respond to the request. | ||
args (list): The list of arguments passed with the command. | ||
logger (Logger): The logger instance. | ||
""" | ||
requestor_email = slack_users.get_user_email_from_body(client, body) | ||
if permissions.is_user_member_of_groups(requestor_email, AWS_ADMIN_GROUPS): | ||
operation = args[0] | ||
users_emails = args[1:] | ||
users_emails = [ | ||
( | ||
slack_users.get_user_email_from_handle(client, email) | ||
if email.startswith("@") | ||
else email | ||
) | ||
for email in users_emails | ||
] | ||
response = provision_aws_users(operation, users_emails) | ||
respond(f"Request completed:\n{json.dumps(response, indent=2)}") | ||
else: | ||
respond( | ||
"This function is restricted to admins only. Please contact #sre-and-tech-ops for assistance." | ||
) | ||
|
||
logger.info("Completed user provisioning request") | ||
|
||
|
||
def request_aws_account_access( | ||
account_name, rationale, start_date, end_date, user_email, access_type | ||
): | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import os | ||
import json | ||
from slack_sdk.web import WebClient | ||
from modules.aws import identity_center | ||
from modules.permissions import handler as permissions | ||
from integrations.slack import users as slack_users | ||
|
||
AWS_ADMIN_GROUPS = os.environ.get("AWS_ADMIN_GROUPS", "").split(",") | ||
|
||
help_text = """ | ||
\n *AWS Users*: | ||
\n • `/aws users <create|delete> [@username [email protected]]` - Provision or deprovision AWS users. | ||
\n • `/aws users help` - Show this help text. | ||
""" | ||
|
||
|
||
def command_handler(client: WebClient, body, respond, args, logger): | ||
"""Handle the command. | ||
Args: | ||
client (WebClient): The Slack client. | ||
body (dict): The request body. | ||
respond (function): The function to respond to the request. | ||
args (list[str]): The list of arguments. | ||
logger (Logger): The logger. | ||
""" | ||
action: list[str] = args.pop(0) if args else "" | ||
match action: | ||
case "help" | "aide": | ||
respond(help_text) | ||
case "create" | "delete": | ||
# reinsert the action into the args list for the request_user_provisioning function | ||
args.insert(0, action) | ||
request_user_provisioning(client, body, respond, args, logger) | ||
case _: | ||
respond("Invalid command. Type `/aws users help` for more information.") | ||
|
||
|
||
def request_user_provisioning(client: WebClient, body, respond, args, logger): | ||
"""Request AWS user provisioning. | ||
This function processes a request to provision or deprovision AWS users. | ||
Args: | ||
client (SlackClient): The Slack client instance. | ||
body (dict): The request body. | ||
respond (function): The function to respond to the request. | ||
args (list): The list of arguments passed with the command. | ||
logger (Logger): The logger instance. | ||
""" | ||
requestor_email = slack_users.get_user_email_from_body(client, body) | ||
if permissions.is_user_member_of_groups(requestor_email, AWS_ADMIN_GROUPS): | ||
operation = args[0] | ||
users_emails = args[1:] | ||
users_emails = [ | ||
( | ||
slack_users.get_user_email_from_handle(client, email) | ||
if email.startswith("@") | ||
else email | ||
) | ||
for email in users_emails | ||
] | ||
response = identity_center.provision_aws_users(operation, users_emails) | ||
respond(f"Request completed:\n{json.dumps(response, indent=2)}") | ||
else: | ||
respond( | ||
"This function is restricted to admins only. Please contact #sre-and-tech-ops for assistance." | ||
) | ||
|
||
logger.info("Completed user provisioning request") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,24 +34,24 @@ def test_aws_command_handles_access_command(request_access_modal): | |
request_access_modal.assert_called_with(client, body) | ||
|
||
|
||
@patch("modules.aws.aws.slack_commands.parse_command") | ||
@patch("modules.aws.aws.request_user_provisioning") | ||
def test_aws_command_handles_provision_command( | ||
mock_request_provisioning: MagicMock, mock_parse_command: MagicMock | ||
): | ||
ack = MagicMock() | ||
logger = MagicMock() | ||
respond = MagicMock() | ||
client = MagicMock() | ||
body = MagicMock() | ||
mock_parse_command.return_value = ["user", "create", "[email protected]"] | ||
aws.aws_command( | ||
ack, {"text": "user create [email protected]"}, logger, respond, client, body | ||
) | ||
ack.assert_called() | ||
mock_request_provisioning.assert_called_with( | ||
client, body, respond, ["create", "[email protected]"], logger | ||
) | ||
# @patch("modules.aws.aws.slack_commands.parse_command") | ||
# @patch("modules.aws.aws.request_user_provisioning") | ||
# def test_aws_command_handles_provision_command( | ||
# mock_request_provisioning: MagicMock, mock_parse_command: MagicMock | ||
# ): | ||
# ack = MagicMock() | ||
# logger = MagicMock() | ||
# respond = MagicMock() | ||
# client = MagicMock() | ||
# body = MagicMock() | ||
# mock_parse_command.return_value = ["user", "create", "[email protected]"] | ||
# aws.aws_command( | ||
# ack, {"text": "user create [email protected]"}, logger, respond, client, body | ||
# ) | ||
# ack.assert_called() | ||
# mock_request_provisioning.assert_called_with( | ||
# client, body, respond, ["create", "[email protected]"], logger | ||
# ) | ||
|
||
|
||
@patch("modules.aws.aws.slack_commands.parse_command") | ||
|
@@ -99,58 +99,6 @@ def test_aws_command_handles_unknown_command(mock_parse_command): | |
) | ||
|
||
|
||
@patch("modules.aws.aws.AWS_ADMIN_GROUPS", ["[email protected]"]) | ||
@patch("modules.aws.aws.provision_aws_users") | ||
@patch("modules.aws.aws.permissions") | ||
@patch("modules.aws.aws.slack_users") | ||
def test_request_user_provisioning( | ||
mock_slack_users, mock_permissions, mock_provision_aws_user | ||
): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
logger = MagicMock() | ||
mock_slack_users.get_user_email_from_body.return_value = "[email protected]" | ||
mock_provision_aws_user.return_value = True | ||
aws.request_user_provisioning( | ||
client, body, respond, ["create", "user.email"], logger | ||
) | ||
mock_slack_users.get_user_email_from_body.assert_called_with(client, body) | ||
mock_permissions.is_user_member_of_groups.assert_called_with( | ||
"[email protected]", ["[email protected]"] | ||
) | ||
respond.assert_called_with("Request completed:\ntrue") | ||
logger.info.assert_called_with("Completed user provisioning request") | ||
|
||
|
||
@patch("modules.aws.aws.AWS_ADMIN_GROUPS", ["[email protected]"]) | ||
@patch("modules.aws.aws.provision_aws_users") | ||
@patch("modules.aws.aws.permissions") | ||
@patch("modules.aws.aws.slack_users") | ||
def test_request_user_provisioning_requestor_not_admin( | ||
mock_slack_users, | ||
mock_permissions, | ||
mock_provision_aws_user, | ||
): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
logger = MagicMock() | ||
mock_slack_users.get_user_email_from_body.return_value = "[email protected]" | ||
mock_permissions.is_user_member_of_groups.return_value = False | ||
mock_provision_aws_user.return_value = True | ||
aws.request_user_provisioning( | ||
client, body, respond, ["create", "user.email"], logger | ||
) | ||
mock_slack_users.get_user_email_from_body.assert_called_with(client, body) | ||
mock_permissions.is_user_member_of_groups.assert_called_with( | ||
"[email protected]", ["[email protected]"] | ||
) | ||
respond.assert_called_with( | ||
"This function is restricted to admins only. Please contact #sre-and-tech-ops for assistance." | ||
) | ||
|
||
|
||
@patch("modules.aws.aws_access_requests.create_aws_access_request") | ||
@patch("integrations.aws.identity_store.get_user_id") | ||
@patch("integrations.aws.organizations.get_account_id_by_name") | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from unittest.mock import patch, MagicMock, call | ||
from modules.aws import users | ||
|
||
|
||
def test_aws_users_command_handles_empty_command(): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
args = "" | ||
logger = MagicMock() | ||
|
||
users.command_handler(client, body, respond, args, logger) | ||
|
||
respond.assert_called_once_with( | ||
"Invalid command. Type `/aws users help` for more information." | ||
) | ||
logger.info.assert_not_called() | ||
|
||
|
||
def test_aws_users_command_handles_help_command(): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
args = ["help"] | ||
logger = MagicMock() | ||
|
||
users.command_handler(client, body, respond, args, logger) | ||
|
||
respond.assert_called_once_with(users.help_text) | ||
logger.info.assert_not_called() | ||
|
||
|
||
@patch("modules.aws.users.request_user_provisioning") | ||
def test_aws_users_command_handles_create_command(mock_request_user_provisioning): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
args = ["create", "@username"] | ||
logger = MagicMock() | ||
|
||
users.command_handler(client, body, respond, args, logger) | ||
|
||
mock_request_user_provisioning.assert_called_once_with( | ||
client, body, respond, args, logger | ||
) | ||
|
||
|
||
@patch("modules.aws.users.AWS_ADMIN_GROUPS", ["[email protected]"]) | ||
@patch("modules.aws.users.identity_center.provision_aws_users") | ||
@patch("modules.aws.users.permissions") | ||
@patch("modules.aws.users.slack_users") | ||
def test_request_user_provisioning( | ||
mock_slack_users, mock_permissions, mock_provision_aws_user | ||
): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
logger = MagicMock() | ||
mock_slack_users.get_user_email_from_body.return_value = "[email protected]" | ||
mock_provision_aws_user.return_value = True | ||
users.request_user_provisioning( | ||
client, body, respond, ["create", "user.email"], logger | ||
) | ||
mock_slack_users.get_user_email_from_body.assert_called_with(client, body) | ||
mock_permissions.is_user_member_of_groups.assert_called_with( | ||
"[email protected]", ["[email protected]"] | ||
) | ||
respond.assert_called_with("Request completed:\ntrue") | ||
logger.info.assert_called_with("Completed user provisioning request") | ||
|
||
|
||
@patch("modules.aws.users.AWS_ADMIN_GROUPS", ["[email protected]"]) | ||
@patch("modules.aws.users.identity_center.provision_aws_users") | ||
@patch("modules.aws.users.permissions") | ||
@patch("modules.aws.users.slack_users") | ||
def test_request_user_provisioning_requestor_not_admin( | ||
mock_slack_users, | ||
mock_permissions, | ||
mock_provision_aws_user, | ||
): | ||
client = MagicMock() | ||
body = MagicMock() | ||
respond = MagicMock() | ||
logger = MagicMock() | ||
mock_slack_users.get_user_email_from_body.return_value = "[email protected]" | ||
mock_permissions.is_user_member_of_groups.return_value = False | ||
mock_provision_aws_user.return_value = True | ||
users.request_user_provisioning( | ||
client, body, respond, ["create", "user.email"], logger | ||
) | ||
mock_slack_users.get_user_email_from_body.assert_called_with(client, body) | ||
mock_permissions.is_user_member_of_groups.assert_called_with( | ||
"[email protected]", ["[email protected]"] | ||
) | ||
respond.assert_called_with( | ||
"This function is restricted to admins only. Please contact #sre-and-tech-ops for assistance." | ||
) |