Fix: SendAsync callback was not invoked when producer is in reconnecting #1333
base: master
Conversation
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
 			}
 		case connectionClosed := <-p.connectClosedCh:
 			p.log.Info("runEventsLoop will reconnect in producer")
-			p.reconnectToBroker(connectionClosed)
+			// reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+			go p.reconnectToBroker(connectionClosed)
I'm not sure if my concern is correct.
If we use a new goroutine here, we need to carefully handle the following cases:
- When a connection-closed event comes in during reconnection, we may need to add a reconnecting status to prevent concurrency issues (see the sketch after this list).
- During reconnection, if a message is sent using the old connection (maybe an already-closed connection), I'm not sure about the behavior here. Could it get stuck or throw an error?
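A minimal sketch of such a reconnecting status, assuming an atomic flag on the producer so that overlapping connection-closed events trigger at most one reconnect at a time (the partitionProducer fields and reconnectToBroker signature below are illustrative assumptions, not the real client types):

package main

import (
	"log"
	"sync/atomic"
	"time"
)

// hypothetical stand-ins for the real client types, for illustration only
type connectionClosed struct{}

type partitionProducer struct {
	reconnecting atomic.Bool // assumed new field recording the reconnection state
}

func (p *partitionProducer) reconnectToBroker(cc *connectionClosed) {
	time.Sleep(100 * time.Millisecond) // stands in for the real reconnect logic
}

// handleConnectionClosed starts a reconnect only if one is not already running.
func (p *partitionProducer) handleConnectionClosed(cc *connectionClosed) {
	// CompareAndSwap guarantees only one goroutine wins when several
	// connection-closed events arrive close together.
	if !p.reconnecting.CompareAndSwap(false, true) {
		log.Println("reconnect already in progress, ignoring event")
		return
	}
	go func() {
		defer p.reconnecting.Store(false)
		p.reconnectToBroker(cc)
	}()
}

func main() {
	p := &partitionProducer{}
	p.handleConnectionClosed(&connectionClosed{})
	p.handleConnectionClosed(&connectionClosed{}) // logged as ignored: first reconnect still running
	time.Sleep(200 * time.Millisecond)
}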
Even with the check below, we can't be 100% sure we won't use the old connection, because it's inherently concurrent.
conn := p._getConn()
if conn.Closed() {
	return
}
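To make the race concrete: the connection may be closed between the Closed() check and the actual write, so check-then-act is not enough on its own. A runnable toy illustration of that window (the conn type and its methods are hypothetical stand-ins, not the real client types):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// conn is a toy stand-in for the real connection type.
type conn struct{ closed atomic.Bool }

func (c *conn) Closed() bool { return c.closed.Load() }

func (c *conn) Write(b []byte) error {
	if c.closed.Load() {
		return errors.New("write on closed connection")
	}
	return nil
}

func main() {
	c := &conn{}
	go func() {
		// the broker or the network closes the connection concurrently
		c.closed.Store(true)
	}()
	// time of check: the connection may still look open here...
	if !c.Closed() {
		time.Sleep(time.Millisecond) // widen the race window for the demo
		// ...time of use: by now it may already be closed
		fmt.Println(c.Write([]byte("msg")))
	}
}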
Actually, I'm thinking that before a message enters the pendingQueue, we should block the user's call, even if the user is using the sendAsync method.
This way, on the user side, the timeout would be calculated from when the sendAsync call succeeds; until then, the call would block at this point.
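A minimal sketch of that idea, assuming a hypothetical producer whose pendingQueue is modeled as a buffered channel bounded by MaxPendingMessages; SendAsync blocks the caller until the message is accepted into the queue:

package main

import "fmt"

type message struct{ payload []byte }

// producer is a hypothetical stand-in; the buffered channel plays the
// role of the pendingQueue, bounded by MaxPendingMessages.
type producer struct {
	pendingQueue chan *message
}

// SendAsync blocks the caller until the message enters the pending queue,
// so the user-side timeout can be counted from the moment this call returns.
func (p *producer) SendAsync(m *message, callback func(err error)) {
	p.pendingQueue <- m // blocks while the queue is full (e.g. during a reconnect)
	go func() {
		// the real code would write to the connection, wait for the ack,
		// and dequeue the message before invoking the callback
		callback(nil)
	}()
}

func main() {
	p := &producer{pendingQueue: make(chan *message, 2)}
	done := make(chan struct{})
	p.SendAsync(&message{payload: []byte("hello")}, func(err error) {
		fmt.Println("acked, err =", err)
		close(done)
	})
	<-done
}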
> Even with the check below, we can't be 100% sure we won't use the old connection, because it's inherently concurrent.
> conn := p._getConn()
> if conn.Closed() {
>     return
> }

I agree with that. Actually, I think it is quite strange to pass a buffer to a connection's channel when sending a message, as this causes the buffer to be read and written by two different goroutines, leading to a data race. What's worse, if a buffer is sent into a connection's channel and the connection is closed at the same time, the buffer ends up in an uncertain, pending state, and we have to pay extra attention to handling this situation. Currently this situation is not handled when the connection is closed by the network or by a server notification (pb.BaseCommand_CLOSE_PRODUCER / pb.BaseCommand_CLOSE_CONSUMER); maybe we need a new PR to handle this.
In my opinion, it would be better to keep the message in the pending queue and use timeout events and server ack events to determine whether a message has timed out or succeeded.
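A minimal sketch of that approach, using hypothetical types: messages stay in a pending queue together with their send time, a periodic check fails the ones that have exceeded the timeout, and a server ack completes the oldest entry:

package main

import (
	"container/list"
	"errors"
	"fmt"
	"sync"
	"time"
)

// pendingItem is a hypothetical stand-in for a queued message.
type pendingItem struct {
	sentAt   time.Time
	callback func(err error)
}

type producer struct {
	mu      sync.Mutex
	pending *list.List // queue of *pendingItem, oldest at the front
	timeout time.Duration
}

// failTimeoutMessages fails every pending message older than the send
// timeout; the real client would run it from a ticker.
func (p *producer) failTimeoutMessages(now time.Time) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for e := p.pending.Front(); e != nil; {
		item := e.Value.(*pendingItem)
		if now.Sub(item.sentAt) < p.timeout {
			break // the queue is ordered by send time, so we can stop early
		}
		next := e.Next()
		p.pending.Remove(e)
		item.callback(errors.New("send timed out"))
		e = next
	}
}

// handleAck completes the oldest pending message on a server ack event.
func (p *producer) handleAck() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if e := p.pending.Front(); e != nil {
		p.pending.Remove(e)
		e.Value.(*pendingItem).callback(nil)
	}
}

func main() {
	p := &producer{pending: list.New(), timeout: 10 * time.Millisecond}
	p.pending.PushBack(&pendingItem{sentAt: time.Now(), callback: func(err error) {
		fmt.Println("result:", err)
	}})
	p.failTimeoutMessages(time.Now().Add(20 * time.Millisecond)) // simulate a late check
}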
> Actually, I'm thinking that before a message enters the pendingQueue, we should block the user's call, even if the user is using the sendAsync method. This way, on the user side, the timeout would be calculated from when the sendAsync call succeeds; until then, the call would block at this point.

I have thought about that, but that would be a breaking change. If we change it that way, SendAsync would sometimes effectively become a synchronous Send, and the config MaxPendingMessages would become meaningless.
I didn't say we should introduce new goroutines.
What I mean is that we are already sending data in the goroutine of partitionProducer.internalSend(); we don't have to create a new goroutine again in connection.run().
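A minimal sketch of that design, with hypothetical types: internalSend writes on its own goroutine, and a mutex on the connection serializes writers, instead of funneling buffers through a channel to a second writer goroutine:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// connection is a hypothetical stand-in for the client connection.
type connection struct {
	mu  sync.Mutex
	cnx io.Writer // the underlying socket
}

// WriteData writes synchronously on the caller's goroutine; the mutex
// serializes concurrent writers (sends, pings, requests), so no extra
// writer goroutine is needed in connection.run().
func (c *connection) WriteData(data []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, err := c.cnx.Write(data)
	return err
}

type partitionProducer struct{ conn *connection }

// internalSend stays on the producer's goroutine and writes directly.
func (p *partitionProducer) internalSend(payload []byte) error {
	return p.conn.WriteData(payload)
}

func main() {
	var buf bytes.Buffer
	p := &partitionProducer{conn: &connection{cnx: &buf}}
	fmt.Println(p.internalSend([]byte("hello")), buf.String())
}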
If you look at the Java implementation, you'll find that Java handles many things inside sendAsync, and the user call is blocked until that logic has been processed.
We have done the same in the Go client. The problem is that partitionProducer.runEventsLoop() is blocked in the reconnecting case, which prevents the data from entering the pendingQueue, and failTimeoutMessages() can't fail it, because failTimeoutMessages() only fails the ones already in the pendingQueue.
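A runnable toy illustration of why a blocking reconnect stalls the loop (the channel names mirror the real code, but everything else is simplified):

package main

import (
	"fmt"
	"time"
)

// A toy event loop with the same shape as partitionProducer.runEventsLoop().
func main() {
	eventsCh := make(chan string)
	connectClosedCh := make(chan struct{})

	go func() {
		for {
			select {
			case msg := <-eventsCh:
				fmt.Println("sent:", msg)
			case <-connectClosedCh:
				// Blocking here stalls the whole loop: messages handed over
				// by SendAsync cannot enter the pendingQueue until it returns.
				time.Sleep(2 * time.Second) // stands in for reconnectToBroker
			}
		}
	}()

	connectClosedCh <- struct{}{}
	select {
	case eventsCh <- "hello":
		fmt.Println("accepted immediately")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("blocked: the event loop is busy reconnecting")
	}
}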
I hope we can use a single goroutine to handle the reconnect, like #691, and then add a variable to record the reconnection state.
If this state is reconnecting, we don't send the request.

I think the connection.State is sufficient and timely enough to control this, because the reconnection state of a producer relies on connection.State and the notification from a connection. If the connection is closed, we should stop sending. The problem is that the connection's implementation still needs to be refactored, as I mentioned above: make the sending logic synchronous and close the connection when receiving BaseCommand_CLOSE_PRODUCER or BaseCommand_CLOSE_CONSUMER.
> I think the connection.State is sufficient and timely enough to control this, because the reconnection state of a producer relies on connection.State and the notification from a connection. If the connection is closed, we should stop sending.

A connection has multiple producers and consumers. Please consider the case where the connection is active but the producer is inactive: you should not send the message to the broker.
https://github.com/apache/pulsar/blob/v4.0.2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2366-L2382, this is the Java implementation.
> > I think the connection.State is sufficient and timely enough to control this, because the reconnection state of a producer relies on connection.State and the notification from a connection. If the connection is closed, we should stop sending.
>
> A connection has multiple producers and consumers. Please consider the case where the connection is active but the producer is inactive: you should not send the message to the broker.
> https://github.com/apache/pulsar/blob/v4.0.2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2366-L2382, this is the Java implementation.

I looked into the Java code, but I still can't understand it, as the Java implementation encapsulates too many details and I know little about Java.
You said that we should consider the case where the connection is active but the producer is inactive; as far as I know, only the connection's close event can move a producer from active to inactive.

Regardless, the current implementation of the connection in pulsar-client-go is problematic: c.internalSendRequest(req) and c.internalWriteData(data)/c.sendPing() can happen at the same time because they run in two goroutines, and both eventually call c.cnx.Write(), which will cause a write conflict. IMO, this can be considered a BUG, but we can fix it in another PR.

Also, the connection uses incomingRequestsCh with a capacity of 10 (incomingRequestsCh: make(chan *request, 10)) to receive and hold the user's data. This can be considered another pendingQueue, so now we have two pendingQueues, one of them in the producer. Think about a case: a user's data/buffer is sent into producer.pendingQueue and then into connection.incomingRequestsCh, but c.cnx.Write() has not been called yet. Now there can be a problem: if the timeout config is short, the request is treated as timed out by producer.failTimeoutMessages() and the buffer is put back into the pool, but connection.incomingRequestsCh still keeps a reference to the buffer. If the buffer is reallocated from the pool and the connection then reaches c.cnx.Write(), there will be a data race. This can be considered another BUG too; a new PR is needed again. The connection's run loop currently looks like this:
go func() {
	for {
		select {
		case <-c.closeCh:
			c.failLeftRequestsWhenClose()
			return
		case req := <-c.incomingRequestsCh:
			if req == nil {
				return // TODO: this never gonna be happen
			}
			c.internalSendRequest(req)
		}
	}
}()

for {
	select {
	case <-c.closeCh:
		return
	case cmd := <-c.incomingCmdCh:
		c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
	case data := <-c.writeRequestsCh:
		if data == nil {
			return
		}
		c.internalWriteData(data)
	case <-pingSendTicker.C:
		c.sendPing()
	}
}
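One possible way to avoid the buffer-reuse race described above, sketched under the assumption that buffers come from a sync.Pool-style pool: never return the buffer to the pool from the timeout path directly; instead, mark the pending request as completed exactly once, and let whichever side wins the race (timeout or write) release the buffer:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var bufPool = sync.Pool{New: func() any { return make([]byte, 0, 1024) }}

// request is a hypothetical pending item; done guarantees the buffer is
// released exactly once, by either the timeout path or the write path.
type request struct {
	buf  []byte
	done atomic.Bool
}

// complete returns true for the first caller only; that caller owns the buffer.
func (r *request) complete() bool { return r.done.CompareAndSwap(false, true) }

// failTimeout is the timeout path (what failTimeoutMessages would call).
func failTimeout(r *request) {
	if r.complete() {
		bufPool.Put(r.buf[:0]) // safe: the write path will see done and skip it
		fmt.Println("timed out, buffer recycled")
	}
}

// write is the connection's write path (what would call c.cnx.Write).
func write(r *request) {
	if !r.complete() {
		fmt.Println("already timed out, not touching the recycled buffer")
		return
	}
	// ... c.cnx.Write(r.buf) would go here ...
	bufPool.Put(r.buf[:0])
	fmt.Println("written, buffer recycled")
}

func main() {
	r := &request{buf: bufPool.Get().([]byte)}
	failTimeout(r) // the timeout fires first...
	write(r)       // ...so the write path must not reuse the buffer
}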
Fixes #1332
Master Issue: #1332
Motivation
The SendAsync() callback should be invoked to give a response to the user/application even when the producer is busy reconnecting.
Modifications
Run the reconnect in a separate goroutine.
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes.
Documentation