Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into remove-compatible-hadoop-version
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Mar 6, 2025
2 parents 75fc637 + 55e0f0a commit 6c24c0c
Show file tree
Hide file tree
Showing 305 changed files with 10,144 additions and 7,708 deletions.
1 change: 1 addition & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ org.objenesis:objenesis:2.6
org.xerial.snappy:snappy-java:1.1.10.4
org.yaml:snakeyaml:2.0
org.wildfly.openssl:wildfly-openssl:2.1.4.Final
ro.isdc.wro4j:wro4j-maven-plugin:1.8.0
software.amazon.awssdk:bundle:2.25.53
net.jodah:failsafe:2.4.4

Expand Down
5 changes: 5 additions & 0 deletions dev-support/docker/Dockerfile_windows_10
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ RUN setx PATH "%PATH%;C:\Maven\apache-maven-3.8.8\bin"
RUN setx PATH "%PATH%;C:\CMake\cmake-3.19.0-win64-x64\bin"
RUN setx PATH "%PATH%;C:\ZStd"
RUN setx PATH "%PATH%;C:\Program Files\Git\usr\bin"
RUN setx PATH "%PATH%;C:\Python"

# The mvnsite module runs a bash script and somewhere down in the invocation, it resorts to call
# /usr/bin/env python3. Thus, we need to create the following symbolic link to satisfy this need.
RUN powershell New-Item -ItemType SymbolicLink -Path "C:\Python\python3" -Target "C:\Python\python.exe"

# We get strange Javadoc errors without this.
RUN setx classpath ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.StringJoiner;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -181,4 +182,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -438,6 +439,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, (b) -> { });
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
Expand All @@ -462,8 +470,8 @@ public void readVectored(List<? extends FileRange> ranges,
}
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -306,4 +307,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.io.Sizes.S_16K;
import static org.apache.hadoop.io.Sizes.S_1M;

Expand Down Expand Up @@ -136,4 +138,31 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}

/**
* Extension of {@link #readVectored(List, IntFunction)} where a {@code release(buffer)}
* operation may be invoked if problems surface during reads.
* <p>
* The {@code release} operation is invoked after an IOException
* to return the actively buffer to a pool before reporting a failure
* in the future.
* <p>
* The default implementation calls {@link #readVectored(List, IntFunction)}.p
* <p>
* Implementations SHOULD override this method if they can release buffers as
* part of their error handling.
* @param ranges the byte ranges to read
* @param allocate function to allocate ByteBuffer
* @param release callable to release a ByteBuffer.
* @throws IOException any IOE.
* @throws IllegalArgumentException if any of ranges are invalid, or they overlap.
* @throws NullPointerException null arguments.
*/
default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws IOException {
requireNonNull(release);
readVectored(ranges, allocate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,29 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.impl.VectorIOBufferPool;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
Expand Down Expand Up @@ -319,74 +323,131 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
// Set up all of the futures, so that the caller can await on
// their completion.
for (FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
for(int i = 0; i < sortedRanges.size(); ++i) {
FileRange range = sortedRanges.get(i);
buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler);
}
} catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe);
for(FileRange range: sortedRanges) {
range.getData().completeExceptionally(ioe);
}
}
final ByteBufferPool pool = new VectorIOBufferPool(allocate, release);
// Initiate the asynchronous reads.
new AsyncHandler(getAsyncChannel(),
sortedRanges,
pool)
.initiateRead();
}
}

/**
* A CompletionHandler that implements readFully and translates back
* into the form of CompletionHandler that our users expect.
* <p>
* All reads are started in {@link #initiateRead()};
* the handler then receives callbacks on success
* {@link #completed(Integer, Integer)}, and on failure
* by {@link #failed(Throwable, Integer)}.
* These are mapped to the specific range in the read, and its
* outcome updated.
*/
static class AsyncHandler implements CompletionHandler<Integer, Integer> {
private static class AsyncHandler implements CompletionHandler<Integer, Integer> {
/** File channel to read from. */
private final AsynchronousFileChannel channel;

/** Ranges to fetch. */
private final List<? extends FileRange> ranges;

/**
* Pool providing allocate/release operations.
*/
private final ByteBufferPool allocateRelease;

/** Buffers being read. */
private final ByteBuffer[] buffers;

AsyncHandler(AsynchronousFileChannel channel,
List<? extends FileRange> ranges,
ByteBuffer[] buffers) {
/**
* Instantiate.
* @param channel open channel.
* @param ranges ranges to read.
* @param allocateRelease pool for allocating buffers, and releasing on failure
*/
AsyncHandler(
final AsynchronousFileChannel channel,
final List<? extends FileRange> ranges,
final ByteBufferPool allocateRelease) {
this.channel = channel;
this.ranges = ranges;
this.buffers = buffers;
this.buffers = new ByteBuffer[ranges.size()];
this.allocateRelease = allocateRelease;
}

/**
* Initiate the read operation.
* <p>
* Allocate all buffers, queue the read into the channel,
* providing this object as the handler.
*/
private void initiateRead() {
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocateRelease.getBuffer(false, range.getLength());
channel.read(buffers[i], range.getOffset(), i, this);
}
}

/**
* Callback for a completed full/partial read.
* <p>
* For an EOF the number of bytes may be -1.
* That is mapped to a {@link #failed(Throwable, Integer)} outcome.
* @param result The bytes read.
* @param rangeIndex range index within the range list.
*/
@Override
public void completed(Integer result, Integer r) {
FileRange range = ranges.get(r);
ByteBuffer buffer = buffers[r];
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
if (result == -1) {
failed(new EOFException("Read past End of File"), r);
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
} else {
if (buffer.remaining() > 0) {
// issue a read for the rest of the buffer
// QQ: What if this fails? It has the same handler.
channel.read(buffer, range.getOffset() + buffer.position(), r, this);
channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
} else {
// QQ: Why is this required? I think because we don't want the
// user to read data beyond limit.
// Flip the buffer and declare success.
buffer.flip();
range.getData().complete(buffer);
}
}
}

/**
* The read of the range failed.
* <p>
* Release the buffer supplied for this range, then
* report to the future as {{completeExceptionally(exc)}}
* @param exc exception.
* @param rangeIndex range index within the range list.
*/
@Override
public void failed(Throwable exc, Integer r) {
LOG.debug("Failed while reading range {} ", r, exc);
ranges.get(r).getData().completeExceptionally(exc);
public void failed(Throwable exc, Integer rangeIndex) {
LOG.debug("Failed while reading range {} ", rangeIndex, exc);
// release the buffer
allocateRelease.putBuffer(buffers[rangeIndex]);
// report the failure.
ranges.get(rangeIndex).getData().completeExceptionally(exc);
}

}

@Override
Expand Down
Loading

0 comments on commit 6c24c0c

Please sign in to comment.