-
Notifications
You must be signed in to change notification settings - Fork 345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: Potential data/write conflicts #1336
Conversation
This bug has been discussed in #1333. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for your contribution.
I don't understand what the issue is with different goroutines calling ctx.write
. Didn't we add a lock before writing?
pulsar-client-go/pulsar/internal/connection.go
Lines 518 to 519 in 024e230
c.writeBufferLock.Lock() | |
defer c.writeBufferLock.Unlock() |
If there is a issue, a single client may have multiple consumer and producer objects(many goroutines), which might use the same connection. How can we ensure they are all in the same goroutine?
BTW: Would it be possible to add unit tests to help reproduce the issue instead of guessing?
It is the same as writing a file in different threads, it will lead to data conflict. |
@shibd Writing data and listening for the close event in separate goroutines can lead to inconsistencies in the response. This way may cause incorrect responses due to the race condition between the broker's successful response and the connection close. For example:
|
@nodece Thanks for explaining, I see.
I will revert my requested change, but I prefer to have a unit test to cover this.
In addition to @nodece's explanation, currently the go func() {
for {
select {
/* ... */
case req := <-c.incomingRequestsCh:
/* ... */
c.internalSendRequest(req) // [1]
}
}
}()
for {
select {
/* ... */
case data := <-c.writeRequestsCh:
/* ... */
c.internalWriteData(data) // [2] |
@gunli Please check CI. |
@gunli The failure of To resolve this issue, read and write operations should be handled in separate goroutines to avoid blocking and ensure smooth message processing. |
OK, I will create a new PR to fix this, it seems |
Please merge #1343 first, I will rebase and update this PR after that. |
mark it as draft until #1343 is merged, I will rebase and update this PR after that. |
Fixes #1335
Master Issue: #1335
Motivation
Sync all the write calls of a connection into one goroutine to avoid potential data/write conflicts
Modifications
Sync all the write calls of a connection into one goroutine.
Verifying this change
Does this pull request potentially affect one of the following parts:
No
@shibd @nodece @RobertIndie @BewareMyPower @crossoverJie
If
yes
was chosen, please highlight the changesDocumentation