-
Notifications
You must be signed in to change notification settings - Fork 0
/
group_ownership.py
405 lines (364 loc) · 16.3 KB
/
group_ownership.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
"""Handler for requests to view/change group ownership."""
import shared.custom_fields as custom_fields
import shared.globals
import shared.shared_ldap as shared_ldap
import shared.shared_sd as shared_sd
import linaro_shared
CAPABILITIES = [
"CREATE",
"COMMENT",
"TRANSITION"
]
IT_BOT = "uid=it.support.bot,ou=mail-contacts-unsynced,ou=accounts,dc=linaro,dc=org"
GROUP_EMAIL_ADDRESS = "Group Email Address"
WONT_DO = "Won't Do"
def comment(ticket_data):
""" Comment handler """
last_comment, keyword = shared_sd.central_comment_handler(
["add", "remove", "help"], # Public comments
["retry"]) # Private comments
print(f"group_ownership comment handler: {last_comment}, {keyword}")
if keyword == "help":
shared_sd.post_comment(
("All bot commands must be internal comments and the first "
"word/phrase in the comment.\r\n\r\n"
"Valid commands are:\r\n"
"* retry to ask the bot to process the request again after "
"issues have been resolved."), False)
elif keyword == "retry":
create(ticket_data)
elif keyword == "add" or keyword == "remove":
# Explicitly process comment if keyword is add or remove so that this works
# for IT staff!
process_public_comment(ticket_data, last_comment, keyword)
# because ok_to_process_public_comment ignores comments from IT!
elif (linaro_shared.ok_to_process_public_comment(last_comment) and
(keyword is None or not process_public_comment(ticket_data, last_comment, keyword))):
shared_sd.post_comment(
"Your comment has not been recognised as an instruction to "
"the bot so the ticket will be left for IT Services to "
"review.", True)
shared_sd.deassign_ticket_if_appropriate(last_comment)
def process_public_comment(ticket_data, last_comment, keyword):
"""Logic to process a public comment."""
shared_sd.assign_issue_to(shared.globals.CONFIGURATION["bot_name"])
# If the original reporter IS a group owner, we will only accept comments
# from the same person and those comments will be add/remove commands.
#
# Otherwise, deassign and let IT work on what was said.
#
# Get the definitive email address for the group and the owner(s).
cf_group_email_address = custom_fields.get(GROUP_EMAIL_ADDRESS)
group_email_address = shared_sd.get_field(
ticket_data, cf_group_email_address)
if group_email_address is not None:
group_email_address = group_email_address.strip().lower()
group_email_address, result = shared_ldap.find_group(
group_email_address, ['owner'])
# Make sure that the group still exists because this is all asynchronous
# and anything could have happened!
if len(result) == 0:
shared_sd.post_comment(
"Sorry but the group's email address can't be found in Linaro "
"Login.", True)
shared_sd.resolve_ticket(WONT_DO)
return True
if len(result) != 1:
shared_sd.post_comment(
"Sorry but, somehow, the group's email address appears more than "
"once in Linaro Login.", True)
shared_sd.resolve_ticket(WONT_DO)
return True
if (result[0].owner.values != [] and
shared_ldap.reporter_is_group_owner(result[0].owner.values) and
keyword in ("add", "remove")):
grp_name = shared_ldap.extract_id_from_dn(result[0].entry_dn)
changes = last_comment["body"].split("\n")
print(f"process_public_comment: {grp_name}, {changes}")
batch_process_ownership_changes(grp_name, changes)
post_owners_of_group_as_comment(result[0].entry_dn)
return True
return False
def create(ticket_data):
"""Triggered when the issue is created."""
cf_group_email_address = custom_fields.get(GROUP_EMAIL_ADDRESS)
group_email_address = shared_sd.get_field(
ticket_data, cf_group_email_address)
if group_email_address is not None:
group_email_address = group_email_address.strip().lower()
group_email_address, result = shared_ldap.find_group(
group_email_address, ['owner'])
shared_sd.set_summary(
f"View/Change group ownership for {group_email_address}")
shared_sd.assign_issue_to(shared.globals.CONFIGURATION["bot_name"])
if len(result) == 0:
shared_sd.post_comment(
"Sorry but the group's email address can't be found in Linaro "
"Login.", True)
shared_sd.resolve_ticket(WONT_DO)
return
if len(result) != 1:
shared_sd.post_comment(
"Sorry but, somehow, the group's email address appears more than "
"once in Linaro Login.", True)
shared_sd.resolve_ticket(WONT_DO)
return
# See if the bot owns this group
owners = result[0].owner.values
if (len(owners) == 1 and owners[0] == IT_BOT):
shared_sd.post_comment(
(
"This group is maintained through automation. It is not "
"possible to change the owners of this group or raise "
"tickets to directly change the membership. If you want "
"to understand how this group is maintained automatically, "
"please raise a general IT Services support ticket."
),
True
)
shared_sd.resolve_ticket()
return
# Do we have any changes to process? If not, post the current owners to
# the ticket.
cf_group_owners = custom_fields.get("Group Owner(s)")
ownerchanges = shared_sd.get_field(ticket_data, cf_group_owners)
if ownerchanges is None:
post_owners_of_group_as_comment(result[0].entry_dn)
if shared_ldap.reporter_is_group_owner(result[0].owner.values):
shared_sd.post_comment(
("As you are an owner of this group, you can make changes to "
"the ownership by posting new comments to this ticket with "
"the following format:\r\n"
"*add* <email address>\r\n"
"*remove* <email address>\r\n"
"One command per line but you can have multiple changes in a "
"single comment. If you do not get the syntax right, the "
"automation will not be able to understand your request and "
"processing will stop.\r\n"), True)
shared_sd.transition_request_to("Waiting for customer")
else:
shared_sd.post_comment(
"As you are not an owner of this group, if you want to make "
"changes to the ownership, you will need to open a "
"[new ticket|https://servicedesk.linaro.org/servicedesk/"
"customer/portal/3/create/140].", True)
shared_sd.resolve_ticket()
return
# There are changes ... but is the requester a group owner?
cf_approvers = custom_fields.get("Approvers")
if result[0].owner.values == []:
# No owners at all. IT is always allowed to make changes
if shared_ldap.is_user_in_group("its", shared.globals.REPORTER):
shared_sd.transition_request_to("In progress")
else:
shared_sd.post_comment(
"This group has no owners. Asking IT Services to review "
"your request.", True)
it_members = shared_ldap.get_group_membership(
"cn=its,ou=mailing,ou=groups,dc=linaro,dc=org")
shared_sd.assign_approvers(it_members, cf_approvers)
shared_sd.transition_request_to("Needs approval")
elif shared_ldap.reporter_is_group_owner(result[0].owner.values):
shared_sd.transition_request_to("In progress")
else:
shared_sd.post_comment(
"As you are not an owner of this group, the owners will be "
"asked to approve or decline your request.", True)
shared_sd.assign_approvers(result[0].owner.values, cf_approvers)
shared_sd.transition_request_to("Needs approval")
def transition(status_to, ticket_data):
"""
If the status is "In Progress", trigger the membership change. This
status can only be reached from Open or Needs Approval.
"""
if status_to == "In Progress":
cf_group_email_address = custom_fields.get(GROUP_EMAIL_ADDRESS)
group_email_address = shared_sd.get_field(
ticket_data, cf_group_email_address)
if group_email_address is not None:
group_email_address = group_email_address.strip().lower()
group_email_address, result = shared_ldap.find_group(
group_email_address, ['owner'])
action_change(ticket_data, result[0])
def action_change(ticket_data, group_owners):
""" Process the ownership changes specified in the field. """
grp_name = shared_ldap.extract_id_from_dn(group_owners.entry_dn)
cf_group_owners = custom_fields.get("Group Owner(s)")
ownerchanges = shared_sd.get_field(ticket_data, cf_group_owners)
if ownerchanges is not None:
changes = ownerchanges.split("\r\n")
else:
changes = None
cf_added_removed = custom_fields.get("Added / Removed")
action_field = shared_sd.get_field(ticket_data, cf_added_removed)
if action_field is None:
changes_to_make = ""
else:
changes_to_make = action_field["value"]
batch_process_ownership_changes(grp_name, changes, True, changes_to_make)
post_owners_of_group_as_comment(group_owners.entry_dn)
if (group_owners.owner.values != [] and
shared_ldap.reporter_is_group_owner(group_owners.owner.values)):
shared_sd.post_comment(
("As you are an owner of this group, you can make changes to the "
"ownership by posting new comments to this ticket with the "
"following format:\r\n"
"*add* <email address>\r\n"
"*remove* <email address>\r\n"
"One command per line but you can have multiple changes in a "
"single comment. If you do not get the syntax right, the "
"automation will not be able to understand your request and "
"processing will stop.\r\n"), True)
shared_sd.transition_request_to("Waiting for customer")
else:
shared_sd.resolve_ticket()
def batch_process_ownership_changes(
group_cn, batch, auto=False, change_to_make=None):
"""Process a list of changes to the ownership."""
# This is used for the initial ticket (which doesn't specify add/remove)
# and for followup comments (which *does* specify add/remove). If auto is
# false, we're expecting "keyword emailaddress" and will stop on the first
# line that doesn't match that syntax. If auto is true, we're just
# expecting emailaddress.
#
# The formatting of the text varies from "\r\n" in the original request
# to "\n" in comments, so the *caller* must pass batch as a list.
change_made = False
# We need a list of current owners to sanity check the request.
owners = get_group_owners(group_cn)
response = ""
keyword = determine_change_keyword(auto, change_to_make)
for change in batch:
if change != "":
email_address, keyword = parse_change_line(auto, change, keyword)
print(f"parse_change_line returned {email_address} and {keyword}")
local_change, got_error, response = process_change(
keyword, email_address, owners, group_cn, response)
if got_error:
print("got error back from process_change")
break
change_made = change_made or local_change
if change_made:
linaro_shared.trigger_google_sync()
response += (
"Please note it can take up to 15 minutes for these changes to "
"appear on Google."
)
else:
response += "No change made as a result of the last instruction."
if response != "":
shared_sd.post_comment(response, True)
def determine_change_keyword(auto, change_to_make):
""" Standardise the keywords being used for the changes. """
if auto:
if change_to_make == "Added":
keyword = "add"
elif change_to_make == "Removed":
keyword = "remove"
else:
keyword = "auto"
else:
keyword = change_to_make
return keyword
def get_group_owners(group_cn):
""" Consistently return a list of owners. """
_, result = shared_ldap.find_group(group_cn, ["owner"])
if len(result) == 1:
owners = result[0].owner.values
else:
owners = []
return owners
def process_change(keyword, email_address, owners, group_cn, response):
""" Process the membership change specified. """
email_address = linaro_shared.cleanup_if_markdown(email_address)
result = shared_ldap.find_single_object_from_email(email_address)
if result is None:
response += (
"Couldn't find an entry on Linaro Login with an email "
f"address of '{email_address}'.\r\n"
)
return False, True, response
return process_keyword(
keyword, result, owners, email_address, group_cn, response)
def parse_change_line(auto, change, keyword):
""" Extract the email address and optionally keyword from the line. """
if auto:
# Should just be an email address with nothing else on that
# line.
email_address = change.strip().lower()
else:
# Try to get a keyword from this line of text.
keyword = "".join(
(char if char.isalpha() else " ") for char in change).\
split()[0].lower()
# Split the line on spaces and treat the second "word" as the
# email address.
email_address = change.split()[1].lower()
return email_address, keyword
def process_keyword(keyword, result, owners, email_address, group_cn, response):
""" Implement the change for the specified keyword action. """
change_made = False
got_error = False
if keyword == "auto":
# Is this email address already an owner?
if result in owners:
keyword = "add"
else:
keyword = "remove"
if keyword == "add":
if result in owners:
response += (
f"{email_address} is already an owner of the group.\r\n"
)
else:
response += f"Adding {email_address}\r\n"
shared_ldap.add_owner_to_group(group_cn, result)
change_made = True
elif keyword == "remove":
if result in owners:
response += f"Removing {email_address}\r\n"
shared_ldap.remove_owner_from_group(
group_cn, result)
change_made = True
else:
response += (
f"{email_address} is not an owner of the group so cannot "
"be removed as one.\r\n"
)
else:
response += (
f"{keyword} is not recognised as 'add' or 'remove'.\r\n"
)
got_error = True
return change_made, got_error, response
def post_owners_of_group_as_comment(group_full_dn):
"""Emit a list of the owners of the group."""
# Need to re-fetch the group ownership because we may have changed it
# since last time we queried it.
name = shared_ldap.extract_id_from_dn(group_full_dn)
_, result = shared_ldap.find_group(name, ["owner"])
if len(result) == 1 and result[0].owner.values != []:
response = "Here are the owners for the group:\r\n"
for owner in result[0].owner.values:
response += "* [%s|mailto:%s]\r\n" % owner_and_display_name(owner) # pylint: disable=consider-using-f-string
else:
response = "There are no owners for the group."
shared_sd.post_comment(response, True)
def owner_and_display_name(owner):
""" Calculate the owner's email address and display name. """
this_owner = shared_ldap.get_object(
owner,
['displayName', 'mail', 'givenName', 'sn'])
if this_owner is None:
return None, None
if this_owner.displayName.value is not None:
display_name = this_owner.displayName.value
else:
if this_owner.sn.value is not None:
if this_owner.givenName.value is not None:
display_name = f"{this_owner.givenName.value} {this_owner.sn.value}"
else:
display_name = this_owner.sn.value
else:
display_name = this_owner.mail.value
return (display_name, this_owner.mail.value)