-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
21 changed files
with
949 additions
and
58 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,3 +7,6 @@ var/ | |
|
||
# IDE config files | ||
.vscode | ||
|
||
# Local database | ||
.volumes |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from django.apps import AppConfig | ||
|
||
|
||
class CognitoConfig(AppConfig): | ||
default_auto_field = 'django.db.models.BigAutoField' | ||
name = 'cognito' |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
from typing import Any | ||
|
||
from cognito.utils.client import Client | ||
from cognito.utils.user import User | ||
from mypy_boto3_cognito_idp.type_defs import UserTypeTypeDef | ||
from utils.command import CommandHandler | ||
from utils.command import CustomBaseCommand | ||
|
||
from django.core.management.base import CommandParser | ||
|
||
|
||
def get_local_users() -> list[User]: | ||
# TODO: remove me! | ||
return [] | ||
|
||
|
||
class Handler(CommandHandler): | ||
|
||
def __init__(self, command: CustomBaseCommand, options: dict[str, Any]) -> None: | ||
super().__init__(command, options) | ||
self.clear = options['clear'] | ||
self.dry_run = options['dry_run'] | ||
self.client = Client() | ||
self.counts = {'added': 0, 'deleted': 0, 'updated': 0} | ||
|
||
def attributes_to_dict(self, user: UserTypeTypeDef) -> dict[str, str]: | ||
""" Converts the attributes from a cognito uses to a dict. """ | ||
return {attribute['Name']: attribute['Value'] for attribute in user.get('Attributes', {})} | ||
|
||
def clear_users(self) -> None: | ||
""" Remove all existing cognito users. """ | ||
|
||
for user in self.client.get_users(): | ||
self.counts['deleted'] += 1 | ||
username = user['Username'] | ||
self.print(f'deleting user {username}') | ||
if not self.dry_run: | ||
self.client.delete_user(username) | ||
|
||
def sync_users(self) -> None: | ||
""" Synchronizes local and cognito users. """ | ||
|
||
# Get all remote and local users | ||
local = {str(user.id): user for user in get_local_users()} | ||
remote = {user['Username']: user for user in self.client.get_users()} | ||
|
||
# Add local only user | ||
for username in set(local.keys()).difference(set(remote.keys())): | ||
self.counts['added'] += 1 | ||
self.print(f'adding user {username}') | ||
if not self.dry_run: | ||
self.client.create_user(username, local[username].email) | ||
|
||
# Remove remote only user | ||
for username in set(remote.keys()).difference(set(local.keys())): | ||
self.counts['deleted'] += 1 | ||
self.print(f'deleting user {username}') | ||
if not self.dry_run: | ||
self.client.delete_user(username) | ||
|
||
# Update common users | ||
for username in set(local.keys()).intersection(set(remote.keys())): | ||
if local[username].email != self.attributes_to_dict(remote[username]).get('email'): | ||
self.counts['updated'] += 1 | ||
self.print(f'updating user {username}') | ||
if not self.dry_run: | ||
self.client.update_user(username, local[username].email) | ||
|
||
def run(self) -> None: | ||
""" Main entry point of command. """ | ||
|
||
# Clear data | ||
if self.clear: | ||
self.clear_users() | ||
|
||
# Sync data | ||
self.sync_users() | ||
|
||
# Print counts | ||
printed = False | ||
for operation, count in self.counts.items(): | ||
if count: | ||
printed = True | ||
self.print_success(f'{count} user(s) {operation}') | ||
if not printed: | ||
self.print_success('nothing to be done') | ||
|
||
if self.dry_run: | ||
self.print_warning('dry run, nothing has been done') | ||
|
||
|
||
class Command(CustomBaseCommand): | ||
help = "Synchronizes local users with cognito" | ||
|
||
def add_arguments(self, parser: CommandParser) -> None: | ||
super().add_arguments(parser) | ||
parser.add_argument( | ||
'--clear', | ||
action='store_true', | ||
help='Delete existing users in cognito before synchronizing', | ||
) | ||
parser.add_argument( | ||
'--dry-run', | ||
action='store_true', | ||
help='Dry run, abort transaction in the end', | ||
) | ||
|
||
def handle(self, *args: Any, **options: Any) -> None: | ||
Handler(self, options).run() |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from unittest.mock import call | ||
from unittest.mock import patch | ||
|
||
from cognito.utils.client import Client | ||
|
||
from django.test import TestCase | ||
|
||
|
||
class ClientTestCase(TestCase): | ||
|
||
@patch('cognito.utils.client.client') | ||
def test_get_users(self, boto3): | ||
client = Client() | ||
client.get_users() | ||
self.assertIn(call().list_users(UserPoolId=client.user_pool_id), boto3.mock_calls) | ||
|
||
@patch('cognito.utils.client.client') | ||
def test_get_user(self, boto3): | ||
client = Client() | ||
client.get_user('1234') | ||
self.assertIn( | ||
call().admin_get_user(UserPoolId=client.user_pool_id, Username='1234'), | ||
boto3.mock_calls | ||
) | ||
|
||
@patch('cognito.utils.client.client') | ||
def test_create_user(self, boto3): | ||
client = Client() | ||
client.create_user('1234', '[email protected]') | ||
self.assertIn( | ||
call().admin_create_user( | ||
UserPoolId=client.user_pool_id, | ||
Username='1234', | ||
UserAttributes=[{ | ||
'Name': 'email', 'Value': '[email protected]' | ||
}], | ||
DesiredDeliveryMediums=['EMAIL'] | ||
), | ||
boto3.mock_calls | ||
) | ||
|
||
@patch('cognito.utils.client.client') | ||
def test_delete_user(self, boto3): | ||
client = Client() | ||
client.delete_user('1234') | ||
self.assertIn( | ||
call().admin_delete_user(UserPoolId=client.user_pool_id, Username='1234'), | ||
boto3.mock_calls | ||
) | ||
|
||
@patch('cognito.utils.client.client') | ||
def test_update_user(self, boto3): | ||
client = Client() | ||
client.update_user('1234', '[email protected]') | ||
self.assertIn( | ||
call().admin_update_user_attributes( | ||
UserPoolId=client.user_pool_id, | ||
Username='1234', | ||
UserAttributes=[{ | ||
'Name': 'email', 'Value': '[email protected]' | ||
}] | ||
), | ||
boto3.mock_calls | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
from io import StringIO | ||
from unittest.mock import call | ||
from unittest.mock import patch | ||
|
||
from django.core.management import call_command | ||
from django.test import TestCase | ||
|
||
|
||
class DummyUser: | ||
|
||
def __init__(self, id_, email): | ||
self.id = id_ | ||
self.email = email | ||
|
||
|
||
def cognito_user(username, email): | ||
return {'Username': username, 'Attributes': [{'Name': 'email', 'Value': email}]} | ||
|
||
|
||
class CognitoSyncCommandTest(TestCase): | ||
|
||
@patch('cognito.management.commands.cognito_sync.get_local_users') | ||
@patch('cognito.management.commands.cognito_sync.Client') | ||
def test_command_adds(self, client, users): | ||
users.return_value = [DummyUser(1, '[email protected]')] | ||
client.return_value.get_users.return_value = [] | ||
|
||
out = StringIO() | ||
call_command('cognito_sync', verbosity=2, stdout=out) | ||
|
||
self.assertIn('adding user 1', out.getvalue()) | ||
self.assertIn('1 user(s) added', out.getvalue()) | ||
self.assertIn(call().create_user('1', '[email protected]'), client.mock_calls) | ||
|
||
@patch('cognito.management.commands.cognito_sync.get_local_users') | ||
@patch('cognito.management.commands.cognito_sync.Client') | ||
def test_command_removes(self, client, users): | ||
users.return_value = [] | ||
client.return_value.get_users.return_value = [cognito_user('1', '[email protected]')] | ||
|
||
out = StringIO() | ||
call_command('cognito_sync', verbosity=2, stdout=out) | ||
|
||
self.assertIn('deleting user 1', out.getvalue()) | ||
self.assertIn('1 user(s) deleted', out.getvalue()) | ||
self.assertIn(call().delete_user('1'), client.mock_calls) | ||
|
||
@patch('cognito.management.commands.cognito_sync.get_local_users') | ||
@patch('cognito.management.commands.cognito_sync.Client') | ||
def test_command_updates(self, client, users): | ||
users.return_value = [DummyUser(1, '[email protected]')] | ||
client.return_value.get_users.return_value = [cognito_user('1', '[email protected]')] | ||
|
||
out = StringIO() | ||
call_command('cognito_sync', verbosity=2, stdout=out) | ||
|
||
self.assertIn('updating user 1', out.getvalue()) | ||
self.assertIn('1 user(s) updated', out.getvalue()) | ||
self.assertIn(call().update_user('1', '[email protected]'), client.mock_calls) | ||
|
||
@patch('cognito.management.commands.cognito_sync.get_local_users') | ||
@patch('cognito.management.commands.cognito_sync.Client') | ||
def test_command_does_not_updates_if_unchanged(self, client, users): | ||
users.return_value = [DummyUser(1, '[email protected]')] | ||
client.return_value.get_users.return_value = [cognito_user('1', '[email protected]')] | ||
|
||
out = StringIO() | ||
call_command('cognito_sync', verbosity=2, stdout=out) | ||
|
||
self.assertIn('nothing to be done', out.getvalue()) | ||
|
||
@patch('cognito.management.commands.cognito_sync.get_local_users') | ||
@patch('cognito.management.commands.cognito_sync.Client') | ||
def test_command_clears(self, client, users): | ||
users.return_value = [DummyUser(1, '[email protected]')] | ||
client.return_value.get_users.side_effect = [[cognito_user('1', '[email protected]')], []] | ||
|
||
out = StringIO() | ||
call_command('cognito_sync', clear=True, verbosity=2, stdout=out) | ||
|
||
self.assertIn('deleting user 1', out.getvalue()) | ||
self.assertIn('1 user(s) deleted', out.getvalue()) | ||
self.assertIn('adding user 1', out.getvalue()) | ||
self.assertIn('1 user(s) added', out.getvalue()) | ||
self.assertIn(call().delete_user('1'), client.mock_calls) | ||
self.assertIn(call().create_user('1', '[email protected]'), client.mock_calls) | ||
|
||
@patch('cognito.management.commands.cognito_sync.get_local_users') | ||
@patch('cognito.management.commands.cognito_sync.Client') | ||
def test_command_runs_dry(self, client, users): | ||
users.return_value = [DummyUser(1, '[email protected]'), DummyUser(2, '[email protected]')] | ||
client.return_value.get_users.return_value = [ | ||
cognito_user('1', '[email protected]'), cognito_user('3', '[email protected]') | ||
] | ||
|
||
out = StringIO() | ||
call_command('cognito_sync', dry_run=True, verbosity=2, stdout=out) | ||
|
||
self.assertIn('adding user 2', out.getvalue()) | ||
self.assertIn('deleting user 3', out.getvalue()) | ||
self.assertIn('updating user 1', out.getvalue()) | ||
self.assertIn('dry run', out.getvalue()) | ||
self.assertEqual([call(), call().get_users()], client.mock_calls) |
Oops, something went wrong.