diff --git a/http_blaster.go b/http_blaster.go index 59fdc08..e4f6a32 100644 --- a/http_blaster.go +++ b/http_blaster.go @@ -22,8 +22,8 @@ package main import ( "flag" "fmt" - log "github.com/sirupsen/logrus" "github.com/Gurpartap/logrus-stack" + log "github.com/sirupsen/logrus" "github.com/v3io/http_blaster/httpblaster" "github.com/v3io/http_blaster/httpblaster/config" "github.com/v3io/http_blaster/httpblaster/tui" @@ -34,38 +34,39 @@ import ( "sync" "sync/atomic" "time" - "github.com/v3io/http_blaster/httpblaster/histogram" + //"github.com/v3io/http_blaster/httpblaster/histogram" + "sort" ) var ( - start_time time.Time - end_time time.Time - wl_id int32 = -1 - conf_file string - results_file string - showVersion bool - dataBfr []byte - cpu_profile = false - mem_profile = false - cfg config.TomlConfig - executors []*httpblaster.Executor - ex_group sync.WaitGroup - enable_log bool - log_file *os.File - worker_qd int = 10000 - verbose bool = false - enable_ui bool - ch_put_latency chan time.Duration - ch_get_latency chan time.Duration - LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector - LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector - StatusesCollector tui.StatusesCollector - term_ui *tui.Term_ui - dump_failures bool = true - dump_location string = "." + start_time time.Time + end_time time.Time + wl_id int32 = -1 + conf_file string + results_file string + showVersion bool + dataBfr []byte + cpu_profile = false + mem_profile = false + cfg config.TomlConfig + executors []*httpblaster.Executor + ex_group sync.WaitGroup + enable_log bool + log_file *os.File + worker_qd int = 10000 + verbose bool = false + enable_ui bool + ch_put_latency chan time.Duration + ch_get_latency chan time.Duration + //LatencyCollectorGet histogram.LatencyHist// tui.LatencyCollector + //LatencyCollectorPut histogram.LatencyHist//tui.LatencyCollector + //StatusesCollector tui.StatusesCollector + term_ui *tui.Term_ui + dump_failures bool = true + dump_location string = "." ) -const AppVersion = "3.0.3" +const AppVersion = "3.0.5" func init() { const ( @@ -165,9 +166,9 @@ func load_test_Config() { } func generate_executors(term_ui *tui.Term_ui) { - ch_put_latency = LatencyCollectorPut.New() - ch_get_latency = LatencyCollectorGet.New() - ch_statuses := StatusesCollector.New(160, 1) + //ch_put_latency = LatencyCollectorPut.New() + //ch_get_latency = LatencyCollectorGet.New() + //ch_statuses := StatusesCollector.New(160, 1) for Name, workload := range cfg.Workloads { log.Println("Adding executor for ", Name) @@ -183,9 +184,9 @@ func generate_executors(term_ui *tui.Term_ui) { TermUi: term_ui, Ch_get_latency: ch_get_latency, Ch_put_latency: ch_put_latency, - Ch_statuses: ch_statuses, - DumpFailures: dump_failures, - DumpLocation: dump_location} + //Ch_statuses: ch_statuses, + DumpFailures: dump_failures, + DumpLocation: dump_location} executors = append(executors, e) } } @@ -202,8 +203,8 @@ func wait_for_completion() { log.Println("Wait for executors to finish") ex_group.Wait() end_time = time.Now() - close(ch_get_latency) - close(ch_put_latency) + //close(ch_get_latency) + ///close(ch_put_latency) } func wait_for_ui_completion(ch_done chan struct{}) { @@ -418,7 +419,7 @@ func enable_tui() chan struct{} { case <-tick: //term_ui.Update_put_latency_chart(LatencyCollectorPut.Get()) //term_ui.Update_get_latency_chart(LatencyCollectorGet.Get()) - term_ui.Update_status_codes(StatusesCollector.Get()) + //term_ui.Update_status_codes(StatusesCollector.Get()) term_ui.Refresh_log() term_ui.Render() } @@ -430,47 +431,91 @@ func enable_tui() chan struct{} { } func dump_latencies_histograms() { - prefix_get := "GetHist" - prefix_put := "PutHist" - title := "type \t usec \t\t percentage\n" - strout := "Latency Histograms:\n" - log.Println("LatencyCollectorGet") - vs_get, ls_get := LatencyCollectorGet.GetResults() - if len(vs_get) >0 { - strout += "Get latency histogram:\n" - strout += title - //total := float64(0) - for i, v := range vs_get { - //value, _ := strconv.ParseFloat(v, 64) - //total += ls_get[i] - strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_get, v,ls_get[i]) + latency_get := make(map[int64]int) + latency_put := make(map[int64]int) + total_get := 0 + total_put := 0 + + for _, e := range executors { + hist := e.LatencyHist() + if e.GetType() == "GET" { + for k, v := range hist { + latency_get[k] += v + total_get += v + } + } else { + for k, v := range hist { + latency_put[k] += v + total_put += v + } } - //strout += fmt.Sprintf("total: %v", total) } - vs_put, ls_put := LatencyCollectorPut.GetResults() - if len(vs_put) >0 { - strout += "Put latency histogram:\n" - strout += title + dump_latency_histogram(latency_get, total_get, "GET") + dump_latency_histogram(latency_put, total_put, "PUT") - for i, v := range vs_put { - if ls_put[i] != 0 { - strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix_put, v,ls_put[i]) - } +} + +func remap_latency_histogram(hist map[int64]int) map[int64]int { + res := make(map[int64]int) + for k, v := range hist { + if k > 10000 { //1 sec + res[10000] += v + } else if k > 5000 { //500 mili + res[5000] += v + } else if k > 1000 { // 100mili + res[1000] += v + } else if k > 100 { //10 mili + res[100] += v + } else if k > 50 { //5 mili + res[50] += v + } else if k > 20 { //2 mili + res[20] += v + } else if k > 10 { //1 mili + res[10] += v + } else { //below 1 mili + res[k] += v } } - log.Println(strout) + return res } -func dump_status_code_histogram() { - log.Println("Status codes:") - labels, values := StatusesCollector.Get() - for i, v := range labels { - if values[i] != 0 { - log.Println(fmt.Sprintf("%v %v%%", v, values[i])) +func dump_latency_histogram(histogram map[int64]int, total int, req_type string) ([]string, []float64) { + var keys []int + var prefix string + title := "type \t usec \t\t percentage\n" + if req_type == "GET" { + prefix = "GetHist" + } else { + prefix = "PutHist" + } + strout := fmt.Sprintf("%s Latency Histograms:\n", prefix) + hist := remap_latency_histogram(histogram) + for k := range hist { + keys = append(keys, int(k)) + } + sort.Ints(keys) + log.Debugln("latency hist wait released") + res_strings := []string{} + res_values := []float64{} + + for _, k := range keys { + v := hist[int64(k)] + res_strings = append(res_strings, fmt.Sprintf("%5d", k*100)) + value := float64(v*100) / float64(total) + res_values = append(res_values, value) + } + + if len(res_strings) > 0 { + strout += title + for i, v := range res_strings { + strout += fmt.Sprintf("%s: %s \t\t %3.4f%%\n", prefix, v, res_values[i]) } } + log.Println(strout) + return res_strings, res_values } + func main() { parse_cmd_line_args() load_test_Config() @@ -478,7 +523,7 @@ func main() { configure_log() log.Println("Starting http_blaster") - defer handle_exit() + //defer handle_exit() //defer close_log_file() defer stop_cpu_profile() defer write_mem_profile() @@ -489,7 +534,7 @@ func main() { wait_for_completion() log.Println("Executors done!") dump_latencies_histograms() - dump_status_code_histogram() + //dump_status_code_histogram() err_code := report() log.Println("Done with error code ", err_code) wait_for_ui_completion(ch_done) diff --git a/httpblaster/executor.go b/httpblaster/executor.go index f0087d2..501d045 100644 --- a/httpblaster/executor.go +++ b/httpblaster/executor.go @@ -66,9 +66,9 @@ type Executor struct { TermUi *tui.Term_ui Ch_get_latency chan time.Duration Ch_put_latency chan time.Duration - Ch_statuses chan int - DumpFailures bool - DumpLocation string + //Ch_statuses chan int + DumpFailures bool + DumpLocation string } func (self *Executor) load_request_generator() (chan *request_generators.Request, @@ -129,14 +129,18 @@ func (self *Executor) load_request_generator() (chan *request_generators.Request return ch_req, release_req, ch_response } -func (self *Executor)GetWorkerType() worker.WorkerType { +func (self *Executor) GetWorkerType() worker.WorkerType { gen_type := strings.ToLower(self.Workload.Generator) - if gen_type == request_generators.PERFORMANCE{ + if gen_type == request_generators.PERFORMANCE { return worker.PERFORMANCE_WORKER } return worker.INGESTION_WORKER } +func (self *Executor) GetType() string { + return self.Workload.Type +} + func (self *Executor) run(wg *sync.WaitGroup) error { defer wg.Done() self.Start_time = time.Now() @@ -157,18 +161,20 @@ func (self *Executor) run(wg *sync.WaitGroup) error { server := fmt.Sprintf("%s:%s", host_address, self.Globals.Port) w := worker.NewWorker(self.GetWorkerType(), server, self.Globals.TLSMode, self.Workload.Lazy, - self.Globals.RetryOnStatusCodes, - self.Globals.RetryCount, self.Globals.PemFile, i) + self.Globals.RetryOnStatusCodes, + self.Globals.RetryCount, self.Globals.PemFile, i) self.workers = append(self.workers, w) - var ch_latency chan time.Duration - if self.Workload.Type == "GET" { - ch_latency = self.Ch_get_latency - } else { - ch_latency = self.Ch_put_latency - } + //var ch_latency chan time.Duration + //if self.Workload.Type == "GET" { + // ch_latency = self.Ch_get_latency + //} else { + // ch_latency = self.Ch_put_latency + //} - go w.RunWorker(ch_response, ch_req, &workers_wg, release_req_flag, ch_latency, - self.Ch_statuses, self.DumpFailures, self.DumpLocation) + go w.RunWorker(ch_response, ch_req, + &workers_wg, release_req_flag, // ch_latency, + //self.Ch_statuses, + self.DumpFailures, self.DumpLocation) } ended := make(chan bool) go func() { @@ -239,10 +245,6 @@ LOOP: func (self *Executor) Start(wg *sync.WaitGroup) error { self.results.Statuses = make(map[int]uint64) log.Info("at executor start ", self.Workload) - //self.host = self.Globals.Server - //self.port = self.Globals.Port - //self.tls_mode = self.Globals.TLSMode - //self.Globals.StatusCodesAcceptance = self.Globals.StatusCodesAcceptance go func() { self.run(wg) }() @@ -289,3 +291,14 @@ func (self *Executor) Report() (executor_result, error) { } return self.results, nil } + +func (self *Executor) LatencyHist() map[int64]int { + res := make(map[int64]int) + for _, w := range self.workers { + hist := w.GetHist() + for k, v := range hist { + res[k] += v + } + } + return res +} diff --git a/httpblaster/histogram/latency_hist.go b/httpblaster/histogram/latency_hist.go index 67b4c07..824b80a 100644 --- a/httpblaster/histogram/latency_hist.go +++ b/httpblaster/histogram/latency_hist.go @@ -1,41 +1,46 @@ package histogram import ( - "time" - "sync" - log "github.com/sirupsen/logrus" "fmt" + log "github.com/sirupsen/logrus" "sort" + "sync" + "time" ) type LatencyHist struct { ch_values chan time.Duration - hist map[int]int - count int64 - wg sync.WaitGroup + hist map[int64]int + count int64 + size int64 + wg sync.WaitGroup } - func (self *LatencyHist) Add(v time.Duration) { - log.Debugln("values added") self.ch_values <- v + self.size++ } -func (self *LatencyHist) place(v float64) { - self.hist[int(v/100)]++ +func (self *LatencyHist) Close() { + close(self.ch_values) } -func (self *LatencyHist)New()chan time.Duration { +func (self *LatencyHist) place(v int64) { + self.hist[v/100]++ +} + +func (self *LatencyHist) New() chan time.Duration { log.Debugln("new latency hist") - self.hist = make(map[int]int) + self.hist = make(map[int64]int) self.wg.Add(1) self.ch_values = make(chan time.Duration, 10000) + go func() { defer self.wg.Done() for v := range self.ch_values { self.count++ - self.place(float64(v.Nanoseconds() / 1000)) + self.place(v.Nanoseconds() / 1000) } }() return self.ch_values @@ -46,18 +51,22 @@ func (self *LatencyHist) GetResults() ([]string, []float64) { self.wg.Wait() var keys []int for k := range self.hist { - keys = append(keys, k) + keys = append(keys, int(k)) } sort.Ints(keys) - log.Debugln("latency hist wait released") - res_strings := [] string{} - res_values := [] float64{} - for _,k := range keys{ - v := self.hist[k] + res_strings := []string{} + res_values := []float64{} + for _, k := range keys { + v := self.hist[int64(k)] res_strings = append(res_strings, fmt.Sprintf("%5d - %5d", - k*100, (k+1)*100) ) - value := float64(v * 100) / float64(self.count) - res_values = append(res_values,value) + k*100, (k+1)*100)) + value := float64(v*100) / float64(self.count) + res_values = append(res_values, value) } return res_strings, res_values } + +func (self *LatencyHist) GetHistMap() map[int64]int { + self.wg.Wait() + return self.hist +} diff --git a/httpblaster/histogram/latency_hist_test.go b/httpblaster/histogram/latency_hist_test.go index f05e781..038d2ae 100644 --- a/httpblaster/histogram/latency_hist_test.go +++ b/httpblaster/histogram/latency_hist_test.go @@ -1,15 +1,15 @@ package histogram import ( + "math/rand" "testing" "time" - "math/rand" ) func TestLatencyHist_Get(t *testing.T) { l := LatencyHist{} - c:=l.New() - req:= 1000000 + c := l.New() + req := 1000000 go func() { for i := 0; i < req; i++ { @@ -19,11 +19,11 @@ func TestLatencyHist_Get(t *testing.T) { close(c) }() - s,v:= l.GetResults() - total:= float64(0) - for i,_ := range s{ - total+=v[i] + s, v := l.GetResults() + total := float64(0) + for i, _ := range s { + total += v[i] t.Logf("%6v(us)\t\t%3.2f%%", s[i], v[i]) } t.Logf("Total: %3.3f", total) -} \ No newline at end of file +} diff --git a/httpblaster/tui/latency_collector.go b/httpblaster/tui/latency_collector.go index 03c1fef..f6d8839 100644 --- a/httpblaster/tui/latency_collector.go +++ b/httpblaster/tui/latency_collector.go @@ -34,19 +34,17 @@ func (self *LatencyCollector) GetResults() ([]string, []float64) { } -func (self *LatencyCollector) GetQuantile(q float64) (float64) { +func (self *LatencyCollector) GetQuantile(q float64) float64 { return self.WeighHist.CDF(q) } - -func (self *LatencyCollector) GetCount() (float64) { +func (self *LatencyCollector) GetCount() float64 { return self.WeighHist.Count() } - -func (self *LatencyCollector) String() (string) { +func (self *LatencyCollector) String() string { return self.WeighHist.String() } diff --git a/httpblaster/worker/ingest_worker.go b/httpblaster/worker/ingest_worker.go index cd3b850..a2d6403 100644 --- a/httpblaster/worker/ingest_worker.go +++ b/httpblaster/worker/ingest_worker.go @@ -89,8 +89,8 @@ func (w *IngestWorker) dump_requests(ch_dump chan *fasthttp.Request, dump_locati func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, release_req bool, - ch_latency chan time.Duration, - ch_statuses chan int, + //ch_latency chan time.Duration, + //ch_statuses chan int, dump_requests bool, dump_location string) { defer wg.Done() @@ -134,18 +134,18 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r } var err error - var d time.Duration + //var d time.Duration response := request_generators.AcquireResponse() LOOP: for i := 0; i < w.retry_count; i++ { - err, d = w.send_request(submit_request, response) + err, _ = w.send_request(submit_request, response) if err != nil { //retry on error response.Response.Reset() continue - } else{ - ch_statuses <- response.Response.StatusCode() - ch_latency <- d + } else { + //ch_statuses <- response.Response.StatusCode() + //ch_latency <- d } if response.Response.StatusCode() >= http.StatusBadRequest { if _, ok := w.retry_codes[response.Response.StatusCode()]; !ok { @@ -159,8 +159,8 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r break LOOP } } - ch_statuses <- response.Response.StatusCode() - ch_latency <- d + //ch_statuses <- response.Response.StatusCode() + //ch_latency <- d if response.Response.StatusCode() >= http.StatusBadRequest && response.Response.StatusCode() < http.StatusInternalServerError && dump_requests { @@ -184,5 +184,6 @@ func (w *IngestWorker) RunWorker(ch_resp chan *request_generators.Response, ch_r close(ch_dump) sync_dump.Wait() } + w.hist.Close() w.close_connection() } diff --git a/httpblaster/worker/perf_worker.go b/httpblaster/worker/perf_worker.go index aff8fb0..a90873b 100644 --- a/httpblaster/worker/perf_worker.go +++ b/httpblaster/worker/perf_worker.go @@ -24,7 +24,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/v3io/http_blaster/httpblaster/request_generators" "sync" - "time" + //"time" ) type PerfWorker struct { @@ -37,8 +37,8 @@ func (w *PerfWorker) UseBase(c WorkerBase) { func (w *PerfWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, release_req bool, - ch_latency chan time.Duration, - ch_statuses chan int, + //ch_latency chan time.Duration, + //ch_statuses chan int, dump_requests bool, dump_location string) { defer wg.Done() @@ -54,17 +54,18 @@ func (w *PerfWorker) RunWorker(ch_resp chan *request_generators.Response, ch_req req_type.Do(func() { w.Results.Method = string(req.Request.Header.Method()) }) - err, d := w.send_request(req, response) + err, _ := w.send_request(req, response) - if err != nil{ + if err != nil { log.Errorf("send request failed %s", err.Error()) } - ch_statuses <- response.Response.StatusCode() - ch_latency <- d + //ch_statuses <- response.Response.StatusCode() + //ch_latency <- d request_generators.ReleaseRequest(req) response.Response.Reset() } - + log.Debugln("closing hist") + w.hist.Close() w.close_connection() } diff --git a/httpblaster/worker/worker_base.go b/httpblaster/worker/worker_base.go index f2e0921..3dec6d9 100644 --- a/httpblaster/worker/worker_base.go +++ b/httpblaster/worker/worker_base.go @@ -8,18 +8,21 @@ import ( "errors" "fmt" log "github.com/sirupsen/logrus" + "github.com/v3io/http_blaster/httpblaster/histogram" "github.com/v3io/http_blaster/httpblaster/request_generators" "github.com/valyala/fasthttp" "io/ioutil" "net" "os" - "time" "sync" + "time" ) const DialTimeout = 60 * time.Second const RequestTimeout = 600 * time.Second + var do_once sync.Once + type WorkerBase struct { host string conn net.Conn @@ -35,6 +38,7 @@ type WorkerBase struct { retry_count int timer *time.Timer id int + hist *histogram.LatencyHist } func (w *WorkerBase) open_connection() { @@ -131,6 +135,7 @@ func (w *WorkerBase) send(req *fasthttp.Request, resp *fasthttp.Response, w.timer.Reset(timeout) select { case duration := <-w.ch_duration: + w.hist.Add(duration) return nil, duration case err := <-w.ch_error: log.Debugf("request completed with error:%s", err.Error()) @@ -191,6 +196,10 @@ func (w *WorkerBase) GetResults() worker_results { return w.Results } +func (w *WorkerBase) GetHist() map[int64]int { + return w.hist.GetHistMap() +} + func NewWorker(worker_type WorkerType, host string, tls_client bool, lazy int, retry_codes []int, retry_count int, pem_file string, id int) Worker { if host == "" { return nil @@ -204,12 +213,14 @@ func NewWorker(worker_type WorkerType, host string, tls_client bool, lazy int, r retry_count = 1 } var worker Worker + hist := &histogram.LatencyHist{} + hist.New() if worker_type == PERFORMANCE_WORKER { worker = &PerfWorker{WorkerBase{host: host, is_tls_client: tls_client, retry_codes: retry_codes_map, - retry_count: retry_count, pem_file: pem_file, id: id}} - }else{ + retry_count: retry_count, pem_file: pem_file, id: id, hist: hist}} + } else { worker = &IngestWorker{WorkerBase{host: host, is_tls_client: tls_client, retry_codes: retry_codes_map, - retry_count: retry_count, pem_file: pem_file, id: id}} + retry_count: retry_count, pem_file: pem_file, id: id, hist: hist}} } worker.Init(lazy) return worker diff --git a/httpblaster/worker/worker_interface.go b/httpblaster/worker/worker_interface.go index 6608867..904c6ac 100644 --- a/httpblaster/worker/worker_interface.go +++ b/httpblaster/worker/worker_interface.go @@ -3,19 +3,20 @@ package worker import ( "github.com/v3io/http_blaster/httpblaster/request_generators" "sync" - "time" + //"time" ) type Worker interface { UseBase(c WorkerBase) Init(lazy int) GetResults() worker_results + GetHist() map[int64]int RunWorker(ch_resp chan *request_generators.Response, ch_req chan *request_generators.Request, wg *sync.WaitGroup, release_req bool, - ch_latency chan time.Duration, - ch_statuses chan int, + //ch_latency chan time.Duration, + //ch_statuses chan int, dump_requests bool, dump_location string) } diff --git a/httpblaster/worker/worker_type.go b/httpblaster/worker/worker_type.go index 276b651..f7752b5 100644 --- a/httpblaster/worker/worker_type.go +++ b/httpblaster/worker/worker_type.go @@ -1,7 +1,8 @@ package worker type WorkerType int32 + const ( - PERFORMANCE_WORKER WorkerType = iota - INGESTION_WORKER WorkerType = iota -) \ No newline at end of file + PERFORMANCE_WORKER WorkerType = iota + INGESTION_WORKER WorkerType = iota +)