From 8af4b3f755028fbabf3440d674e6fba4cefe4c2c Mon Sep 17 00:00:00 2001 From: sasile Date: Tue, 22 May 2018 14:47:36 +0300 Subject: [PATCH 1/4] histogram optimizations --- http_blaster.go | 108 ++++++++++++++++++++----- httpblaster/executor.go | 40 ++++++--- httpblaster/histogram/latency_hist.go | 36 ++++++--- httpblaster/worker/ingest_worker.go | 17 ++-- httpblaster/worker/perf_worker.go | 15 ++-- httpblaster/worker/worker_base.go | 13 ++- httpblaster/worker/worker_interface.go | 7 +- 7 files changed, 171 insertions(+), 65 deletions(-) diff --git a/http_blaster.go b/http_blaster.go index 59fdc08..9ea68dc 100644 --- a/http_blaster.go +++ b/http_blaster.go @@ -34,7 +34,8 @@ import ( "sync" "sync/atomic" "time" - "github.com/v3io/http_blaster/httpblaster/histogram" + //"github.com/v3io/http_blaster/httpblaster/histogram" + "sort" ) var ( @@ -57,9 +58,9 @@ var ( enable_ui bool ch_put_latency chan time.Duration ch_get_latency chan time.Duration - LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector - LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector - StatusesCollector tui.StatusesCollector + //LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector + //LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector + //StatusesCollector tui.StatusesCollector term_ui *tui.Term_ui dump_failures bool = true dump_location string = "." @@ -165,9 +166,9 @@ func load_test_Config() { } func generate_executors(term_ui *tui.Term_ui) { - ch_put_latency = LatencyCollectorPut.New() - ch_get_latency = LatencyCollectorGet.New() - ch_statuses := StatusesCollector.New(160, 1) + //ch_put_latency = LatencyCollectorPut.New() + //ch_get_latency = LatencyCollectorGet.New() + //ch_statuses := StatusesCollector.New(160, 1) for Name, workload := range cfg.Workloads { log.Println("Adding executor for ", Name) @@ -183,7 +184,7 @@ func generate_executors(term_ui *tui.Term_ui) { TermUi: term_ui, Ch_get_latency: ch_get_latency, Ch_put_latency: ch_put_latency, - Ch_statuses: ch_statuses, + //Ch_statuses: ch_statuses, DumpFailures: dump_failures, DumpLocation: dump_location} executors = append(executors, e) @@ -202,8 +203,8 @@ func wait_for_completion() { log.Println("Wait for executors to finish") ex_group.Wait() end_time = time.Now() - close(ch_get_latency) - close(ch_put_latency) + //close(ch_get_latency) + ///close(ch_put_latency) } func wait_for_ui_completion(ch_done chan struct{}) { @@ -418,7 +419,7 @@ func enable_tui() chan struct{} { case <-tick: //term_ui.Update_put_latency_chart(LatencyCollectorPut.Get()) //term_ui.Update_get_latency_chart(LatencyCollectorGet.Get()) - term_ui.Update_status_codes(StatusesCollector.Get()) + //term_ui.Update_status_codes(StatusesCollector.Get()) term_ui.Refresh_log() term_ui.Render() } @@ -429,6 +430,68 @@ func enable_tui() chan struct{} { return nil } + +func dump_latencies_histograms(){ + latency_get := make(map[int64]int) + latency_put := make(map[int64]int) + total_get:=0 + total_put:=0 + + for _,e := range executors{ + hist := e.LatencyHist() + if e.GetType() == "GET"{ + for k,v:=range hist{ + latency_get[k]+=v + total_get+=v + } + }else{ + for k,v:=range hist{ + latency_put[k]+=v + total_put+=v + } + } + } + dump_latency_histogram(latency_get, total_get, "GET") + dump_latency_histogram(latency_put, total_put, "PUT") + +} + +func dump_latency_histogram(hist map[int64]int, total int, req_type string) ([]string, []float64) { + var keys []int + var prefix string + title := "type \t usec \t\t percentage\n" + if req_type == "GET"{ + prefix = 
"GetHist" + }else{ + prefix = "PutHist" + } + strout := fmt.Sprintf("%s Latency Histograms:\n", prefix) + for k := range hist { + keys = append(keys, int(k)) + } + sort.Ints(keys) + log.Debugln("latency hist wait released") + res_strings := [] string{} + res_values := [] float64{} + for _,k := range keys{ + v := hist[int64(k)] + res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", + k*100, (k+1)*100) ) + value := float64(v * 100) / float64(total) + res_values = append(res_values,value) + } + + if len(res_strings)>0{ + strout += title + for i, v := range res_strings { + strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix, v,res_values[i]) + } + } + log.Println(strout) + return res_strings, res_values +} + +/* func dump_latencies_histograms() { prefix_get := "GetHist" prefix_put := "PutHist" @@ -460,16 +523,17 @@ func dump_latencies_histograms() { } log.Println(strout) } +*/ -func dump_status_code_histogram() { - log.Println("Status codes:") - labels, values := StatusesCollector.Get() - for i, v := range labels { - if values[i] != 0 { - log.Println(fmt.Sprintf("%v %v%%", v, values[i])) - } - } -} +//func dump_status_code_histogram() { +// log.Println("Status codes:") +// labels, values := StatusesCollector.Get() +// for i, v := range labels { +// if values[i] != 0 { +// log.Println(fmt.Sprintf("%v %v%%", v, values[i])) +// } +// } +//} func main() { parse_cmd_line_args() @@ -478,7 +542,7 @@ func main() { configure_log() log.Println("Starting http_blaster") - defer handle_exit() + //defer handle_exit() //defer close_log_file() defer stop_cpu_profile() defer write_mem_profile() @@ -489,7 +553,7 @@ func main() { wait_for_completion() log.Println("Executors done!") dump_latencies_histograms() - dump_status_code_histogram() + //dump_status_code_histogram() err_code := report() log.Println("Done with error code ", err_code) wait_for_ui_completion(ch_done) diff --git a/httpblaster/executor.go b/httpblaster/executor.go index f0087d2..c6cd4cd 100644 --- a/httpblaster/executor.go +++ b/httpblaster/executor.go @@ -66,7 +66,7 @@ type Executor struct { TermUi *tui.Term_ui Ch_get_latency chan time.Duration Ch_put_latency chan time.Duration - Ch_statuses chan int + //Ch_statuses chan int DumpFailures bool DumpLocation string } @@ -137,6 +137,10 @@ func (self *Executor)GetWorkerType() worker.WorkerType { return worker.INGESTION_WORKER } +func (self *Executor)GetType()string { + return self.Workload.Type +} + func (self *Executor) run(wg *sync.WaitGroup) error { defer wg.Done() self.Start_time = time.Now() @@ -160,15 +164,17 @@ func (self *Executor) run(wg *sync.WaitGroup) error { self.Globals.RetryOnStatusCodes, self.Globals.RetryCount, self.Globals.PemFile, i) self.workers = append(self.workers, w) - var ch_latency chan time.Duration - if self.Workload.Type == "GET" { - ch_latency = self.Ch_get_latency - } else { - ch_latency = self.Ch_put_latency - } + //var ch_latency chan time.Duration + //if self.Workload.Type == "GET" { + // ch_latency = self.Ch_get_latency + //} else { + // ch_latency = self.Ch_put_latency + //} - go w.RunWorker(ch_response, ch_req, &workers_wg, release_req_flag, ch_latency, - self.Ch_statuses, self.DumpFailures, self.DumpLocation) + go w.RunWorker(ch_response, ch_req, + &workers_wg, release_req_flag,// ch_latency, + //self.Ch_statuses, + self.DumpFailures, self.DumpLocation) } ended := make(chan bool) go func() { @@ -239,10 +245,6 @@ LOOP: func (self *Executor) Start(wg *sync.WaitGroup) error { self.results.Statuses = make(map[int]uint64) log.Info("at executor start ", 
self.Workload) - //self.host = self.Globals.Server - //self.port = self.Globals.Port - //self.tls_mode = self.Globals.TLSMode - //self.Globals.StatusCodesAcceptance = self.Globals.StatusCodesAcceptance go func() { self.run(wg) }() @@ -289,3 +291,15 @@ func (self *Executor) Report() (executor_result, error) { } return self.results, nil } + +func (self *Executor)LatencyHist() map[int64]int { + res := make(map[int64]int) + for _,w := range self.workers{ + hist := w.GetHist() + for k,v:= range hist{ + res[k]+=v + } + } + return res +} + diff --git a/httpblaster/histogram/latency_hist.go b/httpblaster/histogram/latency_hist.go index 67b4c07..b0f223b 100644 --- a/httpblaster/histogram/latency_hist.go +++ b/httpblaster/histogram/latency_hist.go @@ -10,32 +10,44 @@ import ( type LatencyHist struct { ch_values chan time.Duration - hist map[int]int + hist map[int64]int count int64 + size int64 wg sync.WaitGroup } -func (self *LatencyHist) Add(v time.Duration) { - log.Debugln("values added") +func (self *LatencyHist)Add(v time.Duration) { self.ch_values <- v + self.size++ } -func (self *LatencyHist) place(v float64) { - self.hist[int(v/100)]++ +func (self *LatencyHist)Close() { + close(self.ch_values) +} + +func (self *LatencyHist) place(v int64) { + if v > 1000000{ + self.hist[10000]++ + }else if v > 100000{ + self.hist[1000]++ + }else { + self.hist[v/100]++ + } } func (self *LatencyHist)New()chan time.Duration { log.Debugln("new latency hist") - self.hist = make(map[int]int) + self.hist = make(map[int64]int) self.wg.Add(1) self.ch_values = make(chan time.Duration, 10000) + go func() { defer self.wg.Done() for v := range self.ch_values { self.count++ - self.place(float64(v.Nanoseconds() / 1000)) + self.place(v.Nanoseconds() / 1000) } }() return self.ch_values @@ -46,14 +58,13 @@ func (self *LatencyHist) GetResults() ([]string, []float64) { self.wg.Wait() var keys []int for k := range self.hist { - keys = append(keys, k) + keys = append(keys, int(k)) } sort.Ints(keys) - log.Debugln("latency hist wait released") res_strings := [] string{} res_values := [] float64{} for _,k := range keys{ - v := self.hist[k] + v := self.hist[int64(k)] res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", k*100, (k+1)*100) ) value := float64(v * 100) / float64(self.count) @@ -61,3 +72,8 @@ func (self *LatencyHist) GetResults() ([]string, []float64) { } return res_strings, res_values } + +func (self *LatencyHist)GetHistMap()map[int64]int{ + self.wg.Wait() + return self.hist +} \ No newline at end of file diff --git a/httpblaster/worker/ingest_worker.go b/httpblaster/worker/ingest_worker.go index cd3b850..d62aac6 100644 --- a/httpblaster/worker/ingest_worker.go +++ b/httpblaster/worker/ingest_worker.go @@ -89,8 +89,8 @@ func (w *IngestWorker) dump_requests(ch_dump chan *fasthttp.Request, dump_locati func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, release_req bool, - ch_latency chan time.Duration, - ch_statuses chan int, + //ch_latency chan time.Duration, + //ch_statuses chan int, dump_requests bool, dump_location string) { defer wg.Done() @@ -134,18 +134,18 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r } var err error - var d time.Duration + //var d time.Duration response := request_generators.AcquireResponse() LOOP: for i := 0; i < w.retry_count; i++ { - err, d = w.send_request(submit_request, response) + err, _ = w.send_request(submit_request, response) if err != nil { //retry on error 
response.Response.Reset() continue } else{ - ch_statuses <- response.Response.StatusCode() - ch_latency <- d + //ch_statuses <- response.Response.StatusCode() + //ch_latency <- d } if response.Response.StatusCode() >= http.StatusBadRequest { if _, ok := w.retry_codes[response.Response.StatusCode()]; !ok { @@ -159,8 +159,8 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r break LOOP } } - ch_statuses <- response.Response.StatusCode() - ch_latency <- d + //ch_statuses <- response.Response.StatusCode() + //ch_latency <- d if response.Response.StatusCode() >= http.StatusBadRequest && response.Response.StatusCode() < http.StatusInternalServerError && dump_requests { @@ -184,5 +184,6 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r close(ch_dump) sync_dump.Wait() } + w.hist.Close() w.close_connection() } diff --git a/httpblaster/worker/perf_worker.go b/httpblaster/worker/perf_worker.go index aff8fb0..c909fa1 100644 --- a/httpblaster/worker/perf_worker.go +++ b/httpblaster/worker/perf_worker.go @@ -24,7 +24,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/v3io/http_blaster/httpblaster/request_generators" "sync" - "time" + //"time" ) type PerfWorker struct { @@ -37,8 +37,8 @@ func (w *PerfWorker) UseBase(c WorkerBase) { func (w *PerfWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, release_req bool, - ch_latency chan time.Duration, - ch_statuses chan int, + //ch_latency chan time.Duration, + //ch_statuses chan int, dump_requests bool, dump_location string) { defer wg.Done() @@ -54,17 +54,18 @@ func (w *PerfWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req req_type.Do(func() { w.Results.Method = string(req.Request.Header.Method()) }) - err, d := w.send_request(req, response) + err, _ := w.send_request(req, response) if err != nil{ log.Errorf("send request failed %s", err.Error()) } - ch_statuses <- response.Response.StatusCode() - ch_latency <- d + //ch_statuses <- response.Response.StatusCode() + //ch_latency <- d request_generators.ReleaseRequest(req) response.Response.Reset() } - + log.Debugln("closing hist") + w.hist.Close() w.close_connection() } diff --git a/httpblaster/worker/worker_base.go b/httpblaster/worker/worker_base.go index f2e0921..4803277 100644 --- a/httpblaster/worker/worker_base.go +++ b/httpblaster/worker/worker_base.go @@ -15,6 +15,7 @@ import ( "os" "time" "sync" + "github.com/v3io/http_blaster/httpblaster/histogram" ) const DialTimeout = 60 * time.Second @@ -35,6 +36,7 @@ type WorkerBase struct { retry_count int timer *time.Timer id int + hist *histogram.LatencyHist } func (w *WorkerBase) open_connection() { @@ -131,6 +133,7 @@ func (w *WorkerBase) send(req *fasthttp.Request, resp *fasthttp.Response, w.timer.Reset(timeout) select { case duration := <-w.ch_duration: + w.hist.Add(duration) return nil, duration case err := <-w.ch_error: log.Debugf("request completed with error:%s", err.Error()) @@ -191,6 +194,10 @@ func (w *WorkerBase) GetResults() worker_results { return w.Results } +func (w *WorkerBase)GetHist() map[int64]int{ + return w.hist.GetHistMap() +} + func NewWorker(worker_type WorkerType, host string, tls_client bool, lazy int, retry_codes []int, retry_count int, pem_file string, id int) Worker { if host == "" { return nil @@ -204,12 +211,14 @@ func NewWorker(worker_type WorkerType, host string, tls_client bool, lazy int, r retry_count = 1 } var worker Worker + hist := &histogram.LatencyHist{} + 
hist.New() if worker_type == PERFORMANCE_WORKER { worker = &PerfWorker{WorkerBase{host: host, is_tls_client: tls_client, retry_codes: retry_codes_map, - retry_count: retry_count, pem_file: pem_file, id: id}} + retry_count: retry_count, pem_file: pem_file, id: id, hist: hist}} }else{ worker = &IngestWorker{WorkerBase{host: host, is_tls_client: tls_client, retry_codes: retry_codes_map, - retry_count: retry_count, pem_file: pem_file, id: id}} + retry_count: retry_count, pem_file: pem_file, id: id, hist: hist}} } worker.Init(lazy) return worker diff --git a/httpblaster/worker/worker_interface.go b/httpblaster/worker/worker_interface.go index 6608867..5f86eeb 100644 --- a/httpblaster/worker/worker_interface.go +++ b/httpblaster/worker/worker_interface.go @@ -3,19 +3,20 @@ package worker import ( "github.com/v3io/http_blaster/httpblaster/request_generators" "sync" - "time" + //"time" ) type Worker interface { UseBase(c WorkerBase) Init(lazy int) GetResults() worker_results + GetHist() map[int64]int RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, release_req bool, - ch_latency chan time.Duration, - ch_statuses chan int, + //ch_latency chan time.Duration, + //ch_statuses chan int, dump_requests bool, dump_location string) } From 496b471bd468632ce7c14bb7fd622ddd687f7dc2 Mon Sep 17 00:00:00 2001 From: sasile Date: Tue, 22 May 2018 16:26:30 +0300 Subject: [PATCH 2/4] histogram + fmt --- http_blaster.go | 122 ++++++++++++--------- httpblaster/executor.go | 25 ++--- httpblaster/histogram/latency_hist.go | 47 ++++---- httpblaster/histogram/latency_hist_test.go | 16 +-- httpblaster/tui/latency_collector.go | 8 +- httpblaster/worker/ingest_worker.go | 2 +- httpblaster/worker/perf_worker.go | 2 +- httpblaster/worker/worker_base.go | 12 +- httpblaster/worker/worker_interface.go | 2 +- httpblaster/worker/worker_type.go | 7 +- 10 files changed, 130 insertions(+), 113 deletions(-) diff --git a/http_blaster.go b/http_blaster.go index 9ea68dc..5985747 100644 --- a/http_blaster.go +++ b/http_blaster.go @@ -22,8 +22,8 @@ package main import ( "flag" "fmt" - log "github.com/sirupsen/logrus" "github.com/Gurpartap/logrus-stack" + log "github.com/sirupsen/logrus" "github.com/v3io/http_blaster/httpblaster" "github.com/v3io/http_blaster/httpblaster/config" "github.com/v3io/http_blaster/httpblaster/tui" @@ -39,31 +39,31 @@ import ( ) var ( - start_time time.Time - end_time time.Time - wl_id int32 = -1 - conf_file string - results_file string - showVersion bool - dataBfr []byte - cpu_profile = false - mem_profile = false - cfg config.TomlConfig - executors []*httpblaster.Executor - ex_group sync.WaitGroup - enable_log bool - log_file *os.File - worker_qd int = 10000 - verbose bool = false - enable_ui bool - ch_put_latency chan time.Duration - ch_get_latency chan time.Duration + start_time time.Time + end_time time.Time + wl_id int32 = -1 + conf_file string + results_file string + showVersion bool + dataBfr []byte + cpu_profile = false + mem_profile = false + cfg config.TomlConfig + executors []*httpblaster.Executor + ex_group sync.WaitGroup + enable_log bool + log_file *os.File + worker_qd int = 10000 + verbose bool = false + enable_ui bool + ch_put_latency chan time.Duration + ch_get_latency chan time.Duration //LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector //LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector //StatusesCollector tui.StatusesCollector - term_ui *tui.Term_ui - dump_failures bool = true - dump_location string = 
"." + term_ui *tui.Term_ui + dump_failures bool = true + dump_location string = "." ) const AppVersion = "3.0.3" @@ -185,8 +185,8 @@ func generate_executors(term_ui *tui.Term_ui) { Ch_get_latency: ch_get_latency, Ch_put_latency: ch_put_latency, //Ch_statuses: ch_statuses, - DumpFailures: dump_failures, - DumpLocation: dump_location} + DumpFailures: dump_failures, + DumpLocation: dump_location} executors = append(executors, e) } } @@ -430,24 +430,23 @@ func enable_tui() chan struct{} { return nil } - -func dump_latencies_histograms(){ +func dump_latencies_histograms() { latency_get := make(map[int64]int) latency_put := make(map[int64]int) - total_get:=0 - total_put:=0 + total_get := 0 + total_put := 0 - for _,e := range executors{ + for _, e := range executors { hist := e.LatencyHist() - if e.GetType() == "GET"{ - for k,v:=range hist{ - latency_get[k]+=v - total_get+=v + if e.GetType() == "GET" { + for k, v := range hist { + latency_get[k] += v + total_get += v } - }else{ - for k,v:=range hist{ - latency_put[k]+=v - total_put+=v + } else { + for k, v := range hist { + latency_put[k] += v + total_put += v } } } @@ -455,36 +454,61 @@ func dump_latencies_histograms(){ dump_latency_histogram(latency_put, total_put, "PUT") } +func remap_latency_histogram(hist map[int64]int) map[int64]int { + res := make(map[int64]int) + for k, v := range hist { + if k > 10000 { //1 sec + res[10000] += v + } else if k > 5000 { //500 mili + res[5000] += v + } else if k > 1000 { // 100mili + res[1000] += v + } else if k > 100 { //10 mili + res[100] += v + } else if k > 50 { //5 mili + res[50] += v + } else if k > 20 { //2 mili + res[20] += v + } else if k > 10 { //1 mili + res[10] += v + } else { //below 1 mili + res[k] += v + } + } + return res +} -func dump_latency_histogram(hist map[int64]int, total int, req_type string) ([]string, []float64) { +func dump_latency_histogram(histogram map[int64]int, total int, req_type string) ([]string, []float64) { var keys []int var prefix string title := "type \t usec \t\t percentage\n" - if req_type == "GET"{ + if req_type == "GET" { prefix = "GetHist" - }else{ + } else { prefix = "PutHist" } strout := fmt.Sprintf("%s Latency Histograms:\n", prefix) + hist := remap_latency_histogram(histogram) for k := range hist { keys = append(keys, int(k)) } sort.Ints(keys) log.Debugln("latency hist wait released") - res_strings := [] string{} - res_values := [] float64{} - for _,k := range keys{ + res_strings := []string{} + res_values := []float64{} + + for _, k := range keys { v := hist[int64(k)] res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", - k*100, (k+1)*100) ) - value := float64(v * 100) / float64(total) - res_values = append(res_values,value) + k*100, (k+1)*100)) + value := float64(v*100) / float64(total) + res_values = append(res_values, value) } - if len(res_strings)>0{ + if len(res_strings) > 0 { strout += title for i, v := range res_strings { - strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix, v,res_values[i]) + strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix, v, res_values[i]) } } log.Println(strout) diff --git a/httpblaster/executor.go b/httpblaster/executor.go index c6cd4cd..501d045 100644 --- a/httpblaster/executor.go +++ b/httpblaster/executor.go @@ -67,8 +67,8 @@ type Executor struct { Ch_get_latency chan time.Duration Ch_put_latency chan time.Duration //Ch_statuses chan int - DumpFailures bool - DumpLocation string + DumpFailures bool + DumpLocation string } func (self *Executor) load_request_generator() (chan *request_generators.Request, @@ 
-129,15 +129,15 @@ func (self *Executor) load_request_generator() (chan *request_generators.Request return ch_req, release_req, ch_response } -func (self *Executor)GetWorkerType() worker.WorkerType { +func (self *Executor) GetWorkerType() worker.WorkerType { gen_type := strings.ToLower(self.Workload.Generator) - if gen_type == request_generators.PERFORMANCE{ + if gen_type == request_generators.PERFORMANCE { return worker.PERFORMANCE_WORKER } return worker.INGESTION_WORKER } -func (self *Executor)GetType()string { +func (self *Executor) GetType() string { return self.Workload.Type } @@ -161,8 +161,8 @@ func (self *Executor) run(wg *sync.WaitGroup) error { server := fmt.Sprintf("%s:%s", host_address, self.Globals.Port) w := worker.NewWorker(self.GetWorkerType(), server, self.Globals.TLSMode, self.Workload.Lazy, - self.Globals.RetryOnStatusCodes, - self.Globals.RetryCount, self.Globals.PemFile, i) + self.Globals.RetryOnStatusCodes, + self.Globals.RetryCount, self.Globals.PemFile, i) self.workers = append(self.workers, w) //var ch_latency chan time.Duration //if self.Workload.Type == "GET" { @@ -172,7 +172,7 @@ func (self *Executor) run(wg *sync.WaitGroup) error { //} go w.RunWorker(ch_response, ch_req, - &workers_wg, release_req_flag,// ch_latency, + &workers_wg, release_req_flag, // ch_latency, //self.Ch_statuses, self.DumpFailures, self.DumpLocation) } @@ -292,14 +292,13 @@ func (self *Executor) Report() (executor_result, error) { return self.results, nil } -func (self *Executor)LatencyHist() map[int64]int { +func (self *Executor) LatencyHist() map[int64]int { res := make(map[int64]int) - for _,w := range self.workers{ + for _, w := range self.workers { hist := w.GetHist() - for k,v:= range hist{ - res[k]+=v + for k, v := range hist { + res[k] += v } } return res } - diff --git a/httpblaster/histogram/latency_hist.go b/httpblaster/histogram/latency_hist.go index b0f223b..824b80a 100644 --- a/httpblaster/histogram/latency_hist.go +++ b/httpblaster/histogram/latency_hist.go @@ -1,42 +1,35 @@ package histogram import ( - "time" - "sync" - log "github.com/sirupsen/logrus" "fmt" + log "github.com/sirupsen/logrus" "sort" + "sync" + "time" ) type LatencyHist struct { ch_values chan time.Duration - hist map[int64]int - count int64 - size int64 - wg sync.WaitGroup + hist map[int64]int + count int64 + size int64 + wg sync.WaitGroup } - -func (self *LatencyHist)Add(v time.Duration) { +func (self *LatencyHist) Add(v time.Duration) { self.ch_values <- v self.size++ } -func (self *LatencyHist)Close() { +func (self *LatencyHist) Close() { close(self.ch_values) } -func (self *LatencyHist) place(v int64) { - if v > 1000000{ - self.hist[10000]++ - }else if v > 100000{ - self.hist[1000]++ - }else { - self.hist[v/100]++ - } +func (self *LatencyHist) place(v int64) { + self.hist[v/100]++ } -func (self *LatencyHist)New()chan time.Duration { +func (self *LatencyHist) New() chan time.Duration { log.Debugln("new latency hist") self.hist = make(map[int64]int) self.wg.Add(1) @@ -61,19 +54,19 @@ func (self *LatencyHist) GetResults() ([]string, []float64) { keys = append(keys, int(k)) } sort.Ints(keys) - res_strings := [] string{} - res_values := [] float64{} - for _,k := range keys{ + res_strings := []string{} + res_values := []float64{} + for _, k := range keys { v := self.hist[int64(k)] res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", - k*100, (k+1)*100) ) - value := float64(v * 100) / float64(self.count) - res_values = append(res_values,value) + k*100, (k+1)*100)) + value := float64(v*100) / 
float64(self.count) + res_values = append(res_values, value) } return res_strings, res_values } -func (self *LatencyHist)GetHistMap()map[int64]int{ +func (self *LatencyHist) GetHistMap() map[int64]int { self.wg.Wait() return self.hist -} \ No newline at end of file +} diff --git a/httpblaster/histogram/latency_hist_test.go b/httpblaster/histogram/latency_hist_test.go index f05e781..038d2ae 100644 --- a/httpblaster/histogram/latency_hist_test.go +++ b/httpblaster/histogram/latency_hist_test.go @@ -1,15 +1,15 @@ package histogram import ( + "math/rand" "testing" "time" - "math/rand" ) func TestLatencyHist_Get(t *testing.T) { l := LatencyHist{} - c:=l.New() - req:= 1000000 + c := l.New() + req := 1000000 go func() { for i := 0; i < req; i++ { @@ -19,11 +19,11 @@ func TestLatencyHist_Get(t *testing.T) { close(c) }() - s,v:= l.GetResults() - total:= float64(0) - for i,_ := range s{ - total+=v[i] + s, v := l.GetResults() + total := float64(0) + for i, _ := range s { + total += v[i] t.Logf("%6v(us)\t\t%3.2f%%", s[i], v[i]) } t.Logf("Total: %3.3f", total) -} \ No newline at end of file +} diff --git a/httpblaster/tui/latency_collector.go b/httpblaster/tui/latency_collector.go index 03c1fef..f6d8839 100644 --- a/httpblaster/tui/latency_collector.go +++ b/httpblaster/tui/latency_collector.go @@ -34,19 +34,17 @@ func (self *LatencyCollector) GetResults() ([]string, []float64) { } -func (self *LatencyCollector) GetQuantile(q float64) (float64) { +func (self *LatencyCollector) GetQuantile(q float64) float64 { return self.WeighHist.CDF(q) } - -func (self *LatencyCollector) GetCount() (float64) { +func (self *LatencyCollector) GetCount() float64 { return self.WeighHist.Count() } - -func (self *LatencyCollector) String() (string) { +func (self *LatencyCollector) String() string { return self.WeighHist.String() } diff --git a/httpblaster/worker/ingest_worker.go b/httpblaster/worker/ingest_worker.go index d62aac6..a2d6403 100644 --- a/httpblaster/worker/ingest_worker.go +++ b/httpblaster/worker/ingest_worker.go @@ -143,7 +143,7 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r //retry on error response.Response.Reset() continue - } else{ + } else { //ch_statuses <- response.Response.StatusCode() //ch_latency <- d } diff --git a/httpblaster/worker/perf_worker.go b/httpblaster/worker/perf_worker.go index c909fa1..a90873b 100644 --- a/httpblaster/worker/perf_worker.go +++ b/httpblaster/worker/perf_worker.go @@ -56,7 +56,7 @@ func (w *PerfWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req }) err, _ := w.send_request(req, response) - if err != nil{ + if err != nil { log.Errorf("send request failed %s", err.Error()) } diff --git a/httpblaster/worker/worker_base.go b/httpblaster/worker/worker_base.go index 4803277..3dec6d9 100644 --- a/httpblaster/worker/worker_base.go +++ b/httpblaster/worker/worker_base.go @@ -8,19 +8,21 @@ import ( "errors" "fmt" log "github.com/sirupsen/logrus" + "github.com/v3io/http_blaster/httpblaster/histogram" "github.com/v3io/http_blaster/httpblaster/request_generators" "github.com/valyala/fasthttp" "io/ioutil" "net" "os" - "time" "sync" - "github.com/v3io/http_blaster/httpblaster/histogram" + "time" ) const DialTimeout = 60 * time.Second const RequestTimeout = 600 * time.Second + var do_once sync.Once + type WorkerBase struct { host string conn net.Conn @@ -36,7 +38,7 @@ type WorkerBase struct { retry_count int timer *time.Timer id int - hist *histogram.LatencyHist + hist *histogram.LatencyHist } func (w *WorkerBase) open_connection() { 
@@ -194,7 +196,7 @@ func (w *WorkerBase) GetResults() worker_results { return w.Results } -func (w *WorkerBase)GetHist() map[int64]int{ +func (w *WorkerBase) GetHist() map[int64]int { return w.hist.GetHistMap() } @@ -216,7 +218,7 @@ func NewWorker(worker_type WorkerType, host string, tls_client bool, lazy int, r if worker_type == PERFORMANCE_WORKER { worker = &PerfWorker{WorkerBase{host: host, is_tls_client: tls_client, retry_codes: retry_codes_map, retry_count: retry_count, pem_file: pem_file, id: id, hist: hist}} - }else{ + } else { worker = &IngestWorker{WorkerBase{host: host, is_tls_client: tls_client, retry_codes: retry_codes_map, retry_count: retry_count, pem_file: pem_file, id: id, hist: hist}} } diff --git a/httpblaster/worker/worker_interface.go b/httpblaster/worker/worker_interface.go index 5f86eeb..904c6ac 100644 --- a/httpblaster/worker/worker_interface.go +++ b/httpblaster/worker/worker_interface.go @@ -10,7 +10,7 @@ type Worker interface { UseBase(c WorkerBase) Init(lazy int) GetResults() worker_results - GetHist() map[int64]int + GetHist() map[int64]int RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, diff --git a/httpblaster/worker/worker_type.go b/httpblaster/worker/worker_type.go index 276b651..f7752b5 100644 --- a/httpblaster/worker/worker_type.go +++ b/httpblaster/worker/worker_type.go @@ -1,7 +1,8 @@ package worker type WorkerType int32 + const ( - PERFORMANCE_WORKER WorkerType = iota - INGESTION_WORKER WorkerType = iota -) \ No newline at end of file + PERFORMANCE_WORKER WorkerType = iota + INGESTION_WORKER WorkerType = iota +) From 12ac34c651ca74c477c10575775b1603fdd8241b Mon Sep 17 00:00:00 2001 From: sasile Date: Tue, 22 May 2018 17:21:30 +0300 Subject: [PATCH 3/4] cleanup --- http_blaster.go | 47 ++--------------------------------------------- 1 file changed, 2 insertions(+), 45 deletions(-) diff --git a/http_blaster.go b/http_blaster.go index 5985747..6a155ad 100644 --- a/http_blaster.go +++ b/http_blaster.go @@ -454,6 +454,7 @@ func dump_latencies_histograms() { dump_latency_histogram(latency_put, total_put, "PUT") } + func remap_latency_histogram(hist map[int64]int) map[int64]int { res := make(map[int64]int) for k, v := range hist { @@ -499,8 +500,7 @@ func dump_latency_histogram(histogram map[int64]int, total int, req_type string) for _, k := range keys { v := hist[int64(k)] - res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", - k*100, (k+1)*100)) + res_strings = append(res_strings, fmt.Sprintf("%5d", k*100)) value := float64(v*100) / float64(total) res_values = append(res_values, value) } @@ -515,49 +515,6 @@ func dump_latency_histogram(histogram map[int64]int, total int, req_type string) return res_strings, res_values } -/* -func dump_latencies_histograms() { - prefix_get := "GetHist" - prefix_put := "PutHist" - title := "type \t usec \t\t percentage\n" - strout := "Latency Histograms:\n" - log.Println("LatencyCollectorGet") - vs_get, ls_get := LatencyCollectorGet.GetResults() - if len(vs_get) >0 { - strout += "Get latency histogram:\n" - strout += title - //total := float64(0) - for i, v := range vs_get { - //value, _ := strconv.ParseFloat(v, 64) - //total += ls_get[i] - strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_get, v,ls_get[i]) - } - //strout += fmt.Sprintf("total: %v", total) - } - vs_put, ls_put := LatencyCollectorPut.GetResults() - if len(vs_put) >0 { - strout += "Put latency histogram:\n" - strout += title - - for i, v := range vs_put { - if ls_put[i] != 0 { - strout 
+= fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_put, v,ls_put[i]) - } - } - } - log.Println(strout) -} -*/ - -//func dump_status_code_histogram() { -// log.Println("Status codes:") -// labels, values := StatusesCollector.Get() -// for i, v := range labels { -// if values[i] != 0 { -// log.Println(fmt.Sprintf("%v %v%%", v, values[i])) -// } -// } -//} func main() { parse_cmd_line_args() From 2f28cf070067982ff93500c3858411dcac38848a Mon Sep 17 00:00:00 2001 From: sasile Date: Tue, 22 May 2018 17:22:04 +0300 Subject: [PATCH 4/4] version 3.0.5 --- http_blaster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http_blaster.go b/http_blaster.go index 6a155ad..e4f6a32 100644 --- a/http_blaster.go +++ b/http_blaster.go @@ -66,7 +66,7 @@ var ( dump_location string = "." ) -const AppVersion = "3.0.3" +const AppVersion = "3.0.5" func init() { const (
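
Note for readers of this series: the patches move latency collection from shared channels into a per-worker histogram.LatencyHist keyed by latency/100 usec, which Executor.LatencyHist merges across workers and remap_latency_histogram coarsens before dump_latency_histogram prints percentages. The standalone Go sketch below reproduces that merge-and-remap flow so the bucket boundaries are easy to verify. It is illustrative only and not part of the patches; the helper names (mergeHists, remap, workerHists) and the sample counts are invented for the example.

// Standalone sketch (not part of the patch series) of the aggregation and
// bucket remapping introduced above. Keys are latency/100 usec, values are
// request counts, matching LatencyHist.place and remap_latency_histogram.
package main

import (
	"fmt"
	"sort"
)

// mergeHists sums per-worker bucket counts, as Executor.LatencyHist does.
func mergeHists(workerHists []map[int64]int) map[int64]int {
	res := make(map[int64]int)
	for _, h := range workerHists {
		for k, v := range h {
			res[k] += v
		}
	}
	return res
}

// remap coarsens the 100-usec bucket keys the way remap_latency_histogram does:
// keys above 10000 (latencies over 1 sec) collapse into a single bucket, and so
// on down to 1 msec; sub-millisecond buckets keep their 100-usec resolution.
func remap(hist map[int64]int) map[int64]int {
	res := make(map[int64]int)
	for k, v := range hist {
		switch {
		case k > 10000:
			res[10000] += v
		case k > 5000:
			res[5000] += v
		case k > 1000:
			res[1000] += v
		case k > 100:
			res[100] += v
		case k > 50:
			res[50] += v
		case k > 20:
			res[20] += v
		case k > 10:
			res[10] += v
		default:
			res[k] += v
		}
	}
	return res
}

func main() {
	// Two fake worker histograms for illustration.
	workerHists := []map[int64]int{
		{3: 40, 15: 5},   // worker 0: 40 requests in 300-400 usec, 5 around 1.5 msec
		{3: 60, 120: 10}, // worker 1: 60 in 300-400 usec, 10 around 12 msec
	}
	merged := remap(mergeHists(workerHists))

	total := 0
	for _, v := range merged {
		total += v
	}
	var keys []int
	for k := range merged {
		keys = append(keys, int(k))
	}
	sort.Ints(keys)
	for _, k := range keys {
		v := merged[int64(k)]
		fmt.Printf("%6d usec: %3.4f%%\n", k*100, float64(v*100)/float64(total))
	}
}

The coarser buckets trade resolution for a compact log line: everything slower than about 1 msec collapses into a handful of ranges, while faster requests keep per-100-usec detail, which is what the final dump_latency_histogram output in PATCH 3/4 reports.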