diff --git a/application/export_change_streams.go b/application/export_change_streams.go index ccd0b8b..d42e90d 100644 --- a/application/export_change_streams.go +++ b/application/export_change_streams.go @@ -1,12 +1,16 @@ package application import ( - "cloud.google.com/go/bigquery" - "cloud.google.com/go/pubsub" "context" "fmt" + "strings" + "time" + + "cloud.google.com/go/bigquery" + "cloud.google.com/go/pubsub" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/cam-inc/mxtransporter/config" + pconfig "github.com/cam-inc/mxtransporter/config/pubsub" interfaceForBigquery "github.com/cam-inc/mxtransporter/interfaces/bigquery" iff "github.com/cam-inc/mxtransporter/interfaces/file" interfaceForKinesisStream "github.com/cam-inc/mxtransporter/interfaces/kinesis-stream" @@ -20,8 +24,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" "golang.org/x/sync/errgroup" - "strings" - "time" ) type agent string @@ -164,7 +166,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error return err } psClientImpl := &interfaceForPubsub.PubsubClientImpl{psClient, c.Log} - psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log} + psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log, pconfig.PubSubConfig().OrderingBy} case KinesisStream: ksClient, err := c.Watcher.newKinesisClient(ctx) if err != nil { diff --git a/config/constant/constant.go b/config/constant/constant.go index b0744c5..504bb50 100644 --- a/config/constant/constant.go +++ b/config/constant/constant.go @@ -7,7 +7,8 @@ const ( KINESIS_STREAM_NAME = "KINESIS_STREAM_NAME" KINESIS_STREAM_REGION = "KINESIS_STREAM_REGION" - PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME" + PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME" + PUBSUB_ORDERING_BY = "PUBSUB_ORDERING_BY" MONGODB_HOST = "MONGODB_HOST" MONGODB_DATABASE = "MONGODB_DATABASE" diff --git a/config/pubsub/pubsub_config.go b/config/pubsub/pubsub_config.go index be69dd7..93207bc 100644 --- a/config/pubsub/pubsub_config.go 
+++ b/config/pubsub/pubsub_config.go @@ -1,16 +1,19 @@ package pubsub import ( - "github.com/cam-inc/mxtransporter/config/constant" "os" + + "github.com/cam-inc/mxtransporter/config/constant" ) type PubSub struct { - TopicName string + TopicName string + OrderingBy string } func PubSubConfig() PubSub { var psCfg PubSub psCfg.TopicName = os.Getenv(constant.PUBSUB_TOPIC_NAME) + psCfg.OrderingBy = os.Getenv(constant.PUBSUB_ORDERING_BY) return psCfg } diff --git a/config/pubsub/pubsub_config_test.go b/config/pubsub/pubsub_config_test.go index 5bd5eb9..0f7d3d5 100644 --- a/config/pubsub/pubsub_config_test.go +++ b/config/pubsub/pubsub_config_test.go @@ -11,14 +11,19 @@ import ( func Test_PubSubConfig(t *testing.T) { t.Run("Check to call the set environment variable.", func(t *testing.T) { - mTopicID := "xxx" - if err := os.Setenv("PUBSUB_TOPIC_NAME", mTopicID); err != nil { + if err := os.Setenv("PUBSUB_TOPIC_NAME", "xxx"); err != nil { t.Fatalf("Failed to set file PUBSUB_TOPIC_NAME environment variables.") } - + if err := os.Setenv("PUBSUB_ORDERING_BY", "yyy"); err != nil { + t.Fatalf("Failed to set file PUBSUB_ORDERING_BY environment variables.") + } psCfg := PubSubConfig() - if e, a := psCfg.TopicName, mTopicID; !reflect.DeepEqual(e, a) { - t.Fatal("Environment variable MONGODB_DATABASE is not acquired correctly.") + want := PubSub{ + TopicName: "xxx", + OrderingBy: "yyy", + } + if !reflect.DeepEqual(want, psCfg) { + t.Fatalf("Environment variable PUBSUB_* is not acquired correctly. want: %v, got: %v", want, psCfg) } }) } diff --git a/docs/gcp/gke/README.md b/docs/gcp/gke/README.md index 0835df3..4e23a1b 100644 --- a/docs/gcp/gke/README.md +++ b/docs/gcp/gke/README.md @@ -33,6 +33,15 @@ or EXPORT_DESTINATION=bigquery,pubsub ``` +### Pubsub Ordering (optional) +If you want messages to be delivered in order in Pub/Sub, set the ```PUBSUB_ORDERING_BY``` environment variable to the name of the change stream field (for example ```documentKey```) whose value is used as the ordering key. +https://cloud.google.com/pubsub/docs/ordering + +**NOTICE** +Message ordering can cause performance issues. 
+ +see https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8 + ### BigQuery schema (optional) If you want to export change streams to BigQuery, specify the following table schema. @@ -162,4 +171,4 @@ $ make upgrade ![image](https://user-images.githubusercontent.com/37132477/141406547-41edf9eb-5a17-4191-9ee3-3f13ba17ec07.png) A pod is created for each collection, and a persistent volume is linked to each pod. -Since the StatefulSet is created, even if the pod stops, you can get the change streams by referring to the resume token saved in the persistent volume again. \ No newline at end of file +Since the StatefulSet is created, even if the pod stops, you can get the change streams by referring to the resume token saved in the persistent volume again. diff --git a/docs/gcp/gke/README_JP.md b/docs/gcp/gke/README_JP.md index d037d04..1b96a80 100644 --- a/docs/gcp/gke/README_JP.md +++ b/docs/gcp/gke/README_JP.md @@ -33,6 +33,14 @@ or EXPORT_DESTINATION=bigquery,pubsub ``` +### Pubsub Ordering (オプション) +メッセージの順序指定を利用したい場合、環境変数```PUBSUB_ORDERING_BY```を設定する必要があります。 +https://cloud.google.com/pubsub/docs/ordering + +**注意** +メッセージの順序指定はパフォーマンスに悪影響をもたらす可能性があります。 +参照: https://medium.com/google-cloud/google-cloud-pub-sub-ordered-delivery-1e4181f60bc8 + ### BigQuery スキーマ (オプション) Change Streams を BigQuery にエクスポートしたい場合、以下のようなテーブルスキーマを指定する必要があります。 diff --git a/interfaces/pubsub/export.go b/interfaces/pubsub/export.go index e24cb69..10a43c4 100644 --- a/interfaces/pubsub/export.go +++ b/interfaces/pubsub/export.go @@ -1,15 +1,17 @@ package pubsub import ( - "cloud.google.com/go/pubsub" "context" "encoding/json" + "fmt" + "strings" + "time" + + "cloud.google.com/go/pubsub" pubsubConfig "github.com/cam-inc/mxtransporter/config/pubsub" "github.com/cam-inc/mxtransporter/pkg/errors" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" - "strings" - "time" ) var results []*pubsub.PublishResult @@ -18,12 +20,13 @@ type ( IPubsub interface { 
topicExists(ctx context.Context, topicID string) (bool, error) createTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) - publishMessage(ctx context.Context, topicID string, csArray []string) error + publishMessage(ctx context.Context, topicID string, csArray []string, pmo ...publishMessageOption) error } PubsubImpl struct { - Pubsub IPubsub - Log *zap.SugaredLogger + Pubsub IPubsub + Log *zap.SugaredLogger + OrderingBy string } PubsubClientImpl struct { @@ -32,6 +35,14 @@ type ( } ) +func withOrderingKey(orderingKey string) publishMessageOption { + return func(o *pubsub.Message) { + o.OrderingKey = orderingKey + } +} + +type publishMessageOption func(opts *pubsub.Message) + func (p *PubsubClientImpl) topicExists(ctx context.Context, topicID string) (bool, error) { return p.PubsubClient.Topic(topicID).Exists(ctx) } @@ -40,13 +51,16 @@ func (p *PubsubClientImpl) createTopic(ctx context.Context, topicID string) (*pu return p.PubsubClient.CreateTopic(ctx, topicID) } -func (p *PubsubClientImpl) publishMessage(ctx context.Context, topicID string, csArray []string) error { +func (p *PubsubClientImpl) publishMessage(ctx context.Context, topicID string, csArray []string, pmo ...publishMessageOption) error { topic := p.PubsubClient.Topic(topicID) defer topic.Stop() - - r := topic.Publish(ctx, &pubsub.Message{ + message := &pubsub.Message{ Data: []byte(strings.Join(csArray, "|")), - }) + } + for _, pmo := range pmo { + pmo(message) + } + r := topic.Publish(ctx, message) for _, r := range append(results, r) { id, err := r.Get(ctx) @@ -111,9 +125,21 @@ func (p *PubsubImpl) ExportToPubsub(ctx context.Context, cs primitive.M) error { string(updDesc), } - if err := p.Pubsub.publishMessage(ctx, topicID, r); err != nil { - return err + if p.OrderingBy != "" { + key, err := p.orderingKey(cs) + if err != nil { + return err + } + return p.Pubsub.publishMessage(ctx, topicID, r, withOrderingKey(key)) } - return nil + return p.Pubsub.publishMessage(ctx, topicID, r) +} + 
+func (p *PubsubImpl) orderingKey(cs primitive.M) (string, error) { + key, ok := cs[p.OrderingBy] + if !ok { + return "", errors.InvalidErrorPubSubOrderingKey.New(fmt.Sprintf("Failed to get orderingKey cs: %v, orderingBy: %s", cs, p.OrderingBy)) + } + return fmt.Sprintf("%v", key), nil } diff --git a/interfaces/pubsub/export_mock.go b/interfaces/pubsub/export_mock.go index 464a715..bb4aa8d 100644 --- a/interfaces/pubsub/export_mock.go +++ b/interfaces/pubsub/export_mock.go @@ -4,10 +4,11 @@ package pubsub import ( - "cloud.google.com/go/pubsub" "context" "fmt" "reflect" + + "cloud.google.com/go/pubsub" ) type mockPubsubClientImpl struct { @@ -23,7 +24,7 @@ func (*mockPubsubClientImpl) createTopic(ctx context.Context, topicID string) (* return nil, nil } -func (m *mockPubsubClientImpl) publishMessage(_ context.Context, _ string, csArray []string) error { +func (m *mockPubsubClientImpl) publishMessage(_ context.Context, _ string, csArray []string, pmo ...publishMessageOption) error { if csArray == nil { return fmt.Errorf("Expect csItems to not be nil.") } diff --git a/interfaces/pubsub/export_test.go b/interfaces/pubsub/export_test.go index 7812b50..77d4b9b 100644 --- a/interfaces/pubsub/export_test.go +++ b/interfaces/pubsub/export_test.go @@ -5,13 +5,14 @@ package pubsub import ( "context" + "math" + "testing" + "time" + "github.com/cam-inc/mxtransporter/config" "github.com/cam-inc/mxtransporter/pkg/logger" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" - "math" - "testing" - "time" ) func Test_ExportToPubSub(t *testing.T) { @@ -48,8 +49,18 @@ func Test_ExportToPubSub(t *testing.T) { { name: "Pass to publish a message to pubsub.", runner: func(t *testing.T) { - psClientImpl := &mockPubsubClientImpl{nil, testCsArray} - mockPsImpl := PubsubImpl{psClientImpl, l} + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: testCsArray} + mockPsImpl := PubsubImpl{psClientImpl, l, ""} + if err := mockPsImpl.ExportToPubsub(ctx, csMap); err != nil { + 
t.Fatalf("Testing Error, ErrorMessage: %v", err) + } + }, + }, + { + name: "Pass to publish a message to pubsub with ordering key.", + runner: func(t *testing.T) { + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: testCsArray} + mockPsImpl := PubsubImpl{psClientImpl, l, "documentKey"} if err := mockPsImpl.ExportToPubsub(ctx, csMap); err != nil { t.Fatalf("Testing Error, ErrorMessage: %v", err) } @@ -69,8 +80,8 @@ func Test_ExportToPubSub(t *testing.T) { "updateDescription": primitive.M{"zzzzz": "test update description"}, } - psClientImpl := &mockPubsubClientImpl{nil, nil} - mockPsImpl := PubsubImpl{psClientImpl, l} + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil} + mockPsImpl := PubsubImpl{psClientImpl, l, ""} if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil { t.Fatalf("Not behaving as intended.") } @@ -90,8 +101,8 @@ func Test_ExportToPubSub(t *testing.T) { "updateDescription": primitive.M{"zzzzz": "test update description"}, } - psClientImpl := &mockPubsubClientImpl{nil, nil} - mockPsImpl := PubsubImpl{psClientImpl, l} + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil} + mockPsImpl := PubsubImpl{psClientImpl, l, ""} if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil { t.Fatalf("Not behaving as intended.") } @@ -111,8 +122,8 @@ func Test_ExportToPubSub(t *testing.T) { "updateDescription": primitive.M{"zzzzz": "test update description"}, } - psClientImpl := &mockPubsubClientImpl{nil, nil} - mockPsImpl := PubsubImpl{psClientImpl, l} + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil} + mockPsImpl := PubsubImpl{psClientImpl, l, ""} if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil { t.Fatalf("Not behaving as intended.") } @@ -132,8 +143,8 @@ func Test_ExportToPubSub(t *testing.T) { "updateDescription": primitive.M{"zzzzz": "test update description"}, } - psClientImpl := &mockPubsubClientImpl{nil, nil} - mockPsImpl := PubsubImpl{psClientImpl, l} + psClientImpl := 
&mockPubsubClientImpl{pubsubClient: nil, cs: nil} + mockPsImpl := PubsubImpl{psClientImpl, l, ""} if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil { t.Fatalf("Not behaving as intended.") } @@ -153,8 +164,18 @@ func Test_ExportToPubSub(t *testing.T) { "updateDescription": math.NaN(), } - psClientImpl := &mockPubsubClientImpl{nil, nil} - mockPsImpl := PubsubImpl{psClientImpl, l} + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: nil} + mockPsImpl := PubsubImpl{psClientImpl, l, ""} + if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil { + t.Fatalf("Not behaving as intended.") + } + }, + }, + { + name: "Failed to get ordering key.", + runner: func(t *testing.T) { + psClientImpl := &mockPubsubClientImpl{pubsubClient: nil, cs: testCsArray} + mockPsImpl := PubsubImpl{psClientImpl, l, "invalid-key"} if err := mockPsImpl.ExportToPubsub(ctx, csMap); err == nil { t.Fatalf("Not behaving as intended.") } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 624240a..c1251cb 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -21,6 +21,7 @@ const ( InternalServerErrorPubSubFind = errType("500: pubsub find error") InternalServerErrorPubSubCreate = errType("500: pubsub create error") InternalServerErrorPubSubPublish = errType("500: pubsub publish error") + InvalidErrorPubSubOrderingKey = errType("400: pubsub ordering key error") // kinesis stream InternalServerErrorKinesisStreamPut = errType("500: kinesis stream put error") // local storage file