Skip to content

Commit

Permalink
Add an adapter for converting batch job IDs to log name IDs (#295)
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Siegieda authored Nov 1, 2018
1 parent 2c7ee02 commit dc4beb4
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 1 deletion.
43 changes: 43 additions & 0 deletions models/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
//go:generate mockgen -source=batch.go -package=models -destination=batch_mock.go

import (
"context"
"time"

"github.com/jinzhu/gorm"
Expand All @@ -11,8 +12,10 @@ import (
type BatchRepo interface {
AddEvent(batchJob BatchJob, event BatchJobEvent) error
New(batchID string) BatchJob
GetLogName(batchID string) (logName string, err error)
SetLogName(id string, logName string) error
ActiveJobsWithoutLogs(time.Time) ([]BatchJob, error)
HasStarted(batchID string) (started bool, err error)
}

const (
Expand Down Expand Up @@ -45,13 +48,53 @@ func (repo *batchRepo) New(batchID string) BatchJob {
return batchJob
}

// AwaitStarted polls the BatchRepo's DB for the state of the batch job
// associated with a given ID. It blocks until the batch job has started, unless
// an error occurs.
func BatchAwaitStarted(ctx context.Context, repo BatchRepo, batchID string, pollPeriod time.Duration) error {
for {
select {
case <-time.After(pollPeriod):
started, err := repo.HasStarted(batchID)
if err != nil {
return err
}
if started {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// HasStarted returns if the build has started.
func (repo *batchRepo) HasStarted(batchID string) (bool, error) {
var batchJob BatchJob
err := repo.db.Preload("Events").Where("batch_id = ?", batchID).First(&batchJob).Error
if err != nil {
return false, err
}
return hasStarted(batchJob.Status()), nil
}

// AddEvent adds an event to the batch service.
func (repo *batchRepo) AddEvent(batchJob BatchJob, event BatchJobEvent) error {
db := repo.db
err := db.Model(&batchJob).Association("Events").Append(event).Error
return err
}

// GetLogName takes a BatchJob ID and returns that BatchJob's logname if present
func (repo *batchRepo) GetLogName(id string) (string, error) {
var batchJob BatchJob
err := repo.db.Where("batch_id = ?", id).First(&batchJob).Error
if err != nil {
return "", err
}
return batchJob.LogName, nil
}

func (repo *batchRepo) SetLogName(id string, logName string) error {
batchJob := BatchJob{}
err := repo.db.Where("batch_id = ?", id).First(&batchJob).Error
Expand Down
72 changes: 71 additions & 1 deletion models/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package models

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -53,6 +54,75 @@ func TestBatchSetLogName(t *testing.T) {
})
}

func TestBatchGetLogName(t *testing.T) {
RunTransaction(func(db *gorm.DB) {
d := BatchDataSource(db)
batch := BatchJob{
BatchID: "foo",
LogName: "foobarLogName",
}
db.Create(&batch)
returned, err := d.GetLogName(batch.BatchID)
if err != nil {
t.Error(err)
}
if batch.LogName != returned {
t.Fatalf("Failed to get batch job's log name. Expected: %v Got: %v \n", batch.LogName, returned)
return
}
})
}

func TestBatchAwaitStarted(t *testing.T) {
RunTransaction(func(db *gorm.DB) {
d := BatchDataSource(db)

batch := BatchJob{
BatchID: "foo",
}
db.Create(&batch)

err := d.AddEvent(batch, BatchJobEvent{
BatchJobID: batch.ID,
Status: StatusStarted,
})
if err != nil {
t.Error(err)
}

ctxtimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

err = BatchAwaitStarted(ctxtimeout, d, batch.BatchID, 100*time.Microsecond)
if err != nil {
t.Error(err)
}
})
}

func TestBatchHasStarted(t *testing.T) {
RunTransaction(func(db *gorm.DB) {
d := BatchDataSource(db)
batch := BatchJob{
BatchID: "foo",
Events: []BatchJobEvent{
BatchJobEvent{
Status: StatusStarted,
},
},
}
db.Create(&batch)

started, err := d.HasStarted(batch.BatchID)
if err != nil {
t.Error(err)
}
if !started {
t.Error("BatchJob with started event is not considered started")
}
})
}

func TestBatchActiveJobsWithoutLogs(t *testing.T) {
RunTransaction(func(db *gorm.DB) {
d := BatchDataSource(db)
Expand Down Expand Up @@ -131,7 +201,7 @@ func TestBatchActiveJobsWithoutLogsWithLogs(t *testing.T) {
}

if len(batchJobs) != 0 {
t.Fatal("Expected 0 batch jobs, got %s", len(batchJobs))
t.Fatalf("Expected 0 batch jobs, got %v", len(batchJobs))
return
}

Expand Down
68 changes: 68 additions & 0 deletions service/batchtologid/batchtologid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// package batchtologid takes an AWS Batch Job ID and returns the associated AWS CloudWatch Log Name
package batchtologid

import (
"context"
"errors"
"fmt"
"log"
"time"

"github.com/ReconfigureIO/platform/models"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/batch"
)

type Adapter struct {
BatchRepo models.BatchRepo
AWS interface {
DescribeJobs(
*batch.DescribeJobsInput,
) (
*batch.DescribeJobsOutput,
error,
)
}
PollingPeriod time.Duration
}

// Do takes a batch job ID and returns the log name associated with that job. It
// attempts to do this by querying batchRepo. It first waits for the batch job
// to become started, which is a blocking operation. It then queries the batch
// repo for the log name. If this is not available, it queries AWS for the log
// name.
func (a *Adapter) Do(ctx context.Context, batchID string) (string, error) {
err := models.BatchAwaitStarted(ctx, a.BatchRepo, batchID, a.PollingPeriod)
if err != nil {
return "", err
}

logname, err := a.BatchRepo.GetLogName(batchID)
if err != nil {
log.Printf("bidToLid: BatchRepo.GetLogName: %v \n", err)
return "", err
}
if logname != "" {
return logname, nil
}

resp, err := a.AWS.DescribeJobs(&batch.DescribeJobsInput{
Jobs: aws.StringSlice([]string{batchID}),
})
if err != nil {
return "", err
}
if len(resp.Jobs) == 0 {
return "", fmt.Errorf("bidToLid: There is no AWS Batch Job with ID %v", batchID)
}

if resp.Jobs[0].Container.LogStreamName == nil {
return "", errors.New("BatchToLogID: Adapter.Do: Got nil LogStreamName from AWS Batch")
}
err = a.BatchRepo.SetLogName(batchID, *resp.Jobs[0].Container.LogStreamName)
if err != nil {
log.Printf("bidToLid: BatchRepo.SetLogName: %v \n", err)
}
return *resp.Jobs[0].Container.LogStreamName, nil

}
55 changes: 55 additions & 0 deletions service/batchtologid/batchtologid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package batchtologid

import (
"context"
"testing"
"time"

"github.com/ReconfigureIO/platform/models"
"github.com/aws/aws-sdk-go/service/batch"
"github.com/golang/mock/gomock"
)

var logName = "foobarLogName"
var batchID = "foobarBatchID"

type fakeAWS struct{}

func (aws *fakeAWS) DescribeJobs(input *batch.DescribeJobsInput) (*batch.DescribeJobsOutput, error) {
return &batch.DescribeJobsOutput{
Jobs: []*batch.JobDetail{
&batch.JobDetail{
Container: &batch.ContainerDetail{
LogStreamName: &logName,
},
},
},
}, nil
}

func TestBatchToLogID(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

batchRepo := models.NewMockBatchRepo(mockCtrl)
batchRepo.EXPECT().HasStarted(batchID).Return(true, nil)
batchRepo.EXPECT().GetLogName(batchID).Return("", nil)
batchRepo.EXPECT().SetLogName(batchID, logName).Return(nil)

b2l := Adapter{
BatchRepo: batchRepo,
AWS: &fakeAWS{},
PollingPeriod: time.Microsecond,
}

ctxtimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

returned, err := b2l.Do(ctxtimeout, batchID)
if err != nil {
t.Error(err)
}
if returned != logName {
t.Errorf("Returned log name did not match expected value. Returned: %v Expected: %v \n", returned, logName)
}
}

0 comments on commit dc4beb4

Please sign in to comment.