Skip to content

Commit

Permalink
Refactor task logs framework (#4396)
Browse files Browse the repository at this point in the history
* Refactor task logs framework

Signed-off-by: Jeev B <[email protected]>

* Return templateLogPluginCollection instead of nil even if no plugins are specified

Signed-off-by: Jeev B <[email protected]>

---------

Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb authored Nov 10, 2023
1 parent 48f53f0 commit 09cb3b1
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 262 deletions.
25 changes: 7 additions & 18 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,34 @@
package logs

import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

//go:generate pflags LogConfig --default-var=DefaultConfig

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

// LogConfig encapsulates plugins' log configs
type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI tasklog.TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
// Deprecated: Please use KubernetesTemplateURI
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI tasklog.TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
// Deprecated: Please use StackDriverTemplateURI
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
// Deprecated: Please use StackDriverTemplateURI
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"`
}
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

type TemplateLogPluginConfig struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"`
}

var (
Expand Down
53 changes: 17 additions & 36 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
)

type logPlugin struct {
Name string
Plugin tasklog.Plugin
}

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
if logPlugin == nil {
Expand Down Expand Up @@ -66,67 +61,53 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
return logs.TaskLogs, nil
}

type taskLogPluginWrapper struct {
logPlugins []logPlugin
type templateLogPluginCollection struct {
plugins []tasklog.TemplateLogPlugin
}

func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) {
logs := make([]*core.TaskLog, 0, len(t.logPlugins))
suffix := input.LogName
func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) {
var taskLogs []*core.TaskLog

for _, plugin := range t.logPlugins {
input.LogName = plugin.Name + suffix
o, err := plugin.Plugin.GetTaskLogs(input)
for _, plugin := range t.plugins {
o, err := plugin.GetTaskLogs(input)
if err != nil {
return tasklog.Output{}, err
}

logs = append(logs, o.TaskLogs...)
taskLogs = append(taskLogs, o.TaskLogs...)
}

return tasklog.Output{
TaskLogs: logs,
}, nil
return tasklog.Output{TaskLogs: taskLogs}, nil
}

// InitializeLogPlugins initializes log plugin based on config.
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
// Use a list to maintain order.
logPlugins := make([]logPlugin, 0, 2)
var plugins []tasklog.TemplateLogPlugin

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON})
}
}

if len(cfg.Templates) > 0 {
for _, cfg := range cfg.Templates {
logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.Scheme, cfg.TemplateURIs, cfg.MessageFormat)})
}
}

if len(logPlugins) == 0 {
return nil, nil
}

return taskLogPluginWrapper{logPlugins: logPlugins}, nil
plugins = append(plugins, cfg.Templates...)
return templateLogPluginCollection{plugins: plugins}, nil
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c

func TestGetLogsForContainerInPod_Templates(t *testing.T) {
assertTestSucceeded(t, &LogConfig{
Templates: []TemplateLogPluginConfig{
Templates: []tasklog.TemplateLogPlugin{
{
DisplayName: "StackDriver",
TemplateURIs: []string{
Expand Down
10 changes: 10 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
TemplateSchemeTaskExecution
)

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

type TemplateVar struct {
Regex *regexp.Regexp
Value string
Expand Down Expand Up @@ -57,3 +60,10 @@ type Plugin interface {
// Generates a TaskLog object given necessary computation information
GetTaskLogs(i Input) (logs Output, err error)
}

type TemplateLogPlugin struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
}
57 changes: 12 additions & 45 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type templateRegexes struct {
ExecutionName *regexp.Regexp
ExecutionProject *regexp.Regexp
ExecutionDomain *regexp.Regexp
GeneratedName *regexp.Regexp
}

func initDefaultRegexes() templateRegexes {
Expand All @@ -58,6 +59,7 @@ func initDefaultRegexes() templateRegexes {
MustCreateRegex("executionName"),
MustCreateRegex("executionProject"),
MustCreateRegex("executionDomain"),
MustCreateRegex("generatedName"),
}
}

Expand Down Expand Up @@ -121,6 +123,10 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
defaultRegexes.NodeID,
input.TaskExecutionID.GetUniqueNodeID(),
},
TemplateVar{
defaultRegexes.GeneratedName,
input.TaskExecutionID.GetGeneratedName(),
},
TemplateVar{
defaultRegexes.TaskRetryAttempt,
strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10),
Expand Down Expand Up @@ -172,55 +178,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
return vars
}

// A simple log plugin that supports templates in urls to build the final log link.
// See `defaultRegexes` for supported templates.
type TemplateLogPlugin struct {
scheme TemplateScheme
templateUris []string
messageFormat core.TaskLog_MessageFormat
}

func (s TemplateLogPlugin) GetTaskLog(podName, podUID, namespace, containerName, containerID, logName string, podRFC3339StartTime string, podRFC3339FinishTime string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) {
o, err := s.GetTaskLogs(Input{
LogName: logName,
Namespace: namespace,
PodName: podName,
PodUID: podUID,
ContainerName: containerName,
ContainerID: containerID,
PodRFC3339StartTime: podRFC3339StartTime,
PodRFC3339FinishTime: podRFC3339FinishTime,
PodUnixStartTime: podUnixStartTime,
PodUnixFinishTime: podUnixFinishTime,
})

if err != nil || len(o.TaskLogs) == 0 {
return core.TaskLog{}, err
}

return *o.TaskLogs[0], nil
}

func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(s.scheme)
taskLogs := make([]*core.TaskLog, 0, len(s.templateUris))
for _, templateURI := range s.templateUris {
func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(p.Scheme)
taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs))
for _, templateURI := range p.TemplateURIs {
taskLogs = append(taskLogs, &core.TaskLog{
Uri: replaceAll(templateURI, templateVars),
Name: input.LogName,
MessageFormat: s.messageFormat,
Name: p.DisplayName + input.LogName,
MessageFormat: p.MessageFormat,
})
}

return Output{TaskLogs: taskLogs}, nil
}

// NewTemplateLogPlugin creates a template-based log plugin with the provided template Uri and message format.
// See `defaultRegexes` for supported templates.
func NewTemplateLogPlugin(scheme TemplateScheme, templateUris []string, messageFormat core.TaskLog_MessageFormat) TemplateLogPlugin {
return TemplateLogPlugin{
scheme: scheme,
templateUris: templateUris,
messageFormat: messageFormat,
}
}
Loading

0 comments on commit 09cb3b1

Please sign in to comment.