Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
added statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
lruggieri committed Oct 29, 2019
1 parent 39966a7 commit 231161e
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 32 deletions.
8 changes: 6 additions & 2 deletions cmd/magneticod/dht/mainline/indexingService.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"go.uber.org/zap"
)

var (
StatsPrintClock = 10 * time.Second
)

type IndexingService struct {
// Private
protocol *Protocol
Expand Down Expand Up @@ -79,8 +83,8 @@ func (is *IndexingService) Start() {
go is.index()

zap.L().Info("Indexing Service started!")
if DefaultThrottleRate > 0{
zap.L().Info("Throttle set to "+strconv.Itoa(DefaultThrottleRate)+" msg/s")
if DefaultThrottleRate > 0 {
zap.L().Info("Throttle set to " + strconv.Itoa(DefaultThrottleRate) + " msg/s")
}
}

Expand Down
128 changes: 128 additions & 0 deletions cmd/magneticod/dht/mainline/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package mainline
import (
"crypto/rand"
"crypto/sha1"
"github.com/boramalper/magnetico/pkg/util"
"net"
"sort"
"strconv"
"sync"
"time"

Expand All @@ -16,6 +19,8 @@ type Protocol struct {
transport *Transport
eventHandlers ProtocolEventHandlers
started bool

stats protocolStats
}

type ProtocolEventHandlers struct {
Expand All @@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
p = new(Protocol)
p.eventHandlers = eventHandlers
p.transport = NewTransport(laddr, p.onMessage, p.eventHandlers.OnCongestion)
p.stats = protocolStats{
messageTypeCount: make(map[string]map[string]int),
}

p.currentTokenSecret, p.previousTokenSecret = make([]byte, 20), make([]byte, 20)
_, err := rand.Read(p.currentTokenSecret)
Expand All @@ -56,6 +64,7 @@ func (p *Protocol) Start() {
p.started = true

p.transport.Start()
go p.printStats()
go p.updateTokenSecret()
}

Expand All @@ -67,7 +76,114 @@ func (p *Protocol) Terminate() {
p.transport.Terminate()
}

//statistics
type protocolStats struct {
sync.RWMutex
messageTypeCount map[string]map[string]int //type=>subtype=>count
}

func (ps *protocolStats) Reset() {
ps.Lock()
defer ps.Unlock()
ps.messageTypeCount = make(map[string]map[string]int)
}

type messageTypeCountOrdered struct {
messageType string
messageCount int
percentageOverTotal float64
subMessages orderedMessagesCount
}
type orderedMessagesCount []*messageTypeCountOrdered


func (omc orderedMessagesCount) Len() int {
return len(omc)
}
func (omc orderedMessagesCount) Swap(i, j int) {
omc[i], omc[j] = omc[j], omc[i]
}
func (omc orderedMessagesCount) Less(i, j int) bool {
return omc[i].messageCount > omc[j].messageCount
}
func (omc orderedMessagesCount) CalculatePercentagesOverTotal(totalMessages int) {
for _, mtco := range omc {
if mtco.subMessages != nil && len(mtco.subMessages) > 0 {
mtco.subMessages.CalculatePercentagesOverTotal(totalMessages)
}
mtco.percentageOverTotal = util.RoundToDecimal(
(float64(mtco.messageCount)/float64(totalMessages))*100, 2)
}
}
func (omc orderedMessagesCount) Sort() {
for _, mtco := range omc {
if mtco.subMessages != nil && len(mtco.subMessages) > 0 {
mtco.subMessages.Sort()
}
}
sort.Sort(omc)
}
func (omc orderedMessagesCount) String() string {
/*
string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
be a problem and it will be much easier to write down and read
*/
mostReceivedMessageTypes := ""
for mIdx, m := range omc {
if mIdx > 0 {
mostReceivedMessageTypes += ", "
}
mostReceivedMessageTypes += m.messageType
mostReceivedMessageTypes +=
" (" + strconv.Itoa(m.messageCount) + ", " + strconv.FormatFloat(m.percentageOverTotal, 'f', -1, 64) + "%)"

if m.subMessages != nil && len(m.subMessages) > 0 {
//add stats for submessages unless there is only 1 submessage with len 0 (empty)
if !(len(m.subMessages) == 1 && len(m.subMessages[0].messageType) == 0) {
mostReceivedMessageTypes += "[ " + m.subMessages.String() + " ]"
}
}
}
return mostReceivedMessageTypes
}
func (p *Protocol) printStats() {
for {
time.Sleep(StatsPrintClock)
p.stats.RLock()
orderedMessages := make(orderedMessagesCount, 0, len(p.stats.messageTypeCount))
totalMessages := 0
for mType, mSubTypes := range p.stats.messageTypeCount {
mCount := 0
orderedSubMessages := make(orderedMessagesCount, 0, len(mSubTypes))
for mSubType, mSubCount := range mSubTypes {
mCount += mSubCount
totalMessages += mSubCount
orderedSubMessages = append(orderedSubMessages, &messageTypeCountOrdered{
messageType: mSubType,
messageCount: mSubCount,
})
}
orderedMessages = append(orderedMessages, &messageTypeCountOrdered{
messageType: mType,
messageCount: mCount,
subMessages: orderedSubMessages,
})
}
p.stats.RUnlock()
orderedMessages.CalculatePercentagesOverTotal(totalMessages)
orderedMessages.Sort()

zap.L().Info("Protocol stats (on "+StatsPrintClock.String()+"):",
zap.String("message type", orderedMessages.String()),
)

p.stats.Reset()
}
}

func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
temporaryQ := msg.Q

switch msg.Y {
case "q":
switch msg.Q {
Expand Down Expand Up @@ -140,6 +256,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
//
// sample_infohashes > get_peers > find_node > ping / announce_peer
if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response.
temporaryQ = "sample_infohashes"
if !validateSampleInfohashesResponseMessage(msg) {
// zap.L().Debug("An invalid sample_infohashes response received!")
return
Expand All @@ -148,6 +265,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
p.eventHandlers.OnSampleInfohashesResponse(msg, addr)
}
} else if len(msg.R.Token) != 0 { // The message should be a get_peers response.
temporaryQ = "get_peers"
if !validateGetPeersResponseMessage(msg) {
// zap.L().Debug("An invalid get_peers response received!")
return
Expand All @@ -156,6 +274,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
p.eventHandlers.OnGetPeersResponse(msg, addr)
}
} else if len(msg.R.Nodes) != 0 { // The message should be a find_node response.
temporaryQ = "find_node"
if !validateFindNodeResponseMessage(msg) {
// zap.L().Debug("An invalid find_node response received!")
return
Expand All @@ -164,6 +283,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
p.eventHandlers.OnFindNodeResponse(msg, addr)
}
} else { // The message should be a ping or an announce_peer response.
temporaryQ = "ping_or_announce"
if !validatePingORannouncePeerResponseMessage(msg) {
// zap.L().Debug("An invalid ping OR announce_peer response received!")
return
Expand All @@ -182,6 +302,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
zap.String("type", msg.Y))
*/
}

//let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
p.stats.Lock()
if _, ok := p.stats.messageTypeCount[msg.Y]; !ok {
p.stats.messageTypeCount[msg.Y] = make(map[string]int)
}
p.stats.messageTypeCount[msg.Y][temporaryQ]++
p.stats.Unlock()
}

func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) {
Expand Down
Loading

0 comments on commit 231161e

Please sign in to comment.