-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #25 from yuyiguo/datasource-cmsswpop
create tracer server for cms swpop and update configs and so on.
- Loading branch information
Showing
7 changed files
with
430 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
[ | ||
{"Domain": "accre.vanderbilt.edu", "RSEs": [ "T2_US_Vanderbilt" ] }, | ||
{"Domain": "acrc.bris.ac.uk", "RSEs": [ "T2_UK_SGrid_Bristol" ] }, | ||
{"Domain": "phy.bris.ac.uk", "RSEs": [ "T2_UK_SGrid_Bristol" ] }, | ||
{"Domain": "baylor.edu", "RSEs": [ "T3_US_Baylor" ] }, | ||
{"Domain": "brunel.ac.uk", "RSEs": [ "T2_UK_London_Brunel" ] }, | ||
{"Domain": "cern.ch", "RSEs": [ "T2_CH_CERN" ] }, | ||
{"Domain": "ciemat.es", "RSEs": [ "T2_ES_CIEMAT" ] }, | ||
{"Domain": "cis.gov.pl", "RSEs": [ "T2_PL_Swierk" ] }, | ||
{"Domain": "cism.ucl.ac.be", "RSEs": [ "T2_BE_UCL" ] }, | ||
{"Domain": "cmsaf.mit.edu", "RSEs": [ "T2_US_MIT" ] }, | ||
{"Domain": "colorado.edu", "RSEs": [ "T3_US_Colorado" ] }, | ||
{"Domain": "cr.cnaf.infn.it", "RSEs": [ "T1_IT_CNAF_Disk" ] }, | ||
{"Domain": "csc.fi", "RSEs": [ "T2_FI_HIP" ] }, | ||
{"Domain": "cscs.ch", "RSEs": [ "T2_CH_CSCS" ] }, | ||
{"Domain": "lcg.cscs.ch", "RSEs": [ "T2_CH_CSCS" ] }, | ||
{"Domain": "datagrid.cea.fr", "RSEs": [ "T2_FR_GRIF_IRFU" ] }, | ||
{"Domain": "desy.de", "RSEs": [ "T2_DE_DESY" ] }, | ||
{"Domain": "echo.stfc.ac.uk", "RSEs": [ "T1_UK_RAL_Disk" ] }, | ||
{"Domain": "eos.grid.vbc.ac.at", "RSEs": [ "T2_AT_Vienna" ] }, | ||
{"Domain": "grid.vbc.ac.at", "RSEs": [ "T2_AT_Vienna" ] }, | ||
{"Domain": "fnal.gov", "RSEs": [ "T1_US_FNAL_Disk" ] }, | ||
{"Domain": "grid.hep.ph.ic.ac.uk", "RSEs": [ "T2_UK_London_IC" ] }, | ||
{"Domain": "grid.metu.edu.tr", "RSEs": [ "T2_TR_METU" ] }, | ||
{"Domain": "grid.nchc.org.tw", "RSEs": [ "T2_TW_NCHC" ] }, | ||
{"Domain": "gridka.de", "RSEs": [ "T1_DE_KIT_Disk" ] }, | ||
{"Domain": "gridpp.rl.ac.uk", "RSEs": [ "T1_UK_RAL_Disk" ] }, | ||
{"Domain": "hep.kbfi.ee", "RSEs": [ "T2_EE_Estonia" ] }, | ||
{"Domain": "hep.uprm.edu", "RSEs": [ "T3_US_PuertoRico" ] }, | ||
{"Domain": "hep.wisc.edu", "RSEs": [ "T2_US_Wisconsin" ] }, | ||
{"Domain": "hepgrid.uerj.br", "RSEs": [ "T2_BR_UERJ" ] }, | ||
{"Domain": "ifca.es", "RSEs": [ "T2_ES_IFCA" ] }, | ||
{"Domain": "ihep.ac.cn", "RSEs": [ "T2_CN_Beijing" ] }, | ||
{"Domain": "iihe.ac.be", "RSEs": [ "T2_BE_IIHE" ] }, | ||
{"Domain": "in2p3.fr", "RSEs": [ | ||
"T1_FR_CCIN2P3_Disk", | ||
"T2_FR_GRIF_LLR", | ||
"T2_FR_IPHC" | ||
] }, | ||
{"Domain": "indiacms.res.in", "RSEs": [ "T2_IN_TIFR" ] }, | ||
{"Domain": "inr.troitsk.ru", "RSEs": [ "T2_RU_INR" ] }, | ||
{"Domain": "itep.ru", "RSEs": [ "T2_RU_ITEP" ] }, | ||
{"Domain": "jinr-t1.ru", "RSEs": [ "T1_RU_JINR_Disk" ] }, | ||
{"Domain": "jinr.ru", "RSEs": [ "T2_RU_JINR" ] }, | ||
{"Domain": "kfki.hu", "RSEs": [ "T2_HU_Budapest" ] }, | ||
{"Domain": "kipt.kharkov.ua", "RSEs": [ "T2_UA_KIPT" ] }, | ||
{"Domain": "knu.ac.kr", "RSEs": [ "T3_KR_KNU" ] }, | ||
{"Domain": "lnl.infn.it", "RSEs": [ "T2_IT_Legnaro" ] }, | ||
{"Domain": "m45.ihep.su", "RSEs": [ "T2_RU_IHEP" ] }, | ||
{"Domain": "ncg.ingrid.pt", "RSEs": [ "T2_PT_NCG_Lisbon" ] }, | ||
{"Domain": "physics.uoi.gr", "RSEs": [ "T2_GR_Ioannina" ] }, | ||
{"Domain": "physik.rwth-aachen.de", "RSEs": [ "T2_DE_RWTH" ] }, | ||
{"Domain": "pi.infn.it", "RSEs": [ "T2_IT_Pisa" ] }, | ||
{"Domain": "pic.es", "RSEs": [ "T1_ES_PIC_Disk" ] }, | ||
{"Domain": "pp.rl.ac.uk", "RSEs": [ "T2_UK_SGrid_RALPP" ] }, | ||
{"Domain": "psi.ch", "RSEs": [ "T3_CH_PSI" ] }, | ||
{"Domain": "rc.ufl.edu", "RSEs": [ "T2_US_Florida" ] }, | ||
{"Domain": "rcac.purdue.edu", "RSEs": [ "T2_US_Purdue" ] }, | ||
{"Domain": "recas.ba.infn.it", "RSEs": [ "T2_IT_Bari" ] }, | ||
{"Domain": "roma1.infn.it", "RSEs": [ "T2_IT_Rome" ] }, | ||
{"Domain": "rutgers.edu", "RSEs": [ "T3_US_Rutgers" ] }, | ||
{"Domain": "sdfarm.kr", "RSEs": [ "T2_KR_KISTI" ] }, | ||
{"Domain": "sprace.org.br", "RSEs": [ "T2_BR_SPRACE" ] }, | ||
{"Domain": "t2.ucsd.edu", "RSEs": [ "T2_US_UCSD" ] }, | ||
{"Domain": "ultralight.org", "RSEs": [ "T2_US_Caltech" ] }, | ||
{"Domain": "unl.edu", "RSEs": [ "T2_US_Nebraska" ] } | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
package main | ||
|
||
// cmsswpopTracer - Is one of the three RucioTracer. It handles data from | ||
// CMSSWPOP: /topic/cms.swpop | ||
// Process it, then produce a Ruci trace message and then it to topic: | ||
// /topic/cms.rucio.tracer | ||
// | ||
// Authors: Yuyi Guo | ||
// Created: June 2021 | ||
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"strings" | ||
"sync/atomic" | ||
"time" | ||
|
||
// load-balanced stomp manager | ||
|
||
// stomp library | ||
"github.com/go-stomp/stomp" | ||
) | ||
|
||
// SWPOPRecord defines CMSSW POP record structure. | ||
type SWPOPRecord struct { | ||
SiteName string `json:"site_name"` | ||
Usrdn string `json:"user_dn"` | ||
ClientHost string `json:"client_host"` | ||
ClientDomain string `json:"client_domain"` | ||
ServerHost string `json:"server_host"` | ||
ServerDomain string `json:"server_domain"` | ||
Lfn string `json:"file_lfn"` | ||
JobType string `json:"app_info"` | ||
Ts int64 `json:"start_time"` | ||
} | ||
|
||
// DomainRSE: Define the struct of Domain and RSE map. | ||
type DomainRSE struct { | ||
Domain string | ||
RSEs []string | ||
} | ||
|
||
// Define the domain and RSE map file | ||
var domainRSEMap []DomainRSE | ||
|
||
// Receivedperk_swpop keeps number of messages per 1k | ||
var Receivedperk_swpop uint64 | ||
|
||
// swpopConsumer consumes for cmssw pop topic | ||
func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) { | ||
//first to check to make sure there is something in msg, | ||
//otherwise we will get error: | ||
// | ||
Received_swpop.Inc() | ||
atomic.AddUint64(&Receivedperk_swpop, 1) | ||
if msg == nil || msg.Body == nil { | ||
return "", nil, "", "", 0, "", errors.New("Empty message") | ||
} | ||
// | ||
if Config.Verbose > 2 { | ||
log.Println("*****************Source AMQ message of swpop*********************") | ||
log.Println("\n" + string(msg.Body)) | ||
log.Println("*******************End AMQ message of swpop**********************") | ||
} | ||
|
||
var rec SWPOPRecord | ||
err := json.Unmarshal(msg.Body, &rec) | ||
if err != nil { | ||
log.Printf("Enable to Unmarchal input message. Error: %v", err) | ||
return "", nil, "", "", 0, "", err | ||
} | ||
if Config.Verbose > 2 { | ||
log.Println(" ******Parsed swpop record******") | ||
log.Println("\n", rec) | ||
log.Println("******End parsed swpop record******") | ||
} | ||
// process received message, e.g. extract some fields | ||
var lfn string | ||
var sitename []string | ||
var usrdn string | ||
var ts int64 | ||
var jobtype string | ||
var wnname string | ||
// Check the data | ||
if len(rec.Lfn) > 0 { | ||
lfn = rec.Lfn | ||
} else { | ||
return "", nil, "", "", 0, "", errors.New("No Lfn found") | ||
} | ||
if strings.ToLower(rec.ServerDomain) == "unknown" || len(rec.ServerDomain) <= 0 { | ||
if len(rec.SiteName) == 0 { | ||
return "", nil, "", "", 0, "", errors.New("No RSEs found") | ||
} else { | ||
sitename = append(sitename, rec.SiteName) | ||
} | ||
} else { | ||
sitename = findRSEs(rec.ServerDomain) | ||
} | ||
if len(sitename) <= 0 { | ||
return "", nil, "", "", 0, "", errors.New("No RSEs' map found") | ||
} | ||
// | ||
if rec.Ts == 0 { | ||
ts = time.Now().Unix() | ||
} else { | ||
ts = rec.Ts | ||
} | ||
// | ||
|
||
if len(rec.JobType) > 0 { | ||
jobtype = rec.JobType | ||
} else { | ||
jobtype = "unknow" | ||
} | ||
// | ||
if len(rec.ClientDomain) > 0 { | ||
wnname = rec.ClientDomain | ||
} else { | ||
wnname = "unknow" | ||
} | ||
if len(rec.ClientHost) > 0 { | ||
wnname = rec.ClientHost + "@" + wnname | ||
} else { | ||
wnname = "unknow" + "@" + wnname | ||
} | ||
// | ||
if len(rec.Usrdn) > 0 { | ||
usrdn = rec.Usrdn | ||
} else { | ||
usrdn = "" | ||
} | ||
return lfn, sitename, usrdn, jobtype, ts, wnname, nil | ||
} | ||
|
||
// swpopTrace makes swpop trace and send it to rucio endpoint | ||
func swpopTrace(msg *stomp.Message) ([]string, error) { | ||
var dids []string | ||
//get trace data | ||
lfn, sitename, usrdn, jobtype, ts, wnname, err := swpopConsumer(msg) | ||
if err != nil { | ||
log.Println("Bad swpop message.") | ||
return nil, errors.New("Bad swpop message") | ||
} | ||
for _, s := range sitename { | ||
trc := NewTrace(lfn, s, ts, jobtype, wnname, "swpop", usrdn) | ||
data, err := json.Marshal(trc) | ||
if err != nil { | ||
if Config.Verbose > 1 { | ||
log.Printf("Unable to marshal back to JSON string , error: %v, data: %v\n", err, trc) | ||
} else { | ||
log.Printf("Unable to marshal back to JSON string, error: %v \n", err) | ||
} | ||
dids = append(dids, fmt.Sprintf("%v", trc.DID)) | ||
} | ||
if Config.Verbose > 2 { | ||
log.Println("********* Rucio trace record ***************") | ||
log.Println("\n" + string(data)) | ||
log.Println("******** Done Rucio trace record *************") | ||
} | ||
// send data to Stomp endpoint | ||
if Config.EndpointProducer != "" { | ||
err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "swpopAMQ")) | ||
if err != nil { | ||
dids = append(dids, fmt.Sprintf("%v", trc.DID)) | ||
log.Printf("Failed to send %s to stomp.", trc.DID) | ||
} else { | ||
Send_swpop.Inc() | ||
} | ||
} else { | ||
log.Fatalln("*** Config.Enpoint is empty, check config file! ***") | ||
} | ||
} | ||
return dids, nil | ||
} | ||
|
||
// server gets messages from consumer AMQ end pointer, make tracers and send to AMQ producer end point. | ||
func swpopServer() { | ||
log.Println("Stomp broker URL: ", Config.StompURIConsumer) | ||
// get connection | ||
sub, err := subscribe(Config.EndpointConsumer, Config.StompURIConsumer) | ||
if err != nil { | ||
log.Println(err) | ||
} | ||
// | ||
err2 := parseRSEMap(fsitemap) | ||
if err2 != nil { | ||
log.Fatalln("Unable to parse rucio doamin RSE map file %s, error: %v", fsitemap, err2) | ||
} | ||
|
||
var tc uint64 | ||
t1 := time.Now().Unix() | ||
var t2 int64 | ||
var ts uint64 | ||
|
||
for { | ||
// check first if subscription is still valid, otherwise get a new one | ||
if sub == nil { | ||
time.Sleep(time.Duration(Config.Interval) * time.Second) | ||
sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer) | ||
if err != nil { | ||
log.Println("unable to get new subscription", err) | ||
continue | ||
} | ||
} | ||
// get stomp messages from subscriber channel | ||
select { | ||
case msg := <-sub.C: | ||
if msg.Err != nil { | ||
log.Println("receive error message", msg.Err) | ||
sub, err = subscribe(Config.EndpointConsumer, Config.StompURIConsumer) | ||
if err != nil { | ||
log.Println("unable to subscribe to", Config.EndpointConsumer, err) | ||
} | ||
break | ||
} | ||
// process stomp messages | ||
dids, err := swpopTrace(msg) | ||
if err == nil { | ||
Traces_swpop.Inc() | ||
atomic.AddUint64(&tc, 1) | ||
if Config.Verbose > 1 { | ||
log.Println("The number of traces processed in 1000 group: ", atomic.LoadUint64(&tc)) | ||
} | ||
} | ||
|
||
if atomic.LoadUint64(&tc) == 1000 { | ||
atomic.StoreUint64(&tc, 0) | ||
t2 = time.Now().Unix() - t1 | ||
t1 = time.Now().Unix() | ||
log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop)) | ||
log.Printf("Processing 1000 messages took %d seconds.\n", t2) | ||
atomic.StoreUint64(&Receivedperk_swpop, 0) | ||
} | ||
if err != nil && err.Error() != "Empty message" { | ||
log.Println("SWPOP message processing error", err) | ||
} | ||
//got error message "SWPOP message processing error unexpected end of JSON input". | ||
//Code stoped to loop??? YG 6/22/2021 | ||
if len(dids) > 0 { | ||
log.Printf("DIDS in Error: %v .\n ", dids) | ||
} | ||
default: | ||
sleep := time.Duration(Config.Interval) * time.Millisecond | ||
if atomic.LoadUint64(&ts) == 10000 { | ||
atomic.StoreUint64(&ts, 0) | ||
if Config.Verbose > 3 { | ||
log.Println("waiting for 10000x", sleep) | ||
} | ||
} | ||
time.Sleep(sleep) | ||
atomic.AddUint64(&ts, 1) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.