Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Install allauth-mfa and add data migration #3333

Merged
merged 13 commits into from
May 2, 2024
7 changes: 1 addition & 6 deletions app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,6 @@ def get_private_ip():
"grandchallenge.subdomains.middleware.subdomain_urlconf_middleware",
"grandchallenge.timezones.middleware.TimezoneMiddleware",
"machina.apps.forum_permission.middleware.ForumPermissionMiddleware",
# 2FA middleware, needs to be after subdomain middleware
# TwoFactorMiddleware resets the login flow if another page is loaded
# between login and successfully entering two-factor credentials. We're using
# a modified version of the original allauth_2fa middleware to pass the
# correct urlconf.
"grandchallenge.core.middleware.TwoFactorMiddleware",
# Force 2FA for staff users
"grandchallenge.core.middleware.RequireStaffAndSuperuser2FAMiddleware",
# Flatpage fallback almost last
Expand Down Expand Up @@ -515,6 +509,7 @@ def get_private_ip():
"drf_spectacular",
"allauth",
"allauth.account",
"allauth.mfa",
"allauth.socialaccount",
"grandchallenge.profiles.providers.gmail",
# Notifications with overrides
Expand Down
78 changes: 51 additions & 27 deletions app/grandchallenge/core/middleware.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,57 @@
from allauth_2fa.middleware import BaseRequire2FAMiddleware
from django.urls import Resolver404, get_resolver
from allauth import app_settings
from allauth.account.adapter import get_adapter
from allauth.account.urls import urlpatterns as account_urlpatterns
from allauth.mfa.utils import is_mfa_enabled
from allauth.socialaccount.urls import (
urlpatterns as social_account_urlpatterns,
)
from django.contrib import messages
from django.http import HttpResponseRedirect
from django.utils.deprecation import MiddlewareMixin

from grandchallenge.subdomains.utils import reverse

class RequireStaffAndSuperuser2FAMiddleware(BaseRequire2FAMiddleware):
def require_2fa(self, request):
# Staff users and superusers are required to have 2FA.

class RequireStaffAndSuperuser2FAMiddleware(MiddlewareMixin):
"""Force multi-factor authentication for staff users and superusers."""

allowed_urls = [
amickan marked this conversation as resolved.
Show resolved Hide resolved
*[pattern.name for pattern in account_urlpatterns],
*[pattern.name for pattern in social_account_urlpatterns],
# mfa urls
"mfa_activate_totp",
"mfa_index",
]

def mfa_required(self, request):
return request.user.is_staff or request.user.is_superuser

def redirect_to_mfa_setup(self, request):
adapter = get_adapter(request)
adapter.add_message(
request, messages.ERROR, "allauth/mfa/require_mfa.txt"
)
return HttpResponseRedirect(reverse("mfa_activate_totp"))

def process_view(self, request, view_func, view_args, view_kwargs):
# If MFA is not enabled, do nothing
if not app_settings.MFA_ENABLED:
return None

# If the user is not authenticated, do nothing
if not request.user.is_authenticated:
return None

# If we are on an allowed page, do nothing
if request.resolver_match.url_name in self.allowed_urls:
return None

# If this request does not require MFA, do nothing
if not self.mfa_required(request):
return None

# If the user has MFA enabled already, do nothing
if is_mfa_enabled(request.user):
return None

class TwoFactorMiddleware(MiddlewareMixin):
"""
Reset the login flow if another page is loaded halfway through the login.
(I.e. if the user has logged in with a username/password, but not yet
entered their two-factor credentials.) This makes sure a user does not stay
half logged in by mistake.
"""

def process_request(self, request):
try:
match = get_resolver(request.urlconf).resolve(request.path)
if (
match
and not match.url_name
or not match.url_name.startswith("two-factor-authenticate")
):
try:
del request.session["allauth_2fa_user_id"]
except KeyError:
pass
except Resolver404:
pass
return self.redirect_to_mfa_setup(request)
72 changes: 2 additions & 70 deletions app/grandchallenge/profiles/adapters.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,17 @@
from allauth.account.adapter import DefaultAccountAdapter
from allauth.account.utils import (
get_next_redirect_url,
user_email,
user_username,
)
from allauth.core.exceptions import ImmediateHttpResponse
from allauth.account.utils import user_email, user_username
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
from allauth.socialaccount.models import SocialLogin
from allauth_2fa.utils import user_has_valid_totp_device
from django import forms
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.exceptions import PermissionDenied
from django.http import HttpResponseRedirect
from django.utils.http import url_has_allowed_host_and_scheme, urlencode
from django.utils.http import url_has_allowed_host_and_scheme

from grandchallenge.challenges.models import Challenge
from grandchallenge.emails.emails import send_standard_email_batch
from grandchallenge.profiles.models import EmailSubscriptionTypes
from grandchallenge.subdomains.utils import reverse


class AccountAdapter(DefaultAccountAdapter):
def has_2fa_enabled(self, user):
"""Returns True if the user has 2FA configured."""
return user_has_valid_totp_device(user)

def is_safe_url(self, url):
challenge_domains = {
Expand Down Expand Up @@ -57,61 +44,6 @@ def clean_email(self, email):

return email

def pre_login(self, request, user, **kwargs):
# this is copied from the a pending PR on django-allauth-2fa repo:
# https://github.com/valohai/django-allauth-2fa/pull/131

response = super().pre_login(request, user, **kwargs)
if response:
return response

# Require two-factor authentication if it has been configured
if self.has_2fa_enabled(user):
self.stash_pending_login(request, user, **kwargs)
redirect_url = reverse("two-factor-authenticate")
query_params = request.GET.copy()
next_url = get_next_redirect_url(request)
if next_url:
query_params["next"] = next_url
if query_params:
redirect_url += "?" + urlencode(query_params)
raise ImmediateHttpResponse(
response=HttpResponseRedirect(redirect_url)
)

def stash_pending_login(self, request, user, **kwargs):
# this is copied from the a pending PR on django-allauth-2fa repo:
# https://github.com/valohai/django-allauth-2fa/pull/131

# Cast to string for the case when this is not a JSON serializable
# object, e.g. a UUID.
request.session["allauth_2fa_user_id"] = str(user.id)
login_kwargs = kwargs.copy()
signal_kwargs = login_kwargs.get("signal_kwargs")
if signal_kwargs:
sociallogin = signal_kwargs.get("sociallogin")
if sociallogin:
signal_kwargs = signal_kwargs.copy()
signal_kwargs["sociallogin"] = sociallogin.serialize()
login_kwargs["signal_kwargs"] = signal_kwargs
request.session["allauth_2fa_login"] = login_kwargs

def unstash_pending_login_kwargs(self, request):
# this is copied from the a pending PR on django-allauth-2fa repo:
# https://github.com/valohai/django-allauth-2fa/pull/131

login_kwargs = request.session.pop("allauth_2fa_login", None)
if login_kwargs is None:
raise PermissionDenied()
signal_kwargs = login_kwargs.get("signal_kwargs")
if signal_kwargs:
sociallogin = signal_kwargs.get("sociallogin")
if sociallogin:
signal_kwargs["sociallogin"] = SocialLogin.deserialize(
sociallogin
)
return login_kwargs

def post_login(self, request, user, **kwargs):
response = super().post_login(request, user, **kwargs)
site = Site.objects.get_current()
Expand Down
3 changes: 2 additions & 1 deletion app/grandchallenge/profiles/admin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from allauth.mfa.utils import is_mfa_enabled
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.contrib.auth.admin import UserAdmin
Expand Down Expand Up @@ -64,7 +65,7 @@ def get_queryset(self, request):

@admin.display(boolean=True, description="User has 2FA enabled")
def has_2fa_enabled(self, obj):
return obj.totp_device_count > 0
return is_mfa_enabled(obj)


User = get_user_model()
Expand Down
50 changes: 50 additions & 0 deletions app/grandchallenge/profiles/migrations/0019_migrate_mfa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import base64

from allauth.mfa.adapter import get_adapter
from allauth.mfa.models import Authenticator
from django.db import migrations
from django_otp.plugins.otp_static.models import StaticDevice
from django_otp.plugins.otp_totp.models import TOTPDevice


def migrate_mfa(apps, schema_editor):
adapter = get_adapter()
authenticators = []
for totp in TOTPDevice.objects.filter(confirmed=True).iterator():
amickan marked this conversation as resolved.
Show resolved Hide resolved
recovery_codes = set()
for sdevice in StaticDevice.objects.filter(
confirmed=True, user_id=totp.user_id
).iterator():
recovery_codes.update(
sdevice.token_set.values_list("token", flat=True)
)
secret = base64.b32encode(bytes.fromhex(totp.key)).decode("ascii")
totp_authenticator = Authenticator(
user_id=totp.user_id,
type=Authenticator.Type.TOTP,
data={"secret": adapter.encrypt(secret)},
)
authenticators.append(totp_authenticator)
authenticators.append(
Authenticator(
user_id=totp.user_id,
type=Authenticator.Type.RECOVERY_CODES,
data={
"migrated_codes": [
adapter.encrypt(c) for c in recovery_codes
],
},
)
)
Authenticator.objects.bulk_create(authenticators)


class Migration(migrations.Migration):

dependencies = [
("profiles", "0018_remove_userprofile_receive_notification_emails"),
]

operations = [
migrations.RunPython(migrate_mfa, elidable=True),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{% extends 'base.html' %}
{% load i18n %}

{% block title %}{% blocktrans with user.username as username %}
{{ username }}'s profile
{% endblocktrans %}{% endblock %}
{% block content_title %}
<h2>{{ user.username }} {% if user.get_full_name %}(
{{ user.get_full_name }}
){% endif %}</h2>{% endblock %}

{% block breadcrumbs %}
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href>Users</a></li>
<li class="breadcrumb-item active" aria-current="page">{{ user.username }}</li>
</ol>
{% endblock %}

{% block content %}{% endblock %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{% load i18n %}
{% blocktrans %}You must enable multi-factor authentication for your account before you can proceed.{% endblocktrans %}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ <h2 class="mb-3">{{ profile.user.username }}</h2>
href='{% url 'api-tokens:list' %}'><i
class="fas fa-unlock fa-fw"></i>&nbsp;{% trans "Manage API Tokens" %}</a>
{% if request.user|has_2fa_enabled %}
<a class="list-group-item list-group-item-action" href="{% url 'two-factor-backup-tokens' %}" title="Two-Factor Authentication Settings"><i
class="fas fa-qrcode fa-fw"></i>{% trans "2FA Settings" %}</a>
<a class="list-group-item list-group-item-action" href="{% url 'mfa_index' %}" title="Multi-Factor Authentication Settings"><i
class="fas fa-qrcode fa-fw"></i>{% trans "MFA Settings" %}</a>
{% else %}
<a class="list-group-item list-group-item-action" href="{% url 'two-factor-setup' %}" title="Enable Two-Factor Authentication"><i
class="fas fa-qrcode fa-fw"></i>{% trans "Enable 2FA" %}</a>
<a class="list-group-item list-group-item-action" href="{% url 'mfa_activate_totp' %}" title="Enable Multi-Factor Authentication"><i
class="fas fa-qrcode fa-fw"></i>{% trans "Enable MFA" %}</a>
{% endif %}
</ul>
{% endif %}
Expand Down
4 changes: 2 additions & 2 deletions app/grandchallenge/profiles/templatetags/profiles.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from allauth_2fa.utils import user_has_valid_totp_device
from allauth.mfa.utils import is_mfa_enabled
from django import template
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractUser
Expand Down Expand Up @@ -65,4 +65,4 @@ def user_profile_links_from_usernames(usernames):

@register.filter
def has_2fa_enabled(user):
return user_has_valid_totp_device(user)
return is_mfa_enabled(user)
44 changes: 26 additions & 18 deletions app/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from typing import NamedTuple

import pytest
from allauth.mfa import recovery_codes, totp
from allauth.mfa.models import Authenticator
from django.conf import settings
from django.contrib.auth import authenticate
from django.contrib.auth.models import Group
from django.contrib.sites.models import Site
from django_otp.oath import TOTP
from django_otp.plugins.otp_totp.models import TOTPDevice

from grandchallenge.components.backends import docker_client
from grandchallenge.components.models import ComponentInterface
Expand Down Expand Up @@ -449,37 +449,45 @@ def challenge_reviewer():
return user


AUTH_URL = reverse_lazy("two-factor-authenticate")


def get_token_from_totp_device(totp_model) -> str:
return TOTP(
key=totp_model.bin_key,
step=totp_model.step,
t0=totp_model.t0,
digits=totp_model.digits,
).token()
AUTH_URL = reverse_lazy("mfa_authenticate")


def do_totp_authentication(
client,
totp_device: TOTPDevice,
user,
*,
auth_url: str = AUTH_URL,
auth_url=AUTH_URL,
):
token = get_token_from_totp_device(totp_device)
client.post(auth_url, {"otp_token": token})
device = Authenticator.objects.get(
user=user, type=Authenticator.Type.RECOVERY_CODES
)
client.post(
auth_url,
{"code": device.wrap().get_unused_codes()[0]},
)


@pytest.fixture
def authenticated_staff_user(client):
user = UserFactory(username="john", is_staff=True)
totp_device = user.totpdevice_set.create()
totp.TOTP.activate(user, totp.generate_totp_secret())
recovery_codes.RecoveryCodes.activate(user)
user = authenticate(
username=user.username, password=SUPER_SECURE_TEST_PASSWORD
)
do_totp_authentication(
client=client,
totp_device=totp_device,
user=user,
)
return user


@pytest.fixture
def user_with_totp():
def _make_user_with_totp(is_staff=False):
amickan marked this conversation as resolved.
Show resolved Hide resolved
user = UserFactory(is_staff=is_staff)
totp.TOTP.activate(user, totp.generate_totp_secret())
recovery_codes.RecoveryCodes.activate(user)
return user

return _make_user_with_totp
Loading