Skip to content

Commit 278a920

Browse files
authored Aug 15, 2024
RSDK-8549: Answerer Makeover (#325)
1 parent 7c0e55c commit 278a920

File tree

1 file changed

+174
-197
lines changed

1 file changed

+174
-197
lines changed
 

‎rpc/wrtc_signaling_answerer.go

+174 -197
Original file line number | Diff line number | Diff line change
@@ -195,38 +195,67 @@ func (ans *webrtcSignalingAnswerer) startAnswerer() {
195195
var err error
196196
// `newAnswer` opens a bidi grpc stream to the signaling server. But otherwise sends no requests.
197197
client, err = newAnswer()
198-
receivedInitRequest := false
199-
if err == nil {
200-
// `ans.answer` will send the initial message to the signaling server that says it
201-
// is ready to accept connections. Then it waits, typically for a long time, for a
202-
// caller to show up. Which is when the signaling server will send a
203-
// response. `ans.answer` then follows with gathering ICE candidates and learning of
204-
// the caller's ICE candidates to create a working WebRTC PeerConnection. If
205-
// successful, the `PeerConnection` + `webrtcServerChannel` will be registered and
206-
// available for the `webrtcServer`.
207-
receivedInitRequest, err = ans.answer(client)
198+
if err != nil {
199+
if checkExceptionalError(err) != nil {
200+
ans.logger.Warnw("error communicating with signaling server", "error", err)
201+
if !utils.SelectContextOrWait(ans.closeCtx, answererReconnectWait) {
202+
return
203+
}
204+
}
205+
continue
208206
}
209207

210-
switch {
211-
case err == nil:
212-
case receivedInitRequest && err != nil:
213-
// We received an error while trying to connect to a caller/peer.
214-
ans.logger.Errorw("error connecting to peer", "error", err)
215-
if !utils.SelectContextOrWait(ans.closeCtx, answererReconnectWait) {
216-
return
217-
}
218-
case !receivedInitRequest && err != nil:
219-
// Exceptional errors represent a broken connection to the signaling server. While
220-
// direct gRPC connections will reconnect on their own, we should wait a little
221-
// before trying to call again. Common errors represent that an operation has
222-
// failed, but can be safely retried over the existing connection.
208+
// `client.Recv` waits, typically for a long time, for a caller to show up. Which is
209+
// when the signaling server will send a response saying someone wants to connect.
210+
incomingCallerReq, err := client.Recv()
211+
if err != nil {
223212
if checkExceptionalError(err) != nil {
224213
ans.logger.Warnw("error communicating with signaling server", "error", err)
225214
if !utils.SelectContextOrWait(ans.closeCtx, answererReconnectWait) {
226215
return
227216
}
228217
}
218+
continue
229219
}
220+
221+
// Create an `answerAttempt` to take advantage of the `sendError` method for the
222+
// upcoming type check.
223+
aa := &answerAttempt{
224+
webrtcSignalingAnswerer: ans,
225+
uuid: incomingCallerReq.Uuid,
226+
client: client,
227+
trickleEnabled: true,
228+
}
229+
230+
initStage, ok := incomingCallerReq.Stage.(*webrtcpb.AnswerRequest_Init)
231+
if !ok {
232+
aa.sendError(fmt.Errorf("expected first stage to be init; got %T", incomingCallerReq.Stage))
233+
ans.logger.Warnw("error communicating with signaling server", "error", fmt.Errorf("expected first stage to be init; got %T", incomingCallerReq.Stage)) // Recv succeeded, so `err` is nil here; log the stage mismatch itself.
234+
continue
235+
}
236+
237+
if cfg := initStage.Init.OptionalConfig; cfg != nil && cfg.DisableTrickle {
238+
aa.trickleEnabled = false
239+
}
240+
aa.offerSDP = initStage.Init.Sdp
241+
242+
var answerCtx context.Context
243+
var answerCtxCancel func()
244+
if deadline := initStage.Init.Deadline; deadline != nil {
245+
answerCtx, answerCtxCancel = context.WithDeadline(aa.closeCtx, deadline.AsTime())
246+
} else {
247+
answerCtx, answerCtxCancel = context.WithTimeout(aa.closeCtx, getDefaultOfferDeadline())
248+
}
249+
250+
if err = aa.connect(answerCtx); err != nil {
251+
answerCtxCancel()
252+
// We received an error while trying to connect to a caller/peer.
253+
ans.logger.Errorw("error connecting to peer", "error", err)
254+
if !utils.SelectContextOrWait(ans.closeCtx, answererReconnectWait) {
255+
return
256+
}
257+
}
258+
answerCtxCancel()
230259
}
231260
}, func() {
232261
ans.bgWorkers.Done()
@@ -257,58 +286,40 @@ func (ans *webrtcSignalingAnswerer) Stop() {
257286
}
258287
}
259288

260-
// answer accepts a single call offer, responds with a corresponding SDP, and
261-
// attempts to establish a WebRTC connection with the caller via ICE. Once established,
262-
// the designated WebRTC data channel is passed off to the underlying Server which
263-
// is then used as the server end of a gRPC connection.
264-
func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_AnswerClient) (receivedInitRequest bool, err error) {
265-
receivedInitRequest = false
289+
type answerAttempt struct {
290+
*webrtcSignalingAnswerer
291+
// The uuid is the key for communicating with the signaling server about this connection
292+
// attempt.
293+
uuid string
294+
client webrtcpb.SignalingService_AnswerClient
266295

267-
resp, err := client.Recv()
268-
if err != nil {
269-
return receivedInitRequest, err
270-
}
296+
trickleEnabled bool
297+
offerSDP string
271298

272-
receivedInitRequest = true
273-
uuid := resp.Uuid
274-
initStage, ok := resp.Stage.(*webrtcpb.AnswerRequest_Init)
275-
if !ok {
276-
err := errors.Errorf("expected first stage to be init; got %T", resp.Stage)
277-
return receivedInitRequest, client.Send(&webrtcpb.AnswerResponse{
278-
Uuid: uuid,
279-
Stage: &webrtcpb.AnswerResponse_Error{
280-
Error: &webrtcpb.AnswerResponseErrorStage{
281-
Status: ErrorToStatus(err).Proto(),
282-
},
283-
},
284-
})
285-
}
286-
init := initStage.Init
299+
// When a connection attempt concludes, either with success or failure, we will fire a single
300+
// message to the signaling server. This allows the signaling server to release resources
301+
// related to this connection attempt.
302+
sendDoneErrOnce sync.Once
303+
}
287304

288-
disableTrickle := false
289-
if init.OptionalConfig != nil {
290-
disableTrickle = init.OptionalConfig.DisableTrickle
291-
}
305+
// connect accepts a single call offer, responds with a corresponding SDP, and
306+
// attempts to establish a WebRTC connection with the caller via ICE. Once established,
307+
// the designated WebRTC data channel is passed off to the underlying Server which
308+
// is then used as the server end of a gRPC connection.
309+
func (aa *answerAttempt) connect(ctx context.Context) (err error) {
292310
pc, dc, err := newPeerConnectionForServer(
293-
ans.closeCtx,
294-
init.Sdp,
295-
ans.webrtcConfig,
296-
disableTrickle,
297-
ans.logger,
311+
ctx,
312+
aa.offerSDP,
313+
aa.webrtcConfig,
314+
!aa.trickleEnabled,
315+
aa.logger,
298316
)
299317
if err != nil {
300-
return receivedInitRequest, client.Send(&webrtcpb.AnswerResponse{
301-
Uuid: uuid,
302-
Stage: &webrtcpb.AnswerResponse_Error{
303-
Error: &webrtcpb.AnswerResponseErrorStage{
304-
Status: ErrorToStatus(err).Proto(),
305-
},
306-
},
307-
})
318+
aa.sendError(err)
319+
return err
308320
}
309321

310-
serverChannel := ans.server.NewChannel(pc, dc, ans.hosts)
311-
322+
// We have a PeerConnection object. Install an error handler.
312323
var successful bool
313324
defer func() {
314325
if !(successful && err == nil) {
@@ -320,7 +331,7 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
320331
connInfo := getWebRTCPeerConnectionStats(pc)
321332
iceConnectionState := pc.ICEConnectionState()
322333
iceGatheringState := pc.ICEGatheringState()
323-
ans.logger.Warnw("Connection establishment failed",
334+
aa.logger.Warnw("Connection establishment failed",
324335
"conn_id", connInfo.ID,
325336
"ice_connection_state", iceConnectionState,
326337
"ice_gathering_state", iceGatheringState,
@@ -334,55 +345,20 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
334345
}
335346
}()
336347

337-
// only send once since exchange may end or ICE may end
338-
var sendDoneErrorOnce sync.Once
339-
sendDone := func() error {
340-
var err error
341-
sendDoneErrorOnce.Do(func() {
342-
err = client.Send(&webrtcpb.AnswerResponse{
343-
Uuid: uuid,
344-
Stage: &webrtcpb.AnswerResponse_Done{
345-
Done: &webrtcpb.AnswerResponseDoneStage{},
346-
},
347-
})
348-
})
349-
return err
350-
}
351-
352-
var exchangeCtx context.Context
353-
var exchangeCancel func()
354-
if initStage.Init.Deadline != nil {
355-
exchangeCtx, exchangeCancel = context.WithDeadline(ans.closeCtx, initStage.Init.Deadline.AsTime())
356-
} else {
357-
exchangeCtx, exchangeCancel = context.WithTimeout(ans.closeCtx, getDefaultOfferDeadline())
358-
}
359-
360-
errCh := make(chan error)
361-
defer exchangeCancel()
362-
sendErr := func(err error) {
363-
if isEOF(err) {
364-
ans.logger.Warnf("answerer swallowing err %v", err)
365-
return
366-
}
367-
ans.logger.Warnf("answerer received err %v of type %T", err, err)
368-
select {
369-
case <-exchangeCtx.Done():
370-
case errCh <- err:
371-
}
372-
}
348+
serverChannel := aa.server.NewChannel(pc, dc, aa.hosts)
373349

374350
initSent := make(chan struct{})
375-
if !init.OptionalConfig.DisableTrickle {
351+
if aa.trickleEnabled {
376352
answer, err := pc.CreateAnswer(nil)
377353
if err != nil {
378-
return receivedInitRequest, err
354+
return err
379355
}
380356

381357
var pendingCandidates sync.WaitGroup
382358
waitOneHost := make(chan struct{})
383359
var waitOneHostOnce sync.Once
384360
pc.OnICECandidate(func(icecandidate *webrtc.ICECandidate) {
385-
if exchangeCtx.Err() != nil {
361+
if ctx.Err() != nil {
386362
return
387363
}
388364
if icecandidate != nil {
@@ -402,177 +378,178 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
402378
//
403379
// We use a mutex to make the read of the `closeCtx` and write to the `bgWorkers`
404380
// atomic. `Stop` takes a competing mutex around canceling the `closeCtx`.
405-
ans.bgWorkersMu.RLock()
381+
aa.bgWorkersMu.RLock()
406382
select {
407-
case <-ans.closeCtx.Done():
408-
ans.bgWorkersMu.RUnlock()
383+
case <-aa.closeCtx.Done():
384+
aa.bgWorkersMu.RUnlock()
409385
return
410386
default:
411387
}
412-
ans.bgWorkers.Add(1)
413-
ans.bgWorkersMu.RUnlock()
388+
aa.bgWorkers.Add(1)
389+
aa.bgWorkersMu.RUnlock()
414390

415391
// must spin off to unblock the ICE gatherer
416392
utils.PanicCapturingGo(func() {
417-
defer ans.bgWorkers.Done()
393+
defer aa.bgWorkers.Done()
418394

419395
if icecandidate != nil {
420396
defer pendingCandidates.Done()
421397
}
422398

423399
select {
424400
case <-initSent:
425-
case <-exchangeCtx.Done():
401+
case <-ctx.Done():
426402
return
427403
}
428404
// there are no more candidates coming during this negotiation
429405
if icecandidate == nil {
430406
if _, ok := os.LookupEnv(testDelayAnswererNegotiationVar); ok {
431407
// RSDK-4293: Introducing a sleep here replicates the conditions
432408
// for a prior goroutine leak.
433-
ans.logger.Debug("Sleeping to delay the end of the negotiation")
409+
aa.logger.Debug("Sleeping to delay the end of the negotiation")
434410
time.Sleep(1 * time.Second)
435411
}
436412
pendingCandidates.Wait()
437-
if err := sendDone(); err != nil {
438-
sendErr(err)
439-
}
413+
aa.sendDone()
440414
return
441415
}
442416
iProto := iceCandidateToProto(icecandidate)
443-
if err := client.Send(&webrtcpb.AnswerResponse{
444-
Uuid: uuid,
417+
if err := aa.client.Send(&webrtcpb.AnswerResponse{
418+
Uuid: aa.uuid,
445419
Stage: &webrtcpb.AnswerResponse_Update{
446420
Update: &webrtcpb.AnswerResponseUpdateStage{
447421
Candidate: iProto,
448422
},
449423
},
450424
}); err != nil {
451-
sendErr(err)
425+
aa.sendError(err)
452426
}
453427
})
454428
})
455429

456430
err = pc.SetLocalDescription(answer)
457431
if err != nil {
458-
return receivedInitRequest, err
432+
return err
459433
}
460434

461435
select {
462-
case <-exchangeCtx.Done():
463-
return receivedInitRequest, exchangeCtx.Err()
464436
case <-waitOneHost:
437+
// Dan: We wait for one host before proceeding to ensure the initial response has some
438+
// candidate information. This is a Nagle's algorithm-esque batching optimization. I
439+
// think.
440+
case <-ctx.Done():
441+
return ctx.Err()
465442
}
466443
}
467444

468445
encodedSDP, err := EncodeSDP(pc.LocalDescription())
469446
if err != nil {
470-
return receivedInitRequest, client.Send(&webrtcpb.AnswerResponse{
471-
Uuid: uuid,
472-
Stage: &webrtcpb.AnswerResponse_Error{
473-
Error: &webrtcpb.AnswerResponseErrorStage{
474-
Status: ErrorToStatus(err).Proto(),
475-
},
476-
},
477-
})
447+
aa.sendError(err)
448+
return err
478449
}
479450

480-
if err := client.Send(&webrtcpb.AnswerResponse{
481-
Uuid: uuid,
451+
if err := aa.client.Send(&webrtcpb.AnswerResponse{
452+
Uuid: aa.uuid,
482453
Stage: &webrtcpb.AnswerResponse_Init{
483454
Init: &webrtcpb.AnswerResponseInitStage{
484455
Sdp: encodedSDP,
485456
},
486457
},
487458
}); err != nil {
488-
return receivedInitRequest, err
459+
return err
489460
}
490461
close(initSent)
491462

492-
if !init.OptionalConfig.DisableTrickle {
493-
exchangeCandidates := func() error {
494-
for {
495-
if err := exchangeCtx.Err(); err != nil {
496-
if errors.Is(err, context.Canceled) {
497-
return nil
498-
}
499-
return err
500-
}
463+
if aa.trickleEnabled {
464+
done := make(chan struct{})
465+
defer func() { <-done }()
466+
467+
utils.PanicCapturingGoWithCallback(func() {
468+
defer close(done)
501469

502-
ansResp, err := client.Recv()
470+
for {
471+
// `client` was constructed based off of the `ans.closeCtx`. We rely on the
472+
// underlying `client.Recv` implementation checking that context for cancelation.
473+
ansResp, err := aa.client.Recv()
503474
if err != nil {
504475
if !errors.Is(err, io.EOF) {
505-
return err
476+
aa.logger.Warnw("Error receiving initial message from signaling server", "err", err) // Warnw, not Warn: the trailing arguments are a key/value pair.
506477
}
507-
return nil
478+
return
508479
}
509480

510-
switch s := ansResp.Stage.(type) {
481+
switch stage := ansResp.Stage.(type) {
511482
case *webrtcpb.AnswerRequest_Init:
512483
case *webrtcpb.AnswerRequest_Update:
513-
if ansResp.Uuid != uuid {
514-
return errors.Errorf("uuid mismatch; have=%q want=%q", ansResp.Uuid, uuid)
484+
if ansResp.Uuid != aa.uuid {
485+
aa.sendError(fmt.Errorf("uuid mismatch; have=%q want=%q", ansResp.Uuid, aa.uuid))
486+
return
515487
}
516-
cand := iceCandidateFromProto(s.Update.Candidate)
488+
cand := iceCandidateFromProto(stage.Update.Candidate)
517489
if err := pc.AddICECandidate(cand); err != nil {
518-
return err
490+
aa.sendError(err)
491+
return
519492
}
520493
case *webrtcpb.AnswerRequest_Done:
521-
return nil
494+
return
522495
case *webrtcpb.AnswerRequest_Error:
523-
respStatus := status.FromProto(s.Error.Status)
524-
return fmt.Errorf("error from requester: %w", respStatus.Err())
496+
respStatus := status.FromProto(stage.Error.Status)
497+
aa.sendError(fmt.Errorf("error from requester: %w", respStatus.Err()))
498+
return
525499
default:
526-
return errors.Errorf("unexpected stage %T", s)
500+
aa.sendError(fmt.Errorf("unexpected stage %T", stage))
501+
return
527502
}
528503
}
529-
}
530-
531-
done := make(chan struct{})
532-
defer func() { <-done }()
533-
utils.PanicCapturingGoWithCallback(func() {
534-
defer close(done)
535-
if err := exchangeCandidates(); err != nil {
536-
sendErr(err)
537-
}
538504
}, func(err interface{}) {
539-
sendErr(fmt.Errorf("%v", err))
505+
aa.sendError(fmt.Errorf("%v", err))
540506
})
541507
}
542508

543-
doAnswer := func() error {
544-
select {
545-
case <-exchangeCtx.Done():
546-
return multierr.Combine(exchangeCtx.Err(), serverChannel.Close())
547-
case <-serverChannel.Ready():
548-
return nil
549-
case <-errCh:
550-
return multierr.Combine(err, serverChannel.Close())
551-
}
509+
select {
510+
case <-serverChannel.Ready():
511+
// Happy path
512+
successful = true
513+
case <-ctx.Done():
514+
// Timed out or signaling server was closed.
515+
aa.sendError(multierr.Combine(ctx.Err(), serverChannel.Close()))
516+
return ctx.Err()
552517
}
553518

554-
if answerErr := doAnswer(); answerErr != nil {
555-
var err error
556-
sendDoneErrorOnce.Do(func() {
557-
err = client.Send(&webrtcpb.AnswerResponse{
558-
Uuid: uuid,
559-
Stage: &webrtcpb.AnswerResponse_Error{
560-
Error: &webrtcpb.AnswerResponseErrorStage{
561-
Status: ErrorToStatus(answerErr).Proto(),
562-
},
519+
aa.sendDone()
520+
return nil
521+
}
522+
523+
func (aa *answerAttempt) sendDone() {
524+
aa.sendDoneErrOnce.Do(func() {
525+
sendErr := aa.client.Send(&webrtcpb.AnswerResponse{
526+
Uuid: aa.uuid,
527+
Stage: &webrtcpb.AnswerResponse_Done{
528+
Done: &webrtcpb.AnswerResponseDoneStage{},
529+
},
530+
})
531+
532+
if sendErr != nil {
533+
// Errors communicating with the signaling server have no bearing on whether the
534+
// PeerConnection is usable. Log and ignore the send error.
535+
aa.logger.Warnw("Failed to send connection success message to signaling server", "sendErr", sendErr)
536+
}
537+
})
538+
}
539+
540+
func (aa *answerAttempt) sendError(err error) {
541+
aa.sendDoneErrOnce.Do(func() {
542+
sendErr := aa.client.Send(&webrtcpb.AnswerResponse{
543+
Uuid: aa.uuid,
544+
Stage: &webrtcpb.AnswerResponse_Error{
545+
Error: &webrtcpb.AnswerResponseErrorStage{
546+
Status: ErrorToStatus(err).Proto(),
563547
},
564-
})
548+
},
565549
})
566-
return receivedInitRequest, err
567-
}
568-
if err := sendDone(); err != nil {
569-
// Errors from sendDone (such as EOF) are sometimes caused by the signaling
570-
// server "ending" the exchange process earlier than the answerer due to
571-
// the caller being able to establish a connection without all the
572-
// answerer's ICE candidates (trickle ICE). Only Warn the error here to
573-
// avoid accidentally Closing a healthy, established peer connection.
574-
ans.logger.Warnw("error ending signaling exchange from answer client", "error", err)
575-
}
576-
successful = true
577-
return receivedInitRequest, nil
550+
551+
if sendErr != nil {
552+
aa.logger.Warnw("Failed to send error message to signaling server", "sendErr", sendErr)
553+
}
554+
})
578555
}

0 commit comments

Comments
 (0)
Please sign in to comment.