Skip to content

Commit

Permalink
tests: plugin state lifetime in cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Aug 1, 2024
1 parent 5005aeb commit 42a8826
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 11 deletions.
5 changes: 4 additions & 1 deletion internal/cluster/lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func (c *Cluster) clusterLifetime() {
c.i_am_master = true
log.Info("current node has become the master of the cluster")
} else {
c.i_am_master = false
if c.i_am_master {
c.i_am_master = false
log.Info("current node has released the master slot")
}
}
} else {
// update the master
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func (c *Cluster) updateNodeStatus() error {
c.node_lock.Lock()
defer c.node_lock.Unlock()

c.nodes.Clear()
for node_id, node := range nodes {
c.nodes.Clear()
c.nodes.Store(node_id, node)
}

Expand Down
218 changes: 209 additions & 9 deletions internal/cluster/plugin_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,222 @@
package cluster

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
)

func getRandomPluginRuntime() entities.PluginRuntime {
return entities.PluginRuntime{
Config: plugin_entities.PluginDeclaration{
Name: uuid.New().String(),
Version: "0.0.1",
Type: plugin_entities.PluginType,
Author: "Yeuoly",
CreatedAt: time.Now(),
Plugins: []string{"test"},
type fakePlugin struct {
entities.PluginRuntime
}

func (r *fakePlugin) InitEnvironment() error {
return nil
}

func (r *fakePlugin) Identity() (string, error) {
return "", nil
}

func (r *fakePlugin) StartPlugin() error {
return nil
}

func (r *fakePlugin) Type() entities.PluginRuntimeType {
return entities.PLUGIN_RUNTIME_TYPE_LOCAL
}

func (r *fakePlugin) Wait() (<-chan bool, error) {
return nil, nil
}

func getRandomPluginRuntime() fakePlugin {
return fakePlugin{
PluginRuntime: entities.PluginRuntime{
Config: plugin_entities.PluginDeclaration{
Name: uuid.New().String(),
Version: "0.0.1",
Type: plugin_entities.PluginType,
Author: "Yeuoly",
CreatedAt: time.Now(),
Plugins: []string{"test"},
},
},
}
}

func TestPluginScheduleLifetime(t *testing.T) {
plugin := getRandomPluginRuntime()
cluster, err := createSimulationCluster(1)
if err != nil {
t.Errorf("create simulation cluster failed: %v", err)
return
}

launchSimulationCluster(cluster, t)
defer closeSimulationCluster(cluster, t)

time.Sleep(time.Second * 1)

// add plugin to the cluster
err = cluster[0].RegisterPlugin(&plugin)
if err != nil {
t.Errorf("register plugin failed: %v", err)
return
}

hashed_identity, err := plugin.HashedIdentity()
if err != nil {
t.Errorf("get plugin hashed identity failed: %v", err)
return
}

nodes, err := cluster[0].FetchPluginAvailableNodes(hashed_identity)
if err != nil {
t.Errorf("fetch plugin available nodes failed: %v", err)
return
}

if len(nodes) != 1 {
t.Errorf("plugin not scheduled")
return
}

if nodes[0] != cluster[0].id {
t.Errorf("plugin scheduled to wrong node")
return
}

// trigger plugin stop
plugin.TriggerStop()

// wait for the plugin to stop
time.Sleep(time.Second * 1)

// check if the plugin is stopped
nodes, err = cluster[0].FetchPluginAvailableNodes(hashed_identity)
if err != nil {
t.Errorf("fetch plugin available nodes failed: %v", err)
return
}

if len(nodes) != 0 {
t.Errorf("plugin not stopped")
return
}
}

func TestPluginScheduleWhenMasterClusterShutdown(t *testing.T) {
plugins := []fakePlugin{
getRandomPluginRuntime(),
getRandomPluginRuntime(),
}

cluster, err := createSimulationCluster(2)
if err != nil {
t.Errorf("create simulation cluster failed: %v", err)
return
}

launchSimulationCluster(cluster, t)
defer closeSimulationCluster(cluster, t)

// add plugin to the cluster
for i, plugin := range plugins {
err = cluster[i].RegisterPlugin(&plugin)
if err != nil {
t.Errorf("register plugin failed: %v", err)
return
}
}

// wait for the plugin to be scheduled
time.Sleep(time.Second * 1)

// close master node and wait for new master to be elected
master_idx := -1

for i, node := range cluster {
if node.IsMaster() {
master_idx = i
// close the master node
node.Close()
break
}
}

if master_idx == -1 {
t.Errorf("master node not found")
return
}

// wait for the new master to be elected
i := 0
for ; i < 10; i++ {
time.Sleep(time.Second * 1)
found := false
for i, node := range cluster {
if node.IsMaster() && i != master_idx {
found = true
break
}
}

if found {
break
}
}

if i == 10 {
t.Errorf("master node is not elected")
return
}

// check if plugins[master_idx] is removed
hashed_identity, err := plugins[master_idx].HashedIdentity()
if err != nil {
t.Errorf("get plugin hashed identity failed: %v", err)
return
}

ticker := time.NewTicker(time.Second)
timeout := time.NewTimer(MASTER_GC_INTERVAL * 2)
done := false
for !done {
select {
case <-ticker.C:
nodes, err := cluster[master_idx].FetchPluginAvailableNodes(hashed_identity)
if err != nil {
t.Errorf("fetch plugin available nodes failed: %v", err)
return
}
if len(nodes) == 0 {
done = true
}
case <-timeout.C:
t.Errorf("plugin not removed")
return
}
}

// check if plugins[1-master_idx] is still scheduled
hashed_identity, err = plugins[1-master_idx].HashedIdentity()
if err != nil {
t.Errorf("get plugin hashed identity failed: %v", err)
return
}

nodes, err := cluster[1-master_idx].FetchPluginAvailableNodes(hashed_identity)
if err != nil {
t.Errorf("fetch plugin available nodes failed: %v", err)
return
}

if len(nodes) != 1 {
t.Errorf("plugin not scheduled")
return
}
}

0 comments on commit 42a8826

Please sign in to comment.