Skip to content

Commit

Permalink
nsqd: refactor /stats; add gossip state
Browse files Browse the repository at this point in the history
  • Loading branch information
stephensearles authored and mreiferson committed Feb 27, 2015
1 parent 473321b commit 9d1eb6f
Showing 1 changed file with 48 additions and 27 deletions.
75 changes: 48 additions & 27 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/bitly/nsq/internal/http_api"
"github.com/bitly/nsq/internal/protocol"
"github.com/bitly/nsq/internal/version"
"github.com/hashicorp/serf/serf"
)

type httpServer struct {
Expand Down Expand Up @@ -179,6 +180,10 @@ func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request) {
if !s.ctx.nsqd.IsHealthy() {
code = 500
}
if s.ctx.nsqd.serf != nil && (s.ctx.nsqd.serf.State() == serf.SerfAlive || len(s.ctx.nsqd.serf.Members()) < 2) {
code = 500
health = "NOK - gossip unhealthy"
}
w.Header().Set("Content-Length", strconv.Itoa(len(health)))
w.WriteHeader(code)
io.WriteString(w, health)
Expand Down Expand Up @@ -233,6 +238,10 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e
}

func (s *httpServer) doLookup(req *http.Request) (interface{}, error) {
if s.ctx.nsqd.serf == nil || s.ctx.nsqd.serf.State() != serf.SerfAlive {
return nil, http_api.Err{400, "GOSSIP_NOT_ENABLED"}
}

reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}
Expand Down Expand Up @@ -528,8 +537,13 @@ func (s *httpServer) doStats(req *http.Request) (interface{}, error) {
startTime := s.ctx.nsqd.GetStartTime()
uptime := time.Since(startTime)

var serfStats map[string]string
if s.ctx.nsqd.serf != nil {
serfStats = s.ctx.nsqd.serf.Stats()
}

if !jsonFormat {
return s.printStats(stats, health, startTime, uptime), nil
return s.printStats(stats, health, startTime, uptime, serfStats), nil
}

return struct {
Expand All @@ -540,56 +554,55 @@ func (s *httpServer) doStats(req *http.Request) (interface{}, error) {
}{version.Binary, health, startTime.Unix(), stats}, nil
}

func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration) []byte {
var buf bytes.Buffer
w := &buf
func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration, gossip map[string]string) []byte {
w := &bytes.Buffer{}
now := time.Now()
io.WriteString(w, fmt.Sprintf("%s\n", version.String("nsqd")))
io.WriteString(w, fmt.Sprintf("start_time %v\n", startTime.Format(time.RFC3339)))
io.WriteString(w, fmt.Sprintf("uptime %s\n", uptime))
fmt.Fprintf(w, "%s\n", version.String("nsqd"))
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
fmt.Fprintf(w, "uptime %s\n", uptime)
if len(stats) == 0 {
io.WriteString(w, "\nNO_TOPICS\n")
return buf.Bytes()
w.WriteString("\nNO_TOPICS\n")
return w.Bytes()
}
io.WriteString(w, fmt.Sprintf("\nHealth: %s\n", health))
fmt.Fprintf(w, "\nHealth: %s\n", health)
for _, t := range stats {
var pausedPrefix string
if t.Paused {
pausedPrefix = "*P "
} else {
pausedPrefix = " "
}
io.WriteString(w, fmt.Sprintf("\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix,
t.TopicName,
t.Depth,
t.BackendDepth,
t.MessageCount,
t.E2eProcessingLatency))
t.E2eProcessingLatency)
for _, c := range t.Channels {
if c.Paused {
pausedPrefix = " *P "
} else {
pausedPrefix = " "
}
io.WriteString(w,
fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix,
c.ChannelName,
c.Depth,
c.BackendDepth,
c.InFlightCount,
c.DeferredCount,
c.RequeueCount,
c.TimeoutCount,
c.MessageCount,
c.E2eProcessingLatency))
fmt.Fprintf(w,
"%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix,
c.ChannelName,
c.Depth,
c.BackendDepth,
c.InFlightCount,
c.DeferredCount,
c.RequeueCount,
c.TimeoutCount,
c.MessageCount,
c.E2eProcessingLatency)
for _, client := range c.Clients {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
_, port, _ := net.SplitHostPort(client.RemoteAddress)
io.WriteString(w, fmt.Sprintf(" [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
client.Version,
fmt.Sprintf("%s:%s", client.Name, port),
client.State,
Expand All @@ -599,9 +612,17 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim
client.RequeueCount,
client.MessageCount,
duration,
))
)
}
}
}
return buf.Bytes()

if gossip != nil {
fmt.Fprintf(w, "\nGossip:\n")
for k, v := range gossip {
fmt.Fprintf(w, " %s: %s\n", k, v)
}
}

return w.Bytes()
}

0 comments on commit 9d1eb6f

Please sign in to comment.