1
+ """Wrap a CWL document as a callable Python object."""
2
+
3
+ import argparse
4
+ import functools
1
5
import os
6
+ import sys
2
7
from typing import Any , Optional , Union
3
8
4
9
from . import load_tool
5
- from .context import LoadingContext , RuntimeContext
10
+ from .argparser import arg_parser
11
+ from .context import LoadingContext , RuntimeContext , getdefault
6
12
from .errors import WorkflowException
7
13
from .executors import JobExecutor , SingleJobExecutor
14
+ from .main import find_default_container
8
15
from .process import Process
9
- from .utils import CWLObjectType
16
+ from .resolver import tool_resolver
17
+ from .secrets import SecretStore
18
+ from .utils import DEFAULT_TMP_PREFIX , CWLObjectType
10
19
11
20
12
21
class WorkflowStatus (Exception ):
@@ -25,11 +34,13 @@ def __init__(self, t: Process, factory: "Factory") -> None:
25
34
self .t = t
26
35
self .factory = factory
27
36
28
- def __call__ (self , ** kwargs ):
29
- # type: (**Any) -> Union[str, Optional[CWLObjectType]]
30
- runtime_context = self .factory .runtime_context .copy ()
31
- runtime_context .basedir = os .getcwd ()
32
- out , status = self .factory .executor (self .t , kwargs , runtime_context )
37
+ def __call__ (self , ** kwargs : Any ) -> Union [str , Optional [CWLObjectType ]]:
38
+ """
39
+ Execute the process.
40
+
41
+ :raise WorkflowStatus: If the result is not a success.
42
+ """
43
+ out , status = self .factory .executor (self .t , kwargs , self .factory .runtime_context )
33
44
if status != "success" :
34
45
raise WorkflowStatus (out , status )
35
46
else :
@@ -44,21 +55,27 @@ class Factory:
44
55
45
56
def __init__ (
46
57
self ,
58
+ argsl : Optional [list [str ]] = None ,
59
+ args : Optional [argparse .Namespace ] = None ,
47
60
executor : Optional [JobExecutor ] = None ,
48
61
loading_context : Optional [LoadingContext ] = None ,
49
62
runtime_context : Optional [RuntimeContext ] = None ,
50
63
) -> None :
64
+ """Create a CWL Process factory from a CWL document."""
65
+ if argsl is not None :
66
+ args = arg_parser ().parse_args (argsl )
51
67
if executor is None :
52
- executor = SingleJobExecutor ()
53
- self .executor = executor
68
+ self .executor : JobExecutor = SingleJobExecutor ()
69
+ else :
70
+ self .executor = executor
54
71
if runtime_context is None :
55
- self .runtime_context = RuntimeContext ()
72
+ self .runtime_context = RuntimeContext (vars (args ))
73
+ self ._fix_runtime_context ()
56
74
else :
57
75
self .runtime_context = runtime_context
58
76
if loading_context is None :
59
- self .loading_context = LoadingContext ()
60
- self .loading_context .singularity = self .runtime_context .singularity
61
- self .loading_context .podman = self .runtime_context .podman
77
+ self .loading_context = LoadingContext (vars (args ))
78
+ self ._fix_loading_context (self .runtime_context )
62
79
else :
63
80
self .loading_context = loading_context
64
81
@@ -68,3 +85,45 @@ def make(self, cwl: Union[str, dict[str, Any]]) -> Callable:
68
85
if isinstance (load , int ):
69
86
raise WorkflowException ("Error loading tool" )
70
87
return Callable (load , self )
88
+
89
+ def _fix_loading_context (self , runtime_context : RuntimeContext ) -> None :
90
+ self .loading_context .resolver = getdefault (self .loading_context .resolver , tool_resolver )
91
+ self .loading_context .singularity = runtime_context .singularity
92
+ self .loading_context .podman = runtime_context .podman
93
+
94
+ def _fix_runtime_context (self ) -> None :
95
+ self .runtime_context .basedir = os .getcwd ()
96
+ self .runtime_context .find_default_container = functools .partial (
97
+ find_default_container , default_container = None , use_biocontainers = None
98
+ )
99
+
100
+ if sys .platform == "darwin" :
101
+ default_mac_path = "/private/tmp/docker_tmp"
102
+ if self .runtimeContext .tmp_outdir_prefix == DEFAULT_TMP_PREFIX :
103
+ self .runtimeContext .tmp_outdir_prefix = default_mac_path
104
+
105
+ for dirprefix in ("tmpdir_prefix" , "tmp_outdir_prefix" , "cachedir" ):
106
+ if (
107
+ getattr (self .runtime_context , dirprefix )
108
+ and getattr (self .runtime_context , dirprefix ) != DEFAULT_TMP_PREFIX
109
+ ):
110
+ sl = (
111
+ "/"
112
+ if getattr (self .runtime_context , dirprefix ).endswith ("/" )
113
+ or dirprefix == "cachedir"
114
+ else ""
115
+ )
116
+ setattr (
117
+ self .runtime_context ,
118
+ dirprefix ,
119
+ os .path .abspath (getattr (self .runtime_context , dirprefix )) + sl ,
120
+ )
121
+ if not os .path .exists (os .path .dirname (getattr (self .runtime_context , dirprefix ))):
122
+ try :
123
+ os .makedirs (os .path .dirname (getattr (self .runtime_context , dirprefix )))
124
+ except Exception as e :
125
+ print ("Failed to create directory: %s" , e )
126
+
127
+ self .runtime_context .secret_store = getdefault (
128
+ self .runtime_context .secret_store , SecretStore ()
129
+ )
0 commit comments