Skip to content

Commit

Permalink
Add tweet created count per account ID metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Nov 1, 2023
1 parent cc8524c commit 70e184f
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cmd/crossposting-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 26 additions & 3 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/planetary-social/nos-crossposting-service/internal/logging"
"github.com/planetary-social/nos-crossposting-service/service/app"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)
Expand All @@ -34,6 +35,8 @@ const (
labelAction = "action"
labelActionValuePostTweet = "postTweet"
labelActionValueGetUser = "getUser"

labelAccountID = "accountID"
)

type Prometheus struct {
Expand All @@ -46,6 +49,7 @@ type Prometheus struct {
relayConnectionStateGauge *prometheus.GaugeVec
twitterAPICallsCounter *prometheus.CounterVec
purplePagesLookupResultCounter *prometheus.CounterVec
tweetCreatedCountPerAccountGauge *prometheus.GaugeVec

registry *prometheus.Registry

Expand Down Expand Up @@ -74,7 +78,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
},
[]string{labelTopic},
)
versionGague := prometheus.NewGaugeVec(
versionGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "version",
Help: "This metric exists just to put a commit label on it.",
Expand Down Expand Up @@ -115,18 +119,26 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
},
[]string{labelResult},
)
tweetCreatedCountPerAccountGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "tweet_created_per_account",
Help: "Tracks number of tweet created events in the queue per account id.",
},
[]string{labelAccountID},
)

reg := prometheus.NewRegistry()
for _, v := range []prometheus.Collector{
applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram,
subscriptionQueueLengthGauge,
versionGague,
versionGauge,
numberOfPublicKeyDownloadersGauge,
numberOfPublicKeyDownloaderRelaysGauge,
relayConnectionStateGauge,
twitterAPICallsCounter,
purplePagesLookupResultCounter,
tweetCreatedCountPerAccountGauge,

collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
collectors.NewGoCollector(),
Expand All @@ -147,7 +159,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
vcsTime = setting.Value
}
}
versionGague.With(prometheus.Labels{labelGo: buildInfo.GoVersion, labelVcsRevision: vcsRevision, labelVcsTime: vcsTime}).Set(1)
versionGauge.With(prometheus.Labels{labelGo: buildInfo.GoVersion, labelVcsRevision: vcsRevision, labelVcsTime: vcsTime}).Set(1)
}

return &Prometheus{
Expand All @@ -160,6 +172,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
relayConnectionStateGauge: relayConnectionStateGauge,
twitterAPICallsCounter: twitterAPICallsCounter,
purplePagesLookupResultCounter: purplePagesLookupResultCounter,
tweetCreatedCountPerAccountGauge: tweetCreatedCountPerAccountGauge,

registry: reg,

Expand Down Expand Up @@ -236,6 +249,16 @@ func (p *Prometheus) ReportPurplePagesLookupResult(err *error) {
p.purplePagesLookupResultCounter.With(labels).Inc()
}

func (p *Prometheus) ReportTweetCreatedCountPerAccount(m map[accounts.AccountID]int) {
p.tweetCreatedCountPerAccountGauge.Reset()

for accountId, count := range m {
p.tweetCreatedCountPerAccountGauge.
With(prometheus.Labels{labelAccountID: accountId.String()}).
Set(float64(count))
}
}

type ApplicationCall struct {
handlerName string
p *Prometheus
Expand Down
44 changes: 44 additions & 0 deletions service/adapters/sqlite/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ package sqlite

import (
"context"
"database/sql"

"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
)

type Subscriber struct {
pubsub *PubSub
db *sql.DB
}

func NewSubscriber(
pubsub *PubSub,
db *sql.DB,
) *Subscriber {
return &Subscriber{
pubsub: pubsub,
db: db,
}
}

Expand All @@ -23,3 +30,40 @@ func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) <-chan *Receiv
func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error) {
return s.pubsub.QueueLength(TweetCreatedTopic)
}

func (s *Subscriber) TweetCreatedAnalysis(ctx context.Context) (TweetCreatedAnalysis, error) {
analysis := TweetCreatedAnalysis{
TweetsPerAccountID: make(map[accounts.AccountID]int),
}

rows, err := s.db.Query(
"SELECT json_extract(payload, '$.accountID') as accountID, COUNT(*) FROM pubsub WHERE topic = ? GROUP BY accountID",
TweetCreatedTopic,
)
if err != nil {
return TweetCreatedAnalysis{}, errors.Wrap(err, "query error")
}

for rows.Next() {
var (
accountIDPrimitive string
count int
)
if err := rows.Scan(&accountIDPrimitive, &count); err != nil {
return TweetCreatedAnalysis{}, errors.Wrap(err, "scan error")
}

accountID, err := accounts.NewAccountID(accountIDPrimitive)
if err != nil {
return TweetCreatedAnalysis{}, errors.Wrap(err, "error creating account id")
}

analysis.TweetsPerAccountID[accountID] = count
}

return analysis, nil
}

type TweetCreatedAnalysis struct {
TweetsPerAccountID map[accounts.AccountID]int
}
39 changes: 39 additions & 0 deletions service/adapters/sqlite/subscriber_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sqlite_test

import (
"context"
"testing"

"github.com/planetary-social/nos-crossposting-service/internal/fixtures"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/stretchr/testify/require"
)

func TestSubscriber_TweetCreatedAnalysis(t *testing.T) {
t.Parallel()

ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
for i := 0; i < 10; i++ {
accountID := fixtures.SomeAccountID()

for j := 0; j <= i; j++ {
err := adapters.Publisher.PublishTweetCreated(accountID, domain.NewTweet(fixtures.SomeString()))
require.NoError(t, err)
}
}

return nil
})
require.NoError(t, err)

analysis, err := adapters.Subscriber.TweetCreatedAnalysis(ctx)
require.NoError(t, err)
require.Equal(t, 10, len(analysis.TweetsPerAccountID))
for _, count := range analysis.TweetsPerAccountID {
require.NotZero(t, count)
}
}
1 change: 1 addition & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type Metrics interface {
ReportCallingTwitterAPIToGetAUser(err error)
ReportSubscriptionQueueLength(topic string, n int)
ReportPurplePagesLookupResult(err *error)
ReportTweetCreatedCountPerAccount(map[accounts.AccountID]int)
}

type ApplicationCall interface {
Expand Down
8 changes: 8 additions & 0 deletions service/ports/sqlitepubsub/tweet_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,13 @@ func (s *TweetCreatedEventSubscriber) reportMetrics(ctx context.Context) error {
}

s.metrics.ReportSubscriptionQueueLength(sqlite.TweetCreatedTopic, n)

analysis, err := s.subscriber.TweetCreatedAnalysis(ctx)
if err != nil {
return errors.Wrap(err, "error reading queue length")
}

s.metrics.ReportTweetCreatedCountPerAccount(analysis.TweetsPerAccountID)

return nil
}

0 comments on commit 70e184f

Please sign in to comment.