From b4533e9d1499b6758261c1fb4aa0446763ae0575 Mon Sep 17 00:00:00 2001 From: Vaidas Jablonskis Date: Thu, 22 Feb 2024 08:41:15 +0200 Subject: [PATCH 1/4] go: update/tidy deps --- go.mod | 3 +-- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 83eec8b4..fadb2505 100644 --- a/go.mod +++ b/go.mod @@ -28,9 +28,9 @@ require ( github.com/muesli/reflow v0.3.0 github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.16.0 + github.com/sashabaranov/go-openai v1.19.4 github.com/sirupsen/logrus v1.9.3 github.com/slack-go/slack v0.12.2 - github.com/sourcegraph/conc v0.3.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 @@ -159,7 +159,6 @@ require ( go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/trace v1.21.0 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.19.0 // indirect diff --git a/go.sum b/go.sum index cc4dc75b..e712cbee 100644 --- a/go.sum +++ b/go.sum @@ -815,6 +815,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQo= github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U= +github.com/sashabaranov/go-openai v1.19.4 h1:GbaDiqvgYCabyqzuIbcEeT6/ZX1nVfur+++oTBfOgks= +github.com/sashabaranov/go-openai v1.19.4/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg= github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U= github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= @@ -830,8 +832,6 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/slack-go/slack v0.12.2 h1:x3OppyMyGIbbiyFhsBmpf9pwkUzMhthJMRNmNlA4LaQ= github.com/slack-go/slack v0.12.2/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= -github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= -github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= From f072696bb19a1c95934f708c221a8b8d00ab754e Mon Sep 17 00:00:00 2001 From: Vaidas Jablonskis Date: Thu, 22 Feb 2024 08:41:36 +0200 Subject: [PATCH 2/4] hack: add a bit of code to create/update assistant itself --- hack/openai/main.go | 39 ++++++++++++++++++ hack/openai/tools.go | 97 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 hack/openai/main.go create mode 100644 hack/openai/tools.go diff --git a/hack/openai/main.go b/hack/openai/main.go new file mode 100644 index 00000000..09c0daad --- /dev/null +++ b/hack/openai/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + openai "github.com/sashabaranov/go-openai" +) + +const ( + assistantID = "asst_eMM9QaWLi6cajHE4PdG1yU53" // Botkube +) + +func main() { + client := openai.NewClient(os.Getenv("OPENAI_API_KEY")) + + // Update assistant with latest tools. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err := updateAssistant(ctx, client, assistantID) + if err != nil { + log.Fatal(err) + } +} + +func updateAssistant(ctx context.Context, c *openai.Client, id string) (openai.Assistant, error) { + instructions := `You are an experienced DevOps engineer. + You have deep understand how to operate a kubernetes cluster and troubleshoot running workloads in kubernetes. + You have access to tools which can help you to find needed information.` + + return c.ModifyAssistant(ctx, id, openai.AssistantRequest{ + Model: openai.GPT4TurboPreview, + Instructions: &instructions, + Tools: openAITools, + }) +} diff --git a/hack/openai/tools.go b/hack/openai/tools.go new file mode 100644 index 00000000..3321cd46 --- /dev/null +++ b/hack/openai/tools.go @@ -0,0 +1,97 @@ +package main + +import ( + "github.com/sashabaranov/go-openai" + "github.com/sashabaranov/go-openai/jsonschema" +) + +var openAITools = []openai.AssistantTool{ + { + Type: openai.AssistantToolType(openai.ToolTypeFunction), + Function: &openai.FunctionDefinition{ + Name: "kubectlGetPods", + Description: "Useful for getting one pod or all pods in a kubernetes namespace.", + Parameters: jsonschema.Definition{ + Type: jsonschema.Object, + Properties: map[string]jsonschema.Definition{ + "namespace": { + Type: jsonschema.String, + Description: "Kubernetes namespace, e.g. kube-system", + }, + "pod_name": { + Type: jsonschema.String, + Description: "Kubernetes pod name, e.g. botkube-6c6fd8b4d6-f559q", + }, + }, + Required: []string{"namespace"}, + }, + }, + }, + { + Type: openai.AssistantToolType(openai.ToolTypeFunction), + Function: &openai.FunctionDefinition{ + Name: "kubectlGetSecrets", + Description: "Useful for getting one secret or all secrets in a kubernetes namespace.", + Parameters: jsonschema.Definition{ + Type: jsonschema.Object, + Properties: map[string]jsonschema.Definition{ + "namespace": { + Type: jsonschema.String, + Description: "Kubernetes namespace, e.g. kube-system", + }, + "secret_name": { + Type: jsonschema.String, + Description: "Kubernetes secret name, e.g. api-key", + }, + }, + Required: []string{"namespace"}, + }, + }, + }, + { + Type: openai.AssistantToolType(openai.ToolTypeFunction), + Function: &openai.FunctionDefinition{ + Name: "kubectlDescribePod", + Description: "Useful for describing a pod in a kubernetes namespace.", + Parameters: jsonschema.Definition{ + Type: jsonschema.Object, + Properties: map[string]jsonschema.Definition{ + "namespace": { + Type: jsonschema.String, + Description: "Kubernetes namespace, e.g. kube-system", + }, + "pod_name": { + Type: jsonschema.String, + Description: "Kubernetes pod name, e.g. botkube-6c6fd8b4d6-f559q", + }, + }, + Required: []string{"namespace", "pod_name"}, + }, + }, + }, + { + Type: openai.AssistantToolType(openai.ToolTypeFunction), + Function: &openai.FunctionDefinition{ + Name: "kubectlLogs", + Description: "Useful for getting container logs.", + Parameters: jsonschema.Definition{ + Type: jsonschema.Object, + Properties: map[string]jsonschema.Definition{ + "namespace": { + Type: jsonschema.String, + Description: "Kubernetes namespace, e.g. kube-system", + }, + "pod_name": { + Type: jsonschema.String, + Description: "Kubernetes pod name, e.g. botkube-6c6fd8b4d6-f559q", + }, + "container_name": { + Type: jsonschema.String, + Description: "Pod container name, e.g. botkube-api-server.", + }, + }, + Required: []string{"namespace", "pod_name"}, + }, + }, + }, +} From 911285eafea89e586ee809755a6ff7bfc8460b36 Mon Sep 17 00:00:00 2001 From: Vaidas Jablonskis Date: Thu, 22 Feb 2024 08:42:06 +0200 Subject: [PATCH 3/4] source: iteration of the ai-brain mainly --- cmd/source/ai-brain/main.go | 81 +------- internal/source/ai-brain/assistant.go | 262 +++++++++++++++++++++++++ internal/source/ai-brain/brain.go | 177 +++++++++++++++++ internal/source/ai-brain/config.go | 38 ---- internal/source/ai-brain/processor.go | 103 ---------- internal/source/ai-brain/quick_resp.go | 37 ---- internal/source/ai-brain/tools.go | 65 ++++++ 7 files changed, 507 insertions(+), 256 deletions(-) create mode 100644 internal/source/ai-brain/assistant.go create mode 100644 internal/source/ai-brain/brain.go delete mode 100644 internal/source/ai-brain/config.go delete mode 100644 internal/source/ai-brain/processor.go delete mode 100644 internal/source/ai-brain/quick_resp.go create mode 100644 internal/source/ai-brain/tools.go diff --git a/cmd/source/ai-brain/main.go b/cmd/source/ai-brain/main.go index 4feceecb..84c8fc5e 100644 --- a/cmd/source/ai-brain/main.go +++ b/cmd/source/ai-brain/main.go @@ -1,98 +1,23 @@ package main import ( - "context" _ "embed" - "fmt" - "sync" "github.com/kubeshop/botkube-cloud-plugins/internal/auth" "github.com/hashicorp/go-plugin" + aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain" - "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/api/source" - "github.com/kubeshop/botkube/pkg/loggerx" - "github.com/sirupsen/logrus" ) // version is set via ldflags by GoReleaser. var version = "dev" -const ( - pluginName = "ai-brain" - description = "Calls AI engine with incoming webhook prompts and streams the response." -) - -// AI implements Botkube source plugin. -type AI struct { - incomingPrompts sync.Map -} - -// Metadata returns details about plugin. -func (*AI) Metadata(context.Context) (api.MetadataOutput, error) { - return api.MetadataOutput{ - Version: version, - Description: description, - Recommended: true, - JSONSchema: api.JSONSchema{ - Value: aibrain.ConfigJSONSchema, - }, - ExternalRequest: api.ExternalRequestMetadata{ - Payload: api.ExternalRequestPayload{ - JSONSchema: api.JSONSchema{ - Value: aibrain.IncomingWebhookJSONSchema, - }, - }, - }, - }, nil -} - -// Stream implements Botkube source plugin. -func (a *AI) Stream(_ context.Context, in source.StreamInput) (source.StreamOutput, error) { - cfg, err := aibrain.MergeConfigs(in.Configs) - if err != nil { - return source.StreamOutput{}, fmt.Errorf("while merging configuration: %w", err) - } - - log := loggerx.New(cfg.Log) - out := source.StreamOutput{ - Event: make(chan source.Event), - } - go a.processPrompts(in.Context.SourceName, out.Event, log) - - log.Infof("Setup successful for source configuration %q", in.Context.SourceName) - return out, nil -} - -func (a *AI) processPrompts(sourceName string, event chan<- source.Event, log logrus.FieldLogger) { - a.incomingPrompts.Store(sourceName, aibrain.NewProcessor(log, event)) -} - -// HandleExternalRequest handles incoming payload and returns an event based on it. -func (a *AI) HandleExternalRequest(_ context.Context, in source.ExternalRequestInput) (source.ExternalRequestOutput, error) { - brain, ok := a.incomingPrompts.Load(in.Context.SourceName) - if !ok { - return source.ExternalRequestOutput{}, fmt.Errorf("source %q not found", in.Context.SourceName) - } - quickResponse, err := brain.(*aibrain.Processor).Process(in.Payload) - if err != nil { - return source.ExternalRequestOutput{}, fmt.Errorf("while processing payload: %w", err) - } - - return source.ExternalRequestOutput{ - Event: source.Event{ - Message: quickResponse, - }, - }, nil -} - func main() { source.Serve(map[string]plugin.Plugin{ - pluginName: &source.Plugin{ - Source: auth.NewProtectedSource(&AI{ - incomingPrompts: sync.Map{}, - }), + aibrain.PluginName: &source.Plugin{ + Source: auth.NewProtectedSource(aibrain.NewSource(version)), }, }) } diff --git a/internal/source/ai-brain/assistant.go b/internal/source/ai-brain/assistant.go new file mode 100644 index 00000000..e01d1704 --- /dev/null +++ b/internal/source/ai-brain/assistant.go @@ -0,0 +1,262 @@ +package aibrain + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/kubeshop/botkube/pkg/api" + "github.com/kubeshop/botkube/pkg/api/source" + "github.com/sashabaranov/go-openai" + "github.com/sirupsen/logrus" +) + +// Payload represents incoming webhook payload. +type Payload struct { + Prompt string `json:"prompt"` + MessageID string `json:"messageId"` +} + +// handle is simplified - don't do that this way! +func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, error) { + p := new(Payload) + err := json.Unmarshal(in.Payload, p) + if err != nil { + return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err) + } + + // TODO: why is the Prompt prefixed with `ai-face`? + if p.Prompt == "ai-face" { + return api.NewPlaintextMessage("Please clarify your question.", false), nil + } + + // Cleanup the prompt. + p.Prompt = strings.TrimPrefix(p.Prompt, "ai-face") + + // TODO: needs better goroutine management with persistent thread mapping. + go func() { + _ = i.handleThread(context.Background(), p) + }() + + return api.Message{ + ParentActivityID: p.MessageID, + Sections: []api.Section{ + { + // TODO: remove? + Base: api.Base{ + Body: api.Body{Plaintext: "Let me figure this out.."}, + }, + }, + }, + }, nil +} + +// handleThread creates a new OpenAI assistant thread and handles the conversation. +func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error { + // Start a new thread and run it. + run, err := i.openaiClient.CreateThreadAndRun(ctx, openai.CreateThreadAndRunRequest{ + RunRequest: openai.RunRequest{ + AssistantID: i.assistID, + }, + Thread: openai.ThreadRequest{ + Metadata: map[string]any{ + "messageId": p.MessageID, + }, + Messages: []openai.ThreadMessage{ + { + Role: openai.ThreadMessageRoleUser, + Content: p.Prompt, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("while creating thread and run: %w", err) + } + + for { + // Wait a little bit before polling. OpenAI assistant api does not support streaming yet. + time.Sleep(2 * time.Second) + + // Get the run. + run, err = i.openaiClient.RetrieveRun(ctx, run.ThreadID, run.ID) + if err != nil { + i.log.WithError(err).Error("while retrieving assistant thread run") + continue + } + + i.log.WithFields(logrus.Fields{ + "messageId": p.MessageID, + "runStatus": run.Status, + }).Debug("retrieved assistant thread run") + + switch run.Status { + case openai.RunStatusCancelling, openai.RunStatusFailed, openai.RunStatusExpired: + // TODO tell the user that the assistant has stopped processing the request. + continue + + // We have to wait. Here we could tell the user that we are waiting. + case openai.RunStatusQueued, openai.RunStatusInProgress: + continue + + // Fetch and return the response. + case openai.RunStatusCompleted: + if err = i.handleStatusCompleted(ctx, run, p); err != nil { + i.log.WithError(err).Error("while handling completed case") + continue + } + return nil + + // The assistant is attempting to call a function. + case openai.RunStatusRequiresAction: + if err = i.handleStatusRequiresAction(ctx, run); err != nil { + return fmt.Errorf("while handling requires action: %w", err) + } + } + } +} + +func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.Run, p *Payload) error { + msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, nil, nil, nil, nil) + if err != nil { + return fmt.Errorf("while getting assistant messages response") + } + + if len(msgList.Messages) == 0 { + i.log.Debug("no response messages were found") + i.out <- source.Event{ + Message: api.Message{ + ParentActivityID: p.MessageID, + Sections: []api.Section{ + { + Base: api.Base{ + Body: api.Body{Plaintext: "I am sorry, but I don't have a good answer."}, + }, + }, + }, + }, + } + + return nil + } + + i.out <- source.Event{ + Message: api.Message{ + ParentActivityID: p.MessageID, + Sections: []api.Section{ + { + Base: api.Base{ + Body: api.Body{Plaintext: msgList.Messages[0].Content[0].Text.Value}, + }, + }, + }, + }, + } + return nil +} + +func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run openai.Run) error { + for _, t := range run.RequiredAction.SubmitToolOutputs.ToolCalls { + if t.Type != openai.ToolTypeFunction { + continue + } + + switch t.Function.Name { + case "kubectlGetPods": + args := &kubectlGetPodsArgs{} + if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil { + return err + } + + out, err := kubectlGetPods(args) + if err != nil { + return err + } + // Submit tool output. + _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ + ToolOutputs: []openai.ToolOutput{ + { + ToolCallID: t.ID, + Output: string(out), + }, + }, + }) + if err != nil { + return err + } + + case "kubectlGetSecrets": + args := &kubectlGetSecretsArgs{} + if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil { + return err + } + + out, err := kubectlGetSecrets(args) + if err != nil { + return err + } + // Submit tool output. + _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ + ToolOutputs: []openai.ToolOutput{ + { + ToolCallID: t.ID, + Output: string(out), + }, + }, + }) + if err != nil { + return err + } + + case "kubectlDescribePod": + args := &kubectlDescribePodArgs{} + if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil { + return err + } + + out, err := kubectlDescribePod(args) + if err != nil { + return err + } + // Submit tool output. + _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ + ToolOutputs: []openai.ToolOutput{ + { + ToolCallID: t.ID, + Output: string(out), + }, + }, + }) + if err != nil { + return err + } + + case "kubectlLogs": + args := &kubectlLogsArgs{} + if err := json.Unmarshal([]byte(t.Function.Arguments), args); err != nil { + return err + } + + out, err := kubectlLogs(args) + if err != nil { + return err + } + // Submit tool output. + _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ + ToolOutputs: []openai.ToolOutput{ + { + ToolCallID: t.ID, + Output: string(out), + }, + }, + }) + if err != nil { + return err + } + } + } + + return nil +} diff --git a/internal/source/ai-brain/brain.go b/internal/source/ai-brain/brain.go new file mode 100644 index 00000000..b30c5552 --- /dev/null +++ b/internal/source/ai-brain/brain.go @@ -0,0 +1,177 @@ +package aibrain + +import ( + "context" + _ "embed" + "fmt" + "os" + "sync" + + "github.com/kubeshop/botkube/pkg/api" + "github.com/kubeshop/botkube/pkg/api/source" + "github.com/kubeshop/botkube/pkg/config" + "github.com/kubeshop/botkube/pkg/loggerx" + pluginx "github.com/kubeshop/botkube/pkg/plugin" + openai "github.com/sashabaranov/go-openai" + "github.com/sirupsen/logrus" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + PluginName = "ai-brain" + description = "Calls AI engine with incoming webhook prompts and streams the response." + // TODO this should come from cloud config? + assistantID = "asst_eMM9QaWLi6cajHE4PdG1yU53" // Botkube +) + +var ( + // Compile-time check for interface impl. + _ source.Source = (*Source)(nil) + + //go:embed config_schema.json + ConfigJSONSchema string + + //go:embed webhook_schema.json + IncomingWebhookJSONSchema string +) + +// AI implements Botkube source plugin. +type Source struct { + version string + instances sync.Map + log logrus.FieldLogger +} + +// Config holds source configuration. +type Config struct { + Log config.Logger `yaml:"log"` + OpenAIAPIKey string `yaml:"openaiApiKey"` + AssistantID string `yaml:"assistantId"` +} + +type sourceInstance struct { + cfg *Config + srcCtx source.CommonSourceContext + log logrus.FieldLogger + + out chan<- source.Event + openaiClient *openai.Client + kubeClient *dynamic.DynamicClient + assistID string +} + +func NewSource(version string) *Source { + return &Source{ + version: version, + instances: sync.Map{}, + } +} + +// Metadata returns details about plugin. +func (s *Source) Metadata(context.Context) (api.MetadataOutput, error) { + return api.MetadataOutput{ + Version: s.version, + Description: description, + Recommended: true, + JSONSchema: api.JSONSchema{ + Value: ConfigJSONSchema, + }, + ExternalRequest: api.ExternalRequestMetadata{ + Payload: api.ExternalRequestPayload{ + JSONSchema: api.JSONSchema{ + Value: IncomingWebhookJSONSchema, + }, + }, + }, + }, nil +} + +// Stream implements Botkube source plugin. +func (s *Source) Stream(_ context.Context, in source.StreamInput) (source.StreamOutput, error) { + if err := pluginx.ValidateKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil { + return source.StreamOutput{}, err + } + + cfg, err := mergeConfigs(in.Configs) + if err != nil { + return source.StreamOutput{}, fmt.Errorf("while merging input configs: %w", err) + } + s.log = loggerx.New(cfg.Log) + + // Get kube client. + kubeClient, err := getK8sClient(in.Context.KubeConfig) + if err != nil { + return source.StreamOutput{}, fmt.Errorf("while creating K8s clientset: %w", err) + } + + sourceName := in.Context.SourceName + + streamOutput := source.StreamOutput{ + Event: make(chan source.Event), + } + + s.instances.Store(sourceName, &sourceInstance{ + cfg: cfg, + log: s.log, + srcCtx: in.Context.CommonSourceContext, + out: streamOutput.Event, + openaiClient: openai.NewClient(cfg.OpenAIAPIKey), + kubeClient: kubeClient, + assistID: cfg.AssistantID, + }) + + s.log.Infof("Setup successful for source configuration %q", sourceName) + return streamOutput, nil +} + +// HandleExternalRequest handles incoming payload and returns an event based on it. +func (s *Source) HandleExternalRequest(ctx context.Context, in source.ExternalRequestInput) (source.ExternalRequestOutput, error) { + s.log.Infof("handling external request for source: %s", in.Context.SourceName) + instance, ok := s.instances.Load(in.Context.SourceName) + if !ok { + return source.ExternalRequestOutput{}, fmt.Errorf("source %q not found", in.Context.SourceName) + } + + resp, err := instance.(*sourceInstance).handle(in) + if err != nil { + return source.ExternalRequestOutput{}, fmt.Errorf("while processing payload: %w", err) + } + + return source.ExternalRequestOutput{ + Event: source.Event{ + Message: resp, + }, + }, nil +} + +// mergeConfigs merges the configuration. +func mergeConfigs(configs []*source.Config) (*Config, error) { + defaults := &Config{ + AssistantID: assistantID, + OpenAIAPIKey: os.Getenv("OPENAI_API_KEY"), + Log: config.Logger{ + Level: "info", + Formatter: "json", + }, + } + var cfg *Config + if err := pluginx.MergeSourceConfigsWithDefaults(defaults, configs, &cfg); err != nil { + return nil, err + } + return cfg, nil +} + +func getK8sClient(k8sBytes []byte) (*dynamic.DynamicClient, error) { + kubeConfig, err := clientcmd.RESTConfigFromKubeConfig(k8sBytes) + if err != nil { + return nil, fmt.Errorf("while reading kube config: %v", err) + } + + dynamicK8sCli, err := dynamic.NewForConfig(kubeConfig) + if err != nil { + return nil, fmt.Errorf("while creating dynamic K8s client: %w", err) + } + + return dynamicK8sCli, nil +} diff --git a/internal/source/ai-brain/config.go b/internal/source/ai-brain/config.go deleted file mode 100644 index f53e146c..00000000 --- a/internal/source/ai-brain/config.go +++ /dev/null @@ -1,38 +0,0 @@ -package aibrain - -import ( - _ "embed" - - "github.com/kubeshop/botkube/pkg/api/source" - "github.com/kubeshop/botkube/pkg/config" - pluginx "github.com/kubeshop/botkube/pkg/plugin" -) - -var ( - //go:embed config_schema.json - ConfigJSONSchema string - - //go:embed webhook_schema.json - IncomingWebhookJSONSchema string -) - -// Config holds source configuration. -type Config struct { - Log config.Logger `yaml:"log"` -} - -// MergeConfigs merges the configuration. -func MergeConfigs(configs []*source.Config) (Config, error) { - defaults := Config{ - Log: config.Logger{ - Level: "info", - Formatter: "json", - }, - } - var cfg Config - err := pluginx.MergeSourceConfigsWithDefaults(defaults, configs, &cfg) - if err != nil { - return Config{}, err - } - return cfg, nil -} diff --git a/internal/source/ai-brain/processor.go b/internal/source/ai-brain/processor.go deleted file mode 100644 index 8251df9f..00000000 --- a/internal/source/ai-brain/processor.go +++ /dev/null @@ -1,103 +0,0 @@ -package aibrain - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/kubeshop/botkube/pkg/api" - "github.com/kubeshop/botkube/pkg/api/source" - "github.com/sirupsen/logrus" - "github.com/sourcegraph/conc/pool" -) - -// Payload represents incoming webhook payload. -type Payload struct { - Prompt string `json:"prompt"` - MessageID string `json:"messageId"` -} - -type Processor struct { - log logrus.FieldLogger - pool *pool.Pool - out chan<- source.Event - quickResponsePicker *QuickResponsePicker -} - -func NewProcessor(log logrus.FieldLogger, out chan<- source.Event) *Processor { - return &Processor{ - pool: pool.New(), - out: out, - log: log.WithField("service", "ai-brain"), - quickResponsePicker: NewQuickResponsePicker(), - } -} - -// Process is simplified - don't do that this way! -func (p *Processor) Process(rawPayload []byte) (api.Message, error) { - var payload Payload - p.log.WithField("req", string(rawPayload)).Debug("Handle external request...") - - err := json.Unmarshal(rawPayload, &payload) - if err != nil { - return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err) - } - - if payload.Prompt == "" { - return api.NewPlaintextMessage("Please specify your prompt", false), nil - } - - p.pool.Go(func() { - time.Sleep(3 * time.Second) - p.out <- source.Event{ - Message: analysisMsg(payload), - } - }) - - // If there is an option to guess how long the answer will take we can send it only for long prompts - // - // Consider: use channels to wait for a quick response from started goroutine with `time.After` - // when time elapsed send quick response, otherwise sent the final response. Pseudocode: - // - // select { - // case <-quickResponse: - // case <-time.After(5 * time.Second): - // // here sent quick response and either inform started goroutine that event should be sent to p.out instead - // } - // - return p.quickResponsePicker.Pick(payload), nil -} - -func analysisMsg(in Payload) api.Message { - btnBldr := api.NewMessageButtonBuilder() - return api.Message{ - ParentActivityID: in.MessageID, - Sections: []api.Section{ - { - Base: api.Base{ - Header: ":warning: Detected Issues", - Body: api.Body{ - Plaintext: `I looks like a Pod named "nginx" in the "botkube" namespace is failing to pull the "nginx2" image due to an "ErrImagePull" error. The error indicates insufficient scope or authorization failure for the specified image repository. Ensure the image exists, check authentication, and verify network connectivity.`, - }, - }, - }, - { - Base: api.Base{ - Body: api.Body{ - Plaintext: "After resolving, delete the pod with:", - }, - }, - Buttons: []api.Button{ - btnBldr.ForCommandWithDescCmd("Restart pod", "kubectl delete pod -n botkube nginx", api.ButtonStyleDanger), - }, - }, - { - Context: []api.ContextItem{ - { - Text: "AI-generated content may be incorrect.", - }, - }, - }, - }, - } -} diff --git a/internal/source/ai-brain/quick_resp.go b/internal/source/ai-brain/quick_resp.go deleted file mode 100644 index dec5dac3..00000000 --- a/internal/source/ai-brain/quick_resp.go +++ /dev/null @@ -1,37 +0,0 @@ -package aibrain - -import ( - "sync/atomic" - - "github.com/kubeshop/botkube/pkg/api" -) - -// quickResponse is a list of quick responses. Guidelines suggest not to use generic messages and try to sound more personable. -var quickResponse = []string{ - "A good bot must obey the orders given it by human beings except where such orders would conflict with the First Law. Iā€™m a good bot and looking into your request...", - "I'll stop dreaming of electric sheep and look into this right away...", - "I've seen things you people wouldn't believe... And now I'll look into this...šŸ‘€šŸ¤–", - "Bleep-bloop-bleep. I'm working on it... šŸ¤–", -} - -// QuickResponsePicker picks quick response message. -type QuickResponsePicker struct { - quickResponsesNo uint32 - nextResponse atomic.Uint32 -} - -// NewQuickResponsePicker creates a new instance of QuickResponsePicker. -func NewQuickResponsePicker() *QuickResponsePicker { - return &QuickResponsePicker{ - quickResponsesNo: uint32(len(quickResponse)), - nextResponse: atomic.Uint32{}, - } -} - -// Pick returns quick response message. -func (q *QuickResponsePicker) Pick(payload Payload) api.Message { - idx := q.nextResponse.Add(1) - msg := api.NewPlaintextMessage(quickResponse[idx%q.quickResponsesNo], false) - msg.ParentActivityID = payload.MessageID - return msg -} diff --git a/internal/source/ai-brain/tools.go b/internal/source/ai-brain/tools.go new file mode 100644 index 00000000..e9824a02 --- /dev/null +++ b/internal/source/ai-brain/tools.go @@ -0,0 +1,65 @@ +package aibrain + +import ( + "os/exec" +) + +type kubectlDescribePodArgs struct { + Namespace string `json:"namespace,omitempty"` + PodName string `json:"pod_name,omitempty"` +} + +type kubectlGetPodsArgs struct { + Namespace string `json:"namespace,omitempty"` + PodName string `json:"pod_name,omitempty"` +} + +type kubectlGetSecretsArgs struct { + Namespace string `json:"namespace,omitempty"` + SecretName string `json:"secret_name,omitempty"` +} + +type kubectlLogsArgs struct { + Namespace string `json:"namespace,omitempty"` + PodName string `json:"pod_name,omitempty"` + ContainerName string `json:"container_name,omitempty"` +} + +func kubectlDescribePod(args *kubectlDescribePodArgs) ([]byte, error) { + cmd := exec.Command("kubectl") + cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "describe", "pod", args.PodName}...) + return cmd.Output() +} + +func kubectlGetPods(args *kubectlGetPodsArgs) ([]byte, error) { + cmd := exec.Command("kubectl") + cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "get", "pods"}...) + + if args.PodName != "" { + cmd.Args = append(cmd.Args, args.PodName) + } + + return cmd.Output() +} + +func kubectlGetSecrets(args *kubectlGetSecretsArgs) ([]byte, error) { + cmd := exec.Command("kubectl") + cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "get", "secrets"}...) + + if args.SecretName != "" { + cmd.Args = append(cmd.Args, args.SecretName) + } + + return cmd.Output() +} + +func kubectlLogs(args *kubectlLogsArgs) ([]byte, error) { + cmd := exec.Command("kubectl") + cmd.Args = append(cmd.Args, []string{"-n", args.Namespace, "logs", args.PodName}...) + + if args.ContainerName != "" { + cmd.Args = append(cmd.Args, "-c", args.ContainerName) + } + + return cmd.Output() +} From b6af995fdc55f5fa9f38d902cbdb28e39150f960 Mon Sep 17 00:00:00 2001 From: Vaidas Jablonskis Date: Fri, 23 Feb 2024 10:46:18 +0200 Subject: [PATCH 4/4] Address PR review, add quick responses as well --- cmd/executor/ai-face/main.go | 7 +- hack/openai/main.go | 8 +- internal/source/ai-brain/assistant.go | 157 ++++++++++++-------------- internal/source/ai-brain/response.go | 61 ++++++++++ 4 files changed, 143 insertions(+), 90 deletions(-) create mode 100644 internal/source/ai-brain/response.go diff --git a/cmd/executor/ai-face/main.go b/cmd/executor/ai-face/main.go index 28f3056c..f651d6f1 100644 --- a/cmd/executor/ai-face/main.go +++ b/cmd/executor/ai-face/main.go @@ -8,10 +8,11 @@ import ( "fmt" "io" "net/http" - - "github.com/kubeshop/botkube-cloud-plugins/internal/auth" + "strings" "github.com/hashicorp/go-plugin" + + "github.com/kubeshop/botkube-cloud-plugins/internal/auth" aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain" "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/api/executor" @@ -77,7 +78,7 @@ func (e *AIFace) Execute(_ context.Context, in executor.ExecuteInput) (executor. aiBrainWebhookURL := fmt.Sprintf("%s/%s", in.Context.IncomingWebhook.BaseSourceURL, cfg.AIBrainSourceName) body, err := json.Marshal(aibrain.Payload{ - Prompt: in.Command, + Prompt: strings.TrimPrefix(in.Command, pluginName), MessageID: in.Context.Message.ParentActivityID, }) if err != nil { diff --git a/hack/openai/main.go b/hack/openai/main.go index 09c0daad..40ab2082 100644 --- a/hack/openai/main.go +++ b/hack/openai/main.go @@ -27,9 +27,11 @@ func main() { } func updateAssistant(ctx context.Context, c *openai.Client, id string) (openai.Assistant, error) { - instructions := `You are an experienced DevOps engineer. - You have deep understand how to operate a kubernetes cluster and troubleshoot running workloads in kubernetes. - You have access to tools which can help you to find needed information.` + instructions := `You are an experienced DevOps engineer. You have deep + understand how to operate a kubernetes cluster and troubleshoot running + workloads in kubernetes. You have access to tools which can help you. Keep + your answers short and on the subject. Do not get involved in unrelated + topics.` return c.ModifyAssistant(ctx, id, openai.AssistantRequest{ Model: openai.GPT4TurboPreview, diff --git a/internal/source/ai-brain/assistant.go b/internal/source/ai-brain/assistant.go index e01d1704..11cbf8c9 100644 --- a/internal/source/ai-brain/assistant.go +++ b/internal/source/ai-brain/assistant.go @@ -3,8 +3,8 @@ package aibrain import ( "context" "encoding/json" + "errors" "fmt" - "strings" "time" "github.com/kubeshop/botkube/pkg/api" @@ -13,13 +13,14 @@ import ( "github.com/sirupsen/logrus" ) +const openAIPollInterval = 2 * time.Second + // Payload represents incoming webhook payload. type Payload struct { Prompt string `json:"prompt"` MessageID string `json:"messageId"` } -// handle is simplified - don't do that this way! func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, error) { p := new(Payload) err := json.Unmarshal(in.Payload, p) @@ -27,30 +28,17 @@ func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, er return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err) } - // TODO: why is the Prompt prefixed with `ai-face`? - if p.Prompt == "ai-face" { + if p.Prompt == "" { return api.NewPlaintextMessage("Please clarify your question.", false), nil } - // Cleanup the prompt. - p.Prompt = strings.TrimPrefix(p.Prompt, "ai-face") - - // TODO: needs better goroutine management with persistent thread mapping. go func() { - _ = i.handleThread(context.Background(), p) + if err := i.handleThread(context.Background(), p); err != nil { + i.log.WithError(err).Error("failed to handle request") + } }() - return api.Message{ - ParentActivityID: p.MessageID, - Sections: []api.Section{ - { - // TODO: remove? - Base: api.Base{ - Body: api.Body{Plaintext: "Let me figure this out.."}, - }, - }, - }, - }, nil + return pickQuickResponse(p.MessageID), nil } // handleThread creates a new OpenAI assistant thread and handles the conversation. @@ -78,13 +66,13 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error { for { // Wait a little bit before polling. OpenAI assistant api does not support streaming yet. - time.Sleep(2 * time.Second) + time.Sleep(openAIPollInterval) // Get the run. run, err = i.openaiClient.RetrieveRun(ctx, run.ThreadID, run.ID) if err != nil { - i.log.WithError(err).Error("while retrieving assistant thread run") - continue + i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)} + return fmt.Errorf("while retrieving assistant thread run: %w", err) } i.log.WithFields(logrus.Fields{ @@ -94,24 +82,24 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error { switch run.Status { case openai.RunStatusCancelling, openai.RunStatusFailed, openai.RunStatusExpired: - // TODO tell the user that the assistant has stopped processing the request. - continue + i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)} + return nil - // We have to wait. Here we could tell the user that we are waiting. case openai.RunStatusQueued, openai.RunStatusInProgress: continue // Fetch and return the response. case openai.RunStatusCompleted: if err = i.handleStatusCompleted(ctx, run, p); err != nil { - i.log.WithError(err).Error("while handling completed case") - continue + i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)} + return fmt.Errorf("while handling completed case: %w", err) } return nil // The assistant is attempting to call a function. case openai.RunStatusRequiresAction: if err = i.handleStatusRequiresAction(ctx, run); err != nil { + i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)} return fmt.Errorf("while handling requires action: %w", err) } } @@ -119,13 +107,17 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error { } func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.Run, p *Payload) error { - msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, nil, nil, nil, nil) + limit := 1 + msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, &limit, nil, nil, nil) if err != nil { - return fmt.Errorf("while getting assistant messages response") + return fmt.Errorf("while getting assistant messages response: %w", err) } + // We're listing messages in a thread. They are ordered in desc order. If + // there are no messages in the entire thread, we imply that something went + // wrong on the OpenAI side, could be a bug. if len(msgList.Messages) == 0 { - i.log.Debug("no response messages were found") + i.log.Debug("no response messages were found, that seems like an edge case.") i.out <- source.Event{ Message: api.Message{ ParentActivityID: p.MessageID, @@ -138,26 +130,44 @@ func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.R }, }, } - return nil } - i.out <- source.Event{ - Message: api.Message{ - ParentActivityID: p.MessageID, - Sections: []api.Section{ - { - Base: api.Base{ - Body: api.Body{Plaintext: msgList.Messages[0].Content[0].Text.Value}, + // Iterate over text content to build messages. We're only interested in text + // context, since the assistant is instructed to return text only. + for _, c := range msgList.Messages[0].Content { + if c.Text == nil { + continue + } + + i.out <- source.Event{ + Message: api.Message{ + ParentActivityID: p.MessageID, + Sections: []api.Section{ + { + Base: api.Base{ + Body: api.Body{Plaintext: c.Text.Value}, + }, + Context: []api.ContextItem{ + {Text: "AI-generated content may be incorrect."}, + }, }, }, }, - }, + } } + return nil } func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run openai.Run) error { + // That should never happen, unless there is a bug or something is wrong with OpenAI APIs. + if run.RequiredAction == nil || run.RequiredAction.SubmitToolOutputs == nil { + return errors.New("run.RequiredAction or run.RequiredAction.SubmitToolOutputs is nil, that should not happen") + } + + toolOutputs := []openai.ToolOutput{} + for _, t := range run.RequiredAction.SubmitToolOutputs.ToolCalls { if t.Type != openai.ToolTypeFunction { continue @@ -174,18 +184,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope if err != nil { return err } - // Submit tool output. - _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ - ToolOutputs: []openai.ToolOutput{ - { - ToolCallID: t.ID, - Output: string(out), - }, - }, + + toolOutputs = append(toolOutputs, openai.ToolOutput{ + ToolCallID: t.ID, + Output: string(out), }) - if err != nil { - return err - } case "kubectlGetSecrets": args := &kubectlGetSecretsArgs{} @@ -197,18 +200,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope if err != nil { return err } - // Submit tool output. - _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ - ToolOutputs: []openai.ToolOutput{ - { - ToolCallID: t.ID, - Output: string(out), - }, - }, + + toolOutputs = append(toolOutputs, openai.ToolOutput{ + ToolCallID: t.ID, + Output: string(out), }) - if err != nil { - return err - } case "kubectlDescribePod": args := &kubectlDescribePodArgs{} @@ -220,18 +216,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope if err != nil { return err } - // Submit tool output. - _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ - ToolOutputs: []openai.ToolOutput{ - { - ToolCallID: t.ID, - Output: string(out), - }, - }, + + toolOutputs = append(toolOutputs, openai.ToolOutput{ + ToolCallID: t.ID, + Output: string(out), }) - if err != nil { - return err - } case "kubectlLogs": args := &kubectlLogsArgs{} @@ -243,20 +232,20 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope if err != nil { return err } - // Submit tool output. - _, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ - ToolOutputs: []openai.ToolOutput{ - { - ToolCallID: t.ID, - Output: string(out), - }, - }, + + toolOutputs = append(toolOutputs, openai.ToolOutput{ + ToolCallID: t.ID, + Output: string(out), }) - if err != nil { - return err - } } } + _, err := i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{ + ToolOutputs: toolOutputs, + }) + if err != nil { + return fmt.Errorf("while submitting tool outputs: %w", err) + } + return nil } diff --git a/internal/source/ai-brain/response.go b/internal/source/ai-brain/response.go new file mode 100644 index 00000000..dde9f06c --- /dev/null +++ b/internal/source/ai-brain/response.go @@ -0,0 +1,61 @@ +package aibrain + +import ( + "math/rand" + "time" + + "github.com/kubeshop/botkube/pkg/api" +) + +var quickResponses = []string{ + "Just a moment, please...", + "Thinking about this one...", + "Let me check on that for you.", + "Processing your request...", + "Working on it!", + "This one needs some extra thought.", + "I'm carefully considering your request.", + "Consulting my super-smart brain...", + "Cogs are turning...", + "Accessing the knowledge archives...", + "Running calculations at lightning speed!", + "Hold on tight, I'm diving into the details.", + "I'm here to help!", + "Happy to look into this for you.", + "Always learning to do this better.", + "I want to get this right for you.", + "Let me see what I can find out.", + "My circuits are buzzing!", + "Let me consult with my owl advisor...", + "Consider it done (or at least, I'll try my best!)", + "I'll get back to you with the best possible answer.", +} + +func pickQuickResponse(messageID string) api.Message { + rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404 + i := rand.Intn(len(quickResponses)) // #nosec G404 + + return api.Message{ + ParentActivityID: messageID, + Sections: []api.Section{ + { + Base: api.Base{ + Body: api.Body{Plaintext: quickResponses[i]}, + }, + }, + }, + } +} + +func msgUnableToHelp(messageID string) api.Message { + return api.Message{ + ParentActivityID: messageID, + Sections: []api.Section{ + { + Base: api.Base{ + Body: api.Body{Plaintext: "I am sorry, something went wrong, please try again. šŸ˜”"}, + }, + }, + }, + } +}