Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions dev-support/Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pipeline {
environment {
YETUS='yetus'
// Branch or tag name. Yetus release tags are 'rel/X.Y.Z'
YETUS_VERSION='rel/0.14.0'
YETUS_VERSION='a7d29a6a72750a0c5c39512f33945e773e69303e'
}

parameters {
Expand All @@ -71,7 +71,7 @@ pipeline {
checkout([
$class: 'GitSCM',
branches: [[name: "${env.YETUS_VERSION}"]],
userRemoteConfigs: [[ url: 'https://github.com/apache/yetus.git']]]
userRemoteConfigs: [[ url: 'https://github.com/ayushtkn/yetus.git']]]
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public final class SubjectUtil {
HAS_CALL_AS ? null : lookupDoAsThrowException();
private static final MethodHandle CURRENT = lookupCurrent();

// copied from org.apache.hadoop.util.Shell to break circular dependency
// "1.8"->8, "9"->9, "10"->10
private static final int JAVA_SPEC_VER = Math.max(8,
Integer.parseInt(System.getProperty("java.specification.version").split("\\.")[0]));

public static final boolean THREAD_INHERITS_SUBJECT = checkThreadInheritsSubject();

/**
* Try to return the method handle for Subject#callAs()
*
* @return the method handle, or null if the Java version does not have it
*/
private static MethodHandle lookupCallAs() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -71,6 +83,38 @@ private static MethodHandle lookupCallAs() {
}
}

/**
* Determine whether we need to explicitly propagate the Subject into new Threads.
*
* @return true if new Threads inherit the Subject from the parent
*/
private static boolean checkThreadInheritsSubject() {

boolean securityManagerEnabled = true;
try {
// TODO this needs SecurityManager to compile, use reflection to look it up instead
SecurityManager sm = System.getSecurityManager();
System.setSecurityManager(sm);
} catch (UnsupportedOperationException e) {
// JDK24+ unconditionally throws this, so we don't need to check for JDK24+
// explicitly
securityManagerEnabled = false;
} catch (Throwable t) {
// don't care
}

return JAVA_SPEC_VER < 22 || securityManagerEnabled;
}

/**
* Look up the method handle for Subject#doAs(PrivilegedAction)
*
* This is only called if Subject#callAs() does not exist.
* If we can't fall back to doAs(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if unable to get the method handle
*/
private static MethodHandle lookupDoAs() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -82,6 +126,15 @@ private static MethodHandle lookupDoAs() {
}
}

/**
* Look up the method handle for Subject#doAs(PrivilegedExceptionAction)
*
* This is only called if Subject#callAs() does not exist.
* If we can't fall back to doAs(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if unable to get the method handle
*/
private static MethodHandle lookupDoAsThrowException() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -93,6 +146,15 @@ private static MethodHandle lookupDoAsThrowException() {
}
}

/**
* Look up the method handle for Subject#current().
*
* If Subject#current() is not present, fall back to returning
* a method handle for Subject.getSubject(AccessController.getContext())
*
* @return the method handle or null if it does not exist
* @throws ExceptionInInitializerError if neither current() nor the fallback is found
*/
private static MethodHandle lookupCurrent() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -112,6 +174,15 @@ private static MethodHandle lookupCurrent() {
}
}

/**
* Look up the method handle for Subject#getSubject(AccessControlContext)
*
* This is only called if Subject#current() does not exist.
* If we can't fall back to getSubject(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if cannot get the handle
*/
private static MethodHandle lookupGetSubject() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -124,6 +195,15 @@ private static MethodHandle lookupGetSubject() {
}
}

/**
* Look up the method handle for AccessController.getAccessControlContext()
*
* This is only called if Subject#current() does not exist.
* If we can't find this method, then we can't fall back which is hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if cannot get the handle
*/
private static MethodHandle lookupGetContext() {
try {
// Use reflection to work with Java versions that have and don't have
Expand Down Expand Up @@ -264,6 +344,13 @@ public static Subject current() {
}
}

/**
* Convert a Callable into a PrivilegedAction
*
* @param <T> return type
* @param callable to be converted
* @return PrivilegedAction wrapping the callable
*/
private static <T> PrivilegedAction<T> callableToPrivilegedAction(
Callable<T> callable) {
return () -> {
Expand All @@ -275,11 +362,25 @@ private static <T> PrivilegedAction<T> callableToPrivilegedAction(
};
}

/**
* Convert a PrivilegedExceptionAction into a Callable
*
* @param <T> return type
* @param action to be wrapped
* @return Callable wrapping the action
*/
private static <T> Callable<T> privilegedExceptionActionToCallable(
PrivilegedExceptionAction<T> action) {
return action::run;
}

/**
* Convert a PrivilegedAction into a Callable
*
* @param <T> return type
* @param action to be wrapped
* @return Callable wrapping the action
*/
private static <T> Callable<T> privilegedActionToCallable(
PrivilegedAction<T> action) {
return action::run;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -105,15 +106,16 @@ public Collection<PropertyChange> getChangedProperties(
/**
* A background thread to apply configuration changes.
*/
private static class ReconfigurationThread extends Thread {
private static class ReconfigurationThread extends SubjectInheritingThread {
private ReconfigurableBase parent;

ReconfigurationThread(ReconfigurableBase base) {
super();
this.parent = base;
}

// See {@link ReconfigurationServlet#applyChanges}
public void run() {
public void work() {
LOG.info("Starting reconfiguration task.");
final Configuration oldConf = parent.getConf();
final Configuration newConf = parent.getNewConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -107,7 +108,7 @@ void init() {
*/
private void initRefreshThread(boolean runImmediately) {
if (refreshInterval > 0) {
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
refreshUsed = new SubjectInheritingThread(new RefreshThread(this, runImmediately),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,7 +39,7 @@
*/
@InterfaceAudience.Private
public class DelegationTokenRenewer
extends Thread {
extends SubjectInheritingThread {
private static final Logger LOG = LoggerFactory
.getLogger(DelegationTokenRenewer.class);

Expand Down Expand Up @@ -263,7 +264,7 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
}

@Override
public void run() {
public void work() {
for(;;) {
RenewAction<?> action = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -4087,7 +4088,7 @@ private interface StatisticsAggregator<T> {
static {
STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
// start a single daemon cleaner thread
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER = new SubjectInheritingThread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER.
setName(StatisticsDataReferenceCleaner.class.getName());
STATS_DATA_CLEANER.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void uncaughtException(Thread t, Throwable e) {
}

@Override
public void run() {
public void work() {
while (shouldRun) {
try {
loopUntilConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ha;

import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.slf4j.Logger;

import java.io.BufferedReader;
Expand Down Expand Up @@ -50,7 +51,7 @@ enum StreamType {
this.stream = stream;
this.type = type;

thread = new Thread(new Runnable() {
thread = new SubjectInheritingThread(new Runnable() {
@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void tryStart() {
if (running.compareAndSet(null, current)) {
final Daemon daemon = new Daemon() {
@Override
public void run() {
public void work() {
for (; isRunning(this);) {
final long waitTime = checkCalls();
tryStop(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.apache.hadoop.tracing.Span;
import org.apache.hadoop.tracing.Tracer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -407,7 +408,7 @@ public synchronized void setRpcResponse(Writable rpcResponse) {
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private class Connection extends SubjectInheritingThread {
private InetSocketAddress server; // server ip:port
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
Expand Down Expand Up @@ -448,7 +449,7 @@ private class Connection extends Thread {
Consumer<Connection> removeMethod) {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
this.rpcRequestThread = new Thread(new RpcRequestSender(),
this.rpcRequestThread = new SubjectInheritingThread(new RpcRequestSender(),
"IPC Parameter Sending Thread for " + remoteId);
this.rpcRequestThread.setDaemon(true);

Expand Down Expand Up @@ -1126,7 +1127,7 @@ private synchronized void sendPing() throws IOException {
}

@Override
public void run() {
public void work() {
try {
// Don't start the ipc parameter sending thread until we start this
// thread, because the shutdown logic only gets triggered if this
Expand Down
Loading