# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13+ import os
1314from math import ceil
1415
1516import numpy as np
2122from tsfresh .feature_extraction import EfficientFCParameters
2223from tsfresh .feature_extraction import MinimalFCParameters
2324from tsfresh .utilities .dataframe_functions import impute
from tsfresh.defaults import N_PROCESSES  # the default number of processes used by TSFresh, equal to n_vcores/2
2426
25- from sagemaker_sklearn_extension .preprocessing .data import RobustStandardScaler
26-
27-
# Hard cap on the total number of TSFresh features generated across all sequence columns combined.
TOTAL_EXPANSION_THRESHOLD = 2500
# Fallback sequence length used when no 25th-percentile sequence length is provided.
DEFAULT_INPUT_SEQUENCE_LENGTH = 1000
# Per-column feature count is capped at roughly this multiple of the input sequence length.
SEQUENCE_EXPANSION_FACTOR = 2.5
# do not use TSFresh parallelism in container serve(transform), does not work with server's workers
N_TSFRESH_JOBS = 0 if os.environ.get("SAGEMAKER_PROGRAM") == "sagemaker_serve" else N_PROCESSES
3133
3234class TSFeatureExtractor (BaseEstimator , TransformerMixin ):
@@ -129,7 +131,10 @@ def fit(self, X, y=None):
129131 raise ValueError (
130132 f"length of sequences_lengths_q25 should be equal to number of columns in X (={ X .shape [1 ]} )."
131133 )
132-
134+ # cap total expansion for all columns
135+ expansion_thresholds = np .ceil (
136+ (self .sequences_lengths_q25 / np .sum (self .sequences_lengths_q25 )) * TOTAL_EXPANSION_THRESHOLD
137+ )
133138 ts_flattener = TSFlattener (max_allowed_length = self .max_allowed_length , trim_beginning = self .trim_beginning )
134139 tsfresh_feature_extractors = []
135140 for sequence_column_i , sequence_column in enumerate (X .T ):
@@ -140,6 +145,7 @@ def fit(self, X, y=None):
140145 extraction_type = self .extraction_type ,
141146 extraction_seed = self .extraction_seed ,
142147 sequence_length_q25 = self .sequences_lengths_q25 [sequence_column_i ],
148+ expansion_threshold = int (expansion_thresholds [sequence_column_i ]),
143149 )
144150 tsfresh_feature_extractor .fit (numeric_sequences )
145151 tsfresh_feature_extractors .append (tsfresh_feature_extractor )
@@ -315,12 +321,6 @@ class TSFreshFeatureExtractor(BaseEstimator, TransformerMixin):
        List containing 25th percentile of sequence lengths for each column at the train step.
316322 If not provided, default value will be assigned (DEFAULT_INPUT_SEQUENCE_LENGTH).
317323
318- Attributes
319- ----------
320- self.robust_standard_scaler_ : ``sagemaker_sklearn_extension.preprocessing.data.RobustStandardScaler``
321- - `robust_standard_scaler_` is instantiated inside the fit method used for computing the mean and
322- the standard deviation.
323-
324324
325325 Examples
326326 --------
@@ -348,23 +348,21 @@ def __init__(
348348 extraction_type = "efficient" ,
349349 extraction_seed = 0 ,
350350 sequence_length_q25 = None ,
351+ expansion_threshold = None ,
351352 ):
352353 super ().__init__ ()
353354 self .augment = augment
354355 self .interpolation_method = interpolation_method
355356 self .extraction_type = extraction_type
356357 self .feature_sampling_seed = extraction_seed
357358 self .sequence_length_q25 = sequence_length_q25 or DEFAULT_INPUT_SEQUENCE_LENGTH
358- self .expansion_threshold = self ._compute_expansion_threshold (self .sequence_length_q25 )
359- self .robust_standard_scaler_ = RobustStandardScaler ()
359+ expansion_threshold = expansion_threshold or self ._compute_expansion_threshold (self .sequence_length_q25 )
360+ self .expansion_threshold = min (expansion_threshold , self ._compute_expansion_threshold (self .sequence_length_q25 ))
361+ # expansion_threshold will be the stricter between the one computed for this column and the one respecting
362+ # the total expansion for all columns
360363
361364 def fit (self , X , y = None ):
362- tsfresh_features , _ = self ._extract_tsfresh_features (X )
363-
364- # not all features included due to data expansion control
365- tsfresh_features = self ._filter_features (tsfresh_features , mode = "train" )
366-
367- self .robust_standard_scaler_ .fit (tsfresh_features )
365+ # Nothing to learn during fit.
368366 return self
369367
370368 def transform (self , X , y = None ):
@@ -379,13 +377,7 @@ def transform(self, X, y=None):
379377 tsfresh_features : np.array
380378
381379 """
382- check_is_fitted (self , "robust_standard_scaler_" )
383- transform_thresholds = [self ._compute_expansion_threshold (len (seq )) for seq in X ]
384380 tsfresh_features , X_df = self ._extract_tsfresh_features (X )
385- tsfresh_features = self ._filter_features (
386- tsfresh_features , mode = "transform" , transform_thresholds = transform_thresholds
387- )
388- tsfresh_features = self .robust_standard_scaler_ .transform (tsfresh_features )
389381 if self .augment :
390382 # Stack the extracted features to the original sequences in X, after padding with np.nans any shorter
391383 # input sequences in X to match the length of the longest sequence, and imputing missing values as
@@ -462,7 +454,7 @@ def _extract_tsfresh_features(self, X):
462454 column_id = "id" ,
463455 column_sort = "time" ,
464456 impute_function = impute ,
465- n_jobs = 0 ,
457+ n_jobs = N_TSFRESH_JOBS ,
466458 )
467459 self .min_settings_card = tsfresh_features .shape [1 ]
        # Minimal features computed independently to ensure they go first in the output,
@@ -473,51 +465,74 @@ def _extract_tsfresh_features(self, X):
473465 else :
474466 settings = ComprehensiveFCParameters ()
475467 settings = {k : v for k , v in settings .items () if k not in min_settings }
476- tsfresh_features_extra = extract_features (
477- X_df_no_nans ,
478- default_fc_parameters = settings ,
479- column_id = "id" ,
480- column_sort = "time" ,
481- impute_function = impute ,
482- n_jobs = 0 ,
483- )
484- self .extra_settings_card = tsfresh_features_extra .shape [1 ]
485- tsfresh_features = pd .concat ([tsfresh_features , tsfresh_features_extra ], axis = 1 )
468+
469+ self ._apply_feature_threshold (settings )
470+ if settings :
471+ # check that efficient strategies are not emptied when applying expansion threshold
472+ tsfresh_features_extra = extract_features (
473+ X_df_no_nans ,
474+ default_fc_parameters = settings ,
475+ column_id = "id" ,
476+ column_sort = "time" ,
477+ impute_function = impute ,
478+ n_jobs = N_TSFRESH_JOBS ,
479+ )
480+ tsfresh_features = pd .concat ([tsfresh_features , tsfresh_features_extra ], axis = 1 )
486481
487482 # If X_df.dropna() dropped some observations entirely (i.e., due to all NaNs),
488483 # impute each tsfresh feature for those observations with the median of that tsfresh feature
489484 tsfresh_features_imputed = impute (tsfresh_features .reindex (pd .RangeIndex (X_df ["id" ].max () + 1 )))
490485 return tsfresh_features_imputed , X_df
491486
492- def _filter_features (self , tsfresh_features , mode = "transform" , transform_thresholds = None ):
493- if self .expansion_threshold < self .min_settings_card :
494- raise ValueError (
495- f"Provided filter threshold(s) (= { self .expansion_threshold } ) can not be smaller than "
496- f"number of features generated by minimal settings (= { self .min_settings_card } )"
497- )
498- filter_order = np .arange (self .min_settings_card , tsfresh_features .shape [1 ])
487+ def _apply_feature_threshold (self , settings ):
488+ """Accepts a settings dictionary, with all the possible generated features,
489+ and filters features if needed until their count matches the given "self.expansion_threshold"
490+ (minus minimal features).
491+ Does that in a reproducible "random" way, controlled by "self.feature_sampling_seed".
492+ Draws Random indexes to be filtered, then iterates over the settings dictionary assigning an index to each value
493+ and performs the filtering based on that index.
494+ """
495+ settings .pop ("linear_trend_timewise" , None ) # remove these 5 features that need dateTime indexes for sequences
496+ max_available_features = self ._get_features_count (settings )
497+ if self .expansion_threshold >= max_available_features + self .min_settings_card :
498+ return # no need to limit
499+
500+ filter_order = np .arange (max_available_features )
499501 random_state = np .random .get_state ()
500502 np .random .seed (self .feature_sampling_seed )
501503 np .random .shuffle (filter_order )
502504 np .random .set_state (random_state )
503- survivors = list (range (self .min_settings_card )) + list (
504- filter_order [: self .expansion_threshold - self .min_settings_card ]
505- )
506- tsfresh_features = tsfresh_features .iloc [:, survivors ]
507-
508- if mode == "transform" :
509- if len (transform_thresholds ) != tsfresh_features .shape [0 ]:
510- raise ValueError (
511- f"In 'transform' mode transform_thresholds should have number of entries "
512- f"(= { len (transform_thresholds )} ) that corresponds to the number of records "
513- f"in tsfresh_features (= { tsfresh_features .shape [0 ]} )."
514- )
515- for thrsh_i , thrsh in enumerate (transform_thresholds ):
516- tsfresh_features .iloc [thrsh_i , thrsh :] = 0
517- return tsfresh_features
505+ removed_indices = list (filter_order [max (0 , self .expansion_threshold - self .min_settings_card ) :])
506+ removed_indices .sort ()
507+
508+ feature_idx = 0
509+ for k in list (settings .keys ()):
510+ if isinstance (settings [k ], list ):
511+ survived_list = []
512+ # case the value is a list, each list element is counted separately
513+ for index , _ in enumerate (settings [k ]):
514+ if removed_indices and removed_indices [0 ] == feature_idx :
515+ del removed_indices [0 ]
516+ else :
517+ survived_list .append (settings [k ][index ])
518+ feature_idx += 1
519+ # copy the "survived", features to the final list. if no one survived, delete the settings key.
520+ if survived_list :
521+ settings [k ] = survived_list
522+ else :
523+ del settings [k ]
524+ else :
525+ # case the value is None, count it as one feature
526+ if removed_indices and removed_indices [0 ] == feature_idx :
527+ del removed_indices [0 ]
528+ del settings [k ]
529+ feature_idx += 1
518530
519531 def _compute_expansion_threshold (self , input_len ):
520532 return int (max (ceil (SEQUENCE_EXPANSION_FACTOR * input_len + 1 ) + 1 , 10 ))
521533
522534 def _more_tags (self ):
523535 return {"_skip_test" : True , "allow_nan" : True }
536+
537+ def _get_features_count (self , settings ):
538+ return sum ([len (v ) if isinstance (v , list ) else 1 for v in settings .values ()])
0 commit comments