From fa5c98549aae32df63a9c3e34574701e45287d29 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 26 Sep 2024 12:44:55 +0200 Subject: [PATCH] chore(refactor): track grpcProcess in the model structure (#3663) * chore(refactor): track grpcProcess in the model structure This avoids having to handle in two parts the data relative to the same model. It makes it easier to track and to use a mutex with. This also fixes race conditions while accessing the model. Signed-off-by: Ettore Di Giacinto * chore(tests): run protogen-go before starting aio tests Signed-off-by: Ettore Di Giacinto * chore(tests): install protoc in aio tests Signed-off-by: Ettore Di Giacinto --------- Signed-off-by: Ettore Di Giacinto --- .github/workflows/test.yml | 11 ++++++++++- Makefile | 2 +- pkg/model/initializers.go | 15 ++++++++++----- pkg/model/loader.go | 32 ++++++++++++++------------------ pkg/model/loader_test.go | 4 ++-- pkg/model/model.go | 18 ++++++++++++++++-- pkg/model/process.go | 33 ++++++++++++++++++--------------- 7 files changed, 71 insertions(+), 44 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2af3fd002ec7..b62f86ef4650 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -178,13 +178,22 @@ jobs: uses: actions/checkout@v4 with: submodules: true + - name: Dependencies + run: | + # Install protoc + curl -L -s https://github.com/protocolbuffers/protobuf/releases/download/v26.1/protoc-26.1-linux-x86_64.zip -o protoc.zip && \ + unzip -j -d /usr/local/bin protoc.zip bin/protoc && \ + rm protoc.zip + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2 + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af + PATH="$PATH:$HOME/go/bin" make protogen-go - name: Build images run: | docker build --build-arg FFMPEG=true --build-arg IMAGE_TYPE=extras --build-arg EXTRA_BACKENDS=rerankers --build-arg MAKEFLAGS="--jobs=5 --output-sync=target" -t local-ai:tests -f 
Dockerfile . BASE_IMAGE=local-ai:tests DOCKER_AIO_IMAGE=local-ai-aio:test make docker-aio - name: Test run: | - LOCALAI_MODELS_DIR=$PWD/models LOCALAI_IMAGE_TAG=test LOCALAI_IMAGE=local-ai-aio \ + PATH="$PATH:$HOME/go/bin" LOCALAI_MODELS_DIR=$PWD/models LOCALAI_IMAGE_TAG=test LOCALAI_IMAGE=local-ai-aio \ make run-e2e-aio - name: Setup tmate session if tests fail if: ${{ failure() }} diff --git a/Makefile b/Makefile index 121b8e50127b..4efee986b4f7 100644 --- a/Makefile +++ b/Makefile @@ -468,7 +468,7 @@ run-e2e-image: ls -liah $(abspath ./tests/e2e-fixtures) docker run -p 5390:8080 -e MODELS_PATH=/models -e THREADS=1 -e DEBUG=true -d --rm -v $(TEST_DIR):/models --gpus all --name e2e-tests-$(RANDOM) localai-tests -run-e2e-aio: +run-e2e-aio: protogen-go @echo 'Running e2e AIO tests' $(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --flake-attempts 5 -v -r ./tests/e2e-aio diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go index 80dd10b47e44..d0f47373755b 100644 --- a/pkg/model/initializers.go +++ b/pkg/model/initializers.go @@ -304,18 +304,19 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string return nil, fmt.Errorf("failed allocating free ports: %s", err.Error()) } // Make sure the process is executable - if err := ml.startProcess(uri, o.model, serverAddress); err != nil { + process, err := ml.startProcess(uri, o.model, serverAddress) + if err != nil { log.Error().Err(err).Str("path", uri).Msg("failed to launch ") return nil, err } log.Debug().Msgf("GRPC Service Started") - client = NewModel(modelName, serverAddress) + client = NewModel(modelName, serverAddress, process) } else { log.Debug().Msg("external backend is uri") // address - client = NewModel(modelName, uri) + client = NewModel(modelName, uri, nil) } } else { grpcProcess := backendPath(o.assetDir, backend) @@ -346,13 +347,14 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string args, grpcProcess = library.LoadLDSO(o.assetDir, args, 
grpcProcess) // Make sure the process is executable in any circumstance - if err := ml.startProcess(grpcProcess, o.model, serverAddress, args...); err != nil { + process, err := ml.startProcess(grpcProcess, o.model, serverAddress, args...) + if err != nil { return nil, err } log.Debug().Msgf("GRPC Service Started") - client = NewModel(modelName, serverAddress) + client = NewModel(modelName, serverAddress, process) } log.Debug().Msgf("Wait for the service to start up") @@ -374,6 +376,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string if !ready { log.Debug().Msgf("GRPC Service NOT ready") + ml.deleteProcess(o.model) return nil, fmt.Errorf("grpc service not ready") } @@ -385,9 +388,11 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options) if err != nil { + ml.deleteProcess(o.model) return nil, fmt.Errorf("could not load model: %w", err) } if !res.Success { + ml.deleteProcess(o.model) return nil, fmt.Errorf("could not load model (no success): %s", res.Message) } diff --git a/pkg/model/loader.go b/pkg/model/loader.go index 4f1ec8411641..68ac1a31032f 100644 --- a/pkg/model/loader.go +++ b/pkg/model/loader.go @@ -13,7 +13,6 @@ import ( "github.com/mudler/LocalAI/pkg/utils" - process "github.com/mudler/go-processmanager" "github.com/rs/zerolog/log" ) @@ -21,20 +20,18 @@ import ( // TODO: Split ModelLoader and TemplateLoader? Just to keep things more organized. Left together to share a mutex until I look into that. 
Would split if we seperate directories for .bin/.yaml and .tmpl type ModelLoader struct { - ModelPath string - mu sync.Mutex - models map[string]*Model - grpcProcesses map[string]*process.Process - templates *templates.TemplateCache - wd *WatchDog + ModelPath string + mu sync.Mutex + models map[string]*Model + templates *templates.TemplateCache + wd *WatchDog } func NewModelLoader(modelPath string) *ModelLoader { nml := &ModelLoader{ - ModelPath: modelPath, - models: make(map[string]*Model), - templates: templates.NewTemplateCache(modelPath), - grpcProcesses: make(map[string]*process.Process), + ModelPath: modelPath, + models: make(map[string]*Model), + templates: templates.NewTemplateCache(modelPath), } return nml @@ -127,6 +124,8 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) ( modelFile := filepath.Join(ml.ModelPath, modelName) log.Debug().Msgf("Loading model in memory from file: %s", modelFile) + ml.mu.Lock() + defer ml.mu.Unlock() model, err := loader(modelName, modelFile) if err != nil { return nil, err @@ -136,8 +135,6 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) ( return nil, fmt.Errorf("loader didn't return a model") } - ml.mu.Lock() - defer ml.mu.Unlock() ml.models[modelName] = model return model, nil @@ -146,14 +143,13 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) ( func (ml *ModelLoader) ShutdownModel(modelName string) error { ml.mu.Lock() defer ml.mu.Unlock() - - _, ok := ml.models[modelName] + model, ok := ml.models[modelName] if !ok { return fmt.Errorf("model %s not found", modelName) } retries := 1 - for ml.models[modelName].GRPC(false, ml.wd).IsBusy() { + for model.GRPC(false, ml.wd).IsBusy() { log.Debug().Msgf("%s busy. 
Waiting.", modelName) dur := time.Duration(retries*2) * time.Second if dur > retryTimeout { @@ -185,8 +181,8 @@ func (ml *ModelLoader) CheckIsLoaded(s string) *Model { if !alive { log.Warn().Msgf("GRPC Model not responding: %s", err.Error()) log.Warn().Msgf("Deleting the process in order to recreate it") - process, exists := ml.grpcProcesses[s] - if !exists { + process := m.Process() + if process == nil { log.Error().Msgf("Process not found for '%s' and the model is not responding anymore !", s) return m } diff --git a/pkg/model/loader_test.go b/pkg/model/loader_test.go index c16a6e5063da..d0ad4e0cfef1 100644 --- a/pkg/model/loader_test.go +++ b/pkg/model/loader_test.go @@ -63,7 +63,7 @@ var _ = Describe("ModelLoader", func() { Context("LoadModel", func() { It("should load a model and keep it in memory", func() { - mockModel = model.NewModel("foo", "test.model") + mockModel = model.NewModel("foo", "test.model", nil) mockLoader := func(modelName, modelFile string) (*model.Model, error) { return mockModel, nil @@ -88,7 +88,7 @@ var _ = Describe("ModelLoader", func() { Context("ShutdownModel", func() { It("should shutdown a loaded model", func() { - mockModel = model.NewModel("foo", "test.model") + mockModel = model.NewModel("foo", "test.model", nil) mockLoader := func(modelName, modelFile string) (*model.Model, error) { return mockModel, nil diff --git a/pkg/model/model.go b/pkg/model/model.go index 6cb81d1032ae..6e4fd31680f3 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -1,20 +1,32 @@ package model -import grpc "github.com/mudler/LocalAI/pkg/grpc" +import ( + "sync" + + grpc "github.com/mudler/LocalAI/pkg/grpc" + process "github.com/mudler/go-processmanager" +) type Model struct { ID string `json:"id"` address string client grpc.Backend + process *process.Process + sync.Mutex } -func NewModel(ID, address string) *Model { +func NewModel(ID, address string, process *process.Process) *Model { return &Model{ ID: ID, address: address, + process: process, } 
} +func (m *Model) Process() *process.Process { + return m.process +} + func (m *Model) GRPC(parallel bool, wd *WatchDog) grpc.Backend { if m.client != nil { return m.client @@ -25,6 +37,8 @@ func (m *Model) GRPC(parallel bool, wd *WatchDog) grpc.Backend { enableWD = true } + m.Lock() + defer m.Unlock() m.client = grpc.NewClient(m.address, parallel, wd, enableWD) return m.client } diff --git a/pkg/model/process.go b/pkg/model/process.go index bcd1fccb26b6..48631d79f1b5 100644 --- a/pkg/model/process.go +++ b/pkg/model/process.go @@ -16,20 +16,22 @@ import ( ) func (ml *ModelLoader) deleteProcess(s string) error { - if _, exists := ml.grpcProcesses[s]; exists { - if err := ml.grpcProcesses[s].Stop(); err != nil { - log.Error().Err(err).Msgf("(deleteProcess) error while deleting grpc process %s", s) + if m, exists := ml.models[s]; exists { + process := m.Process() + if process != nil { + if err := process.Stop(); err != nil { + log.Error().Err(err).Msgf("(deleteProcess) error while deleting process %s", s) + } } } - delete(ml.grpcProcesses, s) delete(ml.models, s) return nil } func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error { var err error = nil - for k, p := range ml.grpcProcesses { - if filter(k, p) { + for k, m := range ml.models { + if filter(k, m.Process()) { e := ml.ShutdownModel(k) err = errors.Join(err, e) } @@ -44,17 +46,20 @@ func (ml *ModelLoader) StopAllGRPC() error { func (ml *ModelLoader) GetGRPCPID(id string) (int, error) { ml.mu.Lock() defer ml.mu.Unlock() - p, exists := ml.grpcProcesses[id] + p, exists := ml.models[id] if !exists { return -1, fmt.Errorf("no grpc backend found for %s", id) } - return strconv.Atoi(p.PID) + if p.Process() == nil { + return -1, fmt.Errorf("no grpc backend found for %s", id) + } + return strconv.Atoi(p.Process().PID) } -func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string, args ...string) error { +func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string, 
args ...string) (*process.Process, error) { // Make sure the process is executable if err := os.Chmod(grpcProcess, 0700); err != nil { - return err + return nil, err } log.Debug().Msgf("Loading GRPC Process: %s", grpcProcess) @@ -63,7 +68,7 @@ func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string workDir, err := filepath.Abs(filepath.Dir(grpcProcess)) if err != nil { - return err + return nil, err } grpcControlProcess := process.New( @@ -79,10 +84,8 @@ func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string ml.wd.AddAddressModelMap(serverAddress, id) } - ml.grpcProcesses[id] = grpcControlProcess - if err := grpcControlProcess.Run(); err != nil { - return err + return grpcControlProcess, err } log.Debug().Msgf("GRPC Service state dir: %s", grpcControlProcess.StateDir()) @@ -116,5 +119,5 @@ func (ml *ModelLoader) startProcess(grpcProcess, id string, serverAddress string } }() - return nil + return grpcControlProcess, nil }