Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip/wan tx grouping module 1 #7083

Draft
wants to merge 20 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
042013d
GEODE-6588: Cleanup GatewaySenderAttributes
jake-at-work Sep 30, 2021
2457963
GEODE-6588: Cleanup AbstractGatewaySender
jake-at-work Sep 30, 2021
8c2d23e
GEODE-6588: Cleanup GatewaySenderFactoryImpl
jake-at-work Oct 1, 2021
ae74a42
MODULE: Extract interface for GatewaySenderAttributes.
jake-at-work Sep 30, 2021
7a3d45c
MODULE: Split out some factories.
jake-at-work Oct 13, 2021
35b072e
MODULE: Extract out TX grouping factories.
jake-at-work Oct 13, 2021
f7c0d21
MODULE: Create TX grouping sender impls.
jake-at-work Oct 13, 2021
477a823
MODULE: Remove AbstractGatwaySender.mustGroupTransactionEvents()
jake-at-work Oct 13, 2021
0be467b
MODULE: Stupid simple module.
jake-at-work Oct 13, 2021
07884e8
MODULE: ServiceLoader
jake-at-work Oct 14, 2021
5aa6bc7
MODULE: Move tests
jake-at-work Oct 14, 2021
1959233
MODULE: Move remaining tx-grouping functionality and DUnit tests
albertogpz Nov 5, 2021
74f9274
MODULE: Fix some GatewaySenderFactoryImpl tests
albertogpz Nov 5, 2021
a7416a3
MODULE: Do not allow changing group-transaction-events
albertogpz Nov 5, 2021
0c3d814
MODULE: Add gfsh create tx-grouping gw-sender cases.
albertogpz Nov 5, 2021
43194d0
Updated with Kirk's first review comments
albertogpz Dec 2, 2021
cbe859d
remove() refactored as suggested on jbarrett's review
albertogpz Dec 13, 2021
8c2f77d
Use --type=TxGroupingParallelGatewaySender... as suggested in jbarret…
albertogpz Dec 14, 2021
77db403
Added exceptions to public API changes
albertogpz Dec 14, 2021
4ce6664
Fix tests
albertogpz Dec 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions boms/geode-all-bom/src/test/resources/expected-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,12 @@
<version>${version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-wan-txgrouping</artifactId>
<version>${version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-web</artifactId>
Expand Down
8 changes: 7 additions & 1 deletion buildSrc/src/main/resources/japicmp_exceptions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@
"Class org.apache.geode.management.builder.GeodeClusterManagementServiceBuilder": "Moved internal class to fix split packages between geode-core and geode-management",
"Class org.apache.geode.management.api.ClusterManagementOperation": "Fixed missing @Experimental annotation",
"Method org.apache.geode.management.api.ClusterManagementOperation.getEndpoint()": "Fixed missing @Experimental annotation",
"Method org.apache.geode.management.api.ClusterManagementOperation.getOperator()": "Fixed missing @Experimental annotation"
"Method org.apache.geode.management.api.ClusterManagementOperation.getOperator()": "Fixed missing @Experimental annotation",
"Class org.apache.geode.cache.wan.GatewaySender":"Added to support new types of gatewaysenders",
"Method org.apache.geode.cache.wan.GatewaySender.getType()":"Added to support new types of gatewaysenders",
"Class org.apache.geode.cache.wan.GatewaySenderFactory":"Added to support new types of gatewaysenders",
"Method org.apache.geode.cache.wan.GatewaySenderFactory.setType(java.lang.String)":"Added to support new types of gatewaysenders",
"Class org.apache.geode.management.GatewaySenderMXBean":"Added to support new types of gatewaysenders",
"Method org.apache.geode.management.GatewaySenderMXBean.getType()":"Added to support new types of gatewaysenders"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.test.junit.categories.AEQTest;

@Category({AEQTest.class})
Expand Down Expand Up @@ -55,7 +55,7 @@ public void tearDown() {
@Test
public void testStopClearsStats() {

GatewaySenderAttributes attrs = new GatewaySenderAttributes();
GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
Expand All @@ -81,7 +81,7 @@ public void testStopClearsStats() {

@Test
public void testStopStart() {
GatewaySenderAttributes attrs = new GatewaySenderAttributes();
GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.xmlcache.AsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
Expand All @@ -53,13 +53,14 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
*/
private final GatewaySenderAttributes gatewaySenderAttributes;
private final GatewaySenderAttributesImpl gatewaySenderAttributes;

public AsyncEventQueueFactoryImpl(InternalCache cache) {
this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
this(cache, new GatewaySenderAttributesImpl(), DEFAULT_BATCH_TIME_INTERVAL);
}

AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes,
AsyncEventQueueFactoryImpl(InternalCache cache,
GatewaySenderAttributesImpl gatewaySenderAttributes,
int batchTimeInterval) {
this.cache = cache;
this.gatewaySenderAttributes = gatewaySenderAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}

@Override
public String getType() {
return "ParallelAsyncEventQueue";
}

private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
Expand All @@ -218,4 +223,5 @@ private ThreadsMonitoring getThreadMonitorObj() {
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}

@Override
public String getType() {
return "SerialAsyncEventQueue";
}

private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,7 @@ public void setOverflowDirectory(String value) {
* &lt;attribute name="id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" />
* &lt;attribute name="remote-distributed-system-id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" />
* &lt;attribute name="parallel" type="{http://www.w3.org/2001/XMLSchema}boolean" />
* &lt;attribute name="type" type="{http://www.w3.org/2001/XMLSchema}string" />
* &lt;attribute name="manual-start" type="{http://www.w3.org/2001/XMLSchema}boolean" />
* &lt;attribute name="socket-buffer-size" type="{http://www.w3.org/2001/XMLSchema}string" />
* &lt;attribute name="socket-read-timeout" type="{http://www.w3.org/2001/XMLSchema}string" />
Expand Down Expand Up @@ -2628,6 +2629,8 @@ public static class GatewaySender {
protected String remoteDistributedSystemId;
@XmlAttribute(name = "parallel")
protected Boolean parallel;
@XmlAttribute(name = "type")
protected String type;
@XmlAttribute(name = "manual-start")
protected Boolean manualStart;
@XmlAttribute(name = "socket-buffer-size")
Expand Down Expand Up @@ -2820,6 +2823,29 @@ public void setParallel(Boolean value) {
parallel = value;
}

/**
* Gets the value of the parallel property.
*
* possible object is
* {@link String }
*
*/

public String getType() {
return type;
}

/**
* Sets the value of the type property.
*
* allowed object is
* {@link String }
*
*/
public void setType(String value) {
this.type = value;
}

/**
* Gets the value of the manualStart property.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.util.List;

import org.apache.geode.annotations.Immutable;
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.util.internal.GeodeGlossary;

/**
Expand Down Expand Up @@ -155,32 +153,6 @@ public interface GatewaySender {
int CONNECTION_RETRY_INTERVAL = Integer
.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000);

/**
* Number of times to retry to get events for a transaction from the gateway sender queue when
* group-transaction-events is set to true.
* When group-transaction-events is set to true and a batch ready to be sent does not contain
* all the events for all the transactions to which the events belong, the gateway sender will try
* to get the missing events of the transactions from the queue to add them to the batch
* before sending it.
* If the missing events are not in the queue when the gateway sender tries to get them
* it will retry for a maximum of times equal to the value set in this parameter before
* delivering the batch without the missing events and logging an error.
* Setting this parameter to a very low value could cause that under heavy load and
* group-transaction-events set to true, batches are sent with incomplete transactions. Setting it
* to a high value could cause that under heavy load and group-transaction-events set to true,
* batches are held for some time before being sent.
*/
int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
10);
/**
* Milliseconds to wait before retrying to get events for a transaction from the
* gateway sender queue when group-transaction-events is true.
*/
int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
SystemProperty.getProductIntegerProperty(
SystemPropertyHelper.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS).orElse(1);

/**
* The order policy. This enum is applicable only when concurrency-level is > 1.
*
Expand Down Expand Up @@ -418,10 +390,13 @@ enum OrderPolicy {
*/
boolean isParallel();

String getType();

/**
* Returns groupTransactionEvents boolean property for this GatewaySender.
*
* @return groupTransactionEvents boolean property for this GatewaySender
* @deprecated use {@link #getType()}.
*
*/
boolean mustGroupTransactionEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface GatewaySenderFactory {
*/
GatewaySenderFactory setGroupTransactionEvents(boolean groupTransactionEvents);

GatewaySenderFactory setType(String type);

/**
* Adds a <code>GatewayEventFilter</code>
*
Expand Down Expand Up @@ -188,7 +190,8 @@ public interface GatewaySenderFactory {
*
* @param filter The <code>GatewayEventSubstitutionFilter</code>
*/
GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
GatewaySenderFactory setGatewayEventSubstitutionFilter(
GatewayEventSubstitutionFilter<?, ?> filter);

/**
* If true, receiver member id is checked by all dispatcher threads when the connection is
Expand Down
Loading