Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix follow changes loop, better log formatting #58

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ flowchart TB

3. **Tag the Image:**
Tag the image with the `stable` tag as described in the [GitHub documentation](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry).
```
docker tag ghcr.io/planetary-social/nos-notification-service-go:latest ghcr.io/planetary-social/nos-notification-service-go:stable && docker push ghcr.io/planetary-social/nos-notification-service-go:stable
```

4. **Trigger the Image Update Process:**
The image update process checks for new tags every 3 minutes. Therefore, you should see the new image deployed in approximately 5 minutes.
10 changes: 4 additions & 6 deletions service/adapters/gcp/gcp_follow_change_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@ func (p *GCPFollowChangeSubscriber) Subscribe(ctx context.Context) (<-chan *doma

ch := make(chan *domain.FollowChange)

defer func() {
close(ch)
p.subscriber.Close()
}()

go func() {
var payload domain.FollowChange
defer close(ch)
defer p.subscriber.Close()

for message := range subChan {
// We never retry messages so we can ACK immediately.
message.Ack()

var payload domain.FollowChange
if err := json.Unmarshal(message.Payload, &payload); err != nil {
p.logger.Error("error unmarshaling follow change payload", err, watermill.LogFields{"payload": string(message.Payload)})
continue
Expand Down
3 changes: 2 additions & 1 deletion service/app/follow_change_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (f *FollowChangePuller) Run(ctx context.Context) error {
}

for followChange := range ch {
f.logger.Debug().WithField("followChange", followChange).Message("received follow change")
f.logger.Debug().Message(followChange.String())

f.counter += 1
}

Expand Down
52 changes: 50 additions & 2 deletions service/domain/follow_change.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package domain

import (
"encoding/json"
"errors"
"fmt"
"time"
)

type FollowChange struct {
ChangeType string `json:"changeType"`
At uint `json:"at"`
At time.Time `json:"at"`
Follower PublicKey `json:"follower"`
Followee PublicKey `json:"followee"`
FriendlyFollowee string `json:"friendlyFollowee"`
FriendlyFollower string `json:"friendlyFollower"`
}

func NewFollowChange(changeType string, follower PublicKey, friendlyFollower string, followee PublicKey, friendlyFollowee string, at uint) FollowChange {
func NewFollowChange(changeType string, follower PublicKey, friendlyFollower string, followee PublicKey, friendlyFollowee string, at time.Time) FollowChange {
return FollowChange{
ChangeType: changeType,
Follower: follower,
Expand All @@ -19,3 +26,44 @@ func NewFollowChange(changeType string, follower PublicKey, friendlyFollower str
At: at,
}
}

func (f *FollowChange) UnmarshalJSON(data []byte) error {
var temp struct {
ChangeType string `json:"changeType"`
At int64 `json:"at"`
Follower string `json:"follower"`
Followee string `json:"followee"`
FriendlyFollowee string `json:"friendlyFollowee"`
FriendlyFollower string `json:"friendlyFollower"`
}

if err := json.Unmarshal(data, &temp); err != nil {
return err
}

f.ChangeType = temp.ChangeType
f.At = time.Unix(temp.At, 0)
f.FriendlyFollowee = temp.FriendlyFollowee
f.FriendlyFollower = temp.FriendlyFollower

var err error
f.Follower, err = NewPublicKeyFromHex(temp.Follower)
if err != nil {
return errors.New("invalid hex for follower: " + err.Error())
}

f.Followee, err = NewPublicKeyFromHex(temp.Followee)
if err != nil {
return errors.New("invalid hex for followee: " + err.Error())
}

return nil
}

func (f FollowChange) String() string {
if f.ChangeType == "unfollowed" {
return fmt.Sprintf("New unfollow: %s(%s) --x--> %s(%s)", f.FriendlyFollower, f.Follower.s, f.FriendlyFollowee, f.Followee.s)
}

return fmt.Sprintf("New follow: %s(%s) -----> %s(%s)", f.FriendlyFollower, f.Follower.s, f.FriendlyFollowee, f.Followee.s)
}
Loading