Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogpz committed Feb 8, 2022
1 parent 77db403 commit 4ce6664
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,6 @@ public interface GatewaySender {
int CONNECTION_RETRY_INTERVAL = Integer
.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000);

String DEFAULT_TYPE = "SerialGatewaySender";

/**
* The order policy. This enum is applicable only when concurrency-level is > 1.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class GatewaySenderAttributesImpl implements MutableGatewaySenderAttribut

private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS;

private String type = GatewaySender.DEFAULT_TYPE;
private String type;

private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE;

Expand Down Expand Up @@ -179,7 +179,7 @@ public void setGroupTransactionEvents(boolean groupTransEvents) {

public void setType(String type) {
this.type = type;
isParallel = type.equals("Parallel") ? true : false;
isParallel = type.contains("Parallel") ? true : false;
}

public void setForInternalUse(boolean forInternalUse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package org.apache.geode.internal.lang;

import java.util.Optional;
import static org.apache.geode.internal.lang.SystemProperty.getProductBooleanProperty;

import org.apache.geode.internal.cache.eviction.LRUListWithAsyncSorting;

Expand All @@ -26,9 +26,6 @@
*/
public class SystemPropertyHelper {

public static final String GEODE_PREFIX = "geode.";
public static final String GEMFIRE_PREFIX = "gemfire.";

/**
* When set to "true" enables asynchronous eviction algorithm (defaults to true). For more details
* see {@link LRUListWithAsyncSorting}.
Expand Down Expand Up @@ -109,98 +106,6 @@ public class SystemPropertyHelper {
*/
public static final String RE_AUTHENTICATE_WAIT_TIME = "reauthenticate.wait.time";

/**
* This method will try to look up "geode." and "gemfire." versions of the system property. It
* will check and prefer "geode." setting first, then try to check "gemfire." setting.
*
* @param name system property name set in Geode
* @return an Optional containing the Boolean value of the system property
*/
public static Optional<Boolean> getProductBooleanProperty(String name) {
String property = getProperty(name);
return property != null ? Optional.of(Boolean.parseBoolean(property)) : Optional.empty();
}

/**
* This method will try to look up "geode." and "gemfire." versions of the system property. It
* will check and prefer "geode." setting first, then try to check "gemfire." setting.
*
* @param name system property name set in Geode
* @return an Optional containing the Integer value of the system property
*/
public static Optional<Integer> getProductIntegerProperty(String name) {
Integer propertyValue = Integer.getInteger(GEODE_PREFIX + name);
if (propertyValue == null) {
propertyValue = Integer.getInteger(GEMFIRE_PREFIX + name);
}

if (propertyValue != null) {
return Optional.of(propertyValue);
} else {
return Optional.empty();
}
}

/**
* This method will try to look up "geode." and "gemfire." versions of the system property. It
* will check and prefer "geode." setting first, then try to check "gemfire." setting.
*
* @param name system property name set in Geode
* @return an Optional containing the Long value of the system property
*/
public static Optional<Long> getProductLongProperty(String name) {
Long propertyValue = Long.getLong(GEODE_PREFIX + name);
if (propertyValue == null) {
propertyValue = Long.getLong(GEMFIRE_PREFIX + name);
}

if (propertyValue != null) {
return Optional.of(propertyValue);
} else {
return Optional.empty();
}
}

/**
* This method will try to look up "geode." and "gemfire." versions of the system property. It
* will check and prefer "geode." setting first, then try to check "gemfire." setting.
*
* @param name system property name set in Geode
* @return the integer value of the system property if exits or the default value
*/
public static Integer getProductIntegerProperty(String name, int defaultValue) {
return getProductIntegerProperty(name).orElse(defaultValue);
}

public static Long getProductLongProperty(String name, long defaultValue) {
return getProductLongProperty(name).orElse(defaultValue);
}

/**
* This method will try to look up "geode." and "gemfire." versions of the system property. It
* will check and prefer "geode." setting first, then try to check "gemfire." setting.
*
* @param name system property name set in Geode
* @return an Optional containing the String value of the system property
*/
public static Optional<String> getProductStringProperty(String name) {
String property = getProperty(name);
return property != null ? Optional.of(property) : Optional.empty();
}

public static String getProperty(String name) {
String property = getGeodeProperty(name);
return property != null ? property : getGemfireProperty(name);
}

private static String getGeodeProperty(String name) {
return System.getProperty(GEODE_PREFIX + name);
}

private static String getGemfireProperty(String name) {
return System.getProperty(GEMFIRE_PREFIX + name);
}

/**
* As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first
* operation in the transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ private GatewaySender createGatewaySender(Cache cache,

String type = gatewaySenderCreateArgs.getType();
if (type != null) {
gateway.setType(gatewaySenderCreateArgs.getType());
gateway.setParallel(gatewaySenderCreateArgs.getType().contains("Parallel"));
gateway.setType(type);
gateway.setParallel(type.contains("Parallel"));
} else {
Boolean isParallel = gatewaySenderCreateArgs.isParallel();
if (isParallel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
@RunWith(GeodeParamsRunner.class)
public class TxGroupingPartitionedRegionDUnitTest extends TxGroupingBaseDUnitTest {
@Test
@Parameters({"TxGroupParallelGatewaySender", "TxGroupSerialGatewaySender"})
@Parameters({"TxGroupingParallelGatewaySender", "TxGroupingSerialGatewaySender"})
public void testPartitionedRegionPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions(
String type)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,15 @@ public void testParallelPropagationHAWithGroupTransactionEvents() throws Excepti
assertThat(londonServerStats.get(0)).isEqualTo(0);

// eventsReceived
// We may see two retried events (as transactions are made of 2 events) on all members due to
// We may see 4 retried events (as transactions are made of 4 events) on all members due to
// the kill
assertThat(londonServerStats.get(1)).isLessThanOrEqualTo((entries + 2) * redundantCopies);
assertThat(londonServerStats.get(1))
.isLessThanOrEqualTo((entries + putsPerTransaction) * redundantCopies);
assertThat(londonServerStats.get(1)).isGreaterThanOrEqualTo(entries * redundantCopies);

// queuedEvents
assertThat(londonServerStats.get(2)).isLessThanOrEqualTo((entries + 2) * redundantCopies);
assertThat(londonServerStats.get(2))
.isLessThanOrEqualTo((entries + putsPerTransaction) * redundantCopies);
assertThat(londonServerStats.get(2)).isGreaterThanOrEqualTo(entries * redundantCopies);

// batches redistributed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.internal.txgrouping.serial.TxGroupingSerialGatewaySenderImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;

public class WanTxGroupingConfigurationJUnitTest {
Expand Down Expand Up @@ -63,9 +64,8 @@ public void test_ValidateSerialGatewaySenderGroupTransactionEventsAttributeSetTo
public void test_create_SerialGatewaySender_ThrowsException_when_GroupTransactionEvents_isTrue_and_DispatcherThreads_is_greaterThanOne() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setParallel(false);
fact.setDispatcherThreads(2);
fact.setGroupTransactionEvents(true);
fact.setType(TxGroupingSerialGatewaySenderImpl.TYPE);
assertThatThrownBy(() -> fact.create("NYSender", 2))
.isInstanceOf(GatewaySenderException.class)
.hasMessageContaining(
Expand All @@ -77,7 +77,7 @@ public void test_create_GatewaySender_ThrowsException_when_GroupTransactionEvent
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setBatchConflationEnabled(true);
fact.setGroupTransactionEvents(true);
fact.setType(TxGroupingSerialGatewaySenderImpl.TYPE);
assertThatThrownBy(() -> fact.create("NYSender", 2))
.isInstanceOf(GatewaySenderException.class)
.hasMessageContaining(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.util.internal.GeodeGlossary;

public class TxGroupingGatewaySenderProperties implements TxGroupingGatewaySender {
Expand Down Expand Up @@ -52,7 +52,7 @@ public class TxGroupingGatewaySenderProperties implements TxGroupingGatewaySende
* gateway sender queue when group-transaction-events is true.
*/
public static final int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
SystemPropertyHelper.getProductIntegerProperty(
SystemProperty.getProductIntegerProperty(
GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS_PROPERTY).orElse(1);

private AtomicInteger retriesToGetTransactionEventsFromQueue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.wan.internal.serial.RemoteSerialGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;

Expand All @@ -31,11 +34,10 @@ public TxGroupingRemoteSerialGatewaySenderEventProcessor(
super(sender, id, threadsMonitoring, cleanQueues);
}

// @Override
// protected @NotNull RegionQueue createRegionQueue(
// final @NotNull AbstractGatewaySender sender, final @NotNull String regionName,
// final @NotNull CacheListener<?, ?> listener, final boolean cleanQueues) {
// return new TxGroupingSerialGatewaySenderQueue((TxGroupingSerialGatewaySenderImpl) sender,
// regionName, listener, cleanQueues);
// }
@Override
protected @NotNull RegionQueue createRegionQueue(
final @NotNull AbstractGatewaySender sender, final @NotNull String regionName,
final @NotNull CacheListener<Long, AsyncEvent<?, ?>> listener, final boolean cleanQueues) {
return new TxGroupingSerialGatewaySenderQueue(sender, regionName, listener, cleanQueues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
Expand Down Expand Up @@ -2609,48 +2608,6 @@ public static void doTxPuts(String regionName) {
mgr.commit();
}

public static void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction,
final long transactions, long offset) {
Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName);

long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
long j = 0;
CacheTransactionManager mgr = cache.getCacheTransactionManager();
for (int i = 0; i < transactions; i++) {
boolean done = false;
do {
try {
mgr.begin();
for (j = 0; j < putsPerTransaction; j++) {
long key = keyOffset + ((j + (10 * i)) * 100);
String value = "Value_" + key;
r.put(key, value);
}
mgr.commit();
done = true;
} catch (TransactionException e) {
logger.info("Something went wrong with transaction [{},{}]. Retrying. Error: {}", i, j,
e.getMessage());
e.printStackTrace();
} catch (IllegalStateException e1) {
logger.info("Something went wrong with transaction [{},{}]. Retrying. Error: {}", i, j,
e1.getMessage());
e1.printStackTrace();
try {
mgr.rollback();
logger.info("Rolled back transaction [{},{}]. Retrying. Error: {}", i, j,
e1.getMessage());
} catch (Exception e2) {
logger.info(
"Something went wrong when rolling back transaction [{},{}]. Retrying transaction. Error: {}",
i, j, e2.getMessage());
e2.printStackTrace();
}
}
} while (!done);
}
}

public static void doNextPuts(String regionName, int start, int numPuts) {
IgnoredException exp =
addIgnoredException(CacheClosedException.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.cache30.MyGatewayEventFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter1;
import org.apache.geode.cache30.MyGatewayTransportFilter2;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void test_GatewaySender_without_Locator() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();

GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setParallel(true);
fact.setType(ParallelGatewaySenderImpl.TYPE);
GatewaySender sender1 = fact.create("NYSender", 2);
sender1.start();
fail("Expected IllegalStateException but not thrown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
Expand Down Expand Up @@ -220,6 +222,13 @@ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
public @NotNull GatewaySender create(final @NotNull String id, final int remoteDSId) {
attrs.setId(id);
attrs.setRemoteDs(remoteDSId);
if (attrs.getType() == null) {
if (attrs.isParallel()) {
attrs.setType(ParallelGatewaySenderImpl.TYPE);
} else {
attrs.setType(SerialGatewaySenderImpl.TYPE);
}
}

validate(cache, attrs);

Expand Down

0 comments on commit 4ce6664

Please sign in to comment.