
Commit 5f54622

Rex (rexcsn) authored and committed
Revert "Handling EC2 Health Scheduled Events"
This commit reverts the following Scheduled Events related commits:
* edeb81e
* 613c43d
* 8876b8f
* fb9d07b
* 13eb8e4

Signed-off-by: Rex <[email protected]>
1 parent edeb81e commit 5f54622

File tree

17 files changed: +141 -1064 lines

src/common/schedulers/sge_commands.py

Lines changed: 4 additions & 11 deletions
@@ -11,7 +11,6 @@
 import collections
 import logging
 import re
-import subprocess
 from xml.etree import ElementTree
 
 from common import sge
@@ -155,22 +154,16 @@ def install_sge_on_compute_nodes(hosts, cluster_user):
     return succeeded_hosts
 
 
-def lock_node(hostname):
+def lock_host(hostname):
     logging.info("Locking host %s", hostname)
     command = ["qmod", "-d", "all.q@{0}".format(hostname)]
-    try:
-        run_sge_command(command)
-    except subprocess.CalledProcessError:
-        logging.error("Error locking host %s", hostname)
+    run_sge_command(command)
 
 
-def unlock_node(hostname):
+def unlock_host(hostname):
     logging.info("Unlocking host %s", hostname)
     command = ["qmod", "-e", "all.q@{0}".format(hostname)]
-    try:
-        run_sge_command(command)
-    except subprocess.CalledProcessError:
-        logging.error("Error unlocking host %s", hostname)
+    run_sge_command(command)
 
 
 def _run_sge_command_for_multiple_hosts(hosts, command_template):
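Note on the change above: with the revert, lock_host/unlock_host no longer swallow qmod failures, so callers are expected to catch subprocess.CalledProcessError themselves (as the nodewatcher SGE plugin further down now does). A minimal standalone sketch of the same disable/enable flow, using subprocess directly instead of the project's run_sge_command helper (assuming that helper raises CalledProcessError on a non-zero exit code):

import logging
import subprocess


def sge_set_host_state(hostname, disable=True):
    # qmod -d disables (locks) the queue instance for the host so no new
    # jobs are dispatched to it; qmod -e re-enables (unlocks) it.
    flag = "-d" if disable else "-e"
    # check=True raises CalledProcessError on failure, mirroring the
    # behaviour assumed for run_sge_command above.
    subprocess.run(["qmod", flag, "all.q@{0}".format(hostname)], check=True)


# Caller-side error handling, matching the pattern used in the plugin:
try:
    sge_set_host_state("ip-10-0-0-114", disable=True)
except subprocess.CalledProcessError:
    logging.error("Error locking host %s", "ip-10-0-0-114")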

src/common/schedulers/slurm_commands.py

Lines changed: 1 addition & 47 deletions
@@ -12,14 +12,11 @@
 
 import logging
 import math
-import subprocess
 from textwrap import wrap
 
 from common.schedulers.converters import ComparableObject, from_table_to_obj_list
-from common.utils import check_command_output, run_command
+from common.utils import check_command_output
 
-SLURM_NODE_ERROR_STATES = ["down", "drained", "fail"]
-SLURM_NODE_DISABLED_STATES = ["draining", "drained"]
 PENDING_RESOURCES_REASONS = [
     "Resources",
     "Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions",
@@ -62,16 +59,6 @@ def get_jobs_info(job_state_filter=None):
     return SlurmJob.from_table(output)
 
 
-def get_node_state(hostname):
-    # retrieves the state of a specific node
-    # https://slurm.schedmd.com/sinfo.html#lbAG
-    # Output format:
-    # down*
-    command = "/opt/slurm/bin/sinfo --noheader -o '%T' -n {}".format(hostname)
-    output = check_command_output(command).strip()
-    return output
-
-
 def get_pending_jobs_info(
     instance_properties=None, max_nodes_filter=None, filter_by_pending_reasons=None, log_pending_jobs=True
 ):
@@ -282,39 +269,6 @@ def job_runnable_on_given_node(job_resources_per_node, resources_available, exis
     return True
 
 
-def lock_node(hostname, reason=None):
-    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
-    hostname = hostname.split(".")[0]
-    logging.info("Locking host %s", hostname)
-    command = [
-        "/opt/slurm/bin/scontrol",
-        "update",
-        "NodeName={0}".format(hostname),
-        "State=DRAIN",
-        "Reason={}".format(reason if reason else '"Shutting down"'),
-    ]
-    try:
-        run_command(command)
-    except subprocess.CalledProcessError:
-        logging.error("Error locking host %s", hostname)
-
-
-def unlock_node(hostname, reason=None):
-    hostname = hostname.split(".")[0]
-    logging.info("Unlocking host %s", hostname)
-    command = [
-        "/opt/slurm/bin/scontrol",
-        "update",
-        "NodeName={0}".format(hostname),
-        "State=RESUME",
-        "Reason={}".format(reason if reason else '"Unlocking"'),
-    ]
-    try:
-        run_command(command)
-    except subprocess.CalledProcessError:
-        logging.error("Error unlocking host %s", hostname)
-
-
 class SlurmJob(ComparableObject):
     # This is the format after being processed by reformat_table function
     # JOBID|ST|NODES|CPUS|TASKS|CPUS_PER_TASK|MIN_CPUS|REASON|TRES_PER_JOB|TRES_PER_TASK
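For reference, the lock_node/unlock_node/get_node_state helpers removed above wrapped standard Slurm commands. A standalone sketch of the same operations, using subprocess in place of the project's run_command/check_command_output helpers; the /opt/slurm/bin paths are taken from the removed code:

import subprocess


def slurm_set_node_state(nodename, drain=True, reason="Shutting down"):
    # scontrol update State=DRAIN stops new jobs from being scheduled on the
    # node; State=RESUME returns it to service. A Reason is required for DRAIN.
    state = "DRAIN" if drain else "RESUME"
    command = [
        "/opt/slurm/bin/scontrol",
        "update",
        "NodeName={0}".format(nodename),
        "State={0}".format(state),
        "Reason={0}".format(reason),
    ]
    subprocess.run(command, check=True)


def slurm_node_state(nodename):
    # sinfo -o '%T' prints only the node state (e.g. "idle", "drained", "down*").
    command = ["/opt/slurm/bin/sinfo", "--noheader", "-o", "%T", "-n", nodename]
    return subprocess.run(command, check=True, capture_output=True, text=True).stdout.strip()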

src/common/schedulers/torque_commands.py

Lines changed: 1 addition & 15 deletions
@@ -17,8 +17,7 @@
 from common.schedulers.converters import ComparableObject, from_xml_to_obj
 from common.utils import check_command_output, run_command
 
-TORQUE_NODE_ERROR_STATES = ("down", "unknown")
-TORQUE_NODE_DISABLED_STATE = "offline"
+TORQUE_NODE_ERROR_STATES = ("down", "offline", "unknown")
 TORQUE_NODE_STATES = (
     "free",
     "offline",
@@ -130,19 +129,6 @@ def delete_nodes(hosts):
     return succeeded_hosts
 
 
-def lock_node(hostname, unlock=False, note=None):
-    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
-    hostname = hostname.split(".")[0]
-    mod = unlock and "-c" or "-o"
-    command = [TORQUE_BIN_DIR + "pbsnodes", mod, hostname]
-    if note:
-        command.append("-N '{}'".format(note))
-    try:
-        run_command(command)
-    except subprocess.CalledProcessError:
-        logging.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
-
-
 def update_cluster_limits(max_nodes, node_slots):
     try:
         logging.info("Updating cluster limits: max_nodes=%d, node_slots=%d", max_nodes, node_slots)

src/common/utils.py

Lines changed: 0 additions & 52 deletions
@@ -39,32 +39,10 @@ class CriticalError(Exception):
 class EventType(Enum):
     ADD = "ADD"
     REMOVE = "REMOVE"
-    HEALTH = "HEALTH"
 
 
-class QueueType(Enum):
-    instance = "instance"
-    health = "health"
-
-
-SUPPORTED_EVENTTYPE_FOR_QUEUETYPE = {
-    QueueType.instance: [EventType.ADD, EventType.REMOVE],
-    QueueType.health: [EventType.HEALTH],
-}
-
 Host = collections.namedtuple("Host", ["instance_id", "hostname", "slots", "gpus"])
 UpdateEvent = collections.namedtuple("UpdateEvent", ["action", "message", "host"])
-INSTANCE_ALIVE_STATES = ["pending", "running"]
-TREAT_DISABLED_AS_DOWN_WARNING = (
-    "Considering node as down because there is no job running and node is in a disabled state. "
-    "The node could have been put into this disabled state automatically by ParallelCluster "
-    "in response to an EC2 scheduled maintenance event, or manually by the system administrator."
-)
-POSSIBLE_LOCK_CONFLICT_WARNING = (
-    "Instance %s/%s currently in disabled state %s. "
-    "Risk of lock being released by nodewatcher if locking the node because of scheduled event now. "
-    "Marking event as failed to retry later."
-)
 
 
 def load_module(module):
@@ -411,33 +389,3 @@ def retrieve_max_cluster_size(region, proxy_config, asg_name, fallback):
     )
     log.critical(error_msg)
     raise CriticalError(error_msg)
-
-
-def get_cluster_instance_info(stack_name, region, proxy_config, instance_ids=None, include_master=False):
-    """Return a list of instance_ids that are in the cluster."""
-    try:
-        instances_in_cluster = []
-        ec2_client = boto3.client("ec2", region_name=region, config=proxy_config)
-        instance_paginator = ec2_client.get_paginator("describe_instances")
-        nodes_to_include = ["Compute", "Master"] if include_master else ["Compute"]
-        function_args = {
-            "Filters": [
-                {"Name": "tag:Application", "Values": [stack_name]},
-                {"Name": "tag:Name", "Values": nodes_to_include},
-            ]
-        }
-        if instance_ids:
-            function_args["InstanceIds"] = instance_ids
-        for page in instance_paginator.paginate(**function_args):
-            for reservation in page.get("Reservations"):
-                for instance in reservation.get("Instances"):
-                    is_alive = instance.get("State").get("Name") in INSTANCE_ALIVE_STATES
-                    instance_id = instance.get("InstanceId")
-                    if is_alive:
-                        instances_in_cluster.append(instance_id)
-
-        return instances_in_cluster
-
-    except Exception as e:
-        logging.error("Failed retrieving instance_ids for cluster %s with exception: %s", stack_name, e)
-        raise
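The removed get_cluster_instance_info listed cluster instances by the tags ParallelCluster applies to its nodes. A simplified standalone sketch of the same query; it drops the proxy configuration and filters on instance state server-side instead of checking it client-side:

import boto3


def list_cluster_instance_ids(stack_name, region, include_master=False):
    # Page through describe_instances, matching the tag filters used by the
    # removed helper and keeping only instances that are pending or running.
    ec2 = boto3.client("ec2", region_name=region)
    names = ["Compute", "Master"] if include_master else ["Compute"]
    instance_ids = []
    for page in ec2.get_paginator("describe_instances").paginate(
        Filters=[
            {"Name": "tag:Application", "Values": [stack_name]},
            {"Name": "tag:Name", "Values": names},
            {"Name": "instance-state-name", "Values": ["pending", "running"]},
        ]
    ):
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                instance_ids.append(instance["InstanceId"])
    return instance_ids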

src/nodewatcher/plugins/sge.py

Lines changed: 11 additions & 13 deletions
@@ -11,18 +11,18 @@
 
 import logging
 import socket
+import subprocess
 
 from common.schedulers.sge_commands import (
-    SGE_DISABLED_STATE,
     SGE_ERROR_STATES,
     SGE_HOLD_STATE,
     get_compute_nodes_info,
     get_jobs_info,
     get_pending_jobs_info,
-    lock_node,
-    unlock_node,
 )
-from common.utils import TREAT_DISABLED_AS_DOWN_WARNING, check_command_output
+from common.schedulers.sge_commands import lock_host as sge_lock_host
+from common.schedulers.sge_commands import unlock_host
+from common.utils import check_command_output
 
 log = logging.getLogger(__name__)
 
@@ -58,10 +58,13 @@ def has_pending_jobs(instance_properties, max_size):
 
 
 def lock_host(hostname, unlock=False):
-    if unlock:
-        unlock_node(hostname)
-    else:
-        lock_node(hostname)
+    try:
+        if unlock:
+            unlock_host(hostname)
+        else:
+            sge_lock_host(hostname)
+    except subprocess.CalledProcessError:
+        log.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
 
 
 def is_node_down():
@@ -83,12 +86,7 @@ def is_node_down():
 
         node = nodes.get(host_fqdn, nodes.get(hostname))
         log.info("Node is in state: '{0}'".format(node.state))
-        # check if any error state is present
         if all(error_state not in node.state for error_state in SGE_ERROR_STATES):
-            # Consider the node down if it's in disabled state and there is no job running
-            if SGE_DISABLED_STATE in node.state and not has_jobs(hostname):
-                log.warning(TREAT_DISABLED_AS_DOWN_WARNING)
-                return True
             return False
     except Exception as e:
         log.error("Failed when checking if node is down with exception %s. Reporting node as down.", e)

src/nodewatcher/plugins/slurm.py

Lines changed: 31 additions & 16 deletions
@@ -12,15 +12,8 @@
 import logging
 import subprocess
 
-from common.schedulers.slurm_commands import (
-    PENDING_RESOURCES_REASONS,
-    SLURM_NODE_ERROR_STATES,
-    get_node_state,
-    get_pending_jobs_info,
-    lock_node,
-    unlock_node,
-)
-from common.utils import TREAT_DISABLED_AS_DOWN_WARNING, check_command_output
+from common.schedulers.slurm_commands import PENDING_RESOURCES_REASONS, get_pending_jobs_info
+from common.utils import check_command_output, run_command
 
 log = logging.getLogger(__name__)
 
@@ -61,22 +54,44 @@ def has_pending_jobs(instance_properties, max_size):
 
 
 def lock_host(hostname, unlock=False):
+    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
+    hostname = hostname.split(".")[0]
     if unlock:
-        unlock_node(hostname)
+        log.info("Unlocking host %s", hostname)
+        command = [
+            "/opt/slurm/bin/scontrol",
+            "update",
+            "NodeName={0}".format(hostname),
+            "State=RESUME",
+            'Reason="Unlocking"',
+        ]
     else:
-        lock_node(hostname)
+        log.info("Locking host %s", hostname)
+        command = [
+            "/opt/slurm/bin/scontrol",
+            "update",
+            "NodeName={0}".format(hostname),
+            "State=DRAIN",
+            'Reason="Shutting down"',
+        ]
+    try:
+        run_command(command)
+    except subprocess.CalledProcessError:
+        log.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
 
 
 def is_node_down():
     """Check if node is down according to scheduler."""
     try:
-        hostname = check_command_output("hostname").strip()
-        output = get_node_state(hostname)
+        # retrieves the state of a specific node
+        # https://slurm.schedmd.com/sinfo.html#lbAG
+        # Output format:
+        # down*
+        command = "/bin/bash -c \"/opt/slurm/bin/sinfo --noheader -o '%T' -n $(hostname)\""
+        output = check_command_output(command).strip()
         log.info("Node is in state: '{0}'".format(output))
-        if output and all(state not in output for state in SLURM_NODE_ERROR_STATES):
+        if output and all(state not in output for state in ["down", "drained", "fail"]):
             return False
-        if output and "drained" in output:
-            log.warning(TREAT_DISABLED_AS_DOWN_WARNING)
     except Exception as e:
         log.error("Failed when checking if node is down with exception %s. Reporting node as down.", e)
 
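The error-state list is now inlined in is_node_down, and the sinfo output is matched by substring because Slurm may decorate states (for example "down*" for an unresponsive node). A small standalone illustration of that check:

SLURM_ERROR_STATES = ["down", "drained", "fail"]  # inlined by this revert


def slurm_node_is_healthy(sinfo_state):
    # Substring match, not equality: it also catches "down*" and "failing".
    return bool(sinfo_state) and all(state not in sinfo_state for state in SLURM_ERROR_STATES)


for state in ["idle", "allocated", "down*", "drained", "failing"]:
    print(state, slurm_node_is_healthy(state))
# idle, allocated -> True; down*, drained, failing -> False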

src/nodewatcher/plugins/torque.py

Lines changed: 11 additions & 9 deletions
@@ -10,18 +10,18 @@
 # limitations under the License.
 
 import logging
+import subprocess
 
 from common.schedulers.torque_commands import (
-    TORQUE_NODE_DISABLED_STATE,
+    TORQUE_BIN_DIR,
     TORQUE_NODE_ERROR_STATES,
     TORQUE_RUNNING_JOB_STATE,
     TORQUE_SUSPENDED_JOB_STATE,
     get_compute_nodes_info,
     get_jobs_info,
     get_pending_jobs_info,
-    lock_node,
 )
-from common.utils import TREAT_DISABLED_AS_DOWN_WARNING, check_command_output
+from common.utils import check_command_output, run_command
 
 log = logging.getLogger(__name__)
 
@@ -56,7 +56,14 @@ def has_pending_jobs(instance_properties, max_size):
 
 
 def lock_host(hostname, unlock=False):
-    lock_node(hostname, unlock=unlock)
+    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
+    hostname = hostname.split(".")[0]
+    mod = unlock and "-c" or "-o"
+    command = [TORQUE_BIN_DIR + "pbsnodes", mod, hostname]
+    try:
+        run_command(command)
+    except subprocess.CalledProcessError:
+        log.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
 
 
 def is_node_down():
@@ -67,11 +74,6 @@ def is_node_down():
         if node:
             log.info("Node is in state: '{0}'".format(node.state))
             if all(error_state not in node.state for error_state in TORQUE_NODE_ERROR_STATES):
-                # Consider the node down if it is in Disabled state placed by scheduled event
-                # and does not have job
-                if TORQUE_NODE_DISABLED_STATE in node.state and not has_jobs(hostname):
-                    log.warning(TREAT_DISABLED_AS_DOWN_WARNING)
-                    return True
                 return False
         else:
             log.warning("Node is not attached to scheduler. Reporting as down")
