From a976c9f277c170df48986b2137e05f1b8d69c650 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 06:29:02 -0600 Subject: [PATCH 01/20] Fixes #1097 --- .envs/.local/.django | 2 +- local.yml | 25 +++++++---- sde_collections/sinequa_api.py | 81 +++++++++++++++++++++++++--------- sde_collections/tasks.py | 31 ++++--------- 4 files changed, 85 insertions(+), 54 deletions(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 0978166d..9e7b56c4 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' +XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' diff --git a/local.yml b/local.yml index ebdb810b..84893914 100644 --- a/local.yml +++ b/local.yml @@ -54,14 +54,23 @@ services: container_name: sde_indexing_helper_local_redis celeryworker: - <<: *django - image: sde_indexing_helper_local_celeryworker - container_name: sde_indexing_helper_local_celeryworker - depends_on: - - redis - - postgres - ports: [] - command: /start-celeryworker + <<: *django + image: sde_indexing_helper_local_celeryworker + container_name: sde_indexing_helper_local_celeryworker + depends_on: + - redis + - postgres + ports: [] + command: /start-celeryworker + deploy: + resources: + limits: + cpus: '4.0' + memory: 8G + reservations: + cpus: '2.0' + memory: 4G + # celerybeat: # <<: *django diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index 868afb77..f3e7dda3 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -1,10 +1,10 @@ import json from typing import Any - import requests import urllib3 from django.conf import settings - +from .models.delta_url import DumpUrl +from django.db import transaction urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) server_configs = { @@ -134,26 +134,64 @@ def query(self, page: int, collection_config_folder: str | None = None, source: payload["query"]["advanced"]["collection"] = f"/{source}/{collection_config_folder}/" return self.process_response(url, payload) - - def sql_query(self, sql: str) -> Any: - """Executes an SQL query on the configured server using token-based authentication.""" + def sql_query(self, sql: str, collection) -> Any: + """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: raise ValueError("A token is required to use the SQL endpoint") - - url = f"{self.base_url}/api/v1/engine.sql" - headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} - raw_payload = json.dumps( - { + + page = 0 + page_size = 5000 # Number of records per page + skip_records = 0 + + while True: + paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}" + url = f"{self.base_url}/api/v1/engine.sql" + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} + raw_payload = json.dumps({ "method": "engine.sql", - "sql": sql, + "sql": paginated_sql, "pretty": True, - } - ) - - return self.process_response(url, headers=headers, raw_data=raw_payload) - - def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any: + }) + + response = self.process_response(url, headers=headers, raw_data=raw_payload) + batch_data = response.get('Rows', []) + total_row_count = response.get('TotalRowCount', 0) + processed_response = self._process_full_text_response(response) + self.process_and_update_data(processed_response, collection) + print(f"Batch {page + 1} is being processed and updated") + + # Check if all rows have been fetched + if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: + break + + page += 1 + skip_records += page_size + + return f"All {total_row_count} records have been processed and updated." + + def process_and_update_data(self, batch_data, collection): + for record in batch_data: + try: + with transaction.atomic(): + url = record['url'] + scraped_text = record.get('full_text', '') + scraped_title = record.get('title', '') + # Ensure the collection is included in the defaults + DumpUrl.objects.update_or_create( + url=url, + defaults={ + 'scraped_text': scraped_text, + 'scraped_title': scraped_title, + 'collection': collection + } + ) + except KeyError as e: + print(f"Missing key in data: {str(e)}") + except Exception as e: + print(f"Error processing record: {str(e)}") + + def get_full_texts(self, collection_config_folder: str, source: str = None, collection=None) -> Any: """ Retrieves the full texts, URLs, and titles for a specified collection. @@ -184,11 +222,10 @@ def get_full_texts(self, collection_config_folder: str, source: str = None) -> A raise ValueError("Index not defined for this server") sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" - full_text_response = self.sql_query(sql) - return self._process_full_text_response(full_text_response) - + return self.sql_query(sql,collection) @staticmethod - def _process_full_text_response(full_text_response: str): + def _process_full_text_response(batch_data:str): return [ - {"url": url, "full_text": full_text, "title": title} for url, full_text, title in full_text_response["Rows"] + {"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"] ] + \ No newline at end of file diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 47c96338..572eccdc 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -145,13 +145,11 @@ def resolve_title_pattern(title_pattern_id): title_pattern.apply() -@celery_app.task +@celery_app.task(soft_time_limit=600) def fetch_and_replace_full_text(collection_id, server_name): """ - Task to fetch and replace full text and metadata for all URLs associated with a specified collection - from a given server. This task deletes all existing DumpUrl entries for the collection and creates - new entries based on the latest fetched data. - + Task to initiate fetching and replacing full text and metadata for all URLs associated with a specified collection + from a given server. Args: collection_id (int): The identifier for the collection in the database. server_name (str): The name of the server. @@ -161,28 +159,15 @@ def fetch_and_replace_full_text(collection_id, server_name): """ collection = Collection.objects.get(id=collection_id) api = Api(server_name) - documents = api.get_full_texts(collection.config_folder) # Step 1: Delete all existing DumpUrl entries for the collection deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete() + print(f"Deleted {deleted_count} old records.") - # Step 2: Create new DumpUrl entries from the fetched documents - processed_count = 0 - for doc in documents: - try: - DumpUrl.objects.create( - url=doc["url"], - collection=collection, - scraped_text=doc.get("full_text", ""), - scraped_title=doc.get("title", ""), - ) - processed_count += 1 - except IntegrityError: - # Handle duplicate URL case if needed - print(f"Duplicate URL found, skipping: {doc['url']}") + # Step 2: Fetch and process new data + result_message = api.get_full_texts(collection.config_folder,collection=collection) + # Step 3: Migrate DumpUrl to DeltaUrl collection.migrate_dump_to_delta() - print(f"Processed {processed_count} new records.") - - return f"Successfully processed {len(documents)} records and updated the database." + return result_message From 663457613ac301d613c08949406f2dfa4b5ed92d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:53:46 +0000 Subject: [PATCH 02/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- sde_collections/sinequa_api.py | 52 ++++++++++++++++++---------------- sde_collections/tasks.py | 2 +- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index f3e7dda3..c16277d5 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -1,10 +1,13 @@ import json from typing import Any + import requests import urllib3 from django.conf import settings -from .models.delta_url import DumpUrl from django.db import transaction + +from .models.delta_url import DumpUrl + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) server_configs = { @@ -134,29 +137,32 @@ def query(self, page: int, collection_config_folder: str | None = None, source: payload["query"]["advanced"]["collection"] = f"/{source}/{collection_config_folder}/" return self.process_response(url, payload) + def sql_query(self, sql: str, collection) -> Any: """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: raise ValueError("A token is required to use the SQL endpoint") - + page = 0 page_size = 5000 # Number of records per page - skip_records = 0 + skip_records = 0 while True: paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}" url = f"{self.base_url}/api/v1/engine.sql" headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} - raw_payload = json.dumps({ - "method": "engine.sql", - "sql": paginated_sql, - "pretty": True, - }) + raw_payload = json.dumps( + { + "method": "engine.sql", + "sql": paginated_sql, + "pretty": True, + } + ) response = self.process_response(url, headers=headers, raw_data=raw_payload) - batch_data = response.get('Rows', []) - total_row_count = response.get('TotalRowCount', 0) + batch_data = response.get("Rows", []) + total_row_count = response.get("TotalRowCount", 0) processed_response = self._process_full_text_response(response) self.process_and_update_data(processed_response, collection) print(f"Batch {page + 1} is being processed and updated") @@ -174,17 +180,17 @@ def process_and_update_data(self, batch_data, collection): for record in batch_data: try: with transaction.atomic(): - url = record['url'] - scraped_text = record.get('full_text', '') - scraped_title = record.get('title', '') + url = record["url"] + scraped_text = record.get("full_text", "") + scraped_title = record.get("title", "") # Ensure the collection is included in the defaults DumpUrl.objects.update_or_create( - url=url, + url=url, defaults={ - 'scraped_text': scraped_text, - 'scraped_title': scraped_title, - 'collection': collection - } + "scraped_text": scraped_text, + "scraped_title": scraped_title, + "collection": collection, + }, ) except KeyError as e: print(f"Missing key in data: {str(e)}") @@ -222,10 +228,8 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll raise ValueError("Index not defined for this server") sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" - return self.sql_query(sql,collection) + return self.sql_query(sql, collection) + @staticmethod - def _process_full_text_response(batch_data:str): - return [ - {"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"] - ] - \ No newline at end of file + def _process_full_text_response(batch_data: str): + return [{"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"]] diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 572eccdc..5a4bb3be 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -165,7 +165,7 @@ def fetch_and_replace_full_text(collection_id, server_name): print(f"Deleted {deleted_count} old records.") # Step 2: Fetch and process new data - result_message = api.get_full_texts(collection.config_folder,collection=collection) + result_message = api.get_full_texts(collection.config_folder, collection=collection) # Step 3: Migrate DumpUrl to DeltaUrl collection.migrate_dump_to_delta() From 05ec13bd8c6ee135963b7a98cfe6829d3c69d2a3 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 06:57:03 -0600 Subject: [PATCH 03/20] Updated_#1097 --- sde_collections/tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 5a4bb3be..188147ac 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -7,7 +7,6 @@ from django.conf import settings from django.core import management from django.core.management.commands import loaddata -from django.db import IntegrityError from config import celery_app From df4a1de1c0a8415a3e050a5d438d071b9d79d6ed Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 07:00:14 -0600 Subject: [PATCH 04/20] Fixes_Issue__#1097 --- .envs/.local/.django | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 9e7b56c4..54c76263 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' +XLI_TOKEN='' \ No newline at end of file From 03ce0e63bfa643e717f2acba946bf7719c44824f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:00:42 +0000 Subject: [PATCH 05/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .envs/.local/.django | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 54c76263..0978166d 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' \ No newline at end of file +XLI_TOKEN='' From 89756bfc769c17583235ec998d38c452cd05b182 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 08:30:10 -0600 Subject: [PATCH 06/20] Include Api tests #1097 --- sde_collections/tests/api_tests.py | 159 +++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 sde_collections/tests/api_tests.py diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py new file mode 100644 index 00000000..7f34e45f --- /dev/null +++ b/sde_collections/tests/api_tests.py @@ -0,0 +1,159 @@ +#docker-compose -f local.yml run --rm django pytest sde_collections/tests/api_tests.py +import pytest +from unittest.mock import patch, MagicMock +from django.utils import timezone +from sde_collections.models.collection import Collection, WorkflowStatusChoices +from sde_collections.models.delta_url import DumpUrl +from sde_collections.tests.factories import CollectionFactory, UserFactory +from sde_collections.sinequa_api import Api +from sde_collections.tasks import fetch_and_replace_full_text + + +@pytest.mark.django_db +class TestApiClass: + @pytest.fixture + def collection(self): + """Fixture to create a collection object for testing.""" + user = UserFactory() + return CollectionFactory( + curated_by=user, + curation_started=timezone.now(), + config_folder="example_config", + workflow_status=WorkflowStatusChoices.RESEARCH_IN_PROGRESS + ) + + @pytest.fixture + def api_instance(self): + """Fixture to create an Api instance with mocked server configs.""" + with patch("sde_collections.sinequa_api.server_configs", { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index" + } + }): + return Api(server_name="test_server", user="test_user", password="test_pass", token="test_token") + + @patch("requests.post") + def test_process_response_success(self, mock_post, api_instance): + """Test that process_response handles successful responses.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"key": "value"} + mock_post.return_value = mock_response + + response = api_instance.process_response("http://example.com", payload={"test": "data"}) + assert response == {"key": "value"} + + @patch("requests.post") + def test_process_response_failure(self, mock_post, api_instance): + """Test that process_response raises an exception on failure.""" + mock_response = MagicMock() + mock_response.status_code = 500 + mock_post.return_value = mock_response + mock_response.raise_for_status.side_effect = Exception("Internal Server Error") + + with pytest.raises(Exception, match="Internal Server Error"): + api_instance.process_response("http://example.com", payload={"test": "data"}) + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_query(self, mock_process_response, api_instance): + """Test that query sends correct payload and processes response.""" + mock_process_response.return_value = {"result": "success"} + response = api_instance.query(page=1, collection_config_folder="folder") + assert response == {"result": "success"} + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_sql_query(self, mock_process_response, api_instance, collection): + """Test SQL query execution and response processing.""" + mock_process_response.return_value = { + "Rows": [{"url": "http://example.com", "full_text": "Text", "title": "Title"}], + "TotalRowCount": 1 + } + response = api_instance.sql_query("SELECT * FROM test_index", collection) + assert response == "All 1 records have been processed and updated." + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_get_full_texts(self, mock_process_response, api_instance, collection): + """Test fetching full texts from the API.""" + mock_process_response.return_value = { + "Rows": [{"url": "http://example.com", "text": "Example text", "title": "Example title"}] + } + response = api_instance.get_full_texts(collection_config_folder="folder", source="source", collection=collection) + assert response == "All 0 records have been processed and updated." + + def test_process_and_update_data(self, api_instance, collection): + """Test processing and updating data in the database.""" + batch_data = [ + {"url": "http://example.com", "full_text": "Example text", "title": "Example title"} + ] + api_instance.process_and_update_data(batch_data, collection) + dump_urls = DumpUrl.objects.filter(collection=collection) + assert dump_urls.count() == 1 + assert dump_urls.first().url == "http://example.com" + + @patch("sde_collections.sinequa_api.Api.sql_query") + @patch("sde_collections.models.collection.Collection.migrate_dump_to_delta") + def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collection): + """Test the fetch_and_replace_full_text Celery task.""" + with patch("sde_collections.sinequa_api.server_configs", { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index" + } + }): + mock_sql_query.return_value = "All records processed" + mock_migrate.return_value = None + + result = fetch_and_replace_full_text(collection.id, "test_server") + assert result == "All records processed" + mock_migrate.assert_called_once() + + @patch("sde_collections.sinequa_api.server_configs", { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index" + } + }) + @pytest.mark.parametrize("server_name, user, password, expected", [ + ("test_server", "user1", "pass1", True), + ("invalid_server", None, None, False) + ]) + def test_api_init(self, server_name, user, password, expected): + """Test API initialization with valid and invalid server names.""" + if expected: + api = Api(server_name=server_name, user=user, password=password) + assert api.server_name == server_name + else: + with pytest.raises(ValueError): + Api(server_name=server_name) + + @patch("requests.post") + def test_query_dev_server_authentication(self, mock_post, api_instance): + """Test query on dev servers requiring authentication.""" + api_instance.server_name = "xli" # Setting a dev server + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"result": "success"}) + + response = api_instance.query(page=1, collection_config_folder="folder") + assert response == {"result": "success"} + + # Extract URL from call_args (positional arguments) + called_url = mock_post.call_args[0][0] # URL is the first positional argument + assert "?Password=test_pass&User=test_user" in called_url + + @patch("sde_collections.sinequa_api.Api.process_response") + def test_sql_query_pagination(self, mock_process_response, api_instance, collection): + """Test SQL query with pagination.""" + mock_process_response.side_effect = [ + {"Rows": [{"url": "http://example.com/1", "full_text": "Text 1", "title": "Title 1"}], "TotalRowCount": 6}, + {"Rows": [{"url": "http://example.com/2", "full_text": "Text 2", "title": "Title 2"}], "TotalRowCount": 6}, + {"Rows": [], "TotalRowCount": 6}, + ] + + result = api_instance.sql_query("SELECT * FROM test_index", collection) + assert result == "All 6 records have been processed and updated." From 280e49134fc0e6186596442bf7cb44165ecb08ab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 14:31:12 +0000 Subject: [PATCH 07/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- sde_collections/tests/api_tests.py | 85 +++++++++++++++++------------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 7f34e45f..46487d19 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -1,12 +1,14 @@ -#docker-compose -f local.yml run --rm django pytest sde_collections/tests/api_tests.py +# docker-compose -f local.yml run --rm django pytest sde_collections/tests/api_tests.py +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock from django.utils import timezone + from sde_collections.models.collection import Collection, WorkflowStatusChoices from sde_collections.models.delta_url import DumpUrl -from sde_collections.tests.factories import CollectionFactory, UserFactory from sde_collections.sinequa_api import Api from sde_collections.tasks import fetch_and_replace_full_text +from sde_collections.tests.factories import CollectionFactory, UserFactory @pytest.mark.django_db @@ -19,20 +21,23 @@ def collection(self): curated_by=user, curation_started=timezone.now(), config_folder="example_config", - workflow_status=WorkflowStatusChoices.RESEARCH_IN_PROGRESS + workflow_status=WorkflowStatusChoices.RESEARCH_IN_PROGRESS, ) @pytest.fixture def api_instance(self): """Fixture to create an Api instance with mocked server configs.""" - with patch("sde_collections.sinequa_api.server_configs", { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index" - } - }): + with patch( + "sde_collections.sinequa_api.server_configs", + { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index", + } + }, + ): return Api(server_name="test_server", user="test_user", password="test_pass", token="test_token") @patch("requests.post") @@ -69,7 +74,7 @@ def test_sql_query(self, mock_process_response, api_instance, collection): """Test SQL query execution and response processing.""" mock_process_response.return_value = { "Rows": [{"url": "http://example.com", "full_text": "Text", "title": "Title"}], - "TotalRowCount": 1 + "TotalRowCount": 1, } response = api_instance.sql_query("SELECT * FROM test_index", collection) assert response == "All 1 records have been processed and updated." @@ -80,14 +85,14 @@ def test_get_full_texts(self, mock_process_response, api_instance, collection): mock_process_response.return_value = { "Rows": [{"url": "http://example.com", "text": "Example text", "title": "Example title"}] } - response = api_instance.get_full_texts(collection_config_folder="folder", source="source", collection=collection) + response = api_instance.get_full_texts( + collection_config_folder="folder", source="source", collection=collection + ) assert response == "All 0 records have been processed and updated." def test_process_and_update_data(self, api_instance, collection): """Test processing and updating data in the database.""" - batch_data = [ - {"url": "http://example.com", "full_text": "Example text", "title": "Example title"} - ] + batch_data = [{"url": "http://example.com", "full_text": "Example text", "title": "Example title"}] api_instance.process_and_update_data(batch_data, collection) dump_urls = DumpUrl.objects.filter(collection=collection) assert dump_urls.count() == 1 @@ -97,14 +102,17 @@ def test_process_and_update_data(self, api_instance, collection): @patch("sde_collections.models.collection.Collection.migrate_dump_to_delta") def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collection): """Test the fetch_and_replace_full_text Celery task.""" - with patch("sde_collections.sinequa_api.server_configs", { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index" - } - }): + with patch( + "sde_collections.sinequa_api.server_configs", + { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index", + } + }, + ): mock_sql_query.return_value = "All records processed" mock_migrate.return_value = None @@ -112,18 +120,21 @@ def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collect assert result == "All records processed" mock_migrate.assert_called_once() - @patch("sde_collections.sinequa_api.server_configs", { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index" - } - }) - @pytest.mark.parametrize("server_name, user, password, expected", [ - ("test_server", "user1", "pass1", True), - ("invalid_server", None, None, False) - ]) + @patch( + "sde_collections.sinequa_api.server_configs", + { + "test_server": { + "app_name": "test_app", + "query_name": "test_query", + "base_url": "http://testserver.com/api", + "index": "test_index", + } + }, + ) + @pytest.mark.parametrize( + "server_name, user, password, expected", + [("test_server", "user1", "pass1", True), ("invalid_server", None, None, False)], + ) def test_api_init(self, server_name, user, password, expected): """Test API initialization with valid and invalid server names.""" if expected: From 3c1114389844cf6452bd0bb9c8f11ae1603fe0ed Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 21 Nov 2024 08:45:43 -0600 Subject: [PATCH 08/20] Include_Api_tests #1097 --- sde_collections/tests/api_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 46487d19..0a7a9245 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -4,7 +4,7 @@ import pytest from django.utils import timezone -from sde_collections.models.collection import Collection, WorkflowStatusChoices +from sde_collections.models.collection import WorkflowStatusChoices from sde_collections.models.delta_url import DumpUrl from sde_collections.sinequa_api import Api from sde_collections.tasks import fetch_and_replace_full_text From 28956276361a055b57bfb7a2d09ff02ace5ffd83 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:27:23 -0600 Subject: [PATCH 09/20] celeryworker_updates --- .envs/.local/.django | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 0978166d..172022c5 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -45,5 +45,5 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- -LRM_DEV_TOKEN='' -XLI_TOKEN='' +LRM_DEV_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI5NDgzMzU4LCJzaWQiOiIwRUM1QjI3QjU1RTQ0QjhBODA2QzM2QjY0REM0QkVCNiIsImtpbmQiOiJhY2Nlc3MiLCJzdWIiOiJzaW5lcXVhfGNvc21vc19tbF91c2VyIn0.slzYgP9vr1CE-lVRo3ZzJ7sTlh-S9bBC-bX5PUt5Ns8' +XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' From 5f8e7e1bd882b6667ea2f9fd74e3f5c7e1237289 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:29:47 -0600 Subject: [PATCH 10/20] latest --- .envs/.local/.django | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 172022c5..54c76263 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -45,5 +45,5 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- -LRM_DEV_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI5NDgzMzU4LCJzaWQiOiIwRUM1QjI3QjU1RTQ0QjhBODA2QzM2QjY0REM0QkVCNiIsImtpbmQiOiJhY2Nlc3MiLCJzdWIiOiJzaW5lcXVhfGNvc21vc19tbF91c2VyIn0.slzYgP9vr1CE-lVRo3ZzJ7sTlh-S9bBC-bX5PUt5Ns8' -XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' +LRM_DEV_TOKEN='' +XLI_TOKEN='' \ No newline at end of file From 4747f591fe192fd04090a3af0771d6f443ba48f1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:30:48 -0600 Subject: [PATCH 11/20] latest --- local.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/local.yml b/local.yml index 84893914..49611576 100644 --- a/local.yml +++ b/local.yml @@ -62,14 +62,6 @@ services: - postgres ports: [] command: /start-celeryworker - deploy: - resources: - limits: - cpus: '4.0' - memory: 8G - reservations: - cpus: '2.0' - memory: 4G # celerybeat: From 437cd659a32c3455864a33705c17b18b58044d2c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 25 Nov 2024 21:31:25 +0000 Subject: [PATCH 12/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .envs/.local/.django | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.envs/.local/.django b/.envs/.local/.django index 54c76263..0978166d 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' \ No newline at end of file +XLI_TOKEN='' From 41dd9e57d9cd7ff9adf14f4fb8fc540d5f064a4d Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:35:42 -0600 Subject: [PATCH 13/20] latest --- local.yml | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/local.yml b/local.yml index 49611576..7359ac75 100644 --- a/local.yml +++ b/local.yml @@ -53,16 +53,15 @@ services: image: redis:6 container_name: sde_indexing_helper_local_redis - celeryworker: - <<: *django - image: sde_indexing_helper_local_celeryworker - container_name: sde_indexing_helper_local_celeryworker - depends_on: - - redis - - postgres - ports: [] - command: /start-celeryworker - + celeryworker: + <<: *django + image: sde_indexing_helper_local_celeryworker + container_name: sde_indexing_helper_local_celeryworker + depends_on: + - redis + - postgres + ports: [] + command: /start-celeryworker # celerybeat: # <<: *django From be863894a79c304162722b2bcf4d75ef041f90b1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 25 Nov 2024 15:38:44 -0600 Subject: [PATCH 14/20] latest_ --- local.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/local.yml b/local.yml index 7359ac75..ebdb810b 100644 --- a/local.yml +++ b/local.yml @@ -53,7 +53,7 @@ services: image: redis:6 container_name: sde_indexing_helper_local_redis - celeryworker: + celeryworker: <<: *django image: sde_indexing_helper_local_celeryworker container_name: sde_indexing_helper_local_celeryworker From 77b1ec328b321d099bea36aa1579f8abc3da601d Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 26 Nov 2024 01:33:50 -0600 Subject: [PATCH 15/20] Fixes #1096 --- sde_collections/sinequa_api.py | 14 +++++++++++-- sde_collections/tests/api_tests.py | 33 +++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index c16277d5..d0713062 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -231,5 +231,15 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll return self.sql_query(sql, collection) @staticmethod - def _process_full_text_response(batch_data: str): - return [{"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"]] + def _process_full_text_response(batch_data: dict): + if 'Rows' not in batch_data or not isinstance(batch_data['Rows'], list): + raise ValueError("Expected 'Rows' key with a list of data.") + + processed_data = [] + for row in batch_data['Rows']: + # Ensure each row has exactly three elements (url, full_text, title) + if len(row) != 3: + raise ValueError("Each row must contain exactly three elements (url, full_text, title).") + url, full_text, title = row + processed_data.append({"url": url, "full_text": full_text, "title": title}) + return processed_data diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 0a7a9245..85db82a8 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -147,7 +147,7 @@ def test_api_init(self, server_name, user, password, expected): @patch("requests.post") def test_query_dev_server_authentication(self, mock_post, api_instance): """Test query on dev servers requiring authentication.""" - api_instance.server_name = "xli" # Setting a dev server + api_instance.server_name = "xli" mock_post.return_value = MagicMock(status_code=200, json=lambda: {"result": "success"}) response = api_instance.query(page=1, collection_config_folder="folder") @@ -168,3 +168,34 @@ def test_sql_query_pagination(self, mock_process_response, api_instance, collect result = api_instance.sql_query("SELECT * FROM test_index", collection) assert result == "All 6 records have been processed and updated." + + def test_process_full_text_response(self, api_instance): + """Test that _process_full_text_response correctly processes the data.""" + batch_data = {"Rows": [ + ["http://example.com", "Example text", "Example title"], + ["http://example.net", "Another text", "Another title"] + ]} + expected_output = [ + {"url": "http://example.com", "full_text": "Example text", "title": "Example title"}, + {"url": "http://example.net", "full_text": "Another text", "title": "Another title"} + ] + result = api_instance._process_full_text_response(batch_data) + assert result == expected_output + + def test_process_full_text_response_with_invalid_data(self, api_instance): + """Test that _process_full_text_response raises an error with invalid data.""" + # Test for missing 'Rows' key + invalid_data_no_rows = {} # No 'Rows' key + with pytest.raises(ValueError, match="Expected 'Rows' key with a list of data"): + api_instance._process_full_text_response(invalid_data_no_rows) + + # Test for incorrect row length + invalid_data_wrong_length = {"Rows": [["http://example.com", "Example text"]]} # Missing 'title' + with pytest.raises(ValueError, match="Each row must contain exactly three elements"): + api_instance._process_full_text_response(invalid_data_wrong_length) + + @patch("sde_collections.sinequa_api.Api._get_token", return_value=None) + def test_sql_query_missing_token(self, mock_get_token, api_instance, collection): + """Test that sql_query raises an error when no token is provided.""" + with pytest.raises(ValueError, match="A token is required to use the SQL endpoint"): + api_instance.sql_query("SELECT * FROM test_table", collection) From 60fdac1f91c9324f9664fdb0a1a41369db85b5d2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 26 Nov 2024 07:34:12 +0000 Subject: [PATCH 16/20] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- sde_collections/sinequa_api.py | 4 ++-- sde_collections/tests/api_tests.py | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index d0713062..03d3a724 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -232,11 +232,11 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll @staticmethod def _process_full_text_response(batch_data: dict): - if 'Rows' not in batch_data or not isinstance(batch_data['Rows'], list): + if "Rows" not in batch_data or not isinstance(batch_data["Rows"], list): raise ValueError("Expected 'Rows' key with a list of data.") processed_data = [] - for row in batch_data['Rows']: + for row in batch_data["Rows"]: # Ensure each row has exactly three elements (url, full_text, title) if len(row) != 3: raise ValueError("Each row must contain exactly three elements (url, full_text, title).") diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 85db82a8..8abb5f08 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -171,13 +171,15 @@ def test_sql_query_pagination(self, mock_process_response, api_instance, collect def test_process_full_text_response(self, api_instance): """Test that _process_full_text_response correctly processes the data.""" - batch_data = {"Rows": [ - ["http://example.com", "Example text", "Example title"], - ["http://example.net", "Another text", "Another title"] - ]} + batch_data = { + "Rows": [ + ["http://example.com", "Example text", "Example title"], + ["http://example.net", "Another text", "Another title"], + ] + } expected_output = [ {"url": "http://example.com", "full_text": "Example text", "title": "Example title"}, - {"url": "http://example.net", "full_text": "Another text", "title": "Another title"} + {"url": "http://example.net", "full_text": "Another text", "title": "Another title"}, ] result = api_instance._process_full_text_response(batch_data) assert result == expected_output From c4c1bc1c8796f29693f68e080f9380559fd038bf Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Tue, 26 Nov 2024 15:11:41 -0600 Subject: [PATCH 17/20] improved doc strings and the errors --- sde_collections/sinequa_api.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index 03d3a724..aa559474 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -64,7 +64,7 @@ class Api: def __init__(self, server_name: str = None, user: str = None, password: str = None, token: str = None) -> None: self.server_name = server_name if server_name not in server_configs: - raise ValueError(f"Server name '{server_name}' is not in server_configs") + raise ValueError(f"Invalid server configuration: '{server_name}' is not a recognized server name") self.config = server_configs[server_name] self.app_name: str = self.config["app_name"] @@ -72,7 +72,6 @@ def __init__(self, server_name: str = None, user: str = None, password: str = No self.base_url: str = self.config["base_url"] self.dev_servers = ["xli", "lrm_dev", "lrm_qa"] - # Store provided values only self._provided_user = user self._provided_password = password self._provided_token = token @@ -116,7 +115,8 @@ def query(self, page: int, collection_config_folder: str | None = None, source: password = self._get_password() if not user or not password: raise ValueError( - "User and password are required for the query endpoint on the following servers: {self.dev_servers}" + f"Authentication error: Missing credentials for dev server '{self.server_name}'. " + f"Both username and password are required for servers: {', '.join(self.dev_servers)}" ) authentication = f"?Password={password}&User={user}" url = f"{url}{authentication}" @@ -139,10 +139,9 @@ def query(self, page: int, collection_config_folder: str | None = None, source: return self.process_response(url, payload) def sql_query(self, sql: str, collection) -> Any: - """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: - raise ValueError("A token is required to use the SQL endpoint") + raise ValueError("Authentication error: Token is required for SQL endpoint access") page = 0 page_size = 5000 # Number of records per page @@ -165,7 +164,6 @@ def sql_query(self, sql: str, collection) -> Any: total_row_count = response.get("TotalRowCount", 0) processed_response = self._process_full_text_response(response) self.process_and_update_data(processed_response, collection) - print(f"Batch {page + 1} is being processed and updated") # Check if all rows have been fetched if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: @@ -183,7 +181,6 @@ def process_and_update_data(self, batch_data, collection): url = record["url"] scraped_text = record.get("full_text", "") scraped_title = record.get("title", "") - # Ensure the collection is included in the defaults DumpUrl.objects.update_or_create( url=url, defaults={ @@ -193,6 +190,7 @@ def process_and_update_data(self, batch_data, collection): }, ) except KeyError as e: + # TODO: reevaluate whether this should be a Raise and break the code print(f"Missing key in data: {str(e)}") except Exception as e: print(f"Error processing record: {str(e)}") @@ -225,7 +223,10 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll source = self._get_source_name() if (index := self.config.get("index")) is None: - raise ValueError("Index not defined for this server") + raise ValueError( + f"Configuration error: Index not defined for server '{self.server_name}'. " + "Please update server configuration with the required index." + ) sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" return self.sql_query(sql, collection) @@ -233,13 +234,18 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll @staticmethod def _process_full_text_response(batch_data: dict): if "Rows" not in batch_data or not isinstance(batch_data["Rows"], list): - raise ValueError("Expected 'Rows' key with a list of data.") + raise ValueError( + "Invalid response format: Expected 'Rows' key with list data in Sinequa server response. " + f"Received: {type(batch_data.get('Rows', None))}" + ) processed_data = [] - for row in batch_data["Rows"]: - # Ensure each row has exactly three elements (url, full_text, title) + for idx, row in enumerate(batch_data["Rows"]): if len(row) != 3: - raise ValueError("Each row must contain exactly three elements (url, full_text, title).") + raise ValueError( + f"Invalid row format at index {idx}: Expected exactly three elements (url, full_text, title). " + f"Received {len(row)} elements." + ) url, full_text, title = row processed_data.append({"url": url, "full_text": full_text, "title": title}) return processed_data From 5a9308c661f9a05983804a3c5d9bd6ea9a1f3eb2 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 2 Dec 2024 00:32:16 -0600 Subject: [PATCH 18/20] Continue in case of missing records --- sde_collections/sinequa_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index aa559474..f4a45ca3 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -190,7 +190,6 @@ def process_and_update_data(self, batch_data, collection): }, ) except KeyError as e: - # TODO: reevaluate whether this should be a Raise and break the code print(f"Missing key in data: {str(e)}") except Exception as e: print(f"Error processing record: {str(e)}") From cba90fa80d90481bc01baa4337a51e0330286701 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 2 Dec 2024 00:43:03 -0600 Subject: [PATCH 19/20] prints for each processed and updated batch --- sde_collections/sinequa_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index f4a45ca3..99177df0 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -164,6 +164,7 @@ def sql_query(self, sql: str, collection) -> Any: total_row_count = response.get("TotalRowCount", 0) processed_response = self._process_full_text_response(response) self.process_and_update_data(processed_response, collection) + print(f"Batch {page + 1} has been processed and updated") # Check if all rows have been fetched if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: From b91405dcf7f4bbb78dde4746d4bf61b9e4f7550b Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Tue, 3 Dec 2024 19:40:59 -0600 Subject: [PATCH 20/20] refactor sinequa_api wrapper, test suites, and full_text import --- sde_collections/sinequa_api.py | 179 +++++++++------- sde_collections/tasks.py | 50 +++-- sde_collections/tests/api_tests.py | 199 +++++++----------- .../tests/test_import_fulltexts.py | 70 ++++-- 4 files changed, 269 insertions(+), 229 deletions(-) diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index 99177df0..78ac4e05 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -1,12 +1,10 @@ import json +from collections.abc import Iterator from typing import Any import requests import urllib3 from django.conf import settings -from django.db import transaction - -from .models.delta_url import DumpUrl urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -138,85 +136,99 @@ def query(self, page: int, collection_config_folder: str | None = None, source: return self.process_response(url, payload) - def sql_query(self, sql: str, collection) -> Any: + def _execute_sql_query(self, sql: str) -> dict: + """ + Executes a SQL query against the Sinequa API. + + Args: + sql (str): The SQL query to execute + + Returns: + dict: The JSON response from the API containing 'Rows' and 'TotalRowCount' + + Raises: + ValueError: If no token is available for authentication + """ token = self._get_token() if not token: raise ValueError("Authentication error: Token is required for SQL endpoint access") - page = 0 - page_size = 5000 # Number of records per page - skip_records = 0 + url = f"{self.base_url}/api/v1/engine.sql" + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} + raw_payload = json.dumps( + { + "method": "engine.sql", + "sql": sql, + "pretty": True, + } + ) - while True: - paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}" - url = f"{self.base_url}/api/v1/engine.sql" - headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} - raw_payload = json.dumps( - { - "method": "engine.sql", - "sql": paginated_sql, - "pretty": True, - } - ) + return self.process_response(url, headers=headers, raw_data=raw_payload) - response = self.process_response(url, headers=headers, raw_data=raw_payload) - batch_data = response.get("Rows", []) - total_row_count = response.get("TotalRowCount", 0) - processed_response = self._process_full_text_response(response) - self.process_and_update_data(processed_response, collection) - print(f"Batch {page + 1} has been processed and updated") + def _process_rows_to_records(self, rows: list) -> list[dict]: + """ + Converts raw SQL row data into structured record dictionaries. - # Check if all rows have been fetched - if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: - break + Args: + rows (list): List of rows, where each row is [url, full_text, title] - page += 1 - skip_records += page_size - - return f"All {total_row_count} records have been processed and updated." - - def process_and_update_data(self, batch_data, collection): - for record in batch_data: - try: - with transaction.atomic(): - url = record["url"] - scraped_text = record.get("full_text", "") - scraped_title = record.get("title", "") - DumpUrl.objects.update_or_create( - url=url, - defaults={ - "scraped_text": scraped_text, - "scraped_title": scraped_title, - "collection": collection, - }, - ) - except KeyError as e: - print(f"Missing key in data: {str(e)}") - except Exception as e: - print(f"Error processing record: {str(e)}") - - def get_full_texts(self, collection_config_folder: str, source: str = None, collection=None) -> Any: + Returns: + list[dict]: List of processed records with url, full_text, and title keys + + Raises: + ValueError: If any row doesn't contain exactly 3 elements + """ + processed_records = [] + for idx, row in enumerate(rows): + if len(row) != 3: + raise ValueError( + f"Invalid row format at index {idx}: Expected exactly three elements (url, full_text, title). " + f"Received {len(row)} elements." + ) + processed_records.append({"url": row[0], "full_text": row[1], "title": row[2]}) + return processed_records + + def get_full_texts(self, collection_config_folder: str, source: str = None) -> Iterator[dict]: """ - Retrieves the full texts, URLs, and titles for a specified collection. + Retrieves and yields batches of text records from the SQL database for a given collection. + Uses pagination to handle large datasets efficiently. - Returns: - dict: A JSON response containing the results of the SQL query, - where each item has 'url', 'text', and 'title'. - - Example: - Calling get_full_texts("example_collection") might return: - [ - { - 'url': 'http://example.com/article1', - 'text': 'Here is the full text of the first article...', - 'title': 'Article One Title' - }, - { - 'url': 'http://example.com/article2', - 'text': 'Here is the full text of the second article...', - 'title': 'Article Two Title' - } - ] + Args: + collection_config_folder (str): The collection folder to query (e.g., "EARTHDATA", "SMD") + source (str, optional): The source to query. If None, defaults to "scrapers" for dev servers + or "SDE" for other servers. + + Yields: + list[dict]: Batches of records, where each record is a dictionary containing: + { + "url": str, # The URL of the document + "full_text": str, # The full text content of the document + "title": str # The title of the document + } + + Raises: + ValueError: If the server's index is not defined in its configuration + + Example batch: + [ + { + "url": "https://example.nasa.gov/doc1", + "full_text": "This is the content of doc1...", + "title": "Document 1 Title" + }, + { + "url": "https://example.nasa.gov/doc2", + "full_text": "This is the content of doc2...", + "title": "Document 2 Title" + } + ] + + Note: + - Results are paginated in batches of 5000 records + - Each batch is processed into clean dictionaries before being yielded + - The iterator will stop when either: + 1. No more rows are returned from the query + 2. The total count of records has been reached """ if not source: @@ -229,7 +241,28 @@ def get_full_texts(self, collection_config_folder: str, source: str = None, coll ) sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" - return self.sql_query(sql, collection) + + page = 0 + page_size = 5000 + total_processed = 0 + + while True: + paginated_sql = f"{sql} SKIP {total_processed} COUNT {page_size}" + response = self._execute_sql_query(paginated_sql) + + rows = response.get("Rows", []) + if not rows: # Stop if we get an empty batch + break + + yield self._process_rows_to_records(rows) + + total_processed += len(rows) + total_count = response.get("TotalRowCount", 0) + + if total_processed >= total_count: # Stop if we've processed all records + break + + page += 1 @staticmethod def _process_full_text_response(batch_data: dict): diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 188147ac..8d4a4c4d 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -7,6 +7,7 @@ from django.conf import settings from django.core import management from django.core.management.commands import loaddata +from django.db import transaction from config import celery_app @@ -147,26 +148,43 @@ def resolve_title_pattern(title_pattern_id): @celery_app.task(soft_time_limit=600) def fetch_and_replace_full_text(collection_id, server_name): """ - Task to initiate fetching and replacing full text and metadata for all URLs associated with a specified collection - from a given server. - Args: - collection_id (int): The identifier for the collection in the database. - server_name (str): The name of the server. - - Returns: - str: A message indicating the result of the operation, including the number of URLs processed. + Task to fetch and replace full text and metadata for a collection. + Handles data in batches to manage memory usage. """ collection = Collection.objects.get(id=collection_id) api = Api(server_name) - # Step 1: Delete all existing DumpUrl entries for the collection + # Step 1: Delete existing DumpUrl entries deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete() print(f"Deleted {deleted_count} old records.") - # Step 2: Fetch and process new data - result_message = api.get_full_texts(collection.config_folder, collection=collection) - - # Step 3: Migrate DumpUrl to DeltaUrl - collection.migrate_dump_to_delta() - - return result_message + # Step 2: Process data in batches + total_processed = 0 + + try: + for batch in api.get_full_texts(collection.config_folder): + # Use bulk_create for efficiency, with a transaction per batch + with transaction.atomic(): + DumpUrl.objects.bulk_create( + [ + DumpUrl( + url=record["url"], + collection=collection, + scraped_text=record["full_text"], + scraped_title=record["title"], + ) + for record in batch + ] + ) + + total_processed += len(batch) + print(f"Processed batch of {len(batch)} records. Total: {total_processed}") + + # Step 3: Migrate dump URLs to delta URLs + collection.migrate_dump_to_delta() + + return f"Successfully processed {total_processed} records and updated the database." + + except Exception as e: + print(f"Error processing records: {str(e)}") + raise diff --git a/sde_collections/tests/api_tests.py b/sde_collections/tests/api_tests.py index 8abb5f08..88a0f44f 100644 --- a/sde_collections/tests/api_tests.py +++ b/sde_collections/tests/api_tests.py @@ -5,9 +5,7 @@ from django.utils import timezone from sde_collections.models.collection import WorkflowStatusChoices -from sde_collections.models.delta_url import DumpUrl from sde_collections.sinequa_api import Api -from sde_collections.tasks import fetch_and_replace_full_text from sde_collections.tests.factories import CollectionFactory, UserFactory @@ -69,135 +67,96 @@ def test_query(self, mock_process_response, api_instance): response = api_instance.query(page=1, collection_config_folder="folder") assert response == {"result": "success"} - @patch("sde_collections.sinequa_api.Api.process_response") - def test_sql_query(self, mock_process_response, api_instance, collection): - """Test SQL query execution and response processing.""" - mock_process_response.return_value = { - "Rows": [{"url": "http://example.com", "full_text": "Text", "title": "Title"}], - "TotalRowCount": 1, - } - response = api_instance.sql_query("SELECT * FROM test_index", collection) - assert response == "All 1 records have been processed and updated." + def test_process_rows_to_records(self, api_instance): + """Test processing row data into record dictionaries.""" + # Test valid input + valid_rows = [["http://example.com/1", "Text 1", "Title 1"], ["http://example.com/2", "Text 2", "Title 2"]] + expected_output = [ + {"url": "http://example.com/1", "full_text": "Text 1", "title": "Title 1"}, + {"url": "http://example.com/2", "full_text": "Text 2", "title": "Title 2"}, + ] + assert api_instance._process_rows_to_records(valid_rows) == expected_output + + # Test invalid row length + invalid_rows = [["http://example.com", "Text"]] # Missing title + with pytest.raises(ValueError, match="Invalid row format at index 0"): + api_instance._process_rows_to_records(invalid_rows) @patch("sde_collections.sinequa_api.Api.process_response") - def test_get_full_texts(self, mock_process_response, api_instance, collection): - """Test fetching full texts from the API.""" - mock_process_response.return_value = { - "Rows": [{"url": "http://example.com", "text": "Example text", "title": "Example title"}] - } - response = api_instance.get_full_texts( - collection_config_folder="folder", source="source", collection=collection - ) - assert response == "All 0 records have been processed and updated." - - def test_process_and_update_data(self, api_instance, collection): - """Test processing and updating data in the database.""" - batch_data = [{"url": "http://example.com", "full_text": "Example text", "title": "Example title"}] - api_instance.process_and_update_data(batch_data, collection) - dump_urls = DumpUrl.objects.filter(collection=collection) - assert dump_urls.count() == 1 - assert dump_urls.first().url == "http://example.com" - - @patch("sde_collections.sinequa_api.Api.sql_query") - @patch("sde_collections.models.collection.Collection.migrate_dump_to_delta") - def test_fetch_and_replace_full_text(self, mock_migrate, mock_sql_query, collection): - """Test the fetch_and_replace_full_text Celery task.""" - with patch( - "sde_collections.sinequa_api.server_configs", + def test_execute_sql_query(self, mock_process_response, api_instance): + """Test SQL query execution.""" + mock_process_response.return_value = {"Rows": [], "TotalRowCount": 0} + + # Test successful query + result = api_instance._execute_sql_query("SELECT * FROM test") + assert result == {"Rows": [], "TotalRowCount": 0} + + # Test query with missing token + api_instance._provided_token = None + with pytest.raises(ValueError, match="Token is required"): + api_instance._execute_sql_query("SELECT * FROM test") + + @patch("sde_collections.sinequa_api.Api._execute_sql_query") + def test_get_full_texts_pagination(self, mock_execute_sql, api_instance): + """Test that get_full_texts correctly handles pagination.""" + # Mock responses for two pages of results + mock_execute_sql.side_effect = [ { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index", - } + "Rows": [["http://example.com/1", "Text 1", "Title 1"], ["http://example.com/2", "Text 2", "Title 2"]], + "TotalRowCount": 3, }, - ): - mock_sql_query.return_value = "All records processed" - mock_migrate.return_value = None - - result = fetch_and_replace_full_text(collection.id, "test_server") - assert result == "All records processed" - mock_migrate.assert_called_once() - - @patch( - "sde_collections.sinequa_api.server_configs", - { - "test_server": { - "app_name": "test_app", - "query_name": "test_query", - "base_url": "http://testserver.com/api", - "index": "test_index", - } - }, - ) + {"Rows": [["http://example.com/3", "Text 3", "Title 3"]], "TotalRowCount": 3}, + {"Rows": [], "TotalRowCount": 3}, + ] + + # Collect all batches from the iterator + batches = list(api_instance.get_full_texts("test_folder")) + + assert len(batches) == 2 # Should have two batches + assert len(batches[0]) == 2 # First batch has 2 records + assert len(batches[1]) == 1 # Second batch has 1 record + + # Verify content of first batch + assert batches[0] == [ + {"url": "http://example.com/1", "full_text": "Text 1", "title": "Title 1"}, + {"url": "http://example.com/2", "full_text": "Text 2", "title": "Title 2"}, + ] + + # Verify content of second batch + assert batches[1] == [{"url": "http://example.com/3", "full_text": "Text 3", "title": "Title 3"}] + + def test_get_full_texts_missing_index(self, api_instance): + """Test that get_full_texts raises error when index is missing from config.""" + api_instance.config.pop("index", None) + with pytest.raises(ValueError, match="Index not defined for server"): + next(api_instance.get_full_texts("test_folder")) + @pytest.mark.parametrize( - "server_name, user, password, expected", - [("test_server", "user1", "pass1", True), ("invalid_server", None, None, False)], + "server_name,expect_auth", + [ + ("xli", True), # dev server should have auth + ("production", False), # prod server should not have auth + ], ) - def test_api_init(self, server_name, user, password, expected): - """Test API initialization with valid and invalid server names.""" - if expected: - api = Api(server_name=server_name, user=user, password=password) - assert api.server_name == server_name - else: - with pytest.raises(ValueError): - Api(server_name=server_name) - @patch("requests.post") - def test_query_dev_server_authentication(self, mock_post, api_instance): - """Test query on dev servers requiring authentication.""" - api_instance.server_name = "xli" + def test_query_authentication(self, mock_post, server_name, expect_auth, api_instance): + """Test authentication handling for different server types.""" + api_instance.server_name = server_name mock_post.return_value = MagicMock(status_code=200, json=lambda: {"result": "success"}) response = api_instance.query(page=1, collection_config_folder="folder") assert response == {"result": "success"} - # Extract URL from call_args (positional arguments) - called_url = mock_post.call_args[0][0] # URL is the first positional argument - assert "?Password=test_pass&User=test_user" in called_url + called_url = mock_post.call_args[0][0] + auth_present = "?Password=test_pass&User=test_user" in called_url + assert auth_present == expect_auth - @patch("sde_collections.sinequa_api.Api.process_response") - def test_sql_query_pagination(self, mock_process_response, api_instance, collection): - """Test SQL query with pagination.""" - mock_process_response.side_effect = [ - {"Rows": [{"url": "http://example.com/1", "full_text": "Text 1", "title": "Title 1"}], "TotalRowCount": 6}, - {"Rows": [{"url": "http://example.com/2", "full_text": "Text 2", "title": "Title 2"}], "TotalRowCount": 6}, - {"Rows": [], "TotalRowCount": 6}, - ] + @patch("requests.post") + def test_query_dev_server_missing_credentials(self, mock_post, api_instance): + """Test that dev servers raise error when credentials are missing.""" + api_instance.server_name = "xli" + api_instance._provided_user = None + api_instance._provided_password = None - result = api_instance.sql_query("SELECT * FROM test_index", collection) - assert result == "All 6 records have been processed and updated." - - def test_process_full_text_response(self, api_instance): - """Test that _process_full_text_response correctly processes the data.""" - batch_data = { - "Rows": [ - ["http://example.com", "Example text", "Example title"], - ["http://example.net", "Another text", "Another title"], - ] - } - expected_output = [ - {"url": "http://example.com", "full_text": "Example text", "title": "Example title"}, - {"url": "http://example.net", "full_text": "Another text", "title": "Another title"}, - ] - result = api_instance._process_full_text_response(batch_data) - assert result == expected_output - - def test_process_full_text_response_with_invalid_data(self, api_instance): - """Test that _process_full_text_response raises an error with invalid data.""" - # Test for missing 'Rows' key - invalid_data_no_rows = {} # No 'Rows' key - with pytest.raises(ValueError, match="Expected 'Rows' key with a list of data"): - api_instance._process_full_text_response(invalid_data_no_rows) - - # Test for incorrect row length - invalid_data_wrong_length = {"Rows": [["http://example.com", "Example text"]]} # Missing 'title' - with pytest.raises(ValueError, match="Each row must contain exactly three elements"): - api_instance._process_full_text_response(invalid_data_wrong_length) - - @patch("sde_collections.sinequa_api.Api._get_token", return_value=None) - def test_sql_query_missing_token(self, mock_get_token, api_instance, collection): - """Test that sql_query raises an error when no token is provided.""" - with pytest.raises(ValueError, match="A token is required to use the SQL endpoint"): - api_instance.sql_query("SELECT * FROM test_table", collection) + with pytest.raises(ValueError, match="Authentication error: Missing credentials for dev server"): + api_instance.query(page=1) diff --git a/sde_collections/tests/test_import_fulltexts.py b/sde_collections/tests/test_import_fulltexts.py index b4256bde..d39f1633 100644 --- a/sde_collections/tests/test_import_fulltexts.py +++ b/sde_collections/tests/test_import_fulltexts.py @@ -4,39 +4,69 @@ import pytest -from sde_collections.models.delta_url import CuratedUrl, DeltaUrl, DumpUrl +from sde_collections.models.delta_url import DeltaUrl, DumpUrl from sde_collections.tasks import fetch_and_replace_full_text from sde_collections.tests.factories import CollectionFactory @pytest.mark.django_db def test_fetch_and_replace_full_text(): - # Create a test collection - collection = CollectionFactory() + collection = CollectionFactory(config_folder="test_folder") - # Mock API response - mock_documents = [ + mock_batch = [ {"url": "http://example.com/1", "full_text": "Test Text 1", "title": "Test Title 1"}, {"url": "http://example.com/2", "full_text": "Test Text 2", "title": "Test Title 2"}, ] + def mock_generator(): + yield mock_batch + with patch("sde_collections.sinequa_api.Api.get_full_texts") as mock_get_full_texts: - mock_get_full_texts.return_value = mock_documents + mock_get_full_texts.return_value = mock_generator() - # Call the function fetch_and_replace_full_text(collection.id, "lrm_dev") - # Assertions assert DumpUrl.objects.filter(collection=collection).count() == 0 - assert DeltaUrl.objects.filter(collection=collection).count() == len(mock_documents) - assert CuratedUrl.objects.filter(collection=collection).count() == 0 - - for doc in mock_documents: - assert ( - DeltaUrl.objects.filter(collection=collection) - .filter( - url=doc["url"], - scraped_text=doc["full_text"], - ) - .exists() - ) + assert DeltaUrl.objects.filter(collection=collection).count() == 2 + + +@pytest.mark.django_db +def test_fetch_and_replace_full_text_large_dataset(): + """Test processing a large number of records with proper pagination and batching.""" + collection = CollectionFactory(config_folder="test_folder") + + # Create sample data - 20,000 records in total + def create_batch(start_idx, size): + return [ + {"url": f"http://example.com/{i}", "full_text": f"Test Text {i}", "title": f"Test Title {i}"} + for i in range(start_idx, start_idx + size) + ] + + # Mock the API to return data in batches of 5000 (matching actual API pagination) + def mock_batch_generator(): + batch_size = 5000 + total_records = 20000 + + for start in range(0, total_records, batch_size): + yield create_batch(start, min(batch_size, total_records - start)) + + with patch("sde_collections.sinequa_api.Api.get_full_texts") as mock_get_full_texts: + mock_get_full_texts.return_value = mock_batch_generator() + + # Execute the task + result = fetch_and_replace_full_text(collection.id, "lrm_dev") + + # Verify total number of records + assert DeltaUrl.objects.filter(collection=collection).count() == 20000 + + # Verify some random records exist and have correct data + for i in [0, 4999, 5000, 19999]: # Check boundaries and middle + url = DeltaUrl.objects.get(url=f"http://example.com/{i}") + assert url.scraped_text == f"Test Text {i}" + assert url.scraped_title == f"Test Title {i}" + + # Verify batch processing worked by checking the success message + assert "Successfully processed 20000 records" in result + + # Verify no DumpUrls remain (should all be migrated to DeltaUrls) + assert DumpUrl.objects.filter(collection=collection).count() == 0