From 9c99d66a9244f552c0bdec8f56fe97b97453e616 Mon Sep 17 00:00:00 2001 From: Lars Holm Nielsen Date: Wed, 7 Feb 2024 18:24:53 +0100 Subject: [PATCH] tasks: add task to update domain statistics * Adds a task to efficiently update the domain statistics on a regular basis. --- invenio_accounts/datastore.py | 13 +++--- invenio_accounts/tasks.py | 75 ++++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/invenio_accounts/datastore.py b/invenio_accounts/datastore.py index 1fa6c050..5bad177d 100644 --- a/invenio_accounts/datastore.py +++ b/invenio_accounts/datastore.py @@ -10,7 +10,8 @@ from datetime import datetime -from flask_security import SQLAlchemyUserDatastore +from flask import current_app +from flask_security import SQLAlchemyUserDatastore, user_confirmed from .models import Domain, Role, User from .proxies import current_db_change_history @@ -45,11 +46,11 @@ def block_user(self, user): def activate_user(self, user): """Activate a unconfirmed/deactivated/blocked user.""" res = super().activate_user(user) - if res: - user.blocked_at = None - if user.confirmed_at is None: - user.confirmed_at = datetime.utcnow() - return True + user.blocked_at = None + if user.confirmed_at is None: + user.confirmed_at = datetime.utcnow() + user_confirmed.send(current_app._get_current_object(), user=user) + return res def deactivate_user(self, user): """Deactivate a user. 
diff --git a/invenio_accounts/tasks.py b/invenio_accounts/tasks.py index efaaa9a3..9fd92b54 100644 --- a/invenio_accounts/tasks.py +++ b/invenio_accounts/tasks.py @@ -14,8 +14,9 @@ from flask import current_app from flask_mail import Message from invenio_db import db +from sqlalchemy import func, or_ -from .models import LoginInformation, SessionActivity +from .models import Domain, LoginInformation, SessionActivity, User from .sessions import delete_session @@ -73,3 +74,75 @@ def delete_ips(): LoginInformation.current_login_at < expiration_date, ).update({LoginInformation.current_login_ip: None}) db.session.commit() + + +@shared_task +def update_domain_status(): + """Update domain statistics.""" + # This subquery calculates the number of users per domain from the users + # table. + subquery = ( + db.session.query( + User.domain, + func.count(User.id).label("num_users"), + func.count(User.active).filter(User.active == True).label("num_active"), + func.count(User.active).filter(User.active == False).label("num_inactive"), + func.count(User.confirmed_at).label("num_confirmed"), + func.count(User.verified_at).label("num_verified"), + func.count(User.blocked_at).label("num_blocked"), + ) + .group_by(User.domain) + .subquery("n") + ) + + # Using the above subquery, we find the domains that have changed. 
+ stmt = ( + db.session.query( + Domain.domain, + subquery.c.num_users, + subquery.c.num_active, + subquery.c.num_inactive, + subquery.c.num_confirmed, + subquery.c.num_verified, + subquery.c.num_blocked, + ) + .join(subquery, Domain.domain == subquery.c.domain) + .filter( + or_( + Domain.num_users != subquery.c.num_users, + Domain.num_active != subquery.c.num_active, + Domain.num_inactive != subquery.c.num_inactive, + Domain.num_confirmed != subquery.c.num_confirmed, + Domain.num_verified != subquery.c.num_verified, + Domain.num_blocked != subquery.c.num_blocked, + ) + ) + ) + + # If statistics are updated regularly, the number of updates is relatively + # low and hence fits in memory. We read all data first, to avoid starting + # to modify the same table we're reading from. + domain_updates = [] + for row in stmt.all(): + domain_updates.append(row) + + # Update the database + i = 0 + for row in domain_updates: + domain, users, active, inactive, confirmed, verified, blocked = row + db.session.query(Domain).filter(Domain.domain == domain).update( + { + "num_users": users, + "num_active": active, + "num_inactive": inactive, + "num_confirmed": confirmed, + "num_verified": verified, + "num_blocked": blocked, + } + ) + i += 1 + # Commit batches of 500 updates + if i % 500 == 0: + db.session.commit() + if i % 500 != 0: + db.session.commit()