44 changes: 28 additions & 16 deletions example/run.py
@@ -3,51 +3,63 @@
import eq
import tg

from hamilton import base, driver
from hamilton import driver
from hamilton.io.materialization import to
from hamilton.experimental.h_cache import CachingGraphAdapter
from hamilton.plugins import h_experiments, matplotlib_extensions, pandas_extensions # noqa: F401

from molexp.cmdline import CMDLineExecutionManager
from hamilton.execution import executors

tracker_hook = h_experiments.ExperimentTracker(
experiment_name="exp",
base_directory="./experiments",
)
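# ExperimentTracker (from hamilton.plugins.h_experiments) records each run in its own
# run directory under ./experiments/, exposed below as tracker_hook.run_directory.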

execution_manager = CMDLineExecutionManager(
executors.SynchronousLocalTaskExecutor(),
executors.MultiThreadingExecutor(20), # max parallelism
)
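# CMDLineExecutionManager comes from molexp.cmdline; following Hamilton's ExecutionManager
# pattern, the first (synchronous) executor presumably handles ordinary nodes while the
# second runs parallelizable / cmdline-tagged tasks, here with up to 20 threads.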

dr = (
driver.Builder()
.with_modules(build, eq, tg)
# .with_config(config)
.with_adapters(tracker_hook, CachingGraphAdapter('.cache'))
.with_adapters(tracker_hook, CachingGraphAdapter(".cache"))
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(execution_manager)
.build()
)

inputs = dict({
# 'work_dir': ''
'repeat_unit': ['N', 'M'],
'repeat': 1,
'n_chains': 20,
'density': 0.005,
})
inputs = dict(
{
# 'work_dir': ''
"repeat_unit": ["N", "M"],
"repeat": 1,
"n_chains": 20,
"density": 0.005,
}
)

materializers = [
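# persist the output of the `submit` node to a pickle at the path below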
to.pickle(
id="after_build",
dependencies=['submit'],
dependencies=["submit"],
path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle",
)
]

for repeat_unit in ['NMNMP', 'NMNMNMP']:
for repeat_unit in ["NMNMP", "NMNMNMP"]:
for repeat in [1, 4, 8]:
inputs['repeat_unit'] = list(repeat_unit)
inputs['repeat'] = repeat
inputs["repeat_unit"] = list(repeat_unit)
inputs["repeat"] = repeat

inputs['work_dir'] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
Path(inputs['work_dir']).mkdir(exist_ok=True)
inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
Path(inputs["work_dir"]).mkdir(exist_ok=True)
dr.visualize_materialization(
*materializers,
inputs=inputs,
output_file_path=f"{tracker_hook.run_directory}/dag",
render_kwargs=dict(view=False, format="png"),
)
dr.materialize(*materializers, inputs=inputs)
dr.materialize(*materializers, inputs=inputs)
105 changes: 105 additions & 0 deletions example/run_batch1.py
@@ -0,0 +1,105 @@
"""
Option 1: Use hamilton within Hamilton.
"""

from pathlib import Path
import build
import eq
import tg

from hamilton import driver
from hamilton.io.materialization import to
from hamilton.experimental.h_cache import CachingGraphAdapter
from hamilton.plugins import h_experiments, matplotlib_extensions, pandas_extensions # noqa: F401
from hamilton.function_modifiers import tag, value, parameterize

from molexp.cmdline import CMDLineExecutionManager
from hamilton.execution import executors


# We could use @resolve to build this dynamically from passed-in configuration;
# for now it is declared statically.
cross_product = {
f"{ru}x{r}": {"repeat_unit": value(ru), "repeat": value(r)}
for ru in ["NMNMP", "NMNMNMP"]
for r in [1, 4, 8]
}
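# @parameterize below expands `experiment` into one node per key
# ("NMNMPx1", "NMNMPx4", ..., "NMNMNMPx8"), each bound to its repeat_unit/repeat pair.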


@parameterize(**cross_product)
@tag(cmdline="slurm")
def experiment(repeat_unit: str, repeat: int) -> dict:
"""Node to run an experiment."""
tracker_hook = h_experiments.ExperimentTracker(
experiment_name="exp",
base_directory="./experiments",
)

execution_manager = CMDLineExecutionManager(
executors.SynchronousLocalTaskExecutor(),
executors.SynchronousLocalTaskExecutor(),
)
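# both inner executors are synchronous: parallelism across experiments is provided by
# the outer driver in __main__, which uses a MultiThreadingExecutor.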

dr = (
driver.Builder()
.with_modules(build, eq, tg)
# .with_config(config)
.with_adapters(tracker_hook, CachingGraphAdapter(".cache"))
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(execution_manager)
.build()
)

inputs = dict(
{
"n_chains": 20,
"density": 0.005,
}
)

materializers = [
to.pickle(
id="after_build",
dependencies=["submit"],
path="/proj/snic2021-5-546/users/x_jicli/exp/.cache/to_lammps.pickle",
)
]

inputs["repeat_unit"] = list(repeat_unit)
inputs["repeat"] = repeat

inputs["work_dir"] = f"{''.join(inputs['repeat_unit'])}x{inputs['repeat']}"
Path(inputs["work_dir"]).mkdir(exist_ok=True)
dr.visualize_materialization(
*materializers,
inputs=inputs,
output_file_path=f"{tracker_hook.run_directory}/dag",
render_kwargs=dict(view=False, format="png"),
)
meta, _ = dr.materialize(*materializers, inputs=inputs)
return meta


if __name__ == "__main__":
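# the module imports itself so the parameterized experiment nodes defined above
# can be handed to the outer driver via with_modules()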
import run_batch1

# tracker_hook = h_experiments.ExperimentTracker(
# experiment_name="exp",
# base_directory="./experiments",
# )

execution_manager = CMDLineExecutionManager(
executors.SynchronousLocalTaskExecutor(),
executors.MultiThreadingExecutor(20),
)

dr = (
driver.Builder()
.with_modules(run_batch1)
# .with_config(config)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(execution_manager)
.build()
)
dr.display_all_functions("run_batch1.png")
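# request every node, i.e. run all parameterized experiments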
dr.execute([var.name for var in dr.list_available_variables()])
2 changes: 1 addition & 1 deletion src/molexp/cmdline.py
@@ -51,4 +51,4 @@ def wrapper(*args, **kwargs):
if inspect.isgeneratorfunction(func):
# get the return type and set it as the return type of the wrapper
wrapper.__annotations__["return"] = inspect.signature(func).return_annotation[2]
return wrapper
return wrapper
26 changes: 13 additions & 13 deletions src/molexp/submitor.py
@@ -1,16 +1,16 @@
import inspect
import functools
import subprocess
import os

from pathlib import Path

from pysqa import QueueAdapter
import time
from hamilton.function_modifiers import tag

print(Path('.queues').absolute())
print(Path(".queues").absolute())
qa = QueueAdapter(directory=".queues")
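# pysqa's QueueAdapter loads the queue/scheduler configuration from the .queues directory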


class Monitor:
job_queue = []

@@ -26,9 +26,9 @@ def __del__(self):
if self in self.job_queue:
self.job_queue.remove(self)

def __eq__(self, other:'Monitor'):
def __eq__(self, other: "Monitor"):
return self.job_id == other.job_id

def wait(self, pool_interval=5):
while True:
status = self.get_status()
@@ -45,7 +45,7 @@ def wait(self, pool_interval=5):
def get_all_job_status(cls):
job_id = [monitor.job_id for monitor in cls.job_queue]
return qa.get_status_of_jobs(job_id)

@classmethod
def wait_all(self, pool_interval=5):
while True:
@@ -57,12 +57,11 @@ def wait_all(self, pool_interval=5):
else:
print(f"Jobs are {status}.")
time.sleep(pool_interval)


def submit():
"""Decorator to run the result of a function as a command line command."""


def decorator(func):
if not inspect.isgeneratorfunction(func):
raise ValueError("Function must be a generator.")
@@ -71,17 +70,15 @@ def decorator(func):
def wrapper(*args, **kwargs):
# we don't want to change the current working directory
# until we are executing the function
name = func.__name__
# name = func.__name__

# If the function is a generator, we need to block and monitor the task if required
generator = func(*args, **kwargs)
arguments: dict = next(generator)
monitor = arguments.pop("monitor", False)
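# `arguments` holds the keyword arguments for qa.submit_job; the optional "monitor"
# key presumably asks the wrapper to block on the job via Monitor (see TODO below).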
# Run the command and capture the output
print(arguments)
job_id = qa.submit_job(
**arguments
)
job_id = qa.submit_job(**arguments)
# slurm_task_info := Submitted batch job 30588834

# TODO: blocked, but usually many workflows execute in parallel,
@@ -107,7 +104,10 @@ def wrapper(*args, **kwargs):
# get the return type and set it as the return type of the wrapper
print(wrapper.__annotations__)
print(inspect.signature(func).return_annotation)
wrapper.__annotations__["return"] = inspect.signature(func).return_annotation # Jichen: why [2] ?
wrapper.__annotations__["return"] = inspect.signature(
func
).return_annotation # Jichen: why [2] ?
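# (a guess: the original [2] may have been meant to pick the return type out of a
# Generator[Yield, Send, Return] annotation via its type arguments)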
return wrapper

decorator = tag(cmdline="slurm")(decorator)
return decorator