Skip to content
64 changes: 64 additions & 0 deletions src/autogluon/bench/eval/aggregate/aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging

from autogluon.bench.eval.benchmark_context.output_suite_context import OutputSuiteContext
from autogluon.common.savers import save_pd

logger = logging.getLogger(__name__)


def aggregate(
    s3_bucket: str,
    module: str,
    benchmark_name: str,
    artifact: str | None = "results",
    constraint: str | None = None,
    include_infer_speed: bool = False,
    mode: str = "ray",
) -> None:
    """
    Aggregates objects across an agbenchmark. Functionality depends on artifact specified:

    Params:
    -------
    s3_bucket: str
        Name of the relevant s3_bucket
    module: str
        The name of the relevant autogluon module:
        can be one of ['tabular', 'timeseries', 'multimodal']
    benchmark_name: str
        The name of the relevant benchmark that was run
    artifact: str
        The desired artifact to be aggregated; can be one of ['results', 'learning_curves']
    constraint: str
        Name of constraint used in benchmark
    include_infer_speed: bool
        Include inference speed in aggregation.
    mode: str
        Can be one of ['seq', 'ray'].
        If seq, runs sequentially.
        If ray, utilizes parallelization.
    """
    result_path = f"{module}/{benchmark_name}"
    path_prefix = f"s3://{s3_bucket}/{result_path}/"
    # Benchmark object names embed the constraint as ".<constraint>." (e.g. ".1h8c.")
    contains = f".{constraint}." if constraint else None

    output_suite_context = OutputSuiteContext(
        path=path_prefix,
        contains=contains,
        include_infer_speed=include_infer_speed,
        mode=mode,
    )

    if artifact == "learning_curves":
        save_path = f"s3://{s3_bucket}/aggregated/{result_path}/{artifact}"
        artifact_path = output_suite_context.aggregate_learning_curves(save_path=save_path)
    else:
        # Any artifact other than 'learning_curves' (including the default 'results')
        # falls through to results aggregation.
        aggregated_results_name = f"results_automlbenchmark_{constraint}_{benchmark_name}.csv"
        results_df = output_suite_context.aggregate_results()

        # BUGFIX: replaced leftover debug `print(results_df)` with a proper log record.
        logger.info("Aggregated results:\n%s", results_df)

        artifact_path = f"s3://{s3_bucket}/aggregated/{result_path}/{aggregated_results_name}"
        save_pd.save(path=artifact_path, df=results_df)

    logger.info(f"Aggregated output saved to {artifact_path}!")
28 changes: 0 additions & 28 deletions src/autogluon/bench/eval/aggregate/results.py

This file was deleted.

36 changes: 36 additions & 0 deletions src/autogluon/bench/eval/benchmark_context/output_context.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import zipfile
from io import BytesIO, TextIOWrapper
from typing import Optional, Set, Union
Expand All @@ -6,9 +7,12 @@
import numpy as np
import pandas as pd

from autogluon.bench.eval.benchmark_context.utils import copy_s3_object, get_s3_paths
from autogluon.common.loaders import load_pd, load_pkl
from autogluon.common.utils.s3_utils import s3_path_to_bucket_prefix

logger = logging.getLogger(__name__)


class OutputContext:
def __init__(self, path):
Expand Down Expand Up @@ -49,6 +53,10 @@ def path_results(self):
def path_leaderboard(self):
    """Full s3 path to this context's leaderboard.csv artifact."""
    return f"{self.path}leaderboard.csv"

@property
def path_learning_curves(self):
    """Full s3 path to this context's learning_curves/ directory."""
    return f"{self.path}learning_curves/"

@property
def path_model_failures(self):
    """Full s3 path to this context's model_failures.csv artifact."""
    return f"{self.path}model_failures.csv"
Expand Down Expand Up @@ -99,6 +107,34 @@ def load_results(
def load_leaderboard(self) -> pd.DataFrame:
    """Load and return this context's leaderboard.csv as a DataFrame."""
    leaderboard_path = self.path_leaderboard
    return load_pd.load(leaderboard_path)

def load_learning_curves(self, save_path: str, suffix: str = "learning_curves.json") -> bool:
    """
    Copy all learning-curve artifacts for this output context to save_path.

    Params:
    -------
    save_path: str
        Root s3 path; each artifact is copied to <save_path>/<dataset>/<fold>/learning_curves.json
    suffix: str
        Filename suffix identifying learning-curve artifacts under path_learning_curves.

    Returns:
    --------
    bool: True iff every discovered artifact was copied successfully.
    """
    path = self.path_learning_curves
    all_curves = get_s3_paths(path_prefix=path, suffix=suffix)

    ok = True
    for origin_path in all_curves:
        dataset, fold = self.get_dataset_fold(origin_path)
        destination_path = f"{save_path}/{dataset}/{fold}/learning_curves.json"
        res = copy_s3_object(origin_path=origin_path, destination_path=destination_path)
        ok &= res
        if not res:
            # BUGFIX: logger.log() requires a log level as its first positional argument and
            # would raise TypeError here; use logger.warning for failed copies instead.
            logger.warning(f"Learning Curve artifact at {origin_path} could not be copied to {destination_path}")

    return ok

def get_dataset_fold(self, path_str: str) -> tuple[str, str]:
    """
    Extract the dataset and fold names from a learning-curve artifact path.

    Params:
    -------
    path_str: str
        A path whose last three segments are expected to be .../dataset/fold/learning_curves.json

    Returns:
    --------
    tuple[str, str]: the (dataset, fold) segments of the path.
    """
    segments = path_str.rstrip("/").split("/")

    # Guard: need at least dataset/fold/<file> at the end of the path.
    if len(segments) < 3:
        raise ValueError(
            f"Improper folder dimensions at {path_str}. Expected following path structure: .../dataset/fold/learning_curves.json"
        )

    dataset_name = segments[-3]
    fold_name = segments[-2]
    return dataset_name, fold_name

def load_model_failures(self) -> pd.DataFrame:
    """Load and return the raw model failures file (model_failures.csv) as a DataFrame."""
    return load_pd.load(self.path_model_failures)
Expand Down
20 changes: 20 additions & 0 deletions src/autogluon/bench/eval/benchmark_context/output_suite_context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import copy
import logging
import os
from typing import List, Optional, Set

Expand All @@ -12,6 +13,8 @@
from autogluon.bench.eval.benchmark_context.utils import get_s3_paths
from autogluon.common.loaders import load_pd

logger = logging.getLogger(__name__)

DEFAULT_COLUMNS_TO_KEEP = [
"id",
"task",
Expand Down Expand Up @@ -175,6 +178,13 @@ def get_zeroshot_metadata_size_bytes(self, allow_exception=False) -> List[int]:
allow_exception=allow_exception,
)

def load_learning_curves(self, save_path: str) -> list[bool]:
    """Copy learning-curve artifacts for every output context; one success flag per context."""
    loop_kwargs = dict(save_path=save_path)
    return self._loop_func(
        func=OutputContext.load_learning_curves,
        input_list=self.output_contexts,
        kwargs=loop_kwargs,
    )

def filter_failures(self):
amlb_info_list = self.get_amlb_info()
output_contexts_valid = []
Expand Down Expand Up @@ -206,6 +216,16 @@ def aggregate_results(self, results_list: List[pd.DataFrame] | None = None) -> p
results_df = pd.concat(results_list, ignore_index=True)
return results_df

def aggregate_learning_curves(self, save_path: str) -> str:
    """
    Copy every learning-curve artifact in this suite into a common directory.

    Params:
    -------
    save_path: str
        Root s3 path the artifacts are copied under.

    Returns:
    --------
    str: save_path, where the aggregated artifacts were stored.
    """
    outcomes = self.load_learning_curves(save_path=save_path)
    failures = outcomes.count(False)
    if failures > 0:
        # BUGFIX: logger.log() requires a log level as its first positional argument and
        # would raise TypeError here; logger.warning also makes the manual "WARNING:"
        # prefix redundant.
        logger.warning(
            f"{failures} of {self.num_contexts} learning_curve artifacts failed to copy successfully from {self.path} to {save_path}"
        )

    return save_path

def load_leaderboards(self) -> List[pd.DataFrame]:
if self.num_contexts == 0:
raise AssertionError("Empty output_contexts!")
Expand Down
103 changes: 102 additions & 1 deletion src/autogluon/bench/eval/benchmark_context/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,112 @@
from __future__ import annotations

import logging
from urllib.parse import urlparse

import boto3

from autogluon.common.loaders import load_s3
from autogluon.common.utils import s3_utils


def get_s3_paths(path_prefix: str, contains=None, suffix=None):
def is_s3_url(path: str) -> bool:
    """
    Report whether `path` looks like an s3 uri.

    Params:
    -------
    path: str
        The path to check.

    Returns:
    --------
    bool: whether the path is a s3 uri.
    """
    # An s3 uri starts with an "s3*" scheme followed by "://" within the first
    # 6 characters — this intentionally also accepts short variants like s3a/s3n.
    return path[:2] == "s3" and "://" in path[:6]


def get_bucket_key(s3_uri: str) -> tuple[str, str]:
    """
    Retrieves the bucket and key from a s3 uri.

    Params:
    -------
    s3_uri: str
        The path (s3 uri) to be parsed.

    Returns:
    --------
    bucket_name: str
        the associated bucket name
    object_key: str
        the associated key

    Raises:
    -------
    ValueError
        If s3_uri does not use an s3 scheme (per is_s3_url).
    """
    # BUGFIX(docs): the docstring documented a nonexistent parameter "origin_path";
    # the actual parameter is `s3_uri`.
    if not is_s3_url(s3_uri):
        raise ValueError("Invalid S3 URI scheme. It should be 's3'.")

    parsed_uri = urlparse(s3_uri)
    bucket_name = parsed_uri.netloc
    object_key = parsed_uri.path.lstrip("/")

    return bucket_name, object_key


def get_s3_paths(path_prefix: str, contains: str | None = None, suffix: str | None = None) -> list[str]:
    """
    List full s3 paths under `path_prefix` whose keys contain `contains` and end with `suffix`.

    Params:
    -------
    path_prefix: str
        The path prefix.
    contains : Optional[str], default = None
        Can be specified to limit the returned outputs.
        For example, by specifying the constraint, such as ".1h8c."
    suffix: str, default = None
        Can be specified to limit the returned outputs.
        For example, by specifying "leaderboard.csv" only objects ending
        with this suffix will be included
        If no suffix provided, will save all files in artifact directory.

    Returns:
    --------
    List[str]: All s3 paths that adhere to the conditions passed in.
    """
    bucket, prefix = s3_utils.s3_path_to_bucket_prefix(path_prefix)
    matched_keys = load_s3.list_bucket_prefix_suffix_contains_s3(
        bucket=bucket, prefix=prefix, suffix=suffix, contains=contains
    )
    return [s3_utils.s3_bucket_prefix_to_path(bucket=bucket, prefix=key, version="s3") for key in matched_keys]


def copy_s3_object(origin_path: str, destination_path: str) -> bool:
    """
    Copies s3 object from origin_path to destination_path

    Params:
    -------
    origin_path: str
        The path (s3 uri) to the original location of the object
    destination_path: str
        The path (s3 uri) to the intended destination location of the object

    Returns:
    --------
    bool: whether the copy was successful.
    """
    origin_bucket, origin_key = get_bucket_key(origin_path)
    destination_bucket, destination_key = get_bucket_key(destination_path)

    try:
        s3 = boto3.client("s3")
        s3.copy_object(
            Bucket=destination_bucket, CopySource={"Bucket": origin_bucket, "Key": origin_key}, Key=destination_key
        )
        return True
    except Exception:
        # BUGFIX: a bare `except:` also swallows KeyboardInterrupt/SystemExit and hides all
        # diagnostics. Catch Exception and log the failure; callers treat False as a
        # best-effort copy failure, so we still return rather than re-raise.
        logging.getLogger(__name__).warning(
            "Failed to copy s3 object from %s to %s", origin_path, destination_path, exc_info=True
        )
        return False
50 changes: 42 additions & 8 deletions src/autogluon/bench/eval/scripts/aggregate_amlb_results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typer

from autogluon.bench.eval.aggregate.results import aggregate_results
from autogluon.bench.eval.aggregate.aggregate import aggregate

app = typer.Typer()

Expand All @@ -12,6 +12,9 @@ def aggregate_amlb_results(
benchmark_name: str = typer.Argument(
help="Folder name of benchmark run in which all objects with path 'scores/results.csv' get aggregated."
),
artifact: str = typer.Option(
"results", help="What should be saved, one of ['results', 'learning_curves'], default='results'"
),
constraint: str = typer.Option(
None,
help="Name of constraint used in benchmark, refer to https://github.com/openml/automlbenchmark/blob/master/resources/constraints.yaml. Not applicable when `module==multimodal`",
Expand All @@ -20,17 +23,48 @@ def aggregate_amlb_results(
mode: str = typer.Option("ray", help='Aggregation mode: "seq" or "ray".'),
):
"""
Finds "scores/results.csv" under s3://<s3_bucket>/<module>/<benchmark_name> recursively with the constraint if provided,
and append all results into one file at s3://<s3_bucket>/aggregated/<module>/<benchmark_name>/results_automlbenchmark_<constraint>_<benchmark_name>.csv
Aggregates objects across an agbenchmark. Functionality depends on artifact specified:

Params:
-------
s3_bucket: str
Name of the relevant s3_bucket
module: str
The name of the relevant autogluon module: can be one of ['tabular', 'timeseries', 'multimodal']
benchmark_name: str
The name of the relevant benchmark that was run
artifact: str
The desired artifact to be aggregated; can be one of ['results', 'learning_curves']
constraint: str
Name of constraint used in benchmark
include_infer_speed: bool
Include inference speed in aggregation.
mode: str
Can be one of ['seq', 'ray'].
If seq, runs sequentially.
If ray, utilizes parallelization.

Artifact Outcomes: ['results', 'learning_curves']
results:
Finds "scores/results.csv" under s3://<s3_bucket>/<module>/<benchmark_name> recursively with the constraint if provided,
and append all results into one file at s3://<s3_bucket>/aggregated/<module>/<benchmark_name>/results_automlbenchmark_<constraint>_<benchmark_name>.csv

Example:
agbench aggregate-amlb-results autogluon-benchmark-metrics tabular ag_tabular_20230629T140546 --constraint test

learning_curves:
Finds specified learning_curves.json files under s3://<s3_bucket>/<module>/<benchmark_name> recursively with the constraint if provided,
and stores all artifacts in common directory at s3://<s3_bucket>/aggregated/<module>/<benchmark_name>/

Example:
agbench aggregate-amlb-results autogluon-benchmark-metrics tabular ag_tabular_20230629T140546 --constraint test
Example:
agbench aggregate-amlb-results autogluon-benchmark-metrics tabular ag_bench_learning_curves_20240802T163522 --artifact learning_curves --constraint toy
"""

aggregate_results(
aggregate(
s3_bucket=s3_bucket,
s3_prefix=f"{module}/",
version_name=benchmark_name,
module=module,
benchmark_name=benchmark_name,
artifact=artifact,
constraint=constraint,
include_infer_speed=include_infer_speed,
mode=mode,
Expand Down