forked from harvard-acc/DeepRecSys
Showing 52 changed files with 7,337 additions and 0 deletions.
DeepRecSys.py
@@ -0,0 +1,189 @@
from __future__ import absolute_import, division, print_function, unicode_literals

from utils.utils import cli
from functools import reduce
import operator

from inferenceEngine import inferenceEngine
from accelInferenceEngine import accelInferenceEngine
from loadGenerator import loadGenerator

from multiprocessing import Process, Queue
import csv
import sys
import os
import time
import numpy as np

import signal


def DeepRecSys():
  print("Running DeepRecSys")

  # ######################################################################
  # Get and print command line arguments for this experiment
  # ######################################################################
  args = cli()

  arg_keys = [str(key) for key in vars(args)]
  print("============================================================")
  print("DeepRecSys configuration")
  for key in arg_keys:
    print(key, getattr(args, key))
  print("============================================================")

  if args.queue:

    if args.model_accel:
      args.inference_engines += 1

    print("[DeepRecSys] total inference engines ", args.inference_engines)

    # Setup single request queue and multiple response queues
    requestQueue = Queue(maxsize=1024)
    accelRequestQueue = Queue(maxsize=32)
    pidQueue = Queue()
    responseQueues = []
    inferenceEngineReadyQueue = Queue()

    for _ in range(args.inference_engines):
      responseQueues.append(Queue())

    # Create load generator to mimic per-server load
    loadGeneratorReturnQueue = Queue()
    DeepRecLoadGenerator = Process(target=loadGenerator,
                                   args=(args, requestQueue, loadGeneratorReturnQueue,
                                         inferenceEngineReadyQueue, pidQueue, accelRequestQueue))

    # Create backend inference engines that consume requests from the load
    # generator
    DeepRecEngines = []
    for i in range(args.inference_engines):
      if args.model_accel and (i == args.inference_engines - 1):
        p = Process(target=accelInferenceEngine,
                    args=(args, accelRequestQueue, i, responseQueues[i], inferenceEngineReadyQueue))
      else:
        p = Process(target=inferenceEngine,
                    args=(args, requestQueue, i, responseQueues[i], inferenceEngineReadyQueue))
      p.daemon = True
      DeepRecEngines.append(p)

    # Start all processes
    for i in range(args.inference_engines):
      DeepRecEngines[i].start()

    DeepRecLoadGenerator.start()

    responses_list = []
    inference_engines_finished = 0

    response_sets = {}
    response_latencies = []
    final_response_latencies = []

    request_granularity = int(args.req_granularity)

    while inference_engines_finished != args.inference_engines:
      for i in range(args.inference_engines):
        if responseQueues[i].qsize():
          response = responseQueues[i].get()

          # Process responses to determine what the running tail latency is and
          # send a new batch-size to the loadGenerator
          if response is None:
            inference_engines_finished += 1
            print("Joined ", inference_engines_finished, " inference engines")
            sys.stdout.flush()
          else:
            key = (response.epoch, response.batch_id, response.exp_packet)
            if key in response_sets:  # Response already in the list
              curr_val = response_sets[key]

              val = (response.arrival_time,
                     response.inference_end_time,
                     response.total_sub_batches)

              arr = min(curr_val[0], val[0])
              inf = max(curr_val[1], val[1])
              remain = curr_val[2] - 1
              response_sets[key] = (arr, inf, remain)
            else:  # New response!
              arr = response.arrival_time
              inf = response.inference_end_time
              remain = response.total_sub_batches - 1

              response_sets[key] = (arr, inf, remain)

            # If this request is over then we can go ahead and compute the
            # request latency in order to guide the batch-scheduler
            if remain == 0:
              response_latencies.append(inf - arr)

              # If we are done finding the optimum batching and accelerator
              # partitioning threshold then we log the response latency to
              # measure packets later
              if not response.exp_packet:
                final_response_latencies.append(inf - arr)

              if len(response_latencies) % request_granularity == 0:
                print("Running latency: ", np.percentile(response_latencies[-request_granularity:], 95) * 1000.)
                sys.stdout.flush()
                # Send the running p95 tail-latency (in ms) back to the load generator
                pidQueue.put(np.percentile(response_latencies[-request_granularity:], 95) * 1000.)

            # Add responses to final list
            responses_list.append(response.__dict__)

    print("Finished running over the inference engines")
    sys.stdout.flush()

    # Directory portion of the log file path
    log_dir = os.path.dirname(args.log_file)

    if log_dir and not os.path.exists(log_dir):
      os.makedirs(log_dir)

    with open(args.log_file, "w") as f:
      for response in responses_list:
        f.writelines(str(response) + "\n")

    # Join/end all processes
    DeepRecLoadGenerator.join()
    total_requests = loadGeneratorReturnQueue.get()

    cpu_sub_requests = total_requests[0]
    cpu_requests = total_requests[1]
    accel_requests = total_requests[2]

    agg_requests = cpu_sub_requests + accel_requests

    print("Exiting DeepRecSys after printing ", len(responses_list), "/", agg_requests)

    print("CPU sub requests ", cpu_sub_requests, "/", agg_requests)
    print("CPU requests ", cpu_requests)
    print("Accel requests ", accel_requests, "/", agg_requests)

    meas_qps_responses = list(filter(lambda x: (not x['exp_packet']) and (x['sub_id'] == 0), responses_list))

    initial_time = meas_qps_responses[0]['inference_end_time']
    end_time = meas_qps_responses[-1]['inference_end_time']

    print("Measured QPS: ", len(meas_qps_responses) / (end_time - initial_time))
    print("Measured p95 tail-latency: ", np.percentile(final_response_latencies, 95) * 1000., " ms")
    print("Measured p99 tail-latency: ", np.percentile(final_response_latencies, 99) * 1000., " ms")

    sys.stdout.flush()

    for i in range(args.inference_engines):
      DeepRecEngines[i].terminate()

  else:  # No queue, run DeepRecSys in standalone mode
    inferenceEngine(args)

  return


if __name__ == "__main__":
  DeepRecSys()
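For reference, the feedback signal that the loop above pushes onto `pidQueue` is a windowed p95 over the most recently completed request latencies, reported in milliseconds. A minimal standalone sketch of that computation (the helper name `running_p95_ms` and the synthetic latencies are illustrative, not part of the repository):

```python
import numpy as np

def running_p95_ms(latencies_sec, window):
  # p95 of the most recent `window` end-to-end latencies, converted to milliseconds
  return float(np.percentile(latencies_sec[-window:], 95) * 1000.0)

# Example with synthetic latencies around 5 ms and a 100-request window
latencies = list(np.random.exponential(scale=0.005, size=1000))
print("running p95 (ms):", running_p95_ms(latencies, window=100))
```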
accelInferenceEngine.py
@@ -0,0 +1,87 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np

from utils.packets import ServiceResponse
from utils.utils import debugPrint

# data generation
import threading
from multiprocessing import Queue
from accelerator.predict_execution import *

import time
import sys


def accelInferenceEngine(args,
                         requestQueue=None,
                         engine_id=None,
                         responseQueue=None,
                         inferenceEngineReadyQueue=None):

  ### some basic setup ###
  np.random.seed(args.numpy_rand_seed)
  np.set_printoptions(precision=args.print_precision)

  if requestQueue is None:
    print("If you want to run the Accel engine in isolation please use the DeepRecBench/models/ directory directly")
    sys.stdout.flush()
    sys.exit()

  else:
    inferenceEngineReadyQueue.put(True)

    model_name = args.model_name
    if model_name not in ["wnd", "rm1", "rm2", "rm3", "din", "dien", "mtwnd"]:
      print("Model not found among the supported ones")
      sys.stdout.flush()
      sys.exit()

    accel_data = GPU_Data(root_dir=args.accel_root_dir, hardware="nvidia_gtx_1080_ti")

    while True:
      debugPrint(args, "Accel", "Trying to pull request")
      request = requestQueue.get()
      debugPrint(args, "Accel", "Pulled request")

      if request is None:
        debugPrint(args, "Accel", "Sending final done signal")
        responseQueue.put(None)
        debugPrint(args, "Accel", "Sent final done signal")
        return

      batch_id = request.batch_id
      batch_size = request.batch_size

      start_time = time.time()

      # Model Accel execution time.
      # For GPUs this is based on real measured hardware performance (inference
      # and data-loading) from accelerator/predict_execution.py
      eval_time = predict_time(model_name, batch_size, accel_data)
      time.sleep(eval_time / 1000.)  # Eval time is in milliseconds

      end_time = time.time()

      response = ServiceResponse(consumer_id=engine_id,
                                 epoch=request.epoch,
                                 batch_id=request.batch_id,
                                 batch_size=request.batch_size,
                                 arrival_time=request.arrival_time,
                                 process_start_time=start_time,
                                 queue_end_time=end_time,
                                 inference_end_time=end_time,
                                 out_batch_size=request.batch_size,
                                 total_sub_batches=request.total_sub_batches,
                                 exp_packet=request.exp_packet,
                                 sub_id=request.sub_id,
                                 )

      debugPrint(args, "Accel", "Sending response back")
      responseQueue.put(response)
      debugPrint(args, "Accel", "Sent response back")

  return
@@ -0,0 +1,6 @@
# Instructions

For each accelerator model (e.g., `nvidia_gtx_1080_ti`):

1. Run `generate_data.py` to generate the raw accelerator usage data (you should see a populated `raw_data` directory if successful).
2. Run `predict_execution.py` to test the accelerator execution-time modeling (you should see a populated `characterization_data` directory if successful); a hedged usage sketch follows below.
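The sketch below shows one possible way to query the execution-time model once both steps have run. `GPU_Data` and `predict_time` are the entry points that `accelInferenceEngine.py` uses; the `root_dir` value here is illustrative and depends on where this directory lives relative to your working directory.

```python
# Query the accelerator execution-time model for one (model, batch size) pair.
# root_dir is an assumed path; replace it with the location of the characterization data.
from accelerator.predict_execution import GPU_Data, predict_time

accel_data = GPU_Data(root_dir="accelerator/", hardware="nvidia_gtx_1080_ti")

# Predicted accelerator latency, in milliseconds
print("rm1 @ batch 256:", predict_time("rm1", 256, accel_data), "ms")
```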
generate_data.py
@@ -0,0 +1,40 @@
from __future__ import print_function
import os, sys


def sweep_cmd(config_file, model_file, output_file):

  num_epochs = 100
  num_batches = 4

  rt_config_gpu = "--inference_only --inter_op_workers 1 --caffe2_net_type async_dag --use_accel "
  model_config = "--config_file ../../models/configs/" + config_file + " "

  with open(output_file, 'w') as outfile:
    sys.stdout = outfile

    # Sweep batch size over powers of four: {4^0 = 1, ..., 4^5 = 1024}
    for x in range(6):
      n = 4 ** x

      data_config = ("--nepochs " + str(num_epochs) + " --num_batches " + str(num_batches) +
                     " --mini_batch_size " + str(n) + " --max_mini_batch_size " + str(n))
      gpu_command = "python ../../models/" + model_file + " " + rt_config_gpu + model_config + data_config

      print("--------------------Running (" + model_file + ") GPU Test with Batch Size " + str(n) + "--------------------\n")
      outfile.write(os.popen(gpu_command).read() + "\n")

    sys.stdout = sys.__stdout__


if __name__ == "__main__":
  if not os.path.exists("raw_data"):
    os.mkdir("raw_data")

  sweep_cmd("wide_and_deep.json", "wide_and_deep.py", "raw_data/results_wnd.txt")
  sweep_cmd("dlrm_rm1.json", "dlrm_s_caffe2.py", "raw_data/results_rm1.txt")
  sweep_cmd("dlrm_rm2.json", "dlrm_s_caffe2.py", "raw_data/results_rm2.txt")
  sweep_cmd("dlrm_rm3.json", "dlrm_s_caffe2.py", "raw_data/results_rm3.txt")
  sweep_cmd("ncf.json", "ncf.py", "raw_data/results_ncf.txt")
  sweep_cmd("mtwnd.json", "multi_task_wnd.py", "raw_data/results_mtwnd.txt")
  sweep_cmd("din.json", "din.py", "raw_data/results_din.txt")
  sweep_cmd("dien.json", "dien.py", "raw_data/results_dien.txt")