Skip to content

Commit

Permalink
Add lookup plugin to query CloudStack API (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
tanganellilore authored Dec 2, 2024
1 parent 52fcebc commit ca0a634
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 0 deletions.
89 changes: 89 additions & 0 deletions plugins/lookup/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright (c) 2024, Lorenzo Tanganelli
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function

__metaclass__ = type

DOCUMENTATION = """
name: api
author: Lorenzo Tanganelli (@tanganellilore)
short_description: Iteract with the Cloudstack API via lookup
requirements:
- None
description:
- Returns GET requests from the Cloudstack API.
options:
_terms:
description:
- The endpoint to query, i.e. listUserData, listVirtualMachines, etc.
required: True
query_params:
description:
- The query parameters to search for in the form of key/value pairs.
type: dict
required: False
extends_documentation_fragment:
- ngine_io.cloudstack.cloudstack
notes:
- If the query is not filtered properly this can cause a performance impact.
"""

EXAMPLES = """
- name: List all UserData from the API
set_fact:
controller_settings: "{{ lookup('ngine_io.cloudstack.api', 'listUserData', query_params={ 'listall': true }) }}"
- name: List all Virtual Machines from the API
set_fact:
virtual_machines: "{{ lookup('ngine_io.cloudstack.api', 'listVirtualMachines') }}"
- name: List specific Virtual Machines from the API
set_fact:
virtual_machines: "{{ lookup('ngine_io.cloudstack.api', 'listVirtualMachines', query_params={ 'name': 'myvmname' }) }}"
"""

RETURN = """
_raw:
description:
- Response from the API
type: dict
returned: on successful request
"""

from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native
from ansible.utils.display import Display

from ..module_utils.cloudstack_api import AnsibleCloudStackAPI


class LookupModule(LookupBase):
display = Display()

def handle_error(self, **kwargs):
raise AnsibleError(to_native(kwargs.get("msg")))

def warn_callback(self, warning):
self.display.warning(warning)

def run(self, terms, variables=None, **kwargs):
if len(terms) != 1:
raise AnsibleError("You must pass exactly one endpoint to query")

self.set_options(direct=kwargs)

module = AnsibleCloudStackAPI(argument_spec={}, direct_params=kwargs, error_callback=self.handle_error, warn_callback=self.warn_callback)

args = {}
if self.get_option("query_params"):
args.update(self.get_option("query_params", {}))

res = module.query_api(terms[0], **args)

if res is None:
return []
if isinstance(res, list):
return res

return [res]
119 changes: 119 additions & 0 deletions plugins/module_utils/cloudstack_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2024, Lorenzo Tanganelli
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)

from __future__ import absolute_import, division, print_function

__metaclass__ = type


import os
import re
import sys
import traceback

from ansible.module_utils._text import to_native
from ansible.module_utils.basic import missing_required_lib, AnsibleModule

CS_IMP_ERR = None
try:
from cs import CloudStack, CloudStackException

HAS_LIB_CS = True
except ImportError:
CS_IMP_ERR = traceback.format_exc()
HAS_LIB_CS = False


if sys.version_info > (3,):
long = int


class AnsibleCloudStackAPI(AnsibleModule):

error_callback = None
warn_callback = None
AUTH_ARGSPEC = dict(
api_key=os.getenv("CLOUDSTACK_KEY"),
api_secret=os.getenv("CLOUDSTACK_SECRET"),
api_url=os.getenv("CLOUDSTACK_ENDPOINT"),
api_http_method=os.getenv("CLOUDSTACK_METHOD", "get"),
api_timeout=os.getenv("CLOUDSTACK_TIMEOUT", 10),
api_verify_ssl_cert=os.getenv("CLOUDSTACK_VERIFY"),
validate_certs=os.getenv("CLOUDSTACK_DANGEROUS_NO_TLS_VERIFY", True),
)

def __init__(self, argument_spec=None, direct_params=None, error_callback=None, warn_callback=None, **kwargs):

if not HAS_LIB_CS:
self.fail_json(msg=missing_required_lib("cs"), exception=CS_IMP_ERR)

full_argspec = {}
full_argspec.update(AnsibleCloudStackAPI.AUTH_ARGSPEC)
full_argspec.update(argument_spec)
kwargs["supports_check_mode"] = True

self.error_callback = error_callback
self.warn_callback = warn_callback

self._cs = None
self.result = {}

if direct_params is not None:
for param, value in full_argspec.items():
if param in direct_params:
setattr(self, param, direct_params[param])
else:
setattr(self, param, value)
else:
super(AnsibleCloudStackAPI, self).__init__(argument_spec=full_argspec, **kwargs)

# Perform some basic validation
if not re.match("^https{0,1}://", self.api_url):
self.api_url = "https://{0}".format(self.api_url)

def fail_json(self, **kwargs):
if self.error_callback:
self.error_callback(**kwargs)
else:
super().fail_json(**kwargs)

def exit_json(self, **kwargs):
super().exit_json(**kwargs)

@property
def cs(self):
if self._cs is None:
api_config = self.get_api_config()
self._cs = CloudStack(**api_config)
return self._cs

def get_api_config(self):
api_config = {
"endpoint": self.api_url,
"key": self.api_key,
"secret": self.api_secret,
"timeout": self.api_timeout,
"method": self.api_http_method,
"verify": self.api_verify_ssl_cert,
"dangerous_no_tls_verify": not self.validate_certs,
}

return api_config

def query_api(self, command, **args):

try:
res = getattr(self.cs, command)(**args)

if "errortext" in res:
self.fail_json(msg="Failed: '%s'" % res["errortext"])

except CloudStackException as e:
self.fail_json(msg="CloudStackException: %s" % to_native(e))

except Exception as e:
self.fail_json(msg=to_native(e))

# res.update({'params': self.result, 'query_params': args})
return res

0 comments on commit ca0a634

Please sign in to comment.