From b4235fb01ace6e464b84e90c1d8d8cf79e932253 Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Wed, 22 Jan 2025 08:17:33 +0100
Subject: [PATCH 1/4] feat: writing error to stderr

---
 .github/workflows/post_to_mastodon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/post_to_mastodon.sh b/.github/workflows/post_to_mastodon.sh
index 20a65a2..045e87f 100644
--- a/.github/workflows/post_to_mastodon.sh
+++ b/.github/workflows/post_to_mastodon.sh
@@ -2,7 +2,7 @@
 # Extract version from PR tag passed as environment variable
 if [ -z "${PR_TITLE}" ]; then
     # apparently unset, workflow broken?
-    echo "Error: 'PR_TITLE' environment variable is not set."
+    >&2 echo "Error: 'PR_TITLE' environment variable is not set."
     exit 1
 fi
 version="${PR_TITLE##* }"

From 27b86103519cc25aef2269b3f0f2918be3672e5b Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Sun, 9 Mar 2025 12:15:53 +0100
Subject: [PATCH 2/4] feat: added code to report job efficiency - to be tested

---
 .../efficiency_report.py | 122 ++++++++++++++++++
 1 file changed, 122 insertions(+)
 create mode 100644 snakemake_executor_plugin_slurm/efficiency_report.py

diff --git a/snakemake_executor_plugin_slurm/efficiency_report.py b/snakemake_executor_plugin_slurm/efficiency_report.py
new file mode 100644
index 0000000..77506d9
--- /dev/null
+++ b/snakemake_executor_plugin_slurm/efficiency_report.py
@@ -0,0 +1,122 @@
+import pandas as pd
+import subprocess
+import re
+
+
+def time_to_seconds(time_str):
+    """Convert SLURM time format to seconds."""
+    if pd.isna(time_str) or time_str.strip() == "":
+        return 0
+    parts = time_str.split(":")
+    parts = [int(p) for p in parts]
+    if len(parts) == 3:  # H:M:S
+        return parts[0] * 3600 + parts[1] * 60 + parts[2]
+    elif len(parts) == 2:  # M:S
+        return parts[0] * 60 + parts[1]
+    elif len(parts) == 1:  # S
+        return parts[0]
+    return 0
+
+
+def parse_maxrss(maxrss):
+    """Convert MaxRSS to MB."""
+    if pd.isna(maxrss) or maxrss.strip() == "" or maxrss == "0":
+        return 0
+    match = re.match(r"(\d+)([KMG]?)", maxrss)
+    if match:
+        value, unit = match.groups()
+        value = int(value)
+        unit_multipliers = {"K": 1 / 1024, "M": 1, "G": 1024}
+        return value * unit_multipliers.get(unit, 1)
+    return 0
+
+
+def parse_reqmem(reqmem):
+    """Convert requested memory to MB."""
+    if pd.isna(reqmem) or reqmem.strip() == "":
+        return 0
+    match = re.match(
+        r"(\d+)([KMG])?(\S+)?", reqmem
+    )  # Handles "4000M" or "4G" or "2G/node"
+    if match:
+        value, unit, per_unit = match.groups()
+        value = int(value)
+        unit_multipliers = {"K": 1 / 1024, "M": 1, "G": 1024}
+        mem_mb = value * unit_multipliers.get(unit, 1)
+        if per_unit == "/node":
+            return mem_mb  # Memory is per node
+        return mem_mb  # Default case (per CPU or total)
+    return 0
+
+
+def fetch_sacct_data(run_uuid, logger, efficiency_threshold=0.8):
+    """Fetch sacct job data for a Snakemake workflow and compute efficiency metrics."""
+
+    cmd = [
+        "sacct",
+        f"--name={run_uuid}",
+        "--format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem",
+        "--parsable2",
+        "--noheader",
+    ]
+
+    try:
+        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
+        lines = result.stdout.strip().split("\n")
+    except subprocess.CalledProcessError:
+        logger.warning(f"Error: Failed to retrieve job data for workflow ({run_uuid}).")
+        return None
+
+    # Convert to DataFrame
+    df = pd.DataFrame(
+        (l.split("|") for l in lines),
+        columns=[
+            "JobID",
+            "JobName",
+            "Comment",
+            "Elapsed",
+            "TotalCPU",
+            "NNodes",
+            "NCPUS",
+            "MaxRSS",
+            "ReqMem",
+        ],
+    )
+
+    # Convert types
+    df["NNodes"] = pd.to_numeric(df["NNodes"], errors="coerce")
+    df["NCPUS"] = pd.to_numeric(df["NCPUS"], errors="coerce")
+
+    # Convert time fields
+    df["Elapsed_sec"] = df["Elapsed"].apply(time_to_seconds)
+    df["TotalCPU_sec"] = df["TotalCPU"].apply(time_to_seconds)
+
+    # Compute CPU efficiency
+    df["CPU Efficiency (%)"] = (
+        df["TotalCPU_sec"] / (df["Elapsed_sec"] * df["NCPUS"])
+    ) * 100
+    df["CPU Efficiency (%)"] = df["CPU Efficiency (%)"].fillna(0).round(2)
+
+    # Convert MaxRSS
+    df["MaxRSS_MB"] = df["MaxRSS"].apply(parse_maxrss)
+
+    # Convert ReqMem and calculate memory efficiency
+    df["RequestedMem_MB"] = df["ReqMem"].apply(parse_reqmem)
+    df["Memory Usage (%)"] = (df["MaxRSS_MB"] / df["RequestedMem_MB"]) * 100
+    df["Memory Usage (%)"] = df["Memory Usage (%)"].fillna(0).round(2)
+
+    # Log warnings for low efficiency
+    for _, row in df.iterrows():
+        if row["CPU Efficiency (%)"] < efficiency_threshold:
+            logger.warning(
+                f"Job {row['JobID']} for rule '{row['Comment']}' ({row['JobName']})"
+                f" has low CPU efficiency: {row['CPU Efficiency (%)']}%."
+            )
+    logfile = f"efficiency_report_{run_uuid}.log"
+
+    logger.info(f"Saved efficiency evaluation to '{logfile}'")
+    return df
+
+
+# Example usage:
+#df = fetch_sacct_data("snakemake_workflow_uuid")  # Replace with actual UUID

From 7e6b369a4b8bbd0c3c31d5320407339aa7fcc412 Mon Sep 17 00:00:00 2001
From: Christian Meesters
Date: Mon, 10 Mar 2025 15:32:00 +0100
Subject: [PATCH 3/4] feat: calling the efficiency evaluator - needs to be tested

---
 snakemake_executor_plugin_slurm/__init__.py          | 2 ++
 snakemake_executor_plugin_slurm/efficiency_report.py | 6 +-----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index 9789ef5..2a53031 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -29,6 +29,7 @@
 from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task
 
 from .utils import delete_slurm_environment, delete_empty_dirs
+from .efficiency_report import fetch_sacct_data
 
 
 @dataclass
@@ -124,6 +125,7 @@ def __post_init__(self):
         self._preemption_warning = False  # no preemption warning has been issued
         self.slurm_logdir = None
         atexit.register(self.clean_old_logs)
+        atexit.register(self.fetch_sacct_data)
 
     def clean_old_logs(self) -> None:
         """Delete files older than specified age from the SLURM log directory."""
diff --git a/snakemake_executor_plugin_slurm/efficiency_report.py b/snakemake_executor_plugin_slurm/efficiency_report.py
index 77506d9..967d1e3 100644
--- a/snakemake_executor_plugin_slurm/efficiency_report.py
+++ b/snakemake_executor_plugin_slurm/efficiency_report.py
@@ -113,10 +113,6 @@ def fetch_sacct_data(run_uuid, logger, efficiency_threshold=0.8):
                 f" has low CPU efficiency: {row['CPU Efficiency (%)']}%."
             )
     logfile = f"efficiency_report_{run_uuid}.log"
+    df.to_csv(logfile)
 
     logger.info(f"Saved efficiency evaluation to '{logfile}'")
-    return df
-
-
-# Example usage:
-#df = fetch_sacct_data("snakemake_workflow_uuid")  # Replace with actual UUID

From 4b671e3f752eaa69ee0bb70c651953d46da71843 Mon Sep 17 00:00:00 2001
From: meesters
Date: Mon, 5 May 2025 14:02:15 +0200
Subject: [PATCH 4/4] feat: added helper function to calculate efficiency report and colorize terminal output from functions bound to atexit()

---
 .../efficiency_report.py                 | 79 ++------
 snakemake_executor_plugin_slurm/utils.py | 21 +++++
 2 files changed, 26 insertions(+), 74 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/efficiency_report.py b/snakemake_executor_plugin_slurm/efficiency_report.py
index 967d1e3..2dff9d4 100644
--- a/snakemake_executor_plugin_slurm/efficiency_report.py
+++ b/snakemake_executor_plugin_slurm/efficiency_report.py
@@ -1,6 +1,5 @@
-import pandas as pd
-import subprocess
 import re
+import pandas as pd
 
 
 def time_to_seconds(time_str):
@@ -8,13 +7,14 @@ def time_to_seconds(time_str):
     if pd.isna(time_str) or time_str.strip() == "":
         return 0
     parts = time_str.split(":")
-    parts = [int(p) for p in parts]
+
     if len(parts) == 3:  # H:M:S
+        parts = [int(p) for p in parts]
         return parts[0] * 3600 + parts[1] * 60 + parts[2]
     elif len(parts) == 2:  # M:S
-        return parts[0] * 60 + parts[1]
+        return int(parts[0]) * 60 + float(parts[1])
     elif len(parts) == 1:  # S
-        return parts[0]
+        return float(parts[0])
     return 0
 
 
@@ -47,72 +47,3 @@ def parse_reqmem(reqmem):
             return mem_mb  # Memory is per node
         return mem_mb  # Default case (per CPU or total)
     return 0
-
-
-def fetch_sacct_data(run_uuid, logger, efficiency_threshold=0.8):
-    """Fetch sacct job data for a Snakemake workflow and compute efficiency metrics."""
-
-    cmd = [
-        "sacct",
-        f"--name={run_uuid}",
-        "--format=JobID,JobName,Comment,Elapsed,TotalCPU,NNodes,NCPUS,MaxRSS,ReqMem",
-        "--parsable2",
-        "--noheader",
-    ]
-
-    try:
-        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
-        lines = result.stdout.strip().split("\n")
-    except subprocess.CalledProcessError:
-        logger.warning(f"Error: Failed to retrieve job data for workflow ({run_uuid}).")
-        return None
-
-    # Convert to DataFrame
-    df = pd.DataFrame(
-        (l.split("|") for l in lines),
-        columns=[
-            "JobID",
-            "JobName",
-            "Comment",
-            "Elapsed",
-            "TotalCPU",
-            "NNodes",
-            "NCPUS",
-            "MaxRSS",
-            "ReqMem",
-        ],
-    )
-
-    # Convert types
-    df["NNodes"] = pd.to_numeric(df["NNodes"], errors="coerce")
-    df["NCPUS"] = pd.to_numeric(df["NCPUS"], errors="coerce")
-
-    # Convert time fields
-    df["Elapsed_sec"] = df["Elapsed"].apply(time_to_seconds)
-    df["TotalCPU_sec"] = df["TotalCPU"].apply(time_to_seconds)
-
-    # Compute CPU efficiency
-    df["CPU Efficiency (%)"] = (
-        df["TotalCPU_sec"] / (df["Elapsed_sec"] * df["NCPUS"])
-    ) * 100
-    df["CPU Efficiency (%)"] = df["CPU Efficiency (%)"].fillna(0).round(2)
-
-    # Convert MaxRSS
-    df["MaxRSS_MB"] = df["MaxRSS"].apply(parse_maxrss)
-
-    # Convert ReqMem and calculate memory efficiency
-    df["RequestedMem_MB"] = df["ReqMem"].apply(parse_reqmem)
-    df["Memory Usage (%)"] = (df["MaxRSS_MB"] / df["RequestedMem_MB"]) * 100
-    df["Memory Usage (%)"] = df["Memory Usage (%)"].fillna(0).round(2)
-
-    # Log warnings for low efficiency
-    for _, row in df.iterrows():
-        if row["CPU Efficiency (%)"] < efficiency_threshold:
-            logger.warning(
-                f"Job {row['JobID']} for rule '{row['Comment']}' ({row['JobName']})"
-                f" has low CPU efficiency: {row['CPU Efficiency (%)']}%."
-            )
-    logfile = f"efficiency_report_{run_uuid}.log"
-    df.to_csv(logfile)
-
-    logger.info(f"Saved efficiency evaluation to '{logfile}'")
diff --git a/snakemake_executor_plugin_slurm/utils.py b/snakemake_executor_plugin_slurm/utils.py
index 695eb92..10cb770 100644
--- a/snakemake_executor_plugin_slurm/utils.py
+++ b/snakemake_executor_plugin_slurm/utils.py
@@ -102,3 +102,24 @@ def set_gres_string(job: JobExecutorInterface) -> str:
         # we assume here, that the validator ensures that the 'gpu_string'
         # is an integer
         return f" --gpus={gpu_string}"
+
+# We need this colorize function as a fall back for methods registering
+# with atexit(): There the logger is not correctly available and we need
+# to print colorized messages according to the intended logger level.
+BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+RESET_SEQ = "\033[0m"
+COLOR_SEQ = "\033[%dm"
+BOLD_SEQ = "\033[1m"
+
+colors = {
+    "DEBUG": BLUE,
+    "INFO": GREEN,
+    "WARNING": YELLOW,
+    "ERROR": RED,
+    "CRITICAL": MAGENTA,
+}
+
+def colorize_message(level, message):
+    """Colorize a message based on the log level."""
+    color = colors.get(level.upper(), WHITE)
+    return f"{COLOR_SEQ % (30 + color)}{message}{RESET_SEQ}"
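
For orientation, a minimal sketch of how the colorize_message() helper added to
snakemake_executor_plugin_slurm/utils.py might be used from a function registered with
atexit(), the situation the patch comment describes (the regular logger is not reliably
available at interpreter exit). The report_at_exit() wrapper and its message text are
illustrative assumptions, not part of the patches above:

    import atexit

    from snakemake_executor_plugin_slurm.utils import colorize_message


    def report_at_exit():
        # Hypothetical atexit hook: fall back to plain print() with ANSI color codes,
        # because logging handlers may already be torn down when atexit callbacks run.
        print(colorize_message("WARNING", "efficiency report written at workflow shutdown"))


    atexit.register(report_at_exit)

Here "WARNING" selects the yellow escape sequence from the colors mapping, so the printed
message mimics the logger's warning level without touching the logging machinery.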