-
Notifications
You must be signed in to change notification settings - Fork 479
/
Copy pathcheck_bi_aggr
executable file
·291 lines (255 loc) · 9.39 KB
/
check_bi_aggr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#!/usr/bin/env python3
# Copyright (C) 2019 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.
"""Check Checkmk BI aggregations"""
import argparse
import json
import os
import sys
import time
import traceback
from collections.abc import Sequence
from pathlib import Path
import requests
import urllib3
from cmk.utils import password_store
from cmk.utils.local_secrets import AutomationUserSecret
from cmk.utils.user import UserId
# Turn off urllib3's InsecureRequestWarning for this script.
# NOTE(review): presumably meant to keep TLS certificate warnings out of the check
# output - confirm, since the request below does not disable verification itself.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def parse_arguments(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the command line of the BI aggregation check.

    Args:
        argv: The command line arguments, without the program name.

    Returns:
        The namespace holding all parsed options. ``--base-url`` and ``--aggr-name``
        are mandatory; argparse terminates the program if they are missing or if
        an option value is not one of its allowed choices.
    """
    program_name = __file__.rsplit("/", 1)[-1]
    cli = argparse.ArgumentParser(prog=program_name, description=__doc__)

    # --- Which aggregation to query -------------------------------------------------
    cli.add_argument(
        "-b",
        "--base-url",
        required=True,
        metavar="BASE_URL",
        help="The base URL to the monitoring environment, e.g. http://<hostname>/<site-id>",
    )
    aggr_name_help = (
        "Name of the aggregation, not the aggregation group."
        " It is possible that there are multiple aggregations with an equal name,"
        " but you should ensure that it is a unique one to prevent confusion."
    )
    cli.add_argument(
        "-a", "--aggr-name", required=True, metavar="AGGR_NAME", help=aggr_name_help
    )

    # --- Credentials ----------------------------------------------------------------
    user_help = (
        "User ID of an automation user which is permitted to see all contents of the aggregation."
    )
    cli.add_argument(
        "-u",
        "--user-name",
        required=False,  # depends on --use-automation-user
        metavar="USER",
        help=user_help,
    )
    # The secret is given either directly or as a password store reference, never both.
    secret_options = cli.add_mutually_exclusive_group()
    secret_options.add_argument(
        "--secret-reference",
        help="Password store reference of the automation secret of the user.",
    )
    secret_options.add_argument(
        "-s", "--secret", metavar="SECRET", help="Automation secret of the user."
    )
    cli.add_argument(
        "--use-automation-user",
        action="store_true",
        help="Use credentials from the local 'automation' user.",
    )

    # --- Connection settings --------------------------------------------------------
    # Kerberos auth support was removed with 2.4.0 but kept here to show a helpful error message
    # in case a user still has configured it. Can be removed with 2.5.
    auth_modes = ["basic", "digest", "header", "kerberos"]
    cli.add_argument(
        "-m",
        "--auth-mode",
        choices=auth_modes,
        default="header",
        metavar="AUTH_MODE",
        help="Authentication mode, defaults to 'header'.",
    )
    cli.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=60,
        metavar="TIMEOUT",
        help="HTTP connect timeout in seconds (Default: 60).",
    )

    # --- Downtime / acknowledgement handling ----------------------------------------
    cli.add_argument(
        "-r",
        "--track-downtimes",
        action="store_true",
        help="Track downtimes. This requires the hostname to be set.",
    )
    cli.add_argument(
        "-n",
        "--hostname",
        default=None,
        metavar="HOSTNAME",
        help="The hostname for which this check is run.",
    )
    forced_states = ["normal", "ok", "warn"]
    in_downtime_help = (
        "S can be 'ok' or 'warn'. Force this state if the aggregate is in scheduled downtime."
        " OK states will always be unchanged."
    )
    cli.add_argument(
        "--in-downtime",
        choices=forced_states,
        default="normal",
        metavar="S",
        help=in_downtime_help,
    )
    cli.add_argument(
        "--acknowledged",
        choices=forced_states,
        default="normal",
        metavar="S",
        help=("Same as --in-downtime, but for acknowledged aggregates."),
    )

    # --- Diagnostics ----------------------------------------------------------------
    cli.add_argument("-d", "--debug", action="store_true", help="Enable debug mode.")

    return cli.parse_args(argv)
def _make_credentials(args: argparse.Namespace) -> tuple[str, str]:
if args.use_automation_user:
try:
return "automation", AutomationUserSecret(UserId("automation")).read()
except (OSError, ValueError):
sys.stderr.write('Unable to read credentials for "automation" user.\n')
sys.exit(1)
if (user := args.user_name) is None:
sys.stderr.write("Please provide a valid user name.\n")
sys.exit(1)
if (ref := args.secret_reference) is not None:
pw_id, pw_file = ref.split(":", 1)
return user, password_store.lookup(Path(pw_file), pw_id)
if args.secret is not None:
return user, args.secret
sys.stderr.write("Please provide a valid login secret.\n")
sys.exit(1)
# returning int requires more refactoring atm
def main(argv: Sequence[str]) -> None:
args = parse_arguments(argv=argv)
username, secret = _make_credentials(args=args)
if args.track_downtimes and not args.hostname:
sys.stderr.write("Please provide a hostname when using downtime tracking.\n")
sys.exit(1)
def init_auth(pw: str) -> requests.auth.AuthBase | None:
match args.auth_mode:
case "kerberos":
raise ValueError(
"Kerberos auth is not supported anymore. Please have a look at "
"werk #16569 for further information."
)
case "digest":
return requests.auth.HTTPDigestAuth(username, pw)
case "basic":
return requests.auth.HTTPBasicAuth(username, pw)
case "header":
return None
case other:
raise ValueError(f"Unknown auth mode: {other!r}")
endpoint_url = f"{args.base_url.rstrip('/')}/check_mk/api/1.0/domain-types/bi_aggregation/actions/aggregation_state/invoke"
auth = init_auth(secret)
if args.debug:
sys.stderr.write("URL: %s\n" % endpoint_url)
try:
r = requests.post(
endpoint_url,
timeout=args.timeout,
auth=auth,
headers={"Authorization": f"Bearer {username} {secret}"} if not auth else None,
json={"filter_names": [args.aggr_name]},
)
r.raise_for_status()
raw_response = r.text
except requests.Timeout:
sys.stdout.write("ERROR: Socket timeout while opening URL: %s\n" % (endpoint_url))
sys.exit(3)
except requests.URLRequired as e:
sys.stdout.write("UNKNOWN: %s\n" % e)
sys.exit(3)
except Exception as e:
sys.stdout.write(
f"ERROR: Exception while opening URL: {endpoint_url} - {e}\n{traceback.format_exc()}"
)
sys.exit(3)
try:
response_data = json.loads(raw_response)
except Exception as e:
sys.stdout.write(f"ERROR: Invalid response ({e}): {raw_response}\n")
sys.exit(3)
try:
aggr_state = response_data["aggregations"][args.aggr_name]["state"]
except KeyError:
sys.stdout.write(
f"ERROR Aggregation {args.aggr_name} does not exist or user is not permitted"
)
sys.exit(3)
if aggr_state == -1:
aggr_state = 3
aggr_output = "Aggregation state is %s" % ["OK", "WARN", "CRIT", "UNKNOWN"][aggr_state]
# Handle downtimes and acknowledgements
is_aggr_in_downtime = response_data["aggregations"][args.aggr_name]["in_downtime"]
if args.in_downtime != "normal" and is_aggr_in_downtime:
aggr_output += ", currently in downtime"
if args.in_downtime == "ok":
aggr_state = 0
else: # "warn"
aggr_state = min(aggr_state, 1)
if args.track_downtimes:
# connect to livestatus
try:
import livestatus
except ImportError:
sys.stderr.write(
"The python livestatus api module is missing. Please install from\n"
"Check_MK livestatus sources to a python import path.\n"
)
sys.exit(1)
socket_path = Path(os.environ["OMD_ROOT"]) / "tmp/run/live"
conn = livestatus.SingleSiteConnection(f"unix:{socket_path}")
now = time.time()
# find out if, according to previous tracking, there already is a downtime
ids = conn.query_table(
(
"GET downtimes\n"
"Columns: id\n"
"Filter: host_name = %s\n"
"Filter: service_description = %s\n"
"Filter: author = tracking\n"
"Filter: end_time > %d"
)
% (args.hostname, args.aggr_name, now)
)
downtime_tracked = len(ids) > 0
if downtime_tracked != is_aggr_in_downtime:
# there is a discrepance between tracked downtime state and the real state
if is_aggr_in_downtime:
# need to track downtime
conn.command(
"[%d] SCHEDULE_SVC_DOWNTIME;%s;%s;%d;%d;1;0;0;"
"tracking;Automatic downtime"
% (now, args.hostname, args.aggr_name, now, 2147483647)
)
else:
for dt_id in ids:
conn.command("[%d] DEL_SVC_DOWNTIME;%d" % (now, dt_id[0]))
is_aggr_acknowledged = response_data["aggregations"][args.aggr_name]["acknowledged"]
if args.acknowledged != "normal" and is_aggr_acknowledged:
aggr_output += ", is acknowledged"
if args.acknowledged == "ok":
aggr_state = 0
else: # "warn"
aggr_state = min(aggr_state, 1)
sys.stdout.write("%s\n" % aggr_output)
sys.exit(aggr_state)
# Script entry point: hand the command line (without the program name) to main(),
# which terminates the process via sys.exit().
if __name__ == "__main__":
    main(sys.argv[1:])