diff --git a/example/run.py b/example/run.py
index c845481..7c8ac89 100644
--- a/example/run.py
+++ b/example/run.py
@@ -3,51 +3,63 @@
 import eq
 import tg
 
-from hamilton import base, driver
+from hamilton import driver
 from hamilton.io.materialization import to
 from hamilton.experimental.h_cache import CachingGraphAdapter
 from hamilton.plugins import h_experiments, matplotlib_extensions, pandas_extensions  # noqa: F401
 
+from molexp.cmdline import CMDLineExecutionManager
+from hamilton.execution import executors
+
 tracker_hook = h_experiments.ExperimentTracker(
     experiment_name="exp",
     base_directory="./experiments",
 )
 
+execution_manager = CMDLineExecutionManager(
+    executors.SynchronousLocalTaskExecutor(),
+    executors.MultiThreadingExecutor(20),  # max parallelism
+)
+
 dr = (
     driver.Builder()
     .with_modules(build, eq, tg)
     # .with_config(config)
-    .with_adapters(tracker_hook, CachingGraphAdapter('.cache'))
+    .with_adapters(tracker_hook, CachingGraphAdapter(".cache"))
+    .enable_dynamic_execution(allow_experimental_mode=True)
+    .with_execution_manager(execution_manager)
     .build()
 )
 
-inputs = dict({
-    # 'work_dir': ''
-    'repeat_unit': ['N', 'M'],
-    'repeat': 1,
-    'n_chains': 20,
-    'density': 0.005,
-})
+inputs = dict(
+    {
+        # 'work_dir': ''
+        "repeat_unit": ["N", "M"],
+        "repeat": 1,
+        "n_chains": 20,
+        "density": 0.005,
+    }
+)
 
 materializers = [
     to.pickle(
         id="after_build",
-        dependencies=['submit'],
+        dependencies=["submit"],
         path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle",
     )
 ]
 
-for repeat_unit in ['NMNMP', 'NMNMNMP']:
+for repeat_unit in ["NMNMP", "NMNMNMP"]:
     for repeat in [1, 4, 8]:
-        inputs['repeat_unit'] = list(repeat_unit)
-        inputs['repeat'] = repeat
+        inputs["repeat_unit"] = list(repeat_unit)
+        inputs["repeat"] = repeat
 
-        inputs['work_dir'] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
-        Path(inputs['work_dir']).mkdir(exist_ok=True)
+        inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
+        Path(inputs["work_dir"]).mkdir(exist_ok=True)
 
         dr.visualize_materialization(
             *materializers,
             inputs=inputs,
             output_file_path=f"{tracker_hook.run_directory}/dag",
             render_kwargs=dict(view=False, format="png"),
         )
-        dr.materialize(*materializers, inputs=inputs)
\ No newline at end of file
+        dr.materialize(*materializers, inputs=inputs)
diff --git a/example/run_batch1.py b/example/run_batch1.py
new file mode 100644
index 0000000..c7f039f
--- /dev/null
+++ b/example/run_batch1.py
@@ -0,0 +1,105 @@
+"""
+Option 1: Use hamilton within Hamilton.
+"""
+
+from pathlib import Path
+import build
+import eq
+import tg
+
+from hamilton import driver
+from hamilton.io.materialization import to
+from hamilton.experimental.h_cache import CachingGraphAdapter
+from hamilton.plugins import h_experiments, matplotlib_extensions, pandas_extensions  # noqa: F401
+from hamilton.function_modifiers import tag, value, parameterize
+
+from molexp.cmdline import CMDLineExecutionManager
+from hamilton.execution import executors
+
+
+# could use @resolve to dynamically create this via passed in configuration.
+# this is statically declared here for now.
+cross_product = {
+    f"{ru}x{r}": {"repeat_unit": value(ru), "repeat": value(r)}
+    for ru in ["NMNMP", "NMNMNMP"]
+    for r in [1, 4, 8]
+}
+
+
+@parameterize(**cross_product)
+@tag(cmdline="slurm")
+def experiment(repeat_unit: str, repeat: int) -> dict:
+    """Node to run an experiment."""
+    tracker_hook = h_experiments.ExperimentTracker(
+        experiment_name="exp",
+        base_directory="./experiments",
+    )
+
+    execution_manager = CMDLineExecutionManager(
+        executors.SynchronousLocalTaskExecutor(),
+        executors.SynchronousLocalTaskExecutor(),
+    )
+
+    dr = (
+        driver.Builder()
+        .with_modules(build, eq, tg)
+        # .with_config(config)
+        .with_adapters(tracker_hook, CachingGraphAdapter(".cache"))
+        .enable_dynamic_execution(allow_experimental_mode=True)
+        .with_execution_manager(execution_manager)
+        .build()
+    )
+
+    inputs = dict(
+        {
+            "n_chains": 20,
+            "density": 0.005,
+        }
+    )
+
+    materializers = [
+        to.pickle(
+            id="after_build",
+            dependencies=["submit"],
+            path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle",
+        )
+    ]
+
+    inputs["repeat_unit"] = list(repeat_unit)
+    inputs["repeat"] = repeat
+
+    inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
+    Path(inputs["work_dir"]).mkdir(exist_ok=True)
+    dr.visualize_materialization(
+        *materializers,
+        inputs=inputs,
+        output_file_path=f"{tracker_hook.run_directory}/dag",
+        render_kwargs=dict(view=False, format="png"),
+    )
+    meta, _ = dr.materialize(*materializers, inputs=inputs)
+    return meta
+
+
+if __name__ == "__main__":
+    import run_batch1
+
+    # tracker_hook = h_experiments.ExperimentTracker(
+    #     experiment_name="exp",
+    #     base_directory="./experiments",
+    # )
+
+    execution_manager = CMDLineExecutionManager(
+        executors.SynchronousLocalTaskExecutor(),
+        executors.MultiThreadingExecutor(20),
+    )
+
+    dr = (
+        driver.Builder()
+        .with_modules(run_batch1)
+        # .with_config(config)
+        .enable_dynamic_execution(allow_experimental_mode=True)
+        .with_execution_manager(execution_manager)
+        .build()
+    )
+    dr.display_all_functions("run_batch1.png")
+    dr.execute(dr.list_available_variables())
diff --git a/src/molexp/cmdline.py b/src/molexp/cmdline.py
index 7dab899..3b02244 100644
--- a/src/molexp/cmdline.py
+++ b/src/molexp/cmdline.py
@@ -51,4 +51,4 @@ def wrapper(*args, **kwargs):
     if inspect.isgeneratorfunction(func):
         # get the return type and set it as the return type of the wrapper
         wrapper.__annotations__["return"] = inspect.signature(func).return_annotation[2]
-    return wrapper
\ No newline at end of file
+    return wrapper
diff --git a/src/molexp/submitor.py b/src/molexp/submitor.py
index c2b068a..0d741fe 100644
--- a/src/molexp/submitor.py
+++ b/src/molexp/submitor.py
@@ -1,16 +1,16 @@
 import inspect
 import functools
-import subprocess
-import os
 from pathlib import Path
 from pysqa import QueueAdapter
 import time
 
+from hamilton.function_modifiers import tag
 
-print(Path('.queues').absolute())
+print(Path(".queues").absolute())
 
 qa = QueueAdapter(directory=".queues")
 
+
 class Monitor:
     job_queue = []
 
@@ -26,9 +26,9 @@ def __del__(self):
         if self in self.job_queue:
             self.job_queue.remove(self)
 
-    def __eq__(self, other:'Monitor'):
+    def __eq__(self, other: "Monitor"):
         return self.job_id == other.job_id
-    
+
     def wait(self, pool_interval=5):
         while True:
             status = self.get_status()
@@ -45,7 +45,7 @@ def wait(self, pool_interval=5):
     def get_all_job_status(cls):
         job_id = [monitor.job_id for monitor in cls.job_queue]
         return qa.get_status_of_jobs(job_id)
-    
+
     @classmethod
     def wait_all(self, pool_interval=5):
         while True:
@@ -57,12 +57,11 @@ def wait_all(self, pool_interval=5):
             else:
                 print(f"Jobs are {status}.")
                 time.sleep(pool_interval)
-    
+
 
 
 def submit():
     """Decorator to run the result of a function as a command line command."""
-
     def decorator(func):
         if not inspect.isgeneratorfunction(func):
             raise ValueError("Function must be a generator.")
@@ -71,7 +70,7 @@ def decorator(func):
         def wrapper(*args, **kwargs):
             # we don't want to change the current working directory
            # until we are executing the function
-            name = func.__name__
+            # name = func.__name__
 
             # If the function is a generator, we need to block and monitor the task if required
             generator = func(*args, **kwargs)
@@ -79,9 +78,7 @@ def wrapper(*args, **kwargs):
             monitor = arguments.pop("monitor", False)
             # Run the command and capture the output
             print(arguments)
-            job_id = qa.submit_job(
-                **arguments
-            )
+            job_id = qa.submit_job(**arguments)
 
             # slurm_task_info := Submitted batch job 30588834
             # TODO: blocked, but usually many workflow execute in parallel,
@@ -107,7 +104,10 @@ def wrapper(*args, **kwargs):
         # get the return type and set it as the return type of the wrapper
         print(wrapper.__annotations__)
         print(inspect.signature(func).return_annotation)
-        wrapper.__annotations__["return"] = inspect.signature(func).return_annotation  # Jichen: why [2] ?
+        wrapper.__annotations__["return"] = inspect.signature(
+            func
+        ).return_annotation  # Jichen: why [2] ?
         return wrapper
 
+    decorator = tag(cmdline="slurm")(decorator)
     return decorator
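
The submit() decorator in src/molexp/submitor.py wraps a generator-style node: judging from the hunks above, the decorated function yields a dict of keyword arguments for pysqa's QueueAdapter.submit_job(), the wrapper pops an optional "monitor" flag before submitting, and the function's return annotation is copied onto the wrapper (the [2] index questioned in cmdline.py selects ReturnType from typing.Generator[YieldType, SendType, ReturnType]). A minimal sketch of such a node follows; the node name run_md, its parameter, and the specific submit_job() keyword arguments are illustrative assumptions, not part of this changeset.

    from typing import Generator

    from molexp.submitor import submit


    @submit()
    def run_md(work_dir: str) -> Generator[dict, None, str]:
        # yield the keyword arguments the wrapper forwards to qa.submit_job();
        # "monitor" is popped by the wrapper before submission (assumed protocol)
        yield dict(
            job_name="md",
            working_directory=work_dir,
            command="srun lmp -in in.lammps",
            monitor=True,
        )
        # value handed back if the wrapper resumes the generator after the job
        # finishes (assumed; the declared ReturnType here is str)
        return work_dir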