Skip to content
49 changes: 27 additions & 22 deletions amlb/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ def __init__(
self.framework_def, self.framework_name = framework, framework.name
log.debug("Using framework definition: %s.", self.framework_def)

self.constraint_def, self.constraint_name = rget().constraint_definition(
constraint_name
task_constraint = rget().constraint_definition(constraint_name)
self.constraint_def, self.constraint_name = (
task_constraint,
task_constraint.name,
)
log.debug("Using constraint definition: %s.", self.constraint_def)

Expand Down Expand Up @@ -619,33 +621,32 @@ def handle_unfulfilled(message, on_auto="warn"):
os_recommended_mem = ns.get(
rconfig(), f"{mode}.os_mem_size_mb", rconfig().benchmarks.os_mem_size_mb
)
left_for_app_mem = int(sys_mem.available - os_recommended_mem)
assigned_mem = round(
self.max_mem_size_mb
if self.max_mem_size_mb > 0
else left_for_app_mem
if left_for_app_mem > 0
else sys_mem.available
)

if self.max_mem_size_mb <= 0:
left_for_app_mem = int(sys_mem.available - os_recommended_mem)
self.max_mem_size_mb = (
left_for_app_mem if left_for_app_mem > 0 else sys_mem.available
)
self.max_mem_size_mb = round(self.max_mem_size_mb)

if self.max_mem_size_mb > sys_mem.total:
raise JobError(
f"Total system memory {sys_mem.total} MB does not meet requirements (max_mem_size_mb={self.max_mem_size_mb} MB)!.",
)

log.info(
"Assigning %.f MB (total=%.f MB) for new %s task.",
assigned_mem,
self.max_mem_size_mb,
sys_mem.total,
self.name,
)
self.max_mem_size_mb = assigned_mem
if assigned_mem > sys_mem.total:
handle_unfulfilled(
f"Total system memory {sys_mem.total} MB does not meet requirements ({assigned_mem} MB)!.",
on_auto="fail",
)
elif assigned_mem > sys_mem.available:
if self.max_mem_size_mb > sys_mem.available:
handle_unfulfilled(
f"Assigned memory ({assigned_mem} MB) exceeds system available memory ({sys_mem.available} MB / total={sys_mem.total} MB)!"
f"Assigned memory ({self.max_mem_size_mb} MB) exceeds system available memory ({sys_mem.available} MB / total={sys_mem.total} MB)!"
)
elif assigned_mem > sys_mem.total - os_recommended_mem:
elif self.max_mem_size_mb > sys_mem.total - os_recommended_mem:
handle_unfulfilled(
f"Assigned memory ({assigned_mem} MB) is within {sys_mem.available} MB of system total memory {sys_mem.total} MB): "
f"Assigned memory ({self.max_mem_size_mb} MB) is within {sys_mem.available} MB of system total memory {sys_mem.total} MB): "
f"We recommend a {os_recommended_mem} MB buffer, otherwise OS memory usage might interfere with the benchmark task."
)

Expand All @@ -654,7 +655,11 @@ def handle_unfulfilled(message, on_auto="warn"):
os_recommended_vol = rconfig().benchmarks.os_vol_size_mb
if self.min_vol_size_mb > sys_vol.free:
handle_unfulfilled(
f"Available storage ({sys_vol.free} MB / total={sys_vol.total} MB) does not meet requirements ({self.min_vol_size_mb+os_recommended_vol} MB)!"
f"Available storage ({sys_vol.free} MB / total={sys_vol.total} MB) does not meet requirements (min_vol_size_mb={self.min_vol_size_mb} MB)!"
)
elif self.min_vol_size_mb > sys_vol.free + os_recommended_vol:
handle_unfulfilled(
f"Required storage min_vol_size_mb ({self.min_vol_size_mb}MB) together with recommended storage for OS ({os_recommended_vol} MB exceeds available storage ({sys_vol.free} MB)."
)


Expand Down
6 changes: 3 additions & 3 deletions amlb/benchmarks/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def benchmark_load(
name, benchmark_definition_dirs
)

hard_defaults = next((task for task in tasks if task.name == "__defaults__"), None)
tasks = [task for task in tasks if task is not hard_defaults]
file_defaults = next((task for task in tasks if task.name == "__defaults__"), None)
tasks = [task for task in tasks if task is not file_defaults]
for t in tasks:
t.name = str_sanitize(t.name)
return hard_defaults, tasks, benchmark_path, str_sanitize(benchmark_name)
return file_defaults, tasks, benchmark_path, str_sanitize(benchmark_name)
21 changes: 21 additions & 0 deletions amlb/frameworks/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,24 @@ def load_framework_definition(
framework_name, tag = framework_name.split(":", 1)
definition_ns, name = configuration.framework_definition(framework_name, tag)
return Framework(**Namespace.dict(definition_ns))


@dataclass
class TaskConstraint:
    """Resource/constraint settings applied to a benchmark task.

    Instances are built from an entry of the constraints file (see
    ``Resources.constraint_definition``) and serve as per-task defaults.
    """

    name: str  # name of the constraint definition as written in the constraints file
    folds: int  # number of folds to run for the task
    max_runtime_seconds: int  # time budget — presumably per fold; TODO confirm scope
    cores: int  # number of CPU cores assigned to the task
    min_vol_size_mb: int | None = None  # minimum required storage volume in MB, if any
    ec2_volume_type: str | None = None  # EC2 volume type override — presumably for AWS mode; TODO confirm


@dataclass
class Task(TaskConstraint):
    """A benchmark task definition: its constraints plus dataset identification."""

    dataset: Namespace | None = None  # TODO: Specify file dataset description
    enabled: bool = True  # NOTE(review): presumably disabled tasks are skipped at run time — confirm
    description: str = ""  # free-form human-readable description
    openml_task_id: int | None = None  # OpenML task id, when the task is backed by openml.org
    metric: str | list[str] | None = None  # evaluation metric(s); None means "use defaults" — TODO confirm
    # Specific to time series
    quantile_levels: list[float] | None = None
163 changes: 88 additions & 75 deletions amlb/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import copy
import dataclasses
import logging
import os
import random
Expand All @@ -14,6 +15,7 @@

from amlb.benchmarks.parser import benchmark_load
from amlb.frameworks import default_tag, load_framework_definitions
from .frameworks.definitions import TaskConstraint
from .utils import (
Namespace,
lazy_property,
Expand Down Expand Up @@ -171,7 +173,7 @@ def _frameworks(self):
return load_framework_definitions(frameworks_file, self.config)

@memoize
def constraint_definition(self, name):
def constraint_definition(self, name: str) -> TaskConstraint:
"""
:param name: name of the benchmark constraint definition as defined in the constraints file
:return: a Namespace object with the constraint config (folds, cores, max_runtime_seconds, ...) for the current benchmamk run.
Expand All @@ -183,7 +185,7 @@ def constraint_definition(self, name):
name, self.config.benchmarks.constraints_file
)
)
return constraint, constraint.name
return TaskConstraint(**Namespace.dict(constraint))

@lazy_property
def _constraints(self):
Expand All @@ -205,42 +207,44 @@ def _constraints(self):
constraints_lookup[name.lower()] = c
return constraints_lookup

# @memoize
def benchmark_definition(self, name, defaults=None):
def benchmark_definition(self, name: str, defaults: TaskConstraint | None = None):
return self._benchmark_definition(name, self.config, defaults)

def _benchmark_definition(
self,
name: str,
config_: Namespace,
defaults_for_task: TaskConstraint | None = None,
):
"""
:param name: name of the benchmark as defined by resources/benchmarks/{name}.yaml, the path to a user-defined benchmark description file or a study id.
:param defaults: defaults used as a base config for each task in the benchmark definition
:return:
"""
hard_defaults, tasks, benchmark_path, benchmark_name = benchmark_load(
name, self.config.benchmarks.definition_dir
file_defaults, tasks, benchmark_path, benchmark_name = benchmark_load(
name, config_.benchmarks.definition_dir
)

defaults = None
if defaults_for_task is not None:
defaults = Namespace(**dataclasses.asdict(defaults_for_task))
defaults = Namespace.merge(
defaults, hard_defaults, Namespace(name="__defaults__")
defaults, file_defaults, Namespace(name="__defaults__")
)
for task in tasks:
task |= defaults # add missing keys from hard defaults + defaults
self._validate_task(task)
Resources._validate_task(task)
Resources._add_task_defaults(task, config_)

self._validate_task(defaults, lenient=True)
Resources._add_task_defaults(defaults, config_)
defaults.enabled = False
tasks.append(defaults)
log.debug("Available task definitions:\n%s", tasks)
return tasks, benchmark_name, benchmark_path

def _validate_task(self, task, lenient=False):
missing = []
for conf in ["name"]:
if task[conf] is None:
missing.append(conf)
if not lenient and len(missing) > 0:
raise ValueError(
"{missing} mandatory properties as missing in task definition {taskdef}.".format(
missing=missing, taskdef=task
)
)

@staticmethod
def _add_task_defaults(task: Namespace, config_: Namespace):
if task["id"] is None:
task["id"] = Resources.generate_task_identifier(task)
for conf in [
"max_runtime_seconds",
"cores",
Expand All @@ -250,75 +254,84 @@ def _validate_task(self, task, lenient=False):
"quantile_levels",
]:
if task[conf] is None:
task[conf] = self.config.benchmarks.defaults[conf]
task[conf] = config_.benchmarks.defaults[conf]
log.debug(
"Config `{config}` not set for task {name}, using default `{value}`.".format(
config=conf, name=task.name, value=task[conf]
config=conf, name=task["name"], value=task[conf]
)
)

conf = "id"
if task[conf] is None:
task[conf] = (
"openml.org/t/{}".format(task.openml_task_id)
if task["openml_task_id"] is not None
else "openml.org/d/{}".format(task.openml_dataset_id)
if task["openml_dataset_id"] is not None
else (
(
task.dataset["id"]
if isinstance(task.dataset, (dict, Namespace))
else task.dataset
if isinstance(task.dataset, str)
else None
)
or task.name
)
if task["dataset"] is not None
else None
)
if not lenient and task[conf] is None:
raise ValueError(
"task definition must contain an ID or one property "
"among ['openml_task_id', 'dataset'] to create an ID, "
"but task definition is {task}".format(task=str(task))
)
if task["metric"] is None:
task["metric"] = None

conf = "metric"
if task[conf] is None:
task[conf] = None

conf = "ec2_instance_type"
if task[conf] is None:
i_series = self.config.aws.ec2.instance_type.series
i_map = self.config.aws.ec2.instance_type.map
if str(task.cores) in i_map:
i_size = i_map[str(task.cores)]
elif task.cores > 0:
supported_cores = list(
map(int, Namespace.dict(i_map).keys() - {"default"})
)
supported_cores.sort()
cores = next((c for c in supported_cores if c >= task.cores), "default")
i_size = i_map[str(cores)]
else:
i_size = i_map.default
task[conf] = ".".join([i_series, i_size])
if task["ec2_instance_type"] is None:
task["ec2_instance_type"] = Resources.lookup_ec2_instance_type(
config_, task.cores
)
log.debug(
"Config `{config}` not set for task {name}, using default selection `{value}`.".format(
config=conf, name=task.name, value=task[conf]
config=conf, name=task["name"], value=task["ec2_instance_type"]
)
)

conf = "ec2_volume_type"
if task[conf] is None:
task[conf] = self.config.aws.ec2.volume_type
if task["ec2_volume_type"] is None:
task["ec2_volume_type"] = config_.aws.ec2.volume_type
log.debug(
"Config `{config}` not set for task {name}, using default `{value}`.".format(
config=conf, name=task.name, value=task[conf]
config=conf, name=task["name"], value=task["ec2_volume_type"]
)
)

@staticmethod
def _validate_task(task: Namespace) -> None:
    """Raise ValueError if *task* has no name or no way to obtain an identifier.

    :param task: a task definition namespace (after defaults were merged in).
    :raises ValueError: if ``name`` is missing, or if ``id`` is unset and cannot
        be generated from ``openml_task_id``/``openml_dataset_id``/``dataset``.
    """
    if task["name"] is None:
        raise ValueError(
            f"`name` is mandatory but missing in task definition {task}."
        )
    # Only derive an identifier when none is set: an explicit `id` stays
    # authoritative, and a present-but-None `id` (the normal "unset" state,
    # as checked in `_add_task_defaults`) still gets a generated fallback.
    task_id = task["id"]
    if task_id is None:
        task_id = Resources.generate_task_identifier(task)
    if task_id is None:
        raise ValueError(
            "task definition must contain an ID or one property "
            "among ['openml_task_id', 'dataset'] to create an ID, "
            f"but task definition is {task}"
        )

@staticmethod
def lookup_ec2_instance_type(config_: Namespace, cores: int) -> str:
    """Return the EC2 instance type (``<series>.<size>``) suited for *cores* CPUs."""
    instance_cfg = config_.aws.ec2.instance_type
    size = Resources.lookup_suitable_instance_size(instance_cfg.map, cores)
    return ".".join([instance_cfg.series, size])

@staticmethod
def lookup_suitable_instance_size(cores_to_size: Namespace, cores: int) -> str:
    """Return the size mapped to exactly *cores*, else the size for the smallest
    supported core count covering it, else the ``default`` entry."""
    exact_key = str(cores)
    if exact_key in cores_to_size:
        return cores_to_size[exact_key]

    known_cores = [int(c) for c in set(dir(cores_to_size)) - {"default"}]
    # Non-positive or larger-than-anything requests fall back to the default size.
    if cores <= 0 or cores > max(known_cores):
        return cores_to_size.default

    for candidate in sorted(known_cores):
        if candidate >= cores:
            return cores_to_size[str(candidate)]
    return cores_to_size["default"]

@staticmethod
def generate_task_identifier(task: Namespace) -> str | None:
    """Derive a stable identifier for *task*, or None if none can be built.

    Preference order: OpenML task id, OpenML dataset id, the dataset's own
    ``id`` (or the dataset string itself), finally the task name.

    :param task: a task definition namespace.
    :return: an identifier string, or None when no dataset information exists.
    """
    if task["openml_task_id"] is not None:
        return f"openml.org/t/{task.openml_task_id}"
    if task["openml_dataset_id"] is not None:
        return f"openml.org/d/{task.openml_dataset_id}"
    if task["dataset"] is None:
        return None
    if isinstance(task.dataset, (dict, Namespace)):
        # `or task.name`: keep the pre-refactor fallback — a dataset entry
        # with a missing/empty "id" used to resolve to the task name.
        return task.dataset["id"] or task.name
    if isinstance(task.dataset, str):
        return task.dataset or task.name
    return task.name


__INSTANCE__: Resources | None = None

Expand Down
8 changes: 5 additions & 3 deletions frameworks/FEDOT/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ def run_fedot_tabular(dataset: Dataset, config: TaskConfig):
__file__, "exec.py", input_data=data, dataset=dataset, config=config
)


def run_fedot_timeseries(dataset: Dataset, config: TaskConfig):
from frameworks.shared.caller import run_in_venv

dataset = deepcopy(dataset)

data = dict(
Expand All @@ -43,6 +45,6 @@ def run_fedot_timeseries(dataset: Dataset, config: TaskConfig):
repeated_item_id=dataset.repeated_item_id,
)

return run_in_venv(__file__, "exec_ts.py",
input_data=data, dataset=dataset, config=config)

return run_in_venv(
__file__, "exec_ts.py", input_data=data, dataset=dataset, config=config
)
Loading
Loading