Skip to content

feat: adds workflow template to_yaml and to_json #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
7 changes: 7 additions & 0 deletions hpcflow/sdk/core/command_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,13 @@ def to_dict(self):
dct = super().to_dict()
return dct

def __eq__(self, other: object) -> bool:
if not isinstance(other, self.__class__):
return False
if self.to_dict() == other.to_dict():
return True
return False

def _get_members(self, ensure_contents=False, use_file_label=False):
out = super()._get_members(ensure_contents)
if use_file_label:
Expand Down
7 changes: 7 additions & 0 deletions hpcflow/sdk/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Mapping

from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedSeq
import sentry_sdk

from hpcflow.sdk.core.errors import FromSpecMissingObjectError, InvalidIdentifier
Expand Down Expand Up @@ -311,6 +312,12 @@ def read_YAML_file(path: PathLike):
return read_YAML(Path(path))


def FSlist(l):
cs = CommentedSeq(l)
cs.fa.set_flow_style()
return cs


def read_JSON_string(string: str):
return json.loads(string)

Expand Down
153 changes: 153 additions & 0 deletions hpcflow/sdk/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
read_JSON_string,
read_YAML,
read_YAML_file,
FSlist,
replace_items,
)
from ruamel.yaml import YAML
from io import StringIO
from json import dumps
from hpcflow.sdk.core.errors import (
InvalidInputSourceTaskReference,
LoopAlreadyExistsError,
Expand Down Expand Up @@ -181,6 +185,135 @@ def from_YAML_file(cls, path: PathLike) -> app.WorkflowTemplate:
"""
return cls._from_data(read_YAML_file(path))

@classmethod
def to_input_dict(cls, data):
"""Called by to_yaml and to_json.
Returns a python dict with only the necessary elements to save the template."""
d_workflow = {"name": data.name}

# Task_Schemas

# Tasks
l_tasks = []
for task in data.tasks:
d_task = {}
# Schemas
l_schemas = []
for schema in task.schemas:
l_schemas.append(schema.name)
d_task["schemas"] = l_schemas

# Element sets
l_element_sets = []
for element_set in task.element_sets:
d_element_set = {}

# Inputs
d_inputs = {}
for input in element_set.inputs:
if isinstance(input.value, list):
d_inputs[input.parameter.typ] = input.value
else:
d_inputs[input.parameter.typ] = input.value
if d_inputs:
d_element_set["inputs"] = d_inputs

# Sequences
l_sequences = []
for sequence in element_set.sequences:
l_sequences.extend(
[
{
"path": sequence.path,
"values": sequence.values,
"nesting_order": sequence.nesting_order,
}
]
)
if l_sequences:
d_element_set["sequences"] = l_sequences

# Resources
l_resources = {}
for resource in element_set.resources:
if resource.num_cores:
l_resources[resource.normalised_resources_path] = {
"num_cores": resource.num_cores
}
if l_resources:
d_element_set["resources"] = l_resources

# Input files
l_input_files = []
for input_file in element_set.input_files:
l_input_files.extend(
[
{
"file": input_file.file.label,
"path": input_file._path,
}
]
)
if l_input_files:
d_element_set["input_files"] = l_input_files

l_element_sets.append(d_element_set)
d_task["element_sets"] = l_element_sets

l_tasks.append(d_task)
if l_tasks:
d_workflow["tasks"] = l_tasks

return d_workflow

@classmethod
def to_yaml(cls, dumper, data):
"""Called by to_yaml_string."""
d_workflow = cls.to_input_dict(data)

for i, task in enumerate(d_workflow["tasks"]):
d_workflow["tasks"][i]["schemas"] = FSlist(task["schemas"])

for j, element_set in enumerate(task["element_sets"]):
if "inputs" in element_set:
for name, values in element_set["inputs"].items():
if isinstance(values, list):
d_workflow["tasks"][i]["element_sets"][j]["inputs"][
name
] = FSlist(values)

if "sequences" in element_set:
for k, sequence in enumerate(element_set["sequences"]):
d_workflow["tasks"][i]["element_sets"][j]["sequences"][k][
"values"
] = FSlist(sequence["values"])

return dumper.represent_mapping("tag:yaml.org,2002:map", d_workflow)

def to_yaml_string(self):
"""Get templeate as a formatted YAML string."""
wt_yaml = YAML()
yaml_string = StringIO()
wt_yaml.register_class(type(self))
wt_yaml.dump(self, yaml_string)
return yaml_string.getvalue()

def to_yaml_file(self, yaml_file_path):
"""Save to a YAML file.

Parameters
----------
string
The path to the yaml file in which the template will be saved.

"""
if not any(x in yaml_file_path for x in [".yml", ".yaml"]):
yaml_file_path = yaml_file_path + ".yml"
wt_yaml = YAML()
wt_yaml.register_class(type(self))
with open(yaml_file_path, "w") as output_file:
wt_yaml.dump(self, output_file)

@classmethod
def from_JSON_string(cls, string: str) -> app.WorkflowTemplate:
"""Load from a JSON string.
Expand All @@ -205,6 +338,26 @@ def from_JSON_file(cls, path: PathLike) -> app.WorkflowTemplate:
"""
return cls._from_data(read_JSON_file(path))

def to_json_string(self):
"""Get templeate as a formatted JSON string."""
d_workflow = self.to_input_dict(self)
json_string = dumps(d_workflow, indent=4)
return json_string

def to_json_file(self, json_file_path):
"""Save to a JSON file.

Parameters
----------
string
The path to the json file in which the template will be saved.

"""
if not ".json" in json_file_path:
json_file_path = json_file_path + ".json"
with open(json_file_path, "w") as output_file:
output_file.write(self.to_json_string())

@classmethod
def from_file(
cls,
Expand Down
Loading