Skip to content

Commit f07ae18

Browse files
committed
pw_poller: add local socket for patch series injection
Create a simple local control socket to inject series into the testing queue. Example use: echo "1011459" | nc -U ./poller.sock or echo "12345 net-next;67890 bpf-next" | nc -U ./poller.sock This should help us easily manually correct missed series as well as series missed due to crashes and restarts. Signed-off-by: Jakub Kicinski <[email protected]>
1 parent c005288 commit f07ae18

File tree

1 file changed

+92
-6
lines changed

1 file changed

+92
-6
lines changed

pw_poller.py

Lines changed: 92 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import json
1010
import os
1111
import shutil
12+
import socket
1213
import time
1314
import queue
1415
from typing import Dict
@@ -81,6 +82,9 @@ def __init__(self, config) -> None:
8182
listmodname = config.get('list', 'module', fallback='netdev')
8283
self.list_module = import_module(listmodname)
8384

85+
self._local_sock = None
86+
self._start_lock_sock(config)
87+
8488
def init_state_from_disk(self) -> None:
8589
try:
8690
with open('poller.state', 'r') as f:
@@ -149,7 +153,7 @@ def series_determine_tree(self, s: PwSeries) -> str:
149153

150154
return ret
151155

152-
def _process_series(self, pw_series) -> None:
156+
def _process_series(self, pw_series, force_tree=None) -> None:
153157
s = PwSeries(self._pw, pw_series)
154158

155159
log("Series info",
@@ -160,10 +164,16 @@ def _process_series(self, pw_series) -> None:
160164
log(p['name'], "")
161165
log_end_sec()
162166

163-
if not s['received_all']:
164-
raise IncompleteSeries
167+
if force_tree:
168+
comment = f"Force tree {force_tree}"
169+
s.tree_name = force_tree
170+
s.tree_mark_expected = None
171+
s.tree_marked = True
172+
else:
173+
comment = self.series_determine_tree(s)
174+
if not s['received_all']:
175+
raise IncompleteSeries
165176

166-
comment = self.series_determine_tree(s)
167177
s.need_async = self.list_module.series_needs_async(s)
168178
if s.need_async:
169179
comment += ', async'
@@ -178,11 +188,85 @@ def _process_series(self, pw_series) -> None:
178188
core.write_tree_selection_result(self.result_dir, s, comment)
179189
core.mark_done(self.result_dir, s)
180190

181-
def process_series(self, pw_series) -> None:
191+
def process_series(self, pw_series, force_tree=None) -> None:
182192
log_open_sec(f"Checking series {pw_series['id']} with {pw_series['total']} patches")
183193
try:
184-
self._process_series(pw_series)
194+
self._process_series(pw_series, force_tree)
195+
finally:
196+
log_end_sec()
197+
198+
def _start_lock_sock(self, config) -> None:
199+
socket_path = config.get('poller', 'local_sock_path', fallback=None)
200+
if not socket_path:
201+
return
202+
203+
if os.path.exists(socket_path):
204+
os.unlink(socket_path)
205+
206+
self._local_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
207+
self._local_sock.setblocking(False)
208+
self._local_sock.bind(socket_path)
209+
self._local_sock.listen(5)
210+
211+
log(f"Socket listener started on {socket_path}", "")
212+
213+
def _check_local_sock(self) -> None:
214+
if not self._local_sock:
215+
return
216+
217+
try:
218+
conn, _ = self._local_sock.accept()
219+
except BlockingIOError:
220+
return
221+
222+
log_open_sec("Processing local socket connection")
223+
try:
224+
data = b""
225+
while True:
226+
chunk = conn.recv(4096)
227+
data += chunk
228+
if len(chunk) < 4096:
229+
break
230+
231+
if data:
232+
data = data.decode("utf-8")
233+
series_ids = []
234+
items = data.split(";")
235+
for item in items:
236+
item = item.strip()
237+
if not item:
238+
continue
239+
240+
# We accept "series [tree]; series [tree]; ..."
241+
parts = item.rsplit(" ", 1)
242+
if len(parts) == 2:
243+
tree = parts[1].strip()
244+
else:
245+
tree = None
246+
try:
247+
s_id = int(parts[0].strip())
248+
series_ids.append((tree, s_id))
249+
log("Processing", series_ids[-1])
250+
except ValueError:
251+
log("Invalid number in tuple", item)
252+
continue
253+
254+
for tree, series_id in series_ids:
255+
try:
256+
pw_series = self._pw.get("series", series_id)
257+
self.process_series(pw_series, force_tree=tree)
258+
conn.sendall(f"OK: {series_id}\n".encode("utf-8"))
259+
except Exception as e:
260+
log("Error processing series", str(e))
261+
conn.sendall(f"ERROR: {series_id}: {e}\n".encode("utf-8"))
262+
finally:
263+
log_end_sec()
264+
else:
265+
conn.sendall(b"DONE\n")
266+
except Exception as e:
267+
log("Error processing socket request", str(e))
185268
finally:
269+
conn.close()
186270
log_end_sec()
187271

188272
def run(self, life) -> None:
@@ -210,6 +294,8 @@ def run(self, life) -> None:
210294
# shouldn't have had this event at all though
211295
pass
212296

297+
self._check_local_sock()
298+
213299
while not self._done_queue.empty():
214300
s = self._done_queue.get()
215301
log(f"Testing complete for series {s['id']}", "")

0 commit comments

Comments
 (0)