Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP ignore stopping #7191

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

protected boolean enforceThreadsConnectSameReceiver;

private boolean shadowRegionCreated = false;

protected AbstractGatewaySender() {
statisticsClock = disabledClock();
}
Expand Down Expand Up @@ -1537,6 +1539,14 @@ public String getExpectedReceiverUniqueId() {
return expectedReceiverUniqueId;
}

public void setShadowRegionCreated() {
this.shadowRegionCreated = true;
}

public boolean isShadowRegionCreated() {
return this.shadowRegionCreated;
}

/**
* Has a reference to a GatewayEventImpl and has a timeout value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,11 @@ public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR,
if (prQ == null) {
return;
}

if (!sender.isShadowRegionCreated()) {
sender.setShadowRegionCreated();
}

// TODO This should not be set on the PR but on the GatewaySender
prQ.enableConflation(sender.isBatchConflationEnabled());
if (isAccessor) {
Expand All @@ -589,6 +594,11 @@ public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR,
}

} else {

if (!sender.isShadowRegionCreated()) {
sender.setShadowRegionCreated();
}

if (isAccessor) {
return; // return from here if accessor node
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void before() throws Exception {

@After
public void after() {
gfsh.executeAndAssertThat("destroy region --name=parentRegion").statusIsSuccess();
gfsh.executeAndAssertThat(DESTROY + " --if-exists").statusIsSuccess();
gfsh.executeAndAssertThat(DESTROY_PARALLEL + " --if-exists").statusIsSuccess();
exln.remove();
Expand All @@ -118,6 +119,10 @@ public void testCreateSerialGatewaySenderWithDefault() throws Exception {
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();
// verify that server1's event queue has the default value
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
Expand All @@ -142,6 +147,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSize() throws Exception {
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100")
.statusIsSuccess();
Expand Down Expand Up @@ -169,6 +179,11 @@ public void testCreateSerialGatewaySenderAndInvalidAlterBatchSize() throws Excep
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=-10 --alert-threshold=100")
.statusIsError();
Expand Down Expand Up @@ -196,6 +211,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSizeCheckConfig() throws E
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100")
.statusIsSuccess();
Expand Down Expand Up @@ -243,6 +263,11 @@ public void testCreateSerialGatewaySenderAndChangeGroupTransaction() throws Exce
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat("alter gateway-sender --id=sender1 --group-transaction-events=true")
.statusIsError()
.containsOutput("alter-gateway-sender cannot be performed for --group-transaction-events");
Expand All @@ -261,6 +286,11 @@ public void testCreateSerialGatewaySenderAndAlterBatchSizeServerDown() throws Ex
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

server1.stop(false);

gfsh.executeAndAssertThat(
Expand Down Expand Up @@ -296,6 +326,11 @@ public void testCreateSerialGatewaySenderAndAlterEventFiters() throws Exception
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100 --gateway-event-filter="
+ MyGatewayEventFilter.class.getName())
Expand Down Expand Up @@ -365,6 +400,11 @@ public void testCreateSerialGatewaySenderAndAlterEventFitersAndRemove() throws E
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1 --batch-size=200 --alert-threshold=100 --gateway-event-filter="
+ MyGatewayEventFilter.class.getName())
Expand Down Expand Up @@ -432,6 +472,11 @@ public void testCreateParallelGatewaySenderAndAlterBatchSize() throws Exception
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1P");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1P").statusIsSuccess();

gfsh.executeAndAssertThat(
"alter gateway-sender --id=sender1P --batch-size=200 --alert-threshold=100")
.statusIsSuccess();
Expand Down Expand Up @@ -459,6 +504,11 @@ public void testCreateParallelGatewaySenderAndChangeGroupTransaction() throws Ex
gfsh.executeAndAssertThat("list gateways").statusIsSuccess()
.containsOutput("sender1P");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=sender1P").statusIsSuccess();

gfsh.executeAndAssertThat("alter gateway-sender --id=sender1P --group-transaction-events=true")
.statusIsSuccess();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ public void testCreateDestroyParallelGatewaySender() {
"GatewaySender \"ln\" created on \"" + SERVER_4 + "\"",
"GatewaySender \"ln\" created on \"" + SERVER_5 + "\"");

gfsh.executeAndAssertThat("create region"
+ " --name=parentRegion"
+ " --type=PARTITION"
+ " --gateway-sender-id=ln").statusIsSuccess();

gfsh.executeAndAssertThat("destroy region --name=parentRegion").statusIsSuccess();
// destroy gateway sender and verify AEQs cleaned up
gfsh.executeAndAssertThat(DESTROY).statusIsSuccess()
.doesNotContainOutput("Did not complete waiting")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public void stop() {
if (!this.isRunning()) {
return;
}

if (!isShadowRegionCreated()) {
return;
}

// Stop the dispatcher
stopProcessing();

Expand Down