diff --git a/stompserver/endpointsConfig.go b/stompserver/endpointsConfig.go index 10b896c..d502e28 100644 --- a/stompserver/endpointsConfig.go +++ b/stompserver/endpointsConfig.go @@ -102,12 +102,38 @@ func initStomp(endpoint string, stompURI string) *lbstomp.StompManager { Protocol: Config.Protocol, Verbose: Config.Verbose, } + // This will make the connection pool and get the addresses. However, we will have to connect to + // each address seperatly in order to handle the individual broker in go rountin listern stompManger := lbstomp.New(p) log.Println(stompManger.String()) log.Println(stompManger.Addresses) + // disconnect foe now + stompManger.ResetConnection() return stompManger } +func Connection(smgr *lbstomp.StompManager, addr string) (*stomp.Conn, string, error) { + sendTimeout := time.Duration(smgr.Config.SendTimeout) + recvTimeout := time.Duration(smgr.Config.RecvTimeout) + heartBeatGracePeriod := smgr.Config.HeartBeatGracePeriod + if heartBeatGracePeriod == 0 { + heartBeatGracePeriod = 10 + } + // + conn, err := stomp.Dial(smgr.Config.Protocol, addr, + stomp.ConnOpt.Login(smgr.Config.Login, smgr.Config.Password), + stomp.ConnOpt.HeartBeat(sendTimeout*time.Millisecond, recvTimeout*time.Millisecond), + stomp.ConnOpt.HeartBeatGracePeriodMultiplier(heartBeatGracePeriod), + ) + if err != nil { + log.Printf("Unable to connect to '%s', error %v\n", addr, err) + return nil, addr, err + } else { + log.Printf("connected to StompAMQ server '%s'\n", addr) + return conn, addr, nil + } +} + // subscribe is a helper function to subscribe to StompAMQ end-point as a listener. func subscribe(smgr *lbstomp.StompManager) (*stomp.Subscription, error) { // get connection @@ -134,77 +160,75 @@ func subscribeAll(smgr *lbstomp.StompManager) ([]*stomp.Subscription, error) { log.Println("stomp connection", conn, addr) // subscribe to ActiveMQ topic sub, err := conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) - if err != nil { - log.Println("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err) - return subscriptions, err + if err != nil || sub == nil { + log.Fatalln("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err) + //return subscriptions, err } subscriptions = append(subscriptions, sub) } - log.Println("stomp subscriptions", subscriptions) + log.Println("stomp subscriptions succeed: ", subscriptions) return subscriptions, nil } // // listener is a function to get data from a subsciption and pass the data to a chan -func listener(smgr *lbstomp.StompManager, sub *stomp.Subscription, ch chan<- *stomp.Message, cpool int) { +func listener(smgr *lbstomp.StompManager, addr string, ch chan<- *stomp.Message) { + var sub *stomp.Subscription // get stomp messages from the subscriber channel - sleep := time.Duration(Config.Interval) * time.Millisecond - var err error + sleep := time.Duration(Config.Interval*1000) * time.Millisecond + // make connection for the addr . + conn, addr2, err := Connection(smgr, addr) + if err != nil || conn == nil { + log.Println("Error when connect to ", addr2, ". Error message: ", err) + time.Sleep(sleep) + // try again + if conn != nil { + conn.Disconnect() + } + conn, addr2, err = Connection(smgr, addr) + if err != nil || conn == nil { + log.Println("Error when connect to ", addr2, ". Error message: ", err, ". Disconnecting ...") + if conn != nil { + conn.Disconnect() + } + } + } + if conn != nil { + sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) + if err != nil || sub == nil { + log.Println("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err) + time.Sleep(sleep) + // try again + sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) + if err != nil || sub == nil { + log.Fatalln("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err) + } + } + } + // for { + if conn == nil || sub == nil || !sub.Active() { + if conn != nil { + conn.Disconnect() + } + conn, addr2, err = Connection(smgr, addr) + if err != nil || conn == nil { + time.Sleep(sleep) + continue + } + sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) + if err != nil || sub == nil { + time.Sleep(sleep) + continue + } + } select { case msg := <-sub.C: if Config.Verbose > 2 { - log.Printf("********* conn pool # %d ************", cpool) + log.Printf("********* connection to %s ************\n", addr) } if msg.Err != nil { log.Println("receive error message: ", msg.Err) - // subscription checking - if sub != nil { - if !sub.Active() { - // we have to connect back to the same connection/broker with cpool - conn := smgr.ConnectionPool[cpool] - log.Println("stomp connection: ", conn) - sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) - if err != nil { - log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) - break - // wait - //time.Sleep(sleep) - // conn := smgr.ConnectionPool[cpool] - //log.Println("stomp reconnected: ", conn) - // subscribe to ActiveMQ topic - //sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) - // FIXME: listener is goroutin, exit(1) exit this gorountin ? all the entire server? - //if err != nil { - //log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) - //break - //log.Fatalf("Cann't subscribe to connect %d of %s, exit(1).\n ", cpool, smgr.Config.Endpoint) - //} - //break - } - } else { - time.Sleep(sleep) - } - } else { - conn := smgr.ConnectionPool[cpool] - sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) - if err != nil { - log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) - // wait - time.Sleep(sleep) - //conn := smgr.ConnectionPool[cpool] - //log.Println("stomp reconnected: ", conn) - // subscribe to ActiveMQ topic - sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto) - // FIXME: listener is goroutin, exit(1) exit this gorountin ? or the entire server? - if err != nil { - log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err) - break - //log.Fatalf("Cann't subscribe to connect %d of %s, exit(1). \n", cpool, smgr.Config.Endpoint) - } - break - } - } } else { ch <- msg } diff --git a/stompserver/xrootdTracer.go b/stompserver/xrootdTracer.go index 42a0cde..585488d 100644 --- a/stompserver/xrootdTracer.go +++ b/stompserver/xrootdTracer.go @@ -224,24 +224,15 @@ func xrtdServer() { var ts uint64 var restartSrv uint smgr := initStomp(Config.EndpointConsumer, Config.StompURIConsumer) - // get subscriptions - subs, err := subscribeAll(smgr) - if err != nil { - log.Println(err) - subs, err = subscribeAll(smgr) - if err != nil { - log.Fatalf("Unable to subscribe to all the brokers, fatal error!\n") - } - } // ch for all the listeners to write to ch := make(chan *stomp.Message) // defer close executed when the main function is about to exit. // In this way the channel is to be closed and no resources taken. defer close(ch) - for i, sub := range subs { - go listener(smgr, sub, ch, i) - + for _, addr := range smgr.Addresses { + go listener(smgr, addr, ch) } + // for { // get stomp messages from ch select { @@ -264,9 +255,9 @@ func xrtdServer() { atomic.StoreUint64(&tc, 0) t2 = time.Now().Unix() - t1 t1 = time.Now().Unix() - log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop)) + log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_xrtd)) log.Printf("Processing 1000 messages took %d seconds.\n", t2) - atomic.StoreUint64(&Receivedperk_swpop, 0) + atomic.StoreUint64(&Receivedperk_xrtd, 0) } if err != nil && err.Error() != "Empty message" { log.Println("xrootd/aaa message processing error", err)