From c234e0a9621c87126c12c2376cd103fa56a5bb98 Mon Sep 17 00:00:00 2001 From: Rory Z <16801068+Rory-Z@users.noreply.github.com> Date: Thu, 12 Oct 2023 10:59:02 +0800 Subject: [PATCH] fix(probe): fix muilt target error Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com> --- prober/mqtt.go | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/prober/mqtt.go b/prober/mqtt.go index 9412e24..5d12a27 100644 --- a/prober/mqtt.go +++ b/prober/mqtt.go @@ -1,6 +1,7 @@ package prober import ( + "context" "emqx-exporter/config" "time" @@ -14,7 +15,31 @@ type MQTTProbe struct { MsgChan <-chan mqtt.Message } -var mqttProbe *MQTTProbe +var mqttProbeMap map[string]*MQTTProbe + +func init() { + mqttProbeMap = make(map[string]*MQTTProbe) + go func() { + for { + for target, probe := range mqttProbeMap { + if probe == nil { + delete(mqttProbeMap, target) + continue + } + if !probe.Client.IsConnected() { + delete(mqttProbeMap, target) + continue + } + } + + select { + case <-context.Background().Done(): + return + case <-time.After(5 * time.Second): + } + } + }() +} func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target).SetClientID(probe.ClientID).SetUsername(probe.Username).SetPassword(probe.Password) @@ -45,11 +70,13 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { } func ProbeMQTT(probe config.Probe, logger log.Logger) bool { - if mqttProbe == nil { + mqttProbe, ok := mqttProbeMap[probe.Target] + if !ok { var err error if mqttProbe, err = initMQTTProbe(probe, logger); err != nil { return false } + mqttProbeMap[probe.Target] = mqttProbe } if !mqttProbe.Client.IsConnected() {