diff --git a/http_blaster.go b/http_blaster.go index 1a2881c..59fdc08 100644 --- a/http_blaster.go +++ b/http_blaster.go @@ -34,7 +34,7 @@ import ( "sync" "sync/atomic" "time" - "strconv" + "github.com/v3io/http_blaster/httpblaster/histogram" ) var ( @@ -55,8 +55,10 @@ var ( worker_qd int = 10000 verbose bool = false enable_ui bool - LatencyCollectorGet tui.LatencyCollector - LatencyCollectorPut tui.LatencyCollector + ch_put_latency chan time.Duration + ch_get_latency chan time.Duration + LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector + LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector StatusesCollector tui.StatusesCollector term_ui *tui.Term_ui dump_failures bool = true @@ -163,8 +165,8 @@ func load_test_Config() { } func generate_executors(term_ui *tui.Term_ui) { - ch_put_latency := LatencyCollectorPut.New(160, 1) - ch_get_latency := LatencyCollectorGet.New(160, 1) + ch_put_latency = LatencyCollectorPut.New() + ch_get_latency = LatencyCollectorGet.New() ch_statuses := StatusesCollector.New(160, 1) for Name, workload := range cfg.Workloads { @@ -200,6 +202,8 @@ func wait_for_completion() { log.Println("Wait for executors to finish") ex_group.Wait() end_time = time.Now() + close(ch_get_latency) + close(ch_put_latency) } func wait_for_ui_completion(ch_done chan struct{}) { @@ -396,7 +400,7 @@ func exit(err_code int) { func handle_exit() { if err := recover(); err != nil { log.Println(err) - os.Exit(1) + log.Exit(1) } } @@ -412,8 +416,8 @@ func enable_tui() chan struct{} { case <-ch_done: return case <-tick: - term_ui.Update_put_latency_chart(LatencyCollectorPut.Get()) - term_ui.Update_get_latency_chart(LatencyCollectorGet.Get()) + //term_ui.Update_put_latency_chart(LatencyCollectorPut.Get()) + //term_ui.Update_get_latency_chart(LatencyCollectorGet.Get()) term_ui.Update_status_codes(StatusesCollector.Get()) term_ui.Refresh_log() term_ui.Render() @@ -428,17 +432,20 @@ func enable_tui() chan struct{} { func dump_latencies_histograms() { prefix_get := "GetHist" prefix_put := "PutHist" - title := "type \t usec \t\t\t percentage\n" + title := "type \t usec \t\t percentage\n" strout := "Latency Histograms:\n" log.Println("LatencyCollectorGet") vs_get, ls_get := LatencyCollectorGet.GetResults() if len(vs_get) >0 { strout += "Get latency histogram:\n" strout += title + //total := float64(0) for i, v := range vs_get { - value, _ := strconv.ParseFloat(v, 64) - strout += fmt.Sprintf("%s: %.3f \t\t %.4f%%\n", prefix_get, value, ls_get[i]) + //value, _ := strconv.ParseFloat(v, 64) + //total += ls_get[i] + strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_get, v,ls_get[i]) } + //strout += fmt.Sprintf("total: %v", total) } vs_put, ls_put := LatencyCollectorPut.GetResults() if len(vs_put) >0 { @@ -447,8 +454,7 @@ func dump_latencies_histograms() { for i, v := range vs_put { if ls_put[i] != 0 { - value, _ := strconv.ParseFloat(v, 64) - strout += fmt.Sprintf("%s:%.3f \t\t %.4f%%\n", prefix_put, value, ls_put[i]) + strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_put, v,ls_put[i]) } } } @@ -473,7 +479,7 @@ func main() { log.Println("Starting http_blaster") defer handle_exit() - defer close_log_file() + //defer close_log_file() defer stop_cpu_profile() defer write_mem_profile() diff --git a/httpblaster/histogram/latency_hist.go b/httpblaster/histogram/latency_hist.go new file mode 100644 index 0000000..67b4c07 --- /dev/null +++ b/httpblaster/histogram/latency_hist.go @@ -0,0 +1,63 @@ +package histogram + +import ( + "time" + "sync" + log "github.com/sirupsen/logrus" + "fmt" + "sort" +) + +type LatencyHist struct { + ch_values chan time.Duration + hist map[int]int + count int64 + wg sync.WaitGroup +} + + +func (self *LatencyHist) Add(v time.Duration) { + log.Debugln("values added") + self.ch_values <- v +} + +func (self *LatencyHist) place(v float64) { + self.hist[int(v/100)]++ +} + +func (self *LatencyHist)New()chan time.Duration { + log.Debugln("new latency hist") + self.hist = make(map[int]int) + self.wg.Add(1) + + self.ch_values = make(chan time.Duration, 10000) + go func() { + defer self.wg.Done() + for v := range self.ch_values { + self.count++ + self.place(float64(v.Nanoseconds() / 1000)) + } + }() + return self.ch_values +} + +func (self *LatencyHist) GetResults() ([]string, []float64) { + log.Debugln("get latency hist") + self.wg.Wait() + var keys []int + for k := range self.hist { + keys = append(keys, k) + } + sort.Ints(keys) + log.Debugln("latency hist wait released") + res_strings := [] string{} + res_values := [] float64{} + for _,k := range keys{ + v := self.hist[k] + res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", + k*100, (k+1)*100) ) + value := float64(v * 100) / float64(self.count) + res_values = append(res_values,value) + } + return res_strings, res_values +} diff --git a/httpblaster/histogram/latency_hist_test.go b/httpblaster/histogram/latency_hist_test.go new file mode 100644 index 0000000..f05e781 --- /dev/null +++ b/httpblaster/histogram/latency_hist_test.go @@ -0,0 +1,29 @@ +package histogram + +import ( + "testing" + "time" + "math/rand" +) + +func TestLatencyHist_Get(t *testing.T) { + l := LatencyHist{} + c:=l.New() + req:= 1000000 + + go func() { + for i := 0; i < req; i++ { + l.Add(time.Microsecond * time.Duration(rand.Intn(2000))) + + } + close(c) + }() + + s,v:= l.GetResults() + total:= float64(0) + for i,_ := range s{ + total+=v[i] + t.Logf("%6v(us)\t\t%3.2f%%", s[i], v[i]) + } + t.Logf("Total: %3.3f", total) +} \ No newline at end of file diff --git a/httpblaster/tui/latency_collector.go b/httpblaster/tui/latency_collector.go index 9477b3a..03c1fef 100644 --- a/httpblaster/tui/latency_collector.go +++ b/httpblaster/tui/latency_collector.go @@ -11,7 +11,7 @@ type LatencyCollector struct { } func (self *LatencyCollector) New(n int, alpha float64) chan time.Duration { - self.WeighHist = gohistogram.NewHistogram(20) + self.WeighHist = gohistogram.NewHistogram(50) self.ch_values = make(chan time.Duration, 400000) go func() { for v := range self.ch_values { diff --git a/httpblaster/worker/ingest_worker.go b/httpblaster/worker/ingest_worker.go index 3df33d8..cd3b850 100644 --- a/httpblaster/worker/ingest_worker.go +++ b/httpblaster/worker/ingest_worker.go @@ -133,16 +133,15 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r oncePrepare.Do(prepareRequest) } - var response *request_generators.Response var err error var d time.Duration + response := request_generators.AcquireResponse() LOOP: for i := 0; i < w.retry_count; i++ { - response := request_generators.AcquireResponse() err, d = w.send_request(submit_request, response) if err != nil { //retry on error - request_generators.ReleaseResponse(response) + response.Response.Reset() continue } else{ ch_statuses <- response.Response.StatusCode() @@ -154,7 +153,7 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r break LOOP } else if i+1 < w.retry_count { //not the last loop - request_generators.ReleaseResponse(response) + response.Response.Reset() } } else { break LOOP