-
Notifications
You must be signed in to change notification settings - Fork 52
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
273 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,273 @@ | ||
import secrets | ||
import string | ||
from contextlib import contextmanager | ||
from io import BytesIO | ||
from tempfile import NamedTemporaryFile | ||
|
||
import numpy | ||
import SimpleITK | ||
from allauth.account.models import EmailAddress | ||
from django.conf import settings | ||
from django.contrib.auth import get_user_model | ||
from django.core.files.base import ContentFile | ||
from django.core.files.uploadedfile import InMemoryUploadedFile | ||
from django.db import transaction | ||
from faker import Faker | ||
from PIL import Image as PILImage | ||
from PIL import ImageDraw, ImageFont | ||
|
||
from grandchallenge.archives.models import Archive, ArchiveItem | ||
from grandchallenge.cases.models import Image, ImageFile | ||
from grandchallenge.challenges.models import Challenge | ||
from grandchallenge.components.models import ( | ||
ComponentInterface, | ||
ComponentInterfaceValue, | ||
) | ||
from grandchallenge.evaluation.models import Phase | ||
from grandchallenge.evaluation.utils import SubmissionKindChoices | ||
from grandchallenge.invoices.models import Invoice | ||
from grandchallenge.verifications.models import Verification | ||
from grandchallenge.workstations.models import Workstation | ||
|
||
|
||
@transaction.atomic | ||
def run(): | ||
create_challenges() | ||
create_archives() | ||
|
||
|
||
def create_challenges(): | ||
challenge_count = Challenge.objects.count() | ||
|
||
for ii in range(2): | ||
create_challenge(challenge_num=challenge_count + 1 + ii) | ||
|
||
|
||
def create_archives(): | ||
archive_count = Archive.objects.count() | ||
|
||
for ii in range(2): | ||
create_archive(archive_num=archive_count + 1 + ii) | ||
|
||
|
||
def create_challenge(*, challenge_num): | ||
admin = _create_user(f"challenge-{challenge_num}", "admin") | ||
participant = _create_user(f"challenge-{challenge_num}", "participant") | ||
|
||
archive = _create_phase_archive( | ||
creator=admin, | ||
interfaces=_get_inputs(), | ||
title=f"Challenge {challenge_num}, Phase 1 Test Set", | ||
) | ||
|
||
c = Challenge.objects.create( | ||
short_name=f"challenge-{challenge_num}", | ||
creator=admin, | ||
hidden=False, | ||
logo=create_image_with_text(text=f"Challenge {challenge_num}, Logo"), | ||
) | ||
c.add_participant(participant) | ||
|
||
Invoice.objects.create( | ||
challenge=c, | ||
support_costs_euros=0, | ||
compute_costs_euros=10, | ||
storage_costs_euros=0, | ||
payment_status=Invoice.PaymentStatusChoices.PAID, | ||
) | ||
|
||
p = Phase.objects.create( | ||
challenge=c, | ||
title=f"Challenge {challenge_num} Phase 1", | ||
algorithm_time_limit=300, | ||
) | ||
|
||
p.algorithm_inputs.set(_get_inputs()) | ||
p.algorithm_outputs.set(_get_outputs()) | ||
|
||
p.submission_kind = SubmissionKindChoices.ALGORITHM | ||
p.archive = archive | ||
p.score_jsonpath = "score" | ||
p.submissions_limit_per_user_per_period = 10 | ||
p.save() | ||
|
||
|
||
def _get_inputs(): | ||
return ComponentInterface.objects.filter( | ||
slug__in=["generic-medical-image"] | ||
) | ||
|
||
|
||
def _get_outputs(): | ||
return ComponentInterface.objects.filter( | ||
slug__in=["generic-medical-image", "results-json-file"] | ||
) | ||
|
||
|
||
def create_archive(*, archive_num): | ||
editor = _create_user(f"archive-{archive_num}", "editor") | ||
uploader = _create_user(f"archive-{archive_num}", "uploader") | ||
user = _create_user(f"archive-{archive_num}", "user") | ||
|
||
title = f"Archive {archive_num}" | ||
|
||
archive = Archive.objects.create( | ||
title=title, | ||
logo=create_image_with_text(text=f"{title}, Logo"), | ||
workstation=Workstation.objects.get( | ||
slug=settings.DEFAULT_WORKSTATION_SLUG | ||
), | ||
) | ||
|
||
archive.add_editor(editor) | ||
archive.add_uploader(uploader) | ||
archive.add_user(user) | ||
|
||
add_archive_item(archive=archive, interfaces=_get_inputs()) | ||
|
||
|
||
def _create_phase_archive(*, creator, interfaces, title, items=5): | ||
archive = Archive.objects.create( | ||
title=title, | ||
logo=create_image_with_text(text=f"{title}, Logo"), | ||
workstation=Workstation.objects.get( | ||
slug=settings.DEFAULT_WORKSTATION_SLUG | ||
), | ||
) | ||
archive.add_editor(creator) | ||
|
||
for _ in range(items): | ||
add_archive_item(archive=archive, interfaces=interfaces) | ||
|
||
return archive | ||
|
||
|
||
def add_archive_item(*, archive, interfaces): | ||
archive_item_num = ArchiveItem.objects.filter(archive=archive).count() + 1 | ||
ai = ArchiveItem.objects.create(archive=archive) | ||
|
||
for interface in interfaces: | ||
v = ComponentInterfaceValue.objects.create(interface=interface) | ||
|
||
name = f"{archive.title}, Image {archive_item_num}, {interface.title}" | ||
|
||
im = Image.objects.create(name=name, width=10, height=10) | ||
im_file = ImageFile.objects.create(image=im) | ||
|
||
with _uploaded_image_file(text=name) as f: | ||
im_file.file.save(f"{im_file.pk}.mha", f) | ||
im_file.save() | ||
|
||
v.image = im | ||
v.save() | ||
|
||
ai.values.add(v) | ||
|
||
|
||
@contextmanager | ||
def _uploaded_image_file(*, text): | ||
jpeg_bytes = create_image_with_text(text=text) | ||
pil_image = PILImage.open(jpeg_bytes).convert("L") | ||
numpy_image = numpy.array(pil_image) | ||
sitk_image = SimpleITK.GetImageFromArray(numpy_image) | ||
|
||
with NamedTemporaryFile(suffix=".mha") as f: | ||
writer = SimpleITK.ImageFileWriter() | ||
writer.SetFileName(f.name) | ||
writer.SetImageIO("MetaImageIO") | ||
writer.Execute(sitk_image) | ||
|
||
f.seek(0) | ||
|
||
with ContentFile(f.read()) as content: | ||
yield content | ||
|
||
|
||
def create_image_with_text( | ||
*, text, image_size=(1024, 1024), font_size=50, font_path="Geneva.ttf" | ||
): | ||
# Create a new image with a black background | ||
image = PILImage.new("RGB", image_size, color="black") | ||
|
||
# Initialize the drawing context | ||
draw = ImageDraw.Draw(image) | ||
|
||
# Load the specified font | ||
try: | ||
font = ImageFont.truetype(font_path, font_size) | ||
except OSError: | ||
print(f"Font file {font_path} not found. Using default font.") | ||
font = ImageFont.load_default() | ||
|
||
# Calculate the bounding box of the text to be added | ||
text = text.replace(", ", "\n") | ||
text_bbox = draw.textbbox((0, 0), text, font=font) | ||
text_width = text_bbox[2] - text_bbox[0] | ||
text_height = text_bbox[3] - text_bbox[1] | ||
|
||
# Calculate X, Y position of the text to be centered | ||
x = (image_size[0] - text_width) / 2 | ||
y = (image_size[1] - text_height) / 2 | ||
|
||
# Add text to image | ||
draw.text((x, y), text, font=font, fill=(255, 255, 255)) | ||
|
||
# Save the image | ||
io = BytesIO() | ||
image.save(io, format="JPEG") | ||
image_file = InMemoryUploadedFile( | ||
io, None, f"{text}.jpg", "jpeg", image.size, None | ||
) | ||
image_file.seek(0) | ||
|
||
return image_file | ||
|
||
|
||
def generate_secure_password(length=32): | ||
# Define the character set: letters, digits, and punctuation | ||
characters = string.ascii_letters + string.digits + string.punctuation | ||
|
||
# Generate a secure random password | ||
password = "".join(secrets.choice(characters) for _ in range(length)) | ||
|
||
return password | ||
|
||
|
||
def _create_user(first_name, last_name): | ||
fake = Faker() | ||
|
||
user = get_user_model().objects.create( | ||
username=f"{first_name}-{last_name}", | ||
email=f"{first_name}-{last_name}@example.com", | ||
is_active=True, | ||
first_name=first_name, | ||
last_name=last_name, | ||
) | ||
|
||
users_password = generate_secure_password() | ||
|
||
print(f'"{user.username}": "{users_password}",') # noqa: B907 | ||
|
||
user.set_password(users_password) | ||
user.save() | ||
|
||
EmailAddress.objects.create( | ||
user=user, | ||
email=user.email, | ||
verified=True, | ||
primary=True, | ||
) | ||
|
||
Verification.objects.create( | ||
user=user, | ||
email=user.email, | ||
is_verified=True, | ||
) | ||
|
||
user.user_profile.institution = fake.company() | ||
user.user_profile.department = f"Department of {fake.job().title()}s" | ||
user.user_profile.country = fake.country_code() | ||
user.user_profile.receive_newsletter = True | ||
user.user_profile.save() | ||
|
||
return user |