Skip to content
This repository has been archived by the owner on Jun 20, 2020. It is now read-only.

Commit

Permalink
Allow multiple subscriptions (#37)
Browse files Browse the repository at this point in the history
This PR adds the ability for users to update the nubmer of
simultaneous client connections to the Pub/Sub subscription.

Pub/Sub has a hard limit of 10MBPS per client connection for
streaming fetches. The client-go library already supports
the ability to create multiple simultaneous GRPC clients and
we're just exposing that.
  • Loading branch information
josephlewis42 authored Jan 15, 2020
1 parent 5348230 commit e16881b
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 2 deletions.
7 changes: 7 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pubsubbeat:
# Valid time units are "m" and "h". You can also compose them: "2h30m".
subscription.retention_duration: 168h # Defaults to 7 days

# How many simultaneous connections the beat will establish to the Pub/Sub endpoint.
# Pub/Sub streaming pull has a per-subscriber throughput limit,
# https://cloud.google.com/pubsub/quotas
# Increasing the pool size will increase the throughput of the beat until
# a different quota is hit.
subscription.connection_pool_size: 1

### JSON Configuration

# Whether to decode the Pub/Sub message as a JSON message.
Expand Down
8 changes: 8 additions & 0 deletions beater/pubsubbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
return nil, err
}

connectionPoolSize := config.Subscription.ConnectionPoolSize
subscription.ReceiveSettings.NumGoroutines = connectionPoolSize

if connectionPoolSize == 1 {
logger.Warnf("Pub/Sub streaming pull has a per-subscriber throughput limit, https://cloud.google.com/pubsub/quotas")
logger.Warnf("Use `subscription.connection_pool_size` to increase the numnber of subscribers.")
}

bt := &Pubsubbeat{
done: make(chan struct{}),
config: config,
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// Name of this beat
var Name = "pubsubbeat"
var settings = instance.Settings{
Name: Name,
Name: Name,
}

// RootCmd to handle beats cli
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Config struct {
RetainAckedMessages bool `config:"retain_acked_messages"`
RetentionDuration time.Duration `config:"retention_duration"`
Create bool `config:"create"`

// Settings for the Pub/Sub receiver
ConnectionPoolSize int `config:"connection_pool_size"`
}
Json struct {
Enabled bool `config:"enabled"`
Expand All @@ -46,6 +49,7 @@ type Config struct {

func GetDefaultConfig() Config {
config := Config{}
config.Subscription.ConnectionPoolSize = 1
config.Subscription.Create = true
config.Json.FieldsTimestampName = "@timestamp"
return config
Expand All @@ -67,6 +71,10 @@ func GetAndValidateConfig(cfg *common.Config) (*Config, error) {
return nil, fmt.Errorf("retention_duration cannot be longer than 7 days")
}

if cxns := c.Subscription.ConnectionPoolSize; cxns < 1 {
return nil, fmt.Errorf("Connection pool size must be >= 1, got: %d", cxns)
}

if c.CredentialsFile != "" {
if _, err := os.Stat(c.CredentialsFile); os.IsNotExist(err) {
return nil, fmt.Errorf("cannot find the credentials_file %q", c.CredentialsFile)
Expand Down
16 changes: 15 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,44 @@ func TestGetAndValidateConfigSubscriptionConfig(t *testing.T) {
cases := map[string]struct {
RetainAckedMessages bool
RetentionDuration string
ExpectError bool
ConnectionPoolSize int64

ExpectError bool
}{
"168h retention and keep acked messages": {
RetainAckedMessages: true,
RetentionDuration: "168h",
ConnectionPoolSize: 1,
ExpectError: false,
},
"10m retention and don't keep acked messages": {
RetainAckedMessages: false,
RetentionDuration: "10m",
ConnectionPoolSize: 1,
ExpectError: false,
},
"retention period invalid format": {
RetainAckedMessages: true,
RetentionDuration: "1d", // Duration should be in hours or minutes.
ConnectionPoolSize: 1,
ExpectError: true,
},
"retention period too short": {
RetainAckedMessages: true,
RetentionDuration: "9m",
ConnectionPoolSize: 1,
ExpectError: true,
},
"retention period too long": {
RetainAckedMessages: true,
RetentionDuration: "168h1m",
ConnectionPoolSize: 1,
ExpectError: true,
},
"zero connections": {
RetainAckedMessages: true,
RetentionDuration: "10m",
ConnectionPoolSize: 0,
ExpectError: true,
},
}
Expand All @@ -116,6 +129,7 @@ func TestGetAndValidateConfigSubscriptionConfig(t *testing.T) {
sConfig, _ := c.Child("subscription", -1)
sConfig.SetBool("retain_acked_messages", -1, tc.RetainAckedMessages)
sConfig.SetString("retention_duration", -1, tc.RetentionDuration)
sConfig.SetInt("connection_pool_size", -1, tc.ConnectionPoolSize)

conf, err := GetAndValidateConfig(c)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.4.0 // indirect
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gorilla/mux v1.7.3 // indirect
Expand Down
7 changes: 7 additions & 0 deletions pubsubbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pubsubbeat:
# Valid time units are "m" and "h". You can also compose them: "2h30m".
subscription.retention_duration: 168h # Defaults to 7 days

# How many simultaneous connections the beat will establish to the Pub/Sub endpoint.
# Pub/Sub streaming pull has a per-subscriber throughput limit,
# https://cloud.google.com/pubsub/quotas
# Increasing the pool size will increase the throughput of the beat until
# a different quota is hit.
subscription.connection_pool_size: 1

### JSON Configuration

# Whether to decode the Pub/Sub message as a JSON message.
Expand Down
7 changes: 7 additions & 0 deletions pubsubbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pubsubbeat:
# Valid time units are "m" and "h". You can also compose them: "2h30m".
subscription.retention_duration: 168h # Defaults to 7 days

# How many simultaneous connections the beat will establish to the Pub/Sub endpoint.
# Pub/Sub streaming pull has a per-subscriber throughput limit,
# https://cloud.google.com/pubsub/quotas
# Increasing the pool size will increase the throughput of the beat until
# a different quota is hit.
subscription.connection_pool_size: 1

### JSON Configuration

# Whether to decode the Pub/Sub message as a JSON message.
Expand Down

0 comments on commit e16881b

Please sign in to comment.