Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (d *decompressor) passthrough(input []byte, checksum *uint32) ([]byte, erro
_, _ = d.checksum.Write(input) // Update checksum (no error possible)
if checksum != nil {
if curChecksum := d.getChecksum(); curChecksum != *checksum {
return nil, fmt.Errorf("Invalid checksum %x; should be %x", curChecksum, *checksum)
return nil, fmt.Errorf("invalid checksum %x; should be %x", curChecksum, *checksum)
}
}
return input, nil
Expand Down Expand Up @@ -168,7 +168,7 @@ func (d *decompressor) decompress(input []byte, checksum uint32) ([]byte, error)
return nil, err
} else if n == 0 {
// Nothing more to read; since checksum didn't match (above), fail:
return nil, fmt.Errorf("Invalid checksum %x; should be %x", d.getChecksum(), checksum)
return nil, fmt.Errorf("invalid checksum %x; should be %x", d.getChecksum(), checksum)
}
_, _ = d.checksum.Write(d.buffer[0:n]) // Update checksum (no error possible)

Expand Down
2 changes: 1 addition & 1 deletion codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func init() {
randomData = make([]byte, 65536)
var b byte
var step byte = 1
for i, _ := range randomData {
for i := range randomData {
if rando.Intn(10) == 0 {
b = byte(rando.Intn(256))
step = byte(rando.Intn(4))
Expand Down
98 changes: 29 additions & 69 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,13 @@ import (
"io"
"math/rand"
"net/http"
"runtime/debug"
"strings"
"sync/atomic"
"time"

"github.com/coder/websocket"
)

// A function that handles an incoming BLIP request and optionally sends a response.
// A handler is called on a new goroutine so it can take as long as it needs to.
// For example, if it has to send a synchronous network request before it can construct
// a response, that's fine.
type Handler func(request *Message)

// Utility function that responds to a Message with a 404 error.
func Unhandled(request *Message) {
request.Response().SetError(BLIPErrorDomain, 404, "No handler for BLIP request")
}

// Defines how incoming requests are dispatched to handler functions.
type Context struct {

Expand All @@ -49,14 +37,14 @@ type Context struct {
// Patterns that the Origin header must match (if non-empty)
origin []string

HandlerForProfile map[string]Handler // Handler function for a request Profile
DefaultHandler Handler // Handler for all otherwise unhandled requests
FatalErrorHandler func(error) // Called when connection has a fatal error
HandlerPanicHandler func(request, response *Message, err interface{}) // Called when a profile handler panics
MaxSendQueueCount int // Max # of messages being sent at once (if >0)
Logger LogFn // Logging callback; defaults to log.Printf
LogMessages bool // If true, will log about messages
LogFrames bool // If true, will log about frames (very verbose)
RequestHandler AsyncHandler // Callback that handles incoming requests
FatalErrorHandler FatalErrorHandler // Called when connection has a fatal error
HandlerPanicHandler HandlerPanicHandler // Called when a profile handler panics
MaxSendQueueCount int // Max # of messages being sent at once (if >0)
MaxDispatchedBytes int // Max total size of incoming requests being dispatched (if >0)
Logger LogFn // Logging callback; defaults to log.Printf
LogMessages bool // If true, will log about messages
LogFrames bool // If true, will log about frames (very verbose)

OnExitCallback func() // OnExitCallback callback invoked when the underlying connection closes and the receive loop exits.

Expand All @@ -65,12 +53,21 @@ type Context struct {
// An identifier that uniquely defines the context. NOTE: Random Number Generator not seeded by go-blip.
ID string

HandlerForProfile map[string]Handler // deprecated; use RequestHandler & ByProfileDispatcher
DefaultHandler Handler // deprecated; use RequestHandler & ByProfileDispatcher

bytesSent atomic.Uint64 // Number of bytes sent
bytesReceived atomic.Uint64 // Number of bytes received

cancelCtx context.Context // When cancelled, closes all connections. Terminates receiveLoop(s), which triggers sender and parseLoop stop
}

// A function called when a Handler function panics.
type HandlerPanicHandler func(request, response *Message, err interface{})

// A function called when the connection closes due to a fatal protocol error.
type FatalErrorHandler func(error)

// Defines a logging interface for use within the blip codebase. Implemented by Context.
// Any code that needs to take a Context just for logging purposes should take a Logger instead.
type LogContext interface {
Expand Down Expand Up @@ -116,6 +113,13 @@ func NewContextCustomID(id string, opts ContextOptions) (*Context, error) {
}

func (blipCtx *Context) start(ws *websocket.Conn) *Sender {
if blipCtx.RequestHandler == nil {
// Compatibility mode: If the app hasn't set a RequestHandler, set one that uses the old
// handlerForProfile and defaultHandler.
blipCtx.RequestHandler = blipCtx.compatibilityHandler
} else if len(blipCtx.HandlerForProfile) > 0 || blipCtx.DefaultHandler != nil {
panic("blip.Context cannot have both a RequestHandler and legacy handlerForProfile or defaultHandler")
}
r := newReceiver(blipCtx, ws)
r.sender = newSender(blipCtx, ws, r)
r.sender.start()
Expand Down Expand Up @@ -252,9 +256,9 @@ func (bwss *BlipWebsocketServer) handshake(w http.ResponseWriter, r *http.Reques
protocolHeader := r.Header.Get("Sec-WebSocket-Protocol")
protocol, found := includesProtocol(protocolHeader, bwss.blipCtx.SupportedSubProtocols)
if !found {
stringSeperatedProtocols := strings.Join(bwss.blipCtx.SupportedSubProtocols, ",")
bwss.blipCtx.log("Error: Client doesn't support any of WS protocols: %s only %s", stringSeperatedProtocols, protocolHeader)
err := fmt.Errorf("I only speak %s protocols", stringSeperatedProtocols)
stringSeparatedProtocols := strings.Join(bwss.blipCtx.SupportedSubProtocols, ",")
bwss.blipCtx.log("Error: Client doesn't support any of WS protocols: %s only %s", stringSeparatedProtocols, protocolHeader)
err := fmt.Errorf("I only speak %s protocols", stringSeparatedProtocols)
http.Error(w, err.Error(), http.StatusInternalServerError)
return nil, err
}
Expand Down Expand Up @@ -286,52 +290,6 @@ func (bwss *BlipWebsocketServer) handle(ws *websocket.Conn) {
ws.Close(websocket.StatusNormalClosure, "")
}

//////// DISPATCHING MESSAGES:

func (blipCtx *Context) dispatchRequest(request *Message, sender *Sender) {
defer func() {
// On return/panic, send the response:
response := request.Response()
if panicked := recover(); panicked != nil {
if blipCtx.HandlerPanicHandler != nil {
blipCtx.HandlerPanicHandler(request, response, panicked)
} else {
stack := debug.Stack()
blipCtx.log("PANIC handling BLIP request %v: %v:\n%s", request, panicked, stack)
if response != nil {
response.SetError(BLIPErrorDomain, 500, fmt.Sprintf("Panic: %v", panicked))
}
}
}
if response != nil {
sender.send(response)
}
}()

blipCtx.logMessage("Incoming BLIP Request: %s", request)
handler := blipCtx.HandlerForProfile[request.Properties["Profile"]]
if handler == nil {
handler = blipCtx.DefaultHandler
if handler == nil {
handler = Unhandled
}
}
handler(request)
}

func (blipCtx *Context) dispatchResponse(response *Message) {
defer func() {
// On return/panic, log a warning:
if panicked := recover(); panicked != nil {
stack := debug.Stack()
blipCtx.log("PANIC handling BLIP response %v: %v:\n%s", response, panicked, stack)
}
}()

blipCtx.logMessage("Incoming BLIP Response: %s", response)
//panic("UNIMPLEMENTED") //TODO
}

//////// LOGGING:

func (blipCtx *Context) log(format string, params ...interface{}) {
Expand All @@ -350,6 +308,8 @@ func (blipCtx *Context) logFrame(format string, params ...interface{}) {
}
}

//////// UTILITIES:

func includesProtocol(header string, protocols []string) (string, bool) {
for _, item := range strings.Split(header, ",") {
for _, protocol := range protocols {
Expand Down
111 changes: 19 additions & 92 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,17 @@ func TestServerAbruptlyCloseConnectionBehavior(t *testing.T) {
blipContextEchoServer.LogFrames = true

// Websocket Server
server := blipContextEchoServer.WebSocketServer()

// HTTP Handler wrapping websocket server
http.Handle("/TestServerAbruptlyCloseConnectionBehavior", server)
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go func() {
t.Error(http.Serve(listener, nil))
}()
listener := startTestListener(t, blipContextEchoServer)
defer listener.Close()

// ----------------- Setup Echo Client ----------------------------------------

blipContextEchoClient, err := NewContext(defaultContextOptions)
if err != nil {
t.Fatal(err)
}
port := listener.Addr().(*net.TCPAddr).Port
destUrl := fmt.Sprintf("ws://localhost:%d/TestServerAbruptlyCloseConnectionBehavior", port)
sender, err := blipContextEchoClient.Dial(destUrl)
if err != nil {
t.Fatalf("Error opening WebSocket: %v", err)
}
sender := startTestClient(t, blipContextEchoClient, listener)
defer sender.Close()

// Create echo request
echoRequest := NewRequest()
Expand All @@ -126,15 +113,7 @@ func TestServerAbruptlyCloseConnectionBehavior(t *testing.T) {

// Read the echo response
response := echoRequest.Response() // <--- SG #3268 was causing this to block indefinitely
responseBody, err := response.Body()

// Assertions about echo response (these might need to be altered, maybe what's expected in this scenario is actually an error)
assert.True(t, err == nil)
assert.True(t, len(responseBody) == 0)

// TODO: add more assertions about the response. I'm not seeing any errors, or any
// TODO: way to differentiate this response with a normal response other than having an empty body

requireBLIPError(t, response, BLIPErrorDomain, DisconnectedCode)
}

/*
Expand Down Expand Up @@ -204,12 +183,7 @@ func TestClientAbruptlyCloseConnectionBehavior(t *testing.T) {
sent := clientSender.Send(echoAmplifyRequest)
assert.True(t, sent)
echoAmplifyResponse := echoAmplifyRequest.Response() // <--- SG #3268 was causing this to block indefinitely
echoAmplifyResponseBody, _ := echoAmplifyResponse.Body()
assert.True(t, len(echoAmplifyResponseBody) == 0)

// TODO: add more assertions about the response. I'm not seeing any errors, or any
// TODO: way to differentiate this response with a normal response other than having an empty body

requireBLIPError(t, echoAmplifyResponse, BLIPErrorDomain, DisconnectedCode)
}

// Create a blip profile handler to respond to echo requests and then abruptly close the socket
Expand Down Expand Up @@ -239,31 +213,15 @@ func TestClientAbruptlyCloseConnectionBehavior(t *testing.T) {
blipContextEchoServer.LogFrames = true

// Websocket Server
server := blipContextEchoServer.WebSocketServer()

// HTTP Handler wrapping websocket server
http.Handle("/TestClientAbruptlyCloseConnectionBehavior", server)
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
go func() {
t.Error(http.Serve(listener, nil))
}()
listener := startTestListener(t, blipContextEchoServer)
defer listener.Close()

// ----------------- Setup Echo Client ----------------------------------------

blipContextEchoClient, err := NewContext(defaultContextOptions)
if err != nil {
t.Fatal(err)
}
port := listener.Addr().(*net.TCPAddr).Port
destUrl := fmt.Sprintf("ws://localhost:%d/TestClientAbruptlyCloseConnectionBehavior", port)
sender, err := blipContextEchoClient.Dial(destUrl)
if err != nil {
t.Fatalf("Error opening WebSocket: %v", err)
}

// Handle EchoAmplifyData that should be initiated by server in response to getting incoming echo requests
dispatchEchoAmplify := func(request *Message) {
_, err := request.Body()
Expand All @@ -277,6 +235,9 @@ func TestClientAbruptlyCloseConnectionBehavior(t *testing.T) {
}
blipContextEchoClient.HandlerForProfile["BLIPTest/EchoAmplifyData"] = dispatchEchoAmplify

sender := startTestClient(t, blipContextEchoClient, listener)
defer sender.Close()

// Create echo request
echoRequest := NewRequest()
echoRequest.SetProfile("BLIPTest/EchoData")
Expand All @@ -299,7 +260,7 @@ func TestClientAbruptlyCloseConnectionBehavior(t *testing.T) {
responseBody, err := response.Body()

// Assertions about echo response (these might need to be altered, maybe what's expected in this scenario is actually an error)
assert.True(t, err == nil)
assert.NoError(t, err)
assert.Equal(t, "hello", string(responseBody))

// Wait until the amplify request was received by client (from server), and that the server read the response
Expand Down Expand Up @@ -395,35 +356,24 @@ func TestUnsupportedSubProtocol(t *testing.T) {
serverCtx.LogMessages = true
serverCtx.LogFrames = true

server := serverCtx.WebSocketServer()

mux := http.NewServeMux()
mux.Handle("/someBlip", server)
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}

go func() {
err := http.Serve(listener, mux)
if err != nil {
panic(err)
}
}()
listener := startTestListener(t, serverCtx)
defer listener.Close()

// Client
client, err := NewContext(ContextOptions{ProtocolIds: testCase.ClientProtocol})
if err != nil {
t.Fatal(err)
}
port := listener.Addr().(*net.TCPAddr).Port
destUrl := fmt.Sprintf("ws://localhost:%d/someBlip", port)
destUrl := fmt.Sprintf("ws://localhost:%d/blip", port)

s, err := client.Dial(destUrl)
if testCase.ExpectError {
assert.True(t, err != nil)
} else {
assert.Equal(t, nil, err)
}
if s != nil {
s.Close()
}

Expand Down Expand Up @@ -681,18 +631,16 @@ func TestServerContextClose(t *testing.T) {
echoRequest.SetBody(echoResponseBody)
receivedRequests.Add(1)
sent := sender.Send(echoRequest)
assert.True(t, sent)
require.True(t, sent)

// Read the echo response. Closed connection will result in empty response, as EOF message
// isn't currently returned by blip client
// Read the echo response if we sent something
response := echoRequest.Response()
responseBody, err := response.Body()
assert.True(t, err == nil)
if len(responseBody) == 0 {
log.Printf("empty response, connection closed")
return
}

assert.Equal(t, echoResponseBody, responseBody)
}
}
Expand Down Expand Up @@ -728,27 +676,6 @@ func assertHandlerNoError(t *testing.T, server *BlipWebsocketServer, wg *sync.Wa
}
}

// Wait for the WaitGroup, or return an error if the wg.Wait() doesn't return within timeout
// TODO: this code is duplicated with code in Sync Gateway utilities_testing.go. Should be refactored to common repo.
func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {

// Create a channel so that a goroutine waiting on the waitgroup can send it's result (if any)
wgFinished := make(chan bool)

go func() {
wg.Wait()
wgFinished <- true
}()

select {
case <-wgFinished:
return nil
case <-time.After(timeout):
return fmt.Errorf("Timed out waiting after %v", timeout)
}

}

// StringPtr returns a pointer to the string value passed in
func StringPtr(s string) *string {
return &s
Expand Down
Loading