From 09f93291edff7e0cdc0701f4f01413c03d184901 Mon Sep 17 00:00:00 2001 From: yuyi Date: Tue, 31 Aug 2021 14:33:11 -0500 Subject: [PATCH] testing for connection and subscription. --- stompserver/endpointsConfig.go | 41 +++++++++++++++++++++++++++++----- stompserver/xrootdTracer.go | 2 +- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/stompserver/endpointsConfig.go b/stompserver/endpointsConfig.go index ea5ae05..10b896c 100644 --- a/stompserver/endpointsConfig.go +++ b/stompserver/endpointsConfig.go @@ -159,20 +159,49 @@ func listener(smgr *lbstomp.StompManager, sub *stomp.Subscription, ch chan<- *st if msg.Err != nil { log.Println("receive error message: ", msg.Err) // subscription checking - if !sub.Active() { - // we have to connect back to the same connection/broker with cpool + if sub != nil { + if !sub.Active() { + // we have to connect back to the same connection/broker with cpool + conn := smgr.ConnectionPool[cpool] + log.Println("stomp connection: ", conn) + sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) + if err != nil { + log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) + break + // wait + //time.Sleep(sleep) + // conn := smgr.ConnectionPool[cpool] + //log.Println("stomp reconnected: ", conn) + // subscribe to ActiveMQ topic + //sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) + // FIXME: listener is goroutin, exit(1) exit this gorountin ? all the entire server? + //if err != nil { + //log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) + //break + //log.Fatalf("Cann't subscribe to connect %d of %s, exit(1).\n ", cpool, smgr.Config.Endpoint) + //} + //break + } + } else { + time.Sleep(sleep) + } + } else { conn := smgr.ConnectionPool[cpool] - log.Println("stomp connection: ", conn) sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) if err != nil { log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) // wait time.Sleep(sleep) - conn := smgr.ConnectionPool[cpool] - log.Println("stomp reconnected: ", conn) + //conn := smgr.ConnectionPool[cpool] + //log.Println("stomp reconnected: ", conn) // subscribe to ActiveMQ topic sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) - // FIXME: IF there is error again, what we should do? YG aug25, 2021 + // FIXME: listener is goroutin, exit(1) exit this gorountin ? or the entire server? + if err != nil { + log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) + break + //log.Fatalf("Cann't subscribe to connect %d of %s, exit(1). \n", cpool, smgr.Config.Endpoint) + } break } } diff --git a/stompserver/xrootdTracer.go b/stompserver/xrootdTracer.go index 0169f60..42a0cde 100644 --- a/stompserver/xrootdTracer.go +++ b/stompserver/xrootdTracer.go @@ -230,7 +230,7 @@ func xrtdServer() { log.Println(err) subs, err = subscribeAll(smgr) if err != nil { - log.Fatalf("Unable to subscribe to all the brokers, fatal error!") + log.Fatalf("Unable to subscribe to all the brokers, fatal error!\n") } } // ch for all the listeners to write to