diff --git a/duo_client/admin.py b/duo_client/admin.py index 63804f1..d20f151 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -209,6 +209,71 @@ def _canonicalize_bypass_codes(codes): else: return ','.join([str(int(code)) for code in codes]) + def get_administrative_units(self, admin_id=None, group_id=None, + integration_key=None, limit=None, offset=0): + """ + Retrieves a list of administrative units optionally filtered by admin, + group, or integration. At most one of admin_id, group_id, or + integration_key should be passed. + + Args: + admin_id(str): id of admin (optional) + group_id(str): id of group (optional) + integration_key(str): id of integration (optional) + limit: The max number of administrative units to fetch at once. + Default None + offset: If a limit is passed, the offset to start retrieval. + Default 0 + + Returns: list of administrative units + + Raises RuntimeError on error. + """ + (limit, offset) = self.normalize_paging_args(limit, offset) + + params = {} + if admin_id is not None: + params['admin_id'] = admin_id + if group_id is not None: + params['group_id'] = group_id + if integration_key is not None: + params['integration_key'] = integration_key + + if limit: + params['limit'] = limit + params['offset'] = offset + + return self.json_api_call('GET', + '/admin/v1/administrative_units', + params) + + iterator = self.get_administrative_units_iterator( + admin_id, group_id, integration_key) + + return list(iterator) + + def get_administrative_units_iterator(self, admin_id=None, group_id=None, + integration_key=None, ): + """ + Provides a generator which produces administrative_units. Under the + hood, this generator uses pagination, so it will only store one page of + administrative_units at a time in memory. + + Returns: A generator which produces administrative_units. + + Raises RuntimeError on error. + """ + params = {} + if admin_id is not None: + params['admin_id'] = admin_id + if group_id is not None: + params['group_id'] = group_id + if integration_key is not None: + params['integration_key'] = integration_key + return self.json_paging_api_call('GET', + '/admin/v1/administrative_units', + params) + def get_administrator_log(self, mintime=0): """ diff --git a/tests/admin/test_administrative_units.py b/tests/admin/test_administrative_units.py new file mode 100644 index 0000000..b77aba4 --- /dev/null +++ b/tests/admin/test_administrative_units.py @@ -0,0 +1,94 @@ +from .. import util +from .base import TestAdmin + + +class TestAdminUnits(TestAdmin): + # Uses underlying paging + def test_get_administratrive_units(self): + response = self.client_list.get_administrative_units() + response = response[0] + self.assertEqual(response['method'], 'GET') + (uri, args) = response['uri'].split('?') + self.assertEqual(uri, '/admin/v1/administrative_units') + + def test_get_administrative_units_with_limit(self): + response = self.client_list.get_administrative_units(limit=20) + response = response[0] + self.assertEqual(response['method'], 'GET') + (uri, args) = response['uri'].split('?') + self.assertEqual(uri, '/admin/v1/administrative_units') + self.assertEqual( + util.params_to_dict(args), + { + 'account_id': [self.client.account_id], + 'limit': ['20'], + 'offset': ['0'], + }) + + def test_get_adminstrative_units_with_limit_offset(self): + response = self.client_list.get_administrative_units(limit=20, offset=2) + response = response[0] + self.assertEqual(response['method'], 'GET') + (uri, args) = response['uri'].split('?') + self.assertEqual(uri, '/admin/v1/administrative_units') + self.assertEqual( + util.params_to_dict(args), + { + 'account_id': [self.client.account_id], + 'limit': ['20'], + 'offset': ['2'], + }) + + def test_get_administrative_units_with_offset(self): + response = self.client_list.get_administrative_units(offset=9001) + response = response[0] + self.assertEqual(response['method'], 'GET') + (uri, args) = response['uri'].split('?') + self.assertEqual(uri, '/admin/v1/administrative_units') + self.assertEqual( + util.params_to_dict(args), + { + 'account_id': [self.client.account_id], + 'limit': ['100'], + 'offset': ['0'], + }) + + def test_get_administrative_units_iterator(self): + expected_path = '/admin/v1/administrative_units' + expected_method = 'GET' + + tests = [ + { + 'admin_id': 'aaaa', + 'group_id': '1234', + 'integration_key': 'aaaaaaaaaaaaaaaaaaaa', + }, + { + 'admin_id': 'aaaa', + 'group_id': '1234', + }, + { + 'admin_id': 'aaaa', + }, + {} + ] + + for test in tests: + response = ( + self.client_list.get_administrative_units_iterator(**test) + ) + response = next(response) + self.assertEqual(response['method'], expected_method) + (uri, args) = response['uri'].split('?') + self.assertEqual(uri, expected_path) + expected_params = { + key: [value] for (key, value) in test.items() + } + expected_params.update( + { + 'account_id': [self.client.account_id], + 'limit': ['100'], + 'offset': ['0'], + } + ) + self.assertEqual(util.params_to_dict(args), expected_params)