Auto-track and globally shut down all Forge actors and services #357
@@ -13,6 +13,8 @@
import socket
import uuid

from typing import Any

from monarch._src.actor.shape import NDSlice, Shape
from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host
from monarch.tools import commands

@@ -131,6 +133,8 @@ def __init__(self, cfg: ProvisionerConfig | None = None):
        if not self.launcher:
            logger.warning("Launcher not provided, remote allocations will not work.")

        self._allocations: list[Any] = []  # all live actor/service instances

    async def initialize(self):
        """Call this after creating the instance"""
        if self.launcher is not None:

@@ -302,8 +306,40 @@ async def stop_proc_mesh(self, proc_mesh: ProcMesh):
                commands.kill(server_name)
                del self._proc_host_map[proc_mesh]

    async def track_allocation(self, alloc: Any):
        """Tracks an allocation for cleanup."""
        self._allocations.append(alloc)
Comment: Hmm, I think an even simpler approach is to just track the proc meshes, right? We can just do
Reply: I think it would work for actors, since shutting down an actor is essentially stopping the

    async def shutdown_all_allocations(self):
        """Gracefully shut down all tracked actors and services."""
        from monarch._src.actor.actor_mesh import ActorMesh

        from forge.controller.actor import ForgeActor
        from forge.controller.service import ServiceInterface

        for alloc in reversed(self._allocations):
            try:
                # --- ServiceInterface ---
                if isinstance(alloc, ServiceInterface):
                    await alloc.shutdown()

                # --- Actor instance (ForgeActor or underlying ActorMesh) ---
                elif isinstance(alloc, (ForgeActor, ActorMesh)):
                    # Get the class to call shutdown on (ForgeActor or its bound class)
                    actor_cls = getattr(alloc, "_class", None) or alloc.__class__
                    await actor_cls.shutdown(alloc)

                else:
                    logger.warning(f"Unknown allocation type: {type(alloc)}")

            except Exception as e:
                logger.warning(f"Failed to shut down {alloc}: {e}")

        self._allocations.clear()

    async def shutdown(self):
        """Tears down all remaining remote allocations."""
        await self.shutdown_all_allocations()
        async with self._lock:
            for server_name in self._server_names:
                commands.kill(server_name)

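The track-then-shut-down-in-reverse pattern in this diff can be tried in isolation. The sketch below is a minimal stand-in, not the Forge implementation: `FakeService` and `AllocationTracker` are hypothetical names, and no Monarch or Forge APIs are used. It only illustrates two properties of the loop above: allocations are stopped newest-first, and one failing shutdown does not block the rest.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


class FakeService:
    """Hypothetical stand-in for anything exposing an async shutdown()."""

    def __init__(self, name: str, log: list[str], fail: bool = False):
        self.name = name
        self._log = log
        self._fail = fail

    async def shutdown(self) -> None:
        if self._fail:
            raise RuntimeError(f"{self.name} refused to stop")
        self._log.append(self.name)


class AllocationTracker:
    """Hypothetical tracker: records allocations, stops them newest-first."""

    def __init__(self) -> None:
        self._allocations: list[Any] = []

    def track(self, alloc: Any) -> None:
        self._allocations.append(alloc)

    async def shutdown_all(self) -> None:
        for alloc in reversed(self._allocations):
            try:
                await alloc.shutdown()
            except Exception as e:
                # A single failure is logged and skipped so the
                # remaining allocations still get shut down.
                logger.warning("Failed to shut down %s: %s", alloc, e)
        self._allocations.clear()


async def demo() -> list[str]:
    log: list[str] = []
    tracker = AllocationTracker()
    tracker.track(FakeService("policy", log))
    tracker.track(FakeService("trainer", log, fail=True))
    tracker.track(FakeService("reward", log))
    await tracker.shutdown_all()
    return log


stopped = asyncio.run(demo())
```

Reverse order is a reasonable default when later allocations may depend on earlier ones, though whether that dependency holds for Forge services is an assumption here.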
Comment: Adding this quiet check because otherwise, when we shut down a service, it would call actor.shutdown and print the log twice.
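The code for the quiet check is not shown in this part of the diff, so the following is only a guess at the general idea. It assumes a hypothetical `quiet` keyword on the actor's shutdown path. `FakeActor`, `FakeService`, and the message strings are invented for illustration. The point is that the outer service announces the shutdown once and silences the nested actor call.

```python
import asyncio

messages: list[str] = []  # collected here instead of a real logger


class FakeActor:
    """Hypothetical actor whose shutdown can be silenced."""

    def __init__(self, name: str):
        self.name = name

    async def shutdown(self, quiet: bool = False) -> None:
        # `quiet` is an assumed parameter name, not taken from the PR.
        if not quiet:
            messages.append(f"Shutting down actor {self.name}")


class FakeService:
    """Hypothetical service wrapping one or more actors."""

    def __init__(self, name: str, actors: list[FakeActor]):
        self.name = name
        self.actors = actors

    async def shutdown(self) -> None:
        messages.append(f"Shutting down service {self.name}")
        for actor in self.actors:
            # The service already logged; avoid a duplicate line per actor.
            await actor.shutdown(quiet=True)


async def demo() -> None:
    await FakeService("policy", [FakeActor("policy-0")]).shutdown()
    await FakeActor("standalone").shutdown()


asyncio.run(demo())
```

After `demo()` runs, `messages` holds one entry for the service and one for the standalone actor, with no extra entry for the actor nested inside the service.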