From d7dfa2fece11962e9a51bfefb1e7490ab027d8fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Ja=C5=A1ek?= Date: Wed, 24 Apr 2024 08:29:00 +0200 Subject: [PATCH] implement onclusive reingesting (#1971) SDCP-751 --- .../feeding_services/onclusive_api_service.py | 38 ++++- .../onclusive_api_service_tests.py | 130 ++++++++++++------ 2 files changed, 122 insertions(+), 46 deletions(-) diff --git a/server/planning/feeding_services/onclusive_api_service.py b/server/planning/feeding_services/onclusive_api_service.py index 66e116e59..81e300cc6 100644 --- a/server/planning/feeding_services/onclusive_api_service.py +++ b/server/planning/feeding_services/onclusive_api_service.py @@ -12,6 +12,8 @@ from urllib.parse import urljoin from superdesk.errors import ProviderError from celery.exceptions import SoftTimeLimitExceeded +from superdesk.celery_task_utils import get_lock_id +from superdesk.lock import touch logger = logging.getLogger(__name__) @@ -58,6 +60,13 @@ class OnclusiveApiService(HTTPFeedingServiceBase): "required": False, "default": 365, }, + { + "id": "days_to_reingest", + "type": "text", + "label": lazy_gettext("Days in the past to Reingest"), + "placeholder": lazy_gettext("Days"), + "required": False, + }, ] HTTP_AUTH = False @@ -86,6 +95,22 @@ def _update(self, provider, update): update["last_updated"] = utcnow().replace( second=0 ) # next time start from here, onclusive api does not use seconds + + # force reingest starting from now - days_to_reingest + if provider["config"].get("days_to_reingest"): + start_date = datetime.now() - timedelta(days=int(provider["config"]["days_to_reingest"])) + logger.info("Reingesting from %s", start_date.date().isoformat()) + update["config"] = provider["config"].copy() + update["config"]["days_to_reingest"] = "" + # override to reset + update["tokens"]["start_date"] = start_date + update["tokens"]["next_start"] = start_date + update["tokens"]["reingesting"] = True + update["tokens"]["import_finished"] = None + update["tokens"]["date"] = "" + + reingesting = update["tokens"].get("reingesting") + if update["tokens"].get("import_finished"): # populate it for cases when import was done before we introduced the field update["tokens"].setdefault("next_start", update["tokens"]["import_finished"] - timedelta(hours=5)) @@ -120,8 +145,11 @@ def _update(self, provider, update): if date > processed_date # when continuing skip previously ingested days ) logger.info("ingest from onclusive %s with params %s", url, params) + lock_name = get_lock_id("ingest", provider["name"], provider["_id"]) try: for i in iterations: + if not touch(lock_name, expire=60 * 15): + break params[iterations_param] = i logger.info("Onclusive PARAMS %s", params) content = self._fetch(url, params, provider, update["tokens"]) @@ -129,9 +157,15 @@ def _update(self, provider, update): logger.info("Onclusive returned %d items", len(items)) for item in items: item.setdefault("language", self.language) - yield items + if reingesting: + item["versioncreated"] += timedelta(seconds=1) # bump versioncreated to trigger an update + if items: + yield items update["tokens"][iterations_param] = i - update["tokens"].setdefault("import_finished", utcnow()) + else: + # there was no break so we are done + update["tokens"]["import_finished"] = utcnow() + update["tokens"]["reingesting"] = False except SoftTimeLimitExceeded: logger.warning("stopped due to time limit, tokens=%s", update["tokens"]) diff --git a/server/planning/feeding_services/onclusive_api_service_tests.py b/server/planning/feeding_services/onclusive_api_service_tests.py index 243864d02..22a1a03f0 100644 --- a/server/planning/feeding_services/onclusive_api_service_tests.py +++ b/server/planning/feeding_services/onclusive_api_service_tests.py @@ -1,11 +1,12 @@ -from planning.feed_parsers.onclusive import OnclusiveFeedParser -from .onclusive_api_service import OnclusiveApiService -from unittest.mock import MagicMock -from datetime import datetime, timedelta - import flask import unittest -import requests_mock +import responses + +from unittest.mock import MagicMock, patch +from datetime import datetime, timedelta +from planning.feed_parsers.onclusive import OnclusiveFeedParser + +from .onclusive_api_service import OnclusiveApiService parser = MagicMock(OnclusiveFeedParser) @@ -15,54 +16,95 @@ class OnclusiveApiServiceTestCase(unittest.TestCase): def setUp(self) -> None: super().setUp() self.app = flask.Flask(__name__) + self.service = OnclusiveApiService() + self.service.get_feed_parser = MagicMock(return_value=parser) + event = {"versioncreated": datetime(2023, 3, 1, 8, 0, 0)} - def test_update(self): - event = {"versioncreated": datetime.fromisoformat("2023-03-01T08:00:00")} - with self.app.app_context(): - now = datetime.utcnow() - service = OnclusiveApiService() - service.get_feed_parser = MagicMock(return_value=parser) - parser.parse.return_value = [event] - - provider = { - "_id": "onclusive_api", - "name": "onclusive", - "feed_parser": "onclusive_api", - "config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"}, - } + parser.parse.return_value = [ + event.copy(), + ] + + # for requests.json we need to convert datetime to string + self.event = {"versioncreated": event["versioncreated"].isoformat()} + + self.provider = { + "_id": "onclusive_api", + "name": "onclusive", + "feed_parser": "onclusive_api", + "config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"}, + } + @responses.activate + @patch("planning.feeding_services.onclusive_api_service.touch") + def test_update(self, lock_touch): + responses.post( + url="https://api.abc.com/api/v2/auth", + json={ + "token": "tok", + "refreshToken": "refresh", + "productId": 10, + }, + ) + + now = datetime.now() + responses.get( + "https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")), + json=[self.event], + ) # first returns an item + responses.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't + + with self.app.app_context(): updates = {} - with requests_mock.Mocker() as m: - m.post( - "https://api.abc.com/api/v2/auth", - json={ - "token": "tok", - "refreshToken": "refresh", - "productId": 10, - }, - ) - m.get( - "https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")), - json=[{"versioncreated": event["versioncreated"].isoformat()}], - ) # first returns an item - m.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't - items = list(service._update(provider, updates)) + items = list(self.service._update(self.provider, updates)) self.assertIn("tokens", updates) self.assertEqual("refresh", updates["tokens"]["refreshToken"]) self.assertIn("import_finished", updates["tokens"]) self.assertEqual(updates["last_updated"], updates["tokens"]["next_start"]) self.assertEqual("fr-CA", items[0][0]["language"]) - provider.update(updates) + self.provider.update(updates) updates = {} - with requests_mock.Mocker() as m: - m.post( - "https://api.abc.com/api/v2/auth/renew", + responses.post( + "https://api.abc.com/api/v2/auth/renew", + json={ + "token": "tok2", + "refreshToken": "refresh2", + }, + ) + responses.get("https://api.abc.com/api/v2/events/latest", json=[]) + list(self.service._update(self.provider, updates)) + self.assertEqual("refresh2", updates["tokens"]["refreshToken"]) + + @patch("planning.feeding_services.onclusive_api_service.touch") + def test_reingest(self, lock_touch): + with self.app.app_context(): + start = datetime.now() - timedelta(days=30) + self.provider["config"]["days_to_reingest"] = "30" + self.provider["config"]["days_to_ingest"] = "10" + updates = {} + with responses.RequestsMock() as rsps: # checks if all requests were fired + rsps.add( + responses.POST, + url="https://api.abc.com/api/v2/auth", json={ - "token": "tok2", - "refreshToken": "refresh2", + "token": "tok", + "refreshToken": "refresh", + "productId": 10, }, ) - m.get("https://api.abc.com/api/v2/events/latest", json=[]) - list(service._update(provider, updates)) - self.assertEqual("refresh2", updates["tokens"]["refreshToken"]) + + for i in range(0, 10): + rsps.add( + responses.GET, + "https://api.abc.com/api/v2/events/date?limit=2000&date={}".format( + (start + timedelta(days=i)).strftime("%Y%m%d") + ), + json=[self.event], + ) + + items = list(self.service._update(self.provider, updates)) + assert 10 == len(items) + assert 1 == len(items[0]) + assert items[0][0]["versioncreated"].isoformat() > self.event["versioncreated"] + assert updates["tokens"]["import_finished"] + assert not updates["tokens"]["reingesting"]