From b58e360001860ed648ed12cd4870136572f625a4 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Tue, 5 Nov 2024 15:20:02 +0100
Subject: [PATCH 01/22] Added script to allow removal of topics associated
 with a keyword

- This will then re-index the affected documents in ES according to the
  resulting changes.
---
 .../update_es_based_on_url_keyword.py         | 142 ++++++++++++++++++
 zeeguu/core/elastic/indexing.py               |   3 +-
 2 files changed, 144 insertions(+), 1 deletion(-)
 create mode 100644 tools/es_v8_migration/update_es_based_on_url_keyword.py

diff --git a/tools/es_v8_migration/update_es_based_on_url_keyword.py b/tools/es_v8_migration/update_es_based_on_url_keyword.py
new file mode 100644
index 00000000..cfb6a975
--- /dev/null
+++ b/tools/es_v8_migration/update_es_based_on_url_keyword.py
@@ -0,0 +1,142 @@
+"""
+    This script expects the following parameters:
+
+    - URL_KEYWORD_TO_UPDATE (str): the keyword we seek to update the ES
+    - DELETE_ARTICLE_NEW_TOPICS (bool): if we should delete the current new topics for
+      the articles containing the URL_KEYWORD_TO_UPDATE. e.g. we note that there is a
+      keyword wrongly associated with a specific topic.
+    - ITERATION_STEP (int): number of articles to index in each loop.
+"""
+
+URL_KEYWORD_TO_UPDATE = "vejret"
+DELETE_ARTICLE_NEW_TOPICS = True
+ITERATION_STEP = 100
+
+
+# coding=utf-8
+from zeeguu.core.elastic.indexing import (
+    create_or_update_bulk_docs,
+)
+from sqlalchemy import func
+from elasticsearch import Elasticsearch, helpers
+from elasticsearch.helpers import bulk
+import zeeguu.core
+from zeeguu.core.model import Article
+from datetime import datetime
+from sqlalchemy.orm.exc import NoResultFound
+from zeeguu.api.app import create_app
+
+from zeeguu.core.model import (
+    Topic,
+    ArticleUrlKeywordMap,
+    UrlKeyword,
+    NewArticleTopicMap,
+)
+from zeeguu.core.model.article import article_topic_map
+from zeeguu.core.elastic.settings import ES_CONN_STRING
+import numpy as np
+from tqdm import tqdm
+
+app = create_app()
+app.app_context().push()
+
+
+print(ES_CONN_STRING)
+es = Elasticsearch(ES_CONN_STRING)
+db_session = zeeguu.core.model.db.session
+print(es.info())
+
+
+def find_topics(article_id, session):
+    article_topic = (
+        session.query(Topic)
+        .join(article_topic_map)
+        .filter(article_topic_map.c.article_id == article_id)
+    )
+    topics = ""
+    for t in article_topic:
+        topics = topics + str(t.title) + " "
+
+    return topics.rstrip()
+
+
+def main():
+    def fetch_articles_by_id(id_list):
+        for i in id_list:
+            try:
+                article = Article.find_by_id(i)
+                if not article:
+                    print(f"Skipped for: '{i}', article not in DB.")
+                    continue
+                topics = find_topics(article.id, db_session)
+                yield (article, topics)
+            except NoResultFound:
+                print(f"fail for: '{i}'")
+            except Exception as e:
+                print(f"fail for: '{i}', {e}")
+
+    def gen_docs(articles_w_topics):
+        for article, topics in articles_w_topics:
+            try:
+                yield create_or_update_bulk_docs(article, db_session, topics)
+            except Exception as e:
+                print(f"fail for: '{article.id}', {e}")
+
+    # Get the articles for the url_keyword
+    target_ids = np.array(
+        [
+            a_id[0]
+            for a_id in db_session.query(Article.id)
+            .join(ArticleUrlKeywordMap)
+            .join(UrlKeyword)
+            .filter(UrlKeyword.keyword == URL_KEYWORD_TO_UPDATE)
+            .distinct()
+        ]
+    )
+
+    print(
+        f"Got articles with url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(target_ids)}",
+    )
+
+    if DELETE_ARTICLE_NEW_TOPICS:
+        print(
+            f"Deleting new_topics for articles with the keyword: '{URL_KEYWORD_TO_UPDATE}'"
+        )
+        articles_to_delete = 
NewArticleTopicMap.query.filter(
+            NewArticleTopicMap.article_id.in_(list(target_ids))
+        )
+        articles_to_delete.delete()
+        db_session.commit()
+
+    if len(target_ids) == 0:
+        print("No articles found! Exiting...")
+        return
+
+    # I noticed that if a document is not added then it won't let me query the ES search.
+    total_added = 0
+    errors_encountered = []
+
+    for i_start in tqdm(range(0, len(target_ids), ITERATION_STEP)):
+        batch = target_ids[i_start : i_start + ITERATION_STEP]
+        res_bulk, error_bulk = bulk(
+            es, gen_docs(fetch_articles_by_id(batch)), raise_on_error=False
+        )
+        total_added += res_bulk
+        errors_encountered += error_bulk
+        total_bulk_errors = len(error_bulk)
+        if total_bulk_errors > 0:
+            print(f"## Warning, {total_bulk_errors} failed to index. With errors: ")
+            print(error_bulk)
+        db_session.commit()
+        print(f"Batch finished. ADDED/UPDATED:{res_bulk} | ERRORS: {total_bulk_errors}")
+    print(errors_encountered)
+    print(f"Total articles added/updated: {total_added}")
+
+
+if __name__ == "__main__":
+    start = datetime.now()
+    print(f"Started at: {start}")
+    main()
+    end = datetime.now()
+    print(f"Ended at: {end}")
+    print(f"Process took: {end-start}")
diff --git a/zeeguu/core/elastic/indexing.py b/zeeguu/core/elastic/indexing.py
index 3c6d8eb7..116c5f43 100644
--- a/zeeguu/core/elastic/indexing.py
+++ b/zeeguu/core/elastic/indexing.py
@@ -117,11 +117,12 @@ def create_or_update_bulk_docs(article, session, topics=None):
     doc = {}
     doc["_id"] = article.id
     doc["_index"] = ES_ZINDEX
-    doc["_source"] = doc_data
     if es.exists(index=ES_ZINDEX, id=article.id):
         doc["_op_type"] = "update"
+        doc["_source"] = {"doc": doc_data}
     else:
         doc["_op_type"] = "create"
+        doc["_source"] = doc_data
     return doc
 
 

From 0d739ee5f07571a4c58e6afd4c4ba2a9261cfc00 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Tue, 5 Nov 2024 15:20:42 +0100
Subject: [PATCH 02/22] Renamed a method to be more specific

- Reflects the action of the method.
---
 tools/es_v8_migration/set_new_topics_from_url_keyword.py | 2 +-
 zeeguu/core/model/article.py                             | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/es_v8_migration/set_new_topics_from_url_keyword.py b/tools/es_v8_migration/set_new_topics_from_url_keyword.py
index f2a385d5..d42a9ccd 100644
--- a/tools/es_v8_migration/set_new_topics_from_url_keyword.py
+++ b/tools/es_v8_migration/set_new_topics_from_url_keyword.py
@@ -53,7 +53,7 @@
                     continue
                 topics_added.add(topic.id)
                 topics.append(topic)
-            article.set_new_topics(topics, db_session)
+            article.set_new_topics_url_keywords(topics, db_session)
         except Exception as e:
             counter -= 1
             print(f"Failed for article id: {a_id}, with: {e}")
diff --git a/zeeguu/core/model/article.py b/zeeguu/core/model/article.py
index 49408881..d4edbc78 100644
--- a/zeeguu/core/model/article.py
+++ b/zeeguu/core/model/article.py
@@ -308,7 +308,7 @@ def add_new_topic(self, new_topic, session, origin_type: TopicOriginType):
         )
         session.add(t)
 
-    def set_new_topics(self, topics, session):
+    def set_new_topics_url_keywords(self, topics, session):
         for t in topics:
             self.add_new_topic(t, session, TopicOriginType.URL_PARSED.value)
 

From 286f83d91a8af2aa18935641f5c6a12a4cc41f59 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Tue, 5 Nov 2024 15:41:06 +0100
Subject: [PATCH 03/22] Moved script to tools

- This tool can be used to re-update any articles that might get new mappings
  in the future, so it's not just related to migration.
--- tools/{es_v8_migration => }/update_es_based_on_url_keyword.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tools/{es_v8_migration => }/update_es_based_on_url_keyword.py (100%) diff --git a/tools/es_v8_migration/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py similarity index 100% rename from tools/es_v8_migration/update_es_based_on_url_keyword.py rename to tools/update_es_based_on_url_keyword.py From 80762724089533407f8095496302e1d606579c1d Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Tue, 5 Nov 2024 15:43:42 +0100 Subject: [PATCH 04/22] Remove the existing new_topic mapping from keyword --- tools/update_es_based_on_url_keyword.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index cfb6a975..8386d379 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -82,6 +82,14 @@ def gen_docs(articles_w_topics): except Exception as e: print(f"fail for: '{article.id}', {e}") + # Updating url_keyword new_topic mapping + # If we are deleting the current topics, it means the mapping should not be kept. + if DELETE_ARTICLE_NEW_TOPICS: + url_keywords = UrlKeyword.find_all_by_keyword(URL_KEYWORD_TO_UPDATE) + for u_key in url_keywords: + u_key.new_topic_id = None + db_session.commit() + # Get the articles for the url_keyword target_ids = np.array( [ From a26ef24ebe8e726253ac96b86a9473214499cecc Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Fri, 8 Nov 2024 13:37:32 +0100 Subject: [PATCH 05/22] Update mysql_to_elastic_new_topics.py - Removed redundant code in mysql -> es. --- tools/mysql_to_elastic_new_topics.py | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/tools/mysql_to_elastic_new_topics.py b/tools/mysql_to_elastic_new_topics.py index 6ae0ccdd..eda58638 100644 --- a/tools/mysql_to_elastic_new_topics.py +++ b/tools/mysql_to_elastic_new_topics.py @@ -14,7 +14,6 @@ from zeeguu.api.app import create_app from zeeguu.core.model import Topic, NewArticleTopicMap -from zeeguu.core.model.article import article_topic_map from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING from zeeguu.core.model.new_article_topic_map import TopicOriginType import numpy as np @@ -46,19 +45,6 @@ print(es.info()) -def find_topics(article_id, session): - article_topic = ( - session.query(Topic) - .join(article_topic_map) - .filter(article_topic_map.c.article_id == article_id) - ) - topics = "" - for t in article_topic: - topics = topics + str(t.title) + " " - - return topics.rstrip() - - def main(): if DELETE_INDEX: try: @@ -77,17 +63,16 @@ def fetch_articles_by_id(id_list): if not article: print(f"Skipped for: '{i}', article not in DB.") continue - topics = find_topics(article.id, db_session) - yield (article, topics) + yield article except NoResultFound: print(f"fail for: '{i}'") except Exception as e: print(f"fail for: '{i}', {e}") def gen_docs(articles_w_topics): - for article, topics in articles_w_topics: + for article in articles_w_topics: try: - yield create_or_update_bulk_docs(article, db_session, topics) + yield create_or_update_bulk_docs(article, db_session) except Exception as e: print(f"fail for: '{article.id}', {e}") From a84df0c3f9c838a3f6f258ef8c7887d565c3aeca Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Fri, 8 Nov 2024 14:13:50 +0100 Subject: [PATCH 06/22] Updated article.py - Renamed set_new_topics_url_keywords -> add_new_topics_from_url_keyword , to better reflect 
the purpose of the method
- Added a method recalculate_topics_from_url_keywords to allow recalculating
  the topics from the url_keywords.
---
 .../set_new_topics_from_url_keyword.py        |  2 +-
 zeeguu/core/model/article.py                  | 15 ++++++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/tools/es_v8_migration/set_new_topics_from_url_keyword.py b/tools/es_v8_migration/set_new_topics_from_url_keyword.py
index d42a9ccd..3cbd12d9 100644
--- a/tools/es_v8_migration/set_new_topics_from_url_keyword.py
+++ b/tools/es_v8_migration/set_new_topics_from_url_keyword.py
@@ -53,7 +53,7 @@
                     continue
                 topics_added.add(topic.id)
                 topics.append(topic)
-            article.set_new_topics_url_keywords(topics, db_session)
+            article.add_new_topics_from_url_keyword(topics, db_session)
         except Exception as e:
             counter -= 1
             print(f"Failed for article id: {a_id}, with: {e}")
diff --git a/zeeguu/core/model/article.py b/zeeguu/core/model/article.py
index d4edbc78..57a60a76 100644
--- a/zeeguu/core/model/article.py
+++ b/zeeguu/core/model/article.py
@@ -301,6 +301,19 @@ def is_owned_by(self, user):
     def add_topic(self, topic):
         self.topics.append(topic)
 
+    def recalculate_topics_from_url_keywords(self, session):
+        topics_added = set([t.new_topic.id for t in self.new_topics])
+        topics = []
+        for url_keyword in self.url_keywords:
+            topic = url_keyword.url_keyword.new_topic
+            if topic is None:
+                continue
+            if topic.id in topics_added:
+                continue
+            topics_added.add(topic.id)
+            topics.append(topic)
+        self.add_new_topics_from_url_keyword(topics, session)
+
     def add_new_topic(self, new_topic, session, origin_type: TopicOriginType):
 
         t = NewArticleTopicMap(
@@ -308,7 +321,7 @@ def add_new_topic(self, new_topic, session, origin_type: TopicOriginType):
         )
         session.add(t)
 
-    def set_new_topics_url_keywords(self, topics, session):
+    def add_new_topics_from_url_keyword(self, topics, session):
         for t in topics:
             self.add_new_topic(t, session, TopicOriginType.URL_PARSED.value)
 

From fb927249ff210b81895b36a1f34a392ce1bf44dc Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Fri, 8 Nov 2024 14:15:31 +0100
Subject: [PATCH 07/22] Update indexing.py

- Improved the indexing process by checking whether the document needs its
  embeddings re-computed. This should only happen for new articles, or
  articles that have updated their content. This will allow for faster
  updates in the future.
---
 zeeguu/core/elastic/indexing.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/zeeguu/core/elastic/indexing.py b/zeeguu/core/elastic/indexing.py
index 116c5f43..8ae273d7 100644
--- a/zeeguu/core/elastic/indexing.py
+++ b/zeeguu/core/elastic/indexing.py
@@ -53,7 +53,7 @@ def find_filter_url_keywords(article_id, session):
     return topic_kewyords
 
 
-def document_from_article(article, session, topics=None, is_v7=True):
+def document_from_article(article, session, is_v7=True, current_doc=None):
     old_topics = find_topics(article.id, session)
 
     if is_v7:
@@ -74,6 +74,13 @@ def document_from_article(article, session, topics=None, is_v7=True):
         }
     else:
        topics, topics_inferred = find_new_topics(article.id, session)
+        embedding_generation_required = current_doc is None
+        # Embeddings only need to be re-computed if the document
+        # doesn't exist or the text is updated.
+        # This is the most expensive operation in the indexing process, so
+        # skipping it when possible saves time.
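+        # Note: this assumes the previously indexed document stores the raw
+        # article text in its "content" field; the comparison below uses it
+        # to detect whether the text changed since the last indexing.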
+ if current_doc is not None: + embedding_generation_required = current_doc["content"] != article.content doc = { "title": article.title, "author": article.authors, @@ -90,10 +97,14 @@ def document_from_article(article, session, topics=None, is_v7=True): "language": article.language.name, "fk_difficulty": article.fk_difficulty, "lr_difficulty": DifficultyLingoRank.value_for_article(article), - "sem_vec": get_embedding_from_article(article), "url": article.url.as_string(), "video": article.video, } + if not embedding_generation_required and current_doc is not None: + doc["sem_vec"] = current_doc["sem_vec"] + else: + doc["sem_vec"] = get_embedding_from_article(article) + return doc @@ -110,20 +121,24 @@ def create_or_update(article, session): return res -def create_or_update_bulk_docs(article, session, topics=None): +def create_or_update_bulk_docs(article, session): es = Elasticsearch(ES_CONN_STRING) es_version = int(es.info()["version"]["number"][0]) - doc_data = document_from_article(article, session, topics, is_v7=es_version == 7) + doc = {} doc["_id"] = article.id doc["_index"] = ES_ZINDEX if es.exists(index=ES_ZINDEX, id=article.id): + current_doc = es.get(index=ES_ZINDEX, id=article.id) + doc_data = document_from_article( + article, session, is_v7=es_version == 7, current_doc=current_doc["_source"] + ) doc["_op_type"] = "update" doc["_source"] = {"doc": doc_data} else: + doc_data = document_from_article(article, session, is_v7=es_version == 7) doc["_op_type"] = "create" doc["_source"] = doc_data - return doc From 8ba72b0552e9c9e84c9fff3d9ced549e74a07aaa Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Fri, 8 Nov 2024 14:21:00 +0100 Subject: [PATCH 08/22] Update mysql_to_elastic_new_topics.py - Remove unnecessary code --- tools/mysql_to_elastic_new_topics.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tools/mysql_to_elastic_new_topics.py b/tools/mysql_to_elastic_new_topics.py index eda58638..995cfa69 100644 --- a/tools/mysql_to_elastic_new_topics.py +++ b/tools/mysql_to_elastic_new_topics.py @@ -1,19 +1,16 @@ # coding=utf-8 from zeeguu.core.elastic.indexing import ( - create_or_update, - document_from_article, create_or_update_bulk_docs, ) -from sqlalchemy import func -from elasticsearch import Elasticsearch, helpers -from elasticsearch.helpers import bulk +from elasticsearch import Elasticsearch +from elasticsearch.helpers import bulk, scan import zeeguu.core from zeeguu.core.model import Article from datetime import datetime from sqlalchemy.orm.exc import NoResultFound from zeeguu.api.app import create_app -from zeeguu.core.model import Topic, NewArticleTopicMap +from zeeguu.core.model import NewArticleTopicMap from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING from zeeguu.core.model.new_article_topic_map import TopicOriginType import numpy as np @@ -36,8 +33,8 @@ # ITERATION_STEP - number of articles to index before reporting. Default: 1000 DELETE_INDEX = False INDEX_WITH_TOPIC_ONLY = True -TOTAL_ITEMS = 5000 -ITERATION_STEP = 1000 +TOTAL_ITEMS = 1000 +ITERATION_STEP = 100 print(ES_CONN_STRING) es = Elasticsearch(ES_CONN_STRING) @@ -112,9 +109,11 @@ def gen_docs(articles_w_topics): if len(target_ids) == 0: print("No articles found! 
Exiting...") return - target_ids_not_in_es = list( - filter(lambda x: not es.exists(index=ES_ZINDEX, id=x), target_ids) + es_query = {"query": {"match_all": {}}} + ids_in_es = set( + [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] ) + target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids)) print("Total articles missing: ", len(target_ids_not_in_es)) # I noticed that if a document is not added then it won't let me query the ES search. From 9a99168c6e1482cab65fa129e52474c6cf978b84 Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Fri, 8 Nov 2024 14:25:16 +0100 Subject: [PATCH 09/22] Update update_es_based_on_url_keyword.py - Improved the script to only delete the topic mappings that are associated with the keyword that is being updated. - If the document is found in ES, then check if we need to re-compute the embeddings, this should speed up the process of updating the DB in case of errors. - Renamed variables for clarity --- tools/update_es_based_on_url_keyword.py | 98 +++++++++++++++---------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index 8386d379..16e680c3 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -5,21 +5,33 @@ - DELETE_ARTICLE_NEW_TOPICS (bool): if we should delete the current new topics for the articles containing the URL_KEYWORD_TO_UPDATE. e.g. we note that there is a keyword wrongly associated with a specific topic. + - RECALCULATE_TOPICS (bool): if the topics for the articles should be recalculated. + Let's say two keywords have the same topic, then the article would loose it's topic + despite still being categorized by another keyword. E.g. 'vejret' and 'klima' are + associated with 'Technology & Science'. If we delete topics associated, with 'vejret' + all articles with 'vejret' will loose that topic. This would be incorrect for all + those that continue to have 'klima' as a keyword. For this reason, + RECALCULATE_TOPICS should be true. This setting MUST BE true, in case of an update, + e.g. we adde a new mapping to one of the keywords. + - RE_INDEX_ONLY_ARTICLES_IN_ES (bool): if the articles re-index are only those + that were already in ES. - ITERATION_STEP (int): number of articles to index in each loop. 
""" URL_KEYWORD_TO_UPDATE = "vejret" DELETE_ARTICLE_NEW_TOPICS = True -ITERATION_STEP = 100 +RECALCULATE_TOPICS = True +RE_INDEX_ONLY_ARTICLES_IN_ES = True +ITERATION_STEP = 1000 # coding=utf-8 from zeeguu.core.elastic.indexing import ( create_or_update_bulk_docs, ) -from sqlalchemy import func -from elasticsearch import Elasticsearch, helpers -from elasticsearch.helpers import bulk + +from elasticsearch import Elasticsearch +from elasticsearch.helpers import bulk, scan import zeeguu.core from zeeguu.core.model import Article from datetime import datetime @@ -27,13 +39,13 @@ from zeeguu.api.app import create_app from zeeguu.core.model import ( - Topic, ArticleUrlKeywordMap, UrlKeyword, NewArticleTopicMap, + NewTopic, ) from zeeguu.core.model.article import article_topic_map -from zeeguu.core.elastic.settings import ES_CONN_STRING +from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING import numpy as np from tqdm import tqdm @@ -41,55 +53,35 @@ app.app_context().push() -print(ES_CONN_STRING) es = Elasticsearch(ES_CONN_STRING) db_session = zeeguu.core.model.db.session print(es.info()) -def find_topics(article_id, session): - article_topic = ( - session.query(Topic) - .join(article_topic_map) - .filter(article_topic_map.c.article_id == article_id) - ) - topics = "" - for t in article_topic: - topics = topics + str(t.title) + " " - - return topics.rstrip() - - def main(): - def fetch_articles_by_id(id_list): + def fetch_articles_by_id(id_list, recalculate_topic=False): for i in id_list: try: article = Article.find_by_id(i) if not article: print(f"Skipped for: '{i}', article not in DB.") continue - topics = find_topics(article.id, db_session) - yield (article, topics) + if recalculate_topic: + article.recalculate_topics_from_url_keywords(db_session) + db_session.commit() + yield (article) except NoResultFound: print(f"fail for: '{i}'") except Exception as e: print(f"fail for: '{i}', {e}") def gen_docs(articles_w_topics): - for article, topics in articles_w_topics: + for article in articles_w_topics: try: - yield create_or_update_bulk_docs(article, db_session, topics) + yield create_or_update_bulk_docs(article, db_session) except Exception as e: print(f"fail for: '{article.id}', {e}") - # Updating url_keyword new_topic mapping - # If we are deleting the current topics, it means the mapping should not be kept. - if DELETE_ARTICLE_NEW_TOPICS: - url_keywords = UrlKeyword.find_all_by_keyword(URL_KEYWORD_TO_UPDATE) - for u_key in url_keywords: - u_key.new_topic_id = None - db_session.commit() - # Get the articles for the url_keyword target_ids = np.array( [ @@ -106,28 +98,58 @@ def gen_docs(articles_w_topics): f"Got articles with url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(target_ids)}", ) + # Updating url_keyword new_topic mapping + # And the topics that were added based on that keyword. 
if DELETE_ARTICLE_NEW_TOPICS: + new_topics_ids_to_delete = [] + new_topics = [] + url_keywords = UrlKeyword.find_all_by_keyword(URL_KEYWORD_TO_UPDATE) + for u_key in url_keywords: + if u_key.new_topic_id: + new_topics.append(NewTopic.find_by_id(u_key.new_topic_id)) + new_topics_ids_to_delete.append(u_key.new_topic_id) + u_key.new_topic_id = None + print( - f"Deleting new_topics for articles with the keyword: '{URL_KEYWORD_TO_UPDATE}'" + f"Deleting new_topics '{",".join([t.title for t in new_topics])}' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'" ) - articles_to_delete = NewArticleTopicMap.query.filter( + topic_mappings_to_delete = NewArticleTopicMap.query.filter( NewArticleTopicMap.article_id.in_(list(target_ids)) + ).filter(NewArticleTopicMap.new_topic_id.in_(new_topics_ids_to_delete)) + print( + f"Found '{len(topic_mappings_to_delete.all())}' topic mappings to delete." ) - articles_to_delete.delete() + topic_mappings_to_delete.delete() db_session.commit() + print("MySQL deletion completed.") if len(target_ids) == 0: print("No articles found! Exiting...") return # I noticed that if a document is not added then it won't let me query the ES search. + if RE_INDEX_ONLY_ARTICLES_IN_ES: + print("Re-indexing only existing articles in ES...") + es_query = {"query": {"match_all": {}}} + ids_in_es = set( + [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] + ) + target_ids_in_es = list(filter(lambda x: x in ids_in_es, target_ids)) + if len(target_ids_in_es) < len(target_ids): + print( + f"From the total articles {len(target_ids)}, only {len(target_ids_in_es)} will be indexed..." + ) + target_ids = target_ids_in_es + total_added = 0 errors_encountered = [] - + print("Starting re-indexing process...") for i_start in tqdm(range(0, len(target_ids), ITERATION_STEP)): batch = target_ids[i_start : i_start + ITERATION_STEP] res_bulk, error_bulk = bulk( - es, gen_docs(fetch_articles_by_id(batch)), raise_on_error=False + es, + gen_docs(fetch_articles_by_id(batch, RECALCULATE_TOPICS)), + raise_on_error=False, ) total_added += res_bulk errors_encountered += error_bulk From fd5011f4c15341ee9578ce7a21f15cda2394f602 Mon Sep 17 00:00:00 2001 From: Mircea Filip Lungu Date: Mon, 18 Nov 2024 10:11:37 +0100 Subject: [PATCH 10/22] small refactorings --- tools/update_es_based_on_url_keyword.py | 30 ++++++++++++++----------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index 16e680c3..c8914865 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -44,7 +44,7 @@ NewArticleTopicMap, NewTopic, ) -from zeeguu.core.model.article import article_topic_map + from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING import numpy as np from tqdm import tqdm @@ -58,6 +58,19 @@ print(es.info()) +def ids_of_articles_matching_url_keyword(): + return np.array( + [ + a_id[0] + for a_id in db_session.query(Article.id) + .join(ArticleUrlKeywordMap) + .join(UrlKeyword) + .filter(UrlKeyword.keyword == URL_KEYWORD_TO_UPDATE) + .distinct() + ] + ) + + def main(): def fetch_articles_by_id(id_list, recalculate_topic=False): for i in id_list: @@ -69,7 +82,7 @@ def fetch_articles_by_id(id_list, recalculate_topic=False): if recalculate_topic: article.recalculate_topics_from_url_keywords(db_session) db_session.commit() - yield (article) + yield article except NoResultFound: print(f"fail for: '{i}'") except Exception as e: @@ -83,19 +96,10 @@ def 
gen_docs(articles_w_topics): print(f"fail for: '{article.id}', {e}") # Get the articles for the url_keyword - target_ids = np.array( - [ - a_id[0] - for a_id in db_session.query(Article.id) - .join(ArticleUrlKeywordMap) - .join(UrlKeyword) - .filter(UrlKeyword.keyword == URL_KEYWORD_TO_UPDATE) - .distinct() - ] - ) + target_ids = ids_of_articles_matching_url_keyword() print( - f"Got articles with url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(target_ids)}", + f"Got articles for url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(target_ids)}", ) # Updating url_keyword new_topic mapping From 00eb3b7671c2c4dfb17aacc8d2329cf55655912b Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Mon, 18 Nov 2024 11:20:22 +0100 Subject: [PATCH 11/22] Update to script based on review --- tools/mysql_to_elastic_new_topics.py | 4 +- tools/update_es_based_on_url_keyword.py | 69 ++++++++++++++++--------- zeeguu/core/elastic/indexing.py | 2 +- 3 files changed, 49 insertions(+), 26 deletions(-) diff --git a/tools/mysql_to_elastic_new_topics.py b/tools/mysql_to_elastic_new_topics.py index 995cfa69..e2d052ba 100644 --- a/tools/mysql_to_elastic_new_topics.py +++ b/tools/mysql_to_elastic_new_topics.py @@ -1,6 +1,6 @@ # coding=utf-8 from zeeguu.core.elastic.indexing import ( - create_or_update_bulk_docs, + create_or_update_doc_for_bulk, ) from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk, scan @@ -69,7 +69,7 @@ def fetch_articles_by_id(id_list): def gen_docs(articles_w_topics): for article in articles_w_topics: try: - yield create_or_update_bulk_docs(article, db_session) + yield create_or_update_doc_for_bulk(article, db_session) except Exception as e: print(f"fail for: '{article.id}', {e}") diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index c8914865..6d0edae4 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -1,10 +1,22 @@ """ + Updates a url_keyword topic mapping and the article topic mapping for articles + that contain that url_keyword. + + The script follows the following structure: + 1. (IS_DELETION == True) + Delete the association of the url_keyword to the topic_id and all article + mappings to the topic this url_keyword was associated with. + 2. Recalculate all the topics based on url_keywords for the articles associated + with the url_keyword being updated. + 3. Re-index all the documents to ES. If RE_INDEX_ONLY_ARTICLES_IN_ES, only the + ones that were in ES are re-index, otherwise all the documents are indexed. + + This script expects the following parameters: - URL_KEYWORD_TO_UPDATE (str): the keyword we seek to update the ES - - DELETE_ARTICLE_NEW_TOPICS (bool): if we should delete the current new topics for - the articles containing the URL_KEYWORD_TO_UPDATE. e.g. we note that there is a - keyword wrongly associated with a specific topic. + - IS_DELETION (bool): when this is true, the URL_KEYWORD_TO_UPDATE topic_id mapping + is removed, and all associated article_topic_mappings are deleted. - RECALCULATE_TOPICS (bool): if the topics for the articles should be recalculated. Let's say two keywords have the same topic, then the article would loose it's topic despite still being categorized by another keyword. E.g. 'vejret' and 'klima' are @@ -12,14 +24,14 @@ all articles with 'vejret' will loose that topic. This would be incorrect for all those that continue to have 'klima' as a keyword. For this reason, RECALCULATE_TOPICS should be true. 
This setting MUST BE true, in case of an update, - e.g. we adde a new mapping to one of the keywords. + e.g. we add a new mapping to one of the keywords. - RE_INDEX_ONLY_ARTICLES_IN_ES (bool): if the articles re-index are only those that were already in ES. - ITERATION_STEP (int): number of articles to index in each loop. """ URL_KEYWORD_TO_UPDATE = "vejret" -DELETE_ARTICLE_NEW_TOPICS = True +IS_DELETION = False RECALCULATE_TOPICS = True RE_INDEX_ONLY_ARTICLES_IN_ES = True ITERATION_STEP = 1000 @@ -27,7 +39,7 @@ # coding=utf-8 from zeeguu.core.elastic.indexing import ( - create_or_update_bulk_docs, + create_or_update_doc_for_bulk, ) from elasticsearch import Elasticsearch @@ -72,26 +84,30 @@ def ids_of_articles_matching_url_keyword(): def main(): - def fetch_articles_by_id(id_list, recalculate_topic=False): + def recalculate_article_url_keyword_topics(article_id, commit_after_article=False): + article = Article.find_by_id(article_id) + article.recalculate_topics_from_url_keywords(db_session) + if commit_after_article: + db_session.commit() + return article + + def fetch_articles_by_id(id_list: list[int]): for i in id_list: try: article = Article.find_by_id(i) if not article: print(f"Skipped for: '{i}', article not in DB.") continue - if recalculate_topic: - article.recalculate_topics_from_url_keywords(db_session) - db_session.commit() - yield article + yield (article) except NoResultFound: print(f"fail for: '{i}'") except Exception as e: print(f"fail for: '{i}', {e}") - def gen_docs(articles_w_topics): - for article in articles_w_topics: + def gen_docs(articles: list[Article]): + for article in articles: try: - yield create_or_update_bulk_docs(article, db_session) + yield create_or_update_doc_for_bulk(article, db_session) except Exception as e: print(f"fail for: '{article.id}', {e}") @@ -104,22 +120,22 @@ def gen_docs(articles_w_topics): # Updating url_keyword new_topic mapping # And the topics that were added based on that keyword. - if DELETE_ARTICLE_NEW_TOPICS: - new_topics_ids_to_delete = [] - new_topics = [] + if IS_DELETION: + topics_ids_to_delete_mappings = [] + topics = [] url_keywords = UrlKeyword.find_all_by_keyword(URL_KEYWORD_TO_UPDATE) for u_key in url_keywords: if u_key.new_topic_id: - new_topics.append(NewTopic.find_by_id(u_key.new_topic_id)) - new_topics_ids_to_delete.append(u_key.new_topic_id) + topics.append(NewTopic.find_by_id(u_key.new_topic_id)) + topics_ids_to_delete_mappings.append(u_key.new_topic_id) u_key.new_topic_id = None print( - f"Deleting new_topics '{",".join([t.title for t in new_topics])}' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'" + f"Deleting new_topics '{",".join([t.title for t in topics])}' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'" ) topic_mappings_to_delete = NewArticleTopicMap.query.filter( NewArticleTopicMap.article_id.in_(list(target_ids)) - ).filter(NewArticleTopicMap.new_topic_id.in_(new_topics_ids_to_delete)) + ).filter(NewArticleTopicMap.new_topic_id.in_(topics_ids_to_delete_mappings)) print( f"Found '{len(topic_mappings_to_delete.all())}' topic mappings to delete." ) @@ -131,7 +147,14 @@ def gen_docs(articles_w_topics): print("No articles found! Exiting...") return - # I noticed that if a document is not added then it won't let me query the ES search. 
+ if RECALCULATE_TOPICS: + print("Updating Article Topics based on URL Keywords...") + for a_id in tqdm(target_ids): + recalculate_article_url_keyword_topics(a_id) + print("Commiting...") + db_session.commit() + print("DONE.") + if RE_INDEX_ONLY_ARTICLES_IN_ES: print("Re-indexing only existing articles in ES...") es_query = {"query": {"match_all": {}}} @@ -152,7 +175,7 @@ def gen_docs(articles_w_topics): batch = target_ids[i_start : i_start + ITERATION_STEP] res_bulk, error_bulk = bulk( es, - gen_docs(fetch_articles_by_id(batch, RECALCULATE_TOPICS)), + gen_docs(fetch_articles_by_id(batch)), raise_on_error=False, ) total_added += res_bulk diff --git a/zeeguu/core/elastic/indexing.py b/zeeguu/core/elastic/indexing.py index 8ae273d7..bd059cfd 100644 --- a/zeeguu/core/elastic/indexing.py +++ b/zeeguu/core/elastic/indexing.py @@ -121,7 +121,7 @@ def create_or_update(article, session): return res -def create_or_update_bulk_docs(article, session): +def create_or_update_doc_for_bulk(article, session): es = Elasticsearch(ES_CONN_STRING) es_version = int(es.info()["version"]["number"][0]) From 791d038e041d9121957cdd82f95e904badeccd95 Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Mon, 18 Nov 2024 12:21:20 +0100 Subject: [PATCH 12/22] Update mysql_to_elastic_new_topics.py - Fixed a case, if the Index doesn't exist an error is thrown. - Use the configuration constant rather than hardcoded value --- tools/mysql_to_elastic_new_topics.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tools/mysql_to_elastic_new_topics.py b/tools/mysql_to_elastic_new_topics.py index e2d052ba..0d1d90d7 100644 --- a/tools/mysql_to_elastic_new_topics.py +++ b/tools/mysql_to_elastic_new_topics.py @@ -9,7 +9,6 @@ from datetime import datetime from sqlalchemy.orm.exc import NoResultFound from zeeguu.api.app import create_app - from zeeguu.core.model import NewArticleTopicMap from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING from zeeguu.core.model.new_article_topic_map import TopicOriginType @@ -45,7 +44,7 @@ def main(): if DELETE_INDEX: try: - es.options(ignore_status=[400, 404]).indices.delete(index="zeeguu") + es.options(ignore_status=[400, 404]).indices.delete(index=ES_ZINDEX) print("Deleted index 'zeeguu'!") except Exception as e: print(f"Failed to delete: {e}") @@ -109,11 +108,16 @@ def gen_docs(articles_w_topics): if len(target_ids) == 0: print("No articles found! Exiting...") return - es_query = {"query": {"match_all": {}}} - ids_in_es = set( - [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] - ) - target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids)) + if es.indices.exists(index=ES_ZINDEX): + es_query = {"query": {"match_all": {}}} + ids_in_es = set( + [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] + ) + target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids)) + else: + # The index was deleted / doesn't exist: + target_ids_not_in_es = target_ids + print("Total articles missing: ", len(target_ids_not_in_es)) # I noticed that if a document is not added then it won't let me query the ES search. From 3dc0e25691017980278b2b9e1edd73f6c81f3810 Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Mon, 18 Nov 2024 12:21:58 +0100 Subject: [PATCH 13/22] Update update_es_based_on_url_keyword.py - By default, we are doing deletion. 
--- tools/update_es_based_on_url_keyword.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index 6d0edae4..0e23f305 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -31,7 +31,7 @@ """ URL_KEYWORD_TO_UPDATE = "vejret" -IS_DELETION = False +IS_DELETION = True RECALCULATE_TOPICS = True RE_INDEX_ONLY_ARTICLES_IN_ES = True ITERATION_STEP = 1000 From 0effc2f5384c6f2bf3f2c0b66ff782388d707712 Mon Sep 17 00:00:00 2001 From: Mircea Filip Lungu Date: Wed, 20 Nov 2024 16:51:10 +0100 Subject: [PATCH 14/22] small changes. --- tools/update_es_based_on_url_keyword.py | 49 ++++++++++++++++--------- zeeguu/core/elastic/indexing.py | 2 + zeeguu/core/model/article.py | 4 +- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index 0e23f305..6ddba43b 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -27,7 +27,8 @@ e.g. we add a new mapping to one of the keywords. - RE_INDEX_ONLY_ARTICLES_IN_ES (bool): if the articles re-index are only those that were already in ES. - - ITERATION_STEP (int): number of articles to index in each loop. + - ITERATION_STEP (int): number of articles to index in each loop. + """ URL_KEYWORD_TO_UPDATE = "vejret" @@ -111,11 +112,12 @@ def gen_docs(articles: list[Article]): except Exception as e: print(f"fail for: '{article.id}', {e}") + # Main action! # Get the articles for the url_keyword - target_ids = ids_of_articles_matching_url_keyword() + ids_of_articles_containing_keyword = ids_of_articles_matching_url_keyword() print( - f"Got articles for url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(target_ids)}", + f"Got articles for url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(ids_of_articles_containing_keyword)}", ) # Updating url_keyword new_topic mapping @@ -123,6 +125,9 @@ def gen_docs(articles: list[Article]): if IS_DELETION: topics_ids_to_delete_mappings = [] topics = [] + + # the only reason why here we have multiple keywords is that there are multiple languages + # so the following code would delete both danish and norwegian topics url_keywords = UrlKeyword.find_all_by_keyword(URL_KEYWORD_TO_UPDATE) for u_key in url_keywords: if u_key.new_topic_id: @@ -131,48 +136,56 @@ def gen_docs(articles: list[Article]): u_key.new_topic_id = None print( - f"Deleting new_topics '{",".join([t.title for t in topics])}' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'" + f"Deleting new_topics '{ ','.join([t.title for t in topics])} ' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'" ) - topic_mappings_to_delete = NewArticleTopicMap.query.filter( - NewArticleTopicMap.article_id.in_(list(target_ids)) + article_topic_mappings_to_delete = NewArticleTopicMap.query.filter( + NewArticleTopicMap.article_id.in_(list(ids_of_articles_containing_keyword)) ).filter(NewArticleTopicMap.new_topic_id.in_(topics_ids_to_delete_mappings)) print( - f"Found '{len(topic_mappings_to_delete.all())}' topic mappings to delete." + f"Found '{len(article_topic_mappings_to_delete.all())}' topic mappings to delete." ) - topic_mappings_to_delete.delete() + article_topic_mappings_to_delete.delete() db_session.commit() print("MySQL deletion completed.") - if len(target_ids) == 0: + if len(ids_of_articles_containing_keyword) == 0: print("No articles found! 
Exiting...") return if RECALCULATE_TOPICS: print("Updating Article Topics based on URL Keywords...") - for a_id in tqdm(target_ids): + for a_id in tqdm(ids_of_articles_containing_keyword): recalculate_article_url_keyword_topics(a_id) print("Commiting...") db_session.commit() print("DONE.") if RE_INDEX_ONLY_ARTICLES_IN_ES: - print("Re-indexing only existing articles in ES...") + print( + "Selecting only the IDS of articles matching the keyword that are already in ES..." + ) es_query = {"query": {"match_all": {}}} ids_in_es = set( [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] ) - target_ids_in_es = list(filter(lambda x: x in ids_in_es, target_ids)) - if len(target_ids_in_es) < len(target_ids): + target_ids_in_es = list( + filter(lambda x: x in ids_in_es, ids_of_articles_containing_keyword) + ) + if len(target_ids_in_es) < len(ids_of_articles_containing_keyword): print( - f"From the total articles {len(target_ids)}, only {len(target_ids_in_es)} will be indexed..." + f"From the total articles {len(ids_of_articles_containing_keyword)}, only {len(target_ids_in_es)} will be indexed..." ) - target_ids = target_ids_in_es + # we overwrite here the original article list with the subset that exists already in ES + ids_of_articles_containing_keyword = target_ids_in_es + # THE ACTUAL INDEXING total_added = 0 errors_encountered = [] print("Starting re-indexing process...") - for i_start in tqdm(range(0, len(target_ids), ITERATION_STEP)): - batch = target_ids[i_start : i_start + ITERATION_STEP] + for i_start in tqdm( + range(0, len(ids_of_articles_containing_keyword), ITERATION_STEP) + ): + batch = ids_of_articles_containing_keyword[i_start : i_start + ITERATION_STEP] res_bulk, error_bulk = bulk( es, gen_docs(fetch_articles_by_id(batch)), @@ -180,11 +193,13 @@ def gen_docs(articles: list[Article]): ) total_added += res_bulk errors_encountered += error_bulk + total_bulk_errors = len(error_bulk) if total_bulk_errors > 0: print(f"## Warning, {total_bulk_errors} failed to index. With errors: ") print(error_bulk) db_session.commit() + print(f"Batch finished. 
ADDED/UPDATED:{res_bulk} | ERRORS: {total_bulk_errors}") print(errors_encountered) print(f"Total articles added/updated: {total_added}") diff --git a/zeeguu/core/elastic/indexing.py b/zeeguu/core/elastic/indexing.py index bd059cfd..518534af 100644 --- a/zeeguu/core/elastic/indexing.py +++ b/zeeguu/core/elastic/indexing.py @@ -122,6 +122,8 @@ def create_or_update(article, session): def create_or_update_doc_for_bulk(article, session): + + # Bulk indexing requires a special kind of "document" es = Elasticsearch(ES_CONN_STRING) es_version = int(es.info()["version"]["number"][0]) diff --git a/zeeguu/core/model/article.py b/zeeguu/core/model/article.py index 57a60a76..9523cc6f 100644 --- a/zeeguu/core/model/article.py +++ b/zeeguu/core/model/article.py @@ -302,15 +302,13 @@ def add_topic(self, topic): self.topics.append(topic) def recalculate_topics_from_url_keywords(self, session): - topics_added = set([t.new_topic.id for t in self.new_topics]) topics = [] for url_keyword in self.url_keywords: topic = url_keyword.url_keyword.new_topic if topic is None: continue - if topic.id in topics_added: + if topic in topics: continue - topics_added.add(topic.id) topics.append(topic) self.add_new_topics_from_url_keyword(topics, session) From 9df145aee1715a2130acc368b4427260be7940d7 Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Thu, 21 Nov 2024 10:15:09 +0100 Subject: [PATCH 15/22] Update update_es_based_on_url_keyword.py - Also check if Index is created before filtering. --- tools/update_es_based_on_url_keyword.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py index 6ddba43b..6937175a 100644 --- a/tools/update_es_based_on_url_keyword.py +++ b/tools/update_es_based_on_url_keyword.py @@ -160,7 +160,7 @@ def gen_docs(articles: list[Article]): db_session.commit() print("DONE.") - if RE_INDEX_ONLY_ARTICLES_IN_ES: + if RE_INDEX_ONLY_ARTICLES_IN_ES and es.indices.exists(index=ES_ZINDEX): print( "Selecting only the IDS of articles matching the keyword that are already in ES..." 
) From dfe607f288715d32696909a239e5604d05c8dfa2 Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Thu, 21 Nov 2024 10:15:20 +0100 Subject: [PATCH 16/22] Renaming + Moving old file to old --- tools/mysql_to_elastic.py | 179 ++++++++++++++++++++------- tools/mysql_to_elastic_new_topics.py | 155 ----------------------- tools/old/mysql_to_elastic.py | 68 ++++++++++ 3 files changed, 201 insertions(+), 201 deletions(-) delete mode 100644 tools/mysql_to_elastic_new_topics.py create mode 100644 tools/old/mysql_to_elastic.py diff --git a/tools/mysql_to_elastic.py b/tools/mysql_to_elastic.py index 85340560..5ab81888 100644 --- a/tools/mysql_to_elastic.py +++ b/tools/mysql_to_elastic.py @@ -1,68 +1,155 @@ # coding=utf-8 -import sqlalchemy as database -from zeeguu.api.app import create_app -from zeeguu.core.elastic.indexing import create_or_update, document_from_article -from sqlalchemy import func +from zeeguu.core.elastic.indexing import ( + create_or_update_doc_for_bulk, +) from elasticsearch import Elasticsearch +from elasticsearch.helpers import bulk, scan import zeeguu.core -from sqlalchemy.orm import sessionmaker from zeeguu.core.model import Article -import sys from datetime import datetime from sqlalchemy.orm.exc import NoResultFound - +from zeeguu.api.app import create_app +from zeeguu.core.model import ArticleTopicMap from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING +from zeeguu.core.model.article_topic_map import TopicOriginType +import numpy as np +from tqdm import tqdm +import time app = create_app() app.app_context().push() +# First we should only index with topics so we can do +# inference based on the articles that have topics. -es = Elasticsearch([ES_CONN_STRING]) -DB_URI = app.config["SQLALCHEMY_DATABASE_URI"] -engine = database.create_engine(DB_URI) -Session = sessionmaker(bind=engine) -session = Session() +# These parameters can be changed based on need. +# DELETE_INDEX - used to re-index from scratch. Default: False +# INDEX_WITH_TOPIC_ONLY - determines which articles are indexed. If set to True, +# only the articles which have a topic assigned to them are index. If false, then +# only the articles without the topic will be added. Default: True +# TOTAL_ITEMS - total items to be indexed, the IDs are sampled and this is used to +# have a variety of different articles in ES. Default: 5000 +# NOTE: If you want to index all the articles, you shoud pass a number that's higher +# or equal to the number of articles in the DB +# ITERATION_STEP - number of articles to index before reporting. 
Default: 1000 +DELETE_INDEX = False +INDEX_WITH_TOPIC_ONLY = True +TOTAL_ITEMS = 1000 +ITERATION_STEP = 100 +print(ES_CONN_STRING) +es = Elasticsearch(ES_CONN_STRING) +db_session = zeeguu.core.model.db.session +print(es.info()) -def main(starting_index): - max_id = session.query(func.max(Article.id)).first()[0] - min_id = session.query(func.min(Article.id)).first()[0] +def main(): + if DELETE_INDEX: + try: + es.options(ignore_status=[400, 404]).indices.delete(index=ES_ZINDEX) + print("Deleted index 'zeeguu'!") + except Exception as e: + print(f"Failed to delete: {e}") - if max_id is None: - max_id = 0 - if min_id is None: - min_id = 0 - - print(f"starting import at: {starting_index}") - print(f"max id in db: {max_id}") + def fetch_articles_by_id(id_list): + for i in id_list: + try: + if es.exists(index=ES_ZINDEX, id=i): + print(f"Skipped for: '{i}', article already in ES.") + continue + article = Article.find_by_id(i) + if not article: + print(f"Skipped for: '{i}', article not in DB.") + continue + yield article + except NoResultFound: + print(f"fail for: '{i}'") + except Exception as e: + print(f"fail for: '{i}', {e}") - for i in range(max_id, min_id, -1): - print("article id...") - print(i) - try: - article = Article.find_by_id(i) - if article: - print(article.title) - print(article.id) - res = create_or_update(article, session) - print(res) - except NoResultFound: - print(f"fail for: {i}") - except: - print("fail for " + str(i)) - # import traceback - # traceback.print_exc() + def gen_docs(articles_w_topics): + for article in articles_w_topics: + try: + yield create_or_update_doc_for_bulk(article, db_session) + except Exception as e: + print(f"fail for: '{article.id}', {e}") + # Sample Articles that have topics assigned and are not inferred + if INDEX_WITH_TOPIC_ONLY: + target_ids = np.array( + [ + a_id[0] + for a_id in db_session.query(Article.id) + .join(ArticleTopicMap) + .filter( + ArticleTopicMap.origin_type != TopicOriginType.INFERRED + ) # Do not index Inferred topics + .filter(Article.broken != 1) # Filter out documents that are broken + # .filter(Article.language_id == 2) If only one language + .distinct() + ] + ) + print("Got articles with topics, total: ", len(target_ids)) + else: + articles_with_topic = set( + [ + art_id_w_topic[0] + for art_id_w_topic in db_session.query( + ArticleTopicMap.article_id + ).distinct() + ] + ) + target_ids = np.array( + list( + set([a_id[0] for a_id in db_session.query(Article.id)]) + - articles_with_topic + ) + ) + print("Got articles without topics, total: ", len(target_ids)) -if __name__ == "__main__": + if len(target_ids) == 0: + print("No articles found! Exiting...") + return + if es.indices.exists(index=ES_ZINDEX): + es_query = {"query": {"match_all": {}}} + ids_in_es = set( + [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] + ) + target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids)) + else: + # The index was deleted / doesn't exist: + target_ids_not_in_es = target_ids - print("waiting for the ES process to boot up") + print("Total articles missing: ", len(target_ids_not_in_es)) - print(f"started at: {datetime.now()}") - starting_index = 0 + # I noticed that if a document is not added then it won't let me query the ES search. 
+ total_added = 0 + errors_encountered = [] + final_count_of_articles = min(TOTAL_ITEMS, len(target_ids_not_in_es)) + sampled_ids = np.random.choice( + target_ids_not_in_es, final_count_of_articles, replace=False + ) + for i_start in tqdm(range(0, final_count_of_articles, ITERATION_STEP)): + sub_sample = sampled_ids[i_start : i_start + ITERATION_STEP] + res_bulk, error_bulk = bulk( + es, gen_docs(fetch_articles_by_id(sub_sample)), raise_on_error=False + ) + total_added += res_bulk + errors_encountered += error_bulk + total_bulk_errors = len(error_bulk) + if total_bulk_errors > 0: + print(f"## Warning, {total_bulk_errors} failed to index. With errors: ") + print(error_bulk) + print(f"Batch finished. ADDED:{res_bulk} | ERRORS: {total_bulk_errors}") + print(errors_encountered) + print(f"Total articles added: {total_added}") - if len(sys.argv) > 1: - starting_index = int(sys.argv[1]) - main(starting_index) - print(f"ended at: {datetime.now()}") +if __name__ == "__main__": + + print("waiting for the ES process to boot up") + start = datetime.now() + print(f"started at: {start}") + main() + end = datetime.now() + print(f"ended at: {end}") + print(f"Process took: {end-start}") diff --git a/tools/mysql_to_elastic_new_topics.py b/tools/mysql_to_elastic_new_topics.py deleted file mode 100644 index 5ab81888..00000000 --- a/tools/mysql_to_elastic_new_topics.py +++ /dev/null @@ -1,155 +0,0 @@ -# coding=utf-8 -from zeeguu.core.elastic.indexing import ( - create_or_update_doc_for_bulk, -) -from elasticsearch import Elasticsearch -from elasticsearch.helpers import bulk, scan -import zeeguu.core -from zeeguu.core.model import Article -from datetime import datetime -from sqlalchemy.orm.exc import NoResultFound -from zeeguu.api.app import create_app -from zeeguu.core.model import ArticleTopicMap -from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING -from zeeguu.core.model.article_topic_map import TopicOriginType -import numpy as np -from tqdm import tqdm -import time - -app = create_app() -app.app_context().push() -# First we should only index with topics so we can do -# inference based on the articles that have topics. - -# These parameters can be changed based on need. -# DELETE_INDEX - used to re-index from scratch. Default: False -# INDEX_WITH_TOPIC_ONLY - determines which articles are indexed. If set to True, -# only the articles which have a topic assigned to them are index. If false, then -# only the articles without the topic will be added. Default: True -# TOTAL_ITEMS - total items to be indexed, the IDs are sampled and this is used to -# have a variety of different articles in ES. Default: 5000 -# NOTE: If you want to index all the articles, you shoud pass a number that's higher -# or equal to the number of articles in the DB -# ITERATION_STEP - number of articles to index before reporting. 
Default: 1000 -DELETE_INDEX = False -INDEX_WITH_TOPIC_ONLY = True -TOTAL_ITEMS = 1000 -ITERATION_STEP = 100 - -print(ES_CONN_STRING) -es = Elasticsearch(ES_CONN_STRING) -db_session = zeeguu.core.model.db.session -print(es.info()) - - -def main(): - if DELETE_INDEX: - try: - es.options(ignore_status=[400, 404]).indices.delete(index=ES_ZINDEX) - print("Deleted index 'zeeguu'!") - except Exception as e: - print(f"Failed to delete: {e}") - - def fetch_articles_by_id(id_list): - for i in id_list: - try: - if es.exists(index=ES_ZINDEX, id=i): - print(f"Skipped for: '{i}', article already in ES.") - continue - article = Article.find_by_id(i) - if not article: - print(f"Skipped for: '{i}', article not in DB.") - continue - yield article - except NoResultFound: - print(f"fail for: '{i}'") - except Exception as e: - print(f"fail for: '{i}', {e}") - - def gen_docs(articles_w_topics): - for article in articles_w_topics: - try: - yield create_or_update_doc_for_bulk(article, db_session) - except Exception as e: - print(f"fail for: '{article.id}', {e}") - - # Sample Articles that have topics assigned and are not inferred - if INDEX_WITH_TOPIC_ONLY: - target_ids = np.array( - [ - a_id[0] - for a_id in db_session.query(Article.id) - .join(ArticleTopicMap) - .filter( - ArticleTopicMap.origin_type != TopicOriginType.INFERRED - ) # Do not index Inferred topics - .filter(Article.broken != 1) # Filter out documents that are broken - # .filter(Article.language_id == 2) If only one language - .distinct() - ] - ) - print("Got articles with topics, total: ", len(target_ids)) - else: - articles_with_topic = set( - [ - art_id_w_topic[0] - for art_id_w_topic in db_session.query( - ArticleTopicMap.article_id - ).distinct() - ] - ) - target_ids = np.array( - list( - set([a_id[0] for a_id in db_session.query(Article.id)]) - - articles_with_topic - ) - ) - print("Got articles without topics, total: ", len(target_ids)) - - if len(target_ids) == 0: - print("No articles found! Exiting...") - return - if es.indices.exists(index=ES_ZINDEX): - es_query = {"query": {"match_all": {}}} - ids_in_es = set( - [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)] - ) - target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids)) - else: - # The index was deleted / doesn't exist: - target_ids_not_in_es = target_ids - - print("Total articles missing: ", len(target_ids_not_in_es)) - - # I noticed that if a document is not added then it won't let me query the ES search. - total_added = 0 - errors_encountered = [] - final_count_of_articles = min(TOTAL_ITEMS, len(target_ids_not_in_es)) - sampled_ids = np.random.choice( - target_ids_not_in_es, final_count_of_articles, replace=False - ) - for i_start in tqdm(range(0, final_count_of_articles, ITERATION_STEP)): - sub_sample = sampled_ids[i_start : i_start + ITERATION_STEP] - res_bulk, error_bulk = bulk( - es, gen_docs(fetch_articles_by_id(sub_sample)), raise_on_error=False - ) - total_added += res_bulk - errors_encountered += error_bulk - total_bulk_errors = len(error_bulk) - if total_bulk_errors > 0: - print(f"## Warning, {total_bulk_errors} failed to index. With errors: ") - print(error_bulk) - print(f"Batch finished. 
ADDED:{res_bulk} | ERRORS: {total_bulk_errors}") - print(errors_encountered) - print(f"Total articles added: {total_added}") - - -if __name__ == "__main__": - - print("waiting for the ES process to boot up") - start = datetime.now() - print(f"started at: {start}") - main() - end = datetime.now() - print(f"ended at: {end}") - print(f"Process took: {end-start}") diff --git a/tools/old/mysql_to_elastic.py b/tools/old/mysql_to_elastic.py new file mode 100644 index 00000000..85340560 --- /dev/null +++ b/tools/old/mysql_to_elastic.py @@ -0,0 +1,68 @@ +# coding=utf-8 +import sqlalchemy as database +from zeeguu.api.app import create_app +from zeeguu.core.elastic.indexing import create_or_update, document_from_article +from sqlalchemy import func +from elasticsearch import Elasticsearch +import zeeguu.core +from sqlalchemy.orm import sessionmaker +from zeeguu.core.model import Article +import sys +from datetime import datetime +from sqlalchemy.orm.exc import NoResultFound + +from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING + +app = create_app() +app.app_context().push() + +es = Elasticsearch([ES_CONN_STRING]) +DB_URI = app.config["SQLALCHEMY_DATABASE_URI"] +engine = database.create_engine(DB_URI) +Session = sessionmaker(bind=engine) +session = Session() + + +def main(starting_index): + + max_id = session.query(func.max(Article.id)).first()[0] + min_id = session.query(func.min(Article.id)).first()[0] + + if max_id is None: + max_id = 0 + if min_id is None: + min_id = 0 + + print(f"starting import at: {starting_index}") + print(f"max id in db: {max_id}") + + for i in range(max_id, min_id, -1): + print("article id...") + print(i) + try: + article = Article.find_by_id(i) + if article: + print(article.title) + print(article.id) + res = create_or_update(article, session) + print(res) + except NoResultFound: + print(f"fail for: {i}") + except: + print("fail for " + str(i)) + # import traceback + # traceback.print_exc() + + +if __name__ == "__main__": + + print("waiting for the ES process to boot up") + + print(f"started at: {datetime.now()}") + starting_index = 0 + + if len(sys.argv) > 1: + starting_index = int(sys.argv[1]) + + main(starting_index) + print(f"ended at: {datetime.now()}") From 87ef5adeab2ad9b94210e618e4d2a5d02545c1f5 Mon Sep 17 00:00:00 2001 From: Tiago Ribeiro Date: Thu, 21 Nov 2024 10:17:12 +0100 Subject: [PATCH 17/22] Update indexing.py - Renaming got lost in the merge. 
---
 zeeguu/core/elastic/indexing.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/zeeguu/core/elastic/indexing.py b/zeeguu/core/elastic/indexing.py
index 120618b9..8d01970e 100644
--- a/zeeguu/core/elastic/indexing.py
+++ b/zeeguu/core/elastic/indexing.py
@@ -85,7 +85,7 @@ def create_or_update(article, session):
     return res
 
 
-def create_or_update_bulk_docs(article, session):
+def create_or_update_doc_for_bulk(article, session):
     es = Elasticsearch(ES_CONN_STRING)
     doc_data = document_from_article(article, session)
     doc = {}
@@ -94,7 +94,7 @@ def create_or_update_bulk_docs(article, session):
     if es.exists(index=ES_ZINDEX, id=article.id):
         current_doc = es.get(index=ES_ZINDEX, id=article.id)
         doc_data = document_from_article(
-            article, session, current_doc=current_doc["_source"] 
+            article, session, current_doc=current_doc["_source"]
         )
         doc["_op_type"] = "update"
         doc["_source"] = {"doc": doc_data}

From 5357f2ccb782ddaeed822f72424ff0f426529bea Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Thu, 21 Nov 2024 10:28:48 +0100
Subject: [PATCH 18/22] Fixed some namings that were failing tests

---
 zeeguu/core/model/article.py         | 2 +-
 zeeguu/core/test/rules/topic_rule.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/zeeguu/core/model/article.py b/zeeguu/core/model/article.py
index a67e8faa..ea57dc8b 100644
--- a/zeeguu/core/model/article.py
+++ b/zeeguu/core/model/article.py
@@ -281,7 +281,7 @@ def is_owned_by(self, user):
         return self.uploader_id == user.id
 
     def add_topic(self, topic, session, origin_type: TopicOriginType):
         t = ArticleTopicMap(
-            article=self, new_topic=topic, origin_type=origin_type
+            article=self, topic=topic, origin_type=origin_type
         )
         session.add(t)
diff --git a/zeeguu/core/test/rules/topic_rule.py b/zeeguu/core/test/rules/topic_rule.py
index 4a82087d..b9896dc8 100644
--- a/zeeguu/core/test/rules/topic_rule.py
+++ b/zeeguu/core/test/rules/topic_rule.py
@@ -40,11 +40,11 @@ def __create_new_topic(cls, topic_id):
         if topic_name is None:
             raise KeyError
 
-        new_topic = Topic(topic_name)
+        topic = Topic(topic_name)
 
-        cls.save(new_topic)
+        cls.save(topic)
 
-        return new_topic
+        return topic
 
     @property
     def random(self):

From d867896b31e68623869a1a8c7719c75374415f91 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Thu, 21 Nov 2024 10:28:54 +0100
Subject: [PATCH 19/22] Formatting changes

---
 tools/mysql_to_elastic.py               | 3 ++-
 tools/update_es_based_on_url_keyword.py | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/tools/mysql_to_elastic.py b/tools/mysql_to_elastic.py
index 5ab81888..b0b83302 100644
--- a/tools/mysql_to_elastic.py
+++ b/tools/mysql_to_elastic.py
@@ -119,7 +119,8 @@ def gen_docs(articles_w_topics):
         # The index was deleted / doesn't exist:
         target_ids_not_in_es = target_ids
 
-    print("Total articles missing: ", len(target_ids_not_in_es))
+    print(f"""Total articles missing: {len(target_ids_not_in_es)}""")
+    print(f"""Indexing a total of: {TOTAL_ITEMS}, in batches of: {ITERATION_STEP}""")
 
     # I noticed that if a document is not added then it won't let me query the ES search.
     total_added = 0
diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py
index 6937175a..3b7bd091 100644
--- a/tools/update_es_based_on_url_keyword.py
+++ b/tools/update_es_based_on_url_keyword.py
@@ -117,7 +117,7 @@ def gen_docs(articles: list[Article]):
     ids_of_articles_containing_keyword = ids_of_articles_matching_url_keyword()
 
     print(
-        f"Got articles for url_keyword '{URL_KEYWORD_TO_UPDATE}', total: {len(ids_of_articles_containing_keyword)}",
+        f"Got articles for url_keyword '{URL_KEYWORD_TO_UPDATE}' total: {len(ids_of_articles_containing_keyword)}",
     )
 
     # Updating url_keyword new_topic mapping

From 1d30360383b909edee9bb215d724bcea15b9e9b5 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Thu, 21 Nov 2024 10:31:52 +0100
Subject: [PATCH 20/22] Update mysql_to_elastic.py

---
 tools/mysql_to_elastic.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tools/mysql_to_elastic.py b/tools/mysql_to_elastic.py
index b0b83302..d581f8db 100644
--- a/tools/mysql_to_elastic.py
+++ b/tools/mysql_to_elastic.py
@@ -106,6 +106,7 @@ def gen_docs(articles_w_topics):
         )
         print("Got articles without topics, total: ", len(target_ids))
 
+    total_articles_in_es = 0
     if len(target_ids) == 0:
         print("No articles found! Exiting...")
         return
@@ -114,11 +115,13 @@ def gen_docs(articles_w_topics):
         ids_in_es = set(
             [int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)]
         )
+        total_articles_in_es = len(ids_in_es)
         target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids))
     else:
         # The index was deleted / doesn't exist:
         target_ids_not_in_es = target_ids
 
+    print(f"""Total articles in ES: {total_articles_in_es}""")
     print(f"""Total articles missing: {len(target_ids_not_in_es)}""")
     print(f"""Indexing a total of: {TOTAL_ITEMS}, in batches of: {ITERATION_STEP}""")
 

From 6e3cc7d87764f3cda908b0b9dfd31748d06f8376 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Tue, 26 Nov 2024 10:04:47 +0100
Subject: [PATCH 21/22] Added more explicit add topic actions to articles

- Before, we had an "add_topic" method that just attempted to add a topic to
  the article. However, this would fail if the article already had that
  topic. To avoid situations where we replace topics when we don't expect it,
  I separated the method into 'add_topic_if_doesnt_exist' and
  'add_or_replace_topic'. These names should make their function clear.
- With the methods above, whenever we add url-keyword or hardcoded topics,
  these should be prioritized over the inferred topics.
  This means that whenever a topic is keyword-based or hardset we use
  'add_or_replace_topic', and for inferred topics 'add_topic_if_doesnt_exist'.
---
 .../content_retriever/article_downloader.py | 12 ++++--
 zeeguu/core/model/article.py                | 17 +++++---
 zeeguu/core/model/article_topic_map.py      | 41 +++++++++++++++++++
 zeeguu/core/test/test_article.py            | 25 ++++++++++-
 4 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/zeeguu/core/content_retriever/article_downloader.py b/zeeguu/core/content_retriever/article_downloader.py
index 358597fa..5a710f67 100644
--- a/zeeguu/core/content_retriever/article_downloader.py
+++ b/zeeguu/core/content_retriever/article_downloader.py
@@ -343,7 +343,9 @@ def add_topics(new_article, feed, url_keywords, session):
     if feed.id in HARDCODED_FEEDS:
         print("Used HARDCODED feed")
         topic = Topic.find_by_id(HARDCODED_FEEDS[feed.id])
-        new_article.add_topic(topic, session, TopicOriginType.HARDSET.value)
+        new_article.add_topic_if_doesnt_exist(
+            topic, session, TopicOriginType.HARDSET.value
+        )
         session.add(new_article)
         return TopicOriginType.HARDSET.value, [topic.title]
 
@@ -360,7 +362,9 @@ def add_topics(new_article, feed, url_keywords, session):
             continue
         topics_added.add(topic.id)
         topics.append(topic)
-        new_article.add_topic(topic, session, TopicOriginType.URL_PARSED.value)
+        new_article.add_topic_if_doesnt_exist(
+            topic, session, TopicOriginType.URL_PARSED.value
+        )
 
     if len(topics) > 0:
         print("Used URL PARSED")
@@ -387,7 +391,9 @@ def add_topics(new_article, feed, url_keywords, session):
     )  # The threshold is being at least half or above rounded down
     if count >= threshold:
         print(f"Used INFERRED: {top_topic}, {count}, with t={threshold}")
-        new_article.add_topic(top_topic, session, TopicOriginType.INFERRED.value)
+        new_article.add_topic_if_doesnt_exist(
+            top_topic, session, TopicOriginType.INFERRED.value
+        )
         session.add(new_article)
         return TopicOriginType.INFERRED.value, [
             t.topic.title for t in new_article.topics
diff --git a/zeeguu/core/model/article.py b/zeeguu/core/model/article.py
index ea57dc8b..98141dc7 100644
--- a/zeeguu/core/model/article.py
+++ b/zeeguu/core/model/article.py
@@ -278,9 +278,14 @@ def article_info_for_teacher(self):
     def is_owned_by(self, user):
         return self.uploader_id == user.id
 
-    def add_topic(self, topic, session, origin_type: TopicOriginType):
-
-        t = ArticleTopicMap(
+    def add_or_replace_topic(self, topic, session, origin_type: TopicOriginType):
+        t = ArticleTopicMap.create_or_update(
             article=self, topic=topic, origin_type=origin_type
         )
         session.add(t)
+
+    def add_topic_if_doesnt_exist(self, topic, session, origin_type: TopicOriginType):
+        t = ArticleTopicMap.create_if_doesnt_exists(
+            article=self, topic=topic, origin_type=origin_type
+        )
+        # create_if_doesnt_exists returns None when the mapping already exists.
+        if t is not None:
+            session.add(t)
 
@@ -296,10 +301,12 @@ def recalculate_topics_from_url_keywords(self, session):
             topics.append(topic)
         self.add_topics_from_url_keyword(topics, session)
 
-
     def add_topics_from_url_keyword(self, topics, session):
         for topic in topics:
-            self.add_topic(topic, session, TopicOriginType.URL_PARSED.value)
+            t = ArticleTopicMap.create_or_update(
+                article=self, topic=topic, origin_type=TopicOriginType.URL_PARSED
+            )
+            session.add(t)
 
     def add_url_keyword(self, url_keyword, rank, session):
 
diff --git a/zeeguu/core/model/article_topic_map.py b/zeeguu/core/model/article_topic_map.py
index 12dd315c..e818fa7d 100644
--- a/zeeguu/core/model/article_topic_map.py
+++ b/zeeguu/core/model/article_topic_map.py
@@ -1,5 +1,6 @@
 from zeeguu.core.model import db
 from sqlalchemy import Column, Integer, ForeignKey
+from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.orm import relationship
 from enum import IntEnum
 
@@ -18,3 +19,43 @@ class ArticleTopicMap(db.Model):
     origin_type = Column(Integer)
     article = relationship("Article", back_populates="topics")
     topic = relationship("Topic", back_populates="articles")
+
+    def __init__(self, article, topic, origin_type):
+        self.article = article
+        self.topic = topic
+        self.origin_type = origin_type
+
+    @classmethod
+    def create_or_update(cls, article, topic, origin_type: TopicOriginType):
+        try:
+            topic_mapping = (
+                cls.query.filter(cls.article == article)
+                .filter(cls.topic == topic)
+                .one()
+            )
+            if topic_mapping.origin_type != origin_type:
+                print(
+                    f"## Article Topic Mapping Found '{topic.title}', overriding {topic_mapping.origin_type} with {origin_type}"
+                )
+                topic_mapping.origin_type = origin_type
+        except NoResultFound:
+            topic_mapping = ArticleTopicMap(
+                article=article, topic=topic, origin_type=origin_type
+            )
+
+        return topic_mapping
+
+    @classmethod
+    def create_if_doesnt_exists(cls, article, topic, origin_type: TopicOriginType):
+        try:
+            (
+                cls.query.filter(cls.article == article)
+                .filter(cls.topic == topic)
+                .one()
+            )
+            # The mapping already exists; signal the caller that nothing was created.
+            return None
+        except NoResultFound:
+            return ArticleTopicMap(
+                article=article, topic=topic, origin_type=origin_type
+            )
diff --git a/zeeguu/core/test/test_article.py b/zeeguu/core/test/test_article.py
index 83a60475..82ce7789 100644
--- a/zeeguu/core/test/test_article.py
+++ b/zeeguu/core/test/test_article.py
@@ -32,13 +32,34 @@ def test_article_representation_does_not_error(self):
     def test_add_topic(self):
         sports = TopicRule.get_or_create_topic(1)
         health_society = TopicRule.get_or_create_topic(5)
-        self.article1.add_topic(health_society, session, TopicOriginType.HARDSET)
-        self.article1.add_topic(sports, session, TopicOriginType.HARDSET)
+        self.article1.add_topic_if_doesnt_exist(
+            health_society, session, TopicOriginType.HARDSET
+        )
+        self.article1.add_topic_if_doesnt_exist(
+            sports, session, TopicOriginType.HARDSET
+        )
         assert len(self.article1.topics) == 2
         article_topics = [atm.topic for atm in self.article1.topics]
         assert sports in article_topics
         assert health_society in article_topics
 
+    def test_topic_replacement(self):
+        health_society = TopicRule.get_or_create_topic(5)
+        self.article1.add_topic_if_doesnt_exist(
+            health_society, session, TopicOriginType.INFERRED
+        )
+        article_topics = [atm.topic for atm in self.article1.topics]
+        assert len(self.article1.topics) == 1
+        assert health_society in article_topics
+        assert TopicOriginType.INFERRED == self.article1.topics[0].origin_type
+
+        self.article1.add_or_replace_topic(
+            health_society, session, TopicOriginType.HARDSET
+        )
+        assert len(self.article1.topics) == 1
+        assert health_society in article_topics
+        assert TopicOriginType.HARDSET == self.article1.topics[0].origin_type
+
     def test_find_or_create(self):
         self.new_art = Article.find_or_create(session, URL_SPIEGEL_VENEZUELA)
         assert self.new_art.fk_difficulty

From ff37fbb7db967e326c869f2a7c5cdabd70828655 Mon Sep 17 00:00:00 2001
From: Tiago Ribeiro
Date: Tue, 26 Nov 2024 10:09:12 +0100
Subject: [PATCH 22/22] Update Model Naming

- The model naming was still using the New_ terminology, which has now been
  migrated.
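
For reference, a minimal sketch of the migrated names in use; the keyword
value is illustrative, not taken from this patch:

    from zeeguu.core.model import ArticleTopicMap, Topic, UrlKeyword

    # Formerly NewArticleTopicMap / NewTopic / u_key.new_topic_id:
    url_keywords = UrlKeyword.find_all_by_keyword("nyheder")
    topic_ids = [u.topic_id for u in url_keywords if u.topic_id]
    topics = [Topic.find_by_id(t_id) for t_id in topic_ids]

    mappings = ArticleTopicMap.query.filter(
        ArticleTopicMap.topic_id.in_(topic_ids)
    )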
---
 tools/update_es_based_on_url_keyword.py | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/tools/update_es_based_on_url_keyword.py b/tools/update_es_based_on_url_keyword.py
index 3b7bd091..6bea0a8c 100644
--- a/tools/update_es_based_on_url_keyword.py
+++ b/tools/update_es_based_on_url_keyword.py
@@ -11,7 +11,6 @@
     3. Re-index all the documents to ES. If RE_INDEX_ONLY_ARTICLES_IN_ES,
     only the ones that were in ES are re-indexed, otherwise all the
    documents are indexed.
-
    This script expects the following parameters:
 
    - URL_KEYWORD_TO_UPDATE (str): the keyword whose articles we want to
    update in ES
@@ -35,7 +34,7 @@
 IS_DELETION = True
 RECALCULATE_TOPICS = True
 RE_INDEX_ONLY_ARTICLES_IN_ES = True
-ITERATION_STEP = 1000
+ITERATION_STEP = 100
 
 
 # coding=utf-8
@@ -54,8 +53,8 @@
 from zeeguu.core.model import (
     ArticleUrlKeywordMap,
     UrlKeyword,
-    NewArticleTopicMap,
-    NewTopic,
+    ArticleTopicMap,
+    Topic,
 )
 
 from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING
@@ -120,7 +119,7 @@ def gen_docs(articles: list[Article]):
     ids_of_articles_containing_keyword = ids_of_articles_matching_url_keyword()
 
     print(
        f"Got articles for url_keyword '{URL_KEYWORD_TO_UPDATE}' total: {len(ids_of_articles_containing_keyword)}",
     )
 
-    # Updating url_keyword new_topic mapping
+    # Updating url_keyword topic mapping
     # And the topics that were added based on that keyword.
     if IS_DELETION:
         topics_ids_to_delete_mappings = []
@@ -130,17 +129,17 @@ def gen_docs(articles: list[Article]):
         topics = []
         # so the following code would delete both danish and norwegian topics
         url_keywords = UrlKeyword.find_all_by_keyword(URL_KEYWORD_TO_UPDATE)
         for u_key in url_keywords:
-            if u_key.new_topic_id:
-                topics.append(NewTopic.find_by_id(u_key.new_topic_id))
-                topics_ids_to_delete_mappings.append(u_key.new_topic_id)
-                u_key.new_topic_id = None
+            if u_key.topic_id:
+                topics.append(Topic.find_by_id(u_key.topic_id))
+                topics_ids_to_delete_mappings.append(u_key.topic_id)
+                u_key.topic_id = None
         print(
-            f"Deleting new_topics '{ ','.join([t.title for t in topics])} ' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'"
+            f"Deleting topics '{ ','.join([t.title for t in topics])} ' for articles which have the keyword: '{URL_KEYWORD_TO_UPDATE}'"
         )
-        article_topic_mappings_to_delete = NewArticleTopicMap.query.filter(
-            NewArticleTopicMap.article_id.in_(list(ids_of_articles_containing_keyword))
-        ).filter(NewArticleTopicMap.new_topic_id.in_(topics_ids_to_delete_mappings))
+        article_topic_mappings_to_delete = ArticleTopicMap.query.filter(
+            ArticleTopicMap.article_id.in_(list(ids_of_articles_containing_keyword))
+        ).filter(ArticleTopicMap.topic_id.in_(topics_ids_to_delete_mappings))
         print(
             f"Found '{len(article_topic_mappings_to_delete.all())}' topic mappings to delete."
         )
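
A usage sketch for the updated tool; the keyword value is illustrative, and
as with the other scripts in tools/, the run is configured by editing the
constants at the top of the file before invoking it:

    # URL_KEYWORD_TO_UPDATE = "nyheder"        # keyword whose mapping is wrong
    # IS_DELETION = True                       # drop the keyword -> topic link
    # RECALCULATE_TOPICS = True                # re-derive topics afterwards
    # RE_INDEX_ONLY_ARTICLES_IN_ES = True      # only touch already-indexed docs
    # ITERATION_STEP = 100                     # ES bulk batch size
    python tools/update_es_based_on_url_keyword.py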