2
2
import numpy as np
3
3
import pandas as pd
4
4
import torch
5
- from typing import Dict , Tuple , Union , List
5
+ from typing import Dict , Tuple , Union , Optional , List
6
6
from flood_forecast .pre_dict import interpolate_dict
7
7
from flood_forecast .preprocessing .buil_dataset import get_data
8
8
from datetime import datetime
@@ -21,15 +21,17 @@ def __init__(
21
21
scaling = None ,
22
22
start_stamp : int = 0 ,
23
23
end_stamp : int = None ,
24
+ gcp_service_key : Optional [str ] = None ,
24
25
interpolate_param : bool = False ,
25
26
sort_column = None ,
26
27
scaled_cols = None ,
27
28
feature_params = None ,
28
29
no_scale = False ,
30
+ preformatted_df = False
29
31
30
32
):
31
- """A data loader that takes a CSV file and properly batches for use in training/eval a PyTorch model.
32
-
33
+ """
34
+ A data loader that takes a CSV file and properly batches it for use in training/evaluating a PyTorch model.
33
35
:param file_path: The path to the CSV file you wish to use (GCS compatible) or a Pandas dataframe.
34
36
:param forecast_history: This is the length of the historical time series data you wish to
35
37
utilize for forecasting
@@ -40,12 +42,10 @@ def __init__(
40
42
:param scaling: (highly recommended) If provided should be a subclass of sklearn.base.BaseEstimator
41
43
and sklearn.base.TransformerMixin) i.e StandardScaler, MaxAbsScaler, MinMaxScaler, etc) Note without
42
44
a scaler the loss is likely to explode and cause infinite loss which will corrupt weights
43
- :param start_stamp: Optional if you want to only use part of a CSV for training, validation
45
+ :param start_stamp int: Optional if you want to only use part of a CSV for training, validation
44
46
or testing supply these
45
- :type start_stamp: int, optional
46
- :param end_stamp: Optional if you want to only use part of a CSV for training, validation,
47
- or testing supply these
48
- :type end_stamp: int, optional
47
+ :param end_stamp int: Optional if you want to only use part of a CSV for training, validation,
48
+ or testing supply these
49
49
:param sort_column str: The column to sort the time series on prior to forecast.
50
50
:param scaled_cols: The columns you want scaling applied to (if left blank will default to all columns)
51
51
:param feature_params: These are the datetime features you want to create.
@@ -122,13 +122,13 @@ def __len__(self) -> int:
122
122
len (self .df .index ) - self .forecast_history - self .forecast_length - 1
123
123
)
124
124
125
- def __sample_and_track_series__ (self , idx : int , series_id = None ):
125
+ def __sample_and_track_series__ (self , idx , series_id = None ):
126
126
pass
127
127
128
128
def inverse_scale (
129
129
self , result_data : Union [torch .Tensor , pd .Series , np .ndarray ]
130
130
) -> torch .Tensor :
131
- """Un-does the scaling of the data.
131
+ """Un-does the scaling of the data
132
132
133
133
:param result_data: The data you want to unscale can handle multiple data types.
134
134
:type result_data: Union[torch.Tensor, pd.Series, np.ndarray]
@@ -161,16 +161,16 @@ def inverse_scale(
161
161
162
162
163
163
class CSVSeriesIDLoader (CSVDataLoader ):
164
- def __init__ (self , series_id_col : str , main_params : dict , return_method : str , return_all : bool = True ):
164
+ def __init__ (self , series_id_col : str , main_params : dict , return_method : str , return_all = True ):
165
165
"""A data-loader for a CSV file that contains a series ID column.
166
166
167
- :param series_id_col: The id column of the series you want to forecast.
167
+ :param series_id_col: The id column of the series you want to forecast.
168
168
:type series_id_col: str
169
169
:param main_params: The central set of parameters
170
170
:type main_params: dict
171
- :param return_method: The method of return (e.g. all series at once, one at a time, or a random sample)
171
+ :param return_method: The method of return (e.g. all series at once, one at a time, or a random sample)
172
172
:type return_method: str
173
- :param return_all: Whether to return all items if set to True then __validate_data_in_df__ , defaults to True
173
+ :param return_all: Whether to return all items, defaults to True
174
174
:type return_all: bool, optional
175
175
"""
176
176
main_params1 = deepcopy (main_params )
@@ -203,7 +203,8 @@ def __init__(self, series_id_col: str, main_params: dict, return_method: str, re
203
203
print ("unique dict" )
204
204
205
205
def __validate_data__in_df (self ):
206
- """Makes sure the data in the data-frame is the proper length for each series."""
206
+ """Makes sure the data in the data-frame is the proper length for each series.
207
+ """
207
208
if self .return_all_series :
208
209
len_first = len (self .listed_vals [0 ])
209
210
print ("Length of first series is:" + str (len_first ))
@@ -230,6 +231,7 @@ def __getitem__(self, idx: int) -> Tuple[Dict, Dict]:
230
231
targ_list = {}
231
232
for va in self .listed_vals :
232
233
# We need to exclude the index column on one end and the series id column on the other
234
+
233
235
targ_start_idx = idx + self .forecast_history
234
236
idx2 = va [self .series_id_col ].iloc [0 ]
235
237
va_returned = va [va .columns .difference ([self .series_id_col ], sort = False )]
@@ -239,7 +241,8 @@ def __getitem__(self, idx: int) -> Tuple[Dict, Dict]:
239
241
targ_list [self .unique_dict [idx2 ]] = targ
240
242
return src_list , targ_list
241
243
else :
242
- raise NotImplementedError ("Current code only supports returning all the series at once at each iteration" )
244
+ raise NotImplementedError
245
+ return super ().__getitem__ (idx )
243
246
244
247
def __sample_series_id__ (idx , series_id ):
245
248
pass
@@ -264,12 +267,8 @@ def __init__(
264
267
** kwargs
265
268
):
266
269
"""
267
- A data loader for the test data and plotting code it is a subclass of CSVDataLoader.
268
- :param str df_path: The path to the CSV file you want to use (GCS compatible) or a Pandas DataFrame.
269
- :type df_path: str
270
- :param int forecast_total: The total length of the forecast.
271
- :
272
- :type forecast_total: int
270
+ :param str df_path: The path to the CSV file you want to use (GCS compatible) or a Pandas DataFrame
271
+ A data loader for the test data.
273
272
"""
274
273
if "file_path" not in kwargs :
275
274
kwargs ["file_path" ] = df_path
@@ -284,8 +283,8 @@ def __init__(
284
283
print (df_path )
285
284
self .forecast_total = forecast_total
286
285
# TODO these are antiquated delete them
287
- self .use_real_precip = use_real_precip
288
286
self .use_real_temp = use_real_temp
287
+ self .use_real_precip = use_real_precip
289
288
self .target_supplied = target_supplied
290
289
# Convert back to datetime and save index
291
290
sort_col1 = sort_column_clone if sort_column_clone else "datetime"
@@ -310,7 +309,7 @@ def __getitem__(self, idx):
310
309
historical_rows = self .df .iloc [idx : self .forecast_history + idx ]
311
310
target_idx_start = self .forecast_history + idx
312
311
# Why aren't we using these
313
- # targ_rows = self.df.ilo c [
312
+ # targ_rows = self.df.iloc [
314
313
# target_idx_start : self.forecast_total + target_idx_start
315
314
# ]
316
315
all_rows_orig = self .original_df .iloc [
@@ -320,7 +319,10 @@ def __getitem__(self, idx):
320
319
return historical_rows .float (), all_rows_orig , target_idx_start
321
320
322
321
def convert_real_batches (self , the_col : str , rows_to_convert ):
323
- """A helper function to return properly divided precip and temp values to be stacked with t forecasted cfs."""
322
+ """
323
+ A helper function to return properly divided precip and temp
324
+ values to be stacked with the forecasted cfs.
325
+ """
324
326
the_column = torch .from_numpy (rows_to_convert [the_col ].to_numpy ())
325
327
chunks = [
326
328
the_column [
@@ -333,7 +335,8 @@ def convert_real_batches(self, the_col: str, rows_to_convert):
333
335
def convert_history_batches (
334
336
self , the_col : Union [str , List [str ]], rows_to_convert : pd .DataFrame
335
337
):
336
- """A helper function to return dataframe in batches of size (history_len, num_features)
338
+ """A helper function to return dataframe in batches of
339
+ size (history_len, num_features)
337
340
338
341
Args:
339
342
the_col (str): column names
@@ -355,6 +358,10 @@ def __len__(self) -> int:
355
358
)
356
359
357
360
361
+ class TestLoaderABC (CSVTestLoader ):
362
+ pass
363
+
364
+
358
365
class AEDataloader (CSVDataLoader ):
359
366
def __init__ (
360
367
self ,
@@ -369,8 +376,9 @@ def __init__(
369
376
forecast_history = 1 ,
370
377
no_scale = True ,
371
378
sort_column = None ):
372
- """A data loader class for autoencoders. Overrides __len__ and __getitem__ from generic dataloader. Also defaults
373
- forecast_history and forecast_length to 1. Since AE will likely only use one row. Same parameters as before.
379
+ """A data loader class for autoencoders. Overrides __len__ and __getitem__ from generic dataloader.
380
+ Also defaults forecast_history and forecast_length to 1. Since AE will likely only use one row.
381
+ Same parameters as before.
374
382
375
383
:param file_path: The path to the file
376
384
:type file_path: str
@@ -589,14 +597,15 @@ def __getitem__(self, idx):
589
597
class VariableSequenceLength (CSVDataLoader ):
590
598
def __init__ (self , series_marker_column : str , csv_loader_params : Dict , pad_length = None , task = "classification" ,
591
599
n_classes = 9 + 90 ):
592
- """Enables eas(ier) loading of time-series with variable length data.
600
+ """Enables eas(ier) loading of time-series with variable length data
593
601
594
602
:param series_marker_column: The column that delineates when an example begins and ends
595
603
:type series_marker_column: str
596
604
:param pad_length: If specified, the length to truncate sequences at or pad them to
597
605
:type pad_length: int
598
606
:param task: The specific task (e.g. classification, forecasting, auto_encode)
599
607
:type task: str
608
+
600
609
"""
601
610
super ().__init__ (** csv_loader_params )
602
611
self .pad_length = pad_length
@@ -636,7 +645,8 @@ def get_item_auto_encoder(self, idx):
636
645
return the_seq .float (), the_seq .float ()
637
646
638
647
def pad_input_data (self , sequence : int ):
639
- """Pads a sequence to a specified length."""
648
+ """Pads a sequence to a specified length.
649
+ """
640
650
if self .pad_length > sequence .shape [0 ]:
641
651
pad_dim = self .pad_length - sequence .shape [0 ]
642
652
return torch .nn .functional .pad (sequence , (0 , 0 , 0 , pad_dim ))
0 commit comments