Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Service Wiring / Concurrency pt 1 [ALT/Middleware] #2440

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4fb3302
squash down history for review: refactor for simpler service startup
dave-gray101 May 23, 2024
4a5e1cb
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 24, 2024
46ad1da
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 25, 2024
e2ab6cc
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 25, 2024
d0d50a1
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 25, 2024
96eec99
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 25, 2024
44e64e4
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 26, 2024
f10d36b
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 28, 2024
bdfded6
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 28, 2024
ed55e64
Merge branch 'master' into rf-service-wiring-1b
dave-gray101 May 28, 2024
2254858
merge
dave-gray101 May 29, 2024
5e12ea7
remove FiberContextExtractor in favor of http/middleware and RequestE…
dave-gray101 May 29, 2024
2930013
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 May 29, 2024
261519a
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 May 29, 2024
3e3f8a7
fix openai endpoints
dave-gray101 May 30, 2024
34def96
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 May 30, 2024
1ba99b5
middleware ok fix
dave-gray101 May 30, 2024
2d88ff1
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 May 30, 2024
16c5e68
exp
dave-gray101 May 30, 2024
c0bb79c
Merge branch 'backport-ctx-locals-mw' of ghgray101:/dave-gray101/Loca…
dave-gray101 May 30, 2024
c15c7a1
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 May 30, 2024
b141af7
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 May 30, 2024
d2cf85b
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 Jun 1, 2024
3b1cbc5
Merge branch 'master' into backport-ctx-locals-mw
dave-gray101 Jun 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/go/llm/rwkv/rwkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (llm *LLM) Load(opts *pb.ModelOptions) error {
model := rwkv.LoadFiles(opts.ModelFile, tokenizerPath, uint32(opts.GetThreads()))

if model == nil {
return fmt.Errorf("could not load model")
return fmt.Errorf("rwkv could not load model")
}
llm.rwkv = model
return nil
Expand Down
8 changes: 6 additions & 2 deletions core/application.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"github.com/go-skynet/LocalAI/core/backend"
"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/core/services"
"github.com/go-skynet/LocalAI/pkg/model"
Expand All @@ -17,20 +18,23 @@ type Application struct {
// Core Low-Level Services
BackendConfigLoader *config.BackendConfigLoader
ModelLoader *model.ModelLoader
StoresLoader *model.ModelLoader

// Backend Services
// EmbeddingsBackendService *backend.EmbeddingsBackendService
EmbeddingsBackendService *backend.EmbeddingsBackendService
// ImageGenerationBackendService *backend.ImageGenerationBackendService
// LLMBackendService *backend.LLMBackendService
// TranscriptionBackendService *backend.TranscriptionBackendService
// TextToSpeechBackendService *backend.TextToSpeechBackendService
TextToSpeechBackendService *backend.TextToSpeechBackendService
// RerankBackendService *backend.RerankBackendService

// LocalAI System Services
BackendMonitorService *services.BackendMonitorService
GalleryService *services.GalleryService
ListModelsService *services.ListModelsService
LocalAIMetricsService *services.LocalAIMetricsService
// OpenAIService *services.OpenAIService

}

// TODO [NEXT PR?]: Break up ApplicationConfig.
Expand Down
114 changes: 104 additions & 10 deletions core/backend/embeddings.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,128 @@ package backend

import (
"fmt"
"time"

"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/core/schema"
"github.com/google/uuid"
"github.com/rs/zerolog/log"

"github.com/go-skynet/LocalAI/pkg/concurrency"
"github.com/go-skynet/LocalAI/pkg/grpc"
model "github.com/go-skynet/LocalAI/pkg/model"
"github.com/go-skynet/LocalAI/pkg/model"
)

func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, backendConfig config.BackendConfig, appConfig *config.ApplicationConfig) (func() ([]float32, error), error) {
// EmbeddingsBackendService bundles the dependencies needed to serve
// OpenAI-style embedding requests: a model loader, a backend config loader,
// and the application-wide configuration. Construct it with
// NewEmbeddingsBackendService; the zero value is not usable.
type EmbeddingsBackendService struct {
	ml        *model.ModelLoader          // loads/caches gRPC inference backends
	bcl       *config.BackendConfigLoader // resolves per-model backend configs by name
	appConfig *config.ApplicationConfig   // global settings (model path, threads, context size, f16, ...)
}

// NewEmbeddingsBackendService constructs an EmbeddingsBackendService from the
// supplied model loader, backend config loader and application config.
func NewEmbeddingsBackendService(ml *model.ModelLoader, bcl *config.BackendConfigLoader, appConfig *config.ApplicationConfig) *EmbeddingsBackendService {
	svc := EmbeddingsBackendService{
		ml:        ml,
		bcl:       bcl,
		appConfig: appConfig,
	}
	return &svc
}

// Embeddings asynchronously computes embeddings for the given OpenAI request.
// It returns immediately with a JobResult handle; a goroutine loads the
// backend config for request.Model, runs the embedding backend over every
// entry of bc.InputToken and bc.InputStrings, and publishes either the
// assembled OpenAIResponse or the first error via wjr.SetResult. Callers
// obtain the outcome through the returned JobResult (see pkg/concurrency).
func (ebs *EmbeddingsBackendService) Embeddings(request *schema.OpenAIRequest) *concurrency.JobResult[*schema.OpenAIRequest, *schema.OpenAIResponse] {

	jr, wjr := concurrency.NewJobResult[*schema.OpenAIRequest, *schema.OpenAIResponse](request)

	go func(wjr *concurrency.WritableJobResult[*schema.OpenAIRequest, *schema.OpenAIResponse]) {
		// Response identity fields are fixed up front so they reflect when the
		// job started, not when it finished.
		id := uuid.New().String()
		created := int(time.Now().Unix())
		// NOTE(review): re-reads the request back out of the job handle;
		// presumably identical to the `request` argument captured above —
		// TODO confirm against pkg/concurrency and drop if redundant.
		request = *wjr.Request // TODO is needed?

		bc, err := ebs.bcl.LoadBackendConfigFileByName(request.Model, ebs.appConfig.ModelPath,
			config.LoadOptionDebug(ebs.appConfig.Debug),
			config.LoadOptionThreads(ebs.appConfig.Threads),
			config.LoadOptionContextSize(ebs.appConfig.ContextSize),
			config.LoadOptionF16(ebs.appConfig.F16),
		)
		if err != nil {
			log.Error().Err(err).Str("modelPath", ebs.appConfig.ModelPath).Msg("unable to load backend config")
			wjr.SetResult(nil, err)
			return
		}

		// Set the parameters for the language model prediction
		bc.UpdateFromOpenAIRequest(request)

		items := []schema.Item{}

		// First pass: inputs already tokenized as integer IDs. The job fails
		// fast on the first error; partial results are discarded.
		for i, s := range bc.InputToken {
			// get the model function to call for the result
			embedFn, err := ebs.modelEmbedding("", s, *bc)
			if err != nil {
				log.Error().Err(err).Ints("numeric tokens", s).Msg("error during modelEmbedding")
				wjr.SetResult(nil, err)
				return
			}

			embeddings, err := embedFn()
			if err != nil {
				log.Error().Err(err).Ints("numeric tokens", s).Msg("error during embedFn")
				wjr.SetResult(nil, err)
				return
			}
			items = append(items, schema.Item{Embedding: embeddings, Index: i, Object: "embedding"})
		}

		// Second pass: raw string inputs, embedded with an empty token slice.
		// Note the Index restarts from 0 here, mirroring the token pass.
		for i, s := range bc.InputStrings {
			// get the model function to call for the result
			embedFn, err := ebs.modelEmbedding(s, []int{}, *bc)
			if err != nil {
				log.Error().Err(err).Str("string tokens", s).Msg("error during modelEmbedding")
				wjr.SetResult(nil, err)
				return
			}

			embeddings, err := embedFn()
			if err != nil {
				log.Error().Err(err).Str("string tokens", s).Msg("error during embedFn")
				wjr.SetResult(nil, err)
				return
			}
			items = append(items, schema.Item{Embedding: embeddings, Index: i, Object: "embedding"})
		}

		resp := &schema.OpenAIResponse{
			ID:      id,
			Created: created,
			Model:   request.Model, // we have to return what the user sent here, due to OpenAI spec.
			Data:    items,
			Object:  "list",
		}
		wjr.SetResult(resp, nil)
	}(wjr)

	return jr
}

func (ebs *EmbeddingsBackendService) modelEmbedding(s string, tokens []int, backendConfig config.BackendConfig) (func() ([]float32, error), error) {
modelFile := backendConfig.Model

grpcOpts := gRPCModelOpts(backendConfig)

var inferenceModel interface{}
var err error

opts := modelOpts(backendConfig, appConfig, []model.Option{
opts := modelOpts(backendConfig, ebs.appConfig, []model.Option{
model.WithLoadGRPCLoadModelOpts(grpcOpts),
model.WithThreads(uint32(*backendConfig.Threads)),
model.WithAssetDir(appConfig.AssetsDestination),
model.WithAssetDir(ebs.appConfig.AssetsDestination),
model.WithModel(modelFile),
model.WithContext(appConfig.Context),
model.WithContext(ebs.appConfig.Context),
})

if backendConfig.Backend == "" {
inferenceModel, err = loader.GreedyLoader(opts...)
inferenceModel, err = ebs.ml.GreedyLoader(opts...)
} else {
opts = append(opts, model.WithBackendString(backendConfig.Backend))
inferenceModel, err = loader.BackendLoader(opts...)
inferenceModel, err = ebs.ml.BackendLoader(opts...)
}
if err != nil {
return nil, err
Expand All @@ -39,7 +133,7 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, backendCo
switch model := inferenceModel.(type) {
case grpc.Backend:
fn = func() ([]float32, error) {
predictOptions := gRPCPredictOpts(backendConfig, loader.ModelPath)
predictOptions := gRPCPredictOpts(backendConfig, ebs.appConfig.ModelPath)
if len(tokens) > 0 {
embeds := []int32{}

Expand All @@ -48,7 +142,7 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, backendCo
}
predictOptions.EmbeddingTokens = embeds

res, err := model.Embeddings(appConfig.Context, predictOptions)
res, err := model.Embeddings(ebs.appConfig.Context, predictOptions)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +151,7 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, backendCo
}
predictOptions.Embeddings = s

res, err := model.Embeddings(appConfig.Context, predictOptions)
res, err := model.Embeddings(ebs.appConfig.Context, predictOptions)
if err != nil {
return nil, err
}
Expand Down
112 changes: 84 additions & 28 deletions core/backend/tts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,58 +7,98 @@ import (
"path/filepath"

"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/core/schema"
"github.com/rs/zerolog/log"

"github.com/go-skynet/LocalAI/pkg/concurrency"
"github.com/go-skynet/LocalAI/pkg/grpc/proto"
model "github.com/go-skynet/LocalAI/pkg/model"
"github.com/go-skynet/LocalAI/pkg/model"
"github.com/go-skynet/LocalAI/pkg/utils"
)

func generateUniqueFileName(dir, baseName, ext string) string {
counter := 1
fileName := baseName + ext
// TextToSpeechBackendService bundles the dependencies needed to serve
// text-to-speech requests: a model loader, a backend config loader, and the
// application-wide configuration. Construct it with
// NewTextToSpeechBackendService; the zero value is not usable.
type TextToSpeechBackendService struct {
	ml        *model.ModelLoader          // loads/caches gRPC inference backends
	bcl       *config.BackendConfigLoader // resolves per-model backend configs by name
	appConfig *config.ApplicationConfig   // global settings (model path, audio dir, threads, ...)
}

for {
filePath := filepath.Join(dir, fileName)
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return fileName
// NewTextToSpeechBackendService constructs a TextToSpeechBackendService from
// the supplied model loader, backend config loader and application config.
func NewTextToSpeechBackendService(ml *model.ModelLoader, bcl *config.BackendConfigLoader, appConfig *config.ApplicationConfig) *TextToSpeechBackendService {
	svc := TextToSpeechBackendService{
		ml:        ml,
		bcl:       bcl,
		appConfig: appConfig,
	}
	return &svc
}

// TextToAudioFile asynchronously synthesizes speech for the given TTS request.
// It returns immediately with a JobResult handle; a goroutine validates the
// request, loads the backend config for request.Model (an explicit
// request.Backend overrides the config's backend), runs modelTTS, and
// publishes the destination file path or an error via wjr.SetResult.
func (ttsbs *TextToSpeechBackendService) TextToAudioFile(request *schema.TTSRequest) *concurrency.JobResult[*schema.TTSRequest, string] {
	jr, wjr := concurrency.NewJobResult[*schema.TTSRequest, string](request)

	go func(wjr *concurrency.WritableJobResult[*schema.TTSRequest, string]) {
		// A model name is mandatory: there is no fallback model for TTS.
		if request.Model == "" {
			wjr.SetResult("", fmt.Errorf("model is required, no default available"))
			return
		}
		bc, err := ttsbs.bcl.LoadBackendConfigFileByName(request.Model, ttsbs.appConfig.ModelPath,
			config.LoadOptionDebug(ttsbs.appConfig.Debug),
			config.LoadOptionThreads(ttsbs.appConfig.Threads),
			config.LoadOptionContextSize(ttsbs.appConfig.ContextSize),
			config.LoadOptionF16(ttsbs.appConfig.F16),
		)
		// NOTE(review): when bc == nil with err == nil, SetResult is handed a
		// nil error alongside the empty path — confirm the JobResult consumer
		// treats that as failure rather than an empty success.
		if err != nil || bc == nil {
			log.Error().Err(err).Str("modelName", request.Model).Str("modelPath", ttsbs.appConfig.ModelPath).Msg("unable to load backend config")
			wjr.SetResult("", err)
			return
		}

		if request.Backend != "" { // Allow users to specify a backend to use that overrides config.
			bc.Backend = request.Backend
		}
		// TODO consider merging the below function in, but leave it separated for diff reasons in the first PR
		dst, err := ttsbs.modelTTS(request.Backend, request.Input, bc.Model, request.Voice, *bc)
		log.Debug().Str("dst", dst).Err(err).Msg("modelTTS result in goroutine")
		wjr.SetResult(dst, err)
	}(wjr)

	return jr
}

func ModelTTS(backend, text, modelFile, voice string, loader *model.ModelLoader, appConfig *config.ApplicationConfig, backendConfig config.BackendConfig) (string, *proto.Result, error) {
func (ttsbs *TextToSpeechBackendService) modelTTS(backend, text, modelFile, voice string, backendConfig config.BackendConfig) (string, error) {
bb := backend
if bb == "" {
bb = model.PiperBackend
}

grpcOpts := gRPCModelOpts(backendConfig)

opts := modelOpts(config.BackendConfig{}, appConfig, []model.Option{
opts := modelOpts(config.BackendConfig{}, ttsbs.appConfig, []model.Option{
model.WithBackendString(bb),
model.WithModel(modelFile),
model.WithContext(appConfig.Context),
model.WithAssetDir(appConfig.AssetsDestination),
model.WithContext(ttsbs.appConfig.Context),
model.WithAssetDir(ttsbs.appConfig.AssetsDestination),
model.WithLoadGRPCLoadModelOpts(grpcOpts),
})
ttsModel, err := loader.BackendLoader(opts...)
ttsModel, err := ttsbs.ml.BackendLoader(opts...)
if err != nil {
return "", nil, err
return "", err
}

if ttsModel == nil {
return "", nil, fmt.Errorf("could not load piper model")
return "", fmt.Errorf("could not load piper model")
}

if err := os.MkdirAll(appConfig.AudioDir, 0750); err != nil {
return "", nil, fmt.Errorf("failed creating audio directory: %s", err)
if ttsbs.appConfig.AudioDir == "" {
return "", fmt.Errorf("ApplicationConfig.AudioDir not set, cannot continue")
}

fileName := generateUniqueFileName(appConfig.AudioDir, "tts", ".wav")
filePath := filepath.Join(appConfig.AudioDir, fileName)
// Shouldn't be needed anymore. Consider removing later
if err := os.MkdirAll(ttsbs.appConfig.AudioDir, 0750); err != nil {
return "", fmt.Errorf("failed` creating audio directory: %s", err)
}

fileName := generateUniqueFileName(ttsbs.appConfig.AudioDir, "tts", ".wav")
filePath := filepath.Join(ttsbs.appConfig.AudioDir, fileName)

log.Debug().Str("filePath", filePath).Msg("computed output filePath")

// If the model file is not empty, we pass it joined with the model path
modelPath := ""
Expand All @@ -67,23 +107,39 @@ func ModelTTS(backend, text, modelFile, voice string, loader *model.ModelLoader,
// Checking first that it exists and is not outside ModelPath
// TODO: we should actually first check if the modelFile is looking like
// a FS path
mp := filepath.Join(loader.ModelPath, modelFile)
mp := filepath.Join(ttsbs.appConfig.ModelPath, modelFile)
if _, err := os.Stat(mp); err == nil {
if err := utils.VerifyPath(mp, appConfig.ModelPath); err != nil {
return "", nil, err
if err := utils.VerifyPath(mp, ttsbs.appConfig.ModelPath); err != nil {
return "", err
}
modelPath = mp
} else {
modelPath = modelFile
}
}

res, err := ttsModel.TTS(context.Background(), &proto.TTSRequest{
_, err = ttsModel.TTS(context.Background(), &proto.TTSRequest{
Text: text,
Model: modelPath,
Voice: voice,
Dst: filePath,
})

return filePath, res, err
return filePath, err
}

func generateUniqueFileName(dir, baseName, ext string) string {
counter := 1
fileName := baseName + ext

for {
filePath := filepath.Join(dir, fileName)
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return fileName
}

counter++
fileName = fmt.Sprintf("%s_%d%s", baseName, counter, ext)
}
}
6 changes: 3 additions & 3 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,16 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}

if r.PreloadBackendOnly {
_, _, _, err := startup.Startup(opts...)
_, err := startup.Startup(opts...)
return err
}

cl, ml, options, err := startup.Startup(opts...)
app, err := startup.Startup(opts...)
if err != nil {
return fmt.Errorf("failed basic startup tasks with error %s", err.Error())
}

appHTTP, err := http.App(cl, ml, options)
appHTTP, err := http.App(app)
if err != nil {
log.Error().Err(err).Msg("error during HTTP App construction")
return err
Expand Down
Loading
Loading