diff --git a/cmd/server/main.go b/cmd/server/main.go index 2aa9f28..80864e1 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -28,7 +28,7 @@ func main() { log.Panic("Invalid configuration: %s", err.Error()) } - server.Run(&config) + (&server.App{}).Run(&config) } func setDefault(config *app.Config) { diff --git a/internal/cluster/init.go b/internal/cluster/init.go index a1ad045..d3ec185 100644 --- a/internal/cluster/init.go +++ b/internal/cluster/init.go @@ -1,25 +1,25 @@ package cluster -import "github.com/langgenius/dify-plugin-daemon/internal/types/app" +import ( + "sync" + + "github.com/langgenius/dify-plugin-daemon/internal/types/app" +) type Cluster struct { port uint16 -} -var ( - cluster *Cluster -) + plugins map[string]*PluginLifeTime + plugin_lock sync.Mutex +} -func Launch(config *app.Config) { - cluster = &Cluster{ - port: uint16(config.ServerPort), +func NewCluster(config *app.Config) *Cluster { + return &Cluster{ + port: uint16(config.ServerPort), + plugins: make(map[string]*PluginLifeTime), } - - go func() { - cluster.clusterLifetime() - }() } -func GetCluster() *Cluster { - return cluster +func (c *Cluster) Launch(config *app.Config) { + go c.clusterLifetime() } diff --git a/internal/cluster/state.go b/internal/cluster/state.go index 916b1b5..0cd01d3 100644 --- a/internal/cluster/state.go +++ b/internal/cluster/state.go @@ -1 +1,40 @@ package cluster + +import ( + "github.com/langgenius/dify-plugin-daemon/internal/types/entities" + "github.com/langgenius/dify-plugin-daemon/internal/utils/log" +) + +type PluginLifeTime struct { + lifetime entities.PluginRuntimeTimeLifeInterface +} + +// RegisterPlugin registers a plugin to the cluster, and start to be scheduled +func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error { + identity, err := lifetime.Identity() + if err != nil { + return err + } + + lifetime.OnStop(func() { + c.plugin_lock.Lock() + delete(c.plugins, identity) + c.plugin_lock.Unlock() + }) + + c.plugin_lock.Lock() + if !lifetime.Stopped() { + c.plugins[identity] = &PluginLifeTime{ + lifetime: lifetime, + } + } + c.plugin_lock.Unlock() + + log.Info("start to schedule plugin %s", identity) + + return nil +} + +func (c *Cluster) SchedulePlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error { + return nil +} diff --git a/internal/core/plugin_daemon/model_service.go b/internal/core/plugin_daemon/model_service.go index 1573e94..02b70fe 100644 --- a/internal/core/plugin_daemon/model_service.go +++ b/internal/core/plugin_daemon/model_service.go @@ -20,10 +20,8 @@ func genericInvokePlugin[Req any, Rsp any]( response_buffer_size int, typ backwards_invocation.PluginAccessType, action backwards_invocation.PluginAccessAction, -) ( - *stream.StreamResponse[Rsp], error, -) { - runtime := plugin_manager.Get(session.PluginIdentity()) +) (*stream.StreamResponse[Rsp], error) { + runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity()) if runtime == nil { return nil, errors.New("plugin not found") } diff --git a/internal/core/plugin_manager/init.go b/internal/core/plugin_manager/init.go index c214436..215c316 100644 --- a/internal/core/plugin_manager/init.go +++ b/internal/core/plugin_manager/init.go @@ -6,7 +6,9 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/types/entities" ) -var m sync.Map +var ( + m sync.Map +) func checkPluginExist(identity string) (entities.PluginRuntimeInterface, bool) { if v, ok := m.Load(identity); ok { diff --git a/internal/core/plugin_manager/lifetime.go b/internal/core/plugin_manager/lifetime.go index 74b3dbf..87fb46d 100644 --- a/internal/core/plugin_manager/lifetime.go +++ b/internal/core/plugin_manager/lifetime.go @@ -8,22 +8,22 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/utils/log" ) -func lifetime(config *app.Config, r entities.PluginRuntimeInterface) { +func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInterface) { start_failed_times := 0 configuration := r.Configuration() log.Info("new plugin logged in: %s", configuration.Identity()) defer log.Info("plugin %s has exited", configuration.Identity()) - // store plugin runtime - m.Store(configuration.Identity(), r) - defer m.Delete(configuration.Identity()) - - // update lifetime state for this pod - addLifetimeState(r) + // add plugin to cluster + err := p.cluster.RegisterPlugin(r) + if err != nil { + log.Error("add plugin to cluster failed: %s", err.Error()) + return + } // remove lifetime state after plugin if it has been stopped - defer deleteLifetimeState(r) + defer r.TriggerStop() for !r.Stopped() { if err := r.InitEnvironment(); err != nil { diff --git a/internal/core/plugin_manager/lifetime_manager.go b/internal/core/plugin_manager/lifetime_manager.go deleted file mode 100644 index 1ab020a..0000000 --- a/internal/core/plugin_manager/lifetime_manager.go +++ /dev/null @@ -1,146 +0,0 @@ -package plugin_manager - -import ( - "sync" - "time" - - "github.com/google/uuid" - "github.com/langgenius/dify-plugin-daemon/internal/types/app" - "github.com/langgenius/dify-plugin-daemon/internal/types/entities" - "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities" - "github.com/langgenius/dify-plugin-daemon/internal/utils/cache" - "github.com/langgenius/dify-plugin-daemon/internal/utils/log" - "github.com/langgenius/dify-plugin-daemon/internal/utils/parser" -) - -const ( - KEY_PLUGIN_LIFETIME_STATE = "lifetime_state" - KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK = "lifetime_state_modify_lock" -) - -type PluginLifeTime struct { - Identity string `json:"identity"` - Restarts int `json:"restarts"` - Status string `json:"status"` - Config plugin_entities.PluginDeclaration `json:"configuration"` -} - -type pluginLifeCollection struct { - Collection map[string]PluginLifeTime `json:"state"` - ID string `json:"id"` - LastCheckAt time.Time `json:"last_check_at"` -} - -func (p pluginLifeCollection) MarshalBinary() ([]byte, error) { - return parser.MarshalJsonBytes(p), nil -} - -var ( - instanceId = uuid.New().String() - - pluginLifetimeStateLock = sync.RWMutex{} - pluginLifetimeCollection = pluginLifeCollection{ - Collection: map[string]PluginLifeTime{}, - ID: instanceId, - } -) - -func startLifeTimeManager(config *app.Config) { - go func() { - // do check immediately - doClusterLifetimeCheck(config.LifetimeCollectionGCInterval) - - duration := time.Duration(config.LifetimeCollectionHeartbeatInterval) * time.Second - for range time.NewTicker(duration).C { - doClusterLifetimeCheck(config.LifetimeCollectionGCInterval) - } - }() -} - -func doClusterLifetimeCheck(heartbeat_interval int) { - // check and update self lifetime state - if err := updateCurrentInstanceLifetimeCollection(); err != nil { - log.Error("update current instance lifetime state failed: %s", err.Error()) - return - } - - // lock cluster and do cluster lifetime check - if cache.Lock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK, time.Second*10, time.Second*10) != nil { - log.Error("update lifetime state failed: lock failed") - return - } - defer cache.Unlock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK) - - cluster_lifetime_collections, err := fetchClusterPluginLifetimeCollections() - if err != nil { - log.Error("fetch cluster plugin lifetime state failed: %s", err.Error()) - return - } - - for cluster_id, state := range cluster_lifetime_collections { - if state.ID == instanceId { - continue - } - - // skip if last check has been done in $LIFETIME_COLLECTION_CG_INTERVAL - cg_interval := time.Duration(heartbeat_interval) * time.Second - if time.Since(state.LastCheckAt) < cg_interval { - continue - } - - // if last check has not been done in $LIFETIME_COLLECTION_CG_INTERVAL * 2, delete it - if time.Since(state.LastCheckAt) > cg_interval*2 { - if err := cache.DelMapField(KEY_PLUGIN_LIFETIME_STATE, cluster_id); err != nil { - log.Error("delete cluster plugin lifetime state failed: %s", err.Error()) - } else { - log.Info("delete cluster plugin lifetime state due to no longer active: %s", cluster_id) - } - } - } -} - -func newLifetimeFromRuntimeState(state entities.PluginRuntimeInterface) PluginLifeTime { - s := state.RuntimeState() - c := state.Configuration() - - return PluginLifeTime{ - Identity: c.Identity(), - Restarts: s.Restarts, - Status: s.Status, - Config: *c, - } -} - -func addLifetimeState(state entities.PluginRuntimeInterface) { - pluginLifetimeStateLock.Lock() - defer pluginLifetimeStateLock.Unlock() - - pluginLifetimeCollection.Collection[state.Configuration().Identity()] = newLifetimeFromRuntimeState(state) -} - -func deleteLifetimeState(state entities.PluginRuntimeInterface) { - pluginLifetimeStateLock.Lock() - defer pluginLifetimeStateLock.Unlock() - - delete(pluginLifetimeCollection.Collection, state.Configuration().Identity()) -} - -func updateCurrentInstanceLifetimeCollection() error { - pluginLifetimeStateLock.Lock() - defer pluginLifetimeStateLock.Unlock() - - pluginLifetimeCollection.LastCheckAt = time.Now() - - m.Range(func(key, value interface{}) bool { - if v, ok := value.(entities.PluginRuntimeInterface); ok { - pluginLifetimeCollection.Collection[v.Configuration().Identity()] = newLifetimeFromRuntimeState(v) - } - return true - }) - - return cache.SetMapOneField(KEY_PLUGIN_LIFETIME_STATE, instanceId, pluginLifetimeCollection) -} - -func fetchClusterPluginLifetimeCollections() (map[string]pluginLifeCollection, error) { - return cache.GetMap[pluginLifeCollection](KEY_PLUGIN_LIFETIME_STATE) -} diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index 5405062..a607ef5 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -3,6 +3,7 @@ package plugin_manager import ( "fmt" + "github.com/langgenius/dify-plugin-daemon/internal/cluster" "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation" "github.com/langgenius/dify-plugin-daemon/internal/types/app" "github.com/langgenius/dify-plugin-daemon/internal/types/entities" @@ -10,7 +11,25 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/utils/log" ) -func List() []entities.PluginRuntimeInterface { +type PluginManager struct { + cluster *cluster.Cluster +} + +var ( + manager *PluginManager +) + +func InitGlobalPluginManager(cluster *cluster.Cluster) { + manager = &PluginManager{ + cluster: cluster, + } +} + +func GetGlobalPluginManager() *PluginManager { + return manager +} + +func (p *PluginManager) List() []entities.PluginRuntimeInterface { var runtimes []entities.PluginRuntimeInterface m.Range(func(key, value interface{}) bool { if v, ok := value.(entities.PluginRuntimeInterface); ok { @@ -21,7 +40,7 @@ func List() []entities.PluginRuntimeInterface { return runtimes } -func Get(identity string) entities.PluginRuntimeInterface { +func (p *PluginManager) Get(identity string) entities.PluginRuntimeInterface { if v, ok := m.Load(identity); ok { if r, ok := v.(entities.PluginRuntimeInterface); ok { return r @@ -30,15 +49,15 @@ func Get(identity string) entities.PluginRuntimeInterface { return nil } -func Put(path string, binary []byte) { +func (p *PluginManager) Put(path string, binary []byte) { //TODO: put binary into } -func Delete(identity string) { +func (p *PluginManager) Delete(identity string) { //TODO: delete binary from } -func Init(configuration *app.Config) { +func (p *PluginManager) Init(configuration *app.Config) { // TODO: init plugin manager log.Info("start plugin manager daemon...") @@ -57,8 +76,5 @@ func Init(configuration *app.Config) { } // start plugin watcher - startWatcher(configuration) - - // start plugin lifetime manager - startLifeTimeManager(configuration) + p.startWatcher(configuration) } diff --git a/internal/core/plugin_manager/watcher.go b/internal/core/plugin_manager/watcher.go index 40f5578..4e36e24 100644 --- a/internal/core/plugin_manager/watcher.go +++ b/internal/core/plugin_manager/watcher.go @@ -15,19 +15,19 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" ) -func startWatcher(config *app.Config) { +func (p *PluginManager) startWatcher(config *app.Config) { go func() { log.Info("start to handle new plugins in path: %s", config.StoragePath) - handleNewPlugins(config) + p.handleNewPlugins(config) for range time.NewTicker(time.Second * 30).C { - handleNewPlugins(config) + p.handleNewPlugins(config) } }() - startRemoteWatcher(config) + p.startRemoteWatcher(config) } -func startRemoteWatcher(config *app.Config) { +func (p *PluginManager) startRemoteWatcher(config *app.Config) { // launch TCP debugging server if enabled if config.PluginRemoteInstallingEnabled { server := remote_manager.NewRemotePluginServer(config) @@ -39,13 +39,13 @@ func startRemoteWatcher(config *app.Config) { }() go func() { server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) { - lifetime(config, rpr) + p.lifetime(config, rpr) }) }() } } -func handleNewPlugins(config *app.Config) { +func (p *PluginManager) handleNewPlugins(config *app.Config) { // load local plugins firstly for plugin := range loadNewPlugins(config.StoragePath) { var plugin_interface entities.PluginRuntimeInterface @@ -64,7 +64,7 @@ func handleNewPlugins(config *app.Config) { } routine.Submit(func() { - lifetime(config, plugin_interface) + p.lifetime(config, plugin_interface) }) } } diff --git a/internal/server/app.go b/internal/server/app.go new file mode 100644 index 0000000..989bf83 --- /dev/null +++ b/internal/server/app.go @@ -0,0 +1,11 @@ +package server + +import ( + "github.com/langgenius/dify-plugin-daemon/internal/cluster" + "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager" +) + +type App struct { + plugin_manager *plugin_manager.PluginManager + cluster *cluster.Cluster +} diff --git a/internal/server/server.go b/internal/server/server.go index 11b113e..7bb5d28 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -9,7 +9,10 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" ) -func Run(config *app.Config) { +func (a *App) Run(config *app.Config) { + a.cluster = cluster.NewCluster(config) + plugin_manager.InitGlobalPluginManager(a.cluster) + // init routine pool routine.InitPool(config.RoutinePoolSize) @@ -20,10 +23,10 @@ func Run(config *app.Config) { process.Init(config) // init plugin daemon - plugin_manager.Init(config) + a.plugin_manager.Init(config) - // init cluster - cluster.Launch(config) + // launch cluster + a.cluster.Launch(config) // start http server server(config) diff --git a/internal/service/invoke_tool.go b/internal/service/invoke_tool.go index ce74c55..3803365 100644 --- a/internal/service/invoke_tool.go +++ b/internal/service/invoke_tool.go @@ -14,7 +14,7 @@ import ( func createSession[T any](r *plugin_entities.InvokePluginRequest[T]) *session_manager.Session { session := session_manager.NewSession(r.TenantId, r.UserId, parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion)) - runtime := plugin_manager.Get(session.PluginIdentity()) + runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity()) session.BindRuntime(runtime) return session } diff --git a/internal/types/entities/runtime.go b/internal/types/entities/runtime.go index 343b70a..4d5026a 100644 --- a/internal/types/entities/runtime.go +++ b/internal/types/entities/runtime.go @@ -10,7 +10,7 @@ type ( PluginRuntime struct { State PluginRuntimeState `json:"state"` Config plugin_entities.PluginDeclaration `json:"config"` - Connector PluginConnector `json:"-"` + onStopped []func() `json:"-"` } PluginRuntimeInterface interface { @@ -25,6 +25,8 @@ type ( StartPlugin() error Stopped() bool Stop() + OnStop(func()) + TriggerStop() RuntimeState() *PluginRuntimeState Wait() (<-chan bool, error) Type() PluginRuntimeType @@ -56,6 +58,16 @@ func (r *PluginRuntime) RuntimeState() *PluginRuntimeState { return &r.State } +func (r *PluginRuntime) OnStop(f func()) { + r.onStopped = append(r.onStopped, f) +} + +func (r *PluginRuntime) TriggerStop() { + for _, f := range r.onStopped { + f() + } +} + type PluginRuntimeType string const ( @@ -80,9 +92,3 @@ const ( PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting" PLUGIN_RUNTIME_STATUS_PENDING = "pending" ) - -type PluginConnector interface { - OnMessage(func([]byte)) - Read([]byte) int - Write([]byte) int -}