
Commit 47a4f77

Author: Giannis Mitropoulos

change: apply feature thresholds before extracting features & add a cap on total generated features.

- Preselect the features to be extracted, saving the memory and time spent extracting redundant features.
- Apply a cap on the total number of generated features: currently each sequence can generate up to 781 features, which slows down inference when there are many sequences.
1 parent 27dd611 commit 47a4f77
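
The cap is split across columns in proportion to each column's 25th-percentile sequence length. A minimal sketch of that arithmetic, using the q25 values from the new parametrized test in this commit (the standalone script is illustrative, not repository code):

from math import ceil

TOTAL_EXPANSION_THRESHOLD = 2500  # new cap on features generated across all sequence columns

# 25th-percentile sequence lengths per column, taken from the new parametrized test
sequences_lengths_q25 = [500, 200, 1000, 2000, 100]
total = sum(sequences_lengths_q25)
per_column_caps = [ceil(q / total * TOTAL_EXPANSION_THRESHOLD) for q in sequences_lengths_q25]
print(per_column_caps)  # [329, 132, 658, 1316, 66] -- the thresholds the test expects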

File tree

src/sagemaker_sklearn_extension/feature_extraction/sequences.py
test/test_sequence_transformer.py

2 files changed: +129 -41 lines changed

2 files changed

+129
-41
lines changed

src/sagemaker_sklearn_extension/feature_extraction/sequences.py

Lines changed: 66 additions & 40 deletions
@@ -24,8 +24,7 @@
 from tsfresh.utilities.dataframe_functions import impute
 from tsfresh.defaults import N_PROCESSES  # the default number of processes used by TSFresh, equals to n_vcores/2
 
-
-
+TOTAL_EXPANSION_THRESHOLD = 2500
 DEFAULT_INPUT_SEQUENCE_LENGTH = 1000
 SEQUENCE_EXPANSION_FACTOR = 2.5
 # do not use TSFresh parallelism in container serve(transform), does not work with server's workers
@@ -132,7 +131,10 @@ def fit(self, X, y=None):
             raise ValueError(
                 f"length of sequences_lengths_q25 should be equal to number of columns in X (={X.shape[1]})."
             )
-
+        # cap total expansion for all columns
+        expansion_thresholds = np.ceil(
+            (self.sequences_lengths_q25 / np.sum(self.sequences_lengths_q25)) * TOTAL_EXPANSION_THRESHOLD
+        )
         ts_flattener = TSFlattener(max_allowed_length=self.max_allowed_length, trim_beginning=self.trim_beginning)
         tsfresh_feature_extractors = []
         for sequence_column_i, sequence_column in enumerate(X.T):
@@ -143,6 +145,7 @@ def fit(self, X, y=None):
                 extraction_type=self.extraction_type,
                 extraction_seed=self.extraction_seed,
                 sequence_length_q25=self.sequences_lengths_q25[sequence_column_i],
+                expansion_threshold=int(expansion_thresholds[sequence_column_i]),
             )
             tsfresh_feature_extractor.fit(numeric_sequences)
             tsfresh_feature_extractors.append(tsfresh_feature_extractor)
@@ -345,14 +348,18 @@ def __init__(
         extraction_type="efficient",
         extraction_seed=0,
         sequence_length_q25=None,
+        expansion_threshold=None,
     ):
         super().__init__()
         self.augment = augment
         self.interpolation_method = interpolation_method
         self.extraction_type = extraction_type
         self.feature_sampling_seed = extraction_seed
         self.sequence_length_q25 = sequence_length_q25 or DEFAULT_INPUT_SEQUENCE_LENGTH
-        self.expansion_threshold = self._compute_expansion_threshold(self.sequence_length_q25)
+        expansion_threshold = expansion_threshold or self._compute_expansion_threshold(self.sequence_length_q25)
+        self.expansion_threshold = min(expansion_threshold, self._compute_expansion_threshold(self.sequence_length_q25))
+        # expansion_threshold is the stricter of the threshold computed for this column
+        # and the one respecting the total expansion cap across all columns
 
     def fit(self, X, y=None):
         # Nothing to learn during fit.
@@ -370,11 +377,7 @@ def transform(self, X, y=None):
         tsfresh_features : np.array
 
         """
-        transform_thresholds = [self._compute_expansion_threshold(len(seq)) for seq in X]
         tsfresh_features, X_df = self._extract_tsfresh_features(X)
-        tsfresh_features = self._filter_features(
-            tsfresh_features, mode="transform", transform_thresholds=transform_thresholds
-        )
         if self.augment:
             # Stack the extracted features to the original sequences in X, after padding with np.nans any shorter
             # input sequences in X to match the length of the longest sequence, and imputing missing values as
@@ -462,51 +465,74 @@ def _extract_tsfresh_features(self, X):
         else:
             settings = ComprehensiveFCParameters()
             settings = {k: v for k, v in settings.items() if k not in min_settings}
-            tsfresh_features_extra = extract_features(
-                X_df_no_nans,
-                default_fc_parameters=settings,
-                column_id="id",
-                column_sort="time",
-                impute_function=impute,
-                n_jobs=N_TSFRESH_JOBS,
-            )
-            self.extra_settings_card = tsfresh_features_extra.shape[1]
-            tsfresh_features = pd.concat([tsfresh_features, tsfresh_features_extra], axis=1)
+
+            self._apply_feature_threshold(settings)
+            if settings:
+                # guard: the efficient strategies may be emptied entirely by the expansion threshold
+                tsfresh_features_extra = extract_features(
+                    X_df_no_nans,
+                    default_fc_parameters=settings,
+                    column_id="id",
+                    column_sort="time",
+                    impute_function=impute,
+                    n_jobs=N_TSFRESH_JOBS,
+                )
+                tsfresh_features = pd.concat([tsfresh_features, tsfresh_features_extra], axis=1)
 
         # If X_df.dropna() dropped some observations entirely (i.e., due to all NaNs),
         # impute each tsfresh feature for those observations with the median of that tsfresh feature
         tsfresh_features_imputed = impute(tsfresh_features.reindex(pd.RangeIndex(X_df["id"].max() + 1)))
         return tsfresh_features_imputed, X_df
 
-    def _filter_features(self, tsfresh_features, mode="transform", transform_thresholds=None):
-        if self.expansion_threshold < self.min_settings_card:
-            raise ValueError(
-                f"Provided filter threshold(s) (= {self.expansion_threshold}) can not be smaller than "
-                f"number of features generated by minimal settings (= {self.min_settings_card})"
-            )
-        filter_order = np.arange(self.min_settings_card, tsfresh_features.shape[1])
+    def _apply_feature_threshold(self, settings):
+        """Accepts a settings dictionary describing all the features that could be generated,
+        and filters it, if needed, until the feature count matches "self.expansion_threshold"
+        (minus the minimal features).
+        Does so in a reproducible "random" way, controlled by "self.feature_sampling_seed":
+        draws random indexes to be filtered, then iterates over the settings dictionary,
+        assigning an index to each value and filtering based on that index.
+        """
+        settings.pop("linear_trend_timewise", None)  # remove these 5 features that need datetime indexes for sequences
+        max_available_features = self._get_features_count(settings)
+        if self.expansion_threshold >= max_available_features + self.min_settings_card:
+            return  # no need to limit
+
+        filter_order = np.arange(max_available_features)
         random_state = np.random.get_state()
         np.random.seed(self.feature_sampling_seed)
         np.random.shuffle(filter_order)
         np.random.set_state(random_state)
-        survivors = list(range(self.min_settings_card)) + list(
-            filter_order[: self.expansion_threshold - self.min_settings_card]
-        )
-        tsfresh_features = tsfresh_features.iloc[:, survivors]
-
-        if mode == "transform":
-            if len(transform_thresholds) != tsfresh_features.shape[0]:
-                raise ValueError(
-                    f"In 'transform' mode transform_thresholds should have number of entries "
-                    f"(= {len(transform_thresholds)}) that corresponds to the number of records "
-                    f"in tsfresh_features (= {tsfresh_features.shape[0]})."
-                )
-            for thrsh_i, thrsh in enumerate(transform_thresholds):
-                tsfresh_features.iloc[thrsh_i, thrsh:] = 0
-        return tsfresh_features
+        removed_indices = list(filter_order[max(0, self.expansion_threshold - self.min_settings_card) :])
+        removed_indices.sort()
+
+        feature_idx = 0
+        for k in list(settings.keys()):
+            if isinstance(settings[k], list):
+                # case: the value is a list, so each list element counts as a separate feature
+                survived_list = []
+                for index, _ in enumerate(settings[k]):
+                    if removed_indices and removed_indices[0] == feature_idx:
+                        del removed_indices[0]
+                    else:
+                        survived_list.append(settings[k][index])
+                    feature_idx += 1
+                # keep the surviving features; if none survived, delete the settings key
+                if survived_list:
+                    settings[k] = survived_list
+                else:
+                    del settings[k]
+            else:
+                # case: the value is None, which counts as one feature
+                if removed_indices and removed_indices[0] == feature_idx:
+                    del removed_indices[0]
+                    del settings[k]
+                feature_idx += 1
 
     def _compute_expansion_threshold(self, input_len):
         return int(max(ceil(SEQUENCE_EXPANSION_FACTOR * input_len + 1) + 1, 10))
 
     def _more_tags(self):
         return {"_skip_test": True, "allow_nan": True}
+
+    def _get_features_count(self, settings):
+        return sum([len(v) if isinstance(v, list) else 1 for v in settings.values()])
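
For readers skimming the diff, the filtering idea behind the new _apply_feature_threshold can be re-created standalone: flatten the settings dictionary into one index per feature calculator, shuffle those indexes reproducibly, and drop everything past the threshold. The sketch below is illustrative only; unlike the repository method it returns a new dictionary instead of mutating in place, and it skips the linear_trend_timewise pop:

import numpy as np


def apply_feature_threshold(settings, expansion_threshold, seed=0, min_settings_card=0):
    # one "feature" per list element, or one per None-valued key
    total = sum(len(v) if isinstance(v, list) else 1 for v in settings.values())
    if expansion_threshold >= total + min_settings_card:
        return settings  # already under the cap, nothing to trim

    # reproducible shuffle of the feature indexes, leaving the global RNG state untouched
    order = np.arange(total)
    state = np.random.get_state()
    np.random.seed(seed)
    np.random.shuffle(order)
    np.random.set_state(state)
    removed = set(order[max(0, expansion_threshold - min_settings_card):])

    trimmed, idx = {}, 0
    for key, value in settings.items():
        if isinstance(value, list):
            survivors = []
            for item in value:
                if idx not in removed:
                    survivors.append(item)
                idx += 1
            if survivors:  # drop the key entirely when nothing survives
                trimmed[key] = survivors
        else:
            if idx not in removed:
                trimmed[key] = value
            idx += 1
    return trimmed


print(apply_feature_threshold({"k1": None, "k2": ["v1", "v2"], "k3": None}, expansion_threshold=1))
# {'k2': ['v2']} -- the expected output of the first test_apply_feature_threshold case below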

test/test_sequence_transformer.py

Lines changed: 63 additions & 1 deletion
@@ -42,7 +42,11 @@
     [",,,"],
     [",,,"],
 ]
-
+X_sequence_5_columns = [
+    ["1, 2", "11, 12", "1, 4", "11, 99", "7, 4"],
+    ["1, 5", "11, 99", "7, 4", "71, 88", "7, 2"],
+    ["1, 33", "11, 88", "1, 2", "1, 7", "11, 99"],
+]
 
 # with variable-length inputs
 X_sequence_varying_length = [["1, 2"], ["11, 111"], ["2, 3, 1, 4"]]
@@ -256,6 +260,64 @@ def test_time_series_expansion_control(sequences_lengths_q25, feats_num):
     assert X_out.shape[1] == feats_num
 
 
+def test_time_series_expansion_control_seed():
+    time_series_feature_extractor = TSFeatureExtractor(
+        extraction_type="efficient", augment=False, sequences_lengths_q25=[5], extraction_seed=27
+    )
+    time_series_feature_extractor.fit(X_sequence)
+    X_out = time_series_feature_extractor.tsfresh_feature_extractors_[0].transform(
+        TSFlattener().transform(enumerate(np.array(X_sequence).T).__next__()[1].reshape(-1, 1))
+    )
+    # compare with sorted(); list.sort() sorts in place and returns None, which would make the assert vacuous
+    assert sorted(X_out.columns.values) == sorted(
+        [
+            "0__sum_values",
+            "0__median",
+            "0__mean",
+            "0__length",
+            "0__standard_deviation",
+            "0__variance",
+            "0__root_mean_square",
+            "0__maximum",
+            "0__minimum",
+            "0__cwt_coefficients__coeff_14__w_5__widths_(2, 5, 10, 20)",
+            '0__fft_coefficient__attr_"imag"__coeff_85',
+            '0__fft_coefficient__attr_"abs"__coeff_19',
+            '0__fft_coefficient__attr_"abs"__coeff_72',
+            '0__fft_coefficient__attr_"angle"__coeff_63',
+            "0__energy_ratio_by_chunks__num_segments_10__segment_focus_4",
+        ]
+    )
+
+
+@pytest.mark.parametrize(
+    "sequences_lengths_q25, expansion_thresholds",
+    [([500, 200, 1000, 2000, 100], [329, 132, 658, 1316, 66]), ([5, 10, 20, 15, 25], [15, 27, 52, 40, 65])],
+)
+def test_time_series_expansion_control_across_columns(sequences_lengths_q25, expansion_thresholds):
+    time_series_feature_extractor = TSFeatureExtractor(
+        extraction_type="efficient", augment=False, sequences_lengths_q25=sequences_lengths_q25
+    )
+    time_series_feature_extractor.fit(X_sequence_5_columns)
+    for i, extractor in enumerate(time_series_feature_extractor.tsfresh_feature_extractors_):
+        assert expansion_thresholds[i] == extractor.expansion_threshold
+
+
+@pytest.mark.parametrize(
+    "settings, expansion_threshold, expected_settings",
+    [
+        ({"k1": None, "k2": ["v1", "v2"], "k3": None}, 1, {"k2": ["v2"]}),
+        ({"k2": ["v1", "v2", "v3"], "k3": None}, 2, {"k2": ["v3"], "k3": None}),
+        ({"k1": None, "k2": ["v1", "v2"], "k3": None}, 15, {"k1": None, "k2": ["v1", "v2"], "k3": None}),
+    ],
+)
+def test_apply_feature_threshold(settings, expansion_threshold, expected_settings):
+    time_series_feature_extractor = TSFreshFeatureExtractor(expansion_threshold=expansion_threshold)
+    time_series_feature_extractor.min_settings_card = 0
+    time_series_feature_extractor._apply_feature_threshold(settings)
+    assert settings == expected_settings
+
+
 def test_time_series_all_nan_column():
     time_series_feature_extractor = TSFeatureExtractor(extraction_type="efficient", augment=False)
     X_out = time_series_feature_extractor.fit_transform(X_all_nan_column)
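
The second case in test_time_series_expansion_control_across_columns ([5, 10, 20, 15, 25] expecting [15, 27, 52, 40, 65]) exercises the branch where the per-column length formula, not the proportional share of TOTAL_EXPANSION_THRESHOLD, is the stricter cap. A small worked check of that arithmetic; compute_expansion_threshold below mirrors the private _compute_expansion_threshold, and the script itself is illustrative:

from math import ceil

SEQUENCE_EXPANSION_FACTOR = 2.5
TOTAL_EXPANSION_THRESHOLD = 2500


def compute_expansion_threshold(input_len):
    # per-column cap derived from the column's q25 sequence length
    return int(max(ceil(SEQUENCE_EXPANSION_FACTOR * input_len + 1) + 1, 10))


q25 = [5, 10, 20, 15, 25]
total = sum(q25)
proportional = [ceil(q / total * TOTAL_EXPANSION_THRESHOLD) for q in q25]  # [167, 334, 667, 500, 834]
final = [min(p, compute_expansion_threshold(q)) for p, q in zip(proportional, q25)]
print(final)  # [15, 27, 52, 40, 65] -- the stricter per-column cap wins for these short sequences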
