-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathexample.py
More file actions
64 lines (55 loc) · 1.47 KB
/
example.py
File metadata and controls
64 lines (55 loc) · 1.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from pathlib import Path
from uuid import UUID
from tierkreis.builder import GraphBuilder
from tierkreis.controller import run_graph
from tierkreis.controller.data.models import TKR
from tierkreis.controller.executor.uv_executor import UvExecutor
from tierkreis.storage import FileStorage
from tierkreis.storage import read_outputs
from tierkreis.builtins.stubs import tkr_str
def simple_graph() -> GraphBuilder[TKR[int], TKR[str]]:
    """Build a graph with a single ``tkr_str`` task.

    The graph's input is handed to the ``tkr_str`` task and that task's
    result is registered as the graph's output.

    Returns:
        The ``GraphBuilder`` itself, declared with a ``TKR[int]`` input and
        a ``TKR[str]`` output; callers read ``.data`` from it to obtain the
        graph.
    """
    builder = GraphBuilder(TKR[int], TKR[str])
    # One node only: the task result is wired directly to the graph output.
    builder.outputs(builder.task(tkr_str(builder.inputs)))
    return builder
def main() -> None:
    """Run two compositions of ``simple_graph`` and print each result.

    Two graphs are built from ``simple_graph().data``. They are first
    combined with ``+`` and run against a ``FileStorage`` named
    "serial_graph" (UUID int 222), then combined with ``@`` and run against
    a ``FileStorage`` named "parallel_graph" (UUID int 223). Both runs use
    the same inputs, ``{"value": 0}``, and the outputs read back from
    storage are printed after each run.
    """
    workers_dir = Path(__file__).parent / "tierkreis_workers"
    payload = {"value": 0}

    def execute(workflow, run_id: int, label: str) -> None:
        """Run ``workflow`` in its own storage and print its outputs."""
        store = FileStorage(UUID(int=run_id), name=label)
        runner = UvExecutor(workers_dir, store.logs_path)
        # Clear graph files from this storage before running.
        store.clean_graph_files()
        run_graph(store, runner, workflow, payload)
        print(read_outputs(workflow, store))

    first = simple_graph().data
    second = simple_graph().data

    # Each composed graph is built at its call site, i.e. immediately before
    # the run that uses it, so the second composition happens only after the
    # first run has finished.
    execute(first + second, 222, "serial_graph")
    execute(first @ second, 223, "parallel_graph")
# Script entry point: the guard below is true only when this file is executed
# directly, in which case main() is called.
# NOTE(review): indentation is missing in this copy, so it is unclear whether
# the final print belongs inside the guard or also runs on import — confirm.
if __name__ == "__main__":
main()
print("All Done")