Skip to content

Commit 51cc60e

Browse files
committed
tools: add pull-lab poller
Introduce example_pull_lab.py to watch KernelCI job events and automatically fetch artifacts for pull-labs demo jobs. The tool caches downloads, reconstructs an initrd with modules, and launches a QEMU x86 VM so developers can quickly reproduce jobs locally.

Signed-off-by: Denys Fedoryshchenko <[email protected]>
1 parent ff280ce commit 51cc60e

File tree

1 file changed

+210
-0
lines changed

1 file changed

+210
-0
lines changed

tools/example_pull_lab.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
#!/usr/bin/env python3
2+
"""
3+
This example will retrieve the latest events from the KernelCI API
4+
and print them to the console.
5+
It will filter only completed kernel builds, limit to 100 events per request,
6+
and retrieve corresponding nodes with artifacts.
7+
"""
8+
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time

import requests
17+
18+
19+
# This is staging server: "https://staging.kernelci.org:9000/latest"
# For production use "https://api.kernelci.org/latest/"
BASE_URI = "https://staging.kernelci.org:9000/latest"
EVENTS_PATH = "/events"

# Start from the beginning of time, but you might implement
# saving last processed timestamp to a file or database
# NOTE: mutable module-level state; main() advances it after each event so
# already-processed events are not fetched again on the next poll.
timestamp = "1970-01-01T00:00:00.000000"


"""
kind:

There are a few different kinds:

* checkout: a new git tree checkout to be tested. Maestro frequently cuts
new tree checkout from tree it is subscribed to. See 'config/pipeline.yaml'
* kbuild: a new kernel build for a given config and arch
* job: the execution of a test suite
* test: the execution of a test inside a job


state:

In this example we track state=done to get an event when Maestro is ready to
provide all the information about the node. Eg for checkout it will provide
the commit hash to test and for builds the location of the kernel binaries built.
"""
47+
48+
49+
def pollevents(timestamp, kind):
    """Fetch completed ("done") events of the given kind from Maestro.

    Args:
        timestamp: ISO-8601 string; only events newer than this are returned.
        kind: event kind to filter on ("checkout", "kbuild", "job", "test").

    Returns:
        The decoded JSON payload (a list of event dicts).

    Raises:
        requests.HTTPError: on a non-2xx API response.
        requests.RequestException: on connection failure or timeout.
    """
    url = (
        BASE_URI
        + EVENTS_PATH
        + f"?state=done&kind={kind}&limit=1000&recursive=true&from={timestamp}"
    )
    print(url)
    # Without a timeout a stalled API connection would hang the poller forever.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    return response.json()
59+
60+
61+
def retrieve_job_definition(url):
    """Download and decode the JSON job definition stored at *url*.

    Raises:
        requests.HTTPError: on a non-2xx response.
        requests.RequestException: on connection failure or timeout.
    """
    print(f"Retrieving job definition from: {url}")
    # Without a timeout a stalled server would hang the poller forever.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    return response.json()
66+
67+
68+
def check_cache(url):
    """Return the cached file path for *url*, or None on a cache miss.

    Cache entries live under _cache/ and are named by the MD5 of the URL.
    """
    digest = hashlib.md5(url.encode()).hexdigest()
    entry = f"_cache/{digest}"
    if not os.path.exists(entry):
        print(f"Cache miss for {url}")
        return None
    print(f"Cache hit for {url}")
    return entry
77+
78+
79+
def store_in_cache(url, filepath):
    """Copy *filepath* into the local _cache/ directory, keyed by MD5 of *url*.

    Caching avoids re-downloading large artifacts on slow connections.
    """
    md5 = hashlib.md5(url.encode()).hexdigest()
    cache_path = f"_cache/{md5}"
    os.makedirs("_cache", exist_ok=True)
    # shutil.copy is portable and raises on failure, unlike os.system("cp ...")
    # which silently ignores errors and breaks on paths containing spaces.
    shutil.copy(filepath, cache_path)
    print(f"Stored {url} in cache at {cache_path}")
86+
87+
88+
def download_artifact(url, dest):
    """Fetch *url* into *dest*, using the local _cache/ to avoid re-downloads.

    On a cache hit the cached copy is reused; otherwise the file is streamed
    from the network and then stored in the cache for later runs.

    Raises:
        requests.HTTPError: on a non-2xx response.
        requests.RequestException: on connection failure or timeout.
    """
    cached_path = check_cache(url)
    if cached_path:
        # If we have a cached version, use it
        print(f"Using cached version of {url}")
        # shutil.copy is portable and raises on failure, unlike os.system("cp")
        # which silently ignores errors and breaks on paths with spaces.
        shutil.copy(cached_path, dest)
        return
    print(f"Downloading artifact from: {url} to {dest}")
    # stream=True downloads in chunks; the timeout prevents an indefinite hang.
    response = requests.get(url, stream=True, timeout=60)
    response.raise_for_status()
    with open(dest, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    # Store downloaded file in cache
    store_in_cache(url, dest)
104+
105+
106+
def launch_x86_vm(kernel, ramdisk):
    """Boot the given kernel/ramdisk in an interactive QEMU x86_64 VM.

    Blocks until the user quits QEMU (Ctrl-A X on the serial console).
    """
    print(f"Launching x86 VM with kernel: {kernel} and ramdisk: {ramdisk}")
    # Build the command as an argv list: no shell is involved, so paths
    # containing spaces or shell metacharacters cannot break (or inject
    # into) the command line, unlike the previous os.system() string.
    cmd = [
        "qemu-system-x86_64",
        "-kernel", kernel,
        "-initrd", ramdisk,
        "-m", "1024",
        "-nographic",
        "-append", "console=ttyS0",
    ]
    print(f"Executing command: {' '.join(cmd)}")
    # check=False mirrors os.system(): a non-zero QEMU exit is not an error here.
    subprocess.run(cmd, check=False)
114+
115+
116+
def prepare_x86(artifacts):
    """Download kernel/modules/ramdisk artifacts, merge modules into the
    initrd, and boot the result in a local QEMU x86 VM.

    Args:
        artifacts: dict with "kernel", "modules" and "ramdisk" URL entries.
                   Caller (main) has already verified all three keys exist.

    Side effects: downloads files (via the _cache/ download cache), runs
    tar/cpio/gzip as external commands, waits for the user to press Enter,
    then blocks in QEMU until the VM is quit. All scratch files live in a
    temporary directory that is removed on return.
    """
    # create temporary directory for all qemu files
    with tempfile.TemporaryDirectory() as tmpdir:
        kernel_url = artifacts.get("kernel")
        modules_url = artifacts.get("modules")
        ramdisk_url = artifacts.get("ramdisk")
        kernel = f"{tmpdir}/kernel"
        modules = f"{tmpdir}/modules.tar.xz"
        ramdisk = f"{tmpdir}/ramdisk.cpio.gz"
        download_artifact(kernel_url, kernel)
        download_artifact(modules_url, modules)
        download_artifact(ramdisk_url, ramdisk)
        # Unpack the modules tarball so it can be re-archived as cpio below.
        modules_dir = os.path.join(tmpdir, "modules_temp")
        os.makedirs(modules_dir, exist_ok=True)
        subprocess.run(["tar", "-xf", modules, "-C", modules_dir], check=True)

        # Re-pack the extracted modules as a newc-format cpio archive, the
        # format the kernel expects for initramfs segments. The shell
        # subshell is needed so the archive paths are relative to modules_dir.
        modules_cpio = os.path.join(tmpdir, "modules.cpio")
        subprocess.run(
            f"(cd {modules_dir} && find . | cpio -o --format=newc) > {modules_cpio}",
            shell=True,
            check=True,
        )

        subprocess.run(
            ["gzip", "-d", "-f", ramdisk], check=True
        )  # produces ramdisk.cpio in same dir
        original_cpio = os.path.join(tmpdir, "ramdisk.cpio")

        merged_cpio = os.path.join(tmpdir, "new_ramdisk.cpio")
        # Simple concatenation preserves all original entries then adds modules entries
        # (the kernel accepts concatenated cpio archives as a single initramfs).
        with open(merged_cpio, "wb") as out_f:
            for part in (original_cpio, modules_cpio):
                with open(part, "rb") as in_f:
                    out_f.write(in_f.read())

        # 5. Compress merged archive
        subprocess.run(["gzip", "-f", merged_cpio], check=True)
        ramdisk = f"{merged_cpio}.gz"
        print(
            f"Launching x86 VM with kernel: {kernel}, modules: {modules}, ramdisk: {ramdisk}"
        )
        print("To quit the VM, use Ctrl-A X in the QEMU window.")
        print("Press Enter to continue...")
        # Pause before QEMU takes over the terminal; the VM must also exit
        # before the TemporaryDirectory (and the files QEMU uses) is removed.
        input()
        launch_x86_vm(kernel, ramdisk)
161+
162+
163+
def main():
    """Poll Maestro forever for completed "job" events and reproduce them.

    Every completed qemu / pull-labs-demo job whose definition lists kernel,
    modules and ramdisk artifacts is downloaded and booted locally via QEMU.
    Exits with status 1 on any network/API error.
    """
    global timestamp

    # argparse currently defines no options; kept so --help works and
    # unexpected arguments are rejected.
    parser = argparse.ArgumentParser(description="Listen to events in Maestro.")
    parser.parse_args()
    while True:
        try:
            events = pollevents(timestamp, "job")
            if not events:
                print("No new events, sleeping for 30 seconds")
                time.sleep(30)
                continue
            print(f"Got {len(events)} events")
            for event in events:
                data = event.get("data", {}).get("data", {})
                # Only handle pull-labs demo jobs that target the qemu platform.
                if (
                    data.get("platform") == "qemu"
                    and data.get("runtime") == "pull-labs-demo"
                ):
                    # retrieve data.node.artifacts.job_definition
                    node = event.get("node", {})
                    artifacts = node.get("artifacts", {})
                    job_definition_url = artifacts.get("job_definition", "")
                    # validate if job_definition is valid url
                    if not job_definition_url.startswith("http"):
                        print(f"Invalid job_definition URL: {job_definition_url}")
                        continue
                    print(f"Valid job_definition URL: {job_definition_url}")
                    jobdata = retrieve_job_definition(job_definition_url)
                    # we must have inside artifacts and inside artifacts kernel,modules,ramdisk
                    job_artifacts = jobdata.get("artifacts", {})
                    if all(
                        key in job_artifacts for key in ["kernel", "modules", "ramdisk"]
                    ):
                        print("Job definition contains all required artifacts")
                        prepare_x86(job_artifacts)

                # Advance the high-water mark so processed events are not
                # fetched again on the next poll.
                timestamp = event["timestamp"]
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            sys.exit(1)
207+
208+
209+
# Run the poller only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)