Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
feat: Engine logs for specific workflow runs (#379)
Browse files Browse the repository at this point in the history
* added run-id flag for logs engine

* Initial implementation of run-id filtering for workflow engine logs

* Added unit tests for GetEngineLogByRunId

* Handle wes adapter time edge case and cli log stream processing race condition where the engine process is running but the log stream doesn't exist yet.
PR feedback

* Switch to isServerProcessEngine as most engines are batch job processes so we only need to track the exceptions

* allow output of both STDOUT and STDERR from engines
  • Loading branch information
markjschreiber authored Mar 28, 2022
1 parent a1c2d30 commit 8bda674
Show file tree
Hide file tree
Showing 10 changed files with 1,257 additions and 1,693 deletions.
2,499 changes: 833 additions & 1,666 deletions packages/cdk/npm-shrinkwrap.json

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions packages/cli/internal/pkg/cli/context/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"reflect"

"github.com/aws/amazon-genomics-cli/internal/pkg/cli/spec"
"github.com/aws/amazon-genomics-cli/internal/pkg/constants"
)

var serverProcessEngines = map[string]bool{constants.CROMWELL: true}

type Summary struct {
Name string
MaxVCpus int
Expand All @@ -14,8 +17,14 @@ type Summary struct {
Engines []spec.Engine
}

func (i Summary) IsEmpty() bool {
return reflect.ValueOf(i).IsZero()
func (s Summary) IsEmpty() bool {
return reflect.ValueOf(s).IsZero()
}

//IsServerProcessEngine Does the workflow engine run as a server process? A server process engine has one to many
// mapping with workflow runs. The engine can be used to run multiple workflows and the process is re-used and long running.
func (s *Summary) IsServerProcessEngine() bool {
return serverProcessEngines[s.Engines[0].Engine]
}

type Detail struct {
Expand All @@ -36,6 +45,6 @@ type Instance struct {
IsDefinedInProjectFile bool
}

func (i Detail) IsEmpty() bool {
return reflect.ValueOf(i).IsZero()
func (d Detail) IsEmpty() bool {
return reflect.ValueOf(d).IsZero()
}
41 changes: 41 additions & 0 deletions packages/cli/internal/pkg/cli/context/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package context
import (
"testing"

"github.com/aws/amazon-genomics-cli/internal/pkg/cli/spec"
"github.com/aws/amazon-genomics-cli/internal/pkg/constants"
"github.com/stretchr/testify/assert"
)

Expand All @@ -25,3 +27,42 @@ func TestDetail_IsNotEmpty(t *testing.T) {
detail := Detail{WesUrl: "amazon.com"}
assert.False(t, detail.IsEmpty())
}

func TestSummary_IsServerProcessEngine(t *testing.T) {
tests := map[string]struct {
engineName string
expect bool
}{
"otherIsNotAServer": {
engineName: "other",
expect: false,
},

"cromwellIsAServer": {
engineName: constants.CROMWELL,
expect: true,
},

"snakeMakeIsNotAServer": {
engineName: constants.SNAKEMAKE,
expect: false,
},
"nextFlowIsNotAServer": {
engineName: constants.NEXTFLOW,
expect: false,
},
"miniwdlIsNotAServer": {
engineName: constants.MINIWDL,
expect: false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
engine := spec.Engine{Engine: test.engineName}
summary := Summary{Engines: []spec.Engine{engine}}
actual := summary.IsServerProcessEngine()
assert.Equal(t, test.expect, actual)
})
}
}
13 changes: 11 additions & 2 deletions packages/cli/internal/pkg/cli/logs_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
ctx "context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -130,7 +131,7 @@ func (o *logsSharedOpts) displayEventFromChannel(channel <-chan cwl.StreamEvent)

for event := range channel {
if event.Err != nil {
return event.Err
return handleLogStreamRaceCondition(event.Err)
}
if len(event.Logs) > 0 {
for _, line := range event.Logs {
Expand Down Expand Up @@ -230,8 +231,16 @@ func (o *logsSharedOpts) displayLogStreams(logGroupName string, startTime, endTi
for _, stream := range streams {
err := o.displayLogGroup(logGroupName, startTime, endTime, filter, stream)
if err != nil {
return err
return handleLogStreamRaceCondition(err)
}
}
return nil
}

func handleLogStreamRaceCondition(err error) error {
if strings.Contains(err.Error(), "ResourceNotFoundException: The specified log stream does not exist") {
log.Warn().Msgf("The logging process has started but the log stream is not yet created, please wait and try again.")
return nil
}
return err
}
101 changes: 93 additions & 8 deletions packages/cli/internal/pkg/cli/logs_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,34 @@
package cli

import (
"fmt"

"github.com/aws/amazon-genomics-cli/internal/pkg/aws"
"github.com/aws/amazon-genomics-cli/internal/pkg/cli/clierror"
"github.com/aws/amazon-genomics-cli/internal/pkg/cli/clierror/actionableerror"
"github.com/aws/amazon-genomics-cli/internal/pkg/cli/context"
"github.com/aws/amazon-genomics-cli/internal/pkg/cli/workflow"
"github.com/aws/amazon-genomics-cli/internal/pkg/constants"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

const (
runIdFlag = "run-id"
runIdShort = "r"
runIdDescription = "filter to engine logs to this workflow run id"
runIdDefault = ""
)

type logsEngineVars struct {
logsSharedVars
workflowRunId string
}

type logsEngineOpts struct {
logsEngineVars
logsSharedOpts
workflowManager *workflow.Manager
}

func newLogsEngineOpts(vars logsEngineVars) (*logsEngineOpts, error) {
Expand All @@ -27,6 +41,7 @@ func newLogsEngineOpts(vars logsEngineVars) (*logsEngineOpts, error) {
ctxManager: context.NewManager(profile),
cwlClient: aws.CwlClient(profile),
},
workflowManager: workflow.NewManager(profile),
}, nil
}

Expand All @@ -35,6 +50,27 @@ func (o *logsEngineOpts) Validate() error {
return err
}

ctxMap, err := o.ctxManager.List()
if err != nil {
return err
}

summary := ctxMap[o.contextName]
engine := summary.Engines[0].Engine

if o.workflowRunId == "" {
if !summary.IsServerProcessEngine() {
log.Warn().Msgf("DEPRECATION WARNING!!")
log.Warn().Msgf("Obtaining engine logs for a workflow context where the engine is '%s' will return engine logs from ALL workflows run in this context",
engine)
log.Warn().Msgf("Specifying a run-id will be MANDATORY in future versions")
log.Warn().Msgf("Please run the command again with -r <run-id>")
}
} else if engine == constants.CROMWELL {
return actionableerror.New(fmt.Errorf("use of -%s (--%s) flag with Cromwell is invalid", runIdShort, runIdFlag),
"Cromwell doesn't currently support engine logs for specific runs, displaying complete log")
}

return o.parseTime(o.logsSharedVars)
}

Expand All @@ -45,24 +81,65 @@ func (o *logsEngineOpts) Execute() error {
}

logGroupName := contextInfo.EngineLogGroupName
log.Debug().Msgf("Engine log group name: '%s'", logGroupName)

if o.workflowRunId == "" {
return executeGetEngineLogForWholeGroup(o, logGroupName)
}
return executeGetEngineLogForRunId(o, logGroupName)
}

func executeGetEngineLogForWholeGroup(o *logsEngineOpts, logGroupName string) error {
if o.tail {
err = o.followLogGroup(logGroupName)
} else {
err = o.displayLogGroup(logGroupName, o.startTime, o.endTime, o.filter)
return o.followLogGroup(logGroupName)
}
return o.displayLogGroup(logGroupName, o.startTime, o.endTime, o.filter)
}

func executeGetEngineLogForRunId(o *logsEngineOpts, logGroupName string) error {
log.Info().Msgf("Getting log stream for workflow run '%s'", o.workflowRunId)

workflowRunLog, err := o.workflowManager.GetEngineLogByRunId(o.workflowRunId)
if err != nil {
return err
}

logStreamNames := streamNamesFromRunLog(workflowRunLog)
log.Debug().Msgf("Log stream name is: '%v'", logStreamNames)

if len(logStreamNames) > 0 {
if o.tail {
return o.followLogStreams(logGroupName, logStreamNames...)
}
return o.displayLogStreams(logGroupName, o.startTime, o.endTime, o.filter, logStreamNames...)
}

return err
workflowStatus := workflowRunLog.WorkflowStatus
log.Warn().Msgf("Cannot find an engine log stream for workflow run '%s', the current status of the run is: '%s', "+
"the log will not be available until after the workflow is RUNNING", o.workflowRunId, workflowStatus)
return nil
}

func streamNamesFromRunLog(workflowRunLog workflow.EngineLog) []string {
var streamNames = make([]string, 0)
if workflowRunLog.StdOut != "" {
streamNames = append(streamNames, workflowRunLog.StdOut)
}
if workflowRunLog.StdErr != "" {
streamNames = append(streamNames, workflowRunLog.StdErr)
}
return streamNames
}

func BuildLogsEngineCommand() *cobra.Command {
vars := logsEngineVars{}
cmd := &cobra.Command{
Use: "engine -c context_name [-f filter] [-s start_date] [-e end_date] [-l look_back] [-t]",
Use: "engine -c context_name [-r run_id] [-f filter] [-s start_date] [-e end_date] [-l look_back] [-t]",
Short: "Show workflow engine logs for a given context.",
Long: `Show workflow engine logs for a given context.
If no start, end, or look back periods are set, this command will show logs from the last hour.`,
Example: `
/code agc logs engine -c myCtx -s 2021/3/31 -e 2021/4/1 -f ERROR`,
/code agc logs engine -c myCtx -r 1234-aed-32db -s 2021/3/31 -e 2021/4/1 -f ERROR`,
RunE: runCmdE(func(cmd *cobra.Command, args []string) error {
opts, err := newLogsEngineOpts(vars)
if err != nil {
Expand All @@ -71,8 +148,15 @@ If no start, end, or look back periods are set, this command will show logs from
if err = opts.Validate(); err != nil {
return err
}
opts.setDefaultEndTimeIfEmpty()
log.Info().Msgf("Showing engine logs for '%s'", opts.contextName)

if vars.workflowRunId == "" {
opts.setDefaultEndTimeIfEmpty()
}
var msg = fmt.Sprintf("Showing engine logs for '%s'", opts.contextName)
if vars.endString != "" {
msg = fmt.Sprintf("Showing engine logs for '%s' from '%s", opts.contextName, opts.endString)
}
log.Info().Msg(msg)
if err = opts.Execute(); err != nil {
return clierror.New("logs engine", vars, err)
}
Expand All @@ -81,5 +165,6 @@ If no start, end, or look back periods are set, this command will show logs from
}
vars.setFilterFlags(cmd)
vars.setContextFlag(cmd)
cmd.Flags().StringVarP(&vars.workflowRunId, runIdFlag, runIdShort, runIdDefault, runIdDescription)
return cmd
}
Loading

0 comments on commit 8bda674

Please sign in to comment.