Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report queue length metrics #8

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/crossposting-service/di/inject_pubsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package di

import (
watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/google/wire"
"github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
Expand All @@ -15,10 +16,13 @@ var memoryPubsubSet = wire.NewSet(
)

var sqlitePubsubSet = wire.NewSet(
sqlite.NewWatermillSchemaAdapter,
sqlite.NewSqliteSchema,
wire.Bind(new(watermillsql.SchemaAdapter), new(sqlite.SqliteSchema)),

sqlite.NewWatermillOffsetsAdapter,
sqlite.NewWatermillSubscriber,
sqlitepubsubport.NewTweetCreatedEventSubscriber,
sqlite.NewSubscriber,
)

var sqliteTxPubsubSet = wire.NewSet(
Expand Down
28 changes: 18 additions & 10 deletions cmd/crossposting-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.17
github.com/nbd-wtf/go-nostr v0.18.10
github.com/oklog/ulid/v2 v2.1.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/rs/zerolog v1.29.1
github.com/sirupsen/logrus v1.9.3
Expand Down Expand Up @@ -51,7 +52,6 @@ require (
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Prometheus struct {
applicationHandlerCallsCounter *prometheus.CounterVec
applicationHandlerCallDurationHistogram *prometheus.HistogramVec

subscriptionQueueLengthGauge *prometheus.GaugeVec
numberOfPublicKeyDownloadersGauge prometheus.Gauge
numberOfPublicKeyDownloaderRelaysGauge *prometheus.GaugeVec
relayConnectionStateGauge *prometheus.GaugeVec
Expand Down Expand Up @@ -140,6 +141,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
applicationHandlerCallsCounter: applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram,

subscriptionQueueLengthGauge: subscriptionQueueLengthGauge,
numberOfPublicKeyDownloadersGauge: numberOfPublicKeyDownloadersGauge,
numberOfPublicKeyDownloaderRelaysGauge: numberOfPublicKeyDownloaderRelaysGauge,
relayConnectionStateGauge: relayConnectionStateGauge,
Expand Down Expand Up @@ -192,6 +194,10 @@ func (p *Prometheus) ReportCallingTwitterAPIToPostATweet(err error) {
p.twitterAPICallsToPostTweetCounter.With(labels).Inc()
}

func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) {
p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n))
}

type ApplicationCall struct {
handlerName string
p *Prometheus
Expand Down
1 change: 1 addition & 0 deletions service/adapters/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type TestAdapters struct {
type TestedItems struct {
TransactionProvider *TestTransactionProvider
Migrations *Migrations
Subscriber *Subscriber
}

func Open(conf config.Config) (*sql.DB, error) {
Expand Down
53 changes: 53 additions & 0 deletions service/adapters/sqlite/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sqlite

import (
"context"
"database/sql"

watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)

type Subscriber struct {
watermillSubscriber *watermillsql.Subscriber
offsetsAdapter watermillsql.OffsetsAdapter
schema SqliteSchema
db *sql.DB
}

func NewSubscriber(
watermillSubscriber *watermillsql.Subscriber,
offsetsAdapter watermillsql.OffsetsAdapter,
schema SqliteSchema,
db *sql.DB,
) *Subscriber {
return &Subscriber{
watermillSubscriber: watermillSubscriber,
offsetsAdapter: offsetsAdapter,
schema: schema,
db: db,
}
}

func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) (<-chan *message.Message, error) {
return s.watermillSubscriber.Subscribe(ctx, TweetCreatedTopic)
}

func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error) {
offsetsQuery, offsetsQueryArgs := s.offsetsAdapter.NextOffsetQuery(TweetCreatedTopic, consumerGroupName)

selectQuery := `
SELECT COUNT(*)
FROM ` + s.schema.MessagesTable(TweetCreatedTopic) + `
WHERE offset > (` + offsetsQuery + `)`

row := s.db.QueryRowContext(ctx, selectQuery, offsetsQueryArgs...)

var count int
if err := row.Scan(&count); err != nil {
return 0, errors.Wrap(err, "error calling row scan")
}

return count, nil
}
57 changes: 57 additions & 0 deletions service/adapters/sqlite/subscriber_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sqlite_test

import (
"context"
"testing"
"time"

"github.com/planetary-social/nos-crossposting-service/internal/fixtures"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSubscriber_TweetCreatedQueueLength(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx)
require.NoError(t, err)
require.Equal(t, 0, n)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString()))
require.NoError(t, err)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString()))
require.NoError(t, err)

return nil
})
require.NoError(t, err)

n, err = adapters.Subscriber.TweetCreatedQueueLength(ctx)
require.NoError(t, err)
require.Equal(t, 2, n)

go func() {
ch, err := adapters.Subscriber.SubscribeToTweetCreated(ctx)
require.NoError(t, err)

for msg := range ch {
msg.Ack()
}
}()

require.EventuallyWithT(t, func(t *assert.CollectT) {
n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx)
assert.NoError(t, err)
assert.Equal(t, 0, n)
}, 5*time.Second, 100*time.Millisecond)
}
22 changes: 13 additions & 9 deletions service/adapters/sqlite/watermill.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/boreq/errors"
)

const (
consumerGroupName = "main"
)

func NewWatermillPublisher(
tx *sql.Tx,
logger watermill.LoggerAdapter,
Expand All @@ -33,7 +37,7 @@ func NewWatermillSubscriber(
offsetsAdapter watermillsql.OffsetsAdapter,
) (*watermillsql.Subscriber, error) {
config := watermillsql.SubscriberConfig{
ConsumerGroup: "main",
ConsumerGroup: consumerGroupName,
PollInterval: 30 * time.Second,
ResendInterval: 30 * time.Second,
RetryInterval: 30 * time.Second,
Expand All @@ -45,14 +49,6 @@ func NewWatermillSubscriber(
return watermillsql.NewSubscriber(db, config, logger)
}

func NewWatermillSchemaAdapter() watermillsql.SchemaAdapter {
return SqliteSchema{
GenerateMessagesTableName: func(topic string) string {
return fmt.Sprintf("watermill_%s", topic)
},
}
}

func NewWatermillOffsetsAdapter() watermillsql.OffsetsAdapter {
return SqliteOffsetsAdapter{
GenerateMessagesOffsetsTableName: func(topic string) string {
Expand All @@ -66,6 +62,14 @@ type SqliteSchema struct {
SubscribeBatchSize int
}

func NewSqliteSchema() SqliteSchema {
return SqliteSchema{
GenerateMessagesTableName: func(topic string) string {
return fmt.Sprintf("watermill_%s", topic)
},
}
}

func (s SqliteSchema) SchemaInitializingQueries(topic string) []string {
createMessagesTable := strings.Join([]string{
"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (",
Expand Down
1 change: 1 addition & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type Metrics interface {
ReportNumberOfPublicKeyDownloaderRelays(publicKey domain.PublicKey, n int)
ReportRelayConnectionState(relayAddress domain.RelayAddress, state RelayConnectionState)
ReportCallingTwitterAPIToPostATweet(err error)
ReportSubscriptionQueueLength(topic string, n int)
}

type ApplicationCall interface {
Expand Down
51 changes: 42 additions & 9 deletions service/ports/sqlitepubsub/tweet_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package sqlitepubsub
import (
"context"
"encoding/json"
"time"

watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
Expand All @@ -14,30 +14,37 @@ import (
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
)

const reportMetricsEvery = 60 * time.Second

type SendTweetHandler interface {
Handle(ctx context.Context, cmd app.SendTweet) (err error)
}

type TweetCreatedEventSubscriber struct {
handler SendTweetHandler
watermillSubscriber *watermillsql.Subscriber
logger logging.Logger
handler SendTweetHandler
subscriber *sqlite.Subscriber
logger logging.Logger
metrics app.Metrics
}

func NewTweetCreatedEventSubscriber(
handler SendTweetHandler,
watermillSubscriber *watermillsql.Subscriber,
subscriber *sqlite.Subscriber,
logger logging.Logger,
metrics app.Metrics,
) *TweetCreatedEventSubscriber {
return &TweetCreatedEventSubscriber{
handler: handler,
watermillSubscriber: watermillSubscriber,
logger: logger.New("tweetCreatedEventSubscriber"),
handler: handler,
subscriber: subscriber,
logger: logger.New("tweetCreatedEventSubscriber"),
metrics: metrics,
}
}

func (s *TweetCreatedEventSubscriber) Run(ctx context.Context) error {
ch, err := s.watermillSubscriber.Subscribe(ctx, sqlite.TweetCreatedTopic)
go s.reportMetricsLoop(ctx)

ch, err := s.subscriber.SubscribeToTweetCreated(ctx)
if err != nil {
return errors.Wrap(err, "error calling subscribe")
}
Expand Down Expand Up @@ -74,3 +81,29 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *me

return nil
}

func (s *TweetCreatedEventSubscriber) reportMetricsLoop(ctx context.Context) {
for {
if err := s.reportMetrics(ctx); err != nil {
s.logger.Error().WithError(err).Message("error reporting metrics")
}

select {
case <-time.After(reportMetricsEvery):
continue
case <-ctx.Done():
return

}
}
}

func (s *TweetCreatedEventSubscriber) reportMetrics(ctx context.Context) error {
n, err := s.subscriber.TweetCreatedQueueLength(ctx)
if err != nil {
return errors.Wrap(err, "error reading queue length")
}

s.metrics.ReportSubscriptionQueueLength(sqlite.TweetCreatedTopic, n)
return nil
}