Skip to content

Commit

Permalink
Merge pull request #67 from nyaruka/report_lag
Browse files Browse the repository at this point in the history
Add reporting of lag as a metric
  • Loading branch information
rowanseymour authored Apr 25, 2024
2 parents 891815f + 5e8aeaf commit b12425f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 10 deletions.
40 changes: 36 additions & 4 deletions daemon.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package indexer

import (
"context"
"database/sql"
"log/slog"
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/pkg/errors"
)

type Daemon struct {
Expand Down Expand Up @@ -78,6 +80,9 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
func (d *Daemon) startStatsReporter(interval time.Duration) {
d.wg.Add(1) // add ourselves to the wait group

// calculating lag is more expensive than reading indexer stats so we only do it every 5th iteration
var iterations int64

go func() {
defer func() {
slog.Info("analytics exiting")
Expand All @@ -89,13 +94,19 @@ func (d *Daemon) startStatsReporter(interval time.Duration) {
case <-d.quit:
return
case <-time.After(interval):
d.reportStats()
d.reportStats(iterations%5 == 0)
}

iterations++
}
}()
}

func (d *Daemon) reportStats() {
func (d *Daemon) reportStats(includeLag bool) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

log := slog.New(slog.Default().Handler())
metrics := make(map[string]float64, len(d.indexers)*2)

for _, ix := range d.indexers {
Expand All @@ -115,9 +126,16 @@ func (d *Daemon) reportStats() {
metrics[ix.Name()+"_rate"] = rateInPeriod

d.prevStats[ix] = stats
}

log := slog.New(slog.Default().Handler())
if includeLag {
lag, err := d.calculateLag(ctx, ix)
if err != nil {
log.Error("error getting db last modified", "index", ix.Name(), "error", err)
} else {
metrics[ix.Name()+"_lag"] = lag.Seconds()
}
}
}

for k, v := range metrics {
analytics.Gauge("indexer."+k, v)
Expand All @@ -127,6 +145,20 @@ func (d *Daemon) reportStats() {
log.Info("stats reported")
}

func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Duration, error) {
esLastModified, err := ix.GetESLastModified(ix.Name())
if err != nil {
return 0, errors.Wrap(err, "error getting ES last modified")
}

dbLastModified, err := ix.GetDBLastModified(ctx, d.db)
if err != nil {
return 0, errors.Wrap(err, "error getting DB last modified")
}

return dbLastModified.Sub(esLastModified), nil
}

// Stop stops this daemon
func (d *Daemon) Stop() {
slog.Info("daemon stopping")
Expand Down
17 changes: 13 additions & 4 deletions indexers/base.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package indexers

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -31,6 +32,9 @@ type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Stats() Stats

GetESLastModified(index string) (time.Time, error)
GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error)
}

// IndexDefinition is what we pass to elastic to create an index,
Expand Down Expand Up @@ -313,13 +317,18 @@ type queryResponse struct {
} `json:"hits"`
}

// GetLastModified queries a concrete index and finds the last modified document, returning its modified time
func (i *baseIndexer) GetLastModified(index string) (time.Time, error) {
// GetESLastModified queries a concrete index and finds the last modified document, returning its modified time
func (i *baseIndexer) GetESLastModified(index string) (time.Time, error) {
lastModified := time.Time{}

// get the newest document on our index
queryResponse := queryResponse{}
_, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse)
queryResponse := &queryResponse{}
_, err := utils.MakeJSONRequest(
http.MethodPost,
fmt.Sprintf("%s/%s/_search", i.elasticURL, index),
[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1}`),
queryResponse,
)
if err != nil {
return lastModified, err
}
Expand Down
12 changes: 11 additions & 1 deletion indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
remapAlias = true
}

lastModified, err := i.GetLastModified(physicalIndex)
lastModified, err := i.GetESLastModified(physicalIndex)
if err != nil {
return "", errors.Wrap(err, "error finding last modified")
}
Expand Down Expand Up @@ -274,3 +274,13 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st

return totalCreated, totalDeleted, nil
}

func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {
lastModified := time.Time{}

if err := db.QueryRowContext(ctx, "SELECT MAX(modified_on) FROM contacts_contact").Scan(&lastModified); err != nil {
return lastModified, err
}

return lastModified, nil
}
15 changes: 14 additions & 1 deletion indexers/contacts_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package indexers_test

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -189,6 +190,14 @@ func TestContacts(t *testing.T) {
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
assert.Equal(t, "indexer_test", ix1.Name())

dbModified, err := ix1.GetDBLastModified(context.Background(), db)
assert.NoError(t, err)
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), dbModified, 0)

// error trying to get ES last modified on before index exists
_, err = ix1.GetESLastModified(aliasName)
assert.Error(t, err)

expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))

indexName, err := ix1.Index(db, false, false)
Expand All @@ -197,6 +206,10 @@ func TestContacts(t *testing.T) {

time.Sleep(1 * time.Second)

esModified, err := ix1.GetESLastModified(aliasName)
assert.NoError(t, err)
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0)

assertIndexerStats(t, ix1, 9, 0)
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName})

Expand All @@ -205,7 +218,7 @@ func TestContacts(t *testing.T) {
assertQuery(t, es, tc.query, tc.expected, "query mismatch for %s", string(jsonx.MustMarshal(src)))
}

lastModified, err := ix1.GetLastModified(indexName)
lastModified, err := ix1.GetESLastModified(indexName)
assert.NoError(t, err)
assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC))

Expand Down

0 comments on commit b12425f

Please sign in to comment.