diff --git a/server/job_rpc.go b/server/job_rpc.go index d787bd2010..8ffd8b859e 100644 --- a/server/job_rpc.go +++ b/server/job_rpc.go @@ -338,7 +338,6 @@ func (ls *LivepeerServer) setupGatewayJob(ctx context.Context, r *http.Request, func (h *lphttp) ProcessJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return @@ -390,19 +389,7 @@ func (ls *LivepeerServer) submitJob(ctx context.Context, w http.ResponseWriter, //send the request to the Orchestrator(s) //the loop ends on Gateway error and bad request errors for _, orchToken := range gatewayJob.Orchs { - - // Extract the worker resource route from the URL path - // The prefix is "/process/request/" - // if the request does not include the last / of the prefix no additional url path is added - workerRoute := orchToken.ServiceAddr + "/process/request" - prefix := "/process/request/" workerResourceRoute := r.URL.Path - if strings.HasPrefix(workerResourceRoute, prefix) { - workerResourceRoute = workerResourceRoute[len(prefix):] - } - if workerResourceRoute != "" { - workerRoute = workerRoute + "/" + workerResourceRoute - } err := gatewayJob.sign() if err != nil { diff --git a/server/job_rpc_test.go b/server/job_rpc_test.go index 2cbcaa3a5c..df06778be3 100644 --- a/server/job_rpc_test.go +++ b/server/job_rpc_test.go @@ -13,6 +13,7 @@ import ( "net/http/httptest" "net/url" "slices" + "strings" "sync" "testing" "time" @@ -1325,3 +1326,107 @@ func createMockJobToken(hostUrl string) *core.JobToken { }, } } + +func TestSubmitJob_OrchestratorReceivesPOSTRequest(t *testing.T) { + // Track received requests + var receivedRequests []string + var requestMethods []string + var mu sync.Mutex + + // Create mock orchestrator server that captures request methods + mockOrchServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + requestMethods = append(requestMethods, r.Method) + receivedRequests = append(receivedRequests, r.URL.Path) + mu.Unlock() + + switch r.URL.Path { + case "/process/token": + // Return mock job token for orchestrator discovery + token := createMockJobToken("http://" + r.Host) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(token) + case "/process/request": + // Mock successful job processing response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"result": "success"}`)) + default: + http.NotFound(w, r) + } + })) + defer mockOrchServer.Close() + + // Set up LivepeerServer with mock orchestrator pool + node := mockJobLivepeerNode() + pool := newStubOrchestratorPool(node, []string{mockOrchServer.URL}) + node.OrchestratorPool = pool + + // Mock sender for payment processing + mockSender := pm.MockSender{} + mockSender.On("StartSession", mock.Anything).Return("foo") + mockSender.On("CreateTicketBatch", mock.Anything, mock.Anything).Return(mockTicketBatch(10), nil) + node.Sender = &mockSender + node.Balances = core.NewAddressBalances(10) + defer node.Balances.StopCleanup() + + ls := &LivepeerServer{LivepeerNode: node} + + // Create job request + jobDetails := JobRequestDetails{StreamId: "test-stream"} + jobParams := JobParameters{EnableVideoIngress: true, EnableVideoEgress: true, EnableDataOutput: true} + jobReq := JobRequest{ + ID: "test-job-123", + Request: marshalToString(t, jobDetails), + Parameters: marshalToString(t, jobParams), + Capability: "test-capability", + Timeout: 10, + Sender: "0x1234567890abcdef1234567890abcdef12345678", + Sig: "0x456", + } + + // Marshal and encode job request for header + jobReqBytes, err := json.Marshal(jobReq) + assert.NoError(t, err) + jobReqB64 := base64.StdEncoding.EncodeToString(jobReqBytes) + + // Create HTTP request to submit job + requestBody := `{"test": "data"}` + req := httptest.NewRequest(http.MethodPost, "/process/request", strings.NewReader(requestBody)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(jobRequestHdr, jobReqB64) + + // Execute submitJob + w := httptest.NewRecorder() + handler := ls.SubmitJob() + handler.ServeHTTP(w, req) + + // Verify the test results + mu.Lock() + defer mu.Unlock() + + // Check that we received requests + assert.Greater(t, len(receivedRequests), 0, "Expected to receive at least one request") + assert.Greater(t, len(requestMethods), 0, "Expected to capture at least one request method") + + // Verify that GET request was made for token discovery + assert.Contains(t, requestMethods, "GET", "Expected GET request for token discovery") + assert.Contains(t, receivedRequests, "/process/token", "Expected token discovery request") + + // Verify that POST request was made to process the job + assert.Contains(t, requestMethods, "POST", "Expected POST request for job processing") + assert.Contains(t, receivedRequests, "/process/request", "Expected job processing request") + + // Count POST requests specifically + postCount := 0 + for _, method := range requestMethods { + if method == "POST" { + postCount++ + } + } + assert.Greater(t, postCount, 0, "Expected at least one POST request to orchestrator") + + t.Logf("Received %d requests, %d POST requests", len(receivedRequests), postCount) + t.Logf("Request methods: %v", requestMethods) + t.Logf("Request paths: %v", receivedRequests) +} diff --git a/server/job_stream.go b/server/job_stream.go index e02653d6eb..cdf09e54fa 100644 --- a/server/job_stream.go +++ b/server/job_stream.go @@ -1359,15 +1359,17 @@ func (h *lphttp) StopStream(w http.ResponseWriter, r *http.Request) { if err != nil { clog.Errorf(ctx, "Error sending request to worker %v: %v", workerRoute, err) } + var respBody []byte + if resp.Body != nil { + respBody, err = io.ReadAll(resp.Body) + if err != nil { + clog.Errorf(ctx, "Error reading response body: %v", err) + } + defer resp.Body.Close() - respBody, err := io.ReadAll(resp.Body) - if err != nil { - clog.Errorf(ctx, "Error reading response body: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode > 399 { - clog.Errorf(ctx, "error processing stream stop request statusCode=%d", resp.StatusCode) + if resp.StatusCode > 399 { + clog.Errorf(ctx, "error processing stream stop request statusCode=%d", resp.StatusCode) + } } // Stop the stream and free capacity