-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchrisClient.py
More file actions
145 lines (132 loc) · 5.47 KB
/
chrisClient.py
File metadata and controls
145 lines (132 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
### Python Chris Client Implementation ###
from base_client import BaseClient
import json
import requests
from loguru import logger
import sys
from pipeline import Pipeline
from notification import Notification
# Module-wide shorthand: LOG(...) emits at DEBUG level via loguru.
LOG = logger.debug

# Loguru sink format: timestamp │ level │ module::function @line ║ message.
# The <green>/<level>/<yellow>/<cyan> tags are loguru color markup.
logger_format = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> │ "
    "<level>{level: <5}</level> │ "
    "<yellow>{name: >28}</yellow>::"
    "<cyan>{function: <30}</cyan> @"
    "<cyan>{line: <4}</cyan> ║ "
    "<level>{message}</level>"
)
# Drop loguru's default stderr sink, then install ours with the format above.
logger.remove()
logger.add(sys.stderr, format=logger_format)
class ChrisClient(BaseClient):
    """ChRIS (CUBE) API client.

    Wraps the CUBE REST API for a basic liveness check and for launching
    the PACS query/retrieve/anonymization pipelines through ``Pipeline``.
    """

    def __init__(self, url: str, token: str):
        """Record connection settings; performs no network I/O.

        Args:
            url: Base URL of the CUBE API; a trailing slash is tolerated.
            token: CUBE API token, sent as ``Authorization: Token <token>``.
        """
        self.api_base = url.rstrip('/')
        self.auth = token
        self.headers = {"Content-Type": "application/json", "Authorization": f"Token {token}"}
        # BUG FIX: build from the normalized base so a trailing slash in
        # `url` cannot yield '//pacs/series/' (original used raw `url`).
        self.pacs_series_url = f"{self.api_base}/pacs/series/"

    def health_check(self):
        """GET the API root as a liveness probe.

        Returns:
            Parsed JSON body when the root responds with JSON, otherwise
            the raw response text.

        Raises:
            requests.HTTPError: for non-2xx responses.
        """
        endpoint = f"{self.api_base}/"
        response = requests.get(endpoint, headers=self.headers, timeout=30)
        response.raise_for_status()
        try:
            return response.json()
        except ValueError:
            # The root may serve non-JSON content (e.g. an HTML page).
            return response.text

    def pacs_pull(self):
        # Not implemented; placeholder kept for the BaseClient interface.
        pass

    def pacs_push(self):
        # Not implemented; placeholder kept for the BaseClient interface.
        pass

    async def anonymize(self, params: dict, pv_id: int):
        """Run the PACS query → retrieve → verify-registration pipeline.

        Args:
            params: Job spec with "pull", "search", "push", "notify" and
                "relay" sections (schema assumed from the reads below —
                TODO confirm against the caller).
            pv_id: Plugin-instance id the pipeline is attached to.

        Returns:
            The result of ``Pipeline.run_pipeline``.
        """
        pipe = Pipeline(self.api_base, self.auth)
        # Per-plugin parameter sets, keyed by plugin name within the pipeline.
        plugin_params = {
            'PACS-query': {
                "PACSurl": params["pull"]["url"],
                "PACSname": params["pull"]["pacs"],
                # The directive is passed through CUBE as a JSON string.
                "PACSdirective": json.dumps(params["search"])
            },
            'PACS-retrieve': {
                "PACSurl": params["pull"]["url"],
                "PACSname": params["pull"]["pacs"],
                "copyInputFile": True
            },
            'verify-registration': {
                "CUBEurl": self.api_base,
                "folderName": params["push"]["Folder name"],
                "neuroDicomLocation": params["push"]["Dicom path"],
                "neuroAnonLocation": params["push"]["Dicom anonymized path"],
                "neuroNiftiLocation": params["push"]["Nifti path"],
                "PACSurl": params["pull"]["url"],
                "PACSname": params["pull"]["pacs"],
                "SMTPServer": params["notify"]["smtp_server"],
                "recipients": params["notify"]["recipients"],
                "largeSequenceSize": params["relay"]["largeSequenceSize"],
                "largeSequencePollInterval": params["relay"]["largeSequencePollInterval"],
            }
        }
        d_ret = await pipe.run_pipeline(
            previous_inst=pv_id,
            pipeline_name="PACS query, retrieve, registration verification, and run pipeline in CUBE 20250806",
            pipeline_params=plugin_params)
        return d_ret

    async def neuro_pull(self, neuro_location: str, feed_name: str, filter_str: str, job_params: dict):
        """
        1. Pull data from the neuro tree
        2. Run anonymization pipeline to the root node

        Args:
            neuro_location: Source path in the neuro filesystem tree.
            feed_name: Title for the new analysis (feed) in CUBE.
            filter_str: Include-glob for files to pull.
            job_params: Job spec with "pull", "search", "push" and
                "notify" sections (schema assumed from the reads below —
                TODO confirm against the caller).

        Returns:
            The result of ``Pipeline.run_pipeline``.
        """
        send_params: dict = job_params["push"]
        LOG(f"Pulling {filter_str} from {neuro_location}")
        ntf = Notification(self.api_base, self.auth)
        neuro_plugin_id = ntf.get_plugin_id({"name": "pl-neurofiles-pull"})
        # Run pl-neurofiles-pull with the given path/filter; this creates
        # the feed that the anonymization pipeline is attached to.
        neuro_inst_id = ntf.create_plugin_instance(neuro_plugin_id,
            {
                "path": neuro_location,
                "include": filter_str,
                "title": feed_name}
        )
        LOG(f"Created new analysis: {feed_name}")
        # Run anonymization pipeline on the freshly created instance.
        pipe = Pipeline(self.api_base, self.auth)
        plugin_params = {
            'PACS-query': {
                "PACSurl": job_params["pull"]["url"],
                "PACSname": job_params["pull"]["pacs"],
                "PACSdirective": json.dumps(job_params["search"])
            },
            'send-dicoms-to-neuro-FS': {
                "path": f"{send_params['Dicom path']}/{send_params['Folder name']}/",
                "include": "*.dcm",
                "min_size": "0",
                "timeout": "0",
                "max_size": "1G",
                "max_depth": "3"
            },
            'send-anon-dicoms-to-neuro-FS': {
                "path": f"{send_params['Dicom anonymized path']}/{send_params['Folder name']}/",
                "include": "*.dcm",
                "min_size": "0",
                "timeout": "0",
                "max_size": "1G",
                "max_depth": "3"
            },
            'send-niftii-to-neuro-FS': {
                "path": f"{send_params['Nifti path']}/{send_params['Folder name']}/",
                "include": "*",
                "min_size": "0",
                "timeout": "0",
                "max_size": "1G",
                "max_depth": "3"
            },
            # additional parameters for pipeline
            'verify-registration': {
                "PACSname": job_params["pull"]["pacs"],
                "SMTPServer": job_params["notify"]["smtp_server"],
                "recipients": job_params["notify"]["recipients"]
            }
        }
        d_ret = await pipe.run_pipeline(
            previous_inst=neuro_inst_id,
            pipeline_name="DICOM anonymization, niftii conversion, and push to neuro tree v20250326",
            pipeline_params=plugin_params)
        return d_ret

    def run_neuro_plugin(self, params: dict):
        # Not implemented; placeholder kept for the BaseClient interface.
        pass