-
Notifications
You must be signed in to change notification settings - Fork 80
/
Copy pathrepoquery.py
223 lines (199 loc) · 7.47 KB
/
repoquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import logging
import os
import re
import subprocess
from typing import Iterable, Sequence
XCPNG_YUMREPO_TMPL = """
[xcpng-{section}{suffix}]
name=xcpng - {section}{suffix}
baseurl=https://updates.xcp-ng.org/8/{version}/{section}/{rpmarch}/
gpgkey=https://xcp-ng.org/RPM-GPG-KEY-xcpng
failovermethod=priority
skip_if_unavailable=False
"""
XCPNG_YUMREPO_USER_TMPL = """
[xcpng-{section}{suffix}]
name=xcpng - {section}{suffix}
baseurl=https://koji.xcp-ng.org/repos/user/8/{version}/{section}/{rpmarch}/
gpgkey=https://xcp-ng.org/RPM-GPG-KEY-xcpng
failovermethod=priority
skip_if_unavailable=False
"""
# DNF v4 adds an implicit trailing newline to --qf format, but v5 does not
dnf_version = subprocess.check_output(['dnf', '--version'], universal_newlines=True).strip().split('.')
if int(dnf_version[0]) >= 5:
QFNL = "\n"
else:
QFNL = ""
def setup_xcpng_yum_repos(*, yum_repo_d: str, sections: Iterable[str],
bin_arch: str | None, version: str) -> None:
with open(os.path.join(yum_repo_d, "xcpng.repo"), "w") as yumrepoconf:
for section in sections:
# HACK: use USER_TMPL if section ends with a number
if section[-1].isdigit():
tmpl = XCPNG_YUMREPO_USER_TMPL
else:
tmpl = XCPNG_YUMREPO_TMPL
# binaries
if bin_arch:
block = tmpl.format(rpmarch=bin_arch,
section=section,
version=version,
suffix='',
)
yumrepoconf.write(block)
# sources
block = tmpl.format(rpmarch='Source',
section=section,
version=version,
suffix='-src',
)
yumrepoconf.write(block)
XS8_YUMREPO_TMPL = """
[xs8-{section}]
name=XS8 - {section}
baseurl=http://10.1.0.94/repos/XS8/{section}/xs8p-{section}/
failovermethod=priority
skip_if_unavailable=False
[xs8-{section}-src]
name=XS8 - {section} source
baseurl=http://10.1.0.94/repos/XS8/{section}/xs8p-{section}-source/
failovermethod=priority
skip_if_unavailable=False
"""
def setup_xs8_yum_repos(*, yum_repo_d: str, sections: Iterable[str])-> None:
with open(os.path.join(yum_repo_d, "xs8.repo"), "w") as yumrepoconf:
for section in sections:
block = XS8_YUMREPO_TMPL.format(section=section)
yumrepoconf.write(block)
DNF_BASE_CMD = None
def dnf_setup(*, dnf_conf: str, yum_repo_d: str) -> None:
global DNF_BASE_CMD
DNF_BASE_CMD = ['dnf', '--quiet',
'--releasever', 'WTF',
'--config', dnf_conf,
f'--setopt=reposdir={yum_repo_d}',
]
BINRPM_SOURCE_CACHE: dict[str, str] = {}
def rpm_source_package(rpmname: str) -> str:
return BINRPM_SOURCE_CACHE[rpmname]
def run_repoquery(args: list[str], split: bool = True) -> str | Sequence[str]:
assert DNF_BASE_CMD is not None
cmd = DNF_BASE_CMD + ['repoquery'] + args
logging.debug('$ %s', ' '.join(cmd))
output = subprocess.check_output(cmd, universal_newlines=True).strip()
logging.debug('> %s', output)
return output.split() if split else output
SRPM_BINRPMS_CACHE: dict[str, set[str]] = {} # binrpm-nevr -> srpm-nevr
def fill_srpm_binrpms_cache() -> None:
# HACK: get nevr for what dnf outputs as %{sourcerpm}
logging.debug("get epoch info for SRPMs")
args = [
'--disablerepo=*', '--enablerepo=*-src', '*',
'--qf', '%{name}-%{version}-%{release}.src.rpm,%{name}-%{evr}' + QFNL,
'--latest-limit=1',
]
SRPM_NEVR_CACHE = { # sourcerpm -> srpm-nevr
sourcerpm: nevr
for sourcerpm, nevr in (line.split(',')
for line in run_repoquery(args))
}
# binary -> source mapping
logging.debug("get binary to source mapping")
global SRPM_BINRPMS_CACHE, BINRPM_SOURCE_CACHE
args = [
'--disablerepo=*-src', '*',
'--qf', '%{name}-%{evr},%{sourcerpm}' + QFNL, # FIXME no epoch in sourcerpm, why does it work?
'--latest-limit=1',
]
BINRPM_SOURCE_CACHE = {
# packages without source are not in SRPM_NEVR_CACHE, fallback to sourcerpm
binrpm: SRPM_NEVR_CACHE.get(sourcerpm, srpm_strip_src_rpm(sourcerpm))
for binrpm, sourcerpm in (line.split(',')
for line in run_repoquery(args))
}
# reverse mapping source -> binaries
SRPM_BINRPMS_CACHE = {}
for binrpm, srpm in BINRPM_SOURCE_CACHE.items():
binrpms = SRPM_BINRPMS_CACHE.get(srpm, set())
if not binrpms:
SRPM_BINRPMS_CACHE[srpm] = binrpms
binrpms.add(binrpm)
def srpm_nevr(rpmname: str) -> str:
args = [
'--disablerepo=*', '--enablerepo=*-src',
'--qf=%{name}-%{evr}' + QFNL, # to get the epoch only when non-zero
'--latest-limit=1',
rpmname,
]
ret = run_repoquery(args)
assert ret, f"Found no SRPM named {rpmname}"
assert len(ret) == 1 # ensured by --latest-limit=1 ?
return ret[0]
# dnf insists on spitting .src.rpm names it cannot take as input itself
def srpm_strip_src_rpm(srpmname: str) -> str:
SUFFIX = ".src.rpm"
assert srpmname.endswith(SUFFIX), f"{srpmname} does not end in .src.rpm"
nrv = srpmname[:-len(SUFFIX)]
return nrv
def rpm_requires(rpmname: str) -> Sequence[str]:
args = [
'--disablerepo=*-src', # else requires of same-name SRPM are included
'--qf=%{name}-%{evr}' + QFNL, # to avoid getting the arch and explicit zero epoch
'--resolve',
'--requires', rpmname,
]
ret = run_repoquery(args)
return ret
def srpm_requires(srpmname: str) -> set[str]:
args = [
'--qf=%{name}-%{evr}' + QFNL, # to avoid getting the arch
'--resolve',
'--requires', f"{srpmname}.src",
]
ret = set(run_repoquery(args))
return ret
def srpm_binrpms(srpmname: str) -> set[str]:
ret = SRPM_BINRPMS_CACHE.get(srpmname, None)
if ret is None: # FIXME should not happen
logging.error("%r not found in cache", srpmname)
assert False
return []
logging.debug("binrpms for %s: %s", srpmname, ret)
return ret
UPSTREAM_REGEX = re.compile(r'\.el[0-9]+(_[0-9]+)?(\..*|)$')
RPM_NVR_SPLIT_REGEX = re.compile(r'^(.+)-([^-]+)-([^-]+)$')
def is_pristine_upstream(rpmname:str) -> bool:
if re.search(UPSTREAM_REGEX, rpmname):
return True
return False
def rpm_parse_nevr(nevr: str, suffix: str) -> tuple[str, str, str, str]:
"Parse into (name, epoch:version, release) stripping suffix from release"
m = re.match(RPM_NVR_SPLIT_REGEX, nevr)
assert m, f"{nevr} does not match NEVR pattern"
n, ev, r = m.groups()
if ":" in ev:
e, v = ev.split(":")
else:
e, v = "0", ev
if r.endswith(suffix):
r = r[:-len(suffix)]
return (n, e, v, r)
def all_binrpms() -> set[str]:
args = [
'--disablerepo=*-src',
'--qf=%{name}-%{evr}' + QFNL, # to avoid getting the arch
'--latest-limit=1', # only most recent for each package
'*',
]
ret = set(run_repoquery(args))
return ret
def all_srpms() -> set[str]:
args = [
'--disablerepo=*', '--enablerepo=*-src',
'--qf=%{name}-%{evr}' + QFNL, # to avoid getting the arch
'--latest-limit=1', # only most recent for each package
'*',
]
ret = set(run_repoquery(args))
return ret