-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
328 lines (262 loc) · 12.5 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import datetime
import logging
import os
import augmentation_retrieval
import data_controller
import upload_controller
import vector_retrieval
from flask import Flask, request
from flask import jsonify
from flask_cors import CORS
from flask_swagger_ui import get_swaggerui_blueprint
from werkzeug.utils import secure_filename
from bias_evaluation import evaluation_controller
from debiasing import debiasing_controller
# --- REST API configuration ---
# FLASK, CORS & Logging configuration
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'vec', 'vocab', 'vectors'}
MAX_CONTENT_LENGTH = 500 * 1024 * 1024  # 500 MiB cap on request bodies

app = Flask(__name__)
CORS(app)
#cors = CORS(app, resources={r"/api/*": {"origins": "*"}})

# The secret key signs session data, so it should not live in source control.
# Deployments can supply DEBIE_SECRET_KEY; the previous literal stays as the
# fallback so existing setups keep working unchanged.
app.secret_key = os.environ.get('DEBIE_SECRET_KEY', 'secret key')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['ALLOWED_EXTENSIONS'] = ALLOWED_EXTENSIONS
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# --- Swagger UI configuration ---
SWAGGER_URL = '/swagger'
API_URL = '/static/swagger.json'
SWAGGERUI_BLUEPRINT = get_swaggerui_blueprint(SWAGGER_URL, API_URL, config={'app_name': "DEBIE"})
app.register_blueprint(SWAGGERUI_BLUEPRINT, url_prefix=SWAGGER_URL)

# --- Logging configuration ---
# NOTE(review): logging is never configured while the lines below stay
# commented out, so the logging.info(...) calls in the endpoints are
# presumably dropped by the root logger's default WARNING level - confirm.
# logging.basicConfig(filename="logfile.log", level=logging.INFO)
# logging.info("APP: APP started at " + str(datetime.datetime.now()))
# print("logging configured")

# --- API endpoints ---
# API connection test
@app.route('/REST/', methods=['GET'])
def test():
    """Answer with a fixed string so a client can check the API is reachable."""
    connection_message = 'CONNECTION WORKS'
    return connection_message
# Retrieval of word vector representations for single words
# Example: http://127.0.0.1:5000/REST/vectors/single?embedding_space=fasttext&word=car
@app.route('/REST/vectors/single', methods=['GET'])
def retrieve_single_vector():
    """Return the vector representation of a single word.

    The query-string arguments are forwarded as a dict to
    ``vector_retrieval.retrieve_vector`` in ``'single'`` mode (no JSON body).

    Returns:
        The ``(response, status_code)`` pair produced by the retrieval module.
    """
    # "APP: " (with trailing space) matches the prefix used by every other
    # endpoint; the original omitted the space here.
    logging.info("APP: " + str(datetime.datetime.now()) + " Retrieve single vector is called")
    query_args = request.args.to_dict()
    response, status_code = vector_retrieval.retrieve_vector('single', None, query_args)
    return response, status_code
# Retrieval of word vector representations for a list of words
@app.route('/REST/vectors/multiple', methods=['POST'])
def retrieve_multiple_vectors():
    """Return vector representations for the words given in the JSON body.

    The parsed JSON body and the query-string arguments are handed to
    ``vector_retrieval.retrieve_vector`` in ``'multiple'`` mode; its
    ``(response, status_code)`` pair is returned unchanged.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Retrieve multiple vectors is called")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = vector_retrieval.retrieve_vector('multiple', payload, query_args)
    return body, code
# Retrieves four augmentations for a word
@app.route('/REST/augmentations/single', methods=['GET'])
def retrieve_single_augmentation():
    """Return augmentations for a single word.

    Forwards the query-string arguments to
    ``augmentation_retrieval.retrieve_augmentations`` in ``'single'`` mode
    (no JSON body) and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Retrieve single augmentation is called")
    query_args = request.args.to_dict()
    body, code = augmentation_retrieval.retrieve_augmentations('single', None, query_args)
    return body, code
# Retrieves four augmentations for each word in a list
@app.route('/REST/augmentations/multiple', methods=['POST'])
def retrieve_multiple_augmentations():
    """Return augmentations for the words given in the JSON body.

    Forwards the parsed JSON body and the query-string arguments to
    ``augmentation_retrieval.retrieve_augmentations`` in ``'multiple'`` mode
    and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Retrieve multiple augmentations is called")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = augmentation_retrieval.retrieve_augmentations('multiple', payload, query_args)
    return body, code
# Evaluates a bias specification with all implemented evaluation methods
@app.route('/REST/bias-evaluation/all', methods=['POST'])
def bias_evaluations_all():
    """Run every implemented evaluation on the posted bias specification.

    Delegates to ``evaluation_controller.evaluation`` with method ``'all'``,
    passing the parsed JSON body and the query-string arguments, and returns
    the resulting ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Bias Evaluation with ALL scores started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('all', payload, query_args)
    return body, code
# Evaluates a bias specification with the Embedding Coherence Test (ECT)
@app.route('/REST/bias-evaluation/ect', methods=['POST'])
def bias_evaluations_ect():
    """Run the ECT evaluation on the posted bias specification.

    Delegates to ``evaluation_controller.evaluation`` with method ``'ect'``
    and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Bias Evaluation with ECT scores started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('ect', payload, query_args)
    return body, code
# Evaluates a bias specification with the Bias Analogy Test (BAT)
@app.route('/REST/bias-evaluation/bat', methods=['POST'])
def bias_evaluations_bat():
    """Run the BAT evaluation on the posted bias specification.

    Delegates to ``evaluation_controller.evaluation`` with method ``'bat'``
    and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Bias Evaluation with BAT scores started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('bat', payload, query_args)
    return body, code
# Evaluates a bias specification with the Word Embedding Association Test (WEAT)
@app.route('/REST/bias-evaluation/weat', methods=['POST'])
def bias_evaluations_weat():
    """Run the WEAT evaluation on the posted bias specification.

    Delegates to ``evaluation_controller.evaluation`` with method ``'weat'``
    and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Bias Evaluation with WEAT scores started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('weat', payload, query_args)
    return body, code
# Evaluates a bias specification with K-Means++ clustering
@app.route('/REST/bias-evaluation/kmeans', methods=['POST'])
def bias_evaluations_kmeans():
    """Run the K-Means evaluation on the posted bias specification.

    Delegates to ``evaluation_controller.evaluation`` with method
    ``'kmeans'`` and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Bias Evaluation with KMEANS scores started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('kmeans', payload, query_args)
    return body, code
# Evaluates a bias specification with an SVM classifier
@app.route('/REST/bias-evaluation/svm', methods=['POST'])
def bias_evaluations_svm():
    """Run the SVM-classifier evaluation on the posted bias specification.

    Delegates to ``evaluation_controller.evaluation`` with method ``'svm'``
    and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Bias Evaluation with SVM-Classifier scores started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('svm', payload, query_args)
    return body, code
# Runs the SimLex semantic quality test
@app.route('/REST/bias-evaluation/simlex', methods=['POST'])
def bias_evaluations_simlex():
    """Run the SimLex test on the posted request data.

    Delegates to ``evaluation_controller.evaluation`` with method
    ``'simlex'`` and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Semantic Quality Test SimLex started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('simlex', payload, query_args)
    return body, code
# Runs the WordSim semantic quality test
@app.route('/REST/bias-evaluation/wordsim', methods=['POST'])
def bias_evaluations_wordsim():
    """Run the WordSim test on the posted request data.

    Delegates to ``evaluation_controller.evaluation`` with method
    ``'wordsim'`` and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " Semantic Quality Test WordSim started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = evaluation_controller.evaluation('wordsim', payload, query_args)
    return body, code
# General Bias-Direction Debiasing (GBDD) of a bias specification, returning values
@app.route('/REST/debiasing/gbdd', methods=['POST'])
def debiasing_gbdd():
    """Debias the posted bias specification with GBDD.

    Delegates to ``debiasing_controller.debiasing`` with method ``'gbdd'``,
    passing the parsed JSON body and the query-string arguments, and returns
    the resulting ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " GBDD Debiasing started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = debiasing_controller.debiasing('gbdd', payload, query_args)
    return body, code
# Bias Analogy Model (BAM) debiasing of a bias specification, returning values
@app.route('/REST/debiasing/bam', methods=['POST'])
def debiasing_bam():
    """Debias the posted bias specification with BAM.

    Delegates to ``debiasing_controller.debiasing`` with method ``'bam'``
    and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " BAM Debiasing started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = debiasing_controller.debiasing('bam', payload, query_args)
    return body, code
# Debiasing of bias specifications using GBDD and BAM, returning values
@app.route('/REST/debiasing/gbddxbam', methods=['POST'])
def debiasing_gbdd_bam():
    """Debias the posted bias specification with the GBDDxBAM method.

    Delegates to ``debiasing_controller.debiasing`` with method
    ``'gbddxbam'`` and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " GBDDxBAM Debiasing started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = debiasing_controller.debiasing('gbddxbam', payload, query_args)
    return body, code
# Debiasing of bias specifications using BAM and GBDD, returning values in full size
@app.route('/REST/debiasing/bamxgbdd', methods=['POST'])
def debiasing_bam_gbdd():
    """Debias the posted bias specification with the BAMxGBDD method.

    Delegates to ``debiasing_controller.debiasing`` with method
    ``'bamxgbdd'`` and returns its ``(response, status_code)`` pair.
    """
    timestamp = str(datetime.datetime.now())
    logging.info("APP: " + timestamp + " BAMxGBDD Debiasing started")
    payload = request.get_json()
    query_args = request.args.to_dict()
    body, code = debiasing_controller.debiasing('bamxgbdd', payload, query_args)
    return body, code
# Upload of complete embedding spaces
@app.route('/REST/uploads/embedding-spaces', methods=['POST'])
def upload_embedding_space():
    """Save an uploaded embedding space into the configured upload folder.

    Two multipart layouts are recognised:

    * a single ``vectorFile`` part - on success
      ``upload_controller.uploaded_binary`` is set to ``'false'``;
    * a ``vocab`` part together with a ``vecs`` part - on success
      ``upload_controller.uploaded_binary`` is set to ``'true'``.

    When a recognised upload is rejected the flag is reset to ``''``.

    Returns:
        A JSON response carrying a ``message`` field. Status is 201 on
        success, 402 when a part has no filename, 400 (single file) or 401
        (vocab/vecs pair) when an extension is not allowed, and 400 when the
        request contains neither supported layout.
    """
    logging.info("APP: Receiving file from upload " + str(datetime.datetime.now()))

    def reply(message, status_code, binary_flag=None):
        # Build the JSON answer and, when asked, record the upload state.
        resp = jsonify({'message': message})
        resp.status_code = status_code
        if binary_flag is not None:
            upload_controller.uploaded_binary = binary_flag
        return resp

    # NOTE(review): 402 and 401 are unusual codes for validation failures;
    # they are kept because clients may already depend on them.
    # The message now names "vectors", matching ALLOWED_EXTENSIONS.
    bad_type_message = 'Allowed file types are txt, vec, vectors or vocab'
    target_dir = app.config['UPLOAD_FOLDER']
    parts = request.files

    if 'vectorFile' in parts:
        upload = parts['vectorFile']
        if not upload.filename:
            return reply('No file selected for uploading', 402, '')
        if not allowed_file(upload.filename):
            return reply(bad_type_message, 400, '')
        upload.save(os.path.join(target_dir, secure_filename(upload.filename)))
        return reply('File successfully uploaded', 201, 'false')

    # Both parts must be present. The original `'vocab' and 'vecs' in ...`
    # only tested 'vecs' and then failed on the missing 'vocab' lookup.
    if 'vocab' in parts and 'vecs' in parts:
        vocab = parts['vocab']
        vecs = parts['vecs']
        # Either part lacking a filename means nothing was selected for it.
        if not vocab.filename or not vecs.filename:
            return reply('No files selected for uploading', 402, '')
        if not (allowed_file(vocab.filename) and allowed_file(vecs.filename)):
            return reply(bad_type_message, 401, '')
        vocab.save(os.path.join(target_dir, secure_filename(vocab.filename)))
        vecs.save(os.path.join(target_dir, secure_filename(vecs.filename)))
        return reply('Files successfully uploaded', 201, 'true')

    # Neither layout matched (including only one of vocab/vecs being sent).
    return reply('No file(s) part of the request', 400)
# Loads previously uploaded embedding file(s) through the data controller
@app.route('/REST/uploads/initialize', methods=['GET'])
def initialize_uploaded_embeddings():
    """Initialise the embedding space from the most recent upload.

    Which loader runs depends on ``upload_controller.uploaded_binary``:

    * ``'true'``  - ``data_controller.load_binary_uploads`` is called with
      the ``vocab`` and ``vecs`` query parameters;
    * ``'false'`` - ``data_controller.load_dict_uploaded_file`` is called
      with the ``file`` query parameter;
    * anything else - no upload is available.

    Returns:
        ``(response, status_code)`` where the response is JSON with a
        ``message`` field: 200 on success, 400 when a required query
        parameter is missing, 404 when no upload has been recorded.
    """
    logging.info("APP: " + str(datetime.datetime.now()) + " Initializing uploaded file(s)")
    query_args = request.args.to_dict()
    upload_state = upload_controller.uploaded_binary

    def reply(message, status_code):
        # Same (response, status) pair shape the endpoint has always returned.
        resp = jsonify({'message': message})
        resp.status_code = status_code
        return resp, resp.status_code

    if upload_state == 'true':
        required = ('vocab', 'vecs')
    elif upload_state == 'false':
        required = ('file',)
    else:
        return reply('NO UPLOADED FILE(S) FOUND', 404)

    # A missing parameter used to raise KeyError and surface as a 500.
    missing = [name for name in required if name not in query_args]
    if missing:
        return reply('MISSING QUERY PARAMETER(S): ' + ', '.join(missing), 400)

    if upload_state == 'true':
        data_controller.load_binary_uploads(query_args['vocab'], query_args['vecs'])
        return reply('INITIALIZED BINARY VOCAB AND VEC FILE SUCCESSFULLY', 200)

    data_controller.load_dict_uploaded_file(query_args['file'])
    return reply('INITIALIZED VECTOR FILE SUCCESSFULLY', 200)
# Deletes a previously uploaded file from the upload folder
@app.route('/REST/uploads/delete', methods=['DELETE'])
def delete_uploaded_file():
    """Remove the uploaded file named by the ``file`` query parameter.

    The name is passed through ``secure_filename`` (the same sanitiser the
    upload endpoint applies before saving) and resolved inside the configured
    upload folder, so values such as ``../../x`` cannot reach files outside it.

    Returns:
        ``(response, status_code)`` where the response is JSON with a
        ``message`` field: 200 when the file was removed, 400 when no usable
        file name was supplied, 404 when the file does not exist.
    """
    logging.info("APP: " + str(datetime.datetime.now()) + " Deleting uploaded file")
    query_args = request.args.to_dict()

    # Untrusted input: strip directory components and unsafe characters.
    safe_name = secure_filename(query_args.get('file', ''))
    if not safe_name:
        # A missing parameter used to raise KeyError and surface as a 500.
        return jsonify({'message': 'NO FILE NAME GIVEN'}), 400

    path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
    try:
        os.remove(path)
    except FileNotFoundError:
        return jsonify({'message': 'FILE NOT FOUND'}), 404
    return jsonify({'message': 'REMOVED FILE SUCCESFULLY'}), 200
# Check if uploaded file-name is accepted
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    Names without any dot are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
# Start the Flask application only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    app.run()