Skip to content

Commit

Permalink
Merge pull request #59 from planetary-social/follow-change-notificati…
Browse files Browse the repository at this point in the history
…on-with-feature-flag

Feature flagged follow change notification
  • Loading branch information
dcadenas authored Aug 22, 2024
2 parents 208b1b2 + 1646c82 commit 167dc8d
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 19 deletions.
20 changes: 10 additions & 10 deletions cmd/notification-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions service/adapters/apns/apns.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package apns

import (
"encoding/json"
"time"

"github.com/boreq/errors"
"github.com/google/uuid"
"github.com/planetary-social/go-notification-service/internal/logging"
"github.com/planetary-social/go-notification-service/service/config"
"github.com/planetary-social/go-notification-service/service/domain"
"github.com/planetary-social/go-notification-service/service/domain/notifications"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/certificate"
Expand Down Expand Up @@ -73,3 +78,84 @@ func (a *APNS) SendNotification(notification notifications.Notification) error {

return nil
}

func (a *APNS) SendFollowChangeNotification(followChange domain.FollowChange, apnsToken domain.APNSToken) error {
if apnsToken.Hex() == "" {
return errors.New("invalid APNs token")
}
n, err := a.buildFollowChangeNotification(followChange, apnsToken)
if err != nil {
return err
}
resp, err := a.client.Push(n)
//a.metrics.ReportCallToAPNS(resp.StatusCode, err)
if err != nil {
return errors.Wrap(err, "error pushing the follow change notification")
}

if resp.StatusCode == 200 {
a.logger.Debug().
WithField("uuid", n.ApnsID).
WithField("response.reason", resp.Reason).
WithField("response.statusCode", resp.StatusCode).
WithField("host", a.client.Host).
Message("sent a follow change notification")
} else {
a.logger.Error().
WithField("uuid", n.ApnsID).
WithField("response.reason", resp.Reason).
WithField("response.statusCode", resp.StatusCode).
WithField("host", a.client.Host).
Message("failed to send a follow change notification")
}

return nil
}

func (a *APNS) buildFollowChangeNotification(followChange domain.FollowChange, apnsToken domain.APNSToken) (*apns2.Notification, error) {
payload, err := followChangePayload(followChange)
if err != nil {
return nil, errors.Wrap(err, "error creating a payload")
}

n := &apns2.Notification{
PushType: apns2.PushTypeAlert,
ApnsID: uuid.New().String(),
DeviceToken: apnsToken.Hex(),
Topic: a.cfg.APNSTopic(),
Payload: payload,
Priority: apns2.PriorityLow,
}

return n, nil
}

func followChangePayload(followChange domain.FollowChange) ([]byte, error) {
alertMessage := ""
if followChange.ChangeType == "unfollowed" {
alertMessage = followChange.FriendlyFollower + " has unfollowed you!"
} else {
alertMessage = followChange.FriendlyFollower + " is a new follower!"
}

payload := map[string]interface{}{
"aps": map[string]interface{}{
"alert": alertMessage,
"sound": "default",
"badge": 1,
},
"data": map[string]interface{}{
"changeType": followChange.ChangeType,
"at": followChange.At.Format(time.RFC3339),
"follower": followChange.Follower.Hex(),
"friendlyFollower": followChange.FriendlyFollower,
},
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}

return payloadBytes, nil
}
7 changes: 7 additions & 0 deletions service/adapters/apns/apns_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/planetary-social/go-notification-service/internal"
"github.com/planetary-social/go-notification-service/internal/logging"
"github.com/planetary-social/go-notification-service/service/config"
"github.com/planetary-social/go-notification-service/service/domain"
"github.com/planetary-social/go-notification-service/service/domain/notifications"
)

Expand Down Expand Up @@ -34,6 +35,12 @@ func (a *APNSMock) SendNotification(notification notifications.Notification) err
return nil
}

func (a *APNSMock) SendFollowChangeNotification(followChange domain.FollowChange, token domain.APNSToken) error {
notification := notifications.Notification{}

return a.SendNotification(notification)
}

func (a *APNSMock) SentNotifications() []notifications.Notification {
a.sentNotificationsLock.Lock()
defer a.sentNotificationsLock.Unlock()
Expand Down
1 change: 1 addition & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Queries struct {

type APNS interface {
SendNotification(notification notifications.Notification) error
SendFollowChangeNotification(followChange domain.FollowChange, apnsToken domain.APNSToken) error
}

type EventOrError struct {
Expand Down
61 changes: 52 additions & 9 deletions service/app/follow_change_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,96 @@ import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/go-notification-service/internal/logging"
)

type FollowChangePuller struct {
externalFollowChangeSubscriber ExternalFollowChangeSubscriber
apns APNS
queries Queries
logger logging.Logger
metrics Metrics
counter int
}

func NewFollowChangePuller(
externalFollowChangeSubscriber ExternalFollowChangeSubscriber,
apns APNS,
queries Queries,
logger logging.Logger,
metrics Metrics,
) *FollowChangePuller {
return &FollowChangePuller{
externalFollowChangeSubscriber: externalFollowChangeSubscriber,
apns: apns,
queries: queries,
logger: logger.New("followChangePuller"),
metrics: metrics,
counter: 0,
}
}

// Listens for messages from the follow-change pubsub and for each of them, if
// they belong to one of our users, we send a notification
func (f *FollowChangePuller) Run(ctx context.Context) error {
go f.storeMetricsLoop(ctx)

ch, err := f.externalFollowChangeSubscriber.Subscribe(ctx)
if err != nil {
return err
return errors.Wrap(err, "error subscribing to follow changes")
}

for followChange := range ch {
f.logger.Debug().Message(followChange.String())
for {
select {
case followChange, ok := <-ch:
if !ok {
return nil // Channel closed, exit gracefully
}

f.counter += 1
}
f.logger.Debug().Message(followChange.String())

// TODO: for the moment, just send notifications to Daniel
if followChange.Followee.Hex() != "89ef92b9ebe6dc1e4ea398f6477f227e95429627b0a33dc89b640e137b256be5" {
f.logger.Debug().WithField("followee", followChange.Followee.Hex()).Message("ignoring follow change")
continue
}

tokens, err := f.queries.GetTokens.Handle(ctx, followChange.Followee)
if err != nil {
// Not one of our users, ignore
continue
}

for _, token := range tokens {
if err := f.apns.SendFollowChangeNotification(*followChange, token); err != nil {
f.logger.Error().
WithField("token", token.Hex()).
WithField("followee", followChange.Followee.Hex()).
WithError(err).
Message("error sending follow change notification")
continue
}
}

return nil
f.counter += 1
case <-ctx.Done():
f.logger.Debug().Message("context canceled, shutting down FollowChangePuller")
return nil
}
}
}

func (f *FollowChangePuller) storeMetricsLoop(ctx context.Context) {
for {
f.storeMetrics()
ticker := time.NewTicker(storeMetricsEvery)
defer ticker.Stop()

for {
select {
case <-time.After(storeMetricsEvery):
case <-ticker.C:
f.storeMetrics()
case <-ctx.Done():
f.logger.Debug().Message("context canceled, stopping metrics loop")
return
}
}
Expand Down

0 comments on commit 167dc8d

Please sign in to comment.