diff --git a/cmd/crossposting-service/di/inject_pubsub.go b/cmd/crossposting-service/di/inject_pubsub.go index 09f7b41..c21369e 100644 --- a/cmd/crossposting-service/di/inject_pubsub.go +++ b/cmd/crossposting-service/di/inject_pubsub.go @@ -1,6 +1,7 @@ package di import ( + watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" @@ -15,10 +16,13 @@ var memoryPubsubSet = wire.NewSet( ) var sqlitePubsubSet = wire.NewSet( - sqlite.NewWatermillSchemaAdapter, + sqlite.NewSqliteSchema, + wire.Bind(new(watermillsql.SchemaAdapter), new(sqlite.SqliteSchema)), + sqlite.NewWatermillOffsetsAdapter, sqlite.NewWatermillSubscriber, sqlitepubsubport.NewTweetCreatedEventSubscriber, + sqlite.NewSubscriber, ) var sqliteTxPubsubSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 2662311..9634b8c 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -85,15 +85,16 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S noopTwitter := twitter.NewNoopTwitter(logger) appTwitter := selectTwitterAdapterDependingOnConfig(configConfig, twitterTwitter, noopTwitter) sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, logger, prometheusPrometheus) - schemaAdapter := sqlite.NewWatermillSchemaAdapter() + sqliteSchema := sqlite.NewSqliteSchema() offsetsAdapter := sqlite.NewWatermillOffsetsAdapter() - subscriber, err := sqlite.NewWatermillSubscriber(db, watermillAdapter, schemaAdapter, offsetsAdapter) + subscriber, err := sqlite.NewWatermillSubscriber(db, watermillAdapter, sqliteSchema, offsetsAdapter) if err != nil { cleanup() return Service{}, nil, err } - tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger) - migrations := sqlite.NewMigrations(db, schemaAdapter, offsetsAdapter) + sqliteSubscriber := sqlite.NewSubscriber(subscriber, offsetsAdapter, sqliteSchema, db) + tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, sqliteSubscriber, logger, prometheusPrometheus) + migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter) service := NewService(application, server, metricsServer, downloader, receivedEventSubscriber, tweetCreatedEventSubscriber, migrations) return service, func() { cleanup() @@ -119,12 +120,19 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te } genericAdaptersFactoryFn := newTestAdaptersFactoryFn(diBuildTransactionSqliteAdaptersDependencies) genericTransactionProvider := sqlite.NewTestTransactionProvider(db, genericAdaptersFactoryFn) - schemaAdapter := sqlite.NewWatermillSchemaAdapter() + sqliteSchema := sqlite.NewSqliteSchema() offsetsAdapter := sqlite.NewWatermillOffsetsAdapter() - migrations := sqlite.NewMigrations(db, schemaAdapter, offsetsAdapter) + migrations := sqlite.NewMigrations(db, sqliteSchema, offsetsAdapter) + subscriber, err := sqlite.NewWatermillSubscriber(db, watermillAdapter, sqliteSchema, offsetsAdapter) + if err != nil { + cleanup() + return sqlite.TestedItems{}, nil, err + } + sqliteSubscriber := sqlite.NewSubscriber(subscriber, offsetsAdapter, sqliteSchema, db) testedItems := sqlite.TestedItems{ TransactionProvider: genericTransactionProvider, Migrations: migrations, + Subscriber: sqliteSubscriber, } return testedItems, func() { cleanup() @@ -153,8 +161,8 @@ func buildTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransactionSq return app.Adapters{}, err } loggerAdapter := diBuildTransactionSqliteAdaptersDependencies.LoggerAdapter - schemaAdapter := sqlite.NewWatermillSchemaAdapter() - publisher, err := sqlite.NewWatermillPublisher(tx, loggerAdapter, schemaAdapter) + sqliteSchema := sqlite.NewSqliteSchema() + publisher, err := sqlite.NewWatermillPublisher(tx, loggerAdapter, sqliteSchema) if err != nil { return app.Adapters{}, err } @@ -192,8 +200,8 @@ func buildTestTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransacti return sqlite.TestAdapters{}, err } loggerAdapter := diBuildTransactionSqliteAdaptersDependencies.LoggerAdapter - schemaAdapter := sqlite.NewWatermillSchemaAdapter() - publisher, err := sqlite.NewWatermillPublisher(tx, loggerAdapter, schemaAdapter) + sqliteSchema := sqlite.NewSqliteSchema() + publisher, err := sqlite.NewWatermillPublisher(tx, loggerAdapter, sqliteSchema) if err != nil { return sqlite.TestAdapters{}, err } diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index e5c032d..6131c38 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -36,6 +36,7 @@ type Prometheus struct { applicationHandlerCallsCounter *prometheus.CounterVec applicationHandlerCallDurationHistogram *prometheus.HistogramVec + subscriptionQueueLengthGauge *prometheus.GaugeVec numberOfPublicKeyDownloadersGauge prometheus.Gauge numberOfPublicKeyDownloaderRelaysGauge *prometheus.GaugeVec relayConnectionStateGauge *prometheus.GaugeVec @@ -140,6 +141,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { applicationHandlerCallsCounter: applicationHandlerCallsCounter, applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram, + subscriptionQueueLengthGauge: subscriptionQueueLengthGauge, numberOfPublicKeyDownloadersGauge: numberOfPublicKeyDownloadersGauge, numberOfPublicKeyDownloaderRelaysGauge: numberOfPublicKeyDownloaderRelaysGauge, relayConnectionStateGauge: relayConnectionStateGauge, @@ -192,6 +194,10 @@ func (p *Prometheus) ReportCallingTwitterAPIToPostATweet(err error) { p.twitterAPICallsToPostTweetCounter.With(labels).Inc() } +func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) { + p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n)) +} + type ApplicationCall struct { handlerName string p *Prometheus diff --git a/service/adapters/sqlite/sqlite.go b/service/adapters/sqlite/sqlite.go index b82e6cf..f40885d 100644 --- a/service/adapters/sqlite/sqlite.go +++ b/service/adapters/sqlite/sqlite.go @@ -23,6 +23,7 @@ type TestAdapters struct { type TestedItems struct { TransactionProvider *TestTransactionProvider Migrations *Migrations + Subscriber *Subscriber } func Open(conf config.Config) (*sql.DB, error) { diff --git a/service/adapters/sqlite/subscriber.go b/service/adapters/sqlite/subscriber.go new file mode 100644 index 0000000..6dbd7da --- /dev/null +++ b/service/adapters/sqlite/subscriber.go @@ -0,0 +1,53 @@ +package sqlite + +import ( + "context" + "database/sql" + + watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/pkg/errors" +) + +type Subscriber struct { + watermillSubscriber *watermillsql.Subscriber + offsetsAdapter watermillsql.OffsetsAdapter + schema SqliteSchema + db *sql.DB +} + +func NewSubscriber( + watermillSubscriber *watermillsql.Subscriber, + offsetsAdapter watermillsql.OffsetsAdapter, + schema SqliteSchema, + db *sql.DB, +) *Subscriber { + return &Subscriber{ + watermillSubscriber: watermillSubscriber, + offsetsAdapter: offsetsAdapter, + schema: schema, + db: db, + } +} + +func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) (<-chan *message.Message, error) { + return s.watermillSubscriber.Subscribe(ctx, TweetCreatedTopic) +} + +func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error) { + offsetsQuery, offsetsQueryArgs := s.offsetsAdapter.NextOffsetQuery(TweetCreatedTopic, consumerGroupName) + + selectQuery := ` + SELECT COUNT(*) + FROM ` + s.schema.MessagesTable(TweetCreatedTopic) + ` + WHERE offset > (` + offsetsQuery + `)` + + row := s.db.QueryRowContext(ctx, selectQuery, offsetsQueryArgs...) + + var count int + if err := row.Scan(&count); err != nil { + return 0, errors.Wrap(err, "error calling row scan") + } + + return count, nil +} diff --git a/service/adapters/sqlite/subscriber_test.go b/service/adapters/sqlite/subscriber_test.go new file mode 100644 index 0000000..272c04c --- /dev/null +++ b/service/adapters/sqlite/subscriber_test.go @@ -0,0 +1,57 @@ +package sqlite_test + +import ( + "context" + "testing" + "time" + + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSubscriber_TweetCreatedQueueLength(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx) + require.NoError(t, err) + require.Equal(t, 0, n) + + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString())) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) + + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.Publisher.PublishTweetCreated(fixtures.SomeAccountID(), domain.NewTweet(fixtures.SomeString())) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) + + n, err = adapters.Subscriber.TweetCreatedQueueLength(ctx) + require.NoError(t, err) + require.Equal(t, 2, n) + + go func() { + ch, err := adapters.Subscriber.SubscribeToTweetCreated(ctx) + require.NoError(t, err) + + for msg := range ch { + msg.Ack() + } + }() + + require.EventuallyWithT(t, func(t *assert.CollectT) { + n, err := adapters.Subscriber.TweetCreatedQueueLength(ctx) + assert.NoError(t, err) + assert.Equal(t, 0, n) + }, 5*time.Second, 100*time.Millisecond) +} diff --git a/service/adapters/sqlite/watermill.go b/service/adapters/sqlite/watermill.go index 36eb86c..45612a7 100644 --- a/service/adapters/sqlite/watermill.go +++ b/service/adapters/sqlite/watermill.go @@ -13,6 +13,10 @@ import ( "github.com/boreq/errors" ) +const ( + consumerGroupName = "main" +) + func NewWatermillPublisher( tx *sql.Tx, logger watermill.LoggerAdapter, @@ -33,7 +37,7 @@ func NewWatermillSubscriber( offsetsAdapter watermillsql.OffsetsAdapter, ) (*watermillsql.Subscriber, error) { config := watermillsql.SubscriberConfig{ - ConsumerGroup: "main", + ConsumerGroup: consumerGroupName, PollInterval: 30 * time.Second, ResendInterval: 30 * time.Second, RetryInterval: 30 * time.Second, @@ -45,14 +49,6 @@ func NewWatermillSubscriber( return watermillsql.NewSubscriber(db, config, logger) } -func NewWatermillSchemaAdapter() watermillsql.SchemaAdapter { - return SqliteSchema{ - GenerateMessagesTableName: func(topic string) string { - return fmt.Sprintf("watermill_%s", topic) - }, - } -} - func NewWatermillOffsetsAdapter() watermillsql.OffsetsAdapter { return SqliteOffsetsAdapter{ GenerateMessagesOffsetsTableName: func(topic string) string { @@ -66,6 +62,14 @@ type SqliteSchema struct { SubscribeBatchSize int } +func NewSqliteSchema() SqliteSchema { + return SqliteSchema{ + GenerateMessagesTableName: func(topic string) string { + return fmt.Sprintf("watermill_%s", topic) + }, + } +} + func (s SqliteSchema) SchemaInitializingQueries(topic string) []string { createMessagesTable := strings.Join([]string{ "CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (", diff --git a/service/app/app.go b/service/app/app.go index a03f990..baf5168 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -119,6 +119,7 @@ type Metrics interface { ReportNumberOfPublicKeyDownloaderRelays(publicKey domain.PublicKey, n int) ReportRelayConnectionState(relayAddress domain.RelayAddress, state RelayConnectionState) ReportCallingTwitterAPIToPostATweet(err error) + ReportSubscriptionQueueLength(topic string, n int) } type ApplicationCall interface { diff --git a/service/ports/sqlitepubsub/tweet_created.go b/service/ports/sqlitepubsub/tweet_created.go index 22c4226..2ff1901 100644 --- a/service/ports/sqlitepubsub/tweet_created.go +++ b/service/ports/sqlitepubsub/tweet_created.go @@ -3,8 +3,8 @@ package sqlitepubsub import ( "context" "encoding/json" + "time" - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal/logging" @@ -14,30 +14,37 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" ) +const reportMetricsEvery = 60 * time.Second + type SendTweetHandler interface { Handle(ctx context.Context, cmd app.SendTweet) (err error) } type TweetCreatedEventSubscriber struct { - handler SendTweetHandler - watermillSubscriber *watermillsql.Subscriber - logger logging.Logger + handler SendTweetHandler + subscriber *sqlite.Subscriber + logger logging.Logger + metrics app.Metrics } func NewTweetCreatedEventSubscriber( handler SendTweetHandler, - watermillSubscriber *watermillsql.Subscriber, + subscriber *sqlite.Subscriber, logger logging.Logger, + metrics app.Metrics, ) *TweetCreatedEventSubscriber { return &TweetCreatedEventSubscriber{ - handler: handler, - watermillSubscriber: watermillSubscriber, - logger: logger.New("tweetCreatedEventSubscriber"), + handler: handler, + subscriber: subscriber, + logger: logger.New("tweetCreatedEventSubscriber"), + metrics: metrics, } } func (s *TweetCreatedEventSubscriber) Run(ctx context.Context) error { - ch, err := s.watermillSubscriber.Subscribe(ctx, sqlite.TweetCreatedTopic) + go s.reportMetricsLoop(ctx) + + ch, err := s.subscriber.SubscribeToTweetCreated(ctx) if err != nil { return errors.Wrap(err, "error calling subscribe") } @@ -74,3 +81,29 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *me return nil } + +func (s *TweetCreatedEventSubscriber) reportMetricsLoop(ctx context.Context) { + for { + if err := s.reportMetrics(ctx); err != nil { + s.logger.Error().WithError(err).Message("error reporting metrics") + } + + select { + case <-time.After(reportMetricsEvery): + continue + case <-ctx.Done(): + return + + } + } +} + +func (s *TweetCreatedEventSubscriber) reportMetrics(ctx context.Context) error { + n, err := s.subscriber.TweetCreatedQueueLength(ctx) + if err != nil { + return errors.Wrap(err, "error reading queue length") + } + + s.metrics.ReportSubscriptionQueueLength(sqlite.TweetCreatedTopic, n) + return nil +}