Skip to content

Commit

Permalink
clean metrics, remove series if the ip is not seen for a while.
Browse files Browse the repository at this point in the history
  • Loading branch information
shizunge committed Jan 19, 2024
1 parent a1155d7 commit 96f114b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 2 deletions.
11 changes: 10 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func main() {
prometheusHost := flag.String("prometheus_host", "0.0.0.0", "The address for prometheus")
prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus")
prometheusEntry := flag.String("prometheus_entry", "metrics", "Entry point for prometheus")
prometheusCleanUnseenSeconds := flag.Int("prometheus_clean_unseen_seconds", 0, "Remove series if the IP is not seen for the given time. Set to 0 to disable.")
geoipSupplier := flag.String("geoip_supplier", "off", "Supplier to obtain Geohash of IPs. Possible values are \"off\", \"ip-api\", \"max-mind-db\"")
maxMindDbFileName := flag.String("max_mind_db", "", "Path to the MaxMind DB file.")

Expand Down Expand Up @@ -155,6 +156,14 @@ func main() {
startAccepting(*maxClients, *connType, *connHost, connPort, interval, clients, records)
}
for {
time.Sleep(time.Duration(1<<63 - 1))
if *prometheusCleanUnseenSeconds <= 0 {
time.Sleep(time.Duration(1<<63 - 1))
} else {
time.Sleep(time.Second * time.Duration(60))
records <- metrics.RecordEntry{
RecordType: metrics.RecordEntryTypeClean,
CleanOlderThan: *prometheusCleanUnseenSeconds,
}
}
}
}
30 changes: 29 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package metrics
import (
"endlessh-go/geoip"
"net/http"
"os"
"time"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
pq *UpdatablePriorityQueue
totalClients *prometheus.CounterVec
totalClientsClosed *prometheus.CounterVec
totalBytes *prometheus.CounterVec
Expand All @@ -35,6 +38,7 @@ var (
)

func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
pq = NewUpdatablePriorityQueue()
totalClients = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "endlessh_client_open_count_total",
Expand Down Expand Up @@ -84,14 +88,18 @@ func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
http.Handle("/"+prometheusEntry, handler)
go func() {
glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry)
http.ListenAndServe(prometheusHost+":"+prometheusPort, nil)
if err := http.ListenAndServe(prometheusHost+":"+prometheusPort, nil); err != nil {
glog.Errorf("Error starting Prometheus at port %v:%v: %v", prometheusHost, prometheusPort, err)
os.Exit(1)
}
}()
}

const (
RecordEntryTypeStart = iota
RecordEntryTypeSend = iota
RecordEntryTypeStop = iota
RecordEntryTypeClean = iota
)

type RecordEntry struct {
Expand All @@ -100,6 +108,7 @@ type RecordEntry struct {
LocalPort string
BytesSent int
MillisecondsSpent int64
CleanOlderThan int
}

func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.GeoOption) chan RecordEntry {
Expand All @@ -126,15 +135,34 @@ func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.Ge
"country": country,
"location": location}).Inc()
totalClients.With(prometheus.Labels{"local_port": r.LocalPort}).Inc()
pq.Update(r.IpAddr, time.Now())
case RecordEntryTypeSend:
secondsSpent := float64(r.MillisecondsSpent) / 1000
clientSeconds.With(prometheus.Labels{
"ip": r.IpAddr,
"local_port": r.LocalPort}).Add(secondsSpent)
totalBytes.With(prometheus.Labels{"local_port": r.LocalPort}).Add(float64(r.BytesSent))
totalSeconds.With(prometheus.Labels{"local_port": r.LocalPort}).Add(secondsSpent)
pq.Update(r.IpAddr, time.Now())
case RecordEntryTypeStop:
totalClientsClosed.With(prometheus.Labels{"local_port": r.LocalPort}).Inc()
pq.Update(r.IpAddr, time.Now())
case RecordEntryTypeClean:
glog.Warningf("Cleanup")
top := pq.Peek()
deadline := time.Now().Add(-time.Second * time.Duration(r.CleanOlderThan))
if top == nil {
glog.Warningf("top is nil")
} else {
glog.Warningf("%v %v", top.Key, top.Value)
}
for top != nil && top.Value.Before(deadline) {
glog.Warningf("%v %v", top.Key, top.Value)
clientIP.DeletePartialMatch(prometheus.Labels{"ip": top.Key})
clientSeconds.DeletePartialMatch(prometheus.Labels{"ip": top.Key})
pq.Pop()
top = pq.Peek()
}
}
}
}()
Expand Down
94 changes: 94 additions & 0 deletions metrics/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package metrics

import (
"container/heap"
"time"
)

// Pair represents a key-value pair with a timestamp
type Pair struct {
Key string
Value time.Time
HeapIdx int // Index in the heap for efficient updates
}

// PriorityQueue is a min-heap implementation for Pairs
type PriorityQueue []*Pair

// Len returns the length of the priority queue
func (pq PriorityQueue) Len() int { return len(pq) }

// Less compares two pairs based on their values (timestamps)
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].Value.Before(pq[j].Value)
}

// Swap swaps two pairs in the priority queue
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].HeapIdx = i
pq[j].HeapIdx = j
}

// Push adds a pair to the priority queue
func (pq *PriorityQueue) Push(x interface{}) {
pair := x.(*Pair)
pair.HeapIdx = len(*pq)
*pq = append(*pq, pair)
}

// Pop removes the pair with the minimum value (timestamp) from the priority queue
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
pair := old[n-1]
pair.HeapIdx = -1 // for safety
*pq = old[0 : n-1]
return pair
}

// UpdatablePriorityQueue represents the data structure with the priority queue
type UpdatablePriorityQueue struct {
pq PriorityQueue
keyMap map[string]*Pair
}

// NewUpdatablePriorityQueue initializes a new UpdatablePriorityQueue
func NewUpdatablePriorityQueue() *UpdatablePriorityQueue {
return &UpdatablePriorityQueue{
pq: make(PriorityQueue, 0),
keyMap: make(map[string]*Pair),
}
}

// Update adds or updates a key-value pair in the data structure
func (ds *UpdatablePriorityQueue) Update(key string, value time.Time) {
if pair, ok := ds.keyMap[key]; ok {
// Key exists, update the time
pair.Value = value
heap.Fix(&ds.pq, pair.HeapIdx)
} else {
// Key does not exist, create a new entry
pair := &Pair{Key: key, Value: value}
heap.Push(&ds.pq, pair)
ds.keyMap[key] = pair
}
}

// Peek returns the entry with the minimal time
func (ds *UpdatablePriorityQueue) Peek() *Pair {
if ds.pq.Len() == 0 {
return nil
}
return ds.pq[0]
}

// Pop removes the entry with the minimal time
func (ds *UpdatablePriorityQueue) Pop() *Pair {
if ds.pq.Len() == 0 {
return nil
}
pair := heap.Pop(&ds.pq).(*Pair)
delete(ds.keyMap, pair.Key)
return pair
}

0 comments on commit 96f114b

Please sign in to comment.