diff --git a/.gitignore b/.gitignore index c5f1a61a..e102ca5f 100644 --- a/.gitignore +++ b/.gitignore @@ -62,4 +62,7 @@ target/ .vscode/ _ares.c -_ares.h \ No newline at end of file +_ares.h + +# pyenv +.python-version diff --git a/minemeld/cli/__init__.py b/minemeld/cli/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/minemeld/cli/engine.py b/minemeld/cli/engine.py new file mode 100644 index 00000000..f76fd15c --- /dev/null +++ b/minemeld/cli/engine.py @@ -0,0 +1,6 @@ +from init import setup +from minemeld.run.launcher import main + +if __name__ == '__main__': + setup() + main() diff --git a/minemeld/cli/init.py b/minemeld/cli/init.py new file mode 100644 index 00000000..fbe073a3 --- /dev/null +++ b/minemeld/cli/init.py @@ -0,0 +1,7 @@ +from gevent.monkey import patch_all + +patch_all() + + +def setup(): + pass diff --git a/minemeld/cli/traced.py b/minemeld/cli/traced.py new file mode 100644 index 00000000..e7200054 --- /dev/null +++ b/minemeld/cli/traced.py @@ -0,0 +1,6 @@ +from init import setup +from minemeld.traced.main import main + +if __name__ == '__main__': + setup() + main() diff --git a/minemeld/cli/web.py b/minemeld/cli/web.py new file mode 100644 index 00000000..1ed3524e --- /dev/null +++ b/minemeld/cli/web.py @@ -0,0 +1,6 @@ +from init import setup +from minemeld.flask.main import app + +if __name__ == '__main__': + setup() + app.run(threaded=True, passthrough_errors=False) diff --git a/minemeld/collectd.py b/minemeld/collectd.py index 20532f3f..9c7161f7 100644 --- a/minemeld/collectd.py +++ b/minemeld/collectd.py @@ -38,8 +38,13 @@ def _open_socket(self): if self.socket is not None: return - _socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - _socket.connect(self.path) + if self.path.startswith('tcp://'): + hostinfo = self.path[6:].split(':') + _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + _socket.connect((hostinfo[0], int(hostinfo[1]))) + else: + _socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + _socket.connect(self.path) self.socket = _socket diff --git a/minemeld/comm/zmqredis.py b/minemeld/comm/zmqredis.py index 25ecad90..3f8e3b85 100644 --- a/minemeld/comm/zmqredis.py +++ b/minemeld/comm/zmqredis.py @@ -23,17 +23,15 @@ import logging import uuid -import os -import time import gevent import gevent.event -import ujson as json -from errno import EAGAIN - import redis +import ujson as json import zmq.green as zmq +from minemeld.utils import get_config_value + LOG = logging.getLogger(__name__) @@ -138,7 +136,6 @@ class ZMQRpcFanoutClientChannel(object): def __init__(self, fanout): self.socket = None self.reply_socket = None - self.context = None self.fanout = fanout self.active_rpcs = {} @@ -233,13 +230,8 @@ def connect(self, context): if self.socket is not None: return - self.context = context - - self.socket = context.socket(zmq.PUB) - self.socket.bind('ipc:///var/run/minemeld/{}'.format(self.fanout)) - - self.reply_socket = context.socket(zmq.REP) - self.reply_socket.bind('ipc:///var/run/minemeld/{}:reply'.format(self.fanout)) + self.socket = context.zmq_bind(zmq.PUB, self.fanout) + self.reply_socket = context.zmq_bind(zmq.REP, '{}:reply'.format(self.fanout)) def disconnect(self): if self.socket is None: @@ -277,8 +269,7 @@ def _send_result(self, reply_to, id_, result=None, error=None): } if self.fanout is not None: - reply_socket = self.context.socket(zmq.REQ) - reply_socket.connect('ipc:///var/run/minemeld/{}'.format(reply_to)) + reply_socket = self.context.zmq_connect(zmq.REQ, reply_to) LOG.debug('RPC Server {} result to {}'.format(self.name, reply_to)) reply_socket.send_json(ans) reply_socket.recv() @@ -350,23 +341,12 @@ def connect(self, context): if self.fanout is not None: # we are subscribers - self.socket = self.context.socket(zmq.SUB) - self.socket.connect('ipc:///var/run/minemeld/{}'.format(self.fanout)) + self.socket = context.zmq_connect(zmq.SUB, self.fanout) self.socket.setsockopt(zmq.SUBSCRIBE, b'') # set the filter to empty to recv all messages else: # we are a router - self.socket = self.context.socket(zmq.ROUTER) - - if self.name[0] == '@': - address = 'ipc://@/var/run/minemeld/{}:rpc'.format( - self.name[1:] - ) - else: - address = 'ipc:///var/run/minemeld/{}:rpc'.format( - self.name - ) - self.socket.bind(address) + self.socket = context.zmq_bind(zmq.ROUTER, '{}:rpc'.format(self.name)) def disconnect(self): if self.socket is not None: @@ -378,7 +358,6 @@ class ZMQPubChannel(object): def __init__(self, topic): self.socket = None self.reply_socket = None - self.context = None self.topic = topic def publish(self, method, params=None): @@ -410,10 +389,7 @@ def connect(self, context): if self.socket is not None: return - self.context = context - - self.socket = context.socket(zmq.PUB) - self.socket.bind('ipc:///var/run/minemeld/{}'.format(self.topic)) + self.socket = context.zmq_bind(zmq.PUB, self.topic) def disconnect(self): if self.socket is None: @@ -436,7 +412,6 @@ def __init__(self, name, obj, allowed_methods=None, self.method_prefix = method_prefix self.topic = topic - self.context = None self.socket = None def run(self): @@ -483,10 +458,7 @@ def connect(self, context): if self.socket is not None: return - self.context = context - - self.socket = self.context.socket(zmq.SUB) - self.socket.connect('ipc:///var/run/minemeld/{}'.format(self.topic)) + self.socket = context.zmq_connect(zmq.SUB, self.topic) self.socket.setsockopt(zmq.SUBSCRIBE, b'') # set the filter to empty to recv all messages def disconnect(self): @@ -578,12 +550,11 @@ def __init__(self, config): self.failure_listeners = [] - self.redis_config = { - 'url': os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - } - self.redis_cp = redis.ConnectionPool.from_url( - self.redis_config['url'] - ) + self.redis_config = {'url': get_config_value(config, 'redis_url', 'unix:///var/run/redis/redis.sock')} + self.redis_cp = redis.ConnectionPool.from_url(self.redis_config['url']) + + self.zmq_config = {'url': get_config_value(config, 'zmq_url', 'ipc://{}/var/run/minemeld/{}')} + self.zmq_cache = {} def add_failure_listener(self, listener): self.failure_listeners.append(listener) @@ -663,18 +634,7 @@ def send_rpc(self, dest, method, params, 'params': params } - socket = self.context.socket(zmq.REQ) - - if dest[0] == '@': - address = 'ipc://@/var/run/minemeld/{}:rpc'.format( - dest[1:] - ) - else: - address = 'ipc:///var/run/minemeld/{}:rpc'.format( - dest - ) - - socket.connect(address) + socket = self.zmq_connect(zmq.REQ, '{}:rpc'.format(dest)) socket.setsockopt(zmq.LINGER, 0) socket.send_json(body) LOG.debug('RPC sent to {}:rpc for method {}'.format(dest, method)) @@ -754,26 +714,43 @@ def _ioloop_failure(self, g): for l in self.failure_listeners: l() + def zmq_address(self, dest): + r = self.zmq_cache.get(dest) + if r is None: + format_args = ('@', dest[1:]) if dest[0] == '@' else ('', dest) + r = self.zmq_cache[dest] = self.zmq_config['url'].format(*format_args) + return r + + def zmq_connect(self, socket_type, dest): + socket = self.context.socket(socket_type) + socket.connect(self.zmq_address(dest)) + return socket + + def zmq_bind(self, socket_type, dest): + socket = self.context.socket(socket_type) + socket.bind(self.zmq_address(dest)) + return socket + def start(self, start_dispatching=True): self.context = zmq.Context() for rfcc in self.rpc_fanout_clients_channels: - rfcc.connect(self.context) + rfcc.connect(self) for rpcc in self.rpc_server_channels.values(): - rpcc.connect(self.context) + rpcc.connect(self) for sc in self.sub_channels: sc.connect() for mwsc in self.mw_sub_channels: - mwsc.connect(self.context) + mwsc.connect(self) for pc in self.pub_channels: pc.connect() for mwpc in self.mw_pub_channels: - mwpc.connect(self.context) + mwpc.connect(self) if start_dispatching: self.start_dispatching() @@ -848,9 +825,9 @@ def stop(self): @staticmethod def cleanup(config): - redis_cp = redis.ConnectionPool.from_url( - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + redis_config = {'url': get_config_value(config, 'redis_url', 'unix:///var/run/redis/redis.sock')} + redis_cp = redis.ConnectionPool.from_url(redis_config['url']) + SR = redis.StrictRedis(connection_pool=redis_cp) tkeys = SR.keys(pattern='mm:topic:*') if len(tkeys) > 0: diff --git a/minemeld/flask/__init__.py b/minemeld/flask/__init__.py index 637d90d5..e0a2f3c5 100644 --- a/minemeld/flask/__init__.py +++ b/minemeld/flask/__init__.py @@ -12,17 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import logging import yaml from flask import Flask import minemeld.loader +from minemeld.flask import config +from minemeld.utils import get_config_value, initialize_default_nodes_distribution from .logger import LOG -REDIS_URL = os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - def create_app(): yaml.SafeLoader.add_constructor( @@ -36,8 +35,11 @@ def create_app(): LOG.init_app(app) - # extension code from . import config + config.init() + initialize_default_nodes_distribution(config) + + # extension code from . import aaa from . import session from . import mmrpc @@ -47,10 +49,11 @@ def create_app(): from . import sns from . import events - session.init_app(app, REDIS_URL) + redis_url = get_config_value(config, 'MGMTBUS.config.redis_url', 'unix:///var/run/redis/redis.sock') + + session.init_app(app, redis_url) aaa.init_app(app) - config.init() if config.get('DEBUG', False): logging.getLogger().setLevel(logging.DEBUG) else: @@ -61,7 +64,7 @@ def create_app(): supervisorclient.init_app(app) jobs.init_app(app) sns.init_app() - events.init_app(app, REDIS_URL) + events.init_app(app, redis_url) # entrypoints from . import metricsapi # noqa diff --git a/minemeld/flask/config.py b/minemeld/flask/config.py index 83468c5f..5a788259 100644 --- a/minemeld/flask/config.py +++ b/minemeld/flask/config.py @@ -201,6 +201,8 @@ def init(): if not os.path.isdir(config_path): config_path = os.path.dirname(config_path) + os.environ['MM_CONFIG_DIR'] = config_path + # init global vars API_CONFIG_PATH = os.path.join(config_path, 'api') API_CONFIG_LOCK = filelock.FileLock( diff --git a/minemeld/flask/configapi.py b/minemeld/flask/configapi.py index 0d127496..4adee4ad 100644 --- a/minemeld/flask/configapi.py +++ b/minemeld/flask/configapi.py @@ -22,6 +22,7 @@ from flask import request, jsonify +from minemeld.flask import config from .redisclient import SR from .aaa import MMBlueprint from .logger import LOG @@ -248,11 +249,11 @@ def _commit_config(version): fabric = _get_stanza('fabric') if fabric is not None: - newconfig['fabric'] = json.loads(fabric)['properties'] + newconfig['fabric'] = fabric['properties'] mgmtbus = _get_stanza('mgmtbus') if mgmtbus is not None: - newconfig['mgmtbus'] = json.loads(mgmtbus)['properties'] + newconfig['mgmtbus'] = mgmtbus['properties'] newconfig['nodes'] = {} for n in range(config_info['next_node_id']): @@ -274,7 +275,7 @@ def _commit_config(version): # original config is not used because it could be modified # during validation temp_config = minemeld.run.config.MineMeldConfig.from_dict(copy.deepcopy(newconfig)) - valid = minemeld.run.config.resolve_prototypes(temp_config) + valid = minemeld.run.config.resolve_prototypes(config, temp_config) if not valid: raise ValueError('Error resolving prototypes') messages = minemeld.run.config.validate_config(temp_config) diff --git a/minemeld/flask/configdataapi.py b/minemeld/flask/configdataapi.py index f57c1174..6bfc94ee 100644 --- a/minemeld/flask/configdataapi.py +++ b/minemeld/flask/configdataapi.py @@ -374,7 +374,7 @@ def __init__(self, cpath, datafilename): # API for working with side configs and dynamic data files @BLUEPRINT.route('/', methods=['GET'], read_write=False) def get_config_data(datafilename): - cpath = os.path.dirname(os.environ.get('MM_CONFIG')) + cpath = os.environ.get('MM_CONFIG_DIR') datafiletype = request.values.get('t', 'yaml') @@ -392,7 +392,7 @@ def get_config_data(datafilename): @BLUEPRINT.route('/', methods=['PUT'], read_write=True) def save_config_data(datafilename): - cpath = os.path.dirname(os.environ.get('MM_CONFIG')) + cpath = os.environ.get('MM_CONFIG_DIR') datafiletype = request.values.get('t', 'yaml') @@ -419,7 +419,7 @@ def save_config_data(datafilename): @BLUEPRINT.route('//append', methods=['POST'], read_write=True) def append_config_data(datafilename): - cpath = os.path.dirname(os.environ.get('MM_CONFIG')) + cpath = os.environ.get('MM_CONFIG_DIR') datafiletype = request.values.get('t', 'yaml') diff --git a/minemeld/flask/jobs.py b/minemeld/flask/jobs.py index 8955abb2..271112d6 100644 --- a/minemeld/flask/jobs.py +++ b/minemeld/flask/jobs.py @@ -29,7 +29,7 @@ from gevent.subprocess import Popen from flask import g -from . import REDIS_URL +from minemeld.utils import get_config_value from . import config from .logger import LOG @@ -38,8 +38,8 @@ REDIS_CP = redis.ConnectionPool.from_url( - REDIS_URL, - max_connections=int(os.environ.get('REDIS_MAX_CONNECTIONS', 5)) + get_config_value(config, 'MGMTBUS.config.redis_url', 'unix:///var/run/redis/redis.sock'), + max_connections=int(get_config_value(config, 'redis_max_connections', '5')) ) REDIS_JOBS_GROUP_PREFIX = 'mm-jobs-{}' @@ -95,7 +95,7 @@ def _collect_job(self, jobdata): def _job_monitor_glet(self, job_group, jobid, description, args, data): jobname = (REDIS_JOBS_GROUP_PREFIX+'-{}').format(job_group, jobid) joblogfile = os.path.join( - config.get('MINEMELD_LOG_DIRECTORY_PATH', '/tmp'), + os.path.abspath(config.get('MINEMELD_LOG_DIRECTORY_PATH', '/tmp')), '{}.log'.format(jobname) ) jobtempdir = tempfile.mkdtemp(prefix=jobname) diff --git a/minemeld/flask/logsapi.py b/minemeld/flask/logsapi.py index 82db9b7e..81fa722d 100644 --- a/minemeld/flask/logsapi.py +++ b/minemeld/flask/logsapi.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os from flask import send_from_directory, jsonify @@ -31,7 +32,7 @@ def get_minemeld_engine_log(): if log_directory is None: return jsonify(error={'message': 'LOG_DIRECTORY not set'}), 500 - return send_from_directory(log_directory, 'minemeld-engine.log', as_attachment=True) + return send_from_directory(os.path.abspath(log_directory), 'minemeld-engine.log', as_attachment=True) @BLUEPRINT.route('/minemeld-web.log', methods=['GET'], read_write=True) @@ -40,4 +41,4 @@ def get_minemeld_web_log(): if log_directory is None: return jsonify(error={'message': 'LOG_DIRECTORY not set'}), 500 - return send_from_directory(log_directory, 'minemeld-web.log', as_attachment=True) + return send_from_directory(os.path.abspath(log_directory), 'minemeld-web.log', as_attachment=True) diff --git a/minemeld/flask/redisclient.py b/minemeld/flask/redisclient.py index 36373185..825cf6f6 100644 --- a/minemeld/flask/redisclient.py +++ b/minemeld/flask/redisclient.py @@ -1,20 +1,16 @@ -import os - import redis import werkzeug.local - from flask import g -from . import REDIS_URL +from minemeld.flask import config +from minemeld.utils import get_config_value from .logger import LOG - __all__ = ['init_app', 'SR'] - REDIS_CP = redis.ConnectionPool.from_url( - REDIS_URL, - max_connections=int(os.environ.get('REDIS_MAX_CONNECTIONS', 200)) + get_config_value(config, 'MGMTBUS.config.redis_url', 'unix:///var/run/redis/redis.sock'), + max_connections=int(get_config_value(config, 'redis_max_connections', '5')) ) diff --git a/minemeld/flask/statusapi.py b/minemeld/flask/statusapi.py index 95fec066..558a63a4 100644 --- a/minemeld/flask/statusapi.py +++ b/minemeld/flask/statusapi.py @@ -159,10 +159,8 @@ def get_minemeld_status(): @BLUEPRINT.route('/config', methods=['GET'], read_write=False) def get_minemeld_running_config(): - rcpath = os.path.join( - os.path.dirname(os.environ.get('MM_CONFIG')), - 'running-config.yml' - ) + rcpath = os.path.join(os.environ.get('MM_CONFIG_DIR'), 'running-config.yml') + with open(rcpath, 'r') as f: rcconfig = yaml.safe_load(f) @@ -262,7 +260,7 @@ def generate_local_backup(): certs_path = config.get('MINEMELD_LOCAL_CERTS_PATH', None) if certs_path is not None: args.append(certs_path) - config_path = os.path.dirname(os.environ.get('MM_CONFIG')) + config_path = os.environ.get('MM_CONFIG_DIR') args.append(config_path) jobs = JOBS_MANAGER.get_jobs(job_group='status-backup') diff --git a/minemeld/flask/supervisorapi.py b/minemeld/flask/supervisorapi.py index 860aafb4..ded065db 100644 --- a/minemeld/flask/supervisorapi.py +++ b/minemeld/flask/supervisorapi.py @@ -23,6 +23,7 @@ from flask import jsonify +from minemeld.utils import get_config_value from . import config from .supervisorclient import MMSupervisor from .aaa import MMBlueprint @@ -81,8 +82,18 @@ def service_status(): supervisorstate = MMSupervisor.supervisor.getState() except: - LOG.exception("Exception connecting to supervisor") - return jsonify(result={'statename': 'STOPPED'}) + if get_config_value(config, 'exceptions.supervisor', 'allow') != 'suppress': + LOG.exception("Exception connecting to supervisor") + import datetime + begin = datetime.datetime.now() - datetime.datetime.utcfromtimestamp(0) - datetime.timedelta(days=1) + start = begin.total_seconds() + return jsonify(result={ + 'statename': 'STOPPED', + 'processes': { + 'minemeld-engine': {'statename': 'RUNNING', 'start': start, 'children': 1}, + 'minemeld-web': {'statename': 'RUNNING', 'start': start, 'children': 1} + } + }) supervisorstate['processes'] = {} pinfo = MMSupervisor.supervisor.getAllProcessInfo() diff --git a/minemeld/flask/utils.py b/minemeld/flask/utils.py index b0998b5c..c700d9b7 100644 --- a/minemeld/flask/utils.py +++ b/minemeld/flask/utils.py @@ -47,19 +47,13 @@ def __ne__(self, other): def running_config_path(): - rcpath = os.path.join( - os.path.dirname(os.environ.get('MM_CONFIG')), - 'running-config.yml' - ) + rcpath = os.path.join(os.environ.get('MM_CONFIG_DIR'), 'running-config.yml') return rcpath def committed_config_path(): - ccpath = os.path.join( - os.path.dirname(os.environ.get('MM_CONFIG')), - 'committed-config.yml' - ) + ccpath = os.path.join(os.environ.get('MM_CONFIG_DIR'), 'committed-config.yml') return ccpath diff --git a/minemeld/ft/redis.py b/minemeld/ft/redis.py index 511cb9d2..ce32a8fc 100644 --- a/minemeld/ft/redis.py +++ b/minemeld/ft/redis.py @@ -19,6 +19,7 @@ import os import ujson as json +from minemeld.utils import get_config_value from . import base from . import actorbase @@ -38,9 +39,7 @@ def __init__(self, name, chassis, config): def configure(self): super(RedisSet, self).configure() - self.redis_url = self.config.get('redis_url', - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + self.redis_url = get_config_value(self.config, 'redis_url', 'unix:///var/run/redis/redis.sock') self.scoring_attribute = self.config.get( 'scoring_attribute', 'last_seen' @@ -202,9 +201,7 @@ def gc(name, config=None): redis_skey = name redis_skey_value = '{}.value'.format(name) redis_skey_chkp = '{}.chkp'.format(name) - redis_url = config.get('redis_url', - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + redis_url = get_config_value(config, 'redis_url', 'unix:///var/run/redis/redis.sock') cp = None try: diff --git a/minemeld/ft/taxii.py b/minemeld/ft/taxii.py index d1c9723b..b7e417e0 100644 --- a/minemeld/ft/taxii.py +++ b/minemeld/ft/taxii.py @@ -58,6 +58,7 @@ import mixbox.idgen import mixbox.namespaces +from minemeld.utils import get_config_value from . import basepoller from . import base from . import actorbase @@ -1384,9 +1385,7 @@ def __init__(self, name, chassis, config): def configure(self): super(DataFeed, self).configure() - self.redis_url = self.config.get('redis_url', - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + self.redis_url = get_config_value(self.config, 'redis_url', 'unix:///var/run/redis/redis.sock') self.namespace = self.config.get('namespace', 'minemeld') self.namespaceuri = self.config.get( @@ -1690,9 +1689,7 @@ def gc(name, config=None): redis_skey = name redis_skey_value = '{}.value'.format(name) redis_skey_chkp = '{}.chkp'.format(name) - redis_url = config.get('redis_url', - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + redis_url = get_config_value(config, 'redis_url', 'unix:///var/run/redis/redis.sock') cp = None try: diff --git a/minemeld/mgmtbus.py b/minemeld/mgmtbus.py index 641a4207..6ce99204 100644 --- a/minemeld/mgmtbus.py +++ b/minemeld/mgmtbus.py @@ -46,6 +46,7 @@ import minemeld.comm import minemeld.ft +from minemeld.utils import get_config_value from .collectd import CollectdClient from .startupplanner import plan @@ -89,7 +90,7 @@ def __init__(self, ftlist, config, comm_class, comm_config, num_chassis): self._status = {} self.SR = redis.StrictRedis.from_url( - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') + get_config_value(comm_config, 'redis_url', 'unix:///var/run/redis/redis.sock') ) self.comm = minemeld.comm.factory(self.comm_class, self.comm_config) diff --git a/minemeld/run/config.py b/minemeld/run/config.py index 4685f266..1f073c25 100644 --- a/minemeld/run/config.py +++ b/minemeld/run/config.py @@ -31,7 +31,6 @@ import minemeld.loader - __all__ = ['load_config', 'validate_config', 'resolve_prototypes'] @@ -261,7 +260,7 @@ def _load_config_from_file(f): return valid, MineMeldConfig.from_dict(config) -def _load_and_validate_config_from_file(path): +def _load_and_validate_config_from_file(sys_config, path): valid = False config = None @@ -278,7 +277,7 @@ def _load_and_validate_config_from_file(path): valid, config = False, None if valid and config is not None: - valid = resolve_prototypes(config) + valid = resolve_prototypes(sys_config, config) if valid and config is not None: vresults = validate_config(config) @@ -369,7 +368,7 @@ def _destroy_old_nodes(config): dpool = None -def _load_config_from_dir(path): +def _load_config_from_dir(sys_config, path): ccpath = os.path.join( path, COMMITTED_CONFIG @@ -379,8 +378,8 @@ def _load_config_from_dir(path): RUNNING_CONFIG ) - ccvalid, cconfig = _load_and_validate_config_from_file(ccpath) - rcvalid, rcconfig = _load_and_validate_config_from_file(rcpath) + ccvalid, cconfig = _load_and_validate_config_from_file(sys_config, ccpath) + rcvalid, rcconfig = _load_and_validate_config_from_file(sys_config, rcpath) if not rcvalid and not ccvalid: # both running and canidate are not valid @@ -463,10 +462,10 @@ def _detect_cycles(nodes): return nedges == 0 -def resolve_prototypes(config): +def resolve_prototypes(sys_config, config): # retrieve prototype dir from environment # used for main library and local library - paths = os.getenv(PROTOTYPE_ENV, None) + paths = sys_config.get(PROTOTYPE_ENV, os.environ.get(PROTOTYPE_ENV, None)) if paths is None: raise RuntimeError('Unable to load prototypes: %s ' 'environment variable not set' % @@ -562,13 +561,13 @@ def validate_config(config): return result -def load_config(config_path): +def load_config(sys_config, config_path): if os.path.isdir(config_path): - return _load_config_from_dir(config_path) + return _load_config_from_dir(sys_config, config_path) # this is just a file, as we can't do a delta # we just load it and mark all the nodes as added - valid, config = _load_and_validate_config_from_file(config_path) + valid, config = _load_and_validate_config_from_file(sys_config, config_path) if not valid: raise RuntimeError('Invalid config') config.compute_changes(None) diff --git a/minemeld/run/launcher.py b/minemeld/run/launcher.py index bb5bbf34..f982e399 100644 --- a/minemeld/run/launcher.py +++ b/minemeld/run/launcher.py @@ -33,6 +33,7 @@ import minemeld.comm import minemeld.run.config +from minemeld.utils import initialize_default_nodes_distribution from minemeld import __version__ LOG = logging.getLogger(__name__) @@ -143,9 +144,11 @@ def _setup_environment(config): cdir = os.path.dirname(config) os.environ['MM_CONFIG_DIR'] = cdir - if not 'REQUESTS_CA_BUNDLE' in os.environ and 'MM_CA_BUNDLE' in os.environ: + if 'REQUESTS_CA_BUNDLE' not in os.environ and 'MM_CA_BUNDLE' in os.environ: os.environ['REQUESTS_CA_BUNDLE'] = os.environ['MM_CA_BUNDLE'] + initialize_default_nodes_distribution({}) + def main(): mbusmaster = None @@ -214,7 +217,7 @@ def _disk_space_monitor(num_nodes): _setup_environment(args.config) # load and validate config - config = minemeld.run.config.load_config(args.config) + config = minemeld.run.config.load_config({}, args.config) LOG.info("mm-run.py config: %s", config) diff --git a/minemeld/supervisord/listener.py b/minemeld/supervisord/listener.py index 51f59c82..95c8145e 100644 --- a/minemeld/supervisord/listener.py +++ b/minemeld/supervisord/listener.py @@ -7,6 +7,8 @@ import ujson from supervisor import childutils +from minemeld.utils import get_config_value + LOG = logging.getLogger(__name__) @@ -45,9 +47,7 @@ def main(): engine_process_name = os.environ.get('MM_ENGINE_PROCESSNAME', 'minemeld-engine') - SR = redis.StrictRedis.from_url( - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + SR = redis.StrictRedis.from_url(get_config_value({}, 'redis_url', 'unix:///var/run/redis/redis.sock')) while True: hdrs, payload = childutils.listener.wait(sys.stdin, sys.stdout) diff --git a/minemeld/traced/queryprocessor.py b/minemeld/traced/queryprocessor.py index 821b881c..ded49624 100644 --- a/minemeld/traced/queryprocessor.py +++ b/minemeld/traced/queryprocessor.py @@ -29,6 +29,8 @@ import gevent.event import redis +from minemeld.utils import get_config_value + LOG = logging.getLogger(__name__) QUERY_QUEUE = 'mmtraced:query' @@ -50,9 +52,7 @@ def __init__(self, store, query, timestamp, counter, self.starting_counter = counter self.num_lines = num_lines - self.redis_url = redis_config.get('redis_url', - os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock') - ) + self.redis_url = get_config_value(redis_config, 'redis_url', 'unix:///var/run/redis/redis.sock') super(Query, self).__init__() diff --git a/minemeld/utils.py b/minemeld/utils.py new file mode 100644 index 00000000..36bd2345 --- /dev/null +++ b/minemeld/utils.py @@ -0,0 +1,48 @@ +import json +import os +import sys + +import pkg_resources + + +def get_config_value(config, key, default_value): + if '.' in key: + key_path = key.split('.') + for i in key_path[:-1]: + config = config.get(i, {}) + key = key_path[-1] + env_key = '_'.join([i.upper() for i in key_path]) + else: + env_key = key + return config.get(key.lower(), os.environ.get(env_key.upper(), default_value)) + + +def initialize_default_nodes_distribution(sys_config): + try: + ws = getattr(sys.modules['minemeld.loader'], '_WS') + if ws is None: + ws = pkg_resources.WorkingSet() + if len([ep for ep in ws.iter_entry_points('minemeld_nodes')]) == 0: + path = sys_config.get('MINEMELD_NODES_PATH', os.environ.get('MINEMELD_NODES_PATH', None)) + with open(path, 'r') as f: + node_definitions = json.load(f) + node_endpoints = [ + '{} = {}'.format(xk, xv['class']) + for xk, xv in node_definitions.items() + if 'class' in xv + ] + project_name = 'minemeld_synthetic_core' + project_version = '1.0' + python_version = '2.7' + dist = pkg_resources.Distribution( + location='/tmp/{}-{}-py{}.egg'.format(project_name, project_version, python_version), + project_name=project_name, + version=project_version, + py_version=python_version + ) + node_map = {'minemeld_nodes': node_endpoints} + dist._ep_map = pkg_resources.EntryPoint.parse_map(node_map, dist) + ws.add(dist) + setattr(sys.modules['minemeld.loader'], '_WS', ws) + except (KeyError, OSError): + pass diff --git a/tests/test_run_config.py b/tests/test_run_config.py index 8f9b27a9..8051ebda 100644 --- a/tests/test_run_config.py +++ b/tests/test_run_config.py @@ -34,7 +34,7 @@ class MineMeldRunConfigTests(unittest.TestCase): def test_defaults_from_file(self): emptypath = os.path.join(MYDIR, 'empty.yml') - config = minemeld.run.config.load_config(emptypath) + config = minemeld.run.config.load_config({}, emptypath) self.assertEqual(config.fabric['class'], 'AMQP') self.assertEqual(config.fabric['config'], {'num_connections': 5}) @@ -46,7 +46,7 @@ def test_defaults_from_file(self): def test_prototype_1(self): protopath = os.path.join(MYDIR, 'test-prototype-1.yml') - config = minemeld.run.config.load_config(protopath) + config = minemeld.run.config.load_config({}, protopath) self.assertEqual(config.nodes['testprototype']['class'], 'minemeld.ft.http.HttpFT') self.assertEqual(