Skip to content

Commit

Permalink
Report queue length metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Oct 16, 2023
1 parent 31a1db2 commit 590dac1
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 29 deletions.
6 changes: 5 additions & 1 deletion cmd/crossposting-service/di/inject_pubsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package di

import (
watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/google/wire"
"github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
Expand All @@ -15,10 +16,13 @@ var memoryPubsubSet = wire.NewSet(
)

var sqlitePubsubSet = wire.NewSet(
sqlite.NewWatermillSchemaAdapter,
sqlite.NewSqliteSchema,
wire.Bind(new(watermillsql.SchemaAdapter), new(sqlite.SqliteSchema)),

sqlite.NewWatermillOffsetsAdapter,
sqlite.NewWatermillSubscriber,
sqlitepubsubport.NewTweetCreatedEventSubscriber,
sqlite.NewSubscriber,
)

var sqliteTxPubsubSet = wire.NewSet(
Expand Down
28 changes: 18 additions & 10 deletions cmd/crossposting-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Prometheus struct {
applicationHandlerCallsCounter *prometheus.CounterVec
applicationHandlerCallDurationHistogram *prometheus.HistogramVec

subscriptionQueueLengthGauge *prometheus.GaugeVec
numberOfPublicKeyDownloadersGauge prometheus.Gauge
numberOfPublicKeyDownloaderRelaysGauge *prometheus.GaugeVec
relayConnectionStateGauge *prometheus.GaugeVec
Expand Down Expand Up @@ -140,6 +141,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
applicationHandlerCallsCounter: applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram,

subscriptionQueueLengthGauge: subscriptionQueueLengthGauge,
numberOfPublicKeyDownloadersGauge: numberOfPublicKeyDownloadersGauge,
numberOfPublicKeyDownloaderRelaysGauge: numberOfPublicKeyDownloaderRelaysGauge,
relayConnectionStateGauge: relayConnectionStateGauge,
Expand Down Expand Up @@ -192,6 +194,10 @@ func (p *Prometheus) ReportCallingTwitterAPIToPostATweet(err error) {
p.twitterAPICallsToPostTweetCounter.With(labels).Inc()
}

func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) {
p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n))
}

type ApplicationCall struct {
handlerName string
p *Prometheus
Expand Down
1 change: 1 addition & 0 deletions service/adapters/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type TestAdapters struct {
type TestedItems struct {
TransactionProvider *TestTransactionProvider
Migrations *Migrations
Subscriber *Subscriber
}

func Open(conf config.Config) (*sql.DB, error) {
Expand Down
53 changes: 53 additions & 0 deletions service/adapters/sqlite/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sqlite

import (
"context"
"database/sql"

watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)

type Subscriber struct {
watermillSubscriber *watermillsql.Subscriber
offsetsAdapter watermillsql.OffsetsAdapter
schema SqliteSchema
db *sql.DB
}

func NewSubscriber(
watermillSubscriber *watermillsql.Subscriber,
offsetsAdapter watermillsql.OffsetsAdapter,
schema SqliteSchema,
db *sql.DB,
) *Subscriber {
return &Subscriber{
watermillSubscriber: watermillSubscriber,
offsetsAdapter: offsetsAdapter,
schema: schema,
db: db,
}
}

func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) (<-chan *message.Message, error) {
return s.watermillSubscriber.Subscribe(ctx, TweetCreatedTopic)
}

func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error) {
offsetsQuery, offsetsQueryArgs := s.offsetsAdapter.NextOffsetQuery(TweetCreatedTopic, consumerGroupName)

selectQuery := `
SELECT COUNT(*)
FROM ` + s.schema.MessagesTable(TweetCreatedTopic) + `
WHERE offset > (` + offsetsQuery + `)`

row := s.db.QueryRowContext(ctx, selectQuery, offsetsQueryArgs...)

var count int
if err := row.Scan(&count); err != nil {
return 0, errors.Wrap(err, "error calling row scan")
}

return count, nil
}
57 changes: 57 additions & 0 deletions service/adapters/sqlite/subscriber_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sqlite_test

import (
"context"
"testing"
"time"

"github.com/planetary-social/nos-crossposting-service/internal/fixtures"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSubscriber_TweetCreatedQueueLength(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx)
require.NoError(t, err)
require.Equal(t, 0, n)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString()))
require.NoError(t, err)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString()))
require.NoError(t, err)

return nil
})
require.NoError(t, err)

n, err = adapters.Subscriber.TweetCreatedQueueLength(ctx)
require.NoError(t, err)
require.Equal(t, 2, n)

go func() {
ch, err := adapters.Subscriber.SubscribeToTweetCreated(ctx)
require.NoError(t, err)

for msg := range ch {
msg.Ack()
}
}()

require.EventuallyWithT(t, func(t *assert.CollectT) {
n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx)
assert.NoError(t, err)
assert.Equal(t, 0, n)
}, 5*time.Second, 100*time.Millisecond)
}
22 changes: 13 additions & 9 deletions service/adapters/sqlite/watermill.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/boreq/errors"
)

const (
consumerGroupName = "main"
)

func NewWatermillPublisher(
tx *sql.Tx,
logger watermill.LoggerAdapter,
Expand All @@ -33,7 +37,7 @@ func NewWatermillSubscriber(
offsetsAdapter watermillsql.OffsetsAdapter,
) (*watermillsql.Subscriber, error) {
config := watermillsql.SubscriberConfig{
ConsumerGroup: "main",
ConsumerGroup: consumerGroupName,
PollInterval: 30 * time.Second,
ResendInterval: 30 * time.Second,
RetryInterval: 30 * time.Second,
Expand All @@ -45,14 +49,6 @@ func NewWatermillSubscriber(
return watermillsql.NewSubscriber(db, config, logger)
}

func NewWatermillSchemaAdapter() watermillsql.SchemaAdapter {
return SqliteSchema{
GenerateMessagesTableName: func(topic string) string {
return fmt.Sprintf("watermill_%s", topic)
},
}
}

func NewWatermillOffsetsAdapter() watermillsql.OffsetsAdapter {
return SqliteOffsetsAdapter{
GenerateMessagesOffsetsTableName: func(topic string) string {
Expand All @@ -66,6 +62,14 @@ type SqliteSchema struct {
SubscribeBatchSize int
}

func NewSqliteSchema() SqliteSchema {
return SqliteSchema{
GenerateMessagesTableName: func(topic string) string {
return fmt.Sprintf("watermill_%s", topic)
},
}
}

func (s SqliteSchema) SchemaInitializingQueries(topic string) []string {
createMessagesTable := strings.Join([]string{
"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (",
Expand Down
1 change: 1 addition & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type Metrics interface {
ReportNumberOfPublicKeyDownloaderRelays(publicKey domain.PublicKey, n int)
ReportRelayConnectionState(relayAddress domain.RelayAddress, state RelayConnectionState)
ReportCallingTwitterAPIToPostATweet(err error)
ReportSubscriptionQueueLength(topic string, n int)
}

type ApplicationCall interface {
Expand Down
51 changes: 42 additions & 9 deletions service/ports/sqlitepubsub/tweet_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package sqlitepubsub
import (
"context"
"encoding/json"
"time"

watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
Expand All @@ -14,30 +14,37 @@ import (
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
)

const reportMetricsEvery = 60 * time.Second

type SendTweetHandler interface {
Handle(ctx context.Context, cmd app.SendTweet) (err error)
}

type TweetCreatedEventSubscriber struct {
handler SendTweetHandler
watermillSubscriber *watermillsql.Subscriber
logger logging.Logger
handler SendTweetHandler
subscriber *sqlite.Subscriber
logger logging.Logger
metrics app.Metrics
}

func NewTweetCreatedEventSubscriber(
handler SendTweetHandler,
watermillSubscriber *watermillsql.Subscriber,
subscriber *sqlite.Subscriber,
logger logging.Logger,
metrics app.Metrics,
) *TweetCreatedEventSubscriber {
return &TweetCreatedEventSubscriber{
handler: handler,
watermillSubscriber: watermillSubscriber,
logger: logger.New("tweetCreatedEventSubscriber"),
handler: handler,
subscriber: subscriber,
logger: logger.New("tweetCreatedEventSubscriber"),
metrics: metrics,
}
}

func (s *TweetCreatedEventSubscriber) Run(ctx context.Context) error {
ch, err := s.watermillSubscriber.Subscribe(ctx, sqlite.TweetCreatedTopic)
go s.reportMetricsLoop(ctx)

ch, err := s.subscriber.SubscribeToTweetCreated(ctx)
if err != nil {
return errors.Wrap(err, "error calling subscribe")
}
Expand Down Expand Up @@ -74,3 +81,29 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *me

return nil
}

func (s *TweetCreatedEventSubscriber) reportMetricsLoop(ctx context.Context) {
for {
if err := s.reportMetrics(ctx); err != nil {
s.logger.Error().WithError(err).Message("error reporting metrics")
}

select {
case <-time.After(reportMetricsEvery):
continue
case <-ctx.Done():
return

}
}
}

func (s *TweetCreatedEventSubscriber) reportMetrics(ctx context.Context) error {
n, err := s.subscriber.TweetCreatedQueueLength(ctx)
if err != nil {
return errors.Wrap(err, "error reading queue length")
}

s.metrics.ReportSubscriptionQueueLength(sqlite.TweetCreatedTopic, n)
return nil
}

0 comments on commit 590dac1

Please sign in to comment.