This repository has been archived by the owner on Feb 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgmond-influxdb-bridge.py
executable file
·211 lines (175 loc) · 8.11 KB
/
gmond-influxdb-bridge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/python
import argparse
import influxdb
import json
import re
import socket
import time
from lxml import etree
from os import path
from telnetlib import Telnet
# Command-line interface: an optional debug level and a mandatory config file.
argparser = argparse.ArgumentParser()
argparser.add_argument('-d', '--debug', help="Set debug level - the higher the level, the further down the rabbit hole...")
argparser.add_argument('-f', '--config', help="Config file to load")
# Parse exactly once; the original called parse_args() twice and discarded
# the first result.
args = argparser.parse_args()
# A config file is mandatory - bail out early with a hint if it's missing.
if not args.config:
    print("Could not load module 'telepathy'.\nPlease specify a configuration file via -f")
    exit(1)
def D(level, msg):
    """Print *msg* as a DEBUG line when the configured debug level is >= *level*."""
    if not args.debug:
        return
    if int(args.debug) < level:
        return
    print("DEBUG{0} :: {1}".format(level, msg))
def INFO(msg):
    """Emit an informational line to stdout."""
    print("INFO :: %s" % (msg,))
def WARN(msg):
    """Emit a warning line to stdout."""
    print("WARN :: %s" % (msg,))
def ERR(msg):
    """Emit an error line to stdout (does not terminate the process)."""
    print("ERROR :: %s" % (msg,))
def ERREXIT(msg):
    """Print an error line, then terminate the process with exit status 1."""
    print("ERROR :: %s" % (msg,))
    exit(1)
def parse_config(config_filename):
    """Load and validate the JSON config file at *config_filename*.

    Strips //-style and /* */-style comments before parsing, since plain
    JSON has no comment syntax. Exits the process (via ERREXIT) when the
    file is missing, contains invalid JSON, or lacks a required key.

    Returns the parsed config as a dict.
    """
    # hat tip to riquetd for code to parse comments out of json config files
    # (raw string: the original non-raw literal relied on '\S' being passed
    # through, which is a deprecated invalid escape in Python 3)
    comment_re = re.compile(
        r'(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?',
        re.DOTALL | re.MULTILINE
    )
    try:
        with open(config_filename) as f:
            content = f.read()
        # repeatedly strip the first comment match until none remain
        match = comment_re.search(content)
        while match:
            content = content[:match.start()] + content[match.end():]
            match = comment_re.search(content)
        config_data = json.loads(content)
    except IOError as e:
        # the original let a missing/unreadable file crash with a traceback
        ERR ("Failed to open config file:")
        ERREXIT (" {0}".format(e))
    except ValueError as e:
        ERR ("Failed to load config, invalid JSON:")
        ERREXIT (" {0}".format(e))
    # every top-level key the rest of the script reads unconditionally
    for val in ['hosts', 'db', 'metrics_blacklist', 'columns', 'interval',
                'timeout', 'node_failure_threshold']:
        if val not in config_data:
            ERREXIT ("missing '{0}' config value - exiting.".format(val))
    # required InfluxDB connection settings
    for val in ['host', 'port', 'name', 'user', 'pass']:
        if val not in config_data['db']:
            ERREXIT ("missing db['{0}'] config value - exiting.".format(val))
    return config_data
def get_xml_data(hostname, timeout, node_failures, node_failure_threshold):
    """Fetch the gmond XML dump from *hostname* via telnet on port 8649.

    Returns the raw XML payload on success, the sentinel string
    "node_failed" when the connection timed out (and records the failure
    in *node_failures*), or None on any other connection/read error.
    NOTE(review): telnetlib is deprecated and removed in Python 3.13;
    a plain socket would do the same job here.
    """
    xml_data = None
    try:
        t = Telnet()
        t.open(hostname, 8649, timeout)
    except socket.timeout:
        # currently only incrementing failure count for nodes that time out.
        # the other failures happen quickly enough to have little to no impact
        # on total elapsed time, and thus aren't worth worrying about
        ERR ("timeout connecting to host {0}. recording failure".format(hostname))
        node_failures[hostname] += 1
        WARN ("host {0} has failed {1} times - {2} more failures before removal from host list".format(hostname, node_failures[hostname], node_failure_threshold - node_failures[hostname]))
        return "node_failed"
    except socket.gaierror:
        ERR ("host not found: {0}".format(hostname))
    except socket.error:
        ERR ("could not connect to host - connection refused: {0}".format(hostname))
    else:
        D (1, "successfully connected to host {0}".format(hostname))
        try:
            # gmond dumps its full XML state and closes; read until EOF
            xml_data = t.read_all()
        except (EOFError, socket.error):
            # narrowed from a bare except, which also swallowed
            # KeyboardInterrupt / SystemExit
            ERR ("couldn't read data from host {0}".format(hostname))
        else:
            D (3, "XML dump:\n{0}".format(xml_data))
        finally:
            # the original leaked the connection when read_all() raised
            t.close()
            D (1, "connection to {0} closed".format(hostname))
    return xml_data
def parse_metrics(xml_data, payload):
    """Parse a gmond XML dump and append one series dict per metric to *payload*.

    Each series is {'name': <metric>, 'columns': columns, 'points': [...]},
    with one point per host reporting that metric. Relies on the module
    globals `metrics_blacklist`, `columns` and `epoch_time`, and on lxml
    (getparent() is not available in xml.etree).
    """
    root_elem = etree.fromstring(xml_data)
    cluster_name = root_elem[0].attrib['NAME']
    # Single pass over every METRIC element, grouping points by metric name.
    # The original ran one full-tree findall() per distinct metric name,
    # which is O(metrics x elements); this builds the same payload in one
    # traversal. It also still lets one bridge instance poll collector
    # hosts for multiple clusters without forcing a metrics set on them.
    grouped = {}
    for metric_elem in root_elem.findall(".//METRIC"):
        metric_name = metric_elem.attrib['NAME']
        # Drop blacklisted metrics (usually values that aren't number types)
        if metric_name in metrics_blacklist:
            continue
        metric_type = metric_elem.attrib['TYPE']
        metric_value = sanitize_metric(metric_elem.attrib['VAL'], metric_type)
        # iterfind() because gmond isn't trusted to always report
        # EXTRA_ELEMENTs in the same order
        group = next(metric_elem.iterfind("EXTRA_DATA/EXTRA_ELEMENT[@NAME='GROUP']")).attrib['VAL']
        host_name = metric_elem.getparent().attrib['NAME']
        D (3, "host={0} : group={1} : metric_name={2} : metric_value={3} : metric_type={4} : val_class={5}".format(host_name, group, metric_name, metric_value, metric_type, metric_value.__class__.__name__))
        grouped.setdefault(metric_name, []).append(
            [cluster_name, host_name, metric_value, group, epoch_time])
    D (2, "metrics found: {0}\n{1}".format(len(grouped), set(grouped)))
    for metric_name, points in grouped.items():
        payload.append({'name': metric_name, 'columns': columns, 'points': points})
def sanitize_metric(value, datatype):
    """Convert a gmond metric VAL string to a native number based on its TYPE.

    float/double become float; any of gmond's integer types become int.
    Anything else (e.g. TYPE="string") is returned unchanged with a
    warning, since non-numeric metrics are expected to be blacklisted.
    """
    D (3, "sanitize_metric: datatype = {0}".format(datatype))
    if datatype in ("float", "double"):
        D (3, "converting to float")
        return float(value)
    # the original only handled uint16/uint32; gmond also emits the other
    # integer widths, which previously fell through and stayed strings
    elif datatype in ("int8", "uint8", "int16", "uint16", "int32", "uint32"):
        D (3, "converting to int")
        return int(value)
    else:
        WARN ("non-number value detected, which is almost certainly a bug in gmond-influxdb-bridge - you should talk to github.com/cboggs")
        return value
def push_metrics(db_host, db_port, db_user, db_pass, db_name, payload):
    """Write the collected *payload* series to InfluxDB (timestamps in seconds)."""
    # mask the password in debug output - the original logged it in clear text
    D (1, "db_host={0} : db_port={1} : db_user={2} : db_pass={3} : db_name={4}".format(db_host, db_port, db_user, '***', db_name))
    client = influxdb.InfluxDBClient(db_host, db_port, db_user, db_pass, db_name)
    try:
        # 's' = second-precision timestamps (InfluxDB 0.8-era API)
        client.write_points(payload, 's')
    except Exception as e:
        # narrowed from a bare except; surface the reason instead of hiding it
        ERR ("could not write data points to InfluxDB: {0}".format(e))
    else:
        D (1, "successfully wrote data points to InfluxDB")
# --- main -----------------------------------------------------------------
# Load and validate the config, then poll every gmond collector host on a
# fixed interval forever, pushing the parsed metrics into InfluxDB.
config = parse_config(args.config)
gmond_hosts = set(config['hosts'])                        # collector hosts to poll
metrics_blacklist = set(config['metrics_blacklist'])      # metric names to skip
columns = config['columns']                               # InfluxDB column names per point
interval = config['interval']                             # seconds between polling rounds
timeout = config['timeout']                               # telnet connect timeout per host
node_failure_threshold = config['node_failure_threshold'] # timeouts before a host is dropped
db = config['db']                                         # InfluxDB connection settings
INFO ("bridge starting up")
D (1, "gmond_hosts: {0}".format(gmond_hosts))
D (1, "metrics_blacklist: {0}".format(metrics_blacklist))
D (1, "columns: {0}".format(columns))
D (1, "interval: {0}".format(interval))
D (1, "timeout: {0}".format(timeout))
# NOTE(review): this debug line prints the DB password in clear text
D (1, "db connection: http://{0}:{1}/db/{2}/series?u={3}&p={4}".format(db['host'], db['port'], db['name'], db['user'], db['pass']))
# per-host count of connection timeouts, used against node_failure_threshold
node_failures = {}
for host in gmond_hosts:
    node_failures[host] = 0
while True:
    # timestamp for this polling round; also attached to every data point
    epoch_time = int(time.time())
    D (1, "epoch time: {0}".format(epoch_time))
    payload = []
    nodes_to_remove = []
    for host in gmond_hosts:
        xml_data = get_xml_data(host, timeout, node_failures, node_failure_threshold)
        if xml_data and xml_data != 'node_failed':
            parse_metrics(xml_data, payload)
        elif not xml_data:
            WARN ("no data found on host {0}".format(host))
        elif xml_data == 'node_failed' and node_failures[host] >= node_failure_threshold:
            WARN ("marking host {0} for removal".format(host))
            nodes_to_remove.append(host)
    # removal is deferred so gmond_hosts isn't mutated while iterating it
    for host in nodes_to_remove:
        WARN ("removed host {0} due to excessive failures".format(host))
        gmond_hosts.remove(host)
    if len(payload):
        D (1, "pushing metrics to InfluxDB")
        push_metrics(db['host'], db['port'], db['user'], db['pass'], db['name'], payload)
    elapsed_time = int(time.time()) - epoch_time
    D (1, "elapsed time: {0}s".format(str(elapsed_time)))
    # adjust sleep time in an attempt to keep a consistent publish interval
    # don't sleep at all if elapsed_time > interval to minimize loss of resolution
    if elapsed_time < interval:
        adjusted_sleep = interval - elapsed_time
        D (1, "sleep time adjusted to {0}s".format(str(adjusted_sleep)))
        time.sleep(adjusted_sleep)
    else:
        WARN ("elapsed time >= polling interval, skipping sleep (you may want to run multiple instances of gmond-influxdb-bridge, each polling fewer gmonds)\n")