Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public final class SubjectUtil {
HAS_CALL_AS ? null : lookupDoAsThrowException();
private static final MethodHandle CURRENT = lookupCurrent();

// copied from org.apache.hadoop.util.Shell to break circular dependency
// "1.8"->8, "9"->9, "10"->10
private static final int JAVA_SPEC_VER = Math.max(8,
Integer.parseInt(System.getProperty("java.specification.version").split("\\.")[0]));

public static final boolean THREAD_INHERITS_SUBJECT = checkThreadInheritsSubject();

/**
* Try to return the method handle for Subject#callAs()
*
* @return the method handle, or null if the Java version does not have it
*/
private static MethodHandle lookupCallAs() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -71,6 +83,38 @@ private static MethodHandle lookupCallAs() {
}
}

/**
* Determine whether we need to explicitly propagate the Subject into new Threads.
*
* @return true if new Threads inherit the Subject from the parent
*/
private static boolean checkThreadInheritsSubject() {

boolean securityManagerEnabled = true;
try {
// TODO this needs SecurityManager to compile, use reflection to look it up instead
SecurityManager sm = System.getSecurityManager();
System.setSecurityManager(sm);
} catch (UnsupportedOperationException e) {
// JDK24+ unconditionally throws this, so we don't need to check for JDK24+
// explicitly
securityManagerEnabled = false;
} catch (Throwable t) {
// don't care
}

return JAVA_SPEC_VER < 22 || securityManagerEnabled;
}

/**
* Look up the method handle for Subject#doAs(PrivilegedAction)
*
* This is only called if Subject#callAs() does not exist.
* If we can't fall back to doAs(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if unable to get the method handle
*/
private static MethodHandle lookupDoAs() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -82,6 +126,15 @@ private static MethodHandle lookupDoAs() {
}
}

/**
* Look up the method handle for Subject#doAs(PrivilegedExceptionAction)
*
* This is only called if Subject#callAs() does not exist.
* If we can't fall back to doAs(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if unable to get the method handle
*/
private static MethodHandle lookupDoAsThrowException() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -93,6 +146,15 @@ private static MethodHandle lookupDoAsThrowException() {
}
}

/**
* Look up the method handle for Subject#current().
*
* If Subject#current() is not present, fall back to returning
* a method handle for Subject.getSubject(AccessController.getContext())
*
* @return the method handle or null if it does not exist
* @throws ExceptionInInitializerError if neither current() nor the fallback is found
*/
private static MethodHandle lookupCurrent() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -112,6 +174,15 @@ private static MethodHandle lookupCurrent() {
}
}

/**
* Look up the method handle for Subject#getSubject(AccessControlContext)
*
* This is only called if Subject#current() does not exist.
* If we can't fall back to getSubject(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if cannot get the handle
*/
private static MethodHandle lookupGetSubject() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -124,6 +195,15 @@ private static MethodHandle lookupGetSubject() {
}
}

/**
* Look up the method handle for AccessController.getAccessControlContext()
*
* This is only called if Subject#current() does not exist.
* If we can't find this method, then we can't fall back which is hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if cannot get the handle
*/
private static MethodHandle lookupGetContext() {
try {
// Use reflection to work with Java versions that have and don't have
Expand Down Expand Up @@ -264,6 +344,13 @@ public static Subject current() {
}
}

/**
* Convert a Callable into a PrivilegedAction
*
* @param <T> return type
* @param callable to be converted
* @return PrivilegedAction wrapping the callable
*/
private static <T> PrivilegedAction<T> callableToPrivilegedAction(
Callable<T> callable) {
return () -> {
Expand All @@ -275,11 +362,25 @@ private static <T> PrivilegedAction<T> callableToPrivilegedAction(
};
}

/**
* Convert a PrivilegedExceptionAction into a Callable
*
* @param <T> return type
* @param action to be wrapped
* @return Callable wrapping the action
*/
private static <T> Callable<T> privilegedExceptionActionToCallable(
PrivilegedExceptionAction<T> action) {
return action::run;
}

/**
* Convert a PrivilegedAction into a Callable
*
* @param <T> return type
* @param action to be wrapped
* @return Callable wrapping the action
*/
private static <T> Callable<T> privilegedActionToCallable(
PrivilegedAction<T> action) {
return action::run;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void uncaughtException(Thread t, Throwable e) {
}

@Override
public void run() {
public void work() {
while (shouldRun) {
try {
loopUntilConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void tryStart() {
if (running.compareAndSet(null, current)) {
final Daemon daemon = new Daemon() {
@Override
public void run() {
public void work() {
for (; isRunning(this);) {
final long waitTime = checkCalls();
tryStop(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,74 @@

package org.apache.hadoop.util;

import java.security.PrivilegedAction;
import java.util.concurrent.ThreadFactory;

import javax.security.auth.Subject;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.authentication.util.SubjectUtil;

/** A thread that has called {@link Thread#setDaemon(boolean) } with true.*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
/**
* A thread that has called {@link Thread#setDaemon(boolean) } with true.
* <p>
* The runnable code must either be specified in the runnable parameter or in
* the overridden work() method.
* <p>
* See {@link org.apache.hadoop.util.concurrent.SubjectInheritingThread} for the Subject inheritance behavior this
* class adds.
*
*/
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
@InterfaceStability.Unstable
public class Daemon extends Thread {

Subject startSubject;

@Override
public final void start() {
if (!SubjectUtil.THREAD_INHERITS_SUBJECT) {
startSubject = SubjectUtil.current();
}
super.start();
}

/**
* Override this instead of run()
*/
public void work() {
if (runnable != null) {
runnable.run();
}
}

@Override
public final void run() {
if (!SubjectUtil.THREAD_INHERITS_SUBJECT) {
SubjectUtil.doAs(startSubject, new PrivilegedAction<Void>() {

@Override
public Void run() {
work();
return null;
}

});
} else {
work();
}
}

{
setDaemon(true); // always a daemon
setDaemon(true); // always a daemon
}

/**
* Provide a factory for named daemon threads,
* for use in ExecutorServices constructors
* Provide a factory for named daemon threads, for use in ExecutorServices
* constructors
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
public static class DaemonFactory extends Daemon implements ThreadFactory {

@Override
Expand All @@ -47,30 +96,33 @@ public Thread newThread(Runnable runnable) {
}

Runnable runnable = null;

/** Construct a daemon thread. */
public Daemon() {
super();
}

/**
* Construct a daemon thread.
*
* @param runnable runnable.
*/
public Daemon(Runnable runnable) {
super(runnable);
this.runnable = runnable;
this.setName(((Object)runnable).toString());
this.setName(((Object) runnable).toString());
}

/**
* Construct a daemon thread to be part of a specified thread group.
* @param group thread group.
*
* @param group thread group.
* @param runnable runnable.
*/
public Daemon(ThreadGroup group, Runnable runnable) {
super(group, runnable);
this.runnable = runnable;
this.setName(((Object)runnable).toString());
this.setName(((Object) runnable).toString());
}

public Runnable getRunnable() {
Expand Down
Loading
Loading