Skip to content

Commit

Permalink
stop sending if the connenction is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Mar 7, 2025
1 parent 139514f commit 5c3154b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 7 deletions.
5 changes: 3 additions & 2 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type Connection interface {
Close()
WaitForClose() <-chan struct{}
IsProxied() bool
Closed() bool
}

type ConsumerHandler interface {
Expand Down Expand Up @@ -658,7 +659,7 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
}

func (c *connection) internalSendRequest(req *request) {
if c.closed() {
if c.Closed() {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
Expand Down Expand Up @@ -1064,7 +1065,7 @@ func (c *connection) setStateClosed() {
c.state.Store(int32(connectionClosed))
}

func (c *connection) closed() bool {
func (c *connection) Closed() bool {
return connectionClosed == c.getState()
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
// When the current connection is in a closed state or the broker actively notifies that the
// current connection is closed, we need to remove the connection object from the current
// connection pool and create a new connection.
if conn.closed() {
if conn.Closed() {
p.log.Debugf("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
delete(p.connections, key)
Expand Down
7 changes: 4 additions & 3 deletions pulsar/internal/connection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"fmt"
"io"

pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"google.golang.org/protobuf/proto"

pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

type connectionReader struct {
Expand All @@ -41,7 +42,7 @@ func (r *connectionReader) readFromConnection() {
for {
cmd, headersAndPayload, err := r.readSingleCommand()
if err != nil {
if !r.cnx.closed() {
if !r.cnx.Closed() {
r.cnx.log.WithError(err).Infof("Error reading from connection")
r.cnx.Close()
}
Expand Down Expand Up @@ -122,7 +123,7 @@ func (r *connectionReader) readAtLeast(size uint32) error {
n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
if err != nil {
// has the connection been closed?
if r.cnx.closed() {
if r.cnx.Closed() {
return errConnectionClosed
}
r.cnx.Close()
Expand Down
10 changes: 9 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,15 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
sequenceID: sequenceID,
sendRequests: callbacks,
})
p._getConn().WriteData(buffer)

// If the connection is closed, stop sending data. Continuing to send data
// to a closed connection will cause the buffer to be passed to it, which
// prevents further processing.
conn := p._getConn()
if conn.Closed() {
return
}
conn.WriteData(buffer)
}
}

Expand Down

0 comments on commit 5c3154b

Please sign in to comment.