generated from ansys/template
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhelper.py
137 lines (118 loc) · 4.17 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Copyright (C) 2021 - 2025 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Unit Test Helper Module.
This module offers some helpers that can be useful in PySpeos unit tests.
For example a method to check file existence depending on if the file is in the docker container or
in local.
"""
from pathlib import Path
import subprocess
import time
from ansys.speos.core import LOG # Global logger
from ansys.speos.core.kernel.job import JobLink, messages as job_messages
from ansys.speos.core.kernel.proto_message_utils import protobuf_message_to_str
from ansys.speos.core.speos import SpeosClient
from tests.conftest import config
def clean_all_dbs(speos_client: SpeosClient):
"""Clean all database entries of a current SpeosRPC client.
Parameters
----------
speos_client : ansys.speos.core.kernel.client.SpeosClient
SpeosRPC server client
Returns
-------
None
"""
for item in (
speos_client.jobs().list()
+ speos_client.scenes().list()
+ speos_client.simulation_templates().list()
+ speos_client.sensor_templates().list()
+ speos_client.source_templates().list()
+ speos_client.intensity_templates().list()
+ speos_client.spectrums().list()
+ speos_client.vop_templates().list()
+ speos_client.sop_templates().list()
+ speos_client.parts().list()
+ speos_client.bodies().list()
+ speos_client.faces().list()
):
item.delete()
def run_job_and_check_state(job: JobLink):
"""Run a job and wait for state changes.
Parameters
----------
job: ansys.speos.core.kernel.job.JobLink
Job to be run and validated
"""
job.start()
job_state_res = job.get_state()
while (
job_state_res.state != job_messages.Job.State.FINISHED
and job_state_res.state != job_messages.Job.State.STOPPED
and job_state_res.state != job_messages.Job.State.IN_ERROR
):
time.sleep(2)
job_state_res = job.get_state()
LOG.info(protobuf_message_to_str(job_state_res))
if job_state_res.state == job_messages.Job.State.IN_ERROR:
LOG.error(protobuf_message_to_str(job.get_error()))
assert False
def does_file_exist(path):
"""Check file existence.
Parameters
----------
path (str) - path of the file.
Returns
-------
True if the file exists.
Return type
-----------
bool
"""
if config.get("SpeosServerOnDocker"):
return (
subprocess.call(
"docker exec "
+ config.get("SpeosContainerName")
+ ' test -f "'
+ Path(path).as_posix()
+ '"',
shell=True,
)
== 0
)
else:
return Path(path).exists()
def remove_file(path):
"""Remove file.
Parameters
----------
path (str) - path of the file.
"""
if config.get("SpeosServerOnDocker"):
subprocess.call(
"docker exec " + config.get("SpeosContainerName") + ' rm -rf "' + path + '"',
shell=True,
)
else:
Path(path).unlink()