Skip to content

Commit

Permalink
implement onclusive reingesting (#1971)
Browse files Browse the repository at this point in the history
SDCP-751
  • Loading branch information
petrjasek committed Apr 24, 2024
1 parent 60319f4 commit d7dfa2f
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 46 deletions.
38 changes: 36 additions & 2 deletions server/planning/feeding_services/onclusive_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from urllib.parse import urljoin
from superdesk.errors import ProviderError
from celery.exceptions import SoftTimeLimitExceeded
from superdesk.celery_task_utils import get_lock_id
from superdesk.lock import touch

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,6 +60,13 @@ class OnclusiveApiService(HTTPFeedingServiceBase):
"required": False,
"default": 365,
},
{
"id": "days_to_reingest",
"type": "text",
"label": lazy_gettext("Days in the past to Reingest"),
"placeholder": lazy_gettext("Days"),
"required": False,
},
]

HTTP_AUTH = False
Expand Down Expand Up @@ -86,6 +95,22 @@ def _update(self, provider, update):
update["last_updated"] = utcnow().replace(
second=0
) # next time start from here, onclusive api does not use seconds

# force reingest starting from now - days_to_reingest
if provider["config"].get("days_to_reingest"):
start_date = datetime.now() - timedelta(days=int(provider["config"]["days_to_reingest"]))
logger.info("Reingesting from %s", start_date.date().isoformat())
update["config"] = provider["config"].copy()
update["config"]["days_to_reingest"] = ""
# override to reset
update["tokens"]["start_date"] = start_date
update["tokens"]["next_start"] = start_date
update["tokens"]["reingesting"] = True
update["tokens"]["import_finished"] = None
update["tokens"]["date"] = ""

reingesting = update["tokens"].get("reingesting")

if update["tokens"].get("import_finished"):
# populate it for cases when import was done before we introduced the field
update["tokens"].setdefault("next_start", update["tokens"]["import_finished"] - timedelta(hours=5))
Expand Down Expand Up @@ -120,18 +145,27 @@ def _update(self, provider, update):
if date > processed_date # when continuing skip previously ingested days
)
logger.info("ingest from onclusive %s with params %s", url, params)
lock_name = get_lock_id("ingest", provider["name"], provider["_id"])
try:
for i in iterations:
if not touch(lock_name, expire=60 * 15):
break
params[iterations_param] = i
logger.info("Onclusive PARAMS %s", params)
content = self._fetch(url, params, provider, update["tokens"])
items = parser.parse(content, provider)
logger.info("Onclusive returned %d items", len(items))
for item in items:
item.setdefault("language", self.language)
yield items
if reingesting:
item["versioncreated"] += timedelta(seconds=1) # bump versioncreated to trigger an update
if items:
yield items
update["tokens"][iterations_param] = i
update["tokens"].setdefault("import_finished", utcnow())
else:
# there was no break so we are done
update["tokens"]["import_finished"] = utcnow()
update["tokens"]["reingesting"] = False
except SoftTimeLimitExceeded:
logger.warning("stopped due to time limit, tokens=%s", update["tokens"])

Expand Down
130 changes: 86 additions & 44 deletions server/planning/feeding_services/onclusive_api_service_tests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from planning.feed_parsers.onclusive import OnclusiveFeedParser
from .onclusive_api_service import OnclusiveApiService
from unittest.mock import MagicMock
from datetime import datetime, timedelta

import flask
import unittest
import requests_mock
import responses

from unittest.mock import MagicMock, patch
from datetime import datetime, timedelta
from planning.feed_parsers.onclusive import OnclusiveFeedParser

from .onclusive_api_service import OnclusiveApiService


parser = MagicMock(OnclusiveFeedParser)
Expand All @@ -15,54 +16,95 @@ class OnclusiveApiServiceTestCase(unittest.TestCase):
def setUp(self) -> None:
super().setUp()
self.app = flask.Flask(__name__)
self.service = OnclusiveApiService()
self.service.get_feed_parser = MagicMock(return_value=parser)
event = {"versioncreated": datetime(2023, 3, 1, 8, 0, 0)}

def test_update(self):
event = {"versioncreated": datetime.fromisoformat("2023-03-01T08:00:00")}
with self.app.app_context():
now = datetime.utcnow()
service = OnclusiveApiService()
service.get_feed_parser = MagicMock(return_value=parser)
parser.parse.return_value = [event]

provider = {
"_id": "onclusive_api",
"name": "onclusive",
"feed_parser": "onclusive_api",
"config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"},
}
parser.parse.return_value = [
event.copy(),
]

# for requests.json we need to convert datetime to string
self.event = {"versioncreated": event["versioncreated"].isoformat()}

self.provider = {
"_id": "onclusive_api",
"name": "onclusive",
"feed_parser": "onclusive_api",
"config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"},
}

@responses.activate
@patch("planning.feeding_services.onclusive_api_service.touch")
def test_update(self, lock_touch):
responses.post(
url="https://api.abc.com/api/v2/auth",
json={
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)

now = datetime.now()
responses.get(
"https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")),
json=[self.event],
) # first returns an item
responses.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't

with self.app.app_context():
updates = {}
with requests_mock.Mocker() as m:
m.post(
"https://api.abc.com/api/v2/auth",
json={
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)
m.get(
"https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")),
json=[{"versioncreated": event["versioncreated"].isoformat()}],
) # first returns an item
m.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't
items = list(service._update(provider, updates))
items = list(self.service._update(self.provider, updates))
self.assertIn("tokens", updates)
self.assertEqual("refresh", updates["tokens"]["refreshToken"])
self.assertIn("import_finished", updates["tokens"])
self.assertEqual(updates["last_updated"], updates["tokens"]["next_start"])
self.assertEqual("fr-CA", items[0][0]["language"])

provider.update(updates)
self.provider.update(updates)
updates = {}
with requests_mock.Mocker() as m:
m.post(
"https://api.abc.com/api/v2/auth/renew",
responses.post(
"https://api.abc.com/api/v2/auth/renew",
json={
"token": "tok2",
"refreshToken": "refresh2",
},
)
responses.get("https://api.abc.com/api/v2/events/latest", json=[])
list(self.service._update(self.provider, updates))
self.assertEqual("refresh2", updates["tokens"]["refreshToken"])

@patch("planning.feeding_services.onclusive_api_service.touch")
def test_reingest(self, lock_touch):
with self.app.app_context():
start = datetime.now() - timedelta(days=30)
self.provider["config"]["days_to_reingest"] = "30"
self.provider["config"]["days_to_ingest"] = "10"
updates = {}
with responses.RequestsMock() as rsps: # checks if all requests were fired
rsps.add(
responses.POST,
url="https://api.abc.com/api/v2/auth",
json={
"token": "tok2",
"refreshToken": "refresh2",
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)
m.get("https://api.abc.com/api/v2/events/latest", json=[])
list(service._update(provider, updates))
self.assertEqual("refresh2", updates["tokens"]["refreshToken"])

for i in range(0, 10):
rsps.add(
responses.GET,
"https://api.abc.com/api/v2/events/date?limit=2000&date={}".format(
(start + timedelta(days=i)).strftime("%Y%m%d")
),
json=[self.event],
)

items = list(self.service._update(self.provider, updates))
assert 10 == len(items)
assert 1 == len(items[0])
assert items[0][0]["versioncreated"].isoformat() > self.event["versioncreated"]
assert updates["tokens"]["import_finished"]
assert not updates["tokens"]["reingesting"]

0 comments on commit d7dfa2f

Please sign in to comment.