diff --git a/tools/api/generate-openapi.py b/tools/api/generate-openapi.py index fe194ac85..a79dd1297 100755 --- a/tools/api/generate-openapi.py +++ b/tools/api/generate-openapi.py @@ -1,231 +1,179 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 """Yoda API OpenAPI documentation generator. -This extracts all Yoda API functions from a ruleset, and generates an OpenAPI +This extracts all Yoda API functions from the ruleset, and generates an OpenAPI file based on the function signatures and docstrings. - -Note: depending on ruleset installation directory, you may need to run this -with a custom PYTHONPATH environment variable. By default /etc/irods is -included in the search path for ruleset imports. - -This module imports (and therefore executes) ruleset code. -Do not run it on untrusted codebases. """ __copyright__ = 'Copyright (c) 2020-2024, Utrecht University' __license__ = 'GPLv3, see LICENSE' -__author__ = ('Chris Smeele') -__author__ = ('Lazlo Westerhof') -# (in alphabetical order) - import argparse -import inspect +import ast +from collections import OrderedDict import json +import os import re import sys -from collections import OrderedDict -from importlib import import_module +from glob import glob +from typing import Any, Dict, Tuple, Union -parser = argparse.ArgumentParser(description=__doc__) -parser.add_argument('ruleset', metavar='RULESET', type=str, - help='a Python module/package name for an iRODS ruleset') -parser.add_argument('--core', dest='core', action='store_const', const=True, default=False, - help='only generate core API') -parser.add_argument('--module', action="store", dest="module", default=False, - help='only generate API of specific module') -args = parser.parse_args() -ruleset_name = args.ruleset -core = args.core -module = args.module +def get_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + group = parser.add_mutually_exclusive_group() + group.add_argument('--core', dest='core', action='store_const', const=True, default=False, + help='only generate core API') + group.add_argument('--module', action="store", dest="module", default=None, + help='only generate API of specific module') + return parser.parse_args() -# Strategy: Import the requested ruleset with an instrumented environment, and -# apply introspection to extract API function information. +def get_ruleset_dir() -> str: + return os.path.join(os.path.realpath(os.path.dirname(__file__)), "../..") -# First we work on our environment: -class Sandbag(object): - """A sturdy object that does not mind being pushed around. +def get_ast_tree_of_file(ruleset_dir: str, ruleset_file: str): + init_file = os.path.join(ruleset_dir, ruleset_file) + with open(init_file, 'r') as file: + tree = ast.parse(file.read(), filename=init_file) + return tree - Used as a stub for various internal irods modules so that we can import - rulesets without errors. - """ - def __init__(self, *_, **kw): - self._data = kw - - def __call__(self, *_, **__): - return Sandbag() - def __getattr__(self, k): - return self._data.get(k, Sandbag()) - - def __setattr__(self, k, v): - if k == '_data': - return super(Sandbag, self).__setattr__(k, v) +def oDict(*xs: Tuple) -> OrderedDict: + return OrderedDict(xs) -class api(object): - """Injected util.api module that intercepts all API function declarations. +def get_openapi_template(ruleset_description: Union[str, None], ruleset_version: Union[str, None], + core: bool, module: str) -> OrderedDict: + """Create an OpenAPI document base template - Alternatively we could dir() the ruleset module and extract functions that - way, but that results in a mess. - By replacing the API decorator instead we can preserve order of - declarations, allowing for a more logical documentation structure. + Note: for the most part, order matters (e.g. ordering of API function list). + So we use ordered dicts. """ - fns = [] - - @staticmethod - def make(): - def f(g): - api.fns += [(g.__name__, g)] - return g - return f - - -# Inject iRODS modules. -sys.modules['irods_types'] = Sandbag() -sys.modules['genquery'] = Sandbag() -sys.modules['session_vars'] = Sandbag() - -# Inject other modules. -sys.modules['pysqlcipher3'] = Sandbag() - -# Inject the API shim, and its parent modules if needed. -if ruleset_name != 'rules_uu': - sys.modules['rules_uu'] = Sandbag(util = Sandbag(api = api)) - sys.modules['rules_uu.util'] = Sandbag(api = api) -sys.modules['rules_uu.util.api'] = api - -# Rulesets should be usable anywhere in PYTHONPATH. -# Add the iRODS directory to it for convenience. -sys.path += ['/etc/irods'] - -try: - # Import the ruleset. - ruleset_mod = import_module(ruleset_name) -except Exception as e: - print('Could not import ruleset <{}>: {}'.format(ruleset_name, e), file=sys.stderr) - raise - - -# Create an OpenAPI document. -# ... base template - -# Note: for the most part, order matters (e.g. ordering of API function list). -# So we use ordered dicts. -def oDict(*xs): - return OrderedDict(xs) - -title = 'Yoda API' -if core: - title = 'Yoda core API' -if module: - title = 'Yoda {} API'.format(module) - -spec = oDict(('openapi', '3.0.0'), - ('info', - oDict(('description', ruleset_mod.__doc__), - ('contact', - oDict(('email', 'l.r.westerhof@uu.nl'))), - ('version', getattr(ruleset_mod, '__version__', '9999')), - ('title', title))), - ('servers', - [oDict(('url', 'https://portal.yoda.test/api'), ('description', 'Local Yoda development server'))]), - ('security', [ oDict(('cookieAuth', [])), oDict(('basicAuth', [])) ]), - ('components', - oDict(('schemas', - oDict(('result_error', - oDict(('type', 'object'), - ('properties', - oDict(('status', oDict(('type', 'string'), ('description', 'Holds an error ID'))), - ('status_info', oDict(('type', 'string'), ('description', 'Holds a human-readable error description'))), - ('data', - oDict(('description', 'empty'), - ('nullable', True), - ('type', 'object'))))))))), - ('securitySchemes', - oDict(('cookieAuth', - oDict(('in', 'cookie'), - ('type', 'apiKey'), - # ('name', 'session'))), - ('name', 'yoda_session'))), - ('basicAuth', oDict(('type', 'http'), ('scheme', 'basic'))))), - ('responses', - oDict(('status_400', - oDict(('description', 'Bad request'), - ('content', - oDict(('application/json', - oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), - ('status_500', - oDict(('description', 'Internal error'), - ('content', - oDict(('application/json', - oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), - )))), - ('paths', oDict()) - ) - -def gen_fn_spec(name, fn): + title = 'Yoda API' + if core: + title = 'Yoda core API' + if module: + title = 'Yoda {} API'.format(module) + + ruleset_description_str = ruleset_description if ruleset_description is not None else "N/A" + ruleset_version_str = ruleset_version if ruleset_version is not None else '9999' + + spec = oDict(('openapi', '3.0.0'), + ('info', + oDict(('description', ruleset_description_str), + ('contact', + oDict(('email', 'l.r.westerhof@uu.nl'))), + ('version', ruleset_version_str), + ('title', title))), + ('servers', + [oDict(('url', 'https://portal.yoda.test/api'), ('description', 'Local Yoda development server'))]), + ('security', [oDict(('cookieAuth', [])), + oDict(('basicAuth', []))]), + ('components', + oDict(('schemas', + oDict(('result_error', + oDict(('type', 'object'), + ('properties', + oDict(('status', oDict(('type', 'string'), ('description', 'Holds an error ID'))), + ('status_info', oDict(('type', 'string'), ('description', + 'Holds a human-readable error description'))), + ('data', + oDict(('description', 'empty'), + ('nullable', True), + ('type', 'object'))))))))), + ('securitySchemes', + oDict(('cookieAuth', + oDict(('in', 'cookie'), + ('type', 'apiKey'), + # ('name', 'session'))), + ('name', 'yoda_session'))), + ('basicAuth', oDict(('type', 'http'), ('scheme', 'basic'))))), + ('responses', + oDict(('status_400', + oDict(('description', 'Bad request'), + ('content', + oDict(('application/json', + oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), + ('status_500', + oDict(('description', 'Internal error'), + ('content', + oDict(('application/json', + oDict(('schema', oDict(('$ref', '#/components/schemas/result_error'))))))))), + )))), + ('paths', oDict()) + ) + return spec + + +def get_json_type(input: str) -> str: + """Translate Python type to JSON type if a translation is available, otherwise use the Python type.""" + match_nullable_type = re.search(r'^(\w+)\s*\|\s*[Nn]one$', input) + if match_nullable_type: + input = match_nullable_type[1] + + types_lookup_table = { + 'str': 'string', + 'int': 'integer', + 'bool': 'boolean', + 'dict': 'object', + 'list': 'array'} + return types_lookup_table.get(input, input) + + +def is_nullable_type(input: str) -> bool: + return (re.search(r'^(\w+)\s*\|\s*[Nn]one$', input) + or re.search(r'^[Nn]one\s*\|\s*(\w+)$', input)) + + +def gen_fn_spec(function_name: str, function_properties: Dict): """Generate OpenAPI spec for one function (one path)""" - mod = fn.__module__.replace(ruleset_name+'.', '') - - print('{}/{}'.format(mod, name), file=sys.stderr) - - # Convert function signature -> argument spec. - # TODO: Python3: Extract type annotations. - # Also see https://docs.python.org/3.8/library/typing.html#typing.TypedDict - # for annotation of complex structures. - - a_pos, a_var, a_kw, a_defaults = inspect.getargspec(fn) - - a_pos = a_pos[1:] # ignore callback/context param. + doc: str = str(function_properties.get("doc")) if function_properties.get("doc") is not None else "" - required = a_pos if a_defaults is None else a_pos[:-len(a_defaults) ] - optional = [] if a_defaults is None else a_pos[ -len(a_defaults):] + props = oDict() + for arg_name in function_properties["args"]: + arg_properties = function_properties["args"][arg_name] - doc = fn.__doc__ or '(undocumented)' + # Try to get type from docstring, otherwise from type annotation. If neither is available, assume it's as string + doc_py_type = re.findall(r'^\s*:type\s+' + re.escape(function_name) + r':\s*(.+?)\s*$', doc, re.MULTILINE) + ann_py_type = arg_properties["annotation"] + py_type = doc_py_type[-1] if len(doc_py_type) > 0 else (ann_py_type if ann_py_type is not None else "str") + json_type = get_json_type(py_type) + nullable_type = is_nullable_type(py_type) - # Map Python types to JSON schema types. - types = {'str': 'string' - ,'int': 'integer' - ,'bool': 'boolean' - ,'dict': 'object' - ,'list': 'array'} + search_param_pattern: str = r'^\s*:param\s+' + re.escape(arg_name) + r':\s*(.+?)\s*$' + arg_description = (re.findall(search_param_pattern, str(doc), re.MULTILINE) or ['(undocumented)'])[-1] - paramdocs = oDict(*[(k, (types[(re.findall(r'^\s*:type\s+' +re.escape(k)+r':\s*(.+?)\s*$', doc, re.MULTILINE) or ['str'])[-1]], - (re.findall(r'^\s*:param\s+'+re.escape(k)+r':\s*(.+?)\s*$', doc, re.MULTILINE) or ['(undocumented)'])[-1], - None if i < len(required) else a_defaults[i-len(required)])) - for i, k in enumerate(required+optional)]) + arg_default = arg_properties["default_value"] - # Sphinx-compatible parameter documentation. - doc = re.sub(r'^\s*:param.*?\n', '', doc, flags=re.MULTILINE|re.DOTALL) - doc = re.sub(r'^\s*:type.*?\n', '', doc, flags=re.MULTILINE|re.DOTALL) + props[arg_name] = {"type": json_type, + "description": arg_description, + "default": arg_default, + "nullable": nullable_type} - # Only retrieve summary. - doc = re.sub(r'^\s*[\r\n].*', '', doc, flags=re.MULTILINE|re.DOTALL) + # Remove everything but the summary from the docstring + doc = re.sub(r'^\s*:param.*?\n', '', doc, flags=re.MULTILINE | re.DOTALL) + doc = re.sub(r'^\s*:type.*?\n', '', doc, flags=re.MULTILINE | re.DOTALL) + doc = re.sub(r'^\s*[\r\n].*', '', doc, flags=re.MULTILINE | re.DOTALL) - req = list(required) - props = oDict(*[(name, { 'type': paramdocs[name][0], - 'description': paramdocs[name][1], - 'default': paramdocs[name][2] }) - for name in required+optional]) - - for name in required+optional: + for name in props: if props[name]['type'] == 'array': props[name]['items'] = oDict() dataspec = { 'type': 'object', - 'required': req, + 'required': [arg_name for arg_name in function_properties["args"] + if function_properties["args"][arg_name]["required"]], 'properties': props } + tags = [function_properties["tag"]] + # Silly. - if req == []: + if dataspec['required'] == []: del dataspec['required'] # Currently, arguments are specified as a JSON string in a a @@ -235,66 +183,160 @@ def gen_fn_spec(name, fn): # data to actual request parameters (e.g. individual form "fields"). return oDict( - ('post', - oDict(('tags', [mod]), - ('summary', doc), - ('requestBody', - oDict(('required', True), - ('content', - # How do we encode arguments? - # - # 1) as a JSON 'data' property - # This is in line with the current PHP Yoda portal, - # but as a result parameter documentation is unaccessible from swagger, - # and optional parameters are missing completely. - # - # oDict(('multipart/form-data', - # oDict(('schema', - # oDict(('type', 'object'), - # ('properties', - # oDict(('data', dataspec))))))))))), - # - # 2) as a JSON request body. Same result as (1) - # - # oDict(('application/json', - # oDict(('schema', dataspec))))))), - # - # 3) Toplevel parameters as form fields. - # Not in line with the current portal, - # but provides the best documentation value. - # - oDict(('application/json', - oDict(('schema', dataspec))))))), - ('responses', - oDict(('200', - oDict(('description', 'Success'), - ('content', - oDict(('application/json', - oDict(('schema', - oDict(('type', 'object'), - ('properties', - oDict(('status', oDict(('type', 'string'))), - ('status_info', oDict(('type', 'string'), ('nullable', True))), - ('data', oDict(('nullable', True))))))))))))), - ('400', oDict(('$ref', '#/components/responses/status_400'))), - ('500', oDict(('$ref', '#/components/responses/status_500')))))))) - -for name, fn in api.fns: - if '' in name: - # Ignore weird undocumented inline definitions. - continue - - name = re.sub('^api_', '', name) - - if core: - modules = ['datarequest', 'deposit'] - if name.startswith(tuple(modules)): + ('post', + oDict(('tags', tags), + ('summary', doc), + ('requestBody', + oDict(('required', True), + ('content', + # How do we encode arguments? + # + # 1) as a JSON 'data' property + # This is in line with the current PHP Yoda portal, + # but as a result parameter documentation is unaccessible from swagger, + # and optional parameters are missing completely. + # + # oDict(('multipart/form-data', + # oDict(('schema', + # oDict(('type', 'object'), + # ('properties', + # oDict(('data', dataspec))))))))))), + # + # 2) as a JSON request body. Same result as (1) + # + # oDict(('application/json', + # oDict(('schema', dataspec))))))), + # + # 3) Toplevel parameters as form fields. + # Not in line with the current portal, + # but provides the best documentation value. + # + oDict(('application/json', + oDict(('schema', dataspec))))))), + ('responses', + oDict(('200', + oDict(('description', 'Success'), + ('content', + oDict(('application/json', + oDict(('schema', + oDict(('type', 'object'), + ('properties', + oDict(('status', oDict(('type', 'string'))), + ('status_info', oDict( + ('type', 'string'), ('nullable', True))), + ('data', oDict(('nullable', True))))))))))))), + ('400', oDict(('$ref', '#/components/responses/status_400'))), + ('500', oDict(('$ref', '#/components/responses/status_500')))))))) + + +def add_api_data_to_spec( + spec: OrderedDict, api_function_data: OrderedDict, core: bool, module: str) -> None: + """Add collected information about API functions to the output document.""" + for function_name in api_function_data: + if '' in function_name: + # Ignore weird undocumented inline definitions. continue - if module: - if not name.startswith(module): - continue - - spec['paths'].update([('/'+name, gen_fn_spec(name, fn))]) - -print(json.dumps(spec)) + name = re.sub('^api_', '', function_name) + + if core: + modules = ['datarequest', 'deposit'] + if name.startswith(tuple(modules)): + continue + + if module: + if not name.startswith(module): + continue + + spec['paths'].update( + [('/' + name, gen_fn_spec(function_name, api_function_data[function_name]))]) + + +def get_ruleset_description(ruleset_dir: str) -> Union[str, None]: + """Get the global doc string of the ruleset""" + tree = get_ast_tree_of_file(ruleset_dir, "__init__.py") + return ast.get_docstring(tree) + + +def get_ruleset_version(ruleset_dir) -> Union[str, None]: + """Get the global version of the ruleset""" + tree = get_ast_tree_of_file(ruleset_dir, "__init__.py") + version = None + for node in ast.walk(tree): + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == "__version__": + if isinstance(node.value, ast.Constant) and isinstance(node.value.value, str): + return node.value.value + return version + + +def get_api_function_data(ruleset_dir: str, core: bool, module: str) -> OrderedDict: + """Collect argument, docstring and decorator information for API functions.""" + result = oDict() + ruleset_source_files = glob(os.path.join(ruleset_dir, "*.py")) + for source_file in ruleset_source_files: + with open(source_file, "r") as file: + tree = ast.parse(file.read()) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + function_name = node.name + + if not function_name.startswith("api_"): + continue + + def _get_argument_data(node): + argdata = oDict() + for i, arg in enumerate(node.args.args): + + if i == 0: + continue # Skip the internal context argument + + arg_name = arg.arg + annotation = None + if arg.annotation: + annotation = ast.unparse(arg.annotation) if hasattr(ast, 'unparse') else None + + default_value = None + required = True + if i >= len(node.args.args) - len(node.args.defaults): + default_index = i - (len(node.args.args) - len(node.args.defaults)) + default_value = ast.unparse(node.args.defaults[default_index]) if hasattr(ast, 'unparse') else None + required = False + + argdata[arg_name] = {"annotation": annotation, "default_value": default_value, "required": required} + + return argdata + + function_properties: dict[str, Any] = dict() + function_properties["doc"] = ast.get_docstring(node) + function_properties["args"] = _get_argument_data(node) + function_properties["tag"] = os.path.basename(source_file)[:-3] + function_properties["decorators"] = [ast.unparse(decorator) if hasattr(ast, 'unparse') else None + for decorator in node.decorator_list] + if "api.make()" in function_properties["decorators"]: + result[function_name] = function_properties + return result + + +def main(args: argparse.Namespace) -> None: + ruleset_dir = get_ruleset_dir() + ruleset_description = get_ruleset_description(ruleset_dir) + ruleset_version = get_ruleset_version(ruleset_dir) + api_function_data = get_api_function_data(ruleset_dir, args.core, args.module) + spec = get_openapi_template(ruleset_description, ruleset_version, args.core, args.module) + add_api_data_to_spec(spec, api_function_data, args.core, args.module) + print(json.dumps(spec)) + + +def _ensure_python_version_okay(): + if (sys.version_info.major < 3 + or (sys.version_info.major == 3 and sys.version_info.minor < 9)): + print("Error: this script requires Python 3.9 or higher to run.") + sys.exit(1) + + +if __name__ == "__main__": + _ensure_python_version_okay() + args = get_args() + main(args)