Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(java): ThreadPoolFury and ThreadLocalFury concurrency security issue #1525

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.Fury;
import org.apache.fury.logging.Logger;
import org.apache.fury.logging.LoggerFactory;

/** A thread-safe object pool of {@link Fury}. */
@ThreadSafe
public class ClassLoaderFuryPooled {

private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class);
Expand Down Expand Up @@ -122,6 +124,20 @@ private void addFury() {
}

void setFactoryCallback(Consumer<Fury> factoryCallback) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should not be called concurently, users should invoke regitster and other fury config methods when creating ThreadSafeFury. It's not necessary to add a lock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, all our register and config functions should be called when ThreadSafeFury is being created, but the purpose of adding the processCallback function should be contrary to this design. The processCallback interface allows register or config operations to be performed after created ThreadSafeFury.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadSafeFury safeFury = Fury.builder()
    .requireClassRegistration(true)
    .buildThreadSafeFury();
safeFury.register(A.class);
safeFury.register(B.class);

// Thread A
new Thread(() -> {
  safeFury.serialize(new A());
}).start();

// Thread B
new Thread(() -> {
  // processCallback(fury -> fury.register(clz));
  safeFury.register(C.class);
  safeFury.serialize(new C());
}).start();

As shown in the above Demo, how can we avoid safeFury.register(C.class) being called in Thread B.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should add code to avoid this, such as capture Thread, and check thread in later call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a feasible approach, but I think the methods that implement the ThreadSafeFury interface should be thread-safe, and all methods in ThreadSafeFury should be allowed to be called in any thread.

lock.lock();
this.factoryCallback = factoryCallback;
lock.unlock();
}

void traversalAllFury(Consumer<Fury> callback) {
try {
lock.lock();
allFury.keySet().forEach(callback);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected void processCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
furyPooled.allFury.keySet().forEach(callback);
furyPooled.traversalAllFury(callback);
furyPooled.setFactoryCallback(factoryCallback);
}
}
Expand Down
144 changes: 93 additions & 51 deletions java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.fury.Fury;
import org.apache.fury.annotation.Internal;
import org.apache.fury.logging.Logger;
import org.apache.fury.logging.LoggerFactory;

/**
* An util to bind {@link Fury} with {@link ClassLoader}. If {@link ClassLoader} are changed, the
Expand All @@ -37,6 +41,7 @@
*/
@Internal
public final class LoaderBinding {
private static final Logger LOG = LoggerFactory.getLogger(LoaderBinding.class);
private final Function<ClassLoader, Fury> furyFactory;
// `WeakHashMap` won't work here too, since `Fury` hold classes which reference `ClassLoader`,
// which cause
Expand All @@ -46,6 +51,7 @@ public final class LoaderBinding {
private Consumer<Fury> bindingCallback = f -> {};
private ClassLoader loader;
private Fury fury;
private final Lock lock = new ReentrantLock();

public LoaderBinding(Function<ClassLoader, Fury> furyFactory) {
this.furyFactory = furyFactory;
Expand All @@ -56,29 +62,37 @@ public Fury get() {
}

public void visitAllFury(Consumer<Fury> consumer) {
if (furySoftMap.isEmpty()) {
for (Fury f : furyMap.values()) {
consumer.accept(f);
}
} else if (furyMap.isEmpty()) {
for (SoftReference<Fury> ref : furySoftMap.values()) {
Fury f = ref.get();
if (f != null) {
try {
lock.lock();
if (furySoftMap.isEmpty()) {
for (Fury f : furyMap.values()) {
consumer.accept(f);
}
}
} else {
Set<Fury> furySet = new HashSet<>(furyMap.size());
Collections.addAll(furyMap.values());
for (SoftReference<Fury> ref : furySoftMap.values()) {
Fury f = ref.get();
if (f != null) {
furySet.add(f);
} else if (furyMap.isEmpty()) {
for (SoftReference<Fury> ref : furySoftMap.values()) {
Fury f = ref.get();
if (f != null) {
consumer.accept(f);
}
}
} else {
Set<Fury> furySet = new HashSet<>(furyMap.size());
Collections.addAll(furyMap.values());
for (SoftReference<Fury> ref : furySoftMap.values()) {
Fury f = ref.get();
if (f != null) {
furySet.add(f);
}
}
for (Fury f : furySet) {
consumer.accept(f);
}
}
for (Fury f : furySet) {
consumer.accept(f);
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -114,36 +128,44 @@ public void setClassLoader(ClassLoader classLoader, StagingType stagingType) {
classLoader = Fury.class.getClassLoader();
}
this.loader = classLoader;
switch (stagingType) {
case NO_STAGING:
fury = furyFactory.apply(classLoader);
bindingCallback.accept(fury);
break;
case SOFT_STAGING:
{
SoftReference<Fury> furySoftReference = furySoftMap.get(classLoader);
Fury fury = furySoftReference == null ? null : furySoftReference.get();
if (fury == null) {
fury = furyFactory.apply(classLoader);
bindingCallback.accept(fury);
furySoftMap.put(classLoader, new SoftReference<>(fury));
this.fury = fury;
}
try {
lock.lock();
switch (stagingType) {
case NO_STAGING:
fury = furyFactory.apply(classLoader);
bindingCallback.accept(fury);
break;
}
case STRONG_STAGING:
{
Fury fury = furyMap.get(classLoader);
if (fury == null) {
fury = furyFactory.apply(classLoader);
bindingCallback.accept(fury);
furyMap.put(classLoader, fury);
this.fury = fury;
case SOFT_STAGING:
{
SoftReference<Fury> furySoftReference = furySoftMap.get(classLoader);
Fury fury = furySoftReference == null ? null : furySoftReference.get();
if (fury == null) {
fury = furyFactory.apply(classLoader);
bindingCallback.accept(fury);
furySoftMap.put(classLoader, new SoftReference<>(fury));
this.fury = fury;
}
break;
}
break;
}
default:
throw new IllegalArgumentException();
case STRONG_STAGING:
{
Fury fury = furyMap.get(classLoader);
if (fury == null) {
fury = furyFactory.apply(classLoader);
bindingCallback.accept(fury);
furyMap.put(classLoader, fury);
this.fury = fury;
}
break;
}
default:
throw new IllegalArgumentException();
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
Expand All @@ -155,8 +177,10 @@ public void setClassLoader(ClassLoader classLoader, StagingType stagingType) {
* referenced by other objects.
*/
public void clearClassLoader(ClassLoader classLoader) {
lock.lock();
furyMap.remove(classLoader);
SoftReference<Fury> softReference = furySoftMap.remove(classLoader);
lock.unlock();
if (softReference != null) {
softReference.clear();
}
Expand All @@ -167,18 +191,36 @@ public void clearClassLoader(ClassLoader classLoader) {
}

public void register(Class<?> clz) {
furyMap.values().forEach(fury -> fury.register(clz));
bindingCallback = bindingCallback.andThen(fury -> fury.register(clz));
try {
lock.lock();
furyMap.values().forEach(fury -> fury.register(clz));
bindingCallback = bindingCallback.andThen(fury -> fury.register(clz));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

public void register(Class<?> clz, int id) {
Preconditions.checkArgument(id < Short.MAX_VALUE);
furyMap.values().forEach(fury -> fury.register(clz, (short) id));
bindingCallback = bindingCallback.andThen(fury -> fury.register(clz, (short) id));
try {
lock.lock();
furyMap.values().forEach(fury -> fury.register(clz, (short) id));
bindingCallback = bindingCallback.andThen(fury -> fury.register(clz, (short) id));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

public void setBindingCallback(Consumer<Fury> bindingCallback) {
lock.lock();
this.bindingCallback = bindingCallback;
lock.unlock();
}

public enum StagingType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,5 @@ Args=--initialize-at-build-time=org.apache.fury.memory.MemoryBuffer,\
org.apache.fury.memory.MemoryUtils,\
org.apache.fury.type.DescriptorGrouper,\
sun.misc.Unsafe,\
com.google.common.collect.Platform
com.google.common.collect.Platform,\
org.apache.fury.util.LoaderBinding
Loading