diff --git a/internal/cluster/lifetime.go b/internal/cluster/lifetime.go index 7fcd45e..b4d1b92 100644 --- a/internal/cluster/lifetime.go +++ b/internal/cluster/lifetime.go @@ -79,7 +79,10 @@ func (c *Cluster) clusterLifetime() { defer pluginSchedulerTicker.Stop() // vote for all ips and find the best one, prepare for later traffic scheduling - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "cluster", + "function": "voteAddressesWhenInit", + }, func() { if err := c.updateNodeStatus(); err != nil { log.Error("failed to update the status of the node: %s", err.Error()) } diff --git a/internal/core/dify_invocation/real/http_request.go b/internal/core/dify_invocation/real/http_request.go index 98d258c..68f9d22 100644 --- a/internal/core/dify_invocation/real/http_request.go +++ b/internal/core/dify_invocation/real/http_request.go @@ -68,7 +68,10 @@ func StreamResponse[T any](i *RealBackwardsInvocation, method string, path strin newResponse.OnClose(func() { response.Close() }) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "dify_invocation", + "function": "StreamResponse", + }, func() { defer newResponse.Close() for response.Next() { t, err := response.Read() diff --git a/internal/core/dify_invocation/tester/mock.go b/internal/core/dify_invocation/tester/mock.go index 4d07dbc..763d720 100644 --- a/internal/core/dify_invocation/tester/mock.go +++ b/internal/core/dify_invocation/tester/mock.go @@ -18,7 +18,7 @@ func NewMockedDifyInvocation() dify_invocation.BackwardsInvocation { func (m *MockedDifyInvocation) InvokeLLM(payload *dify_invocation.InvokeLLMRequest) (*stream.Stream[model_entities.LLMResultChunk], error) { stream := stream.NewStream[model_entities.LLMResultChunk](5) - routine.Submit(func() { + routine.Submit(nil, func() { stream.Write(model_entities.LLMResultChunk{ Model: model_entities.LLMModel(payload.Model), PromptMessages: payload.PromptMessages, @@ -129,7 +129,7 @@ func (m *MockedDifyInvocation) InvokeRerank(payload *dify_invocation.InvokeReran func (m *MockedDifyInvocation) InvokeTTS(payload *dify_invocation.InvokeTTSRequest) (*stream.Stream[model_entities.TTSResult], error) { stream := stream.NewStream[model_entities.TTSResult](5) - routine.Submit(func() { + routine.Submit(nil, func() { for i := 0; i < 10; i++ { stream.Write(model_entities.TTSResult{ Result: "a1b2c3d4", @@ -157,7 +157,7 @@ func (m *MockedDifyInvocation) InvokeModeration(payload *dify_invocation.InvokeM func (m *MockedDifyInvocation) InvokeTool(payload *dify_invocation.InvokeToolRequest) (*stream.Stream[tool_entities.ToolResponseChunk], error) { stream := stream.NewStream[tool_entities.ToolResponseChunk](5) - routine.Submit(func() { + routine.Submit(nil, func() { for i := 0; i < 10; i++ { stream.Write(tool_entities.ToolResponseChunk{ Type: tool_entities.ToolResponseChunkTypeText, @@ -175,7 +175,7 @@ func (m *MockedDifyInvocation) InvokeTool(payload *dify_invocation.InvokeToolReq func (m *MockedDifyInvocation) InvokeApp(payload *dify_invocation.InvokeAppRequest) (*stream.Stream[map[string]any], error) { stream := stream.NewStream[map[string]any](5) - routine.Submit(func() { + routine.Submit(nil, func() { stream.Write(map[string]any{ "event": "agent_message", "message_id": "5ad4cb98-f0c7-4085-b384-88c403be6290", diff --git a/internal/core/plugin_daemon/backwards_invocation/task.go b/internal/core/plugin_daemon/backwards_invocation/task.go index fd2b631..bf17336 100644 --- a/internal/core/plugin_daemon/backwards_invocation/task.go +++ b/internal/core/plugin_daemon/backwards_invocation/task.go @@ -57,7 +57,10 @@ func InvokeDify( } // dispatch invocation task - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_daemon", + "function": "InvokeDify", + }, func() { dispatchDifyInvocationTask(requestHandle) defer requestHandle.EndResponse() }) diff --git a/internal/core/plugin_daemon/endpoint_service.go b/internal/core/plugin_daemon/endpoint_service.go index 4636a9b..8fb6f2e 100644 --- a/internal/core/plugin_daemon/endpoint_service.go +++ b/internal/core/plugin_daemon/endpoint_service.go @@ -60,7 +60,11 @@ func InvokeEndpoint( } response.Write(dehexed) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_daemon", + "function": "InvokeEndpoint", + "type": "body_write", + }, func() { defer response.Close() for resp.Next() { chunk, err := resp.Read() diff --git a/internal/core/plugin_daemon/tool_service.go b/internal/core/plugin_daemon/tool_service.go index c645c1b..fa86780 100644 --- a/internal/core/plugin_daemon/tool_service.go +++ b/internal/core/plugin_daemon/tool_service.go @@ -50,7 +50,12 @@ func InvokeTool( } newResponse := stream.NewStream[tool_entities.ToolResponseChunk](128) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_daemon", + "function": "InvokeTool", + "tool_name": request.Tool, + "tool_provider": request.Provider, + }, func() { files := make(map[string]*bytes.Buffer) defer newResponse.Close() diff --git a/internal/core/plugin_manager/aws_manager/io.go b/internal/core/plugin_manager/aws_manager/io.go index 0aee7b0..7f33bda 100644 --- a/internal/core/plugin_manager/aws_manager/io.go +++ b/internal/core/plugin_manager/aws_manager/io.go @@ -59,7 +59,12 @@ func (r *AWSPluginRuntime) Write(session_id string, data []byte) { req.Header.Set("Accept", "text/event-stream") req.Header.Set("Dify-Plugin-Session-ID", session_id) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "aws_manager", + "function": "Write", + "session_id": session_id, + "lambda_url": r.LambdaURL, + }, func() { // remove the session from listeners defer r.listeners.Delete(session_id) defer l.Close() diff --git a/internal/core/plugin_manager/install_to_local.go b/internal/core/plugin_manager/install_to_local.go index 4c93b28..4e3b290 100644 --- a/internal/core/plugin_manager/install_to_local.go +++ b/internal/core/plugin_manager/install_to_local.go @@ -32,7 +32,10 @@ func (p *PluginManager) InstallToLocal( } response := stream.NewStream[PluginInstallResponse](128) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "InstallToLocal", + }, func() { defer response.Close() ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds diff --git a/internal/core/plugin_manager/install_to_serverless.go b/internal/core/plugin_manager/install_to_serverless.go index c5eec6d..dd0554e 100644 --- a/internal/core/plugin_manager/install_to_serverless.go +++ b/internal/core/plugin_manager/install_to_serverless.go @@ -38,7 +38,13 @@ func (p *PluginManager) InstallToAWSFromPkg( } newResponse := stream.NewStream[PluginInstallResponse](128) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "InstallToAWSFromPkg", + "checksum": checksum, + "unique_identity": uniqueIdentity.String(), + "source": source, + }, func() { defer func() { newResponse.Close() }() diff --git a/internal/core/plugin_manager/launcher.go b/internal/core/plugin_manager/launcher.go index 1aaae7d..d52eb44 100644 --- a/internal/core/plugin_manager/launcher.go +++ b/internal/core/plugin_manager/launcher.go @@ -155,7 +155,10 @@ func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.Plugi errChan := make(chan error) // local plugin - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "LaunchLocal", + }, func() { defer func() { if r := recover(); r != nil { log.Error("plugin runtime panic: %v", r) @@ -165,7 +168,10 @@ func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.Plugi // add max launching lock to prevent too many plugins launching at the same time p.maxLaunchingLock <- true - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "LaunchLocal", + }, func() { // wait for plugin launched <-launchedChan // release max launching lock diff --git a/internal/core/plugin_manager/local_manager/environment_python.go b/internal/core/plugin_manager/local_manager/environment_python.go index 41d5046..72a086d 100644 --- a/internal/core/plugin_manager/local_manager/environment_python.go +++ b/internal/core/plugin_manager/local_manager/environment_python.go @@ -109,7 +109,10 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error { lastActiveAt := time.Now() - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "InitPythonEnvironment", + }, func() { defer wg.Done() // read stdout buf := make([]byte, 1024) @@ -123,7 +126,10 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error { } }) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "InitPythonEnvironment", + }, func() { defer wg.Done() // read stderr buf := make([]byte, 1024) @@ -144,7 +150,10 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error { } }) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "InitPythonEnvironment", + }, func() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { diff --git a/internal/core/plugin_manager/local_manager/run.go b/internal/core/plugin_manager/local_manager/run.go index af20a06..caceffe 100644 --- a/internal/core/plugin_manager/local_manager/run.go +++ b/internal/core/plugin_manager/local_manager/run.go @@ -126,13 +126,21 @@ func (r *LocalPluginRuntime) StartPlugin() error { wg.Add(2) // listen to plugin stdout - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "type": "local", + "function": "StartStdout", + }, func() { defer wg.Done() stdio.StartStdout(func() {}) }) // listen to plugin stderr - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "type": "local", + "function": "StartStderr", + }, func() { defer wg.Done() stdio.StartStderr() }) diff --git a/internal/core/plugin_manager/remote_manager/run.go b/internal/core/plugin_manager/remote_manager/run.go index 5a4998b..33265a6 100644 --- a/internal/core/plugin_manager/remote_manager/run.go +++ b/internal/core/plugin_manager/remote_manager/run.go @@ -32,8 +32,17 @@ func (r *RemotePluginRuntime) Type() plugin_entities.PluginRuntimeType { func (r *RemotePluginRuntime) StartPlugin() error { var exitError error + identity, err := r.Identity() + if err != nil { + return err + } + // handle heartbeat - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "remote_manager", + "function": "StartPlugin", + "plugin_id": identity.String(), + }, func() { r.lastActiveAt = time.Now() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() diff --git a/internal/core/plugin_manager/watcher.go b/internal/core/plugin_manager/watcher.go index c1828bf..4b50091 100644 --- a/internal/core/plugin_manager/watcher.go +++ b/internal/core/plugin_manager/watcher.go @@ -47,7 +47,12 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) { return } p.m.Store(identity.String(), rpr) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "plugin_manager", + "function": "startRemoteWatcher", + "plugin_id": identity.String(), + "type": "remote", + }, func() { defer func() { if err := recover(); err != nil { log.Error("plugin runtime error: %v", err) diff --git a/internal/server/controllers/pprof.go b/internal/server/controllers/pprof.go new file mode 100644 index 0000000..31a122f --- /dev/null +++ b/internal/server/controllers/pprof.go @@ -0,0 +1,27 @@ +package controllers + +import ( + "net/http/pprof" + + "github.com/gin-gonic/gin" +) + +func PprofIndex(c *gin.Context) { + pprof.Index(c.Writer, c.Request) +} + +func PprofCmdline(c *gin.Context) { + pprof.Cmdline(c.Writer, c.Request) +} + +func PprofProfile(c *gin.Context) { + pprof.Profile(c.Writer, c.Request) +} + +func PprofSymbol(c *gin.Context) { + pprof.Symbol(c.Writer, c.Request) +} + +func PprofTrace(c *gin.Context) { + pprof.Trace(c.Writer, c.Request) +} diff --git a/internal/server/http_server.go b/internal/server/http_server.go index 6f1074c..63479ba 100644 --- a/internal/server/http_server.go +++ b/internal/server/http_server.go @@ -22,6 +22,7 @@ func (app *App) server(config *app.Config) func() { app.endpointGroup(engine.Group("/e"), config) app.awsLambdaTransactionGroup(engine.Group("/backwards-invocation"), config) app.pluginGroup(engine.Group("/plugin/:tenant_id"), config) + app.pprofGroup(engine.Group("/debug/pprof"), config) srv := &http.Server{ Addr: fmt.Sprintf(":%d", config.ServerPort), @@ -137,3 +138,15 @@ func (app *App) pluginManagementGroup(group *gin.RouterGroup, config *app.Config func (app *App) pluginAssetGroup(group *gin.RouterGroup) { group.GET("/:id", gzip.Gzip(gzip.DefaultCompression), controllers.GetAsset) } + +func (app *App) pprofGroup(group *gin.RouterGroup, config *app.Config) { + if config.PPROFEnabled { + group.Use(CheckingKey(config.ServerKey)) + + group.GET("/", controllers.PprofIndex) + group.GET("/cmdline", controllers.PprofCmdline) + group.GET("/profile", controllers.PprofProfile) + group.GET("/symbol", controllers.PprofSymbol) + group.GET("/trace", controllers.PprofTrace) + } +} diff --git a/internal/server/middleware.go b/internal/server/middleware.go index 19e4092..a4677f9 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -17,8 +17,7 @@ func CheckingKey(key string) gin.HandlerFunc { return func(c *gin.Context) { // get header X-Api-Key if c.GetHeader(constants.X_API_KEY) != key { - c.JSON(200, exception.UnauthorizedError().ToResponse()) - c.Abort() + c.AbortWithStatusJSON(401, exception.UnauthorizedError().ToResponse()) return } diff --git a/internal/service/base_sse.go b/internal/service/base_sse.go index 3f8d12c..b84b0db 100644 --- a/internal/service/base_sse.go +++ b/internal/service/base_sse.go @@ -46,7 +46,10 @@ func baseSSEService[R any]( return } - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "service", + "function": "baseSSEService", + }, func() { for pluginDaemonResponse.Next() { chunk, err := pluginDaemonResponse.Read() if err != nil { diff --git a/internal/service/endpoint.go b/internal/service/endpoint.go index 1939ba2..5a0bc5d 100644 --- a/internal/service/endpoint.go +++ b/internal/service/endpoint.go @@ -138,7 +138,10 @@ func Endpoint( } defer close() - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "service", + "function": "Endpoint", + }, func() { defer close() for response.Next() { chunk, err := response.Read() diff --git a/internal/types/app/config.go b/internal/types/app/config.go index 96629f5..2a013b3 100644 --- a/internal/types/app/config.go +++ b/internal/types/app/config.go @@ -84,6 +84,8 @@ type Config struct { PythonInterpreterPath string `envconfig:"PYTHON_INTERPRETER_PATH"` DisplayClusterLog bool `envconfig:"DISPLAY_CLUSTER_LOG"` + + PPROFEnabled bool `envconfig:"PPROF_ENABLED"` } func (c *Config) Validate() error { diff --git a/internal/utils/http_requests/http_warpper.go b/internal/utils/http_requests/http_warpper.go index 6d009cf..eb9ffbd 100644 --- a/internal/utils/http_requests/http_warpper.go +++ b/internal/utils/http_requests/http_warpper.go @@ -110,7 +110,10 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string resp.Body.Close() }) - routine.Submit(func() { + routine.Submit(map[string]string{ + "module": "http_requests", + "function": "RequestAndParseStream", + }, func() { scanner := bufio.NewScanner(resp.Body) defer resp.Body.Close() diff --git a/internal/utils/routine/pool.go b/internal/utils/routine/pool.go index fa13b44..a47f763 100644 --- a/internal/utils/routine/pool.go +++ b/internal/utils/routine/pool.go @@ -1,6 +1,8 @@ package routine import ( + "context" + "runtime/pprof" "sync" "sync/atomic" @@ -29,8 +31,22 @@ func InitPool(size int) { p, _ = ants.NewPool(size, ants.WithNonblocking(false)) } -func Submit(f func()) { - p.Submit(f) +func Submit(labels map[string]string, f func()) { + if labels == nil { + labels = map[string]string{} + } + + p.Submit(func() { + label := []string{} + if len(labels) > 0 { + for k, v := range labels { + label = append(label, k, v) + } + } + pprof.Do(context.Background(), pprof.Labels(label...), func(ctx context.Context) { + f() + }) + }) } func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) { @@ -42,13 +58,16 @@ func WithMaxRoutine(maxRoutine int, tasks []func(), on_finish ...func()) { maxRoutine = len(tasks) } - Submit(func() { + Submit(map[string]string{ + "module": "routine", + "function": "WithMaxRoutine", + }, func() { wg := sync.WaitGroup{} taskIndex := int32(0) for i := 0; i < maxRoutine; i++ { wg.Add(1) - Submit(func() { + Submit(nil, func() { defer wg.Done() currentIndex := atomic.AddInt32(&taskIndex, 1) for currentIndex <= int32(len(tasks)) {