Commit 9b4eeca

Added unit tests for comprehensive coverage of all scheduling disciplines in ns/scheduler.

- Added reusable SimPy helpers (CaptureSink, StoreSpy, ScriptedServer) plus an enqueue utility to drive deterministic scheduler scenarios and monitor sampling (tests/scheduler/test_servers.py:13-91).
- Expanded DRR coverage for weighted fairness, head-of-line deficit handling, and zero-downstream backpressure hooks (tests/scheduler/test_servers.py:93-138).
- Added SP scenarios proving strict-priority ordering and zero-buffer propagation of upstream callbacks (tests/scheduler/test_servers.py:140-184).
- Validated WFQ scheduling by warming the active set to check finish-time ordering and by exercising the zero-buffer + zero-downstream path with explicit upstream acknowledgements (tests/scheduler/test_servers.py:186-237).
- Covered both Virtual Clock selection logic and ServerMonitor sampling (with and without in-service packets) via scripted queue traces (tests/scheduler/test_servers.py:239-310).
1 parent 75ce0f5 commit 9b4eeca
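
For context, every scenario in the new test file follows the same drive-and-capture pattern: point the scheduler's out at a capturing sink, enqueue packets from a SimPy process, run the environment for a short interval, then assert on the order the sink observed. The snippet below is a minimal, self-contained sketch of that pattern against DRRServer; the inline _Sink class and send() generator are trimmed-down stand-ins for the CaptureSink and enqueue helpers defined in the diff, and the sketch assumes ns.py and SimPy are installed.

# A minimal sketch of the drive-and-capture pattern used throughout the new
# tests; it assumes ns.py and SimPy are installed. The inline _Sink mirrors
# the CaptureSink helper defined in tests/scheduler/test_servers.py below.
import simpy

from ns.packet.packet import Packet
from ns.scheduler.drr import DRRServer


class _Sink:
    """Record (departure time, flow_id, packet_id) for each forwarded packet."""

    def __init__(self, env):
        self.env = env
        self.observed = []

    def put(self, packet, upstream_update=None, upstream_store=None):
        self.observed.append((self.env.now, packet.flow_id, packet.packet_id))


env = simpy.Environment()
server = DRRServer(env, rate=1e9, weights=[1, 2])
server.out = _Sink(env)


def send(packet, delay=0.0):
    # Enqueue from a SimPy process, just like the enqueue() helper below.
    yield env.timeout(delay)
    yield server.put(packet)


for pkt_id, flow in enumerate([0, 1, 1]):
    env.process(send(Packet(time=0.0, size=1500, packet_id=pkt_id,
                            flow_id=flow, src="src", dst="dst")))

env.run(until=0.01)
# The first DRR test below asserts the same ordering: flow 0, then both flow-1 packets.
print([flow for _, flow, _ in server.out.observed])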

File tree

1 file changed (+310 lines, -0 lines)


tests/scheduler/test_servers.py

Lines changed: 310 additions & 0 deletions
@@ -0,0 +1,310 @@
import pytest

simpy = pytest.importorskip("simpy")

from ns.packet.packet import Packet
from ns.scheduler.drr import DRRServer
from ns.scheduler.monitor import ServerMonitor
from ns.scheduler.sp import SPServer
from ns.scheduler.virtual_clock import VirtualClockServer
from ns.scheduler.wfq import WFQServer


class CaptureSink:
    """Collect packets forwarded by a scheduler."""

    def __init__(self, env, auto_ack=False):
        self.env = env
        self.auto_ack = auto_ack
        self.observed = []

    def put(self, packet, upstream_update=None, upstream_store=None):
        record = {
            "time": self.env.now,
            "packet": packet,
            "upstream_update": upstream_update,
            "upstream_store": upstream_store,
        }
        self.observed.append(record)
        if self.auto_ack and upstream_update is not None:
            upstream_update(packet)


class StoreSpy:
    """Minimal object that records when `get` is called."""

    def __init__(self):
        self.get_calls = 0

    def get(self):
        self.get_calls += 1


class ScriptedServer:
    """Drive deterministic queue sizes for ServerMonitor tests."""

    def __init__(self, env, flow_id, script):
        self.env = env
        self.flow_id = flow_id
        self.queue_sizes = {flow_id: 0}
        self.queue_bytes = {flow_id: 0}
        self.current_packet = None
        env.process(self._drive(script))

    def _drive(self, script):
        for entry in script:
            yield self.env.timeout(entry["delay"])
            self.queue_sizes[self.flow_id] = entry["size"]
            self.queue_bytes[self.flow_id] = entry["bytes"]
            self.current_packet = entry["packet"]

    def packet_in_service(self):
        return self.current_packet

    def byte_size(self, queue_id):
        return self.queue_bytes[queue_id]

    def size(self, queue_id):
        return self.queue_sizes[queue_id]

    def all_flows(self):
        return [self.flow_id]


def make_packet(packet_id, flow_id, size=1500, time=0.0):
    return Packet(
        time=time,
        size=size,
        packet_id=packet_id,
        flow_id=flow_id,
        src="src",
        dst="dst",
    )


def enqueue(env, server, packet, delay=0.0, **kwargs):
    def _send():
        yield env.timeout(delay)
        yield server.put(packet, **kwargs)

    env.process(_send())


def test_drr_weighted_round_robin_serves_proportionally():
    env = simpy.Environment()
    server = DRRServer(env, rate=1e9, weights=[1, 2])
    sink = CaptureSink(env)
    server.out = sink

    enqueue(env, server, make_packet(0, flow_id=0))
    enqueue(env, server, make_packet(1, flow_id=1))
    enqueue(env, server, make_packet(2, flow_id=1))

    env.run(until=0.01)
    assert [entry["packet"].flow_id for entry in sink.observed] == [0, 1, 1]


def test_drr_large_packet_waits_until_enough_deficit():
    env = simpy.Environment()
    server = DRRServer(env, rate=1e9, weights=[1, 1])
    sink = CaptureSink(env)
    server.out = sink

    enqueue(env, server, make_packet(0, flow_id=0, size=3000))
    enqueue(env, server, make_packet(1, flow_id=1, size=1500))

    env.run(until=0.01)
    assert [entry["packet"].flow_id for entry in sink.observed] == [1, 0]


def test_drr_zero_downstream_buffer_provides_upstream_hooks():
    env = simpy.Environment()
    server = DRRServer(
        env,
        rate=1e9,
        weights={0: 1},
        zero_downstream_buffer=True,
    )
    sink = CaptureSink(env)
    server.out = sink

    enqueue(env, server, make_packet(0, flow_id=0))
    env.run(until=0.01)

    record = sink.observed[0]
    assert record["upstream_update"] == server.update
    assert record["upstream_store"] == server.stores[0]
    assert server.byte_size(0) == 0


def test_sp_serves_highest_priority_first():
    env = simpy.Environment()
    priorities = {0: 1, 1: 3}
    server = SPServer(env, rate=1e9, priorities=priorities)
    sink = CaptureSink(env)
    server.out = sink

    enqueue(env, server, make_packet(0, flow_id=0))
    enqueue(env, server, make_packet(1, flow_id=1))

    env.run(until=0.01)
    assert [entry["packet"].flow_id for entry in sink.observed] == [1, 0]


def test_sp_zero_buffer_calls_upstream_callbacks():
    env = simpy.Environment()
    server = SPServer(
        env,
        rate=1e9,
        priorities=[1],
        zero_buffer=True,
    )
    sink = CaptureSink(env)
    server.out = sink

    upstream_store = StoreSpy()
    upstream_invoked = {"count": 0}

    def upstream_update(packet):
        upstream_invoked["count"] += 1
        upstream_invoked["last"] = packet.packet_id

    enqueue(
        env,
        server,
        make_packet(0, flow_id=0),
        upstream_update=upstream_update,
        upstream_store=upstream_store,
    )
    env.run(until=0.01)

    assert upstream_store.get_calls == 1
    assert upstream_invoked["count"] == 1
    assert upstream_invoked["last"] == sink.observed[0]["packet"].packet_id


def test_wfq_orders_by_finish_time_and_resets_virtual_time():
    env = simpy.Environment()
    weights = {0: 1, 1: 2}
    server = WFQServer(env, rate=1e9, weights=weights)
    sink = CaptureSink(env)
    server.out = sink

    # Warm up the scheduler so later packets see a non-empty active set.
    enqueue(env, server, make_packet(0, flow_id=0, size=1000), delay=0.0)

    enqueue(env, server, make_packet(1, flow_id=1, size=1000), delay=1e-6)
    enqueue(env, server, make_packet(2, flow_id=0, size=1000), delay=1.2e-6)
    enqueue(env, server, make_packet(3, flow_id=0, size=1000), delay=1.4e-6)

    env.run(until=0.05)
    observed = [entry["packet"].flow_id for entry in sink.observed]
    assert observed[1:] == [1, 0, 0]
    assert server.vtime == 0.0


def test_wfq_zero_downstream_buffer_and_zero_buffer_callbacks():
    env = simpy.Environment()
    server = WFQServer(
        env,
        rate=1e9,
        weights=[1],
        zero_buffer=True,
        zero_downstream_buffer=True,
    )
    sink = CaptureSink(env, auto_ack=True)
    server.out = sink

    upstream_store = StoreSpy()
    upstream_invoked = {"count": 0}

    def upstream_update(packet):
        upstream_invoked["count"] += 1
        upstream_invoked["last"] = packet.packet_id

    enqueue(
        env,
        server,
        make_packet(0, flow_id=0),
        upstream_update=upstream_update,
        upstream_store=upstream_store,
    )

    env.run(until=0.02)
    assert upstream_store.get_calls == 1
    assert upstream_invoked["count"] == 1
    assert sink.observed[0]["upstream_update"] == server.update


def test_virtual_clock_prefers_smaller_vtick_classes():
    env = simpy.Environment()
    vticks = {0: 2.0, 1: 1.0}
    server = VirtualClockServer(env, rate=1e9, vticks=vticks)
    sink = CaptureSink(env)
    server.out = sink

    enqueue(env, server, make_packet(0, flow_id=0, size=1000))
    enqueue(env, server, make_packet(1, flow_id=1, size=1000))

    env.run(until=0.02)
    assert [entry["packet"].flow_id for entry in sink.observed] == [1, 0]


def test_virtual_clock_zero_downstream_buffer_propagates_callbacks():
    env = simpy.Environment()
    vticks = [1.0]
    server = VirtualClockServer(
        env,
        rate=1e9,
        vticks=vticks,
        zero_downstream_buffer=True,
    )
    sink = CaptureSink(env, auto_ack=True)
    server.out = sink

    enqueue(env, server, make_packet(0, flow_id=0))
    env.run(until=0.02)

    record = sink.observed[0]
    assert record["upstream_update"] == server.update
    assert record["upstream_store"] == server.store


def test_server_monitor_counts_queue_and_service_packets():
    env = simpy.Environment()
    monitored_packet = make_packet(0, flow_id=7, size=200)
    script = [
        {"delay": 0.25, "size": 1, "bytes": 100, "packet": monitored_packet},
        {"delay": 0.5, "size": 0, "bytes": 0, "packet": None},
    ]
    server = ScriptedServer(env, flow_id=7, script=script)
    monitor = ServerMonitor(
        env,
        server,
        dist=lambda: 0.5,
        pkt_in_service_included=True,
    )

    env.run(until=1.1)
    assert monitor.sizes[7] == [2, 0]
    assert monitor.byte_sizes[7] == [300, 0]


def test_server_monitor_ignores_service_when_not_requested():
    env = simpy.Environment()
    monitored_packet = make_packet(0, flow_id=3, size=120)
    script = [
        {"delay": 0.25, "size": 2, "bytes": 256, "packet": monitored_packet},
        {"delay": 0.5, "size": 0, "bytes": 0, "packet": None},
    ]
    server = ScriptedServer(env, flow_id=3, script=script)
    monitor = ServerMonitor(
        env,
        server,
        dist=lambda: 0.5,
        pkt_in_service_included=False,
    )

    env.run(until=1.1)
    assert monitor.sizes[3] == [2, 0]
    assert monitor.byte_sizes[3] == [256, 0]
