Skip to content

Commit

Permalink
create tracer server for cmsswpop and update configs and so on.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyiguo committed Jul 2, 2021
1 parent bbf371a commit 0aeb937
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 39 deletions.
67 changes: 67 additions & 0 deletions etc/domainsitemap.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
[
{"Domain": "accre.vanderbilt.edu", "RSEs": [ "T2_US_Vanderbilt" ] },
{"Domain": "acrc.bris.ac.uk", "RSEs": [ "T2_UK_SGrid_Bristol" ] },
{"Domain": "phy.bris.ac.uk", "RSEs": [ "T2_UK_SGrid_Bristol" ] },
{"Domain": "baylor.edu", "RSEs": [ "T3_US_Baylor" ] },
{"Domain": "brunel.ac.uk", "RSEs": [ "T2_UK_London_Brunel" ] },
{"Domain": "cern.ch", "RSEs": [ "T2_CH_CERN" ] },
{"Domain": "ciemat.es", "RSEs": [ "T2_ES_CIEMAT" ] },
{"Domain": "cis.gov.pl", "RSEs": [ "T2_PL_Swierk" ] },
{"Domain": "cism.ucl.ac.be", "RSEs": [ "T2_BE_UCL" ] },
{"Domain": "cmsaf.mit.edu", "RSEs": [ "T2_US_MIT" ] },
{"Domain": "colorado.edu", "RSEs": [ "T3_US_Colorado" ] },
{"Domain": "cr.cnaf.infn.it", "RSEs": [ "T1_IT_CNAF_Disk" ] },
{"Domain": "csc.fi", "RSEs": [ "T2_FI_HIP" ] },
{"Domain": "cscs.ch", "RSEs": [ "T2_CH_CSCS" ] },
{"Domain": "lcg.cscs.ch", "RSEs": [ "T2_CH_CSCS" ] },
{"Domain": "datagrid.cea.fr", "RSEs": [ "T2_FR_GRIF_IRFU" ] },
{"Domain": "desy.de", "RSEs": [ "T2_DE_DESY" ] },
{"Domain": "echo.stfc.ac.uk", "RSEs": [ "T1_UK_RAL_Disk" ] },
{"Domain": "eos.grid.vbc.ac.at", "RSEs": [ "T2_AT_Vienna" ] },
{"Domain": "grid.vbc.ac.at", "RSEs": [ "T2_AT_Vienna" ] },
{"Domain": "fnal.gov", "RSEs": [ "T1_US_FNAL_Disk" ] },
{"Domain": "grid.hep.ph.ic.ac.uk", "RSEs": [ "T2_UK_London_IC" ] },
{"Domain": "grid.metu.edu.tr", "RSEs": [ "T2_TR_METU" ] },
{"Domain": "grid.nchc.org.tw", "RSEs": [ "T2_TW_NCHC" ] },
{"Domain": "gridka.de", "RSEs": [ "T1_DE_KIT_Disk" ] },
{"Domain": "gridpp.rl.ac.uk", "RSEs": [ "T1_UK_RAL_Disk" ] },
{"Domain": "hep.kbfi.ee", "RSEs": [ "T2_EE_Estonia" ] },
{"Domain": "hep.uprm.edu", "RSEs": [ "T3_US_PuertoRico" ] },
{"Domain": "hep.wisc.edu", "RSEs": [ "T2_US_Wisconsin" ] },
{"Domain": "hepgrid.uerj.br", "RSEs": [ "T2_BR_UERJ" ] },
{"Domain": "ifca.es", "RSEs": [ "T2_ES_IFCA" ] },
{"Domain": "ihep.ac.cn", "RSEs": [ "T2_CN_Beijing" ] },
{"Domain": "iihe.ac.be", "RSEs": [ "T2_BE_IIHE" ] },
{"Domain": "in2p3.fr", "RSEs": [
"T1_FR_CCIN2P3_Disk",
"T2_FR_GRIF_LLR",
"T2_FR_IPHC"
] },
{"Domain": "indiacms.res.in", "RSEs": [ "T2_IN_TIFR" ] },
{"Domain": "inr.troitsk.ru", "RSEs": [ "T2_RU_INR" ] },
{"Domain": "itep.ru", "RSEs": [ "T2_RU_ITEP" ] },
{"Domain": "jinr-t1.ru", "RSEs": [ "T1_RU_JINR_Disk" ] },
{"Domain": "jinr.ru", "RSEs": [ "T2_RU_JINR" ] },
{"Domain": "kfki.hu", "RSEs": [ "T2_HU_Budapest" ] },
{"Domain": "kipt.kharkov.ua", "RSEs": [ "T2_UA_KIPT" ] },
{"Domain": "knu.ac.kr", "RSEs": [ "T3_KR_KNU" ] },
{"Domain": "lnl.infn.it", "RSEs": [ "T2_IT_Legnaro" ] },
{"Domain": "m45.ihep.su", "RSEs": [ "T2_RU_IHEP" ] },
{"Domain": "ncg.ingrid.pt", "RSEs": [ "T2_PT_NCG_Lisbon" ] },
{"Domain": "physics.uoi.gr", "RSEs": [ "T2_GR_Ioannina" ] },
{"Domain": "physik.rwth-aachen.de", "RSEs": [ "T2_DE_RWTH" ] },
{"Domain": "pi.infn.it", "RSEs": [ "T2_IT_Pisa" ] },
{"Domain": "pic.es", "RSEs": [ "T1_ES_PIC_Disk" ] },
{"Domain": "pp.rl.ac.uk", "RSEs": [ "T2_UK_SGrid_RALPP" ] },
{"Domain": "psi.ch", "RSEs": [ "T3_CH_PSI" ] },
{"Domain": "rc.ufl.edu", "RSEs": [ "T2_US_Florida" ] },
{"Domain": "rcac.purdue.edu", "RSEs": [ "T2_US_Purdue" ] },
{"Domain": "recas.ba.infn.it", "RSEs": [ "T2_IT_Bari" ] },
{"Domain": "roma1.infn.it", "RSEs": [ "T2_IT_Rome" ] },
{"Domain": "rutgers.edu", "RSEs": [ "T3_US_Rutgers" ] },
{"Domain": "sdfarm.kr", "RSEs": [ "T2_KR_KISTI" ] },
{"Domain": "sprace.org.br", "RSEs": [ "T2_BR_SPRACE" ] },
{"Domain": "t2.ucsd.edu", "RSEs": [ "T2_US_UCSD" ] },
{"Domain": "ultralight.org", "RSEs": [ "T2_US_Caltech" ] },
{"Domain": "unl.edu", "RSEs": [ "T2_US_Nebraska" ] }
]
256 changes: 256 additions & 0 deletions stompserver/cmsswpopTracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package main

// cmsswpopTracer is one of the three Rucio tracers. It consumes data from
// CMSSWPOP: /topic/cms.swpop
// processes it, produces a Rucio trace message and then sends it to topic:
// /topic/cms.rucio.tracer
//
// Authors: Yuyi Guo
// Created: June 2021

import (
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"sync/atomic"
"time"

// load-balanced stomp manager

// stomp library
"github.com/go-stomp/stomp"
)

// SWPOPRecord defines CMSSW POP record structure. It is the target of
// json.Unmarshal for the body of a message consumed from the swpop topic;
// each field is filled from the JSON key named in its tag.
type SWPOPRecord struct {
// SiteName is used as the only RSE when ServerDomain is empty or "unknown".
SiteName string `json:"site_name"`
// Usrdn is passed through unchanged as the user DN of the trace.
Usrdn string `json:"user_dn"`
// ClientHost and ClientDomain are combined by swpopConsumer into the
// worker-node name "<ClientHost>@<ClientDomain>".
ClientHost string `json:"client_host"`
ClientDomain string `json:"client_domain"`
// ServerHost is decoded but not read by the code in this file.
ServerHost string `json:"server_host"`
// ServerDomain is resolved to a list of RSEs through findRSEs.
ServerDomain string `json:"server_domain"`
// Lfn is the file LFN; swpopConsumer rejects records where it is empty.
Lfn string `json:"file_lfn"`
// JobType is taken from the "app_info" key.
JobType string `json:"app_info"`
// Ts is the record start time; swpopConsumer substitutes the current Unix
// time when it is zero (presumably Unix seconds - confirm with the producer).
Ts int64 `json:"start_time"`
}

// DomainRSE associates one network domain with the RSE names served from it.
// The field names match the "Domain" and "RSEs" keys of the entries in
// etc/domainsitemap.txt (presumably loaded by parseRSEMap - confirm there).
type DomainRSE struct {
// Domain is the domain name, e.g. "cern.ch".
Domain string
// RSEs lists the RSE names mapped to Domain; an entry may carry several.
RSEs []string
}

// Package-level state used by the swpop tracer.
var (
	// domainRSEMap is the in-memory list of domain-to-RSE entries.
	domainRSEMap []DomainRSE

	// Receivedperk_swpop counts the messages received since the last
	// 1000-trace report; it is updated atomically by swpopConsumer and
	// reset by swpopServer.
	Receivedperk_swpop uint64
)

// swpopConsumer parses one message consumed from the cmssw pop topic and
// extracts the values needed to build Rucio traces.
//
// The results are, in order: the file LFN, the list of RSE names, the user
// DN, the job type, the timestamp, the worker-node name formatted as
// "<client host>@<client domain>", and an error. When the error is non-nil
// every other result is a zero value.
//
// Every call, including calls with an empty message, increments the
// Received_swpop metric and the Receivedperk_swpop counter.
func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) {
	Received_swpop.Inc()
	atomic.AddUint64(&Receivedperk_swpop, 1)

	// Check that there is something in msg before touching its body.
	// swpopServer compares against this exact error text, so keep it stable.
	if msg == nil || msg.Body == nil {
		return "", nil, "", "", 0, "", errors.New("Empty message")
	}

	if Config.Verbose > 2 {
		log.Println("*****************Source AMQ message of swpop*********************")
		log.Println("\n" + string(msg.Body))
		log.Println("*******************End AMQ message of swpop**********************")
	}

	var rec SWPOPRecord
	if err := json.Unmarshal(msg.Body, &rec); err != nil {
		log.Printf("Unable to Unmarshal input message. Error: %v", err)
		return "", nil, "", "", 0, "", err
	}
	if Config.Verbose > 2 {
		log.Println(" ******Parsed swpop record******")
		log.Println("\n", rec)
		log.Println("******End parsed swpop record******")
	}

	// A trace without a file name is useless.
	if len(rec.Lfn) == 0 {
		return "", nil, "", "", 0, "", errors.New("No Lfn found")
	}

	// Resolve the RSEs: map the server domain when it is known, otherwise
	// fall back to the site name reported in the record.
	var sitename []string
	if len(rec.ServerDomain) == 0 || strings.EqualFold(rec.ServerDomain, "unknown") {
		if len(rec.SiteName) == 0 {
			return "", nil, "", "", 0, "", errors.New("No RSEs found")
		}
		sitename = append(sitename, rec.SiteName)
	} else {
		sitename = findRSEs(rec.ServerDomain)
	}
	if len(sitename) == 0 {
		return "", nil, "", "", 0, "", errors.New("No RSEs' map found")
	}

	// Records without a start time are stamped with the current time.
	ts := rec.Ts
	if ts == 0 {
		ts = time.Now().Unix()
	}

	// The "unknow" placeholder spelling is kept as is: it ends up in the
	// emitted trace data, so changing it would change what consumers see.
	jobtype := rec.JobType
	if len(jobtype) == 0 {
		jobtype = "unknow"
	}
	domain := rec.ClientDomain
	if len(domain) == 0 {
		domain = "unknow"
	}
	host := rec.ClientHost
	if len(host) == 0 {
		host = "unknow"
	}
	wnname := host + "@" + domain

	// An absent user DN is reported as the empty string.
	return rec.Lfn, sitename, rec.Usrdn, jobtype, ts, wnname, nil
}

// swpopTrace makes the swpop traces for one message and sends them to the
// rucio endpoint.
//
// The message is parsed with swpopConsumer; one trace is then built per RSE
// and sent through stompMgr. The returned slice holds the DIDs of the traces
// that could not be marshalled or sent (each DID at most once per RSE). The
// returned error is non-nil only when the message itself could not be parsed,
// in which case it is the error reported by swpopConsumer, so callers can
// still recognise specific conditions such as "Empty message".
//
// The process is terminated via log.Fatalln when Config.EndpointProducer is
// empty.
func swpopTrace(msg *stomp.Message) ([]string, error) {
	// get trace data
	lfn, sitename, usrdn, jobtype, ts, wnname, err := swpopConsumer(msg)
	if err != nil {
		log.Println("Bad swpop message.")
		return nil, err
	}

	var dids []string
	for _, s := range sitename {
		trc := NewTrace(lfn, s, ts, jobtype, wnname, "swpop", usrdn)
		data, err := json.Marshal(trc)
		if err != nil {
			if Config.Verbose > 1 {
				log.Printf("Unable to marshal back to JSON string , error: %v, data: %v\n", err, trc)
			} else {
				log.Printf("Unable to marshal back to JSON string, error: %v \n", err)
			}
			dids = append(dids, fmt.Sprintf("%v", trc.DID))
			// There is no payload to send for this RSE; move on to the next one.
			continue
		}
		if Config.Verbose > 2 {
			log.Println("********* Rucio trace record ***************")
			log.Println("\n" + string(data))
			log.Println("******** Done Rucio trace record *************")
		}

		// Without a producer endpoint nothing can ever be delivered.
		if Config.EndpointProducer == "" {
			log.Fatalln("*** Config.EndpointProducer is empty, check config file! ***")
		}

		// send data to Stomp endpoint
		if err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "swpopAMQ")); err != nil {
			dids = append(dids, fmt.Sprintf("%v", trc.DID))
			log.Printf("Failed to send %s to stomp, error: %v", trc.DID, err)
			continue
		}
		Send_swpop.Inc()
	}
	return dids, nil
}

// swpopServer gets messages from the consumer AMQ endpoint, makes traces and
// sends them to the AMQ producer endpoint. It never returns.
//
// The domain/RSE map file (fsitemap) is parsed once at start-up; a parse
// failure terminates the process. The subscription is polled without
// blocking: when no message is pending the loop sleeps Config.Interval
// milliseconds, and a missing subscription is re-created after sleeping
// Config.Interval seconds. After every 1000 successfully processed messages
// the elapsed time and the number of received messages are logged and the
// Receivedperk_swpop counter is reset.
func swpopServer() {
	log.Println("Stomp broker URL: ", Config.StompURIConsumer)
	// get connection; a failure here is retried inside the loop below
	sub, err := subscribe(Config.EndpointConsumer, Config.StompURIConsumer)
	if err != nil {
		log.Println(err)
	}

	if err := parseRSEMap(fsitemap); err != nil {
		log.Fatalf("Unable to parse rucio domain RSE map file %s, error: %v", fsitemap, err)
	}

	// These counters are only touched by this goroutine, so plain integers
	// are sufficient.
	var processed uint64 // traces handled in the current group of 1000
	var idle uint64      // consecutive-ish polls that found no message
	t1 := time.Now().Unix()

	for {
		// check first if subscription is still valid, otherwise get a new one
		if sub == nil {
			time.Sleep(time.Duration(Config.Interval) * time.Second)
			sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer)
			if err != nil {
				log.Println("unable to get new subscription", err)
				continue
			}
			if sub == nil {
				continue
			}
		}

		// get stomp messages from subscriber channel
		select {
		case msg, ok := <-sub.C:
			// A closed channel delivers nil messages forever; drop the
			// subscription so that the next iteration creates a new one.
			if !ok || msg == nil {
				log.Println("subscription channel is closed, will re-subscribe")
				sub = nil
				break
			}
			if msg.Err != nil {
				log.Println("receive error message", msg.Err)
				sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer)
				if err != nil {
					log.Println("unable to subscribe to", Config.EndpointConsumer, err)
				}
				break
			}

			// process stomp messages
			dids, traceErr := swpopTrace(msg)
			if traceErr == nil {
				Traces_swpop.Inc()
				processed++
				if Config.Verbose > 1 {
					log.Println("The number of traces processed in 1000 group: ", processed)
				}
			}

			if processed == 1000 {
				processed = 0
				now := time.Now().Unix()
				elapsed := now - t1
				t1 = now
				log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop))
				log.Printf("Processing 1000 messages took %d seconds.\n", elapsed)
				atomic.StoreUint64(&Receivedperk_swpop, 0)
			}
			if traceErr != nil && traceErr.Error() != "Empty message" {
				log.Println("SWPOP message processing error", traceErr)
			}
			// Historical note (YG 6/22/2021): the error "SWPOP message
			// processing error unexpected end of JSON input" was once seen
			// and the loop appeared to stop afterwards.
			if len(dids) > 0 {
				log.Printf("DIDS in Error: %v .\n ", dids)
			}
		default:
			// Nothing pending: back off briefly instead of spinning.
			sleep := time.Duration(Config.Interval) * time.Millisecond
			if idle == 10000 {
				idle = 0
				if Config.Verbose > 3 {
					log.Println("waiting for 10000x", sleep)
				}
			}
			time.Sleep(sleep)
			idle++
		}
	}
}
16 changes: 9 additions & 7 deletions stompserver/endpointsConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ type Configuration struct {
Verbose int `json:"verbose"`
// Port defines http server port number for monitoring metrics.
Port int `json:"port"`
// StompURLTest defines StompAMQ URI testbed for consumer and Producer.
StompURIProducer string `json:"stompURIProducer"`
// StompURL defines StompAMQ URI for consumer and Producer.
StompURI string `json:"stompURI"`
StompURIConsumer string `json:"stompURIConsumer"`
// StompLogin defines StompAQM login name.
StompLogin string `json:"stompLogin"`
// StompPassword defines StompAQM password.
Expand Down Expand Up @@ -85,9 +87,9 @@ func parseConfig(configFile string) error {

//
// initStomp is a function to initialize a stomp object of endpointProducer.
func initStomp(endpoint string) *lbstomp.StompManager {
func initStomp(endpoint string, stompURI string) *lbstomp.StompManager {
p := lbstomp.Config{
URI: Config.StompURI,
URI: stompURI,
Login: Config.StompLogin,
Password: Config.StompPassword,
Iterations: Config.StompIterations,
Expand All @@ -105,20 +107,20 @@ func initStomp(endpoint string) *lbstomp.StompManager {
}

// subscribe is a helper function to subscribe to StompAMQ end-point as a listener.
func subscribe(endpoint string) (*stomp.Subscription, error) {
smgr := initStomp(endpoint)
func subscribe(endpoint string, stompURI string) (*stomp.Subscription, error) {
smgr := initStomp(endpoint, stompURI)
// get connection
conn, addr, err := smgr.GetConnection()
if err != nil {
return nil, err
}
log.Println("stomp connection", conn, addr)
log.Println("\n stomp connection", conn, addr)
// subscribe to ActiveMQ topic
sub, err := conn.Subscribe(endpoint, stomp.AckAuto)
if err != nil {
log.Println("unable to subscribe to", endpoint, err)
return nil, err
}
log.Println("stomp subscription", sub)
log.Println("\n stomp subscription", sub)
return sub, err
}
Loading

0 comments on commit 0aeb937

Please sign in to comment.