Skip to content

Commit

Permalink
GEODE-8856: Persist gateway-sender startup-action
Browse files Browse the repository at this point in the history
The new startup-action parameter, with values "stop", "pause" and "start", is now persisted
at runtime when the following commands are issued:

    pause gateway-sender  --> startup-action="pause"
    stop gateway-sender   --> startup-action="stop"
    start gateway-sender  --> startup-action="start"
    resume gateway-sender --> startup-action="start"

The parameter is neither updated nor persisted when the commands are executed per member,
because the cluster configuration is not updated in that case.

The new startup-action parameter now interacts with manual-start in the following way:

  - If manual-start="true" and startup-action parameter is missing, then gateway sender
    will require manual start (same as before).
  - If manual-start is not set (or "false") and startup-action parameter is missing, then
    gateway sender will be started automatically (same as before).
  - If parameter startup-action is available in cluster configuration at startup,
    then gateway-sender will try to reach that state regardless of manual-start
    parameter value.
  • Loading branch information
jvarenina committed Sep 15, 2022
1 parent 0b0c6f8 commit 5537c05
Show file tree
Hide file tree
Showing 61 changed files with 3,664 additions and 432 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ javadoc/org/apache/geode/cache/wan/EventSequenceID.html
javadoc/org/apache/geode/cache/wan/GatewayEventFilter.html
javadoc/org/apache/geode/cache/wan/GatewayEventSubstitutionFilter.html
javadoc/org/apache/geode/cache/wan/GatewayQueueEvent.html
javadoc/org/apache/geode/cache/wan/GatewaySenderStartupAction.html
javadoc/org/apache/geode/cache/wan/GatewayReceiver.html
javadoc/org/apache/geode/cache/wan/GatewayReceiverFactory.html
javadoc/org/apache/geode/cache/wan/GatewaySender.OrderPolicy.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,4 @@ org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
org/apache/geode/internal/cache/wan/GatewaySenderEventImpl$TransactionMetadataDisposition
org/apache/geode/internal/cache/tier/InterestType
org/apache/geode/internal/cache/tier/MessageType
org/apache/geode/cache/wan/GatewaySenderStartupAction
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public AsyncEventQueue create(String asyncQueueId, AsyncEventListener listener)
asyncEventQueue = asyncEventQueueImpl;
cache.addAsyncEventQueue(asyncEventQueueImpl);
if (pauseEventsDispatching) {
sender.setStartEventProcessorInPausedState();
sender.setStartEventProcessor(true);
}
if (!gatewaySenderAttributes.isManualStart()) {
sender.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public ParallelAsyncEventQueueImpl(InternalCache cache, StatisticsFactory statis
isForInternalUse = true;
}

@Override
public void recoverInStoppedState() {
  // Intentionally a no-op: this implementation performs no work when asked to
  // recover in stopped state.
}

@Override
public void start() {
start(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public SerialAsyncEventQueueImpl(InternalCache cache, StatisticsFactory statisti
}
}

@Override
public void recoverInStoppedState() {
  // Intentionally a no-op: this implementation performs no work when asked to
  // recover in stopped state.
}

@Override
public void start() {
start(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2760,6 +2760,8 @@ public static class GatewaySender {
protected String batchTimeInterval;
@XmlAttribute(name = "enable-persistence")
protected Boolean enablePersistence;
@XmlAttribute(name = "startup-action")
protected String startupAction;
@XmlAttribute(name = "disk-store-name")
protected String diskStoreName;
@XmlAttribute(name = "disk-synchronous")
Expand Down Expand Up @@ -2877,6 +2879,30 @@ public String getId() {
return id;
}

/**
 * Returns the current value of the {@code startup-action} attribute.
 *
 * @return the gateway-sender startup action as a {@link String}; this is whatever value was
 *         last stored in the field, which may be {@code null} if none was stored
 */
public String getStartupAction() {
  return this.startupAction;
}

/**
 * Stores a new value for the {@code startup-action} attribute.
 *
 * @param value the gateway-sender startup action, as a {@link String}
 */
public void setStartupAction(String value) {
  startupAction = value;
}

/**
* Sets the value of the id property.
*
Expand Down Expand Up @@ -2917,7 +2943,6 @@ public Boolean mustGroupTransactionEvents() {
return groupTransactionEvents;
}


public void setGroupTransactionEvents(Boolean value) {
groupTransactionEvents = value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan;

/**
 * Startup action of a gateway sender. The action is persisted in cluster configuration when a
 * start, stop, pause or resume gateway-sender command is successfully executed. At startup a
 * member reads the persisted startup-action parameter and acts accordingly.
 */
public enum GatewaySenderStartupAction {
  /**
   * This action ("start") is persisted in cluster configuration after a
   * start or resume gateway-sender command is successfully executed.
   * At startup the member will start the gateway sender. When set, this
   * parameter takes precedence over the manual-start parameter.
   */
  START("start"),
  /**
   * This action ("stop") is persisted in cluster configuration after a
   * stop gateway-sender command is successfully executed. At startup the
   * member will not start the gateway sender, but only recover the
   * gateway queues from persistent storage if needed. When set, this
   * parameter takes precedence over the manual-start parameter.
   */
  STOP("stop"),
  /**
   * This action ("pause") is persisted in cluster configuration after a
   * pause gateway-sender command is successfully executed. At startup the
   * member will start the gateway sender in paused state. When set, this
   * parameter takes precedence over the manual-start parameter.
   */
  PAUSE("pause"),
  /**
   * Used when the startup-action parameter is not available in cluster configuration.
   */
  NONE("none");

  /** Textual form of the action; assigned once in the constructor and never changed. */
  private final String action;

  GatewaySenderStartupAction(String action) {
    this.action = action;
  }

  /**
   * Returns a descriptive string that embeds the textual form of this action.
   *
   * @return a string of the form {@code GatewaySenderStartupAction {action='<action>'}}
   */
  @Override
  public String toString() {
    return "GatewaySenderStartupAction {" +
        "action='" + action + '\'' +
        '}';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,6 @@ boolean recoverPersistentBuckets() {
*/
PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(partitionedRegion);


// Check if the leader region or some child shadow PR region is persistent
// and return the first persistent region found
PartitionedRegion persistentLeader = getPersistentLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderStartupAction;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
Expand Down Expand Up @@ -242,6 +243,7 @@
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.logging.log4j.LogMarker;
Expand Down Expand Up @@ -1222,10 +1224,13 @@ public void postCreateRegion() {
* get the ParallelGatewaySender to create the colocated partitioned region for this
* region.
*/
InternalGatewaySender senderImpl = (InternalGatewaySender) sender;
if (sender.isRunning()) {
AbstractGatewaySender senderImpl = (AbstractGatewaySender) sender;
((ConcurrentParallelGatewaySenderQueue) senderImpl.getQueues()
.toArray(new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this);
} else if (GatewaySenderStartupAction.STOP == senderImpl
.calculateStartupActionForGatewaySender()) {
senderImpl.recoverInStoppedState();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
Expand All @@ -49,6 +48,7 @@
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderStartupAction;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.GatewayCancelledException;
import org.apache.geode.distributed.internal.DistributionAdvisee;
Expand Down Expand Up @@ -154,6 +154,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

private int serialNumber;

protected GatewaySenderStartupAction startupAction;

protected GatewaySenderStats statistics;

private Stopper stopper;
Expand Down Expand Up @@ -186,6 +188,19 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents =
new ConcurrentLinkedQueue<>();

/**
* Contains wan replication events that were dropped by parallel gateway senders.
* Activate this hook by setting system property <code>ENABLE_TEST_HOOK_TEMP_DROPPED_EVENTS</code>
*/
private volatile ConcurrentLinkedQueue<EntryEventImpl> testHookTempDroppedEvents;

/**
* Only used for testing purpose. This property enables test hook which collects all
* wan replication events that are dropped by parallel gateway senders.
*/
private static final boolean ENABLE_TEST_HOOK_TEMP_DROPPED_EVENTS =
Boolean.getBoolean("enable-test-hook-temp-dropped-events");
/**
* The number of seconds to wait before stopping the GatewaySender. Default is 0 seconds.
*/
Expand Down Expand Up @@ -270,7 +285,7 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
transFilters = Collections.unmodifiableList(attrs.getGatewayTransportFilters());
listeners = attrs.getAsyncEventListeners();
substitutionFilter = attrs.getGatewayEventSubstitutionFilter();
locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback();
locatorDiscoveryCallback = attrs.getGatewayLocatorDiscoveryCallback();
isDiskSynchronous = attrs.isDiskSynchronous();
policy = attrs.getOrderPolicy();
dispatcherThreads = attrs.getDispatcherThreads();
Expand All @@ -293,8 +308,10 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
}
initializeEventIdIndex();
}

isBucketSorted = attrs.isBucketSorted();
forwardExpirationDestroy = attrs.isForwardExpirationDestroy();
startupAction = attrs.getStartupAction();
}

public GatewaySenderAdvisor getSenderAdvisor() {
Expand Down Expand Up @@ -415,6 +432,31 @@ public int getSocketReadTimeout() {
return socketReadTimeout;
}

/**
 * Returns the startup action held by this sender, exactly as stored in the
 * {@code startupAction} field (no manual-start fallback is applied here).
 *
 * @return the stored startup action
 */
@Override
public GatewaySenderStartupAction getStartupAction() {
  return this.startupAction;
}

/**
 * Determines the effective startup action of this gateway sender.
 *
 * <p>
 * When the stored startup action is anything other than
 * {@link GatewaySenderStartupAction#NONE}, it is returned unchanged, so an explicit
 * startup-action takes precedence over manual-start. Otherwise the manual-start parameter
 * decides: a manually started sender yields {@link GatewaySenderStartupAction#STOP}, any other
 * sender yields {@link GatewaySenderStartupAction#START}.
 *
 * @return the startup action this sender should apply
 * @see GatewaySenderStartupAction
 */
public GatewaySenderStartupAction calculateStartupActionForGatewaySender() {
  final GatewaySenderStartupAction configuredAction = getStartupAction();
  if (configuredAction != GatewaySenderStartupAction.NONE) {
    // An explicit startup-action wins regardless of manual-start.
    return configuredAction;
  }
  // No startup-action available: fall back to the manual-start parameter.
  return isManualStart() ? GatewaySenderStartupAction.STOP : GatewaySenderStartupAction.START;
}

@Override
public boolean isBatchConflationEnabled() {
return isConflation;
Expand Down Expand Up @@ -586,6 +628,9 @@ public boolean isForInternalUse() {
@Override
public abstract void start();

/**
 * Recovers this sender in stopped state; the concrete behavior is supplied by subclasses.
 *
 * NOTE(review): presumably implementations recover the sender's queues without starting
 * event dispatching, for use when the calculated startup action is STOP - confirm against
 * the concrete senders and the callers.
 */
@Override
public abstract void recoverInStoppedState();

@Override
public abstract void startWithCleanQueue();

Expand Down Expand Up @@ -922,8 +967,8 @@ public boolean isStartEventProcessorInPausedState() {
}

@Override
public void setStartEventProcessorInPausedState() {
startEventProcessorInPausedState = true;
public void setStartEventProcessor(boolean isPaused) {
startEventProcessorInPausedState = isPaused;
}

/**
Expand Down Expand Up @@ -1059,7 +1104,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
}

// this filter is defined by Asif which exist in old wan too. new wan has
// other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
// other GatewayEventFilter. Do we need to get rid of this filter. Cheetah is
// not considering this filter
if (!filter.enqueueEvent(event)) {
stats.incEventsFiltered();
Expand Down Expand Up @@ -1241,17 +1286,26 @@ private void recordDroppedEvent(EntryEventImpl event) {
eventProcessor.registerEventDroppedInPrimaryQueue(event);
} else {
tmpDroppedEvents.add(event);
if (ENABLE_TEST_HOOK_TEMP_DROPPED_EVENTS) {
if (testHookTempDroppedEvents == null) {
testHookTempDroppedEvents = new ConcurrentLinkedQueue<>();
}
testHookTempDroppedEvents.add(event);
}
if (logger.isDebugEnabled()) {
logger.debug("added to tmpDroppedEvents event: {}", event);
}
}
}

@VisibleForTesting
int getTmpDroppedEventSize() {
protected int getTempDroppedEventSize() {
return tmpDroppedEvents.size();
}

/**
 * Returns the number of events currently held in the dropped-events test hook queue.
 *
 * <p>
 * The hook queue is created lazily, so it is still {@code null} until the first event is
 * recorded with the test hook enabled. In that case this method reports {@code 0} instead of
 * failing with a {@link NullPointerException}.
 *
 * @return the size of the test hook queue, or {@code 0} if the queue has not been created
 */
protected int getTempDroppedEventsHookSize() {
  if (testHookTempDroppedEvents == null) {
    // Hook disabled or nothing dropped yet: there is no queue to measure.
    return 0;
  }
  return testHookTempDroppedEvents.size();
}

/**
* During sender is getting started, if there are any cache operation on queue then that event
* will be stored in temp queue. Once sender is started, these event from tmp queue will be added
Expand All @@ -1267,10 +1321,7 @@ int getTmpDroppedEventSize() {
public void enqueueTempEvents() {
if (eventProcessor != null) {// Fix for defect #47308
// process tmpDroppedEvents
EntryEventImpl droppedEvent;
while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
}
processTempDroppedEvents();

TmpQueueEvent nextEvent = null;
final GatewaySenderStats stats = getStatistics();
Expand Down Expand Up @@ -1304,6 +1355,22 @@ this, getId(), nextEvent.getOperation(), nextEvent),
}
}

/**
 * Drains the temporary queue of dropped events and registers each one with the event
 * processor.
 *
 * <p>
 * Cache operations that arrive while the sender is being recovered in stopped state, before
 * its queue and event processor exist, are expected to be parked in {@code tmpDroppedEvents}.
 * Once an event processor is available, every parked event is handed to
 * {@code registerEventDroppedInPrimaryQueue} so that the corresponding
 * ParallelQueueRemovalMessage can be sent. If no event processor has been created yet, this
 * method does nothing and the parked events stay queued.
 */
public void processTempDroppedEvents() {
  if (this.eventProcessor == null) {
    return;
  }
  // Poll until the temporary queue is empty, forwarding each dropped event.
  for (EntryEventImpl dropped = tmpDroppedEvents.poll(); dropped != null; dropped =
      tmpDroppedEvents.poll()) {
    this.eventProcessor.registerEventDroppedInPrimaryQueue(dropped);
  }
}

/**
* Removes the EntryEventImpl, whose tailKey matches with the provided tailKey, from
* tmpQueueEvents.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public int getTotalQueueSize() {
return getQueue().size();
}

protected abstract void initializeMessageQueue(String id, boolean cleanQueues);
protected abstract void initializeMessageQueue(String id, boolean cleanQueues, boolean isStopped);

public void enqueueEvent(EnumListenerEvent operation, EntryEvent<?, ?> event,
Object substituteValue) throws IOException, CacheException {
Expand Down
Loading

0 comments on commit 5537c05

Please sign in to comment.