# identitystore_bulkoperations.py
# Python program for bulk user management in an AWS IAM Identity Center identity store.
# CSV format for identities to be created (first line is the header row):
# username,givenname,familyname,groupname,email,emailtype,primary
# nina_franco,Nina,Franco,ExistingGroup,nina_franco@example.com,work,TRUE
# ...
import argparse
import json
import boto3
import csv
# Identity Store API client shared by create_users() and delete_users().
# No region or credentials are passed here, so boto3 resolves them itself.
client = boto3.client('identitystore')
def _read_identity_rows(ids_filename):
    """Return every data row of the identities CSV as a list of dicts.

    Each row is echoed to stdout as it is read. The file is opened as
    ``utf-8-sig`` so that a byte-order mark at the start of the file does not
    become part of the first column name.

    Raises
    ------
    Exception
        If the file cannot be opened, decoded or parsed. The underlying error
        is chained as ``__cause__`` so the real reason stays visible.
    """
    rows = []
    try:
        # newline='' lets the csv module handle line endings itself.
        with open(ids_filename, encoding='utf-8-sig', newline='') as csvfile:
            for row in csv.DictReader(csvfile):
                rows.append(row)
                print(row)
    except Exception as err:
        # `except Exception` (not a bare except) so Ctrl-C / SystemExit still
        # propagate untouched.
        raise Exception('Unable to parse specified file: %s. Check file exists and verify file format' % ids_filename) from err
    return rows


def _build_user_payload(sso_id_storeid, row):
    """Build the keyword arguments for ``client.create_user`` from one CSV row.

    ``username``, ``givenname`` and ``familyname`` are mandatory. An ``Emails``
    entry is added only when the ``email`` cell is non-blank; in that case
    ``emailtype`` must be present and ``primary`` must read TRUE or FALSE
    (compared case-insensitively, surrounding whitespace ignored).

    Raises
    ------
    KeyError
        If a mandatory column is missing from the row.
    Exception
        If an email is given but ``primary`` is neither TRUE nor FALSE.
    """
    user_name = row['username']
    given_name = row['givenname']
    family_name = row['familyname']

    user_payload = {
        'IdentityStoreId': sso_id_storeid,
        'UserName': user_name,
        'DisplayName': "{} {}".format(given_name, family_name),
        'Name': {
            'FamilyName': family_name,
            'GivenName': given_name,
        },
    }

    # DictReader yields None for cells missing from a short row and '' for
    # empty cells; both mean "no email supplied".
    email = (row.get('email') or '').strip()
    if email:
        primary_text = (row.get('primary') or '').strip().upper()
        if primary_text == 'TRUE':
            primary = True
        elif primary_text == 'FALSE':
            primary = False
        else:
            raise Exception('Email provided for %s but primary neither True nor False' % user_name)
        user_payload['Emails'] = [
            {'Value': email, 'Type': row['emailtype'], 'Primary': primary}
        ]
    return user_payload


def create_users(args):
    """
    Create the users listed in a CSV file and add each one to its group.

    Rows are processed in file order. For every row the user is created
    first; if the row names a group, the group is looked up by display name
    and the new user is added to it. When the group does not exist the user
    is still created and only the membership step is skipped (a message is
    printed). A row whose ``groupname`` cell is blank is treated as having no
    group.

    Note: all API calls go through the module-level ``client``.

    Required parameters
    -------------------
    args.identitystoreid - Identity Store Id of SSO configuration
    args.identities_file - CSV file containing identities:

        Columns
        -------------------
        username    - User Name for the user
        givenname   - First Name for the user
        familyname  - Last Name for the user
        groupname   - Name of the SSO group !! Only accepts a single groupname
        email       - Email !! Only accepts a single email
        emailtype   - Type of email, e.g. work, personal
        primary     - TRUE or FALSE; required whenever email is set

    Raises
    ------
    Exception
        If the CSV file cannot be read, or a row has an email but an invalid
        ``primary`` value. Errors raised by the Identity Store API other than
        "group not found" propagate unchanged.

    Response
    --------
    None
    """
    sso_id_storeid = args.identitystoreid
    rows = _read_identity_rows(args.identities_file)

    for row in rows:
        # Validate and assemble the request before touching the API so a bad
        # row fails without creating a half-configured user.
        user_payload = _build_user_payload(sso_id_storeid, row)
        user_name = user_payload['UserName']
        group_name = (row.get('groupname') or '').strip()

        print(user_payload)
        create_user_response = client.create_user(**user_payload)
        user_id = create_user_response["UserId"]
        print("User:{} with UserId:{} created successfully".format(
            user_name, user_id))

        if not group_name:
            continue

        try:
            get_group_id_response = client.get_group_id(
                AlternateIdentifier={
                    'UniqueAttribute': {
                        'AttributePath': 'displayName',
                        'AttributeValue': group_name
                    }
                },
                IdentityStoreId=sso_id_storeid
            )
        except client.exceptions.ResourceNotFoundException:
            print("Group Name {} does not exists, Skipping adding user to group".format(
                group_name))
        else:
            # Only reached when the lookup succeeded, so the group id is
            # always bound here.
            client.create_group_membership(
                GroupId=get_group_id_response["GroupId"],
                IdentityStoreId=sso_id_storeid,
                MemberId={
                    'UserId': user_id
                }
            )
            print("User:{} added to Group:{} successfully".format(
                user_name, group_name))
def delete_users(args):
    """
    Delete the users listed in a CSV file from the identity store.

    Rows are processed in file order. Each ``username`` is resolved to its
    UserId and that user is then deleted. A username that is not found in the
    identity store is reported and skipped, so the same file can be re-run
    after a partially completed batch.

    Note: all API calls go through the module-level ``client``.

    Required parameters
    -------------------
    args.identitystoreid - Identity Store Id of SSO configuration
    args.identities_file - CSV file containing identities:

        Required fields
        -------------------
        username - User Name for the user

    Raises
    ------
    Exception
        If the CSV file cannot be opened, decoded or parsed; the underlying
        error is chained as ``__cause__``. Errors raised by the Identity Store
        API other than "user not found" propagate unchanged.

    Response
    --------
    None
    """
    sso_id_storeid = args.identitystoreid
    ids_filename = args.identities_file

    rows = []
    try:
        # utf-8-sig drops a leading byte-order mark so it cannot end up in the
        # first column name; newline='' lets the csv module handle line endings.
        with open(ids_filename, encoding='utf-8-sig', newline='') as csvfile:
            for row in csv.DictReader(csvfile):
                rows.append(row)
                print(row)
    except Exception as err:
        # `except Exception` (not a bare except) so Ctrl-C / SystemExit still
        # propagate untouched.
        raise Exception('Unable to parse specified file: %s. Check file exists and verify file format' % ids_filename) from err

    for row in rows:
        user_name = row['username']
        try:
            user_id = client.get_user_id(
                IdentityStoreId=sso_id_storeid,
                AlternateIdentifier={
                    'UniqueAttribute': {
                        'AttributePath': 'UserName',
                        'AttributeValue': user_name
                    }
                }
            )['UserId']
        except client.exceptions.ResourceNotFoundException:
            # Nothing to delete for this row; carry on with the rest of the batch.
            print("User:{} does not exist, Skipping delete".format(user_name))
            continue

        client.delete_user(
            IdentityStoreId=sso_id_storeid,
            UserId=user_id)
        print("User:{} with UserId:{} deleted successfully".format(
            user_name, user_id))
if __name__ == '__main__':
    # Command-line entry point: one sub-command per bulk operation, each
    # taking the same two required options.
    parser = argparse.ArgumentParser()
    # required=True makes argparse print a usage error when no sub-command is
    # given, instead of failing later with AttributeError on args.func.
    subparsers = parser.add_subparsers(dest='command', required=True)

    # (sub-command name, handler, help text for --identities_file)
    commands = (
        ('create_users', create_users,
         "Filename for csv file with identities to be added"),
        ('delete_users', delete_users,
         "Filename for csv file with identities to be deleted"),
    )
    for command_name, handler, file_help in commands:
        command_parser = subparsers.add_parser(command_name)
        command_parser.add_argument(
            '--identitystoreid', required=True, help="Identity Store Id for IAM Identity Center Directory Configuration")
        command_parser.add_argument(
            '--identities_file', required=True, help=file_help)
        # The chosen sub-command's handler is exposed as args.func.
        command_parser.set_defaults(func=handler)

    args = parser.parse_args()
    args.func(args)