Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

}
if n.NodeType == core.BroadcasterNode {
if n.NodeType == core.BroadcasterNode || n.NodeType == core.RemoteSignerNode {
maxEV, _ := new(big.Rat).SetString(*cfg.MaxTicketEV)
if maxEV == nil {
panic(fmt.Errorf("-maxTicketEV must be a valid rational number, but %v provided. Restart the node with a valid value for -maxTicketEV", *cfg.MaxTicketEV))
Expand Down
5 changes: 5 additions & 0 deletions core/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (b *Balance) Credit(amount *big.Rat) {
b.balances.Credit(b.addr, b.manifestID, amount)
}

// Reserve zeroes the balance and returns the current balance
func (b *Balance) Reserve() *big.Rat {
return b.balances.Reserve(b.addr, b.manifestID)
}

// StageUpdate prepares a balance update by reserving the current balance and returning the number of tickets
// to send with a payment, the new credit represented by the payment and the existing credit (i.e reserved balance)
func (b *Balance) StageUpdate(minCredit, ev *big.Rat) (int, *big.Rat, *big.Rat) {
Expand Down
25 changes: 25 additions & 0 deletions pm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Sender interface {
// for creating new tickets
StartSession(ticketParams TicketParams) string

// StartSessionWithNonce is like StartSession with a non-default nonce
StartSessionWithNonce(ticketParams TicketParams, nonce uint32) string

// CleanupSession deletes session from the internal map
CleanupSession(sessionID string)

Expand All @@ -33,6 +36,9 @@ type Sender interface {

// EV returns the ticket EV for a session
EV(sessionID string) (*big.Rat, error)

// Nonce returns the current nonce for a session
Nonce(sessionID string) (uint32, error)
}

type session struct {
Expand Down Expand Up @@ -75,6 +81,17 @@ func (s *sender) StartSession(ticketParams TicketParams) string {
return sessionID
}

func (s *sender) StartSessionWithNonce(ticketParams TicketParams, nonce uint32) string {
sessionID := ticketParams.RecipientRandHash.Hex()

s.sessions.Store(sessionID, &session{
ticketParams: ticketParams,
senderNonce: nonce,
})

return sessionID
}

// EV returns the ticket EV for a session
func (s *sender) EV(sessionID string) (*big.Rat, error) {
session, err := s.loadSession(sessionID)
Expand All @@ -85,6 +102,14 @@ func (s *sender) EV(sessionID string) (*big.Rat, error) {
return ticketEV(session.ticketParams.FaceValue, session.ticketParams.WinProb), nil
}

func (s *sender) Nonce(sessionID string) (uint32, error) {
session, err := s.loadSession(sessionID)
if err != nil {
return 0, err
}
return session.senderNonce, nil
}

func (s *sender) CleanupSession(sessionID string) {
s.sessions.Delete(sessionID)
}
Expand Down
10 changes: 10 additions & 0 deletions pm/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ func (m *MockSender) StartSession(ticketParams TicketParams) string {
return args.String(0)
}

func (m *MockSender) StartSessionWithNonce(ticketParams TicketParams, nonce uint32) string {
args := m.Called(ticketParams, nonce)
return args.String(0)
}

// CleanupSession deletes session from the internal ma
func (m *MockSender) CleanupSession(sessionID string) {
m.Called(sessionID)
Expand Down Expand Up @@ -545,3 +550,8 @@ func (m *MockSender) ValidateTicketParams(ticketParams *TicketParams) error {
args := m.Called(ticketParams)
return args.Error(0)
}

func (m *MockSender) Nonce(sessionID string) (uint32, error) {
args := m.Called(sessionID)
return uint32(args.Int(0)), args.Error(1)
}
2 changes: 1 addition & 1 deletion server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
priceInfo := sess.OrchestratorInfo.PriceInfo
var paymentProcessor *LivePaymentProcessor
if priceInfo != nil && priceInfo.PricePerUnit != 0 {
paymentSender := livePaymentSender{}
paymentSender := params.liveParams.paymentSender
sendPaymentFunc := func(inPixels int64) error {
return paymentSender.SendPayment(context.Background(), &SegmentInfoSender{
sess: sess.BroadcastSession,
Expand Down
12 changes: 10 additions & 2 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi
var err error
for {
perOrchCtx, perOrchCancel := context.WithCancelCause(ctx)
params.liveParams = newParams(params.liveParams, perOrchCancel)
params.liveParams = newLiveParams(params, perOrchCancel)
var resp interface{}
resp, err = processAIRequest(perOrchCtx, params, req)
if err != nil {
Expand Down Expand Up @@ -758,7 +758,8 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi
<-firstProcessed
}

func newParams(params *liveRequestParams, cancelOrch context.CancelCauseFunc) *liveRequestParams {
func newLiveParams(aiParams aiRequestParams, cancelOrch context.CancelCauseFunc) *liveRequestParams {
params := aiParams.liveParams
return &liveRequestParams{
segmentReader: params.segmentReader,
rtmpOutputs: params.rtmpOutputs,
Expand All @@ -775,8 +776,15 @@ func newParams(params *liveRequestParams, cancelOrch context.CancelCauseFunc) *l
orchestrator: params.orchestrator,
startTime: time.Now(),
kickOrch: cancelOrch,
paymentSender: choosePaymentSender(aiParams),
}
}

func choosePaymentSender(params aiRequestParams) LivePaymentSender {
if hasRemoteSigner(params) {
return NewRemotePaymentSender(params.node)
}
return &livePaymentSender{}
}

func startProcessing(ctx context.Context, params aiRequestParams, res interface{}) error {
Expand Down
55 changes: 43 additions & 12 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type liveRequestParams struct {
pipeline string
orchestrator string

paymentSender LivePaymentSender
paymentProcessInterval time.Duration
outSegmentTimeout time.Duration

Expand Down Expand Up @@ -1051,28 +1052,54 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
params.liveParams.sess = sess
params.liveParams.startTime = time.Now()

// Live Video should not reuse the existing session balance, because it could lead to not sending the init
// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
clearSessionBalance(sess.BroadcastSession, core.RandomManifestID())
var paymentHeaders worker.RequestEditorFn
if hasRemoteSigner(params) {
rpp, ok := params.liveParams.paymentSender.(*remotePaymentSender)
if !ok {
return nil, errors.New("remote sender was not the correct type")
}
res, err := rpp.RequestPayment(ctx, &SegmentInfoSender{
sess: sess.BroadcastSession,
})
if err != nil {
return nil, err
}
paymentHeaders = func(_ context.Context, req *http.Request) error {
req.Header.Set(segmentHeader, res.SegCreds)
req.Header.Set(paymentHeader, res.Payment)
req.Header.Set("Authorization", protoVerAIWorker)
return nil
}
} else {

client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo)
// Live Video should not reuse the existing session balance, because it could lead to not sending the init
// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
clearSessionBalance(sess.BroadcastSession, core.RandomManifestID())

var (
balUpdate *BalanceUpdate
err error
)
paymentHeaders, balUpdate, err = prepareAIPayment(ctx, sess, initPixelsToPay)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
return nil, err
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)
}
paymentHeaders, balUpdate, err := prepareAIPayment(ctx, sess, initPixelsToPay)

// Send request to orchestrator
client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)

// Send request to orchestrator
reqTimeout := 5 * time.Second
reqCtx, cancel := context.WithTimeout(ctx, reqTimeout)
defer cancel()
Expand Down Expand Up @@ -1669,3 +1696,7 @@ func encodeReqMetadata(metadata map[string]string) string {
metadataBytes, _ := json.Marshal(metadata)
return string(metadataBytes)
}

func hasRemoteSigner(params aiRequestParams) bool {
return params.node != nil && params.node.RemoteSignerAddr != nil
}
150 changes: 149 additions & 1 deletion server/live_payment.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package server

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"sync"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
Expand All @@ -25,6 +30,7 @@ type SegmentInfoSender struct {
inPixels int64
priceInfo *net.PriceInfo
mid string
callCount int
}

type SegmentInfoReceiver struct {
Expand Down Expand Up @@ -164,7 +170,149 @@ func (r *livePaymentReceiver) AccountPayment(
return fmt.Errorf("insufficient balance, mid=%s, fee=%s, balance=%s", segmentInfo.sessionID, fee.FloatString(0), balanceStr)
}
r.orchestrator.DebitFees(segmentInfo.sender, core.ManifestID(segmentInfo.sessionID), segmentInfo.priceInfo, segmentInfo.inPixels)
clog.V(common.DEBUG).Infof(ctx, "Accounted payment for sessionID=%s, fee=%s", segmentInfo.sessionID, fee.FloatString(0))
balance = r.orchestrator.Balance(segmentInfo.sender, core.ManifestID(segmentInfo.sessionID))
clog.V(common.DEBUG).Infof(ctx, "Accounted payment for sessionID=%s, fee=%s balance=%s", segmentInfo.sessionID, fee.FloatString(0), balance.FloatString(0))
return nil
}

// Delegate ticket generation to a remote signer service and then forward the
// payment on to the orchestrator. Return intermediate payment state as a blob
type remotePaymentSender struct {
node *core.LivepeerNode
client *http.Client

// access to all fields below must be protected by the mutex mu
mu sync.Mutex
state RemotePaymentStateSig
}

func NewRemotePaymentSender(node *core.LivepeerNode) *remotePaymentSender {
return &remotePaymentSender{
node: node,
client: &http.Client{
Timeout: paymentRequestTimeout,
},
}
}

func (r *remotePaymentSender) RequestPayment(ctx context.Context, segmentInfo *SegmentInfoSender) (*RemotePaymentResponse, error) {
if r == nil || r.node == nil || r.node.RemoteSignerAddr == nil {
return nil, fmt.Errorf("remote signer not configured")
}

sess := segmentInfo.sess
if sess == nil || sess.OrchestratorInfo == nil || sess.OrchestratorInfo.PriceInfo == nil {
return nil, fmt.Errorf("missing session or OrchestratorInfo")
}

// Marshal OrchestratorInfo
oInfoBytes, err := proto.Marshal(&net.PaymentResult{Info: sess.OrchestratorInfo})
if err != nil {
return nil, fmt.Errorf("error marshaling OrchestratorInfo for remote signer: %w", err)
}

r.mu.Lock()
state := r.state
r.mu.Unlock()

// Build remote payment request
reqPayload := RemotePaymentRequest{
ManifestID: segmentInfo.mid,
Orchestrator: oInfoBytes,
State: state,
Type: RemoteType_LiveVideoToVideo,
}

body, err := json.Marshal(reqPayload)
if err != nil {
return nil, fmt.Errorf("error marshaling request payload for remote signer: %w", err)
}

remoteURL := r.node.RemoteSignerAddr.ResolveReference(&url.URL{Path: "/generate-live-payment"})
httpReq, err := http.NewRequestWithContext(ctx, "POST", remoteURL.String(), bytes.NewReader(body))
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := r.client.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("failed to call remote signer: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode == HTTPStatusRefreshSession {
if segmentInfo.callCount > 3 {
return nil, errors.New("too many consecutive session refreshes")
}
if err := refreshSession(ctx, sess, true); err != nil {
return nil, fmt.Errorf("could not refresh session for remote signer: %w", err)
}
segmentInfo.callCount += 1
return r.RequestPayment(ctx, segmentInfo)
}

if resp.StatusCode != http.StatusOK {
data, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("remote signer returned status %d: %s", resp.StatusCode, string(data))
}

var rp RemotePaymentResponse
if err := json.NewDecoder(resp.Body).Decode(&rp); err != nil {
return nil, fmt.Errorf("failed to decode remote signer response: %w", err)
}

// Cache updated state blob and signature
r.mu.Lock()
r.state = rp.State
r.mu.Unlock()

return &rp, nil
}

// SendPayment via remote signer: request tickets + seg creds from remote signer
// and then forward them to the orchestrator.
func (r *remotePaymentSender) SendPayment(ctx context.Context, segmentInfo *SegmentInfoSender) error {
rp, err := r.RequestPayment(ctx, segmentInfo)
if err != nil {
return err
}

// Forward payment + segment credentials to orchestrator
url := segmentInfo.sess.OrchestratorInfo.Transcoder
req, err := http.NewRequestWithContext(ctx, "POST", url+"/payment", nil)
if err != nil {
clog.Errorf(ctx, "Could not generate payment request to orch=%s", url)
return err
}
req.Header.Set(paymentHeader, rp.Payment)
req.Header.Set(segmentHeader, rp.SegCreds)

resp, err := sendReqWithTimeout(req, paymentRequestTimeout)
if err != nil {
clog.Errorf(ctx, "Could not send payment to orch=%s err=%q", url, err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
clog.Errorf(ctx, "Orchestrator did not accept payment status=%d", resp.StatusCode)
return fmt.Errorf("orchestrator did not accept payment, status=%d", resp.StatusCode)
}

data, err := io.ReadAll(resp.Body)
if err != nil {
clog.Errorf(ctx, "Could not read response from orchestrator=%s err=%q", url, err)
return err
}

// Update session to refresh ticket params from the response
var pr net.PaymentResult
err = proto.Unmarshal(data, &pr)
if err != nil {
clog.Errorf(ctx, "Could not unmarshal response from orchestrator=%s err=%q", url, err)
return err
}
updateSession(segmentInfo.sess, &ReceivedTranscodeResult{Info: pr.Info})

return nil
}

Expand Down
Loading
Loading