Skip to content

Commit

Permalink
Add connection function which create a new additon by address, no lon…
Browse files Browse the repository at this point in the history
…ger subscrbing all connection at once. Wheneve making a subscriber always to make a new connection.
  • Loading branch information
yuyiguo committed Sep 1, 2021
1 parent 09f9329 commit 6793d43
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 69 deletions.
134 changes: 79 additions & 55 deletions stompserver/endpointsConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,38 @@ func initStomp(endpoint string, stompURI string) *lbstomp.StompManager {
Protocol: Config.Protocol,
Verbose: Config.Verbose,
}
// This will make the connection pool and get the addresses. However, we will have to connect to
// each address seperatly in order to handle the individual broker in go rountin listern
stompManger := lbstomp.New(p)
log.Println(stompManger.String())
log.Println(stompManger.Addresses)
// disconnect foe now
stompManger.ResetConnection()
return stompManger
}

func Connection(smgr *lbstomp.StompManager, addr string) (*stomp.Conn, string, error) {
sendTimeout := time.Duration(smgr.Config.SendTimeout)
recvTimeout := time.Duration(smgr.Config.RecvTimeout)
heartBeatGracePeriod := smgr.Config.HeartBeatGracePeriod
if heartBeatGracePeriod == 0 {
heartBeatGracePeriod = 10
}
//
conn, err := stomp.Dial(smgr.Config.Protocol, addr,
stomp.ConnOpt.Login(smgr.Config.Login, smgr.Config.Password),
stomp.ConnOpt.HeartBeat(sendTimeout*time.Millisecond, recvTimeout*time.Millisecond),
stomp.ConnOpt.HeartBeatGracePeriodMultiplier(heartBeatGracePeriod),
)
if err != nil {
log.Printf("Unable to connect to '%s', error %v\n", addr, err)
return nil, addr, err
} else {
log.Printf("connected to StompAMQ server '%s'\n", addr)
return conn, addr, nil
}
}

// subscribe is a helper function to subscribe to StompAMQ end-point as a listener.
func subscribe(smgr *lbstomp.StompManager) (*stomp.Subscription, error) {
// get connection
Expand All @@ -134,77 +160,75 @@ func subscribeAll(smgr *lbstomp.StompManager) ([]*stomp.Subscription, error) {
log.Println("stomp connection", conn, addr)
// subscribe to ActiveMQ topic
sub, err := conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil {
log.Println("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err)
return subscriptions, err
if err != nil || sub == nil {
log.Fatalln("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err)
//return subscriptions, err
}
subscriptions = append(subscriptions, sub)
}
log.Println("stomp subscriptions", subscriptions)
log.Println("stomp subscriptions succeed: ", subscriptions)
return subscriptions, nil
}

//
// listener is a function to get data from a subsciption and pass the data to a chan
func listener(smgr *lbstomp.StompManager, sub *stomp.Subscription, ch chan<- *stomp.Message, cpool int) {
func listener(smgr *lbstomp.StompManager, addr string, ch chan<- *stomp.Message) {
var sub *stomp.Subscription
// get stomp messages from the subscriber channel
sleep := time.Duration(Config.Interval) * time.Millisecond
var err error
sleep := time.Duration(Config.Interval*1000) * time.Millisecond
// make connection for the addr .
conn, addr2, err := Connection(smgr, addr)
if err != nil || conn == nil {
log.Println("Error when connect to ", addr2, ". Error message: ", err)
time.Sleep(sleep)
// try again
if conn != nil {
conn.Disconnect()
}
conn, addr2, err = Connection(smgr, addr)
if err != nil || conn == nil {
log.Println("Error when connect to ", addr2, ". Error message: ", err, ". Disconnecting ...")
if conn != nil {
conn.Disconnect()
}
}
}
if conn != nil {
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil || sub == nil {
log.Println("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err)
time.Sleep(sleep)
// try again
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil || sub == nil {
log.Fatalln("unable to subscribe to: ", smgr.Config.Endpoint, ". Error message: n", err)
}
}
}
//
for {
if conn == nil || sub == nil || !sub.Active() {
if conn != nil {
conn.Disconnect()
}
conn, addr2, err = Connection(smgr, addr)
if err != nil || conn == nil {
time.Sleep(sleep)
continue
}
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil || sub == nil {
time.Sleep(sleep)
continue
}
}
select {
case msg := <-sub.C:
if Config.Verbose > 2 {
log.Printf("********* conn pool # %d ************", cpool)
log.Printf("********* connection to %s ************\n", addr)
}
if msg.Err != nil {
log.Println("receive error message: ", msg.Err)
// subscription checking
if sub != nil {
if !sub.Active() {
// we have to connect back to the same connection/broker with cpool
conn := smgr.ConnectionPool[cpool]
log.Println("stomp connection: ", conn)
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil {
log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
break
// wait
//time.Sleep(sleep)
// conn := smgr.ConnectionPool[cpool]
//log.Println("stomp reconnected: ", conn)
// subscribe to ActiveMQ topic
//sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
// FIXME: listener is goroutin, exit(1) exit this gorountin ? all the entire server?
//if err != nil {
//log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
//break
//log.Fatalf("Cann't subscribe to connect %d of %s, exit(1).\n ", cpool, smgr.Config.Endpoint)
//}
//break
}
} else {
time.Sleep(sleep)
}
} else {
conn := smgr.ConnectionPool[cpool]
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
if err != nil {
log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
// wait
time.Sleep(sleep)
//conn := smgr.ConnectionPool[cpool]
//log.Println("stomp reconnected: ", conn)
// subscribe to ActiveMQ topic
sub, err = conn.Subscribe(smgr.Config.Endpoint, stomp.AckAuto)
// FIXME: listener is goroutin, exit(1) exit this gorountin ? or the entire server?
if err != nil {
log.Println("unable to subscribe to: ", Config.EndpointConsumer, ". Error message: ", err)
break
//log.Fatalf("Cann't subscribe to connect %d of %s, exit(1). \n", cpool, smgr.Config.Endpoint)
}
break
}
}
} else {
ch <- msg
}
Expand Down
19 changes: 5 additions & 14 deletions stompserver/xrootdTracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,24 +224,15 @@ func xrtdServer() {
var ts uint64
var restartSrv uint
smgr := initStomp(Config.EndpointConsumer, Config.StompURIConsumer)
// get subscriptions
subs, err := subscribeAll(smgr)
if err != nil {
log.Println(err)
subs, err = subscribeAll(smgr)
if err != nil {
log.Fatalf("Unable to subscribe to all the brokers, fatal error!\n")
}
}
// ch for all the listeners to write to
ch := make(chan *stomp.Message)
// defer close executed when the main function is about to exit.
// In this way the channel is to be closed and no resources taken.
defer close(ch)
for i, sub := range subs {
go listener(smgr, sub, ch, i)

for _, addr := range smgr.Addresses {
go listener(smgr, addr, ch)
}
//
for {
// get stomp messages from ch
select {
Expand All @@ -264,9 +255,9 @@ func xrtdServer() {
atomic.StoreUint64(&tc, 0)
t2 = time.Now().Unix() - t1
t1 = time.Now().Unix()
log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop))
log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_xrtd))
log.Printf("Processing 1000 messages took %d seconds.\n", t2)
atomic.StoreUint64(&Receivedperk_swpop, 0)
atomic.StoreUint64(&Receivedperk_xrtd, 0)
}
if err != nil && err.Error() != "Empty message" {
log.Println("xrootd/aaa message processing error", err)
Expand Down

0 comments on commit 6793d43

Please sign in to comment.