Skip to content

Commit

Permalink
Merge pull request #283 from zeeguu/281-vejret-shouldnt-be-classify-a…
Browse files Browse the repository at this point in the history
…s-technology-science

281 vejret shouldn't be classified as technology science
  • Loading branch information
mircealungu authored Nov 26, 2024
2 parents 01c505f + ff37fbb commit 6ea514e
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 224 deletions.
183 changes: 137 additions & 46 deletions tools/mysql_to_elastic.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,159 @@
# coding=utf-8
import sqlalchemy as database
from zeeguu.api.app import create_app
from zeeguu.core.elastic.indexing import create_or_update, document_from_article
from sqlalchemy import func
from zeeguu.core.elastic.indexing import (
create_or_update_doc_for_bulk,
)
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan
import zeeguu.core
from sqlalchemy.orm import sessionmaker
from zeeguu.core.model import Article
import sys
from datetime import datetime
from sqlalchemy.orm.exc import NoResultFound

from zeeguu.api.app import create_app
from zeeguu.core.model import ArticleTopicMap
from zeeguu.core.elastic.settings import ES_ZINDEX, ES_CONN_STRING
from zeeguu.core.model.article_topic_map import TopicOriginType
import numpy as np
from tqdm import tqdm
import time

app = create_app()
app.app_context().push()
# First we should only index with topics so we can do
# inference based on the articles that have topics.

es = Elasticsearch([ES_CONN_STRING])
DB_URI = app.config["SQLALCHEMY_DATABASE_URI"]
engine = database.create_engine(DB_URI)
Session = sessionmaker(bind=engine)
session = Session()
# These parameters can be changed based on need.
# DELETE_INDEX - used to re-index from scratch. Default: False
# INDEX_WITH_TOPIC_ONLY - determines which articles are indexed. If set to True,
# only the articles which have a topic assigned to them are indexed. If False,
# only the articles without a topic will be added. Default: True
# TOTAL_ITEMS - total items to be indexed; the IDs are sampled so that a
# variety of different articles ends up in ES. Default: 5000
# NOTE: If you want to index all the articles, you should pass a number that's
# higher than or equal to the number of articles in the DB.
# ITERATION_STEP - number of articles to index per batch before reporting.
# Default: 1000
DELETE_INDEX = False
INDEX_WITH_TOPIC_ONLY = True
TOTAL_ITEMS = 1000
ITERATION_STEP = 100

# Print the connection string so a failure to reach ES is easy to diagnose.
print(ES_CONN_STRING)
es = Elasticsearch(ES_CONN_STRING)
db_session = zeeguu.core.model.db.session
# es.info() raises if the cluster is unreachable, so this also acts as an
# early connectivity check before any indexing work starts.
print(es.info())

def main(starting_index):

max_id = session.query(func.max(Article.id)).first()[0]
min_id = session.query(func.min(Article.id)).first()[0]
def main():
if DELETE_INDEX:
try:
es.options(ignore_status=[400, 404]).indices.delete(index=ES_ZINDEX)
print("Deleted index 'zeeguu'!")
except Exception as e:
print(f"Failed to delete: {e}")

if max_id is None:
max_id = 0
if min_id is None:
min_id = 0

print(f"starting import at: {starting_index}")
print(f"max id in db: {max_id}")
def fetch_articles_by_id(id_list):
    """Yield Article objects for the given ids.

    Ids that are already present in the ES index, or that have no matching
    row in the DB, are skipped with a log line; per-id failures are logged
    and do not abort the iteration.
    """
    for article_id in id_list:
        try:
            already_indexed = es.exists(index=ES_ZINDEX, id=article_id)
            if already_indexed:
                print(f"Skipped for: '{article_id}', article already in ES.")
                continue
            article = Article.find_by_id(article_id)
            if article:
                yield article
            else:
                print(f"Skipped for: '{article_id}', article not in DB.")
        except NoResultFound:
            print(f"fail for: '{article_id}'")
        except Exception as e:
            print(f"fail for: '{article_id}', {e}")

for i in range(max_id, min_id, -1):
print("article id...")
print(i)
try:
article = Article.find_by_id(i)
if article:
print(article.title)
print(article.id)
res = create_or_update(article, session)
print(res)
except NoResultFound:
print(f"fail for: {i}")
except:
print("fail for " + str(i))
# import traceback
# traceback.print_exc()
def gen_docs(articles_w_topics):
    """Map each article to its bulk-indexing document.

    Articles for which document creation fails are logged and skipped,
    so one bad article does not abort the whole bulk request.
    """
    for article in articles_w_topics:
        try:
            doc = create_or_update_doc_for_bulk(article, db_session)
        except Exception as e:
            print(f"fail for: '{article.id}', {e}")
        else:
            yield doc

# Sample Articles that have topics assigned and are not inferred
if INDEX_WITH_TOPIC_ONLY:
target_ids = np.array(
[
a_id[0]
for a_id in db_session.query(Article.id)
.join(ArticleTopicMap)
.filter(
ArticleTopicMap.origin_type != TopicOriginType.INFERRED
) # Do not index Inferred topics
.filter(Article.broken != 1) # Filter out documents that are broken
# .filter(Article.language_id == 2) If only one language
.distinct()
]
)
print("Got articles with topics, total: ", len(target_ids))
else:
articles_with_topic = set(
[
art_id_w_topic[0]
for art_id_w_topic in db_session.query(
ArticleTopicMap.article_id
).distinct()
]
)
target_ids = np.array(
list(
set([a_id[0] for a_id in db_session.query(Article.id)])
- articles_with_topic
)
)
print("Got articles without topics, total: ", len(target_ids))

if __name__ == "__main__":
total_articles_in_es = 0
if len(target_ids) == 0:
print("No articles found! Exiting...")
return
if es.indices.exists(index=ES_ZINDEX):
es_query = {"query": {"match_all": {}}}
ids_in_es = set(
[int(hit["_id"]) for hit in scan(es, index=ES_ZINDEX, query=es_query)]
)
total_articles_in_es = len(ids_in_es)
target_ids_not_in_es = list(filter(lambda x: x not in ids_in_es, target_ids))
else:
# The index was deleted / doesn't exist:
target_ids_not_in_es = target_ids

print("waiting for the ES process to boot up")
print(f"""Total articles in ES: {total_articles_in_es}""")
print(f"""Total articles missing: {len(target_ids_not_in_es)}""")
print(f"""Indexing a total of: {TOTAL_ITEMS}, in batches of: {ITERATION_STEP}""")

print(f"started at: {datetime.now()}")
starting_index = 0
# I noticed that if a document is not added then it won't let me query the ES search.
total_added = 0
errors_encountered = []
final_count_of_articles = min(TOTAL_ITEMS, len(target_ids_not_in_es))
sampled_ids = np.random.choice(
target_ids_not_in_es, final_count_of_articles, replace=False
)
for i_start in tqdm(range(0, final_count_of_articles, ITERATION_STEP)):
sub_sample = sampled_ids[i_start : i_start + ITERATION_STEP]
res_bulk, error_bulk = bulk(
es, gen_docs(fetch_articles_by_id(sub_sample)), raise_on_error=False
)
total_added += res_bulk
errors_encountered += error_bulk
total_bulk_errors = len(error_bulk)
if total_bulk_errors > 0:
print(f"## Warning, {total_bulk_errors} failed to index. With errors: ")
print(error_bulk)
print(f"Batch finished. ADDED:{res_bulk} | ERRORS: {total_bulk_errors}")
print(errors_encountered)
print(f"Total articles added: {total_added}")

if len(sys.argv) > 1:
starting_index = int(sys.argv[1])

main(starting_index)
print(f"ended at: {datetime.now()}")
if __name__ == "__main__":
    # Entry point: run the indexing job and report total wall-clock time.
    print("waiting for the ES process to boot up")
    started_at = datetime.now()
    print(f"started at: {started_at}")
    main()
    finished_at = datetime.now()
    print(f"ended at: {finished_at}")
    print(f"Process took: {finished_at-started_at}")
159 changes: 0 additions & 159 deletions tools/mysql_to_elastic_new_topics.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
continue
topics_added.add(topic.id)
topics.append(topic)
article.set_new_topics(topics, db_session)
article.add_new_topics_from_url_keyword(topics, db_session)
except Exception as e:
counter -= 1
print(f"Failed for article id: {a_id}, with: {e}")
Expand Down
Loading

0 comments on commit 6ea514e

Please sign in to comment.