Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug workflow #287

Merged
merged 12 commits into from
Aug 6, 2023
31 changes: 19 additions & 12 deletions capsul/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ def add_modifier(self, modifier):
raise ValueError(f'Invalid value for schema modification for parameter {self.parameter}: {modifier}')

def apply(self, metadata, process, parameter, initial_meta):
debug = False # (parameter == 't1mri_nobias')
debug = False # (parameter == 'nobias')
if debug: print('apply modifier to', parameter, ':', self, metadata, initial_meta)
for modifier in self.modifiers:
if isinstance(modifier, dict):
Expand Down Expand Up @@ -798,6 +798,7 @@ def metadata_modifications(self, process):

for field in process.user_fields():
# self.debug = (field.name == 't1mri_nobias')
done_mod = set()
if process.plugs[field.name].activated:
self.dprint(
f' Parse schema modifications for {field.name}')
Expand Down Expand Up @@ -858,17 +859,22 @@ def metadata_modifications(self, process):
if intra_node is not None:
filtered_meta = self.get_linked_metadata(
schema, intra_node, intra_src, intra_dst)
modifier = MetadataModifier(
schema, node, node_parameter,
filtered_meta=filtered_meta)
if not modifier.is_empty:
self.dprint(f' {modifier.modifiers}')
if filtered_meta is not None:
self.dprint(
' filtered_meta: '
f'{filtered_meta}')
result.setdefault(field.name,
[]).append(modifier)
if (node, node_parameter) not in done_mod:
# (avoid having several times the same modifier
# via different paths, some Prepend(), Append()
# may be duplicate)
modifier = MetadataModifier(
schema, node, node_parameter,
filtered_meta=filtered_meta)
if not modifier.is_empty:
self.dprint(f' {modifier.modifiers}')
if filtered_meta is not None:
self.dprint(
' filtered_meta: '
f'{filtered_meta}')
result.setdefault(field.name,
[]).append(modifier)
done_mod.add((node, node_parameter))
else:
self.dprint(f' {field.name} ignored (inactive)')

Expand Down Expand Up @@ -941,6 +947,7 @@ def get_schema(self, schema_name, index=None):
return schema

def generate_paths(self, executable):
# self.debug = True
if self.debug:
if self._current_iteration is not None:
iteration = f'[{self._current_iteration}]'
Expand Down
204 changes: 175 additions & 29 deletions capsul/execution_context.py

Large diffs are not rendered by default.

105 changes: 73 additions & 32 deletions capsul/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ def workflow_ordered_nodes(self, remove_disabled_steps=True):
graph = self.workflow_graph(remove_disabled_steps)


# Start the topologival sort
# Start the topological sort
ordered_list = graph.topological_sort()

def walk_workflow(wokflow, workflow_list):
Expand All @@ -1541,7 +1541,7 @@ def walk_workflow(wokflow, workflow_list):
workflow_list = []
walk_workflow(ordered_list, workflow_list)

return workflow_list
return workflow_list

def find_empty_parameters(self):
""" Find internal File/Directory parameters not exported to the main
Expand Down Expand Up @@ -2248,18 +2248,21 @@ def dispatch_plugs(self, node, name):
name,
in_sub_pipelines=False,
activated_only=False,
process_only=False))
process_only=False,
direction=('links_from', 'links_to')))
while stack:
item = stack.pop()
if item not in done:
node, plug = item
yield (node, plug)
done.add(item)
stack.extend(self.get_linked_items(node,
plug,
stack.extend(self.get_linked_items(
node,
plug,
in_sub_pipelines=False,
activated_only=False,
process_only=False))
process_only=False,
direction=('links_from', 'links_to')))
self.enable_parameter_links = enable_parameter_links

def dispatch_all_values(self):
Expand All @@ -2278,19 +2281,31 @@ def get_linked_items(self, node, plug_name=None, in_sub_pipelines=True,
Going through switches and inside subpipelines, ignoring nodes that are
not activated.
The result is a generator of pairs (node, plug_name).

direction may be a sting, 'links_from', 'links_to', or a tuple
('linnks_from', 'links_to').
'''
if plug_name is None:
stack = [(node, plug) for plug in node.plugs]
else:
stack = [(node, plug_name)]
done = set()

while stack:
node, plug_name = stack.pop(0)
current = stack.pop(0)
if current in done:
continue
done.add(current)
node, plug_name = current
if activated_only and not node.activated:
continue
plug = node.plugs.get(plug_name)
if plug:
if direction is not None:
directions = (direction,)
if isinstance(direction, (tuple, list)):
directions = direction
else:
directions = (direction,)
else:
if isinstance(node, Pipeline):
if in_outer_pipelines:
Expand All @@ -2304,41 +2319,67 @@ def get_linked_items(self, node, plug_name=None, in_sub_pipelines=True,
else:
directions = ('links_from',)
for current_direction in directions:
for dest_plug_name, dest_node in (i[1:3] for i in getattr(plug, current_direction)):
if dest_node is node or (activated_only
and not dest_node.activated):
for dest_plug_name, dest_node in \
(i[1:3] for i in getattr(plug, current_direction)):
if dest_node is node \
or (activated_only
and not dest_node.activated):
continue
if isinstance(dest_node, Pipeline):
if ((in_sub_pipelines and dest_node is not self) or
(in_outer_pipelines and isinstance(dest_node, Pipeline))):
for n, p in self.get_linked_items(dest_node,
dest_plug_name,
activated_only=activated_only,
process_only=process_only,
in_sub_pipelines=in_sub_pipelines,
direction=current_direction,
in_outer_pipelines=in_outer_pipelines):
if ((in_sub_pipelines and dest_node is not self)
or in_outer_pipelines):
for n, p in self.get_linked_items(
dest_node,
dest_plug_name,
activated_only=activated_only,
process_only=process_only,
in_sub_pipelines=in_sub_pipelines,
direction=current_direction,
in_outer_pipelines=in_outer_pipelines):
if n is not node:
yield (n, p)
yield (dest_node, dest_plug_name)
if (n, p) not in done:
yield (n, p)
if (dest_node, dest_plug_name) not in done:
yield (dest_node, dest_plug_name)
elif isinstance(dest_node, Switch):
if dest_plug_name == 'switch':
if not process_only:
yield (dest_node, dest_plug_name)
if (dest_node, dest_plug_name) \
not in done:
yield (dest_node, dest_plug_name)
else:
for input_plug_name, output_plug_name in dest_node.connections():
if plug.output ^ isinstance(node, Pipeline):
if direction is None \
or (isinstance(direction,
(tuple, list))
and len(direction) == 2):
# if bidirectional search only
stack.append((dest_node, dest_plug_name))
for input_plug_name, output_plug_name \
in dest_node.connections():
if current_direction == 'links_to':
if dest_plug_name == input_plug_name:
if not process_only:
yield (dest_node, output_plug_name)
stack.append((dest_node, output_plug_name))
if not process_only \
and (dest_node,
output_plug_name) \
not in done:
yield (
dest_node,
output_plug_name)
stack.append((dest_node,
output_plug_name))
else:
if dest_plug_name == output_plug_name:
if not process_only:
yield (dest_node, input_plug_name)
stack.append((dest_node, input_plug_name))
if not process_only \
and (dest_node,
input_plug_name) \
not in done:
yield (
dest_node, input_plug_name)
stack.append((dest_node,
input_plug_name))
else:
yield (dest_node, dest_plug_name)
if (dest_node, dest_plug_name) not in done:
yield (dest_node, dest_plug_name)

def json(self, include_parameters=True):
result = super().json(include_parameters=include_parameters)
Expand Down
40 changes: 22 additions & 18 deletions capsul/pipeline/pipeline_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,13 @@ def _switch_changed(self, new_selection, old_selection):
setattr(self, output_plug_name,
getattr(self, corresponding_input_plug_name, undefined))

if self.pipeline is not None:
f = self.field(output_plug_name)
for n, p in self.pipeline.get_linked_items(
self, corresponding_input_plug_name,
direction='links_from'):
# copy input field metadata
for k, v in n.field(p).metadata().items():
setattr(f, k, v)
break

# Propagate the associated field documentation
out_field = self.field(output_plug_name)
in_field = self.field(corresponding_input_plug_name)
out_field.doc = in_field.metadata('doc', None)

self.propagate_fields_metadata()

self.pipeline.restore_update_nodes_and_plugs_activation()
self.__block_output_propagation = False

Expand Down Expand Up @@ -314,14 +306,6 @@ def _any_attribute_changed(self, new, old, name):
if self.switch == switch_selection:
self.__block_output_propagation = True
setattr(self, output_plug_name, new)
if self.pipeline is not None:
f = self.field(output_plug_name)
for n, p in self.pipeline.get_linked_items(
self, name, direction='links_from'):
# copy input field metadata
for k, v in n.field(p).metadata().items():
setattr(f, k, v)
break
self.__block_output_propagation = False

def __setstate__(self, state):
Expand Down Expand Up @@ -393,6 +377,26 @@ def configured_controller(self):
c.optional_params = [self.field(p).optional for p in self.inputs]
return c

def propagate_fields_metadata(self):
''' Propagate metadata from connected inputs (that is, outputs of
upstream processes) to outputs.
This is needed to get correct status (read/write) on output pipeline
plugs once the switch state is chosen.
'''
for output_plug_name in self._outputs:
# Get the associated input name
input_plug_name = f'{self.switch}_switch_{output_plug_name}'

if self.pipeline is not None:
f = self.field(output_plug_name)
for n, p in self.pipeline.get_linked_items(
self, input_plug_name,
direction='links_from'):
# copy input field metadata
for k, v in n.field(p).metadata().items():
setattr(f, k, v)
break

@classmethod
def build_node(cls, pipeline, name, conf_controller):
node = Switch(pipeline, name, conf_controller.inputs,
Expand Down
Loading
Loading