Skip to content

Commit

Permalink
Consume pubsub queue and just log
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 20, 2024
1 parent 39c3411 commit 7d5c20e
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 17 deletions.
13 changes: 13 additions & 0 deletions cmd/notification-service/di/inject_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var pubsubSet = wire.NewSet(

var googlePubsubSet = wire.NewSet(
newExternalEventPublisher,
newExternalFollowChangeSubscriber,
)

func newExternalEventPublisher(config config.Config, logger watermill.LoggerAdapter) (app.ExternalEventPublisher, error) {
Expand All @@ -31,3 +32,15 @@ func newExternalEventPublisher(config config.Config, logger watermill.LoggerAdap
return gcp.NewNoopPublisher(), nil
}
}

func newExternalFollowChangeSubscriber(config config.Config, logger watermill.LoggerAdapter) (app.ExternalFollowChangeSubscriber, error) {
if config.GooglePubSubEnabled() {
subscriber, err := gcp.NewWatermillSubscriber(config, logger)
if err != nil {
return nil, errors.Wrap(err, "error creating a watermil subscriber")
}
return gcp.NewFollowChangeSubscriber(subscriber, logger), nil
} else {
return gcp.NewNoopSubscriber(), nil
}
}
38 changes: 24 additions & 14 deletions cmd/notification-service/di/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,38 @@ import (
)

type Service struct {
app app.Application
server http.Server
metricsServer http.MetricsServer
downloader *app.Downloader
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber
eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache
app app.Application
server http.Server
metricsServer http.MetricsServer
downloader *app.Downloader
followChangePuller *app.FollowChangePuller
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber
eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache
}

func NewService(
app app.Application,
server http.Server,
metricsServer http.MetricsServer,
downloader *app.Downloader,
followChangePuller *app.FollowChangePuller,
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber,
externalFollowChangeSubscriber app.ExternalFollowChangeSubscriber,
eventSavedSubscriber *firestorepubsub.EventSavedSubscriber,
eventWasAlreadySavedCache *adapters.MemoryEventWasAlreadySavedCache,
) Service {
return Service{
app: app,
server: server,
metricsServer: metricsServer,
downloader: downloader,
receivedEventSubscriber: receivedEventSubscriber,
eventSavedSubscriber: eventSavedSubscriber,
eventWasAlreadySavedCache: eventWasAlreadySavedCache,
app: app,
server: server,
metricsServer: metricsServer,
downloader: downloader,
followChangePuller: followChangePuller,
receivedEventSubscriber: receivedEventSubscriber,
externalFollowChangeSubscriber: externalFollowChangeSubscriber,
eventSavedSubscriber: eventSavedSubscriber,
eventWasAlreadySavedCache: eventWasAlreadySavedCache,
}
}

Expand Down Expand Up @@ -73,6 +79,10 @@ func (s Service) Run(ctx context.Context) error {
errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error")
}()

go func() {
errCh <- errors.Wrap(s.followChangePuller.Run(ctx), "follow change subscriber error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.eventSavedSubscriber.Run(ctx), "event saved subscriber error")
Expand Down
8 changes: 8 additions & 0 deletions cmd/notification-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func BuildService(context.Context, config.Config) (Service, func(), error) {
googlePubsubSet,
loggingSet,
adaptersSet,
followChangePullerSet,
)
return Service{}, nil, nil
}
Expand All @@ -49,13 +50,16 @@ func BuildIntegrationService(context.Context, config.Config) (IntegrationService
applicationSet,
firestoreAdaptersSet,
downloaderSet,
followChangePullerSet,
generatorSet,
pubsubSet,
loggingSet,
integrationAdaptersSet,

mocks.NewMockExternalEventPublisher,
mocks.NewMockExternalFollowChangeSubscriber,
wire.Bind(new(app.ExternalEventPublisher), new(*mocks.MockExternalEventPublisher)),
wire.Bind(new(app.ExternalFollowChangeSubscriber), new(*mocks.MockExternalFollowChangeSubscriber)),
)
return IntegrationService{}, nil, nil
}
Expand All @@ -79,6 +83,10 @@ var downloaderSet = wire.NewSet(
app.NewDownloader,
)

var followChangePullerSet = wire.NewSet(
app.NewFollowChangePuller,
)

var generatorSet = wire.NewSet(
notifications.NewGenerator,
)
14 changes: 12 additions & 2 deletions cmd/notification-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-multierror v1.1.1
github.com/nbd-wtf/go-nostr v0.25.7
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/rs/zerolog v1.29.1
github.com/sideshow/apns2 v0.23.0
Expand Down Expand Up @@ -61,7 +62,6 @@ require (
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
Expand Down
75 changes: 75 additions & 0 deletions service/adapters/gcp/gcp_follow_change_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package gcp

import (
"context"
"encoding/json"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/pkg/errors"
"github.com/planetary-social/go-notification-service/service/config"
"github.com/planetary-social/go-notification-service/service/domain"
"google.golang.org/api/option"
)

const googlePubSubFollowChangeTopic = "follow-changes"

func NewWatermillSubscriber(config config.Config, logger watermill.LoggerAdapter) (*googlecloud.Subscriber, error) {
var options []option.ClientOption

if j := config.GooglePubSubCredentialsJSON(); len(j) > 0 {
options = append(options, option.WithCredentialsJSON(config.GooglePubSubCredentialsJSON()))
}

publisherConfig := googlecloud.SubscriberConfig{
ProjectID: config.GooglePubSubProjectID(),
DoNotCreateTopicIfMissing: true,
ClientOptions: options,
}

return googlecloud.NewSubscriber(publisherConfig, logger)
}

type GCPFollowChangeSubscriber struct {
subscriber *googlecloud.Subscriber
logger watermill.LoggerAdapter
}

func NewFollowChangeSubscriber(subscriber *googlecloud.Subscriber, logger watermill.LoggerAdapter) *GCPFollowChangeSubscriber {
return &GCPFollowChangeSubscriber{subscriber: subscriber, logger: logger}
}

func (p *GCPFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) {
subChan, err := p.subscriber.Subscribe(ctx, googlePubSubFollowChangeTopic)
if err != nil {
return nil, errors.Wrap(err, "error subscribing")
}

ch := make(chan *domain.FollowChange)

defer func() {
close(ch)
p.subscriber.Close()
}()

go func() {
var payload domain.FollowChange
for message := range subChan {
// We never retry messages so we can ACK immediately.
message.Ack()

if err := json.Unmarshal(message.Payload, &payload); err != nil {
p.logger.Error("error unmarshaling follow change payload", err, watermill.LogFields{"payload": string(message.Payload)})
continue
}

select {
case ch <- &payload:
case <-ctx.Done():
return
}
}
}()

return ch, nil
}
18 changes: 18 additions & 0 deletions service/adapters/gcp/noop_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package gcp

import (
"context"

"github.com/planetary-social/go-notification-service/service/domain"
)

type NoopSubscriber struct {
}

func NewNoopSubscriber() *NoopSubscriber {
return &NoopSubscriber{}
}
func (p *NoopSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) {
ch := make(chan *domain.FollowChange)
return ch, nil
}
19 changes: 19 additions & 0 deletions service/adapters/mocks/external_follow_change_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mocks

import (
"context"

"github.com/planetary-social/go-notification-service/service/domain"
)

type MockExternalFollowChangeSubscriber struct {
}

func NewMockExternalFollowChangeSubscriber() *MockExternalFollowChangeSubscriber {
return &MockExternalFollowChangeSubscriber{}
}

func (m MockExternalFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error) {
ch := make(chan *domain.FollowChange)
return ch, nil
}
14 changes: 14 additions & 0 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
labelHandlerName = "handlerName"
labelRelayDownloaderState = "state"
labelFollowChanges = "follow changes"
labelTopic = "topic"
labelVcsRevision = "vcsRevision"
labelVcsTime = "vcsTime"
Expand All @@ -32,6 +33,7 @@ type Prometheus struct {
applicationHandlerCallsCounter *prometheus.CounterVec
applicationHandlerCallDurationHistogram *prometheus.HistogramVec
relayDownloaderStateGauge *prometheus.GaugeVec
relayFollowChangeGauge prometheus.Counter
subscriptionQueueLengthGauge *prometheus.GaugeVec
apnsCallsCounter *prometheus.CounterVec

Expand Down Expand Up @@ -62,6 +64,12 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
},
[]string{labelRelayDownloaderState},
)
relayFollowChangeGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "follow_change_count",
Help: "Number of follow changes.",
},
)
subscriptionQueueLengthGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "subscription_queue_length",
Expand Down Expand Up @@ -89,6 +97,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram,
relayDownloaderStateGauge,
relayFollowChangeGauge,
subscriptionQueueLengthGauge,
versionGague,
apnsCallsCounter,
Expand Down Expand Up @@ -118,6 +127,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
applicationHandlerCallsCounter: applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram,
relayDownloaderStateGauge: relayDownloaderStateGauge,
relayFollowChangeGauge: relayFollowChangeGauge,
subscriptionQueueLengthGauge: subscriptionQueueLengthGauge,
apnsCallsCounter: apnsCallsCounter,

Expand All @@ -135,6 +145,10 @@ func (p *Prometheus) MeasureRelayDownloadersState(n int, state app.RelayDownload
p.relayDownloaderStateGauge.With(prometheus.Labels{labelRelayDownloaderState: state.String()}).Set(float64(n))
}

func (p *Prometheus) MeasureFollowChange(n int) {
p.relayFollowChangeGauge.Add(float64(n))
}

func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) {
p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n))
}
Expand Down
5 changes: 5 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type ExternalEventPublisher interface {
PublishNewEventReceived(ctx context.Context, event domain.Event) error
}

type ExternalFollowChangeSubscriber interface {
Subscribe(ctx context.Context) (<-chan *domain.FollowChange, error)
}

type Application struct {
Commands Commands
Queries Queries
Expand Down Expand Up @@ -123,6 +127,7 @@ type ReceivedEventSubscriber interface {
type Metrics interface {
StartApplicationCall(handlerName string) ApplicationCall
MeasureRelayDownloadersState(n int, state RelayDownloaderState)
MeasureFollowChange(n int)
}

type ApplicationCall interface {
Expand Down
Loading

0 comments on commit 7d5c20e

Please sign in to comment.