Skip to content

Commit df1da9f

Browse files
authored
RSDK-8525: New answerer workers adding to a waitgroup must atomically perform their closeCtx read and wg.Add call. (#322)
1 parent 9333cad commit df1da9f

File tree

1 file changed

+38
-21
lines changed

1 file changed

+38
-21
lines changed

rpc/wrtc_signaling_answerer.go

+38-21
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,8 @@ func (ans *webrtcSignalingAnswerer) Start() {
9191
ans.startStopMu.Lock()
9292
defer ans.startStopMu.Unlock()
9393

94-
ans.bgWorkersMu.RLock()
94+
// No lock is necessary here. It is illegal to call `ans.Stop` before `ans.Start` returns.
9595
ans.bgWorkers.Add(1)
96-
ans.bgWorkersMu.RUnlock()
9796

9897
// attempt to make connection in a loop
9998
utils.ManagedGo(func() {
@@ -158,17 +157,23 @@ func (ans *webrtcSignalingAnswerer) startAnswerer() {
158157
}
159158
return answerClient, nil
160159
}
161-
ans.bgWorkersMu.RLock()
162-
ans.bgWorkers.Add(1)
163-
ans.bgWorkersMu.RUnlock()
164160

165-
// Check if closeCtx has errored: underlying answerer may have been
166-
// `Stop`ped, in which case we mark this answer worker as `Done` and
167-
// return.
168-
if err := ans.closeCtx.Err(); err != nil {
169-
ans.bgWorkers.Done()
161+
// The answerer may be stopped (canceling the context and waiting on background workers)
162+
// concurrently with the code below. In that circumstance we must guarantee either:
163+
// * `Stop` waiting on the `bgWorkers` WaitGroup observes our `bgWorkers.Add` or
164+
// * Our code observes `Stop`'s cancellation of the `closeCtx`
165+
//
166+
// We use a mutex to make the read of the `closeCtx` and write to the `bgWorkers` atomic. `Stop`
167+
// takes the write lock on the same mutex (`bgWorkersMu`) while canceling the `closeCtx`.
168+
ans.bgWorkersMu.RLock()
169+
select {
170+
case <-ans.closeCtx.Done():
171+
ans.bgWorkersMu.RUnlock()
170172
return
173+
default:
171174
}
175+
ans.bgWorkers.Add(1)
176+
ans.bgWorkersMu.RUnlock()
172177

173178
utils.ManagedGo(func() {
174179
var client webrtcpb.SignalingService_AnswerClient
@@ -186,6 +191,7 @@ func (ans *webrtcSignalingAnswerer) startAnswerer() {
186191
return
187192
default:
188193
}
194+
189195
var err error
190196
// `newAnswer` opens a bidi grpc stream to the signaling server. But otherwise sends no requests.
191197
client, err = newAnswer()
@@ -232,10 +238,14 @@ func (ans *webrtcSignalingAnswerer) Stop() {
232238
ans.startStopMu.Lock()
233239
defer ans.startStopMu.Unlock()
234240

235-
ans.cancelBgWorkers()
241+
// Code adding workers must atomically check the `closeCtx` before adding to the `bgWorkers`
242+
// wait group. Canceling the context must not split those two operations. We ensure this
243+
// atomicity by acquiring the `bgWorkersMu` write lock.
236244
ans.bgWorkersMu.Lock()
237-
ans.bgWorkers.Wait()
245+
ans.cancelBgWorkers()
246+
// Background workers require the `bgWorkersMu`. Release the mutex before calling `Wait`.
238247
ans.bgWorkersMu.Unlock()
248+
ans.bgWorkers.Wait()
239249

240250
ans.connMu.Lock()
241251
defer ans.connMu.Unlock()
@@ -383,19 +393,26 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
383393
})
384394
}
385395
}
386-
// must spin off to unblock the ICE gatherer
387-
ans.bgWorkersMu.RLock()
388-
ans.bgWorkers.Add(1)
389-
ans.bgWorkersMu.RUnlock()
390396

391-
// Check if closeCtx has errored: underlying answerer may have been
392-
// `Stop`ped, in which case we mark this answer worker as `Done` and
393-
// return.
394-
if err := ans.closeCtx.Err(); err != nil {
395-
ans.bgWorkers.Done()
397+
// The answerer may be stopped (canceling the context and waiting on background workers)
398+
// concurrently with the code below. In that circumstance we must guarantee
399+
// either:
400+
// * `Stop` waiting on the `bgWorkers` WaitGroup observes our `bgWorkers.Add` or
401+
// * Our code observes `Stop`'s cancellation of the `closeCtx`
402+
//
403+
// We use a mutex to make the read of the `closeCtx` and write to the `bgWorkers`
404+
// atomic. `Stop` takes the write lock on the same mutex while canceling the `closeCtx`.
405+
ans.bgWorkersMu.RLock()
406+
select {
407+
case <-ans.closeCtx.Done():
408+
ans.bgWorkersMu.RUnlock()
396409
return
410+
default:
397411
}
412+
ans.bgWorkers.Add(1)
413+
ans.bgWorkersMu.RUnlock()
398414

415+
// must spin off to unblock the ICE gatherer
399416
utils.PanicCapturingGo(func() {
400417
defer ans.bgWorkers.Done()
401418

0 commit comments

Comments
 (0)