-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Pub/Sub to wait for operation completion #102
base: master
Are you sure you want to change the base?
Changes from 10 commits
0027055
992dd90
fa4511d
0fa972c
c8cb8ed
0ce1371
68e307f
5bbd152
1c07b85
29ccb4c
702e3ec
a889867
abe13b4
0b54845
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,16 @@ package watch | |
|
||
import ( | ||
"context" | ||
"crypto/rand" | ||
"encoding/binary" | ||
"encoding/json" | ||
"errors" | ||
"flag" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"cloud.google.com/go/pubsub" | ||
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common" | ||
genomics "google.golang.org/api/genomics/v2alpha1" | ||
) | ||
|
@@ -32,16 +36,20 @@ var ( | |
|
||
actions = flags.Bool("actions", false, "show action details") | ||
details = flags.Bool("details", false, "show event details") | ||
topic = flags.String("topic", "", "the Pub/Sub topic to watch") | ||
) | ||
|
||
func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { | ||
names := common.ParseFlags(flags, arguments) | ||
if len(names) < 1 { | ||
return errors.New("missing operation name") | ||
} | ||
if *topic == "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will still want to support polling and I think this doesn't need to be passed: when the operation data is fetched initially if there is a topic we'll start listening instead of polling. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
return errors.New("missing Pub/Sub topic") | ||
} | ||
|
||
name := common.ExpandOperationName(project, names[0]) | ||
result, err := watch(ctx, service, name) | ||
result, err := watch(ctx, service, project, name, *topic) | ||
if err != nil { | ||
return fmt.Errorf("watching pipeline: %v", err) | ||
} | ||
|
@@ -54,26 +62,52 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu | |
return nil | ||
} | ||
|
||
func watch(ctx context.Context, service *genomics.Service, name string) (interface{}, error) { | ||
func watch(ctx context.Context, service *genomics.Service, project, name, topic string) (interface{}, error) { | ||
sub, err := newPubSubSubscription(ctx, project, topic) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating Pub/Sub subscription: %v", err) | ||
} | ||
defer sub.Delete(ctx) | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
|
||
var events []*genomics.Event | ||
const initialDelay = 5 * time.Second | ||
delay := initialDelay | ||
for { | ||
var response interface{} | ||
var receiverErr error | ||
var receiverLock sync.Mutex | ||
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { | ||
receiverLock.Lock() | ||
defer receiverLock.Unlock() | ||
m.Ack() | ||
|
||
exit := func(r interface{}, err error) { | ||
if ctx.Err() != nil { | ||
return | ||
} | ||
response = r | ||
receiverErr = err | ||
cancel() | ||
} | ||
|
||
lro, err := service.Projects.Operations.Get(name).Context(ctx).Do() | ||
if err != nil { | ||
return nil, fmt.Errorf("getting operation status: %v", err) | ||
exit(nil, fmt.Errorf("getting operation status: %v", err)) | ||
return | ||
} | ||
|
||
var metadata genomics.Metadata | ||
if err := json.Unmarshal(lro.Metadata, &metadata); err != nil { | ||
return nil, fmt.Errorf("parsing metadata: %v", err) | ||
exit(nil, fmt.Errorf("parsing metadata: %v", err)) | ||
return | ||
} | ||
|
||
if *actions { | ||
*actions = false | ||
encoded, err := json.MarshalIndent(metadata.Pipeline.Actions, "", " ") | ||
if err != nil { | ||
return nil, fmt.Errorf("encoding actions: %v", err) | ||
exit(nil, fmt.Errorf("encoding actions: %v", err)) | ||
return | ||
} | ||
fmt.Printf("%s\n", encoded) | ||
} | ||
|
@@ -88,20 +122,40 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa | |
} | ||
} | ||
events = metadata.Events | ||
delay = initialDelay | ||
} | ||
|
||
if lro.Done { | ||
if lro.Error != nil { | ||
return lro.Error, nil | ||
exit(lro.Error, nil) | ||
return | ||
} | ||
return lro.Response, nil | ||
exit(lro.Response, nil) | ||
} | ||
}) | ||
if err != nil && err != context.Canceled { | ||
return nil, fmt.Errorf("receiving message: %v", err) | ||
} | ||
return response, receiverErr | ||
} | ||
|
||
time.Sleep(delay) | ||
delay = time.Duration(float64(delay) * 1.5) | ||
if limit := time.Minute; delay > limit { | ||
delay = limit | ||
} | ||
func newPubSubSubscription(ctx context.Context, projectID, topicName string) (*pubsub.Subscription, error) { | ||
client, err := pubsub.NewClient(ctx, projectID) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating a Pub/Sub client: %v", err) | ||
} | ||
|
||
var id uint64 | ||
if err := binary.Read(rand.Reader, binary.LittleEndian, &id); err != nil { | ||
return nil, fmt.Errorf("generating subscription name: %v", err) | ||
} | ||
|
||
sub, err := client.CreateSubscription(ctx, fmt.Sprintf("s%d", id), pubsub.SubscriptionConfig{ | ||
Topic: client.Topic(topicName), | ||
AckDeadline: 10 * time.Second, | ||
ExpirationPolicy: 25 * time.Hour, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating subscription: %v", err) | ||
} | ||
return sub, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to build topics in the tooling - gcloud can be used for that - I think we just want to support an optional flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is one feature that should be enabled by default. I changed the functionality to always use projects//topics/pipelines-tool if only it has the lable created-by:pipelines-tool. If the tool has any problem in using Pub/Sub it will silently switch to long pooling. Also the user can specify --pub-sub=false to opt for long pooling.