Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt client connection issues #111

pikowai opened this issue Jan 2, 2022 · 0 comments

Mqtt client connection issues #111

pikowai opened this issue Jan 2, 2022 · 0 comments


Copy link

pikowai commented Jan 2, 2022

I am using flask with uwsgi and nginx to accept webhooks from a private instance of The Things Stack and then trying to post mqtt commands to control the IO devices in the field. I can connect using paho to create a client and post data. When I try to use flask_mqtt the client connects and then gives an error message:

16 Error: Received CONNACK (0, 0)
16 Error: Sending PINGREQ
16 Error: Received PINGRESP
16 Error: Sending PINGREQ

On the server side, the logs show:

stack_1 | INFO Connected application_uid=pump-control namespace=applicationserver/io/mqtt remote_addr=
stack_1 | WARN Failed to setup connection error=read tcp> i/o timeout namespace=applicationserver/io/mqtt remote_addr=
stack_1 | WARN Error receiving packet client_id= error=read tcp> i/o timeout namespace=applicationserver/io/mqtt username=pump-control
stack_1 | WARN Error when reading packet application_uid=pump-control error=read tcp> i/o timeout namespace=applicationserver/io/mqtt remote_addr=

It works fine if I just use the paho mqtt client, it runs happily as a service.

working source with some connection details removed.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from flask import Flask, request
import paho.mqtt.client as mqttClient
import json
import re
import time
import base64
import ssl

app = Flask(__name__)

TLS_PORT = 8883  # Secure port
MQTT_PASSWORD = ""  # Leave this in blank
TOPIC = "v3/x/devices/"
DEVICE_LABEL = "dam-pump/"
QUEUE_REPLACE = "down/replace"
QUEUE_PUSH = "down/push"
TLS_CERT_PATH = "isrgrootx1.pem"  # Put here the path of your TLS cert
OFF_CMD = bytes([8,2,0,0,1,0,0])
ON_CMD = bytes([8,2,0,0,1,0,3])
ON1_CMD = bytes([8,2,0,0,1,0,1])
ON2_CMD = bytes([8,2,0,0,1,0,2])

connected = 0

def connect(mqtt_client, mqtt_username, mqtt_password, broker_endpoint, port):
    global connected

    if not mqtt_client.is_connected():
        mqtt_client.username_pw_set(mqtt_username, password=mqtt_password)
        mqtt_client.on_connect = on_connect
        mqtt_client.on_publish = on_publish
        mqtt_client.tls_set(ca_certs=TLS_CERT_PATH, certfile=None,
                            keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
                            tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
        mqtt_client.connect(broker_endpoint, port=port)

        attempts = 0

        while not connected and attempts < 5:  # Wait for connection
            print("Attempting to connect...")
            attempts += 1

    if not connected:
        print("[ERROR] Could not connect to broker")
        return False

    return True

def publish(mqtt_client, topic, payload):

        mqtt_client.publish(topic, payload)

    except Exception as e:
        print("[ERROR] Could not publish data, error: {}".format(e))

def send_command(mqtt,cmd):
    payload_dict = {"downlinks": [{"f_port": 1,"frm_payload": "","priority": "NORMAL"}]}
    raw_cmd = base64.b64encode(cmd)
    payload_dict["downlinks"][0]["frm_payload"] = raw_cmd.decode("utf8")
    payload = json.dumps(payload_dict)
    topic = "{}{}{}".format(TOPIC, DEVICE_LABEL, QUEUE_REPLACE)
    print (topic)
    ret = mqtt.publish (topic, payload)
    print('return from mqtt is ',ret)

def processP1(payload):
    print('p1 array voltage ',payload['uplink_message']['decoded_payload']['pvArray1V'])
    array_voltage = payload['uplink_message']['decoded_payload']['pvArray1V']
    if array_voltage < 24.5:
        NIGHT = 1
        cmd = ''
        NIGHT = 0
        cmd = ''
    return cmd

def processP2(payload):
    batt_voltage = payload['uplink_message']['decoded_payload']['batt1V']
    print('p2 payload',payload['uplink_message']['decoded_payload']['batt1V'])
    print('bat voltage is ', batt_voltage)
    if batt_voltage < 24:
        LOW_BATT = 1
        cmd = OFF_CMD
        LOW_BATT = 0
        cmd = ''
    return cmd

def processIO(payload):
    print('IO payload',payload["uplink_message"]["decoded_payload"])
    if (payload["uplink_message"]["decoded_payload"]['DI2'] == 1 and payload["uplink_message"]["decoded_payload"]['DI1']==1):
                cmd = ON1_CMD
    elif payload["uplink_message"]["decoded_payload"]['DI1']==1:
        if PUMP_STATE == 0:
            PUMP_STATE = 1
            PUMP_START_TIME = time.time()
            cmd = ''
            print('pump started at ',PUMP_START_TIME)
        elif PUMP_STATE == 1:
            print ("pump running time is ",time.time() - PUMP_START_TIME)
            if time.time() - PUMP_START_TIME > 1500:
                cmd = ON1_CMD 
            elif payload["uplink_message"]["decoded_payload"]['DI2'] == 1:
                cmd = ON1_CMD
                cmd = ''
            cmd = OFF_CMD
            print('pump is over time')
#    elif payload["uplink_message"]["decoded_payload"]['DO1']==0:
#        cmd = ON1_CMD
    elif payload["uplink_message"]["decoded_payload"]['DO1']==1:
        cmd = OFF_CMD
        PUMP_STATE = 0
        cmd = ''
        print('pump stopped')
    return cmd

def ackManage(payload):
    print('ack recieved')
    cmd = ''
    return cmd 

def payload_switch(payload):
    switch = {'DataFrameP1':processP1,
    print('here', payload["uplink_message"]["decoded_payload"]['msgType'])
    func = switch.get(payload["uplink_message"]["decoded_payload"]['msgType'],'')
    cmd = func(payload)        
    return cmd

def parse_request(req):
    Parses application/json request body data into a Python dictionary
    payload = req.get_data()
  #  payload = unquote_plus(payload)
  #  payload = re.sub('payload=', '', payload)
    payload = json.loads(payload)
    cmd = payload_switch(payload)

    return cmd

def on_connect(client, userdata, flags, rc):
    global connected  # Use global variable
    if rc == 0:

        print("[INFO] Connected to broker")
        connected = True  # Signal connection
        print("[INFO] Error, connection failed")

def on_publish(client, userdata, result):
    print("Published!", result)

@app.route('/', methods=['GET'])
def index():
    Go to localhost:5000 to see a message
    return ('This is a website.', 200, None)

@app.route('/api/print', methods=['POST'])
def print_test():
    Send a POST request to localhost:5000/api/print with a JSON body with a "p" key
    to print that message in the server console.
    payload = parse_request(request)
    print (payload['p'])
    return ("", 200, None)

@app.route('/api/pv-data', methods=['POST'])
def pv_webhook():
    print ("Processing request...")
    cmd = parse_request(request)
    print ("cmd is ", cmd)
    if cmd != '':
        print('sending cmd', cmd)
        mqtt = mqttClient.Client()
        if not connect(mqtt, MQTT_USERNAME,
            return False
        send_command(mqtt, cmd)
    return ("OK",200, None)

if __name__ == '__main__':, use_reloader=False, host='', port = 9000)

While this connects and then immediately disconnects

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from flask import Flask, request
from flask_mqtt import Mqtt
import json
import re
import time
import base64
import ssl

app = Flask(__name__)

app.config['MQTT_BROKER_URL'] = 'test'  # use the free broker from HIVEMQ
app.config['MQTT_BROKER_PORT'] = 8883  # default port for non-tls connection
app.config['MQTT_USERNAME'] = 'test'  # set the username here if you need authentication for the broker
app.config['MQTT_PASSWORD'] = ''  # set the password here if the broker demands authentication
app.config['MQTT_KEEPALIVE'] = 5  # set the time interval for sending a ping to the broker to 5 seconds
app.config['MQTT_TLS_ENABLED'] = True
app.config['MQTT_TLS_INSECURE'] = True
app.config['MQTT_TLS_VERSION'] = ssl.PROTOCOL_TLSv1_2
app.config['MQTT_TLS_CA_CERTS'] = 'isrgrootx1.pem'# set TLS to disabled for testing purposes
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_CLIENT_ID'] = '111222555'
app.config['MQTT_CLEAN_SESSION'] = True
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 0
app.config['MQTT_PROTOCOL'] = 'MQTTv31'

TOPIC = "v3/pump-control/devices/"
QUEUE_REPLACE = "down/replace"
QUEUE_PUSH = "down/push"
DEVICE_LABEL = 'dam-pump/'
OFF_CMD = bytes([8,2,0,0,1,0,0])
ON_CMD = bytes([8,2,0,0,1,0,3])
ON1_CMD = bytes([8,2,0,0,1,0,1])
ON2_CMD = bytes([8,2,0,0,1,0,2])

mqtt = Mqtt(app)

def send_command(mqtt,cmd):
    payload_dict = {"downlinks": [{"f_port": 1,"frm_payload": "","priority": "NORMAL"}]}
    raw_cmd = base64.b64encode(cmd)
    payload_dict["downlinks"][0]["frm_payload"] = raw_cmd.decode("utf8")
    payload = json.dumps(payload_dict)
    topic = "{}{}{}".format(TOPIC, DEVICE_LABEL, QUEUE_REPLACE)
    print (topic)
    ret = mqtt.publish (topic, payload)
    print('return from mqtt is ',ret)

def processP1(payload):
    print('p1 array voltage ',payload['uplink_message']['decoded_payload']['pvArray1V'])
    array_voltage = payload['uplink_message']['decoded_payload']['pvArray1V']
    if array_voltage < 24.5:
        NIGHT = 1
        cmd = ''
        NIGHT = 0
        cmd = ''
    return cmd

def processP2(payload):
    batt_voltage = payload['uplink_message']['decoded_payload']['batt1V']
    print('p2 payload',payload['uplink_message']['decoded_payload']['batt1V'])
    print('bat voltage is ', batt_voltage)
    if batt_voltage < 24:
        LOW_BATT = 1
        cmd = OFF_CMD
        LOW_BATT = 0
        cmd = ''
    return cmd

def processIO(payload):
    print('IO payload',payload["uplink_message"]["decoded_payload"])
    if (payload["uplink_message"]["decoded_payload"]['DI2'] == 1 and payload["uplink_message"]["decoded_payload"]['DI1']==1):
                cmd = ON1_CMD
    elif payload["uplink_message"]["decoded_payload"]['DI1']==1:
        if PUMP_STATE == 0:
            PUMP_STATE = 1
            PUMP_START_TIME = time.time()
            cmd = ''
            print('pump started at ',PUMP_START_TIME)
        elif PUMP_STATE == 1:
            print ("pump running time is ",time.time() - PUMP_START_TIME)
            if time.time() - PUMP_START_TIME > 300:
                cmd = ON1_CMD 
            elif payload["uplink_message"]["decoded_payload"]['DI2'] == 1:
                cmd = ON1_CMD
                cmd = ''
            cmd = OFF_CMD
            print('pump is over time')
#    elif payload["uplink_message"]["decoded_payload"]['DO1']==0:
#        cmd = ON1_CMD
    elif payload["uplink_message"]["decoded_payload"]['DO1']==1:
        cmd = OFF_CMD
        PUMP_STATE = 0
        cmd = ''
        print('pump stopped')
    return cmd

def ackManage(payload):
    print('ack recieved')
    cmd = ''
    return cmd 

def payload_switch(payload):
    switch = {'DataFrameP1':processP1,
    print('here', payload["uplink_message"]["decoded_payload"]['msgType'])
    func = switch.get(payload["uplink_message"]["decoded_payload"]['msgType'],'')
    cmd = func(payload)        
    return cmd

def parse_request(req):
    Parses application/json request body data into a Python dictionary
    payload = req.get_data()
  #  payload = unquote_plus(payload)
  #  payload = re.sub('payload=', '', payload)
    payload = json.loads(payload)
    cmd = payload_switch(payload)

    return cmd

def handle_logging(client, userdata, level, buf):
    print(level,'Error: {}'.format(buf))

@app.route('/', methods=['GET'])
def index():
    Go to localhost:5000 to see a message
    return ('This is a website.', 200, None)

@app.route('/api/print', methods=['POST'])
def print_test():
    Send a POST request to localhost:5000/api/print with a JSON body with a "p" key
    to print that message in the server console.
    payload = parse_request(request)
    print (payload['p'])
    return ("", 200, None)

@app.route('/api/sum', methods=['POST'])
def sum():
    Send a POST request to localhost:5000/api/sum with a JSON body with an "a" and "b" key
    to have the app add those numbers together and return a response string with their sum.
    print ("Processing request...")
    payload = parse_request(request)
    print ("Receieved following paylod:")
    print (payload)

    print ("Adding sum...")
    summation = payload['a'] + payload['b']
    print ("Found sum: %s" % summation)

    print ("Creating response string...")
    resp = '%s + %s = %s' % (payload['a'], payload['b'], summation)
    print ("Sending the following response:")
    print (resp)

    return (resp, 200, None)

@app.route('/api/pv-data', methods=['POST'])
def pv_webhook():
    print ("Processing request...")
    cmd = parse_request(request)
    print ("cmd is ", cmd)
    if cmd != '':
        print('sending cmd', cmd)
        send_command(mqtt, cmd)
    return ("OK",200, None)

if __name__ == '__main__':, use_reloader=False, host='', port = 9000)

the following is the uwsgi.ini I use,

module = pumphook:app

processes = 1
single-interpreter = true

socket = flask.sock
chmod-socket = 664
vacuum = true

die-on-term = true

Any advice on where I'm going wrong would be much appreciated,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
None yet
None yet

No branches or pull requests

1 participant