From 696b065bf3e556083131ee6883cc10aa7c863a12 Mon Sep 17 00:00:00 2001 From: "D. Sidiropoulos" Date: Wed, 16 Feb 2022 22:36:06 +0100 Subject: [PATCH] #59 new cli flag -l for column-separation in output textfile (to generate csvs etc) (#60) * chore (.gitignore): ignore .vscode folder and any *.code-workspace files * clean (toputils.go): consolidate 1024-related constants used in Psize() into kibibytes, mebibytes, kibibytes which are well-known, human-readable units of measurement * clean (toputils.go): neutral cleanups in Psize() to make it more readable - the if-else structure was not really needed and it has been simplified to use plain 'ifs' - the "NA" case at the bottom would never be reached because all possible cases are already covered from a numerical perspective * feat (cli flags): add new flag '-b' which causes all traffic to be printed in raw bytes * clean (Psize unit tests): refactor the test harness to have it employ t.Run() instead of raw tests * clean (Psize unit tests): refactor the test harness to have it employ t.Run() instead of raw tests * chore (.gitignore): ignore .vscode folder and any *.code-workspace files * feat (new cli flag): add new flag '-b' to print traffic in raw bytes - consolidate 1024-related constants used in Psize() into kibibytes, mebibytes, kibibytes which are well-known, human-readable units of measurement - the if-else structure was not really needed and it has been simplified to use plain 'ifs' - the "NA" case at the bottom would never be reached because all possible cases are already covered from a numerical perspective - neutral cleanups in Psize() to make it more readable - refactor the test harness to have it employ t.Run() instead of raw tests * clean (StartUI): trivial neutral cleanups * refa (redraw mech): the redraw channel now records the reason behind each redraw with this change we can support the new '-r N' flag to exit upon refreshing N times * feat (cli -r ): new flag "-r " to specify the maximum number of times nats-top should refresh nats-stats before exiting * feat (FetchStats()): extracted the fetching logic of MonitorStats() into a new method * feat (MonitorStats()): simplify the method by having it invoke FetchStats() to fetch new nats-stats * clean (FetchStats()): trivial simplification * feat (cli flag -o): new flag to save a snapshot of the output directly into a file * feat (cli flag '-o -'): passing '-o -' now prints the snapshot to the standard output * sidefix (nats-top): replace log.Fatalf(...) that are followed by calls to usage() with calls to fmt.Fprintf(os.stderr, ...) this fix allows the usage message to be actually printed (old was: log.Fatalf(...) would cause the program to os.exit(1) before the usage message could actually be printed) * clean (nats-top): remove method exitWithError() since it's not being used * sideclean (refreshOptionHeader): use fmt.Print() instead of fmt.Printf() * clean (nats-top): remove 'defaultHeader' as it was not being used anywhere * clean (nats-top): move comments on the side to conserve vertical space * clean (nats-top): move comments on the side to conserve vertical space * revert (nats-top.go): revert a hunk that sneaked-in unintentionally in the previous few commits * feat (file output formatter): turn the default "plain-text" formatter into its own separate formatter * feat (generate csv): introduce new method generateParagraphCSV() tailor-made to generate .csv string-output * doc (readme): update the usage message to have it mention the -l flag Co-authored-by: Kyriakos Sidiropoulos --- nats-top.go | 303 +++++++++++++++++++++++++++++++++++++--------------- readme.md | 6 +- 2 files changed, 221 insertions(+), 88 deletions(-) diff --git a/nats-top.go b/nats-top.go index 49f3301..0866e5a 100644 --- a/nats-top.go +++ b/nats-top.go @@ -26,6 +26,7 @@ var ( lookupDNS = flag.Bool("lookup", false, "Enable client addresses DNS lookup.") outputFile = flag.String("o", "", "Save the very first nats-top snapshot to the given file and exit. If '-' is passed then the snapshot is printed the standard output.") showVersion = flag.Bool("v", false, "Show nats-top version.") + outputDelimiter = flag.String("l", "", "Specifies the delimiter to use for the output file when the '-o' parameter is used. By default this option is unset which means that standard grid-like plain-text output will be used.") displayRawBytes = flag.Bool("b", false, "Display traffic in raw bytes.") maxStatsRefreshes = flag.Int("r", -1, "Specifies the maximum number of times nats-top should refresh nats-stats before exiting.") @@ -37,28 +38,11 @@ var ( skipVerifyOpt = flag.Bool("k", false, "Skip verifying server certificate") ) -const ( - DEFAULT_PADDING_SIZE = 2 - DEFAULT_PADDING = " " - - DEFAULT_HOST_PADDING_SIZE = 15 -) - -var ( - defaultHeader = []interface{}{"HOST", "CID", "NAME", "SUBS", "PENDING", "MSGS_TO", "MSGS_FROM", "BYTES_TO", "BYTES_FROM", "LANG", "VERSION", "UPTIME", "LAST ACTIVITY"} - - // Chopped: HOST CID NAME... - defaultHeaderFormat = "%-6s %-10s %-10s %-10s %-10s %-10s %-7s %-7s %-7s %-40s" - defaultRowFormat = "%-6d %-10s %-10s %-10s %-10s %-10s %-7s %-7s %-7s %-40s" - - usageHelp = ` -usage: nats-top [-s server] [-m http_port] [-ms https_port] [-n num_connections] [-d delay_secs] [-r max] [-sort by] +const usageHelp = ` +usage: nats-top [-s server] [-m http_port] [-ms https_port] [-n num_connections] [-d delay_secs] [-r max] [-o FILE] [-l DELIMITER] [-sort by] [-cert FILE] [-key FILE ][-cacert FILE] [-k] [-b] ` - // cache for reducing DNS lookups in case enabled - resolvedHosts = map[string]string{} -) func usage() { log.Fatalf(usageHelp) @@ -84,7 +68,7 @@ func main() { engine = top.NewEngine(*host, *httpsPort, *conns, *delay) err := engine.SetupHTTPS(*caCertOpt, *certOpt, *keyOpt, *skipVerifyOpt) if err != nil { - log.Printf("nats-top: %s", err) + fmt.Fprintf(os.Stderr, "nats-top: %s", err) usage() } } else { @@ -93,31 +77,31 @@ func main() { } if engine.Host == "" { - log.Printf("nats-top: invalid monitoring endpoint") + fmt.Fprintf(os.Stderr, "nats-top: invalid monitoring endpoint") usage() } if engine.Port == 0 { - log.Printf("nats-top: invalid monitoring port") + fmt.Fprintf(os.Stderr, "nats-top: invalid monitoring port") usage() } // Smoke test to abort in case can't connect to server since the beginning. _, err := engine.Request("/varz") if err != nil { - log.Printf("nats-top: /varz smoke test failed: %s", err) + fmt.Fprintf(os.Stderr, "nats-top: /varz smoke test failed: %s", err) usage() } sortOpt := server.SortOpt(*sortBy) if !sortOpt.IsValid() { - log.Fatalf("nats-top: invalid option to sort by: %s\n", sortOpt) + fmt.Fprintf(os.Stderr, "nats-top: invalid option to sort by: %s\n", sortOpt) usage() } engine.SortOpt = sortOpt if *outputFile != "" { - saveStatsSnapshotToFile(engine, outputFile) + saveStatsSnapshotToFile(engine, outputFile, *outputDelimiter) return } @@ -132,9 +116,9 @@ func main() { StartUI(engine) } -func saveStatsSnapshotToFile(engine *top.Engine, outputFile *string) { +func saveStatsSnapshotToFile(engine *top.Engine, outputFile *string, outputDelimiter string) { stats := engine.FetchStatsSnapshot() - text := generateParagraph(engine, stats) + text := generateParagraph(engine, stats, outputDelimiter) if *outputFile == "-" { fmt.Print(text) @@ -167,18 +151,38 @@ func cleanExit() { os.Exit(0) } -func exitWithError() { - ui.Close() +// generateParagraph takes an options map and latest Stats +// then returns a formatted paragraph ready to be rendered +func generateParagraph( + engine *top.Engine, + stats *top.Stats, + outputDelimiter string, +) string { - // Show cursor once again - fmt.Print("\033[?25h") + if len(outputDelimiter) > 0 { //default + return generateParagraphCSV(engine, stats, outputDelimiter) + } - os.Exit(1) + return generateParagraphPlainText(engine, stats) } -// generateParagraph takes an options map and latest Stats -// then returns a formatted paragraph ready to be rendered -func generateParagraph( +const ( + DEFAULT_PADDING_SIZE = 2 + DEFAULT_PADDING = " " + + DEFAULT_HOST_PADDING_SIZE = 15 +) + +var ( + resolvedHosts = map[string]string{} // cache for reducing DNS lookups in case enabled + + standardHeaders = []interface{}{"SUBS", "PENDING", "MSGS_TO", "MSGS_FROM", "BYTES_TO", "BYTES_FROM", "LANG", "VERSION", "UPTIME", "LAST_ACTIVITY"} + + defaultHeaderColumns = []string{"%-6s", "%-10s", "%-10s", "%-10s", "%-10s", "%-10s", "%-7s", "%-7s", "%-7s", "%-40s"} // Chopped: HOST CID NAME... + defaultRowColumns = []string{"%-6d", "%-10s", "%-10s", "%-10s", "%-10s", "%-10s", "%-7s", "%-7s", "%-7s", "%-40s"} +) + +func generateParagraphPlainText( engine *top.Engine, stats *top.Stats, ) string { @@ -209,39 +213,39 @@ func generateParagraph( inBytesRate := top.Psize(*displayRawBytes, int64(stats.Rates.InBytesRate)) outBytesRate := top.Psize(*displayRawBytes, int64(stats.Rates.OutBytesRate)) - info := "NATS server version %s (uptime: %s) %s" - info += "\nServer:\n Load: CPU: %.1f%% Memory: %s Slow Consumers: %d\n" + info := "NATS server version %s (uptime: %s) %s\n" + info += "Server:\n" + info += " Load: CPU: %.1f%% Memory: %s Slow Consumers: %d\n" info += " In: Msgs: %s Bytes: %s Msgs/Sec: %.1f Bytes/Sec: %s\n" info += " Out: Msgs: %s Bytes: %s Msgs/Sec: %.1f Bytes/Sec: %s" - text := fmt.Sprintf(info, serverVersion, uptime, stats.Error, + text := fmt.Sprintf( + info, serverVersion, uptime, stats.Error, cpu, mem, slowConsumers, inMsgs, inBytes, inMsgsRate, inBytesRate, - outMsgs, outBytes, outMsgsRate, outBytesRate) + outMsgs, outBytes, outMsgsRate, outBytesRate, + ) + text += fmt.Sprintf("\n\nConnections Polled: %d\n", numConns) displaySubs := engine.DisplaySubs - // Dynamically add columns and padding depending - header := make([]interface{}, 0) + header := make([]interface{}, 0) // Dynamically add columns and padding depending hostSize := DEFAULT_HOST_PADDING_SIZE - // Disable name unless we have seen one using it - nameSize := 0 + nameSize := 0 // Disable name unless we have seen one using it for _, conn := range stats.Connz.Conns { var size int var hostname string if *lookupDNS { - // Make a lookup for each one of the ips and memoize - // them for subsequent polls. - if addr, present := resolvedHosts[conn.IP]; !present { + if addr, present := resolvedHosts[conn.IP]; !present { // Make a lookup for each one of the ips and memoize them for subsequent polls addrs, err := net.LookupAddr(conn.IP) if err == nil && len(addrs) > 0 && len(addrs[0]) > 0 { hostname = addrs[0] resolvedHosts[conn.IP] = hostname } else { // Otherwise just continue to use ip:port as resolved host - // can be an empty string even though there were no errors. + // can be an empty string even though there were no errors hostname = fmt.Sprintf("%s:%d", conn.IP, conn.Port) resolvedHosts[conn.IP] = hostname } @@ -252,49 +256,43 @@ func generateParagraph( hostname = fmt.Sprintf("%s:%d", conn.IP, conn.Port) } - // host - size = len(hostname) + size = len(hostname) // host if size > hostSize { hostSize = size + DEFAULT_PADDING_SIZE } - // name - size = len(conn.Name) + size = len(conn.Name) // name if size > nameSize { nameSize = size + DEFAULT_PADDING_SIZE - // If using name, ensure that it is not too small... - minLen := len("NAME") + minLen := len("NAME") // If using name, ensure that it is not too small... if nameSize < minLen { nameSize = minLen } } } - // Initial padding - connHeader := DEFAULT_PADDING + connHeader := DEFAULT_PADDING // Initial padding - // HOST - header = append(header, "HOST") + header = append(header, "HOST") // HOST connHeader += "%-" + fmt.Sprintf("%d", hostSize) + "s " - // CID - header = append(header, "CID") + header = append(header, "CID") // CID connHeader += " %-6s " - // NAME - if nameSize > 0 { + if nameSize > 0 { // NAME header = append(header, "NAME") connHeader += "%-" + fmt.Sprintf("%d", nameSize) + "s " } - header = append(header, "SUBS", "PENDING", "MSGS_TO", "MSGS_FROM", "BYTES_TO", "BYTES_FROM", "LANG", "VERSION", "UPTIME", "LAST ACTIVITY") - connHeader += defaultHeaderFormat + header = append(header, standardHeaders...) + + connHeader += strings.Join(defaultHeaderColumns, " ") if displaySubs { connHeader += "%13s" } - // ...LAST ACTIVITY - connHeader += "\n" + + connHeader += "\n" // ...LAST ACTIVITY var connRows string if displaySubs { @@ -304,23 +302,19 @@ func generateParagraph( connRows = fmt.Sprintf(connHeader, header...) } - // Add to screen! - text += connRows + text += connRows // Add to screen! connValues := DEFAULT_PADDING - // HOST: e.g. 192.168.1.1:78901 - connValues += "%-" + fmt.Sprintf("%d", hostSize) + "s " + connValues += "%-" + fmt.Sprintf("%d", hostSize) + "s " // HOST: e.g. 192.168.1.1:78901 - // CID: e.g. 1234 - connValues += " %-6d " + connValues += " %-6d " // CID: e.g. 1234 - // NAME: e.g. hello - if nameSize > 0 { + if nameSize > 0 { // NAME: e.g. hello connValues += "%-" + fmt.Sprintf("%d", nameSize) + "s " } - connValues += defaultRowFormat + connValues += strings.Join(defaultRowColumns, " ") if displaySubs { connValues += "%s" } @@ -336,14 +330,12 @@ func generateParagraph( h = fmt.Sprintf("%s:%d", conn.IP, conn.Port) } - // Build the info line - var connLine string + var connLine string // Build the info line connLineInfo := make([]interface{}, 0) connLineInfo = append(connLineInfo, h) connLineInfo = append(connLineInfo, conn.Cid) - // Name not included unless present - if nameSize > 0 { + if nameSize > 0 { // Name not included unless present connLineInfo = append(connLineInfo, conn.Name) } @@ -356,15 +348,153 @@ func generateParagraph( if displaySubs { subs := strings.Join(conn.Subs, ", ") connLineInfo = append(connLineInfo, subs) - connLine = fmt.Sprintf(connValues, connLineInfo...) + } + + connLine = fmt.Sprintf(connValues, connLineInfo...) + + text += connLine // Add line to screen! + } + + return text +} + +func generateParagraphCSV( + engine *top.Engine, + stats *top.Stats, + delimiter string, +) string { + + defaultHeaderAndRowColumnsForCsv := []string{"%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s"} // Chopped: HOST CID NAME... + + cpu := stats.Varz.CPU // Snapshot current stats + memVal := stats.Varz.Mem + uptime := stats.Varz.Uptime + numConns := stats.Connz.NumConns + inMsgsVal := stats.Varz.InMsgs + outMsgsVal := stats.Varz.OutMsgs + inBytesVal := stats.Varz.InBytes + outBytesVal := stats.Varz.OutBytes + slowConsumers := stats.Varz.SlowConsumers + + var serverVersion string + if stats.Varz.Version != "" { + serverVersion = stats.Varz.Version + } + + mem := top.Psize(false, memVal) //memory is exempt from the rawbytes flag + inMsgs := top.Psize(*displayRawBytes, inMsgsVal) + outMsgs := top.Psize(*displayRawBytes, outMsgsVal) + inBytes := top.Psize(*displayRawBytes, inBytesVal) + outBytes := top.Psize(*displayRawBytes, outBytesVal) + inMsgsRate := stats.Rates.InMsgsRate + outMsgsRate := stats.Rates.OutMsgsRate + inBytesRate := top.Psize(*displayRawBytes, int64(stats.Rates.InBytesRate)) + outBytesRate := top.Psize(*displayRawBytes, int64(stats.Rates.OutBytesRate)) + + info := "NATS server version[__DELIM__]%s[__DELIM__](uptime: %s)[__DELIM__]%s\n" + info += "Server:\n" + info += "Load:[__DELIM__]CPU:[__DELIM__]%.1f%%[__DELIM__]Memory:[__DELIM__]%s[__DELIM__]Slow Consumers:[__DELIM__]%d\n" + info += "In:[__DELIM__]Msgs:[__DELIM__]%s[__DELIM__]Bytes:[__DELIM__]%s[__DELIM__]Msgs/Sec:[__DELIM__]%.1f[__DELIM__]Bytes/Sec:[__DELIM__]%s\n" + info += "Out:[__DELIM__]Msgs:[__DELIM__]%s[__DELIM__]Bytes:[__DELIM__]%s[__DELIM__]Msgs/Sec:[__DELIM__]%.1f[__DELIM__]Bytes/Sec:[__DELIM__]%s" + + text := fmt.Sprintf( + info, serverVersion, uptime, stats.Error, + cpu, mem, slowConsumers, + inMsgs, inBytes, inMsgsRate, inBytesRate, + outMsgs, outBytes, outMsgsRate, outBytesRate, + ) + + text += fmt.Sprintf("\n\nConnections Polled:[__DELIM__]%d\n", numConns) + + displaySubs := engine.DisplaySubs + for _, conn := range stats.Connz.Conns { + if !*lookupDNS { + continue + } + + _, present := resolvedHosts[conn.IP] + if present { + continue + } + + addrs, err := net.LookupAddr(conn.IP) + + hostname := "" + if err == nil && len(addrs) > 0 && len(addrs[0]) > 0 { // Make a lookup for each one of the ips and memoize them for subsequent polls + hostname = addrs[0] + } else { // Otherwise just continue to use ip:port as resolved host can be an empty string even though there were no errors + hostname = fmt.Sprintf("%s:%d", conn.IP, conn.Port) + } + + resolvedHosts[conn.IP] = hostname + } + + header := make([]interface{}, 0) // Dynamically add columns + connHeader := "" + + header = append(header, "HOST") // HOST + connHeader += "%s[__DELIM__]" + + header = append(header, "CID") // CID + connHeader += "%s[__DELIM__]" + + header = append(header, "NAME") // NAME + connHeader += "%s[__DELIM__]" + + header = append(header, standardHeaders...) + connHeader += strings.Join(defaultHeaderAndRowColumnsForCsv, "[__DELIM__]") + if displaySubs { + connHeader += "%s" + } + + connHeader += "\n" // ...LAST ACTIVITY + + if displaySubs { + header = append(header, "SUBSCRIPTIONS") + } + + text += fmt.Sprintf(connHeader, header...) // Add to screen! + + connValues := "%s[__DELIM__]" // HOST: e.g. 192.168.1.1:78901 + connValues += "%d[__DELIM__]" // CID: e.g. 1234 + connValues += "%s[__DELIM__]" // NAME: e.g. hello + + connValues += strings.Join(defaultHeaderAndRowColumnsForCsv, "[__DELIM__]") + if displaySubs { + connValues += "%s" + } + connValues += "\n" + + for _, conn := range stats.Connz.Conns { + var h string + if *lookupDNS { + if rh, present := resolvedHosts[conn.IP]; present { + h = rh + } } else { - connLine = fmt.Sprintf(connValues, connLineInfo...) + h = fmt.Sprintf("%s:%d", conn.IP, conn.Port) + } + + connLineInfo := make([]interface{}, 0) + connLineInfo = append(connLineInfo, h) + connLineInfo = append(connLineInfo, conn.Cid) + connLineInfo = append(connLineInfo, conn.Name) + connLineInfo = append(connLineInfo, fmt.Sprintf("%d", conn.NumSubs)) + connLineInfo = append(connLineInfo, top.Psize(*displayRawBytes, int64(conn.Pending)), top.Psize(*displayRawBytes, conn.OutMsgs), top.Psize(*displayRawBytes, conn.InMsgs)) + connLineInfo = append(connLineInfo, top.Psize(*displayRawBytes, conn.OutBytes), top.Psize(*displayRawBytes, conn.InBytes)) + connLineInfo = append(connLineInfo, conn.Lang, conn.Version) + connLineInfo = append(connLineInfo, conn.Uptime, conn.LastActivity) + + if displaySubs { + subs := strings.Join(conn.Subs, "[__DELIM__]") + connLineInfo = append(connLineInfo, subs) } - // Add line to screen! - text += connLine + text += fmt.Sprintf(connValues, connLineInfo...) // Add line to screen! } + text = strings.Replace(text, "[__DELIM__]", delimiter, -1) + return text } @@ -393,7 +523,7 @@ func StartUI(engine *top.Engine) { } // Show empty values on first display - text := generateParagraph(engine, cleanStats) + text := generateParagraph(engine, cleanStats, "") par := ui.NewPar(text) par.Height = ui.TermHeight() par.Width = ui.TermWidth() @@ -429,7 +559,7 @@ func StartUI(engine *top.Engine) { for { stats := <-engine.StatsCh - par.Text = generateParagraph(engine, stats) // Update top view text + par.Text = generateParagraph(engine, stats, "") // Update top view text redraw <- DueToNewStats } @@ -442,14 +572,13 @@ func StartUI(engine *top.Engine) { optionBuf := "" refreshOptionHeader := func() { - // Need to mask what was typed before - clrline := "\033[1;1H\033[6;1H " + clrline := "\033[1;1H\033[6;1H " // Need to mask what was typed before clrline += " " for i := 0; i < len(optionBuf); i++ { clrline += " " } - fmt.Printf(clrline) + fmt.Print(clrline) } evt := ui.EventCh() diff --git a/readme.md b/readme.md index 2802d76..79eaa7e 100644 --- a/readme.md +++ b/readme.md @@ -40,7 +40,7 @@ and releases of the binary are also [available](https://github.com/nats-io/nats- ## Usage ``` -usage: nats-top [-s server] [-m http_port] [-ms https_port] [-n num_connections] [-d delay_secs] [-r max] [-o FILE] [-sort by] +usage: nats-top [-s server] [-m http_port] [-ms https_port] [-n num_connections] [-d delay_secs] [-r max] [-o FILE] [-l DELIMITER] [-sort by] [-cert FILE] [-key FILE ][-cacert FILE] [-k] [-b] ``` @@ -64,6 +64,10 @@ usage: nats-top [-s server] [-m http_port] [-ms https_port] [-n num_connections] Saves the very first nats-top snapshot to the given file and exits. If '-' is passed then the snapshot is printed to the standard output. +- `-l delimiter` + + Specifies the delimiter to use for the output file when the '-o' parameter is used. By default this option is unset which means that standard grid-like plain-text output will be used. + - `-sort by ` Field to use for sorting the connections.