From dc4beb4982ec8540a66ecdcaa968ef09f99a0486 Mon Sep 17 00:00:00 2001 From: Max Siegieda Date: Thu, 1 Nov 2018 12:45:24 +0000 Subject: [PATCH] Add an adapter for converting batch job IDs to log name IDs (#295) --- models/batch.go | 43 ++++++++++++++ models/batch_test.go | 72 ++++++++++++++++++++++- service/batchtologid/batchtologid.go | 68 +++++++++++++++++++++ service/batchtologid/batchtologid_test.go | 55 +++++++++++++++++ 4 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 service/batchtologid/batchtologid.go create mode 100644 service/batchtologid/batchtologid_test.go diff --git a/models/batch.go b/models/batch.go index e1133954..9293f5c3 100644 --- a/models/batch.go +++ b/models/batch.go @@ -3,6 +3,7 @@ package models //go:generate mockgen -source=batch.go -package=models -destination=batch_mock.go import ( + "context" "time" "github.com/jinzhu/gorm" @@ -11,8 +12,10 @@ import ( type BatchRepo interface { AddEvent(batchJob BatchJob, event BatchJobEvent) error New(batchID string) BatchJob + GetLogName(batchID string) (logName string, err error) SetLogName(id string, logName string) error ActiveJobsWithoutLogs(time.Time) ([]BatchJob, error) + HasStarted(batchID string) (started bool, err error) } const ( @@ -45,6 +48,36 @@ func (repo *batchRepo) New(batchID string) BatchJob { return batchJob } +// BatchAwaitStarted polls the BatchRepo's DB every pollPeriod for the state of +// the batch job associated with a given ID. It blocks until the batch job has +// started, the context is done, or an error occurs. +func BatchAwaitStarted(ctx context.Context, repo BatchRepo, batchID string, pollPeriod time.Duration) error { + for { + select { + case <-time.After(pollPeriod): + started, err := repo.HasStarted(batchID) + if err != nil { + return err + } + if started { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// HasStarted reports whether the batch job has started. 
+func (repo *batchRepo) HasStarted(batchID string) (bool, error) { + var batchJob BatchJob + err := repo.db.Preload("Events").Where("batch_id = ?", batchID).First(&batchJob).Error + if err != nil { + return false, err + } + return hasStarted(batchJob.Status()), nil +} + // AddEvent adds an event to the batch service. func (repo *batchRepo) AddEvent(batchJob BatchJob, event BatchJobEvent) error { db := repo.db @@ -52,6 +85,16 @@ func (repo *batchRepo) AddEvent(batchJob BatchJob, event BatchJobEvent) error { return err } +// GetLogName takes a BatchJob ID and returns that BatchJob's logname if present +func (repo *batchRepo) GetLogName(id string) (string, error) { + var batchJob BatchJob + err := repo.db.Where("batch_id = ?", id).First(&batchJob).Error + if err != nil { + return "", err + } + return batchJob.LogName, nil +} + func (repo *batchRepo) SetLogName(id string, logName string) error { batchJob := BatchJob{} err := repo.db.Where("batch_id = ?", id).First(&batchJob).Error diff --git a/models/batch_test.go b/models/batch_test.go index aa2f81ac..77211b0d 100644 --- a/models/batch_test.go +++ b/models/batch_test.go @@ -3,6 +3,7 @@ package models import ( + "context" "reflect" "testing" "time" @@ -53,6 +54,75 @@ func TestBatchSetLogName(t *testing.T) { }) } +func TestBatchGetLogName(t *testing.T) { + RunTransaction(func(db *gorm.DB) { + d := BatchDataSource(db) + batch := BatchJob{ + BatchID: "foo", + LogName: "foobarLogName", + } + db.Create(&batch) + returned, err := d.GetLogName(batch.BatchID) + if err != nil { + t.Error(err) + } + if batch.LogName != returned { + t.Fatalf("Failed to get batch job's log name. 
Expected: %v Got: %v \n", batch.LogName, returned) + return + } + }) +} + +func TestBatchAwaitStarted(t *testing.T) { + RunTransaction(func(db *gorm.DB) { + d := BatchDataSource(db) + + batch := BatchJob{ + BatchID: "foo", + } + db.Create(&batch) + + err := d.AddEvent(batch, BatchJobEvent{ + BatchJobID: batch.ID, + Status: StatusStarted, + }) + if err != nil { + t.Error(err) + } + + ctxtimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err = BatchAwaitStarted(ctxtimeout, d, batch.BatchID, 100*time.Microsecond) + if err != nil { + t.Error(err) + } + }) +} + +func TestBatchHasStarted(t *testing.T) { + RunTransaction(func(db *gorm.DB) { + d := BatchDataSource(db) + batch := BatchJob{ + BatchID: "foo", + Events: []BatchJobEvent{ + BatchJobEvent{ + Status: StatusStarted, + }, + }, + } + db.Create(&batch) + + started, err := d.HasStarted(batch.BatchID) + if err != nil { + t.Error(err) + } + if !started { + t.Error("BatchJob with started event is not considered started") + } + }) +} + func TestBatchActiveJobsWithoutLogs(t *testing.T) { RunTransaction(func(db *gorm.DB) { d := BatchDataSource(db) @@ -131,7 +201,7 @@ func TestBatchActiveJobsWithoutLogsWithLogs(t *testing.T) { } if len(batchJobs) != 0 { - t.Fatal("Expected 0 batch jobs, got %s", len(batchJobs)) + t.Fatalf("Expected 0 batch jobs, got %v", len(batchJobs)) return } diff --git a/service/batchtologid/batchtologid.go b/service/batchtologid/batchtologid.go new file mode 100644 index 00000000..1bbb4295 --- /dev/null +++ b/service/batchtologid/batchtologid.go @@ -0,0 +1,68 @@ +// Package batchtologid takes an AWS Batch Job ID and returns the associated AWS CloudWatch Log Name. +package batchtologid + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/ReconfigureIO/platform/models" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/batch" +) + +type Adapter struct { + BatchRepo models.BatchRepo + AWS interface { 
DescribeJobs( + *batch.DescribeJobsInput, + ) ( + *batch.DescribeJobsOutput, + error, + ) + } + PollingPeriod time.Duration +} + +// Do takes a batch job ID and returns the log name associated with that job. It +// first blocks until the batch job has started, then queries BatchRepo for the +// log name. If none is stored, it asks AWS Batch for the job's log stream name, +// saves it via BatchRepo.SetLogName (a failed save is only logged) and returns +// it. It returns an error if AWS reports no job, container or log stream name. +func (a *Adapter) Do(ctx context.Context, batchID string) (string, error) { + err := models.BatchAwaitStarted(ctx, a.BatchRepo, batchID, a.PollingPeriod) + if err != nil { + return "", err + } + + logname, err := a.BatchRepo.GetLogName(batchID) + if err != nil { + log.Printf("bidToLid: BatchRepo.GetLogName: %v \n", err) + return "", err + } + if logname != "" { + return logname, nil + } + + resp, err := a.AWS.DescribeJobs(&batch.DescribeJobsInput{ + Jobs: aws.StringSlice([]string{batchID}), + }) + if err != nil { + return "", err + } + if len(resp.Jobs) == 0 { + return "", fmt.Errorf("bidToLid: There is no AWS Batch Job with ID %v", batchID) + } + + if resp.Jobs[0].Container == nil || resp.Jobs[0].Container.LogStreamName == nil { + return "", errors.New("BatchToLogID: Adapter.Do: Got nil LogStreamName from AWS Batch") + } + err = a.BatchRepo.SetLogName(batchID, *resp.Jobs[0].Container.LogStreamName) + if err != nil { + log.Printf("bidToLid: BatchRepo.SetLogName: %v \n", err) + } + return *resp.Jobs[0].Container.LogStreamName, nil + +} diff --git a/service/batchtologid/batchtologid_test.go b/service/batchtologid/batchtologid_test.go new file mode 100644 index 00000000..62946adc --- /dev/null +++ b/service/batchtologid/batchtologid_test.go @@ -0,0 +1,55 @@ +package batchtologid + +import ( + "context" + "testing" + "time" + + "github.com/ReconfigureIO/platform/models" + "github.com/aws/aws-sdk-go/service/batch" + "github.com/golang/mock/gomock" +) + +var logName = "foobarLogName" +var batchID = "foobarBatchID" + +type fakeAWS struct{} + +func 
(aws *fakeAWS) DescribeJobs(input *batch.DescribeJobsInput) (*batch.DescribeJobsOutput, error) { + return &batch.DescribeJobsOutput{ + Jobs: []*batch.JobDetail{ + &batch.JobDetail{ + Container: &batch.ContainerDetail{ + LogStreamName: &logName, + }, + }, + }, + }, nil +} + +func TestBatchToLogID(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + batchRepo := models.NewMockBatchRepo(mockCtrl) + batchRepo.EXPECT().HasStarted(batchID).Return(true, nil) + batchRepo.EXPECT().GetLogName(batchID).Return("", nil) + batchRepo.EXPECT().SetLogName(batchID, logName).Return(nil) + + b2l := Adapter{ + BatchRepo: batchRepo, + AWS: &fakeAWS{}, + PollingPeriod: time.Microsecond, + } + + ctxtimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + returned, err := b2l.Do(ctxtimeout, batchID) + if err != nil { + t.Error(err) + } + if returned != logName { + t.Errorf("Returned log name did not match expected value. Returned: %v Expected: %v \n", returned, logName) + } +}