From 5c3154b8f223b4da3ea6878c61a3bc6efd66f811 Mon Sep 17 00:00:00 2001 From: gunli Date: Tue, 18 Feb 2025 20:17:57 +0800 Subject: [PATCH] stop sending if the connenction is closed --- pulsar/internal/connection.go | 5 +++-- pulsar/internal/connection_pool.go | 2 +- pulsar/internal/connection_reader.go | 7 ++++--- pulsar/producer_partition.go | 10 +++++++++- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 57fc724190..3f622aad56 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -88,6 +88,7 @@ type Connection interface { Close() WaitForClose() <-chan struct{} IsProxied() bool + Closed() bool } type ConsumerHandler interface { @@ -658,7 +659,7 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error { } func (c *connection) internalSendRequest(req *request) { - if c.closed() { + if c.Closed() { c.log.Warnf("internalSendRequest failed for connectionClosed") if req.callback != nil { req.callback(req.cmd, ErrConnectionClosed) @@ -1064,7 +1065,7 @@ func (c *connection) setStateClosed() { c.state.Store(int32(connectionClosed)) } -func (c *connection) closed() bool { +func (c *connection) Closed() bool { return connectionClosed == c.getState() } diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index cd082188b6..363c7731ef 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -103,7 +103,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // When the current connection is in a closed state or the broker actively notifies that the // current connection is closed, we need to remove the connection object from the current // connection pool and create a new connection. - if conn.closed() { + if conn.Closed() { p.log.Debugf("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) delete(p.connections, key) diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go index c2541f68c3..2397f29a10 100644 --- a/pulsar/internal/connection_reader.go +++ b/pulsar/internal/connection_reader.go @@ -21,8 +21,9 @@ import ( "fmt" "io" - pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "google.golang.org/protobuf/proto" + + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" ) type connectionReader struct { @@ -41,7 +42,7 @@ func (r *connectionReader) readFromConnection() { for { cmd, headersAndPayload, err := r.readSingleCommand() if err != nil { - if !r.cnx.closed() { + if !r.cnx.Closed() { r.cnx.log.WithError(err).Infof("Error reading from connection") r.cnx.Close() } @@ -122,7 +123,7 @@ func (r *connectionReader) readAtLeast(size uint32) error { n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size)) if err != nil { // has the connection been closed? - if r.cnx.closed() { + if r.cnx.Closed() { return errConnectionClosed } r.cnx.Close() diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 6e028f062e..bf53a0b76c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -903,7 +903,15 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, sequenceID: sequenceID, sendRequests: callbacks, }) - p._getConn().WriteData(buffer) + + // If the connection is closed, stop sending data. Continuing to send data + // to a closed connection will cause the buffer to be passed to it, which + // prevents further processing. + conn := p._getConn() + if conn.Closed() { + return + } + conn.WriteData(buffer) } }