Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sync that queue #855

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/src/foundation/python-packages/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,5 @@ cryptography = { appliances = ["server"] }
jinja2 = { appliances = ["server"], bootstrap = true }
markupsafe = { appliances = ["server"] }
pycparser = { appliances = ["server"] }
pyyaml = { appliances = ["server"], bootstrap = true }
pyyaml = { appliances = ["server", "client"], bootstrap = true }
six = { appliances = ["server"] }
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import stack.commands
from pathlib import Path
from stack.util import _exec

class Plugin(stack.commands.Plugin):
"""
Sync smq settings to hosts and restart all smq related services
"""

def provides(self):
return 'smq'

def requires(self):
return []

def run(self, args):
hosts = args['hosts']
settings_file = Path('/opt/stack/etc/stacki.yml')
services = ['smq-processor', 'smq-producer', 'smq-publisher', 'smq-shipper']

for host in hosts:
self.owner.notify('Sync SMQ Settings')
if settings_file.is_file():
copy_settings = _exec(f'scp {settings_file} {host}:{settings_file}', shlexsplit=True)
if copy_settings.returncode != 0:
self.owner.notify(f'Failed to copy settings file to {host}:{copy_settings.stderr}')
for service in services:
restart_smq = _exec(f'ssh -t {host} "systemctl restart {service}" ', shlexsplit=True)
if restart_smq.returncode != 0:
self.owner.notify(f'Failed to restart service {service} on host {host}')
69 changes: 51 additions & 18 deletions common/src/stack/mq/pylib/mq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import socket
import sys
import os
import yaml
from pathlib import Path

# We need the "ports" class when we are inside the installer, but don't need to
# setup and zmq based services. The alternative is to add zmq and dependencies
Expand All @@ -23,26 +25,57 @@



class ports:
class PortsMeta(type):
"""
Socket port numbers used by the Stack Message Queue daemons.

:var publish: UDP socket service for publishing a message
:var subscribe: zmq.SUB socket for subscribing to a channel
:var control: TCP socket service for enabling/disabling channel propagation
:publish: UDP socket service for publishing a message
:subscribe: zmq.SUB socket for subscribing to a channel
:control: TCP socket service for enabling/disabling channel propagation
"""
publish = 5000
subscribe = 5001
control = 5002

# Stacki settings file location
# and default ports
SETTINGS = Path('/opt/stack/etc/stacki.yml')
pub_port = 5000
sub_port = 5001
con_port = 5002

def load_settings(cls):
config = {}
if cls.SETTINGS.is_file():
with cls.SETTINGS.open() as f:
config = yaml.safe_load(f)
return config

@property
def publish(cls):
config = cls.load_settings()
port = int(config.get('smq.pub.port', cls.pub_port))
return port

@property
def subscribe(cls):
config = cls.load_settings()
port = int(config.get('smq.sub.port', cls.sub_port))
return port

@property
def control(cls):
config = cls.load_settings()
port = int(config.get('smq.control.port', cls.con_port))
return port

class ports(metaclass=PortsMeta):
pass

class Message():
"""
Stack Message Queue Message

A Message is composed of header fields and the *message* text body. For many
applications only body is manipulated and other fields are controlled by
lower software levels.
lower software levels.

For simple Messages the *message* body can be a string. For more complex Messages
the body should be a json encoded python dictionary.
Expand Down Expand Up @@ -87,7 +120,7 @@ def __init__(self, payload=None, *, message=None, channel=None, hops=None, ttl=N

# JSON does not override parameters, this allows loading an
# existing Message and overwriting some of the fields

self.channel = channel if channel else msg.get('channel')
self.id = id if id else msg.get('id')
self.payload = payload if payload else msg.get('payload')
Expand Down Expand Up @@ -160,7 +193,7 @@ def getPayload(self):
def setPayload(self, payload):
"""
Sets the payload text

:param payload: text
:type payload: string
"""
Expand All @@ -169,7 +202,7 @@ def setPayload(self, payload):

def getHops(self):
"""
:returns: number of software hops
:returns: number of software hops
"""
return self.hops

Expand All @@ -189,7 +222,7 @@ def setSource(self, addr):
"""
Set the source host address. This address can be a hostname
or an IP Address.

:param addr: source address
:type addr: string
"""
Expand Down Expand Up @@ -234,9 +267,9 @@ def setID(self, id):
def addHop(self):
"""
Increments the hop count for the :class:`Message`. A hop is
defined as a software hop not a physical network hop.
defined as a software hop not a physical network hop.
Every time an application receives and retransmits a message the
hop should be incremented.
hop should be incremented.
This value is used to debugging.
"""
self.hops += 1
Expand Down Expand Up @@ -289,12 +322,12 @@ def unsubscribe(self, channel):
:type channel: string
"""
self.sub.setsockopt_string(zmq.UNSUBSCRIBE, channel)

def run(self):
while True:
try:
channel, payload = self.sub.recv_multipart()
msg = Message(message=payload.decode(),
msg = Message(message=payload.decode(),
channel=channel.decode())
except:
continue
Expand Down Expand Up @@ -341,10 +374,10 @@ def run(self):
#
# Note the callback() always pushes data as
# stack.mq.Message objects. This is the only
# part of the code where we handle receiving
# part of the code where we handle receiving
# unstructured data.
#
# Design point here was to keep the clients
# Design point here was to keep the clients
# simple so we don't need an API to write to
# the message queue.

Expand Down
5 changes: 5 additions & 0 deletions common/src/stack/pylib/stack/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
# these settings control the starting points for discover-nodes (nee insert ethers)
discovery.base.rack: '0'
discovery.base.rank: '0'

# smq ports
smq.pub.port: '5000'
smq.sub.port: '5001'
smq.control.port: '5002'
'''

def get_settings():
Expand Down
14 changes: 12 additions & 2 deletions redhat/nodes/backend.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ for o in stack.api.Call('list cart'):
</stack:eval>
</stack:native>

<!-- give me some status -->
<!-- give me some status -->

<stack:script stack:stage="install-pre">
/opt/stack/bin/stacki-status.py Running preinstall
Expand All @@ -50,11 +50,21 @@ include ld.so.conf.d/*.conf

<stack:script stack:stage="install-post" stack:chroot="false">
/opt/stack/bin/stacki-status.py install complete rebooting
<stack:file stack:name="/opt/stack/etc/stacki.yml">
<stack:eval>cat /opt/stack/etc/stacki.yml;</stack:eval>
</stack:file>
</stack:script>

<!-- create the stacki.yml -->
<stack:script stack:shell="/opt/stack/bin/python3" stack:stage="install-post">
import stack.settings

stack.settings.write_default_settings_file(overwrite=True)
</stack:script>

<stack:script stack:cond="release == 'redhat7'" stack:stage="install-post">
cp /run/install/tmp/stack.conf /tmp/stack.conf
</stack:script>

</stack:stack>
</stack:stack>

12 changes: 10 additions & 2 deletions sles/nodes/backend.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<stack:stack>

<stack:description>
Handle for Backend Appliance
</stack:description>
Expand Down Expand Up @@ -56,6 +56,11 @@ rm -f /etc/zypp/repos.d/*
<stack:report stack:name="host.repo">&hostname;</stack:report>
</stack:script>

<stack:script stack:stage="install-post">
<stack:file stack:name="/opt/stack/etc/stacki.yml">
<stack:eval>cat /opt/stack/etc/stacki.yml;</stack:eval>
</stack:file>
</stack:script>

<stack:script stack:stage="install-post" stack:cond="release == 'sles12'">
<stack:report stack:name="host.interface">&hostname;</stack:report>
Expand All @@ -69,6 +74,9 @@ rm -f /etc/zypp/repos.d/*
<stack:report stack:name="host.interface">&hostname;</stack:report>
<stack:report stack:name="host.route">&hostname;</stack:report>
<stack:report stack:name="host.resolv">&hostname;</stack:report>
<stack:file stack:name="/opt/stack/etc/stacki.yml">
<stack:eval>cat /opt/stack/etc/stacki.yml;</stack:eval>
</stack:file>

#
# the above command (report host resolv) writes /etc/resolv.conf. make sure
Expand Down Expand Up @@ -102,5 +110,5 @@ rm -rf /tmp/ipmisetup
</stack:script>


</stack:stack>
</stack:stack>