From 39966a7ca57c1b33bce070834868ef16241bec12 Mon Sep 17 00:00:00 2001 From: Luca Ruggieri Date: Sun, 27 Oct 2019 11:07:22 +0100 Subject: [PATCH] Throttling enabled --- .../dht/mainline/indexingService.go | 4 ++ cmd/magneticod/dht/mainline/transport.go | 62 +++++++++++++++++++ cmd/magneticod/main.go | 4 ++ 3 files changed, 70 insertions(+) diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go index 2dcdc784c..11d4bf19d 100644 --- a/cmd/magneticod/dht/mainline/indexingService.go +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -3,6 +3,7 @@ package mainline import ( "math/rand" "net" + "strconv" "sync" "time" @@ -78,6 +79,9 @@ func (is *IndexingService) Start() { go is.index() zap.L().Info("Indexing Service started!") + if DefaultThrottleRate > 0{ + zap.L().Info("Throttle set to "+strconv.Itoa(DefaultThrottleRate)+" msg/s") + } } func (is *IndexingService) Terminate() { diff --git a/cmd/magneticod/dht/mainline/transport.go b/cmd/magneticod/dht/mainline/transport.go index 3f993edee..4c1f92521 100644 --- a/cmd/magneticod/dht/mainline/transport.go +++ b/cmd/magneticod/dht/mainline/transport.go @@ -2,6 +2,7 @@ package mainline import ( "net" + "time" "github.com/anacrolix/torrent/bencode" sockaddr "github.com/libp2p/go-sockaddr/net" @@ -9,6 +10,10 @@ import ( "golang.org/x/sys/unix" ) +var( + DefaultThrottleRate = -1 // <= 0 for unlimited requests +) + type Transport struct { fd int laddr *net.UDPAddr @@ -21,6 +26,9 @@ type Transport struct { onMessage func(*Message, *net.UDPAddr) // OnCongestion onCongestion func() + + throttlingRate int //available messages per second. If <=0, it is considered disabled + throttleTicketsChannel chan struct{} //channel giving tickets (allowance) to make send a message } func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onCongestion func()) *Transport { @@ -39,6 +47,8 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges t.buffer = make([]byte, 65507) t.onMessage = onMessage t.onCongestion = onCongestion + t.throttleTicketsChannel = make(chan struct{}) + t.throttlingRate = DefaultThrottleRate var err error t.laddr, err = net.ResolveUDPAddr("udp", laddr) @@ -52,6 +62,10 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges return t } +func (t *Transport) SetThrottle(rate int){ + t.throttlingRate = rate +} + func (t *Transport) Start() { // Why check whether the Transport `t` started or not, here and not -for instance- in // t.Terminate()? @@ -80,6 +94,7 @@ func (t *Transport) Start() { } go t.readMessages() + go t.Throttle() } func (t *Transport) Terminate() { @@ -121,7 +136,54 @@ func (t *Transport) readMessages() { } } +func (t *Transport) Throttle(){ + if t.throttlingRate > 0{ + resetChannel := make(chan struct{}) + + dealer := func(resetRequest chan struct{}){ + ticketGiven := 0 + tooManyTicketGiven := false + for{ + select{ + case <- t.throttleTicketsChannel: { + ticketGiven++ + if ticketGiven >= t.throttlingRate{ + tooManyTicketGiven = true + break + } + } + case <- resetRequest: { + return + } + } + + if tooManyTicketGiven{break} + } + + <- resetRequest + return + + } + + go dealer(resetChannel) + for range time.Tick(1*time.Second){ + resetChannel <- struct{}{} + + go dealer(resetChannel) + } + + }else{ + //no limit, keep giving tickets to whoever requests it + for{ + <-t.throttleTicketsChannel + } + } +} + func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { + //get ticket + t.throttleTicketsChannel <- struct{}{} + data, err := bencode.Marshal(msg) if err != nil { zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)") diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index b241f23d7..54bcbe648 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "math/rand" "net" "os" @@ -141,6 +142,7 @@ func parseFlags() (*opFlags, error) { IndexerMaxNeighbors uint `long:"indexer-max-neighbors" description:"Maximum number of neighbors of an indexer." default:"10000"` LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"` + MaxThrottle uint `long:"max-throttle" description:"Maximum requests per second." default:"0"` Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory"` @@ -183,6 +185,8 @@ func parseFlags() (*opFlags, error) { ) } + mainline.DefaultThrottleRate = int(cmdF.MaxThrottle) + opF.Verbosity = len(cmdF.Verbose) opF.Profile = cmdF.Profile