From 0aeb937b19a070c6d861ed6961c57d6fb51bd2fc Mon Sep 17 00:00:00 2001 From: yuyi Date: Fri, 2 Jul 2021 11:46:17 -0500 Subject: [PATCH] create tracer server for cmsswpop and update configs and so on. --- etc/domainsitemap.txt | 67 +++++++++ stompserver/cmsswpopTracer.go | 256 +++++++++++++++++++++++++++++++++ stompserver/endpointsConfig.go | 16 ++- stompserver/fwjrTracer.go | 44 +++--- stompserver/stompserver.go | 37 +++-- stompserver/traceDef.go | 15 +- stompserver/utils.go | 34 ++++- 7 files changed, 430 insertions(+), 39 deletions(-) create mode 100644 etc/domainsitemap.txt create mode 100644 stompserver/cmsswpopTracer.go diff --git a/etc/domainsitemap.txt b/etc/domainsitemap.txt new file mode 100644 index 0000000..ba3f041 --- /dev/null +++ b/etc/domainsitemap.txt @@ -0,0 +1,67 @@ +[ + {"Domain": "accre.vanderbilt.edu", "RSEs": [ "T2_US_Vanderbilt" ] }, + {"Domain": "acrc.bris.ac.uk", "RSEs": [ "T2_UK_SGrid_Bristol" ] }, + {"Domain": "phy.bris.ac.uk", "RSEs": [ "T2_UK_SGrid_Bristol" ] }, + {"Domain": "baylor.edu", "RSEs": [ "T3_US_Baylor" ] }, + {"Domain": "brunel.ac.uk", "RSEs": [ "T2_UK_London_Brunel" ] }, + {"Domain": "cern.ch", "RSEs": [ "T2_CH_CERN" ] }, + {"Domain": "ciemat.es", "RSEs": [ "T2_ES_CIEMAT" ] }, + {"Domain": "cis.gov.pl", "RSEs": [ "T2_PL_Swierk" ] }, + {"Domain": "cism.ucl.ac.be", "RSEs": [ "T2_BE_UCL" ] }, + {"Domain": "cmsaf.mit.edu", "RSEs": [ "T2_US_MIT" ] }, + {"Domain": "colorado.edu", "RSEs": [ "T3_US_Colorado" ] }, + {"Domain": "cr.cnaf.infn.it", "RSEs": [ "T1_IT_CNAF_Disk" ] }, + {"Domain": "csc.fi", "RSEs": [ "T2_FI_HIP" ] }, + {"Domain": "cscs.ch", "RSEs": [ "T2_CH_CSCS" ] }, + {"Domain": "lcg.cscs.ch", "RSEs": [ "T2_CH_CSCS" ] }, + {"Domain": "datagrid.cea.fr", "RSEs": [ "T2_FR_GRIF_IRFU" ] }, + {"Domain": "desy.de", "RSEs": [ "T2_DE_DESY" ] }, + {"Domain": "echo.stfc.ac.uk", "RSEs": [ "T1_UK_RAL_Disk" ] }, + {"Domain": "eos.grid.vbc.ac.at", "RSEs": [ "T2_AT_Vienna" ] }, + {"Domain": "grid.vbc.ac.at", "RSEs": [ "T2_AT_Vienna" ] }, + {"Domain": "fnal.gov", "RSEs": [ "T1_US_FNAL_Disk" ] }, + {"Domain": "grid.hep.ph.ic.ac.uk", "RSEs": [ "T2_UK_London_IC" ] }, + {"Domain": "grid.metu.edu.tr", "RSEs": [ "T2_TR_METU" ] }, + {"Domain": "grid.nchc.org.tw", "RSEs": [ "T2_TW_NCHC" ] }, + {"Domain": "gridka.de", "RSEs": [ "T1_DE_KIT_Disk" ] }, + {"Domain": "gridpp.rl.ac.uk", "RSEs": [ "T1_UK_RAL_Disk" ] }, + {"Domain": "hep.kbfi.ee", "RSEs": [ "T2_EE_Estonia" ] }, + {"Domain": "hep.uprm.edu", "RSEs": [ "T3_US_PuertoRico" ] }, + {"Domain": "hep.wisc.edu", "RSEs": [ "T2_US_Wisconsin" ] }, + {"Domain": "hepgrid.uerj.br", "RSEs": [ "T2_BR_UERJ" ] }, + {"Domain": "ifca.es", "RSEs": [ "T2_ES_IFCA" ] }, + {"Domain": "ihep.ac.cn", "RSEs": [ "T2_CN_Beijing" ] }, + {"Domain": "iihe.ac.be", "RSEs": [ "T2_BE_IIHE" ] }, + {"Domain": "in2p3.fr", "RSEs": [ + "T1_FR_CCIN2P3_Disk", + "T2_FR_GRIF_LLR", + "T2_FR_IPHC" + ] }, + {"Domain": "indiacms.res.in", "RSEs": [ "T2_IN_TIFR" ] }, + {"Domain": "inr.troitsk.ru", "RSEs": [ "T2_RU_INR" ] }, + {"Domain": "itep.ru", "RSEs": [ "T2_RU_ITEP" ] }, + {"Domain": "jinr-t1.ru", "RSEs": [ "T1_RU_JINR_Disk" ] }, + {"Domain": "jinr.ru", "RSEs": [ "T2_RU_JINR" ] }, + {"Domain": "kfki.hu", "RSEs": [ "T2_HU_Budapest" ] }, + {"Domain": "kipt.kharkov.ua", "RSEs": [ "T2_UA_KIPT" ] }, + {"Domain": "knu.ac.kr", "RSEs": [ "T3_KR_KNU" ] }, + {"Domain": "lnl.infn.it", "RSEs": [ "T2_IT_Legnaro" ] }, + {"Domain": "m45.ihep.su", "RSEs": [ "T2_RU_IHEP" ] }, + {"Domain": "ncg.ingrid.pt", "RSEs": [ "T2_PT_NCG_Lisbon" ] }, + {"Domain": "physics.uoi.gr", "RSEs": [ "T2_GR_Ioannina" ] }, + {"Domain": "physik.rwth-aachen.de", "RSEs": [ "T2_DE_RWTH" ] }, + {"Domain": "pi.infn.it", "RSEs": [ "T2_IT_Pisa" ] }, + {"Domain": "pic.es", "RSEs": [ "T1_ES_PIC_Disk" ] }, + {"Domain": "pp.rl.ac.uk", "RSEs": [ "T2_UK_SGrid_RALPP" ] }, + {"Domain": "psi.ch", "RSEs": [ "T3_CH_PSI" ] }, + {"Domain": "rc.ufl.edu", "RSEs": [ "T2_US_Florida" ] }, + {"Domain": "rcac.purdue.edu", "RSEs": [ "T2_US_Purdue" ] }, + {"Domain": "recas.ba.infn.it", "RSEs": [ "T2_IT_Bari" ] }, + {"Domain": "roma1.infn.it", "RSEs": [ "T2_IT_Rome" ] }, + {"Domain": "rutgers.edu", "RSEs": [ "T3_US_Rutgers" ] }, + {"Domain": "sdfarm.kr", "RSEs": [ "T2_KR_KISTI" ] }, + {"Domain": "sprace.org.br", "RSEs": [ "T2_BR_SPRACE" ] }, + {"Domain": "t2.ucsd.edu", "RSEs": [ "T2_US_UCSD" ] }, + {"Domain": "ultralight.org", "RSEs": [ "T2_US_Caltech" ] }, + {"Domain": "unl.edu", "RSEs": [ "T2_US_Nebraska" ] } +] diff --git a/stompserver/cmsswpopTracer.go b/stompserver/cmsswpopTracer.go new file mode 100644 index 0000000..dd214f2 --- /dev/null +++ b/stompserver/cmsswpopTracer.go @@ -0,0 +1,256 @@ +package main + +// cmsswpopTracer - Is one of the three RucioTracer. It handles data from +// CMSSWPOP: /topic/cms.swpop +// Process it, then produce a Ruci trace message and then it to topic: +// /topic/cms.rucio.tracer +// +// Authors: Yuyi Guo +// Created: June 2021 + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "strings" + "sync/atomic" + "time" + + // load-balanced stomp manager + + // stomp library + "github.com/go-stomp/stomp" +) + +// SWPOPRecord defines CMSSW POP record structure. +type SWPOPRecord struct { + SiteName string `json:"site_name"` + Usrdn string `json:"user_dn"` + ClientHost string `json:"client_host"` + ClientDomain string `json:"client_domain"` + ServerHost string `json:"server_host"` + ServerDomain string `json:"server_domain"` + Lfn string `json:"file_lfn"` + JobType string `json:"app_info"` + Ts int64 `json:"start_time"` +} + +// DomainRSE: Define the struct of Domain and RSE map. +type DomainRSE struct { + Domain string + RSEs []string +} + +// Define the domain and RSE map file +var domainRSEMap []DomainRSE + +// Receivedperk_swpop keeps number of messages per 1k +var Receivedperk_swpop uint64 + +// swpopConsumer consumes for cmssw pop topic +func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) { + //first to check to make sure there is something in msg, + //otherwise we will get error: + // + Received_swpop.Inc() + atomic.AddUint64(&Receivedperk_swpop, 1) + if msg == nil || msg.Body == nil { + return "", nil, "", "", 0, "", errors.New("Empty message") + } + // + if Config.Verbose > 2 { + log.Println("*****************Source AMQ message of swpop*********************") + log.Println("\n" + string(msg.Body)) + log.Println("*******************End AMQ message of swpop**********************") + } + + var rec SWPOPRecord + err := json.Unmarshal(msg.Body, &rec) + if err != nil { + log.Printf("Enable to Unmarchal input message. Error: %v", err) + return "", nil, "", "", 0, "", err + } + if Config.Verbose > 2 { + log.Println(" ******Parsed swpop record******") + log.Println("\n", rec) + log.Println("******End parsed swpop record******") + } + // process received message, e.g. extract some fields + var lfn string + var sitename []string + var usrdn string + var ts int64 + var jobtype string + var wnname string + // Check the data + if len(rec.Lfn) > 0 { + lfn = rec.Lfn + } else { + return "", nil, "", "", 0, "", errors.New("No Lfn found") + } + if strings.ToLower(rec.ServerDomain) == "unknown" || len(rec.ServerDomain) <= 0 { + if len(rec.SiteName) == 0 { + return "", nil, "", "", 0, "", errors.New("No RSEs found") + } else { + sitename = append(sitename, rec.SiteName) + } + } else { + sitename = findRSEs(rec.ServerDomain) + } + if len(sitename) <= 0 { + return "", nil, "", "", 0, "", errors.New("No RSEs' map found") + } + // + if rec.Ts == 0 { + ts = time.Now().Unix() + } else { + ts = rec.Ts + } + // + + if len(rec.JobType) > 0 { + jobtype = rec.JobType + } else { + jobtype = "unknow" + } + // + if len(rec.ClientDomain) > 0 { + wnname = rec.ClientDomain + } else { + wnname = "unknow" + } + if len(rec.ClientHost) > 0 { + wnname = rec.ClientHost + "@" + wnname + } else { + wnname = "unknow" + "@" + wnname + } + // + if len(rec.Usrdn) > 0 { + usrdn = rec.Usrdn + } else { + usrdn = "" + } + return lfn, sitename, usrdn, jobtype, ts, wnname, nil +} + +// swpopTrace makes swpop trace and send it to rucio endpoint +func swpopTrace(msg *stomp.Message) ([]string, error) { + var dids []string + //get trace data + lfn, sitename, usrdn, jobtype, ts, wnname, err := swpopConsumer(msg) + if err != nil { + log.Println("Bad swpop message.") + return nil, errors.New("Bad swpop message") + } + for _, s := range sitename { + trc := NewTrace(lfn, s, ts, jobtype, wnname, "swpop", usrdn) + data, err := json.Marshal(trc) + if err != nil { + if Config.Verbose > 1 { + log.Printf("Unable to marshal back to JSON string , error: %v, data: %v\n", err, trc) + } else { + log.Printf("Unable to marshal back to JSON string, error: %v \n", err) + } + dids = append(dids, fmt.Sprintf("%v", trc.DID)) + } + if Config.Verbose > 2 { + log.Println("********* Rucio trace record ***************") + log.Println("\n" + string(data)) + log.Println("******** Done Rucio trace record *************") + } + // send data to Stomp endpoint + if Config.EndpointProducer != "" { + err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "swpopAMQ")) + if err != nil { + dids = append(dids, fmt.Sprintf("%v", trc.DID)) + log.Printf("Failed to send %s to stomp.", trc.DID) + } else { + Send_swpop.Inc() + } + } else { + log.Fatalln("*** Config.Enpoint is empty, check config file! ***") + } + } + return dids, nil +} + +// server gets messages from consumer AMQ end pointer, make tracers and send to AMQ producer end point. +func swpopServer() { + log.Println("Stomp broker URL: ", Config.StompURIConsumer) + // get connection + sub, err := subscribe(Config.EndpointConsumer, Config.StompURIConsumer) + if err != nil { + log.Println(err) + } + // + err2 := parseRSEMap(fsitemap) + if err2 != nil { + log.Fatalln("Unable to parse rucio doamin RSE map file %s, error: %v", fsitemap, err2) + } + + var tc uint64 + t1 := time.Now().Unix() + var t2 int64 + var ts uint64 + + for { + // check first if subscription is still valid, otherwise get a new one + if sub == nil { + time.Sleep(time.Duration(Config.Interval) * time.Second) + sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer) + if err != nil { + log.Println("unable to get new subscription", err) + continue + } + } + // get stomp messages from subscriber channel + select { + case msg := <-sub.C: + if msg.Err != nil { + log.Println("receive error message", msg.Err) + sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer) + if err != nil { + log.Println("unable to subscribe to", Config.EndpointConsumer, err) + } + break + } + // process stomp messages + dids, err := swpopTrace(msg) + if err == nil { + Traces_swpop.Inc() + atomic.AddUint64(&tc, 1) + if Config.Verbose > 1 { + log.Println("The number of traces processed in 1000 group: ", atomic.LoadUint64(&tc)) + } + } + + if atomic.LoadUint64(&tc) == 1000 { + atomic.StoreUint64(&tc, 0) + t2 = time.Now().Unix() - t1 + t1 = time.Now().Unix() + log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop)) + log.Printf("Processing 1000 messages took %d seconds.\n", t2) + atomic.StoreUint64(&Receivedperk_swpop, 0) + } + if err != nil && err.Error() != "Empty message" { + log.Println("SWPOP message processing error", err) + } + //got error message "SWPOP message processing error unexpected end of JSON input". + //Code stoped to loop??? YG 6/22/2021 + if len(dids) > 0 { + log.Printf("DIDS in Error: %v .\n ", dids) + } + default: + sleep := time.Duration(Config.Interval) * time.Millisecond + if atomic.LoadUint64(&ts) == 10000 { + atomic.StoreUint64(&ts, 0) + if Config.Verbose > 3 { + log.Println("waiting for 10000x", sleep) + } + } + time.Sleep(sleep) + atomic.AddUint64(&ts, 1) + } + } +} diff --git a/stompserver/endpointsConfig.go b/stompserver/endpointsConfig.go index c61356a..2999436 100644 --- a/stompserver/endpointsConfig.go +++ b/stompserver/endpointsConfig.go @@ -23,8 +23,10 @@ type Configuration struct { Verbose int `json:"verbose"` // Port defines http server port number for monitoring metrics. Port int `json:"port"` + // StompURLTest defines StompAMQ URI testbed for consumer and Producer. + StompURIProducer string `json:"stompURIProducer"` // StompURL defines StompAMQ URI for consumer and Producer. - StompURI string `json:"stompURI"` + StompURIConsumer string `json:"stompURIConsumer"` // StompLogin defines StompAQM login name. StompLogin string `json:"stompLogin"` // StompPassword defines StompAQM password. @@ -85,9 +87,9 @@ func parseConfig(configFile string) error { // // initStomp is a function to initialize a stomp object of endpointProducer. -func initStomp(endpoint string) *lbstomp.StompManager { +func initStomp(endpoint string, stompURI string) *lbstomp.StompManager { p := lbstomp.Config{ - URI: Config.StompURI, + URI: stompURI, Login: Config.StompLogin, Password: Config.StompPassword, Iterations: Config.StompIterations, @@ -105,20 +107,20 @@ func initStomp(endpoint string) *lbstomp.StompManager { } // subscribe is a helper function to subscribe to StompAMQ end-point as a listener. -func subscribe(endpoint string) (*stomp.Subscription, error) { - smgr := initStomp(endpoint) +func subscribe(endpoint string, stompURI string) (*stomp.Subscription, error) { + smgr := initStomp(endpoint, stompURI) // get connection conn, addr, err := smgr.GetConnection() if err != nil { return nil, err } - log.Println("stomp connection", conn, addr) + log.Println("\n stomp connection", conn, addr) // subscribe to ActiveMQ topic sub, err := conn.Subscribe(endpoint, stomp.AckAuto) if err != nil { log.Println("unable to subscribe to", endpoint, err) return nil, err } - log.Println("stomp subscription", sub) + log.Println("\n stomp subscription", sub) return sub, err } diff --git a/stompserver/fwjrTracer.go b/stompserver/fwjrTracer.go index 29cab67..a24d43f 100644 --- a/stompserver/fwjrTracer.go +++ b/stompserver/fwjrTracer.go @@ -17,13 +17,13 @@ import ( "time" // load-balanced stomp manager - lbstomp "github.com/vkuznet/lb-stomp" + // stomp library "github.com/go-stomp/stomp" ) -// sitemap defines maps between the names from the data message and the name Ruci server has. -var sitemap map[string]string +// Sitemap defines maps between the names from the data message and the name Ruci server has. +var Sitemap map[string]string // Lfnsite for the map of lfn and site type Lfnsite struct { @@ -64,7 +64,7 @@ type FWJRRecord struct { } // stompMgr defines the stomp manager for the producer. -var stompMgr *lbstomp.StompManager +//var stompMgr *lbstomp.StompManager // FWJRconsumer Consumes for FWJR/WMArchive topic func FWJRconsumer(msg *stomp.Message) ([]Lfnsite, int64, string, string, error) { @@ -83,7 +83,7 @@ func FWJRconsumer(msg *stomp.Message) ([]Lfnsite, int64, string, string, error) // if Config.Verbose > 2 { log.Println("*****************Source AMQ message of wmarchive*********************") - log.Println("Source AMQ message of wmarchive: ", string(msg.Body)) + log.Println("\n", string(msg.Body)) log.Println("*******************End AMQ message of wmarchive**********************") } @@ -94,7 +94,9 @@ func FWJRconsumer(msg *stomp.Message) ([]Lfnsite, int64, string, string, error) return lfnsite, 0, "", "", err } if Config.Verbose > 2 { - log.Printf("******PARSED FWJR record******: %+v", rec) + log.Println("******Parsed FWJR record****** ") + log.Printf("\n %v", rec) + log.Println(" ") } // process received message, e.g. extract some fields var ts int64 @@ -156,11 +158,11 @@ func FWJRtrace(msg *stomp.Message) ([]string, error) { goodlfn := ls.lfn site := ls.site if len(goodlfn) > 0 && len(site) > 0 { - if s, ok := sitemap[site]; ok { + if s, ok := Sitemap[site]; ok { site = s } for _, glfn := range goodlfn { - trc := NewTrace(glfn, site, ts, jobtype, wnname) + trc := NewTrace(glfn, site, ts, jobtype, wnname, "fwjr", "unknown") data, err := json.Marshal(trc) if err != nil { if Config.Verbose > 0 { @@ -173,7 +175,7 @@ func FWJRtrace(msg *stomp.Message) ([]string, error) { } if Config.Verbose > 2 { log.Println("********* Rucio trace record ***************") - log.Println("Rucio trace record: ", string(data)) + log.Println("\n", string(data)) log.Println("******** Done Rucio trace record *************") } // send data to Stomp endpoint @@ -197,24 +199,28 @@ func FWJRtrace(msg *stomp.Message) ([]string, error) { // server gets messages from consumer AMQ end pointer, make tracers and send to AMQ producer end point. func fwjrServer() { - log.Println("Stomp broker URL: ", Config.StompURI) + log.Println("Stomp broker URL: ", Config.StompURIConsumer) // get connection - sub, err := subscribe(Config.EndpointConsumer) + sub, err := subscribe(Config.EndpointConsumer, Config.StompURIConsumer) if err != nil { - log.Fatal(err) + log.Println(err) } - stompMgr = initStomp(Config.EndpointProducer) + err2 := parseSitemap(fsitemap) + if err2 != nil { + log.Fatalf("Unable to parse rucio sitemap file %s, error: %v", fsitemap, err2) + } var tc uint64 t1 := time.Now().Unix() var t2 int64 + var ts uint64 for { // check first if subscription is still valid, otherwise get a new one if sub == nil { time.Sleep(time.Duration(Config.Interval) * time.Second) - sub, err = subscribe(Config.EndpointConsumer) + sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer) if err != nil { log.Println("unable to get new subscription", err) continue @@ -225,7 +231,7 @@ func fwjrServer() { case msg := <-sub.C: if msg.Err != nil { log.Println("receive error message", msg.Err) - sub, err = subscribe(Config.EndpointConsumer) + sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer) if err != nil { log.Println("unable to subscribe to", Config.EndpointConsumer, err) } @@ -259,10 +265,14 @@ func fwjrServer() { } default: sleep := time.Duration(Config.Interval) * time.Millisecond - if Config.Verbose > 3 { - log.Println("waiting for ", sleep) + if atomic.LoadUint64(&ts) == 10000 { + atomic.StoreUint64(&ts, 0) + if Config.Verbose > 3 { + log.Println("waiting for x10000", sleep) + } } time.Sleep(sleep) + atomic.AddUint64(&ts, 1) } } } diff --git a/stompserver/stompserver.go b/stompserver/stompserver.go index 15be304..2f35160 100644 --- a/stompserver/stompserver.go +++ b/stompserver/stompserver.go @@ -16,7 +16,7 @@ import ( "net/http" // load-balanced stomp manager - // lbstomp "github.com/vkuznet/lb-stomp" + lbstomp "github.com/vkuznet/lb-stomp" // stomp library // prometheus apis @@ -39,10 +39,23 @@ var ( Name: "rucio_tracer_fwjr_traces", Help: "The number of traces messages", }) + + Received_swpop = promauto.NewCounter(prometheus.CounterOpts{ + Name: "rucio_tracer_swpop_received", + Help: "The number of received messages of swpop", + }) + Send_swpop = promauto.NewCounter(prometheus.CounterOpts{ + Name: "rucio_tracer_swpop_send", + Help: "The number of send messages od swpop", + }) + Traces_swpop = promauto.NewCounter(prometheus.CounterOpts{ + Name: "rucio_tracer_swpop_traces", + Help: "The number of traces messages os swpop", + }) ) // stompMgr defines the stomp manager for the producer. -//var stompMgr *lbstomp.StompManager +var stompMgr *lbstomp.StompManager // httpServer complementary http server to serve the metrics func httpServer(addr string) { @@ -50,6 +63,9 @@ func httpServer(addr string) { log.Fatal(http.ListenAndServe(addr, nil)) } +// fsitemap map file +var fsitemap string + // func main() { // usage: ./RucioTracer -config stompserverconfig.json -sitemap ../etc/ruciositemap.json @@ -57,27 +73,28 @@ func main() { // use this line to print in logs the filene:lineNumber for each log entry log.SetFlags(log.LstdFlags | log.Lshortfile) var config string - var fsitemap string + //var fsitemap string flag.StringVar(&config, "config", "", "config file name") - flag.StringVar(&fsitemap, "sitemap", "", "runcio sitemap file") + flag.StringVar(&fsitemap, "sitemap", "", "runcio sitemap/domainRSEMap file") flag.Parse() err2 := parseConfig(config) if err2 != nil { log.Fatalf("Unable to parse config file %s, error: %v", config, err2) } - err2 = parseSitemap(fsitemap) - if err2 != nil { - log.Fatalf("Unable to parse rucio sitemap file %s, error: %v", fsitemap, err2) - } if Config.Verbose > 3 { log.Printf("%v", Config) - log.Printf("%v", sitemap) + log.Printf("%v", fsitemap) } - //stompMgr = initStomp(Config.EndpointProducer) + stompMgr = initStomp(Config.EndpointProducer, Config.StompURIProducer) // start HTTP server which can be used for metrics go httpServer(fmt.Sprintf(":%d", Config.Port)) // start AMQ server to handle rucio traces if Config.Producer == "wmarchive" { fwjrServer() + } else if Config.Producer == "cmsswpop" { + swpopServer() + } else { + log.Fatalln("No trace system defined. Check server configuration.") } + } diff --git a/stompserver/traceDef.go b/stompserver/traceDef.go index 29ff810..7a6e0d3 100644 --- a/stompserver/traceDef.go +++ b/stompserver/traceDef.go @@ -40,12 +40,18 @@ type Trace struct { } // NewTrace creates a new instance of Rucio Trace. -func NewTrace(lfn string, site string, ts int64, jobtype string, wnname string) Trace { +func NewTrace(lfn string, site string, ts int64, jobtype string, wnname string, account string, usrdn string) Trace { + if account == "" { + account = "fwjr" + } + if usrdn == "" { + usrdn = "unknown" + } trc := Trace{ - Account: "fwjr", + Account: account, ClientState: "DONE", Filename: lfn, - DID: fmt.Sprintf("cms: %s", lfn), + DID: fmt.Sprintf("cms:%s", lfn), EventType: "get", EventVersion: "API_1.21.6", FileReadts: ts, @@ -53,7 +59,8 @@ func NewTrace(lfn string, site string, ts int64, jobtype string, wnname string) Scope: "cms", Timestamp: ts, TraceTimeentryUnix: ts, - Usrdn: "/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=fwjr/CN=0/CN=fwjr/CN=0", + Usrdn: usrdn, + Jobtype: jobtype, Wnname: wnname, } return trc diff --git a/stompserver/utils.go b/stompserver/utils.go index 79a0246..88db0eb 100644 --- a/stompserver/utils.go +++ b/stompserver/utils.go @@ -8,6 +8,7 @@ import ( "encoding/json" "io/ioutil" "log" + "strings" ) func inslicestr(s []string, v string) bool { @@ -35,10 +36,41 @@ func parseSitemap(mapFile string) error { return err } //log.Println(string(data)) - err = json.Unmarshal(data, &sitemap) + err = json.Unmarshal(data, &Sitemap) if err != nil { log.Println("Unable to parse sitemap", err) return err } return nil } + +//findRSEs: For a give domain, find its RSE list +func findRSEs(domain string) []string { + var RSEs []string + d := strings.ToUpper(strings.TrimSpace(domain)) + if len(d) == 0 { + return RSEs + } + for _, v := range domainRSEMap { + vd := strings.ToUpper(strings.TrimSpace(v.Domain)) + if vd == d || strings.Contains(vd, d) || strings.Contains(d, vd) { + return v.RSEs + } + } + return RSEs +} + +// parseRSEMap: for given srver domain to find out the list of RSEs. +func parseRSEMap(RSEfile string) error { + data, err := ioutil.ReadFile(RSEfile) + if err != nil { + log.Println("Unable to read domainRSEMap.txt file", err) + return err + } + err = json.Unmarshal(data, &domainRSEMap) + if err != nil { + log.Println("Unable to unmarshal domainRSEMap", err) + return err + } + return nil +}