Skip to content

Commit c6cda57

Browse files
committed
Harden StatusHA check to fail if distribution coordinator MBean is missing
1 parent ee4f553 commit c6cda57

File tree

2 files changed

+53
-7
lines changed

2 files changed

+53
-7
lines changed

java/coherence-utils/src/main/java/com/oracle/coherence/k8s/OperatorRestServer.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Collections;
1515
import java.util.Enumeration;
1616
import java.util.HashMap;
17+
import java.util.HashSet;
1718
import java.util.Map;
1819
import java.util.Objects;
1920
import java.util.Optional;
@@ -102,7 +103,12 @@ public class OperatorRestServer {
102103
/**
103104
* The MBean name of the Service MBean.
104105
*/
105-
public static final String MBEAN_SERVICE = "%s:" + Registry.SERVICE_TYPE
106+
public static final String MBEAN_SERVICE = Registry.SERVICE_TYPE + ",name=*,nodeId=*";
107+
108+
/**
109+
* The MBean name of the Service MBean pattern.
110+
*/
111+
public static final String MBEAN_SERVICE_PATTERN = "%s:" + Registry.SERVICE_TYPE
106112
+ ",name=%s,nodeId=%d";
107113

108114
/**
@@ -133,8 +139,8 @@ public class OperatorRestServer {
133139
/**
134140
* The MBean attribute to check the state of a partitioned cache service.
135141
*/
136-
public static final String[] CACHE_SERVICE_ATTRIBUTES = new String[] {"StorageEnabled", "MemberCount",
137-
"OwnedPartitionsPrimary", "PartitionsAll"};
142+
public static final String[] CACHE_SERVICE_ATTRIBUTES = new String[] {"Type", "StorageEnabled", "MemberCount",
143+
"OwnedPartitionsPrimary", "PartitionsAll", "StorageEnabledCount"};
138144

139145
/**
140146
* The value of the Status HA attribute to signify endangered.
@@ -545,6 +551,29 @@ boolean isStatusHA(String exclusions) {
545551
Cluster cluster = clusterSupplier.get();
546552
if (cluster != null && cluster.isRunning()) {
547553
int id = cluster.getLocalMember().getId();
554+
555+
Set<String> cacheServices = getDistributedCacheServiceNames();
556+
Set<String> distributionCoordinators = getPartitionAssignmentMBeans();
557+
558+
// Ensure we have a DistributionCoordinator for all cache services
559+
// If the senior just died we might not have one
560+
if (cacheServices.size() != distributionCoordinators.size()) {
561+
Set<String> coords = new HashSet<>();
562+
for (String s : distributionCoordinators) {
563+
ObjectName objectName = ObjectName.getInstance(s);
564+
coords.add(objectName.getKeyProperty("service"));
565+
}
566+
for (String name : cacheServices) {
567+
if (!coords.contains(name)) {
568+
err("CoherenceOperator: StatusHA check failed - No DistributionCoordinator "
569+
+ "for DistributedCache service " + name);
570+
}
571+
}
572+
err("CoherenceOperator: StatusHA check failed - DistributedCache service count " + cacheServices.size()
573+
+ " does not match DistributionCoordinator count " + distributionCoordinators.size());
574+
return false;
575+
}
576+
548577
for (String mBean : getPartitionAssignmentMBeans()) {
549578
if (allowEndangered != null && allowEndangered.stream().anyMatch(mBean::contains)) {
550579
// this service is allowed to be endangered so skip it.
@@ -605,7 +634,7 @@ private boolean isCacheServiceSafe(String mBean, int memberId) throws MalformedO
605634
ObjectName objectName = ObjectName.getInstance(mBean);
606635
String domain = objectName.getDomain();
607636
String serviceName = objectName.getKeyProperty("service");
608-
String serviceMBean = String.format(MBEAN_SERVICE, domain, serviceName, memberId);
637+
String serviceMBean = String.format(MBEAN_SERVICE_PATTERN, domain, serviceName, memberId);
609638
Map<String, Object> attributes = getMBeanAttributes(serviceMBean, CACHE_SERVICE_ATTRIBUTES);
610639
Boolean storageEnabled = (Boolean) attributes.get(ATTRIB_STORAGE_ENABLED);
611640
Integer memberCount = (Integer) attributes.get(ATTRIB_MEMBER_COUNT);
@@ -729,6 +758,23 @@ private Set<String> getPartitionAssignmentMBeans() {
729758
.orElse(Collections.emptySet());
730759
}
731760

761+
private Set<String> getDistributedCacheServiceNames() throws MalformedObjectNameException {
762+
Set<String> cacheServices = new HashSet<>();
763+
Set<String> set = getMBeanServerProxy()
764+
.map(p -> p.queryNames(MBEAN_SERVICE, null))
765+
.orElse(Collections.emptySet());
766+
767+
for (String mBean : set) {
768+
Map<String, Object> attributes = getMBeanAttributes(mBean, new String[]{"Type"});
769+
String type = (String) attributes.get("type");
770+
if ("DistributedCache".equals(type)) {
771+
ObjectName objectName = new ObjectName(mBean);
772+
cacheServices.add(objectName.getKeyProperty("name"));
773+
}
774+
}
775+
return cacheServices;
776+
}
777+
732778
private Set<String> getPersistenceCoordinatorMBeans() {
733779
return getMBeanServerProxy()
734780
.map(p -> p.queryNames(MBEAN_PERSISTENCE_COORDINATOR, null))

test/e2e/remote/suspend_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestNotSuspendServicesWhenSuspendDisabled(t *testing.T) {
9696
// assert that the cache service is suspended
9797
svc, err := ManagementOverRestRequest(&c, "/management/coherence/cluster/services/PartitionedCache")
9898
g.Expect(err).NotTo(HaveOccurred())
99-
g.Expect(svc["quorumStatus"]).To(ContainElement("Suspended"))
99+
g.Expect(svc["quorumStatus"]).NotTo(ContainElement("Suspended"))
100100

101101
// remove the test finalizer which should then let everything be deleted
102102
err = removeAllFinalizers(&c)
@@ -198,7 +198,7 @@ func TestNotSuspendServicesOnScaleDownToZeroIfSuspendDisabled(t *testing.T) {
198198
// assert that the cache service is suspended
199199
svc, err := ManagementOverRestRequest(&c, "/management/coherence/cluster/services/PartitionedCache")
200200
g.Expect(err).NotTo(HaveOccurred())
201-
g.Expect(svc["quorumStatus"]).To(ContainElement("Suspended"))
201+
g.Expect(svc["quorumStatus"]).NotTo(ContainElement("Suspended"))
202202

203203
// remove the test finalizer from the StatefulSet and Coherence deployment which should then let everything be deleted
204204
err = removeAllFinalizers(sts)
@@ -247,7 +247,7 @@ func TestNotSuspendServicesInMultipleDeployments(t *testing.T) {
247247
// assert that the cache service is NOT suspended
248248
svc, err := ManagementOverRestRequest(&cOne, "/management/coherence/cluster/services/PartitionedCache")
249249
g.Expect(err).NotTo(HaveOccurred())
250-
g.Expect(svc["quorumStatus"]).To(ContainElement("Suspended"))
250+
g.Expect(svc["quorumStatus"]).NotTo(ContainElement("Suspended"))
251251
}
252252

253253
func waitForFinalizerTasks(n types.NamespacedName) error {

0 commit comments

Comments
 (0)