From c6cda57f63c3cee80727ee434df7f7888bfa5ada Mon Sep 17 00:00:00 2001 From: thegridman Date: Fri, 5 Mar 2021 00:40:14 +0300 Subject: [PATCH] Harden StatusHA check to fail if distribution coordinator MBean is missing --- .../coherence/k8s/OperatorRestServer.java | 54 +++++++++++++++++-- test/e2e/remote/suspend_test.go | 6 +-- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/java/coherence-utils/src/main/java/com/oracle/coherence/k8s/OperatorRestServer.java b/java/coherence-utils/src/main/java/com/oracle/coherence/k8s/OperatorRestServer.java index 5e2571a18..14a3d872f 100644 --- a/java/coherence-utils/src/main/java/com/oracle/coherence/k8s/OperatorRestServer.java +++ b/java/coherence-utils/src/main/java/com/oracle/coherence/k8s/OperatorRestServer.java @@ -14,6 +14,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -102,7 +103,12 @@ public class OperatorRestServer { /** * The MBean name of the Service MBean. */ - public static final String MBEAN_SERVICE = "%s:" + Registry.SERVICE_TYPE + public static final String MBEAN_SERVICE = Registry.SERVICE_TYPE + ",name=*,nodeId=*"; + + /** + * The format pattern used to build the name of a Service MBean from the domain, service name and node id. + */ + public static final String MBEAN_SERVICE_PATTERN = "%s:" + Registry.SERVICE_TYPE + ",name=%s,nodeId=%d"; /** @@ -133,8 +139,8 @@ public class OperatorRestServer { /** * The MBean attribute to check the state of a partitioned cache service. */ - public static final String[] CACHE_SERVICE_ATTRIBUTES = new String[] {"StorageEnabled", "MemberCount", - "OwnedPartitionsPrimary", "PartitionsAll"}; + public static final String[] CACHE_SERVICE_ATTRIBUTES = new String[] {"Type", "StorageEnabled", "MemberCount", + "OwnedPartitionsPrimary", "PartitionsAll", "StorageEnabledCount"}; /** * The value of the Status HA attribute to signify endangered. 
@@ -545,6 +551,29 @@ boolean isStatusHA(String exclusions) { Cluster cluster = clusterSupplier.get(); if (cluster != null && cluster.isRunning()) { int id = cluster.getLocalMember().getId(); + + Set cacheServices = getDistributedCacheServiceNames(); + Set distributionCoordinators = getPartitionAssignmentMBeans(); + + // Ensure we have a DistributionCoordinator for all cache services + // If the senior just died we might not have one + if (cacheServices.size() != distributionCoordinators.size()) { + Set coords = new HashSet<>(); + for (String s : distributionCoordinators) { + ObjectName objectName = ObjectName.getInstance(s); + coords.add(objectName.getKeyProperty("service")); + } + for (String name : cacheServices) { + if (!coords.contains(name)) { + err("CoherenceOperator: StatusHA check failed - No DistributionCoordinator " + + "for DistributedCache service " + name); + } + } + err("CoherenceOperator: StatusHA check failed - DistributedCache service count " + cacheServices.size() + + " does not match DistributionCoordinator count " + distributionCoordinators.size()); + return false; + } + for (String mBean : getPartitionAssignmentMBeans()) { if (allowEndangered != null && allowEndangered.stream().anyMatch(mBean::contains)) { // this service is allowed to be endangered so skip it. 
@@ -605,7 +634,7 @@ private boolean isCacheServiceSafe(String mBean, int memberId) throws MalformedO ObjectName objectName = ObjectName.getInstance(mBean); String domain = objectName.getDomain(); String serviceName = objectName.getKeyProperty("service"); - String serviceMBean = String.format(MBEAN_SERVICE, domain, serviceName, memberId); + String serviceMBean = String.format(MBEAN_SERVICE_PATTERN, domain, serviceName, memberId); Map attributes = getMBeanAttributes(serviceMBean, CACHE_SERVICE_ATTRIBUTES); Boolean storageEnabled = (Boolean) attributes.get(ATTRIB_STORAGE_ENABLED); Integer memberCount = (Integer) attributes.get(ATTRIB_MEMBER_COUNT); @@ -729,6 +758,23 @@ private Set getPartitionAssignmentMBeans() { .orElse(Collections.emptySet()); } + private Set getDistributedCacheServiceNames() throws MalformedObjectNameException { + Set cacheServices = new HashSet<>(); + Set set = getMBeanServerProxy() + .map(p -> p.queryNames(MBEAN_SERVICE, null)) + .orElse(Collections.emptySet()); + + for (String mBean : set) { + Map attributes = getMBeanAttributes(mBean, new String[]{"Type"}); + String type = (String) attributes.get("type"); + if ("DistributedCache".equals(type)) { + ObjectName objectName = new ObjectName(mBean); + cacheServices.add(objectName.getKeyProperty("name")); + } + } + return cacheServices; + } + private Set getPersistenceCoordinatorMBeans() { return getMBeanServerProxy() .map(p -> p.queryNames(MBEAN_PERSISTENCE_COORDINATOR, null)) diff --git a/test/e2e/remote/suspend_test.go b/test/e2e/remote/suspend_test.go index ed6ab60a0..c05cb5981 100644 --- a/test/e2e/remote/suspend_test.go +++ b/test/e2e/remote/suspend_test.go @@ -96,7 +96,7 @@ func TestNotSuspendServicesWhenSuspendDisabled(t *testing.T) { // assert that the cache service is suspended svc, err := ManagementOverRestRequest(&c, "/management/coherence/cluster/services/PartitionedCache") g.Expect(err).NotTo(HaveOccurred()) - g.Expect(svc["quorumStatus"]).To(ContainElement("Suspended")) + 
g.Expect(svc["quorumStatus"]).NotTo(ContainElement("Suspended")) // remove the test finalizer which should then let everything be deleted err = removeAllFinalizers(&c) @@ -198,7 +198,7 @@ func TestNotSuspendServicesOnScaleDownToZeroIfSuspendDisabled(t *testing.T) { // assert that the cache service is suspended svc, err := ManagementOverRestRequest(&c, "/management/coherence/cluster/services/PartitionedCache") g.Expect(err).NotTo(HaveOccurred()) - g.Expect(svc["quorumStatus"]).To(ContainElement("Suspended")) + g.Expect(svc["quorumStatus"]).NotTo(ContainElement("Suspended")) // remove the test finalizer from the StatefulSet and Coherence deployment which should then let everything be deleted err = removeAllFinalizers(sts) @@ -247,7 +247,7 @@ func TestNotSuspendServicesInMultipleDeployments(t *testing.T) { // assert that the cache service is NOT suspended svc, err := ManagementOverRestRequest(&cOne, "/management/coherence/cluster/services/PartitionedCache") g.Expect(err).NotTo(HaveOccurred()) - g.Expect(svc["quorumStatus"]).To(ContainElement("Suspended")) + g.Expect(svc["quorumStatus"]).NotTo(ContainElement("Suspended")) } func waitForFinalizerTasks(n types.NamespacedName) error {