-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathConjoinDiagnosticWorkspaces.py
More file actions
168 lines (149 loc) · 6.23 KB
/
ConjoinDiagnosticWorkspaces.py
File metadata and controls
168 lines (149 loc) · 6.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from typing import List
from mantid.api import MatrixWorkspace, PythonAlgorithm, WorkspaceGroupProperty
from mantid.dataobjects import TableWorkspace
from mantid.kernel import Direction, IntPropertyWithValue
from mantid.simpleapi import (
BufferMissingColumnsAlgo,
CloneWorkspace,
ConjoinTableWorkspaces,
ConjoinWorkspaces,
DeleteWorkspace,
ExtractSingleSpectrum,
GroupWorkspaces,
RenameWorkspace,
mtd,
)
from snapred.backend.log.logger import snapredLogger
from snapred.meta.mantid.FitPeaksOutput import FIT_PEAK_DIAG_SUFFIX, FitOutputEnum
logger = snapredLogger.getLogger(__name__)
class ConjoinDiagnosticWorkspaces(PythonAlgorithm):
    """
    Given the grouped diagnostic output from PDCalibration run on one spectrum at a time,
    combine the sub-workspaces in an intelligent way.

    Matrix workspaces are conjoined one spectrum at a time; table workspaces are
    conjoined row-wise after buffering any columns missing on either side.
    """

    INPUTGRPPROP1 = "DiagnosticWorkspace"
    OUTPUTGRPPROP = "TotalDiagnosticWorkspace"

    def category(self):
        return "SNAPRed Diffraction Calibration"

    def newNamesFromOld(self, oldNames: List[str], newName: str) -> List[str]:
        """
        Map each old diagnostic workspace name to a new name rooted at `newName`,
        keeping the recognized diagnostic suffix.  Names whose suffix is not one
        of the known diagnostic suffixes are dropped from the result.  The
        PeakPosition output is prefixed with "__" so it is hidden in the ADS.
        """
        selectedNames = set(self.diagnosticSuffix.values())
        result = []
        for oldName in oldNames:
            elements = oldName.split("_")
            suffix = next((f"_{x}" for x in elements if f"_{x}" in selectedNames), None)
            if suffix is not None:
                if self.diagnosticSuffix[FitOutputEnum.PeakPosition] in suffix:
                    result.append(f"__{newName}{suffix}")  # Prepend "__" to the entire string
                else:
                    result.append(f"{newName}{suffix}")
        return result

    def PyInit(self):
        # declare properties
        self.declareProperty(
            WorkspaceGroupProperty(self.INPUTGRPPROP1, "", direction=Direction.Input),
            doc="Table workspace from peak-fitting diagnosis.",
        )
        self.declareProperty(IntPropertyWithValue("AddAtIndex", 0))
        self.declareProperty(
            WorkspaceGroupProperty(self.OUTPUTGRPPROP, "", direction=Direction.Output),
            doc="Result of conjoining the diagnostic workspaces",
        )
        self.declareProperty("AutoDelete", False)
        self.setRethrows(True)
        # NOTE must be in alphabetical order
        self.diagnosticSuffix = FIT_PEAK_DIAG_SUFFIX.copy()

    def PyExec(self) -> None:
        self.autoDelete = self.getProperty("AutoDelete").value
        index = self.getProperty("AddAtIndex").value
        diag1 = self.getPropertyValue(self.INPUTGRPPROP1)
        outws = self.getPropertyValue(self.OUTPUTGRPPROP)

        # sort by name to prevent bad things from happening
        mtd[diag1].sortByName()
        oldNames = mtd[diag1].getNames()
        newNames = self.newNamesFromOld(oldNames, outws)

        # if the input is expected to autodelete, it must be ungrouped first
        if self.autoDelete:
            for name in oldNames:
                mtd[diag1].remove(name)

        if index == 0:
            # First call: seed the output group from the inputs (rename when the
            # inputs may be consumed, clone when they must be preserved).
            for old, new in zip(oldNames, newNames):
                if self.autoDelete:
                    RenameWorkspace(
                        InputWorkspace=old,
                        OutputWorkspace=new,
                    )
                else:
                    CloneWorkspace(
                        InputWorkspace=old,
                        OutputWorkspace=new,
                    )
                if isinstance(mtd[new], MatrixWorkspace) and index < mtd[new].getNumberHistograms():
                    ExtractSingleSpectrum(
                        InputWorkspace=new,
                        OutputWorkspace=new,
                        WorkspaceIndex=index,
                    )
            GroupWorkspaces(
                InputWorkspaces=newNames,
                OutputWorkspace=outws,
            )
        else:
            # Subsequent calls: conjoin each input onto the matching member of
            # the existing output group, dispatching on workspace type.
            for old, new in zip(oldNames, newNames):
                ws = mtd[old]
                if isinstance(ws, MatrixWorkspace):
                    self.conjoinMatrixWorkspaces(old, new, index)
                elif isinstance(ws, TableWorkspace):
                    self.conjoinTableWorkspaces(old, new, index)
                else:
                    raise RuntimeError(f"Unrecognized workspace type {type(ws)}")
            if self.autoDelete:
                for oldName in oldNames:
                    if oldName in mtd:
                        DeleteWorkspace(oldName)

        self.setProperty(self.OUTPUTGRPPROP, mtd[outws])

    def conjoinMatrixWorkspaces(self, inws, outws, index):
        """
        Append spectrum `index` of `inws` (or all of `inws`, when it has fewer
        histograms than `index`) onto the existing matrix workspace `outws`.
        """
        tmpws = f"{inws}_{index}"
        if index < mtd[inws].getNumberHistograms():
            ExtractSingleSpectrum(
                InputWorkspace=inws,
                # BUGFIX: keyword was misspelled "Outputworkspace"; Mantid property
                # names are case-sensitive, so the extracted spectrum never landed
                # in tmpws on this path.
                OutputWorkspace=tmpws,
                WorkspaceIndex=index,
            )
        else:
            CloneWorkspace(
                InputWorkspace=inws,
                OutputWorkspace=tmpws,
            )
        logger.debug(f"{outws} already has spectrum numbers of {mtd[outws].getSpectrumNumbers()}")
        logger.debug(f"Conjoining {tmpws} with {outws}, adding spectrum numbers of {mtd[tmpws].getSpectrumNumbers()}")
        specNumbers = list(mtd[outws].getSpectrumNumbers())
        specNumbers.extend(list(mtd[tmpws].getSpectrumNumbers()))
        ConjoinWorkspaces(
            InputWorkspace1=outws,
            InputWorkspace2=tmpws,
            CheckOverlapping=False,
            CheckMatchingBins=False,  # Not available in 6.11.0.3rc2
        )
        # TODO: Remove when Defect 14460 is resolved.
        # There is a defect in ConjoinWorkspaces that incorrectly determines
        # if spectrum numbers need to be remapped.
        for i, specNum in enumerate(specNumbers):
            mtd[outws].getSpectrum(i).setSpectrumNo(specNum)
        logger.debug(f"resulting spectrum numbers: {mtd[outws].getSpectrumNumbers()}")
        if self.autoDelete and inws in mtd:
            DeleteWorkspace(inws)
        # explicit check instead of `assert`, which is stripped under `python -O`
        if outws not in mtd:
            raise RuntimeError(f"Output workspace '{outws}' missing after conjoin")

    def conjoinTableWorkspaces(self, inws, outws, index):  # noqa: ARG002
        """
        Append the rows of table `inws` onto table `outws`, first buffering any
        columns missing on either side so the schemas match.  `index` is unused
        for tables but kept for signature parity with conjoinMatrixWorkspaces.
        """
        BufferMissingColumnsAlgo(
            Workspace1=inws,
            Workspace2=outws,
        )
        BufferMissingColumnsAlgo(
            Workspace1=outws,
            Workspace2=inws,
        )
        ConjoinTableWorkspaces(
            InputWorkspace1=outws,
            InputWorkspace2=inws,
            AutoDelete=self.autoDelete,
        )