Skip to content

Commit

Permalink
Merge pull request #145 from pehala/black
Browse files Browse the repository at this point in the history
Use black for formatting
  • Loading branch information
pehala authored Feb 23, 2023
2 parents 875a63a + 744bff4 commit 8c68e03
Show file tree
Hide file tree
Showing 60 changed files with 718 additions and 660 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ ifdef html
PYTEST += --html=$(resultsdir)/report-$(@F).html
endif

commit-acceptance: pylint mypy all-is-package
commit-acceptance: black pylint mypy all-is-package

pylint mypy: pipenv-dev
pipenv run $@ $(flags) testsuite

black: pipenv-dev
pipenv run black --line-length 120 --check testsuite --diff

all-is-package:
@echo
@echo "Searching for dirs missing __init__.py"
Expand Down
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mypy = "*"
pylint = "*"
types-PyYAML = "*"
black = {version = "*", extras = ["d"]}
# Have commented out python-language-server to make it available quickly
# for the development
#python-language-server = "*"
Expand Down
97 changes: 56 additions & 41 deletions testsuite/certificates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class CFSSLException(Exception):
@dataclasses.dataclass
class CertInfo:
"""Certificate configuration details"""

hosts: Optional[Union[Collection[str], str]] = None
ca: bool = False
children: Optional[Dict[str, Optional["CertInfo"]]] = None
Expand All @@ -24,6 +25,7 @@ class CertInfo:
@dataclasses.dataclass
class Certificate:
"""Object representing Signed certificate"""

key: str
certificate: str
chain: str
Expand All @@ -32,13 +34,14 @@ class Certificate:
@dataclasses.dataclass
class UnsignedKey:
"""Object representing generated key waiting to be signed"""

key: str
csr: str


def build_cert_request_json(common_name: str,
names: Optional[List[Dict[str, str]]] = None,
hosts: Optional[Collection[str]] = None) -> dict:
def build_cert_request_json(
common_name: str, names: Optional[List[Dict[str, str]]] = None, hosts: Optional[Collection[str]] = None
) -> dict:
"""
Build certificate request for the CFSSL client
:param common_name: certificate identifier
Expand All @@ -50,15 +53,13 @@ def build_cert_request_json(common_name: str,
"CN": common_name,
"names": names,
"hosts": hosts,
"key": {
"algo": "rsa",
"size": 4096
},
"key": {"algo": "rsa", "size": 4096},
}


class CFSSLClient:
"""Client for working with CFSSL library"""

DEFAULT_NAMES = [
{
"O": "Red Hat Inc.",
Expand All @@ -73,20 +74,20 @@ def __init__(self, binary) -> None:
super().__init__()
self.binary = binary

def _execute_command(self,
command: str,
*args: str,
stdin: Optional[str] = None,
env: Optional[Dict[str, str]] = None):
def _execute_command(
self, command: str, *args: str, stdin: Optional[str] = None, env: Optional[Dict[str, str]] = None
):
args = (self.binary, command, *args)
try:
response = subprocess.run(args,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
input=stdin,
universal_newlines=bool(stdin),
check=False,
env=env)
response = subprocess.run(
args,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
input=stdin,
universal_newlines=bool(stdin),
check=False,
env=env,
)
if response.returncode != 0:
raise CFSSLException(f"CFSSL command {args} returned non-zero response code, error {response.stderr}")
return json.loads(response.stdout)
Expand All @@ -101,8 +102,9 @@ def exists(self):
"""Returns true if the binary exists and is correctly set up"""
return shutil.which(self.binary)

def generate_key(self, common_name: str, names: Optional[List[Dict[str, str]]] = None,
hosts: Optional[Collection[str]] = None) -> UnsignedKey:
def generate_key(
self, common_name: str, names: Optional[List[Dict[str, str]]] = None, hosts: Optional[Collection[str]] = None
) -> UnsignedKey:
"""Generates unsigned key"""
data = build_cert_request_json(common_name, names, hosts)

Expand All @@ -114,35 +116,46 @@ def sign_intermediate_authority(self, key: UnsignedKey, certificate_authority: C
args = [
"-ca=env:CA",
"-ca-key=env:KEY",
f"-config={resources.files('testsuite.resources.tls').joinpath('intermediate_config.json')}"
f"-config={resources.files('testsuite.resources.tls').joinpath('intermediate_config.json')}",
]
result = self._execute_command("sign", *args, "-", stdin=key.csr, env={
"CA": certificate_authority.certificate,
"KEY": certificate_authority.key})
result = self._execute_command(
"sign",
*args,
"-",
stdin=key.csr,
env={"CA": certificate_authority.certificate, "KEY": certificate_authority.key},
)
return Certificate(key=key.key, certificate=result["cert"], chain=result["cert"])

def sign(self, key: UnsignedKey, certificate_authority: Certificate) -> Certificate:
"""Signs unsigned key"""
result = self._execute_command("sign", "-ca=env:CA", "-ca-key=env:KEY", "-", stdin=key.csr, env={
"CA": certificate_authority.certificate,
"KEY": certificate_authority.key})
result = self._execute_command(
"sign",
"-ca=env:CA",
"-ca-key=env:KEY",
"-",
stdin=key.csr,
env={"CA": certificate_authority.certificate, "KEY": certificate_authority.key},
)
chain = result["cert"] + certificate_authority.chain
return Certificate(key=key.key, certificate=result["cert"], chain=chain)

def self_sign(self, common_name: str,
names: Optional[List[Dict[str, str]]] = None,
hosts: Optional[Collection[str]] = None) -> Certificate:
def self_sign(
self, common_name: str, names: Optional[List[Dict[str, str]]] = None, hosts: Optional[Collection[str]] = None
) -> Certificate:
"""Creates self-signed certificate"""
data = build_cert_request_json(common_name, names, hosts)

result = self._execute_command("selfsign", common_name, "-", stdin=json.dumps(data))
return Certificate(key=result["key"], certificate=result["cert"], chain=result["cert"])

def create_authority(self,
common_name: str,
hosts: Collection[str],
names: Optional[List[Dict[str, str]]] = None,
certificate_authority: Optional[Certificate] = None) -> Certificate:
def create_authority(
self,
common_name: str,
hosts: Collection[str],
names: Optional[List[Dict[str, str]]] = None,
certificate_authority: Optional[Certificate] = None,
) -> Certificate:
"""Generates self-signed root or intermediate CA certificate and private key
Args:
:param common_name: identifier to the certificate and key.
Expand All @@ -160,11 +173,13 @@ def create_authority(self,
certificate = self.sign_intermediate_authority(key, certificate_authority)
return certificate

def create(self,
common_name: str,
hosts: Collection[str],
certificate_authority: Optional[Certificate] = None,
names: Optional[List[Dict[str, str]]] = None) -> Certificate:
def create(
self,
common_name: str,
hosts: Collection[str],
certificate_authority: Optional[Certificate] = None,
names: Optional[List[Dict[str, str]]] = None,
) -> Certificate:
"""Create a new certificate.
Args:
:param common_name: Exact DNS match for which this certificate is valid
Expand Down
23 changes: 14 additions & 9 deletions testsuite/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ class DefaultValueValidator(Validator):
"""Validator which will run default function only when the original value is missing"""

def __init__(self, name, default, **kwargs) -> None:
super().__init__(name, ne=None,
messages={
"operations": ("{name} must {operation} {op_value} but it is {value} in env {env}. "
"You might be missing tools on the cluster.")
},
default=default,
when=Validator(name, must_exist=False),
**kwargs)
super().__init__(
name,
ne=None,
messages={
"operations": (
"{name} must {operation} {op_value} but it is {value} in env {env}. "
"You might be missing tools on the cluster."
)
},
default=default,
when=Validator(name, must_exist=False),
**kwargs
)


settings = Dynaconf(
Expand All @@ -35,5 +40,5 @@ def __init__(self, name, default, **kwargs) -> None:
Validator("kuadrant.enable", must_exist=False, eq=False) | Validator("kuadrant.gateway.name", must_exist=True),
],
validate_only=["authorino", "kuadrant"],
loaders=["dynaconf.loaders.env_loader", "testsuite.config.openshift_loader"]
loaders=["dynaconf.loaders.env_loader", "testsuite.config.openshift_loader"],
)
5 changes: 1 addition & 4 deletions testsuite/config/openshift_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ def load(obj, env=None, silent=True, key=None, filename=None):
config = weakget(obj)
section = config["openshift"]
client = OpenShiftClient(
section["project"] % None,
section["api_url"] % None,
section["token"] % None,
section["kubeconfig_path"] % None
section["project"] % None, section["api_url"] % None, section["token"] % None, section["kubeconfig_path"] % None
)
obj["openshift"] = client

Expand Down
4 changes: 4 additions & 0 deletions testsuite/config/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

def fetch_route(name, force_http=False):
"""Fetches the URL of a route with specific name"""

def _fetcher(settings, _):
try:
openshift = settings["tools"]
Expand All @@ -17,11 +18,13 @@ def _fetcher(settings, _):
except Exception:
logger.warning("Unable to fetch route %s from tools", name)
return None

return _fetcher


def fetch_secret(name, key):
"""Fetches the key out of a secret with specific name"""

def _fetcher(settings, _):
try:
openshift = settings["tools"]
Expand All @@ -30,4 +33,5 @@ def _fetcher(settings, _):
except Exception:
logger.warning("Unable to fetch secret %s[%s] from tools", name, key)
return None

return _fetcher
39 changes: 33 additions & 6 deletions testsuite/httpx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def create_tmp_file(content: str):

class UnexpectedResponse(Exception):
"""Slightly different response attributes were expected"""

def __init__(self, msg, response):
super().__init__(msg)
self.response = response
Expand Down Expand Up @@ -58,12 +59,38 @@ def add_retry_code(self, code):
self.retry_codes.add(code)

@backoff.on_exception(backoff.fibo, UnexpectedResponse, max_tries=8, jitter=None)
def request(self, method: str, url, *, content=None, data=None, files=None,
json=None, params=None, headers=None, cookies=None, auth=None, follow_redirects=None,
timeout=None, extensions=None) -> Response:
response = super().request(method, url, content=content, data=data, files=files, json=json, params=params,
headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects,
timeout=timeout, extensions=extensions)
def request(
self,
method: str,
url,
*,
content=None,
data=None,
files=None,
json=None,
params=None,
headers=None,
cookies=None,
auth=None,
follow_redirects=None,
timeout=None,
extensions=None,
) -> Response:
response = super().request(
method,
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
if response.status_code in self.retry_codes:
raise UnexpectedResponse(f"Didn't expect '{response.status_code}' status code", response)
return response
15 changes: 7 additions & 8 deletions testsuite/httpx/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
class HttpxOidcClientAuth(Auth):
"""Auth class for Httpx client for product secured by oidc"""

def __init__(self, token: TokenType, location="authorization",
username: str = None, password: str = None) -> None:
def __init__(self, token: TokenType, location="authorization", username: str = None, password: str = None) -> None:
self.location = location
self._token = token
self.username = username
Expand All @@ -34,12 +33,12 @@ def token(self):
return self._token

def _add_credentials(self, request: Request, token):
if self.location == 'authorization':
request.headers['Authorization'] = f"Bearer {token}"
elif self.location == 'headers':
request.headers['access_token'] = token
elif self.location == 'query':
request.url = URL(request.url, params={'access_token': token})
if self.location == "authorization":
request.headers["Authorization"] = f"Bearer {token}"
elif self.location == "headers":
request.headers["access_token"] = token
elif self.location == "query":
request.url = URL(request.url, params={"access_token": token})
else:
raise ValueError(f"Unknown credentials location '{self.location}'")

Expand Down
Loading

0 comments on commit 8c68e03

Please sign in to comment.