Skip to content

feat(supervisor): Add tailing logs function #257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 25, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions devservices/utils/supervisor.py
Original file line number Diff line number Diff line change
@@ -6,12 +6,31 @@
import socket
import subprocess
import xmlrpc.client
from enum import IntEnum

from devservices.constants import DEVSERVICES_SUPERVISOR_CONFIG_DIR
from devservices.exceptions import SupervisorConfigError
from devservices.exceptions import SupervisorConnectionError
from devservices.exceptions import SupervisorError
from devservices.exceptions import SupervisorProcessError
from devservices.utils.console import Console


class SupervisorProcessState(IntEnum):
"""
Supervisor process states.
https://supervisord.org/subprocess.html#process-states
"""

STOPPED = 0
STARTING = 10
RUNNING = 20
BACKOFF = 30
STOPPING = 40
EXITED = 100
FATAL = 200
UNKNOWN = 1000


class UnixSocketHTTPConnection(http.client.HTTPConnection):
@@ -101,6 +120,21 @@
f"Failed to connect to supervisor XML-RPC server: {e.errmsg}"
)

def _is_program_running(self, program_name: str) -> bool:
try:
client = self._get_rpc_client()
process_info = client.supervisor.getProcessInfo(program_name)
if not isinstance(process_info, dict):
return False

state = process_info.get("state")
if not isinstance(state, int):
return False

Check warning on line 132 in devservices/utils/supervisor.py

Codecov / codecov/patch

devservices/utils/supervisor.py#L132

Added line #L132 was not covered by tests
return state == SupervisorProcessState.RUNNING
except xmlrpc.client.Fault:
# If we can't get the process info, assume it's not running
return False

def start_supervisor_daemon(self) -> None:
try:
subprocess.run(["supervisord", "-c", self.config_file_path], check=True)
@@ -118,6 +152,8 @@
raise SupervisorError(f"Failed to stop supervisor: {e.faultString}")

def start_program(self, program_name: str) -> None:
if self._is_program_running(program_name):
return
try:
self._get_rpc_client().supervisor.startProcess(program_name)
except xmlrpc.client.Fault as e:
@@ -126,9 +162,35 @@
)

def stop_program(self, program_name: str) -> None:
if not self._is_program_running(program_name):
return
try:
self._get_rpc_client().supervisor.stopProcess(program_name)
except xmlrpc.client.Fault as e:
raise SupervisorProcessError(
f"Failed to stop program {program_name}: {e.faultString}"
)

def tail_program_logs(self, program_name: str) -> None:
if not self._is_program_running(program_name):
console = Console()
console.failure(f"Program {program_name} is not running")
return

try:
# Use supervisorctl tail command
subprocess.run(
[
"supervisorctl",
"-c",
self.config_file_path,
"tail",
"-f",
program_name,
],
check=True,
)
except subprocess.CalledProcessError as e:
raise SupervisorError(f"Failed to tail logs for {program_name}: {str(e)}")
except KeyboardInterrupt:
pass
159 changes: 159 additions & 0 deletions tests/utils/test_supervisor.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
from devservices.exceptions import SupervisorError
from devservices.exceptions import SupervisorProcessError
from devservices.utils.supervisor import SupervisorManager
from devservices.utils.supervisor import SupervisorProcessState
from devservices.utils.supervisor import UnixSocketHTTPConnection
from devservices.utils.supervisor import UnixSocketTransport

@@ -112,6 +113,50 @@ def test_get_rpc_client_failure(
assert transport_arg.socket_path == supervisor_manager.socket_path


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_is_program_running_success(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
assert supervisor_manager._is_program_running("test_program")


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_is_program_running_program_not_running(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.STOPPED
}
assert not supervisor_manager._is_program_running("test_program")


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_is_program_running_typing_error(
mock_rpc_client: mock.MagicMock,
supervisor_manager: SupervisorManager,
capsys: pytest.CaptureFixture[str],
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = 1
assert not supervisor_manager._is_program_running("test_program")
mock_rpc_client.return_value.supervisor.getProcessInfo.side_effect = {
"state": [SupervisorProcessState.STOPPED]
}
assert not supervisor_manager._is_program_running("test_program")


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_is_program_running_failure(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.side_effect = (
xmlrpc.client.Fault(1, "Error")
)
assert not supervisor_manager._is_program_running("test_program")


@mock.patch("devservices.utils.supervisor.subprocess.run")
def test_start_supervisor_daemon_success(
mock_subprocess_run: mock.MagicMock, supervisor_manager: SupervisorManager
@@ -163,6 +208,9 @@ def test_stop_supervisor_daemon_failure(
def test_start_program_success(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.STOPPED
}
supervisor_manager.start_program("test_program")
supervisor_manager._get_rpc_client().supervisor.startProcess.assert_called_once_with(
"test_program"
@@ -173,17 +221,34 @@ def test_start_program_success(
def test_start_program_failure(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.STOPPED
}
mock_rpc_client.return_value.supervisor.startProcess.side_effect = (
xmlrpc.client.Fault(1, "Error")
)
with pytest.raises(SupervisorProcessError):
supervisor_manager.start_program("test_program")


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_start_program_already_running(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
supervisor_manager.start_program("test_program")
mock_rpc_client.supervisor.startProcess.assert_not_called()


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_stop_program_success(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
supervisor_manager.stop_program("test_program")
supervisor_manager._get_rpc_client().supervisor.stopProcess.assert_called_once_with(
"test_program"
@@ -194,13 +259,27 @@ def test_stop_program_success(
def test_stop_program_failure(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
mock_rpc_client.return_value.supervisor.stopProcess.side_effect = (
xmlrpc.client.Fault(1, "Error")
)
with pytest.raises(SupervisorProcessError):
supervisor_manager.stop_program("test_program")


@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_stop_program_not_running(
mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.STOPPED
}
supervisor_manager.stop_program("test_program")
mock_rpc_client.supervisor.stopProcess.assert_not_called()


def test_extend_config_file(
supervisor_manager: SupervisorManager, tmp_path: Path
) -> None:
@@ -227,3 +306,83 @@ def test_extend_config_file(
"""
)


@mock.patch("devservices.utils.supervisor.subprocess.run")
@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def tail_program_logs_success(
mock_rpc_client: mock.MagicMock,
mock_subprocess_run: mock.MagicMock,
supervisor_manager: SupervisorManager,
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
supervisor_manager.tail_program_logs("test_program")
mock_subprocess_run.assert_called_once_with(
[
"supervisorctl",
"-c",
supervisor_manager.config_file_path,
"fg",
"test_program",
],
check=True,
)


@mock.patch("devservices.utils.supervisor.subprocess.run")
@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_tail_program_logs_not_running(
mock_rpc_client: mock.MagicMock,
mock_subprocess_run: mock.MagicMock,
supervisor_manager: SupervisorManager,
capsys: pytest.CaptureFixture[str],
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.STOPPED
}
supervisor_manager.tail_program_logs("test_program")
captured = capsys.readouterr()
assert "Program test_program is not running" in captured.out
mock_subprocess_run.assert_not_called()


@mock.patch("devservices.utils.supervisor.subprocess.run")
@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_tail_program_logs_failure(
mock_rpc_client: mock.MagicMock,
mock_subprocess_run: mock.MagicMock,
supervisor_manager: SupervisorManager,
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, "supervisorctl")
with pytest.raises(SupervisorError, match="Failed to tail logs for test_program"):
supervisor_manager.tail_program_logs("test_program")


@mock.patch("devservices.utils.supervisor.subprocess.run")
@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy")
def test_tail_program_logs_keyboard_interrupt(
mock_rpc_client: mock.MagicMock,
mock_subprocess_run: mock.MagicMock,
supervisor_manager: SupervisorManager,
) -> None:
mock_rpc_client.return_value.supervisor.getProcessInfo.return_value = {
"state": SupervisorProcessState.RUNNING
}
mock_subprocess_run.side_effect = KeyboardInterrupt()
supervisor_manager.tail_program_logs("test_program")
mock_subprocess_run.assert_called_once_with(
[
"supervisorctl",
"-c",
supervisor_manager.config_file_path,
"tail",
"-f",
"test_program",
],
check=True,
)