Skip to content

Commit

Permalink
add: max message option
Browse files Browse the repository at this point in the history
Signed-off-by: tozastation <[email protected]>
  • Loading branch information
tozastation committed Mar 31, 2024
1 parent 3c0dcfb commit 9c9e778
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions pkg/integration/googlecloud/pubsubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@ import (
"time"
)

// PubSubClient is a client for Google Cloud Pub/Sub
/*
PubSubClient is a client for Google Cloud Pub/Sub.
PubSub Client is used to receive GKE cluster notifications from Cloud Pub/Sub;
the reason for using the REST endpoint instead of gRPC is that we wanted to query only as much as needed, without using gRPC Streaming.
*/

type PubSubClient struct {
ProjectID string
SubscriptionID string
HTTPClient *http.Client
TimeoutSec int `json:"timeout_sec"`
EnableAck bool `json:"enable_ack"`
MaxMessages int `json:"max_messages"`
}

func newPubSubClient(ctx context.Context, projectID, subscriptionID string) (*PubSubClient, error) {
Expand All @@ -30,20 +35,25 @@ func newPubSubClient(ctx context.Context, projectID, subscriptionID string) (*Pu
if err != nil {
return nil, err
}
timeoutSec := 5
timeoutSec := 10
if viper.Get("googlecloud.pubsub.timeout_sec") != nil {
timeoutSec = viper.GetInt("googlecloud.pubsub.timeout_sec")
}
enableAcK := true
if viper.Get("googlecloud.pubsub.enable_ack") != nil {
enableAcK = viper.GetBool("googlecloud.pubsub.enable_ack")
}
maxMessage := 1
if viper.Get("googlecloud.pubsub.max_messages") != nil {
maxMessage = viper.GetInt("googlecloud.pubsub.max_messages")
}
return &PubSubClient{
ProjectID: projectID,
SubscriptionID: subscriptionID,
HTTPClient: googleDefaultClient,
TimeoutSec: timeoutSec,
EnableAck: enableAcK,
MaxMessages: maxMessage,
}, nil
}

Expand Down Expand Up @@ -75,7 +85,7 @@ func (p *PubSubClient) GetPullSubscriptionEndpoint() string {

func (p *PubSubClient) PullSubscription(ctx context.Context) (*PullSubscriptionResponse, error) {
reqBody := PullSubscriptionRequest{
MaxMessages: 1,
MaxMessages: p.MaxMessages,
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(reqBody); err != nil {
Expand All @@ -91,7 +101,7 @@ func (p *PubSubClient) PullSubscription(ctx context.Context) (*PullSubscriptionR
resp, err := p.HTTPClient.Do(req)
if err != nil {
if os.IsTimeout(err) {
fmt.Println("maybe message not found.")
fmt.Println("error by timeout. maybe message is not found.")
return nil, nil
}
return nil, err
Expand All @@ -104,6 +114,15 @@ func (p *PubSubClient) PullSubscription(ctx context.Context) (*PullSubscriptionR
if err := json.NewDecoder(resp.Body).Decode(&pullSubscriptionResponse); err != nil {
return nil, err
}
if p.EnableAck {
ackIDs := make([]string, 0, len(pullSubscriptionResponse.ReceivedMessages))
for _, msg := range pullSubscriptionResponse.ReceivedMessages {
ackIDs = append(ackIDs, msg.AckID)
}
if err := p.AckSubscription(ctx, ackIDs); err != nil {
return nil, err
}
}
return &pullSubscriptionResponse, nil
}

Expand Down

0 comments on commit 9c9e778

Please sign in to comment.