Skip to content

Commit

Permalink
Merge pull request #3 from criteo/next
Browse files Browse the repository at this point in the history
Add additional options
  • Loading branch information
pierresouchay authored Dec 18, 2018
2 parents fd7464a + 46a888a commit b0a3858
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 31 deletions.
46 changes: 44 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"flag"
"log"
"os"
"os/signal"
"sync"
"time"

consul "github.com/hashicorp/consul/api"
Expand All @@ -22,6 +24,9 @@ func main() {
stale := flag.Bool("query-stale", false, "Run stale blocking queries")
token := flag.String("token", "", "ACL token")
watchers := flag.Int("watchers", 1, "Number of concurrnet watchers on service")
monitor := flag.Int("monitor", 0, "Consul PID")
runtime := flag.Duration("time", 0, "Time to run the benchmark")
latepc := flag.Float64("late-ratio", 0, "Ratio of late callers")
flag.Parse()

if *token == "" {
Expand Down Expand Up @@ -51,14 +56,51 @@ func main() {
return
}

done := make(chan struct{})
var wg sync.WaitGroup

if *monitor > 0 {
wg.Add(1)
go func() {
Monitor(int32(*monitor), stats, done)
wg.Done()
}()
}

if *runtime > 0 {
go func() {
time.Sleep(*runtime)
close(done)
}()
}

var qf queryFn
if !*useRPC {
qf = QueryAgent(c, *serviceName, *wait, *stale)
} else {
qf = QueryServer(*rpcAddr, *dc, *serviceName, *wait, *stale)
}

go RunQueries(qf, *watchers, stats)
wg.Add(1)
go func() {
RunQueries(qf, *watchers, *latepc, stats, done)
wg.Done()
}()

wg.Add(1)
go func() {
DisplayStats(stats, done)
wg.Done()
}()

go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
<-signals
close(done)
}()

DisplayStats(stats)
<-done
wg.Wait()
os.Exit(0)
}
38 changes: 38 additions & 0 deletions monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"log"
"time"

"github.com/shirou/gopsutil/process"
)

func Monitor(pid int32, stats chan Stat, done chan struct{}) {
proc, err := process.NewProcess(pid)
if err != nil {
log.Fatal(err)
}
proc.Percent(0)

tick := time.Tick(time.Second)
for {
select {
case <-done:
return
case <-tick:
p, err := proc.Percent(0)
if err != nil {
log.Println(err)
} else {
select {
case stats <- Stat{
Label: "CPU",
Value: p,
}:
case <-done:
return
}
}
}
}
}
31 changes: 17 additions & 14 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"log"
"math/rand"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

Expand All @@ -16,27 +16,26 @@ import (

type queryFn func(uint64) (uint64, error)

func RunQueries(fn queryFn, count int, stats chan Stat) error {
func RunQueries(fn queryFn, count int, lateRatio float64, stats chan Stat, done chan struct{}) error {
log.Println("Starting", count, "watchers...")

var qps int32
var lateQps int32

errs := make(chan error, 1)
done := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-done:
return
default:
}

index := uint64(0)
index := uint64(1)
var err error
var i int
for {
if rand.Float64() <= lateRatio {
atomic.AddInt32(&lateQps, 1)
index -= 50
if index <= 0 {
index = 1
}
}
index, err = fn(index)
if err != nil {
select {
Expand All @@ -46,17 +45,21 @@ func RunQueries(fn queryFn, count int, stats chan Stat) error {
return
}
atomic.AddInt32(&qps, 1)
i++
}
}()
}
go func() {
for range time.Tick(time.Second) {
c := atomic.SwapInt32(&qps, 0)
lc := atomic.SwapInt32(&lateQps, 0)
stats <- Stat{"QPS", float64(c)}
stats <- Stat{"Late QPS", float64(lc)}
}
}()
log.Println("Watchers started.")
wg.Wait()

<-done
select {
case err := <-errs:
return err
Expand Down
69 changes: 54 additions & 15 deletions stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,75 @@ type Stat struct {
Value float64
}

func DisplayStats(ch <-chan Stat) {
func DisplayStats(ch <-chan Stat, done chan struct{}) {
type statC struct {
Stat
Count int
}

var l sync.Mutex
entries := map[string]Stat{}
avg := make(map[string]*statC)
entries := make(map[string]Stat)

go func() {
for {
e := <-ch
l.Lock()
entries[e.Label] = e
if _, ok := avg[e.Label]; !ok {
avg[e.Label] = &statC{
Count: 0,
Stat: e,
}
}
avg[e.Label].Value = (avg[e.Label].Value*float64(avg[e.Label].Count) + e.Value) / float64(avg[e.Label].Count+1)
avg[e.Label].Count++
l.Unlock()
}
}()

for range time.Tick(time.Second) {
l.Lock()
var entriesSlice []Stat
for _, e := range entries {
entriesSlice = append(entriesSlice, e)
}

sort.Slice(entriesSlice, func(i, j int) bool {
return strings.Compare(entriesSlice[i].Label, entriesSlice[j].Label) < 0
printLine := func(entries []Stat) {
sort.Slice(entries, func(i, j int) bool {
return strings.Compare(entries[i].Label, entries[j].Label) < 0
})
var s []string
for _, e := range entriesSlice {
s = append(s, fmt.Sprintf("%s: %f", e.Label, e.Value))
for _, e := range entries {
s = append(s, fmt.Sprintf("%s: %.2f", e.Label, e.Value))
}

log.Println(strings.Join(s, ", "))
entriesSlice = entriesSlice[:0]
l.Unlock()
}

start := time.Now()
tick := time.Tick(time.Second)

for {
select {
case <-done:
l.Lock()
entriesSlice := []Stat{{
Label: "Runtime (s)",
Value: time.Since(start).Seconds(),
}}
for _, e := range avg {
entriesSlice = append(entriesSlice, e.Stat)
}

log.Println("====== Summary ======")
printLine(entriesSlice)
l.Unlock()
return
case <-tick:
l.Lock()
var entriesSlice []Stat
for _, e := range entries {
entriesSlice = append(entriesSlice, e)
}

printLine(entriesSlice)

entriesSlice = entriesSlice[:0]
l.Unlock()
}
}
}

0 comments on commit b0a3858

Please sign in to comment.