Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/netsim.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:

- name: Fetch and build iroh
run: |
git clone https://github.com/n0-computer/iroh.git
git clone --depth 1 https://github.com/n0-computer/iroh.git
cd iroh
cargo build --release --all-features -p iroh-relay -p iroh-dns-server
cargo build --release --all-features -p iroh --examples
Expand Down Expand Up @@ -84,7 +84,7 @@ jobs:
c='${{ steps.detect_comment_config.outputs.NETSIM_CONFIG }}'
if [ -z "${c}" ];
then
sudo python3 main.py sims/iroh
sudo python3 main.py --max-workers=4 sims/iroh
else
echo $c >> custom_sim.json
sudo python3 main.py custom_sim.json
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/netsim_integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:

- name: Fetch and build iroh
run: |
git clone https://github.com/n0-computer/iroh.git
git clone --depth 1 https://github.com/n0-computer/iroh.git
cd iroh
cargo build --release --all-features -p iroh-relay -p iroh-dns-server
cargo build --release --all-features -p iroh --examples
Expand All @@ -60,7 +60,7 @@ jobs:
cd netsim
sudo kill -9 $(pgrep ovs) || true
sudo mn --clean
sudo python3 main.py --integration sims/integration
sudo python3 main.py --integration --max-workers=4 sims/integration

- name: Setup Environment (PR)
if: ${{ github.event_name == 'pull_request' }}
Expand Down
100 changes: 64 additions & 36 deletions netsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from parsing.netsim import process_logs, process_integration_logs
from sniffer.sniff import Sniffer
from sniffer.process import run_viz
from util import cleanup_tmp_dirs, eject
from util import cleanup_tmp_dirs, eject, FAILED_TESTS, write_failure_summary

TIMEOUT = 60 * 5

Expand All @@ -36,33 +36,34 @@ def setup_env_vars(prefix, node_name, temp_dir, node_env, debug=False):


def parse_node_params(node, prefix, node_params, runner_id):
"""Parse parameters from node logs with validation."""
"""Parse parameters from node logs with validation using fast polling."""
parsed_params = {}
wait_time = node.get("wait", 1)
max_wait = node.get("wait", 1)
parser_type = node["param_parser"]
expected_nodes = []
poll_interval = 0.2

# Wait for parameters to be available
for _ in range(wait_time):
time.sleep(1)
for i in range(int(node["count"])):
node_name = f'{node["name"]}_{i}_r{runner_id}'
expected_nodes.append(node_name)
log_file = f"logs/{prefix}__{node_name}.txt"
expected_nodes = [
f'{node["name"]}_{i}_r{runner_id}'
for i in range(int(node["count"]))
]

max_iterations = max(1, int(max_wait / poll_interval))
for iteration in range(max_iterations):
for node_name in expected_nodes:
if node_name in parsed_params:
continue

log_file = f"logs/{prefix}__{node_name}.txt"
if not os.path.exists(log_file):
error(f"Warning: Log file not found: {log_file}")
continue

try:
with open(log_file, "r") as f:
lines = f.readlines()
for idx, line in enumerate(lines):
# Parser 1: Simple ticket (used in lossy/standard sims)
if parser_type == "iroh_ticket" and line.startswith("All-in-one ticket"):
parsed_params[node_name] = line[len("All-in-one ticket: "):].strip()
break
# Parser 2: Endpoint with addresses (used in iroh/integration sims)
if parser_type == "iroh_endpoint_with_addrs" and line.startswith("Endpoint id:"):
if idx + 1 >= len(lines):
break
Expand All @@ -82,7 +83,10 @@ def parse_node_params(node, prefix, node_params, runner_id):
except Exception as e:
error(f"Error parsing parameters from {log_file}: {e}")

# Validate that all expected parameters were found
if all(n in parsed_params for n in expected_nodes):
break
time.sleep(poll_interval)

missing_params = [n for n in expected_nodes if n not in parsed_params]
if missing_params:
error("\n" + "=" * 80 + "\n")
Expand All @@ -96,19 +100,17 @@ def parse_node_params(node, prefix, node_params, runner_id):
return parsed_params


def terminate_processes(p_box):
def terminate_processes(p_box, prefix):
"""Gracefully terminate processes, then forcefully kill if needed."""
for p, cmd in p_box:
error(f"Terminating process: {p.pid} {cmd[:100]}\n")
for node_name, p, cmd in p_box:
error(f"Terminating [{prefix}__{node_name}]: {cmd[:80]}\n")
p.terminate()

# Wait for processes to terminate gracefully
time.sleep(2)
time.sleep(0.5)

# Force kill any remaining processes
for p, cmd in p_box:
for node_name, p, cmd in p_box:
if p.poll() is None:
error(f"Force killing hung process: {p.pid} {cmd[:100]}\n")
error(f"Force killing [{prefix}__{node_name}]: {cmd[:80]}\n")
p.kill()


Expand All @@ -117,11 +119,12 @@ def monitor_short_processes(p_short_box, prefix):
process_errors = []
start_time = time.time()

# Monitor processes until all complete or timeout
for _ in range(TIMEOUT):
time.sleep(1)
# Monitor processes until all complete or timeout (poll every 200ms)
max_polls = TIMEOUT * 5
for _ in range(max_polls):
if not any(p.poll() is None for (_, p, _) in p_short_box):
break
time.sleep(0.2)

elapsed_time = time.time() - start_time

Expand All @@ -133,7 +136,7 @@ def monitor_short_processes(p_short_box, prefix):
error(f"\nProcess timed out after {elapsed_time:.1f}s for node {node_name}\n")
error(f"Command was: {cmd}\n")
p.terminate()
time.sleep(1)
time.sleep(0.2)
if p.poll() is None:
error(f"Force killing timed out process for node {node_name}\n")
p.kill()
Expand Down Expand Up @@ -302,7 +305,7 @@ def run_case(nodes, runner_id, prefix, args, debug=False, visualize=False):
if "process" in node and node["process"] == "short":
p_short_box.append((node_name, p, cmd))
else:
p_box.append((p, cmd))
p_box.append((node_name, p, cmd))

if "param_parser" in node:
node_params.update(parse_node_params(node, prefix, node_params, runner_id))
Expand All @@ -314,26 +317,38 @@ def run_case(nodes, runner_id, prefix, args, debug=False, visualize=False):
process_errors = monitor_short_processes(p_short_box, prefix)
if process_errors:
error("\n" + "=" * 80 + "\n")
error("PROCESS ERRORS DETECTED:\n")
error(f"PROCESS ERRORS DETECTED in {prefix}:\n")
error("=" * 80 + "\n")
for err_msg in process_errors:
error(err_msg + "\n")
error("=" * 80 + "\n")
failure_entry = {"prefix": prefix, "errors": []}
for err_msg in process_errors:
if err_msg.startswith("TIMEOUT:"):
node = err_msg.split("'")[1]
reason = f"timeout after {TIMEOUT}s"
elif err_msg.startswith("FAILED:"):
node = err_msg.split("'")[1]
code_start = err_msg.find("code ") + 5
code_end = err_msg.find(".", code_start)
reason = f"exit code {err_msg[code_start:code_end]}"
else:
node = "unknown"
reason = err_msg[:50]
failure_entry["errors"].append({"node": node, "reason": reason})
FAILED_TESTS.append(failure_entry)
if args.integration:
eject(nodes, prefix, runner_id, temp_dirs)
else:
error("WARNING: Continuing despite errors (not in integration mode)\n")

terminate_processes(p_box)
terminate_processes(p_box, prefix)
cleanup_tmp_dirs(temp_dirs)
return (net, sniffer)


def run(case, runner_id, name, skiplist, args):
def run(case, runner_id, name, args):
prefix = name + "__" + case["name"]
if prefix in skiplist:
print("Skipping:", prefix)
return
nodes = case["nodes"]
viz = False
if "visualize" in case:
Expand All @@ -356,13 +371,25 @@ def run(case, runner_id, name, skiplist, args):


def run_parallel(cases, name, skiplist, args, max_workers=4):
# Filter skipped cases before chunking for optimal parallelism
filtered = []
for case in cases:
prefix = name + "__" + case["name"]
if prefix in skiplist:
print("Skipping:", prefix)
else:
filtered.append(case)

if not filtered:
return

with concurrent.futures.ThreadPoolExecutor() as executor:
chunks = [cases[i : i + max_workers] for i in range(0, len(cases), max_workers)]
chunks = [filtered[i : i + max_workers] for i in range(0, len(filtered), max_workers)]
for chunk in chunks:
futures = []
r = []
for i, case in enumerate(chunk):
futures.append(executor.submit(run, case, i, name, skiplist, args))
futures.append(executor.submit(run, case, i, name, args))
for future in concurrent.futures.as_completed(futures):
try:
rx = future.result()
Expand Down Expand Up @@ -435,4 +462,5 @@ def run_parallel(cases, name, skiplist, args, max_workers=4):
print(f"Start testing: %s\n" % path)
run_parallel(config["cases"], name, skiplist, args, args.max_workers)

write_failure_summary()
print("Done")
31 changes: 29 additions & 2 deletions netsim/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
import os

# Process-wide record of failed test cases. Each entry has the shape
# {"prefix": str, "errors": [{"node": str, "reason": str}, ...]}, which is
# exactly what write_failure_summary() reads back below.
FAILED_TESTS = []


def write_failure_summary(output_path="logs/failed_tests.txt", tail_lines=10):
    """Write a plain-text report of every entry in ``FAILED_TESTS``.

    For each failed case the report lists the failing nodes, the reason
    recorded for each, the path of the node's log file and, when that log
    exists, its last ``tail_lines`` lines.

    Nothing is written (and no directory is created) when ``FAILED_TESTS``
    is empty.

    Args:
        output_path: Destination file; it is overwritten on every call.
            Missing parent directories are created.
        tail_lines: How many trailing log lines to include per node.
            Must be >= 0 (a negative value raises ``ValueError``).
    """
    if not FAILED_TESTS:
        return

    from collections import deque

    # Work on a snapshot so entries appended concurrently by other runner
    # threads cannot change the list while it is being iterated.
    failures = list(FAILED_TESTS)

    # The report is written on the failure path; a missing output directory
    # must not turn into a second, unrelated exception.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(output_path, "w") as f:
        for failure in failures:
            prefix = failure["prefix"]
            f.write(f"FAILED: {prefix}\n")
            for node_err in failure["errors"]:
                node_name = node_err["node"]
                reason = node_err["reason"]
                log_path = f"logs/{prefix}__{node_name}.txt"
                f.write(f" - {node_name}: {reason}\n")
                f.write(f" Log: {log_path}\n")
                if not os.path.isfile(log_path):
                    continue
                f.write(f" --- Last {tail_lines} lines ---\n")
                try:
                    # errors="replace": a stray non-UTF-8 byte in a node log
                    # should not cost us the whole tail.
                    with open(log_path, "r", errors="replace") as lf:
                        # Bounded deque keeps only the tail instead of
                        # loading the entire log into memory.
                        tail = deque(lf, maxlen=tail_lines)
                except OSError:
                    # Best effort: note the problem and keep reporting.
                    f.write(" [failed to read log]\n")
                    continue
                for line in tail:
                    f.write(f" {line.rstrip()}\n")
            # Blank line separates one failed case from the next.
            f.write("\n")


def logs_on_error(nodes, prefix, runner_id, code=1, message=None):
node_counts = {}
Expand Down Expand Up @@ -29,7 +56,7 @@ def cleanup_tmp_dirs(temp_dirs):
temp_dir.cleanup()


def eject(nodes, prefix, runner_id, temp_dirs):
logs_on_error(nodes, prefix, runner_id)
def eject(_nodes, prefix, _runner_id, temp_dirs):
write_failure_summary()
cleanup_tmp_dirs(temp_dirs)
raise Exception("Netsim run failed: %s" % prefix)
Loading