From f5a3f0af0f1ad8b53ab125562ac4f87d05dbffb0 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Thu, 30 Jun 2022 16:33:22 +0200 Subject: [PATCH 01/11] Add dumping of files and some unit tests --- SCAutolib/controller.py | 2 +- SCAutolib/models/CA.py | 30 +++++++---- requirements.txt | 19 +++---- test/conftest.py | 10 +++- test/test_controller.py | 113 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 151 insertions(+), 23 deletions(-) create mode 100644 test/test_controller.py diff --git a/SCAutolib/controller.py b/SCAutolib/controller.py index d74e7689..0f2bfb34 100644 --- a/SCAutolib/controller.py +++ b/SCAutolib/controller.py @@ -133,7 +133,7 @@ def setup_local_ca(self, force: bool = False): ca_dir: Path = self.lib_conf["ca"]["local_ca"]["dir"] cnf = file.OpensslCnf(ca_dir.joinpath("ca.cnf"), "CA", str(ca_dir)) - self.local_ca = CA.LocalCA(root_dir=ca_dir) + self.local_ca = CA.LocalCA(dir=ca_dir, cnf=cnf) if force: logger.warning(f"Removing previous local CA in a directory " diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index c76c5f97..1b56b8db 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -12,7 +12,6 @@ from shutil import rmtree, copy from socket import gethostname -# from SCAutolib.models.user import IPAUser from SCAutolib import TEMPLATES_DIR, logger, run, LIB_DIR from SCAutolib.exceptions import SCAutolibException @@ -177,11 +176,11 @@ def request_cert(self, csr: Path, username: str, if cert_out is not None: if cert_out.is_dir(): cert_out = cert_out.joinpath(f"{username}.pem") - elif cert_out.is_file(): + elif cert_out.is_file() and cert_out.suffixes[-1] != ".pem": cert_out = cert_out.with_suffix(".pem") else: cert_out = self._certs.joinpath(f"{username}.pem") - cmd = ["openssl", "ca", "-config", str(self._ca_cnf), + cmd = ["openssl", "ca", "-config", self._ca_cnf.path, "-batch", "-keyfile", str(self._ca_key), "-in", str(csr), "-notext", "-days", "365", "-extensions", "usr_cert", "-out", str(cert_out)] @@ -195,9 +194,9 @@ def revoke_cert(self, cert: Path): :param cert: path to the certificate :type cert: pathlib.Path """ - cmd = ['openssl', 'ca', '-config', self._ca_cnf, '-revoke', cert] + cmd = ['openssl', 'ca', '-config', self._ca_cnf.path, '-revoke', cert] run(cmd, check=True) - cmd = ['openssl', 'ca', '-config', self._ca_cnf, '-gencrl', + cmd = ['openssl', 'ca', '-config', self._ca_cnf.path, '-gencrl', '-out', self._crl] run(cmd, check=True) logger.info("Certificate is revoked") @@ -268,9 +267,15 @@ def __init__(self, ip_addr: str, hostname: str, domain: str, self._ipa_server_realm = realm if realm is not None else domain.upper() self._ipa_client_hostname = client_hostname self._ipa_server_root_passwd = root_passwd - self.meta_client: ClientMeta = ClientMeta(self._ipa_server_hostname, - verify_ssl=False) - self.meta_client.login("admin", self._ipa_server_admin_passwd) + + try: + self.meta_client: ClientMeta = ClientMeta(self._ipa_server_hostname, + verify_ssl=False) + self.meta_client.login("admin", self._ipa_server_admin_passwd) + logger.info("Connected to IPA via meta client") + except python_freeipa.exceptions.BadRequest: + logger.warning("Can't login to the IPA server. " + "Client might be not configured") def setup(self, force: bool = False): """ @@ -297,6 +302,7 @@ def setup(self, force: bool = False): self._add_to_resolv() self._set_hostname() + logger.info("Installing IPA client") run(["ipa-client-install", "-p", "admin", "--password", self._ipa_server_admin_passwd, "--server", self._ipa_server_hostname, @@ -312,6 +318,10 @@ def setup(self, force: bool = False): run(f'bash {ipa_client_script} /etc/ipa/ca.crt', check=True) logger.debug("Setup of IPA client for smart card is finished") + self.meta_client: ClientMeta = ClientMeta(self._ipa_server_hostname, + verify_ssl=False) + self.meta_client.login("admin", self._ipa_server_admin_passwd) + policy = self.meta_client.pwpolicy_show(a_cn="global_policy")["result"] if ["0"] != policy["krbminpwdlife"]: self.meta_client.pwpolicy_mod(a_cn="global_policy", @@ -373,7 +383,7 @@ def _add_to_hosts(self): with open("/etc/hosts", "r+") as f: cnt = f.read() if entry not in cnt: - f.write(entry) + f.write(f"\n{entry}\n") logger.warning( f"New entry {entry} for IPA server is added to /etc/hosts") logger.info( @@ -507,7 +517,7 @@ def revoke_cert(self, cert_path: Path): logger.info(f"Certificate {cert.serial_number} is revoked") return cert.serial_number - def restore(self): + def cleanup(self): """ Remove IPA client from the system and from the IPA server diff --git a/requirements.txt b/requirements.txt index 5b1b8187..8451706c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,8 @@ -click -coloredlogs -cryptography -paramiko -fabric -pyparsing -python-decouple -python-freeipa -PyYAML -pexpect -invoke +click>=8 +coloredlogs>=15 +paramiko>=2.10 +fabric>=2.7 +invoke>=1.7 +pytest>=7 +schema>=0.7 +python_freeipa>=1.0 diff --git a/test/conftest.py b/test/conftest.py index d29a6646..2665867c 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -14,7 +14,8 @@ def pytest_addoption(parser): """ - Define CLI parameters + Define CLI parameters. Parameters for IPA would be serialised to ipa_config + fixture. """ parser.addoption( "--ipa-ip", action="store", help="IP address of IPA server", @@ -44,6 +45,13 @@ def pytest_generate_tests(metafunc): ipa_admin_passwd = metafunc.config.option.ipa_admin_passwd ipa_root_passwd = metafunc.config.option.ipa_root_passwd + if 'ipa_config' in metafunc.fixturenames \ + and all([ipa_ip, ipa_hostname, ipa_admin_passwd, ipa_root_passwd]): + ipa_config = {"ip": ipa_ip, "hostname": ipa_hostname, + "admin_passwd": ipa_admin_passwd, + "root_passwd": ipa_root_passwd} + metafunc.parametrize("ipa_config", [ipa_config]) + if 'ipa_ip' in metafunc.fixturenames and ipa_ip is not None: metafunc.parametrize("ipa_ip", [ipa_ip], scope="session") if 'ipa_hostname' in metafunc.fixturenames and ipa_hostname is not None: diff --git a/test/test_controller.py b/test/test_controller.py new file mode 100644 index 00000000..ecc6427b --- /dev/null +++ b/test/test_controller.py @@ -0,0 +1,113 @@ +from subprocess import check_output + +import pytest +from shutil import copy + +from SCAutolib.controller import Controller +from SCAutolib.models.CA import IPAServerCA +from conftest import FILES_DIR +import json +from configparser import ConfigParser + + +@pytest.fixture() +def dummy_config(tmp_path): + config_path = f'{tmp_path}/dummy_config_file.json' + copy(f"{FILES_DIR}/dummy_config_file.json", config_path) + with open(f"{FILES_DIR}/dummy_config_file.json", "r") as f: + cnt = f.read() + with open(config_path, "w") as f: + f.write(cnt.replace("{path}", str(tmp_path))) + + return config_path + + +@pytest.fixture() +def wrong_dummy_config(dummy_config): + with open(dummy_config, "r") as f: + conf = json.load(f) + + +@pytest.fixture() +def controller(dummy_config): + return Controller(dummy_config) + + +@pytest.fixture() +def ready_ipa(ipa_config): + domain = ipa_config["hostname"].split(".", 1)[1] + client_name = f'client-{ipa_config["hostname"]}' + # cmd = ["ipa-client-install", "-p", "admin", + # "--password", ipa_config["admin_passwd"], + # "--server", ipa_config["hostname"], + # "--domain", domain, # noqa: E501 user everything after first dot as domain, e.g ipa.test.local -> test.local would be used + # "--realm", domain.upper(), + # "--hostname", client_name, + # "--all-ip-addresses", "--force", "--force-join", "--no-ntp", "-U"] + # check_output(cmd, input="yes", encoding="utf-8") + return IPAServerCA(ip_addr=ipa_config["ip"], + server_hostname=ipa_config["hostname"], + admin_passwd=ipa_config["admin_passwd"], + root_passwd=ipa_config["root_passwd"], + domain=domain, + client_hostname=client_name) + + +def test_parse_config(dummy_config): + """Test that configuration is parsed and validated properly.""" + cnt = Controller(dummy_config) + + assert cnt.conf_path.is_absolute() + assert isinstance(cnt.lib_conf, dict) + + +def test_prepare(controller): + """Test for overall setup including dumps.""" + cnt: Controller = controller + + +def test_setup_system(controller): + cnt: Controller = controller + packages = ["opensc", "httpd", "sssd", "sssd-tools", "gnutls-utils", + "pcsc-lite-ccid", "pcsc-lite", "virt_cacard", "vpcd", + "softhsm"] + + check_output(["dnf", "remove", "softhsm", "-y"], encoding="utf8") + + cnt.setup_system(install_missing=True, gdm=False) + + for p in packages: + out = check_output(["rpm", "-qa", p], encoding="utf-8") + assert p in out + current_sssd = ConfigParser() + with open("/etc/sssd/sssd.conf", "r") as f: + current_sssd.read_file(f) + + msg = "Current SSSD conf content is different from config parser object " \ + "in the controller" + assert set(current_sssd.sections()).issubset(set( + cnt.sssd_conf._default_parser.sections())), msg + + +def test_users_create(controller, tmp_path, ready_ipa): + """Test for adding local and IPA users to the systems and initializing all + required files.""" + cnt: Controller = controller + cnt.ipa_ca = ready_ipa + for u in cnt.lib_conf["users"]: + cnt.setup_user(u) + + for p in [t["card_dir"] for t in cnt.lib_conf["users"]]: + assert p.joinpath("sofhtsm2.conf").exists() + + +def test_cas_create(controller): + cnt: Controller = controller + + +def test_enroll_card(controller): + cnt: Controller = controller + + +def test_cleanup(controller): + cnt: Controller = controller From 7092c92f410ae24a6da0c7e5ad5ba78bc9a30c03 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Mon, 11 Jul 2022 15:53:36 +0200 Subject: [PATCH 02/11] Add dumping methods for objects used in tests --- SCAutolib/controller.py | 2 +- SCAutolib/models/CA.py | 182 +++++++++++++++++++++------------------ SCAutolib/models/card.py | 54 ++++++++---- SCAutolib/models/user.py | 31 ++++++- SCAutolib/utils.py | 8 +- 5 files changed, 174 insertions(+), 103 deletions(-) diff --git a/SCAutolib/controller.py b/SCAutolib/controller.py index 0f2bfb34..55730abd 100644 --- a/SCAutolib/controller.py +++ b/SCAutolib/controller.py @@ -5,7 +5,7 @@ from typing import Union from SCAutolib import (logger, run, LIB_DIR, LIB_BACKUP, LIB_DUMP, - LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARDS) + LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARD) from SCAutolib.exceptions import SCAutolibWrongConfig, SCAutolibException from SCAutolib.models import CA, file, user, card, authselect as auth from SCAutolib.models.file import File diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index 1b56b8db..196e2e53 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -1,22 +1,26 @@ import re +import json import os import paramiko +import python_freeipa from cryptography import x509 from fabric.connection import Connection from hashlib import md5 from invoke import Responder -from pathlib import Path +from pathlib import Path, PosixPath from python_freeipa import exceptions from python_freeipa.client_meta import ClientMeta from shutil import rmtree, copy from socket import gethostname -from SCAutolib import TEMPLATES_DIR, logger, run, LIB_DIR +from SCAutolib import TEMPLATES_DIR, logger, run, LIB_DIR, LIB_DUMP_CAS from SCAutolib.exceptions import SCAutolibException +from SCAutolib.models.file import OpensslCnf -class CA: +class BaseCA: + dump_file: Path = None def request_cert(self, csr, username: str, cert_out: Path): """ @@ -33,7 +37,7 @@ def request_cert(self, csr, username: str, cert_out: Path): """ ... - def setup(self, force: bool = False): + def setup(self): """ Configure the CA @@ -59,68 +63,73 @@ def revoke_cert(self, cert: Path): ... -class LocalCA(CA): +class LocalCA(BaseCA): template = Path(TEMPLATES_DIR, "ca.cnf") + dump_file = LIB_DUMP_CAS.joinpath("local-ca.json") - def __init__(self, root_dir: Path = Path("/etc/SCAutolib/ca")): + def __init__(self, root_dir: Path = None, cnf: OpensslCnf = None): """ Class for local CA. Initialize required attributes, real setup is made by LocalCA.setup() method :param root_dir: Path to root directory of the CA. By default, is in - /etc/SCAutolib/ca + /etc/SCAutolib/ca :type: Path """ - self.root_dir: Path = root_dir - self._conf_dir: Path = Path(root_dir, "conf") - self._newcerts: Path = Path(root_dir, "newcerts") - self._certs: Path = Path(root_dir, "certs") - self._crl: Path = Path(root_dir, "crl", "root.pem") + self.root_dir: Path = Path("/etc/SCAutolib/ca") if root_dir is None else root_dir + assert self.root_dir is not None + self._conf_dir: Path = self.root_dir.joinpath("conf") + self._newcerts: Path = self.root_dir.joinpath("newcerts") + self._certs: Path = self.root_dir.joinpath("certs") + self._crl: Path = self.root_dir.joinpath("crl", "root.pem") self._ca_pki_db: Path = Path("/etc/sssd/pki/sssd_auth_ca_db.pem") - self._ca_cnf: Path = self.root_dir.joinpath("ca.cnf") - self._ca_cert: Path = Path(root_dir, "rootCA.pem") - self._ca_key: Path = Path(root_dir, "rootCA.key") + self._ca_cnf: OpensslCnf = cnf + self._ca_cert: Path = self.root_dir.joinpath("rootCA.pem") + self._ca_key: Path = self.root_dir.joinpath("rootCA.key") - self._serial: Path = Path(root_dir, "serial") - self._index: Path = Path(root_dir, "index.txt") + self._serial: Path = self.root_dir.joinpath("serial") + self._index: Path = self.root_dir.joinpath("index.txt") - def setup(self, force: bool = False): + @property + def cnf(self): + return self._ca_cnf + + @property + def __dict__(self): + """ + Customising default property for better serialisation for storing to + JSON format. + + :return: dictionary with all values. Path objects are typed to string. + :rtype: dict + """ + dict_ = {k: str(v) if type(v) is PosixPath else v + for k, v in super().__dict__.items()} + if self._ca_cnf: + dict_["_ca_cnf"] = str(self._ca_cnf.path) + return dict_ + + def setup(self): """ Creates directory and file structure needed by local CA. If directory already exists and force = True, directory would be recursively deleted and new local CA would be created. Otherwise, configuration would be skipped. - - :param force: overwrite existing configuration with force if True, - otherwise, skip configuration. - :type force: bool """ - if self.root_dir.exists(): - logger.warning(f"Directory {self.root_dir} already exists.") - if not force: - logger.warning("Skipping configuration.") - return - - self.cleanup() self.root_dir.mkdir(parents=True, exist_ok=True) - self._ca_cnf.parent.mkdir(exist_ok=True) self._newcerts.mkdir(exist_ok=True) self._certs.mkdir(exist_ok=True) self._crl.parent.mkdir(exist_ok=True) - # Copy template and edit it with current root dir for CA - copy(self.template, self._ca_cnf) - with self._ca_cnf.open("r+") as f: - f.write(f.read().format(ROOT_DIR=self.root_dir)) with self._serial.open("w") as f: f.write("01") self._index.touch() # Generate self-signed certificate - cmd = ['openssl', 'req', '-batch', '-config', self._ca_cnf, + cmd = ['openssl', 'req', '-batch', '-config', self._ca_cnf.path, '-x509', '-new', '-nodes', '-newkey', 'rsa:2048', '-keyout', self._ca_key, '-sha256', '-set_serial', '0', '-extensions', 'v3_ca', '-out', self._ca_cert] @@ -130,7 +139,7 @@ def setup(self, force: bool = False): f"CA self-signed certificate is generated into {self._ca_cert}") # Configuring CRL - run(['openssl', 'ca', '-config', self._ca_cnf, '-gencrl', + run(['openssl', 'ca', '-config', self._ca_cnf.path, '-gencrl', '-out', self._crl], check=True) with self._ca_cert.open("r") as f_cert: @@ -210,7 +219,7 @@ def cleanup(self): logger.info(f"Local CA from {self.root_dir} is removed") -class IPAServerCA(CA): +class IPAServerCA(BaseCA): """ Class represents IPA server with integrated CA. Through this class communication with IPA server is made primarily using @@ -231,8 +240,9 @@ class IPAServerCA(CA): _ipa_client_hostname: str = None _ipa_server_root_passwd: str = None meta_client: ClientMeta = None + dump_file = LIB_DUMP_CAS.joinpath("ipa-server.json") - def __init__(self, ip_addr: str, hostname: str, domain: str, + def __init__(self, ip_addr: str, server_hostname: str, domain: str, admin_passwd: str, root_passwd: str, client_hostname: str, realm: str = None): """ @@ -242,8 +252,8 @@ def __init__(self, ip_addr: str, hostname: str, domain: str, :param ip_addr: IP address of the IPA server :type ip_addr: str - :param hostname: Hostname of the IPA server - :type hostname: str + :param server_hostname: Hostname of the IPA server + :type server_hostname: str :param domain: Domain name of the IPA server :type domain: str :param admin_passwd: Password for admin user on the IPA server @@ -259,7 +269,7 @@ def __init__(self, ip_addr: str, hostname: str, domain: str, :type realm: str """ self._ipa_server_ip = ip_addr - self._ipa_server_hostname = hostname + self._ipa_server_hostname = server_hostname self._add_to_hosts() # So we can log in to the IPA before setup self._ipa_server_domain = domain @@ -268,34 +278,36 @@ def __init__(self, ip_addr: str, hostname: str, domain: str, self._ipa_client_hostname = client_hostname self._ipa_server_root_passwd = root_passwd - try: - self.meta_client: ClientMeta = ClientMeta(self._ipa_server_hostname, - verify_ssl=False) - self.meta_client.login("admin", self._ipa_server_admin_passwd) - logger.info("Connected to IPA via meta client") - except python_freeipa.exceptions.BadRequest: - logger.warning("Can't login to the IPA server. " - "Client might be not configured") + self._meta_client_login() - def setup(self, force: bool = False): + @property + def is_installed(self): """ - Setup IPA client for IPA server. After IPA client is installed, system - would be configured for smart card login with IPA using script from - IPA server obtained via SSH. + :return: True, if IPA client is installed on the system (ipa command + returns zero return code), otherwise False + :rtype: bool + """ + return False - :param force: if True, previous installation of the IPA client would be - removed - :type force: bool + @property + def __dict__(self): """ + Customising default property for better serialisation for storing to + JSON format. - if self.is_installed: - logger.warning("IPA client is already configured on this system.") - if not force: - logger.info("Set force argument to True if you want to remove " - "previous installation.") - return - self.restore() + :return: dictionary with all values. Path objects are typed to string. + :rtype: dict + """ + dict_: dict = super().__dict__.copy() + dict_.pop("meta_client") + return dict_ + def setup(self): + """ + Setup IPA client for IPA server. After IPA client is installed, system + would be configured for smart card login with IPA using script from + IPA server obtained via SSH. + """ logger.info(f"Start setup of IPA client on the system for " f"{self._ipa_server_hostname} IPA server.") @@ -318,9 +330,7 @@ def setup(self, force: bool = False): run(f'bash {ipa_client_script} /etc/ipa/ca.crt', check=True) logger.debug("Setup of IPA client for smart card is finished") - self.meta_client: ClientMeta = ClientMeta(self._ipa_server_hostname, - verify_ssl=False) - self.meta_client.login("admin", self._ipa_server_admin_passwd) + self._meta_client_login() policy = self.meta_client.pwpolicy_show(a_cn="global_policy")["result"] if ["0"] != policy["krbminpwdlife"]: @@ -335,15 +345,18 @@ def setup(self, force: bool = False): # TODO: add to restore client host name logger.info("IPA client is configured on the system.") - @property - def is_installed(self): + def _meta_client_login(self): """ - :return: True, if IPA client is installed on the system (ipa command - returns zero return code), otherwise False - :rtype: bool + Login to admin user via IPA meta client. """ - out = run(["ipa", "help"], print_=False, check=False) - return out.returncode == 0 + try: + self.meta_client: ClientMeta = ClientMeta(self._ipa_server_hostname, + verify_ssl=False) + self.meta_client.login("admin", self._ipa_server_admin_passwd) + logger.info("Connected to IPA via meta client") + except python_freeipa.exceptions.BadRequest: + logger.warning("Can't login to the IPA server. " + "Client might be not configured") def _set_hostname(self): """ @@ -359,18 +372,19 @@ def _add_to_resolv(self): """ nameserver = f"nameserver {self._ipa_server_ip}" pattern = rf"^nameserver\s+{self._ipa_server_ip}\s*" - with open("/etc/resolv.conf", "w+") as f: + with open("/etc/resolv.conf", "r") as f: cnt = f.read() - logger.debug(f"Original resolv.conf:\n{cnt}") - if re.match(pattern, cnt) is None: - logger.warning(f"Nameserver {self._ipa_server_ip} is not " - "present in /etc/resolve.conf. Adding...") - f.write(nameserver + "\n" + cnt) - logger.info( - "IPA server is added to /etc/resolv.conf " - "as first nameserver") - run("chattr -i /etc/resolv.conf") - logger.info("File /etc/resolv.conf is blocked for editing") + logger.debug(f"Original resolv.conf:\n{cnt}") + if re.match(pattern, cnt) is None: + logger.warning(f"Nameserver {self._ipa_server_ip} is not " + "present in /etc/resolve.conf. Adding...") + cnt = (nameserver + "\n" + cnt) + with open("/etc/resolv.conf", "w") as f: + f.write(cnt) + logger.info( + "IPA server is added to /etc/resolv.conf as first nameserver") + run("chattr -i /etc/resolv.conf") + logger.info("File /etc/resolv.conf is blocked for editing") with open("/etc/resolv.conf", "r") as f: logger.debug(f"New resolv.conf\n{f.read()}") diff --git a/SCAutolib/models/card.py b/SCAutolib/models/card.py index 4e07a9a4..dd8b91e4 100644 --- a/SCAutolib/models/card.py +++ b/SCAutolib/models/card.py @@ -17,9 +17,9 @@ class Card: based on the type of the card. """ uri: str = None - user = None - _pattern: str = None dump_file: Path = None + _user = None + _pattern: str = None def _set_uri(self): """ @@ -99,7 +99,8 @@ def __init__(self, user, insert: bool = False, self._service_location = Path( f"/etc/systemd/system/{self._service_name}.service") self._insert = insert - self._nssdb = self.user.card_dir.joinpath("db") + self.dump_file = LIB_DUMP_CARDS.joinpath( + f"card-{self._user.username}.json") self._softhsm2_conf = softhsm2_conf if softhsm2_conf \ else Path("/home", self.user.username, "softhsm2.conf") @@ -134,6 +135,18 @@ def __exit__(self, exp_type, exp_value, exp_traceback): logger.error(format_exc()) self.remove() + @property + def __dict__(self): + # Need to copy to not referencing the same object what leads to + # changing it on retyping + dict_ = super().__dict__.copy() + for k, v in dict_.items(): + if type(v) in (PosixPath, Path): + dict_[k] = str(v) + dict_["_softhsm2_conf"] = str(self._softhsm2_conf.path) + dict_.pop("_user") + return dict_ + @property def softhsm2_conf(self): return self._softhsm2_conf @@ -143,6 +156,15 @@ def softhsm2_conf(self, conf: Path): assert conf.exists(), "File doesn't exist" self._softhsm2_conf = conf + @property + def user(self): + return self._user + + @user.setter + def user(self, system_user): + self._user = system_user + self._nssdb = self.user.card_dir.joinpath("db") + def insert(self): """ Insert virtual smart card by starting the corresponding service. @@ -169,19 +191,19 @@ def enroll(self): NSS database) with pkcs11-tool. """ cmd = ["pkcs11-tool", "--module", "libsofthsm2.so", "--slot-index", - '0', "-w", str(self._private_key), "-y", "privkey", "--label", - f"'{self.user.username}'", "-p", self.user.pin, "--set-id", "0", + '0', "-w", self._user.key, "-y", "privkey", "--label", + f"'{self._user.username}'", "-p", self._user.pin, "--set-id", "0", "-d", "0"] - run(cmd, env={"SOFTHSM2_CONF": str(self._softhsm2_conf)}) + run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf.path}) logger.debug( - f"User key {self._private_key} is added to virtual smart card") + f"User key {self._user.key} is added to virtual smart card") cmd = ['pkcs11-tool', '--module', 'libsofthsm2.so', '--slot-index', "0", - '-w', str(self._cert), '-y', 'cert', '-p', self.user.pin, - '--label', f"'{self.user.username}'", '--set-id', "0", '-d', "0"] - run(cmd, env={"SOFTHSM2_CONF": str(self._softhsm2_conf)}) + '-w', self._user.cert, '-y', 'cert', '-p', self._user.pin, + '--label', f"'{self._user.username}'", '--set-id', "0", '-d', "0"] + run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf.path}) logger.debug( - f"User certificate {self._cert} is added to virtual smart card") + f"User certificate {self._user.cert} is added to virtual smart card") # To get URI of the card, the card has to be inserted # Virtual smart card can't be started without a cert and a key uploaded @@ -197,9 +219,10 @@ def create(self): required for each virtual card. """ - assert self._softhsm2_conf.exists(),\ + assert self._softhsm2_conf.path.exists(), \ "Can't proceed, SoftHSM2 conf doesn't exist" - Path(f"{self.user.card_dir}/tokens").mkdir() + + self.user.card_dir.joinpath("tokens").mkdir(exist_ok=True) p11lib = "/usr/lib64/pkcs11/libsofthsm2.so" # Initialize SoftHSM2 token. An error would be raised if token with same @@ -207,11 +230,12 @@ def create(self): cmd = ["softhsm2-util", "--init-token", "--free", "--label", self.user.username, "--so-pin", "12345678", "--pin", self.user.pin] - run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf}, check=True) + run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf.path}, check=True) logger.debug( f"SoftHSM token is initialized with label '{self.user.username}'") # Initialize NSS db + self._nssdb = self.user.card_dir.joinpath("db") self._nssdb.mkdir(exist_ok=True) run(f"modutil -create -dbdir sql:{self._nssdb} -force", check=True) logger.debug(f"NSS database is initialized in {self._nssdb}") @@ -226,7 +250,7 @@ def create(self): with self._template.open() as tmp: with self._service_location.open("w") as f: f.write(tmp.read().format(username=self.user.username, - softhsm2_conf=self._softhsm2_conf, + softhsm2_conf=self._softhsm2_conf.path, card_dir=self.user.card_dir)) logger.debug(f"Service is created in {self._service_location}") diff --git a/SCAutolib/models/user.py b/SCAutolib/models/user.py index 5813e45b..c237c08b 100644 --- a/SCAutolib/models/user.py +++ b/SCAutolib/models/user.py @@ -14,7 +14,7 @@ from pathlib import Path, PosixPath from shutil import rmtree -from SCAutolib import run, logger, LIB_DUMP_USERS +from SCAutolib import run, logger, LIB_DUMP_USERS, LIB_DUMP_CARD from SCAutolib.exceptions import SCAutolibException from SCAutolib.models import card as card_model from SCAutolib.models.CA import IPAServerCA @@ -77,6 +77,8 @@ class User(BaseUser): """ Generic class to represent system users. """ + _card = None + dump_file: Path = None def __init__(self, username: str, password: str, pin: str, cnf: Path = None, key: Path = None, cert: Path = None, @@ -241,6 +243,15 @@ def gen_csr(self): run(cmd) return csr_path + def load(self): + with self.dump_file.open("r") as f: + cnt = json.load(f) + cnt["card_dir"] = Path(cnt["card_dir"]) + + for k, v in cnt.__dict__.items(): + setattr(self, k, v) + return self + class IPAUser(User): """ @@ -273,6 +284,19 @@ def __init__(self, ipa_server: IPAServerCA, *args, **kwargs): self._meta_client = ipa_server.meta_client self._ipa_hostname = ipa_server.ipa_server_hostname + @property + def __dict__(self): + """ + Customising default property for better serialisation for storing to + JSON format. + + :return: dictionary with all values. Path objects are typed to string. + :rtype: dict + """ + dict_ = super().__dict__ + dict_.pop("_meta_client") + return dict_ + def add_user(self): """ Adds IPA user to IPA server. @@ -329,3 +353,8 @@ def gen_csr(self): str(csr_path), "-subj", f"/CN={self.username}"] run(cmd) return csr_path + + def load(self, ipa_server: IPAServerCA): + super(IPAUser, self).load() + self._meta_client = ipa_server.meta_client + return self diff --git a/SCAutolib/utils.py b/SCAutolib/utils.py index c0711734..da98b4f9 100644 --- a/SCAutolib/utils.py +++ b/SCAutolib/utils.py @@ -1,10 +1,14 @@ +""" +This module provides different additional helping functions that are used +across the library. These functions are made based on library demands and are +not attended to cover some general use-cases or specific corner cases. +""" import json from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa -from pathlib import Path - from enum import Enum +from pathlib import Path from SCAutolib import run, logger, TEMPLATES_DIR from SCAutolib.exceptions import SCAutolibException From 3dd997a4af5c2d916a1ff13888a6b4ca0fbdcac7 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Tue, 12 Jul 2022 16:28:02 +0200 Subject: [PATCH 03/11] Fix loading the user with a card to tests --- SCAutolib/__init__.py | 16 +++++++-- SCAutolib/cli_commands.py | 29 +++++++++++++++++ SCAutolib/controller.py | 2 +- SCAutolib/models/CA.py | 3 +- SCAutolib/models/authselect.py | 59 ++++++++++++++++------------------ SCAutolib/models/card.py | 26 ++++++++++++--- SCAutolib/models/user.py | 12 ------- test/test_controller.py | 41 ++++++++++------------- 8 files changed, 112 insertions(+), 76 deletions(-) diff --git a/SCAutolib/__init__.py b/SCAutolib/__init__.py index 2f7d5d95..c5c4eb1c 100644 --- a/SCAutolib/__init__.py +++ b/SCAutolib/__init__.py @@ -26,7 +26,8 @@ def run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, - print_=True, **kwargs) -> subprocess.CompletedProcess: + print_=True, return_code: list = None, **kwargs) \ + -> subprocess.CompletedProcess: """ Wrapper for subrpocess.run function. This function explicitly set several parameter of original function and also provides similar thing as @@ -36,6 +37,10 @@ def run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, subprocess.run function needed to be passed to this wrapper, you can do it by adding same parameters names in key=value format. + :param return_code: acceptable return codes from given commands. + If check=True, and the return code of the cmd is not in the return_code + list an subprocess.CalledProcessError exception would be raised. + :type return_code: list :param cmd: Command to be executed :type cmd: list or str :param stdout: Redirection of stdout. Default is subprocess.PIPE @@ -58,6 +63,8 @@ def run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, :return: Completed process from subprocess.run :rtype: subprocess.CompletedProcess """ + if return_code is None: + return_code = [0] if type(cmd) == str: cmd = cmd.split(" ") logger.debug(f"run: {' '.join([str(i) for i in cmd])}") @@ -69,6 +76,9 @@ def run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, if out.stderr != "": logger.warning(out.stderr) - if check and out.returncode != 0: - raise subprocess.CalledProcessError(out.returncode, cmd) + if check: + if out.returncode not in return_code: + logger.error(f"Unexpected return code {out.returncode}. " + f"Expected: {return_code}") + raise subprocess.CalledProcessError(out.returncode, cmd) return out diff --git a/SCAutolib/cli_commands.py b/SCAutolib/cli_commands.py index 21966a2e..65b15489 100644 --- a/SCAutolib/cli_commands.py +++ b/SCAutolib/cli_commands.py @@ -1,6 +1,35 @@ import click +from SCAutolib.controller import Controller @click.group() def cli(): pass + + +@click.command() +@click.option("--ca-type", "-t", required=False, default='all', + show_default=True, + help="Type of the CA to be configured. If not set, all CA's " + "from the config file would be configured") +@click.option("--conf-file", "-c", required=False, default="./conf.json", + type=click.Path(exists=True, resolve_path=True), + show_default=True) +def setup_ca(conf_file, ca_type): + cnt = Controller(conf_file, {"ip_addr": "10.10.10.10"}) + cnt.setup_ipa_client() + + +@click.command() +@click.option("--conf", "-c", required=True) +@click.option("--force", "-f", required=False, default=False, is_flag=True) +@click.option("--gdm", "-g", required=False, default=False, is_flag=True) +@click.option("--install-missing", "-i", required=False, default=True, + is_flag=True) +def prepare(conf, force, gdm, install_missing): + cnt = Controller(conf) + cnt.prepare(force, gdm, install_missing) + + +cli.add_command(setup_ca) +cli.add_command(prepare) diff --git a/SCAutolib/controller.py b/SCAutolib/controller.py index 55730abd..0f2bfb34 100644 --- a/SCAutolib/controller.py +++ b/SCAutolib/controller.py @@ -5,7 +5,7 @@ from typing import Union from SCAutolib import (logger, run, LIB_DIR, LIB_BACKUP, LIB_DUMP, - LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARD) + LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARDS) from SCAutolib.exceptions import SCAutolibWrongConfig, SCAutolibException from SCAutolib.models import CA, file, user, card, authselect as auth from SCAutolib.models.file import File diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index 196e2e53..55d4ef9f 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -76,7 +76,8 @@ def __init__(self, root_dir: Path = None, cnf: OpensslCnf = None): /etc/SCAutolib/ca :type: Path """ - self.root_dir: Path = Path("/etc/SCAutolib/ca") if root_dir is None else root_dir + self.root_dir: Path = Path("/etc/SCAutolib/ca") if root_dir is None \ + else root_dir assert self.root_dir is not None self._conf_dir: Path = self.root_dir.joinpath("conf") self._newcerts: Path = self.root_dir.joinpath("newcerts") diff --git a/SCAutolib/models/authselect.py b/SCAutolib/models/authselect.py index 151af6a6..ee8e6c7e 100644 --- a/SCAutolib/models/authselect.py +++ b/SCAutolib/models/authselect.py @@ -4,18 +4,17 @@ authselect(8)) and applies sssd profile with selected 'Authselect profile features' (see man authselect-migration(7)). """ - from os.path import exists -from subprocess import check_output as subp_co -from subprocess import PIPE + from traceback import format_exc +from SCAutolib import LIB_BACKUP from SCAutolib import logger +from SCAutolib.utils import run class Authselect: - - backup_name = "SCAutolib_authselect_backup" + backup_name = LIB_BACKUP.joinpath("SCAutolib_authselect_backup") def __init__(self, required=False, lock_on_removal=False, mk_homedir=False): """ @@ -31,34 +30,33 @@ def __init__(self, required=False, lock_on_removal=False, mk_homedir=False): :param mk_homedir: specifies with-mkhomedir option :type mk_homedir: bool """ + self._options = ["with-smartcard"] - self._required = required - self._lock_on_removal = lock_on_removal - self._mk_homedir = mk_homedir + if required: + self._options.append("with-smartcard-required") + if lock_on_removal: + self._options.append("with-smartcard-lock-on-removal") + if mk_homedir: + self._options.append("with-mkhomedir") def _set(self): """ Set authselect with SSSD profile and set selected Authselect profile features. Features are passed into the constructor. """ - if self._required: - self._options.append("with-smartcard-required") - if self._lock_on_removal: - self._options.append("with-smartcard-lock-on-removal") - if self._mk_homedir: - self._options.append("with-mkhomedir") # compose and run Authselect command - args = ["authselect", "select", "sssd", *self._options, - "--backup", self.backup_name, "--force"] - subp_co(args, stderr=PIPE, encoding="utf=8") + cmd = ["authselect", "select", "sssd", *self._options, + "--backup", self.backup_name, "--force"] + run(cmd) # get modified setup and log it - current = subp_co(["authselect", "current"], stderr=PIPE, - encoding="utf-8") - logger.debug(f"Current SSSD setting is: {current}") - logger.debug(f"Original SSSD configuration was backed up with " - f"authselect to default location as : {self.backup_name}") + logger.debug("Current Authselect setting is:") + run(["authselect", "current"], return_code=[0, 2]) + + logger.debug(f"Original Authselect configuration was backed up with " + f"authselect to default location as : " + f"{str(self.backup_name)}") logger.debug("Default location is: /var/lib/authselect/backups/") def _restore(self): @@ -66,18 +64,17 @@ def _restore(self): Restore the previous configuration of authselect. """ if exists(f"/var/lib/authselect/backups/{self.backup_name}"): - subp_co(["authselect", "backup-restore", self.backup_name, - "--debug"], stderr=PIPE, encoding="utf=8") - current = subp_co(["authselect", "current"], stderr=PIPE, - encoding="utf-8") - logger.debug(f"SSSD configuration is restored to {current}.") - subp_co(["authselect", "backup-remove", self.backup_name, - "--debug"], stderr=PIPE, encoding="utf=8") - logger.debug("Authselect backup file is removed.") + cmd = ["authselect", "backup-restore", self.backup_name, "--debug"] + run(cmd) + logger.debug("Authselect configuration is restored to:") + run(["authselect", "current"], return_code=[0, 2]) + + # run(["authselect", "backup-remove", self.backup_name, "--debug"]) + # logger.debug("Authselect backup file is removed.") else: # as _set and _restore should be used in context manager defined in # this class, it should not happen that backup does not exist except - # something failed or it's misused + # something failed, or it's misused raise FileNotFoundError("Backup file not found. _restore method was" "probably called in unexpected manner.") diff --git a/SCAutolib/models/card.py b/SCAutolib/models/card.py index dd8b91e4..e508053d 100644 --- a/SCAutolib/models/card.py +++ b/SCAutolib/models/card.py @@ -3,9 +3,11 @@ that we are using in the library. Those types are: virtual smart card, real (physical) smart card in standard reader, cards in the removinator. """ +import json + import re import time -from pathlib import Path +from pathlib import Path, PosixPath from traceback import format_exc from SCAutolib import run, logger, TEMPLATES_DIR, LIB_DUMP_CARDS @@ -18,6 +20,7 @@ class Card: """ uri: str = None dump_file: Path = None + type: str = None _user = None _pattern: str = None @@ -53,6 +56,19 @@ def enroll(self): """ ... + @staticmethod + def load(json_file, **kwars): + with json_file.open("r") as f: + cnt = json.load(f) + + card = None + if cnt["type"] == "virtual": + assert "user" in kwars.keys(),\ + "No user is provided to load the card." + card = VirtualCard(user=kwars["user"], insert=cnt["_insert"]) + card.uri = cnt["uri"] + return card + class VirtualCard(Card): """ @@ -71,6 +87,8 @@ class VirtualCard(Card): _template: Path = Path(TEMPLATES_DIR, "virt_cacard.service") _pattern = r"(pkcs11:model=PKCS%2315%20emulated;" \ r"manufacturer=Common%20Access%20Card;serial=.*)" + _insert: bool = None + type = "virtual" def __init__(self, user, insert: bool = False, softhsm2_conf: Path = None): @@ -219,7 +237,7 @@ def create(self): required for each virtual card. """ - assert self._softhsm2_conf.path.exists(), \ + assert self._softhsm2_conf.exists(), \ "Can't proceed, SoftHSM2 conf doesn't exist" self.user.card_dir.joinpath("tokens").mkdir(exist_ok=True) @@ -230,7 +248,7 @@ def create(self): cmd = ["softhsm2-util", "--init-token", "--free", "--label", self.user.username, "--so-pin", "12345678", "--pin", self.user.pin] - run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf.path}, check=True) + run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf}, check=True) logger.debug( f"SoftHSM token is initialized with label '{self.user.username}'") @@ -250,7 +268,7 @@ def create(self): with self._template.open() as tmp: with self._service_location.open("w") as f: f.write(tmp.read().format(username=self.user.username, - softhsm2_conf=self._softhsm2_conf.path, + softhsm2_conf=self._softhsm2_conf, card_dir=self.user.card_dir)) logger.debug(f"Service is created in {self._service_location}") diff --git a/SCAutolib/models/user.py b/SCAutolib/models/user.py index c237c08b..b6ef2572 100644 --- a/SCAutolib/models/user.py +++ b/SCAutolib/models/user.py @@ -12,7 +12,6 @@ import pwd import python_freeipa from pathlib import Path, PosixPath -from shutil import rmtree from SCAutolib import run, logger, LIB_DUMP_USERS, LIB_DUMP_CARD from SCAutolib.exceptions import SCAutolibException @@ -77,8 +76,6 @@ class User(BaseUser): """ Generic class to represent system users. """ - _card = None - dump_file: Path = None def __init__(self, username: str, password: str, pin: str, cnf: Path = None, key: Path = None, cert: Path = None, @@ -243,15 +240,6 @@ def gen_csr(self): run(cmd) return csr_path - def load(self): - with self.dump_file.open("r") as f: - cnt = json.load(f) - cnt["card_dir"] = Path(cnt["card_dir"]) - - for k, v in cnt.__dict__.items(): - setattr(self, k, v) - return self - class IPAUser(User): """ diff --git a/test/test_controller.py b/test/test_controller.py index ecc6427b..0172384b 100644 --- a/test/test_controller.py +++ b/test/test_controller.py @@ -1,13 +1,11 @@ -from subprocess import check_output - import pytest +from configparser import ConfigParser from shutil import copy +from subprocess import check_output from SCAutolib.controller import Controller from SCAutolib.models.CA import IPAServerCA from conftest import FILES_DIR -import json -from configparser import ConfigParser @pytest.fixture() @@ -22,12 +20,6 @@ def dummy_config(tmp_path): return config_path -@pytest.fixture() -def wrong_dummy_config(dummy_config): - with open(dummy_config, "r") as f: - conf = json.load(f) - - @pytest.fixture() def controller(dummy_config): return Controller(dummy_config) @@ -60,10 +52,11 @@ def test_parse_config(dummy_config): assert cnt.conf_path.is_absolute() assert isinstance(cnt.lib_conf, dict) - -def test_prepare(controller): - """Test for overall setup including dumps.""" - cnt: Controller = controller +# +# def test_prepare(controller): +# """Test for overall setup including dumps.""" +# cnt: Controller = controller +# cnt.prepare(False, False, False) def test_setup_system(controller): @@ -101,13 +94,13 @@ def test_users_create(controller, tmp_path, ready_ipa): assert p.joinpath("sofhtsm2.conf").exists() -def test_cas_create(controller): - cnt: Controller = controller - - -def test_enroll_card(controller): - cnt: Controller = controller - - -def test_cleanup(controller): - cnt: Controller = controller +# def test_cas_create(controller): +# cnt: Controller = controller +# +# +# def test_enroll_card(controller): +# cnt: Controller = controller +# +# +# def test_cleanup(controller): +# cnt: Controller = controller From 76c3418729bd021e6d6c724d7396f4444726c490 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Mon, 18 Jul 2022 09:24:22 +0200 Subject: [PATCH 04/11] Add load method for CA's --- SCAutolib/models/CA.py | 63 +++++++++++++++++++++++++++++++--------- SCAutolib/models/user.py | 18 +++++++----- 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index 55d4ef9f..aa792ed4 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -11,7 +11,7 @@ from pathlib import Path, PosixPath from python_freeipa import exceptions from python_freeipa.client_meta import ClientMeta -from shutil import rmtree, copy +from shutil import rmtree from socket import gethostname from SCAutolib import TEMPLATES_DIR, logger, run, LIB_DIR, LIB_DUMP_CAS @@ -62,6 +62,28 @@ def revoke_cert(self, cert: Path): """ ... + @staticmethod + def load(json_file): + """ + Load CA from JSON file. + :return: CA object + """ + with json_file.dump_file.open("r") as f: + cnt = json.load(f) + + if "_ipa_server_ip" in cnt.keys(): + ca = IPAServerCA(ip_addr=cnt["_ipa_server_ip"], + server_hostname=cnt["_ipa_server_hostname"], + root_passwd=cnt["_ipa_server_root_password"], + admin_passwd=cnt["_ipa_server_admin_password"], + client_hostname=cnt["_ipa_client_hostname"], + domain=cnt["_ipa_server_domain"], + realm=cnt["_ipa_server_realm"]) + else: + ca = LocalCA(root_dir=cnt["root_dir"]) + logger.debug(f"CA {type(ca)} is restored from file {json_file}") + return ca + class LocalCA(BaseCA): template = Path(TEMPLATES_DIR, "ca.cnf") @@ -284,11 +306,19 @@ def __init__(self, ip_addr: str, server_hostname: str, domain: str, @property def is_installed(self): """ - :return: True, if IPA client is installed on the system (ipa command - returns zero return code), otherwise False + :return: True, if IPA client is installed on the system (/etc/ipa + directory contains ca.crt file from IPA server), otherwise False :rtype: bool """ - return False + d = Path("/etc/ipa") + result = d.exists() + if result: + result = d.joinpath("ca.crt") + return result + + @property + def domain(self): + return self._ipa_server_domain @property def __dict__(self): @@ -316,14 +346,21 @@ def setup(self): self._set_hostname() logger.info("Installing IPA client") - run(["ipa-client-install", "-p", "admin", - "--password", self._ipa_server_admin_passwd, - "--server", self._ipa_server_hostname, - "--domain", self._ipa_server_domain, - "--realm", self._ipa_server_realm, - "--hostname", self._ipa_client_hostname, - "--all-ip-addresses", "--force", "--force-join", "--no-ntp", "-U"], - input="yes") + try: + run(["ipa-client-install", "-p", "admin", + "--password", self._ipa_server_admin_passwd, + "--server", self._ipa_server_hostname, + "--domain", self._ipa_server_domain, + "--realm", self._ipa_server_realm, + "--hostname", self._ipa_client_hostname, + "--force", "--force-join", "--no-ntp", + "--no-dns-sshfp", "--mkhomedir", "--unattended"], + input="yes") + except: + logger.critical("Installation of IPA client is failed") + rmtree("/etc/ipa/*") + logger.debug("Directory /etc/ipa is removed") + raise logger.debug("IPA client is installed") ipa_client_script = self._get_sc_setup_script() @@ -549,7 +586,7 @@ def cleanup(self): except exceptions.NotFound: logger.error(f"Current hostname ({gethostname()}) is not found " f"on the IPA server") - run(["ipa-client-install", "--uninstall", "-U"], check=True) + run(["ipa-client-install", "--uninstall", "-U"], return_code=[0, 2]) logger.info("IPA client is removed.") class __PKeyChild(paramiko.PKey): diff --git a/SCAutolib/models/user.py b/SCAutolib/models/user.py index b6ef2572..b966fdbf 100644 --- a/SCAutolib/models/user.py +++ b/SCAutolib/models/user.py @@ -13,7 +13,7 @@ import python_freeipa from pathlib import Path, PosixPath -from SCAutolib import run, logger, LIB_DUMP_USERS, LIB_DUMP_CARD +from SCAutolib import run, logger, LIB_DUMP_USERS, LIB_DUMP_CARDS from SCAutolib.exceptions import SCAutolibException from SCAutolib.models import card as card_model from SCAutolib.models.CA import IPAServerCA @@ -123,7 +123,7 @@ def card(self, card: card_model.Card): @card.deleter def card(self): - logger.info("Deleting the existing card from {self.username}") + logger.info(f"Deleting the existing card from {self.username}") self._card = None @property @@ -213,6 +213,15 @@ def delete_user(self): logger.info(f"User {self.username} is not present on the system") def add_user(self, force=False): + """ + Add user to the local system with `useradd` bash command and set + password for created user. + + :param force: specifies if the user should be recreated if the + collision appears. + :type force: bool + :return: + """ try: pwd.getpwnam(self.username) msg = f"User {self.username} already exists on this " \ @@ -341,8 +350,3 @@ def gen_csr(self): str(csr_path), "-subj", f"/CN={self.username}"] run(cmd) return csr_path - - def load(self, ipa_server: IPAServerCA): - super(IPAUser, self).load() - self._meta_client = ipa_server.meta_client - return self From 009284d5e3a12d75aa64e4c29d305b02ad7109b0 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Mon, 1 Aug 2022 15:50:08 +0200 Subject: [PATCH 05/11] Add strict checks for CNF file --- SCAutolib/exceptions.py | 9 +++++++++ SCAutolib/models/CA.py | 17 +++++++++++++++-- test/fixtures.py | 23 ++++++++++++++++++++--- test/test_card.py | 11 ++--------- 4 files changed, 46 insertions(+), 14 deletions(-) diff --git a/SCAutolib/exceptions.py b/SCAutolib/exceptions.py index 958f30af..b545515e 100644 --- a/SCAutolib/exceptions.py +++ b/SCAutolib/exceptions.py @@ -9,3 +9,12 @@ class SCAutolibException(Exception): """ def __init__(self, *args): super().__init__(*args) + + +class SCAutolibWrongConfig(SCAutolibException): + default = "Key/section for current operation is not present in the " \ + "configuration file" + + def __init__(self, msg=None): + msg = self.default if msg is None else msg + super().__init__(msg) diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index aa792ed4..4a4da77b 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -107,7 +107,10 @@ def __init__(self, root_dir: Path = None, cnf: OpensslCnf = None): self._crl: Path = self.root_dir.joinpath("crl", "root.pem") self._ca_pki_db: Path = Path("/etc/sssd/pki/sssd_auth_ca_db.pem") - self._ca_cnf: OpensslCnf = cnf + self._ca_cnf: OpensslCnf = cnf if cnf else OpensslCnf( + conf_type="CA", + filepath=self.root_dir.joinpath("ca.cnf"), + replace=str(self.root_dir)) self._ca_cert: Path = self.root_dir.joinpath("rootCA.pem") self._ca_key: Path = self.root_dir.joinpath("rootCA.key") @@ -118,6 +121,12 @@ def __init__(self, root_dir: Path = None, cnf: OpensslCnf = None): def cnf(self): return self._ca_cnf + @cnf.setter + def cnf(self, cnf: OpensslCnf): + if not cnf.path.exists(): + raise SCAutolibException("CNF file does not exist") + self._ca_cnf = cnf + @property def __dict__(self): """ @@ -140,6 +149,10 @@ def setup(self): and new local CA would be created. Otherwise, configuration would be skipped. """ + if self._ca_cnf is None: + raise SCAutolibException("CA CNF file is not set") + elif not self._ca_cnf.path.exists(): + raise SCAutolibException("CA CNF does not exist") self.root_dir.mkdir(parents=True, exist_ok=True) self._newcerts.mkdir(exist_ok=True) @@ -457,7 +470,7 @@ def _get_sc_setup_script(self) -> Path: "on IPA client") with Connection(self._ipa_server_ip, user="root", connect_kwargs={"password": - self._ipa_server_root_passwd}) as c: + self._ipa_server_root_passwd}) as c: # Delete this block when PR in paramiko will be accepted # https://github.com/paramiko/paramiko/issues/396 #### noqa:E266 diff --git a/test/fixtures.py b/test/fixtures.py index 4dc0aaa8..55a51972 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -2,14 +2,31 @@ from pathlib import Path from shutil import copyfile +from SCAutolib.models import CA from SCAutolib.models.card import VirtualCard -from SCAutolib.models.file import SSSDConf, File +from SCAutolib.models.file import SSSDConf, File, OpensslCnf from SCAutolib.models.user import User +@pytest.fixture(scope="session") +def local_ca_fixture(tmp_path_factory, backup_sssd_ca_db): + root = tmp_path_factory.mktemp("ca").joinpath("local-ca") + root.mkdir(exist_ok=True) + cnf = OpensslCnf(conf_type="CA", filepath=root.joinpath("ca.cnf"), + replace=str(root)) + ca = CA.LocalCA(root, cnf) + try: + cnf.create() + cnf.save() + ca.setup() + except FileExistsError: + pass + return ca + + @pytest.fixture -def local_user(tmp_path): - user = User("testuser", "testpassword", "123456") +def local_user(tmp_path, request): + user = User(f"user-{request.node.name}", "testpassword", "123456") user.card_dir = tmp_path user.dump_file = tmp_path.joinpath("test-user-dump-file.json") user.card = VirtualCard(user=user) diff --git a/test/test_card.py b/test/test_card.py index 42139cb0..20b50882 100644 --- a/test/test_card.py +++ b/test/test_card.py @@ -8,14 +8,7 @@ @pytest.fixture() -def local_ca(tmp_path): - ca = LocalCA(Path(tmp_path, "ca")) - ca.setup(force=True) - return ca - - -@pytest.fixture() -def gen_key_and_cert(local_ca, local_user): +def gen_key_and_cert(local_ca_fixture, local_user): csr = Path(local_user.card_dir, f"{local_user.username}.csr") cert = Path(local_user.card_dir, f"{local_user.username}.cert") key = Path(local_user.card_dir, f"{local_user.username}.key") @@ -23,7 +16,7 @@ def gen_key_and_cert(local_ca, local_user): "-keyout", key, "-out", csr, "-subj", f"/CN={local_user.username}"] check_output(cmd, encoding="utf-8") - local_ca.request_cert(csr, username=local_user.username, cert_out=cert) + local_ca_fixture.request_cert(csr, username=local_user.username, cert_out=cert) return key, cert From def6f5fd061b072fc7a97d54e619c04d6d8261a4 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Mon, 1 Aug 2022 15:52:40 +0200 Subject: [PATCH 06/11] Fix using attributes with class objects --- SCAutolib/models/card.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/SCAutolib/models/card.py b/SCAutolib/models/card.py index e508053d..7b8ea9e1 100644 --- a/SCAutolib/models/card.py +++ b/SCAutolib/models/card.py @@ -161,7 +161,7 @@ def __dict__(self): for k, v in dict_.items(): if type(v) in (PosixPath, Path): dict_[k] = str(v) - dict_["_softhsm2_conf"] = str(self._softhsm2_conf.path) + dict_["_softhsm2_conf"] = str(self._softhsm2_conf) dict_.pop("_user") return dict_ @@ -183,6 +183,10 @@ def user(self, system_user): self._user = system_user self._nssdb = self.user.card_dir.joinpath("db") + @property + def service_location(self): + return self._service_location + def insert(self): """ Insert virtual smart card by starting the corresponding service. @@ -212,14 +216,14 @@ def enroll(self): '0', "-w", self._user.key, "-y", "privkey", "--label", f"'{self._user.username}'", "-p", self._user.pin, "--set-id", "0", "-d", "0"] - run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf.path}) + run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf}) logger.debug( f"User key {self._user.key} is added to virtual smart card") cmd = ['pkcs11-tool', '--module', 'libsofthsm2.so', '--slot-index', "0", '-w', self._user.cert, '-y', 'cert', '-p', self._user.pin, '--label', f"'{self._user.username}'", '--set-id', "0", '-d', "0"] - run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf.path}) + run(cmd, env={"SOFTHSM2_CONF": self._softhsm2_conf}) logger.debug( f"User certificate {self._user.cert} is added to virtual smart card") From 00e11ea3c3e0d1a6f4af1ac7fc6601ec166c1a3b Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Mon, 1 Aug 2022 15:54:24 +0200 Subject: [PATCH 07/11] Fix removing card service as cleanup --- SCAutolib/models/user.py | 2 ++ test/test_card.py | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/SCAutolib/models/user.py b/SCAutolib/models/user.py index b966fdbf..1e248708 100644 --- a/SCAutolib/models/user.py +++ b/SCAutolib/models/user.py @@ -8,6 +8,8 @@ The classes implement add_user and delete_user methods which can be used to create or remove a specified user in the system or in the specified IPA server. """ +from shutil import rmtree + import json import pwd import python_freeipa diff --git a/test/test_card.py b/test/test_card.py index 20b50882..06a4a16f 100644 --- a/test/test_card.py +++ b/test/test_card.py @@ -29,7 +29,11 @@ def local_user_with_smart_card(local_user, gen_key_and_cert): local_user.key, local_user.cert = gen_key_and_cert local_user.card.softhsm2_conf = hsm_conf.path - return local_user + yield local_user + + if local_user.card.service_location and \ + local_user.card.service_location.exists(): + local_user.card.service_location.unlink() @pytest.mark.service_restart From f3a94589629a3b432bba8cfaa07d800a08d9fad7 Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Mon, 1 Aug 2022 15:55:02 +0200 Subject: [PATCH 08/11] Fix unit tests for local CA --- test/test_ca.py | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/test/test_ca.py b/test/test_ca.py index a6688b89..39db5bb2 100644 --- a/test/test_ca.py +++ b/test/test_ca.py @@ -7,18 +7,10 @@ from shutil import copyfile from subprocess import check_output, run, PIPE, CalledProcessError +import SCAutolib.exceptions from SCAutolib import TEMPLATES_DIR from SCAutolib.models import CA - - -@pytest.fixture(scope="session") -def local_ca_fixture(tmp_path_factory, backup_sssd_ca_db): - ca = CA.LocalCA(tmp_path_factory.mktemp("ca").joinpath("local-ca")) - try: - ca.setup() - except FileExistsError: - pass - return ca +from SCAutolib.models.file import OpensslCnf @pytest.fixture(scope="session") @@ -95,7 +87,13 @@ def clean_ipa(): def test_local_ca_setup(backup_sssd_ca_db, tmpdir, caplog): sssd_auth_ca_db = Path("/etc/sssd/pki/sssd_auth_ca_db.pem") - ca = CA.LocalCA(Path(tmpdir, "ca")) + root = Path(tmpdir, "ca") + root.mkdir() + cnf = OpensslCnf(conf_type="CA", filepath=root.joinpath("ca.cnf"), + replace=str(root)) + cnf.create() + cnf.save() + ca = CA.LocalCA(root, cnf) ca.setup() assert ca.root_dir.exists() @@ -111,6 +109,22 @@ def test_local_ca_setup(backup_sssd_ca_db, tmpdir, caplog): assert "Local CA is configured" in caplog.messages +def test_local_ca_raise_no_cnf(backup_sssd_ca_db, tmpdir, caplog): + root = Path(tmpdir, "ca") + root.mkdir() + cnf = OpensslCnf(conf_type="CA", filepath=root.joinpath("ca.cnf"), + replace=str(root)) + ca = CA.LocalCA(root) + with pytest.raises(SCAutolib.exceptions.SCAutolibException): + ca.setup() + + cnf.create() + cnf.save() + + ca.cnf = cnf + ca.setup() + + def test_request_cert(local_ca_fixture, tmpdir): csr = Path(tmpdir, "username.csr") cnf = Path(tmpdir, "user.cnf") @@ -158,7 +172,7 @@ def test_ipa_server_setup(dummy_ipa_vals, ipa_meta_client, caplog): ipa_ca = CA.IPAServerCA(ip_addr=dummy_ipa_vals["server_ip"], client_hostname=dummy_ipa_vals[ "client_hostname"], - hostname=dummy_ipa_vals["server_hostname"], + server_hostname=dummy_ipa_vals["server_hostname"], root_passwd=dummy_ipa_vals[ "server_root_passwd"], admin_passwd=dummy_ipa_vals[ From 395ad4f6ddddce7a5159f1e1651e0182f7133f7f Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Tue, 2 Aug 2022 11:06:53 +0200 Subject: [PATCH 09/11] Fix unit test fixtures usage --- SCAutolib/models/file.py | 2 + SCAutolib/models/user.py | 6 +- SCAutolib/utils.py | 4 +- test/conftest.py | 3 +- test/files/dummy_config_file.json | 35 ++++++++++ test/fixtures.py | 48 +++++++++++++- test/pytest.ini | 1 + test/test_ca.py | 106 ++++++------------------------ test/test_controller.py | 43 +++++------- test/test_user.py | 23 +++---- 10 files changed, 132 insertions(+), 139 deletions(-) create mode 100644 test/files/dummy_config_file.json diff --git a/SCAutolib/models/file.py b/SCAutolib/models/file.py index c592ec5d..5edd7dec 100644 --- a/SCAutolib/models/file.py +++ b/SCAutolib/models/file.py @@ -167,6 +167,8 @@ def get(self, key, section: str = None, separator: str = "="): with self._conf_file.open() as config: self._simple_content = config.readlines() for line in self._simple_content: + if line.strip().startswith("#") or line.strip() == "": + continue key_from_file, value = line.split(separator, maxsplit=1) if key_from_file == key: return value.strip() diff --git a/SCAutolib/models/user.py b/SCAutolib/models/user.py index 1e248708..1d7e373e 100644 --- a/SCAutolib/models/user.py +++ b/SCAutolib/models/user.py @@ -214,14 +214,10 @@ def delete_user(self): pass logger.info(f"User {self.username} is not present on the system") - def add_user(self, force=False): + def add_user(self): """ Add user to the local system with `useradd` bash command and set password for created user. - - :param force: specifies if the user should be recreated if the - collision appears. - :type force: bool :return: """ try: diff --git a/SCAutolib/utils.py b/SCAutolib/utils.py index da98b4f9..045c97b8 100644 --- a/SCAutolib/utils.py +++ b/SCAutolib/utils.py @@ -101,8 +101,8 @@ def _check_packages(packages): """ missing = [] for pkg in packages: - out = run(["rpm", "-q", pkg]) - if pkg not in out.stdout: + out = run(["rpm", "-q", pkg], return_code=[0, 1]) + if out.returncode == 1: logger.warning(f"Package {pkg} is required for the testing, " f"but is not present in the system") missing.append(pkg) diff --git a/test/conftest.py b/test/conftest.py index 2665867c..db759bf6 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -49,7 +49,8 @@ def pytest_generate_tests(metafunc): and all([ipa_ip, ipa_hostname, ipa_admin_passwd, ipa_root_passwd]): ipa_config = {"ip": ipa_ip, "hostname": ipa_hostname, "admin_passwd": ipa_admin_passwd, - "root_passwd": ipa_root_passwd} + "root_passwd": ipa_root_passwd, + "domain": ipa_hostname.split(".", 1)[1]} metafunc.parametrize("ipa_config", [ipa_config]) if 'ipa_ip' in metafunc.fixturenames and ipa_ip is not None: diff --git a/test/files/dummy_config_file.json b/test/files/dummy_config_file.json new file mode 100644 index 00000000..27fcb503 --- /dev/null +++ b/test/files/dummy_config_file.json @@ -0,0 +1,35 @@ +{ + "root_passwd": "redhat", + "ca": { + "local_ca": { + + }, + "ipa": { + "admin_passwd": "SECret.123", + "root_passwd": "redhat", + "ip_addr": "192.168.122.60", + "server_hostname": "ipa-server-v2.test.local", + "client_hostname": "unit.test.local", + "domain": "test.local", + "realm": "test.local" + } + }, + "users": [ + { + "name": "local-user", + "passwd": "654321", + "pin": "123456", + "card_dir": "/root/local-user", + "card_type": "virtual", + "local": true + }, + { + "name": "unit-user", + "passwd": "unit-user-passwd", + "pin": "123456", + "card_dir": "/root/unit-user", + "card_type": "virtual", + "local": false + } + ] +} diff --git a/test/fixtures.py b/test/fixtures.py index 55a51972..c270d47c 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -1,3 +1,5 @@ +from subprocess import check_output, run, CalledProcessError, PIPE + import pytest from pathlib import Path from shutil import copyfile @@ -6,6 +8,39 @@ from SCAutolib.models.card import VirtualCard from SCAutolib.models.file import SSSDConf, File, OpensslCnf from SCAutolib.models.user import User +import pwd + + +@pytest.fixture(scope="session") +def ipa_fixture(ipa_config): + client_name = f'client-{ipa_config["hostname"]}' + cmd = ["ipa-client-install", "-p", "admin", + "--password", ipa_config["admin_passwd"], + "--server", ipa_config["hostname"], + "--domain", ipa_config["domain"], + "--realm", ipa_config["domain"].upper(), + "--hostname", client_name, + "--all-ip-addresses", "--force", "--force-join", + "--no-ntp", "-U"] + + proc = run(cmd, input="yes", encoding="utf-8", stdout=PIPE, stderr=PIPE) + # Return code 3 is a return code when IPA client is already installed + if proc.returncode not in [0, 3]: + raise CalledProcessError(proc.returncode, cmd) + + return CA.IPAServerCA(ip_addr=ipa_config["ip"], + server_hostname=ipa_config["hostname"], + admin_passwd=ipa_config["admin_passwd"], + root_passwd=ipa_config["root_passwd"], + domain=ipa_config["domain"], + client_hostname=client_name) + + +@pytest.fixture(scope="session") +def clean_ipa(): + yield + check_output(["ipa-client-install", "--uninstall", "--unattended"], + encoding="utf-8") @pytest.fixture(scope="session") @@ -26,11 +61,20 @@ def local_ca_fixture(tmp_path_factory, backup_sssd_ca_db): @pytest.fixture def local_user(tmp_path, request): - user = User(f"user-{request.node.name}", "testpassword", "123456") + # In linux useradd command max length of the username is 32 chars + username = f"user-{request.node.name}"[0:32] + user = User(username, "testpassword", "123456") user.card_dir = tmp_path user.dump_file = tmp_path.joinpath("test-user-dump-file.json") user.card = VirtualCard(user=user) - return user + yield user + + # Delete the user if it was added during the test phase + try: + pwd.getpwnam(username) + check_output(["userdel", username], encoding="utf-8") + except KeyError: + pass @pytest.fixture() diff --git a/test/pytest.ini b/test/pytest.ini index e4af82e8..9697e095 100644 --- a/test/pytest.ini +++ b/test/pytest.ini @@ -6,6 +6,7 @@ markers = ipa: test working with FreeIPA (client or server) filterwarnings = ignore::DeprecationWarning + ignore::urllib3.exceptions.InsecureRequestWarning ignore:Unverified HTTPS request is being made to host.*:: env = D:IPA_IP= diff --git a/test/test_ca.py b/test/test_ca.py index 39db5bb2..54c4daf6 100644 --- a/test/test_ca.py +++ b/test/test_ca.py @@ -13,23 +13,6 @@ from SCAutolib.models.file import OpensslCnf -@pytest.fixture(scope="session") -def dummy_ipa_vals(ipa_ip, ipa_hostname, ipa_admin_passwd, ipa_root_passwd): - """ - Creates dummy values for IPA serve and client for testings - """ - domain = ipa_hostname.split(".", 1)[1] - return { - "server_ip": ipa_ip, - "server_domain": domain, - "server_hostname": ipa_hostname, - "server_admin_passwd": ipa_admin_passwd, - "server_realm": domain.upper(), - "server_root_passwd": ipa_root_passwd, - "client_hostname": f"client-hostname.{domain}" - } - - @pytest.fixture() def dummy_user(): class User: @@ -41,50 +24,17 @@ def __init__(self): @pytest.fixture() -def ipa_meta_client(dummy_ipa_vals): +def ipa_meta_client(ipa_config): """ Return ready-to-use IPA MetaClient with admin login. This fixture might not work if there is no mapping rule on your system for given IPA IP address and IPA hostnames (no corresponding entry in /etc/hosts) """ - client = ClientMeta(dummy_ipa_vals["server_hostname"], verify_ssl=False) - client.login("admin", dummy_ipa_vals["server_admin_passwd"]) + client = ClientMeta(ipa_config["hostname"], verify_ssl=False) + client.login("admin", ipa_config["admin_passwd"]) return client -@pytest.fixture(scope="session") -def installed_ipa(dummy_ipa_vals): - cmd = ["ipa-client-install", "-p", "admin", - "--password", dummy_ipa_vals["server_admin_passwd"], - "--server", dummy_ipa_vals["server_hostname"], - "--domain", dummy_ipa_vals["server_domain"], - "--realm", dummy_ipa_vals["server_realm"], - "--hostname", dummy_ipa_vals["client_hostname"], - "--all-ip-addresses", "--force", "--force-join", - "--no-ntp", "-U"] - print("Installing IPA client on the system") - proc = run(cmd, input="yes", encoding="utf-8", stdout=PIPE, stderr=PIPE) - if proc.returncode not in [0, 3]: - raise CalledProcessError(proc.returncode, cmd) - print("IPA client is installed on the system") - return CA.IPAServerCA(ip_addr=dummy_ipa_vals["server_ip"], - client_hostname=dummy_ipa_vals[ - "client_hostname"], - hostname=dummy_ipa_vals["server_hostname"], - root_passwd=dummy_ipa_vals[ - "server_root_passwd"], - admin_passwd=dummy_ipa_vals[ - "server_admin_passwd"], - domain=dummy_ipa_vals["server_domain"]) - - -@pytest.fixture(scope="session") -def clean_ipa(): - yield - check_output(["ipa-client-install", "--uninstall", "--unattended"], - encoding="utf-8") - - def test_local_ca_setup(backup_sssd_ca_db, tmpdir, caplog): sssd_auth_ca_db = Path("/etc/sssd/pki/sssd_auth_ca_db.pem") root = Path(tmpdir, "ca") @@ -168,20 +118,18 @@ def test_revoke_cert(local_ca_fixture, tmpdir): @pytest.mark.ipa -def test_ipa_server_setup(dummy_ipa_vals, ipa_meta_client, caplog): - ipa_ca = CA.IPAServerCA(ip_addr=dummy_ipa_vals["server_ip"], - client_hostname=dummy_ipa_vals[ - "client_hostname"], - server_hostname=dummy_ipa_vals["server_hostname"], - root_passwd=dummy_ipa_vals[ - "server_root_passwd"], - admin_passwd=dummy_ipa_vals[ - "server_admin_passwd"], - domain=dummy_ipa_vals["server_domain"]) +def test_ipa_server_setup(ipa_config, ipa_meta_client, caplog): + client_name = f'client-{ipa_config["hostname"]}' + ipa_ca = CA.IPAServerCA(ip_addr=ipa_config["ip"], + server_hostname=ipa_config["hostname"], + admin_passwd=ipa_config["admin_passwd"], + root_passwd=ipa_config["root_passwd"], + domain=ipa_config["domain"], + client_hostname=client_name) ipa_ca.setup() # Test if meta client can get info about freshly configured host - ipa_meta_client.host_show(a_fqdn=dummy_ipa_vals["client_hostname"]) + ipa_meta_client.host_show(a_fqdn=client_name) try: policy = ipa_meta_client.pwpolicy_show(a_cn="global_policy")["result"] @@ -193,8 +141,8 @@ def test_ipa_server_setup(dummy_ipa_vals, ipa_meta_client, caplog): @pytest.mark.ipa -def test_ipa_cert_request_and_revoke(installed_ipa, ipa_meta_client, - dummy_ipa_vals, tmpdir, dummy_user): +def test_ipa_cert_request_and_revoke(ipa_fixture, ipa_meta_client, + tmpdir, dummy_user): csr = Path(tmpdir, "cert.csr") key = Path(tmpdir, "cert.key") cert = Path(tmpdir, "cert.out") @@ -208,8 +156,7 @@ def test_ipa_cert_request_and_revoke(installed_ipa, ipa_meta_client, dummy_user.username, dummy_user.username, o_userpassword=dummy_user.password) - ipa_ca = installed_ipa - out = ipa_ca.request_cert(csr, dummy_user.username, cert) + out = ipa_fixture.request_cert(csr, dummy_user.username, cert) assert out.suffix == ".pem" with out.open("rb") as f: @@ -218,7 +165,7 @@ def test_ipa_cert_request_and_revoke(installed_ipa, ipa_meta_client, # If cert is not properly created, this would raise an IPA exception ipa_meta_client.cert_show(a_serial_number=cert_obj.serial_number) - ipa_ca.revoke_cert(out) + ipa_fixture.revoke_cert(out) revoked = ipa_meta_client.cert_show( a_serial_number=cert_obj.serial_number)["result"]["revoked"] assert revoked @@ -227,20 +174,9 @@ def test_ipa_cert_request_and_revoke(installed_ipa, ipa_meta_client, @pytest.mark.ipa -def test_ipa_user_add(installed_ipa, ipa_meta_client, dummy_ipa_vals, - dummy_user): +def test_ipa_user_add(ipa_fixture, ipa_meta_client, dummy_user): try: - ipa_ca = CA.IPAServerCA(ip_addr=dummy_ipa_vals["server_ip"], - client_hostname=dummy_ipa_vals[ - "client_hostname"], - hostname=dummy_ipa_vals["server_hostname"], - root_passwd=dummy_ipa_vals[ - "server_root_passwd"], - admin_passwd=dummy_ipa_vals[ - "server_admin_passwd"], - domain=dummy_ipa_vals["server_domain"]) - ipa_ca.setup() - ipa_ca.add_user(dummy_user) + ipa_fixture.add_user(dummy_user) # If the user is not properly created, this would raise an IPA exception ipa_meta_client.user_show(a_uid=dummy_user.username) @@ -249,13 +185,11 @@ def test_ipa_user_add(installed_ipa, ipa_meta_client, dummy_ipa_vals, @pytest.mark.ipa -def test_ipa_user_del(installed_ipa, ipa_meta_client, dummy_ipa_vals, - dummy_user): +def test_ipa_user_del(ipa_fixture, ipa_meta_client, dummy_user): ipa_meta_client.user_add(dummy_user.username, dummy_user.username, dummy_user.username, dummy_user.username, o_userpassword=dummy_user.password) - ipa_ca = installed_ipa - ipa_ca.del_user(dummy_user) + ipa_fixture.del_user(dummy_user) # If the user is not properly created, this would raise an IPA exception with pytest.raises(python_freeipa.exceptions.NotFound): diff --git a/test/test_controller.py b/test/test_controller.py index 0172384b..e8a14e12 100644 --- a/test/test_controller.py +++ b/test/test_controller.py @@ -25,26 +25,6 @@ def controller(dummy_config): return Controller(dummy_config) -@pytest.fixture() -def ready_ipa(ipa_config): - domain = ipa_config["hostname"].split(".", 1)[1] - client_name = f'client-{ipa_config["hostname"]}' - # cmd = ["ipa-client-install", "-p", "admin", - # "--password", ipa_config["admin_passwd"], - # "--server", ipa_config["hostname"], - # "--domain", domain, # noqa: E501 user everything after first dot as domain, e.g ipa.test.local -> test.local would be used - # "--realm", domain.upper(), - # "--hostname", client_name, - # "--all-ip-addresses", "--force", "--force-join", "--no-ntp", "-U"] - # check_output(cmd, input="yes", encoding="utf-8") - return IPAServerCA(ip_addr=ipa_config["ip"], - server_hostname=ipa_config["hostname"], - admin_passwd=ipa_config["admin_passwd"], - root_passwd=ipa_config["root_passwd"], - domain=domain, - client_hostname=client_name) - - def test_parse_config(dummy_config): """Test that configuration is parsed and validated properly.""" cnt = Controller(dummy_config) @@ -82,17 +62,24 @@ def test_setup_system(controller): cnt.sssd_conf._default_parser.sections())), msg -def test_users_create(controller, tmp_path, ready_ipa): +@pytest.mark.ipa +def test_users_create_and_delete(controller, tmp_path, ipa_fixture): """Test for adding local and IPA users to the systems and initializing all required files.""" cnt: Controller = controller - cnt.ipa_ca = ready_ipa - for u in cnt.lib_conf["users"]: - cnt.setup_user(u) - - for p in [t["card_dir"] for t in cnt.lib_conf["users"]]: - assert p.joinpath("sofhtsm2.conf").exists() - + cnt.ipa_ca = ipa_fixture + + try: + for u in cnt.lib_conf["users"]: + cnt.setup_user(u) + for p in [t["card_dir"] for t in cnt.lib_conf["users"]]: + assert p.joinpath("sofhtsm2.conf").exists() + + for u in cnt.lib_conf["users"]: + cnt.setup_user(u) + finally: + for u in cnt.users: + u.delete_user() # def test_cas_create(controller): # cnt: Controller = controller diff --git a/test/test_user.py b/test/test_user.py index 0a285ec1..aaf65bbe 100644 --- a/test/test_user.py +++ b/test/test_user.py @@ -18,15 +18,11 @@ def test_add_and_remove_local_user(local_user): pwd.getpwnam(local_user.username) -def test_add_and_remove_key_cert_pair(local_user): - cwd = Path(os.getcwd()) - ca = LocalCA(cwd) - ca.setup() - - local_user.key = ca._ca_key - local_user.cert = ca._ca_cert - assert local_user.key == ca._ca_key - assert local_user.cert == ca._ca_cert +def test_add_and_remove_key_cert_pair(local_user, local_ca_fixture): + local_user.key = local_ca_fixture._ca_key + local_user.cert = local_ca_fixture._ca_cert + assert local_user.key == local_ca_fixture._ca_key + assert local_user.cert == local_ca_fixture._ca_cert del local_user.key del local_user.cert @@ -35,12 +31,9 @@ def test_add_and_remove_key_cert_pair(local_user): assert local_user.key is None -def test_add_and_remove_cnf(local_user): - ca = LocalCA(Path(os.getcwd())) - ca.setup() - - local_user.cnf = ca._ca_cnf - assert local_user.cnf == ca._ca_cnf +def test_add_and_remove_cnf(local_user, local_ca_fixture): + local_user.cnf = local_ca_fixture._ca_cnf + assert local_user.cnf == local_ca_fixture._ca_cnf del local_user.cnf assert local_user.cnf is None From 13ff145fbf55fb4173432ec2d508d377669f584f Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Tue, 2 Aug 2022 13:55:34 +0200 Subject: [PATCH 10/11] Update load and dump user's card --- SCAutolib/controller.py | 6 +++--- SCAutolib/models/CA.py | 4 ++-- SCAutolib/models/card.py | 19 +++++++++---------- SCAutolib/models/user.py | 9 ++++++--- SCAutolib/templates/virtcacard.cil | 3 +++ SCAutolib/utils.py | 6 ++++++ test/conftest.py | 13 ++++++++----- test/test_ca.py | 2 +- test/test_card.py | 24 ++++++++++++++++++++++-- test/test_controller.py | 22 +--------------------- test/test_user.py | 10 ++++------ 11 files changed, 65 insertions(+), 53 deletions(-) create mode 100644 SCAutolib/templates/virtcacard.cil diff --git a/SCAutolib/controller.py b/SCAutolib/controller.py index 0f2bfb34..8439cf36 100644 --- a/SCAutolib/controller.py +++ b/SCAutolib/controller.py @@ -133,7 +133,7 @@ def setup_local_ca(self, force: bool = False): ca_dir: Path = self.lib_conf["ca"]["local_ca"]["dir"] cnf = file.OpensslCnf(ca_dir.joinpath("ca.cnf"), "CA", str(ca_dir)) - self.local_ca = CA.LocalCA(dir=ca_dir, cnf=cnf) + self.local_ca = CA.LocalCA(root_dir=ca_dir, cnf=cnf) if force: logger.warning(f"Removing previous local CA in a directory " @@ -256,7 +256,7 @@ def setup_user(self, user_dict: dict, force: bool = False): hsm_conf.save() new_card = card.VirtualCard(new_user) - new_card.softhsm2_conf = hsm_conf + new_card.softhsm2_conf = hsm_conf.path else: raise NotImplementedError("Other card type than 'virtual' does not " "supported yet") @@ -429,7 +429,7 @@ def _general_steps_for_ipa(): :return: name of the IPA client package for current Linux """ os_version = _get_os_version() - if os_version != OSVersion.RHEL_9: + if os_version not in (OSVersion.RHEL_9, OSVersion.CentOS_9): run("dnf module enable -y idm:DL1") run("dnf install @idm:DL1 -y") logger.debug("idm:DL1 module is installed") diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index 4a4da77b..76fbb5c2 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -469,8 +469,8 @@ def _get_sc_setup_script(self) -> Path: logger.debug("Start receiving client script for setting up smart card " "on IPA client") with Connection(self._ipa_server_ip, user="root", - connect_kwargs={"password": - self._ipa_server_root_passwd}) as c: + connect_kwargs={ + "password": self._ipa_server_root_passwd}) as c: # Delete this block when PR in paramiko will be accepted # https://github.com/paramiko/paramiko/issues/396 #### noqa:E266 diff --git a/SCAutolib/models/card.py b/SCAutolib/models/card.py index 7b8ea9e1..4155313b 100644 --- a/SCAutolib/models/card.py +++ b/SCAutolib/models/card.py @@ -4,10 +4,9 @@ (physical) smart card in standard reader, cards in the removinator. """ import json - import re import time -from pathlib import Path, PosixPath +from pathlib import Path from traceback import format_exc from SCAutolib import run, logger, TEMPLATES_DIR, LIB_DUMP_CARDS @@ -65,7 +64,8 @@ def load(json_file, **kwars): if cnt["type"] == "virtual": assert "user" in kwars.keys(),\ "No user is provided to load the card." - card = VirtualCard(user=kwars["user"], insert=cnt["_insert"]) + card = VirtualCard(user=kwars["user"], + softhsm2_conf=Path(cnt["softhsm"])) card.uri = cnt["uri"] return card @@ -157,13 +157,12 @@ def __exit__(self, exp_type, exp_value, exp_traceback): def __dict__(self): # Need to copy to not referencing the same object what leads to # changing it on retyping - dict_ = super().__dict__.copy() - for k, v in dict_.items(): - if type(v) in (PosixPath, Path): - dict_[k] = str(v) - dict_["_softhsm2_conf"] = str(self._softhsm2_conf) - dict_.pop("_user") - return dict_ + return { + "softhsm": str(self._softhsm2_conf), + "type": "virtual", + "uri": self.uri, + "username": self.user.username + } @property def softhsm2_conf(self): diff --git a/SCAutolib/models/user.py b/SCAutolib/models/user.py index 1d7e373e..8a164fa6 100644 --- a/SCAutolib/models/user.py +++ b/SCAutolib/models/user.py @@ -15,7 +15,7 @@ import python_freeipa from pathlib import Path, PosixPath -from SCAutolib import run, logger, LIB_DUMP_USERS, LIB_DUMP_CARDS +from SCAutolib import run, logger, LIB_DUMP_USERS from SCAutolib.exceptions import SCAutolibException from SCAutolib.models import card as card_model from SCAutolib.models.CA import IPAServerCA @@ -71,6 +71,8 @@ def load(json_file, **kwargs): key=cnt["_key"], cert=cnt["_cert"]) logger.debug(f"User {user.__class__} is loaded: {user.__dict__}") + if "card" in cnt: + return user, Path(cnt["card"]) return user @@ -191,8 +193,9 @@ def __dict__(self): if type(v) in (PosixPath, Path): dict_[k] = str(v) - if self._card: - dict_["_card"] = str(self._card.dump_file) + if self._card and isinstance(self._card, card_model.VirtualCard): + dict_.pop("_card") + dict_["card"] = str(self._card.dump_file) return dict_ def delete_user(self): diff --git a/SCAutolib/templates/virtcacard.cil b/SCAutolib/templates/virtcacard.cil new file mode 100644 index 00000000..fe017c49 --- /dev/null +++ b/SCAutolib/templates/virtcacard.cil @@ -0,0 +1,3 @@ +(allow pcscd_t node_t(tcp_socket(node_bind))) +;; allow p11_child to read softhsm cache - not present in RHEL by default +(allow sssd_t named_cache_t(dir(read search))) diff --git a/SCAutolib/utils.py b/SCAutolib/utils.py index 045c97b8..c3190cc4 100644 --- a/SCAutolib/utils.py +++ b/SCAutolib/utils.py @@ -21,6 +21,8 @@ class OSVersion(Enum): Fedora = 1 RHEL_9 = 2 RHEL_8 = 3 + CentOS_8 = 4 + CentOS_9 = 5 def _check_selinux(): @@ -75,6 +77,10 @@ def _get_os_version(): return OSVersion.RHEL_8 elif "Fedora" in cnt: return OSVersion.Fedora + elif "CentOS Stream release 8" in cnt: + return OSVersion.CentOS_8 + elif "CentOS Stream release 9" in cnt: + return OSVersion.CentOS_9 else: raise SCAutolibException("OS is not detected.") diff --git a/test/conftest.py b/test/conftest.py index db759bf6..8407960e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,10 +1,11 @@ from os import environ -from shutil import rmtree + import logging import os -from subprocess import check_output, CalledProcessError +from shutil import rmtree -from SCAutolib import LIB_DIR +from SCAutolib import (LIB_DIR, LIB_BACKUP, LIB_DUMP, LIB_DUMP_CARDS, + LIB_DUMP_USERS, LIB_DUMP_CAS) from fixtures import * # noqa: F401 DIR_PATH = os.path.dirname(os.path.abspath(__file__)) @@ -51,7 +52,7 @@ def pytest_generate_tests(metafunc): "admin_passwd": ipa_admin_passwd, "root_passwd": ipa_root_passwd, "domain": ipa_hostname.split(".", 1)[1]} - metafunc.parametrize("ipa_config", [ipa_config]) + metafunc.parametrize("ipa_config", [ipa_config], scope="session") if 'ipa_ip' in metafunc.fixturenames and ipa_ip is not None: metafunc.parametrize("ipa_ip", [ipa_ip], scope="session") @@ -68,7 +69,9 @@ def pytest_generate_tests(metafunc): def pytest_sessionstart(session): - LIB_DIR.mkdir(exist_ok=True, parents=True) + for d in (LIB_DIR, LIB_BACKUP, LIB_DUMP, LIB_DUMP_CARDS, LIB_DUMP_USERS, + LIB_DUMP_CAS): + d.mkdir(exist_ok=True, parents=True) def pytest_sessionfinish(session, exitstatus): diff --git a/test/test_ca.py b/test/test_ca.py index 54c4daf6..c1b7a4d3 100644 --- a/test/test_ca.py +++ b/test/test_ca.py @@ -5,7 +5,7 @@ from python_freeipa.client_meta import ClientMeta from random import randint from shutil import copyfile -from subprocess import check_output, run, PIPE, CalledProcessError +from subprocess import check_output import SCAutolib.exceptions from SCAutolib import TEMPLATES_DIR diff --git a/test/test_card.py b/test/test_card.py index 06a4a16f..8e1588da 100644 --- a/test/test_card.py +++ b/test/test_card.py @@ -3,8 +3,10 @@ from subprocess import check_output, run from time import sleep -from SCAutolib.models.CA import LocalCA +from SCAutolib.models.card import Card from SCAutolib.models.file import SoftHSM2Conf +from SCAutolib.models.user import BaseUser +from SCAutolib.utils import dump_to_json @pytest.fixture() @@ -16,7 +18,8 @@ def gen_key_and_cert(local_ca_fixture, local_user): "-keyout", key, "-out", csr, "-subj", f"/CN={local_user.username}"] check_output(cmd, encoding="utf-8") - local_ca_fixture.request_cert(csr, username=local_user.username, cert_out=cert) + local_ca_fixture.request_cert(csr, username=local_user.username, + cert_out=cert) return key, cert @@ -78,3 +81,20 @@ def test_context_manager(local_user_with_smart_card): "uploaded to the virtual card" proc = run(["systemctl", "status", sc._service_name]) assert proc.returncode == 3 # Service is not active + + +@pytest.mark.service_restart +def test_load_user_with_card(local_user_with_smart_card): + local_user_with_smart_card.card.create() + local_user_with_smart_card.card.enroll() + + dump_to_json(local_user_with_smart_card.card) + dump_to_json(local_user_with_smart_card) + + user, card_file = BaseUser.load(local_user_with_smart_card.dump_file) + card = Card.load(card_file, user=user) + + assert card.uri == local_user_with_smart_card.card.uri + + card.insert() + card.remove() diff --git a/test/test_controller.py b/test/test_controller.py index e8a14e12..33d7df80 100644 --- a/test/test_controller.py +++ b/test/test_controller.py @@ -4,7 +4,6 @@ from subprocess import check_output from SCAutolib.controller import Controller -from SCAutolib.models.CA import IPAServerCA from conftest import FILES_DIR @@ -32,13 +31,8 @@ def test_parse_config(dummy_config): assert cnt.conf_path.is_absolute() assert isinstance(cnt.lib_conf, dict) -# -# def test_prepare(controller): -# """Test for overall setup including dumps.""" -# cnt: Controller = controller -# cnt.prepare(False, False, False) - +@pytest.mark.service_restart def test_setup_system(controller): cnt: Controller = controller packages = ["opensc", "httpd", "sssd", "sssd-tools", "gnutls-utils", @@ -74,20 +68,6 @@ def test_users_create_and_delete(controller, tmp_path, ipa_fixture): cnt.setup_user(u) for p in [t["card_dir"] for t in cnt.lib_conf["users"]]: assert p.joinpath("sofhtsm2.conf").exists() - - for u in cnt.lib_conf["users"]: - cnt.setup_user(u) finally: for u in cnt.users: u.delete_user() - -# def test_cas_create(controller): -# cnt: Controller = controller -# -# -# def test_enroll_card(controller): -# cnt: Controller = controller -# -# -# def test_cleanup(controller): -# cnt: Controller = controller diff --git a/test/test_user.py b/test/test_user.py index aaf65bbe..7c456137 100644 --- a/test/test_user.py +++ b/test/test_user.py @@ -1,11 +1,9 @@ -import pytest -import pwd import os -from pathlib import Path +import pwd +import pytest -from SCAutolib.models.CA import LocalCA -from SCAutolib.utils import dump_to_json from SCAutolib.models.user import BaseUser +from SCAutolib.utils import dump_to_json @pytest.mark.skipif(os.getuid() != 0, reason="Requires root privileges!") @@ -42,7 +40,7 @@ def test_add_and_remove_cnf(local_user, local_ca_fixture): def test_dump_and_load_user(local_user): dump_to_json(local_user) - user = BaseUser.load(local_user.dump_file) + user, card_file = BaseUser.load(local_user.dump_file) assert user.username == local_user.username assert user.pin == local_user.pin From 3652ba0a403f9c45de35b8c28f35f30d7ffd467f Mon Sep 17 00:00:00 2001 From: x00Pavel Date: Fri, 5 Aug 2022 09:07:01 +0200 Subject: [PATCH 11/11] Add function for user setup --- SCAutolib/models/CA.py | 3 ++- SCAutolib/models/authselect.py | 3 --- SCAutolib/models/card.py | 4 ++++ SCAutolib/utils.py | 6 ++++-- test/files/dummy_config_file.json | 6 +++--- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/SCAutolib/models/CA.py b/SCAutolib/models/CA.py index 76fbb5c2..6b32dd83 100644 --- a/SCAutolib/models/CA.py +++ b/SCAutolib/models/CA.py @@ -428,7 +428,7 @@ def _add_to_resolv(self): logger.debug(f"Original resolv.conf:\n{cnt}") if re.match(pattern, cnt) is None: logger.warning(f"Nameserver {self._ipa_server_ip} is not " - "present in /etc/resolve.conf. Adding...") + "present in /etc/resolv.conf. Adding...") cnt = (nameserver + "\n" + cnt) with open("/etc/resolv.conf", "w") as f: f.write(cnt) @@ -599,6 +599,7 @@ def cleanup(self): except exceptions.NotFound: logger.error(f"Current hostname ({gethostname()}) is not found " f"on the IPA server") + # Return code 2 means that the IPA client is not configured run(["ipa-client-install", "--uninstall", "-U"], return_code=[0, 2]) logger.info("IPA client is removed.") diff --git a/SCAutolib/models/authselect.py b/SCAutolib/models/authselect.py index ee8e6c7e..11deff47 100644 --- a/SCAutolib/models/authselect.py +++ b/SCAutolib/models/authselect.py @@ -68,9 +68,6 @@ def _restore(self): run(cmd) logger.debug("Authselect configuration is restored to:") run(["authselect", "current"], return_code=[0, 2]) - - # run(["authselect", "backup-remove", self.backup_name, "--debug"]) - # logger.debug("Authselect backup file is removed.") else: # as _set and _restore should be used in context manager defined in # this class, it should not happen that backup does not exist except diff --git a/SCAutolib/models/card.py b/SCAutolib/models/card.py index 4155313b..1c2a0302 100644 --- a/SCAutolib/models/card.py +++ b/SCAutolib/models/card.py @@ -10,6 +10,7 @@ from traceback import format_exc from SCAutolib import run, logger, TEMPLATES_DIR, LIB_DUMP_CARDS +from SCAutolib.exceptions import SCAutolibException class Card: @@ -67,6 +68,9 @@ def load(json_file, **kwars): card = VirtualCard(user=kwars["user"], softhsm2_conf=Path(cnt["softhsm"])) card.uri = cnt["uri"] + else: + raise SCAutolibException( + f"Unknown card type: {cnt['type']}") return card diff --git a/SCAutolib/utils.py b/SCAutolib/utils.py index c3190cc4..c34dd68b 100644 --- a/SCAutolib/utils.py +++ b/SCAutolib/utils.py @@ -1,5 +1,5 @@ """ -This module provides different additional helping functions that are used +This module provides a set of additional helping functions that are used across the library. These functions are made based on library demands and are not attended to cover some general use-cases or specific corner cases. """ @@ -54,7 +54,9 @@ def _gen_private_key(key_path: Path): :param key_path: path to output certificate """ - key = rsa.generate_private_key(public_exponent=65537, key_size=4096) + # CAC specification do not specify key size specifies key size + # up to 2048 bits, so keys greater than 2048 bits is not supported + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) with key_path.open("wb") as f: f.write(key.private_bytes( diff --git a/test/files/dummy_config_file.json b/test/files/dummy_config_file.json index 27fcb503..7e38f65b 100644 --- a/test/files/dummy_config_file.json +++ b/test/files/dummy_config_file.json @@ -2,7 +2,7 @@ "root_passwd": "redhat", "ca": { "local_ca": { - + "dir": "{path}/local-ca" }, "ipa": { "admin_passwd": "SECret.123", @@ -19,7 +19,7 @@ "name": "local-user", "passwd": "654321", "pin": "123456", - "card_dir": "/root/local-user", + "card_dir": "{path}/local-user", "card_type": "virtual", "local": true }, @@ -27,7 +27,7 @@ "name": "unit-user", "passwd": "unit-user-passwd", "pin": "123456", - "card_dir": "/root/unit-user", + "card_dir": "{path}/unit-user", "card_type": "virtual", "local": false }