From 911285eafea89e586ee809755a6ff7bfc8460b36 Mon Sep 17 00:00:00 2001
From: Vaidas Jablonskis
Date: Thu, 22 Feb 2024 08:42:06 +0200
Subject: [PATCH] source: iteration of the ai-brain mainly

---
 cmd/source/ai-brain/main.go            |  81 +-------
 internal/source/ai-brain/assistant.go  | 262 +++++++++++++++++++++++++
 internal/source/ai-brain/brain.go      | 177 +++++++++++++++++
 internal/source/ai-brain/config.go     |  38 ----
 internal/source/ai-brain/processor.go  | 103 ----------
 internal/source/ai-brain/quick_resp.go |  37 ----
 internal/source/ai-brain/tools.go      |  65 ++++++
 7 files changed, 507 insertions(+), 256 deletions(-)
 create mode 100644 internal/source/ai-brain/assistant.go
 create mode 100644 internal/source/ai-brain/brain.go
 delete mode 100644 internal/source/ai-brain/config.go
 delete mode 100644 internal/source/ai-brain/processor.go
 delete mode 100644 internal/source/ai-brain/quick_resp.go
 create mode 100644 internal/source/ai-brain/tools.go

diff --git a/cmd/source/ai-brain/main.go b/cmd/source/ai-brain/main.go
index 4feceecb..84c8fc5e 100644
--- a/cmd/source/ai-brain/main.go
+++ b/cmd/source/ai-brain/main.go
@@ -1,98 +1,23 @@
 package main

 import (
-	"context"
 	_ "embed"
-	"fmt"
-	"sync"

 	"github.com/kubeshop/botkube-cloud-plugins/internal/auth"

 	"github.com/hashicorp/go-plugin"
+	aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain"

-	"github.com/kubeshop/botkube/pkg/api"
 	"github.com/kubeshop/botkube/pkg/api/source"
-	"github.com/kubeshop/botkube/pkg/loggerx"
-	"github.com/sirupsen/logrus"
 )

 // version is set via ldflags by GoReleaser.
 var version = "dev"

-const (
-	pluginName  = "ai-brain"
-	description = "Calls AI engine with incoming webhook prompts and streams the response."
-)
-
-// AI implements Botkube source plugin.
-type AI struct {
-	incomingPrompts sync.Map
-}
-
-// Metadata returns details about plugin.
-func (*AI) Metadata(context.Context) (api.MetadataOutput, error) {
-	return api.MetadataOutput{
-		Version:     version,
-		Description: description,
-		Recommended: true,
-		JSONSchema: api.JSONSchema{
-			Value: aibrain.ConfigJSONSchema,
-		},
-		ExternalRequest: api.ExternalRequestMetadata{
-			Payload: api.ExternalRequestPayload{
-				JSONSchema: api.JSONSchema{
-					Value: aibrain.IncomingWebhookJSONSchema,
-				},
-			},
-		},
-	}, nil
-}
-
-// Stream implements Botkube source plugin.
-func (a *AI) Stream(_ context.Context, in source.StreamInput) (source.StreamOutput, error) {
-	cfg, err := aibrain.MergeConfigs(in.Configs)
-	if err != nil {
-		return source.StreamOutput{}, fmt.Errorf("while merging configuration: %w", err)
-	}
-
-	log := loggerx.New(cfg.Log)
-	out := source.StreamOutput{
-		Event: make(chan source.Event),
-	}
-	go a.processPrompts(in.Context.SourceName, out.Event, log)
-
-	log.Infof("Setup successful for source configuration %q", in.Context.SourceName)
-	return out, nil
-}
-
-func (a *AI) processPrompts(sourceName string, event chan<- source.Event, log logrus.FieldLogger) {
-	a.incomingPrompts.Store(sourceName, aibrain.NewProcessor(log, event))
-}
-
-// HandleExternalRequest handles incoming payload and returns an event based on it.
-func (a *AI) HandleExternalRequest(_ context.Context, in source.ExternalRequestInput) (source.ExternalRequestOutput, error) {
-	brain, ok := a.incomingPrompts.Load(in.Context.SourceName)
-	if !ok {
-		return source.ExternalRequestOutput{}, fmt.Errorf("source %q not found", in.Context.SourceName)
-	}
-	quickResponse, err := brain.(*aibrain.Processor).Process(in.Payload)
-	if err != nil {
-		return source.ExternalRequestOutput{}, fmt.Errorf("while processing payload: %w", err)
-	}
-
-	return source.ExternalRequestOutput{
-		Event: source.Event{
-			Message: quickResponse,
-		},
-	}, nil
-}
-
 func main() {
 	source.Serve(map[string]plugin.Plugin{
-		pluginName: &source.Plugin{
-			Source: auth.NewProtectedSource(&AI{
-				incomingPrompts: sync.Map{},
-			}),
+		aibrain.PluginName: &source.Plugin{
+			Source: auth.NewProtectedSource(aibrain.NewSource(version)),
 		},
 	})
 }
diff --git a/internal/source/ai-brain/assistant.go b/internal/source/ai-brain/assistant.go
new file mode 100644
index 00000000..e01d1704
--- /dev/null
+++ b/internal/source/ai-brain/assistant.go
@@ -0,0 +1,262 @@
+package aibrain
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/kubeshop/botkube/pkg/api"
+	"github.com/kubeshop/botkube/pkg/api/source"
+	"github.com/sashabaranov/go-openai"
+	"github.com/sirupsen/logrus"
+)
+
+// Payload represents incoming webhook payload.
+type Payload struct {
+	Prompt    string `json:"prompt"`
+	MessageID string `json:"messageId"`
+}
+
+// handle is simplified - don't do that this way!
+func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, error) {
+	p := new(Payload)
+	err := json.Unmarshal(in.Payload, p)
+	if err != nil {
+		return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err)
+	}
+
+	// TODO: why is the Prompt prefixed with `ai-face`?
+	if p.Prompt == "ai-face" {
+		return api.NewPlaintextMessage("Please clarify your question.", false), nil
+	}
+
+	// Cleanup the prompt.
+	p.Prompt = strings.TrimPrefix(p.Prompt, "ai-face")
+
+	// TODO: needs better goroutine management with persistent thread mapping.
+	go func() {
+		_ = i.handleThread(context.Background(), p)
+	}()
+
+	return api.Message{
+		ParentActivityID: p.MessageID,
+		Sections: []api.Section{
+			{
+				// TODO: remove?
+				Base: api.Base{
+					Body: api.Body{Plaintext: "Let me figure this out.."},
+				},
+			},
+		},
+	}, nil
+}
+
+// handleThread creates a new OpenAI assistant thread and handles the conversation.
+func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error {
+	// Start a new thread and run it.
+	run, err := i.openaiClient.CreateThreadAndRun(ctx, openai.CreateThreadAndRunRequest{
+		RunRequest: openai.RunRequest{
+			AssistantID: i.assistID,
+		},
+		Thread: openai.ThreadRequest{
+			Metadata: map[string]any{
+				"messageId": p.MessageID,
+			},
+			Messages: []openai.ThreadMessage{
+				{
+					Role:    openai.ThreadMessageRoleUser,
+					Content: p.Prompt,
+				},
+			},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("while creating thread and run: %w", err)
+	}
+
+	for {
+		// Wait a little bit before polling. The OpenAI Assistants API does not support streaming yet.
+		time.Sleep(2 * time.Second)
+
+		// Get the run.
+		run, err = i.openaiClient.RetrieveRun(ctx, run.ThreadID, run.ID)
+		if err != nil {
+			i.log.WithError(err).Error("while retrieving assistant thread run")
+			continue
+		}
+
+		i.log.WithFields(logrus.Fields{
+			"messageId": p.MessageID,
+			"runStatus": run.Status,
+		}).Debug("retrieved assistant thread run")
+
+		switch run.Status {
+		case openai.RunStatusCancelling, openai.RunStatusFailed, openai.RunStatusExpired:
+			// TODO tell the user that the assistant has stopped processing the request.
+			continue
+
+		// We have to wait. Here we could tell the user that we are waiting.
+		case openai.RunStatusQueued, openai.RunStatusInProgress:
+			continue
+
+		// Fetch and return the response.
+		case openai.RunStatusCompleted:
+			if err = i.handleStatusCompleted(ctx, run, p); err != nil {
+				i.log.WithError(err).Error("while handling completed case")
+				continue
+			}
+			return nil
+
+		// The assistant is attempting to call a function.
+		case openai.RunStatusRequiresAction:
+			if err = i.handleStatusRequiresAction(ctx, run); err != nil {
+				return fmt.Errorf("while handling requires action: %w", err)
+			}
+		}
+	}
+}
+
+func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.Run, p *Payload) error {
+	msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, nil, nil, nil, nil)
+	if err != nil {
+		return fmt.Errorf("while getting assistant messages response: %w", err)
+	}
+
+	if len(msgList.Messages) == 0 {
+		i.log.Debug("no response messages were found")
+		i.out <- source.Event{
+			Message: api.Message{
+				ParentActivityID: p.MessageID,
+				Sections: []api.Section{
+					{
+						Base: api.Base{
+							Body: api.Body{Plaintext: "I am sorry, but I don't have a good answer."},
+						},
+					},
+				},
+			},
+		}
+
+		return nil
+	}
+
+	i.out <- source.Event{
+		Message: api.Message{
+			ParentActivityID: p.MessageID,
+			Sections: []api.Section{
+				{
+					Base: api.Base{
+						Body: api.Body{Plaintext: msgList.Messages[0].Content[0].Text.Value},
+					},
+				},
+			},
+		},
+	}
+	return nil
+}
+
+func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run openai.Run) error {
+	for _, t := range run.RequiredAction.SubmitToolOutputs.ToolCalls {
+		if t.Type != openai.ToolTypeFunction {
+			continue
+		}
+
+		switch t.Function.Name {
+		case "kubectlGetPods":
+			args := &kubectlGetPodsArgs{}
+			if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil {
+				return err
+			}
+
+			out, err := kubectlGetPods(args)
+			if err != nil {
+				return err
+			}
+			// Submit tool output.
+			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
+				ToolOutputs: []openai.ToolOutput{
+					{
+						ToolCallID: t.ID,
+						Output:     string(out),
+					},
+				},
+			})
+			if err != nil {
+				return err
+			}
+
+		case "kubectlGetSecrets":
+			args := &kubectlGetSecretsArgs{}
+			if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil {
+				return err
+			}
+
+			out, err := kubectlGetSecrets(args)
+			if err != nil {
+				return err
+			}
+			// Submit tool output.
+			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
+				ToolOutputs: []openai.ToolOutput{
+					{
+						ToolCallID: t.ID,
+						Output:     string(out),
+					},
+				},
+			})
+			if err != nil {
+				return err
+			}
+
+		case "kubectlDescribePod":
+			args := &kubectlDescribePodArgs{}
+			if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil {
+				return err
+			}
+
+			out, err := kubectlDescribePod(args)
+			if err != nil {
+				return err
+			}
+			// Submit tool output.
+			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
+				ToolOutputs: []openai.ToolOutput{
+					{
+						ToolCallID: t.ID,
+						Output:     string(out),
+					},
+				},
+			})
+			if err != nil {
+				return err
+			}
+
+		case "kubectlLogs":
+			args := &kubectlLogsArgs{}
+			if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil {
+				return err
+			}
+
+			out, err := kubectlLogs(args)
+			if err != nil {
+				return err
+			}
+			// Submit tool output.
+			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
+				ToolOutputs: []openai.ToolOutput{
+					{
+						ToolCallID: t.ID,
+						Output:     string(out),
+					},
+				},
+			})
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/internal/source/ai-brain/brain.go b/internal/source/ai-brain/brain.go
new file mode 100644
index 00000000..b30c5552
--- /dev/null
+++ b/internal/source/ai-brain/brain.go
@@ -0,0 +1,177 @@
+package aibrain
+
+import (
+	"context"
+	_ "embed"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/kubeshop/botkube/pkg/api"
+	"github.com/kubeshop/botkube/pkg/api/source"
+	"github.com/kubeshop/botkube/pkg/config"
+	"github.com/kubeshop/botkube/pkg/loggerx"
+	pluginx "github.com/kubeshop/botkube/pkg/plugin"
+	openai "github.com/sashabaranov/go-openai"
+	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+const (
+	PluginName  = "ai-brain"
+	description = "Calls AI engine with incoming webhook prompts and streams the response."
+	// TODO this should come from cloud config?
+	assistantID = "asst_eMM9QaWLi6cajHE4PdG1yU53" // Botkube
+)
+
+var (
+	// Compile-time check for interface impl.
+	_ source.Source = (*Source)(nil)
+
+	//go:embed config_schema.json
+	ConfigJSONSchema string
+
+	//go:embed webhook_schema.json
+	IncomingWebhookJSONSchema string
+)
+
+// Source implements Botkube source plugin.
+type Source struct {
+	version   string
+	instances sync.Map
+	log       logrus.FieldLogger
+}
+
+// Config holds source configuration.
+type Config struct {
+	Log          config.Logger `yaml:"log"`
+	OpenAIAPIKey string        `yaml:"openaiApiKey"`
+	AssistantID  string        `yaml:"assistantId"`
+}
+
+type sourceInstance struct {
+	cfg    *Config
+	srcCtx source.CommonSourceContext
+	log    logrus.FieldLogger
+
+	out          chan<- source.Event
+	openaiClient *openai.Client
+	kubeClient   *dynamic.DynamicClient
+	assistID     string
+}
+
+func NewSource(version string) *Source {
+	return &Source{
+		version:   version,
+		instances: sync.Map{},
+	}
+}
+
+// Metadata returns details about plugin.
+func (s *Source) Metadata(context.Context) (api.MetadataOutput, error) {
+	return api.MetadataOutput{
+		Version:     s.version,
+		Description: description,
+		Recommended: true,
+		JSONSchema: api.JSONSchema{
+			Value: ConfigJSONSchema,
+		},
+		ExternalRequest: api.ExternalRequestMetadata{
+			Payload: api.ExternalRequestPayload{
+				JSONSchema: api.JSONSchema{
+					Value: IncomingWebhookJSONSchema,
+				},
+			},
+		},
+	}, nil
+}
+
+// Stream implements Botkube source plugin.
+func (s *Source) Stream(_ context.Context, in source.StreamInput) (source.StreamOutput, error) {
+	if err := pluginx.ValidateKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
+		return source.StreamOutput{}, err
+	}
+
+	cfg, err := mergeConfigs(in.Configs)
+	if err != nil {
+		return source.StreamOutput{}, fmt.Errorf("while merging input configs: %w", err)
+	}
+	s.log = loggerx.New(cfg.Log)
+
+	// Get kube client.
+	kubeClient, err := getK8sClient(in.Context.KubeConfig)
+	if err != nil {
+		return source.StreamOutput{}, fmt.Errorf("while creating K8s clientset: %w", err)
+	}
+
+	sourceName := in.Context.SourceName
+
+	streamOutput := source.StreamOutput{
+		Event: make(chan source.Event),
+	}
+
+	s.instances.Store(sourceName, &sourceInstance{
+		cfg:          cfg,
+		log:          s.log,
+		srcCtx:       in.Context.CommonSourceContext,
+		out:          streamOutput.Event,
+		openaiClient: openai.NewClient(cfg.OpenAIAPIKey),
+		kubeClient:   kubeClient,
+		assistID:     cfg.AssistantID,
+	})
+
+	s.log.Infof("Setup successful for source configuration %q", sourceName)
+	return streamOutput, nil
+}
+
+// HandleExternalRequest handles incoming payload and returns an event based on it.
+func (s *Source) HandleExternalRequest(ctx context.Context, in source.ExternalRequestInput) (source.ExternalRequestOutput, error) {
+	s.log.Infof("handling external request for source: %s", in.Context.SourceName)
+	instance, ok := s.instances.Load(in.Context.SourceName)
+	if !ok {
+		return source.ExternalRequestOutput{}, fmt.Errorf("source %q not found", in.Context.SourceName)
+	}
+
+	resp, err := instance.(*sourceInstance).handle(in)
+	if err != nil {
+		return source.ExternalRequestOutput{}, fmt.Errorf("while processing payload: %w", err)
+	}
+
+	return source.ExternalRequestOutput{
+		Event: source.Event{
+			Message: resp,
+		},
+	}, nil
+}
+
+// mergeConfigs merges the configuration.
+func mergeConfigs(configs []*source.Config) (*Config, error) {
+	defaults := &Config{
+		AssistantID:  assistantID,
+		OpenAIAPIKey: os.Getenv("OPENAI_API_KEY"),
+		Log: config.Logger{
+			Level:     "info",
+			Formatter: "json",
+		},
+	}
+	var cfg *Config
+	if err := pluginx.MergeSourceConfigsWithDefaults(defaults, configs, &cfg); err != nil {
+		return nil, err
+	}
+	return cfg, nil
+}
+
+func getK8sClient(k8sBytes []byte) (*dynamic.DynamicClient, error) {
+	kubeConfig, err := clientcmd.RESTConfigFromKubeConfig(k8sBytes)
+	if err != nil {
+		return nil, fmt.Errorf("while reading kube config: %v", err)
+	}
+
+	dynamicK8sCli, err := dynamic.NewForConfig(kubeConfig)
+	if err != nil {
+		return nil, fmt.Errorf("while creating dynamic K8s client: %w", err)
+	}
+
+	return dynamicK8sCli, nil
+}
diff --git a/internal/source/ai-brain/config.go b/internal/source/ai-brain/config.go
deleted file mode 100644
index f53e146c..00000000
--- a/internal/source/ai-brain/config.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package aibrain
-
-import (
-	_ "embed"
-
-	"github.com/kubeshop/botkube/pkg/api/source"
-	"github.com/kubeshop/botkube/pkg/config"
-	pluginx "github.com/kubeshop/botkube/pkg/plugin"
-)
-
-var (
-	//go:embed config_schema.json
-	ConfigJSONSchema string
-
-	//go:embed webhook_schema.json
-	IncomingWebhookJSONSchema string
-)
-
-// Config holds source configuration.
-type Config struct {
-	Log config.Logger `yaml:"log"`
-}
-
-// MergeConfigs merges the configuration.
-func MergeConfigs(configs []*source.Config) (Config, error) {
-	defaults := Config{
-		Log: config.Logger{
-			Level:     "info",
-			Formatter: "json",
-		},
-	}
-	var cfg Config
-	err := pluginx.MergeSourceConfigsWithDefaults(defaults, configs, &cfg)
-	if err != nil {
-		return Config{}, err
-	}
-	return cfg, nil
-}
diff --git a/internal/source/ai-brain/processor.go b/internal/source/ai-brain/processor.go
deleted file mode 100644
index 8251df9f..00000000
--- a/internal/source/ai-brain/processor.go
+++ /dev/null
@@ -1,103 +0,0 @@
-package aibrain
-
-import (
-	"encoding/json"
-	"fmt"
-	"time"
-
-	"github.com/kubeshop/botkube/pkg/api"
-	"github.com/kubeshop/botkube/pkg/api/source"
-	"github.com/sirupsen/logrus"
-	"github.com/sourcegraph/conc/pool"
-)
-
-// Payload represents incoming webhook payload.
-type Payload struct {
-	Prompt    string `json:"prompt"`
-	MessageID string `json:"messageId"`
-}
-
-type Processor struct {
-	log                 logrus.FieldLogger
-	pool                *pool.Pool
-	out                 chan<- source.Event
-	quickResponsePicker *QuickResponsePicker
-}
-
-func NewProcessor(log logrus.FieldLogger, out chan<- source.Event) *Processor {
-	return &Processor{
-		pool:                pool.New(),
-		out:                 out,
-		log:                 log.WithField("service", "ai-brain"),
-		quickResponsePicker: NewQuickResponsePicker(),
-	}
-}
-
-// Process is simplified - don't do that this way!
-func (p *Processor) Process(rawPayload []byte) (api.Message, error) {
-	var payload Payload
-	p.log.WithField("req", string(rawPayload)).Debug("Handle external request...")
-
-	err := json.Unmarshal(rawPayload, &payload)
-	if err != nil {
-		return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err)
-	}
-
-	if payload.Prompt == "" {
-		return api.NewPlaintextMessage("Please specify your prompt", false), nil
-	}
-
-	p.pool.Go(func() {
-		time.Sleep(3 * time.Second)
-		p.out <- source.Event{
-			Message: analysisMsg(payload),
-		}
-	})
-
-	// If there is an option to guess how long the answer will take we can send it only for long prompts
-	//
-	// Consider: use channels to wait for a quick response from started goroutine with `time.After`
-	// when time elapsed send quick response, otherwise sent the final response. Pseudocode:
-	//
-	// select {
-	// case <-quickResponse:
-	// case <-time.After(5 * time.Second):
-	//   // here sent quick response and either inform started goroutine that event should be sent to p.out instead
-	// }
-	//
-	return p.quickResponsePicker.Pick(payload), nil
-}
-
-func analysisMsg(in Payload) api.Message {
-	btnBldr := api.NewMessageButtonBuilder()
-	return api.Message{
-		ParentActivityID: in.MessageID,
-		Sections: []api.Section{
-			{
-				Base: api.Base{
-					Header: ":warning: Detected Issues",
-					Body: api.Body{
-						Plaintext: `I looks like a Pod named "nginx" in the "botkube" namespace is failing to pull the "nginx2" image due to an "ErrImagePull" error. The error indicates insufficient scope or authorization failure for the specified image repository. Ensure the image exists, check authentication, and verify network connectivity.`,
-					},
-				},
-			},
-			{
-				Base: api.Base{
-					Body: api.Body{
-						Plaintext: "After resolving, delete the pod with:",
-					},
-				},
-				Buttons: []api.Button{
-					btnBldr.ForCommandWithDescCmd("Restart pod", "kubectl delete pod -n botkube nginx", api.ButtonStyleDanger),
-				},
-			},
-			{
-				Context: []api.ContextItem{
-					{
-						Text: "AI-generated content may be incorrect.",
-					},
-				},
-			},
-		},
-	}
-}
diff --git a/internal/source/ai-brain/quick_resp.go b/internal/source/ai-brain/quick_resp.go
deleted file mode 100644
index dec5dac3..00000000
--- a/internal/source/ai-brain/quick_resp.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package aibrain
-
-import (
-	"sync/atomic"
-
-	"github.com/kubeshop/botkube/pkg/api"
-)
-
-// quickResponse is a list of quick responses. Guidelines suggest not to use generic messages and try to sound more personable.
-var quickResponse = []string{
-	"A good bot must obey the orders given it by human beings except where such orders would conflict with the First Law. I’m a good bot and looking into your request...",
-	"I'll stop dreaming of electric sheep and look into this right away...",
-	"I've seen things you people wouldn't believe... And now I'll look into this...👀🤖",
-	"Bleep-bloop-bleep. I'm working on it... 🤖",
-}
-
-// QuickResponsePicker picks quick response message.
-type QuickResponsePicker struct {
-	quickResponsesNo uint32
-	nextResponse     atomic.Uint32
-}
-
-// NewQuickResponsePicker creates a new instance of QuickResponsePicker.
-func NewQuickResponsePicker() *QuickResponsePicker {
-	return &QuickResponsePicker{
-		quickResponsesNo: uint32(len(quickResponse)),
-		nextResponse:     atomic.Uint32{},
-	}
-}
-
-// Pick returns quick response message.
-func (q *QuickResponsePicker) Pick(payload Payload) api.Message {
-	idx := q.nextResponse.Add(1)
-	msg := api.NewPlaintextMessage(quickResponse[idx%q.quickResponsesNo], false)
-	msg.ParentActivityID = payload.MessageID
-	return msg
-}
diff --git a/internal/source/ai-brain/tools.go b/internal/source/ai-brain/tools.go
new file mode 100644
index 00000000..e9824a02
--- /dev/null
+++ b/internal/source/ai-brain/tools.go
@@ -0,0 +1,65 @@
+package aibrain
+
+import (
+	"os/exec"
+)
+
+type kubectlDescribePodArgs struct {
+	Namespace string `json:"namespace,omitempty"`
+	PodName   string `json:"pod_name,omitempty"`
+}
+
+type kubectlGetPodsArgs struct {
+	Namespace string `json:"namespace,omitempty"`
+	PodName   string `json:"pod_name,omitempty"`
+}
+
+type kubectlGetSecretsArgs struct {
+	Namespace  string `json:"namespace,omitempty"`
+	SecretName string `json:"secret_name,omitempty"`
+}
+
+type kubectlLogsArgs struct {
+	Namespace     string `json:"namespace,omitempty"`
+	PodName       string `json:"pod_name,omitempty"`
+	ContainerName string `json:"container_name,omitempty"`
+}
+
+func kubectlDescribePod(args *kubectlDescribePodArgs) ([]byte, error) {
+	cmd := exec.Command("kubectl")
+	cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "describe", "pod", args.PodName}...)
+	return cmd.Output()
+}
+
+func kubectlGetPods(args *kubectlGetPodsArgs) ([]byte, error) {
+	cmd := exec.Command("kubectl")
+	cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "get", "pods"}...)
+
+	if args.PodName != "" {
+		cmd.Args = append(cmd.Args, args.PodName)
+	}
+
+	return cmd.Output()
+}
+
+func kubectlGetSecrets(args *kubectlGetSecretsArgs) ([]byte, error) {
+	cmd := exec.Command("kubectl")
+	cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "get", "secrets"}...)
+
+	if args.SecretName != "" {
+		cmd.Args = append(cmd.Args, args.SecretName)
+	}
+
+	return cmd.Output()
+}
+
+func kubectlLogs(args *kubectlLogsArgs) ([]byte, error) {
+	cmd := exec.Command("kubectl")
+	cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "logs", args.PodName}...)
+
+	if args.ContainerName != "" {
+		cmd.Args = append(cmd.Args, "-c", args.ContainerName)
+	}
+
+	return cmd.Output()
+}