Skip to content

Commit

Permalink
Improve conn checker for cloud plugins
Browse files Browse the repository at this point in the history
1. Call cloud endpoints only if cloud envs are present (speeds up indexing)
2. Introduce a goroutine to supervise active cloud connections:
3. Replace panic with user-facing messages

minor:
1. Log connection errors instead of ignoring them
2. Simplify QGL client logic
3. Fix lint issues
4. Fix helm unit-tests
  • Loading branch information
mszostok committed Feb 22, 2024
1 parent 4463682 commit ff9c7ad
Show file tree
Hide file tree
Showing 23 changed files with 383 additions and 205 deletions.
6 changes: 4 additions & 2 deletions cmd/executor/ai-face/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"io"
"net/http"

"github.com/kubeshop/botkube-cloud-plugins/internal/auth"

"github.com/hashicorp/go-plugin"
aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain"
"github.com/kubeshop/botkube/pkg/api"
Expand Down Expand Up @@ -126,9 +128,9 @@ func (*AIFace) Help(context.Context) (api.Message, error) {
func main() {
executor.Serve(map[string]plugin.Plugin{
pluginName: &executor.Plugin{
Executor: &AIFace{
Executor: auth.NewProtectedExecutor(&AIFace{
httpClient: httpx.NewHTTPClient(),
},
}),
},
})
}
4 changes: 3 additions & 1 deletion cmd/executor/exec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"strings"

"github.com/kubeshop/botkube-cloud-plugins/internal/auth"

"github.com/MakeNowJust/heredoc"
"github.com/alexflint/go-arg"
"github.com/hashicorp/go-plugin"
Expand Down Expand Up @@ -202,7 +204,7 @@ func (i *XExecutor) getKubeconfig(ctx context.Context, log logrus.FieldLogger, i
func main() {
executor.Serve(map[string]plugin.Plugin{
pluginName: &executor.Plugin{
Executor: &XExecutor{},
Executor: auth.NewProtectedExecutor(&XExecutor{}),
},
})
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/executor/gh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"text/template"

"github.com/kubeshop/botkube-cloud-plugins/internal/auth"

"github.com/hashicorp/go-plugin"

"github.com/kubeshop/botkube/pkg/api"
Expand Down Expand Up @@ -161,7 +163,7 @@ var depsDownloadLinks = map[string]api.Dependency{
func main() {
executor.Serve(map[string]plugin.Plugin{
pluginName: &executor.Plugin{
Executor: &GHExecutor{},
Executor: auth.NewProtectedExecutor(&GHExecutor{}),
},
})
}
Expand Down
1 change: 1 addition & 0 deletions cmd/executor/thread-mate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ThreadMateExecutor struct {
}

func NewThreadMateExecutor() *ThreadMateExecutor {
// It's not protected as we use it only as dev plugin, and we integrate it with the SocketSlack.
return &ThreadMateExecutor{}
}

Expand Down
6 changes: 4 additions & 2 deletions cmd/source/ai-brain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sync"

"github.com/kubeshop/botkube-cloud-plugins/internal/auth"

"github.com/hashicorp/go-plugin"
aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain"
"github.com/kubeshop/botkube/pkg/api"
Expand Down Expand Up @@ -88,9 +90,9 @@ func (a *AI) HandleExternalRequest(_ context.Context, in source.ExternalRequestI
func main() {
source.Serve(map[string]plugin.Plugin{
pluginName: &source.Plugin{
Source: &AI{
Source: auth.NewProtectedSource(&AI{
incomingPrompts: sync.Map{},
},
}),
},
})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79
github.com/hashicorp/go-getter v1.7.1
github.com/hashicorp/go-plugin v1.4.10
github.com/hasura/go-graphql-client v0.8.1
github.com/huandu/xstrings v1.4.0
github.com/keptn/go-utils v0.20.4
github.com/kubeshop/botkube v0.13.1-0.20240219113106-577e1e800cff
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/hasura/go-graphql-client v0.8.1 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
73 changes: 73 additions & 0 deletions internal/auth/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package auth

import (
"context"

"github.com/kubeshop/botkube-cloud-plugins/internal/remote"

"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/api/executor"
)

// ProtectedExecutor protects Executor usage without active Cloud connection.
type ProtectedExecutor struct {
underlying executor.Executor
openSourceBlockage bool
initConnectionBlockage bool
authChecker *remote.ConnChecker
}

// NewProtectedExecutor returns wrapped Executor instance with Cloud connection checker.
func NewProtectedExecutor(exec executor.Executor) executor.Executor {
cfg, ok := remote.GetConfig()
if !ok {
return &ProtectedExecutor{
underlying: exec,
openSourceBlockage: true,
}
}

authChecker := remote.NewConnChecker(cfg)
err := authChecker.IsConnectedWithCould()
if err != nil {
return &ProtectedExecutor{
underlying: exec,
initConnectionBlockage: true,
}
}
noop := func() {}
authChecker.AsyncSupervisor(noop)

return &ProtectedExecutor{
underlying: exec,
authChecker: authChecker,
}
}

// Execute provides executor functionality only with active Cloud connection.
func (e *ProtectedExecutor) Execute(ctx context.Context, input executor.ExecuteInput) (executor.ExecuteOutput, error) {
if e.isBlocked() {
return executor.ExecuteOutput{
Message: unauthorizedMessage(e.openSourceBlockage),
}, nil
}

return e.underlying.Execute(ctx, input)
}

// Metadata returns plugin metadata even without active Cloud connection.
func (e *ProtectedExecutor) Metadata(ctx context.Context) (api.MetadataOutput, error) {
return e.underlying.Metadata(ctx)
}

// Help provides help functionality only with active Cloud connection.
func (e *ProtectedExecutor) Help(ctx context.Context) (api.Message, error) {
if e.isBlocked() {
return unauthorizedMessage(e.openSourceBlockage), nil
}
return e.underlying.Help(ctx)
}

func (e *ProtectedExecutor) isBlocked() bool {
return e.openSourceBlockage || e.initConnectionBlockage || e.authChecker.ReachedPermanentBlockage()
}
39 changes: 39 additions & 0 deletions internal/auth/msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package auth

import "github.com/kubeshop/botkube/pkg/api"

func unauthorizedMessage(isOpenSourceInstance bool) api.Message {
btnBuilder := api.NewMessageButtonBuilder()

if isOpenSourceInstance {
return api.Message{
Sections: []api.Section{
{
Base: api.Base{
Header: "Cloud-Only Plugin Enabled",
Description: "This plugin is available only on Botkube Cloud. To use it, migrate your Botkube Agent instance to Botkube Cloud.",
},
Buttons: []api.Button{
btnBuilder.ForURL("Migrate Installation", "https://docs.botkube.io/cli/migrating-installation-to-botkube-cloud", api.ButtonStylePrimary),
btnBuilder.ForURL("Open Botkube Cloud", "https://app.botkube.io"),
},
},
},
}
}

return api.Message{
Sections: []api.Section{
{
Base: api.Base{
Header: "Lost Connection with Botkube Cloud",
Description: "This plugin requires an active connection to Botkube Cloud. Ensure that your Agent has access to Botkube Cloud with valid credentials. Check Agent logs for more information.",
},
Buttons: []api.Button{
btnBuilder.ForURL("See Troubleshooting Guide", "https://docs.botkube.io/operation/common-problems"),
btnBuilder.ForURL("How to Get Agent Logs", "https://docs.botkube.io/operation/diagnostics#agent-logs"),
},
},
},
}
}
87 changes: 87 additions & 0 deletions internal/auth/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package auth

import (
"context"
"log"

"github.com/kubeshop/botkube-cloud-plugins/internal/remote"

"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/api/source"
)

// ProtectedSource protects source usage without active cloud connection.
type ProtectedSource struct {
underlying source.Source
openSourceBlockage bool
initConnectionBlockage bool
}

// NewProtectedSource returns wrapped Source instance with Cloud conn checker.
func NewProtectedSource(source source.Source) source.Source {
cfg, ok := remote.GetConfig()
if !ok {
return &ProtectedSource{
underlying: source,
openSourceBlockage: true,
}
}
authChecker := remote.NewConnChecker(cfg)
err := authChecker.IsConnectedWithCould()
if err != nil {
return &ProtectedSource{
underlying: source,
initConnectionBlockage: true,
}
}

onPermanentBlockage := func() {
// we may already start multiple streams (`Stream()`),
// so we want to kill the plugin process to stop all of them.
// Once restarted, and we will be still blocked, a proper message will be sent to all configured channels.
log.Fatal("Failed to connect with Cloud")
}
authChecker.AsyncSupervisor(onPermanentBlockage)
return &ProtectedSource{
underlying: source,
}
}

// Stream provides stream functionality only with active cloud connection.
func (s *ProtectedSource) Stream(ctx context.Context, in source.StreamInput) (source.StreamOutput, error) {
if s.isBlocked() {
out := source.StreamOutput{
Event: make(chan source.Event, 1),
}
out.Event <- source.Event{
Message: unauthorizedMessage(s.openSourceBlockage),
}

return out, nil
}

return s.underlying.Stream(ctx, in)
}

// Metadata returns plugin metadata even without active cloud connection.
func (s *ProtectedSource) Metadata(ctx context.Context) (api.MetadataOutput, error) {
return s.underlying.Metadata(ctx)
}

// HandleExternalRequest provides external request functionality only with active cloud connection.
func (s *ProtectedSource) HandleExternalRequest(ctx context.Context, in source.ExternalRequestInput) (source.ExternalRequestOutput, error) {
if s.isBlocked() {
out := source.ExternalRequestOutput{
Event: source.Event{
Message: unauthorizedMessage(s.openSourceBlockage),
},
}
return out, nil
}

return s.underlying.HandleExternalRequest(ctx, in)
}

func (s *ProtectedSource) isBlocked() bool {
return s.openSourceBlockage || s.initConnectionBlockage
}
39 changes: 0 additions & 39 deletions internal/executor/authorized/executor.go

This file was deleted.

11 changes: 5 additions & 6 deletions internal/executor/doctor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ import (
"strings"
"sync"

"github.com/PullRequestInc/go-gpt3"
"github.com/sirupsen/logrus"
stringsutil "k8s.io/utils/strings"

"github.com/kubeshop/botkube-cloud-plugins/internal/executor/authorized"
"github.com/kubeshop/botkube-cloud-plugins/internal/auth"

"github.com/PullRequestInc/go-gpt3"
"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/api/executor"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/loggerx"
pluginx "github.com/kubeshop/botkube/pkg/plugin"
"github.com/sirupsen/logrus"
stringsutil "k8s.io/utils/strings"
)

const (
Expand Down Expand Up @@ -59,7 +58,7 @@ func NewExecutor(ver string) executor.Executor {
exec := &Executor{
pluginVersion: ver,
}
return authorized.NewExecutor(exec)
return auth.NewProtectedExecutor(exec)
}

// Metadata returns details about the Doctor plugin.
Expand Down
Loading

0 comments on commit ff9c7ad

Please sign in to comment.