Skip to content

Commit

Permalink
feat: Add AWS Identity Store integration
Browse files Browse the repository at this point in the history
  • Loading branch information
gcharest authored Apr 18, 2024
1 parent 979f66b commit 7ab0eae
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 0 deletions.
63 changes: 63 additions & 0 deletions app/integrations/aws/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import boto3 # type: ignore

from dotenv import load_dotenv

load_dotenv()

# ROLE_ARN = os.environ.get("AWS_ORG_ACCOUNT_ROLE_ARN", "")
ROLE_ARN = os.environ.get("AWS_SSO_ROLE_ARN", "")
SYSTEM_ADMIN_PERMISSIONS = os.environ.get("AWS_SSO_SYSTEM_ADMIN_PERMISSIONS")
VIEW_ONLY_PERMISSIONS = os.environ.get("AWS_SSO_VIEW_ONLY_PERMISSIONS")


AWS_REGION = os.environ.get("AWS_REGION", "ca-central-1")


def get_boto3_client(client_type, region=AWS_REGION):
"""Gets the client for the specified service"""
return boto3.client(client_type, region_name=region)


def paginate(client, operation, keys, **kwargs):
"""Generic paginator for AWS operations"""
paginator = client.get_paginator(operation)
results = []

for page in paginator.paginate(**kwargs):
for key in keys:
if key in page:
results.extend(page[key])

return results


def assume_role_client(client_type, role_arn=None, role_session_name="SREBot"):
if not role_arn:
role_arn = ROLE_ARN

# Create a new session using the credentials provided by the ECS task role
session = boto3.Session()

# Use the session to create an STS client
sts_client = session.client("sts")

# Assume the role
response = sts_client.assume_role(
RoleArn=role_arn, RoleSessionName=role_session_name
)

# Create a new session with the assumed role's credentials
assumed_role_session = boto3.Session(
aws_access_key_id=response["Credentials"]["AccessKeyId"],
aws_secret_access_key=response["Credentials"]["SecretAccessKey"],
aws_session_token=response["Credentials"]["SessionToken"],
)

# Return a client created with the assumed role's session
return assumed_role_session.client(client_type)


def test():
sts = boto3.client("sts")
print(sts.get_caller_identity())
64 changes: 64 additions & 0 deletions app/integrations/aws/identity_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from integrations.aws.client import paginate, assume_role_client

INSTANCE_ID = os.environ.get("AWS_SSO_INSTANCE_ID", "")
INSTANCE_ARN = os.environ.get("AWS_SSO_INSTANCE_ARN", "")
ROLE_ARN = os.environ.get("AWS_SSO_ROLE_ARN", "")


def list_users(identity_store_id=None, attribute_path=None, attribute_value=None):
"""Retrieves all users from the AWS Identity Center (identitystore)"""
client = assume_role_client("identitystore", ROLE_ARN)
if not identity_store_id:
identity_store_id = INSTANCE_ID
kwargs = {"IdentityStoreId": identity_store_id}

if attribute_path and attribute_value:
kwargs["Filters"] = [
{"AttributePath": attribute_path, "AttributeValue": attribute_value},
]

return paginate(client, "list_users", ["Users"], **kwargs)


def list_groups(identity_store_id=None, attribute_path=None, attribute_value=None):
"""Retrieves all groups from the AWS Identity Center (identitystore)"""
client = assume_role_client("identitystore", ROLE_ARN)
if not identity_store_id:
identity_store_id = INSTANCE_ID
kwargs = {"IdentityStoreId": identity_store_id}

if attribute_path and attribute_value:
kwargs["Filters"] = [
{"AttributePath": attribute_path, "AttributeValue": attribute_value},
]

return paginate(client, "list_groups", ["Groups"], **kwargs)


def list_group_memberships(identity_store_id, group_id):
"""Retrieves all group memberships from the AWS Identity Center (identitystore)"""
client = assume_role_client("identitystore", ROLE_ARN)

if not identity_store_id:
identity_store_id = INSTANCE_ID
return paginate(
client,
"list_group_memberships",
["GroupMemberships"],
IdentityStoreId=identity_store_id,
GroupId=group_id,
)


def list_groups_with_membership(identity_store_id):
"""Retrieves all groups with their members from the AWS Identity Center (identitystore)"""
if not identity_store_id:
identity_store_id = INSTANCE_ID
groups = list_groups(identity_store_id)
for group in groups:
group["GroupMemberships"] = list_group_memberships(
identity_store_id, group["GroupId"]
)

return groups
Empty file added app/modules/dev/__init__.py
Empty file.

0 comments on commit 7ab0eae

Please sign in to comment.