From 70e184fbc3e67b7ce7a312977529d8a97c52ef1b Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 14:48:40 +0900 Subject: [PATCH] Add tweet created count per account ID metrics --- cmd/crossposting-service/di/wire_gen.go | 4 +- service/adapters/prometheus/prometheus.go | 29 ++++++++++++-- service/adapters/sqlite/subscriber.go | 44 +++++++++++++++++++++ service/adapters/sqlite/subscriber_test.go | 39 ++++++++++++++++++ service/app/app.go | 1 + service/ports/sqlitepubsub/tweet_created.go | 8 ++++ 6 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 service/adapters/sqlite/subscriber_test.go diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 5906550..a5f0984 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -92,7 +92,7 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S receivedEventSubscriber := memorypubsub2.NewReceivedEventSubscriber(receivedEventPubSub, processReceivedEventHandler, logger) sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, logger, prometheusPrometheus) pubSub := sqlite.NewPubSub(db, logger) - subscriber := sqlite.NewSubscriber(pubSub) + subscriber := sqlite.NewSubscriber(pubSub, db) tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger, prometheusPrometheus) migrationsStorage, err := sqlite.NewMigrationsStorage(db) if err != nil { @@ -132,7 +132,7 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te genericAdaptersFactoryFn := newTestAdaptersFactoryFn(diBuildTransactionSqliteAdaptersDependencies) genericTransactionProvider := sqlite.NewTestTransactionProvider(db, genericAdaptersFactoryFn) pubSub := sqlite.NewPubSub(db, logger) - subscriber := sqlite.NewSubscriber(pubSub) + subscriber := sqlite.NewSubscriber(pubSub, db) migrationsStorage, err := sqlite.NewMigrationsStorage(db) if err != nil { cleanup() diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index 0ce891c..b40526c 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -8,6 +8,7 @@ import ( "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" ) @@ -34,6 +35,8 @@ const ( labelAction = "action" labelActionValuePostTweet = "postTweet" labelActionValueGetUser = "getUser" + + labelAccountID = "accountID" ) type Prometheus struct { @@ -46,6 +49,7 @@ type Prometheus struct { relayConnectionStateGauge *prometheus.GaugeVec twitterAPICallsCounter *prometheus.CounterVec purplePagesLookupResultCounter *prometheus.CounterVec + tweetCreatedCountPerAccountGauge *prometheus.GaugeVec registry *prometheus.Registry @@ -74,7 +78,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { }, []string{labelTopic}, ) - versionGague := prometheus.NewGaugeVec( + versionGauge := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "version", Help: "This metric exists just to put a commit label on it.", @@ -115,18 +119,26 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { }, []string{labelResult}, ) + tweetCreatedCountPerAccountGauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "tweet_created_per_account", + Help: "Tracks number of tweet created events in the queue per account id.", + }, + []string{labelAccountID}, + ) reg := prometheus.NewRegistry() for _, v := range []prometheus.Collector{ applicationHandlerCallsCounter, applicationHandlerCallDurationHistogram, subscriptionQueueLengthGauge, - versionGague, + versionGauge, numberOfPublicKeyDownloadersGauge, numberOfPublicKeyDownloaderRelaysGauge, relayConnectionStateGauge, twitterAPICallsCounter, purplePagesLookupResultCounter, + tweetCreatedCountPerAccountGauge, collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), collectors.NewGoCollector(), @@ -147,7 +159,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { vcsTime = setting.Value } } - versionGague.With(prometheus.Labels{labelGo: buildInfo.GoVersion, labelVcsRevision: vcsRevision, labelVcsTime: vcsTime}).Set(1) + versionGauge.With(prometheus.Labels{labelGo: buildInfo.GoVersion, labelVcsRevision: vcsRevision, labelVcsTime: vcsTime}).Set(1) } return &Prometheus{ @@ -160,6 +172,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { relayConnectionStateGauge: relayConnectionStateGauge, twitterAPICallsCounter: twitterAPICallsCounter, purplePagesLookupResultCounter: purplePagesLookupResultCounter, + tweetCreatedCountPerAccountGauge: tweetCreatedCountPerAccountGauge, registry: reg, @@ -236,6 +249,16 @@ func (p *Prometheus) ReportPurplePagesLookupResult(err *error) { p.purplePagesLookupResultCounter.With(labels).Inc() } +func (p *Prometheus) ReportTweetCreatedCountPerAccount(m map[accounts.AccountID]int) { + p.tweetCreatedCountPerAccountGauge.Reset() + + for accountId, count := range m { + p.tweetCreatedCountPerAccountGauge. + With(prometheus.Labels{labelAccountID: accountId.String()}). + Set(float64(count)) + } +} + type ApplicationCall struct { handlerName string p *Prometheus diff --git a/service/adapters/sqlite/subscriber.go b/service/adapters/sqlite/subscriber.go index 564bbc0..c6980d5 100644 --- a/service/adapters/sqlite/subscriber.go +++ b/service/adapters/sqlite/subscriber.go @@ -2,17 +2,24 @@ package sqlite import ( "context" + "database/sql" + + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" ) type Subscriber struct { pubsub *PubSub + db *sql.DB } func NewSubscriber( pubsub *PubSub, + db *sql.DB, ) *Subscriber { return &Subscriber{ pubsub: pubsub, + db: db, } } @@ -23,3 +30,40 @@ func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) <-chan *Receiv func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error) { return s.pubsub.QueueLength(TweetCreatedTopic) } + +func (s *Subscriber) TweetCreatedAnalysis(ctx context.Context) (TweetCreatedAnalysis, error) { + analysis := TweetCreatedAnalysis{ + TweetsPerAccountID: make(map[accounts.AccountID]int), + } + + rows, err := s.db.Query( + "SELECT json_extract(payload, '$.accountID') as accountID, COUNT(*) FROM pubsub WHERE topic = ? GROUP BY accountID", + TweetCreatedTopic, + ) + if err != nil { + return TweetCreatedAnalysis{}, errors.Wrap(err, "query error") + } + + for rows.Next() { + var ( + accountIDPrimitive string + count int + ) + if err := rows.Scan(&accountIDPrimitive, &count); err != nil { + return TweetCreatedAnalysis{}, errors.Wrap(err, "scan error") + } + + accountID, err := accounts.NewAccountID(accountIDPrimitive) + if err != nil { + return TweetCreatedAnalysis{}, errors.Wrap(err, "error creating account id") + } + + analysis.TweetsPerAccountID[accountID] = count + } + + return analysis, nil +} + +type TweetCreatedAnalysis struct { + TweetsPerAccountID map[accounts.AccountID]int +} diff --git a/service/adapters/sqlite/subscriber_test.go b/service/adapters/sqlite/subscriber_test.go new file mode 100644 index 0000000..45cbda8 --- /dev/null +++ b/service/adapters/sqlite/subscriber_test.go @@ -0,0 +1,39 @@ +package sqlite_test + +import ( + "context" + "testing" + + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/stretchr/testify/require" +) + +func TestSubscriber_TweetCreatedAnalysis(t *testing.T) { + t.Parallel() + + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + for i := 0; i < 10; i++ { + accountID := fixtures.SomeAccountID() + + for j := 0; j <= i; j++ { + err := adapters.Publisher.PublishTweetCreated(accountID, domain.NewTweet(fixtures.SomeString())) + require.NoError(t, err) + } + } + + return nil + }) + require.NoError(t, err) + + analysis, err := adapters.Subscriber.TweetCreatedAnalysis(ctx) + require.NoError(t, err) + require.Equal(t, 10, len(analysis.TweetsPerAccountID)) + for _, count := range analysis.TweetsPerAccountID { + require.NotZero(t, count) + } +} diff --git a/service/app/app.go b/service/app/app.go index 576b050..05c2229 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -138,6 +138,7 @@ type Metrics interface { ReportCallingTwitterAPIToGetAUser(err error) ReportSubscriptionQueueLength(topic string, n int) ReportPurplePagesLookupResult(err *error) + ReportTweetCreatedCountPerAccount(map[accounts.AccountID]int) } type ApplicationCall interface { diff --git a/service/ports/sqlitepubsub/tweet_created.go b/service/ports/sqlitepubsub/tweet_created.go index e5afd04..33ed5bb 100644 --- a/service/ports/sqlitepubsub/tweet_created.go +++ b/service/ports/sqlitepubsub/tweet_created.go @@ -103,5 +103,13 @@ func (s *TweetCreatedEventSubscriber) reportMetrics(ctx context.Context) error { } s.metrics.ReportSubscriptionQueueLength(sqlite.TweetCreatedTopic, n) + + analysis, err := s.subscriber.TweetCreatedAnalysis(ctx) + if err != nil { + return errors.Wrap(err, "error reading queue length") + } + + s.metrics.ReportTweetCreatedCountPerAccount(analysis.TweetsPerAccountID) + return nil }