Skip to content

Commit

Permalink
Merge pull request #28 from 0daryo/feature/pubsub-ordering
Browse files Browse the repository at this point in the history
Add: pubsub ordering key
  • Loading branch information
KenFujimoto12 authored Jul 21, 2022
2 parents 380e436 + 181f5ed commit 7a41f1e
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 44 deletions.
12 changes: 7 additions & 5 deletions application/export_change_streams.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package application

import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/pubsub"
"context"
"fmt"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/pubsub"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/cam-inc/mxtransporter/config"
pconfig "github.com/cam-inc/mxtransporter/config/pubsub"
interfaceForBigquery "github.com/cam-inc/mxtransporter/interfaces/bigquery"
iff "github.com/cam-inc/mxtransporter/interfaces/file"
interfaceForKinesisStream "github.com/cam-inc/mxtransporter/interfaces/kinesis-stream"
Expand All @@ -20,8 +24,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"strings"
"time"
)

type agent string
Expand Down Expand Up @@ -164,7 +166,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
return err
}
psClientImpl := &interfaceForPubsub.PubsubClientImpl{psClient, c.Log}
psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log}
psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log, pconfig.PubSubConfig().OrderingBy}
case KinesisStream:
ksClient, err := c.Watcher.newKinesisClient(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion config/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const (
KINESIS_STREAM_NAME = "KINESIS_STREAM_NAME"
KINESIS_STREAM_REGION = "KINESIS_STREAM_REGION"

PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME"
PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME"
PUBSUB_ORDERING_BY = "PUBSUB_ORDERING_BY"

MONGODB_HOST = "MONGODB_HOST"
MONGODB_DATABASE = "MONGODB_DATABASE"
Expand Down
7 changes: 5 additions & 2 deletions config/pubsub/pubsub_config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package pubsub

import (
"github.com/cam-inc/mxtransporter/config/constant"
"os"

"github.com/cam-inc/mxtransporter/config/constant"
)

type PubSub struct {
TopicName string
TopicName string
OrderingBy string
}

func PubSubConfig() PubSub {
var psCfg PubSub
psCfg.TopicName = os.Getenv(constant.PUBSUB_TOPIC_NAME)
psCfg.OrderingBy = os.Getenv(constant.PUBSUB_ORDERING_BY)
return psCfg
}
15 changes: 10 additions & 5 deletions config/pubsub/pubsub_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ import (

func Test_PubSubConfig(t *testing.T) {
t.Run("Check to call the set environment variable.", func(t *testing.T) {
mTopicID := "xxx"
if err := os.Setenv("PUBSUB_TOPIC_NAME", mTopicID); err != nil {
if err := os.Setenv("PUBSUB_TOPIC_NAME", "xxx"); err != nil {
t.Fatalf("Failed to set file PUBSUB_TOPIC_NAME environment variables.")
}

if err := os.Setenv("PUBSUB_ORDERING_BY", "yyy"); err != nil {
t.Fatalf("Failed to set file PUBSUB_ORDERING_BY environment variables.")
}
psCfg := PubSubConfig()
if e, a := psCfg.TopicName, mTopicID; !reflect.DeepEqual(e, a) {
t.Fatal("Environment variable MONGODB_DATABASE is not acquired correctly.")
want := PubSub{
TopicName: "xxx",
OrderingBy: "yyy",
}
if !reflect.DeepEqual(want, psCfg) {
t.Fatalf("Environment variable PUBSUB_* is not acquired correctly. want: %v, got: %v", want, psCfg)
}
})
}
11 changes: 10 additions & 1 deletion docs/gcp/gke/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ or
EXPORT_DESTINATION=bigquery,pubsub
```

### Pubsub Ordering (optional)
If you want to order message in pubsub, set ```PUBSUB_ORDERING_BY``` env.
https://cloud.google.com/pubsub/docs/ordering

**NOTICE**
ordering message can cause performance issues.

see https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8

### BigQuery schema (optional)
If you want to export change streams to BigQuery, specify the following table schema.

Expand Down Expand Up @@ -162,4 +171,4 @@ $ make upgrade
![image](https://user-images.githubusercontent.com/37132477/141406547-41edf9eb-5a17-4191-9ee3-3f13ba17ec07.png)

A pod is created for each collection, and a persistent volume is linked to each pod.
Since the StatefulSet is created, even if the pod stops, you can get the change streams by referring to the resume token saved in the persistent volume again.
Since the StatefulSet is created, even if the pod stops, you can get the change streams by referring to the resume token saved in the persistent volume again.
8 changes: 8 additions & 0 deletions docs/gcp/gke/README_JP.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ or
EXPORT_DESTINATION=bigquery,pubsub
```

### Pubsub Ordering (オプション)
メッセージの順序指定を利用したい場合、環境変数```PUBSUB_ORDERING_BY```を設定する必要があります。
https://cloud.google.com/pubsub/docs/ordering

**注意**
メッセージの順序指定はパフォーマンスに悪影響をもたらす可能性があります。
参照: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8

### BigQuery スキーマ (オプション)
Change Streams を BigQuery にエクスポートしたい場合、以下のようなテーブルスキーマを指定する必要があります。

Expand Down
52 changes: 39 additions & 13 deletions interfaces/pubsub/export.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"encoding/json"
"fmt"
"strings"
"time"

"cloud.google.com/go/pubsub"
pubsubConfig "github.com/cam-inc/mxtransporter/config/pubsub"
"github.com/cam-inc/mxtransporter/pkg/errors"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"strings"
"time"
)

var results []*pubsub.PublishResult
Expand All @@ -18,12 +20,13 @@ type (
IPubsub interface {
topicExists(ctx context.Context, topicID string) (bool, error)
createTopic(ctx context.Context, topicID string) (*pubsub.Topic, error)
publishMessage(ctx context.Context, topicID string, csArray []string) error
publishMessage(ctx context.Context, topicID string, csArray []string, pmo ...publishMessageOption) error
}

PubsubImpl struct {
Pubsub IPubsub
Log *zap.SugaredLogger
Pubsub IPubsub
Log *zap.SugaredLogger
OrderingBy string
}

PubsubClientImpl struct {
Expand All @@ -32,6 +35,14 @@ type (
}
)

func withOrderingKey(orderingKey string) publishMessageOption {
return func(o *pubsub.Message) {
o.OrderingKey = orderingKey
}
}

type publishMessageOption func(opts *pubsub.Message)

func (p *PubsubClientImpl) topicExists(ctx context.Context, topicID string) (bool, error) {
return p.PubsubClient.Topic(topicID).Exists(ctx)
}
Expand All @@ -40,13 +51,16 @@ func (p *PubsubClientImpl) createTopic(ctx context.Context, topicID string) (*pu
return p.PubsubClient.CreateTopic(ctx, topicID)
}

func (p *PubsubClientImpl) publishMessage(ctx context.Context, topicID string, csArray []string) error {
func (p *PubsubClientImpl) publishMessage(ctx context.Context, topicID string, csArray []string, pmo ...publishMessageOption) error {
topic := p.PubsubClient.Topic(topicID)
defer topic.Stop()

r := topic.Publish(ctx, &pubsub.Message{
message := &pubsub.Message{
Data: []byte(strings.Join(csArray, "|")),
})
}
for _, pmo := range pmo {
pmo(message)
}
r := topic.Publish(ctx, message)

for _, r := range append(results, r) {
id, err := r.Get(ctx)
Expand Down Expand Up @@ -111,9 +125,21 @@ func (p *PubsubImpl) ExportToPubsub(ctx context.Context, cs primitive.M) error {
string(updDesc),
}

if err := p.Pubsub.publishMessage(ctx, topicID, r); err != nil {
return err
if p.OrderingBy != "" {
key, err := p.orderingKey(cs)
if err != nil {
return err
}
return p.Pubsub.publishMessage(ctx, topicID, r, withOrderingKey(key))
}

return nil
return p.Pubsub.publishMessage(ctx, topicID, r)
}

func (p *PubsubImpl) orderingKey(cs primitive.M) (string, error) {
key, ok := cs[p.OrderingBy]
if !ok {
return "", errors.InvalidErrorPubSubOrderingKey.New(fmt.Sprintf("Failed to get orderingKey cs: %v, orderingBy: %s", cs, p.OrderingBy))
}
return fmt.Sprintf("%v", key), nil
}
5 changes: 3 additions & 2 deletions interfaces/pubsub/export_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"reflect"

"cloud.google.com/go/pubsub"
)

type mockPubsubClientImpl struct {
Expand All @@ -23,7 +24,7 @@ func (*mockPubsubClientImpl) createTopic(ctx context.Context, topicID string) (*
return nil, nil
}

func (m *mockPubsubClientImpl) publishMessage(_ context.Context, _ string, csArray []string) error {
func (m *mockPubsubClientImpl) publishMessage(_ context.Context, _ string, csArray []string, pmo ...publishMessageOption) error {
if csArray == nil {
return fmt.Errorf("Expect csItems to not be nil.")
}
Expand Down
51 changes: 36 additions & 15 deletions interfaces/pubsub/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ package pubsub

import (
"context"
"math"
"testing"
"time"

"github.com/cam-inc/mxtransporter/config"
"github.com/cam-inc/mxtransporter/pkg/logger"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"math"
"testing"
"time"
)

func Test_ExportToPubSub(t *testing.T) {
Expand Down Expand Up @@ -48,8 +49,18 @@ func Test_ExportToPubSub(t *testing.T) {
{
name: "Pass to publish a message to pubsub.",
runner: func(t *testing.T) {
psClientImpl := &mockPubsubClientImpl{nil, testCsArray}
mockPsImpl := PubsubImpl{psClientImpl, l}
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: testCsArray}
mockPsImpl := PubsubImpl{psClientImpl, l, ""}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err != nil {
t.Fatalf("Testing Error, ErrorMessage: %v", err)
}
},
},
{
name: "Pass to publish a message to pubsub with ordering key.",
runner: func(t *testing.T) {
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: testCsArray}
mockPsImpl := PubsubImpl{psClientImpl, l, "documentKey"}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err != nil {
t.Fatalf("Testing Error, ErrorMessage: %v", err)
}
Expand All @@ -69,8 +80,8 @@ func Test_ExportToPubSub(t *testing.T) {
"updateDescription": primitive.M{"zzzzz": "test update description"},
}

psClientImpl := &mockPubsubClientImpl{nil, nil}
mockPsImpl := PubsubImpl{psClientImpl, l}
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil}
mockPsImpl := PubsubImpl{psClientImpl, l, ""}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil {
t.Fatalf("Not behaving as intended.")
}
Expand All @@ -90,8 +101,8 @@ func Test_ExportToPubSub(t *testing.T) {
"updateDescription": primitive.M{"zzzzz": "test update description"},
}

psClientImpl := &mockPubsubClientImpl{nil, nil}
mockPsImpl := PubsubImpl{psClientImpl, l}
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil}
mockPsImpl := PubsubImpl{psClientImpl, l, ""}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil {
t.Fatalf("Not behaving as intended.")
}
Expand All @@ -111,8 +122,8 @@ func Test_ExportToPubSub(t *testing.T) {
"updateDescription": primitive.M{"zzzzz": "test update description"},
}

psClientImpl := &mockPubsubClientImpl{nil, nil}
mockPsImpl := PubsubImpl{psClientImpl, l}
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil}
mockPsImpl := PubsubImpl{psClientImpl, l, ""}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil {
t.Fatalf("Not behaving as intended.")
}
Expand All @@ -132,8 +143,8 @@ func Test_ExportToPubSub(t *testing.T) {
"updateDescription": primitive.M{"zzzzz": "test update description"},
}

psClientImpl := &mockPubsubClientImpl{nil, nil}
mockPsImpl := PubsubImpl{psClientImpl, l}
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil}
mockPsImpl := PubsubImpl{psClientImpl, l, ""}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil {
t.Fatalf("Not behaving as intended.")
}
Expand All @@ -153,8 +164,18 @@ func Test_ExportToPubSub(t *testing.T) {
"updateDescription": math.NaN(),
}

psClientImpl := &mockPubsubClientImpl{nil, nil}
mockPsImpl := PubsubImpl{psClientImpl, l}
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil}
mockPsImpl := PubsubImpl{psClientImpl, l, ""}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil {
t.Fatalf("Not behaving as intended.")
}
},
},
{
name: "Failed to get ordering key.",
runner: func(t *testing.T) {
psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: testCsArray}
mockPsImpl := PubsubImpl{psClientImpl, l, "invalid-key"}
if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil {
t.Fatalf("Not behaving as intended.")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
InternalServerErrorPubSubFind = errType("500: pubsub find error")
InternalServerErrorPubSubCreate = errType("500: pubsub create error")
InternalServerErrorPubSubPublish = errType("500: pubsub publish error")
InvalidErrorPubSubOrderingKey = errType("400: pubsub ordering key error")
// kinesis stream
InternalServerErrorKinesisStreamPut = errType("500: kinesis stream put error")
// local storage file
Expand Down

0 comments on commit 7a41f1e

Please sign in to comment.