From b14185db14eb8666b179b7ecb4a51a15b3cc9557 Mon Sep 17 00:00:00 2001 From: Luca Ruggieri Date: Sun, 27 Oct 2019 10:33:37 +0100 Subject: [PATCH] added statistics --- .../dht/mainline/indexingService.go | 4 + cmd/magneticod/dht/mainline/protocol.go | 124 +++++++++++++++ cmd/magneticod/dht/mainline/transport.go | 144 +++++++++++++++--- pkg/util/util.go | 16 ++ 4 files changed, 265 insertions(+), 23 deletions(-) diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go index 11d4bf19d..1b50137b8 100644 --- a/cmd/magneticod/dht/mainline/indexingService.go +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -10,6 +10,10 @@ import ( "go.uber.org/zap" ) +var( + StatsPrintClock = 10*time.Second +) + type IndexingService struct { // Private protocol *Protocol diff --git a/cmd/magneticod/dht/mainline/protocol.go b/cmd/magneticod/dht/mainline/protocol.go index 92c766187..7c1a6f482 100644 --- a/cmd/magneticod/dht/mainline/protocol.go +++ b/cmd/magneticod/dht/mainline/protocol.go @@ -3,7 +3,10 @@ package mainline import ( "crypto/rand" "crypto/sha1" + "github.com/boramalper/magnetico/pkg/util" "net" + "sort" + "strconv" "sync" "time" @@ -16,6 +19,8 @@ type Protocol struct { transport *Transport eventHandlers ProtocolEventHandlers started bool + + stats protocolStats } type ProtocolEventHandlers struct { @@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol p = new(Protocol) p.eventHandlers = eventHandlers p.transport = NewTransport(laddr, p.onMessage, p.eventHandlers.OnCongestion) + p.stats = protocolStats{ + messageTypeCount:make(map[string]map[string]int), + } p.currentTokenSecret, p.previousTokenSecret = make([]byte, 20), make([]byte, 20) _, err := rand.Read(p.currentTokenSecret) @@ -56,6 +64,7 @@ func (p *Protocol) Start() { p.started = true p.transport.Start() + go p.printStats() go p.updateTokenSecret() } @@ -67,7 +76,110 @@ func (p *Protocol) Terminate() { p.transport.Terminate() } +//statistics +type protocolStats struct{ + sync.RWMutex + messageTypeCount map[string]map[string]int //type=>subtype=>count +} +func (ps *protocolStats) Reset(){ + ps.Lock() + defer ps.Unlock() + ps.messageTypeCount = make(map[string]map[string]int) +} +type messageTypeCountOrdered struct{ + messageType string + messageCount int + percentageOverTotal float64 + subMessages orderedMessagesCount +} +type orderedMessagesCount []*messageTypeCountOrdered +func (omc orderedMessagesCount) Len() int { + return len(omc) +} +func (omc orderedMessagesCount) Swap(i, j int) { + omc[i], omc[j] = omc[j], omc[i] +} +func (omc orderedMessagesCount) Less(i, j int) bool { + return omc[i].messageCount > omc[j].messageCount +} +func (omc orderedMessagesCount)CalculatePercentagesOverTotal(totalMessages int){ + for _,mtco := range omc{ + if mtco.subMessages != nil && len(mtco.subMessages) > 0{ + mtco.subMessages.CalculatePercentagesOverTotal(totalMessages) + } + mtco.percentageOverTotal = util.RoundToDecimal( + (float64(mtco.messageCount) / float64(totalMessages)) * 100,2) + } +} +func (omc orderedMessagesCount) Sort(){ + for _,mtco := range omc{ + if mtco.subMessages != nil && len(mtco.subMessages) > 0{ + mtco.subMessages.Sort() + } + } + sort.Sort(omc) +} +func (omc orderedMessagesCount) String() string { + /* + string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't + be a problem and it will be much easier to write down and read + */ + mostReceivedMessageTypes := "" + for mIdx, m := range omc { + if mIdx > 0 { + mostReceivedMessageTypes += ", " + } + mostReceivedMessageTypes += m.messageType + mostReceivedMessageTypes += + " (" + strconv.Itoa(m.messageCount) + ", " + strconv.FormatFloat(m.percentageOverTotal, 'f', -1, 64) + "%)" + + if m.subMessages != nil && len(m.subMessages) > 0 { + //add stats for submessages unless there is only 1 submessage with len 0 (empty) + if ! (len(m.subMessages) == 1 && len(m.subMessages[0].messageType) == 0) { + mostReceivedMessageTypes += "[ " + m.subMessages.String() + " ]" + } + } + } + return mostReceivedMessageTypes +} +func (p *Protocol) printStats() { + for { + time.Sleep(StatsPrintClock) + p.stats.RLock() + orderedMessages := make(orderedMessagesCount, 0, len(p.stats.messageTypeCount)) + totalMessages := 0 + for mType, mSubTypes := range p.stats.messageTypeCount { + mCount := 0 + orderedSubMessages := make(orderedMessagesCount, 0, len(mSubTypes)) + for mSubType, mSubCount := range mSubTypes { + mCount += mSubCount + totalMessages += mSubCount + orderedSubMessages = append(orderedSubMessages, &messageTypeCountOrdered{ + messageType: mSubType, + messageCount: mSubCount, + }) + } + orderedMessages = append(orderedMessages, &messageTypeCountOrdered{ + messageType: mType, + messageCount: mCount, + subMessages: orderedSubMessages, + }) + } + p.stats.RUnlock() + orderedMessages.CalculatePercentagesOverTotal(totalMessages) + orderedMessages.Sort() + + zap.L().Info("Protocol stats (on "+StatsPrintClock.String()+"):", + zap.String("message type", orderedMessages.String()), + ) + + p.stats.Reset() + } +} + func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { + temporaryQ := msg.Q + switch msg.Y { case "q": switch msg.Q { @@ -140,6 +252,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { // // sample_infohashes > get_peers > find_node > ping / announce_peer if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response. + temporaryQ = "sample_infohashes" if !validateSampleInfohashesResponseMessage(msg) { // zap.L().Debug("An invalid sample_infohashes response received!") return @@ -148,6 +261,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { p.eventHandlers.OnSampleInfohashesResponse(msg, addr) } } else if len(msg.R.Token) != 0 { // The message should be a get_peers response. + temporaryQ = "get_peers" if !validateGetPeersResponseMessage(msg) { // zap.L().Debug("An invalid get_peers response received!") return @@ -156,6 +270,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { p.eventHandlers.OnGetPeersResponse(msg, addr) } } else if len(msg.R.Nodes) != 0 { // The message should be a find_node response. + temporaryQ = "find_node" if !validateFindNodeResponseMessage(msg) { // zap.L().Debug("An invalid find_node response received!") return @@ -164,6 +279,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { p.eventHandlers.OnFindNodeResponse(msg, addr) } } else { // The message should be a ping or an announce_peer response. + temporaryQ = "ping_or_announce" if !validatePingORannouncePeerResponseMessage(msg) { // zap.L().Debug("An invalid ping OR announce_peer response received!") return @@ -182,6 +298,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { zap.String("type", msg.Y)) */ } + + //let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field + p.stats.Lock() + if _, ok := p.stats.messageTypeCount[msg.Y] ; !ok{ + p.stats.messageTypeCount[msg.Y] = make(map[string]int) + } + p.stats.messageTypeCount[msg.Y][temporaryQ] ++ + p.stats.Unlock() } func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) { diff --git a/cmd/magneticod/dht/mainline/transport.go b/cmd/magneticod/dht/mainline/transport.go index 4c1f92521..ec2548509 100644 --- a/cmd/magneticod/dht/mainline/transport.go +++ b/cmd/magneticod/dht/mainline/transport.go @@ -1,13 +1,17 @@ package mainline import ( - "net" - "time" - + "bytes" "github.com/anacrolix/torrent/bencode" sockaddr "github.com/libp2p/go-sockaddr/net" "go.uber.org/zap" "golang.org/x/sys/unix" + "net" + "sort" + "strconv" + "strings" + "sync" + "time" ) var( @@ -29,6 +33,7 @@ type Transport struct { throttlingRate int //available messages per second. If <=0, it is considered disabled throttleTicketsChannel chan struct{} //channel giving tickets (allowance) to make send a message + stats *transportStats } func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onCongestion func()) *Transport { @@ -59,6 +64,10 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges zap.L().Panic("IP address is not IPv4!") } + t.stats = &transportStats{ + sentPorts: make(map[string]int), + } + return t } @@ -93,6 +102,7 @@ func (t *Transport) Start() { zap.L().Fatal("Could NOT bind the socket!", zap.Error(err)) } + go t.printStats() go t.readMessages() go t.Throttle() } @@ -132,53 +142,133 @@ func (t *Transport) readMessages() { continue } + t.stats.Lock() + t.stats.totalRead++ + t.stats.Unlock() t.onMessage(&msg, from) } } -func (t *Transport) Throttle(){ - if t.throttlingRate > 0{ +func (t *Transport) Throttle() { + if t.throttlingRate > 0 { resetChannel := make(chan struct{}) - dealer := func(resetRequest chan struct{}){ + dealer := func(resetRequest chan struct{}) { ticketGiven := 0 tooManyTicketGiven := false - for{ - select{ - case <- t.throttleTicketsChannel: { - ticketGiven++ - if ticketGiven >= t.throttlingRate{ - tooManyTicketGiven = true - break + for { + select { + case <-t.throttleTicketsChannel: + { + ticketGiven++ + if ticketGiven >= t.throttlingRate { + tooManyTicketGiven = true + break + } + } + case <-resetRequest: + { + return } - } - case <- resetRequest: { - return - } } - if tooManyTicketGiven{break} + if tooManyTicketGiven { + break + } } - <- resetRequest + <-resetRequest return } go dealer(resetChannel) - for range time.Tick(1*time.Second){ + for range time.Tick(1 * time.Second) { resetChannel <- struct{}{} go dealer(resetChannel) } - }else{ + } else { //no limit, keep giving tickets to whoever requests it - for{ + for { <-t.throttleTicketsChannel } } } +//statistics +type transportStats struct{ + sync.RWMutex + sentPorts map[string]int + totalSend int + totalRead int +} +func(ts *transportStats) Reset(){ + ts.Lock() + defer ts.Unlock() + ts.sentPorts = make(map[string]int) + ts.totalSend = 0 + ts.totalRead = 0 +} +type statPortCount struct{ + portNumber string + portCount int +} +type statPortCounts []statPortCount +func (s statPortCounts) Len() int { + return len(s) +} +func (s statPortCounts) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s statPortCounts) Less(i, j int) bool { + return s[i].portCount > s[j].portCount +} +var totalSend int +func (t *Transport) printStats(){ + for{ + time.Sleep(StatsPrintClock) + t.stats.RLock() + tempOrderedPorts := make(statPortCounts,0,len(t.stats.sentPorts)) + currentTotalSend := t.stats.totalSend + currentTotalRead := t.stats.totalRead + for port, count := range t.stats.sentPorts{ + tempOrderedPorts = append(tempOrderedPorts, statPortCount{port,count}) + } + t.stats.RUnlock() + + sort.Sort(tempOrderedPorts) + + mostUsedPortsBuffer := bytes.Buffer{} + sendRateBuffer := bytes.Buffer{} + readRateBuffer := bytes.Buffer{} + + for i,pc := range tempOrderedPorts{ + if i > 5 {break}else if i > 0{ + mostUsedPortsBuffer.WriteString(", ")} + + mostUsedPortsBuffer.WriteString(pc.portNumber) + mostUsedPortsBuffer.WriteString("(") + mostUsedPortsBuffer.WriteString(strconv.Itoa(pc.portCount)) + mostUsedPortsBuffer.WriteString(")") + } + + sendRateBuffer.WriteString(strconv.FormatFloat(float64(currentTotalSend) / StatsPrintClock.Seconds(),'f',-1,64)) + sendRateBuffer.WriteString(" msg/s") + + readRateBuffer.WriteString(strconv.FormatFloat(float64(currentTotalRead) / StatsPrintClock.Seconds(),'f',-1,64)) + readRateBuffer.WriteString(" msg/s") + + zap.L().Info("Transport stats for socket "+strconv.Itoa(t.fd)+" (on "+StatsPrintClock.String()+"):", + zap.String("ports",mostUsedPortsBuffer.String()), + zap.String("send rate", sendRateBuffer.String()), + zap.String("read rate", readRateBuffer.String()), + ) + + //finally, reset stats + t.stats.Reset() + } +} func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { //get ticket @@ -188,13 +278,21 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { if err != nil { zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)") } - addrSA := sockaddr.NetAddrToSockaddr(addr) if addrSA == nil { zap.L().Debug("Wrong net address for the remote peer!", zap.String("addr", addr.String())) return } + t.stats.Lock() + a := strings.Split(addr.String(),":")[1] + if _, ok := t.stats.sentPorts[a] ; ok{ + t.stats.sentPorts[a] ++ + }else{ + t.stats.sentPorts[a] = 1 + } + t.stats.totalSend++ + t.stats.Unlock() err = unix.Sendto(t.fd, data, 0, addrSA) if err == unix.EPERM || err == unix.ENOBUFS { diff --git a/pkg/util/util.go b/pkg/util/util.go index 9c070a188..9238ccb7b 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -2,6 +2,7 @@ package util import ( "encoding/hex" + "math" "go.uber.org/zap/zapcore" ) @@ -9,3 +10,18 @@ import ( func HexField(key string, val []byte) zapcore.Field { return zapcore.Field{Key: key, Type: zapcore.StringType, String: hex.EncodeToString(val)} } + +func Round(x float64) float64 { + t := math.Trunc(x) + if math.Abs(x-t) >= 0.5 { + return t + math.Copysign(1, x) + } + return t +} +func RoundToDecimal(iFloat float64, iDecimalPlaces int) float64{ + var multiplier float64 = 10 + for i := 1; i < iDecimalPlaces ; i++{ + multiplier = multiplier * 10 + } + return Round(iFloat*multiplier)/multiplier +} \ No newline at end of file