Skip to content

Commit

Permalink
feat: improve graph name display
Browse files Browse the repository at this point in the history
  • Loading branch information
mrh997 committed Dec 23, 2024
1 parent 636a7f7 commit 4c0b837
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 161 deletions.
39 changes: 0 additions & 39 deletions devops/callback_test.go

This file was deleted.

23 changes: 3 additions & 20 deletions devops/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,15 @@ import (
"context"
"time"

"github.com/cloudwego/eino/compose"

"github.com/cloudwego/eino-ext/devops/internal/apihandler"
"github.com/cloudwego/eino-ext/devops/internal/service"
"github.com/cloudwego/eino-ext/devops/internal/utils/safego"
"github.com/cloudwego/eino/compose"
)

// Deprecated: use Init instead.
func Run(ctx context.Context, opts ...ServerOption) error {
o := newServerOption(opts)

errCh := make(chan error)
safego.Go(ctx, func() {
errCh <- apihandler.StartHTTPServer(ctx, o.port)
})

select {
case err := <-errCh:
return err
case <-time.After(2 * time.Second):
return nil
}
}

// Init start einodev.
func Init(ctx context.Context, opts ...ServerOption) error {
compose.InitGraphCompileCallbacks([]compose.GraphCompileCallback{newGlobalDevGraphCompileCallback()})
compose.InitGraphCompileCallbacks([]compose.GraphCompileCallback{service.NewGlobalDevGraphCompileCallback()})

o := newServerOption(opts)

Expand Down
7 changes: 0 additions & 7 deletions devops/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ import (

func Test_Debug(t *testing.T) {
ctx := context.Background()
PatchConvey("Test Success", t, func() {
PatchConvey("Test success", func() {
actualErr := Run(ctx)
assert.Nil(t, actualErr)
})
})

PatchConvey("Test Init Success", t, func() {
PatchConvey("Test success", func() {
actualErr := Init(ctx)
Expand Down
2 changes: 1 addition & 1 deletion devops/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/bytedance/mockey v1.2.12
github.com/cloudwego/eino v0.3.0
github.com/cloudwego/eino v0.3.1-alpha
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/stretchr/testify v1.9.0
Expand Down
6 changes: 4 additions & 2 deletions devops/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4
github.com/certifi/gocertifi v0.0.0-20190105021004-abcd57078448/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/eino v0.3.0 h1:Xp/zqvyyskRn0obOUvH0Rj/INZwq68z9vvTjXOsNhLw=
github.com/cloudwego/eino v0.3.0/go.mod h1:+kmJimGEcKuSI6OKhet7kBedkm1WUZS3H1QRazxgWUo=
github.com/cloudwego/eino v0.3.1-0.20241220071856-1f94cb26cbb4 h1:gZPPolzLPBphgCO1elRXxc0VLI9JXLXN6377aoFQElY=
github.com/cloudwego/eino v0.3.1-0.20241220071856-1f94cb26cbb4/go.mod h1:+kmJimGEcKuSI6OKhet7kBedkm1WUZS3H1QRazxgWUo=
github.com/cloudwego/eino v0.3.1-alpha h1:wwxusSiiWnt+tX/i+BevCGbbPuHeiGoBu50btzbBgn0=
github.com/cloudwego/eino v0.3.1-alpha/go.mod h1:+kmJimGEcKuSI6OKhet7kBedkm1WUZS3H1QRazxgWUo=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
27 changes: 6 additions & 21 deletions devops/internal/mock/container_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions devops/internal/service/call_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"reflect"
"time"

"github.com/cloudwego/eino-ext/devops/internal/model"
"github.com/cloudwego/eino-ext/devops/internal/utils/log"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/embedding"
"github.com/cloudwego/eino/components/indexer"
Expand All @@ -32,8 +34,6 @@ import (
"github.com/cloudwego/eino/components/retriever"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-ext/devops/internal/model"
"github.com/cloudwego/eino-ext/devops/internal/utils/log"
)

func newCallbackOption(nodeKey, threadID string, node compose.GraphNodeInfo, stateCh chan *model.NodeDebugState) compose.Option {
Expand All @@ -43,7 +43,7 @@ func newCallbackOption(nodeKey, threadID string, node compose.GraphNodeInfo, sta
stateCh: stateCh,
node: node,
}
op := compose.WithNodeCallbacks(cb).DesignateNode(nodeKey)
op := compose.WithCallbacks(cb).DesignateNode(nodeKey)
return op
}

Expand Down
52 changes: 39 additions & 13 deletions devops/callback.go → devops/internal/service/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,50 @@
* limitations under the License.
*/

package einodev
package service

import (
"context"
"fmt"
"runtime"
"strings"

"github.com/cloudwego/eino-ext/devops/internal/model"
"github.com/cloudwego/eino/compose"

"github.com/cloudwego/eino-ext/devops/internal/model"
"github.com/cloudwego/eino-ext/devops/internal/service"
"github.com/cloudwego/eino-ext/devops/internal/utils/log"
)

const (
einodevBuildIdentify = "einodev/internal/model.GraphInfo.BuildDevGraph"
devBuildIdentify = "devops/internal/service.(*containerServiceImpl).CreateRunnable"
compileCallerDepth = 8
)

type globalDevGraphCompileCallback struct {
onFinish func(ctx context.Context, graphInfo *compose.GraphInfo)
}

func newGlobalDevGraphCompileCallback(opts ...DevOption) compose.GraphCompileCallback {
func NewGlobalDevGraphCompileCallback() compose.GraphCompileCallback {
onFinish := func(ctx context.Context, graphInfo *compose.GraphInfo) {
if graphInfo == nil || strings.Contains(graphInfo.Key, einodevBuildIdentify) {
if graphInfo == nil {
return
}

frame := getCompileFrame(compileCallerDepth)
if strings.Contains(frame.Function, devBuildIdentify) {
return
}

opt := model.GraphOption{
GenState: graphInfo.GenStateFn,
}

graphName := extractLastSegment(graphInfo.Key)
graphName := graphInfo.Name
if graphName == "" {
graphName = genGraphName()
}

_, err := service.ContainerSVC.AddGlobalGraphInfo(graphName, graphInfo, opt)
_, err := ContainerSVC.AddGraphInfo(graphName, graphInfo, opt)
if err != nil {
log.Errorf(err.Error())
}
Expand All @@ -62,11 +72,27 @@ func (d globalDevGraphCompileCallback) OnFinish(ctx context.Context, graphInfo *
d.onFinish(ctx, graphInfo)
}

func extractLastSegment(str string) string {
lastSlashIndex := strings.LastIndex(str, "/")
func getCompileFrame(skip int) runtime.Frame {
pcs := make([]uintptr, 1)
_ = runtime.Callers(skip, pcs)
frame, _ := runtime.CallersFrames(pcs).Next()
return frame
}

func genGraphName() string {
frame := getCompileFrame(compileCallerDepth)

if lastSlashIndex != -1 {
return str[lastSlashIndex+1:]
file := strings.TrimSuffix(frame.File, ".go")
lastSlashIdx := strings.LastIndex(file, "/")
if lastSlashIdx != -1 {
file = file[lastSlashIdx+1:]
}
return str

fun := frame.Function
lastDotIdx := strings.LastIndex(fun, ".")
if lastDotIdx != -1 {
fun = fun[lastDotIdx+1:]
}

return fmt.Sprintf("%s.%s:%d", file, fun, frame.Line)
}
99 changes: 99 additions & 0 deletions devops/internal/service/callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package service

import (
"context"
"testing"

"github.com/bytedance/mockey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"

"github.com/cloudwego/eino-ext/devops/internal/mock"
"github.com/cloudwego/eino-ext/devops/internal/model"
"github.com/cloudwego/eino/compose"
)

type callbackTestSuite struct {
suite.Suite
ctrl *gomock.Controller
ctx context.Context

mockContainer *mock.MockContainerService
}

func Test_callbackTestSuite(t *testing.T) {
suite.Run(t, new(callbackTestSuite))
}

func (c *callbackTestSuite) SetupSuite() {
c.ctrl = gomock.NewController(c.T())

c.mockContainer = mock.NewMockContainerService(c.ctrl)
ContainerSVC = c.mockContainer
}

func (c *callbackTestSuite) SetupTest() {
c.ctx = context.Background()
}

func (c *callbackTestSuite) buildCallbackGraph() {
g := compose.NewGraph[string, string]()
_ = g.AddLambdaNode("node", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return input, nil
}))
_ = g.AddEdge(compose.START, "node")
_ = g.AddEdge("node", compose.END)
_, err := g.Compile(context.Background(), compose.WithGraphCompileCallbacks(NewGlobalDevGraphCompileCallback()))
assert.NoError(c.T(), err)
}

func (c *callbackTestSuite) Test_NewGlobalDevGraphCompileCallback() {
mockey.PatchConvey("add graph with no graph name", c.T(), func() {
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo, graphOpt model.GraphOption) (graphID string, err error) {
assert.Equal(c.T(), "callback_test.buildCallbackGraph:63", graphName)
return "", nil
}).Times(1)

c.buildCallbackGraph()
})

mockey.PatchConvey("skip einodev compile graph", c.T(), func() {
var gi model.GraphInfo
c.mockContainer.EXPECT().AddGraphInfo(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(graphName string, graphInfo *compose.GraphInfo, graphOpt model.GraphOption) (graphID string, err error) {
gi = model.GraphInfo{
GraphInfo: graphInfo,
Option: graphOpt,
}

assert.Equal(c.T(), "callback_test.buildCallbackGraph:63", graphName)
return "", nil
}).Times(1)
c.buildCallbackGraph()

mockID := "mock_graph_id"
svcImpl := containerServiceImpl{
container: map[string]*model.GraphContainer{mockID: {GraphInfo: &gi}},
}
_, err := svcImpl.CreateRunnable(mockID, compose.START)
assert.NoError(c.T(), err)
})
}
Loading

0 comments on commit 4c0b837

Please sign in to comment.