diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fd1b9ac..1e51519 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -72,7 +72,6 @@ jobs: include: - {salt-version: "3006.8", python-version: "3.10"} - {salt-version: "3007.1", python-version: "3.10"} - ## - {salt-version: "3007", python-version: "3.11"} ## - {salt-version: "3007", python-version: "3.12"} steps: @@ -195,7 +194,6 @@ jobs: include: - {salt-version: "3006.8", python-version: "3.10"} - {salt-version: "3007.1", python-version: "3.10"} - ## - {salt-version: "3007", python-version: "3.11"} ## - {salt-version: "3007", python-version: "3.12"} steps: @@ -323,7 +321,6 @@ jobs: include: - {salt-version: "3006.8", python-version: "3.10"} - {salt-version: "3007.1", python-version: "3.10"} - ## - {salt-version: "3007", python-version: "3.11"} ## - {salt-version: "3007", python-version: "3.12"} steps: diff --git a/docs/ref/modules/index.rst b/docs/ref/modules/index.rst index 9c5e97e..e7bd8b1 100644 --- a/docs/ref/modules/index.rst +++ b/docs/ref/modules/index.rst @@ -9,4 +9,5 @@ _________________ .. autosummary:: :toctree: + cassandra_cql cassandra_mod diff --git a/docs/ref/modules/saltext.cassandra.modules.cassandra_cql.rst b/docs/ref/modules/saltext.cassandra.modules.cassandra_cql.rst new file mode 100644 index 0000000..b9961b2 --- /dev/null +++ b/docs/ref/modules/saltext.cassandra.modules.cassandra_cql.rst @@ -0,0 +1,5 @@ +``cassandra_cql`` +================= + +.. automodule:: saltext.cassandra.modules.cassandra_cql + :members: diff --git a/docs/ref/returners/index.rst b/docs/ref/returners/index.rst index 3cec202..bd1a16c 100644 --- a/docs/ref/returners/index.rst +++ b/docs/ref/returners/index.rst @@ -9,4 +9,5 @@ ________________ .. autosummary:: :toctree: + cassandra_cql_return cassandra_mod diff --git a/docs/ref/returners/saltext.cassandra.returners.cassandra_cql_return.rst b/docs/ref/returners/saltext.cassandra.returners.cassandra_cql_return.rst new file mode 100644 index 0000000..27e9ea0 --- /dev/null +++ b/docs/ref/returners/saltext.cassandra.returners.cassandra_cql_return.rst @@ -0,0 +1,5 @@ +``cassandra_cql`` +================= + +.. automodule:: saltext.cassandra.returners.cassandra_cql_return + :members: diff --git a/src/saltext/cassandra/modules/cassandra_cql.py b/src/saltext/cassandra/modules/cassandra_cql.py new file mode 100644 index 0000000..6ce2fd6 --- /dev/null +++ b/src/saltext/cassandra/modules/cassandra_cql.py @@ -0,0 +1,1503 @@ +""" +Cassandra Database Module + +.. versionadded:: 2015.5.0 + +This module works with Cassandra v2 and v3 and hence generates +queries based on the internal schema of said version. + +:depends: DataStax Python Driver for Apache Cassandra + https://github.com/datastax/python-driver + pip install cassandra-driver +:referenced by: Salt's cassandra_cql returner +:configuration: + The Cassandra cluster members and connection port can either be specified + in the master or minion config, the minion's pillar or be passed to the module. + + Example configuration in the config for a single node: + + .. code-block:: yaml + + cassandra: + cluster: 192.168.50.10 + port: 9000 + + Example configuration in the config for a cluster: + + .. code-block:: yaml + + cassandra: + cluster: + - 192.168.50.10 + - 192.168.50.11 + - 192.168.50.12 + port: 9000 + username: cas_admin + + .. versionchanged:: 2016.11.0 + + Added support for ``ssl_options`` and ``protocol_version``. + + Example configuration with + `ssl options `_: + + If ``ssl_options`` are present in cassandra config the cassandra_cql returner + will use SSL. SSL isn't used if ``ssl_options`` isn't specified. + + .. code-block:: yaml + + cassandra: + cluster: + - 192.168.50.10 + - 192.168.50.11 + - 192.168.50.12 + port: 9000 + username: cas_admin + + ssl_options: + ca_certs: /etc/ssl/certs/ca-bundle.trust.crt + + # SSL version should be one from the ssl module + # This is an optional parameter + ssl_version: PROTOCOL_TLSv1 + + Additionally you can also specify the ``protocol_version`` to + `use `_. + + .. code-block:: yaml + + cassandra: + cluster: + - 192.168.50.10 + - 192.168.50.11 + - 192.168.50.12 + port: 9000 + username: cas_admin + + # defaults to 4, if not set + protocol_version: 3 + + Also all configuration could be passed directly to module as arguments. + + .. code-block:: bash + + salt minion1 cassandra_cql.info contact_points=delme-nextgen-01 port=9042 cql_user=cassandra cql_pass=cassandra protocol_version=4 + + salt minion1 cassandra_cql.info ssl_options='{"ca_certs": /path/to/-ca.crt}' + + We can also provide the load balancing policy as arguments + + .. code-block:: bash + + salt minion1 cassandra_cql.cql_query "alter user cassandra with password 'cassandra2' ;" contact_points=scylladb cql_user=user1 cql_pass=password port=9142 protocol_version=4 ssl_options='{"ca_certs": path-to-client-ca.crt}' load_balancing_policy=DCAwareRoundRobinPolicy load_balancing_policy_args='{"local_dc": "datacenter1"}' + +""" + +import logging +import re +import ssl + +import salt.utils.json +import salt.utils.versions +from salt.exceptions import CommandExecutionError + +SSL_VERSION = "ssl_version" + +log = logging.getLogger(__name__) + +__virtualname__ = "cassandra_cql" + +HAS_DRIVER = False +try: + # pylint: disable=import-error,no-name-in-module + from cassandra.auth import PlainTextAuthProvider + from cassandra.cluster import Cluster + from cassandra.cluster import NoHostAvailable + from cassandra.connection import ConnectionException + from cassandra.connection import ConnectionShutdown + from cassandra.connection import OperationTimedOut + from cassandra.policies import DCAwareRoundRobinPolicy + from cassandra.policies import ExponentialReconnectionPolicy + from cassandra.policies import HostDistance + from cassandra.policies import HostFilterPolicy + from cassandra.policies import IdentityTranslator + from cassandra.policies import LoadBalancingPolicy + from cassandra.policies import NoSpeculativeExecutionPlan + from cassandra.policies import NoSpeculativeExecutionPolicy + from cassandra.policies import RetryPolicy + from cassandra.policies import RoundRobinPolicy + from cassandra.policies import SimpleConvictionPolicy + from cassandra.policies import TokenAwarePolicy + from cassandra.policies import WhiteListRoundRobinPolicy + from cassandra.query import dict_factory + + # pylint: enable=import-error,no-name-in-module + HAS_DRIVER = True + + LOAD_BALANCING_POLICY_MAP = { + "HostDistance": HostDistance, + "LoadBalancingPolicy": LoadBalancingPolicy, + "RoundRobinPolicy": RoundRobinPolicy, + "DCAwareRoundRobinPolicy": DCAwareRoundRobinPolicy, + "WhiteListRoundRobinPolicy": WhiteListRoundRobinPolicy, + "TokenAwarePolicy": TokenAwarePolicy, + "HostFilterPolicy": HostFilterPolicy, + "SimpleConvictionPolicy": SimpleConvictionPolicy, + "ExponentialReconnectionPolicy": ExponentialReconnectionPolicy, + "RetryPolicy": RetryPolicy, + "IdentityTranslator": IdentityTranslator, + "NoSpeculativeExecutionPlan": NoSpeculativeExecutionPlan, + "NoSpeculativeExecutionPolicy": NoSpeculativeExecutionPolicy, + } + +except ImportError: + pass + + +def __virtual__(): + """ + Return virtual name of the module only if the python driver can be loaded. + + :return: The virtual name of the module. + :rtype: str + """ + if HAS_DRIVER: + return __virtualname__ + return (False, "Cannot load cassandra_cql module: python driver not found") + + +def _async_log_errors(errors): + log.error("Cassandra_cql asynchronous call returned: %s", errors) + + +def _get_lbp_policy(name, **policy_args): + """ + Returns the Load Balancer Policy class by name + """ + if name in LOAD_BALANCING_POLICY_MAP: + return LOAD_BALANCING_POLICY_MAP.get(name)(**policy_args) + else: + log.error("The policy %s is not available", name) + + +def _load_properties(property_name, config_option, set_default=False, default=None): + """ + Load properties for the cassandra module from config or pillar. + + :param property_name: The property to load. + :type property_name: str or list of str + :param config_option: The name of the config option. + :type config_option: str + :param set_default: Should a default be set if not found in config. + :type set_default: bool + :param default: The default value to be set. + :type default: str or int + :return: The property fetched from the configuration or default. + :rtype: str or list of str + """ + if not property_name: + log.debug("No property specified in function, trying to load from salt configuration") + try: + options = __salt__["config.option"]("cassandra", default={}) + except BaseException as e: + log.error("Failed to get cassandra config options. Reason: %s", e) + raise + + loaded_property = options.get(config_option) + if not loaded_property: + if set_default: + log.debug("Setting default Cassandra %s to %s", config_option, default) + loaded_property = default + else: + log.error( + "No cassandra %s specified in the configuration or passed to the module.", + config_option, + ) + raise CommandExecutionError(f"ERROR: Cassandra {config_option} cannot be empty.") + return loaded_property + return property_name + + +def _get_ssl_opts(): + """ + Parse out ssl_options for Cassandra cluster connection. + Make sure that the ssl_version (if any specified) is valid. + """ + sslopts = __salt__["config.option"]("cassandra", default={}).get("ssl_options", None) + ssl_opts = {} + + if sslopts: + ssl_opts["ca_certs"] = sslopts["ca_certs"] + if SSL_VERSION in sslopts: + if not sslopts[SSL_VERSION].startswith("PROTOCOL_"): + valid_opts = ", ".join([x for x in dir(ssl) if x.startswith("PROTOCOL_")]) + raise CommandExecutionError( + "Invalid protocol_version specified! Please make sure " + "that the ssl protocol version is one from the SSL " + "fmodule. Valid options are {valid_opts}" + ) + else: + ssl_opts[SSL_VERSION] = getattr(ssl, sslopts[SSL_VERSION]) + return ssl_opts + else: + return None + + +def _connect( + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Connect to a Cassandra cluster. + + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str or list of str + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The session and cluster objects. + :rtype: cluster object, session object + """ + # Lazy load the Cassandra cluster and session for this module by creating a + # cluster and session when cql_query is called the first time. Get the + # Cassandra cluster and session from this module's __context__ after it is + # loaded the first time cql_query is called. + # + # TODO: Call cluster.shutdown() when the module is unloaded on + # master/minion shutdown. Currently, Master.shutdown() and Minion.shutdown() + # do nothing to allow loaded modules to gracefully handle resources stored + # in __context__ (i.e. connection pools). This means that the connection + # pool is orphaned and Salt relies on Cassandra to reclaim connections. + # Perhaps if Master/Minion daemons could be enhanced to call an "__unload__" + # function, or something similar for each loaded module, connection pools + # and the like can be gracefully reclaimed/shutdown. + if ( + __context__ + and "cassandra_cql_returner_cluster" in __context__ + and "cassandra_cql_returner_session" in __context__ + ): + return ( + __context__["cassandra_cql_returner_cluster"], + __context__["cassandra_cql_returner_session"], + ) + else: + if contact_points is None: + contact_points = _load_properties(property_name=contact_points, config_option="cluster") + contact_points = ( + contact_points if isinstance(contact_points, list) else contact_points.split(",") + ) + if port is None: + port = _load_properties( + property_name=port, config_option="port", set_default=True, default=9042 + ) + if cql_user is None: + cql_user = _load_properties( + property_name=cql_user, + config_option="username", + set_default=True, + default="cassandra", + ) + if cql_pass is None: + cql_pass = _load_properties( + property_name=cql_pass, + config_option="password", + set_default=True, + default="cassandra", + ) + if protocol_version is None: + protocol_version = _load_properties( + property_name=protocol_version, + config_option="protocol_version", + set_default=True, + default=4, + ) + + if load_balancing_policy_args is None: + load_balancing_policy_args = _load_properties( + property_name=load_balancing_policy_args, + config_option="load_balancing_policy_args", + set_default=True, + default={}, + ) + + if load_balancing_policy is None: + load_balancing_policy = _load_properties( + property_name=load_balancing_policy, + config_option="load_balancing_policy", + set_default=True, + default="RoundRobinPolicy", + ) + + if load_balancing_policy_args: + lbp_policy_cls = _get_lbp_policy(load_balancing_policy, **load_balancing_policy_args) + else: + lbp_policy_cls = _get_lbp_policy(load_balancing_policy) + + try: + auth_provider = PlainTextAuthProvider(username=cql_user, password=cql_pass) + if ssl_options is None: + ssl_opts = _get_ssl_opts() + else: + ssl_opts = ssl_options + if ssl_opts: + cluster = Cluster( + contact_points, + port=port, + auth_provider=auth_provider, + ssl_options=ssl_opts, + protocol_version=protocol_version, + load_balancing_policy=lbp_policy_cls, + compression=True, + ) + else: + cluster = Cluster( + contact_points, + port=port, + auth_provider=auth_provider, + protocol_version=protocol_version, + load_balancing_policy=lbp_policy_cls, + compression=True, + ) + for recontimes in range(1, 4): + try: + session = cluster.connect() + break + except OperationTimedOut: + log.warning("Cassandra cluster.connect timed out, try %s", recontimes) + if recontimes >= 3: + raise + + # TODO: Call cluster.shutdown() when the module is unloaded on shutdown. + __context__["cassandra_cql_returner_cluster"] = cluster + __context__["cassandra_cql_returner_session"] = session + __context__["cassandra_cql_prepared"] = {} + + log.debug("Successfully connected to Cassandra cluster at %s", contact_points) + return cluster, session + except TypeError: + pass + except (ConnectionException, ConnectionShutdown, NoHostAvailable) as err: + log.error("Could not connect to Cassandra cluster at %s", contact_points) + # pylint: disable=W0707 + raise CommandExecutionError(str(err)) + + +def cql_query( + query, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Run a query on a Cassandra cluster and return a dictionary. + + :param query: The query to execute. + :type query: str + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param params: The parameters for the query, optional. + :type params: str + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: A dictionary from the return values of the query + :rtype: list[dict] + + CLI Example: + + .. code-block:: bash + + salt 'cassandra-server' cassandra_cql.cql_query "SELECT * FROM users_by_name WHERE first_name = 'jane'" + """ + try: + cluster, session = _connect( + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not get Cassandra cluster session.") + raise + except BaseException as e: + log.critical("Unexpected error while getting Cassandra cluster session: %s", e) + raise + + session.row_factory = dict_factory + ret = [] + + # Cassandra changed their internal schema from v2 to v3 + # If the query contains a dictionary sorted by versions + # Find the query for the current cluster version. + # https://issues.apache.org/jira/browse/CASSANDRA-6717 + if isinstance(query, dict): + cluster_version = version( + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + ) + match = re.match(r"^(\d+)\.(\d+)(?:\.(\d+))?", cluster_version) + major, minor, point = match.groups() + # try to find the specific version in the query dictionary + # then try the major version + # otherwise default to the highest version number + try: + query = query[cluster_version] + except KeyError: + query = query.get(major, max(query)) + log.debug("New query is: %s", query) + + try: + results = session.execute(query) + except BaseException as e: + log.error("Failed to execute query: %s\n reason: %s", query, e) + msg = f"ERROR: Cassandra query failed: {query} reason: {e}" + # pylint: disable=W0707 + raise CommandExecutionError(msg) + + if results: + for result in results: + values = {} + for key, value in result.items(): + # Salt won't return dictionaries with odd types like uuid.UUID + if not isinstance(value, str): + # Must support Cassandra collection types. + # Namely, Cassandras set, list, and map collections. + if not isinstance(value, (set, list, dict)): + value = str(value) + values[key] = value + ret.append(values) + + return ret + + +def cql_query_with_prepare( + query, + statement_name, + statement_arguments, + asynchronous=False, + callback_errors=None, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, + **kwargs, +): + """ + Run a query on a Cassandra cluster and return a dictionary. + + This function should not be used asynchronously for SELECTs -- it will not + return anything and we don't currently have a mechanism for handling a future + that will return results. + + :param query: The query to execute. + :type query: str + :param statement_name: Name to assign the prepared statement in the __context__ dictionary + :type statement_name: str + :param statement_arguments: Bind parameters for the SQL statement + :type statement_arguments: list[str] + :param asynchronous: Run this query in asynchronous mode + :type asynchronous: bool + :param async: Run this query in asynchronous mode (an alias to 'asynchronous') + NOTE: currently it overrides 'asynchronous' and it will be dropped in version 3001! + :type async: bool + :param callback_errors: Function to call after query runs if there is an error + :type callback_errors: Function callable + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param params: The parameters for the query, optional. + :type params: str + :param protocol_version: Cassandra protocol version to use. + :type port: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: A dictionary from the return values of the query + :rtype: list[dict] + + CLI Example: + + .. code-block:: bash + + # Insert data asynchronously + salt this-node cassandra_cql.cql_query_with_prepare "name_insert" "INSERT INTO USERS (first_name, last_name) VALUES (?, ?)" \ + statement_arguments=['John','Doe'], asynchronous=True + + # Select data, should not be asynchronous because there is not currently a facility to return data from a future + salt this-node cassandra_cql.cql_query_with_prepare "name_select" "SELECT * FROM USERS WHERE first_name=?" \ + statement_arguments=['John'] + """ + # Backward-compatibility with Python 3.7: "async" is a reserved word + if "async" in kwargs: + asynchronous = kwargs.get("async", False) + try: + cluster, session = _connect( + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=None, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=None, + ) + except CommandExecutionError: + log.critical("Could not get Cassandra cluster session.") + raise + except BaseException as e: + log.critical("Unexpected error while getting Cassandra cluster session: %s", e) + raise + + if statement_name not in __context__["cassandra_cql_prepared"]: + try: + bound_statement = session.prepare(query) + __context__["cassandra_cql_prepared"][statement_name] = bound_statement + except BaseException as e: + log.critical("Unexpected error while preparing SQL statement: %s", e) + raise + else: + bound_statement = __context__["cassandra_cql_prepared"][statement_name] + + session.row_factory = dict_factory + ret = [] + results = {} + + try: + if asynchronous: + future_results = session.execute_async(bound_statement.bind(statement_arguments)) + # future_results.add_callbacks(_async_log_errors) + else: + results = session.execute(bound_statement.bind(statement_arguments)) + except BaseException as e: + log.error("Failed to execute query: %s\n reason: %s", query, e) + msg = f"ERROR: Cassandra query failed: {query} reason: {e}" + # pylint: disable=W0707 + raise CommandExecutionError(msg) + + if results and not asynchronous: + for result in results: + values = {} + for key, value in result.items(): + # Salt won't return dictionaries with odd types like uuid.UUID + if not isinstance(value, str): + # Must support Cassandra collection types. + # Namely, Cassandras set, list, and map collections. + if not isinstance(value, (set, list, dict)): + value = str(value) + values[key] = value + ret.append(values) + + # If this was a synchronous call, then we either have an empty list + # because there was no return, or we have a return + # If this was an asynchronous call we only return the empty list + return ret + + +def version( + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Show the Cassandra version. + + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The version for this Cassandra cluster. + :rtype: str + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.version + + salt 'minion1' cassandra_cql.version contact_points=minion1 + """ + query = "select release_version from system.local limit 1;" + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not get Cassandra version.") + raise + except BaseException as e: + log.critical("Unexpected error while getting Cassandra version: %s", e) + raise + + return ret[0].get("release_version") + + +def info( + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Show the Cassandra information for this cluster. + + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The information for this Cassandra cluster. + :rtype: dict + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.info + + salt 'minion1' cassandra_cql.info contact_points=minion1 + """ + + query = """select cluster_name, + data_center, + partitioner, + host_id, + rack, + release_version, + cql_version, + schema_version, + thrift_version + from system.local + limit 1;""" + + ret = {} + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not list Cassandra info.") + raise + except BaseException as e: + log.critical("Unexpected error while listing Cassandra info: %s", e) + raise + + return ret + + +def list_keyspaces( + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + List keyspaces in a Cassandra cluster. + + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The keyspaces in this Cassandra cluster. + :rtype: list[dict] + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.list_keyspaces + + salt 'minion1' cassandra_cql.list_keyspaces contact_points=minion1 port=9000 + """ + query = { + "2": "select keyspace_name from system.schema_keyspaces;", + "3": "select keyspace_name from system_schema.keyspaces;", + } + + ret = {} + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not list keyspaces.") + raise + except BaseException as e: + log.critical("Unexpected error while listing keyspaces: %s", e) + raise + + return ret + + +def list_column_families( + keyspace=None, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + List column families in a Cassandra cluster for all keyspaces or just the provided one. + + :param keyspace: The keyspace to provide the column families for, optional. + :type keyspace: str + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The column families in this Cassandra cluster. + :rtype: list[dict] + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.list_column_families + + salt 'minion1' cassandra_cql.list_column_families contact_points=minion1 + + salt 'minion1' cassandra_cql.list_column_families keyspace=system + """ + where_clause = f"where keyspace_name = '{keyspace}'" if keyspace else "" + + query = { + "2": f"select columnfamily_name from system.schema_columnfamilies {where_clause};", + "3": f"select column_name from system_schema.columns {where_clause};", + } + + ret = {} + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not list column families.") + raise + except BaseException as e: + log.critical("Unexpected error while listing column families: %s", e) + raise + + return ret + + +def keyspace_exists( + keyspace, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Check if a keyspace exists in a Cassandra cluster. + + :param keyspace The keyspace name to check for. + :type keyspace: str + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The info for the keyspace or False if it does not exist. + :rtype: dict + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.keyspace_exists keyspace=system + """ + query = { + "2": ( + f"select keyspace_name from system.schema_keyspaces where keyspace_name = '{keyspace}';" + ), + "3": ( + f"select keyspace_name from system_schema.keyspaces where keyspace_name = '{keyspace}';" + ), + } + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not determine if keyspace exists.") + raise + except BaseException as e: + log.critical("Unexpected error while determining if keyspace exists: %s", e) + raise + + return True if ret else False + + +def create_keyspace( + keyspace, + replication_strategy="SimpleStrategy", + replication_factor=1, + replication_datacenters=None, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Create a new keyspace in Cassandra. + + :param keyspace: The keyspace name + :type keyspace: str + :param replication_strategy: either `SimpleStrategy` or `NetworkTopologyStrategy` + :type replication_strategy: str + :param replication_factor: number of replicas of data on multiple nodes. not used if using NetworkTopologyStrategy + :type replication_factor: int + :param replication_datacenters: string or dict of datacenter names to replication factors, required if using + NetworkTopologyStrategy (will be a dict if coming from state file). + :type replication_datacenters: str | dict[str, int] + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The info for the keyspace or False if it does not exist. + :rtype: dict + + CLI Example: + + .. code-block:: bash + + # CLI Example: + salt 'minion1' cassandra_cql.create_keyspace keyspace=newkeyspace + + salt 'minion1' cassandra_cql.create_keyspace keyspace=newkeyspace replication_strategy=NetworkTopologyStrategy \ + replication_datacenters='{"datacenter_1": 3, "datacenter_2": 2}' + """ + existing_keyspace = keyspace_exists( + keyspace, + contact_points=contact_points, + cql_user=cql_user, + cql_pass=cql_pass, + port=port, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + if not existing_keyspace: + # Add the strategy, replication_factor, etc. + replication_map = {"class": replication_strategy} + + if replication_datacenters: + if isinstance(replication_datacenters, str): + try: + replication_datacenter_map = salt.utils.json.loads(replication_datacenters) + replication_map.update(**replication_datacenter_map) + except BaseException: # pylint: disable=W0703 + log.error("Could not load json replication_datacenters.") + return False + else: + replication_map.update(**replication_datacenters) + else: + replication_map["replication_factor"] = replication_factor + + query = f"create keyspace {keyspace} with replication = {replication_map} and durable_writes = true;" + try: + cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not create keyspace.") + raise + except BaseException as e: + log.critical("Unexpected error while creating keyspace: %s", e) + raise + + +def drop_keyspace( + keyspace, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Drop a keyspace if it exists in a Cassandra cluster. + + :param keyspace: The keyspace to drop. + :type keyspace: str + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The info for the keyspace or False if it does not exist. + :rtype: dict + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.drop_keyspace keyspace=test + + salt 'minion1' cassandra_cql.drop_keyspace keyspace=test contact_points=minion1 + """ + existing_keyspace = keyspace_exists( + keyspace, + contact_points=contact_points, + cql_user=cql_user, + cql_pass=cql_pass, + port=port, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + if existing_keyspace: + query = f"""drop keyspace {keyspace};""" + try: + cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not drop keyspace.") + raise + except BaseException as e: + log.critical("Unexpected error while dropping keyspace: %s", e) + raise + + return True + + +def list_users( + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + List existing users in this Cassandra cluster. + + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: The list of existing users. + :rtype: dict + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.list_users + + salt 'minion1' cassandra_cql.list_users contact_points=minion1 + """ + query = "list users;" + + ret = {} + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not list users.") + raise + except BaseException as e: + log.critical("Unexpected error while listing users: %s", e) + raise + + return ret + + +def create_user( + username, + password, + superuser=False, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Create a new cassandra user with credentials and superuser status. + + :param username: The name of the new user. + :type username: str + :param password: The password of the new user. + :type password: str + :param superuser: Is the new user going to be a superuser? default: False + :type superuser: bool + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: + :rtype: + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.create_user username=joe password=secret + + salt 'minion1' cassandra_cql.create_user username=joe password=secret superuser=True + + salt 'minion1' cassandra_cql.create_user username=joe password=secret superuser=True contact_points=minion1 + """ + superuser_cql = "superuser" if superuser else "nosuperuser" + query = f"create user if not exists {username} with password '{password}' {superuser_cql};" + log.debug( + "Attempting to create a new user with username=%s superuser=%s", + username, + superuser_cql, + ) + + # The create user query doesn't actually return anything if the query succeeds. + # If the query fails, catch the exception, log a messange and raise it again. + try: + cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not create user.") + raise + except BaseException as e: + log.critical("Unexpected error while creating user: %s", e) + raise + + return True + + +def list_permissions( + username=None, + resource=None, + resource_type="keyspace", + permission=None, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + List permissions. + + :param username: The name of the user to list permissions for. + :type username: str + :param resource: The resource (keyspace or table), if None, permissions for all resources are listed. + :type resource: str + :param resource_type: The resource_type (keyspace or table), defaults to 'keyspace'. + :type resource_type: str + :param permission: A permission name (e.g. select), if None, all permissions are listed. + :type permission: str + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: Dictionary of permissions. + :rtype: dict + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.list_permissions + + salt 'minion1' cassandra_cql.list_permissions username=joe resource=test_keyspace permission=select + + salt 'minion1' cassandra_cql.list_permissions username=joe resource=test_table resource_type=table \ + permission=select contact_points=minion1 + """ + keyspace_cql = f"{resource_type} {resource}" if resource else "all keyspaces" + permission_cql = f"{permission} permission" if permission else "all permissions" + query = f"list {permission_cql} on {keyspace_cql}" + + if username: + query = f"{query} of {username}" + + log.debug("Attempting to list permissions with query '%s'", query) + + ret = {} + + try: + ret = cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not list permissions.") + raise + except BaseException as e: + log.critical("Unexpected error while listing permissions: %s", e) + raise + + return ret + + +def grant_permission( + username, + resource=None, + resource_type="keyspace", + permission=None, + contact_points=None, + port=None, + cql_user=None, + cql_pass=None, + protocol_version=None, + load_balancing_policy=None, + load_balancing_policy_args=None, + ssl_options=None, +): + """ + Grant permissions to a user. + + :param username: The name of the user to grant permissions to. + :type username: str + :param resource: The resource (keyspace or table), if None, permissions for all resources are granted. + :type resource: str + :param resource_type: The resource_type (keyspace or table), defaults to 'keyspace'. + :type resource_type: str + :param permission: A permission name (e.g. select), if None, all permissions are granted. + :type permission: str + :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. + :type contact_points: str | list[str] + :param cql_user: The Cassandra user if authentication is turned on. + :type cql_user: str + :param cql_pass: The Cassandra user password if authentication is turned on. + :type cql_pass: str + :param port: The Cassandra cluster port, defaults to None. + :type port: int + :param protocol_version: Cassandra protocol version to use. + :type protocol_version: int + :param load_balancing_policy: cassandra.policy class name to use + :type load_balancing_policy: str + :param load_balancing_policy_args: cassandra.policy constructor args + :type load_balancing_policy_args: dict + :param ssl_options: Cassandra protocol version to use. + :type ssl_options: dict + :return: + :rtype: + + CLI Example: + + .. code-block:: bash + + salt 'minion1' cassandra_cql.grant_permission + + salt 'minion1' cassandra_cql.grant_permission username=joe resource=test_keyspace permission=select + + salt 'minion1' cassandra_cql.grant_permission username=joe resource=test_table resource_type=table \ + permission=select contact_points=minion1 + """ + permission_cql = f"grant {permission}" if permission else "grant all permissions" + resource_cql = f"on {resource_type} {resource}" if resource else "on all keyspaces" + query = f"{permission_cql} {resource_cql} to {username}" + log.debug("Attempting to grant permissions with query '%s'", query) + + try: + cql_query( + query, + contact_points=contact_points, + port=port, + cql_user=cql_user, + cql_pass=cql_pass, + protocol_version=protocol_version, + load_balancing_policy=load_balancing_policy, + load_balancing_policy_args=load_balancing_policy_args, + ssl_options=ssl_options, + ) + except CommandExecutionError: + log.critical("Could not grant permissions.") + raise + except BaseException as e: + log.critical("Unexpected error while granting permissions: %s", e) + raise + + return True diff --git a/src/saltext/cassandra/returners/cassandra_cql_return.py b/src/saltext/cassandra/returners/cassandra_cql_return.py new file mode 100644 index 0000000..2aad0c7 --- /dev/null +++ b/src/saltext/cassandra/returners/cassandra_cql_return.py @@ -0,0 +1,453 @@ +""" +Return data to a cassandra server + +.. versionadded:: 2015.5.0 + +:maintainer: Corin Kochenower +:maturity: new as of 2015.2 +:depends: salt.modules.cassandra_cql +:depends: DataStax Python Driver for Apache Cassandra + https://github.com/datastax/python-driver + pip install cassandra-driver +:platform: all + +:configuration: + To enable this returner, the minion will need the DataStax Python Driver + for Apache Cassandra ( https://github.com/datastax/python-driver ) + installed and the following values configured in the minion or master + config. The list of cluster IPs must include at least one cassandra node + IP address. No assumption or default will be used for the cluster IPs. + The cluster IPs will be tried in the order listed. The port, username, + and password values shown below will be the assumed defaults if you do + not provide values.: + + .. code-block:: yaml + + cassandra: + cluster: + - 192.168.50.11 + - 192.168.50.12 + - 192.168.50.13 + port: 9042 + username: salt + password: salt + + Use the following cassandra database schema: + + .. code-block:: text + + CREATE KEYSPACE IF NOT EXISTS salt + WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}; + + CREATE USER IF NOT EXISTS salt WITH PASSWORD 'salt' NOSUPERUSER; + + GRANT ALL ON KEYSPACE salt TO salt; + + USE salt; + + CREATE TABLE IF NOT EXISTS salt.salt_returns ( + jid text, + minion_id text, + fun text, + alter_time timestamp, + full_ret text, + return text, + success boolean, + PRIMARY KEY (jid, minion_id, fun) + ) WITH CLUSTERING ORDER BY (minion_id ASC, fun ASC); + CREATE INDEX IF NOT EXISTS salt_returns_minion_id ON salt.salt_returns (minion_id); + CREATE INDEX IF NOT EXISTS salt_returns_fun ON salt.salt_returns (fun); + + CREATE TABLE IF NOT EXISTS salt.jids ( + jid text PRIMARY KEY, + load text + ); + + CREATE TABLE IF NOT EXISTS salt.minions ( + minion_id text PRIMARY KEY, + last_fun text + ); + CREATE INDEX IF NOT EXISTS minions_last_fun ON salt.minions (last_fun); + + CREATE TABLE IF NOT EXISTS salt.salt_events ( + id timeuuid, + tag text, + alter_time timestamp, + data text, + master_id text, + PRIMARY KEY (id, tag) + ) WITH CLUSTERING ORDER BY (tag ASC); + CREATE INDEX tag ON salt.salt_events (tag); + + +Required python modules: cassandra-driver + +To use the cassandra returner, append '--return cassandra_cql' to the salt command. ex: + +.. code-block:: bash + + salt '*' test.ping --return_cql cassandra + +Note: if your Cassandra instance has not been tuned much you may benefit from +altering some timeouts in `cassandra.yaml` like so: + +.. code-block:: yaml + + # How long the coordinator should wait for read operations to complete + read_request_timeout_in_ms: 5000 + # How long the coordinator should wait for seq or index scans to complete + range_request_timeout_in_ms: 20000 + # How long the coordinator should wait for writes to complete + write_request_timeout_in_ms: 20000 + # How long the coordinator should wait for counter writes to complete + counter_write_request_timeout_in_ms: 10000 + # How long a coordinator should continue to retry a CAS operation + # that contends with other proposals for the same row + cas_contention_timeout_in_ms: 5000 + # How long the coordinator should wait for truncates to complete + # (This can be much longer, because unless auto_snapshot is disabled + # we need to flush first so we can snapshot before removing the data.) + truncate_request_timeout_in_ms: 60000 + # The default timeout for other, miscellaneous operations + request_timeout_in_ms: 20000 + +As always, your mileage may vary and your Cassandra cluster may have different +needs. SaltStack has seen situations where these timeouts can resolve +some stacktraces that appear to come from the Datastax Python driver. + +""" + +import logging +import time +import uuid + +import salt.exceptions +import salt.returners +import salt.utils.jid +import salt.utils.json +from salt.exceptions import CommandExecutionError + +try: + # The following imports are not directly required by this module. Rather, + # they are required by the modules/cassandra_cql execution module, on which + # this module depends. + # + # This returner cross-calls the cassandra_cql execution module using the __salt__ dunder. + # + # The modules/cassandra_cql execution module will not load if the DataStax Python Driver + # for Apache Cassandra is not installed. + # + # This module will try to load all of the 3rd party dependencies on which the + # modules/cassandra_cql execution module depends. + # + # Effectively, if the DataStax Python Driver for Apache Cassandra is not + # installed, both the modules/cassandra_cql execution module and this returner module + # will not be loaded by Salt's loader system. + # pylint: disable=unused-import,no-name-in-module + from cassandra.auth import PlainTextAuthProvider + from cassandra.cluster import Cluster + from cassandra.cluster import NoHostAvailable + from cassandra.connection import ConnectionException + from cassandra.connection import ConnectionShutdown + from cassandra.query import dict_factory + + # pylint: enable=unused-import,no-name-in-module + HAS_CASSANDRA_DRIVER = True +except ImportError as e: + HAS_CASSANDRA_DRIVER = False + +log = logging.getLogger(__name__) + +# Define the module's virtual name +# +# The 'cassandra' __virtualname__ is already taken by the +# returners/cassandra_return module, which utilizes nodetool. This module +# cross-calls the modules/cassandra_cql execution module, which uses the +# DataStax Python Driver for Apache Cassandra. Namespacing allows both the +# modules/cassandra_cql and returners/cassandra_cql modules to use the +# virtualname 'cassandra_cql'. +__virtualname__ = "cassandra_cql" + + +def __virtual__(): + if not HAS_CASSANDRA_DRIVER: + return ( + False, + "Could not import cassandra_cql returner; cassandra-driver is not installed.", + ) + + return True + + +def returner(ret): + """ + Return data to one of potentially many clustered cassandra nodes + """ + query = """INSERT INTO salt.salt_returns ( + jid, minion_id, fun, alter_time, full_ret, return, success + ) VALUES (?, ?, ?, ?, ?, ?, ?)""" + + ret_fun = ret["fun"] + ret_id = ret["id"] + ret_jid = ret["jid"] + statement_arguments = [ + f"{ret_jid}", + f"{ret_id}", + f"{ret_fun}", + int(time.time() * 1000), + salt.utils.json.dumps(ret).replace("'", "''"), + salt.utils.json.dumps(ret["return"]).replace("'", "''"), + ret.get("success", False), + ] + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + __salt__["cassandra_cql.cql_query_with_prepare"]( + query, "returner_return", tuple(statement_arguments), asynchronous=True + ) + except CommandExecutionError: + log.critical("Could not insert into salt_returns with Cassandra returner.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while inserting into salt_returns: %s", e) + raise + + # Store the last function called by the minion + # The data in salt.minions will be used by get_fun and get_minions + query = """INSERT INTO salt.minions ( + minion_id, last_fun + ) VALUES (?, ?)""" + + statement_arguments = [f"{ret_id}", f"{ret_fun}"] + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + __salt__["cassandra_cql.cql_query_with_prepare"]( + query, "returner_minion", tuple(statement_arguments), asynchronous=True + ) + except CommandExecutionError: + log.critical("Could not store minion ID with Cassandra returner.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical( + "Unexpected error while inserting minion ID into the minions table: %s", + e, + ) + raise + + +def event_return(events): + """ + Return event to one of potentially many clustered cassandra nodes + + Requires that configuration be enabled via 'event_return' + option in master config. + + Cassandra does not support an auto-increment feature due to the + highly inefficient nature of creating a monotonically increasing + number across all nodes in a distributed database. Each event + will be assigned a uuid by the connecting client. + """ + for event in events: + tag = event.get("tag", "") + data = event.get("data", "") + query = """INSERT INTO salt.salt_events ( + id, alter_time, data, master_id, tag + ) VALUES ( + ?, ?, ?, ?, ?) + """ + statement_arguments = [ + str(uuid.uuid1()), + int(time.time() * 1000), + salt.utils.json.dumps(data).replace("'", "''"), + __opts__["id"], + tag, + ] + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + __salt__["cassandra_cql.cql_query_with_prepare"]( + query, "salt_events", statement_arguments, asynchronous=True + ) + except CommandExecutionError: + log.critical("Could not store events with Cassandra returner.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while inserting into salt_events: %s", e) + raise + + +def save_load(jid, load, minions=None): + """ + Save the load to the specified jid id + """ + # Load is being stored as a text datatype. Single quotes are used in the + # VALUES list. Therefore, all single quotes contained in the results from + # salt.utils.json.dumps(load) must be escaped Cassandra style. + query = """INSERT INTO salt.jids ( + jid, load + ) VALUES (?, ?)""" + + statement_arguments = [jid, salt.utils.json.dumps(load).replace("'", "''")] + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + __salt__["cassandra_cql.cql_query_with_prepare"]( + query, "save_load", statement_arguments, asynchronous=True + ) + except CommandExecutionError: + log.critical("Could not save load in jids table.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while inserting into jids: %s", e) + raise + + +def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument + """ + Included for API consistency + """ + + +# salt-run jobs.list_jobs FAILED +def get_load(jid): + """ + Return the load data that marks a specified jid + """ + query = """SELECT load FROM salt.jids WHERE jid = ?;""" + + ret = {} + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + data = __salt__["cassandra_cql.cql_query_with_prepare"](query, "get_load", [jid]) + if data: + load = data[0].get("load") + if load: + ret = salt.utils.json.loads(load) + except CommandExecutionError: + log.critical("Could not get load from jids table.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while getting load from jids: %s", e) + raise + + return ret + + +# salt-call ret.get_jid cassandra_cql 20150327234537907315 PASSED +def get_jid(jid): + """ + Return the information returned when the specified job id was executed + """ + query = """SELECT minion_id, full_ret FROM salt.salt_returns WHERE jid = ?;""" + + ret = {} + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + data = __salt__["cassandra_cql.cql_query_with_prepare"](query, "get_jid", [jid]) + if data: + for row in data: + minion = row.get("minion_id") + full_ret = row.get("full_ret") + if minion and full_ret: + ret[minion] = salt.utils.json.loads(full_ret) + except CommandExecutionError: + log.critical("Could not select job specific information.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while getting job specific information: %s", e) + raise + + return ret + + +# salt-call ret.get_fun cassandra_cql test.ping PASSED +def get_fun(fun): + """ + Return a dict of the last function called for all minions + """ + query = """SELECT minion_id, last_fun FROM salt.minions where last_fun = ?;""" + + ret = {} + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + data = __salt__["cassandra_cql.cql_query"](query, "get_fun", [fun]) + if data: + for row in data: + minion = row.get("minion_id") + last_fun = row.get("last_fun") + if minion and last_fun: + ret[minion] = last_fun + except CommandExecutionError: + log.critical("Could not get the list of minions.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while getting list of minions: %s", e) + raise + + return ret + + +# salt-call ret.get_jids cassandra_cql PASSED +def get_jids(): + """ + Return a list of all job ids + """ + query = """SELECT jid, load FROM salt.jids;""" + + ret = {} + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + data = __salt__["cassandra_cql.cql_query"](query) + if data: + for row in data: + jid = row.get("jid") + load = row.get("load") + if jid and load: + ret[jid] = salt.utils.jid.format_jid_instance(jid, salt.utils.json.loads(load)) + except CommandExecutionError: + log.critical("Could not get a list of all job ids.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while getting list of all job ids: %s", e) + raise + + return ret + + +# salt-call ret.get_minions cassandra_cql PASSED +def get_minions(): + """ + Return a list of minions + """ + query = """SELECT DISTINCT minion_id FROM salt.minions;""" + + ret = [] + + # cassandra_cql.cql_query may raise a CommandExecutionError + try: + data = __salt__["cassandra_cql.cql_query"](query) + if data: + for row in data: + minion = row.get("minion_id") + if minion: + ret.append(minion) + except CommandExecutionError: + log.critical("Could not get the list of minions.") + raise + except Exception as e: # pylint: disable=broad-except + log.critical("Unexpected error while getting list of minions: %s", e) + raise + + return ret + + +def prep_jid(nocache, passed_jid=None): # pylint: disable=unused-argument + """ + Do any work necessary to prepare a JID, including sending a custom id + """ + return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__) diff --git a/tests/support/mock.py b/tests/support/mock.py new file mode 100644 index 0000000..eae40cf --- /dev/null +++ b/tests/support/mock.py @@ -0,0 +1,472 @@ +""" + :codeauthor: Pedro Algarvio (pedro@algarvio.me) + + tests.support.mock + ~~~~~~~~~~~~~~~~~~ + + Helper module that wraps `mock` and provides some fake objects in order to + properly set the function/class decorators and yet skip the test case's + execution. + + Note: mock >= 2.0.0 required since unittest.mock does not have + MagicMock.assert_called in Python < 3.6. +""" + +# pylint: disable=unused-import,function-redefined + +import copy +import errno +import fnmatch +import sys + +# By these days, we should blowup if mock is not available +from unittest import mock + +# pylint: disable=no-name-in-module,no-member +from unittest.mock import ANY +from unittest.mock import DEFAULT +from unittest.mock import FILTER_DIR +from unittest.mock import AsyncMock +from unittest.mock import MagicMock +from unittest.mock import Mock +from unittest.mock import NonCallableMagicMock +from unittest.mock import NonCallableMock +from unittest.mock import PropertyMock +from unittest.mock import __version__ +from unittest.mock import call +from unittest.mock import create_autospec +from unittest.mock import patch +from unittest.mock import sentinel + +import salt.utils.stringutils + +# pylint: disable=no-name-in-module,no-member + + +__mock_version = tuple( + int(part) for part in mock.__version__.split(".") if part.isdigit() +) # pylint: disable=no-member + + +class MockFH: + def __init__(self, filename, read_data, *args, **kwargs): + self.filename = filename + self.read_data = read_data + try: + self.mode = args[0] + except IndexError: + self.mode = kwargs.get("mode", "r") + self.binary_mode = "b" in self.mode + self.read_mode = any(x in self.mode for x in ("r", "+")) + self.write_mode = any(x in self.mode for x in ("w", "a", "+")) + self.empty_string = b"" if self.binary_mode else "" + self.call = MockCall(filename, *args, **kwargs) + self.read_data_iter = self._iterate_read_data(read_data) + self.read = Mock(side_effect=self._read) + self.readlines = Mock(side_effect=self._readlines) + self.readline = Mock(side_effect=self._readline) + self.write = Mock(side_effect=self._write) + self.writelines = Mock(side_effect=self._writelines) + self.close = Mock() + self.seek = Mock(side_effect=self._seek) + self.__loc = 0 + self.__read_data_ok = False + + def _iterate_read_data(self, read_data): + """ + Helper for mock_open: + Retrieve lines from read_data via a generator so that separate calls to + readline, read, and readlines are properly interleaved + """ + # Newline will always be a bytestring on PY2 because mock_open will have + # normalized it to one. + newline = b"\n" if isinstance(read_data, bytes) else "\n" + + read_data = [line + newline for line in read_data.split(newline)] + + if read_data[-1] == newline: + # If the last line ended in a newline, the list comprehension will have an + # extra entry that's just a newline. Remove this. + read_data = read_data[:-1] + else: + # If there wasn't an extra newline by itself, then the file being + # emulated doesn't have a newline to end the last line, so remove the + # newline that we added in the list comprehension. + read_data[-1] = read_data[-1][:-1] + + yield from read_data + + @property + def write_calls(self): + """ + Return a list of all calls to the .write() mock + """ + return [x[1][0] for x in self.write.mock_calls] + + @property + def writelines_calls(self): + """ + Return a list of all calls to the .writelines() mock + """ + return [x[1][0] for x in self.writelines.mock_calls] + + def tell(self): + return self.__loc + + def __check_read_data(self): + if not self.__read_data_ok: + if self.binary_mode: + if not isinstance(self.read_data, bytes): + raise TypeError( + f"{self.filename} opened in binary mode, expected read_data to be " + f"bytes, not {type(self.read_data).__name__}" + ) + else: + if not isinstance(self.read_data, str): + raise TypeError( + f"{self.filename} opened in non-binary mode, expected read_data to " + f"be str, not {type(self.read_data).__name__}" + ) + # No need to repeat this the next time we check + self.__read_data_ok = True + + def _read(self, size=0): + self.__check_read_data() + if not self.read_mode: + raise OSError("File not open for reading") + if not isinstance(size, int) or size < 0: + raise TypeError("a positive integer is required") + + joined = self.empty_string.join(self.read_data_iter) + if not size: + # read() called with no args, return everything + self.__loc += len(joined) + return joined + else: + # read() called with an explicit size. Return a slice matching the + # requested size, but before doing so, reset read_data to reflect + # what we read. + self.read_data_iter = self._iterate_read_data(joined[size:]) + ret = joined[:size] + self.__loc += len(ret) + return ret + + def _readlines(self, size=None): # pylint: disable=unused-argument + # TODO: Implement "size" argument + self.__check_read_data() + if not self.read_mode: + raise OSError("File not open for reading") + ret = list(self.read_data_iter) + self.__loc += sum(len(x) for x in ret) + return ret + + def _readline(self, size=None): # pylint: disable=unused-argument + # TODO: Implement "size" argument + self.__check_read_data() + if not self.read_mode: + raise OSError("File not open for reading") + try: + ret = next(self.read_data_iter) + self.__loc += len(ret) + return ret + except StopIteration: + return self.empty_string + + def __iter__(self): + self.__check_read_data() + if not self.read_mode: + raise OSError("File not open for reading") + while True: + try: + ret = next(self.read_data_iter) + self.__loc += len(ret) + yield ret + except StopIteration: + break + + def _write(self, content): + if not self.write_mode: + raise OSError("File not open for writing") + else: + content_type = type(content) + if self.binary_mode and content_type is not bytes: + raise TypeError(f"a bytes-like object is required, not '{content_type.__name__}'") + elif not self.binary_mode and content_type is not str: + raise TypeError(f"write() argument must be str, not {content_type.__name__}") + + def _writelines(self, lines): + if not self.write_mode: + raise OSError("File not open for writing") + for line in lines: + self._write(line) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): # pylint: disable=unused-argument + pass + + # For some reason this gets called with additional args on Windows when + # running the following test: + # tests/pytests/unit/beacons/test_log_beacon.py::test_log_match + # Let's just absorb them with *args + def _seek(self, pos=0, *args): + self.__loc = pos + self.read_data_iter = self._iterate_read_data(self.read_data) + + +class MockCall: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def __repr__(self): + ret = "MockCall(" + for arg in self.args: + ret += repr(arg) + ", " + if not self.kwargs: + if self.args: + # Remove trailing ', ' + ret = ret[:-2] + else: + for key, val in self.kwargs.items(): + ret += f"{salt.utils.stringutils.to_str(key)}={repr(val)}" + ret += ")" + return ret + + def __str__(self): + return self.__repr__() + + def __eq__(self, other): + return self.args == other.args and self.kwargs == other.kwargs + + +class MockOpen: + r''' + This class can be used to mock the use of ``open()``. + + ``read_data`` is a string representing the contents of the file to be read. + By default, this is an empty string. + + Optionally, ``read_data`` can be a dictionary mapping ``fnmatch.fnmatch()`` + patterns to strings (or optionally, exceptions). This allows the mocked + filehandle to serve content for more than one file path. + + .. code-block:: python + + data = { + '/etc/foo.conf': textwrap.dedent("""\ + Foo + Bar + Baz + """), + '/etc/bar.conf': textwrap.dedent("""\ + A + B + C + """), + } + with patch('salt.utils.files.fopen', mock_open(read_data=data): + do stuff + + If the file path being opened does not match any of the glob expressions, + an IOError will be raised to simulate the file not existing. + + Passing ``read_data`` as a string is equivalent to passing it with a glob + expression of "*". That is to say, the below two invocations are + equivalent: + + .. code-block:: python + + mock_open(read_data='foo\n') + mock_open(read_data={'*': 'foo\n'}) + + Instead of a string representing file contents, ``read_data`` can map to an + exception, and that exception will be raised if a file matching that + pattern is opened: + + .. code-block:: python + + data = { + '/etc/*': IOError(errno.EACCES, 'Permission denied'), + '*': 'Hello world!\n', + } + with patch('salt.utils.files.fopen', mock_open(read_data=data)): + do stuff + + The above would raise an exception if any files within /etc are opened, but + would produce a mocked filehandle if any other file is opened. + + To simulate file contents changing upon subsequent opens, the file contents + can be a list of strings/exceptions. For example: + + .. code-block:: python + + data = { + '/etc/foo.conf': [ + 'before\n', + 'after\n', + ], + '/etc/bar.conf': [ + IOError(errno.ENOENT, 'No such file or directory', '/etc/bar.conf'), + 'Hey, the file exists now!', + ], + } + with patch('salt.utils.files.fopen', mock_open(read_data=data): + do stuff + + The first open of ``/etc/foo.conf`` would return "before\n" when read, + while the second would return "after\n" when read. For ``/etc/bar.conf``, + the first read would raise an exception, while the second would open + successfully and read the specified string. + + Expressions will be attempted in dictionary iteration order (the exception + being ``*`` which is tried last), so if a file path matches more than one + fnmatch expression then the first match "wins". If your use case calls for + overlapping expressions, then an OrderedDict can be used to ensure that the + desired matching behavior occurs: + + .. code-block:: python + + data = OrderedDict() + data['/etc/foo.conf'] = 'Permission granted!' + data['/etc/*'] = IOError(errno.EACCES, 'Permission denied') + data['*'] = '*': 'Hello world!\n' + with patch('salt.utils.files.fopen', mock_open(read_data=data): + do stuff + + The following attributes are tracked for the life of a mock object: + + * call_count - Tracks how many fopen calls were attempted + * filehandles - This is a dictionary mapping filenames to lists of MockFH + objects, representing the individual times that a given file was opened. + ''' + + def __init__(self, read_data=""): + # If the read_data contains lists, we will be popping it. So, don't + # modify the original value passed. + read_data = copy.copy(read_data) + + # Normalize read_data, Python 2 filehandles should never produce unicode + # types on read. + if not isinstance(read_data, dict): + read_data = {"*": read_data} + + self.read_data = read_data + self.filehandles = {} + self.calls = [] + self.call_count = 0 + + def __call__(self, name, *args, **kwargs): + """ + Match the file being opened to the patterns in the read_data and spawn + a mocked filehandle with the corresponding file contents. + """ + call = MockCall(name, *args, **kwargs) + self.calls.append(call) + self.call_count += 1 + for pat in self.read_data: + if pat == "*": + continue + if fnmatch.fnmatch(name, pat): + matched_pattern = pat + break + else: + # No non-glob match in read_data, fall back to '*' + matched_pattern = "*" + try: + matched_contents = self.read_data[matched_pattern] + try: + # Assuming that the value for the matching expression is a + # list, pop the first element off of it. + file_contents = matched_contents.pop(0) + except AttributeError: + # The value for the matching expression is a string (or exception) + file_contents = matched_contents + except IndexError: + # We've run out of file contents, abort! + # pylint: disable=W0707 + raise RuntimeError( + f"File matching expression '{matched_pattern}' opened more times than expected" + ) + + try: + # Raise the exception if the matched file contents are an + # instance of an exception class. + raise file_contents + except TypeError: + # Contents were not an exception, so proceed with creating the + # mocked filehandle. + pass + + ret = MockFH(name, file_contents, *args, **kwargs) + self.filehandles.setdefault(name, []).append(ret) + return ret + except KeyError: + # No matching glob in read_data, treat this as a file that does + # not exist and raise the appropriate exception. + # pylint: disable=W0707 + raise OSError(errno.ENOENT, "No such file or directory", name) + + def write_calls(self, path=None): + """ + Returns the contents passed to all .write() calls. Use `path` to narrow + the results to files matching a given pattern. + """ + ret = [] + for filename, handles in self.filehandles.items(): + if path is None or fnmatch.fnmatch(filename, path): + for fh_ in handles: + ret.extend(fh_.write_calls) + return ret + + def writelines_calls(self, path=None): + """ + Returns the contents passed to all .writelines() calls. Use `path` to + narrow the results to files matching a given pattern. + """ + ret = [] + for filename, handles in self.filehandles.items(): + if path is None or fnmatch.fnmatch(filename, path): + for fh_ in handles: + ret.extend(fh_.writelines_calls) + return ret + + +class MockTimedProc: + """ + Class used as a stand-in for salt.utils.timed_subprocess.TimedProc + """ + + class _Process: + """ + Used to provide a dummy "process" attribute + """ + + def __init__(self, returncode=0, pid=12345): + self.returncode = returncode + self.pid = pid + + def __init__(self, stdout=None, stderr=None, returncode=0, pid=12345): + if stdout is not None and not isinstance(stdout, bytes): + raise TypeError("Must pass stdout to MockTimedProc as bytes") + if stderr is not None and not isinstance(stderr, bytes): + raise TypeError("Must pass stderr to MockTimedProc as bytes") + self._stdout = stdout + self._stderr = stderr + self.process = self._Process(returncode=returncode, pid=pid) + + def run(self): + pass + + @property + def stdout(self): + return self._stdout + + @property + def stderr(self): + return self._stderr + + +# reimplement mock_open to support multiple filehandles +mock_open = MockOpen diff --git a/tests/unit/modules/test_cassandra_cql.py b/tests/unit/modules/test_cassandra_cql.py new file mode 100644 index 0000000..b9d9ea5 --- /dev/null +++ b/tests/unit/modules/test_cassandra_cql.py @@ -0,0 +1,325 @@ +""" +Test case for the cassandra_cql module +""" + +import logging +import ssl +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from salt.exceptions import CommandExecutionError + +## import salt.modules.cassandra_cql as cassandra_cql +import saltext.cassandra.modules.cassandra_cql as cassandra_cql + +log = logging.getLogger(__name__) + + +pytestmark = pytest.mark.skipif( + not cassandra_cql.HAS_DRIVER, reason="Cassandra CQL driver not loaded" +) + + +@pytest.fixture +def configure_loader_modules(): + return {cassandra_cql: {}} + + +def test_cql_query(caplog): + """ + Test salt.modules.cassandra_cql.cql_query function + """ + + mock_session = MagicMock() + mock_client = MagicMock() + mock = MagicMock(return_value=(mock_session, mock_client)) + query = "query" + with patch.object(cassandra_cql, "_connect", mock): + query_result = cassandra_cql.cql_query(query) + + # pylint: disable=C1803 + assert query_result == [] + + query = {"5.0.1": "query1", "5.0.0": "query2"} + mock_version = MagicMock(return_value="5.0.1") + mock_session = MagicMock() + mock_client = MagicMock() + mock = MagicMock(return_value=(mock_session, mock_client)) + with patch.object(cassandra_cql, "version", mock_version): + with patch.object(cassandra_cql, "_connect", mock): + query_result = cassandra_cql.cql_query(query) + # pylint: disable=C1803 + assert query_result == [] + + +def test_cql_query_with_prepare(caplog): + """ + Test salt.modules.cassandra_cql.cql_query_with_prepare function + """ + + mock_session = MagicMock() + mock_client = MagicMock() + mock = MagicMock(return_value=(mock_session, mock_client)) + query = "query" + statement_args = {"arg1": "test"} + + mock_context = MagicMock(return_value={"cassandra_cql_prepared": {"statement_name": query}}) + with patch.object(cassandra_cql, "__context__", mock_context): + with patch.object(cassandra_cql, "_connect", mock): + query_result = cassandra_cql.cql_query_with_prepare( + query, "statement_name", statement_args + ) + # pylint: disable=C1803 + assert query_result == [] + + +def test_version(caplog): + """ + Test salt.modules.cassandra_cql.version function + """ + mock_cql_query = MagicMock(return_value=[{"release_version": "5.0.1"}]) + + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + version = cassandra_cql.version() + assert version == "5.0.1" + + mock_cql_query = MagicMock(side_effect=CommandExecutionError) + with pytest.raises(CommandExecutionError) as err: + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + version = cassandra_cql.version() + assert f"{err.value}" == "" + assert "Could not get Cassandra version." in caplog.text + for record in caplog.records: + assert record.levelname == "CRITICAL" + + +def test_info(): + """ + Test salt.modules.cassandra_cql.info function + """ + expected = {"result": "info"} + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + info = cassandra_cql.info() + + assert info == expected + + +def test_list_keyspaces(): + """ + Test salt.modules.cassandra_cql.list_keyspaces function + """ + expected = [{"keyspace_name": "name1"}, {"keyspace_name": "name2"}] + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + keyspaces = cassandra_cql.list_keyspaces() + + assert keyspaces == expected + + +def test_list_column_families(): + """ + Test salt.modules.cassandra_cql.list_column_families function + """ + expected = [{"colum_name": "column1"}, {"column_name": "column2"}] + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + columns = cassandra_cql.list_column_families() + + assert columns == expected + + +def test_keyspace_exists(): + """ + Test salt.modules.cassandra_cql.keyspace_exists function + """ + expected = "keyspace" + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + exists = cassandra_cql.keyspace_exists("keyspace") + + assert exists == bool(expected) + + expected = [] + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + exists = cassandra_cql.keyspace_exists("keyspace") + + assert exists == bool(expected) + + +def test_create_keyspace(): + """ + Test salt.modules.cassandra_cql.create_keyspace function + """ + expected = None + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + result = cassandra_cql.create_keyspace("keyspace") + + assert result == expected + + +def test_drop_keyspace(): + """ + Test salt.modules.cassandra_cql.drop_keyspace function + """ + expected = True + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + result = cassandra_cql.drop_keyspace("keyspace") + + assert result == expected + + +def test_list_users(): + """ + Test salt.modules.cassandra_cql.list_users function + """ + expected = [{"name": "user1", "super": True}, {"name": "user2", "super": False}] + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + result = cassandra_cql.list_users() + + assert result == expected + + +def test_create_user(): + """ + Test salt.modules.cassandra_cql.create_user function + """ + expected = True + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + result = cassandra_cql.create_user("user", "password") + + assert result == expected + + +def test_list_permissions(): + """ + Test salt.modules.cassandra_cql.list_permissions function + """ + expected = [ + { + "permission": "ALTER", + "resource": "", + "role": "user1", + "username": "user1", + } + ] + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + result = cassandra_cql.list_permissions(username="user1", resource="one") + + assert result == expected + + +def test_grant_permission(): + """ + Test salt.modules.cassandra_cql.grant_permission function + """ + expected = True + mock_cql_query = MagicMock(return_value=expected) + with patch.object(cassandra_cql, "cql_query", mock_cql_query): + result = cassandra_cql.grant_permission(username="user1", resource="one") + + assert result == expected + + +def test_returns_opts_if_specified(): + """ + If ssl options are present then check that they are parsed and returned + """ + options = MagicMock( + return_value={ + "cluster": ["192.168.50.10", "192.168.50.11", "192.168.50.12"], + "port": 9000, + "ssl_options": { + "ca_certs": "/etc/ssl/certs/ca-bundle.trust.crt", + "ssl_version": "PROTOCOL_TLSv1", + }, + "username": "cas_admin", + } + ) + + with patch.dict(cassandra_cql.__salt__, {"config.option": options}): + + assert cassandra_cql._get_ssl_opts() == { # pylint: disable=protected-access + "ca_certs": "/etc/ssl/certs/ca-bundle.trust.crt", + "ssl_version": ssl.PROTOCOL_TLSv1, + } # pylint: disable=no-member + + +def test_invalid_protocol_version(): + """ + Check that the protocol version is imported only if it isvalid + """ + options = MagicMock( + return_value={ + "cluster": ["192.168.50.10", "192.168.50.11", "192.168.50.12"], + "port": 9000, + "ssl_options": { + "ca_certs": "/etc/ssl/certs/ca-bundle.trust.crt", + "ssl_version": "Invalid", + }, + "username": "cas_admin", + } + ) + + with patch.dict(cassandra_cql.__salt__, {"config.option": options}): + with pytest.raises(CommandExecutionError): + cassandra_cql._get_ssl_opts() # pylint: disable=protected-access + + +def test_unspecified_opts(): + """ + Check that it returns None when ssl opts aren't specified + """ + with patch.dict(cassandra_cql.__salt__, {"config.option": MagicMock(return_value={})}): + assert cassandra_cql._get_ssl_opts() is None + + +def test_valid_asynchronous_args(): + mock_execute = MagicMock(return_value={}) + mock_execute_async = MagicMock(return_value={}) + mock_context = { + "cassandra_cql_returner_cluster": MagicMock(return_value={}), + "cassandra_cql_returner_session": MagicMock( + execute=mock_execute, + execute_async=mock_execute_async, + prepare=lambda _: MagicMock(bind=lambda _: None), # mock prepared_statement + row_factory=None, + ), + "cassandra_cql_prepared": {}, + } + + with patch.dict(cassandra_cql.__context__, mock_context): + cassandra_cql.cql_query_with_prepare( + "SELECT now() from system.local;", "select_now", [], asynchronous=True + ) + mock_execute_async.assert_called_once() + + +def test_valid_async_args(): + mock_execute = MagicMock(return_value={}) + mock_execute_async = MagicMock(return_value={}) + mock_context = { + "cassandra_cql_returner_cluster": MagicMock(return_value={}), + "cassandra_cql_returner_session": MagicMock( + execute=mock_execute, + execute_async=mock_execute_async, + prepare=lambda _: MagicMock(bind=lambda _: None), + # mock prepared_statement + row_factory=None, + ), + "cassandra_cql_prepared": {}, + } + + kwargs = {"async": True} # to avoid syntax error in python 3.7 + with patch.dict(cassandra_cql.__context__, mock_context): + cassandra_cql.cql_query_with_prepare( + "SELECT now() from system.local;", "select_now", [], **kwargs + ) + mock_execute_async.assert_called_once()