Skip to content

Commit

Permalink
refactor: serverless connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Nov 25, 2024
1 parent d341ff9 commit e242e6e
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 56 deletions.
125 changes: 85 additions & 40 deletions internal/core/plugin_manager/install_to_serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,30 @@ package plugin_manager

import (
"fmt"
"regexp"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
"github.com/langgenius/dify-plugin-daemon/internal/db"
"github.com/langgenius/dify-plugin-daemon/internal/types/models"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)

var (
variablePattern = regexp.MustCompile(`(\w+)=([^,]+)`)
)

func extractVariables(message string) map[string]string {
matches := variablePattern.FindAllStringSubmatch(message, -1)
variables := make(map[string]string)
for _, match := range matches {
variables[match[1]] = match[2]
}
return variables
}

// InstallToAWSFromPkg installs a plugin to AWS Lambda
func (p *PluginManager) InstallToAWSFromPkg(
decoder decoder.PluginDecoder,
Expand Down Expand Up @@ -47,65 +62,95 @@ func (p *PluginManager) InstallToAWSFromPkg(
lambdaFunctionName := ""

response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
if r.Event == serverless.Info {
if r.Stage == serverless.LaunchStageStart {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventInfo,
Data: "Installing...",
})
} else if r.Event == serverless.Done {
if lambdaUrl == "" || lambdaFunctionName == "" {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Internal server error, failed to get lambda url or function name",
})
return
}
// check if the plugin is already installed
_, err := db.GetOne[models.ServerlessRuntime](
db.Equal("checksum", checksum),
db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
)
if err == db.ErrDatabaseNotFound {
// create a new serverless runtime
serverlessModel := &models.ServerlessRuntime{
Checksum: checksum,
Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
FunctionURL: lambdaUrl,
FunctionName: lambdaFunctionName,
PluginUniqueIdentifier: uniqueIdentity.String(),
Declaration: declaration,
} else if r.Stage == serverless.LaunchStageRun {
if r.State == serverless.LaunchStateSuccess {
// split the message to get the lambda url and function name
variables := extractVariables(r.Message)
if len(variables) != 3 {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Internal server error, failed to get lambda url or function name",
})
return
}

lambdaUrl = variables["endpoint"]
lambdaFunctionName = variables["name"]

if lambdaUrl == "" || lambdaFunctionName == "" {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: fmt.Sprintf(
"Internal server error, failed to get lambda url or function name,"+
" one of them is empty, lambdaUrl: %3s, lambdaFunctionName: %3s",
lambdaUrl, lambdaFunctionName,
),
})
return
}
err = db.Create(serverlessModel)
if err != nil {

// check if the plugin is already installed
_, err := db.GetOne[models.ServerlessRuntime](
db.Equal("checksum", checksum),
db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
)
if err == db.ErrDatabaseNotFound {
// create a new serverless runtime
serverlessModel := &models.ServerlessRuntime{
Checksum: checksum,
Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
FunctionURL: lambdaUrl,
FunctionName: lambdaFunctionName,
PluginUniqueIdentifier: uniqueIdentity.String(),
Declaration: declaration,
}
err = db.Create(serverlessModel)
if err != nil {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Failed to create serverless runtime",
})
return
}
} else if err != nil {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Failed to create serverless runtime",
Data: "Failed to check if the plugin is already installed",
})
return
}
} else if err != nil {

} else if r.State == serverless.LaunchStateRunning {
// do nothing
}
} else if r.Stage == serverless.LaunchStageEnd {
if r.State == serverless.LaunchStateSuccess {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Failed to check if the plugin is already installed",
Event: PluginInstallEventDone,
Data: "Installed",
})
return
}

} else if r.Stage == serverless.LaunchStageBuild {
// do nothing
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventDone,
Data: "Installed",
Event: PluginInstallEventInfo,
Data: "Building...",
})
} else if r.Event == serverless.Error {
}

// any error occurs
if r.State == serverless.LaunchStateFailed {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Internal server error",
})
} else if r.Event == serverless.LambdaUrl {
lambdaUrl = r.Message
} else if r.Event == serverless.Lambda {
lambdaFunctionName = r.Message
} else {
newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
// log the error message
log.Error("failed to install plugin to AWS Lambda, stage: %s, state: %s, error: %s", r.Stage, r.State, r.Message)
}
})
})
Expand Down
19 changes: 19 additions & 0 deletions internal/core/plugin_manager/install_to_serverless_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package plugin_manager

import (
"testing"
)

func TestExtractVariables(t *testing.T) {
message := "endpoint=${endpoint},name=${name},id=${id}"
variables := extractVariables(message)
if variables["endpoint"] != "${endpoint}" {
t.Errorf("endpoint is not correct")
}
if variables["name"] != "${name}" {
t.Errorf("name is not correct")
}
if variables["id"] != "${id}" {
t.Errorf("id is not correct")
}
}
22 changes: 14 additions & 8 deletions internal/core/plugin_manager/serverless/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,25 @@ func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
return &response.Data, nil
}

type LaunchAWSLambdaFunctionEvent string
type LaunchFunctionStage string
type LaunchFunctionState string

const (
Error LaunchAWSLambdaFunctionEvent = "error"
Info LaunchAWSLambdaFunctionEvent = "info"
Lambda LaunchAWSLambdaFunctionEvent = "lambda"
LambdaUrl LaunchAWSLambdaFunctionEvent = "lambda_url"
Done LaunchAWSLambdaFunctionEvent = "done"
LaunchStageStart LaunchFunctionStage = "start"
LaunchStageBuild LaunchFunctionStage = "build"
LaunchStageRun LaunchFunctionStage = "run"
LaunchStageEnd LaunchFunctionStage = "end"

LaunchStateSuccess LaunchFunctionState = "success"
LaunchStateRunning LaunchFunctionState = "running"
LaunchStateFailed LaunchFunctionState = "failed"
)

type LaunchAWSLambdaFunctionResponse struct {
Event LaunchAWSLambdaFunctionEvent `json:"event"`
Message string `json:"message"`
Stage LaunchFunctionStage
Obj string
State LaunchFunctionState
Message string
}

// Launch the lambda function from serverless connector, it will receive the context_tar as the input
Expand Down
15 changes: 7 additions & 8 deletions internal/core/plugin_manager/serverless/upload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverless

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -40,17 +41,15 @@ func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambda
}
} else {
// found, return directly
response := stream.NewStream[LaunchAWSLambdaFunctionResponse](3)
response := stream.NewStream[LaunchAWSLambdaFunctionResponse](2)
response.Write(LaunchAWSLambdaFunctionResponse{
Event: LambdaUrl,
Message: function.FunctionURL,
Stage: LaunchStageRun,
State: LaunchStateSuccess,
Message: fmt.Sprintf("endpoint=%s,name=%s,id=%s", function.FunctionURL, function.FunctionName, identity),
})
response.Write(LaunchAWSLambdaFunctionResponse{
Event: Lambda,
Message: function.FunctionName,
})
response.Write(LaunchAWSLambdaFunctionResponse{
Event: Done,
Stage: LaunchStageEnd,
State: LaunchStateSuccess,
Message: "",
})
response.Close()
Expand Down

0 comments on commit e242e6e

Please sign in to comment.