Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [#3814](https://github.com/livepeer/go-livepeer/pull/3814) ai/worker: Add scope pipeline support to worker and build scripts (@victorges)
* [#3823](https://github.com/livepeer/go-livepeer/pull/3823) ai/worker: Add sd15-v2v image support (@victorges)
* [#3843](https://github.com/livepeer/go-livepeer/pull/3843) ai/worker: Add sdxl-v2v image support (@victorges)
* [#3849](https://github.com/livepeer/go-livepeer/pull/3849) byoc: fix orchestrator stream setup when fails (@ad-astra-video)

#### Transcoder

Expand Down
26 changes: 26 additions & 0 deletions byoc/stream_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
eventsCh *trickle.TrickleLocalPublisher
dataCh *trickle.TrickleLocalPublisher
)
failedToStartStream := false

// reset trickle channels and release capacity on failure
defer func() {
if failedToStartStream {
bso.orch.FreeExternalCapabilityCapacity(orchJob.Req.Capability)
//close the trickle channels
if pubCh != nil {
pubCh.Close()
}
if subCh != nil {
subCh.Close()
}
if dataCh != nil {
dataCh.Close()
}
controlPubCh.Close()
eventsCh.Close()
}
}()

reqBodyForRunner := make(map[string]interface{})
reqBodyForRunner["gateway_request_id"] = mid
Expand Down Expand Up @@ -115,6 +135,7 @@ func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
if err := json.Unmarshal(body, &bodyJSON); err != nil {
clog.Errorf(ctx, "Failed to parse body as JSON: %v", err)
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
failedToStartStream = true
return
}
for key, value := range bodyJSON {
Expand All @@ -125,6 +146,7 @@ func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
if err != nil {
clog.Errorf(ctx, "Failed to marshal request body err=%v", err)
http.Error(w, "Failed to marshal request body", http.StatusInternalServerError)
failedToStartStream = true
return
}

Expand All @@ -140,13 +162,15 @@ func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
if err != nil {
clog.Errorf(ctx, "Error sending request to worker %v: %v", workerRoute, err)
respondWithError(w, "Error sending request to worker", http.StatusInternalServerError)
failedToStartStream = true
return
}

respBody, err := io.ReadAll(resp.Body)
if err != nil {
clog.Errorf(ctx, "Error reading response body: %v", err)
respondWithError(w, "Error reading response body", http.StatusInternalServerError)
failedToStartStream = true
return
}
defer resp.Body.Close()
Expand All @@ -160,6 +184,7 @@ func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
//return error response from the worker
w.WriteHeader(resp.StatusCode)
w.Write(respBody)
failedToStartStream = true
return
}

Expand All @@ -173,6 +198,7 @@ func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
if err != nil {
clog.Errorf(ctx, "Error adding stream to external capabilities: %v", err)
respondWithError(w, "Error adding stream to external capabilities", http.StatusInternalServerError)
failedToStartStream = true
return
}

Expand Down
Loading
Loading