diff --git a/examples/run_scripts_v5/download_tabforestpfn_weights.py b/examples/run_scripts_v5/download_tabforestpfn_weights.py new file mode 100644 index 00000000..aadd7192 --- /dev/null +++ b/examples/run_scripts_v5/download_tabforestpfn_weights.py @@ -0,0 +1,49 @@ +from autogluon.common.loaders import load_s3 + + +if __name__ == '__main__': + output_prefix = "/home/ubuntu/workspace/tabpfn_weights/" + s3_bucket = "autogluon-zeroshot" + s3_prefix = "tabpfn/tabforestpfn/" + + filenames = [ + "tabpfn.pt", + "tabforest.pt", + "tabforestpfn.pt", + ] + + for filename in filenames: + load_s3.download(s3_bucket, f"{s3_prefix}{filename}", f"{output_prefix}/{filename}") + + s3_bucket = "mmts-tb" + s3_prefix = "tabular/TabPFN_mix_7_models/" + + filenames = [ + "TabPFN_mix_7_step_500000.pt", + "TabPFN_mix_7_step_600000.pt", + "TabPFN_mix_7_step_300000.pt", + ] + + for filename in filenames: + load_s3.download(s3_bucket, f"{s3_prefix}{filename}", f"{output_prefix}/{filename}") + + s3_bucket = "autogluon-zeroshot" + s3_prefix = "tabpfn/tabdpt/" + output_prefix = "/home/ubuntu/workspace/tabdpt_weights/" + + filenames = [ + "tabdpt_76M.ckpt", + ] + + for filename in filenames: + load_s3.download(s3_bucket, f"{s3_prefix}{filename}", f"{output_prefix}/{filename}") + + s3_bucket = "mmts-tb" + s3_prefix = "tabular/" + + filenames = [ + "TabPFN_real_mix_7_models/model_step_500000.pt", + ] + + for filename in filenames: + load_s3.download(s3_bucket, f"{s3_prefix}{filename}", f"{output_prefix}/{filename}") diff --git a/examples/run_scripts_v5/run_simple_benchmark_w_simulator_realmlp.py b/examples/run_scripts_v5/run_simple_benchmark_w_simulator_realmlp.py new file mode 100644 index 00000000..88323b05 --- /dev/null +++ b/examples/run_scripts_v5/run_simple_benchmark_w_simulator_realmlp.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import pandas as pd + +from tabrepo import EvaluationRepository, EvaluationRepositoryCollection, Evaluator +from tabrepo.benchmark.experiment import AGModelBagExperiment, ExperimentBatchRunner + +from tabrepo.benchmark.models.ag import RealMLPModel + + +# To re-use the pre-computed results if you have the file "tabrepo_artifacts_realmlp_20250201.zip": +# cd {this_dir} +# unzip tabrepo_artifacts_realmlp_20250201.zip +# Note: This file is currently located in "s3://tabrepo/artifacts/methods/realmlp/tabrepo_artifacts_realmlp_20250201.zip" +# Not publicly available +# You can regenerate this artifact from scratch by running the code. On a 192 CPU core machine, this will take approximately 25 hours. +# If the artifact is present, it will be used and the models will not be re-run. +if __name__ == '__main__': + # Load Context + context_name = "D244_F3_C1530_200" # 200 smallest datasets. 
To run larger, set to "D244_F3_C1530_200" + expname = "./initial_experiment_simple_simulator" # folder location of all experiment artifacts + ignore_cache = False # set to True to overwrite existing caches and re-run experiments from scratch + + # TODO: in future shouldn't require downloading all repo_og preds (100+ GB) before running experiments + # Should only need preds for ensembling part, but not for comparisons + repo_og: EvaluationRepository = EvaluationRepository.from_context(context_name, cache=True) + + # df_out = get_feature_info(repo_og) + # + # a = df_out[("int", ("bool",))] + # print(a) + # b = a[a > 0] + # datasets_with_bool = list(b.index) + # + # # Sample for a quick demo + datasets = repo_og.datasets() + # datasets = repo_og.datasets(problem_type="regression") + # datasets_filter = repo_og.datasets(problem_type="binary") + repo_og.datasets(problem_type="multiclass") + # datasets = [d for d in datasets if d in datasets_filter] + repo_og.datasets(problem_type="regression") + # # datasets = datasets[:173] + # datasets_og = datasets + # datasets = [d for d in datasets_og if d in datasets_with_bool] + [d for d in datasets_og if d not in datasets_with_bool] + + # datasets = repo_og.datasets(problem_type="regression") + # datasets = datasets[:6] # FIXME: ImputeF crashes on GAMETES_Epistasis_2-Way_1000atts_0_4H_EDM-1_EDM-1_1 fold 0 + folds = [0, 1, 2] + # datasets = ["Internet-Advertisements"] + + # To run everything: + # datasets = repo_og.datasets() + # folds = repo_og.folds + + # TODO: Why is RealMLP slow when running sequentially / not in a bag? Way slower than it should be. Torch threads? + methods = [ + AGModelBagExperiment( # 2025/02/01 num_cpus=192, pytabkit==1.1.3 + name="RealMLP_c1_BAG_L1_v4_noes_r0", + model_cls=RealMLPModel, + model_hyperparameters={ + "random_state": 0, + "use_early_stopping": False, + "use_ls": None, + "bool_to_cat": False, + "impute_bool": True, + }, + ), + AGModelBagExperiment( # 2025/02/01 num_cpus=192, pytabkit==1.1.3 + name="RealMLP_c2_BAG_L1_AutoLS", + model_cls=RealMLPModel, + model_hyperparameters={ + "random_state": 0, + "use_early_stopping": False, + "use_ls": "auto", + "bool_to_cat": False, + "impute_bool": True, + }, + ), + AGModelBagExperiment( # 2025/02/01 num_cpus=192, pytabkit==1.1.3 + name="RealMLP_c2_BAG_L1_AutoLS_AUCStop", + model_cls=RealMLPModel, + model_hyperparameters={ + "random_state": 0, + "use_early_stopping": False, + "use_ls": "auto", + "bool_to_cat": False, + "impute_bool": True, + # "use_roc_auc_to_stop": True, + }, + ), + AGModelBagExperiment( # 2025/02/07 num_cpus=192, pytabkit==1.2.1 + name="RealMLP_c2_BAG_L1_AutoLS_AUCStop_boolcat_impF_naT", + model_cls=RealMLPModel, + model_hyperparameters={ + "random_state": 0, + "use_early_stopping": False, + "use_ls": "auto", + # "use_roc_auc_to_stop": True, + "bool_to_cat": True, + "impute_bool": False, + "name_categories": True, + }, + ), + AGModelBagExperiment( # 2025/02/12 num_cpus=192, pytabkit==1.2.1 + name="RealMLP_c2_BAG_L1_TD", + model_cls=RealMLPModel, + model_hyperparameters={ + "random_state": 0, + "use_early_stopping": False, + "use_ls": "auto", + # "use_roc_auc_to_stop": True, + "bool_to_cat": True, + "impute_bool": False, + "name_categories": True, + # "td_s_reg": False, + }, + ), + # AGModelBagExperiment( # 2025/03/05 num_cpus=192, pytabkit==1.2.1 + # name="RealMLP_c1_BAG_L1", + # model_cls=RealMLPModel, + # model_hyperparameters={}, + # ), + ] + + exp_batch_runner = ExperimentBatchRunner( + expname=expname, + task_metadata=repo_og.task_metadata, + 
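+        # Assumption: cache_path_format="task_first" groups cached run artifacts by task (dataset/fold)
+        # before the method name, matching the "name_first or task_first" cache-path note in
+        # tabflow/launch_jobs.py; the alternative layout puts the method name first. Verify against
+        # the ExperimentBatchRunner documentation before relying on a specific folder structure.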
cache_path_format="task_first", + ) + + # results_lst = exp_batch_runner.load_results( + # methods=methods, + # datasets=datasets, + # folds=folds, + # ) + + results_lst = exp_batch_runner.run( + methods=methods, + datasets=datasets, + folds=folds, + ignore_cache=ignore_cache, + ) + + repo = exp_batch_runner.repo_from_results(results_lst=results_lst) + + # TODO: repo.configs_type should not be None for custom methods + repo.print_info() + + save_path = "repo_new" + repo.to_dir(path=save_path) # Load the repo later via `EvaluationRepository.from_dir(save_path)` + + print(f"New Configs : {repo.configs()}") + + shared_datasets = [d for d in repo.datasets(union=False) if d in repo_og.datasets()] + + # repo_tabforestpfn = EvaluationRepository.from_dir(path="tabforestpfn_sim") + # shared_datasets = [d for d in shared_datasets if d in repo_tabforestpfn.datasets(union=False)] + # repo_combined = EvaluationRepositoryCollection(repos=[repo_og, repo, repo_tabforestpfn], config_fallback="ExtraTrees_c1_BAG_L1") + repo_combined = EvaluationRepositoryCollection(repos=[repo_og, repo], config_fallback="ExtraTrees_c1_BAG_L1") + repo_combined = repo_combined.subset(datasets=shared_datasets) + repo_combined.set_config_fallback("ExtraTrees_c1_BAG_L1") + evaluator = Evaluator(repo=repo_combined) + + repo_combined.print_info() + + comparison_configs_og = [ + "RandomForest_c1_BAG_L1", + "ExtraTrees_c1_BAG_L1", + "LightGBM_c1_BAG_L1", + "XGBoost_c1_BAG_L1", + "CatBoost_c1_BAG_L1", + "NeuralNetTorch_c1_BAG_L1", + "NeuralNetFastAI_c1_BAG_L1", + ] + + comparison_configs = comparison_configs_og + [ + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_121", + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_121_bool_to_cat", + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_121_bool_to_cat_false", + + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_121_bool_to_cat2", + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_121_bool_to_cat_false2", + # "RealMLP_c2_BAG_L1_AutoLS_bool_to_cat", + # "RealMLP_c2_BAG_L1_AutoLS_impute_true", + # "RealMLP_c2_BAG_L1_AutoLS_bool_to_cat_impute_true", + + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_boolcat_imputeT", # FIXME: CRASHES? + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_boolcat_imputeF", # FIXME: CRASHES? 
+ + # "RealMLP_c1_BAG_L1_v4_noes_r0", # did 600 runs (200x3) + # "RealMLP_c2_BAG_L1_AutoLS", # 200x3 + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop", # 200x3 + "RealMLP_c2_BAG_L1_AutoLS_AUCStop_boolcat_impF_naT", # 200x3 + # "RealMLP_c2_BAG_L1_AutoLS_AUCStop_impF_naT", # 175x3 + "RealMLP_c2_BAG_L1_TD", # 200x3 + # "RealMLP_c1_BAG_L1", + ] + + evaluator.compute_avg_config_prediction_delta(configs=comparison_configs + ["EBM_BAG_L1", "TabPFNv2_N4_BAG_L1", "TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1"]) + + # comparison_configs += [ + # "TabForestPFN_N4_E10_BAG_L1", + # "TabForestPFN_N4_E30_BAG_L1", + # "TabForestPFN_N4_E50_BAG_L1", + # "TabForestPFN_N1_E10_S4_BAG_L1", + # "TabPFN_Mix7_500000_N4_E30_BAG_L1", + # "TabPFN_Mix7_600000_N4_E30_BAG_L1", + # "TabPFN_Mix7_300000_N4_E30_BAG_L1", + # "TabPFN_Mix7_600000_N4_E30_S4_BAG_L1", + # "TabPFNv2_N4_BAG_L1", + # "EBM_BAG_L1", + # "TabPFN_Mix7_600000_N1_E0_BAG_L1", + # "TabPFN_Mix7_600000_N4_E0_BAG_L1", + # "LightGBM_c1_BAG_L1_V2", + # "TabDPT_N1_E0_BAG_L1", + # "TabRMix7_500000_N1_E0_BAG_L1", + # "TabRMix7_500000_N4_E30_BAG_L1", + # "TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1", + # "TabPFN_Mix7_600000_N4_E50_FIX_BAG_L1", + # "TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1_COMPARISON", + # ] + + + + df_ensemble_results, df_ensemble_weights = repo_combined.evaluate_ensembles(configs=comparison_configs, ensemble_size=40) + df_ensemble_results = df_ensemble_results.reset_index() + df_ensemble_results["framework"] = "ensemble_with_RealMLP_c1" + + df_ensemble_results_og, df_ensemble_weights_og = repo_combined.evaluate_ensembles(configs=comparison_configs_og, ensemble_size=40) + df_ensemble_results_og = df_ensemble_results_og.reset_index() + df_ensemble_results_og["framework"] = "ensemble_og" + + # from script_utils import load_ag11_bq_baseline + # df_processed_ag11_2024 = load_ag11_bq_baseline(datasets=repo_combined.datasets(), folds=repo_combined.folds, repo=repo_combined) + + repo_og.set_config_fallback("ExtraTrees_c1_BAG_L1") + df_zeroshot_portfolio_og = evaluator.zeroshot_portfolio(configs=repo_og.configs()) + df_zeroshot_portfolio_og["framework"] = "zeroshot_og" + + df_zeroshot_portfolio_w_realmlp = evaluator.zeroshot_portfolio(configs=repo_combined.configs()) + df_zeroshot_portfolio_w_realmlp["framework"] = "zeroshot_w_realmlp" + + df_zeroshot_portfolio_w_realmlp_single = evaluator.zeroshot_portfolio(configs=repo_og.configs() + ["RealMLP_c2_BAG_L1_TD"]) + df_zeroshot_portfolio_w_realmlp_single["framework"] = "zeroshot_w_realmlp_single" + + df_zeroshot_portfolio_w_realmlp_2 = evaluator.zeroshot_portfolio(configs=repo_og.configs() + ["RealMLP_c2_BAG_L1_TD", "RealMLP_c2_BAG_L1_AutoLS_AUCStop_boolcat_impF_naT"]) + df_zeroshot_portfolio_w_realmlp_2["framework"] = "zeroshot_w_realmlp_2" + + df_zeroshot_portfolio_w_realmlp_n5 = evaluator.zeroshot_portfolio(configs=repo_combined.configs(), n_portfolios=10) + df_zeroshot_portfolio_w_realmlp_n5["framework"] = "zeroshot_w_realmlp_n10" + + df_zeroshot_portfolio_n5 = evaluator.zeroshot_portfolio(configs=repo_og.configs(), n_portfolios=10) + df_zeroshot_portfolio_n5["framework"] = "zeroshot_og_n10" + + results_df = pd.concat([ + df_ensemble_results, + df_ensemble_results_og, + # df_processed_ag11_2024, + df_zeroshot_portfolio_og, + df_zeroshot_portfolio_w_realmlp, + df_zeroshot_portfolio_w_realmlp_single, + df_zeroshot_portfolio_w_realmlp_2, + df_zeroshot_portfolio_w_realmlp_n5, + df_zeroshot_portfolio_n5, + ], ignore_index=True) + + baselines = [ + "AutoGluon_bq_4h8c_2023_11_14", + # "AutoGluon_bq_4h8c_2024_10_25", + ] + + p = 
evaluator.plot_ensemble_weights(df_ensemble_weights=df_ensemble_weights, figsize=(16, 60)) + p.savefig("ensemble_weights_w_RealMLP_c1") + + metrics = evaluator.compare_metrics( + results_df=results_df, + datasets=datasets, + folds=folds, + baselines=baselines, + configs=comparison_configs, + ) + + metrics_tmp = metrics.reset_index(drop=False) + + with pd.option_context("display.max_rows", None, "display.max_columns", None, "display.width", 1000): + print(f"Config Metrics Example:\n{metrics.head(100)}") + + evaluator_output = evaluator.plot_overall_rank_comparison( + results_df=metrics, + save_dir=expname, + evaluator_kwargs={ + "treat_folds_as_datasets": True, + "frameworks_compare_vs_all": [ + "RealMLP_c2_BAG_L1_TD", + "zeroshot_w_realmlp", + ], + }, + ) diff --git a/examples/run_scripts_v5/run_tabforestpfn_benchmark_w_simulator.py b/examples/run_scripts_v5/run_tabforestpfn_benchmark_w_simulator.py new file mode 100644 index 00000000..6fa0a568 --- /dev/null +++ b/examples/run_scripts_v5/run_tabforestpfn_benchmark_w_simulator.py @@ -0,0 +1,352 @@ +from __future__ import annotations + +import pandas as pd + +from tabrepo import load_repository, EvaluationRepository +from tabrepo.benchmark.models.wrapper.AutoGluon_class import AGWrapper +from tabrepo.benchmark.models.wrapper.ag_model import AGModelWrapper +from tabrepo.benchmark.models.ag import ( + ExplainableBoostingMachineModel, + TabDPTModel, + TabPFNV2ClientModel, +) +from autogluon.tabular.models import TabPFNMixModel +from tabrepo.benchmark.experiment.experiment_utils import run_experiments, convert_leaderboard_to_configs +from tabrepo.utils.cache import CacheFunctionPickle +from tabrepo.repository.repo_utils import convert_time_infer_s_from_batch_to_sample + +from script_utils import load_ag11_bq_baseline +from autogluon.core.models import DummyModel # This model is a placeholder for models that don't work anymore/are deleted + + +if __name__ == '__main__': + # Load Context + context_name = "D244_F3_C1530_200" # 100 smallest datasets. 
To run larger, set to "D244_F3_C1530_200" + expname = "./initial_experiment_tabforestpfn_simulator" # folder location of all experiment artifacts + ignore_cache = False # set to True to overwrite existing caches and re-run experiments from scratch + + repo: EvaluationRepository = load_repository(context_name, cache=True) + + # a = repo.configs(tasks=[("balance-scale", 0), ("adasda", 2)]) + + # Subset to tasks supported by TabPFNv2 + task_metadata = repo.task_metadata.copy(deep=True) + task_metadata = task_metadata[task_metadata["NumberOfInstances"] <= 10000] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] > 5000] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] > 9000] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] >= 2000] + task_metadata = task_metadata[task_metadata["NumberOfFeatures"] <= 500] + # task_metadata = task_metadata[task_metadata["NumberOfFeatures"] <= 100] + task_metadata = task_metadata[task_metadata["NumberOfClasses"] <= 10] + + datasets = list(task_metadata["dataset"]) + # datasets = datasets[:50] # Capping to 50 because TabPFNv2 runs into daily limit with more + + task_metadata = task_metadata[task_metadata["dataset"].isin(datasets)] + task_metadata = task_metadata[task_metadata["NumberOfClasses"] >= 2] + # task_metadata = task_metadata[task_metadata["NumberOfFeatures"] <= 100] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] > 1000] + datasets = list(task_metadata["dataset"]) + + # datasets = datasets[:91] + datasets[92:] + + # TabPFNv2 fails on these datasets for unknown reasons + banned_datasets = ["Indian_pines", "topo_2_1"] + + datasets = [d for d in datasets if d not in banned_datasets] + + # datasets = datasets[:1] + + folds = [0, 1, 2] + + task_metadata = task_metadata[task_metadata["dataset"].isin(datasets)] + + # To run everything: + # datasets = repo.datasets() + # folds = repo.folds + path_weights_tabpfn_real_mix7_500000 = '/home/ubuntu/workspace/tabpfn_weights/TabPFN_real_mix_7_models/model_step_500000.pt' + + path_weights_tabpfn_mix7_500000 = '/home/ubuntu/workspace/tabpfn_weights/TabPFN_mix_7_step_500000.pt' + path_weights_tabpfn_mix7_600000 = '/home/ubuntu/workspace/tabpfn_weights/TabPFN_mix_7_step_600000.pt' + path_weights_tabpfn_mix7_300000 = '/home/ubuntu/workspace/tabpfn_weights/TabPFN_mix_7_step_300000.pt' + + methods = [ + # ("LightGBM_c1_BAG_L1_V2", CustomAutoGluon, {"fit_kwargs": { + # "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + # "hyperparameters": {"GBM": [{}]}, + # }}), + ("TabForestPFN_N4_E10_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 10}]}, + }}), + ("TabForestPFN_N1_E10_S4_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 4, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 1, "max_epochs": 10}]}, + }}), + ("TabForestPFN_N4_E30_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 30}]}, + }}), + ("TabPFN_Mix7_500000_N4_E30_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 30, "path_weights": 
path_weights_tabpfn_mix7_500000}]}, + }}), + ("TabPFN_Mix7_600000_N4_E30_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 30, "path_weights": path_weights_tabpfn_mix7_600000}]}, + }}), + ("TabPFN_Mix7_300000_N4_E30_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 30, "path_weights": path_weights_tabpfn_mix7_300000}]}, + }}), + ("TabPFN_Mix7_600000_N4_E30_S4_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 4, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 30, "path_weights": path_weights_tabpfn_mix7_600000}]}, + }}), + ("EBM_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {ExplainableBoostingMachineModel: [{}]}, + }}), + ("TabPFNv2_N4_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {TabPFNV2ClientModel: [{"n_estimators": 4}]}, + }}), + ("TabPFN_Mix7_600000_N1_E0_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {DummyModel: [{"n_ensembles": 1, "max_epochs": 0, "path_weights": path_weights_tabpfn_mix7_600000}]}, + }}), + # ("TabRMix7_500000_N1_E0_BAG_L1", AGWrapper, {"fit_kwargs": { + # "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + # "hyperparameters": {DummyModel: [{"n_ensembles": 1, "max_epochs": 0, "path_weights": path_weights_tabpfn_real_mix7_500000}]}, + # }}), + # ("TabRMix7_500000_N4_E30_BAG_L1", AGWrapper, {"fit_kwargs": { + # "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + # "hyperparameters": {DummyModel: [{"n_ensembles": 4, "max_epochs": 30, "path_weights": path_weights_tabpfn_real_mix7_500000}]}, + # }}), + ("TabDPT", AGModelWrapper, {"model_cls": TabDPTModel}), + ("TabDPT_CS10000", AGModelWrapper, {"model_cls": TabDPTModel, "hyperparameters": {"context_size": 10000}}), + ("TabDPT_CS128", AGModelWrapper, {"model_cls": TabDPTModel, "hyperparameters": {"context_size": 128}}), + ("TabDPT_CS256", AGModelWrapper, {"model_cls": TabDPTModel, "hyperparameters": {"context_size": 256}}), + ("TabDPT_CS512", AGModelWrapper, {"model_cls": TabDPTModel, "hyperparameters": {"context_size": 512}}), + ("TabDPT_CS1024", AGModelWrapper, {"model_cls": TabDPTModel, "hyperparameters": {"context_size": 1024}}), + # ("TabPFNv2_N32", AGModelWrapper, {"model_cls": TabPFNV2ClientModel, "hyperparameters": {"n_estimators": 32}}), + # ("TabDPT_N1_E0_BAG_L1", AGWrapper, {"fit_kwargs": { + # "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + # "hyperparameters": {TabDPTModel: [{}]}, + # }}), + + # NEW RUNS AFTER FIXING EPOCHS, CHECKPOINTING, TORCH THREADS, AND STOPPING METRIC (2024/11/18) + ("TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {TabPFNMixModel: [{"n_ensembles": 4, "max_epochs": 30, "path_weights": path_weights_tabpfn_mix7_600000}]}, + }}), + 
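+        # Each entry in this list is a (config_name, wrapper_cls, wrapper_kwargs) tuple: the AGWrapper entries
+        # bag the model through AutoGluon (num_bag_folds outer folds x num_bag_sets repeats, with weighted
+        # ensembling and calibration disabled), while the AGModelWrapper entries above (TabDPT, TabDPT_CS*)
+        # appear to fit a single un-bagged model with the given hyperparameters.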
("TabPFN_Mix7_600000_N4_E50_FIX_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {TabPFNMixModel: [{"n_ensembles": 4, "max_epochs": 50, "path_weights": path_weights_tabpfn_mix7_600000}]}, + }}), + ] + + # FIXME: experiment_cls, cache_true/false, etc. + tids = [repo.dataset_to_tid(dataset) for dataset in datasets] + results_lst = run_experiments( + expname=expname, + tids=tids, + folds=folds, + methods=methods, + cache_cls=CacheFunctionPickle, + task_metadata=repo.task_metadata, + ignore_cache=ignore_cache, + cache_path_format="task_first", + ) + + results_baselines = [result["df_results"] for result in results_lst if result["simulation_artifacts"] is None] + + if results_baselines: + df_baselines = pd.concat(results_baselines, ignore_index=True) + df_baselines = convert_time_infer_s_from_batch_to_sample(df_baselines, repo=repo) + else: + df_baselines = None + + results_configs = [result for result in results_lst if result["simulation_artifacts"] is not None] + + results_lst_simulation_artifacts = [result["simulation_artifacts"] for result in results_configs] + results_lst_df = [result["df_results"] for result in results_configs] + results_lst_df = [convert_leaderboard_to_configs(df) for df in results_lst_df] # TODO: Remove later, keeping to make old runs compatible with new runs + + df_configs = pd.concat(results_lst_df, ignore_index=True) + + # TODO: Remove later, keeping to make old runs compatible with new runs + df_configs["metric"] = df_configs["metric"].map({ + "root_mean_squared_error": "rmse", + }).fillna(df_configs["metric"]) + + df_configs = convert_time_infer_s_from_batch_to_sample(df_configs, repo=repo) + + df_processed_ag12_2024 = load_ag11_bq_baseline(datasets=datasets, folds=folds, repo=repo) + df_baselines = pd.concat([df_baselines, df_processed_ag12_2024], ignore_index=True) + + df_configs = df_configs.drop(columns=["tid"]) + df_baselines = df_baselines.drop(columns=["tid"]) + + repo_2: EvaluationRepository = EvaluationRepository.from_raw( + df_configs=df_configs, + df_baselines=df_baselines, + results_lst_simulation_artifacts=results_lst_simulation_artifacts, + task_metadata=task_metadata, + ) + + save_loc = "tabforestpfn_sim" + repo_2.to_dir(path=save_loc) + + print(f"New Configs : {repo_2.configs()}") + + # FIXME: Allow picking ensemble based on test score, to see the difference in weights + + from tabrepo import EvaluationRepositoryCollection + repo_combined = EvaluationRepositoryCollection(repos=[repo, repo_2], config_fallback="ExtraTrees_c1_BAG_L1") + repo_combined = repo_combined.subset(datasets=repo_2.datasets(), folds=repo_2.folds) + + # FIXME: repo_combined._zeroshot_context.df_metrics contains 200 datasets when it should contain only 110 + + configs = repo_combined.configs() + + repo_combined.print_info() + + # FIXME: Add boxplot of overfitting, basically, val vs test, percent loss delta + a = repo_combined.metrics() + + configs_og = repo.configs() + + # result_ens_og, result_ens_og_weights = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs_og, ensemble_size=25, rank=False) + # + # weights_og = result_ens_og_weights.mean(axis=0).sort_values(ascending=False) + # print("weights_og") + # print(weights_og) + # + # result_ens, result_ens_weights = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs, ensemble_size=25, rank=False) + # weights = 
result_ens_weights.mean(axis=0).sort_values(ascending=False) + # print("weights") + # print(weights) + + # result_ens_og_cheat, result_ens_og_weights_cheat = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs_og, ensemble_size=25, rank=False, ensemble_kwargs={"cheater": True}) + # result_ens_cheat, result_ens_weights_cheat = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs, ensemble_size=25, rank=False, ensemble_kwargs={"cheater": True}) + + # weights_og = result_ens_og_weights_cheat.mean(axis=0).sort_values(ascending=False) + # print("weights_og cheater") + # print(weights_og) + # + # weights = result_ens_weights_cheat.mean(axis=0).sort_values(ascending=False) + # print("weights cheater") + # print(weights) + + # result_ens_og = result_ens_og.reset_index() + # result_ens_og["framework"] = "ALL" + # result_ens = result_ens.reset_index() + # result_ens["framework"] = "ALL_PLUS_TabPFN" + + # result_ens_og_cheat = result_ens_og_cheat.reset_index() + # result_ens_og_cheat["framework"] = "ALL_CHEAT" + # result_ens_cheat = result_ens_cheat.reset_index() + # result_ens_cheat["framework"] = "ALL_PLUS_TabForestPFN_CHEAT" + + # results_df_2 = pd.concat([ + # result_ens, + # result_ens_og, + # # df_processed_ag12_2024, + # # result_ens_cheat, + # # result_ens_og_cheat, + # ], ignore_index=True) + + # results_df_2 = convert_time_infer_s_from_sample_to_batch(results_df_2, repo=repo_combined) + + # print(f"AVG OG: {result_ens_og[0].mean()}") + # print(f"AVG: {result_ens[0].mean()}") + + with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000): + print(df_configs.head(100)) + + comparison_configs = [ + "RandomForest_c1_BAG_L1", + "ExtraTrees_c1_BAG_L1", + "LightGBM_c1_BAG_L1", + "XGBoost_c1_BAG_L1", + "CatBoost_c1_BAG_L1", + # "TabPFN_c1_BAG_L1", + "NeuralNetTorch_c1_BAG_L1", + "NeuralNetFastAI_c1_BAG_L1", + "TabForestPFN_N4_E10_BAG_L1", + "TabForestPFN_N4_E30_BAG_L1", + "TabForestPFN_N4_E50_BAG_L1", + "TabForestPFN_N1_E10_S4_BAG_L1", + "TabPFN_Mix7_500000_N4_E30_BAG_L1", + "TabPFN_Mix7_600000_N4_E30_BAG_L1", + "TabPFN_Mix7_300000_N4_E30_BAG_L1", + "TabPFN_Mix7_600000_N4_E30_S4_BAG_L1", + "TabPFNv2_N4_BAG_L1", + "EBM_BAG_L1", + "TabPFN_Mix7_600000_N1_E0_BAG_L1", + "TabPFN_Mix7_600000_N4_E0_BAG_L1", + "LightGBM_c1_BAG_L1_V2", + "TabDPT_N1_E0_BAG_L1", + "TabRMix7_500000_N1_E0_BAG_L1", + "TabRMix7_500000_N4_E30_BAG_L1", + "TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1", + "TabPFN_Mix7_600000_N4_E50_FIX_BAG_L1", + "TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1_COMPARISON", + ] + + baselines = [ + "AutoGluon_bq_4h8c_2023_11_14", + "H2OAutoML_4h8c_2023_11_14", + "flaml_4h8c_2023_11_14", + "lightautoml_4h8c_2023_11_14", + "autosklearn_4h8c_2023_11_14", + "AutoGluon_bq_4h8c_2024_10_25", + "TabPFNv2_N32", + "TabDPT", + "TabDPT_CS10000", + "TabDPT_CS128", + "TabDPT_CS256", + "TabDPT_CS512", + "TabDPT_CS1024", + ] + + from tabrepo.evaluation.evaluator import Evaluator + + evaluator = Evaluator(repo_combined) + + metrics = evaluator.compare_metrics( + # results_df, + # results_df_2, + datasets=datasets, + folds=folds, + baselines=baselines, + configs=comparison_configs, + ) + + metrics_tmp = metrics.reset_index(drop=False) + + with pd.option_context("display.max_rows", None, "display.max_columns", None, "display.width", 1000): + print(f"Config Metrics Example:\n{metrics.head(100)}") + evaluator_kwargs = { + "frameworks_compare_vs_all": [ + "TabPFN_Mix7_600000_N4_E30_FIX_BAG_L1", + "TabPFN_Mix7_600000_N4_E30_S4_BAG_L1", + 
"TabPFNv2_N4_BAG_L1", + # "ALL", + # "ALL_PLUS_TabPFN", + # "AutoGluon_bq_mainline_4h8c_2024_10_25", + 'AutoGluon 1.1 (4h8c)', + ], + "frameworks_rename": { + "AutoGluon_bq_4h8c_2024_10_25": "AutoGluon 1.1 (4h8c)", + "AutoGluon_bq_4h8c_2023_11_14": "AutoGluon 0.8 (4h8c)", + }, + # "frameworks_compare_vs_all": ["TabPFNv2"], + } + evaluator_output = evaluator.plot_overall_rank_comparison( + results_df=metrics, + evaluator_kwargs=evaluator_kwargs, + save_dir=expname, + calibration_framework="RandomForest_c1_BAG_L1", + ) diff --git a/examples/run_scripts_v5/run_tabforestpfn_benchmark_w_simulator_2025_02_14.py b/examples/run_scripts_v5/run_tabforestpfn_benchmark_w_simulator_2025_02_14.py new file mode 100644 index 00000000..f19f07fd --- /dev/null +++ b/examples/run_scripts_v5/run_tabforestpfn_benchmark_w_simulator_2025_02_14.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +import pandas as pd + +from tabrepo import load_repository, EvaluationRepository +from tabrepo.benchmark.models.wrapper.AutoGluon_class import AGWrapper +from tabrepo.benchmark.models.ag import TabPFNV2Model, TabPFNV2ClientModel +from tabrepo.benchmark.experiment.experiment_utils import run_experiments, convert_leaderboard_to_configs +from tabrepo.utils.cache import CacheFunctionPickle +from tabrepo.repository.repo_utils import convert_time_infer_s_from_batch_to_sample + +from script_utils import load_ag11_bq_baseline + + +if __name__ == '__main__': + # Load Context + context_name = "D244_F3_C1530_200" # 100 smallest datasets. To run larger, set to "D244_F3_C1530_200" + expname = "./initial_experiment_tabforestpfn_simulator" # folder location of all experiment artifacts + ignore_cache = False # set to True to overwrite existing caches and re-run experiments from scratch + + repo: EvaluationRepository = load_repository(context_name, cache=True) + + # a = repo.configs(tasks=[("balance-scale", 0), ("adasda", 2)]) + + # Subset to tasks supported by TabPFNv2 + task_metadata = repo.task_metadata.copy(deep=True) + task_metadata = task_metadata[task_metadata["NumberOfInstances"] <= 10000] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] <= 2000] # FIXME TMP + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] > 5000] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] > 9000] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] >= 2000] + task_metadata = task_metadata[task_metadata["NumberOfFeatures"] <= 500] + # task_metadata = task_metadata[task_metadata["NumberOfFeatures"] <= 100] + task_metadata = task_metadata[task_metadata["NumberOfClasses"] <= 10] + + datasets = list(task_metadata["dataset"]) + # datasets = datasets[:50] # Capping to 50 because TabPFNv2 runs into daily limit with more + + task_metadata = task_metadata[task_metadata["dataset"].isin(datasets)] + # task_metadata = task_metadata[task_metadata["NumberOfClasses"] >= 2] + # task_metadata = task_metadata[task_metadata["NumberOfClasses"] < 2] + # task_metadata = task_metadata[task_metadata["NumberOfFeatures"] <= 100] + # task_metadata = task_metadata[task_metadata["NumberOfInstances"] > 1000] + datasets = list(task_metadata["dataset"]) + + # datasets = datasets[:91] + datasets[92:] + + # TabPFNv2 fails on these datasets for unknown reasons + banned_datasets = ["Indian_pines", "topo_2_1"] + + datasets = [d for d in datasets if d not in banned_datasets] + + datasets = datasets[:44] + + folds = [0, 1, 2] + + task_metadata = task_metadata[task_metadata["dataset"].isin(datasets)] + + # To run 
everything: + # datasets = repo.datasets() + # folds = repo.folds + + methods = [ + ("TabPFNv2_N4_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {TabPFNV2ClientModel: [{"n_estimators": 4}]}, + }}), + # ("TabPFNv2Local_N4_BAG_L1", AGWrapper, {"fit_kwargs": { + # "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + # "hyperparameters": {TabPFNV2Model: [{"n_estimators": 4}]}, + # }}), + ("TabPFNv2LocalParallel_N4_BAG_L1", AGWrapper, {"fit_kwargs": { + "num_bag_folds": 8, "num_bag_sets": 1, "fit_weighted_ensemble": False, "calibrate": False, + "hyperparameters": {TabPFNV2Model: [{"n_estimators": 4, "ag_args_ensemble": {"fold_fitting_strategy": "parallel_local"}}]}, + }}), + ] + + # FIXME: experiment_cls, cache_true/false, etc. + tids = [repo.dataset_to_tid(dataset) for dataset in datasets] + results_lst = run_experiments( + expname=expname, + tids=tids, + folds=folds, + methods=methods, + cache_cls=CacheFunctionPickle, + task_metadata=repo.task_metadata, + ignore_cache=ignore_cache, + ) + + results_baselines = [result["df_results"] for result in results_lst if result["simulation_artifacts"] is None] + + if results_baselines: + df_baselines = pd.concat(results_baselines, ignore_index=True) + df_baselines = convert_time_infer_s_from_batch_to_sample(df_baselines, repo=repo) + else: + df_baselines = None + + results_configs = [result for result in results_lst if result["simulation_artifacts"] is not None] + + results_lst_simulation_artifacts = [result["simulation_artifacts"] for result in results_configs] + results_lst_df = [result["df_results"] for result in results_configs] + results_lst_df = [convert_leaderboard_to_configs(df) for df in results_lst_df] # TODO: Remove later, keeping to make old runs compatible with new runs + + df_configs = pd.concat(results_lst_df, ignore_index=True) + + # TODO: Remove later, keeping to make old runs compatible with new runs + df_configs["metric"] = df_configs["metric"].map({ + "root_mean_squared_error": "rmse", + }).fillna(df_configs["metric"]) + + df_configs = convert_time_infer_s_from_batch_to_sample(df_configs, repo=repo) + + df_processed_ag12_2024 = load_ag11_bq_baseline(datasets=datasets, folds=folds, repo=repo) + df_baselines = pd.concat([df_baselines, df_processed_ag12_2024], ignore_index=True) + + df_configs = df_configs.drop(columns=["tid"]) + df_baselines = df_baselines.drop(columns=["tid"]) + + repo_2: EvaluationRepository = EvaluationRepository.from_raw( + df_configs=df_configs, + df_baselines=df_baselines, + results_lst_simulation_artifacts=results_lst_simulation_artifacts, + task_metadata=task_metadata, + ) + + save_loc = "tabforestpfn_sim_v2" + repo_2.to_dir(path=save_loc) + + print(f"New Configs : {repo_2.configs()}") + + # FIXME: Allow picking ensemble based on test score, to see the difference in weights + + from tabrepo import EvaluationRepositoryCollection + repo_combined = EvaluationRepositoryCollection(repos=[repo, repo_2], config_fallback="ExtraTrees_c1_BAG_L1") + repo_combined = repo_combined.subset(datasets=repo_2.datasets(), folds=repo_2.folds) + + # FIXME: repo_combined._zeroshot_context.df_metrics contains 200 datasets when it should contain only 110 + + configs = repo_combined.configs() + + repo_combined.print_info() + + # FIXME: Add boxplot of overfitting, basically, val vs test, percent loss delta + a = repo_combined.metrics() + + configs_og = repo.configs() + + # result_ens_og, 
result_ens_og_weights = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs_og, ensemble_size=25, rank=False) + # + # weights_og = result_ens_og_weights.mean(axis=0).sort_values(ascending=False) + # print("weights_og") + # print(weights_og) + # + # result_ens, result_ens_weights = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs, ensemble_size=25, rank=False) + # weights = result_ens_weights.mean(axis=0).sort_values(ascending=False) + # print("weights") + # print(weights) + + # result_ens_og_cheat, result_ens_og_weights_cheat = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs_og, ensemble_size=25, rank=False, ensemble_kwargs={"cheater": True}) + # result_ens_cheat, result_ens_weights_cheat = repo_combined.evaluate_ensembles(datasets=repo_combined.datasets(), configs=configs, ensemble_size=25, rank=False, ensemble_kwargs={"cheater": True}) + + # weights_og = result_ens_og_weights_cheat.mean(axis=0).sort_values(ascending=False) + # print("weights_og cheater") + # print(weights_og) + # + # weights = result_ens_weights_cheat.mean(axis=0).sort_values(ascending=False) + # print("weights cheater") + # print(weights) + + # result_ens_og = result_ens_og.reset_index() + # result_ens_og["framework"] = "ALL" + # result_ens = result_ens.reset_index() + # result_ens["framework"] = "ALL_PLUS_TabPFN" + + # result_ens_og_cheat = result_ens_og_cheat.reset_index() + # result_ens_og_cheat["framework"] = "ALL_CHEAT" + # result_ens_cheat = result_ens_cheat.reset_index() + # result_ens_cheat["framework"] = "ALL_PLUS_TabForestPFN_CHEAT" + + # results_df_2 = pd.concat([ + # result_ens, + # result_ens_og, + # # df_processed_ag12_2024, + # # result_ens_cheat, + # # result_ens_og_cheat, + # ], ignore_index=True) + + # results_df_2 = convert_time_infer_s_from_sample_to_batch(results_df_2, repo=repo_combined) + + # print(f"AVG OG: {result_ens_og[0].mean()}") + # print(f"AVG: {result_ens[0].mean()}") + + with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000): + print(df_configs.head(100)) + + comparison_configs = [ + "RandomForest_c1_BAG_L1", + "ExtraTrees_c1_BAG_L1", + "LightGBM_c1_BAG_L1", + "XGBoost_c1_BAG_L1", + "CatBoost_c1_BAG_L1", + # "TabPFN_c1_BAG_L1", + "NeuralNetTorch_c1_BAG_L1", + "NeuralNetFastAI_c1_BAG_L1", + + "TabPFNv2_N4_BAG_L1", + # "TabPFNv2Local_N4_BAG_L1", + "TabPFNv2LocalParallel_N4_BAG_L1", + ] + + baselines = [ + "AutoGluon_bq_4h8c_2023_11_14", + ] + + from tabrepo.evaluation.evaluator import Evaluator + + evaluator = Evaluator(repo_combined) + + metrics = evaluator.compare_metrics( + # results_df, + # results_df_2, + datasets=datasets, + folds=folds, + baselines=baselines, + configs=comparison_configs, + ) + + metrics_tmp = metrics.reset_index(drop=False) + + with pd.option_context("display.max_rows", None, "display.max_columns", None, "display.width", 1000): + print(f"Config Metrics Example:\n{metrics.head(100)}") + evaluator_kwargs = { + "treat_folds_as_datasets": True, + "frameworks_compare_vs_all": [ + "TabPFNv2_N4_BAG_L1", + ], + "frameworks_rename": { + "AutoGluon_bq_4h8c_2023_11_14": "AutoGluon 0.8 (4h8c)", + }, + # "frameworks_compare_vs_all": ["TabPFNv2"], + } + evaluator_output = evaluator.plot_overall_rank_comparison( + results_df=metrics, + evaluator_kwargs=evaluator_kwargs, + save_dir=expname, + calibration_framework="RandomForest_c1_BAG_L1", + ) diff --git a/examples/run_scripts_v5/script_utils.py 
b/examples/run_scripts_v5/script_utils.py new file mode 100644 index 00000000..cdee9c64 --- /dev/null +++ b/examples/run_scripts_v5/script_utils.py @@ -0,0 +1,50 @@ +import copy + +import pandas as pd + +from autogluon.common.loaders import load_pd +from autogluon_benchmark.preprocessing.amlb_preprocessor import AMLBPreprocessor +from tabrepo.repository.repo_utils import convert_time_infer_s_from_batch_to_sample +from tabrepo.repository.abstract_repository import AbstractRepository + + +def load_ag11_bq_baseline(datasets: list[str], folds: list[int], repo: AbstractRepository) -> pd.DataFrame: + ag12_raw = load_pd.load(f"s3://automl-benchmark-ag/aggregated/ec2/2024_10_25/results.csv") + + df_processed_ag12_2024: pd.DataFrame = AMLBPreprocessor(framework_suffix="2024_10_25").transform(df=ag12_raw) + df_processed_ag12_2024 = df_processed_ag12_2024[df_processed_ag12_2024["framework"] == "AutoGluon_bq_mainline_4h8c_2024_10_25"] + df_processed_ag12_2024 = df_processed_ag12_2024[df_processed_ag12_2024["dataset"].isin(datasets)] + df_processed_ag12_2024 = df_processed_ag12_2024[df_processed_ag12_2024["fold"].isin(folds)] + + df_processed_ag12_2024["metric"] = df_processed_ag12_2024["metric"].map({ + "auc": "roc_auc", + "neg_logloss": "log_loss", + }) + + baseline_fillna = "AutoGluon_bq_4h8c_2023_11_14" + baseline_df = copy.deepcopy(repo._zeroshot_context.df_baselines) + baseline_df = baseline_df.drop(columns=["task"]) + baseline_df = baseline_df[baseline_df["framework"] == baseline_fillna] + + df_processed_ag12_2024 = convert_time_infer_s_from_batch_to_sample(df_processed_ag12_2024, repo=repo) + df_processed_ag12_2024_ref = df_processed_ag12_2024.set_index(["dataset", "fold"]) + + fillna_rows = [] + for dataset in datasets: + for fold in folds: + if (dataset, fold) not in df_processed_ag12_2024_ref.index: + print(dataset, fold) + fillna_row = baseline_df[(baseline_df["dataset"] == dataset) & (baseline_df["fold"] == fold)] + print(fillna_row) + assert len(fillna_row) == 1 + fillna_rows.append(fillna_row) + + if fillna_rows: + fillna_rows = pd.concat(fillna_rows, axis=0, ignore_index=True) + fillna_rows["framework"] = "AutoGluon_bq_mainline_4h8c_2024_10_25" + df_processed_ag12_2024 = pd.concat([df_processed_ag12_2024, fillna_rows], ignore_index=True) + df_processed_ag12_2024 = df_processed_ag12_2024[baseline_df.columns] + + df_processed_ag12_2024["framework"] = "AutoGluon_bq_4h8c_2024_10_25" + + return df_processed_ag12_2024 diff --git a/examples/run_scripts_v6/temp_script_v6.py b/examples/run_scripts_v6/temp_script_v6.py new file mode 100644 index 00000000..262a69e8 --- /dev/null +++ b/examples/run_scripts_v6/temp_script_v6.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +import logging +import os +import pandas as pd +import datetime as dt + +from tabrepo.scripts_v6 import logging_config +from tabrepo.scripts_v6.logging_config import utils_logger as log +from tabrepo import load_repository, EvaluationRepository +from tabrepo.scripts_v6.LGBM_class import CustomLGBM +from tabrepo.utils.experiment_utils_v6 import run_experiments, convert_leaderboard_to_configs + + +def datetime_iso(datetime=None, date=True, time=True, micros=False, date_sep='-', datetime_sep='T', time_sep=':', + micros_sep='.', no_sep=False): + """ + + :param date: + :param time: + :param micros: + :param date_sep: + :param time_sep: + :param datetime_sep: + :param micros_sep: + :param no_sep: if True then all separators are taken as empty string + :return: + """ + if no_sep: + date_sep = time_sep = datetime_sep = 
micros_sep = '' + strf = "" + if date: + strf += "%Y{_}%m{_}%d".format(_=date_sep) + if time: + strf += datetime_sep + if time: + strf += "%H{_}%M{_}%S".format(_=time_sep) + if micros: + strf += "{_}%f".format(_=micros_sep) + datetime = dt.datetime.utcnow() if datetime is None else datetime + return datetime.strftime(strf) + + +def output_dirs(root=None, subdirs=None, create=False): + root = root if root is not None else '.' + if create: + os.makedirs(root, exist_ok=True) + + dirs = { + 'root': root, + } + + if subdirs is not None: + if isinstance(subdirs, str): + subdirs = [subdirs] + + for d in subdirs: + subdir_path = os.path.join(root, d) + dirs[d] = subdir_path + if create: + os.makedirs(subdir_path, exist_ok=True) + + return dirs + + +script_name = os.path.splitext(os.path.basename(__file__))[0] +now_str = datetime_iso(date_sep='', time_sep='') +log_dir = output_dirs(subdirs='logs', create=True)['logs'] +logging_config.setup(log_file=os.path.join(log_dir, '{script}.{now}.log'.format(script=script_name, now=now_str)), + root_file=os.path.join(log_dir, '{script}.{now}.full.log'.format(script=script_name, now=now_str)), + root_level=logging.DEBUG, app_level=logging.INFO, console_level=logging.INFO, print_to_log=True) + +if __name__ == '__main__': + + log.info("Starting execution script...") + log.debug(f"Logs are stored in: {log_dir}") + + context_name = "D244_F3_C1530_30" + log.info(f"Loading repository for context: {context_name}") + try: + repo: EvaluationRepository = load_repository(context_name, cache=True) + log.info("Repository loaded successfully.") + except Exception as e: + log.error(f"Failed to load repository: {e}", exc_info=True) + raise + + expname = "./initial_experiment_tabpfn_v6" # folder location of all experiment artifacts + ignore_cache = True # set to True to overwrite existing caches and re-run experiments from scratch + + # To run everything: + # datasets = repo.datasets + # folds = repo.folds + folds = [0] + # datasets = [ + # "blood-transfusion-service-center", # binary + # "Australian", # binary + # "balance-scale", # multiclass + # # "MIP-2016-regression", # regression + # ] + + datasets = [ + "blood-transfusion-service-center", # binary + ] + log.info(f"Selected Datasets: {datasets}") + log.info(f"Folds to run: {folds}") + + try: + tids = [repo.dataset_to_tid(dataset) for dataset in datasets] + except Exception as e: + log.warning(f"Some datasets may not belong to the repository: {e}", exc_info=True) + + methods_dict = { + "LightGBM": { + "learning_rate": 0.15, + "num_leaves": 32, + "verbose": -1, # To suppress warnings + }, + } + method_cls_dict = { + "LightGBM": CustomLGBM, + } + methods = list(methods_dict.keys()) + log.info(f"Methods to run: {methods}") + + log.info("Running experiments...") + try: + results_lst = run_experiments( + expname=expname, + tids=tids, + folds=folds, + methods=methods, + methods_dict=methods_dict, + method_cls=method_cls_dict, + task_metadata=repo.task_metadata, + ignore_cache=ignore_cache, + ) + log.info("Experiments Status: Successful.") + except Exception as e: + log.error(f"An error occurred while running experiments: {e}", exc_info=True) + raise + + log.info("Concatenating results into Dataframe...") + try: + results_df = pd.concat(results_lst, ignore_index=True) + except Exception as e: + log.error(f"An error occurred while concatenating results: {e}", exc_info=True) + + log.info("Renaming leaderboard columns... 
") + results_df = convert_leaderboard_to_configs(results_df) + with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000): + print(results_df) + + comparison_configs = [ + "RandomForest_c1_BAG_L1", + "ExtraTrees_c1_BAG_L1", + "LightGBM_c1_BAG_L1", + "XGBoost_c1_BAG_L1", + "CatBoost_c1_BAG_L1", + "TabPFN_c1_BAG_L1", + "NeuralNetTorch_c1_BAG_L1", + "NeuralNetFastAI_c1_BAG_L1", + ] + log.info(f"Comparison configs: {comparison_configs}") + + baselines = [ + "AutoGluon_bq_4h8c_2023_11_14", + ] + log.info(f"Baseline: {baselines}") + + log.info(f"Comparing metrics...") + from tabrepo.evaluation.evaluator import Evaluator + evaluator = Evaluator(repo=repo) + try: + metrics = evaluator.compare_metrics( + results_df, + datasets=datasets, + folds=folds, + baselines=baselines, + configs=comparison_configs, + ) + except Exception as e: + log.error(f"An error occurred in compare_metrics(): {e}", exc_info=True) + raise + + with pd.option_context("display.max_rows", None, "display.max_columns", None, "display.width", 1000): + print(f"Config Metrics Example:\n{metrics}") + + log.info("Plotting overall rank comparison...") + try: + evaluator_output = evaluator.plot_overall_rank_comparison( + results_df=metrics, + save_dir=expname, + ) + except Exception as e: + log.error(f"An error occurred in plot_overall_rank_comparison(): {e}", exc_info=True) diff --git a/scripts/tabflow/configs/method_configs.yaml b/scripts/tabflow/configs/method_configs.yaml new file mode 100644 index 00000000..0738e862 --- /dev/null +++ b/scripts/tabflow/configs/method_configs.yaml @@ -0,0 +1,92 @@ +# Type, name and class can be strings like "LGBModel" or can be objects like LGBModel +# Do not worry about data-types, the framework will handle them + +methods: + - type: "AGModelBagExperiment" + name: "LightGBM_c1_BAG_L1_Reproduced-v1" + model_cls: "LGBModel" + model_hyperparameters: + random_state: 0 + n_estimators: 1000 + learning_rate: 0.05 + num_bag_folds: 8 + time_limit: 3600 + calibrate: true + + - type: "AGModelBagExperiment" + name: "LightGBM_c1_BAG_L1_Reproduced-v2" + model_cls: "LGBModel" + model_hyperparameters: + random_state: 0 + n_estimators: 1000 + learning_rate: 0.10 + num_bag_folds: 8 + time_limit: 3600 + calibrate: true + + - type: "AGModelBagExperiment" + name: "LightGBM_c1_BAG_L1_Reproduced-v3" + model_cls: "LGBModel" + model_hyperparameters: + random_state: 0 + n_estimators: 1000 + learning_rate: 0.15 + num_bag_folds: 8 + time_limit: 3600 + calibrate: true + + - type: "AGModelBagExperiment" + name: "RealMLP_c2_BAG_L1_TD_v2_SM" + model_cls: "RealMLPModel" + model_hyperparameters: + random_state: 0 + use_early_stopping: false + use_ls: "auto" + bool_to_cat: true + impute_bool: false + name_categories: true + num_bag_folds: 8 + time_limit: 3600 + + - type: "AGModelBagExperiment" + name: "RealMLP_c2_BAG_L1_TD_v2_SM_ES" + model_cls: "RealMLPModel" + model_hyperparameters: + random_state: 0 + use_early_stopping: true + use_ls: "auto" + bool_to_cat: true + impute_bool: false + name_categories: true + num_bag_folds: 8 + time_limit: 3600 + + # - type: "AGModelBagExperiment" + # name: "LightGBM_BAG_L1_Custom" + # model_cls: "LGBModel" + # model_hyperparameters: + # num_boost_round: 100 + # learning_rate: 0.02 + # ag_args_fit: { + # early_stop: None + # } + # num_bag_folds: 8 + # time_limit: 3600 + + # - type: "AGModelBagExperiment" + # name: "XGBoost_c1_BAG_L1_Reproduced" + # model_cls: "XGBoostModel" + # model_hyperparameters: + # random_state: 0 + # n_estimators: 1000 + # 
num_bag_folds: 8
+  #   time_limit: 3600
+  #   calibrate: true
+
+  # - type: "Experiment"
+  #   name: "LightGBM_Custom"
+  #   method_cls: "SimpleLightGBM"
+  #   method_kwargs:
+  #     hyperparameters:
+  #       learning_rate: 0.05
+
\ No newline at end of file
diff --git a/scripts/tabflow/docker/Dockerfile_SM b/scripts/tabflow/docker/Dockerfile_SM
new file mode 100644
index 00000000..866db6a5
--- /dev/null
+++ b/scripts/tabflow/docker/Dockerfile_SM
@@ -0,0 +1,23 @@
+# FROM 763104351884.dkr.ecr.us-west-2.amazonaws.com/autogluon-training:1.1.1-cpu-py311-ubuntu20.04
+FROM 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.5.1-cpu-py311-ubuntu22.04-sagemaker
+
+# Source install AutoGluon mainline - get changes after refactor
+RUN git clone https://github.com/autogluon/autogluon
+RUN cd autogluon && ./full_install.sh
+RUN cd ..
+
+COPY ../../ .
+COPY ./tabflow/src/evaluate.py .
+
+# Install the required packages
+RUN pip install -e tabrepo \
+    && pip install -e autogluon-bench \
+    && pip install -e autogluon-benchmark
+
+# Install pytabkit and seaborn
+RUN pip install pytabkit seaborn
+
+# Generate configs and give permissions
+RUN chmod +x ./tabrepo/scripts/run_generate_all_configs.py
+RUN python ./tabrepo/scripts/run_generate_all_configs.py
+RUN chmod +x ./evaluate.py
\ No newline at end of file
diff --git a/scripts/tabflow/docker/build_docker.sh b/scripts/tabflow/docker/build_docker.sh
new file mode 100755
index 00000000..f863a599
--- /dev/null
+++ b/scripts/tabflow/docker/build_docker.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# Check if the correct number of arguments is provided
+if [ "$#" -ne 5 ]; then
+    echo "Usage: $0 <REPO_NAME> <TAG> <SOURCE_ACCOUNT> <TARGET_ACCOUNT> <REGION>"
+    exit 1
+fi
+
+REPO_NAME=$1
+TAG=$2
+SOURCE_ACCOUNT=$3
+TARGET_ACCOUNT=$4
+REGION=$5
+
+# Login to AWS ECR
+aws ecr get-login-password --region ${REGION} | docker login --username AWS --password-stdin ${SOURCE_ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com
+aws ecr get-login-password --region ${REGION} | docker login --username AWS --password-stdin ${TARGET_ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com
+
+# Build the Docker image
+docker build -t ${REPO_NAME} -f ./Dockerfile_SM ../..
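+# Assumption: the build context passed above (../..) is expected to contain tabrepo, autogluon-bench and
+# autogluon-benchmark checkouts side by side, so that the COPY and "pip install -e" steps in Dockerfile_SM
+# resolve; adjust the relative path if your checkout layout differs.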
+ +# Tag the Docker image +docker tag ${REPO_NAME}:latest ${TARGET_ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${REPO_NAME}:${TAG} + +# Push the Docker image to the repository +docker push ${TARGET_ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${REPO_NAME}:${TAG} \ No newline at end of file diff --git a/scripts/tabflow/pyproject.toml b/scripts/tabflow/pyproject.toml new file mode 100644 index 00000000..1b476f19 --- /dev/null +++ b/scripts/tabflow/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tabflow" +version = "0.0.1" +requires-python = ">=3.10" +dependencies = [ + "boto3", + "sagemaker", + "pyyaml", + "tabrepo", + "autogluon-bench", + "autogluon-benchmark", + "autogluon", +] + +[project.scripts] +tabflow = "tabflow.launch_jobs:main" + + +[tool.setuptools] +package-dir = {"" = "src"} +packages = ["tabflow"] diff --git a/scripts/tabflow/src/tabflow/__init__.py b/scripts/tabflow/src/tabflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/tabflow/src/tabflow/evaluate.py b/scripts/tabflow/src/tabflow/evaluate.py new file mode 100644 index 00000000..185f2404 --- /dev/null +++ b/scripts/tabflow/src/tabflow/evaluate.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import argparse +import yaml +import pandas as pd +import json + + +from tabflow.utils import parse_method +from tabrepo import EvaluationRepository +from tabrepo.benchmark.experiment import ExperimentBatchRunner, AGModelBagExperiment, Experiment +from tabrepo.benchmark.models.simple import SimpleLightGBM +from autogluon.tabular.models import * +from tabrepo.benchmark.models.ag import * + +# from tabrepo import EvaluationRepository, EvaluationRepositoryCollection, Evaluator +# from tabrepo.benchmark.models.wrapper.AutoGluon_class import AGWrapper +# from tabrepo.benchmark.models.ag import RealMLPModel + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument('--experiment_name', type=str, required=True, help="Name of the experiment") + parser.add_argument('--context_name', type=str, required=True, help="Name of the context") + parser.add_argument('--datasets', nargs='+', type=str, required=True, help="List of datasets to evaluate") + parser.add_argument('--folds', nargs='+', type=int, required=True, help="List of folds to evaluate") + parser.add_argument('--method_name', type=str, required=True, help="Name of the method") + parser.add_argument('--method', type=str, required=True, help="Method to evaluate, dict as JSON string") + parser.add_argument('--s3_bucket', type=str, required=True, help="S3 bucket for the experiment") + + args = parser.parse_args() + + # Load Context + context_name = args.context_name + expname = args.experiment_name + ignore_cache = False # set to True to overwrite existing caches and re-run experiments from scratch + + #TODO: Download the repo without pred-proba + repo_og: EvaluationRepository = EvaluationRepository.from_context(context_name, cache=True) + + datasets = args.datasets + folds = args.folds + + method_dict = json.loads(args.method) + print(f"Method dict: {method_dict}") + methods = parse_method(method_dict, globals()) + print("\nMethods: ", methods) + + repo: EvaluationRepository = ExperimentBatchRunner(expname=expname, task_metadata=repo_og.task_metadata).run( + datasets=datasets, + folds=folds, + methods=[methods], + ignore_cache=ignore_cache, + mode="aws", + s3_bucket=args.s3_bucket, + ) \ No newline at end of file diff --git 
a/scripts/tabflow/src/tabflow/launch_jobs.py b/scripts/tabflow/src/tabflow/launch_jobs.py new file mode 100644 index 00000000..acbd4f47 --- /dev/null +++ b/scripts/tabflow/src/tabflow/launch_jobs.py @@ -0,0 +1,333 @@ +import boto3 +import sagemaker +import argparse +import json +import time +import random +import logging +import uuid + +from botocore.config import Config +from datetime import datetime +from pathlib import Path +from tabrepo import EvaluationRepository +from tabflow.utils import sanitize_job_name, check_s3_file_exists, yaml_to_methods + +logging.getLogger('botocore').setLevel(logging.ERROR) +logging.getLogger('sagemaker').setLevel(logging.ERROR) +logging.getLogger('boto3').setLevel(logging.ERROR) + + +DOCKER_IMAGE_ALIASES = { + "mlflow-image": "{ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com/{REPO}:{IMAGE_TAG}", +} + +def save_training_job_logs(sagemaker_client, s3_client, job_name, bucket, cache_path): + """ + Retrieve logs for a completed SageMaker training job and save to S3. + + Args: + sagemaker_client: Boto3 SageMaker client + s3_client: Boto3 S3 client + job_name: Name of the SageMaker training job + bucket: S3 bucket name + cache_path: Path prefix where the logs should be saved (without the .log extension) + """ + try: + # Create CloudWatch logs client + standard log_group + retry_config = Config( + connect_timeout=5, + read_timeout=10, + retries={'max_attempts':20, + 'mode':'adaptive', + } + ) + cloudwatch_logs = boto3.client('logs', config=retry_config) + log_group ='/aws/sagemaker/TrainingJobs' + + response = cloudwatch_logs.describe_log_streams( + logGroupName=log_group, + logStreamNamePrefix=job_name + ) + + # Find the matching log stream + log_stream = None + for stream in response.get('logStreams', []): + if stream['logStreamName'].startswith(job_name): + log_stream = stream['logStreamName'] + break + + if not log_stream: + print(f"No log stream found for job {job_name}") + return + + # Get the logs + logs_response = cloudwatch_logs.get_log_events( + logGroupName=log_group, + logStreamName=log_stream + ) + + # Compile the log messages + log_content = "" + for event in logs_response['events']: + log_content += f"{event['message']}\n" + + # Continue retrieving logs if there are more + while 'nextForwardToken' in logs_response: + next_token = logs_response['nextForwardToken'] + logs_response = cloudwatch_logs.get_log_events( + logGroupName=log_group, + logStreamName=log_stream, + nextToken=next_token + ) + + # If no more new logs, break + if next_token == logs_response['nextForwardToken']: + break + + for event in logs_response['events']: + log_content += f"{event['message']}\n" + + # Save logs to S3 + log_file_path = f"{cache_path}/full_log.log" + s3_client.put_object( + Body=log_content.encode('utf-8'), + Bucket=bucket, + Key=log_file_path + ) + print(f"Logs saved to s3://{bucket}/{log_file_path}") + + except Exception as e: + logging.exception(f"Error saving logs for job {job_name}") + # print(f"Error saving logs for job {job_name}: {e}") + +class TrainingJobResourceManager: + def __init__(self, sagemaker_client, max_concurrent_jobs): + self.sagemaker_client = sagemaker_client + self.max_concurrent_jobs = max_concurrent_jobs + self.job_names = [] + self.job_cache_paths = {} # Job Name -> Training Log + + def add_job(self, job_name, cache_path): + self.job_names.append(job_name) + self.job_cache_paths[job_name] = cache_path + + def remove_completed_jobs(self, s3_client, s3_bucket): + completed_jobs = [] + for job_name in self.job_names: + response = 
self.sagemaker_client.describe_training_job(TrainingJobName=job_name) #FIXME:Throttling will happen here if Queue is too big + job_status = response['TrainingJobStatus'] + if job_status in ['Completed', 'Failed', 'Stopped']: + save_training_job_logs( + self.sagemaker_client, + s3_client, + job_name, + s3_bucket, + self.job_cache_paths[job_name] + ) + completed_jobs.append(job_name) + for job_name in completed_jobs: + self.job_names.remove(job_name) + + def wait_for_available_slot(self, s3_client, s3_bucket, poll_interval=10): + while True: + if len(self.job_names) < self.max_concurrent_jobs: + return len(self.job_names) + self.remove_completed_jobs(s3_client=s3_client, s3_bucket=s3_bucket) + print(f"Currently running {len(self.job_names)}/{self.max_concurrent_jobs} concurrent jobs. Waiting...") + time.sleep(poll_interval) + + def wait_for_all_jobs(self, s3_client, s3_bucket, poll_interval=10): + # Wait for a non-zero value + while self.job_names: + self.remove_completed_jobs(s3_client=s3_client, s3_bucket=s3_bucket) + print(f"Waiting for {len(self.job_names)} jobs to complete...") + time.sleep(poll_interval) + + +def launch_jobs( + experiment_name: str = "tabflow-test-cache", + context_name: str = "D244_F3_C1530_30", # 30 datasets. To run larger, set to "D244_F3_C1530_200" + entry_point: str = "evaluate.py", + source_dir: str = str(Path(__file__).parent), + instance_type: str = "ml.m6i.4xlarge", + docker_image_uri: str = "mlflow-image", + sagemaker_role: str = "arn:aws:iam::{ACCOUNT_ID}:role/service-role/{ROLE}", + aws_profile: str | None = None, + hyperparameters: dict = None, + keep_alive_period_in_seconds: int = 3600, + limit_runtime: int = 24 * 60 * 60, + datasets: list = None, + folds: list = None, + methods_file: str = "methods.yaml", + max_concurrent_jobs: int = 30, + max_retry_attempts: int = 20, + s3_bucket: str = "test-bucket", + add_timestamp: bool = False, + wait: bool = False, +) -> None: + """ + Launch multiple SageMaker training jobs. 
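    A minimal, illustrative invocation (a sketch only; the role ARN and bucket below are placeholders, not values shipped with this change, while the context name, image alias, and "run_all"/-1 expansions come from the code itself):

        launch_jobs(
            experiment_name="tabflow-test-cache",
            context_name="D244_F3_C1530_30",
            datasets=["run_all"],            # "run_all" expands to every dataset in the context
            folds=[-1],                      # -1 expands to every fold in the context
            methods_file="methods.yaml",     # YAML with a top-level "methods" list, parsed by yaml_to_methods
            docker_image_uri="mlflow-image", # alias resolved via DOCKER_IMAGE_ALIASES
            sagemaker_role="arn:aws:iam::123456789012:role/service-role/ExampleSageMakerRole",  # placeholder
            s3_bucket="my-results-bucket",   # placeholder
            max_concurrent_jobs=30,
            wait=True,
        )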
+ + Args: + experiment_name: Name of the experiment + entry_point: The Python script to run + source_dir: Directory containing the training code + instance_type: SageMaker instance type + docker_image_uri: Docker image to use + sagemaker_role: AWS IAM role for SageMaker + aws_profile: AWS profile name + hyperparameters: Dictionary of hyperparameters to pass to the training script + keep_alive_period_in_seconds: Idle time before terminating the instance + limit_runtime: Maximum running time in seconds + datasets: List of datasets to evaluate + folds: List of folds to evaluate + methods_file: Path to the YAML file containing methods + max_concurrent_jobs: Maximum number of concurrent jobs, based on account limit + S3 bucket: S3 bucket to store the results + add_timestamp: Whether to add a timestamp to the experiment name + wait: Whether to wait for all jobs to complete + """ + + if add_timestamp: + timestamp = datetime.now().strftime("%d-%b-%Y-%H:%M:%S.%f")[:-3] + experiment_name = f"{experiment_name}-{timestamp}" + + # Create boto3 session + boto_session = boto3.Session(profile_name=aws_profile) if aws_profile else boto3.Session() + # Create SageMaker session + retry config + retry_config = Config( + connect_timeout=5, + read_timeout=10, + retries={'max_attempts':max_retry_attempts, + 'mode':'adaptive', + } + ) + sagemaker_client = boto_session.client('sagemaker', config=retry_config) + sagemaker_session = sagemaker.Session(boto_session=boto_session, sagemaker_client=sagemaker_client) + # Create S3 client + s3_client = boto_session.client('s3', config=retry_config) + + # Initialize the resource manager + resource_manager = TrainingJobResourceManager(sagemaker_client=sagemaker_client, max_concurrent_jobs=max_concurrent_jobs) + + methods = yaml_to_methods(methods_file=methods_file) + + repo_og: EvaluationRepository = EvaluationRepository.from_context(context_name, cache=True) + + if "run_all" in datasets: + datasets = repo_og.datasets() + else: + datasets = datasets + + if -1 in folds: + folds = repo_og.folds + else: + folds = folds + + total_jobs = len(datasets) * len(folds) * len(methods) + total_launched_jobs = 0 + + print(f"Preparing to launch {total_jobs} jobs with max concurrency of {max_concurrent_jobs}") + print(f"Instance keep-alive period set to {keep_alive_period_in_seconds} seconds to enable warm-starts") + + try: + for dataset in datasets: + for fold in folds: + for method in methods: + + method_name = method['name'] + cache_path = f"{experiment_name}/data/{method_name}/{repo_og.dataset_to_tid(dataset)}/{fold}" + + cache_name = f"{experiment_name}/data/{method_name}/{repo_og.dataset_to_tid(dataset)}/{fold}/results.pkl" + + # Change this check based on literals name_first or task_first + if check_s3_file_exists(s3_client=s3_client, bucket=s3_bucket, cache_name=cache_name): + print(f"Cache exists for {method_name} on dataset {dataset} fold {fold}. Skipping job launch.") + print(f"Cache path: s3://{s3_bucket}/{cache_path}\n") + continue + + current_jobs = resource_manager.wait_for_available_slot(s3_client=s3_client, s3_bucket=s3_bucket) + print(f"\nSlot available. 
Currently running {current_jobs}/{max_concurrent_jobs} jobs") + + # Create a unique job name + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + unique_id = str(uuid.uuid4().int)[:19] + base_name = f"{dataset[:4]}-f{fold}-{method_name[:4]}-{timestamp}-{unique_id}" + job_name = sanitize_job_name(base_name) + + + if docker_image_uri in DOCKER_IMAGE_ALIASES: + print(f"Expanding docker_image_uri alias '{docker_image_uri}' -> '{DOCKER_IMAGE_ALIASES[docker_image_uri]}'") + docker_image_uri = DOCKER_IMAGE_ALIASES[docker_image_uri] + + # Update hyperparameters for this job + job_hyperparameters = hyperparameters.copy() if hyperparameters else {} + job_hyperparameters.update({ + "experiment_name": experiment_name, + "context_name": context_name, + "dataset": dataset, + "fold": fold, # NOTE: Can be a 'str' as well, refer to Estimators in SM docs + "method_name": method_name, + "method": f"'{json.dumps(method)}'", + "s3_bucket": s3_bucket, + }) + + # Create the estimator + estimator = sagemaker.estimator.Estimator( + entry_point=entry_point, + source_dir=source_dir, + image_uri=docker_image_uri, + role=sagemaker_role, + instance_count=1, + instance_type=instance_type, + sagemaker_session=sagemaker_session, + hyperparameters=job_hyperparameters, + keep_alive_period_in_seconds=keep_alive_period_in_seconds, + max_run=limit_runtime, + disable_profiler=True, # Prevent debug profiler from running + ) + + # Launch the training job + estimator.fit(wait=False, job_name=job_name) + resource_manager.add_job(job_name=job_name, cache_path=cache_path) + total_launched_jobs += 1 + print(f"Launched job {total_launched_jobs} out of a total of {total_jobs} jobs: {job_name}\n") + + if wait: + resource_manager.wait_for_all_jobs(s3_client=s3_client, s3_bucket=s3_bucket) + except Exception as e: + print(f"Error launching jobs: {e}") + raise + + +def main(): + """Entrypoint for CLI""" + parser = argparse.ArgumentParser() + parser.add_argument('--experiment_name', type=str, default="tabflow-test-cache", help="Name of the experiment") + parser.add_argument('--context_name', type=str, default="D244_F3_C1530_30", help="Name of the context") + parser.add_argument('--datasets', nargs='+', type=str, required=True, help="List of datasets to evaluate") + parser.add_argument('--folds', nargs='+', type=int, required=True, help="List of folds to evaluate") + parser.add_argument('--methods_file', type=str, required=True, help="Path to the YAML file containing methods") + parser.add_argument('--max_concurrent_jobs', type=int, default=50, + help="Maximum number of concurrent jobs, based on account limit") + parser.add_argument('--s3_bucket', type=str, default="test-bucket", help="S3 bucket for the experiment") + parser.add_argument('--add_timestamp', action='store_true', help="Add timestamp to the experiment name") + parser.add_argument('--wait', action='store_true', help="Wait for all jobs to complete") + + args = parser.parse_args() + + launch_jobs( + experiment_name=args.experiment_name, + context_name=args.context_name, + datasets=args.datasets, + folds=args.folds, + methods_file=args.methods_file, + max_concurrent_jobs=args.max_concurrent_jobs, + s3_bucket=args.s3_bucket, + wait=args.wait, + ) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/tabflow/src/tabflow/utils.py b/scripts/tabflow/src/tabflow/utils.py new file mode 100644 index 00000000..2ea27506 --- /dev/null +++ b/scripts/tabflow/src/tabflow/utils.py @@ -0,0 +1,72 @@ +import re +import yaml +import boto3 + +from 
tabrepo.benchmark.models.model_register import infer_model_cls + + +def yaml_to_methods(methods_file: str) -> list: + with open(methods_file, 'r') as file: + methods_config = yaml.safe_load(file) + + return methods_config['methods'] + +def parse_method(method_config: dict, context=None): + + if context is None: + context = globals() + # Convert string class names to actual class references + # This assumes the classes are already defined or imported in evaluate.py + if 'model_cls' in method_config: + method_config["model_cls"] = infer_model_cls(method_config["model_cls"]) + # method_config['model_cls'] = eval(method_config['model_cls'], context) + if 'method_cls' in method_config: + method_config['method_cls'] = eval(method_config['method_cls'], context) + + # Evaluate all values in ag_args_fit + if "model_hyperparameters" in method_config: + if "ag_args_fit" in method_config["model_hyperparameters"]: + for key, value in method_config["model_hyperparameters"]["ag_args_fit"].items(): + if isinstance(value, str): + try: + method_config["model_hyperparameters"]["ag_args_fit"][key] = eval(value, context) + except NameError: + pass # If eval fails, keep the original string value + + + + method_type = eval(method_config.pop('type'), context) + method_obj = method_type(**method_config) + return method_obj + +def sanitize_job_name(name: str) -> str: + """ + Sanitize the job name to meet SageMaker requirements: + - Must be 1-63 characters long + - Must use only alphanumeric characters and hyphens + - Must start with a letter or number + - Must not end with a hyphen + """ + # Replace invalid characters with hyphens + name = re.sub('[^a-zA-Z0-9-]', '-', name) + # Remove consecutive hyphens + name = re.sub('-+', '-', name) + # Remove leading/trailing hyphens + name = name.strip('-') + # Ensure it starts with a letter or number + if not name[0].isalnum(): + name = 'j-' + name + # Truncate to 63 characters + return name[:63] + +def check_s3_file_exists(s3_client, bucket: str, cache_name: str) -> bool: + s3_path = f"s3://{bucket}/{cache_name}" + bucket, key = s3_path.replace("s3://", "").split("/", 1) + try: + s3_client.head_object(Bucket=bucket, Key=key) + return True + except s3_client.exceptions.ClientError as e: + if e.response['Error']['Code'] == '404': + return False + else: + raise \ No newline at end of file diff --git a/tabrepo/benchmark/experiment/experiment_utils.py b/tabrepo/benchmark/experiment/experiment_utils.py index 6acb67ed..96cb9bd2 100644 --- a/tabrepo/benchmark/experiment/experiment_utils.py +++ b/tabrepo/benchmark/experiment/experiment_utils.py @@ -54,6 +54,8 @@ def run( datasets: list[str], folds: list[int], ignore_cache: bool = False, + mode: str = "local", + s3_bucket: str = "test-bucket", ) -> list[dict[str, Any]]: """ @@ -87,6 +89,8 @@ def run( cache_cls=self.cache_cls, cache_cls_kwargs=self.cache_cls_kwargs, cache_path_format=self.cache_path_format, + mode=mode, + s3_bucket=s3_bucket, ) def load_results( @@ -150,6 +154,8 @@ def generate_repo_from_experiments( methods: list[Experiment], ignore_cache: bool, convert_time_infer_s_from_batch_to_sample: bool = True, + mode="local", + s3_bucket: str = "test-bucket", ) -> EvaluationRepository: """ @@ -171,6 +177,8 @@ def generate_repo_from_experiments( folds=folds, methods=methods, ignore_cache=ignore_cache, + mode=mode, + s3_bucket=s3_bucket, ) repo = self.repo_from_results( @@ -287,6 +295,8 @@ def run_experiments( cache_cls: Type[AbstractCacheFunction] | None = CacheFunctionPickle, cache_cls_kwargs: dict = None, cache_path_format: 
Literal["name_first", "task_first"] = "name_first", + mode: str = "local", + s3_bucket: str = "test-bucket", ) -> list[dict]: """ @@ -301,6 +311,8 @@ def run_experiments( cache_cls: WIP cache_cls_kwargs: WIP cache_path_format: {"name_first", "task_first"}, default "name_first" + mode: {"local", "aws"}, default "local" + S3_bucket: str, default "test-bucket" works only for aws mode, stores artifacts in the given bucket Returns ------- @@ -311,6 +323,12 @@ def run_experiments( if cache_cls_kwargs is None: cache_cls_kwargs = {} + # Modify cache path based on mode + if mode == "local": + base_cache_path = expname + else: + base_cache_path = f"s3://{s3_bucket}/{expname}" + methods_og = methods methods = [] for method in methods_og: @@ -359,7 +377,7 @@ def run_experiments( f"\tFitting {task_name} on fold {fold} for method {method.name}" ) - cacher = cache_cls(cache_name=cache_name, cache_path=expname, **cache_cls_kwargs) + cacher = cache_cls(cache_name=cache_name, cache_path=base_cache_path, **cache_cls_kwargs) if task is None: if ignore_cache or not cacher.exists: diff --git a/tabrepo/benchmark/run_compute_dtypes.py b/tabrepo/benchmark/run_compute_dtypes.py new file mode 100644 index 00000000..d55ffa42 --- /dev/null +++ b/tabrepo/benchmark/run_compute_dtypes.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +from collections import defaultdict + +import autogluon.common.loaders.load_json +from tabrepo import EvaluationRepository +from autogluon.common.loaders import load_pd +from autogluon.common.savers import save_pd +from autogluon.features import AutoMLPipelineFeatureGenerator, AsTypeFeatureGenerator + +import pandas as pd + + +def get_feature_info(repo: EvaluationRepository) -> pd.DataFrame: + datasets = repo.datasets() + + dataset_infos = {} + + from_csv = False + for dataset in datasets: + print(dataset) + task = repo.get_openml_task(dataset=dataset) + + X, y, X_test, y_test = task.get_train_test_split(fold=0) + if from_csv: + save_pd.save(path="tmp.csv", df=X) + X = load_pd.load(path="tmp.csv") + X.index = y.index + + # feature_generator = AutoMLPipelineFeatureGenerator() + feature_generator_2 = AsTypeFeatureGenerator() + + # X_transform = feature_generator.fit_transform(X=X, y=y) + X_transform_2 = feature_generator_2.fit_transform(X=X, y=y) + + # feature_metadata = feature_generator.feature_metadata + feature_metadata_2 = feature_generator_2.feature_metadata + feature_metadata = feature_metadata_2 + + num_each_type_raw = defaultdict(int) + num_each_type_special = defaultdict(int) + + type_map_raw = feature_metadata.type_map_raw + for k, v in type_map_raw.items(): + num_each_type_raw[v] += 1 + type_special = feature_metadata.get_feature_types_special(feature=k) + num_each_type_special[(v, tuple(sorted(type_special)))] += 1 + + dataset_infos[dataset] = { + "num_each_type_raw": num_each_type_raw, + "num_each_type_special": num_each_type_special, + } + + series_lst = [] + + for dataset in dataset_infos: + cur_info = dataset_infos[dataset] + num_each_type_raw = cur_info["num_each_type_raw"] + num_each_type_special = cur_info["num_each_type_special"] + + b = pd.Series(data=num_each_type_special, name=dataset) + series_lst.append(b) + + df_out = pd.concat(series_lst, axis=1).fillna(0).astype(int).T + + with pd.option_context("display.max_rows", None, "display.max_columns", None, "display.width", 1000): + print(df_out) + + return df_out + + +if __name__ == '__main__': + context_name = "D244_F3_C1530_10" + repo: EvaluationRepository = EvaluationRepository.from_context(context_name, 
cache=True) + + df_out = get_feature_info(repo) + + a = df_out[("int", ("bool",))] + print(a) + + b = a[a > 0] + print(b) + + # datasets = repo.datasets() + # + # dataset_infos = {} + # + # for dataset in datasets: + # print(dataset) + # task = repo.get_openml_task(dataset=dataset) + # + # X, y, X_test, y_test = task.get_train_test_split(fold=0) + # + # # feature_generator = AutoMLPipelineFeatureGenerator() + # feature_generator_2 = AsTypeFeatureGenerator() + # + # + # # X_transform = feature_generator.fit_transform(X=X, y=y) + # X_transform_2 = feature_generator_2.fit_transform(X=X, y=y) + # + # # feature_metadata = feature_generator.feature_metadata + # feature_metadata_2 = feature_generator_2.feature_metadata + # feature_metadata = feature_metadata_2 + # + # num_each_type_raw = defaultdict(int) + # num_each_type_special = defaultdict(int) + # + # type_map_raw = feature_metadata.type_map_raw + # for k, v in type_map_raw.items(): + # num_each_type_raw[v] += 1 + # type_special = feature_metadata.get_feature_types_special(feature=k) + # num_each_type_special[(v, tuple(sorted(type_special)))] += 1 + # + # dataset_infos[dataset] = { + # "num_each_type_raw": num_each_type_raw, + # "num_each_type_special": num_each_type_special, + # } + # + # series_lst = [] + # + # for dataset in dataset_infos: + # cur_info = dataset_infos[dataset] + # num_each_type_raw = cur_info["num_each_type_raw"] + # num_each_type_special = cur_info["num_each_type_special"] + # + # import pandas as pd + # b = pd.Series(data=num_each_type_special, name=dataset) + # series_lst.append(b) + # + # df_out = pd.concat(series_lst, axis=1).fillna(0).astype(int).T + # + # with pd.option_context("display.max_rows", None, "display.max_columns", None, "display.width", 1000): + # print(df_out) + # + + +""" + int float int object + () () (bool,) () +MIP-2016-regression 4 131 8 1 +Moneyball 4 7 2 1 +arcene 5969 3929 102 0 +boston 1 11 1 0 +dresses-sales 0 1 0 11 +fri_c3_500_50 0 50 0 0 +pm10 1 6 0 0 +sensory 7 0 4 0 +socmob 0 1 2 2 +tecator 0 124 0 0 + + int float int category + () () (bool,) () +MIP-2016-regression 4 131 8 1 +Moneyball 3 5 2 4 +arcene 5969 3929 102 0 +boston 0 11 1 1 +dresses-sales 0 1 0 11 +fri_c3_500_50 0 50 0 0 +pm10 1 6 0 0 +sensory 0 0 4 7 +socmob 0 1 2 2 +tecator 0 124 0 0 + +""" \ No newline at end of file diff --git a/tabrepo/scripts_v5/__init__.py b/tabrepo/scripts_v5/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tabrepo/scripts_v6/LGBM_class.py b/tabrepo/scripts_v6/LGBM_class.py new file mode 100644 index 00000000..9f650821 --- /dev/null +++ b/tabrepo/scripts_v6/LGBM_class.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import pandas as pd + +from .abstract_class import AbstractExecModel + + +class CustomLGBM(AbstractExecModel): + def get_model_cls(self): + from lightgbm import LGBMClassifier, LGBMRegressor + is_classification = self.problem_type in ['binary', 'multiclass'] + if is_classification: + model_cls = LGBMClassifier + elif self.problem_type == 'regression': + model_cls = LGBMRegressor + else: + raise AssertionError(f"LightGBM does not recognize the problem_type='{self.problem_type}'") + return model_cls + + def _fit(self, X: pd.DataFrame, y: pd.Series, **fit_args): + model_cls = self.get_model_cls() + self.model = model_cls(**fit_args) + self.model.fit( + X=X, + y=y + ) + return self + + def _predict(self, X: pd.DataFrame) -> pd.Series: + y_pred = self.model.predict(X) + return pd.Series(y_pred, index=X.index) + + def _predict_proba(self, X: pd.DataFrame) -> 
pd.DataFrame: + y_pred_proba = self.model.predict_proba(X) + return pd.DataFrame(y_pred_proba, columns=self.model.classes_, index=X.index) diff --git a/tabrepo/scripts_v6/__init__.py b/tabrepo/scripts_v6/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tabrepo/scripts_v6/abstract_class.py b/tabrepo/scripts_v6/abstract_class.py new file mode 100644 index 00000000..66ef4f48 --- /dev/null +++ b/tabrepo/scripts_v6/abstract_class.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import pandas as pd +import logging +from autogluon.core.data.label_cleaner import LabelCleaner, LabelCleanerDummy +from autogluon.core.metrics import Scorer +from autogluon.features import AutoMLPipelineFeatureGenerator +from autogluon_benchmark.utils.time_utils import Timer + +log = logging.getLogger(__name__) + +class AbstractExecModel: + + # TODO: Prateek: Find a way to put AutoGluon as default - in the case the user does not want their own class + def __init__( + self, + problem_type: str, + eval_metric: Scorer, + preprocess_data: bool = True, + preprocess_label: bool = True, + ): + self.problem_type = problem_type + self.eval_metric = eval_metric + self.preprocess_data = preprocess_data + self.preprocess_label = preprocess_label + self.label_cleaner: LabelCleaner = None + self._feature_generator = None + + def transform_y(self, y: pd.Series) -> pd.Series: + return self.label_cleaner.transform(y) + + def inverse_transform_y(self, y: pd.Series) -> pd.Series: + return self.label_cleaner.inverse_transform(y) + + def transform_y_pred_proba(self, y_pred_proba: pd.DataFrame) -> pd.DataFrame: + return self.label_cleaner.transform_proba(y_pred_proba, as_pandas=True) + + def inverse_transform_y_pred_proba(self, y_pred_proba: pd.DataFrame) -> pd.DataFrame: + return self.label_cleaner.inverse_transform_proba(y_pred_proba, as_pandas=True) + + def transform_X(self, X: pd.DataFrame) -> pd.DataFrame: + if self.preprocess_data: + return self._feature_generator.transform(X) + return X + + def _preprocess_fit_transform(self, X: pd.DataFrame, y: pd.Series): + if self.preprocess_label: + self.label_cleaner = LabelCleaner.construct(problem_type=self.problem_type, y=y) + else: + self.label_cleaner = LabelCleanerDummy(problem_type=self.problem_type) + if self.preprocess_data: + self._feature_generator = AutoMLPipelineFeatureGenerator() + X = self._feature_generator.fit_transform(X=X, y=y) + y = self.transform_y(y) + return X, y + + # TODO: Prateek, Add a toggle here to see if user wants to fit or fit and predict, also add model saving functionality + # TODO: Nick: Temporary name + def fit_custom(self, X: pd.DataFrame, y: pd.Series, X_test: pd.DataFrame, **fit_args): + ''' + Calls the fit function of the inheriting class and proceeds to perform predictions based on the problem type + + Returns + ------- + dict + Returns predictions, probabilities, fit time and inference time + ''' + log.info("Starting Fit Custom") + with (Timer() as timer_fit): + self.fit(X, y, **fit_args) + + if self.problem_type in ['binary', 'multiclass']: + with Timer() as timer_predict: + y_pred_proba = self.predict_proba(X_test) + y_pred = self.predict_from_proba(y_pred_proba) + else: + with Timer() as timer_predict: + y_pred = self.predict(X_test) + y_pred_proba = None + + out = { + 'predictions': y_pred, + 'probabilities': y_pred_proba, + 'time_fit': timer_fit.duration, + 'time_predict': timer_predict.duration, + } + + return out + + def fit(self, X: pd.DataFrame, y: pd.Series, **fit_args): + X, y = 
self._preprocess_fit_transform(X=X, y=y) + return self._fit(X=X, y=y, **fit_args) + + def _fit(self, X: pd.DataFrame, y: pd.Series, **fit_args): + raise NotImplementedError + + def predict_from_proba(self, y_pred_proba: pd.DataFrame) -> pd.Series: + return y_pred_proba.idxmax(axis=1) + + def predict(self, X: pd.DataFrame) -> pd.Series: + X = self.transform_X(X=X) + y_pred = self._predict(X) + return self.inverse_transform_y(y=y_pred) + + def _predict(self, X: pd.DataFrame): + raise NotImplementedError + + def predict_proba(self, X: pd.DataFrame) -> pd.DataFrame: + X = self.transform_X(X=X) + y_pred_proba = self._predict_proba(X=X) + return self.inverse_transform_y_pred_proba(y_pred_proba=y_pred_proba) + + def _predict_proba(self, X: pd.DataFrame) -> pd.DataFrame: + raise NotImplementedError diff --git a/tabrepo/scripts_v6/logging_config.py b/tabrepo/scripts_v6/logging_config.py new file mode 100644 index 00000000..aab809c7 --- /dev/null +++ b/tabrepo/scripts_v6/logging_config.py @@ -0,0 +1,147 @@ +""" +**logger** module just exposes a ``setup`` function to quickly configure the python logger. +""" +import datetime as dt +import io +import logging +import sys + +# prevent asap other modules from defining the root logger using basicConfig +logging.basicConfig(handlers=[logging.NullHandler()]) + +utils_logger = logging.getLogger('tabrepo.utils.experiment_utils_v6') +scripts_logger = logging.getLogger('tabrepo.scripts_v6.abstract_class') +singular_logger = logging.getLogger('singular_model') + +logging.TRACE = logging.TRACE if hasattr(logging, 'TRACE') else 5 + + +class MillisFormatter(logging.Formatter): + converter = dt.datetime.fromtimestamp # type: ignore + + def formatTime(self, record, datefmt=None): + ct = self.converter(record.created) + if datefmt: + t = ct.strftime(datefmt) + else: + t = ct.strftime("%Y-%m-%d %H:%M:%S") + s = "%s.%03d" % (t, record.msecs) + return s + + +def setup(log_file=None, root_file=None, root_level=logging.WARNING, app_level=None, console_level=None, + print_to_log=False): + """ + configures the Python logger. 
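    A typical call, shown here as a sketch (the log-file names are illustrative and not part of this module):

        import logging
        from tabrepo.scripts_v6 import logging_config

        # Send the tabrepo experiment/script loggers to the console at INFO,
        # append full records to app.log and root.log, and redirect print()
        # output into the logger.
        logging_config.setup(
            log_file="app.log",
            root_file="root.log",
            root_level=logging.WARNING,
            app_level=logging.INFO,
            print_to_log=True,
        )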
+ :param log_file: + :param root_file: + :param root_level: + :param app_level: + :param console_level: + :return: + """ + logging.captureWarnings(True) + # warnings = logging.getLogger('py.warnings') + + if not sys.warnoptions: + import warnings + warnings.simplefilter("ignore") + + root = logging.getLogger() + root.setLevel(root_level) + + app_level = app_level if app_level else root_level + console_level = console_level if console_level else app_level + + # create console handler + console = logging.StreamHandler() + console.setLevel(console_level) + utils_logger.addHandler(console) + utils_logger.setLevel(app_level) + scripts_logger.addHandler(console) + scripts_logger.setLevel(app_level) + + file_formatter = MillisFormatter('[%(levelname)s] [%(name)s:%(asctime)s] %(message)s', datefmt='%H:%M:%S') + + if log_file: + # create file handler + app_handler = logging.FileHandler(log_file, mode='a') + app_handler.setLevel(app_level) + app_handler.setFormatter(file_formatter) + utils_logger.addHandler(app_handler) + scripts_logger.addHandler(app_handler) + + if root_file: + root_handler = logging.FileHandler(root_file, mode='a') + root_handler.setLevel(root_level) + root_handler.setFormatter(file_formatter) + root.addHandler(root_handler) + + if print_to_log: + import builtins + nl = '\n' + print_logger = logging.getLogger(utils_logger.name + '.print') + buffer = dict(out=None, err=None) + + ori_print = builtins.print + + def new_print(*args, sep=' ', end=nl, file=None): + if file not in [None, sys.stdout, sys.stderr]: + return ori_print(*args, sep=sep, end=end, file=file) + + nonlocal buffer + buf_type = 'err' if file is sys.stderr else 'out' + buf = buffer[buf_type] + if buf is None: + buf = buffer[buf_type] = io.StringIO() + line = sep.join(map(str, [*args])) + buf.write(line) # "end" newline always added by logger + if end == nl or line.endswith(nl): # flush buffer for every line + with buf: + level = logging.ERROR if buf_type == 'err' else logging.INFO + print_logger.log(level, buf.getvalue()) + buffer[buf_type] = None + + builtins.print = new_print + + +def singular_setup(individual_log_file=None, individual_level=logging.DEBUG, print_to_log=True): + ind_console = logging.StreamHandler() + ind_console.setLevel(individual_level) + singular_logger.addHandler(ind_console) + + file_formatter = MillisFormatter('[%(levelname)s] [%(name)s:%(asctime)s] %(message)s', datefmt='%H:%M:%S') + + individual_handler = logging.FileHandler(individual_log_file, mode='a') + individual_handler.setLevel(individual_level) + individual_handler.setFormatter(file_formatter) + singular_logger.addHandler(individual_handler) + + if print_to_log: + import builtins + nl = '\n' + print_logger = logging.getLogger(utils_logger.name + '.print') + singular_print_logger = logging.getLogger(singular_logger.name + '.print') + buffer = dict(out=None, err=None) + + ori_print = builtins.print + + def new_print(*args, sep=' ', end=nl, file=None): + if file not in [None, sys.stdout, sys.stderr]: + return ori_print(*args, sep=sep, end=end, file=file) + + nonlocal buffer + buf_type = 'err' if file is sys.stderr else 'out' + buf = buffer[buf_type] + if buf is None: + buf = buffer[buf_type] = io.StringIO() + line = sep.join(map(str, [*args])) + buf.write(line) # "end" newline always added by logger + if end == nl or line.endswith(nl): # flush buffer for every line + with buf: + level = logging.ERROR if buf_type == 'err' else logging.INFO + print_logger.log(level, buf.getvalue()) + singular_print_logger.log(level, buf.getvalue()) + 
buffer[buf_type] = None + + builtins.print = new_print diff --git a/tabrepo/utils/cache.py b/tabrepo/utils/cache.py index f6e6ed8c..7fa0e5f5 100644 --- a/tabrepo/utils/cache.py +++ b/tabrepo/utils/cache.py @@ -5,6 +5,7 @@ import pandas as pd import pickle import sys +import boto3 from pathlib import Path from contextlib import contextmanager from time import perf_counter @@ -13,6 +14,7 @@ from autogluon.common.loaders import load_pkl from autogluon.common.savers import save_pkl +from autogluon.common.utils import s3_utils from tabrepo.utils import catchtime @@ -123,18 +125,44 @@ def __init__(self, cache_name: str, cache_path: Path | str | None = None): # TODO: Remove default_cache_path? cache_path = default_cache_path self.cache_path = cache_path + self.is_s3 = str(cache_path).startswith("s3://") @property def cache_file(self) -> str: + if self.is_s3: + return f"{self.cache_path}/{self.cache_name}.pkl" return str(Path(self.cache_path) / (self.cache_name + ".pkl")) + + @property + def exists(self) -> bool: + if self.is_s3: + try: + s3 = boto3.client('s3') + bucket, key = s3_utils.s3_path_to_bucket_prefix(self.cache_file) + s3.head_object(Bucket=bucket, Key=key) + return True + except s3.exceptions.ClientError as e: + if e.response['Error']['Code'] == '404': + return False + else: + raise + else: + return Path(self.cache_file).exists() def save_cache(self, data: object) -> None: - cache_file = self.cache_file - Path(cache_file).parent.mkdir(parents=True, exist_ok=True) - with open(cache_file, "wb") as f: + if self.is_s3: + s3 = boto3.client('s3') + bucket, key = s3_utils.s3_path_to_bucket_prefix(self.cache_file) cache = pickle.dumps(data) - print(f'Writing cache with size {round(sys.getsizeof(cache) / 1e6, 3)} MB') - f.write(cache) + print(f'Writing cache with size {round(sys.getsizeof(cache) / 1e6, 3)} MB to {self.cache_file}') + s3.put_object(Bucket=bucket, Key=key, Body=cache) + else: + cache_file = self.cache_file + Path(cache_file).parent.mkdir(parents=True, exist_ok=True) + with open(cache_file, "wb") as f: + cache = pickle.dumps(data) + print(f'Writing cache with size {round(sys.getsizeof(cache) / 1e6, 3)} MB') + f.write(cache) def load_cache(self) -> object: with open(self.cache_file, "rb") as f: @@ -206,6 +234,50 @@ def catchtime(name: str, logger=None) -> float: print_fun(f"Time for {name}: {perf_counter() - start:.4f} secs") +# TODO: Delete and use CacheFunctionDF? +@dataclass +class Experiment: + expname: str # name of the parent experiment used to store the file + name: str # name of the specific experiment, e.g. "localsearch" + run_fun: Callable[..., list] # function to execute to obtain results + kwargs: dict = None + + def data(self, ignore_cache: bool = False): + kwargs = self.kwargs + if kwargs is None: + kwargs = {} + cacher = CacheFunctionDF(cache_name=self.name, cache_path=self.expname) + return cacher.cache( + lambda: pd.DataFrame(self.run_fun(**kwargs)), + ignore_cache=ignore_cache, + ) + + +# TODO: Delete and use CacheFunctionPickle? +@dataclass +class SimulationExperiment(Experiment): + def data(self, ignore_cache: bool = False) -> object: + kwargs = self.kwargs + if kwargs is None: + kwargs = {} + cacher = CacheFunctionPickle(cache_name=self.name, cache_path=self.expname) + return cacher.cache( + fun=lambda: self.run_fun(**kwargs), + ignore_cache=ignore_cache, + ) + + +# TODO: Delete and use CacheFunctionDummy? 
+@dataclass +class DummyExperiment(Experiment): + """ + Dummy Experiment class that doesn't perform caching and simply runs the run_fun and returns the result. + """ + + def data(self, ignore_cache: bool = False): + return self.run_fun() + + class SaveLoadMixin: """ Mixin class to add generic pickle save/load methods. diff --git a/tabrepo/utils/experiment_utils_v6.py b/tabrepo/utils/experiment_utils_v6.py new file mode 100644 index 00000000..d95040dc --- /dev/null +++ b/tabrepo/utils/experiment_utils_v6.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import pandas as pd +import logging +from typing import Callable, List +import os + +from autogluon.core.data.label_cleaner import LabelCleaner +from autogluon.core.metrics import get_metric, Scorer +from autogluon_benchmark.frameworks.autogluon.run import ag_eval_metric_map +from autogluon_benchmark.tasks.task_wrapper import OpenMLTaskWrapper +from tabrepo.utils.cache import DummyExperiment, Experiment +from tabrepo.scripts_v6 import logging_config + + +# TODO: Prateek: Give a toggle for just fitting and saving the model, if not call predict as well + +log = logging.getLogger(__name__) +model_log = logging.getLogger('singular_model') + +def run_experiments( + expname: str, + tids: List[int], + folds: List[int], + methods: List[str], + methods_dict: dict, + method_cls, # FIXME: Nick: This needs to be communicated on a per-method basis + task_metadata: pd.DataFrame, + ignore_cache: bool, + cache_class: Callable | None = Experiment, + cache_class_kwargs: dict = None +) -> list: + ''' + + Parameters + ---------- + expname: str, Name of the experiment given by the user + tids: list[int], List of OpenML task IDs given by the user + folds: list[int], Number of folds present for the given task + methods: list[str], Models used for fit() and predict() in this experiment + methods_dict: dict, methods (models) mapped to their respective init_args() + task_metadata: pd.DataFrame,OpenML task metadata + ignore_cache: bool, whether to use cached results (if present) + method_cls: WIP + cache_class: WIP + cache_class_kwargs: WIP + + Returns + ------- + result_lst: list, containing all metrics from fit() and predict() of all the given OpenML tasks + ''' + # TODO: Prateek, Check usage + if cache_class is None: + cache_class = DummyExperiment + if cache_class_kwargs is None: + cache_class_kwargs = {} + dataset_names = [task_metadata[task_metadata["tid"] == tid]["name"].iloc[0] for tid in tids] + print( + f"Running Experiments for expname: '{expname}'..." + f"\n\tFitting {len(tids)} datasets and {len(folds)} folds for a total of {len(tids) * len(folds)} tasks" + f"\n\tFitting {len(methods)} methods on {len(tids) * len(folds)} tasks for a total of {len(tids) * len(folds) * len(methods)} jobs..." 
+ f"\n\tTIDs : {tids}" + f"\n\tDatasets: {dataset_names}" + f"\n\tFolds : {folds}" + f"\n\tMethods : {methods}" + ) + result_lst = [] + num_datasets = len(tids) + for i, tid in enumerate(tids): + task = OpenMLTaskWrapper.from_task_id(task_id=tid) + task_name = task_metadata[task_metadata["tid"] == tid]["name"].iloc[0] + print(f"Starting Dataset {i + 1}/{num_datasets}...") + for fold in folds: + for method in methods: + cache_name = f"data/tasks/{tid}/{fold}/{method}/results" + fit_args = methods_dict[method] + log.info( + f"\tFitting {task_name} on fold {fold} for method {method}" + ) + + individual_log_dir = os.path.join('./logs', task_name, str(fold), method) + os.makedirs(individual_log_dir, exist_ok=True) + individual_log_file = os.path.join(individual_log_dir, f"{task_name}_fold{fold}_{method}.log") + logging_config.singular_setup(individual_log_file=individual_log_file) + + model_log.info( + f"\tFitting {task_name} on fold {fold} for method {method}" + ) + + if isinstance(method_cls, dict): + cur_method_cls = method_cls[method] + else: + cur_method_cls = method_cls + + experiment = cache_class( + expname=expname, + name=cache_name, + run_fun=lambda: run_experiment( + method_cls=cur_method_cls, + task=task, + fold=fold, + task_name=task_name, + method=method, + fit_args=fit_args, + ), + **cache_class_kwargs + ) + # FIXME: The output df still needs evaluation and formatting, currently just has predictions + # probabilities, fit and infer times + out = experiment.data(ignore_cache=ignore_cache) + result_lst.append(out) + model_log.handlers.clear() + + return result_lst + + +def run_experiment(method_cls, task: OpenMLTaskWrapper, fold: int, task_name: str, method: str, fit_args: dict = None, + **kwargs): + eval_metric_name = ag_eval_metric_map[task.problem_type] + eval_metric: Scorer = get_metric(metric=eval_metric_name, problem_type=task.problem_type) + model = method_cls( + problem_type=task.problem_type, + eval_metric=eval_metric + ) + + X, y, X_test, y_test = task.get_train_test_split(fold=fold) + + out = model.fit_custom(X, y, X_test, **fit_args) + + label_cleaner = LabelCleaner.construct(problem_type=task.problem_type, y=y) + out["test_error"] = evaluate( + y_true=y_test, + y_pred=out["predictions"], + y_pred_proba=out["probabilities"], + scorer=eval_metric, + label_cleaner=label_cleaner, + problem_type=task.problem_type, + ) + + out["framework"] = method + out["dataset"] = task_name + out["tid"] = task.task_id + out["fold"] = fold + out["problem_type"] = task.problem_type + out["eval_metric"] = eval_metric_name + print(f"Task Name: {out['dataset']}") + print(f"Task ID: {out['tid']}") + print(f"Metric : {out['eval_metric']}") + print(f"Test Error: {out['test_error']:.4f}") + print(f"Fit Time: {out['time_fit']:.3f}s") + print(f"Infer Time: {out['time_predict']:.3f}s") + + out.pop("predictions") + out.pop("probabilities") + + df_results = pd.DataFrame([out]) + ordered_columns = ["dataset", "fold", "framework", "test_error", "eval_metric", "time_fit"] + columns_reorder = ordered_columns + [c for c in df_results.columns if c not in ordered_columns] + df_results = df_results[columns_reorder] + return df_results + + +def convert_leaderboard_to_configs(leaderboard: pd.DataFrame, minimal: bool = True) -> pd.DataFrame: + df_configs = leaderboard.rename(columns=dict( + time_fit="time_train_s", + time_predict="time_infer_s", + test_error="metric_error", + eval_metric="metric", + val_error="metric_error_val", + )) + if minimal: + df_configs = df_configs[[ + "dataset", + "fold", + "framework", 
+ "metric_error", + "metric", + "problem_type", + "time_train_s", + "time_infer_s", + "tid", + ]] + return df_configs + + +def evaluate(y_true: pd.Series, y_pred: pd.Series, y_pred_proba: pd.DataFrame, scorer: Scorer, + label_cleaner: LabelCleaner, problem_type: str): + y_true = label_cleaner.transform(y_true) + if scorer.needs_pred: + y_pred = label_cleaner.transform(y_pred) + test_error = scorer.error(y_true=y_true, y_pred=y_pred) + elif problem_type == "binary": + y_pred_proba = label_cleaner.transform_proba(y_pred_proba, as_pandas=True) + test_error = scorer.error(y_true=y_true, y_pred=y_pred_proba.iloc[:, 1]) + else: + y_pred_proba = label_cleaner.transform_proba(y_pred_proba, as_pandas=True) + test_error = scorer.error(y_true=y_true, y_pred=y_pred_proba) + return test_error