From 6018daebd117c367005003a6232da7de7f5b4bd8 Mon Sep 17 00:00:00 2001 From: SteBaum Date: Tue, 7 Jan 2025 17:21:35 +0100 Subject: [PATCH 1/2] feat: added test framework --- .gitignore | 1 + README.md | 16 ++ poetry.lock | 124 +++++++++- pyproject.toml | 2 + tests/__init__.py | 0 tests/conftest.py | 537 +++++++++++++++++++++++++++++++++++++++++ tests/pytest.ini | 8 + tests/test_hbase.py | 261 ++++++++++++++++++++ tests/test_hdfs.py | 57 +++++ tests/test_hive.py | 152 ++++++++++++ tests/test_kerberos.py | 37 +++ tests/test_knox.py | 96 ++++++++ tests/test_phoenix.py | 157 ++++++++++++ tests/test_spark.py | 134 ++++++++++ tests/test_yarn.py | 11 + 15 files changed, 1592 insertions(+), 1 deletion(-) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/pytest.ini create mode 100644 tests/test_hbase.py create mode 100644 tests/test_hdfs.py create mode 100644 tests/test_hive.py create mode 100644 tests/test_kerberos.py create mode 100644 tests/test_knox.py create mode 100644 tests/test_phoenix.py create mode 100644 tests/test_spark.py create mode 100644 tests/test_yarn.py diff --git a/.gitignore b/.gitignore index ae52934..065506d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ files .vagrant* ansible_collections/community* tdp_vars +tests/__pycache__ diff --git a/README.md b/README.md index c6a5630..d439aa2 100644 --- a/README.md +++ b/README.md @@ -261,6 +261,22 @@ disable 'tdp_user_table' drop 'tdp_user_table' ``` +## Automatic python tests with pytest and testinfra + +Run the tests sequentially: + +```sh +py.test tests +``` + +Run the tests in parallel: + +```sh +py.test -n 2 tests +``` + +**Note:** Running the tests in parallel requires more resources and tests might fail if resources are not sufficient. 
+ ## Web UI links To access the components web UI links on your host, you will have to set up the IP addresses with their respective FQDN in `etc/hosts`, introduce the SSL certificate into your browser and install and configure a Kerberos client. Luckily a container image has been created where everything is already set up. However, the SSL certificate which is created with the `ansible_collections/tosit/tdp_prerequisites/playbooks/certificates.yml` playbook must already be present in `files/tdp_getting_started_certs` otherwise the build will fail.
(>=1.2.3)", "pytest-cov", "pytest-enabler", "pytest-flake8", "pytest-mypy"] +[[package]] +name = "iniconfig" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.7" +files = [ + {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, + {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, +] + [[package]] name = "jinja2" version = "3.1.4" @@ -1224,6 +1263,21 @@ docs = ["furo (>=2024.8.6)", "proselint (>=0.14)", "sphinx (>=8.0.2)", "sphinx-a test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=8.3.2)", "pytest-cov (>=5)", "pytest-mock (>=3.14)"] type = ["mypy (>=1.11.2)"] +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + [[package]] name = "psycopg2-binary" version = "2.9.10" @@ -1278,6 +1332,7 @@ files = [ {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909"}, {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1"}, {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567"}, + {file = "psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142"}, {file = 
"psycopg2_binary-2.9.10-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:eb09aa7f9cecb45027683bb55aebaaf45a0df8bf6de68801a6afdc7947bb09d4"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b73d6d7f0ccdad7bc43e6d34273f70d587ef62f824d7261c4ae9b8b1b6af90e8"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce5ab4bf46a211a8e924d307c1b1fcda82368586a19d0a24f8ae166f5c784864"}, @@ -1533,6 +1588,68 @@ files = [ {file = "pyrsistent-0.20.0.tar.gz", hash = "sha256:4c48f78f62ab596c679086084d0dd13254ae4f3d6c72a83ffdf5ebdef8f265a4"}, ] +[[package]] +name = "pytest" +version = "8.3.4" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-8.3.4-py3-none-any.whl", hash = "sha256:50e16d954148559c9a74109af1eaf0c945ba2d8f30f0a3d3335edde19788b6f6"}, + {file = "pytest-8.3.4.tar.gz", hash = "sha256:965370d062bce11e73868e0335abac31b4d3de0e82f4007408d242b4f8610761"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2" +tomli = {version = ">=1", markers = "python_version < \"3.11\""} + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "pytest-testinfra" +version = "10.1.1" +description = "Test infrastructures" +optional = false +python-versions = ">=3.9" +files = [ + {file = "pytest-testinfra-10.1.1.tar.gz", hash = "sha256:a876f1453a01b58d94d9d936dd50344c2c01ac7880a2b41d15bdf233aed9cf1f"}, + {file = "pytest_testinfra-10.1.1-py3-none-any.whl", hash = "sha256:b990dc7d77b49a1bba24818fbff49b6171d8c46d606fb5ca86b937de690d7062"}, +] + +[package.dependencies] +pytest = ">=6" + +[package.extras] +ansible = 
["ansible"] +paramiko = ["paramiko"] +salt = ["salt"] +winrm = ["pywinrm"] + +[[package]] +name = "pytest-xdist" +version = "3.6.1" +description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest_xdist-3.6.1-py3-none-any.whl", hash = "sha256:9ed4adfb68a016610848639bb7e02c9352d5d9f03d04809919e2dafc3be4cca7"}, + {file = "pytest_xdist-3.6.1.tar.gz", hash = "sha256:ead156a4db231eec769737f57668ef58a2084a34b2e55c4a8fa20d861107300d"}, +] + +[package.dependencies] +execnet = ">=2.1" +pytest = ">=7.0.0" + +[package.extras] +psutil = ["psutil (>=3.0)"] +setproctitle = ["setproctitle"] +testing = ["filelock"] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -1711,6 +1828,7 @@ files = [ {file = "ruamel.yaml.clib-0.2.12-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f66efbc1caa63c088dead1c4170d148eabc9b80d95fb75b6c92ac0aad2437d76"}, {file = "ruamel.yaml.clib-0.2.12-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:22353049ba4181685023b25b5b51a574bce33e7f51c759371a7422dcae5402a6"}, {file = "ruamel.yaml.clib-0.2.12-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:932205970b9f9991b34f55136be327501903f7c66830e9760a8ffb15b07f05cd"}, + {file = "ruamel.yaml.clib-0.2.12-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:a52d48f4e7bf9005e8f0a89209bf9a73f7190ddf0489eee5eb51377385f59f2a"}, {file = "ruamel.yaml.clib-0.2.12-cp310-cp310-win32.whl", hash = "sha256:3eac5a91891ceb88138c113f9db04f3cebdae277f5d44eaa3651a4f573e6a5da"}, {file = "ruamel.yaml.clib-0.2.12-cp310-cp310-win_amd64.whl", hash = "sha256:ab007f2f5a87bd08ab1499bdf96f3d5c6ad4dcfa364884cb4549aa0154b13a28"}, {file = "ruamel.yaml.clib-0.2.12-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:4a6679521a58256a90b0d89e03992c15144c5f3858f40d7c18886023d7943db6"}, @@ -1719,6 +1837,7 @@ files = [ {file = 
"ruamel.yaml.clib-0.2.12-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:811ea1594b8a0fb466172c384267a4e5e367298af6b228931f273b111f17ef52"}, {file = "ruamel.yaml.clib-0.2.12-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:cf12567a7b565cbf65d438dec6cfbe2917d3c1bdddfce84a9930b7d35ea59642"}, {file = "ruamel.yaml.clib-0.2.12-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:7dd5adc8b930b12c8fc5b99e2d535a09889941aa0d0bd06f4749e9a9397c71d2"}, + {file = "ruamel.yaml.clib-0.2.12-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1492a6051dab8d912fc2adeef0e8c72216b24d57bd896ea607cb90bb0c4981d3"}, {file = "ruamel.yaml.clib-0.2.12-cp311-cp311-win32.whl", hash = "sha256:bd0a08f0bab19093c54e18a14a10b4322e1eacc5217056f3c063bd2f59853ce4"}, {file = "ruamel.yaml.clib-0.2.12-cp311-cp311-win_amd64.whl", hash = "sha256:a274fb2cb086c7a3dea4322ec27f4cb5cc4b6298adb583ab0e211a4682f241eb"}, {file = "ruamel.yaml.clib-0.2.12-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:20b0f8dc160ba83b6dcc0e256846e1a02d044e13f7ea74a3d1d56ede4e48c632"}, @@ -1727,6 +1846,7 @@ files = [ {file = "ruamel.yaml.clib-0.2.12-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:749c16fcc4a2b09f28843cda5a193e0283e47454b63ec4b81eaa2242f50e4ccd"}, {file = "ruamel.yaml.clib-0.2.12-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:bf165fef1f223beae7333275156ab2022cffe255dcc51c27f066b4370da81e31"}, {file = "ruamel.yaml.clib-0.2.12-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:32621c177bbf782ca5a18ba4d7af0f1082a3f6e517ac2a18b3974d4edf349680"}, + {file = "ruamel.yaml.clib-0.2.12-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:b82a7c94a498853aa0b272fd5bc67f29008da798d4f93a2f9f289feb8426a58d"}, {file = "ruamel.yaml.clib-0.2.12-cp312-cp312-win32.whl", hash = "sha256:e8c4ebfcfd57177b572e2040777b8abc537cdef58a2120e830124946aa9b42c5"}, {file = 
"ruamel.yaml.clib-0.2.12-cp312-cp312-win_amd64.whl", hash = "sha256:0467c5965282c62203273b838ae77c0d29d7638c8a4e3a1c8bdd3602c10904e4"}, {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:4c8c5d82f50bb53986a5e02d1b3092b03622c02c2eb78e29bec33fd9593bae1a"}, @@ -1735,6 +1855,7 @@ files = [ {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:96777d473c05ee3e5e3c3e999f5d23c6f4ec5b0c38c098b3a5229085f74236c6"}, {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-musllinux_1_1_i686.whl", hash = "sha256:3bc2a80e6420ca8b7d3590791e2dfc709c88ab9152c00eeb511c9875ce5778bf"}, {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:e188d2699864c11c36cdfdada94d781fd5d6b0071cd9c427bceb08ad3d7c70e1"}, + {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4f6f3eac23941b32afccc23081e1f50612bdbe4e982012ef4f5797986828cd01"}, {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-win32.whl", hash = "sha256:6442cb36270b3afb1b4951f060eccca1ce49f3d087ca1ca4563a6eb479cb3de6"}, {file = "ruamel.yaml.clib-0.2.12-cp313-cp313-win_amd64.whl", hash = "sha256:e5b8daf27af0b90da7bb903a876477a9e6d7270be6146906b276605997c7e9a3"}, {file = "ruamel.yaml.clib-0.2.12-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:fc4b630cd3fa2cf7fce38afa91d7cfe844a9f75d7f0f36393fa98815e911d987"}, @@ -1743,6 +1864,7 @@ files = [ {file = "ruamel.yaml.clib-0.2.12-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e2f1c3765db32be59d18ab3953f43ab62a761327aafc1594a2a1fbe038b8b8a7"}, {file = "ruamel.yaml.clib-0.2.12-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:d85252669dc32f98ebcd5d36768f5d4faeaeaa2d655ac0473be490ecdae3c285"}, {file = "ruamel.yaml.clib-0.2.12-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e143ada795c341b56de9418c58d028989093ee611aa27ffb9b7f609c00d813ed"}, + {file = 
"ruamel.yaml.clib-0.2.12-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:2c59aa6170b990d8d2719323e628aaf36f3bfbc1c26279c0eeeb24d05d2d11c7"}, {file = "ruamel.yaml.clib-0.2.12-cp39-cp39-win32.whl", hash = "sha256:beffaed67936fbbeffd10966a4eb53c402fafd3d6833770516bf7314bc6ffa12"}, {file = "ruamel.yaml.clib-0.2.12-cp39-cp39-win_amd64.whl", hash = "sha256:040ae85536960525ea62868b642bdb0c2cc6021c9f9d507810c0c604e66f5a7b"}, {file = "ruamel.yaml.clib-0.2.12.tar.gz", hash = "sha256:6c8fbb13ec503f99a91901ab46e0b07ae7941cd527393187039aec586fdfd36f"}, @@ -2007,4 +2129,4 @@ dev = ["doc8", "flake8", "flake8-import-order", "rstcheck[sphinx]", "sphinx"] [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<4.0" -content-hash = "bff46103d2f2aea331921cb28e18a0f8b0af2ff39c68d79394081139538eddf4" +content-hash = "dce18b6175be778e16bea67f033930c55573920fdf60f555ed3748ec027f3f7c" diff --git a/pyproject.toml b/pyproject.toml index b2dc5ad..2a88e53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,8 @@ tdp-lib = { path = "tdp-lib", develop = true, extras=["visualization", "mysql", passlib = "1.7.4" jmespath = "1.0.1" ansible-lint = {version = "6.17.2", markers = "platform_system != 'Windows'"} +pytest-xdist = "^3.6.1" +pytest-testinfra = "^10.1.1" [build-system] requires = ["poetry-core"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0146404 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,537 @@ +import contextlib +import csv +import functools +import io +import json +import logging +import re +import tempfile +import time +from datetime import datetime +from typing import Any, Callable, Dict, Generator, List, Optional + +import pytest +import testinfra +from filelock import FileLock +from testinfra.host import Host + +logger = logging.getLogger("testinfra") + +RE_CURL_HTTP_STATUS = re.compile("< HTTP/[^ ]+ ([^ ]+) ") + +USERS = [ 
+ "tdp_user", + "smoke_user", +] + +USERS_CREDS = { + "tdp_user": "tdp_user123", + "smoke_user": "smoke_user123", +} + + +# Based on testinfra.plugin.pytest_generate_tests +# Enables to have a "host" fixture with a"session" scope +def pytest_generate_tests(metafunc): + if "_testinfra_host_custom" in metafunc.fixturenames: + if metafunc.config.option.hosts is not None: + hosts = metafunc.config.option.hosts.split(",") + elif hasattr(metafunc.module, "testinfra_hosts"): + hosts = metafunc.module.testinfra_hosts + else: + hosts = [None] + params = testinfra.get_hosts( + hosts, + connection=metafunc.config.option.connection, + ssh_config=metafunc.config.option.ssh_config, + ssh_identity_file=metafunc.config.option.ssh_identity_file, + sudo=metafunc.config.option.sudo, + sudo_user=metafunc.config.option.sudo_user, + ansible_inventory=metafunc.config.option.ansible_inventory, + force_ansible=metafunc.config.option.force_ansible, + ) + params = sorted(params, key=lambda x: x.backend.get_pytest_id()) + ids = [e.backend.get_pytest_id() for e in params] + metafunc.parametrize( + "_testinfra_host_custom", params, ids=ids, scope="session", indirect=True + ) + + +@pytest.fixture(scope="session") +def _testinfra_host_custom(request): + return request.param + + +@pytest.fixture(scope="session") +def host(_testinfra_host_custom): + return _testinfra_host_custom + + +def retry( + func: Optional[Callable] = None, + nb_retries: int = 6, + sleep_time_between_tries: int = 10, +): + if func is None: + return functools.partial( + retry, + nb_retries=nb_retries, + sleep_time_between_tries=sleep_time_between_tries, + ) + + @functools.wraps(func) + def retry_func(*args, **kwargs): + for i in range(nb_retries - 1): + try: + return func(*args, **kwargs) + except: + time.sleep(sleep_time_between_tries) + return func(*args, **kwargs) + + return retry_func + + +@pytest.fixture(scope="session") +def realm(host: Host) -> str: + return host.ansible.get_variables()["realm"] + + +# 
https://github.com/pytest-dev/pytest-xdist/tree/v2.4.0#making-session-scoped-fixtures-execute-only-once +@pytest.fixture(scope="session") +def lock(tmp_path_factory, worker_id: str): + no_lock_data = {} + + # Mode séquentiel + # Les variables à sauvegarder sont dans un "dict" + # de la fixture "no_lock_data" + @contextlib.contextmanager + def no_lock_context(namespace: str, teardown: bool = False): + yield no_lock_data.setdefault(namespace, {"last_worker": True}) + + # Mode parallèle + # Les variables à sauvegarder et à partager entre les workers + # sont sauvegardés sur un emplacement partagé entre les workers + # avec un format JSON et un lock + @contextlib.contextmanager + def lock_context(namespace: str, teardown: bool = False): + data = {} + # get the temp directory shared by all workers + root_tmp_dir = tmp_path_factory.getbasetemp().parent + fn = f"{root_tmp_dir}/{namespace}.json" + with FileLock(f"{fn}.lock"): + try: + with open(fn, "r") as fd: + data = json.load(fd) + except: + pass + + if teardown: + workers = data.setdefault("workers", {}) + if worker_id in workers: + del workers[worker_id] + data.setdefault("workers_teardown", []).append(worker_id) + else: + # Un "set" n'est pas serializable en json donc + # utilisation d'un "dict" à la place + data.setdefault("workers", {})[worker_id] = True + data.setdefault("workers_used", []).append(worker_id) + + data["last_worker"] = not data["workers"] + if data["last_worker"]: + data.setdefault("workers_last", []).append(worker_id) + yield data + with open(fn, "w") as fd: + json.dump(data, fd) + + if worker_id == "master": + return no_lock_context + return lock_context + + +@pytest.fixture(scope="session") +def users() -> List[str]: + return USERS + + +@pytest.fixture(scope="session", params=USERS) +def user( + host: Host, request: Any, realm: str, lock: Callable +) -> Generator[str, None, None]: + user: str = request.param + with lock(f"user_{user}"): + with host.sudo(user): + kinit_cmd = host.run(f"kinit -kt 
/home/{user}/{user}.keytab {user}@{realm}") + if kinit_cmd.rc != 0: + pytest.fail(kinit_cmd.stderr) + + yield user + with lock(f"user_{user}", teardown=True) as data: + if not data["last_worker"]: + return + with host.sudo(user): + host.run("kdestroy") + + +@pytest.fixture(scope="session") +def zk_hosts(host: Host) -> List[str]: + return host.backend.get_hosts("zk") + + +@pytest.fixture(scope="session") +def ranger_manager(host: Host) -> Dict[str, str]: + return { + "url": f"https://{host.backend.get_hosts('ranger_admin')[0]}:6182", + "auth_creds": "admin:RangerAdmin123", + } + + +@pytest.fixture(scope="session") +def knox_gateway(host: Host, user: str) -> Dict[str, str]: + return { + "url": f"https://{host.backend.get_hosts('knox')[0]}:8443", + "user_creds": USERS_CREDS[user], + } + + +@pytest.fixture(scope="session") +def hbase_rest(host: Host) -> str: + return f"https://{host.backend.get_hosts('hbase_rest')[0]}:8080" + + +@pytest.fixture(scope="session") +def upload_file( + host: Host, + lock: Callable, +) -> Generator[ + Callable[[str, str, Optional[str], Optional[str], Optional[int]], None], None, None +]: + def scp_func( + local_file: str, + distant_file: str, + owner: Optional[str] = None, + group: Optional[str] = None, + permissions: Optional[int] = None, + ): + with lock("upload_file") as data: + if distant_file in data.setdefault("uploaded_files", []): + return + add_opts = [] + + if owner: + add_opts += [f"owner={owner}"] + if group: + add_opts += [f"group={group}"] + if permissions: + add_opts += [f"mode={permissions:o}"] + result = host.ansible( + "copy", + f"src={local_file} dest={distant_file} {' '.join(add_opts)}", + check=False, + ) + data["uploaded_files"].append(distant_file) + assert "state" in result, result["msg"] + + yield scp_func + with lock("upload_file", teardown=True) as data: + if not data["last_worker"]: + return + uploaded_files = data.get("uploaded_files", []) + for uploaded_file in uploaded_files[:]: + host.ansible("file", 
f"state=absent path={uploaded_file}", check=False) + uploaded_files.remove(uploaded_file) + + +@pytest.fixture(scope="session") +def render_file( + lock: Callable, + upload_file: Callable[ + [str, str, Optional[str], Optional[str], Optional[int]], None + ], +) -> Callable[[str, str, Optional[Dict[str, str]]], None]: + def render_func( + distant_path: str, + content: str, + render_variables: Optional[Dict[str, str]] = None, + *args, + **kwargs, + ): + with lock("render_file"): + if render_variables is None: + rendered_content = content + else: + rendered_content = content.format(**render_variables) + + with tempfile.NamedTemporaryFile() as file_descriptor: + file_descriptor.write(bytes(rendered_content, "utf-8")) + file_descriptor.flush() + upload_file(file_descriptor.name, distant_path, *args, **kwargs) + + return render_func + + +@pytest.fixture(scope="session") +def hdfs_dir( + host: Host, + user: str, + lock: Callable, +) -> Generator[Callable[[str], None], None, None]: + def hdfs_dir_func(distant_hdfs_path: str): + with lock(f"hdfs_dir_{user}") as data: + if distant_hdfs_path in data.setdefault("hdfs_dirs", []): + return + with host.sudo(user): + host.check_output(f"hdfs dfs -mkdir '{distant_hdfs_path}'") + data["hdfs_dirs"].append(distant_hdfs_path) + + yield hdfs_dir_func + with lock(f"hdfs_dir_{user}", teardown=True) as data: + if not data["last_worker"]: + return + with host.sudo(user): + hdfs_dirs = data.get("hdfs_dirs", []) + for hdfs_dir in reversed(hdfs_dirs[:]): + host.check_output(f"hdfs dfs -rm -r -f {hdfs_dir}") + hdfs_dirs.remove(hdfs_dir) + + +@pytest.fixture(scope="session") +def render_hdfs_file( + host: Host, + user: str, + lock: Callable, + render_file: Callable[[str, str, Optional[Dict[str, str]]], None], +) -> Generator[ + Callable[ + [ + str, + str, + Optional[Dict[str, str]], + Optional[str], + Optional[str], + Optional[str], + Optional[int], + ], + None, + ], + None, + None, +]: + def render_hdfs_func( + distant_hdfs_path: str, + content: 
str, + render_variables: Optional[Dict[str, str]] = None, + distant_path: Optional[str] = None, + owner: Optional[str] = None, + group: Optional[str] = None, + permissions: Optional[int] = None, + ): + if distant_path is None: + distant_path = f"/tmp/{user}_{distant_hdfs_path.replace('/', '_')}" + + render_file( + distant_path, + content, + render_variables, + owner=user, + group=user, + permissions=permissions, + ) + with lock(f"render_hdfs_file_{user}") as data: + if distant_hdfs_path in data.setdefault("rendered_hdfs_files", []): + return + with host.sudo(user): + host.check_output( + f"hdfs dfs -put '{distant_path}' '{distant_hdfs_path}'" + ) + data["rendered_hdfs_files"].append(distant_hdfs_path) + if owner: + host.check_output( + f"hdfs dfs -chown '{owner}' '{distant_hdfs_path}'" + ) + if group: + host.check_output( + f"hdfs dfs -chgrp '{group}' '{distant_hdfs_path}'" + ) + if permissions: + host.check_output( + f"hdfs dfs -chmod '{permissions:0}' '{distant_hdfs_path}'" + ) + + yield render_hdfs_func + with lock(f"render_hdfs_file_{user}", teardown=True) as data: + if not data["last_worker"]: + return + with host.sudo(user): + rendered_hdfs_files = data.get("rendered_hdfs_files", []) + for hdfs_file in rendered_hdfs_files[:]: + host.check_output(f"hdfs dfs -rm -f {hdfs_file}") + rendered_hdfs_files.remove(hdfs_file) + + +@pytest.fixture(scope="session") +def dataset_weight() -> List[dict]: + nb_lines = 200 + categories = ("sportscar", "truck", "berline") + dataset = [ + { + "id": i, + "weight": i / 10 + 50, + "category": categories[i % len(categories)], + } + for i in range(nb_lines) + ] + return dataset + + +@pytest.fixture(scope="session") +def dataset_weight_csv( + dataset_weight: List[dict], + hdfs_dir: Callable[[str], None], + render_hdfs_file: Callable[[str, str], None], +) -> dict: + dataset_dir = "dataset_csv" + dataset_file = "weight.csv" + dataset_hdfs_path = f"{dataset_dir}/{dataset_file}" + nb_lines = len(dataset_weight) + hdfs_dir(dataset_dir) + 
+ with io.StringIO(newline="") as string_buffer: + fieldnames = dataset_weight[0].keys() if len(dataset_weight) > 0 else [] + writer = csv.DictWriter(string_buffer, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(dataset_weight) + dataset_content = string_buffer.getvalue() + + render_hdfs_file(dataset_hdfs_path, dataset_content) + + return { + "hdfs_dir": dataset_dir, + "hdfs_path": dataset_hdfs_path, + "nb_lines": nb_lines, + } + + +@pytest.fixture(scope="session") +def curl( + host: Host, +) -> Callable: + def curl_func( + curl_args: str, + check_status_code: bool = True, + ) -> dict: + curl_cmd = "curl --verbose --insecure" + curl_result = host.run_expect([0], f"{curl_cmd} {curl_args}") + + match = RE_CURL_HTTP_STATUS.findall(curl_result.stderr) + if match: + http_status = int(match[-1]) + + if check_status_code and http_status >= 400: + pytest.fail(f"Erreur curl http status {http_status}\n{curl_result}") + return { + "command": curl_result, + "http_status": http_status, + } + else: + pytest.fail(f"HTTP status non trouvé\n{curl_result}") + return {} + + return curl_func + + +@pytest.fixture(scope="session") +def ranger_policy( + lock: Callable, + curl: Callable, + ranger_manager: Dict[str, str], +) -> Generator[Callable[[str, str, dict, List[dict], int], None], None, None]: + ranger_url = ranger_manager["url"] + ranger_policy_url = f"{ranger_url}/service/public/v2/api/policy" + ranger_audits_url = f"{ranger_url}/service/assets/exportAudit?pageSize=10&sortBy=createDate&sortType=desc" + ranger_creds = ranger_manager["auth_creds"] + curl_args = f"--user '{ranger_creds}' -H 'Accept: application/json'" + + def ranger_policy_func( + name: str, + service: str, + resources: dict, + policyItems: List[dict], + minimum_policy_pulled: int = 0, + **kwargs, + ): + with lock("ranger_policy") as data: + if name in data.setdefault("created_policies", {}): + return + policy = { + "name": name, + "service": service, + "description": name, + "isEnabled": True, + 
"isAuditEnabled": True, + "resources": resources, + "policyItems": policyItems, + } + policy.update(kwargs) + curl_args_create = f"{curl_args} -H 'Content-Type: application/json' -X POST '{ranger_policy_url}' -d '{json.dumps(policy)}'" + curl_result = curl(curl_args_create) + policy = json.loads(curl_result["command"].stdout) + data["created_policies"][name] = policy["id"] + policy_created_time = datetime.utcfromtimestamp(policy["createTime"] / 1000) + if minimum_policy_pulled > 0: + + @retry(sleep_time_between_tries=5, nb_retries=10) + def check_policy_pulled_n_times(): + curl_result = curl( + f"{curl_args} '{ranger_audits_url}&repository={service}'" + ) + result = json.loads(curl_result["command"].stdout) + times_pulled = 0 + for audit in result["vXPolicyExportAudits"]: + audit_update_date = datetime.strptime( + audit["updateDate"], "%Y-%m-%dT%H:%M:%SZ" + ) + if audit_update_date >= policy_created_time: + times_pulled += 1 + if times_pulled >= minimum_policy_pulled: + break + else: + raise ValueError( + "Pas assez de machines ont pull la dernière version de policy" + ) + + check_policy_pulled_n_times() + + yield ranger_policy_func + with lock("ranger_policy", teardown=True) as data: + if not data["last_worker"]: + return + created_policies = data.get("created_policies", {}) + for policy_name in list(created_policies.keys()): + policy_id = created_policies[policy_name] + curl_args_delete = ( + f"{curl_args} -X DELETE '{ranger_policy_url}/{policy_id}'" + ) + curl(curl_args_delete) + del created_policies[policy_name] + + +@pytest.fixture(scope="function") +def user_file( + user: str, render_file: Callable[[str, str], dict], request: pytest.FixtureRequest +) -> Dict[str, str]: + distant_file = f"/tmp/{user}_tempory_file_{request.node.originalname}" + distant_hdfs_path = "temporary_file" + file_content = f"{user} file test" + render_file( + distant_file, + file_content, + owner=user, + group=user, + permissions=0o644, + ) + + return { + "distant_file": distant_file, + 
"distant_hdfs_path": distant_hdfs_path, + "file_content": file_content, + } diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 0000000..ef3fe6b --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,8 @@ +[pytest] +log_cli = True +addopts = + --color=yes + --verbose + --connection=ansible + --sudo + -rsxX diff --git a/tests/test_hbase.py b/tests/test_hbase.py new file mode 100644 index 0000000..d5944ba --- /dev/null +++ b/tests/test_hbase.py @@ -0,0 +1,261 @@ +# HBase REST documentation https://hbase.apache.org/book.html#_rest +import base64 +import hashlib +import io +import json +import time +from typing import Callable, Generator, List + +import pytest +from testinfra import host + +from .conftest import USERS, retry + +testinfra_hosts = ["edge"] + + +def hbase_table_from_user(user: str): + return f"{user}_table_hbase".upper() + + +@pytest.fixture(scope="module") +def hbase_table(user: str) -> str: + return hbase_table_from_user(user) + + +@pytest.fixture(scope="module") +def nb_region_server(host: host.Host) -> int: + return len(host.backend.get_hosts("hbase_rs")) + + +@pytest.fixture(scope="module") +def hbase_ranger_policy( + nb_region_server: int, + ranger_policy: Callable[[str, str, dict, List[dict], int], dict], +): + resources = { + "table": { + "values": [hbase_table_from_user(user) for user in USERS], + "isExcludes": False, + }, + "column-family": {"values": ["*"], "isExcludes": False}, + "column": {"values": ["*"], "isExcludes": False}, + } + policyItems = [ + { + "users": USERS, + "accesses": [ + {"isAllowed": True, "type": "read"}, + {"isAllowed": True, "type": "write"}, + {"isAllowed": True, "type": "create"}, + {"isAllowed": True, "type": "admin"}, + ], + } + ] + ranger_policy("hbase_pytest", "hbase-tdp", resources, policyItems, nb_region_server) + + +def to_base64(value): + return str(base64.b64encode(bytes(value, "utf-8")), "ascii") + + +def create_hbase_dataset( + host: host.Host, + user: str, + hbase_table: str, + 
dataset_weight: List[dict], + curl: Callable, + hbase_rest: str, +) -> int: + nb_lines = len(dataset_weight) + row_ids = [ + hashlib.sha256(bytes(str(time.time()), "utf-8")).hexdigest() + + "_" + + str(data["id"]) + for data in dataset_weight + ] + + weight_column = to_base64("car:weight") + category_column = to_base64("car:category") + data = { + "Row": [ + { + "key": to_base64(row_id), + "Cell": [ + { + "column": weight_column, + "$": to_base64(str(data["weight"])), + }, + { + "column": category_column, + "$": to_base64(data["category"]), + }, + ], + } + for row_id, data in zip(row_ids, dataset_weight) + ] + } + curl_args = [ + "--negotiate", + "--user :", + "--header 'Accept: application/json'", + "--header 'Content-Type: application/json'", + "--request PUT", + f"--data '{json.dumps(data)}'", + f"{hbase_rest}/{hbase_table}/fakerow", + ] + with host.sudo(user): + curl(" ".join(curl_args)) + return nb_lines + + +@pytest.fixture(scope="module") +def setup_hbase_table( + host: host.Host, + user: str, + hbase_table: str, + lock: Callable, + hbase_ranger_policy: None, + dataset_weight: List[dict], + curl: Callable, + hbase_rest: str, +) -> Generator[dict, None, None]: + nb_lines: int + with lock(f"hbase_table_{hbase_table}") as data: + if not data.setdefault("hbase_table_created", False): + table_schema = { + "ColumnSchema": [ + {"name": "car", "VERSIONS": 3}, + {"name": "opinion", "VERSIONS": 5}, + ] + } + curl_args = [ + "--negotiate", + "--user :", + "--header 'Accept: application/json'", + "--header 'Content-Type: application/json'", + "--request POST", + f"--data '{json.dumps(table_schema)}'", + f"{hbase_rest}/{hbase_table}/schema", + ] + with host.sudo(user): + + @retry + def create_table(): + curl(" ".join(curl_args)) + + create_table() + data["hbase_table_created"] = True + nb_lines = create_hbase_dataset( + host, user, hbase_table, dataset_weight, curl, hbase_rest + ) + data["hbase_nb_lines"] = nb_lines + else: + nb_lines = data["hbase_nb_lines"] + + yield 
def test_scanning_with_filter_works(
    host: host.Host,
    user: str,
    setup_hbase_table: dict,
    hbase_rest: str,
    curl: Callable,
):
    """Create a REST scanner filtered on row keys containing "_150".

    Exactly one row of the fixture dataset has an id of 150 (row keys end in
    "_<id>"), so the scanner is expected to return a single row.  The scanner
    resource is always released in the ``finally`` block.
    """
    hbase_table = setup_hbase_table["table_name"]
    nb_lines = setup_hbase_table["nb_lines"]
    scanner_args = {
        # batch >= total rows so a single fetch returns everything matching.
        "batch": nb_lines,
        "filter": json.dumps(
            {
                "op": "EQUAL",
                "type": "RowFilter",
                "comparator": {"value": "_150", "type": "SubstringComparator"},
            }
        ),
    }
    curl_args = [
        "--negotiate",
        "--user :",
        "--header 'Accept: application/json'",
        "--header 'Content-Type: application/json'",
    ]

    curl_data_args = [
        "--request POST",
        f"--data '{json.dumps(scanner_args)}'",
        f"{hbase_rest}/{hbase_table}/scanner",
    ]

    with host.sudo(user):
        curl_result = curl(" ".join(curl_args + curl_data_args))
        # The scanner URI comes back in the "Location:" response header, which
        # curl reports on stderr.  splitlines() replaces the previous
        # io.StringIO(...).readlines() detour — no intermediate objects, and a
        # possible trailing newline is removed by strip() anyway.
        for line in curl_result["command"].stderr.splitlines():
            if "Location:" in line:
                scanner_uri = line.split(": ")[1].strip()
                break
        else:
            raise ValueError("Pas trouvé d'url location pour le scanner")
        try:
            curl_result = curl(" ".join(curl_args + [scanner_uri]))
            data = json.loads(curl_result["command"].stdout)
            assert len(data["Row"]) == 1
        finally:
            # free the scanner
            curl(" ".join(curl_args + ["--request DELETE", scanner_uri]))
def test_keytab_file_is_conform(host: host.Host, user: str):
    """The user's keytab must be owned by the user and readable only by them."""
    keytab_path = os.path.join("/home", user, f"{user}.keytab")
    keytab = host.file(keytab_path)
    # Ownership and mode checked together: owner rw only (0600).
    assert (keytab.user, keytab.group, keytab.mode) == (user, user, 0o600)
@pytest.fixture(scope="module")
def hive_ranger_policy(
    user: str,
    hive_database: str,
    ranger_policy: Callable[[str, str, dict, List[dict]], dict],
):
    """Create a Ranger policy granting *user* full rights on its Hive database.

    The policy targets every table and column of the user's dedicated database
    on the ``hive-tdp`` service.  The fixture is used for its side effect only
    (it returns None); the ``ranger_policy`` factory is expected to handle
    teardown of the policy itself.
    """
    resources = {
        "database": {"values": [hive_database], "isExcludes": False},
        "table": {"values": ["*"], "isExcludes": False},
        "column": {"values": ["*"], "isExcludes": False},
    }
    # Grant the full set of Hive access types so the tests can create, load,
    # query and drop objects without further policies.
    policyItems = [
        {
            "users": [user],
            "accesses": [
                {"isAllowed": True, "type": "select"},
                {"isAllowed": True, "type": "update"},
                {"isAllowed": True, "type": "create"},
                {"isAllowed": True, "type": "drop"},
                {"isAllowed": True, "type": "alter"},
                {"isAllowed": True, "type": "index"},
                {"isAllowed": True, "type": "lock"},
                {"isAllowed": True, "type": "all"},
                {"isAllowed": True, "type": "read"},
                {"isAllowed": True, "type": "write"},
                {"isAllowed": True, "type": "refresh"},
            ],
        }
    ]
    ranger_policy(f"{user}_hive_test", "hive-tdp", resources, policyItems)
@pytest.fixture(scope="module")
def setup_hive_table(
    host: host.Host,
    user: str,
    dataset_weight_csv: dict,
    setup_hive_database: str,
    hive_table: str,
    render_file: Callable,
) -> Generator[str, None, None]:
    """Create an external Hive table over the CSV dataset; drop it on teardown.

    Yields the table name.  The table is EXTERNAL and points at the dataset's
    HDFS directory, so dropping it does not delete the underlying data.
    """
    hive_database = setup_hive_database
    dataset_csv = dataset_weight_csv["hdfs_dir"]
    # skip.header.line.count=1 skips the CSV header row when querying.
    create_table_script = """
    USE {database};

    CREATE EXTERNAL TABLE {table} (
        id int,
        weight float,
        category varchar(9)
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ","
    STORED AS TEXTFILE
    LOCATION "{dataset_csv}"
    tblproperties("skip.header.line.count"="1");
    """
    create_table_script = textwrap.dedent(create_table_script)
    distant_path = f"/tmp/{user}_create_table.hql"
    render_file(
        distant_path,
        create_table_script,
        {
            "database": hive_database,
            "table": hive_table,
            "dataset_csv": dataset_csv,
        },
        owner=user,
        group=user,
        permissions=0o644,
    )
    # Run beeline as the test user so the Ranger policy applies.
    with host.sudo(user):
        host.check_output(f"beeline -f '{distant_path}'")
    yield hive_table
    # Teardown: drop the table so the database can be dropped by the
    # setup_hive_database fixture afterwards.
    with host.sudo(user):
        host.check_output(
            f"beeline -e 'use {hive_database}' -e 'DROP TABLE {hive_table}'"
        )
def test_kinit_kadmin_kdestroy_is_working(
    host: host.Host, admin_credentials: tuple, realm: str
):
    """kinit as the KDC admin, run a kadmin query, then destroy the ticket.

    The password is fed on stdin with ``printf | cmd`` instead of a bash-only
    ``<<<`` here-string: testinfra executes commands through ``/bin/sh``,
    where ``<<<`` is a syntax error on POSIX-only shells (e.g. dash).
    """
    kadmin_principal, kadmin_password = admin_credentials
    # The inventory stores the principal with an unrendered Jinja placeholder.
    kadmin_principal = kadmin_principal.replace("{{ realm }}", realm)
    host.run_expect(
        [0], f"printf '%s\\n' \"{kadmin_password}\" | kinit \"{kadmin_principal}\""
    )
    try:
        host.run_expect(
            [0], f"printf '%s\\n' \"{kadmin_password}\" | kadmin list_principals"
        )
    finally:
        # Always clean the credential cache, even if the kadmin query failed.
        host.run_expect([0], "kdestroy")
def test_create_webhdfs_temporary_file_in_user_directory(
    host: host.Host,
    user: str,
    user_file: Dict[str, str],
    webhdfs_ranger_policy: None,
    knox_gateway: Dict[str, str],
    curl: Callable,
):
    """Full WebHDFS file lifecycle (CREATE/OPEN/LISTSTATUS/DELETE) via Knox.

    Authentication is HTTP basic against the Knox gateway (LDAP credentials),
    unlike the direct-namenode variant which uses Kerberos/--negotiate.
    Cleanup runs in the ``finally`` block and also verifies the file is gone.
    """
    distant_file = user_file["distant_file"]
    distant_hdfs_path = user_file["distant_hdfs_path"]
    file_content = user_file["file_content"]
    user_creds = knox_gateway["user_creds"]
    knox_url = knox_gateway["url"]
    knox_gateway_url = f"{knox_url}/gateway/tdpldap/webhdfs/v1"

    # Retried: the Ranger policy created by webhdfs_ranger_policy may take a
    # moment to propagate to the Knox plugin.
    curl_result = retry(
        lambda: curl(
            f"-L -T {distant_file} -u {user}:{user_creds} -X PUT '{knox_gateway_url}/user/{user}/{distant_hdfs_path}?op=CREATE'"
        )
    )()
    # 201 Created is the WebHDFS success status for op=CREATE.
    assert curl_result["http_status"] == 201, curl_result

    try:
        curl_result = curl(
            f"-L -u {user}:{user_creds} -X GET '{knox_gateway_url}/user/{user}/{distant_hdfs_path}?op=OPEN'"
        )
        assert file_content in curl_result["command"].stdout, curl_result

        curl_result = curl(
            f"-L -u {user}:{user_creds} -X GET '{knox_gateway_url}/user/{user}/{distant_hdfs_path}?op=LISTSTATUS'"
        )
        assert curl_result["http_status"] == 200, curl_result

        # LISTSTATUS on a file returns a one-element FileStatus array.
        liststatus = json.loads(curl_result["command"].stdout)
        assert "FileStatuses" in liststatus, curl_result
        assert "FileStatus" in liststatus["FileStatuses"], curl_result
        assert len(liststatus["FileStatuses"]["FileStatus"]) == 1, curl_result

        filestatus = liststatus["FileStatuses"]["FileStatus"][0]
        assert filestatus["group"] == user, curl_result
        assert filestatus["owner"] == user, curl_result
        assert filestatus["type"] == "FILE", curl_result
    finally:
        curl_result = curl(
            f"-L -u {user}:{user_creds} -X DELETE '{knox_gateway_url}/user/{user}/{distant_hdfs_path}?op=DELETE'"
        )
        assert curl_result["http_status"] == 200, curl_result

        # After deletion, LISTSTATUS must report FileNotFoundException; the
        # HTTP status is intentionally not enforced (check_status_code=False)
        # because a 404 is the expected outcome here.
        curl_result = curl(
            f"-L -u {user}:{user_creds} -X GET '{knox_gateway_url}/user/{user}/{distant_hdfs_path}?op=LISTSTATUS'",
            check_status_code=False,
        )
        liststatus = json.loads(curl_result["command"].stdout)
        assert "RemoteException" in liststatus, curl_result

        assert "exception" in liststatus["RemoteException"], curl_result
        assert (
            "FileNotFoundException" in liststatus["RemoteException"]["exception"]
        ), curl_result

        assert "message" in liststatus["RemoteException"], curl_result
        # NOTE(review): assumes distant_hdfs_path is relative (no leading
        # slash), otherwise the joined path would contain "//" — confirm
        # against the user_file fixture in conftest.py.
        assert (
            f"File /user/{user}/{distant_hdfs_path} does not exist."
            in liststatus["RemoteException"]["message"]
        ), curl_result
@pytest.fixture(scope="module")
def phoenix_dataset(
    host: host.Host,
    user: str,
    setup_phoenix_table: str,
    render_file: Callable,
    dataset_weight: List[dict],
) -> dict:
    """Load the weight dataset into the Phoenix table via an UPSERT script.

    Returns the number of rows loaded and the distinct categories so tests
    can assert on counts and grouping results.
    """
    phoenix_table = setup_phoenix_table
    nb_lines = len(dataset_weight)
    upsert = "UPSERT INTO {table} VALUES ({id}, {weight}, '{category}');"
    upsert_commands = [
        upsert.format(
            table=phoenix_table,
            id=data["id"],
            weight=data["weight"],
            category=data["category"],
        )
        for data in dataset_weight
    ]
    # NOTE(review): set() makes the order of `categories` nondeterministic
    # across runs — fine for membership checks, not for ordered comparisons.
    categories = list(set(data["category"] for data in dataset_weight))
    script = "\n".join(upsert_commands)

    script_path = f"/tmp/{user}_dataset.sql"
    # No template-context dict is passed here (unlike other render_file calls)
    # because the script is already fully formatted above — assumes the
    # context parameter of render_file is optional; TODO confirm in conftest.
    render_file(
        script_path,
        script,
        owner=user,
        group=user,
        permissions=0o644,
    )

    with host.sudo(user):
        host.check_output(f"sqlline.py {script_path}")
    return {"nb_lines": nb_lines, "categories": categories}
@pytest.fixture(scope="module", params=["spark", "spark3"])
def spark_version(request: Any) -> str:
    """Parametrize dependent tests over both installed Spark versions.

    Yields "spark" then "spark3"; each value names the install directory
    under /opt/tdp and the config directory under /etc.
    """
    version: str = request.param
    return version
def test_spark_submit_jar_is_executed(host: host.Host, user: str, spark_version: str):
    """Run the bundled JavaSparkPi example on YARN and check it prints a result."""
    # Assemble the command once; joining with single spaces yields exactly the
    # same command line as the previous adjacent-literal concatenation.
    submit_command = " ".join(
        [
            f"SPARK_CONF_DIR=/etc/{spark_version}/conf",
            f"/opt/tdp/{spark_version}/bin/spark-submit",
            "--master yarn",
            "--deploy-mode client",
            "--executor-memory 5G",
            "--num-executors 3",
            "--executor-cores 2",
            "--class org.apache.spark.examples.JavaSparkPi",
            f"/opt/tdp/{spark_version}/examples/jars/spark-examples*.jar 50",
        ]
    )
    with host.sudo(user):
        spark_stdout = host.check_output(submit_command)
    assert "Pi is roughly" in spark_stdout
def test_create_webhdfs_temporary_file_in_user_directory(
    host: host.Host,
    user: str,
    user_file: Dict[str, str],
    webhdfs_gateway: str,
    curl: Callable,
):
    """WebHDFS file lifecycle (CREATE/OPEN/LISTSTATUS/DELETE) against the NameNode.

    Direct-namenode counterpart of the Knox test in test_knox.py: here every
    curl call uses Kerberos SPNEGO (--negotiate -u :) under the test user's
    sudo context instead of basic auth through the gateway.
    """
    distant_file = user_file["distant_file"]
    distant_hdfs_path = user_file["distant_hdfs_path"]
    file_content = user_file["file_content"]
    webhdfs_url = webhdfs_gateway
    webhdfs_gateway_url = f"{webhdfs_url}/webhdfs/v1"

    # All requests run as the test user so --negotiate can use that user's
    # Kerberos credential cache.
    with host.sudo(user):
        # Retried: transient namenode/redirect hiccups at startup.
        curl_result = retry(
            lambda: curl(
                f"-L -T {distant_file} --negotiate -u : -X PUT '{webhdfs_gateway_url}/user/{user}/{distant_hdfs_path}?op=CREATE'"
            )
        )()
        # 201 Created is the WebHDFS success status for op=CREATE.
        assert curl_result["http_status"] == 201, curl_result

        try:
            curl_result = curl(
                f"-L --negotiate -u : -X GET '{webhdfs_gateway_url}/user/{user}/{distant_hdfs_path}?op=OPEN'"
            )
            assert file_content in curl_result["command"].stdout, curl_result

            curl_result = curl(
                f"-L --negotiate -u : -X GET '{webhdfs_gateway_url}/user/{user}/{distant_hdfs_path}?op=LISTSTATUS'"
            )
            assert curl_result["http_status"] == 200, curl_result

            # LISTSTATUS on a file returns a one-element FileStatus array.
            liststatus = json.loads(curl_result["command"].stdout)
            assert "FileStatuses" in liststatus, curl_result
            assert "FileStatus" in liststatus["FileStatuses"], curl_result
            assert len(liststatus["FileStatuses"]["FileStatus"]) == 1, curl_result

            filestatus = liststatus["FileStatuses"]["FileStatus"][0]
            assert filestatus["group"] == user, curl_result
            assert filestatus["owner"] == user, curl_result
            assert filestatus["type"] == "FILE", curl_result
        finally:
            # Cleanup always runs; it also asserts the file is really gone.
            curl_result = curl(
                f"-L --negotiate -u : -X DELETE '{webhdfs_gateway_url}/user/{user}/{distant_hdfs_path}?op=DELETE'"
            )
            assert curl_result["http_status"] == 200, curl_result

            # 404 is expected after deletion, hence check_status_code=False.
            curl_result = curl(
                f"-L --negotiate -u : -X GET '{webhdfs_gateway_url}/user/{user}/{distant_hdfs_path}?op=LISTSTATUS'",
                check_status_code=False,
            )
            liststatus = json.loads(curl_result["command"].stdout)
            assert "RemoteException" in liststatus, curl_result

            assert "exception" in liststatus["RemoteException"], curl_result
            assert (
                "FileNotFoundException" in liststatus["RemoteException"]["exception"]
            ), curl_result

            assert "message" in liststatus["RemoteException"], curl_result
            # NOTE(review): assumes distant_hdfs_path has no leading slash —
            # confirm against the user_file fixture in conftest.py.
            assert (
                f"File /user/{user}/{distant_hdfs_path} does not exist."
                in liststatus["RemoteException"]["message"]
            ), curl_result