Skip to content

Commit

Permalink
Merge pull request #36 from yuyiguo/xrootd
Browse files Browse the repository at this point in the history
testing for connection and subscription.
  • Loading branch information
yuyiguo authored Aug 31, 2021
2 parents 0b5dbbc + 09f9329 commit 2b16ee1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
41 changes: 35 additions & 6 deletions stompserver/endpointsConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,49 @@ func listener(smgr *lbstomp.StompManager, sub *stomp.Subscription, ch chan<- *st
if msg.Err != nil {
log.Println("receive error message: ", msg.Err)
// subscription checking
if !sub.Active() {
// we have to connect back to the same connection/broker with cpool
if sub != nil {
if !sub.Active() {
// we have to connect back to the same connection/broker with cpool
conn := smgr.ConnectionPool[cpool]
log.Println("stomp connection: ", conn)
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil {
log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
break
// wait
//time.Sleep(sleep)
// conn := smgr.ConnectionPool[cpool]
//log.Println("stomp reconnected: ", conn)
// subscribe to ActiveMQ topic
//sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
// FIXME: listener is goroutin, exit(1) exit this gorountin ? all the entire server?
//if err != nil {
//log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
//break
//log.Fatalf("Cann't subscribe to connect %d of %s, exit(1).\n ", cpool, smgr.Config.Endpoint)
//}
//break
}
} else {
time.Sleep(sleep)
}
} else {
conn := smgr.ConnectionPool[cpool]
log.Println("stomp connection: ", conn)
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil {
log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
// wait
time.Sleep(sleep)
conn := smgr.ConnectionPool[cpool]
log.Println("stomp reconnected: ", conn)
//conn := smgr.ConnectionPool[cpool]
//log.Println("stomp reconnected: ", conn)
// subscribe to ActiveMQ topic
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
// FIXME: IF there is error again, what we should do? YG aug25, 2021
// FIXME: listener is goroutin, exit(1) exit this gorountin ? or the entire server?
if err != nil {
log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
break
//log.Fatalf("Cann't subscribe to connect %d of %s, exit(1). \n", cpool, smgr.Config.Endpoint)
}
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion stompserver/xrootdTracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func xrtdServer() {
log.Println(err)
subs, err = subscribeAll(smgr)
if err != nil {
log.Fatalf("Unable to subscribe to all the brokers, fatal error!")
log.Fatalf("Unable to subscribe to all the brokers, fatal error!\n")
}
}
// ch for all the listeners to write to
Expand Down

0 comments on commit 2b16ee1

Please sign in to comment.