-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathextract_features.py
229 lines (162 loc) · 7.16 KB
/
extract_features.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# Extract features from MPU dataset:
# https://sites.google.com/view/mobile-phone-use-dataset
import sys
import os
import time
import argparse
import shutil
import pandas as pd
# import numpy as np
from tqdm import tqdm
from datetime import datetime, timedelta
from multiprocessing import Pool, cpu_count
import traceback
##############################################################################################
# FILE PATHS
##############################################################################################
GLOBAL_PATH = "." # Root folder that holds the INPUT_DATA and OUTPUT_DATA folders, e.g. '~/Users/<user-name>/Project'.
INPUT_DATA = "mobile_phone_use" # The MPU dataset folder (contains 'data/' and pinfo.csv)
OUTPUT_DATA = "features" # The folder that will store the extracted features (one csv per user)
PARTICIPANTS_INFO_FILENAME = "pinfo.csv" # Participants table filename; should be 'pinfo.csv'
GT_COLUMN = "Esm_TiredAwake" # Your Ground Truth column of the Esm sensor event.
##############################################################################################
# METHODS
##############################################################################################
def extract_features(pinfo, df, ff):
    """Extract features for a single participant (the place to add your own).

    Parameters
    ----------
    pinfo : participant's information (dict-like row from pinfo.csv).
    df : pandas.DataFrame with the participant's mobile phone use data.
    ff : pandas.DataFrame populated in place with 'ft_'-prefixed feature
         columns; it already contains GT_COLUMN.

    This example uses the 'Acc' sensor (Accelerometer) and extracts the
    average acceleration of the last measurement before each Esm event.
    """
    # BUG FIX: fillna(method='ffill') is deprecated (pandas >= 2.1) and an
    # inplace fill on a column selection is a chained assignment that is not
    # guaranteed to modify df under copy-on-write; assign the forward-filled
    # column back explicitly instead.
    df['Acc_Avg'] = df['Acc_Avg'].ffill()
    ff['ft_last_acc'] = df['Acc_Avg']
def extract_features_per_core(params):
    """Worker: extract and save the features of one participant.

    params is a single (pinfo, input_data_path, output_data_path) tuple so
    this function can be mapped with Pool.imap_unordered.

    Returns True on success, False on error or KeyboardInterrupt (the caller
    treats False as a request to stop).
    """
    # unpack parameters
    pinfo, input_data_path, output_data_path = params
    try:
        # prepare per-participant file paths
        input_file_path = os.path.join(input_data_path, "%s.csv" % pinfo.uuid)
        output_file_path = os.path.join(output_data_path, "%s.csv" % pinfo.uuid)
        # read data file
        df = pd.read_csv(input_file_path, low_memory=False)
        # init ff (features dataframe) with the non-null ground truth of each Esm event
        ff = df[df.sensor_id == "Esm"][[GT_COLUMN]].copy().dropna()
        # extract features using pinfo, from df into ff
        extract_features(pinfo, df, ff)
        # sort columns (GT_COLUMN last) and save into csv
        ff = ff[sort_columns(ff.columns)]
        ff.to_csv(output_file_path, index=False)
        # Status ok
        return True
    except KeyboardInterrupt:
        return False
    except Exception as exc:
        # Report the failure but keep going with the remaining participants.
        # (idiomatic 'except ... as' replaces the original sys.exc_info() triplet)
        tqdm.write("exception: %s '%s'" % (type(exc), exc))
        traceback.print_exc()
        return False
def extract_all_features(pdf, input_data_path, output_data_path, nproc):
    """Extract features for every participant in pdf.

    pdf : participants dataframe (one row per user, must have 'uuid').
    nproc : number of worker processes; 1 or less runs serially.

    Raises KeyboardInterrupt when a serial worker reports interruption; in
    the parallel path the interrupt terminates the pool instead.
    """
    # choose between single core vs multi-core
    if nproc <= 1:
        for _, pinfo in tqdm(pdf.iterrows(), total=len(pdf), desc='User', ncols=80):
            # pack params and extract features
            params = (pinfo, input_data_path, output_data_path)
            status = extract_features_per_core(params)
            # workers return False on KeyboardInterrupt
            if status is False:
                raise KeyboardInterrupt
    else:
        # prepare parameters and init pool with nproc workers
        params = [(pinfo, input_data_path, output_data_path) for _, pinfo in pdf.iterrows()]
        pool = Pool(processes=nproc)
        try:
            for status in tqdm(pool.imap_unordered(extract_features_per_core, params),
                               total=len(pdf), desc='User', ncols=80):
                # check for KeyboardInterrupt reported by a worker
                if status is False:
                    raise KeyboardInterrupt
            # BUG FIX: the pool was never closed/joined on the success path,
            # leaking worker processes; shut it down cleanly here.
            pool.close()
        except KeyboardInterrupt:
            pool.terminate()
        finally:
            # wait for the workers to exit in both the clean and the
            # terminated case
            pool.join()
def sort_columns(columns):
    """Return the column names sorted alphabetically, with GT_COLUMN moved
    to the end (the ground truth is conventionally the last csv column).

    Raises ValueError if GT_COLUMN is not present in columns.
    """
    ordered = sorted(columns)
    # relocate the ground-truth column to the tail
    ordered.remove(GT_COLUMN)
    ordered.append(GT_COLUMN)
    return ordered
def ensure_path(path, clean=False):
    """Make sure the directory *path* exists.

    With clean=True the directory (and all of its contents) is removed
    first, so the caller starts with an empty directory.
    """
    if clean and os.path.exists(path):
        shutil.rmtree(path)
    # exist_ok avoids the check-then-create race of the original
    # os.path.exists() / os.makedirs() pair.
    os.makedirs(path, exist_ok=True)
def parse_arguments(args):
    """Parse the command line arguments and return them as a plain dict."""
    parser = argparse.ArgumentParser(
        description="extract features (using 'ft_' as a column prefix)")
    parser.add_argument(
        '-p', '--parallel', dest='parallel', type=int, nargs=1,
        metavar='nproc', default=[0],
        help='execute in parallel, nproc=number of processors to use.')
    parser.add_argument(
        '-sd', '--sudden-death', dest='sudden_death', action='store',
        nargs='*', metavar='uuid',
        help='sudden death: use particular uuid to test the features extraction; either specify the uuid or omit it and it reads out a default one from code (ie. u000)')
    return vars(parser.parse_args(args))
##############################################################################################
# MAIN
##############################################################################################
def main(args):
    """Main entry: load the participants table and extract all features.

    args is the dict produced by parse_arguments(). Exits via sys.exit()
    when the participants info file is missing.
    """
    # determine number of worker processes
    nproc = args['parallel'][0]
    if nproc <= 0:
        # automatically select about 80% of the available CPUs
        nproc = int(cpu_count() * 0.8 + 0.5)
    else:
        nproc = min(nproc, cpu_count())
    print("using %d CPUs" % nproc)
    # resolve paths
    global_path = os.path.expanduser(GLOBAL_PATH)
    input_data_path = os.path.join(global_path, INPUT_DATA, "data")
    output_data_path = os.path.join(global_path, OUTPUT_DATA)
    # start from a clean output folder
    ensure_path(output_data_path, clean=True)
    # load pinfo.csv (participants table)
    pinfo_path = os.path.join(global_path, INPUT_DATA, PARTICIPANTS_INFO_FILENAME)
    print(pinfo_path)
    if not os.path.isfile(pinfo_path):
        sys.exit("Participant's info file with name '%s' does not exist." % PARTICIPANTS_INFO_FILENAME)
    # FIX: the original comment said "load json file" but this is a csv;
    # read it directly instead of managing a file handle manually.
    pdf = pd.read_csv(pinfo_path)
    # sudden death: optionally restrict the run to a few uuids for testing
    sudden_death = args['sudden_death']
    if sudden_death is not None:
        if len(sudden_death) == 0:
            sudden_death = ['u000']  # default user
        pdf = pdf[pdf.uuid.isin(sudden_death)]
    # begin feature extraction
    extract_all_features(pdf, input_data_path, output_data_path, nproc)
if __name__ == '__main__':
    # parse args
    args = parse_arguments(sys.argv[1:])
    try:
        # track time
        print("Started at: %s" % (datetime.now()))
        start_time = time.time()
        # call main
        main(args)
        # report elapsed time
        elapsed_time = time.time() - start_time
        print("\nSuccess! Duration: %s" % str(timedelta(seconds=int(elapsed_time))))
    except KeyboardInterrupt:
        sys.exit("Interrupted: Exiting on request.")
    except SystemExit as exc:
        # BUG FIX: the original caught SystemExit, printed it, and returned
        # normally, turning error exits (e.g. missing pinfo.csv) into exit
        # code 0. Report the failure and re-raise so the exit status
        # propagates to the shell.
        tqdm.write("exception: %s '%s'" % (type(exc), exc))
        traceback.print_exc()
        raise