Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enables pull subscription #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
{
"[go]": {
"editor.defaultFormatter": "golang.go",
"editor.insertSpaces": true,
"editor.formatOnSave": true,
"editor.rulers": [88],
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"editor.suggest.snippetsPreventQuickSuggestions": false,
"[go]": {
"editor.defaultFormatter": "golang.go",
"editor.insertSpaces": true,
"editor.formatOnSave": true,
"editor.rulers": [88],
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
},
"gopls": {
"ui.semanticTokens": true,
"ui.diagnostic.staticcheck": true
},
"go.coverOnSave": true,
"go.coverageDecorator": {
"type": "gutter",
"coveredHighlightColor": "rgba(64,128,128,0.5)",
"uncoveredHighlightColor": "rgba(128,64,64,0.25)",
"coveredGutterStyle": "blockgreen",
"uncoveredGutterStyle": "blockred"
},
"go.coverOnSingleTest": true,
"go.lintTool": "staticcheck",
"editor.suggest.snippetsPreventQuickSuggestions": false
},
"gopls": {
"ui.semanticTokens": true,
"ui.diagnostic.staticcheck": true
},
"go.coverOnSave": true,
"go.coverageDecorator": {
"type": "gutter",
"coveredHighlightColor": "rgba(64,128,128,0.5)",
"uncoveredHighlightColor": "rgba(128,64,64,0.25)",
"coveredGutterStyle": "blockgreen",
"uncoveredGutterStyle": "blockred"
},
"go.coverOnSingleTest": true,
"go.lintTool": "staticcheck"
}
10 changes: 3 additions & 7 deletions app/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,10 @@ func extractSubscriptions(container docker.Container) []pubsub.Subscription {
}
}

// Convert map to slice, only consider valid subscriptions
for _, subscription := range subscriptionMap {
if subscription.Topic != "" && subscription.Endpoint != "" {
// Only include subscriptions with both topic and endpoint populated
subscriptions = append(subscriptions, *subscription)
} else {
log.Warnf("skipping incomplete subscription: %s, both topic and endpoint must be provided\n", subscription.Name)
}

subscriptions = append(subscriptions, *subscription)

}

return subscriptions
Expand Down
13 changes: 12 additions & 1 deletion pubsub/pubsub.go
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if no endpoint is specified, no subscription should be created at all.

Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,18 @@ func (ps *pubSubImpl) CreateSubscription(ctx context.Context, subscription Subsc
func (ps *pubSubImpl) createSubscription(ctx context.Context, topic *gcps.Topic, subscription Subscription) error {
log := ps.log.WithField("subscription_id", subscription.GetSubscriptionID()).WithField("topic", subscription.Topic).WithField("endpoint", subscription.Endpoint)

_, err := ps.client.CreateSubscription(ctx, subscription.GetSubscriptionID(), createSubscriptionConfig(topic, subscription))
cfg := createSubscriptionConfig(topic, subscription)

// Determine if subscription is push or pull and set configuration accordingly
if subscription.Endpoint == "" {
cfg.PushConfig = gcps.PushConfig{}
} else {
cfg.PushConfig = gcps.PushConfig{
Endpoint: subscription.Endpoint,
}
}

_, err := ps.client.CreateSubscription(ctx, subscription.GetSubscriptionID(), cfg)
if err != nil {
log.WithError(err).Error("error creating subscription")
return err
Expand All @@ -105,6 +115,7 @@ func (ps *pubSubImpl) createSubscription(ctx context.Context, topic *gcps.Topic,
log.Debug("subscription created")

return nil

}

// func (ps *pubSubImpl) updateSubscription(ctx context.Context, sub *gcps.Subscription, subscription Subscription) error {
Expand Down