From 77a458afb3964dbdb9ea0f8b41d00b3f2e7ea275 Mon Sep 17 00:00:00 2001 From: Luca Ruggieri Date: Sun, 27 Oct 2019 10:33:37 +0100 Subject: [PATCH] added statistics --- .../dht/mainline/indexingService.go | 8 +- cmd/magneticod/dht/mainline/protocol.go | 128 ++++++++++++++ cmd/magneticod/dht/mainline/transport.go | 166 +++++++++++++++--- cmd/magneticod/main.go | 2 +- pkg/util/util.go | 10 ++ 5 files changed, 283 insertions(+), 31 deletions(-) diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go index 11d4bf19d..d7bd40594 100644 --- a/cmd/magneticod/dht/mainline/indexingService.go +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -10,6 +10,10 @@ import ( "go.uber.org/zap" ) +var ( + StatsPrintClock = 10 * time.Second +) + type IndexingService struct { // Private protocol *Protocol @@ -79,8 +83,8 @@ func (is *IndexingService) Start() { go is.index() zap.L().Info("Indexing Service started!") - if DefaultThrottleRate > 0{ - zap.L().Info("Throttle set to "+strconv.Itoa(DefaultThrottleRate)+" msg/s") + if DefaultThrottleRate > 0 { + zap.L().Info("Throttle set to " + strconv.Itoa(DefaultThrottleRate) + " msg/s") } } diff --git a/cmd/magneticod/dht/mainline/protocol.go b/cmd/magneticod/dht/mainline/protocol.go index 92c766187..8a8d21de1 100644 --- a/cmd/magneticod/dht/mainline/protocol.go +++ b/cmd/magneticod/dht/mainline/protocol.go @@ -3,7 +3,10 @@ package mainline import ( "crypto/rand" "crypto/sha1" + "github.com/boramalper/magnetico/pkg/util" "net" + "sort" + "strconv" "sync" "time" @@ -16,6 +19,8 @@ type Protocol struct { transport *Transport eventHandlers ProtocolEventHandlers started bool + + stats protocolStats } type ProtocolEventHandlers struct { @@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol p = new(Protocol) p.eventHandlers = eventHandlers p.transport = NewTransport(laddr, p.onMessage, p.eventHandlers.OnCongestion) + p.stats = protocolStats{ + messageTypeCount: make(map[string]map[string]int), + } p.currentTokenSecret, p.previousTokenSecret = make([]byte, 20), make([]byte, 20) _, err := rand.Read(p.currentTokenSecret) @@ -56,6 +64,7 @@ func (p *Protocol) Start() { p.started = true p.transport.Start() + go p.printStats() go p.updateTokenSecret() } @@ -67,7 +76,114 @@ func (p *Protocol) Terminate() { p.transport.Terminate() } +//statistics +type protocolStats struct { + sync.RWMutex + messageTypeCount map[string]map[string]int //type=>subtype=>count +} + +func (ps *protocolStats) Reset() { + ps.Lock() + defer ps.Unlock() + ps.messageTypeCount = make(map[string]map[string]int) +} + +type messageTypeCountOrdered struct { + messageType string + messageCount int + percentageOverTotal float64 + subMessages orderedMessagesCount +} +type orderedMessagesCount []*messageTypeCountOrdered + + +func (omc orderedMessagesCount) Len() int { + return len(omc) +} +func (omc orderedMessagesCount) Swap(i, j int) { + omc[i], omc[j] = omc[j], omc[i] +} +func (omc orderedMessagesCount) Less(i, j int) bool { + return omc[i].messageCount > omc[j].messageCount +} +func (omc orderedMessagesCount) CalculatePercentagesOverTotal(totalMessages int) { + for _, mtco := range omc { + if mtco.subMessages != nil && len(mtco.subMessages) > 0 { + mtco.subMessages.CalculatePercentagesOverTotal(totalMessages) + } + mtco.percentageOverTotal = util.RoundToDecimal( + (float64(mtco.messageCount)/float64(totalMessages))*100, 2) + } +} +func (omc orderedMessagesCount) Sort() { + for _, mtco := range omc { + if mtco.subMessages != nil && len(mtco.subMessages) > 0 { + mtco.subMessages.Sort() + } + } + sort.Sort(omc) +} +func (omc orderedMessagesCount) String() string { + /* + string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't + be a problem and it will be much easier to write down and read + */ + mostReceivedMessageTypes := "" + for mIdx, m := range omc { + if mIdx > 0 { + mostReceivedMessageTypes += ", " + } + mostReceivedMessageTypes += m.messageType + mostReceivedMessageTypes += + " (" + strconv.Itoa(m.messageCount) + ", " + strconv.FormatFloat(m.percentageOverTotal, 'f', -1, 64) + "%)" + + if m.subMessages != nil && len(m.subMessages) > 0 { + //add stats for submessages unless there is only 1 submessage with len 0 (empty) + if !(len(m.subMessages) == 1 && len(m.subMessages[0].messageType) == 0) { + mostReceivedMessageTypes += "[ " + m.subMessages.String() + " ]" + } + } + } + return mostReceivedMessageTypes +} +func (p *Protocol) printStats() { + for { + time.Sleep(StatsPrintClock) + p.stats.RLock() + orderedMessages := make(orderedMessagesCount, 0, len(p.stats.messageTypeCount)) + totalMessages := 0 + for mType, mSubTypes := range p.stats.messageTypeCount { + mCount := 0 + orderedSubMessages := make(orderedMessagesCount, 0, len(mSubTypes)) + for mSubType, mSubCount := range mSubTypes { + mCount += mSubCount + totalMessages += mSubCount + orderedSubMessages = append(orderedSubMessages, &messageTypeCountOrdered{ + messageType: mSubType, + messageCount: mSubCount, + }) + } + orderedMessages = append(orderedMessages, &messageTypeCountOrdered{ + messageType: mType, + messageCount: mCount, + subMessages: orderedSubMessages, + }) + } + p.stats.RUnlock() + orderedMessages.CalculatePercentagesOverTotal(totalMessages) + orderedMessages.Sort() + + zap.L().Info("Protocol stats (on "+StatsPrintClock.String()+"):", + zap.String("message type", orderedMessages.String()), + ) + + p.stats.Reset() + } +} + func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { + temporaryQ := msg.Q + switch msg.Y { case "q": switch msg.Q { @@ -140,6 +256,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { // // sample_infohashes > get_peers > find_node > ping / announce_peer if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response. + temporaryQ = "sample_infohashes" if !validateSampleInfohashesResponseMessage(msg) { // zap.L().Debug("An invalid sample_infohashes response received!") return @@ -148,6 +265,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { p.eventHandlers.OnSampleInfohashesResponse(msg, addr) } } else if len(msg.R.Token) != 0 { // The message should be a get_peers response. + temporaryQ = "get_peers" if !validateGetPeersResponseMessage(msg) { // zap.L().Debug("An invalid get_peers response received!") return @@ -156,6 +274,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { p.eventHandlers.OnGetPeersResponse(msg, addr) } } else if len(msg.R.Nodes) != 0 { // The message should be a find_node response. + temporaryQ = "find_node" if !validateFindNodeResponseMessage(msg) { // zap.L().Debug("An invalid find_node response received!") return @@ -164,6 +283,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { p.eventHandlers.OnFindNodeResponse(msg, addr) } } else { // The message should be a ping or an announce_peer response. + temporaryQ = "ping_or_announce" if !validatePingORannouncePeerResponseMessage(msg) { // zap.L().Debug("An invalid ping OR announce_peer response received!") return @@ -182,6 +302,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { zap.String("type", msg.Y)) */ } + + //let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field + p.stats.Lock() + if _, ok := p.stats.messageTypeCount[msg.Y]; !ok { + p.stats.messageTypeCount[msg.Y] = make(map[string]int) + } + p.stats.messageTypeCount[msg.Y][temporaryQ]++ + p.stats.Unlock() } func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) { diff --git a/cmd/magneticod/dht/mainline/transport.go b/cmd/magneticod/dht/mainline/transport.go index 4c1f92521..b8bb7b8e5 100644 --- a/cmd/magneticod/dht/mainline/transport.go +++ b/cmd/magneticod/dht/mainline/transport.go @@ -1,17 +1,22 @@ package mainline import ( - "net" - "time" - + "bytes" "github.com/anacrolix/torrent/bencode" sockaddr "github.com/libp2p/go-sockaddr/net" "go.uber.org/zap" "golang.org/x/sys/unix" + "net" + "sort" + "strconv" + "strings" + "sync" + "time" ) -var( - DefaultThrottleRate = -1 // <= 0 for unlimited requests +var ( + //Throttle rate that transport will have at Start time. Set <= 0 for unlimited requests + DefaultThrottleRate = -1 ) type Transport struct { @@ -27,8 +32,9 @@ type Transport struct { // OnCongestion onCongestion func() - throttlingRate int //available messages per second. If <=0, it is considered disabled + throttlingRate int //available messages per second. If <=0, it is considered disabled throttleTicketsChannel chan struct{} //channel giving tickets (allowance) to make send a message + stats *transportStats } func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onCongestion func()) *Transport { @@ -48,7 +54,7 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges t.onMessage = onMessage t.onCongestion = onCongestion t.throttleTicketsChannel = make(chan struct{}) - t.throttlingRate = DefaultThrottleRate + t.SetThrottle(DefaultThrottleRate) var err error t.laddr, err = net.ResolveUDPAddr("udp", laddr) @@ -59,10 +65,15 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges zap.L().Panic("IP address is not IPv4!") } + t.stats = &transportStats{ + sentPorts: make(map[string]int), + } + return t } -func (t *Transport) SetThrottle(rate int){ +//Sets t throttle rate at runtime +func (t *Transport) SetThrottle(rate int) { t.throttlingRate = rate } @@ -93,6 +104,7 @@ func (t *Transport) Start() { zap.L().Fatal("Could NOT bind the socket!", zap.Error(err)) } + go t.printStats() go t.readMessages() go t.Throttle() } @@ -132,54 +144,144 @@ func (t *Transport) readMessages() { continue } + t.stats.Lock() + t.stats.totalRead++ + t.stats.Unlock() t.onMessage(&msg, from) } } -func (t *Transport) Throttle(){ - if t.throttlingRate > 0{ +//Manages throttling for transport. To be called as a routine at Start time. Should never return. +func (t *Transport) Throttle() { + if t.throttlingRate > 0 { resetChannel := make(chan struct{}) - dealer := func(resetRequest chan struct{}){ + dealer := func(resetRequest chan struct{}) { ticketGiven := 0 tooManyTicketGiven := false - for{ - select{ - case <- t.throttleTicketsChannel: { - ticketGiven++ - if ticketGiven >= t.throttlingRate{ - tooManyTicketGiven = true - break + for { + select { + case <-t.throttleTicketsChannel: + { + ticketGiven++ + if ticketGiven >= t.throttlingRate { + tooManyTicketGiven = true + break + } + } + case <-resetRequest: + { + return } - } - case <- resetRequest: { - return - } } - if tooManyTicketGiven{break} + if tooManyTicketGiven { + break + } } - <- resetRequest + <-resetRequest return } go dealer(resetChannel) - for range time.Tick(1*time.Second){ + for range time.Tick(1 * time.Second) { resetChannel <- struct{}{} go dealer(resetChannel) } - }else{ + } else { //no limit, keep giving tickets to whoever requests it - for{ + for { <-t.throttleTicketsChannel } } } +//statistics +type transportStats struct { + sync.RWMutex + sentPorts map[string]int + totalSend int + totalRead int +} + +func (ts *transportStats) Reset() { + ts.Lock() + defer ts.Unlock() + ts.sentPorts = make(map[string]int) + ts.totalSend = 0 + ts.totalRead = 0 +} + +type statPortCount struct { + portNumber string + portCount int +} +type statPortCounts []statPortCount + +func (s statPortCounts) Len() int { + return len(s) +} +func (s statPortCounts) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s statPortCounts) Less(i, j int) bool { + return s[i].portCount > s[j].portCount +} + +var totalSend int + +func (t *Transport) printStats() { + for { + time.Sleep(StatsPrintClock) + t.stats.RLock() + tempOrderedPorts := make(statPortCounts, 0, len(t.stats.sentPorts)) + currentTotalSend := t.stats.totalSend + currentTotalRead := t.stats.totalRead + for port, count := range t.stats.sentPorts { + tempOrderedPorts = append(tempOrderedPorts, statPortCount{port, count}) + } + t.stats.RUnlock() + + sort.Sort(tempOrderedPorts) + + mostUsedPortsBuffer := bytes.Buffer{} + sendRateBuffer := bytes.Buffer{} + readRateBuffer := bytes.Buffer{} + + for i, pc := range tempOrderedPorts { + if i > 5 { + break + } else if i > 0 { + mostUsedPortsBuffer.WriteString(", ") + } + + mostUsedPortsBuffer.WriteString(pc.portNumber) + mostUsedPortsBuffer.WriteString("(") + mostUsedPortsBuffer.WriteString(strconv.Itoa(pc.portCount)) + mostUsedPortsBuffer.WriteString(")") + } + + sendRateBuffer.WriteString(strconv.FormatFloat(float64(currentTotalSend)/StatsPrintClock.Seconds(), 'f', -1, 64)) + sendRateBuffer.WriteString(" msg/s") + + readRateBuffer.WriteString(strconv.FormatFloat(float64(currentTotalRead)/StatsPrintClock.Seconds(), 'f', -1, 64)) + readRateBuffer.WriteString(" msg/s") + + zap.L().Info("Transport stats for socket "+strconv.Itoa(t.fd)+" (on "+StatsPrintClock.String()+"):", + zap.String("ports", mostUsedPortsBuffer.String()), + zap.String("send rate", sendRateBuffer.String()), + zap.String("read rate", readRateBuffer.String()), + ) + + //finally, reset stats + t.stats.Reset() + } +} + func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { //get ticket t.throttleTicketsChannel <- struct{}{} @@ -188,13 +290,21 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { if err != nil { zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)") } - addrSA := sockaddr.NetAddrToSockaddr(addr) if addrSA == nil { zap.L().Debug("Wrong net address for the remote peer!", zap.String("addr", addr.String())) return } + t.stats.Lock() + a := strings.Split(addr.String(), ":")[1] + if _, ok := t.stats.sentPorts[a]; ok { + t.stats.sentPorts[a]++ + } else { + t.stats.sentPorts[a] = 1 + } + t.stats.totalSend++ + t.stats.Unlock() err = unix.Sendto(t.fd, data, 0, addrSA) if err == unix.EPERM || err == unix.ENOBUFS { diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 54bcbe648..90ae2a510 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -141,7 +141,7 @@ func parseFlags() (*opFlags, error) { IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds." default:"1"` IndexerMaxNeighbors uint `long:"indexer-max-neighbors" description:"Maximum number of neighbors of an indexer." default:"10000"` - LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"` + LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"` MaxThrottle uint `long:"max-throttle" description:"Maximum requests per second." default:"0"` Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` diff --git a/pkg/util/util.go b/pkg/util/util.go index 9c070a188..398c37d2b 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -2,6 +2,7 @@ package util import ( "encoding/hex" + "math" "go.uber.org/zap/zapcore" ) @@ -9,3 +10,12 @@ import ( func HexField(key string, val []byte) zapcore.Field { return zapcore.Field{Key: key, Type: zapcore.StringType, String: hex.EncodeToString(val)} } + +//round iFloat to iDecimalPlaces decimal points +func RoundToDecimal(iFloat float64, iDecimalPlaces int) float64 { + var multiplier float64 = 10 + for i := 1; i < iDecimalPlaces; i++ { + multiplier = multiplier * 10 + } + return math.Round(iFloat*multiplier) / multiplier +}