From a74a1a4801eade055b352135ee7e696d9f68c56a Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Mon, 14 Oct 2024 18:11:22 +0800 Subject: [PATCH] refactor: plugin installation changed to sync tasks --- internal/server/controllers/plugins.go | 60 +++++++++++------- internal/server/http_server.go | 7 ++- internal/service/install_plugin.go | 85 +++++++++++++++++--------- 3 files changed, 99 insertions(+), 53 deletions(-) diff --git a/internal/server/controllers/plugins.go b/internal/server/controllers/plugins.go index 3ff859e..1429166 100644 --- a/internal/server/controllers/plugins.go +++ b/internal/server/controllers/plugins.go @@ -9,7 +9,6 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/types/app" "github.com/langgenius/dify-plugin-daemon/internal/types/entities" "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" - "github.com/langgenius/dify-plugin-daemon/internal/utils/parser" ) func GetAsset(c *gin.Context) { @@ -24,7 +23,7 @@ func GetAsset(c *gin.Context) { c.Data(http.StatusOK, "application/octet-stream", asset) } -func InstallPluginFromPkg(app *app.Config) gin.HandlerFunc { +func UploadPlugin(app *app.Config) gin.HandlerFunc { return func(c *gin.Context) { dify_pkg_file_header, err := c.FormFile("dify_pkg") if err != nil { @@ -45,19 +44,6 @@ func InstallPluginFromPkg(app *app.Config) gin.HandlerFunc { verify_signature := c.PostForm("verify_signature") == "true" - source := c.PostForm("source") - if source == "" { - c.JSON(http.StatusOK, entities.NewErrorResponse(-400, "Source is required")) - return - } - - meta_str := c.PostForm("meta") - meta, err := parser.UnmarshalJson[map[string]any](meta_str) - if err != nil { - c.JSON(http.StatusOK, entities.NewErrorResponse(-400, err.Error())) - return - } - dify_pkg_file, err := dify_pkg_file_header.Open() if err != nil { c.JSON(http.StatusOK, entities.NewErrorResponse(-400, err.Error())) @@ -65,28 +51,56 @@ func InstallPluginFromPkg(app *app.Config) gin.HandlerFunc { } defer dify_pkg_file.Close() - service.InstallPluginFromPkg(app, c, tenant_id, dify_pkg_file, verify_signature, source, meta) + c.JSON(http.StatusOK, service.UploadPluginFromPkg(app, c, tenant_id, dify_pkg_file, verify_signature)) } } -func InstallPluginFromIdentifier(app *app.Config) gin.HandlerFunc { +func InstallPluginFromIdentifiers(app *app.Config) gin.HandlerFunc { return func(c *gin.Context) { BindRequest(c, func(request struct { - TenantID string `uri:"tenant_id" validate:"required"` - PluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier `json:"plugin_unique_identifier" validate:"required,plugin_unique_identifier"` - Source string `json:"source" validate:"required"` - Meta map[string]any `json:"meta" validate:"omitempty"` + TenantID string `uri:"tenant_id" validate:"required"` + PluginUniqueIdentifiers []plugin_entities.PluginUniqueIdentifier `json:"plugin_unique_identifiers" validate:"required,dive,plugin_unique_identifier"` + Source string `json:"source" validate:"required"` + Meta map[string]any `json:"meta" validate:"omitempty"` }) { if request.Meta == nil { request.Meta = map[string]any{} } - c.JSON(http.StatusOK, service.InstallPluginFromIdentifier( - request.TenantID, request.PluginUniqueIdentifier, request.Source, request.Meta, + c.JSON(http.StatusOK, service.InstallPluginFromIdentifiers( + request.TenantID, request.PluginUniqueIdentifiers, request.Source, request.Meta, )) }) } } +func FetchPluginInstallationTasks(c *gin.Context) { + BindRequest(c, func(request struct { + TenantID string `uri:"tenant_id" validate:"required"` + Page int `form:"page" validate:"required,min=1"` + PageSize int `form:"page_size" validate:"required,min=1,max=256"` + }) { + c.JSON(http.StatusOK, service.FetchPluginInstallationTasks(request.TenantID, request.Page, request.PageSize)) + }) +} + +func FetchPluginInstallationTask(c *gin.Context) { + BindRequest(c, func(request struct { + TenantID string `uri:"tenant_id" validate:"required"` + TaskID string `uri:"task_id" validate:"required"` + }) { + c.JSON(http.StatusOK, service.FetchPluginInstallationTask(request.TenantID, request.TaskID)) + }) +} + +func FetchPluginManifest(c *gin.Context) { + BindRequest(c, func(request struct { + TenantID string `uri:"tenant_id" validate:"required"` + PluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier `form:"plugin_unique_identifier" validate:"required,plugin_unique_identifier"` + }) { + c.JSON(http.StatusOK, service.FetchPluginManifest(request.TenantID, request.PluginUniqueIdentifier)) + }) +} + func UninstallPlugin(c *gin.Context) { BindRequest(c, func(request struct { TenantID string `uri:"tenant_id" validate:"required"` diff --git a/internal/server/http_server.go b/internal/server/http_server.go index d8fbe98..a52f719 100644 --- a/internal/server/http_server.go +++ b/internal/server/http_server.go @@ -113,8 +113,11 @@ func (app *App) endpointManagementGroup(group *gin.RouterGroup) { } func (app *App) pluginManagementGroup(group *gin.RouterGroup, config *app.Config) { - group.POST("/install/pkg", controllers.InstallPluginFromPkg(config)) - group.POST("/install/identifier", controllers.InstallPluginFromIdentifier(config)) + group.POST("/install/upload", controllers.UploadPlugin(config)) + group.POST("/install/identifiers", controllers.InstallPluginFromIdentifiers(config)) + group.GET("/install/task/:id", controllers.FetchPluginInstallationTask) + group.GET("/install/tasks", controllers.FetchPluginInstallationTasks) + group.GET("/fetch/manifest", controllers.FetchPluginManifest) group.GET("/fetch/identifier", controllers.FetchPluginFromIdentifier) group.POST("/uninstall", controllers.UninstallPlugin) group.GET("/list", gzip.Gzip(gzip.DefaultCompression), controllers.ListPlugins) diff --git a/internal/service/install_plugin.go b/internal/service/install_plugin.go index cf2afb2..722d28b 100644 --- a/internal/service/install_plugin.go +++ b/internal/service/install_plugin.go @@ -7,7 +7,6 @@ import ( "mime/multipart" "github.com/gin-gonic/gin" - "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager" "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder" "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier" "github.com/langgenius/dify-plugin-daemon/internal/db" @@ -16,49 +15,78 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" "github.com/langgenius/dify-plugin-daemon/internal/types/models" "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd" - "github.com/langgenius/dify-plugin-daemon/internal/utils/stream" ) -func InstallPluginFromPkg( +func UploadPluginFromPkg( config *app.Config, c *gin.Context, tenant_id string, dify_pkg_file multipart.File, verify_signature bool, - source string, - meta map[string]any, -) { - manager := plugin_manager.Manager() - +) *entities.Response { plugin_file, err := io.ReadAll(dify_pkg_file) if err != nil { - c.JSON(200, entities.NewErrorResponse(-500, err.Error())) - return + return entities.NewErrorResponse(-500, err.Error()) } decoder, err := decoder.NewZipPluginDecoder(plugin_file) if err != nil { - c.JSON(200, entities.NewErrorResponse(-500, err.Error())) - return + return entities.NewErrorResponse(-500, err.Error()) } if config.ForceVerifyingSignature || verify_signature { err := verifier.VerifyPlugin(decoder) if err != nil { - c.JSON(200, entities.NewErrorResponse(-500, errors.Join(err, errors.New( + return entities.NewErrorResponse(-500, errors.Join(err, errors.New( "plugin verification has been enabled, and the plugin you want to install has a bad signature", - )).Error())) - return + )).Error()) } } - baseSSEService( - func() (*stream.Stream[plugin_manager.PluginInstallResponse], error) { - return manager.Install(tenant_id, decoder, source, meta) - }, - c, - 3600, - ) + manifest, err := decoder.Manifest() + if err != nil { + return entities.NewErrorResponse(-500, err.Error()) + } + + return entities.NewSuccessResponse(manifest.Identity()) +} + +func InstallPluginFromIdentifiers( + tenant_id string, + plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier, + source string, + meta map[string]any, +) *entities.Response { + // TODO: create installation task and dispatch to workers + for _, plugin_unique_identifier := range plugin_unique_identifiers { + if err := InstallPluginFromIdentifier(tenant_id, plugin_unique_identifier, source, meta); err != nil { + return entities.NewErrorResponse(-500, err.Error()) + } + } + + return entities.NewSuccessResponse(true) +} + +func FetchPluginInstallationTasks( + tenant_id string, + page int, + page_size int, +) *entities.Response { + return nil +} + +func FetchPluginInstallationTask( + tenant_id string, + task_id string, +) *entities.Response { + return nil +} + +func FetchPluginManifest( + tenant_id string, + plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, +) *entities.Response { + return nil } func InstallPluginFromIdentifier( @@ -66,29 +94,30 @@ func InstallPluginFromIdentifier( plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, source string, meta map[string]any, -) *entities.Response { +) error { + // TODO: refactor // check if identifier exists plugin, err := db.GetOne[models.Plugin]( db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()), ) if err == db.ErrDatabaseNotFound { - return entities.NewErrorResponse(-404, "Plugin not found") + return errors.New("plugin not found") } if err != nil { - return entities.NewErrorResponse(-500, err.Error()) + return err } if plugin.InstallType == plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE { - return entities.NewErrorResponse(-500, "remote plugin not supported") + return errors.New("remote plugin not supported") } declaration := plugin.Declaration // install to this workspace if _, _, err := curd.InstallPlugin(tenant_id, plugin_unique_identifier, plugin.InstallType, &declaration, source, meta); err != nil { - return entities.NewErrorResponse(-500, err.Error()) + return err } - return entities.NewSuccessResponse(true) + return nil } func FetchPluginFromIdentifier(