[improve][ml] Support Bookkeeper batch read #23180
base: master
# Conflicts:
#	managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
#	managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
if (!useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped)) {
    return handle.readAsync(firstEntry, lastEntry);
}
return handle.batchReadAsync(firstEntry, entriesToRead, 0);
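The body of useBatchRead is not shown in this excerpt. As a minimal sketch of what such a guard might look like, assuming batch read is only worthwhile for more than one entry, requires the V2 wire protocol, and is unavailable for striped ledgers (the class name BatchReadDecision and the exact conditions are assumptions for illustration, not the PR's code):

```java
// Hypothetical sketch: the real useBatchRead body is not part of this excerpt.
final class BatchReadDecision {
    private BatchReadDecision() {}

    // Assumed rule: use the batch read API only when all three conditions hold.
    static boolean useBatchRead(int entriesToRead,
                                boolean useBookkeeperV2WireProtocol,
                                boolean isStriped) {
        return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped;
    }

    public static void main(String[] args) {
        System.out.println(useBatchRead(10, true, false));  // true
        System.out.println(useBatchRead(1, true, false));   // false: single entry
        System.out.println(useBatchRead(10, false, false)); // false: V3 protocol
        System.out.println(useBatchRead(10, true, true));   // false: striped ledger
    }
}
```

Under these assumptions, every other combination falls back to handle.readAsync(firstEntry, lastEntry), so enabling the feature does not guarantee that a given read is batched.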
If maxSize = 0, an info log is printed on every call, which will hurt performance.
https://github.com/apache/bookkeeper/blob/7c41204506122ed6904289f4814d4130274874aa/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L955-L967
private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
        boolean isRecoveryRead) {
    int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes;
    if (maxSize > nettyMaxFrameSizeBytes) {
        LOG.info(
            "The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.",
            nettyMaxFrameSizeBytes);
        maxSize = nettyMaxFrameSizeBytes;
    }
    if (maxSize <= 0) {
        LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes);
        maxSize = nettyMaxFrameSizeBytes;
    }
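To make the concern concrete, here is a standalone sketch that mirrors only the clamping logic quoted above. The class MaxSizeClamp and the infoLogCalls counter are hypothetical stand-ins (the counter replaces LOG.info); this is not BookKeeper code.

```java
// Hypothetical standalone mirror of the clamping logic quoted above.
final class MaxSizeClamp {
    private MaxSizeClamp() {}

    // Stand-in for LOG.info: counts how many times an info line would be emitted.
    static int infoLogCalls = 0;

    static long effectiveMaxSize(long maxSize, int nettyMaxFrameSizeBytes) {
        if (maxSize > nettyMaxFrameSizeBytes) {
            infoLogCalls++;
            maxSize = nettyMaxFrameSizeBytes;
        }
        if (maxSize <= 0) {
            infoLogCalls++;
            maxSize = nettyMaxFrameSizeBytes;
        }
        return maxSize;
    }

    public static void main(String[] args) {
        int frame = 5 * 1024 * 1024; // example frame size, chosen for illustration
        for (int i = 0; i < 3; i++) {
            effectiveMaxSize(0, frame); // maxSize = 0, as passed by the caller in this PR
        }
        System.out.println(infoLogCalls); // 3: one info line per read
    }
}
```

With maxSize = 0 the second branch is taken on every call, so a caller that always passes 0 produces one info line per read. A positive maxSize no larger than the frame size takes neither branch.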
If isAutoSkipNonRecoverableData is set to true, is the handling of a batch read failure consistent with the behavior here?
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
Lines 115 to 129 in 66e1a06
} else if (cursor.getConfig().isAutoSkipNonRecoverableData()
        && exception instanceof NonRecoverableLedgerException) {
    log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
            readPosition, exception.getMessage());
    final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
    Position nexReadPosition;
    Long lostLedger = null;
    if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
        // try to find and move to next valid ledger
        nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
        lostLedger = readPosition.getLedgerId();
    } else {
        // Skip this read operation
        nexReadPosition = ledger.getValidPositionAfterSkippedEntries(readPosition, count);
    }
I created a PR, apache/bookkeeper#4485, to change the log level to debug.
int entriesToRead = (int) (lastEntry - firstEntry + 1);
// Batch read is not supported for striped ledgers.
LedgerMetadata m = handle.getLedgerMetadata();
boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize();
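As a small illustration of the two computations above, the following sketch takes plain integers in place of LedgerMetadata. The class StripingCheck is hypothetical; the formulas are the ones in the diff.

```java
// Hypothetical helper: plain ints stand in for LedgerMetadata getters.
final class StripingCheck {
    private StripingCheck() {}

    // A ledger is striped when entries are spread over more bookies
    // than each entry is written to (ensemble size != write quorum size).
    static boolean isStriped(int ensembleSize, int writeQuorumSize) {
        return ensembleSize != writeQuorumSize;
    }

    // Inclusive range [firstEntry, lastEntry], same formula as in the diff.
    static int entriesToRead(long firstEntry, long lastEntry) {
        return (int) (lastEntry - firstEntry + 1);
    }

    public static void main(String[] args) {
        System.out.println(isStriped(3, 3));       // false: every bookie holds every entry
        System.out.println(isStriped(3, 2));       // true: entries are striped across bookies
        System.out.println(entriesToRead(10, 19)); // 10
    }
}
```

With ensemble size 3 and write quorum 2, consecutive entries live on different bookie subsets, so a single bookie cannot serve a contiguous range, which is consistent with the comment that batch read is not supported for striped ledgers.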
The documentation needs to make clear that enabling batch read does not necessarily mean that every read is batched.
Related BK PR: apache/bookkeeper#4485
Motivation
In BP-62, BookKeeper started to support a batch read API;
this PR adds support for BookKeeper batch reads on the Pulsar side.
Modifications
bookkeeperEnableBatchRead
controls whether BK batch read is enabled; the default value is false.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: dao-jun#13