Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add create system user #79

Merged
merged 8 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/jupyterhub_config/custom_spawner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import fcntl
import os
import subprocess
import tempfile

from jupyterhub.spawner import SimpleLocalProcessSpawner


Expand All @@ -14,4 +19,52 @@ def start(self):
variables, and sets the notebook directory before starting the server.
"""

return super().start()
username = self.user.name

# Ensure the system user exists
self._ensure_system_user(username, group='jupyterhub')

return super().start()

def _ensure_system_user(self, username: str, group: str = None):
"""
Create a system user with the given username if it does not already exist.
Ensure the group exists before creating the user.
Use a file lock to prevent race conditions.
"""

lock_file = os.path.join(tempfile.gettempdir(), f'user_creation_{username}.lock')

with open(lock_file, 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX)
try:
# Check if user already exists
result = subprocess.run(['id', username], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
self.log.info(f'User {username} already exists')
return

# Create the user
self.log.info(f'Creating system user: {username}')
useradd_cmd = ['sudo', 'useradd', '-r']

if group:
# Check if the group exists, create if necessary
group_check = subprocess.run(['getent', 'group', group], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if group_check.returncode != 0:
self.log.info(f'Group {group} does not exist, creating it.')
subprocess.run(['sudo', 'groupadd', group], check=True)
else:
self.log.info(f'Group {group} already exists')

useradd_cmd.extend(['-g', group])

useradd_cmd.append(username)

subprocess.run(useradd_cmd, check=True)

except subprocess.CalledProcessError as e:
raise ValueError(f'Failed to create system user: {e}')

finally:
fcntl.flock(lock, fcntl.LOCK_UN)
102 changes: 101 additions & 1 deletion test/src/jupyterhub_config/custom_spawner_test.py
Original file line number Diff line number Diff line change
@@ -1 +1,101 @@
from jupyterhub_config.custom_spawner import *
import logging
import subprocess
import unittest
from subprocess import CalledProcessError
from unittest.mock import patch, MagicMock

import pytest

from jupyterhub_config.custom_spawner import VirtualEnvSpawner


# Test when the user already exists
@patch('subprocess.run')
def test_ensure_system_user_already_exists(mock_run, caplog):
with caplog.at_level(logging.INFO):
# Mock 'id' command to simulate user already exists
mock_run.return_value.returncode = 0

spawner = VirtualEnvSpawner()
username = 'testuser'
spawner._ensure_system_user(username)

mock_run.assert_called_once_with(['id', username], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

assert f'User {username} already exists' in caplog.text


# Test when the group and user need to be created
@patch('subprocess.run')
def test_ensure_system_user_create_group_and_user(mock_run, caplog):
with caplog.at_level(logging.INFO):
# Define side_effect to simulate user does not exist, group does not exist, and then successful creation
mock_run.side_effect = [
MagicMock(returncode=1), # 'id' command: User does not exist
MagicMock(returncode=2), # 'getent' command: Group does not exist
MagicMock(returncode=0), # 'groupadd' command: Group created successfully
MagicMock(returncode=0) # 'useradd' command: User created successfully
]

spawner = VirtualEnvSpawner()
username = 'testuser'
group = 'testgroup'
spawner._ensure_system_user(username, group)

expected_calls = [
unittest.mock.call(['id', username], stdout=subprocess.PIPE, stderr=subprocess.PIPE),
unittest.mock.call(['getent', 'group', group], stdout=subprocess.PIPE, stderr=subprocess.PIPE),
unittest.mock.call(['sudo', 'groupadd', group], check=True),
unittest.mock.call(['sudo', 'useradd', '-r', '-g', group, username], check=True)
]

# Check that the expected calls were made in order
mock_run.assert_has_calls(expected_calls, any_order=False)

assert f'Creating system user: {username}' in caplog.text
assert f'Group {group} does not exist, creating it.' in caplog.text


# Test when the user is created without a group
@patch('subprocess.run')
def test_ensure_system_user_create_user_without_group(mock_run, caplog):
with caplog.at_level(logging.INFO):
# Mock the 'id' command to simulate user does not exist
mock_run.side_effect = [
MagicMock(returncode=1), # User does not exist
MagicMock(returncode=0) # User created successfully
]

spawner = VirtualEnvSpawner()
username = 'testuser'
spawner._ensure_system_user(username)

assert f'Creating system user: {username}' in caplog.text
expected_calls = [
unittest.mock.call(['id', username], stdout=subprocess.PIPE, stderr=subprocess.PIPE),
unittest.mock.call(['sudo', 'useradd', '-r', username], check=True)
]

mock_run.assert_has_calls(expected_calls, any_order=False)


# Test subprocess.CalledProcessError is handled correctly
@patch('subprocess.run')
def test_ensure_system_user_error(mock_run):
# Mock the 'id' command to simulate user does not exist
# Mock 'useradd' command to raise CalledProcessError
mock_run.side_effect = [
MagicMock(returncode=1),
CalledProcessError(1, 'useradd')
]

spawner = VirtualEnvSpawner()
with pytest.raises(ValueError, match="Failed to create system user"):
spawner._ensure_system_user('testuser')

expected_calls = [
unittest.mock.call(['id', 'testuser'], stdout=subprocess.PIPE, stderr=subprocess.PIPE),
unittest.mock.call(['sudo', 'useradd', '-r', 'testuser'], check=True)
]

mock_run.assert_has_calls(expected_calls, any_order=False)
Loading