Skip to content

Commit

Permalink
Add reporting of lag as a metric
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Apr 25, 2024
1 parent 891815f commit be18e6a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 6 deletions.
40 changes: 36 additions & 4 deletions daemon.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package indexer

import (
"context"
"database/sql"
"log/slog"
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/pkg/errors"
)

type Daemon struct {
Expand Down Expand Up @@ -78,6 +80,9 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
func (d *Daemon) startStatsReporter(interval time.Duration) {
d.wg.Add(1) // add ourselves to the wait group

// calculating lag is more expensive than reading indexer stats so we only do it every 5th iteration
var iterations int64

Check warning on line 85 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L83-L85

Added lines #L83 - L85 were not covered by tests
go func() {
defer func() {
slog.Info("analytics exiting")
Expand All @@ -89,13 +94,19 @@ func (d *Daemon) startStatsReporter(interval time.Duration) {
case <-d.quit:
return
case <-time.After(interval):
d.reportStats()
d.reportStats(iterations%5 == 0)

Check warning on line 97 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L97

Added line #L97 was not covered by tests
}

iterations++

Check warning on line 100 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L100

Added line #L100 was not covered by tests
}
}()
}

func (d *Daemon) reportStats() {
func (d *Daemon) reportStats(includeLag bool) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

log := slog.New(slog.Default().Handler())

Check warning on line 109 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L105-L109

Added lines #L105 - L109 were not covered by tests
metrics := make(map[string]float64, len(d.indexers)*2)

for _, ix := range d.indexers {
Expand All @@ -115,9 +126,16 @@ func (d *Daemon) reportStats() {
metrics[ix.Name()+"_rate"] = rateInPeriod

d.prevStats[ix] = stats
}

log := slog.New(slog.Default().Handler())
if includeLag {
lag, err := d.calculateLag(ctx, ix)
if err != nil {
log.Error("error getting db last modified", "index", ix.Name(), "error", err)
} else {
metrics[ix.Name()+"_lag"] = lag.Seconds()
}

Check warning on line 136 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L130-L136

Added lines #L130 - L136 were not covered by tests
}
}

for k, v := range metrics {
analytics.Gauge("indexer."+k, v)
Expand All @@ -127,6 +145,20 @@ func (d *Daemon) reportStats() {
log.Info("stats reported")
}

func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Duration, error) {
esLastModified, err := ix.GetLastModified(ix.Name())
if err != nil {
return 0, errors.Wrap(err, "error getting ES last modified")
}

Check warning on line 152 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L148-L152

Added lines #L148 - L152 were not covered by tests

dbLastModified, err := ix.GetDBLastModified(ctx, d.db)
if err != nil {
return 0, errors.Wrap(err, "error getting DB last modified")
}

Check warning on line 157 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L154-L157

Added lines #L154 - L157 were not covered by tests

return dbLastModified.Sub(esLastModified), nil

Check warning on line 159 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L159

Added line #L159 was not covered by tests
}

// Stop stops this daemon
func (d *Daemon) Stop() {
slog.Info("daemon stopping")
Expand Down
13 changes: 11 additions & 2 deletions indexers/base.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package indexers

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -31,6 +32,9 @@ type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Stats() Stats

GetLastModified(index string) (time.Time, error)
GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error)
}

// IndexDefinition is what we pass to elastic to create an index,
Expand Down Expand Up @@ -318,8 +322,13 @@ func (i *baseIndexer) GetLastModified(index string) (time.Time, error) {
lastModified := time.Time{}

// get the newest document on our index
queryResponse := queryResponse{}
_, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse)
queryResponse := &queryResponse{}
_, err := utils.MakeJSONRequest(
http.MethodPost,
fmt.Sprintf("%s/%s/_search", i.elasticURL, index),
[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1}`),
queryResponse,
)
if err != nil {
return lastModified, err
}
Expand Down
24 changes: 24 additions & 0 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,27 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st

return totalCreated, totalDeleted, nil
}

func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {
lastModified := time.Time{}

if err := db.QueryRowContext(ctx, "SELECT MAX(modified_on) FROM contacts_contact").Scan(&lastModified); err != nil {
return lastModified, err
}

Check warning on line 283 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L278-L283

Added lines #L278 - L283 were not covered by tests

return lastModified, nil

Check warning on line 285 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L285

Added line #L285 was not covered by tests
}

func (i *ContactIndexer) Lag(ctx context.Context, db *sql.DB) (time.Duration, error) {
esLastModified, err := i.GetLastModified(i.name)
if err != nil {
return 0, errors.Wrap(err, "error getting ES last modified")
}

Check warning on line 292 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L288-L292

Added lines #L288 - L292 were not covered by tests

var dbLastModified time.Time
if err := db.QueryRowContext(ctx, "SELECT MAX(modified_on) FROM contacts_contact").Scan(&dbLastModified); err != nil {
return 0, errors.Wrap(err, "error getting ES last modified")
}

Check warning on line 297 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L294-L297

Added lines #L294 - L297 were not covered by tests

return dbLastModified.Sub(esLastModified), nil

Check warning on line 299 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L299

Added line #L299 was not covered by tests
}

0 comments on commit be18e6a

Please sign in to comment.