Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
*/
package org.apache.hadoop.io;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.AccessController;
import java.security.PrivilegedAction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;

import org.apache.hadoop.thirdparty.com.google.common.primitives.UnsignedBytes;

Expand Down Expand Up @@ -57,8 +54,7 @@ private static Comparer<byte[]> lexicographicalComparerJavaImpl() {


/**
* Provides a lexicographical comparer implementation; either a Java
* implementation or a faster implementation based on {@link Unsafe}.
* Provides a lexicographical comparer Java implementation.
*
* <p>Uses reflection to gracefully fall back to the Java implementation if
* {@code Unsafe} isn't available.
Expand Down Expand Up @@ -131,34 +127,19 @@ public int compareTo(byte[] buffer1, int offset1, int length1,
private enum UnsafeComparer implements Comparer<byte[]> {
INSTANCE;

static final Unsafe theUnsafe;

/** The offset to the first element in a byte array. */
static final int BYTE_ARRAY_BASE_OFFSET;

static {
theUnsafe = (Unsafe) AccessController.doPrivileged(
new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return f.get(null);
} catch (NoSuchFieldException e) {
// It doesn't matter what we throw;
// it's swallowed in getBestComparer().
throw new Error();
} catch (IllegalAccessException e) {
throw new Error();
}
}
});

BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
/**
* For a byte[] array, the base offset is always 0 because byte is the most fundamental data type in Java,
* and it does not have any additional header information.
* Therefore, it can be safely assumed that its base offset is 0.
*/
BYTE_ARRAY_BASE_OFFSET = 0;

// sanity check - this should never fail
if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
if (Byte.BYTES != 1) {
throw new AssertionError();
}
}
Expand Down Expand Up @@ -207,8 +188,8 @@ public int compareTo(byte[] buffer1, int offset1, int length1,
* On the other hand, it is substantially faster on 64-bit.
*/
for (i = 0; i < strideLimit; i += stride) {
long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
long lw = getLongFromByteBuffer(buffer1, offset1Adj + i);
long rw = getLongFromByteBuffer(buffer2, offset2Adj + i);

if (lw != rw) {
if (!littleEndian) {
Expand Down Expand Up @@ -241,5 +222,11 @@ public int compareTo(byte[] buffer1, int offset1, int length1,
return length1 - length2;
}
}

/** Get the long value from the specified offset. */
private static long getLongFromByteBuffer(byte[] buffer, int offset) {
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
return byteBuffer.order(ByteOrder.nativeOrder()).getLong(offset);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
*/
package org.apache.hadoop.io.nativeio;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
Expand All @@ -45,7 +46,6 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;

import org.apache.hadoop.classification.VisibleForTesting;

Expand Down Expand Up @@ -898,16 +898,35 @@ static long getMemlockLimit() {
*/
static long getOperatingSystemPageSize() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe)f.get(null);
return unsafe.pageSize();
if (isAvailable()) {
return getOperatingSystemPageSize0();
} else {
String os = System.getProperty("os.name").toLowerCase();
ProcessBuilder processBuilder;
if (os.contains("mac") || os.contains("linux") || os.contains("unix")) {
processBuilder = new ProcessBuilder("getconf", "PAGESIZE");
} else {
return 4096;
}
Process process = processBuilder.start();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
String line = reader.readLine();
if (line != null && !line.isEmpty()) {
return Long.parseLong(line.trim());
} else {
return 4096;
}
}
}
} catch (Throwable e) {
LOG.warn("Unable to get operating system page size. Guessing 4096.", e);
return 4096;
}
}

private static native long getOperatingSystemPageSize0();

private static class CachedUid {
final long timestamp;
final String username;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,25 @@ JNIEnv *env, jclass clazz)
#endif
}

JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_getOperatingSystemPageSize0(
JNIEnv *env, jclass clazz)
{
#ifdef UNIX
long pageSize = sysconf(_SC_PAGESIZE);
if (pageSize < 0) {
// Guess 4096
return 4096;
}
return (jlong) pageSize;
#endif
#ifdef WINDOWS
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
return (jlong) sysInfo.dwPageSize;
#endif
}

JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_copyFileUnbuffered0(
JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand All @@ -37,8 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.misc.Unsafe;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.ComparisonChain;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
Expand All @@ -54,19 +53,6 @@ public class ShortCircuitShm {

protected static final int BYTES_PER_SLOT = 64;

private static final Unsafe unsafe = safetyDance();

private static Unsafe safetyDance() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (Unsafe)f.get(null);
} catch (Throwable e) {
LOG.error("failed to load misc.Unsafe", e);
}
return null;
}

/**
* Calculate the usable size of a shared memory segment.
* We round down to a multiple of the slot size and do some validation.
Expand Down Expand Up @@ -259,17 +245,17 @@ public class Slot {
private static final long ANCHORABLE_FLAG = 1L<<62;

/**
* The slot address in memory.
* The slot address in memory. Use AtomicLong instead of Unsafe
*/
private final long slotAddress;
private final AtomicLong slotAddress;

/**
* BlockId of the block this slot is used for.
*/
private final ExtendedBlockId blockId;

Slot(long slotAddress, ExtendedBlockId blockId) {
this.slotAddress = slotAddress;
this.slotAddress = new AtomicLong(slotAddress);
this.blockId = blockId;
}

Expand Down Expand Up @@ -307,41 +293,39 @@ public SlotId getSlotId() {
*/
public int getSlotIdx() {
return Ints.checkedCast(
(slotAddress - baseAddress) / BYTES_PER_SLOT);
(slotAddress.get() - baseAddress) / BYTES_PER_SLOT);
}

/**
* Clear the slot.
*/
void clear() {
unsafe.putLongVolatile(null, this.slotAddress, 0);
slotAddress.set(0);
}

private boolean isSet(long flag) {
long prev = unsafe.getLongVolatile(null, this.slotAddress);
long prev = slotAddress.get();
return (prev & flag) != 0;
}

private void setFlag(long flag) {
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
prev = slotAddress.get();
if ((prev & flag) != 0) {
return;
}
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev | flag));
} while (!slotAddress.compareAndSet(prev, prev | flag));
}

private void clearFlag(long flag) {
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
prev = slotAddress.get();
if ((prev & flag) == 0) {
return;
}
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev & (~flag)));
} while (!slotAddress.compareAndSet(prev, prev & (~flag)));
}

public boolean isValid() {
Expand Down Expand Up @@ -369,7 +353,7 @@ public void makeUnanchorable() {
}

public boolean isAnchored() {
long prev = unsafe.getLongVolatile(null, this.slotAddress);
long prev = slotAddress.get();
// Slot is no longer valid.
return (prev & VALID_FLAG) != 0 && ((prev & 0x7fffffff) != 0);
}
Expand All @@ -385,7 +369,7 @@ public boolean isAnchored() {
public boolean addAnchor() {
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
prev = slotAddress.get();
if ((prev & VALID_FLAG) == 0) {
// Slot is no longer valid.
return false;
Expand All @@ -398,8 +382,7 @@ public boolean addAnchor() {
// Too many other threads have anchored the slot (2 billion?)
return false;
}
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev + 1));
} while (!slotAddress.compareAndSet(prev, prev + 1));
return true;
}

Expand All @@ -409,12 +392,11 @@ public boolean addAnchor() {
public void removeAnchor() {
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
prev = slotAddress.get();
Preconditions.checkState((prev & 0x7fffffff) != 0,
"Tried to remove anchor for slot " + slotAddress +", which was " +
"not anchored.");
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev - 1));
} while (!slotAddress.compareAndSet(prev, prev - 1));
}

@Override
Expand Down Expand Up @@ -473,11 +455,6 @@ public ShortCircuitShm(ShmId shmId, FileInputStream stream)
throw new UnsupportedOperationException(
"DfsClientShm is not yet implemented for Windows.");
}
if (unsafe == null) {
throw new UnsupportedOperationException(
"can't use DfsClientShm because we failed to " +
"load misc.Unsafe.");
}
this.shmId = shmId;
this.mmappedLength = getUsableLength(stream);
this.baseAddress = POSIX.mmap(stream.getFD(),
Expand Down
Loading