Discarding Corrupt Message due to apparent decompression failure #1331

Open
frankjkelly opened this issue Feb 12, 2025 · 10 comments

@frankjkelly

Expected behavior

Expect more logs to help debug what is going on with decompression

Actual behavior

We get the message

{"log":{"consumerID":95,"level":"ERROR",
"msg":"Discarding corrupted message","msgID":{"entryId":1792,"ledgerId":758404,"partition":-1},"name":"","subscription":"reader-czjug","time":"2025-02-12T10:46:13.616639169Z",
"topic":"persistent://XXXX/wav/084b74e6-f4c6-4ff8-9bff-d35370e6a77b",
"validationError":1},"stream":"stdout","timestamp":1739357173616}

which looks like it is coming from here

func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
	validationError pb.CommandAck_ValidationError) {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to discardCorruptedMessage " +
			"by closing or closed consumer")
		return
	}
	pc.log.WithFields(log.Fields{
		"msgID":           msgID,
		"validationError": validationError,
	}).Error("Discarding corrupted message")

validationError:1 appears to be a decompression error

CommandAck_DecompressionError CommandAck_ValidationError = 1
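
To make the mapping from the logged number to a name easier to check when triaging many such lines, here is a minimal sketch that parses the wrapped JSON log entry and prints the validation error by name. Only value 1 (DecompressionError) is confirmed in this issue; the other enum names are my assumption about the `CommandAck.ValidationError` enum and should be checked against `PulsarApi.proto`. The `logLine` type and `describeValidationError` helper are hypothetical names made up for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// validationErrorNames maps the numeric validationError field to a name.
// Only 1 (DecompressionError) is confirmed in this issue; the remaining
// entries are assumptions to verify against PulsarApi.proto.
var validationErrorNames = map[int]string{
	0: "UncompressedSizeCorruption",
	1: "DecompressionError",
	2: "ChecksumMismatch",
	3: "BatchDeSerializeError",
	4: "DecryptionError",
}

// logLine models only the fields of the wrapped log entry that are used here.
type logLine struct {
	Log struct {
		ValidationError int `json:"validationError"`
		MsgID           struct {
			EntryID  int64 `json:"entryId"`
			LedgerID int64 `json:"ledgerId"`
		} `json:"msgID"`
	} `json:"log"`
}

// describeValidationError returns a short summary of one log line.
func describeValidationError(raw []byte) (string, error) {
	var l logLine
	if err := json.Unmarshal(raw, &l); err != nil {
		return "", err
	}
	name, ok := validationErrorNames[l.Log.ValidationError]
	if !ok {
		name = fmt.Sprintf("unknown(%d)", l.Log.ValidationError)
	}
	return fmt.Sprintf("ledger %d entry %d: %s",
		l.Log.MsgID.LedgerID, l.Log.MsgID.EntryID, name), nil
}

func main() {
	raw := []byte(`{"log":{"msg":"Discarding corrupted message",` +
		`"msgID":{"entryId":1792,"ledgerId":758404,"partition":-1},` +
		`"validationError":1}}`)
	summary, err := describeValidationError(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(summary) // ledger 758404 entry 1792: DecompressionError
}
```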

which means it is coming from here I guess?

uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
if err != nil {
	pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
	return err
}

and, given there are no other log lines, it suggests the cause is this

uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
if err != nil {
	return nil, err
}

Steps to reproduce

Sorry I don't have reproduction steps but maybe we could add some logging here?

uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
if err != nil {
	return nil, err
}

System configuration

Pulsar version: 2.11.3 and 3.3.2 (we upgraded the brokers and still see the same issue)
Pulsar Golang client: 0.14.0

@RobertIndie
Member

The consumer would print the error log here:

if err != nil {
	c.log.
		WithError(err).
		WithField("consumerID", consumerID).
		Error("handle message Id: ", response.MessageId)
}

Could you check this log? Here is an example:

time="2025-02-20T18:29:28+08:00" level=error msg="handle message Id: ledgerId:97  entryId:2  partition:-1" consumerID=1 error="fake decompress error" local_addr="127.0.0.1:59035" remote_addr="pulsar://localhost:6650"

@frankjkelly
Author

Thanks @RobertIndie yes I see a number of them all with the same message

{"log":{"consumerID":76,
"error":"unexpected EOF","level":"ERROR","local_addr":{"IP":"10.108.35.29","Port":51180,"Zone":""},
"msg":"handle message Id: ledgerId:60350 entryId:361 partition:-1",
"remote_addr":{"ForceQuery":false,"Fragment":"","Host":"platform-pulsar-broker-0.platform-pulsar-broker.t-bt.svc.cluster.local:6650","OmitHost":false,"Opaque":"","Path":"","RawFragment":"","RawPath":"","RawQuery":"","Scheme":"pulsar","User":null},"time":"2025-02-20T13:46:26.095528909Z"},"stream":"stdout","timestamp":1740059186095}

@RobertIndie
Member

Sorry for the late reply. Seems I misunderstood and thought it was just a logging issue.

{"log":{"consumerID":76,
"error":"unexpected EOF","level":"ERROR","local_addr":{"IP":"10.108.35.29","Port":51180,"Zone":""},
"msg":"handle message Id: ledgerId:60350 entryId:361 partition:-1",
"remote_addr":{"ForceQuery":false,"Fragment":"","Host":"platform-pulsar-broker-0.platform-pulsar-broker.t-bt.svc.cluster.local:6650","OmitHost":false,"Opaque":"","Path":"","RawFragment":"","RawPath":"","RawQuery":"","Scheme":"pulsar","User":null},"time":"2025-02-20T13:46:26.095528909Z"},"stream":"stdout","timestamp":1740059186095}

Which compression algorithm were you using?
Could you try using another client, like a Java client, to consume that topic if the issue happens again? This will help us determine if the message is actually corrupted or if there's a problem with the Go client's decompression.
How often does this issue occur? Does it only affect a few messages within a topic?

@frankjkelly
Author

@RobertIndie we are using CompressionType.LZ4

We have other services consuming the same data that are in Java and they do not report any errors.
We estimate it occurs in roughly 2% of our audio workflows.
The downside is we are streaming binary data (audio) so the corruption of any one packet disrupts the entire stream.

@RobertIndie
Member

we are using CompressionType.LZ4

That's strange. The LZ4 decompression doesn't seem to use the reader, so it shouldn't cause the EOF error. Maybe I missed something.
However, I noticed that version v2.0.5 of LZ4 is outdated, so I submitted a PR to upgrade it to v4: #1341. The new version of LZ4 has resolved many EOF errors, but I'm not sure if it addresses this issue.

I also submitted another PR to improve the decompression error message to include more useful information: #1342.

After merging this PR, we can try again and hopefully get more helpful information.

@frankjkelly
Author

Awesome - thank you @RobertIndie !!!

@nodece would it be possible to include these two PRs (or at least the logging one) in the 0.15.0 release or will it be in a subsequent release?
https://github.com/apache/pulsar-client-go/releases/tag/v0.15.0-candidate-1

@nodece
Member

nodece commented Mar 6, 2025

@frankjkelly It has been a long time since the last release candidate was submitted for voting, but no PMC members have participated in the vote. As a result, I will no longer proceed with the release process.

If you want to use the latest version, please use the following command:

go get github.com/apache/pulsar-client-go@master

@frankjkelly
Author

frankjkelly commented Mar 6, 2025

@nodece Sorry to hear about the release. My company has a contract with StreamNative so perhaps I can create an ask of the organization and see if that will help move the release along (with or without the changes).

@nodece
Member

nodece commented Mar 7, 2025

@frankjkelly StreamNative has multiple PMC members, so that would be helpful.

@RobertIndie
Member

@nodece I can help move the release forward. Or would you like to hand the 0.15.0 release over to me?
