
Commit edeb81e

Rexrexcsn authored and committed
Scheduled Events: Bug Fixes
* Fixed bug in nodewatcher slurm plugin `lock_host`. Added unit test for this function.
* Addressed comments from previous review

Signed-off-by: Rex <[email protected]>
1 parent 613c43d · commit edeb81e

File tree: 10 files changed, +61 −42 lines changed


src/common/utils.py

Lines changed: 10 additions & 0 deletions
```diff
@@ -55,6 +55,16 @@ class QueueType(Enum):
 Host = collections.namedtuple("Host", ["instance_id", "hostname", "slots", "gpus"])
 UpdateEvent = collections.namedtuple("UpdateEvent", ["action", "message", "host"])
 INSTANCE_ALIVE_STATES = ["pending", "running"]
+TREAT_DISABLED_AS_DOWN_WARNING = (
+    "Considering node as down because there is no job running and node is in a disabled state. "
+    "The node could have been put into this disabled state automatically by ParallelCluster "
+    "in response to an EC2 scheduled maintenance event, or manually by the system administrator."
+)
+POSSIBLE_LOCK_CONFLICT_WARNING = (
+    "Instance %s/%s currently in disabled state %s. "
+    "Risk of lock being released by nodewatcher if locking the node because of scheduled event now. "
+    "Marking event as failed to retry later."
+)
 
 
 def load_module(module):
```
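`TREAT_DISABLED_AS_DOWN_WARNING` deduplicates a message that was previously repeated verbatim across the nodewatcher plugins, while `POSSIBLE_LOCK_CONFLICT_WARNING` is a `%`-style template with three placeholders, meant to be handed to `logging` together with its arguments so interpolation happens lazily, only when the record is actually emitted. A minimal sketch of how the sqswatcher plugins fill it (the instance id, hostname, and state below are illustrative values, not taken from the plugin code):

```python
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger(__name__)

POSSIBLE_LOCK_CONFLICT_WARNING = (
    "Instance %s/%s currently in disabled state %s. "
    "Risk of lock being released by nodewatcher if locking the node because of scheduled event now. "
    "Marking event as failed to retry later."
)

# The template and its arguments are passed separately; logging interpolates
# them only if a WARNING-level record is actually emitted.
log.warning(POSSIBLE_LOCK_CONFLICT_WARNING, "i-0abc123", "ip-10-0-0-166", "drained")
```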

src/nodewatcher/plugins/sge.py

Lines changed: 2 additions & 8 deletions
```diff
@@ -22,7 +22,7 @@
     lock_node,
     unlock_node,
 )
-from common.utils import check_command_output
+from common.utils import TREAT_DISABLED_AS_DOWN_WARNING, check_command_output
 
 log = logging.getLogger(__name__)
 
@@ -87,13 +87,7 @@ def is_node_down():
         if all(error_state not in node.state for error_state in SGE_ERROR_STATES):
             # Consider the node down if it's in disabled state and there is no job running
             if SGE_DISABLED_STATE in node.state and not has_jobs(hostname):
-                log.warning(
-                    (
-                        "Considering node as down because there is no job running and node is in a disabled state. "
-                        "The node could have been put into this disabled state automatically by ParallelCluster "
-                        "in response to an EC2 scheduled maintenance event, or manually by the system administrator."
-                    )
-                )
+                log.warning(TREAT_DISABLED_AS_DOWN_WARNING)
                 return True
         return False
     except Exception as e:
```

src/nodewatcher/plugins/slurm.py

Lines changed: 3 additions & 9 deletions
```diff
@@ -20,7 +20,7 @@
     lock_node,
     unlock_node,
 )
-from common.utils import check_command_output
+from common.utils import TREAT_DISABLED_AS_DOWN_WARNING, check_command_output
 
 log = logging.getLogger(__name__)
 
@@ -61,7 +61,7 @@ def has_pending_jobs(instance_properties, max_size):
 
 
 def lock_host(hostname, unlock=False):
-    if unlock_node:
+    if unlock:
         unlock_node(hostname)
     else:
         lock_node(hostname)
@@ -76,13 +76,7 @@ def is_node_down():
         if output and all(state not in output for state in SLURM_NODE_ERROR_STATES):
             return False
         if output and "drained" in output:
-            log.warning(
-                (
-                    "Considering node as down because there is no job running and node is in a disabled state. "
-                    "The node could have been put into this disabled state automatically by ParallelCluster in "
-                    "response to an EC2 scheduled maintenance event, or manually by the system administrator."
-                )
-            )
+            log.warning(TREAT_DISABLED_AS_DOWN_WARNING)
     except Exception as e:
         log.error("Failed when checking if node is down with exception %s. Reporting node as down.", e)
 
```
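The one-line change in `lock_host` is the bug named in the commit title: `unlock_node` is a function imported at module level, so `if unlock_node:` tested the truthiness of the function object, which is always `True`, and the node was unlocked on every call regardless of the `unlock` flag. A minimal sketch of the failure mode, with stand-in functions in place of the real scheduler calls:

```python
def lock_node(hostname):       # stand-in for the real Slurm call
    print("locking", hostname)

def unlock_node(hostname):     # stand-in for the real Slurm call
    print("unlocking", hostname)

def lock_host_buggy(hostname, unlock=False):
    if unlock_node:            # a function object is always truthy,
        unlock_node(hostname)  # so this branch is taken every time
    else:
        lock_node(hostname)    # unreachable

def lock_host_fixed(hostname, unlock=False):
    if unlock:                 # tests the flag the caller actually passed
        unlock_node(hostname)
    else:
        lock_node(hostname)

lock_host_buggy("ip-10-0-0-166")  # prints "unlocking ..." despite unlock=False
lock_host_fixed("ip-10-0-0-166")  # prints "locking ..."
```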

src/nodewatcher/plugins/torque.py

Lines changed: 2 additions & 8 deletions
```diff
@@ -21,7 +21,7 @@
     get_pending_jobs_info,
     lock_node,
 )
-from common.utils import check_command_output
+from common.utils import TREAT_DISABLED_AS_DOWN_WARNING, check_command_output
 
 log = logging.getLogger(__name__)
 
@@ -70,13 +70,7 @@ def is_node_down():
         # Consider the node down if it is in Disabled state placed by scheduled event
         # and does not have job
         if TORQUE_NODE_DISABLED_STATE in node.state and not has_jobs(hostname):
-            log.warning(
-                (
-                    "Considering node as down because there is no job running and node is in a disabled state. "
-                    "The node could have been put into this disabled state automatically by ParallelCluster in "
-                    "response to an EC2 scheduled maintenance event, or manually by the system administrator."
-                )
-            )
+            log.warning(TREAT_DISABLED_AS_DOWN_WARNING)
             return True
         return False
     else:
```

src/sqswatcher/plugins/sge.py

Lines changed: 2 additions & 6 deletions
```diff
@@ -23,7 +23,7 @@
     remove_hosts_from_group,
     remove_hosts_from_queue,
 )
-from common.utils import EventType
+from common.utils import POSSIBLE_LOCK_CONFLICT_WARNING, EventType
 
 log = logging.getLogger(__name__)
 
@@ -124,11 +124,7 @@ def perform_health_actions(health_events):
         try:
             if _is_node_locked(event.host.hostname):
                 log.warning(
-                    "Instance %s/%s currently in disabled state 'd'. "
-                    "Risk of lock being released by nodewatcher if locking the node because of scheduled event now. "
-                    "Marking event as failed to retry later.",
-                    event.host.instance_id,
-                    event.host.hostname,
+                    POSSIBLE_LOCK_CONFLICT_WARNING, event.host.instance_id, event.host.hostname, SGE_DISABLED_STATE,
                 )
                 failed.append(event)
                 continue
```

src/sqswatcher/plugins/slurm.py

Lines changed: 3 additions & 4 deletions
```diff
@@ -24,7 +24,7 @@
 
 from common.remote_command_executor import RemoteCommandExecutor
 from common.schedulers.slurm_commands import SLURM_NODE_DISABLED_STATES, get_node_state, lock_node
-from common.utils import EventType, run_command
+from common.utils import POSSIBLE_LOCK_CONFLICT_WARNING, EventType, run_command
 
 log = logging.getLogger(__name__)
 
@@ -231,11 +231,10 @@ def perform_health_actions(health_events):
         try:
             if _is_node_locked(event.host.hostname):
                 log.warning(
-                    "Instance %s/%s currently in disabled state 'draining/drained'. "
-                    "Risk of lock being released by nodewatcher if locking the node because of scheduled event now. "
-                    "Marking event as failed to retry later.",
+                    POSSIBLE_LOCK_CONFLICT_WARNING,
                     event.host.instance_id,
                     event.host.hostname,
+                    SLURM_NODE_DISABLED_STATES,
                 )
                 failed.append(event)
                 continue
```

src/sqswatcher/plugins/torque.py

Lines changed: 3 additions & 4 deletions
```diff
@@ -23,7 +23,7 @@
     update_cluster_limits,
     wakeup_scheduler,
 )
-from common.utils import EventType
+from common.utils import POSSIBLE_LOCK_CONFLICT_WARNING, EventType
 
 log = logging.getLogger(__name__)
 
@@ -75,11 +75,10 @@ def perform_health_actions(health_events):
         try:
             if _is_node_locked(event.host.hostname):
                 log.warning(
-                    "Instance %s/%s currently in disabled state 'offline'. "
-                    "Risk of lock being released by nodewatcher if locking the node because of scheduled event now. "
-                    "Marking event as failed to retry later.",
+                    POSSIBLE_LOCK_CONFLICT_WARNING,
                     event.host.instance_id,
                     event.host.hostname,
+                    TORQUE_NODE_DISABLED_STATE,
                 )
                 failed.append(event)
                 continue
```
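All three sqswatcher plugins now share the same guard in `perform_health_actions`: if the node is already locked, the event is logged with the shared template plus that scheduler's own disabled-state constant, appended to `failed`, and skipped so a later pass can retry it. A minimal sketch of that pattern, using hypothetical stand-ins for `_is_node_locked` and the event objects (not the ParallelCluster implementation):

```python
import collections
import logging

log = logging.getLogger(__name__)
Host = collections.namedtuple("Host", ["instance_id", "hostname"])
Event = collections.namedtuple("Event", ["host"])

LOCK_CONFLICT_WARNING = "Instance %s/%s currently in disabled state %s; marking event as failed to retry later."
DISABLED_STATE = "drained"  # each plugin would pass its own scheduler's constant

def _is_node_locked(hostname):
    # Hypothetical stub; the real check queries the scheduler.
    return hostname.endswith("166")

def perform_health_actions(health_events):
    failed, succeeded = [], []
    for event in health_events:
        try:
            if _is_node_locked(event.host.hostname):
                log.warning(LOCK_CONFLICT_WARNING, event.host.instance_id, event.host.hostname, DISABLED_STATE)
                failed.append(event)
                continue  # retried on a later pass
            # ... lock the node for the scheduled event here ...
            succeeded.append(event)
        except Exception as e:
            log.error("Failed when processing event for %s: %s", event.host.hostname, e)
            failed.append(event)
    return failed, succeeded

events = [Event(Host("i-0abc123", "ip-10-0-0-166"))]
failed, succeeded = perform_health_actions(events)  # the event lands in `failed`
```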

tests/nodewatcher/plugins/test_sge.py

Lines changed: 13 additions & 1 deletion
```diff
@@ -12,7 +12,7 @@
 
 from assertpy import assert_that
 from common.schedulers.sge_commands import SGE_HOLD_STATE, SgeHost, SgeJob
-from nodewatcher.plugins.sge import has_jobs, has_pending_jobs, is_node_down
+from nodewatcher.plugins.sge import has_jobs, has_pending_jobs, is_node_down, lock_host
 
 
 @pytest.mark.parametrize(
@@ -184,3 +184,15 @@ def test_has_jobs(jobs, expected_result, mocker):
 
     assert_that(has_jobs(hostname)).is_equal_to(expected_result)
     mock.assert_called_with(hostname_filter=hostname, job_state_filter="rs")
+
+
+@pytest.mark.parametrize(
+    "hostname, unlock", [("ip-10-0-0-166", False), ("ip-10-0-0-166", True)],
+)
+def test_lock_host(hostname, unlock, mocker):
+    if unlock:
+        mock = mocker.patch("nodewatcher.plugins.sge.unlock_node", autospec=True)
+    else:
+        mock = mocker.patch("nodewatcher.plugins.sge.lock_node", autospec=True)
+    lock_host(hostname, unlock)
+    mock.assert_called_with(hostname)
```

tests/nodewatcher/plugins/test_slurm.py

Lines changed: 13 additions & 1 deletion
```diff
@@ -12,7 +12,7 @@
 
 from assertpy import assert_that
 from common.schedulers.slurm_commands import PENDING_RESOURCES_REASONS, SlurmJob
-from nodewatcher.plugins.slurm import has_pending_jobs, is_node_down
+from nodewatcher.plugins.slurm import has_pending_jobs, is_node_down, lock_host
 
 
 @pytest.mark.parametrize(
@@ -88,3 +88,15 @@ def test_is_node_down(hostname, get_node_state_output, expected_result, mocker):
 
     assert_that(is_node_down()).is_equal_to(expected_result)
     mock.assert_called_with(hostname)
+
+
+@pytest.mark.parametrize(
+    "hostname, unlock", [("ip-10-0-0-166", False), ("ip-10-0-0-166", True)],
+)
+def test_lock_host(hostname, unlock, mocker):
+    if unlock:
+        mock = mocker.patch("nodewatcher.plugins.slurm.unlock_node", autospec=True)
+    else:
+        mock = mocker.patch("nodewatcher.plugins.slurm.lock_node", autospec=True)
+    lock_host(hostname, unlock)
+    mock.assert_called_with(hostname)
```

tests/nodewatcher/plugins/test_torque.py

Lines changed: 10 additions & 1 deletion
```diff
@@ -12,7 +12,7 @@
 
 from assertpy import assert_that
 from common.schedulers.torque_commands import TorqueHost, TorqueJob, TorqueResourceList
-from nodewatcher.plugins.torque import has_jobs, has_pending_jobs, is_node_down
+from nodewatcher.plugins.torque import has_jobs, has_pending_jobs, is_node_down, lock_host
 
 
 @pytest.mark.parametrize(
@@ -171,3 +171,12 @@ def test_has_jobs(jobs, expected_result, mocker):
 
     assert_that(has_jobs(hostname)).is_equal_to(expected_result)
     mock.assert_called_with(filter_by_exec_hosts={hostname.split(".")[0]}, filter_by_states=["R", "S"])
+
+
+@pytest.mark.parametrize(
+    "hostname, unlock", [("ip-10-0-0-166", False), ("ip-10-0-0-166", True)],
+)
+def test_lock_host(hostname, unlock, mocker):
+    mock = mocker.patch("nodewatcher.plugins.torque.lock_node", autospec=True)
+    lock_host(hostname, unlock)
+    mock.assert_called_with(hostname=hostname, unlock=unlock)
```
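Unlike the sge and slurm variants, the torque test patches only `lock_node` and asserts a keyword-argument call, which implies the torque plugin's `lock_host` delegates both directions through a single `lock_node(hostname=..., unlock=...)` call rather than branching between two functions. A hypothetical sketch of that shape, inferred from the test's assertion rather than taken from the source:

```python
# Inferred from mock.assert_called_with(hostname=hostname, unlock=unlock);
# the real implementation lives in src/nodewatcher/plugins/torque.py.
def lock_host(hostname, unlock=False):
    lock_node(hostname=hostname, unlock=unlock)
```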
