Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into ohCloud_ExitCodeConversion
Browse files Browse the repository at this point in the history
  • Loading branch information
OhCloud authored Mar 23, 2022
2 parents 7a722f3 + 2a8fab0 commit b23c939
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 7 deletions.
50 changes: 49 additions & 1 deletion packages/cli/internal/pkg/cli/logs_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package cli

import (
"bufio"
"bytes"
"errors"
"fmt"
Expand Down Expand Up @@ -104,11 +105,58 @@ func (o *logsWorkflowOpts) Execute() error {
}
} else {
printRunLog(runLog)
// Go and fetch the run-level log text if available
if len(runLog.Stdout) > 0 {
logDataStream, err := o.workflowManager.GetRunLogData(o.runId, runLog.Stdout)
if err != nil {
log.Error().Msgf("Could not retrieve standard output from %s: %v", runLog.Stdout, err)
} else {
printLn("Run Standard Output:")
// We would like to copy from logDataStream to standard output,
// but we can't.
// We aren't actually allowed to use standard output here; we
// must do all our output through printLn, because otherwise
// the test harness cannot capture it and see that we have done
// it, and we fail the tests.
// So we need to go through each line in logDataStream, and printLn it.
scanner := bufio.NewScanner(*logDataStream)
for scanner.Scan() {
printLn(scanner.Text())
}
err = scanner.Err()
// TODO: bufio's Scanner can't handle arbitrarily long lines.
// If it finds a line longer than 64k, it will stop with an
// error.
// We can raise the limit, but we can't get rid of the limit.
// To fully support arbitrarily long lines in the log, we need
// to come up with a better way for the test harness to collect
// streaming output.
if err != nil {
return err
}
}
}
if len(runLog.Stderr) > 0 {
logDataStream, err := o.workflowManager.GetRunLogData(o.runId, runLog.Stderr)
if err != nil {
log.Error().Msgf("Could not retrieve standard error from %s: %v", runLog.Stderr, err)
} else {
printLn("Run Standard Error:")
scanner := bufio.NewScanner(*logDataStream)
for scanner.Scan() {
printLn(scanner.Text())
}
err = scanner.Err()
if err != nil {
return err
}
}
}
return nil
}

if len(jobIds) == 0 {
log.Info().Msgf("No logs available for run '%s'. Please try again later.", o.runId)
log.Info().Msgf("No job logs available for run '%s'. Please try again later.", o.runId)
return nil
}
notCachedJobIds := filterCachedJobIds(jobIds)
Expand Down
35 changes: 35 additions & 0 deletions packages/cli/internal/pkg/cli/logs_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
"errors"
"fmt"
"io"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -147,6 +148,40 @@ func TestLogsWorkflowOpts_Execute(t *testing.T) {
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: \n\tName\t\tJobId\t\tStartTime\tStopTimeExitCode\n\tTest Task Name\tTest Job Id\t<nil>\t\t<nil>\t\n\t\n",
},
"runId stdout URL": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
opts.runId = testRunId
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLog(testRunId).Return(workflow.RunLog{
RunId: testRunId,
State: "COMPLETE",
Tasks: []workflow.Task(nil),
Stdout: "log/out",
}, nil)
stream := io.NopCloser(strings.NewReader("This is output"))
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLogData(testRunId, "log/out").Return(&stream, nil)
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: No task logs available\nRun Standard Output:\nThis is output\n",
},
"runId stderr URL": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
opts.runId = testRunId
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLog(testRunId).Return(workflow.RunLog{
RunId: testRunId,
State: "COMPLETE",
Tasks: []workflow.Task(nil),
Stderr: "log/err",
}, nil)
stream := io.NopCloser(strings.NewReader("This is error"))
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLogData(testRunId, "log/err").Return(&stream, nil)
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: No task logs available\nRun Standard Error:\nThis is error\n",
},
"runId no jobs": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/internal/pkg/cli/workflow/workflow_status.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package workflow

import (
"io"
)

type StatusManager interface {
StatusWorkflowAll(numInstances int) ([]InstanceSummary, error)
StatusWorkflowByInstanceId(instanceId string) ([]InstanceSummary, error)
Expand All @@ -9,6 +13,7 @@ type StatusManager interface {

type TasksManager interface {
GetRunLog(runId string) (RunLog, error)
GetRunLogData(runId string, dataUrl string) (*io.ReadCloser, error)
GetWorkflowTasks(runId string) ([]Task, error)
StatusWorkflowByName(workflowName string, numInstances int) ([]InstanceSummary, error)
}
Expand Down
30 changes: 24 additions & 6 deletions packages/cli/internal/pkg/cli/workflow/workflow_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflow
import (
"fmt"
"strconv"
"io"
"strings"
"time"

Expand All @@ -19,9 +20,11 @@ type Task struct {
}

type RunLog struct {
RunId string
State string
Tasks []Task
RunId string
State string
Stdout string
Stderr string
Tasks []Task
}

func (m *Manager) GetWorkflowTasks(runId string) ([]Task, error) {
Expand Down Expand Up @@ -50,12 +53,26 @@ func (m *Manager) GetRunLog(runId string) (RunLog, error) {
}

return RunLog{
RunId: m.taskProps.runLog.RunId,
State: string(m.taskProps.runLog.State),
Tasks: tasks,
RunId: m.taskProps.runLog.RunId,
State: string(m.taskProps.runLog.State),
Stdout: m.taskProps.runLog.RunLog.Stdout,
Stderr: m.taskProps.runLog.RunLog.Stderr,
Tasks: tasks,
}, nil
}

func (m *Manager) GetRunLogData(runId string, dataUrl string) (*io.ReadCloser, error) {
if m.err != nil {
return nil, m.err
}
var stream *io.ReadCloser
stream, m.err = m.wes.GetRunLogData(context.Background(), runId, dataUrl)
if m.err != nil {
return nil, m.err
}
return stream, nil
}

func (m *Manager) setContextForRun(runId string) {
if m.err != nil {
return
Expand All @@ -73,6 +90,7 @@ func (m *Manager) getRunLog(runId string) {
return
}
m.runLog, m.err = m.wes.GetRunLog(context.Background(), runId)
log.Debug().Msgf("Obtained log: %v", m.runLog)
}

func (m *Manager) getTasks() ([]Task, error) {
Expand Down
16 changes: 16 additions & 0 deletions packages/cli/internal/pkg/mocks/manager/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions packages/cli/internal/pkg/mocks/wes/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions packages/cli/internal/pkg/wes/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wes

import (
"context"
"io"

"github.com/aws/amazon-genomics-cli/internal/pkg/wes/option"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -51,3 +52,8 @@ func (c *Client) GetRunLog(ctx context.Context, runId string) (wes.RunLog, error
runLog, _, err := c.wes.WorkflowExecutionServiceApi.GetRunLog(ctx, runId)
return runLog, err
}

func (c *Client) GetRunLogData(ctx context.Context, runId string, dataUrl string) (*io.ReadCloser, error) {
runLogDataStream, _, err := c.wes.WorkflowExecutionServiceApi.GetRunLogData(ctx, runId, dataUrl)
return runLogDataStream, err
}
2 changes: 2 additions & 0 deletions packages/cli/internal/pkg/wes/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wes

import (
"context"
"io"

"github.com/aws/amazon-genomics-cli/internal/pkg/wes/option"
wes "github.com/rsc/wes_client"
Expand All @@ -12,4 +13,5 @@ type Interface interface {
GetRunStatus(ctx context.Context, runId string) (string, error)
StopWorkflow(ctx context.Context, runId string) error
GetRunLog(ctx context.Context, runId string) (wes.RunLog, error)
GetRunLogData(ctx context.Context, runId string, dataUrl string) (*io.ReadCloser, error)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Additional methods for actually retrieving data pointed to by URLs in WES responses.
*/

package wes_client

import (
_context "context"
_ioutil "io/ioutil"
_nethttp "net/http"
_neturl "net/url"
"fmt"
"io"
"strings"
)

// Linger please
var (
_ _context.Context
)

/*
GetRunLogData Get data linked to by GetRunLog.
Returns a stream for the content of a URL referenced in a GetRunLog response.
* @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @param runId The run ID used for the GetRunLog call
* @param dataUrl The URL string from the GetRunLog call
@return *io.ReadCloser
*/
func (a *WorkflowExecutionServiceApiService) GetRunLogData(ctx _context.Context, runId string, dataUrl string) (*io.ReadCloser, *_nethttp.Response, error) {
var (
localVarHTTPMethod = _nethttp.MethodGet
localVarPostBody interface{}
localVarFormFileName string
localVarReturnValue *io.ReadCloser = nil
)

// create path and map variables
localVarPath := a.client.cfg.BasePath + "/runs/{run_id}"
localVarPath = strings.Replace(localVarPath, "{"+"run_id"+"}", _neturl.PathEscape(parameterToString(runId, "")), -1)

// Evaluate dataUrl relative to localVarPath and replace localVarPath
base, err := _neturl.Parse(localVarPath)
if err != nil {
return localVarReturnValue, nil, err
}
evaluated, err := base.Parse(dataUrl)
if err != nil {
return localVarReturnValue, nil, err
}
if (evaluated.Scheme != base.Scheme && !strings.HasPrefix(evaluated.Scheme, "http")) {
// This doesn't look like something we can fetch
return localVarReturnValue, nil, fmt.Errorf("WES cannot be used to retrieve %s", dataUrl)
}
localVarPath = evaluated.String()

// Request headers will go in here.
localVarHeaderParams := make(map[string]string)
// We don't use any of these, but we need them to invoke prepareRequest.
files := make(map[string][]byte)
localVarQueryParams := _neturl.Values{}
localVarFormParams := _neturl.Values{}

// We don't need any accept type choosing logic; we can only accept plain text.
localVarHeaderParams["Accept"] = "text/plain"

r, err := a.client.prepareRequest(ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, files)
if err != nil {
return localVarReturnValue, nil, err
}

// Make the request
localVarHTTPResponse, err := a.client.callAPI(r)
if err != nil || localVarHTTPResponse == nil {
return localVarReturnValue, localVarHTTPResponse, err
}
// Be ready to return the body stream
localVarReturnValue = &localVarHTTPResponse.Body

if localVarHTTPResponse.StatusCode >= 300 {
// Something has gone wrong sever-side (and this isn't a redirect)
// Fetch the entire body
localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body)
localVarHTTPResponse.Body.Close()
newErr := GenericOpenAPIError{
body: localVarBody,
error: localVarHTTPResponse.Status, // Despite the name, this must be a string
}
if err != nil {
// Something went wrong during error download.
// Add that error to our error as a string.
newErr.error = fmt.Sprintf("Failed to download body of HTTP error %d %s response: %v", localVarHTTPResponse.StatusCode, localVarHTTPResponse.Status, err)
return localVarReturnValue, localVarHTTPResponse, newErr
}
// Otherwise, we downloaded something. Maybe we can parse it as a WES-style JSON error.
var v ErrorResponse
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
// Nope, it's not a WES-style error.
// Don't explain why it's not parseable, just pass along what the server said.
newErr.error = fmt.Sprintf("Instead of log data, server sent a %d %s error with content: %s", localVarHTTPResponse.StatusCode, localVarHTTPResponse.Status, localVarBody)
return localVarReturnValue, localVarHTTPResponse, newErr
}
// Otherwise, it is a WES-style error we can understand (even if not a normally acceptable WES error code)
newErr.model = v
return localVarReturnValue, localVarHTTPResponse, newErr
}
// TODO: handle redirects?

return localVarReturnValue, localVarHTTPResponse, nil
}

0 comments on commit b23c939

Please sign in to comment.