33import json
44import time
55import random
6+ import threading
67import requests
8+ from concurrent .futures import ThreadPoolExecutor , as_completed
79
810from validate import send_cypher_query , send_trapi_query
911
@@ -22,7 +24,8 @@ def save_results(results, output_path):
2224 p_out .write (json .dumps (results , indent = 4 ))
2325
2426
25- def run_queries_for_endpoint (endpoint_name , url , performance_spec , results , iterations , output_path ):
27+ def run_queries_for_endpoint (endpoint_name , url , performance_spec , results , iterations , output_path ,
28+ save_lock = None ):
2629 """Run all queries in performance_spec against a single endpoint URL."""
2730 print (f'Running performance analysis for: { endpoint_name } ({ url } )' )
2831 query_count = 0
@@ -77,8 +80,12 @@ def run_queries_for_endpoint(endpoint_name, url, performance_spec, results, iter
7780 average = sum (success_durations ) / len (success_durations ) if success_durations else "N/A"
7881 print (f'Average time for { query_name } to { endpoint_name } , { spec_name } : { average } ' )
7982 if query_count % 10 == 0 :
80- print (f'Saving intermediate results ({ query_count } queries completed)...' )
81- save_results (results , output_path )
83+ print (f'Saving intermediate results ({ query_count } queries completed for { endpoint_name } )...' )
84+ if save_lock :
85+ with save_lock :
86+ save_results (results , output_path )
87+ else :
88+ save_results (results , output_path )
8289
8390
8491def run_performance_analysis (deployments_to_validate = None , performance_spec = None , iterations = 3 ,
@@ -92,10 +99,12 @@ def run_performance_analysis(deployments_to_validate=None, performance_spec=None
9299 os .makedirs ('./performance_results' , exist_ok = True )
93100 output_path = f'./performance_results/performance_analysis_results_{ random .randrange (100000 )} .json'
94101
102+ save_lock = threading .Lock ()
103+
104+ # Build list of (endpoint_name, url) pairs to run in parallel
105+ endpoint_tasks = []
95106 if endpoints :
96- for endpoint_name , url in endpoints .items ():
97- run_queries_for_endpoint (endpoint_name , url , performance_spec ,
98- plater_performance_results , iterations , output_path )
107+ endpoint_tasks = list (endpoints .items ())
99108 else :
100109 graph_deployment_spec_path = os .path .join (os .path .dirname (__file__ ), 'deployment_spec.yaml' )
101110 with open (graph_deployment_spec_path ) as graph_deployment_spec_file :
@@ -106,8 +115,23 @@ def run_performance_analysis(deployments_to_validate=None, performance_spec=None
106115 if not deployments_to_validate or deployment_env in deployments_to_validate :
107116 for plater in performance_spec :
108117 url = automat_url + plater + "/" if "localhost" not in automat_url else automat_url
109- run_queries_for_endpoint (deployment_env , url , performance_spec ,
110- plater_performance_results , iterations , output_path )
118+ endpoint_tasks .append ((deployment_env , url ))
119+
120+ with ThreadPoolExecutor (max_workers = len (endpoint_tasks ) or 1 ) as executor :
121+ futures = {
122+ executor .submit (
123+ run_queries_for_endpoint , name , url , performance_spec ,
124+ plater_performance_results , iterations , output_path , save_lock
125+ ): name
126+ for name , url in endpoint_tasks
127+ }
128+ for future in as_completed (futures ):
129+ name = futures [future ]
130+ try :
131+ future .result ()
132+ print (f'Completed all queries for: { name } ' )
133+ except Exception as e :
134+ print (f'Error running queries for { name } : { e } ' )
111135
112136 save_results (plater_performance_results , output_path )
113137
@@ -136,12 +160,15 @@ def run_performance_analysis(deployments_to_validate=None, performance_spec=None
136160 # or pass TRAPI endpoints directly (no deployment spec needed)
137161 trapi_endpoints = {
138162 "neo4j_plater" : "https://robokop-automat.apps.renci.org/robokopkg/" ,
139- "memgraph_plater" : "https://automat.renci.org/robokopkg-memgraph/" ,
163+ #"memgraph_plater": "https://automat.renci.org/robokopkg-memgraph/",
164+ "gandalf_dev" : "https://automat-dev.apps.renci.org/robokopkg/" ,
165+ "gandalf_ci" : "https://automat.ci.transltr.io/robokopkg/" ,
166+
140167 }
141168 trapi_spec = {
142- "robokopkg" : {"files" : ["./performance_queries/robokop_one_hop_trapi.jsonl" ] ,
143- "queries" : [ "robokop_small_Behavior_affects " ],
169+ "robokopkg" : {"files" : ["./performance_queries/robokop_one_hop_trapi.jsonl" ,
170+ "./performance_queries/robokop_two_hop_trapi.jsonl " ],
144171 "query_type" : "trapi" }
145172 }
146- run_performance_analysis (performance_spec = trapi_spec , endpoints = trapi_endpoints , iterations = 1 )
173+ run_performance_analysis (performance_spec = trapi_spec , endpoints = trapi_endpoints , iterations = 3 )
147174
0 commit comments