Fix: SendAsync callback was not invoked when producer is in reconnecting #1333

Draft · wants to merge 5 commits into base: master

Conversation

@gunli gunli commented Feb 18, 2025

Fixes #1332

Master Issue: #1332

Motivation

The SendAsync() callback should still be invoked to give the user/application a response when the producer is busy reconnecting.

Modifications

Run the reconnect logic in a separate goroutine.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • run TestProducerKeepReconnectingAndThenCallSendAsync()

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

gunli commented Feb 18, 2025

@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
 			}
 		case connectionClosed := <-p.connectClosedCh:
 			p.log.Info("runEventsLoop will reconnect in producer")
-			p.reconnectToBroker(connectionClosed)
+			// reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+			go p.reconnectToBroker(connectionClosed)

Member commented:

I'm not sure if my concern is correct.

If we use a new goroutine here, we need to carefully handle the following cases:

  1. When a connection-closed event comes in during reconnection, we may need to add a reconnecting status to prevent concurrency issues (see the sketch after this list).
  2. During reconnection, if a message is sent using the old (possibly closed) connection, I'm not sure about the behavior here. Could it get stuck or return an error?
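
A minimal sketch of the guard that point 1 describes, assuming a hypothetical reconnectGuard type with an atomic reconnecting flag (these names do not exist in the client; this only illustrates the idea):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// reconnectGuard is a hypothetical "reconnecting" status flag: only the
// goroutine that wins the CompareAndSwap starts a reconnect, so a second
// connection-closed event arriving mid-reconnect cannot start a parallel one.
type reconnectGuard struct {
	reconnecting atomic.Bool
}

// tryStartReconnect runs do() in a new goroutine unless a reconnect is
// already in flight; it reports whether a new reconnect was started.
func (g *reconnectGuard) tryStartReconnect(do func()) bool {
	if !g.reconnecting.CompareAndSwap(false, true) {
		return false
	}
	go func() {
		defer g.reconnecting.Store(false)
		do()
	}()
	return true
}

func main() {
	var g reconnectGuard
	for i := 0; i < 3; i++ {
		started := g.tryStartReconnect(func() { time.Sleep(50 * time.Millisecond) })
		fmt.Println("reconnect started:", started) // true, then false while the first is still running
	}
}

Only the first call that observes reconnecting == false actually starts a reconnect; close events arriving while one is in flight are ignored instead of spawning a second goroutine.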

Member commented:

Even with the check below, we can't be 100% sure we aren't using the old connection, because it's inherently concurrent.

conn := p._getConn()
if conn.Closed() {
	return
}

Member commented:

Actually, I'm thinking that before a message enters the pendingQueue, even if the user is using the sendAsync method, we should block the user's call.

This way, on the user side, the timeout should be calculated from when the sendAsync method call succeeds; otherwise, it should block at this point.
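
A rough sketch of this idea, assuming an illustrative producer type whose bounded pending channel plays the role of MaxPendingMessages (this is not the real client API): SendAsync blocks until the message is accepted into the queue, or until the caller's context expires.

package main

import (
	"context"
	"fmt"
	"time"
)

// producer is illustrative only: the bounded channel stands in for the
// pending queue, and SendAsync blocks until the message is accepted into it.
type producer struct {
	pending chan string
}

func (p *producer) SendAsync(ctx context.Context, msg string, callback func(error)) error {
	select {
	case p.pending <- msg:
		// accepted into the pending queue; the callback would be invoked
		// later by the send/ack path
		_ = callback
		return nil
	case <-ctx.Done():
		// queue full (for example while reconnecting): the caller blocks
		// here until space frees up or its context expires
		return ctx.Err()
	}
}

func main() {
	p := &producer{pending: make(chan string, 1)}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(p.SendAsync(ctx, "a", func(error) {})) // <nil>
	fmt.Println(p.SendAsync(ctx, "b", func(error) {})) // context deadline exceeded: queue is full
}

With these semantics the user's send timeout can be measured from the moment SendAsync returns, because at that point the message is guaranteed to be in the pending queue.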

@gunli (Contributor Author) commented Feb 19, 2025:

Even with the check below, we can't be 100% sure we aren't using the old connection, because it's inherently concurrent.

conn := p._getConn()
if conn.Closed() {
	return
}

I agree with that. Actually, I think it is quite strange to pass a buffer to a connection's channel when sending a message, as this causes the buffer to be read and written by two different goroutines, leading to a data race. What's worse, if a buffer is sent into a connection's channel and the connection is closed at the same time, the buffer ends up in a new, uncertain pending state, and we need to pay extra attention to handling that situation. Currently, this situation is not handled when the connection is closed by the network or by a server notification (pb.BaseCommand_CLOSE_PRODUCER/pb.BaseCommand_CLOSE_CONSUMER); maybe we need a new PR to handle this.

In my opinion, it would be better to keep the message in the pending queue and use timeout events and server ack events to determine whether a message has timed out or succeeded.
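
A small sketch of this approach, assuming illustrative pendingQueue/pendingItem types rather than the client's real structures: every message keeps its enqueue time and callback in the queue, a periodic sweep fails the ones that exceeded the timeout, and a broker ack completes the oldest one.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// pendingItem/pendingQueue are stand-ins for the client's real structures:
// a message stays queued until a timeout sweep fails it or an ack completes it.
type pendingItem struct {
	enqueuedAt time.Time
	callback   func(err error)
}

type pendingQueue struct {
	mu    sync.Mutex
	items []pendingItem
}

func (q *pendingQueue) add(cb func(error)) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, pendingItem{enqueuedAt: time.Now(), callback: cb})
}

// failTimeoutMessages fails every queued item older than timeout, whether or
// not it ever reached a connection.
func (q *pendingQueue) failTimeoutMessages(timeout time.Duration) {
	q.mu.Lock()
	defer q.mu.Unlock()
	kept := q.items[:0]
	for _, it := range q.items {
		if time.Since(it.enqueuedAt) > timeout {
			it.callback(errors.New("send timed out"))
		} else {
			kept = append(kept, it)
		}
	}
	q.items = kept
}

// ackOldest completes the oldest pending item when a broker ack arrives.
func (q *pendingQueue) ackOldest() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return
	}
	q.items[0].callback(nil)
	q.items = q.items[1:]
}

func main() {
	q := &pendingQueue{}
	q.add(func(err error) { fmt.Println("msg 1:", err) })
	time.Sleep(20 * time.Millisecond)
	q.failTimeoutMessages(10 * time.Millisecond) // msg 1: send timed out
	q.add(func(err error) { fmt.Println("msg 2:", err) })
	q.ackOldest() // msg 2: <nil>
}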

@gunli (Contributor Author) commented Feb 19, 2025:

Actually, I'm thinking that before a message enters the pendingQueue, even if the user is using the sendAsync method, we should block the user's call.

This way, on the user side, the timeout should be calculated from when the sendAsync method call succeeds; otherwise, it should block at this point.

I have thought about that, but it would be a breaking change. If we change it that way, SendAsync will sometimes behave like a synchronous Send, and the MaxPendingMessages config will become meaningless.

Contributor Author (gunli) commented:

I didn't say we should introduce new goroutines.

What I mean is that we are already sending data in the goroutine of partitionProducer.internalSend(); we don't have to create a new goroutine again in connection.run().

Contributor Author (gunli) commented:

If you look at the Java implementation, you'll find that Java handles many things inside sendAsync, and the user call is blocked until that logic has been processed.

We have done the same in the Go client. The problem is that partitionProducer.runEventsLoop() is blocked in the reconnecting case, which prevents the data from entering the pendingQueue, and failTimeoutMessages() can't fail it, because failTimeoutMessages() only fails the ones in the pendingQueue.
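
A minimal model of that blocking behavior, with plain channels standing in for the real event loop (this is not the actual client code): while the connection-closed event is handled synchronously, the data branch is never serviced, so the message never reaches the pendingQueue where failTimeoutMessages() could see it.

package main

import (
	"fmt"
	"time"
)

// Toy model: dataCh stands in for the SendAsync request channel and closedCh
// for connectClosedCh. While reconnect() runs synchronously, the dataCh
// branch is never serviced.
func eventsLoop(dataCh chan string, closedCh chan struct{}, reconnect func()) {
	for {
		select {
		case msg := <-dataCh:
			fmt.Println("enqueued into pendingQueue:", msg)
		case <-closedCh:
			reconnect() // synchronous: blocks the whole loop until it returns
		}
	}
}

func main() {
	dataCh := make(chan string, 1)
	closedCh := make(chan struct{}, 1)

	closedCh <- struct{}{} // a connection-closed event is already waiting
	go eventsLoop(dataCh, closedCh, func() { time.Sleep(time.Second) })
	time.Sleep(50 * time.Millisecond) // let the loop pick up the close event

	dataCh <- "m1" // buffered, so the SendAsync-style caller does not notice anything...
	fmt.Println("SendAsync returned")
	time.Sleep(200 * time.Millisecond)
	// ...but "enqueued into pendingQueue: m1" is never printed during the
	// reconnect, so no pendingQueue sweep can time the message out.
}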

@gunli (Contributor Author) commented Feb 19, 2025:

I hope we can use a single goroutine to handle the reconnect, something like #691, and then add a variable to record the reconnection state.

If this state is reconnecting, we don't send the request.

I think the connection.State is sufficient and timely enough to control this, because the reconnection state of a producer relies on connection.State and the notification from a connection. If the connection is closed, we should stop sending. The problem is that the connection's implementation still needs to be refactored, as I mentioned above: make the sending logic synchronous and close the connection when receiving BaseCommand_CLOSE_PRODUCER or BaseCommand_CLOSE_CONSUMER.

Member commented:

I think the connection.State is sufficient and timely enough to control this, because the reconnection state of a producer relies on connection.State and the notification from a connection. If the connection is closed, we should stop sending.

A connection has multiple producers and consumers. Please consider the case where the connection is active but the producer is inactive; you should not send the message to the broker.

https://github.com/apache/pulsar/blob/v4.0.2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2366-L2382 is the Java implementation.
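
A rough illustration of that point, using hypothetical state enums rather than the client's real types: an active connection alone is not enough, the producer's own state has to be consulted as well.

package main

import "fmt"

// Hypothetical state enums, only to illustrate the point: one connection is
// shared by many producers/consumers, so a send must check the producer's
// own state in addition to the connection's.
type connState int

type producerState int

const (
	connReady connState = iota
	connClosed
)

const (
	producerReady producerState = iota
	producerReconnecting
	producerClosed
)

// canSend allows a send only when both the shared connection and this
// particular producer are ready.
func canSend(c connState, p producerState) bool {
	return c == connReady && p == producerReady
}

func main() {
	fmt.Println(canSend(connReady, producerReady))        // true
	fmt.Println(canSend(connReady, producerReconnecting)) // false: connection is fine, producer is not
}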

Contributor Author (gunli) commented:

I think the connection.State is sufficient and timely enough to control this, because the reconnection state of a producer relies on connection.State and the notification from a connection. If the connection is closed, we should stop sending.

A connection has multiple producers and consumers. Please consider the case where the connection is active but the producer is inactive; you should not send the message to the broker.

https://github.com/apache/pulsar/blob/v4.0.2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2366-L2382 is the Java implementation.

I looked into the Java code, but I still can't fully understand it, as the Java implementation encapsulates too many details and I know little about Java.

You said to consider the case where the connection is active but the producer is inactive; as far as I know, only the connection's close event can move a producer from active to inactive.

Regardless, the current implementation of the connection in pulsar-client-go is problematic: c.internalSendRequest(req) and c.internalWriteData(data)/c.sendPing() can happen at the same time because they run in two different goroutines, and both finally call c.cnx.Write(), which will cause a write/data conflict. IMO this can be considered a bug, but we can fix it in another PR.

And the connection uses incomingRequestsCh with a capacity of 10 (incomingRequestsCh: make(chan *request, 10)) to receive and hold the user's data, which can be considered another pendingQueue; so now we have two pending queues, one in the producer and one in the connection. Think about this case: a user's data/buffer is sent into producer.pendingQueue and then into connection.incomingRequestsCh, but c.cnx.Write() has not been called yet. If the timeout config is short, the request is treated as timed out by producer.failTimeoutMessages() and the buffer is put back into the pool, but connection.incomingRequestsCh still keeps a reference to the buffer. If the buffer is then reallocated from the pool and the connection reaches c.cnx.Write(), there will be a data race. This can be considered another bug, too, and a new PR is needed for it as well.

go func() {
	// drains incomingRequestsCh in its own goroutine; internalSendRequest
	// eventually writes to the connection
	for {
		select {
		case <-c.closeCh:
			c.failLeftRequestsWhenClose()
			return

		case req := <-c.incomingRequestsCh:
			if req == nil {
				return // TODO: this never gonna be happen
			}
			c.internalSendRequest(req)
		}
	}
}()

// main loop: internalWriteData and sendPing also write to the same
// connection, concurrently with the goroutine above
for {
	select {
	case <-c.closeCh:
		return

	case cmd := <-c.incomingCmdCh:
		c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
	case data := <-c.writeRequestsCh:
		if data == nil {
			return
		}
		c.internalWriteData(data)

	case <-pingSendTicker.C:
		c.sendPing()
	}
}
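
A small sketch of the buffer-reuse hazard described above, with a simplified sync.Pool and a plain channel standing in for the real buffer pool and incomingRequestsCh (illustrative only, not the client's code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// bufferPool stands in for the client's internal buffer pool; the channel
// stands in for connection.incomingRequestsCh. Types are simplified.
var bufferPool = sync.Pool{New: func() any { return make([]byte, 0, 64) }}

func main() {
	incomingRequestsCh := make(chan []byte, 10)

	buf := bufferPool.Get().([]byte)
	buf = append(buf, "payload-1"...)
	incomingRequestsCh <- buf // the connection side now holds a reference to buf

	// Producer side decides the request timed out and recycles the buffer,
	// even though incomingRequestsCh still references the same backing array.
	bufferPool.Put(buf[:0])

	// A later send may get the very same backing array and overwrite it...
	buf2 := bufferPool.Get().([]byte)
	buf2 = append(buf2, "payload-2"...)
	_ = buf2 // buf2 may alias buf's memory if the pool handed back the same slice

	// ...while the connection goroutine only now dequeues and "writes" the
	// old request; with real concurrency this read races with the append above.
	go func() {
		old := <-incomingRequestsCh
		fmt.Printf("connection writes: %q\n", old) // may print payload-2's bytes
	}()
	time.Sleep(50 * time.Millisecond)
}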

gunli commented Feb 25, 2025

PRs to fix the potential write conflicts (#1336) and the data race (#1338) have been pushed.

@gunli gunli marked this pull request as draft March 7, 2025 08:45
Successfully merging this pull request may close these issues.

[Bug][Producer] The callback was not invoked during reconnecting.