Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT DO NOT REVIEW] Make ObserverHolder thread safe by having a thread local observer member #6882

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.cache.query.internal;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -60,10 +61,12 @@ public class QueryTraceJUnitTest {
@Before
public void setUp() throws Exception {
CacheUtils.startCache();
DefaultQuery.testHook = new BeforeQueryExecutionHook();
}

@After
public void tearDown() throws Exception {
DefaultQuery.testHook = null;
CacheUtils.closeCache();
}

Expand Down Expand Up @@ -104,7 +107,11 @@ public void testTraceOnPartitionedRegionWithTracePrefix() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -141,7 +148,11 @@ public void testTraceOnLocalRegionWithTracePrefix() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -183,7 +194,11 @@ public void testNegTraceOnPartitionedRegionWithTracePrefix() throws Exception {
assertFalse(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should not have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -223,7 +238,11 @@ public void testNegTraceOnLocalRegionWithTracePrefix() throws Exception {
assertFalse(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should not have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -262,7 +281,11 @@ public void testTraceOnPartitionedRegionWithTracePrefixNoComments() throws Excep
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -296,8 +319,11 @@ public void testTraceOnLocalRegionWithTracePrefixNoComments() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);
// The query should return all elements in region.

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
}
Expand Down Expand Up @@ -331,7 +357,11 @@ public void testTraceOnPartitionedRegionWithSmallTracePrefixNoComments() throws
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -366,7 +396,11 @@ public void testTraceOnLocalRegionWithSmallTracePrefixNoComments() throws Except
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -438,4 +472,21 @@ public void testQueryFailLocalRegionWithSmallTracePrefixNoSpace() throws Excepti
}
}

private class BeforeQueryExecutionHook implements DefaultQuery.TestHook {
private QueryObserver observer = null;

@Override
public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
final ExecutionContext executionContext) {
switch (spot) {
case BEFORE_QUERY_EXECUTION:
observer = QueryObserverHolder.getInstance();
break;
}
}

public QueryObserver getObserver() {
return observer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.query.internal;


import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MakeNotStatic;

Expand Down Expand Up @@ -45,35 +46,48 @@ public class QueryObserverHolder {
*/
@Immutable
private static final QueryObserver NO_OBSERVER = new QueryObserverAdapter();
/**
* The threadlocal current observer which will be notified of all query events.
*/
private static final ThreadLocal<QueryObserver> _instance = new ThreadLocal<>();

/**
* The current observer which will be notified of all query events.
*/
@MakeNotStatic
private static QueryObserver _instance = NO_OBSERVER;
private static volatile QueryObserver _globalInstance = NO_OBSERVER;

/**
* Set the given observer to be notified of query events. Returns the current observer.
*/
public static QueryObserver setInstance(QueryObserver observer) {
Support.assertArg(observer != null, "setInstance expects a non-null argument!");
QueryObserver oldObserver = _instance;
_instance = observer;
QueryObserver oldObserver = _globalInstance;
_instance.set(observer);
_globalInstance = observer;
return oldObserver;
}

public static boolean hasObserver() {
return _instance != NO_OBSERVER;
if (_instance.get() != null) {
return _instance.get() != NO_OBSERVER;
}
return _globalInstance != NO_OBSERVER;
}

/** Return the current QueryObserver instance */
public static QueryObserver getInstance() {
return _instance;
if (_instance.get() == null) {
_instance.set(_globalInstance);
}
return _instance.get();
}

/**
* Only for test purposes.
*/
public static void reset() {
_instance = NO_OBSERVER;
_instance.set(NO_OBSERVER);
_globalInstance = NO_OBSERVER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,13 @@ private DataCommandResult select(InternalCache cache, Object principal, String q
Query query = qs.newQuery(queryString);
DefaultQuery tracedQuery = (DefaultQuery) query;
WrappedIndexTrackingQueryObserver queryObserver = null;
QueryObserver origQueryObserver = null;
String queryVerboseMsg = null;
long startTime = -1;
if (tracedQuery.isTraced()) {
startTime = NanoTimer.getTime();
queryObserver = new WrappedIndexTrackingQueryObserver();
QueryObserverHolder.setInstance(queryObserver);
origQueryObserver = QueryObserverHolder.setInstance(queryObserver);
}
List<SelectResultRow> list = new ArrayList<>();

Expand All @@ -237,6 +238,9 @@ private DataCommandResult select(InternalCache cache, Object principal, String q
if (queryObserver != null) {
QueryObserverHolder.reset();
}
if (tracedQuery.isTraced()) {
QueryObserverHolder.setInstance(origQueryObserver);
}
}
}

Expand Down