Skip to content

Commit

Permalink
Add more context for ray log template links (#4416)
Browse files Browse the repository at this point in the history
Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb authored Nov 13, 2023
1 parent 033276d commit 72e7438
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 5 deletions.
32 changes: 30 additions & 2 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ray
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -37,6 +38,14 @@ const (
DisableUsageStatsStartParameter = "disable-usage-stats"
)

var logTemplateRegexes = struct {
RayClusterName *regexp.Regexp
RayJobID *regexp.Regexp
}{
tasklog.MustCreateRegex("rayClusterName"),
tasklog.MustCreateRegex("rayJobID"),
}

type rayJobResourceHandler struct {
}

Expand Down Expand Up @@ -442,8 +451,27 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon

taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID()
input := tasklog.Input{
Namespace: rayJob.Namespace,
TaskExecutionID: taskExecID,
Namespace: rayJob.Namespace,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: &tasklog.TemplateVarsByScheme{},
}
if rayJob.Status.JobId != "" {
input.ExtraTemplateVarsByScheme.Common = append(
input.ExtraTemplateVarsByScheme.Common,
tasklog.TemplateVar{
Regex: logTemplateRegexes.RayJobID,
Value: rayJob.Status.JobId,
},
)
}
if rayJob.Status.RayClusterName != "" {
input.ExtraTemplateVarsByScheme.Common = append(
input.ExtraTemplateVarsByScheme.Common,
tasklog.TemplateVar{
Regex: logTemplateRegexes.RayClusterName,
Value: rayJob.Status.RayClusterName,
},
)
}

// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
Expand Down
114 changes: 111 additions & 3 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,13 +608,21 @@ func newPluginContext() k8s.PluginContext {

taskExecID := &mocks.TaskExecutionID{}
taskExecID.OnGetID().Return(core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: "my-task-name",
Project: "my-task-project",
Domain: "my-task-domain",
Version: "1",
},
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my_name",
Project: "my_project",
Domain: "my_domain",
Name: "my-execution-name",
Project: "my-execution-project",
Domain: "my-execution-domain",
},
},
RetryAttempt: 1,
})
taskExecID.OnGetUniqueNodeID().Return("unique-node")
taskExecID.OnGetGeneratedName().Return("generated-name")
Expand Down Expand Up @@ -678,6 +686,106 @@ func TestGetTaskPhase(t *testing.T) {
}
}

func TestGetEventInfo_LogTemplates(t *testing.T) {
pluginCtx := newPluginContext()
testCases := []struct {
name string
rayJob rayv1alpha1.RayJob
logPlugin tasklog.TemplateLogPlugin
expectedTaskLogs []*core.TaskLog
}{
{
name: "namespace",
rayJob: rayv1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-namespace",
},
},
logPlugin: tasklog.TemplateLogPlugin{
DisplayName: "namespace",
TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}"},
},
expectedTaskLogs: []*core.TaskLog{
{
Name: "namespace",
Uri: "http://test/test-namespace",
},
},
},
{
name: "task execution ID",
rayJob: rayv1alpha1.RayJob{},
logPlugin: tasklog.TemplateLogPlugin{
DisplayName: "taskExecID",
TemplateURIs: []tasklog.TemplateURI{
"http://test/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}",
},
Scheme: tasklog.TemplateSchemeTaskExecution,
},
expectedTaskLogs: []*core.TaskLog{
{
Name: "taskExecID",
Uri: "http://test/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/unique-node/taskId/my-task-name/attempt/1",
},
},
},
{
name: "ray cluster name",
rayJob: rayv1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-namespace",
},
Status: rayv1alpha1.RayJobStatus{
RayClusterName: "ray-cluster",
},
},
logPlugin: tasklog.TemplateLogPlugin{
DisplayName: "ray cluster name",
TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}/{{ .rayClusterName }}"},
},
expectedTaskLogs: []*core.TaskLog{
{
Name: "ray cluster name",
Uri: "http://test/test-namespace/ray-cluster",
},
},
},
{
name: "ray job ID",
rayJob: rayv1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-namespace",
},
Status: rayv1alpha1.RayJobStatus{
JobId: "ray-job-1",
},
},
logPlugin: tasklog.TemplateLogPlugin{
DisplayName: "ray job ID",
TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}/{{ .rayJobID }}"},
},
expectedTaskLogs: []*core.TaskLog{
{
Name: "ray job ID",
Uri: "http://test/test-namespace/ray-job-1",
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ti, err := getEventInfoForRayJob(
logs.LogConfig{Templates: []tasklog.TemplateLogPlugin{tc.logPlugin}},
pluginCtx,
&tc.rayJob,
)
assert.NoError(t, err)
assert.Equal(t, tc.expectedTaskLogs, ti.Logs)
})
}
}

func TestGetEventInfo_DashboardURL(t *testing.T) {
pluginCtx := newPluginContext()
testCases := []struct {
Expand Down

0 comments on commit 72e7438

Please sign in to comment.