-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathfilters.py
More file actions
345 lines (289 loc) · 15.8 KB
/
filters.py
File metadata and controls
345 lines (289 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""
This module provides functions for flagging pandas data series based on a range of criteria. The functions are largely
intended for application in wind plant operational energy analysis, particularly wind speed vs. power curves.
"""
from __future__ import annotations
import numpy as np
import scipy as sp
import pandas as pd
from sklearn.cluster import KMeans
from openoa.utils._converters import (
series_to_df,
series_method,
dataframe_method,
convert_args_to_lists,
)
def range_flag(
data: pd.DataFrame | pd.Series,
lower: float | list[float],
upper: float | list[float],
col: list[str] | None = None,
) -> pd.Series | pd.DataFrame:
"""Flag data for which the specified data is outside the provided range of [lower, upper].
Args:
data (:obj:`pandas.Series` | `pandas.DataFrame`): data frame containing the column to be flagged;
can either be a ``pandas.Series`` or ``pandas.DataFrame``. If a ``pandas.DataFrame``, a list of
threshold values and columns (if checking a subset of the columns) must be provided.
col (:obj:`list[str]`): column(s) in :pyattr:`data` to be flagged, by default None. Only
required when the `data` is a ``pandas.DataFrame`` and a subset of the columns will be
checked. Must be the same length as :py:attr:`lower` and :py:attr:`upper`.
lower (:obj:`float` | `list[float]`): lower threshold (inclusive) for each element of :py:attr:`data`,
if it's a ``pd.Series``, or the list of lower thresholds for each column in `col`. If the same
threshold is applied to each column, then pass the single value, otherwise, it must be
the same length as :py:attr:`col` and :py:attr:`upper`.
upper (:obj:`float` | `list[float]`): upper threshold (inclusive) for each element of :py:attr:`data`,
if it's a ``pd.Series``, or the list of upper thresholds for each column in :py:attr:`col`. If the same
threshold is applied to each column, then pass the single value, otherwise, it must be
the same length as :py:attr:`lower` and :py:attr:`col`.
Returns:
:obj:`pandas.Series` | `pandas.DataFrame`: Series or DataFrame (depending on :py:attr:`data` type) with
boolean entries.
"""
# Prepare the inputs to be standardized for use with DataFrames
if to_series := isinstance(data, pd.Series):
data, col = series_to_df(data)
if col is None:
col = data.columns.tolist()
upper, lower = convert_args_to_lists(len(col), upper, lower)
if len(col) != len(lower) != len(upper):
raise ValueError("The inputs to `col`, `above`, and `below` must be the same length.")
# Only flag the desired columns
subset = data.loc[:, col].copy()
flag = ~(subset.ge(lower) & subset.le(upper))
# Return back a pd.Series if one was provided, else a pd.DataFrame
return flag[col[0]] if to_series else flag
def unresponsive_flag(
data: pd.DataFrame | pd.Series,
threshold: int = 3,
col: list[str] | None = None,
) -> pd.Series | pd.DataFrame:
"""Flag time stamps for which the reported data does not change for `threshold` repeated intervals.
Args:
data (:obj:`pandas.Series` | `pandas.DataFrame`): data frame containing the column to be flagged;
can either be a `pandas.Series` or ``pandas.DataFrame``. If a ``pandas.DataFrame``, a list of
threshold values and columns (if checking a subset of the columns) must be provided.
col (:obj:`list[str]`): column(s) in `data` to be flagged, by default None. Only required when
the `data` is a ``pandas.DataFrame`` and a subset of the columns will be checked. Must be
the same length as :py:attr:`lower` and :py:attr:`upper`.
threshold (:obj:`int`): number of intervals over which measurment does not change for each
element of :py:attr:`data`, regardless if it's a ``pd.Series`` or ``pd.DataFrame``.
Defaults to 3.
Returns:
:obj:`pandas.Series` | `pandas.DataFrame`: Series or DataFrame (depending on ``data`` type) with
boolean entries.
"""
# Prepare the inputs to be standardized for use with DataFrames
if to_series := isinstance(data, pd.Series):
data, col = series_to_df(data)
if col is None:
col = data.columns.tolist()
if not isinstance(threshold, int):
raise TypeError("The input to `threshold` must be an integer.")
# Get boolean value of the difference in successive time steps is not equal to zero, and take the
# rolling sum of the boolean diff column in period lengths defined by threshold
subset = data.loc[:, col].copy()
flag = subset.diff(axis=0).ne(0).rolling(threshold - 1).sum()
# Create boolean series that is True if rolling sum is zero
flag = flag == 0
# Need to flag preceding `threshold` values as well
flag = flag | np.any(
[flag.shift(-1 - i, axis=0, fill_value=False) for i in range(threshold - 1)], axis=0
)
# Return back a pd.Series if one was provided, else a pd.DataFrame
return flag[col[0]] if to_series else flag
def std_range_flag(
data: pd.DataFrame | pd.Series,
threshold: float | list[float] = 2.0,
col: list[str] | None = None,
) -> pd.Series | pd.DataFrame:
"""Flag time stamps for which the measurement is outside of the threshold number of standard deviations
from the mean across the data.
... note:: This method does not distinguish between asset IDs.
Args:
data (:obj:`pandas.Series` | `pandas.DataFrame`): data frame containing the column to be flagged;
can either be a ``pandas.Series`` or ``pandas.DataFrame``. If a ``pandas.DataFrame``, a list of
threshold values and columns (if checking a subset of the columns) must be provided.
col (:obj:`list[str]`): column(s) in :py:attr:`data` to be flagged, by default None. Only required when
the :py:attr:`data` is a `pandas.DataFrame` and a subset of the columns will be checked. Must be
the same length as :py:attr:`lower` and :py:attr:`upper`.
threshold (:obj:`float` | `list[float]`): multiplicative factor on the standard deviation of :py:attr:`data`,
if it's a ``pd.Series``, or the list of multiplicative factors on the standard deviation for
each column in :py:attr:`col`. If the same factor is applied to each column, then pass the single
value, otherwise, it must be the same length as :py:attr:`col` and :py:attr:`upper`.
Returns:
:obj:`pandas.Series` | `pandas.DataFrame`: Series or DataFrame (depending on :py:attr:`data` type) with
boolean entries.
"""
# Prepare the inputs to be standardized for use with DataFrames
if to_series := isinstance(data, pd.Series):
data, col = series_to_df(data)
if col is None:
col = data.columns.tolist()
threshold, *_ = convert_args_to_lists(len(col), threshold)
if len(col) != len(threshold):
raise ValueError("The inputs to `col` and `threshold` must be the same length.")
subset = data.loc[:, col].copy()
data_mean = np.nanmean(subset.values, axis=0)
data_std = np.nanstd(subset.values, ddof=1, axis=0) * np.array(threshold)
flag = subset.le(data_mean - data_std) | subset.ge(data_mean + data_std)
# Return back a pd.Series if one was provided, else a pd.DataFrame
return flag[col[0]] if to_series else flag
@series_method(data_cols=["window_col", "value_col"])
def window_range_flag(
window_col: str | pd.Series = None,
window_start: float = -np.inf,
window_end: float = np.inf,
value_col: str | pd.Series = None,
value_min: float = -np.inf,
value_max: float = np.inf,
data: pd.DataFrame = None,
) -> pd.Series:
"""Flag time stamps for which measurement in `window_col` are within the range: [`window_start`, `window_end`], and
the measurements in `value_col` are outside of the range [`value_min`, `value_max`].
Args:
data (:obj:`pandas.DataFrame`): data frame containing the columns :py:attr:`window_col` and
`value_col`, by default None.
window_col (:obj:`str` | `pandas.Series`): Name of the column or used to define the window
range or the data as a pandas Series, by default None.
window_start(:obj:`float`): minimum value for the inclusive window, by default -np.inf.
window_end(:obj:`float`): maximum value for the inclusive window, by default np.inf.
value_col (:obj:`str` | `pandas.Series`): Name of the column used to define the value range
or the data as a pandas Series, by default None.
value_max(:obj:`float`): upper threshold for the inclusive data range; default np.inf
value_min(:obj:`float`): lower threshold for the inclusive data range; default -np.inf
Returns:
:obj:`pandas.Series`: Series with boolean entries.
"""
flag = window_col.between(window_start, window_end) & ~value_col.between(value_min, value_max)
return flag
@series_method(data_cols=["bin_col", "value_col"])
def bin_filter(
bin_col: pd.Series | str,
value_col: pd.Series | str,
bin_width: float,
threshold: float = 2,
center_type: str = "mean",
bin_min: float = None,
bin_max: float = None,
threshold_type: str = "std",
direction: str = "all",
data: pd.DataFrame = None,
):
"""Flag time stamps for which data in `value_col` when binned by data in `bin_col` into bins of
width `bin_width` are outside the `threhsold` bin. The `center_type` of each bin can be either the
median or mean, and flagging can be applied directionally (i.e. above or below the center, or both)
Args:
bin_col(:obj:`pandas.Series` | `str`): The Series or column in :py:attr:`data` to be used for binning.
value_col(:obj:`pandas.Series`): The Series or column in :py:attr:`data` to be flagged.
bin_width(:obj:`float`): Width of bin in units of :py:attr:`bin_col`
threshold(:obj:`float`): Outlier threshold (multiplicative factor of std of `value_col` in bin)
bin_min(:obj:`float`): Minimum bin value below which flag should not be applied
bin_max(:obj:`float`): Maximum bin value above which flag should not be applied
threshold_type(:obj:`str`): Option to apply a 'std', 'scalar', or 'mad' (median absolute deviation)
based threshold
center_type(:obj:`str`): Option to use a 'mean' or 'median' center for each bin
direction(:obj:`str`): Option to apply flag only to data 'above' or 'below' the mean, by default 'all'
data(:obj:`pd.DataFrame`): DataFrame containing both :py:attr:`bin_col` and :py:attr:`value_col`, if data
are part of the same DataFrame, by default None.
Returns:
:obj:`pandas.Series(bool)`: Array-like object with boolean entries.
"""
if center_type not in ("mean", "median"):
raise ValueError("Incorrect `center_type` specified; must be one of 'mean' or 'median'.")
if threshold_type not in ("std", "scalar", "mad"):
raise ValueError("Incorrect `threshold_type` specified; must be one of 'std' or 'scalar'.")
if direction not in ("all", "above", "below"):
raise ValueError(
"Incorrect `direction` specified; must be one of 'all', 'above', or 'below'."
)
# Set bin min and max values if not passed to function
if bin_min is None:
bin_min = np.min(bin_col.values)
if bin_max is None:
bin_max = np.max(bin_col.values)
# Define bin edges
bin_edges = np.arange(bin_min, bin_max, bin_width)
# Ensure the last bin edge value is bin_max
bin_edges = np.unique(np.clip(np.append(bin_edges, bin_max), bin_min, bin_max))
# Bin the data and recreate the comparison data as a multi-column data frame
which_bin_col = np.digitize(bin_col, bin_edges, right=True)
# Create the flag values as a matrix with each column being the timestamp's binned value,
# e.g., all columns values are NaN if the data point is not in that bin
flag_vals = (
value_col.to_frame().set_index(pd.Series(which_bin_col, name="bin"), append=True).unstack()
)
drop = [i for i, el in enumerate(flag_vals.columns.names) if el != "bin"]
flag_vals.columns = flag_vals.columns.droplevel(drop).rename(None)
# Create a False array as default, so flags are set to True
flag_df = pd.DataFrame(np.zeros_like(flag_vals, dtype=bool), index=flag_vals.index)
# Get center of binned data
if center_type == "median":
center = np.nanmedian(flag_vals.values, axis=0)
else:
center = np.nanmean(flag_vals.values, axis=0)
center = pd.DataFrame(
np.full(flag_vals.shape, center),
index=flag_vals.index,
columns=flag_vals.columns,
)
# Define threshold of data flag
if threshold_type == "std":
deviation = np.nanstd(flag_vals.values, ddof=1, axis=0) * threshold
elif threshold_type == "scalar":
deviation = threshold
else: # median absolute deviation (mad)
deviation = np.nanmedian(np.abs(flag_vals.values - center.values), axis=0) * threshold
# Perform flagging depending on specfied direction
if direction in ("above", "all"):
flag_df |= flag_vals > center + deviation
if direction in ("below", "all"):
flag_df |= flag_vals < center - deviation
# Get all instances where the value is True, and reset any values outside the bin limits
flag = pd.Series(np.nanmax(flag_df, axis=1), index=flag_df.index, dtype="bool")
flag.loc[(bin_col <= bin_min) | (bin_col > bin_max)] = False
return flag
@dataframe_method(data_cols=["data_col1", "data_col2"])
def cluster_mahalanobis_2d(
data_col1: pd.Series | str,
data_col2: pd.Series | str,
n_clusters: int = 13,
dist_thresh: float = 3.0,
data: pd.DataFrame = None,
) -> pd.Series:
"""K-means clustering of data into `n_cluster` clusters; Mahalanobis distance evaluated for each cluster and
points with distances outside of `dist_thresh` are flagged; distinguishes between asset IDs.
Args:
data_col1(:obj:`pandas.Series` | `str`): Series or column :py:attr:`data` corresponding to the first
data column in a 2D cluster analysis
data_col2(:obj:`pandas.Series` | `str`): Series or column :py:attr:`data` corresponding to the second
data column in a 2D cluster analysis
n_clusters(:obj:`int`):' number of clusters to use
dist_thresh(:obj:`float`): maximum Mahalanobis distance within each cluster for data to be remain unflagged
data(:obj:`pd.DataFrame`): DataFrame containing both :py:attr:`data_col1` and :py:attr:`data_col2`, if data
are part of the same DataFrame, by default None.
Returns:
:obj:`pandas.Series(bool)`: Array-like object with boolean entries.
"""
data = data.loc[:, [data_col1, data_col2]].copy()
kmeans = KMeans(n_clusters=n_clusters).fit(data)
# Define empty flag of 'False' values with indices matching value_col
flag = pd.Series(index=data.index, data=False)
# Loop through clusters and flag data that fall outside a threshold distance from cluster center
for i in range(n_clusters):
# Extract data for cluster
clust_sub = kmeans.labels_ == i
cluster = data.loc[clust_sub]
# Cluster centroid
centroid = kmeans.cluster_centers_[i]
# Cluster covariance and inverse covariance
covmx = cluster.cov()
invcovmx = sp.linalg.inv(covmx)
# Compute mahalnobis distance of each point in cluster
mahalanobis_dist = cluster.apply(
lambda r: sp.spatial.distance.mahalanobis(r.values, centroid, invcovmx), axis=1
)
# Flag data outside the distance threshold
flag_bin = mahalanobis_dist > dist_thresh
# Record flags in final flag column
flag.loc[flag_bin.index] = flag_bin
return flag