Skip to content

Commit

Permalink
tasks: add task to update domain statistics
Browse files Browse the repository at this point in the history
* Adds a task to efficiently update the domain statistics on a
  regular basis.
  • Loading branch information
lnielsen committed Feb 19, 2024
1 parent 43afda0 commit 9c99d66
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 7 deletions.
13 changes: 7 additions & 6 deletions invenio_accounts/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

from datetime import datetime

from flask_security import SQLAlchemyUserDatastore
from flask import current_app
from flask_security import SQLAlchemyUserDatastore, user_confirmed

from .models import Domain, Role, User
from .proxies import current_db_change_history
Expand Down Expand Up @@ -45,11 +46,11 @@ def block_user(self, user):
def activate_user(self, user):
"""Activate a unconfirmed/deactivated/blocked user."""
res = super().activate_user(user)
if res:
user.blocked_at = None
if user.confirmed_at is None:
user.confirmed_at = datetime.utcnow()
return True
user.blocked_at = None
if user.confirmed_at is None:
user.confirmed_at = datetime.utcnow()
user_confirmed.send(current_app._get_current_object(), user=user)
return res

def deactivate_user(self, user):
"""Deactivate a user.
Expand Down
75 changes: 74 additions & 1 deletion invenio_accounts/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
from flask import current_app
from flask_mail import Message
from invenio_db import db
from sqlalchemy import func, or_

from .models import LoginInformation, SessionActivity
from .models import Domain, LoginInformation, SessionActivity, User
from .sessions import delete_session


Expand Down Expand Up @@ -73,3 +74,75 @@ def delete_ips():
LoginInformation.current_login_at < expiration_date,
).update({LoginInformation.current_login_ip: None})
db.session.commit()


@shared_task
def update_domain_status():
    """Synchronise the per-domain user counters with the users table.

    The counters are computed from ``User`` in one grouped subquery and
    compared with the values stored on ``Domain``. Only domains whose stored
    counters differ are rewritten, and the updates are committed in batches
    of 500. Nothing is committed when no domain is out of date.
    """
    # Counter columns stored on ``Domain``. The aggregate subquery labels its
    # columns with the same names, so both sides can be addressed uniformly.
    stat_names = (
        "num_users",
        "num_active",
        "num_inactive",
        "num_confirmed",
        "num_verified",
        "num_blocked",
    )
    batch_size = 500

    # SQL expressions (not Python comparisons), hence the explicit ``==``.
    is_active = User.active == True  # noqa: E712
    is_inactive = User.active == False  # noqa: E712

    # Per-domain aggregates calculated from the users table.
    user_counts = (
        db.session.query(
            User.domain,
            func.count(User.id).label("num_users"),
            func.count(User.active).filter(is_active).label("num_active"),
            func.count(User.active).filter(is_inactive).label("num_inactive"),
            func.count(User.confirmed_at).label("num_confirmed"),
            func.count(User.verified_at).label("num_verified"),
            func.count(User.blocked_at).label("num_blocked"),
        )
        .group_by(User.domain)
        .subquery("n")
    )

    # Select the domains for which at least one stored counter differs from
    # the freshly computed value.
    # NOTE(review): this is a plain join, so a domain without any matching
    # row in the users table is never selected here - confirm that is the
    # intended behaviour for domains that no longer have users.
    computed = [user_counts.c[name] for name in stat_names]
    any_counter_differs = or_(
        *(
            getattr(Domain, name) != column
            for name, column in zip(stat_names, computed)
        )
    )

    # When the task runs regularly, few domains change between runs, so the
    # result fits in memory. Everything is read up front to avoid modifying
    # the table while it is still being read.
    outdated = (
        db.session.query(Domain.domain, *computed)
        .join(user_counts, Domain.domain == user_counts.c.domain)
        .filter(any_counter_differs)
        .all()
    )

    # Write the new counters, committing after every full batch.
    processed = 0
    for processed, (domain, *counts) in enumerate(outdated, start=1):
        db.session.query(Domain).filter(Domain.domain == domain).update(
            dict(zip(stat_names, counts))
        )
        if processed % batch_size == 0:
            db.session.commit()

    # Commit the trailing partial batch, if there is one.
    if processed % batch_size != 0:
        db.session.commit()

0 comments on commit 9c99d66

Please sign in to comment.