Skip to content

Commit

Permalink
Polling requests and blocking an integration (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
GDay authored Sep 29, 2023
1 parent c246123 commit 13300a2
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 0 deletions.
54 changes: 54 additions & 0 deletions back/admin/integrations/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import base64
import json
import uuid
Expand Down Expand Up @@ -242,6 +243,40 @@ def renew_key(self):
self.save(update_fields=["expiring", "extra_args"])
return success

def _check_condition(self, response, condition):
value = self._replace_vars(condition.get("value"))
try:
# first argument will be taken from the response
response_value = get_value_from_notation(
condition.get("response_notation"), response.json()
)
except KeyError:
# we know that the result might not be in the response yet, as we are
# waiting for the correct response, so just respond with an empty string
response_value = ""
return value == response_value

def _polling(self, item, response):
polling = item.get("polling")
continue_if = item.get("continue_if")
interval = polling.get("interval")
amount = polling.get("amount")

got_expected_result = self._check_condition(response, continue_if)
if got_expected_result:
return True, response

tried = 1
while amount > tried:
time.sleep(interval)
success, response = self.run_request(item)
got_expected_result = self._check_condition(response, continue_if)
if got_expected_result:
return True, response
tried += 1
# if exceeding the max amounts, then fail
return False, response

def execute(self, new_hire, params):
self.params = params
self.params["responses"] = []
Expand All @@ -262,9 +297,28 @@ def execute(self, new_hire, params):
for item in self.manifest["execute"]:
success, response = self.run_request(item)

# check if we need to poll before continuing
if polling := item.get("polling", False):
success, response = self._polling(item, response)

# check if we need to block this integration based on condition
if continue_if := item.get("continue_if", False):
got_expected_result = self._check_condition(response, continue_if)
if not got_expected_result:
response = self.clean_response(response=response)
Notification.objects.create(
notification_type=Notification.Type.BLOCKED_INTEGRATION,
extra_text=self.name,
created_for=new_hire,
description=f"Execute url ({item['url']}): {response}",
)
return False, response

# No need to retry or log when we are importing users
if not success and self.has_user_context:
response = self.clean_response(response=response)
if polling:
response = "Polling timed out: " + response
Notification.objects.create(
notification_type=Notification.Type.FAILED_INTEGRATION,
extra_text=self.name,
Expand Down
22 changes: 22 additions & 0 deletions back/admin/integrations/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ class ManifestFormSerializer(ValidateMixin, serializers.Serializer):
choice_name = serializers.CharField(required=False)


class ManifestConditionSerializer(ValidateMixin, serializers.Serializer):
response_notation = serializers.CharField()
value = serializers.CharField()


class ManifestPollingSerializer(ValidateMixin, serializers.Serializer):
interval = serializers.IntegerField(min_value=1, max_value=3600)
amount = serializers.IntegerField(min_value=1, max_value=100)


class ManifestExistSerializer(ValidateMixin, serializers.Serializer):
url = serializers.CharField()
expected = serializers.CharField()
Expand Down Expand Up @@ -57,6 +67,18 @@ class ManifestExecuteSerializer(ValidateMixin, serializers.Serializer):
("PUT", "PUT"),
]
)
polling = ManifestPollingSerializer(required=False)
continue_if = ManifestConditionSerializer(required=False)

def validate(self, data):
# Check that if polling has been filled, that continue_if is also filled
polling = data.get("polling", False)
continue_if = data.get("continue_if", False)
if polling and not continue_if:
raise serializers.ValidationError(
"continue_if must be filled if you use polling"
)
return data


class ManifestPostExecuteNotificationSerializer(ValidateMixin, serializers.Serializer):
Expand Down
130 changes: 130 additions & 0 deletions back/admin/integrations/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,136 @@ def test_integration_save_data_to_user_invalid_lookup(
assert new_hire.extra_fields == {}


@pytest.mark.django_db
def test_polling_not_getting_correct_state(
monkeypatch, new_hire_factory, custom_integration_factory
):
new_hire = new_hire_factory()

integration = custom_integration_factory(
manifest={
"execute": [
{
"url": "http://localhost/",
"polling": {
# very small number to not let task hang too long
"interval": 0.1,
"amount": 3,
},
"continue_if": {
"response_notation": "status",
"value": "done",
},
}
]
}
)

with patch(
"admin.integrations.models.Integration.run_request",
Mock(
side_effect=(
# first call
[
True,
Mock(json=lambda: {"status": "not_done"}),
],
# second call
[
True,
Mock(json=lambda: {"status": "not_done"}),
],
# third call
[
True,
Mock(json=lambda: {"status": "not_done"}),
],
# fourth call (will never reach this)
[
True,
Mock(json=lambda: {"status": "done"}),
],
)
),
) as request_mock:
success, _response = integration.execute(new_hire, {})

assert request_mock.call_count == 3
assert success is False


@pytest.mark.django_db
@patch(
"admin.integrations.models.Integration.run_request",
Mock(
side_effect=(
# first call
[
True,
Mock(json=lambda: {"status": "not_done"}),
],
# second call
[
True,
Mock(json=lambda: {"status": "done"}),
],
)
),
)
def test_polling_getting_correct_state(new_hire_factory, custom_integration_factory):
new_hire = new_hire_factory()

integration = custom_integration_factory(
manifest={
"execute": [
{
"url": "http://localhost/",
"polling": {
# very small number to not let task hang too long
"interval": 0.1,
"amount": 3,
},
"continue_if": {
"response_notation": "status",
"value": "done",
},
}
]
}
)

success, _response = integration.execute(new_hire, {})

assert success is True


@pytest.mark.django_db
@patch(
"admin.integrations.models.Integration.run_request",
Mock(return_value=(True, Mock(json=lambda: {"status": "not_done"}))),
)
def test_block_integration_on_condition(new_hire_factory, custom_integration_factory):
new_hire = new_hire_factory()

integration = custom_integration_factory(
manifest={
"execute": [
{
"url": "http://localhost/",
"continue_if": {
"response_notation": "status",
"value": "done",
},
}
]
}
)

success, _response = integration.execute(new_hire, {})

assert success is False


@pytest.mark.django_db
@patch(
"admin.integrations.models.Integration.run_request",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Generated by Django 4.2.5 on 2023-09-27 00:36

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("organization", "0036_organization_ignored_user_emails"),
]

operations = [
migrations.AlterField(
model_name="notification",
name="notification_type",
field=models.CharField(
choices=[
("added_todo", "A new to do item has been added"),
("completed_todo", "To do item has been marked as completed"),
("added_resource", "A new resource item has been added"),
("completed_course", "Course has been completed"),
("added_badge", "A new badge item has been added"),
("added_introduction", "A new introduction item has been added"),
("added_preboarding", "A new preboarding item has been added"),
("added_appointment", "A new appointment item has been added"),
("added_new_hire", "A new hire has been added"),
("added_administrator", "A new administrator has been added"),
("added_manager", "A new manager has been added"),
("added_admin_task", "A new admin task has been added"),
("added_sequence", "A new sequence has been added"),
("sent_email_message", "A new email has been sent"),
("sent_text_message", "A new text message has been sent"),
("sent_slack_message", "A new slack message has been sent"),
("updated_slack_message", "A new slack message has been updated"),
(
"sent_email_login_credentials",
"Login credentials have been sent",
),
("sent_email_task_reopened", "Reopened task email has been sent"),
("sent_email_task_reminder", "Task reminder email has been sent"),
(
"sent_email_new_hire_credentials",
"Sent new hire credentials email",
),
(
"sent_email_preboarding_access",
"Sent new hire preboarding email",
),
("sent_email_custom_sequence", "Sent email from sequence"),
(
"sent_email_new_hire_with_updates",
"Sent email with updates to new hire",
),
(
"sent_email_admin_task_extra",
"Sent email to extra person in admin task",
),
(
"sent_email_admin_task_new_assigned",
"Sent email about new person assigned to admin task",
),
(
"sent_email_admin_task_new_comment",
"Sent email about new comment on admin task",
),
(
"sent_email_integration_notification",
"Sent email about completing integration call",
),
(
"failed_no_phone",
"Couldn't send text message: number is missing",
),
(
"failed_no_email",
"Couldn't send email message: email is missing",
),
(
"failed_email_recipients_refused",
"Couldn't deliver email message: recipient refused",
),
(
"failed_email_delivery",
"Couldn't deliver email message: provider error",
),
(
"failed_email_address",
"Couldn't deliver email message: provider error",
),
("failed_send_slack_message", "Couldn't send Slack message"),
("failed_update_slack_message", "Couldn't update Slack message"),
("ran_integration", "Integration has been triggered"),
("failed_integration", "Couldn't complete integration"),
("blocked_integration", "Integration was blocked due to condition"),
(
"failed_text_integration_notification",
"Couldn't send integration notification",
),
],
default="added_todo",
max_length=100,
),
),
]
3 changes: 3 additions & 0 deletions back/organization/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ class Type(models.TextChoices):
)
RAN_INTEGRATION = "ran_integration", _("Integration has been triggered")
FAILED_INTEGRATION = "failed_integration", _("Couldn't complete integration")
BLOCKED_INTEGRATION = "blocked_integration", _(
"Integration was blocked due to condition"
)
FAILED_TEXT_INTEGRATION_NOTIFICATION = (
"failed_text_integration_notification",
_("Couldn't send integration notification"),
Expand Down
21 changes: 21 additions & 0 deletions docs/Integrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ Example:
}
```

`continue_if`: (optional) This can be used as a blocked. If you want to block an integration if a response is not what you expect, then you can do that with this. If you need to wait for a response to come back (waiting for a background task for example), then you can use polling and continue with the call with the response changes. It will check every response and stop polling when it matches.

Example:
```
{
"response_notation": "detail.status",
"value": "done"
}
```
With this config, it will check the response for the `status` property in the `detail` property (so for example: `{"detail": {"status": "done"}}`). It will check for the value `done`, which would be valid in this case and therefore continue with the integration.

`polling`: (optional) You can use this to poll a url if you are waiting for a background (async) task to be completed. It will retry fetching the url for as many times as you specify at the interval you want. Here is an example config:
```
{
"interval": 5,
"amount": 60,
}
```
This config will try to fetch the same url for 60 times and wait 5 seconds between each call (so max 300 seconds) and will keep going until the `status` of the response is `done`. If it exceeds the 300 seconds, then the integration will fail.


### Headers
These headers will be send with every request. These could include some sort of token variable for authentication.

Expand Down

0 comments on commit 13300a2

Please sign in to comment.