Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

feat: Download and report WES run log files #319

Merged
merged 4 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion packages/cli/internal/pkg/cli/logs_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,30 @@ func (o *logsWorkflowOpts) Execute() error {
}
} else {
printRunLog(runLog)
// Go and fetch the run-level log text if available
if len(runLog.Stdout) > 0 {
logData, err := o.workflowManager.GetRunLogData(o.runId, runLog.Stdout)
if err != nil {
log.Error().Msgf("Could not retrieve standard output from %s: %v", runLog.Stdout, err)
} else {
printLn("Run Standard Output:")
printLn(logData)
}
}
if len(runLog.Stderr) > 0 {
logData, err := o.workflowManager.GetRunLogData(o.runId, runLog.Stderr)
if err != nil {
log.Error().Msgf("Could not retrieve standard error from %s: %v", runLog.Stderr, err)
} else {
printLn("Run Standard Error:")
printLn(logData)
}
}
return nil
}

if len(jobIds) == 0 {
log.Info().Msgf("No logs available for run '%s'. Please try again later.", o.runId)
log.Info().Msgf("No job logs available for run '%s'. Please try again later.", o.runId)
return nil
}
notCachedJobIds := filterCachedJobIds(jobIds)
Expand Down
32 changes: 32 additions & 0 deletions packages/cli/internal/pkg/cli/logs_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,38 @@ func TestLogsWorkflowOpts_Execute(t *testing.T) {
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: \n\tName\t\tJobId\t\tStartTime\tStopTimeExitCode\n\tTest Task Name\tTest Job Id\t<nil>\t\t<nil>\t\n\t\n",
},
"runId stdout URL": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
opts.runId = testRunId
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLog(testRunId).Return(workflow.RunLog{
RunId: testRunId,
State: "COMPLETE",
Tasks: []workflow.Task(nil),
Stdout: "log/out",
}, nil)
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLogData(testRunId, "log/out").Return("This is output", nil)
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: No task logs available\nRun Standard Output:\nThis is output\n",
},
"runId stderr URL": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
opts.runId = testRunId
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLog(testRunId).Return(workflow.RunLog{
RunId: testRunId,
State: "COMPLETE",
Tasks: []workflow.Task(nil),
Stderr: "log/err",
}, nil)
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLogData(testRunId, "log/err").Return("This is error", nil)
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: No task logs available\nRun Standard Error:\nThis is error\n",
},
"runId no jobs": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
Expand Down
1 change: 1 addition & 0 deletions packages/cli/internal/pkg/cli/workflow/workflow_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type StatusManager interface {

type TasksManager interface {
GetRunLog(runId string) (RunLog, error)
GetRunLogData(runId string, dataUrl string) (string, error)
GetWorkflowTasks(runId string) ([]Task, error)
StatusWorkflowByName(workflowName string, numInstances int) ([]InstanceSummary, error)
}
Expand Down
29 changes: 23 additions & 6 deletions packages/cli/internal/pkg/cli/workflow/workflow_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type Task struct {
}

type RunLog struct {
RunId string
State string
Tasks []Task
RunId string
State string
Stdout string
Stderr string
Tasks []Task
}

func (m *Manager) GetWorkflowTasks(runId string) ([]Task, error) {
Expand Down Expand Up @@ -49,12 +51,26 @@ func (m *Manager) GetRunLog(runId string) (RunLog, error) {
}

return RunLog{
RunId: m.taskProps.runLog.RunId,
State: string(m.taskProps.runLog.State),
Tasks: tasks,
RunId: m.taskProps.runLog.RunId,
State: string(m.taskProps.runLog.State),
Stdout: m.taskProps.runLog.RunLog.Stdout,
Stderr: m.taskProps.runLog.RunLog.Stderr,
Tasks: tasks,
}, nil
}

func (m *Manager) GetRunLogData(runId string, dataUrl string) (string, error) {
if m.err != nil {
return "", m.err
}
var data string
data, m.err = m.wes.GetRunLogData(context.Background(), runId, dataUrl)
if m.err != nil {
return "", m.err
}
return data, nil
}

func (m *Manager) setContextForRun(runId string) {
if m.err != nil {
return
Expand All @@ -72,6 +88,7 @@ func (m *Manager) getRunLog(runId string) {
return
}
m.runLog, m.err = m.wes.GetRunLog(context.Background(), runId)
log.Debug().Msgf("Obtained log: %v", m.runLog)
}

func (m *Manager) getTasks() ([]Task, error) {
Expand Down
15 changes: 15 additions & 0 deletions packages/cli/internal/pkg/mocks/manager/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions packages/cli/internal/pkg/mocks/wes/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions packages/cli/internal/pkg/wes/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ func (c *Client) GetRunLog(ctx context.Context, runId string) (wes.RunLog, error
runLog, _, err := c.wes.WorkflowExecutionServiceApi.GetRunLog(ctx, runId)
return runLog, err
}

func (c *Client) GetRunLogData(ctx context.Context, runId string, dataUrl string) (string, error) {
runLogData, _, err := c.wes.WorkflowExecutionServiceApi.GetRunLogData(ctx, runId, dataUrl)
return runLogData, err
}
1 change: 1 addition & 0 deletions packages/cli/internal/pkg/wes/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type Interface interface {
GetRunStatus(ctx context.Context, runId string) (string, error)
StopWorkflow(ctx context.Context, runId string) error
GetRunLog(ctx context.Context, runId string) (wes.RunLog, error)
GetRunLogData(ctx context.Context, runId string, dataUrl string) (string, error)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Additional methods for actually retrieving data pointed to by URLs in WES responses.
*/

package wes_client

import (
_context "context"
_ioutil "io/ioutil"
_nethttp "net/http"
_neturl "net/url"
"fmt"
"strings"
)

// Linger please
var (
_ _context.Context
)

/*
GetRunLogData Get data linked to by GetRunLog.
Returns the content of a URL reverenced in a GetRunLog response, if that URL is also on the WES server.
* @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @param runId The run ID used for the GetRunLog call
* @param dataUrl The URL string from the GetRunLog call
@return string
*/
func (a *WorkflowExecutionServiceApiService) GetRunLogData(ctx _context.Context, runId string, dataUrl string) (string, *_nethttp.Response, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this is inspired by the OpenAPI-generated api_workflow_execution_service.go? Could we be a little less verbose if we don't think of this as an extension/part of the WES client?

I find WES to be a little under-specified in some places, but I'm no expert and am pretty much going off the OpenAPI spec. If I'm wrong about any of my interpretations of what this URL should be and what should be contained there, let me know.

If we're assuming these are just text files available at either an absolute URL or one relative to the WES endpoint, can we just download that file? Do we need to bother with checking the WES-specified error codes and content types? I'm not sure we have any guarantees that the endpoint hosting the file will respond with the error codes/payloads that the WES client is expecting.

Not something we need to solve right now, but what's prompting me to consider a slightly less WES-endpoint-coupled approach is that most of our existing engines run in Batch, and we currently access their logs via Batch+Cloudwatch. Cloudwatch Logs don't have a standard URL scheme, but I suspect we'll (AGC) eventually want to extend this to allow for loading stdout/stderr from an S3 file or a Cloudwatch Log stream as well as via a URL.

For now, I'm just wondering if we can clean up some of this generated-ish code by just adding some plain-old file loading to packages/cli/internal/pkg/wes/Client.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not so much "inspired by" the OpenAPI-generated code as it is "a complete copy-paste of" the OpenAPI-generated code. I did that for consistency with the rest of the WES API implementation, since retrieving these logs is specified in WES (just in English that the code generator can't read). I also did it because I thought it would work better, faster, than me trying to write something from scratch.

If we don't care about consistency, we could streamline this to look a little more like a human wrote it.

It's not quite as simple as just downloading the file; we definitely need to visit prepareRequest, which does the signing to get the request through AGC's proxy, assuming the URL we got actually points to the server that gave it to us.

I left all the error code detection and error response parsing identical to the WES API request I copied from. If we get an error specified by WES, we'll understand it as specified by WES, and if we get some other error, we won't really understand it but we'll report it anyway. Probably nothing bad would happen if I cut the code, but I took leaving it in as the default.

This should eventually be extended to support S3 or CloudWatch URLs; it could even get AGC away from the | task names.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer human-readable code to code that's consistent with the generated code, personally. Generated code is good in that it's generated, but not good for much else 😛

If these URLs being URLs on the WES server itself is common, then I'm fine with continuing to prepare requests and parse errors in the WES style.

var (
localVarHTTPMethod = _nethttp.MethodGet
localVarPostBody interface{}
localVarFormFileName string
localVarReturnValue string
)

// create path and map variables
localVarPath := a.client.cfg.BasePath + "/runs/{run_id}"
localVarPath = strings.Replace(localVarPath, "{"+"run_id"+"}", _neturl.PathEscape(parameterToString(runId, "")), -1)

// Evaluate dataUrl relative to localVarPath and replace localVarPath
base, err := _neturl.Parse(localVarPath)
if err != nil {
return localVarReturnValue, nil, err
}
evaluated, err := base.Parse(dataUrl)
if err != nil {
return localVarReturnValue, nil, err
}
if (evaluated.Scheme != base.Scheme && !strings.HasPrefix(evaluated.Scheme, "http")) {
// This doesn't look like something we can fetch
return localVarReturnValue, nil, fmt.Errorf("WES cannot be used to retrieve %s", dataUrl)
}
localVarPath = evaluated.String()

localVarHeaderParams := make(map[string]string)
files := make(map[string][]byte)
localVarQueryParams := _neturl.Values{}
localVarFormParams := _neturl.Values{}

// to determine the Content-Type header
localVarHTTPContentTypes := []string{}

// set Content-Type header
localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes)
if localVarHTTPContentType != "" {
localVarHeaderParams["Content-Type"] = localVarHTTPContentType
}

// to determine the Accept header
localVarHTTPHeaderAccepts := []string{"text/plain"}

// set Accept header
localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts)
if localVarHTTPHeaderAccept != "" {
localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept
}
r, err := a.client.prepareRequest(ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, files)
if err != nil {
return localVarReturnValue, nil, err
}

localVarHTTPResponse, err := a.client.callAPI(r)
if err != nil || localVarHTTPResponse == nil {
return localVarReturnValue, localVarHTTPResponse, err
}

localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we concerned about using ReadAll to load a potentially-large log file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not overly concerned about file size here; even a very large workflow log is probably under a gigabyte.

Streaming would of course be better, but I didn't want to invest the time to learn, design and implement a good way to stream it here. It's going to the console, so when it starts to get too large for memory it starts to get too large to really be useful. But it's also not a good idea to let a very large file crash the client.

(Really we would want to be able to make HEAD and byte range requests or something, and be able to get just a manageable piece from the end of the log, and to let the user then ask for other parts of the log somehow, but I also didn't want to build out that whole system right now.)

Maybe implementing streaming wouldn't actually be more work than it's worth. I could revise the download code to be able to handle errors internally while streaming successful responses, and change the return type to whatever type Body is, and then where it's actually used figure out what to do with the stream (it looks like io.Copy can copy streams to streams?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusting this to return a stream, and passing the stream up to the CLI command implementation in logs_workflow.go, and using io.Copy there, breaks the tests, because the data is copied to the standard output of the test process, instead of wherever the output capturing machinery that makes the expectedOutput values in logs_workflow_test.go expects the output to be going. It looks like the test code allows only printLn to be used for output.

localVarHTTPResponse.Body.Close()
if err != nil {
return localVarReturnValue, localVarHTTPResponse, err
}

if localVarHTTPResponse.StatusCode >= 300 {
newErr := GenericOpenAPIError{
body: localVarBody,
error: localVarHTTPResponse.Status,
}
if localVarHTTPResponse.StatusCode == 401 {
var v ErrorResponse
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarReturnValue, localVarHTTPResponse, newErr
}
newErr.model = v
return localVarReturnValue, localVarHTTPResponse, newErr
}
if localVarHTTPResponse.StatusCode == 403 {
var v ErrorResponse
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarReturnValue, localVarHTTPResponse, newErr
}
newErr.model = v
return localVarReturnValue, localVarHTTPResponse, newErr
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above. It might be reasonable to expect the endpoint to reply with standard HTTP 403/etc. codes, but can we expect the response payload to be something the WES client would be able to decode?

Copy link
Contributor Author

@adamnovak adamnovak Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way Toil implements this is by having log-retrieval endpoints as Toil-specific extensions to the WES server, so when they return errors they will probably be WES-style errors (401, 403, 404, or 500, with a JSON body with msg and status_code). So if we do get such an error, we probably should parse it as laid out in WES, if we can.

If we get something else, we definitely need to still return an error, but I'm not sure what kind of error. Right now it looks like it would be undefined response type or some complaint about unmarshalling. Maybe when decoding fails, instead of returning the error decode raised (wrapped as a WES error), we should return something more like Error: the server said <a bunch of bytes the server sent>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that's reasonable if that's a common case for these URLs. But yeah, I would suggest that we be prepared to just output the raw response of the server in case the error isn't a WES error. Perhaps just printing that something failed in the normal case and printing the raw response from the server in a debug case?

if localVarHTTPResponse.StatusCode == 404 {
var v ErrorResponse
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarReturnValue, localVarHTTPResponse, newErr
}
newErr.model = v
return localVarReturnValue, localVarHTTPResponse, newErr
}
if localVarHTTPResponse.StatusCode == 500 {
var v ErrorResponse
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
newErr.error = err.Error()
return localVarReturnValue, localVarHTTPResponse, newErr
}
newErr.model = v
}
return localVarReturnValue, localVarHTTPResponse, newErr
}

// TODO: Handle odd encodings, unacceptable UTF-8, etc. somehow.
localVarReturnValue = string(localVarBody)

return localVarReturnValue, localVarHTTPResponse, nil
}