diff --git a/.envs/.local/.django b/.envs/.local/.django index 0978166d..9e7b56c4 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -46,4 +46,4 @@ LRM_QA_PASSWORD='' #Server Tokens #-------------------------------------------------------------------------------- LRM_DEV_TOKEN='' -XLI_TOKEN='' +XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI' diff --git a/local.yml b/local.yml index ebdb810b..84893914 100644 --- a/local.yml +++ b/local.yml @@ -54,14 +54,23 @@ services: container_name: sde_indexing_helper_local_redis celeryworker: - <<: *django - image: sde_indexing_helper_local_celeryworker - container_name: sde_indexing_helper_local_celeryworker - depends_on: - - redis - - postgres - ports: [] - command: /start-celeryworker + <<: *django + image: sde_indexing_helper_local_celeryworker + container_name: sde_indexing_helper_local_celeryworker + depends_on: + - redis + - postgres + ports: [] + command: /start-celeryworker + deploy: + resources: + limits: + cpus: '4.0' + memory: 8G + reservations: + cpus: '2.0' + memory: 4G + # celerybeat: # <<: *django diff --git a/sde_collections/sinequa_api.py b/sde_collections/sinequa_api.py index 868afb77..f3e7dda3 100644 --- a/sde_collections/sinequa_api.py +++ b/sde_collections/sinequa_api.py @@ -1,10 +1,10 @@ import json from typing import Any - import requests import urllib3 from django.conf import settings - +from .models.delta_url import DumpUrl +from django.db import transaction urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) server_configs = { @@ -134,26 +134,64 @@ def query(self, page: int, collection_config_folder: str | None = None, source: payload["query"]["advanced"]["collection"] = f"/{source}/{collection_config_folder}/" return self.process_response(url, payload) - - def sql_query(self, sql: str) -> Any: - """Executes an SQL query on the configured server using token-based authentication.""" + def sql_query(self, sql: str, collection) -> Any: + """Executes an SQL query on the configured server using token-based authentication with pagination.""" token = self._get_token() if not token: raise ValueError("A token is required to use the SQL endpoint") - - url = f"{self.base_url}/api/v1/engine.sql" - headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} - raw_payload = json.dumps( - { + + page = 0 + page_size = 5000 # Number of records per page + skip_records = 0 + + while True: + paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}" + url = f"{self.base_url}/api/v1/engine.sql" + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"} + raw_payload = json.dumps({ "method": "engine.sql", - "sql": sql, + "sql": paginated_sql, "pretty": True, - } - ) - - return self.process_response(url, headers=headers, raw_data=raw_payload) - - def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any: + }) + + response = self.process_response(url, headers=headers, raw_data=raw_payload) + batch_data = response.get('Rows', []) + total_row_count = response.get('TotalRowCount', 0) + processed_response = self._process_full_text_response(response) + self.process_and_update_data(processed_response, collection) + print(f"Batch {page + 1} is being processed and updated") + + # Check if all rows have been fetched + if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count: + break + + page += 1 + skip_records += page_size + + return f"All {total_row_count} records have been processed and updated." + + def process_and_update_data(self, batch_data, collection): + for record in batch_data: + try: + with transaction.atomic(): + url = record['url'] + scraped_text = record.get('full_text', '') + scraped_title = record.get('title', '') + # Ensure the collection is included in the defaults + DumpUrl.objects.update_or_create( + url=url, + defaults={ + 'scraped_text': scraped_text, + 'scraped_title': scraped_title, + 'collection': collection + } + ) + except KeyError as e: + print(f"Missing key in data: {str(e)}") + except Exception as e: + print(f"Error processing record: {str(e)}") + + def get_full_texts(self, collection_config_folder: str, source: str = None, collection=None) -> Any: """ Retrieves the full texts, URLs, and titles for a specified collection. @@ -184,11 +222,10 @@ def get_full_texts(self, collection_config_folder: str, source: str = None) -> A raise ValueError("Index not defined for this server") sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'" - full_text_response = self.sql_query(sql) - return self._process_full_text_response(full_text_response) - + return self.sql_query(sql,collection) @staticmethod - def _process_full_text_response(full_text_response: str): + def _process_full_text_response(batch_data:str): return [ - {"url": url, "full_text": full_text, "title": title} for url, full_text, title in full_text_response["Rows"] + {"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"] ] + \ No newline at end of file diff --git a/sde_collections/tasks.py b/sde_collections/tasks.py index 9555de30..1b65233e 100644 --- a/sde_collections/tasks.py +++ b/sde_collections/tasks.py @@ -232,13 +232,11 @@ def resolve_title_pattern(title_pattern_id): title_pattern.apply() -@celery_app.task +@celery_app.task(soft_time_limit=600) def fetch_and_replace_full_text(collection_id, server_name): """ - Task to fetch and replace full text and metadata for all URLs associated with a specified collection - from a given server. This task deletes all existing DumpUrl entries for the collection and creates - new entries based on the latest fetched data. - + Task to initiate fetching and replacing full text and metadata for all URLs associated with a specified collection + from a given server. Args: collection_id (int): The identifier for the collection in the database. server_name (str): The name of the server. @@ -248,28 +246,15 @@ def fetch_and_replace_full_text(collection_id, server_name): """ collection = Collection.objects.get(id=collection_id) api = Api(server_name) - documents = api.get_full_texts(collection.config_folder) # Step 1: Delete all existing DumpUrl entries for the collection deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete() + print(f"Deleted {deleted_count} old records.") - # Step 2: Create new DumpUrl entries from the fetched documents - processed_count = 0 - for doc in documents: - try: - DumpUrl.objects.create( - url=doc["url"], - collection=collection, - scraped_text=doc.get("full_text", ""), - scraped_title=doc.get("title", ""), - ) - processed_count += 1 - except IntegrityError: - # Handle duplicate URL case if needed - print(f"Duplicate URL found, skipping: {doc['url']}") + # Step 2: Fetch and process new data + result_message = api.get_full_texts(collection.config_folder,collection=collection) + # Step 3: Migrate DumpUrl to DeltaUrl collection.migrate_dump_to_delta() - print(f"Processed {processed_count} new records.") - - return f"Successfully processed {len(documents)} records and updated the database." + return result_message