Skip to content

Commit

Permalink
feat: support for pprof
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Dec 4, 2024
1 parent f9fc134 commit 407fb94
Show file tree
Hide file tree
Showing 22 changed files with 168 additions and 30 deletions.
5 changes: 4 additions & 1 deletion internal/cluster/lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func (c *Cluster) clusterLifetime() {
defer pluginSchedulerTicker.Stop()

// vote for all ips and find the best one, prepare for later traffic scheduling
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "cluster",
"function": "voteAddressesWhenInit",
}, func() {
if err := c.updateNodeStatus(); err != nil {
log.Error("failed to update the status of the node: %s", err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion internal/core/dify_invocation/real/http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func StreamResponse[T any](i *RealBackwardsInvocation, method string, path strin
newResponse.OnClose(func() {
response.Close()
})
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "dify_invocation",
"function": "StreamResponse",
}, func() {
defer newResponse.Close()
for response.Next() {
t, err := response.Read()
Expand Down
8 changes: 4 additions & 4 deletions internal/core/dify_invocation/tester/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewMockedDifyInvocation() dify_invocation.BackwardsInvocation {

func (m *MockedDifyInvocation) InvokeLLM(payload *dify_invocation.InvokeLLMRequest) (*stream.Stream[model_entities.LLMResultChunk], error) {
stream := stream.NewStream[model_entities.LLMResultChunk](5)
routine.Submit(func() {
routine.Submit(nil, func() {
stream.Write(model_entities.LLMResultChunk{
Model: model_entities.LLMModel(payload.Model),
PromptMessages: payload.PromptMessages,
Expand Down Expand Up @@ -129,7 +129,7 @@ func (m *MockedDifyInvocation) InvokeRerank(payload *dify_invocation.InvokeReran

func (m *MockedDifyInvocation) InvokeTTS(payload *dify_invocation.InvokeTTSRequest) (*stream.Stream[model_entities.TTSResult], error) {
stream := stream.NewStream[model_entities.TTSResult](5)
routine.Submit(func() {
routine.Submit(nil, func() {
for i := 0; i < 10; i++ {
stream.Write(model_entities.TTSResult{
Result: "a1b2c3d4",
Expand Down Expand Up @@ -157,7 +157,7 @@ func (m *MockedDifyInvocation) InvokeModeration(payload *dify_invocation.InvokeM

func (m *MockedDifyInvocation) InvokeTool(payload *dify_invocation.InvokeToolRequest) (*stream.Stream[tool_entities.ToolResponseChunk], error) {
stream := stream.NewStream[tool_entities.ToolResponseChunk](5)
routine.Submit(func() {
routine.Submit(nil, func() {
for i := 0; i < 10; i++ {
stream.Write(tool_entities.ToolResponseChunk{
Type: tool_entities.ToolResponseChunkTypeText,
Expand All @@ -175,7 +175,7 @@ func (m *MockedDifyInvocation) InvokeTool(payload *dify_invocation.InvokeToolReq

func (m *MockedDifyInvocation) InvokeApp(payload *dify_invocation.InvokeAppRequest) (*stream.Stream[map[string]any], error) {
stream := stream.NewStream[map[string]any](5)
routine.Submit(func() {
routine.Submit(nil, func() {
stream.Write(map[string]any{
"event": "agent_message",
"message_id": "5ad4cb98-f0c7-4085-b384-88c403be6290",
Expand Down
5 changes: 4 additions & 1 deletion internal/core/plugin_daemon/backwards_invocation/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func InvokeDify(
}

// dispatch invocation task
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_daemon",
"function": "InvokeDify",
}, func() {
dispatchDifyInvocationTask(requestHandle)
defer requestHandle.EndResponse()
})
Expand Down
6 changes: 5 additions & 1 deletion internal/core/plugin_daemon/endpoint_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ func InvokeEndpoint(
}

response.Write(dehexed)
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_daemon",
"function": "InvokeEndpoint",
"type": "body_write",
}, func() {
defer response.Close()
for resp.Next() {
chunk, err := resp.Read()
Expand Down
7 changes: 6 additions & 1 deletion internal/core/plugin_daemon/tool_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ func InvokeTool(
}

newResponse := stream.NewStream[tool_entities.ToolResponseChunk](128)
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_daemon",
"function": "InvokeTool",
"tool_name": request.Tool,
"tool_provider": request.Provider,
}, func() {
files := make(map[string]*bytes.Buffer)
defer newResponse.Close()

Expand Down
7 changes: 6 additions & 1 deletion internal/core/plugin_manager/aws_manager/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Dify-Plugin-Session-ID", session_id)

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "aws_manager",
"function": "Write",
"session_id": session_id,
"lambda_url": r.LambdaURL,
}, func() {
// remove the session from listeners
defer r.listeners.Delete(session_id)
defer l.Close()
Expand Down
5 changes: 4 additions & 1 deletion internal/core/plugin_manager/install_to_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func (p *PluginManager) InstallToLocal(
}

response := stream.NewStream[PluginInstallResponse](128)
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "InstallToLocal",
}, func() {
defer response.Close()

ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
Expand Down
8 changes: 7 additions & 1 deletion internal/core/plugin_manager/install_to_serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ func (p *PluginManager) InstallToAWSFromPkg(
}

newResponse := stream.NewStream[PluginInstallResponse](128)
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "InstallToAWSFromPkg",
"checksum": checksum,
"unique_identity": uniqueIdentity.String(),
"source": source,
}, func() {
defer func() {
newResponse.Close()
}()
Expand Down
10 changes: 8 additions & 2 deletions internal/core/plugin_manager/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.Plugi
errChan := make(chan error)

// local plugin
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "LaunchLocal",
}, func() {
defer func() {
if r := recover(); r != nil {
log.Error("plugin runtime panic: %v", r)
Expand All @@ -165,7 +168,10 @@ func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.Plugi

// add max launching lock to prevent too many plugins launching at the same time
p.maxLaunchingLock <- true
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "LaunchLocal",
}, func() {
// wait for plugin launched
<-launchedChan
// release max launching lock
Expand Down
15 changes: 12 additions & 3 deletions internal/core/plugin_manager/local_manager/environment_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {

lastActiveAt := time.Now()

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "InitPythonEnvironment",
}, func() {
defer wg.Done()
// read stdout
buf := make([]byte, 1024)
Expand All @@ -123,7 +126,10 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
}
})

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "InitPythonEnvironment",
}, func() {
defer wg.Done()
// read stderr
buf := make([]byte, 1024)
Expand All @@ -144,7 +150,10 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
}
})

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "InitPythonEnvironment",
}, func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
Expand Down
12 changes: 10 additions & 2 deletions internal/core/plugin_manager/local_manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,21 @@ func (r *LocalPluginRuntime) StartPlugin() error {
wg.Add(2)

// listen to plugin stdout
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"type": "local",
"function": "StartStdout",
}, func() {
defer wg.Done()
stdio.StartStdout(func() {})
})

// listen to plugin stderr
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"type": "local",
"function": "StartStderr",
}, func() {
defer wg.Done()
stdio.StartStderr()
})
Expand Down
11 changes: 10 additions & 1 deletion internal/core/plugin_manager/remote_manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,17 @@ func (r *RemotePluginRuntime) Type() plugin_entities.PluginRuntimeType {
func (r *RemotePluginRuntime) StartPlugin() error {
var exitError error

identity, err := r.Identity()
if err != nil {
return err
}

// handle heartbeat
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "remote_manager",
"function": "StartPlugin",
"plugin_id": identity.String(),
}, func() {
r.lastActiveAt = time.Now()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
Expand Down
7 changes: 6 additions & 1 deletion internal/core/plugin_manager/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
return
}
p.m.Store(identity.String(), rpr)
routine.Submit(func() {
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "startRemoteWatcher",
"plugin_id": identity.String(),
"type": "remote",
}, func() {
defer func() {
if err := recover(); err != nil {
log.Error("plugin runtime error: %v", err)
Expand Down
27 changes: 27 additions & 0 deletions internal/server/controllers/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package controllers

import (
"net/http/pprof"

"github.com/gin-gonic/gin"
)

func PprofIndex(c *gin.Context) {
pprof.Index(c.Writer, c.Request)
}

func PprofCmdline(c *gin.Context) {
pprof.Cmdline(c.Writer, c.Request)
}

func PprofProfile(c *gin.Context) {
pprof.Profile(c.Writer, c.Request)
}

func PprofSymbol(c *gin.Context) {
pprof.Symbol(c.Writer, c.Request)
}

func PprofTrace(c *gin.Context) {
pprof.Trace(c.Writer, c.Request)
}
13 changes: 13 additions & 0 deletions internal/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (app *App) server(config *app.Config) func() {
app.endpointGroup(engine.Group("/e"), config)
app.awsLambdaTransactionGroup(engine.Group("/backwards-invocation"), config)
app.pluginGroup(engine.Group("/plugin/:tenant_id"), config)
app.pprofGroup(engine.Group("/debug/pprof"), config)

srv := &http.Server{
Addr: fmt.Sprintf(":%d", config.ServerPort),
Expand Down Expand Up @@ -137,3 +138,15 @@ func (app *App) pluginManagementGroup(group *gin.RouterGroup, config *app.Config
func (app *App) pluginAssetGroup(group *gin.RouterGroup) {
group.GET("/:id", gzip.Gzip(gzip.DefaultCompression), controllers.GetAsset)
}

func (app *App) pprofGroup(group *gin.RouterGroup, config *app.Config) {
if config.PPROFEnabled {
group.Use(CheckingKey(config.ServerKey))

group.GET("/", controllers.PprofIndex)
group.GET("/cmdline", controllers.PprofCmdline)
group.GET("/profile", controllers.PprofProfile)
group.GET("/symbol", controllers.PprofSymbol)
group.GET("/trace", controllers.PprofTrace)
}
}
3 changes: 1 addition & 2 deletions internal/server/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ func CheckingKey(key string) gin.HandlerFunc {
return func(c *gin.Context) {
// get header X-Api-Key
if c.GetHeader(constants.X_API_KEY) != key {
c.JSON(200, exception.UnauthorizedError().ToResponse())
c.Abort()
c.AbortWithStatusJSON(401, exception.UnauthorizedError().ToResponse())
return
}

Expand Down
5 changes: 4 additions & 1 deletion internal/service/base_sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func baseSSEService[R any](
return
}

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "service",
"function": "baseSSEService",
}, func() {
for pluginDaemonResponse.Next() {
chunk, err := pluginDaemonResponse.Read()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion internal/service/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ func Endpoint(
}
defer close()

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "service",
"function": "Endpoint",
}, func() {
defer close()
for response.Next() {
chunk, err := response.Read()
Expand Down
2 changes: 2 additions & 0 deletions internal/types/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type Config struct {
PythonInterpreterPath string `envconfig:"PYTHON_INTERPRETER_PATH"`

DisplayClusterLog bool `envconfig:"DISPLAY_CLUSTER_LOG"`

PPROFEnabled bool `envconfig:"PPROF_ENABLED"`
}

func (c *Config) Validate() error {
Expand Down
5 changes: 4 additions & 1 deletion internal/utils/http_requests/http_warpper.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
resp.Body.Close()
})

routine.Submit(func() {
routine.Submit(map[string]string{
"module": "http_requests",
"function": "RequestAndParseStream",
}, func() {
scanner := bufio.NewScanner(resp.Body)
defer resp.Body.Close()

Expand Down
Loading

0 comments on commit 407fb94

Please sign in to comment.