#!/usr/bin/env python3
"""Yoda API OpenAPI documentation generator.

This extracts all Yoda API functions from the ruleset, and generates an OpenAPI
file based on the function signatures and docstrings.
"""

# Provenance: reconstructed from commit 907f655d77bb42418442098a7004982c261d6a55
# (Sietse Snel, 2024-12-20), "YDA-6066: Adjust OpenAPI script for Py3".
# That change makes tools/api/generate-openapi.py work with Python 3 by:
# - extracting functions to make the code easier to understand;
# - collecting properties of API rules with the built-in ast module instead of
#   loading the ruleset and inspecting it with custom instrumentation, so no
#   stub code has to be kept up to date as dependencies change.

__copyright__ = 'Copyright (c) 2020-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import argparse
import ast
import json
import os
import re
import sys
from collections import OrderedDict
from glob import glob
from typing import Any, Dict, Sequence, Tuple, Union


def get_args(argv: Union[Sequence[str], None] = None) -> argparse.Namespace:
    """Parse the command-line options of the generator.

    The options ``--core`` and ``--module`` are mutually exclusive.

    :param argv: Arguments to parse; when None, argparse reads sys.argv[1:]

    :returns: Namespace with attributes ``core`` (bool) and ``module`` (str or None)
    """
    parser = argparse.ArgumentParser(description=__doc__)
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--core', dest='core', action='store_true',
                       help='only generate core API')
    group.add_argument('--module', action="store", dest="module", default=None,
                       help='only generate API of specific module')
    return parser.parse_args(argv)


def get_ruleset_dir() -> str:
    """Return the canonical path of the ruleset directory.

    The ruleset directory is the one two levels above the directory that
    contains this script.

    :returns: Absolute path without symlinks or ``..`` segments
    """
    script_dir = os.path.dirname(__file__)
    # realpath() over the complete join collapses the ".." segments as well.
    return os.path.realpath(os.path.join(script_dir, os.pardir, os.pardir))
+def get_ast_tree_of_file(ruleset_dir: str, ruleset_file: str): + init_file = os.path.join(ruleset_dir, ruleset_file) + with open(init_file, 'r') as file: + tree = ast.parse(file.read(), filename=init_file) + return tree - Used as a stub for various internal irods modules so that we can import - rulesets without errors. - """ - def __init__(self, *_, **kw): - self._data = kw - - def __call__(self, *_, **__): - return Sandbag() - def __getattr__(self, k): - return self._data.get(k, Sandbag()) - - def __setattr__(self, k, v): - if k == '_data': - return super(Sandbag, self).__setattr__(k, v) +def oDict(*xs: Tuple) -> OrderedDict: + return OrderedDict(xs) -class api(object): - """Injected util.api module that intercepts all API function declarations. +def get_openapi_template(ruleset_description: Union[str, None], ruleset_version: Union[str, None], + core: bool, module: str) -> OrderedDict: + """Create an OpenAPI document base template - Alternatively we could dir() the ruleset module and extract functions that - way, but that results in a mess. - By replacing the API decorator instead we can preserve order of - declarations, allowing for a more logical documentation structure. + Note: for the most part, order matters (e.g. ordering of API function list). + So we use ordered dicts. """ - fns = [] - - @staticmethod - def make(): - def f(g): - api.fns += [(g.__name__, g)] - return g - return f - - -# Inject iRODS modules. -sys.modules['irods_types'] = Sandbag() -sys.modules['genquery'] = Sandbag() -sys.modules['session_vars'] = Sandbag() - -# Inject other modules. -sys.modules['pysqlcipher3'] = Sandbag() - -# Inject the API shim, and its parent modules if needed. -if ruleset_name != 'rules_uu': - sys.modules['rules_uu'] = Sandbag(util = Sandbag(api = api)) - sys.modules['rules_uu.util'] = Sandbag(api = api) -sys.modules['rules_uu.util.api'] = api - -# Rulesets should be usable anywhere in PYTHONPATH. -# Add the iRODS directory to it for convenience. 
-sys.path += ['/etc/irods'] - -try: - # Import the ruleset. - ruleset_mod = import_module(ruleset_name) -except Exception as e: - print('Could not import ruleset <{}>: {}'.format(ruleset_name, e), file=sys.stderr) - raise - - -# Create an OpenAPI document. -# ... base template - -# Note: for the most part, order matters (e.g. ordering of API function list). -# So we use ordered dicts. -def oDict(*xs): - return OrderedDict(xs) - -title = 'Yoda API' -if core: - title = 'Yoda core API' -if module: - title = 'Yoda {} API'.format(module) - -spec = oDict(('openapi', '3.0.0'), - ('info', - oDict(('description', ruleset_mod.__doc__), - ('contact', - oDict(('email', 'l.r.westerhof@uu.nl'))), - ('version', getattr(ruleset_mod, '__version__', '9999')), - ('title', title))), - ('servers', - [oDict(('url', 'https://portal.yoda.test/api'), ('description', 'Local Yoda development server'))]), - ('security', [ oDict(('cookieAuth', [])), oDict(('basicAuth', [])) ]), - ('components', - oDict(('schemas', - oDict(('result_error', - oDict(('type', 'object'), - ('properties', - oDict(('status', oDict(('type', 'string'), ('description', 'Holds an error ID'))), - ('status_info', oDict(('type', 'string'), ('description', 'Holds a human-readable error description'))), - ('data', - oDict(('description', 'empty'), - ('nullable', True), - ('type', 'object'))))))))), - ('securitySchemes', - oDict(('cookieAuth', - oDict(('in', 'cookie'), - ('type', 'apiKey'), - # ('name', 'session'))), - ('name', 'yoda_session'))), - ('basicAuth', oDict(('type', 'http'), ('scheme', 'basic'))))), - ('responses', - oDict(('status_400', - oDict(('description', 'Bad request'), - ('content', - oDict(('application/json', - oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), - ('status_500', - oDict(('description', 'Internal error'), - ('content', - oDict(('application/json', - oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), - )))), - ('paths', oDict()) - ) - -def 
gen_fn_spec(name, fn): + title = 'Yoda API' + if core: + title = 'Yoda core API' + if module: + title = 'Yoda {} API'.format(module) + + ruleset_description_str = ruleset_description if ruleset_description is not None else "N/A" + ruleset_version_str = ruleset_version if ruleset_version is not None else '9999' + + spec = oDict(('openapi', '3.0.0'), + ('info', + oDict(('description', ruleset_description_str), + ('contact', + oDict(('email', 'l.r.westerhof@uu.nl'))), + ('version', ruleset_version_str), + ('title', title))), + ('servers', + [oDict(('url', 'https://portal.yoda.test/api'), ('description', 'Local Yoda development server'))]), + ('security', [oDict(('cookieAuth', [])), + oDict(('basicAuth', []))]), + ('components', + oDict(('schemas', + oDict(('result_error', + oDict(('type', 'object'), + ('properties', + oDict(('status', oDict(('type', 'string'), ('description', 'Holds an error ID'))), + ('status_info', oDict(('type', 'string'), ('description', + 'Holds a human-readable error description'))), + ('data', + oDict(('description', 'empty'), + ('nullable', True), + ('type', 'object'))))))))), + ('securitySchemes', + oDict(('cookieAuth', + oDict(('in', 'cookie'), + ('type', 'apiKey'), + # ('name', 'session'))), + ('name', 'yoda_session'))), + ('basicAuth', oDict(('type', 'http'), ('scheme', 'basic'))))), + ('responses', + oDict(('status_400', + oDict(('description', 'Bad request'), + ('content', + oDict(('application/json', + oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), + ('status_500', + oDict(('description', 'Internal error'), + ('content', + oDict(('application/json', + oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), + )))), + ('paths', oDict()) + ) + return spec + + +def get_json_type(input: str) -> str: + """Translate Python type to JSON type if a translation is available, otherwise use the Python type.""" + match_nullable_type = re.search(r'^(\w+)\s*\|\s*[Nn]one$', input) + if 
match_nullable_type: + input = match_nullable_type[1] + + types_lookup_table = { + 'str': 'string', + 'int': 'integer', + 'bool': 'boolean', + 'dict': 'object', + 'Dict': 'object', + 'list': 'array', + 'List': 'array'} + return types_lookup_table.get(input, input) + + +def is_nullable_type(input: str) -> bool: + return bool(re.search(r'^(\w+)\s*\|\s*[Nn]one$', input) + or re.search(r'^[Nn]one\s*\|\s*(\w+)$', input)) + + +def gen_fn_spec(function_name: str, function_properties: Dict): """Generate OpenAPI spec for one function (one path)""" - mod = fn.__module__.replace(ruleset_name+'.', '') - - print('{}/{}'.format(mod, name), file=sys.stderr) - - # Convert function signature -> argument spec. - # TODO: Python3: Extract type annotations. - # Also see https://docs.python.org/3.8/library/typing.html#typing.TypedDict - # for annotation of complex structures. - - a_pos, a_var, a_kw, a_defaults = inspect.getargspec(fn) - - a_pos = a_pos[1:] # ignore callback/context param. + doc: str = str(function_properties.get("doc")) if function_properties.get("doc") is not None else "" - required = a_pos if a_defaults is None else a_pos[:-len(a_defaults) ] - optional = [] if a_defaults is None else a_pos[ -len(a_defaults):] + props = oDict() + for arg_name in function_properties["args"]: + arg_properties = function_properties["args"][arg_name] - doc = fn.__doc__ or '(undocumented)' + # Try to get type from docstring, otherwise from type annotation. If neither is available, assume it's as string + doc_py_type = re.findall(r'^\s*:type\s+' + re.escape(function_name) + r':\s*(.+?)\s*$', doc, re.MULTILINE) + ann_py_type = arg_properties["annotation"] + py_type = doc_py_type[-1] if len(doc_py_type) > 0 else (ann_py_type if ann_py_type is not None else "str") + json_type = get_json_type(py_type) + nullable_type = is_nullable_type(py_type) - # Map Python types to JSON schema types. 
- types = {'str': 'string' - ,'int': 'integer' - ,'bool': 'boolean' - ,'dict': 'object' - ,'list': 'array'} + search_param_pattern: str = r'^\s*:param\s+' + re.escape(arg_name) + r':\s*(.+?)\s*$' + arg_description = (re.findall(search_param_pattern, str(doc), re.MULTILINE) or ['(undocumented)'])[-1] - paramdocs = oDict(*[(k, (types[(re.findall(r'^\s*:type\s+' +re.escape(k)+r':\s*(.+?)\s*$', doc, re.MULTILINE) or ['str'])[-1]], - (re.findall(r'^\s*:param\s+'+re.escape(k)+r':\s*(.+?)\s*$', doc, re.MULTILINE) or ['(undocumented)'])[-1], - None if i < len(required) else a_defaults[i-len(required)])) - for i, k in enumerate(required+optional)]) + arg_default = arg_properties["default_value"] - # Sphinx-compatible parameter documentation. - doc = re.sub(r'^\s*:param.*?\n', '', doc, flags=re.MULTILINE|re.DOTALL) - doc = re.sub(r'^\s*:type.*?\n', '', doc, flags=re.MULTILINE|re.DOTALL) + props[arg_name] = {"type": json_type, + "description": arg_description, + "default": arg_default, + "nullable": nullable_type} - # Only retrieve summary. - doc = re.sub(r'^\s*[\r\n].*', '', doc, flags=re.MULTILINE|re.DOTALL) + # Remove everything but the summary from the docstring + doc = re.sub(r'^\s*:param.*?\n', '', doc, flags=re.MULTILINE | re.DOTALL) + doc = re.sub(r'^\s*:type.*?\n', '', doc, flags=re.MULTILINE | re.DOTALL) + doc = re.sub(r'^\s*[\r\n].*', '', doc, flags=re.MULTILINE | re.DOTALL) - req = list(required) - props = oDict(*[(name, { 'type': paramdocs[name][0], - 'description': paramdocs[name][1], - 'default': paramdocs[name][2] }) - for name in required+optional]) - - for name in required+optional: + for name in props: if props[name]['type'] == 'array': props[name]['items'] = oDict() dataspec = { 'type': 'object', - 'required': req, + 'required': [arg_name for arg_name in function_properties["args"] + if function_properties["args"][arg_name]["required"]], 'properties': props } + tags = [function_properties["tag"]] + # Silly. 
- if req == []: + if dataspec['required'] == []: del dataspec['required'] # Currently, arguments are specified as a JSON string in a a @@ -235,66 +185,160 @@ def gen_fn_spec(name, fn): # data to actual request parameters (e.g. individual form "fields"). return oDict( - ('post', - oDict(('tags', [mod]), - ('summary', doc), - ('requestBody', - oDict(('required', True), - ('content', - # How do we encode arguments? - # - # 1) as a JSON 'data' property - # This is in line with the current PHP Yoda portal, - # but as a result parameter documentation is unaccessible from swagger, - # and optional parameters are missing completely. - # - # oDict(('multipart/form-data', - # oDict(('schema', - # oDict(('type', 'object'), - # ('properties', - # oDict(('data', dataspec))))))))))), - # - # 2) as a JSON request body. Same result as (1) - # - # oDict(('application/json', - # oDict(('schema', dataspec))))))), - # - # 3) Toplevel parameters as form fields. - # Not in line with the current portal, - # but provides the best documentation value. - # - oDict(('application/json', - oDict(('schema', dataspec))))))), - ('responses', - oDict(('200', - oDict(('description', 'Success'), - ('content', - oDict(('application/json', - oDict(('schema', - oDict(('type', 'object'), - ('properties', - oDict(('status', oDict(('type', 'string'))), - ('status_info', oDict(('type', 'string'), ('nullable', True))), - ('data', oDict(('nullable', True))))))))))))), - ('400', oDict(('$ref', '#/components/responses/status_400'))), - ('500', oDict(('$ref', '#/components/responses/status_500')))))))) - -for name, fn in api.fns: - if '' in name: - # Ignore weird undocumented inline definitions. - continue - - name = re.sub('^api_', '', name) - - if core: - modules = ['datarequest', 'deposit'] - if name.startswith(tuple(modules)): + ('post', + oDict(('tags', tags), + ('summary', doc), + ('requestBody', + oDict(('required', True), + ('content', + # How do we encode arguments? 
+ # + # 1) as a JSON 'data' property + # This is in line with the current PHP Yoda portal, + # but as a result parameter documentation is unaccessible from swagger, + # and optional parameters are missing completely. + # + # oDict(('multipart/form-data', + # oDict(('schema', + # oDict(('type', 'object'), + # ('properties', + # oDict(('data', dataspec))))))))))), + # + # 2) as a JSON request body. Same result as (1) + # + # oDict(('application/json', + # oDict(('schema', dataspec))))))), + # + # 3) Toplevel parameters as form fields. + # Not in line with the current portal, + # but provides the best documentation value. + # + oDict(('application/json', + oDict(('schema', dataspec))))))), + ('responses', + oDict(('200', + oDict(('description', 'Success'), + ('content', + oDict(('application/json', + oDict(('schema', + oDict(('type', 'object'), + ('properties', + oDict(('status', oDict(('type', 'string'))), + ('status_info', oDict( + ('type', 'string'), ('nullable', True))), + ('data', oDict(('nullable', True))))))))))))), + ('400', oDict(('$ref', '#/components/responses/status_400'))), + ('500', oDict(('$ref', '#/components/responses/status_500')))))))) + + +def add_api_data_to_spec( + spec: OrderedDict, api_function_data: OrderedDict, core: bool, module: str) -> None: + """Add collected information about API functions to the output document.""" + for function_name in api_function_data: + if '' in function_name: + # Ignore weird undocumented inline definitions. 
continue - if module: - if not name.startswith(module): - continue - - spec['paths'].update([('/'+name, gen_fn_spec(name, fn))]) - -print(json.dumps(spec)) + name = re.sub('^api_', '', function_name) + + if core: + modules = ['datarequest', 'deposit'] + if name.startswith(tuple(modules)): + continue + + if module: + if not name.startswith(module): + continue + + spec['paths'].update( + [('/' + name, gen_fn_spec(function_name, api_function_data[function_name]))]) + + +def get_ruleset_description(ruleset_dir: str) -> Union[str, None]: + """Get the global doc string of the ruleset""" + tree = get_ast_tree_of_file(ruleset_dir, "__init__.py") + return ast.get_docstring(tree) + + +def get_ruleset_version(ruleset_dir) -> Union[str, None]: + """Get the global version of the ruleset""" + tree = get_ast_tree_of_file(ruleset_dir, "__init__.py") + version = None + for node in ast.walk(tree): + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == "__version__": + if isinstance(node.value, ast.Constant) and isinstance(node.value.value, str): + return node.value.value + return version + + +def get_api_function_data(ruleset_dir: str, core: bool, module: str) -> OrderedDict: + """Collect argument, docstring and decorator information for API functions.""" + result = oDict() + ruleset_source_files = glob(os.path.join(ruleset_dir, "*.py")) + for source_file in ruleset_source_files: + with open(source_file, "r") as file: + tree = ast.parse(file.read()) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + function_name = node.name + + if not function_name.startswith("api_"): + continue + + def _get_argument_data(node): + argdata = oDict() + for i, arg in enumerate(node.args.args): + + if i == 0: + continue # Skip the internal context argument + + arg_name = arg.arg + annotation = None + if arg.annotation: + annotation = ast.unparse(arg.annotation) if hasattr(ast, 'unparse') else None + + default_value = None 
+ required = True + if i >= len(node.args.args) - len(node.args.defaults): + default_index = i - (len(node.args.args) - len(node.args.defaults)) + default_value = ast.unparse(node.args.defaults[default_index]) if hasattr(ast, 'unparse') else None + required = False + + argdata[arg_name] = {"annotation": annotation, "default_value": default_value, "required": required} + + return argdata + + function_properties: dict[str, Any] = dict() + function_properties["doc"] = ast.get_docstring(node) + function_properties["args"] = _get_argument_data(node) + function_properties["tag"] = os.path.basename(source_file)[:-3] + function_properties["decorators"] = [ast.unparse(decorator) if hasattr(ast, 'unparse') else None + for decorator in node.decorator_list] + if "api.make()" in function_properties["decorators"]: + result[function_name] = function_properties + return result + + +def main(args: argparse.Namespace) -> None: + ruleset_dir = get_ruleset_dir() + ruleset_description = get_ruleset_description(ruleset_dir) + ruleset_version = get_ruleset_version(ruleset_dir) + api_function_data = get_api_function_data(ruleset_dir, args.core, args.module) + spec = get_openapi_template(ruleset_description, ruleset_version, args.core, args.module) + add_api_data_to_spec(spec, api_function_data, args.core, args.module) + print(json.dumps(spec)) + + +def _ensure_python_version_okay(): + if (sys.version_info.major < 3 + or (sys.version_info.major == 3 and sys.version_info.minor < 9)): + print("Error: this script requires Python 3.9 or higher to run.") + sys.exit(1) + + +if __name__ == "__main__": + _ensure_python_version_okay() + args = get_args() + main(args)