Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: optimize resource distribution #3108

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
17 changes: 16 additions & 1 deletion integration_tests/test_framework/simple_rpc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,23 @@ def __init__(self, session: Session, url: str, method: str, timeout: int, node:
self.method = method
self.timeout = timeout
self.node = node

def __call__(self, *args, **argsn) -> Any:
retry = 3
retry_message_list = [
"Specified block header does not exist",
]
while retry > 0:
try:
return self._call(*args, **argsn)
except ReceivedErrorResponseError as e:
if e.response.message in retry_message_list:
retry -= 1
time.sleep(0.2)
continue
raise e

def _call(self, *args, **argsn) -> Any:
if argsn:
raise ValueError('json rpc 2 only supports array arguments')

Expand Down
57 changes: 48 additions & 9 deletions tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import subprocess
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor

PORT_MIN = 11000
Expand Down Expand Up @@ -106,7 +107,15 @@ def run():
sys.exit(TEST_FAILURE_ERROR_CODE)

def run_single_round(options):
TEST_SCRIPTS = []
# Add slow tests to the front of the queue
slow_tests = [
"pos/retire_param_hard_fork_test.py",
# These tests are also slow but also has a relative high resource usage
# running these tests together with heavy tests will cause node to behave abnormally
# "pubsub/eth_logs_test.py",
# "pubsub/epochs_test.py"
]
TEST_SCRIPTS = slow_tests.copy()

test_dir = os.path.dirname(os.path.realpath(__file__))

Expand All @@ -119,32 +128,62 @@ def run_single_round(options):
"pubsub",
"evm_space",
]
slow_tests = {"full_node_tests/p2p_era_test.py", "pos/retire_param_hard_fork_test.py"}
resource_heavy_tests = [
"full_node_tests/p2p_era_test.py",
"crash_test.py",
"reorg_test.py",
"crash_archive_era150_test.py",
"pos/hard_fork_test.py",
"erc20_test.py",
]

# By default, run all *_test.py files in the specified subfolders.
for subdir in test_subdirs:
subdir_path = os.path.join(test_dir, subdir)
for file in os.listdir(subdir_path):
if file.endswith("_test.py"):
rel_path = os.path.join(subdir, file)
if rel_path not in slow_tests:
if rel_path not in resource_heavy_tests and rel_path not in slow_tests:
TEST_SCRIPTS.append(rel_path)

executor = ProcessPoolExecutor(max_workers=options.max_workers)
test_results = []
pending_tasks = []

py = "python3"
if hasattr(sys, "getwindowsversion"):
py = "python"

i = 0
# Start slow tests first to avoid waiting for long-tail jobs
for script in slow_tests:
f = executor.submit(run_single_test, py, script, test_dir, i, options.port_min, options.port_max)
test_results.append((script, f))
i += 1
for script in TEST_SCRIPTS:
heavy_idx = 0
test_idx = 0
while heavy_idx < len(resource_heavy_tests) or test_idx < len(TEST_SCRIPTS):
# Check if there are any heavy tests currently running
has_pending_heavy = any(
(s in resource_heavy_tests) and not f.done()
for s, f in test_results
)

# Prioritize submitting heavy tests (when no heavy test is running)
if heavy_idx < len(resource_heavy_tests) and not has_pending_heavy:
script = resource_heavy_tests[heavy_idx]
heavy_idx += 1
elif test_idx < len(TEST_SCRIPTS):
script = TEST_SCRIPTS[test_idx]
test_idx += 1
else:
time.sleep(0.5)
continue # Wait for pending tasks to complete

# Wait until number of pending tasks drops below threshold
while len(pending_tasks) >= options.max_workers:
# Remove completed tasks
pending_tasks = [task for task in pending_tasks if not task.done()]
if len(pending_tasks) >= options.max_workers:
time.sleep(0.1) # Brief sleep to avoid CPU spinning

f = executor.submit(run_single_test, py, script, test_dir, i, options.port_min, options.port_max)
pending_tasks.append(f)
test_results.append((script, f))
i += 1

Expand Down
17 changes: 16 additions & 1 deletion tests/test_framework/simple_rpc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,23 @@ def __init__(self, session: Session, url: str, method: str, timeout: int, node:
self.method = method
self.timeout = timeout
self.node = node

def __call__(self, *args, **argsn) -> Any:
retry = 3
retry_message_list = [
"Specified block header does not exist",
]
while retry > 0:
try:
return self._call(*args, **argsn)
except ReceivedErrorResponseError as e:
if e.response.message in retry_message_list:
retry -= 1
time.sleep(0.2)
continue
raise e

def _call(self, *args, **argsn) -> Any:
if argsn:
raise ValueError('json rpc 2 only supports array arguments')

Expand Down