Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func (sender *Sender) popNextMessage() *Message {
func (sender *Sender) requeue(msg *Message, bytesSent uint64) {
sender.requeueLock.Lock()
defer sender.requeueLock.Unlock()
// if icebox has been closed, return early to avoid requeuing messages to a closed sender
if sender.icebox == nil {
return
}
msg.bytesSent += bytesSent
if msg.bytesSent <= msg.bytesAcked+kMaxUnackedBytes {
// requeue it so it can send its next frame later
Expand Down
42 changes: 42 additions & 0 deletions sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,45 @@ func TestStopSenderClearsAllMessageQueues(t *testing.T) {
assert.Equal(t, 0, len(sender.icebox))

}

func TestSenderIceboxPanicAfterClosure(t *testing.T) {

blipContextEchoServer, err := NewContext(defaultContextOptions)
if err != nil {
t.Fatal(err)
}

server := blipContextEchoServer.WebSocketServer()
http.Handle("/TestSenderIceboxPanicAfterClosure", server)
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go func() {
t.Error(http.Serve(listener, nil))
}()

blipContextEchoClient, err := NewContext(defaultContextOptions)
if err != nil {
t.Fatal(err)
}
port := listener.Addr().(*net.TCPAddr).Port
destUrl := fmt.Sprintf("ws://localhost:%d/TestSenderIceboxPanicAfterClosure", port)
sender, err := blipContextEchoClient.Dial(destUrl)
if err != nil {
t.Fatalf("Error opening WebSocket: %v", err)
}

// close sender
sender.Close()

// try requeueing messages after closure
for i := 1; i < 20; i++ {
msg := NewRequest()
msgProp := Properties{
"id": fmt.Sprint(i),
}
msg.Properties = msgProp
sender.requeue(msg, 500000)
}
}
Loading