This repository has been archived by the owner on Jan 3, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathrunserver.py
executable file
·100 lines (82 loc) · 4.04 KB
/
runserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# /ai4e_api_tools has been added to the PYTHONPATH, so we can reference those
# libraries directly.
import json
from flask import Flask, request, abort
from ai4e_app_insights_wrapper import AI4EAppInsights
from ai4e_service import APIService
from sas_blob import SasBlob
from PIL import Image
import tf_detector
from io import BytesIO
from os import getenv
import uuid
import sys
import numpy as np
print("Creating Application")

# Content types handed to the '/detect' endpoint registration below.
ACCEPTED_CONTENT_TYPES = [
    'image/png',
    'application/octet-stream',
    'image/jpeg',
]

# Passed to the SAS helper as the access duration when the writable output
# container is created (hours, going by the name).
blob_access_duration_hrs = 1

app = Flask(__name__)

# Log messages are sent through the AI4EAppInsights library.
log = AI4EAppInsights()

# The APIService executes your functions within a logging trace, supports
# long-running/async functions, handles SIGTERM signals from AKS, etc., and
# handles concurrent requests.
with app.app_context():
    ai4e_service = APIService(app, log)

# Load the model once at import time so every request reuses the same graph.
# The model was copied to this location when the container was built; see ../Dockerfile
model_path = '/app/tf_iNat_api/frozen_inference_graph.pb'
detection_graph = tf_detector.load_model(model_path)
# Define a function for processing request data, if applicable. This function loads data or files into
# a dictionary for access in your API function. We pass this function as a parameter to your API setup.
def process_request_data(request):
    """Wrap the raw request body in an in-memory stream.

    Args:
        request: Request object whose ``data`` attribute holds the body bytes.

    Returns:
        dict: ``{'image_bytes': BytesIO}`` when the body could be loaded,
        otherwise ``{'image_bytes': None}``. A load failure is logged and
        not raised, so the caller always receives the dictionary.
    """
    return_values = {'image_bytes': None}
    try:
        # Attempt to load the body
        return_values['image_bytes'] = BytesIO(request.data)
    except Exception:
        # Deliberately best-effort, but do not swallow SystemExit or
        # KeyboardInterrupt the way a bare ``except:`` would.
        log.log_error('Unable to load the request data')  # Log to Application Insights
    return return_values
# POST, async API endpoint example
@ai4e_service.api_async_func(
    api_path='/detect',
    methods=['POST'],
    request_processing_function=process_request_data,  # This is the data process function that you created above.
    maximum_concurrent_requests=5,  # If the number of requests exceed this limit, a 503 is returned to the caller.
    content_types=ACCEPTED_CONTENT_TYPES,
    content_max_length=10000,  # In bytes
    trace_name='post:detect')
def detect(*args, **kwargs):
    """Run the detector on the posted image and upload the annotated result.

    Reads two keyword arguments:
        image_bytes: The stream produced by ``process_request_data``.
        taskId: Identifier used for every task-status update below
            (presumably supplied by the API service -- confirm).

    The annotated image is written as ``detect_output.jpg`` into a freshly
    created blob container. The task is completed with the container's SAS
    URL on success, or failed with the exception type on any error. Nothing
    is returned.
    """
    print('runserver.py: detect() called, generating detections...')
    image_bytes = kwargs.get('image_bytes')
    taskId = kwargs.get('taskId')

    # Update the task status, so the caller knows it has been accepted and is running.
    ai4e_service.api_task_manager.UpdateTaskStatus(taskId, 'running - generate_detections')

    try:
        image = tf_detector.open_image(image_bytes)
        boxes, scores, clsses, image = tf_detector.generate_detections(
            detection_graph, image)

        ai4e_service.api_task_manager.UpdateTaskStatus(taskId, 'rendering boxes')

        # image is modified in place
        # here confidence_threshold is hardcoded, but you can ask that as a input from the request
        tf_detector.render_bounding_boxes(
            boxes, scores, clsses, image, confidence_threshold=0.5)

        print('runserver.py: detect(), rendering and saving result image...')
        # save the PIL Image object to a ByteIO stream so that it can be written to blob storage
        output_img_stream = BytesIO()
        image.save(output_img_stream, format='jpeg')
        # Rewind so the upload reads the stream from its first byte.
        output_img_stream.seek(0)

        sas_blob_helper = SasBlob()
        # Create a unique name for a blob container
        container_name = str(uuid.uuid4()).replace('-', '')

        # Create a writable sas container and return its url
        sas_url = sas_blob_helper.create_writable_container_sas(
            getenv('STORAGE_ACCOUNT_NAME'), getenv('STORAGE_ACCOUNT_KEY'),
            container_name, blob_access_duration_hrs)

        # Write the image to the blob
        sas_blob_helper.write_blob(sas_url, 'detect_output.jpg', output_img_stream)

        ai4e_service.api_task_manager.CompleteTask(taskId, 'completed - output written to: ' + sas_url)
        print('runserver.py: detect() finished.')
    except Exception:
        # Any failure marks the task as failed instead of leaving it "running".
        # ``Exception`` (not a bare except) lets SystemExit/KeyboardInterrupt propagate.
        # NOTE(review): only the exception class is reported, not its message.
        exc_type = sys.exc_info()[0]
        log.log_exception(exc_type, taskId)
        ai4e_service.api_task_manager.FailTask(taskId, 'failed: ' + str(exc_type))
# Start the Flask app only when this file is executed directly as a script.
if __name__ == '__main__':
    app.run()