Commit

Fixes #1097
Your Name committed Nov 21, 2024
1 parent 715a571 commit 8bfd881
Showing 4 changed files with 85 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .envs/.local/.django
@@ -46,4 +46,4 @@ LRM_QA_PASSWORD=''
#Server Tokens
#--------------------------------------------------------------------------------
LRM_DEV_TOKEN=''
XLI_TOKEN=''
XLI_TOKEN='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJTaW5lcXVhIiwiaWF0IjoxNzI3OTAzMzAzLCJzaWQiOiJCRDkwN0Q4QzJCMjg0MDA2ODQ5OEZFOENCRjdEODQwNiIsImtpbmQiOiJhY2Nlc3MiLCJleHAiOjE3MzU2NzkzMDMsInN1YiI6IlNpbmVxdWF8Z3JhX3VzZXJzIn0.o1a3eDPgEWdoHu7S8KQi0wMw_brxfAM1lClbfncVQVI'
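A note on consumption: the SQL endpoint in sde_collections/sinequa_api.py requires a token (see _get_token() further down). A minimal sketch of how a value from .envs/.local/.django typically reaches the application follows; the use of os.environ and the settings attribute name are assumptions, since the project's actual settings wiring is not shown in this diff.

# Illustrative only: surfacing XLI_TOKEN from the env file to Django settings.
# The exact mechanism (plain os.environ vs. django-environ) is assumed here.
import os

XLI_TOKEN = os.environ.get("XLI_TOKEN", "")
if not XLI_TOKEN:
    raise RuntimeError("XLI_TOKEN is not set; the engine.sql endpoint requires a token.")
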
25 changes: 17 additions & 8 deletions local.yml
@@ -54,14 +54,23 @@ services:
container_name: sde_indexing_helper_local_redis

celeryworker:
<<: *django
image: sde_indexing_helper_local_celeryworker
container_name: sde_indexing_helper_local_celeryworker
depends_on:
- redis
- postgres
ports: []
command: /start-celeryworker
<<: *django
image: sde_indexing_helper_local_celeryworker
container_name: sde_indexing_helper_local_celeryworker
depends_on:
- redis
- postgres
ports: []
command: /start-celeryworker
deploy:
resources:
limits:
cpus: '4.0'
memory: 8G
reservations:
cpus: '2.0'
memory: 4G


# celerybeat:
# <<: *django
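The new deploy.resources block caps the celery worker at 4 CPUs / 8 GB, with a 2 CPU / 4 GB reservation. As a quick sanity check, here is a small sketch that reads the effective limits from inside the container; the cgroup paths are assumptions (cgroup v2 first, then a legacy v1 fallback) and depend on the host's cgroup layout.

# check_limits.py -- illustrative sketch for confirming container resource caps
# from inside the celeryworker container. Cgroup paths vary by host; both the
# v2 and legacy v1 locations are tried here.
from pathlib import Path


def read_first(*paths: str) -> str | None:
    for p in paths:
        f = Path(p)
        if f.exists():
            return f.read_text().strip()
    return None


memory_limit = read_first(
    "/sys/fs/cgroup/memory.max",                    # cgroup v2
    "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
)
cpu_quota = read_first(
    "/sys/fs/cgroup/cpu.max",                       # cgroup v2: "quota period"
    "/sys/fs/cgroup/cpu/cpu.cfs_quota_us",          # cgroup v1
)

print(f"memory limit: {memory_limit}")  # roughly 8589934592 with an 8G cap
print(f"cpu quota:    {cpu_quota}")     # e.g. "400000 100000" for 4 CPUs on cgroup v2
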
81 changes: 59 additions & 22 deletions sde_collections/sinequa_api.py
@@ -1,10 +1,10 @@
import json
from typing import Any

import requests
import urllib3
from django.conf import settings

from .models.delta_url import DumpUrl
from django.db import transaction
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

server_configs = {
@@ -134,26 +134,64 @@ def query(self, page: int, collection_config_folder: str | None = None, source:
payload["query"]["advanced"]["collection"] = f"/{source}/{collection_config_folder}/"

return self.process_response(url, payload)

def sql_query(self, sql: str) -> Any:
"""Executes an SQL query on the configured server using token-based authentication."""
def sql_query(self, sql: str, collection) -> Any:
"""Executes an SQL query on the configured server using token-based authentication with pagination."""
token = self._get_token()
if not token:
raise ValueError("A token is required to use the SQL endpoint")

url = f"{self.base_url}/api/v1/engine.sql"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
raw_payload = json.dumps(
{

page = 0
page_size = 5000 # Number of records per page
skip_records = 0

while True:
paginated_sql = f"{sql} SKIP {skip_records} COUNT {page_size}"
url = f"{self.base_url}/api/v1/engine.sql"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
raw_payload = json.dumps({
"method": "engine.sql",
"sql": sql,
"sql": paginated_sql,
"pretty": True,
}
)

return self.process_response(url, headers=headers, raw_data=raw_payload)

def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any:
})

response = self.process_response(url, headers=headers, raw_data=raw_payload)
batch_data = response.get('Rows', [])
total_row_count = response.get('TotalRowCount', 0)
processed_response = self._process_full_text_response(response)
self.process_and_update_data(processed_response, collection)
print(f"Batch {page + 1} is being processed and updated")

# Check if all rows have been fetched
if len(batch_data) == 0 or (skip_records + page_size) >= total_row_count:
break

page += 1
skip_records += page_size

return f"All {total_row_count} records have been processed and updated."

def process_and_update_data(self, batch_data, collection):
for record in batch_data:
try:
with transaction.atomic():
url = record['url']
scraped_text = record.get('full_text', '')
scraped_title = record.get('title', '')
# Ensure the collection is included in the defaults
DumpUrl.objects.update_or_create(
url=url,
defaults={
'scraped_text': scraped_text,
'scraped_title': scraped_title,
'collection': collection
}
)
except KeyError as e:
print(f"Missing key in data: {str(e)}")
except Exception as e:
print(f"Error processing record: {str(e)}")

def get_full_texts(self, collection_config_folder: str, source: str = None, collection=None) -> Any:
"""
Retrieves the full texts, URLs, and titles for a specified collection.
@@ -184,11 +222,10 @@ def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any:
raise ValueError("Index not defined for this server")

sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'"
full_text_response = self.sql_query(sql)
return self._process_full_text_response(full_text_response)

return self.sql_query(sql, collection)
@staticmethod
def _process_full_text_response(full_text_response: str):
def _process_full_text_response(batch_data: str):
return [
{"url": url, "full_text": full_text, "title": title} for url, full_text, title in full_text_response["Rows"]
{"url": url, "full_text": full_text, "title": title} for url, full_text, title in batch_data["Rows"]
]
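
For reference, a small sketch of the shape change performed by _process_full_text_response; the sample rows are invented, and each row follows the column order of the SELECT above (url1, text, title).

# Raw engine.sql response (invented sample) -> list of dicts keyed for DumpUrl.
batch_data = {
    "Rows": [
        ["https://example.nasa.gov/a", "Body of page A", "Page A"],
        ["https://example.nasa.gov/b", "Body of page B", "Page B"],
    ],
    "TotalRowCount": 2,
}

processed = [
    {"url": url, "full_text": full_text, "title": title}
    for url, full_text, title in batch_data["Rows"]
]
# [{'url': 'https://example.nasa.gov/a', 'full_text': 'Body of page A', 'title': 'Page A'}, ...]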

31 changes: 8 additions & 23 deletions sde_collections/tasks.py
@@ -232,13 +232,11 @@ def resolve_title_pattern(title_pattern_id):
title_pattern.apply()


@celery_app.task
@celery_app.task(soft_time_limit=600)
def fetch_and_replace_full_text(collection_id, server_name):
"""
Task to fetch and replace full text and metadata for all URLs associated with a specified collection
from a given server. This task deletes all existing DumpUrl entries for the collection and creates
new entries based on the latest fetched data.
Task to initiate fetching and replacing full text and metadata for all URLs associated with a specified collection
from a given server.
Args:
collection_id (int): The identifier for the collection in the database.
server_name (str): The name of the server.
@@ -248,28 +246,15 @@ def fetch_and_replace_full_text(collection_id, server_name):
"""
collection = Collection.objects.get(id=collection_id)
api = Api(server_name)
documents = api.get_full_texts(collection.config_folder)

# Step 1: Delete all existing DumpUrl entries for the collection
deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete()
print(f"Deleted {deleted_count} old records.")

# Step 2: Create new DumpUrl entries from the fetched documents
processed_count = 0
for doc in documents:
try:
DumpUrl.objects.create(
url=doc["url"],
collection=collection,
scraped_text=doc.get("full_text", ""),
scraped_title=doc.get("title", ""),
)
processed_count += 1
except IntegrityError:
# Handle duplicate URL case if needed
print(f"Duplicate URL found, skipping: {doc['url']}")
# Step 2: Fetch and process new data
result_message = api.get_full_texts(collection.config_folder, collection=collection)

# Step 3: Migrate DumpUrl to DeltaUrl
collection.migrate_dump_to_delta()

print(f"Processed {processed_count} new records.")

return f"Successfully processed {len(documents)} records and updated the database."
return result_message
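
With soft_time_limit=600 set on the task, a fetch that exceeds ten minutes raises SoftTimeLimitExceeded inside the task body instead of running indefinitely. The commit does not add handling for it; a hedged sketch of what that could look like, reusing the module's existing celery_app, Collection, and Api imports:

# Sketch only: optional handling of the soft time limit; not part of this commit.
from celery.exceptions import SoftTimeLimitExceeded


@celery_app.task(soft_time_limit=600)
def fetch_and_replace_full_text(collection_id, server_name):
    collection = Collection.objects.get(id=collection_id)
    api = Api(server_name)
    try:
        result_message = api.get_full_texts(collection.config_folder, collection=collection)
    except SoftTimeLimitExceeded:
        return f"Soft time limit (600s) hit while fetching full texts for collection {collection_id}."
    collection.migrate_dump_to_delta()
    return result_message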
