diff --git a/README.md b/README.md index f4baa8e..1cd0c26 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,8 @@ Usage of ./endlessh-go Path to the MaxMind DB file. -port value SSH listening port. You may provide multiple -port flags to listen to multiple ports. (default "2222") + -prometheus_clean_unseen_seconds int + Remove series if the IP is not seen for the given time. Set to 0 to disable. (default 0) -prometheus_entry string Entry point for prometheus (default "metrics") -prometheus_host string diff --git a/main.go b/main.go index da6e07d..6da6fc2 100644 --- a/main.go +++ b/main.go @@ -121,6 +121,7 @@ func main() { prometheusHost := flag.String("prometheus_host", "0.0.0.0", "The address for prometheus") prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus") prometheusEntry := flag.String("prometheus_entry", "metrics", "Entry point for prometheus") + prometheusCleanUnseenSeconds := flag.Int("prometheus_clean_unseen_seconds", 0, "Remove series if the IP is not seen for the given time. Set to 0 to disable. (default 0)") geoipSupplier := flag.String("geoip_supplier", "off", "Supplier to obtain Geohash of IPs. Possible values are \"off\", \"ip-api\", \"max-mind-db\"") maxMindDbFileName := flag.String("max_mind_db", "", "Path to the MaxMind DB file.") @@ -155,6 +156,14 @@ func main() { startAccepting(*maxClients, *connType, *connHost, connPort, interval, clients, records) } for { - time.Sleep(time.Duration(1<<63 - 1)) + if *prometheusCleanUnseenSeconds <= 0 { + time.Sleep(time.Duration(1<<63 - 1)) + } else { + time.Sleep(time.Second * time.Duration(60)) + records <- metrics.RecordEntry{ + RecordType: metrics.RecordEntryTypeClean, + CleanOlderThan: *prometheusCleanUnseenSeconds, + } + } } } diff --git a/metrics/metrics.go b/metrics/metrics.go index 99ffea3..a34d40a 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -19,6 +19,8 @@ package metrics import ( "endlessh-go/geoip" "net/http" + "os" + "time" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" @@ -26,6 +28,7 @@ import ( ) var ( + pq *UpdatablePriorityQueue totalClients *prometheus.CounterVec totalClientsClosed *prometheus.CounterVec totalBytes *prometheus.CounterVec @@ -35,6 +38,7 @@ var ( ) func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) { + pq = NewUpdatablePriorityQueue() totalClients = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "endlessh_client_open_count_total", @@ -84,7 +88,10 @@ func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) { http.Handle("/"+prometheusEntry, handler) go func() { glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry) - http.ListenAndServe(prometheusHost+":"+prometheusPort, nil) + if err := http.ListenAndServe(prometheusHost+":"+prometheusPort, nil); err != nil { + glog.Errorf("Error starting Prometheus at port %v:%v: %v", prometheusHost, prometheusPort, err) + os.Exit(1) + } }() } @@ -92,6 +99,7 @@ const ( RecordEntryTypeStart = iota RecordEntryTypeSend = iota RecordEntryTypeStop = iota + RecordEntryTypeClean = iota ) type RecordEntry struct { @@ -100,6 +108,7 @@ type RecordEntry struct { LocalPort string BytesSent int MillisecondsSpent int64 + CleanOlderThan int } func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.GeoOption) chan RecordEntry { @@ -126,6 +135,7 @@ func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.Ge "country": country, "location": location}).Inc() totalClients.With(prometheus.Labels{"local_port": r.LocalPort}).Inc() + pq.Update(r.IpAddr, time.Now()) case RecordEntryTypeSend: secondsSpent := float64(r.MillisecondsSpent) / 1000 clientSeconds.With(prometheus.Labels{ @@ -133,8 +143,26 @@ func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.Ge "local_port": r.LocalPort}).Add(secondsSpent) totalBytes.With(prometheus.Labels{"local_port": r.LocalPort}).Add(float64(r.BytesSent)) totalSeconds.With(prometheus.Labels{"local_port": r.LocalPort}).Add(secondsSpent) + pq.Update(r.IpAddr, time.Now()) case RecordEntryTypeStop: totalClientsClosed.With(prometheus.Labels{"local_port": r.LocalPort}).Inc() + pq.Update(r.IpAddr, time.Now()) + case RecordEntryTypeClean: + glog.Warningf("Cleanup") + top := pq.Peek() + deadline := time.Now().Add(-time.Second * time.Duration(r.CleanOlderThan)) + if top == nil { + glog.Warningf("top is nil") + } else { + glog.Warningf("%v %v", top.Key, top.Value) + } + for top != nil && top.Value.Before(deadline) { + glog.Warningf("%v %v", top.Key, top.Value) + clientIP.DeletePartialMatch(prometheus.Labels{"ip": top.Key}) + clientSeconds.DeletePartialMatch(prometheus.Labels{"ip": top.Key}) + pq.Pop() + top = pq.Peek() + } } } }() diff --git a/metrics/priority_queue.go b/metrics/priority_queue.go new file mode 100644 index 0000000..a7559fe --- /dev/null +++ b/metrics/priority_queue.go @@ -0,0 +1,94 @@ +package metrics + +import ( + "container/heap" + "time" +) + +// Pair represents a key-value pair with a timestamp +type Pair struct { + Key string + Value time.Time + HeapIdx int // Index in the heap for efficient updates +} + +// PriorityQueue is a min-heap implementation for Pairs +type PriorityQueue []*Pair + +// Len returns the length of the priority queue +func (pq PriorityQueue) Len() int { return len(pq) } + +// Less compares two pairs based on their values (timestamps) +func (pq PriorityQueue) Less(i, j int) bool { + return pq[i].Value.Before(pq[j].Value) +} + +// Swap swaps two pairs in the priority queue +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].HeapIdx = i + pq[j].HeapIdx = j +} + +// Push adds a pair to the priority queue +func (pq *PriorityQueue) Push(x interface{}) { + pair := x.(*Pair) + pair.HeapIdx = len(*pq) + *pq = append(*pq, pair) +} + +// Pop removes the pair with the minimum value (timestamp) from the priority queue +func (pq *PriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + pair := old[n-1] + pair.HeapIdx = -1 // for safety + *pq = old[0 : n-1] + return pair +} + +// UpdatablePriorityQueue represents the data structure with the priority queue +type UpdatablePriorityQueue struct { + pq PriorityQueue + keyMap map[string]*Pair +} + +// NewUpdatablePriorityQueue initializes a new UpdatablePriorityQueue +func NewUpdatablePriorityQueue() *UpdatablePriorityQueue { + return &UpdatablePriorityQueue{ + pq: make(PriorityQueue, 0), + keyMap: make(map[string]*Pair), + } +} + +// Update adds or updates a key-value pair in the data structure +func (ds *UpdatablePriorityQueue) Update(key string, value time.Time) { + if pair, ok := ds.keyMap[key]; ok { + // Key exists, update the time + pair.Value = value + heap.Fix(&ds.pq, pair.HeapIdx) + } else { + // Key does not exist, create a new entry + pair := &Pair{Key: key, Value: value} + heap.Push(&ds.pq, pair) + ds.keyMap[key] = pair + } +} + +// Peek returns the entry with the minimal time +func (ds *UpdatablePriorityQueue) Peek() *Pair { + if ds.pq.Len() == 0 { + return nil + } + return ds.pq[0] +} + +// Pop removes the entry with the minimal time +func (ds *UpdatablePriorityQueue) Pop() *Pair { + if ds.pq.Len() == 0 { + return nil + } + pair := heap.Pop(&ds.pq).(*Pair) + delete(ds.keyMap, pair.Key) + return pair +}