Skip to content

Commit

Permalink
implement onclusive reingesting
Browse files Browse the repository at this point in the history
SDCP-751
  • Loading branch information
petrjasek committed Apr 23, 2024
1 parent 01571d5 commit f7dceba
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 46 deletions.
2 changes: 1 addition & 1 deletion server/planning/events/events_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ def test_new_planning_is_published_when_adding_to_published_event(self):

def test_related_planning_item_validation_on_post(self):
"""
check planning item fields validation
Check planning item fields validation
if validation fails, planning item is not posted.
"""
events_service = get_resource_service("events")
Expand Down
31 changes: 29 additions & 2 deletions server/planning/feeding_services/onclusive_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ class OnclusiveApiService(HTTPFeedingServiceBase):
"required": False,
"default": 365,
},
{
"id": "days_to_reingest",
"type": "text",
"label": lazy_gettext("Days to Reingest"),
"placeholder": lazy_gettext("Days"),
"required": False,
},
]

HTTP_AUTH = False
Expand Down Expand Up @@ -86,6 +93,22 @@ def _update(self, provider, update):
update["last_updated"] = utcnow().replace(
second=0
) # next time start from here, onclusive api does not use seconds

# force reingest starting from now - days_to_reingest
if provider["config"].get("days_to_reingest"):
start_date = datetime.now() - timedelta(days=int(provider["config"]["days_to_reingest"]))
logger.info("Reingesting from %s", start_date.date().isoformat())
update["config"] = provider["config"].copy()
update["config"]["days_to_ingest"] = ""
# override to reset
update["tokens"]["start_date"] = start_date
update["tokens"]["next_start"] = start_date
update["tokens"]["reingesting"] = True
update["tokens"]["import_finished"] = None
update["tokens"]["date"] = ""

reingesting = update["tokens"].get("reingesting")

if update["tokens"].get("import_finished"):
# populate it for cases when import was done before we introduced the field
update["tokens"].setdefault("next_start", update["tokens"]["import_finished"] - timedelta(hours=5))
Expand Down Expand Up @@ -129,9 +152,13 @@ def _update(self, provider, update):
logger.info("Onclusive returned %d items", len(items))
for item in items:
item.setdefault("language", self.language)
yield items
if reingesting:
item["versioncreated"] += timedelta(seconds=1) # bump versioncreated to trigger an update
if items:
yield items
update["tokens"][iterations_param] = i
update["tokens"].setdefault("import_finished", utcnow())
update["tokens"]["import_finished"] = utcnow()
update["tokens"]["reingesting"] = False
except SoftTimeLimitExceeded:
logger.warning("stopped due to time limit, tokens=%s", update["tokens"])

Expand Down
124 changes: 82 additions & 42 deletions server/planning/feeding_services/onclusive_api_service_tests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from planning.feed_parsers.onclusive import OnclusiveFeedParser
from .onclusive_api_service import OnclusiveApiService
import flask
import unittest
import responses

from unittest.mock import MagicMock
from datetime import datetime, timedelta
from planning.feed_parsers.onclusive import OnclusiveFeedParser

import flask
import unittest
import requests_mock
from .onclusive_api_service import OnclusiveApiService


parser = MagicMock(OnclusiveFeedParser)
Expand All @@ -15,54 +16,93 @@ class OnclusiveApiServiceTestCase(unittest.TestCase):
def setUp(self) -> None:
super().setUp()
self.app = flask.Flask(__name__)
self.service = OnclusiveApiService()
self.service.get_feed_parser = MagicMock(return_value=parser)
event = {"versioncreated": datetime(2023, 3, 1, 8, 0, 0)}

parser.parse.return_value = [
event.copy(),
]

# for requests.json we need to convert datetime to string
self.event = {"versioncreated": event["versioncreated"].isoformat()}

self.provider = {
"_id": "onclusive_api",
"name": "onclusive",
"feed_parser": "onclusive_api",
"config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"},
}

@responses.activate
def test_update(self):
event = {"versioncreated": datetime.fromisoformat("2023-03-01T08:00:00")}
with self.app.app_context():
now = datetime.utcnow()
service = OnclusiveApiService()
service.get_feed_parser = MagicMock(return_value=parser)
parser.parse.return_value = [event]

provider = {
"_id": "onclusive_api",
"name": "onclusive",
"feed_parser": "onclusive_api",
"config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"},
}
responses.post(
url="https://api.abc.com/api/v2/auth",
json={
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)

now = datetime.now()
responses.get(
"https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")),
json=[self.event],
) # first returns an item
responses.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't

with self.app.app_context():
updates = {}
with requests_mock.Mocker() as m:
m.post(
"https://api.abc.com/api/v2/auth",
json={
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)
m.get(
"https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")),
json=[{"versioncreated": event["versioncreated"].isoformat()}],
) # first returns an item
m.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't
items = list(service._update(provider, updates))
items = list(self.service._update(self.provider, updates))
self.assertIn("tokens", updates)
self.assertEqual("refresh", updates["tokens"]["refreshToken"])
self.assertIn("import_finished", updates["tokens"])
self.assertEqual(updates["last_updated"], updates["tokens"]["next_start"])
self.assertEqual("fr-CA", items[0][0]["language"])

provider.update(updates)
self.provider.update(updates)
updates = {}
with requests_mock.Mocker() as m:
m.post(
"https://api.abc.com/api/v2/auth/renew",
responses.post(
"https://api.abc.com/api/v2/auth/renew",
json={
"token": "tok2",
"refreshToken": "refresh2",
},
)
responses.get("https://api.abc.com/api/v2/events/latest", json=[])
list(self.service._update(self.provider, updates))
self.assertEqual("refresh2", updates["tokens"]["refreshToken"])

def test_reingest(self):
with self.app.app_context():
start = datetime.now() - timedelta(days=30)
self.provider["config"]["days_to_reingest"] = "30"
self.provider["config"]["days_to_ingest"] = "10"
updates = {}
with responses.RequestsMock() as rsps: # checks if all requests were fired
rsps.add(
responses.POST,
url="https://api.abc.com/api/v2/auth",
json={
"token": "tok2",
"refreshToken": "refresh2",
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)
m.get("https://api.abc.com/api/v2/events/latest", json=[])
list(service._update(provider, updates))
self.assertEqual("refresh2", updates["tokens"]["refreshToken"])

for i in range(0, 10):
rsps.add(
responses.GET,
"https://api.abc.com/api/v2/events/date?limit=2000&date={}".format(
(start + timedelta(days=i)).strftime("%Y%m%d")
),
json=[self.event],
)

items = list(self.service._update(self.provider, updates))
assert 10 == len(items)
assert 1 == len(items[0])
assert items[0][0]["versioncreated"].isoformat() > self.event["versioncreated"]
assert updates["tokens"]["import_finished"]
assert not updates["tokens"]["reingesting"]
1 change: 0 additions & 1 deletion server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ flake8-docstrings
pydocstyle<7.0
wooper==0.4.4
requests
requests-mock==1.12.1
icalendar>=4.0.3,<5.1
coverage==7.4.4
deepdiff
Expand Down

0 comments on commit f7dceba

Please sign in to comment.