Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/varlink/go

go 1.13
go 1.21
130 changes: 33 additions & 97 deletions varlink/internal/ctxio/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,6 @@ func NewConn(c net.Conn) *Conn {
}
}

type ioret struct {
n int
err error
}

type rret struct {
val []byte
err error
}

// aLongTimeAgo is a time in the past that indicates a connection should
// immediately time out.
var aLongTimeAgo = time.Unix(1, 0)
Expand All @@ -47,104 +37,50 @@ func (c *Conn) Close() error {
// Write writes to the underlying connection.
// It is not safe for concurrent use with itself.
func (c *Conn) Write(ctx context.Context, buf []byte) (int, error) {
// Enable immediate connection cancelation via context by using the context's
// deadline and also setting a deadline in the past if/when the context is
// canceled. This pattern courtesy of @acln from #networking on Gophers Slack.
dl, _ := ctx.Deadline()
if err := c.conn.SetWriteDeadline(dl); err != nil {
return 0, err
}

ch := make(chan ioret, 1)
go func() {
n, err := c.conn.Write(buf)
ch <- ioret{n, err}
}()

select {
case <-ctx.Done():
// Set deadline to unblock pending Write.
if err := c.conn.SetWriteDeadline(aLongTimeAgo); err != nil {
return 0, err
}
// Wait for goroutine to exit, throwing away the error.
<-ch
// Reset deadline again.
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return 0, err
}
return 0, ctx.Err()
case ret := <-ch:
return ret.n, ret.err
done := make(chan struct{})
ioInterrupted := context.AfterFunc(ctx, func() {
c.conn.SetWriteDeadline(aLongTimeAgo)
close(done)
})
n, err := c.conn.Write(buf)
if !ioInterrupted() {
<-done
c.conn.SetWriteDeadline(time.Time{})
return n, ctx.Err()
}
return n, err
}

// Read reads from the underlying connection.
// It is not safe for concurrent use with itself or ReadBytes.
func (c *Conn) Read(ctx context.Context, buf []byte) (int, error) {
// Enable immediate connection cancelation via context by using the context's
// deadline and also setting a deadline in the past if/when the context is
// canceled. This pattern courtesy of @acln from #networking on Gophers Slack.
dl, _ := ctx.Deadline()
if err := c.conn.SetReadDeadline(dl); err != nil {
return 0, err
}

ch := make(chan ioret, 1)
go func() {
n, err := c.conn.Read(buf)
ch <- ioret{n, err}
}()

select {
case <-ctx.Done():
// Set deadline to unblock pending Read.
if err := c.conn.SetReadDeadline(aLongTimeAgo); err != nil {
return 0, err
}
// Wait for goroutine to exit, throwing away the error.
<-ch
// Reset deadline again.
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return 0, err
}
return 0, ctx.Err()
case ret := <-ch:
return ret.n, ret.err
done := make(chan struct{})
ioInterrupted := context.AfterFunc(ctx, func() {
c.conn.SetReadDeadline(aLongTimeAgo)
close(done)
})
n, err := c.conn.Read(buf)
if !ioInterrupted() {
<-done
c.conn.SetReadDeadline(time.Time{})
return n, ctx.Err()
}
return n, err
}

// ReadBytes reads from the connection until the bytes are found.
// It is not safe for concurrent use with itself or Read.
func (c *Conn) ReadBytes(ctx context.Context, delim byte) ([]byte, error) {
// Enable immediate connection cancelation via context by using the context's
// deadline and also setting a deadline in the past if/when the context is
// canceled. This pattern courtesy of @acln from #networking on Gophers Slack.
dl, _ := ctx.Deadline()
if err := c.conn.SetReadDeadline(dl); err != nil {
return nil, err
}

ch := make(chan rret, 1)
go func() {
out, err := c.reader.ReadBytes(delim)
ch <- rret{out, err}
}()

select {
case <-ctx.Done():
// Set deadline to unblock pending Write.
if err := c.conn.SetReadDeadline(aLongTimeAgo); err != nil {
return nil, err
}
// Wait for goroutine to exit, throwing away the error.
<-ch
// Reset deadline again.
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return nil, err
}
return nil, ctx.Err()
case ret := <-ch:
return ret.val, ret.err
done := make(chan struct{})
ioInterrupted := context.AfterFunc(ctx, func() {
c.conn.SetReadDeadline(aLongTimeAgo)
close(done)
})
out, err := c.reader.ReadBytes(delim)
if !ioInterrupted() {
<-done
c.conn.SetReadDeadline(time.Time{})
return out, ctx.Err()
}
return out, err
}
Loading