Skip to content

Commit

Permalink
feat: migrate old user from Tunnistamo to Keycloak
Browse files Browse the repository at this point in the history
Migration happens one user at a time upon login.

Feature can be configured using the following settings.
- `HELUSERS_USER_MIGRATE_ENABLED` enable the feature.
  Defaults to `False`.
- `HELUSERS_USER_MIGRATE_EMAIL_DOMAINS` whitelisted email domains
  for migration. Defaults to `["hel.fi"]`.
- `HELUSERS_USER_MIGRATE_AMRS` which authentication methods are
  used for migration. Defaults to `["helsinkiad"]`.

Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by email
- Old user has username generated by helusers.utils.uuid_to_username

Instead of allowing a new user to be created the migration is done by
replacing the old user UUID with the one from the incoming token
payload. Logic which is run later should take care of updating other
user related fields.

Primary key is separate from the user UUID, so the user UUID can be
changed. This migration should therefore retain all the data related
to the user.

Migration logic only supports authentication methods from this package
and Python Social Auth pipeline helusers.defaults.SOCIAL_AUTH_PIPELINE.
This doesn't support migrating users which are using e.g. a different
pipeline for Python Social Auth (e.g. the default pipeline).

Refs: HP-2429
  • Loading branch information
charn committed Jun 25, 2024
1 parent 85ef192 commit 8823e85
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 0 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,34 @@ If you're not allowing users to log in with passwords, you may disable the
username/password form from Django admin login page by setting `HELUSERS_PASSWORD_LOGIN_DISABLED`
to `True`.


#### Migrating old user from Tunnistamo to Keycloak

By default, the migration logic is configured to support migrating users from Tunnistamo
AD authentication to Keycloak AD authentication. The migration should be tested by the
service before enabling it in production. This migration logic most likely shouldn't be
configured for other authentication methods besides AD (i.e. staff/admin) users.

When transitioning from one authentication provider to another, it is possible to
migrate the old user data for the new user with a different UUID. Migration is done by
finding the old user instance and replacing its UUID with the new one from the token
payload. So instead of creating a new user instance, we update the old one. Migration
happens one user at a time upon login.

Feature can be configured using the following settings.
- `HELUSERS_USER_MIGRATE_ENABLED`: Enable the feature. Defaults to `False`.
- `HELUSERS_USER_MIGRATE_EMAIL_DOMAINS`: Whitelisted email domains for migration.
Defaults to `["hel.fi"]`.
- `HELUSERS_USER_MIGRATE_AMRS` which authentication methods are used for migration.
Defaults to `["helsinkiad"]`.

Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by email
- Username has been generated by helusers.utils.uuid_to_username for the old user

# Development

Virtual Python environment can be used. For example:
Expand Down
112 changes: 112 additions & 0 deletions helusers/tests/test_migrate_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import uuid

import pytest
from django.contrib.auth import get_user_model

from helusers.user_utils import get_or_create_user, migrate_user
from helusers.utils import uuid_to_username


@pytest.fixture(autouse=True)
def setup_migrate(settings):
settings.HELUSERS_USER_MIGRATE_ENABLED = True
settings.HELUSERS_USER_MIGRATE_AMRS = ["a", "b"]
settings.HELUSERS_USER_MIGRATE_EMAIL_DOMAINS = ["example.com", "example.org"]


@pytest.mark.parametrize(
"migrate_enabled, amr, email, good_username, expect_migration",
[
pytest.param(True, ["a"], "[email protected]", True, True, id="migrate"),
pytest.param(
True, ["a"], "[email protected]", False, False, id="wrong_username"
),
pytest.param(True, ["c"], "[email protected]", True, False, id="wrong_amr"),
pytest.param(True, ["a"], "[email protected]", True, False, id="wrong_domain"),
pytest.param(False, ["a"], "[email protected]", True, False, id="disabled"),
],
)
@pytest.mark.django_db
def test_migrate_user(
settings, migrate_enabled, amr, email, good_username, expect_migration
):
settings.HELUSERS_USER_MIGRATE_ENABLED = migrate_enabled
old_uuid = uuid.uuid4()
new_uuid = uuid.uuid4()
old_username = uuid_to_username(old_uuid) if good_username else str(old_uuid)
user_model = get_user_model()
user = user_model.objects.create(
uuid=old_uuid,
username=old_username,
first_name="A",
last_name="User",
email=email,
)

payload = {
"sub": str(new_uuid),
"amr": amr,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
}

migrate_user(user_id=str(new_uuid), payload=payload)

if expect_migration:
user.refresh_from_db()
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
else:
user.refresh_from_db()
assert user.uuid == old_uuid
assert user.username == old_username


@pytest.mark.parametrize(
"migrate_enabled, amr, email, good_username, expect_migration",
[
pytest.param(True, ["a"], "[email protected]", True, True, id="migrate"),
pytest.param(
True, ["a"], "[email protected]", False, False, id="wrong_username"
),
pytest.param(True, ["c"], "[email protected]", True, False, id="wrong_amr"),
pytest.param(True, ["a"], "[email protected]", True, False, id="wrong_domain"),
pytest.param(False, ["a"], "[email protected]", True, False, id="disabled"),
],
)
@pytest.mark.django_db
def test_get_or_create_user_migrate_user(
settings, migrate_enabled, amr, email, good_username, expect_migration
):
settings.HELUSERS_USER_MIGRATE_ENABLED = migrate_enabled
old_uuid = uuid.uuid4()
new_uuid = uuid.uuid4()
user_model = get_user_model()
old_user = user_model.objects.create(
uuid=old_uuid,
username=uuid_to_username(old_uuid) if good_username else str(old_uuid),
first_name="A",
last_name="User",
email=email,
)

payload = {
"sub": str(new_uuid),
"amr": amr,
"email": old_user.email,
"first_name": old_user.first_name,
"last_name": old_user.last_name,
}

user = get_or_create_user(payload)

if expect_migration:
assert user_model.objects.count() == 1
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
else:
assert user_model.objects.count() == 2
assert user_model.objects.filter(uuid=old_uuid).exists()
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
78 changes: 78 additions & 0 deletions helusers/user_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from django.db import IntegrityError, transaction
from django.utils.translation import gettext as _

from helusers.utils import uuid_to_username

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -111,7 +113,81 @@ def convert_to_uuid(convertable, namespace=None):
return generated_uuid


def migrate_user(user_id: str, payload: dict):
"""Migrate old user from Tunnistamo to Keycloak.
Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by email
- Old user has username generated by helusers.utils.uuid_to_username
Instead of allowing a new user to be created the migration is done by
replacing the old user UUID with the one from the incoming token
payload. Logic which is run later should take care of updating other
user related fields.
Primary key is separate from the user UUID, so the user UUID can be
changed. This migration should therefore retain all the data related
to the user.
Migration logic only supports authentication methods from this package
and Python Social Auth pipeline helusers.defaults.SOCIAL_AUTH_PIPELINE.
This doesn't support migrating users which are using e.g. a different
pipeline for Python Social Auth (e.g. the default pipeline).
"""
if not getattr(settings, "HELUSERS_USER_MIGRATE_ENABLED", False):
return

amrs_to_migrate = getattr(settings, "HELUSERS_USER_MIGRATE_AMRS", ["helsinkiad"])
domains_to_migrate = getattr(
settings, "HELUSERS_USER_MIGRATE_EMAIL_DOMAINS", ["hel.fi"]
)

# Don't continue if not using the expected authentication method.
if not (
(amr := payload.get("amr"))
and isinstance(amr, list)
and any(value in amrs_to_migrate for value in amr)
):
logger.debug(f"User {user_id} has wrong amr: {amr}, not trying to migrate.")
return

uid = UUID(user_id)
email = payload.get("email")

if not email or not any(
[email.endswith(f"@{domain}") for domain in domains_to_migrate]
):
logger.debug(
f"User {user_id} email: {email} doesn't have a whitelisted domain, not trying to migrate."
)
return

user_model = get_user_model()

if user_model.objects.filter(uuid=uid).exists():
logger.debug(f"User {user_id} exist, not trying to migrate.")
return

users = user_model.objects.filter(email=email, username__startswith="u-")

if users.count() > 1:
logger.warning(
f"Multiple users found for email {email} when migrating user {user_id}."
)

if user := users.first():
logger.info(f"Migrating user {user.uuid} to {user_id}.")

user.uuid = uid
user.username = uuid_to_username(uid)
user.save()


def get_or_create_user(payload, oidc=False):
"""Get, create or update a user based on the payload."""
user_id = payload.get("sub")
if not user_id:
msg = _("Invalid payload. sub missing")
Expand All @@ -126,6 +202,8 @@ def get_or_create_user(payload, oidc=False):
namespace = payload.get("tid")
user_id = convert_to_uuid(user_id, namespace)

migrate_user(user_id, payload)

try_again = False
try:
user = _try_create_or_update(user_id, payload, oidc)
Expand Down

0 comments on commit 8823e85

Please sign in to comment.