Skip to content
This repository has been archived by the owner on Mar 16, 2023. It is now read-only.

Allow desktop development using docker services #345

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,7 @@ target/
.vscode/

_ares.c
_ares.h
_ares.h

# pyenv
.python-version
Empty file added minemeld/cli/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions minemeld/cli/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from init import setup
from minemeld.run.launcher import main

if __name__ == '__main__':
setup()
main()
7 changes: 7 additions & 0 deletions minemeld/cli/init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from gevent.monkey import patch_all

patch_all()


def setup():
pass
6 changes: 6 additions & 0 deletions minemeld/cli/traced.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from init import setup
from minemeld.traced.main import main

if __name__ == '__main__':
setup()
main()
6 changes: 6 additions & 0 deletions minemeld/cli/web.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from init import setup
from minemeld.flask.main import app

if __name__ == '__main__':
setup()
app.run(threaded=True, passthrough_errors=False)
9 changes: 7 additions & 2 deletions minemeld/collectd.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ def _open_socket(self):
if self.socket is not None:
return

_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
_socket.connect(self.path)
if self.path.startswith('tcp://'):
hostinfo = self.path[6:].split(':')
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_socket.connect((hostinfo[0], int(hostinfo[1])))
else:
_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
_socket.connect(self.path)

self.socket = _socket

Expand Down
103 changes: 40 additions & 63 deletions minemeld/comm/zmqredis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@

import logging
import uuid
import os
import time

import gevent
import gevent.event
import ujson as json
from errno import EAGAIN

import redis
import ujson as json
import zmq.green as zmq

from minemeld.utils import get_config_value

LOG = logging.getLogger(__name__)


Expand Down Expand Up @@ -138,7 +136,6 @@ class ZMQRpcFanoutClientChannel(object):
def __init__(self, fanout):
self.socket = None
self.reply_socket = None
self.context = None

self.fanout = fanout
self.active_rpcs = {}
Expand Down Expand Up @@ -233,13 +230,8 @@ def connect(self, context):
if self.socket is not None:
return

self.context = context

self.socket = context.socket(zmq.PUB)
self.socket.bind('ipc:///var/run/minemeld/{}'.format(self.fanout))

self.reply_socket = context.socket(zmq.REP)
self.reply_socket.bind('ipc:///var/run/minemeld/{}:reply'.format(self.fanout))
self.socket = context.zmq_bind(zmq.PUB, self.fanout)
self.reply_socket = context.zmq_bind(zmq.REP, '{}:reply'.format(self.fanout))

def disconnect(self):
if self.socket is None:
Expand Down Expand Up @@ -277,8 +269,7 @@ def _send_result(self, reply_to, id_, result=None, error=None):
}

if self.fanout is not None:
reply_socket = self.context.socket(zmq.REQ)
reply_socket.connect('ipc:///var/run/minemeld/{}'.format(reply_to))
reply_socket = self.context.zmq_connect(zmq.REQ, reply_to)
LOG.debug('RPC Server {} result to {}'.format(self.name, reply_to))
reply_socket.send_json(ans)
reply_socket.recv()
Expand Down Expand Up @@ -350,23 +341,12 @@ def connect(self, context):

if self.fanout is not None:
# we are subscribers
self.socket = self.context.socket(zmq.SUB)
self.socket.connect('ipc:///var/run/minemeld/{}'.format(self.fanout))
self.socket = context.zmq_connect(zmq.SUB, self.fanout)
self.socket.setsockopt(zmq.SUBSCRIBE, b'') # set the filter to empty to recv all messages

else:
# we are a router
self.socket = self.context.socket(zmq.ROUTER)

if self.name[0] == '@':
address = 'ipc://@/var/run/minemeld/{}:rpc'.format(
self.name[1:]
)
else:
address = 'ipc:///var/run/minemeld/{}:rpc'.format(
self.name
)
self.socket.bind(address)
self.socket = context.zmq_bind(zmq.ROUTER, '{}:rpc'.format(self.name))

def disconnect(self):
if self.socket is not None:
Expand All @@ -378,7 +358,6 @@ class ZMQPubChannel(object):
def __init__(self, topic):
self.socket = None
self.reply_socket = None
self.context = None
self.topic = topic

def publish(self, method, params=None):
Expand Down Expand Up @@ -410,10 +389,7 @@ def connect(self, context):
if self.socket is not None:
return

self.context = context

self.socket = context.socket(zmq.PUB)
self.socket.bind('ipc:///var/run/minemeld/{}'.format(self.topic))
self.socket = context.zmq_bind(zmq.PUB, self.topic)

def disconnect(self):
if self.socket is None:
Expand All @@ -436,7 +412,6 @@ def __init__(self, name, obj, allowed_methods=None,
self.method_prefix = method_prefix
self.topic = topic

self.context = None
self.socket = None

def run(self):
Expand Down Expand Up @@ -483,10 +458,7 @@ def connect(self, context):
if self.socket is not None:
return

self.context = context

self.socket = self.context.socket(zmq.SUB)
self.socket.connect('ipc:///var/run/minemeld/{}'.format(self.topic))
self.socket = context.zmq_connect(zmq.SUB, self.topic)
self.socket.setsockopt(zmq.SUBSCRIBE, b'') # set the filter to empty to recv all messages

def disconnect(self):
Expand Down Expand Up @@ -578,12 +550,11 @@ def __init__(self, config):

self.failure_listeners = []

self.redis_config = {
'url': os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock')
}
self.redis_cp = redis.ConnectionPool.from_url(
self.redis_config['url']
)
self.redis_config = {'url': get_config_value(config, 'redis_url', 'unix:///var/run/redis/redis.sock')}
self.redis_cp = redis.ConnectionPool.from_url(self.redis_config['url'])

self.zmq_config = {'url': get_config_value(config, 'zmq_url', 'ipc://{}/var/run/minemeld/{}')}
self.zmq_cache = {}

def add_failure_listener(self, listener):
self.failure_listeners.append(listener)
Expand Down Expand Up @@ -663,18 +634,7 @@ def send_rpc(self, dest, method, params,
'params': params
}

socket = self.context.socket(zmq.REQ)

if dest[0] == '@':
address = 'ipc://@/var/run/minemeld/{}:rpc'.format(
dest[1:]
)
else:
address = 'ipc:///var/run/minemeld/{}:rpc'.format(
dest
)

socket.connect(address)
socket = self.zmq_connect(zmq.REQ, '{}:rpc'.format(dest))
socket.setsockopt(zmq.LINGER, 0)
socket.send_json(body)
LOG.debug('RPC sent to {}:rpc for method {}'.format(dest, method))
Expand Down Expand Up @@ -754,26 +714,43 @@ def _ioloop_failure(self, g):
for l in self.failure_listeners:
l()

def zmq_address(self, dest):
r = self.zmq_cache.get(dest)
if r is None:
format_args = ('@', dest[1:]) if dest[0] == '@' else ('', dest)
r = self.zmq_cache[dest] = self.zmq_config['url'].format(*format_args)
return r

def zmq_connect(self, socket_type, dest):
socket = self.context.socket(socket_type)
socket.connect(self.zmq_address(dest))
return socket

def zmq_bind(self, socket_type, dest):
socket = self.context.socket(socket_type)
socket.bind(self.zmq_address(dest))
return socket

def start(self, start_dispatching=True):
self.context = zmq.Context()

for rfcc in self.rpc_fanout_clients_channels:
rfcc.connect(self.context)
rfcc.connect(self)

for rpcc in self.rpc_server_channels.values():
rpcc.connect(self.context)
rpcc.connect(self)

for sc in self.sub_channels:
sc.connect()

for mwsc in self.mw_sub_channels:
mwsc.connect(self.context)
mwsc.connect(self)

for pc in self.pub_channels:
pc.connect()

for mwpc in self.mw_pub_channels:
mwpc.connect(self.context)
mwpc.connect(self)

if start_dispatching:
self.start_dispatching()
Expand Down Expand Up @@ -848,9 +825,9 @@ def stop(self):

@staticmethod
def cleanup(config):
redis_cp = redis.ConnectionPool.from_url(
os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock')
)
redis_config = {'url': get_config_value(config, 'redis_url', 'unix:///var/run/redis/redis.sock')}
redis_cp = redis.ConnectionPool.from_url(redis_config['url'])

SR = redis.StrictRedis(connection_pool=redis_cp)
tkeys = SR.keys(pattern='mm:topic:*')
if len(tkeys) > 0:
Expand Down
17 changes: 10 additions & 7 deletions minemeld/flask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging

import yaml
from flask import Flask

import minemeld.loader
from minemeld.flask import config
from minemeld.utils import get_config_value, initialize_default_nodes_distribution
from .logger import LOG

REDIS_URL = os.environ.get('REDIS_URL', 'unix:///var/run/redis/redis.sock')


def create_app():
yaml.SafeLoader.add_constructor(
Expand All @@ -36,8 +35,11 @@ def create_app():

LOG.init_app(app)

# extension code
from . import config
config.init()
initialize_default_nodes_distribution(config)

# extension code
from . import aaa
from . import session
from . import mmrpc
Expand All @@ -47,10 +49,11 @@ def create_app():
from . import sns
from . import events

session.init_app(app, REDIS_URL)
redis_url = get_config_value(config, 'MGMTBUS.config.redis_url', 'unix:///var/run/redis/redis.sock')

session.init_app(app, redis_url)
aaa.init_app(app)

config.init()
if config.get('DEBUG', False):
logging.getLogger().setLevel(logging.DEBUG)
else:
Expand All @@ -61,7 +64,7 @@ def create_app():
supervisorclient.init_app(app)
jobs.init_app(app)
sns.init_app()
events.init_app(app, REDIS_URL)
events.init_app(app, redis_url)

# entrypoints
from . import metricsapi # noqa
Expand Down
2 changes: 2 additions & 0 deletions minemeld/flask/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ def init():
if not os.path.isdir(config_path):
config_path = os.path.dirname(config_path)

os.environ['MM_CONFIG_DIR'] = config_path

# init global vars
API_CONFIG_PATH = os.path.join(config_path, 'api')
API_CONFIG_LOCK = filelock.FileLock(
Expand Down
7 changes: 4 additions & 3 deletions minemeld/flask/configapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from flask import request, jsonify

from minemeld.flask import config
from .redisclient import SR
from .aaa import MMBlueprint
from .logger import LOG
Expand Down Expand Up @@ -248,11 +249,11 @@ def _commit_config(version):

fabric = _get_stanza('fabric')
if fabric is not None:
newconfig['fabric'] = json.loads(fabric)['properties']
newconfig['fabric'] = fabric['properties']

mgmtbus = _get_stanza('mgmtbus')
if mgmtbus is not None:
newconfig['mgmtbus'] = json.loads(mgmtbus)['properties']
newconfig['mgmtbus'] = mgmtbus['properties']

newconfig['nodes'] = {}
for n in range(config_info['next_node_id']):
Expand All @@ -274,7 +275,7 @@ def _commit_config(version):
# original config is not used because it could be modified
# during validation
temp_config = minemeld.run.config.MineMeldConfig.from_dict(copy.deepcopy(newconfig))
valid = minemeld.run.config.resolve_prototypes(temp_config)
valid = minemeld.run.config.resolve_prototypes(config, temp_config)
if not valid:
raise ValueError('Error resolving prototypes')
messages = minemeld.run.config.validate_config(temp_config)
Expand Down
6 changes: 3 additions & 3 deletions minemeld/flask/configdataapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def __init__(self, cpath, datafilename):
# API for working with side configs and dynamic data files
@BLUEPRINT.route('/<datafilename>', methods=['GET'], read_write=False)
def get_config_data(datafilename):
cpath = os.path.dirname(os.environ.get('MM_CONFIG'))
cpath = os.environ.get('MM_CONFIG_DIR')

datafiletype = request.values.get('t', 'yaml')

Expand All @@ -392,7 +392,7 @@ def get_config_data(datafilename):

@BLUEPRINT.route('/<datafilename>', methods=['PUT'], read_write=True)
def save_config_data(datafilename):
cpath = os.path.dirname(os.environ.get('MM_CONFIG'))
cpath = os.environ.get('MM_CONFIG_DIR')

datafiletype = request.values.get('t', 'yaml')

Expand All @@ -419,7 +419,7 @@ def save_config_data(datafilename):

@BLUEPRINT.route('/<datafilename>/append', methods=['POST'], read_write=True)
def append_config_data(datafilename):
cpath = os.path.dirname(os.environ.get('MM_CONFIG'))
cpath = os.environ.get('MM_CONFIG_DIR')

datafiletype = request.values.get('t', 'yaml')

Expand Down
Loading