Skip to content

Commit 40684da

Browse files
committed
collector: never allow to lose data
The current default allows to lose data very easily: if the dispatcher fails to send data to the backend (e.g. Gnocchi is down), then the dispatcher raises and the data are lost for ever. This is completely unacceptable, and nobody should be able to configure Ceilometer in that way. This patch entirely remove that option, and switch the behavior to something sane. Change-Id: I45cb3da84eb2a785f46b3ec676c1a052ce999206
1 parent 09aec58 commit 40684da

File tree

3 files changed

+21
-65
lines changed

3 files changed

+21
-65
lines changed

ceilometer/collector.py

+6-30
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,6 @@
3737
cfg.PortOpt('udp_port',
3838
default=4952,
3939
help='Port to which the UDP socket is bound.'),
40-
cfg.BoolOpt('requeue_sample_on_dispatcher_error',
41-
default=False,
42-
help='Requeue the sample on the collector sample queue '
43-
'when the collector fails to dispatch it. This is only valid '
44-
'if the sample come from the notifier publisher.'),
45-
cfg.BoolOpt('requeue_event_on_dispatcher_error',
46-
default=False,
47-
help='Requeue the event on the collector event queue '
48-
'when the collector fails to dispatch it.'),
4940
cfg.IntOpt('batch_size',
5041
default=1,
5142
help='Number of notification messages to wait before '
@@ -91,8 +82,7 @@ def start(self):
9182
messaging.get_batch_notification_listener(
9283
transport, [sample_target],
9384
[SampleEndpoint(self.meter_manager)],
94-
allow_requeue=(cfg.CONF.collector.
95-
requeue_sample_on_dispatcher_error),
85+
allow_requeue=True,
9686
batch_size=cfg.CONF.collector.batch_size,
9787
batch_timeout=cfg.CONF.collector.batch_timeout))
9888
self.sample_listener.start()
@@ -104,8 +94,7 @@ def start(self):
10494
messaging.get_batch_notification_listener(
10595
transport, [event_target],
10696
[EventEndpoint(self.event_manager)],
107-
allow_requeue=(cfg.CONF.collector.
108-
requeue_event_on_dispatcher_error),
97+
allow_requeue=True,
10998
batch_size=cfg.CONF.collector.batch_size,
11099
batch_timeout=cfg.CONF.collector.batch_timeout))
111100
self.event_listener.start()
@@ -158,9 +147,8 @@ def record_metering_data(self, context, data):
158147

159148

160149
class CollectorEndpoint(object):
161-
def __init__(self, dispatcher_manager, requeue_on_error):
150+
def __init__(self, dispatcher_manager):
162151
self.dispatcher_manager = dispatcher_manager
163-
self.requeue_on_error = requeue_on_error
164152

165153
def sample(self, messages):
166154
"""RPC endpoint for notification messages
@@ -172,28 +160,16 @@ def sample(self, messages):
172160
try:
173161
self.dispatcher_manager.map_method(self.method, samples)
174162
except Exception:
175-
if self.requeue_on_error:
176-
LOG.exception(_LE("Dispatcher failed to handle the %s, "
177-
"requeue it."), self.ep_type)
178-
return oslo_messaging.NotificationResult.REQUEUE
179-
raise
163+
LOG.exception(_LE("Dispatcher failed to handle the %s, "
164+
"requeue it."), self.ep_type)
165+
return oslo_messaging.NotificationResult.REQUEUE
180166

181167

182168
class SampleEndpoint(CollectorEndpoint):
183169
method = 'record_metering_data'
184170
ep_type = 'sample'
185171

186-
def __init__(self, dispatcher_manager):
187-
super(SampleEndpoint, self).__init__(
188-
dispatcher_manager,
189-
cfg.CONF.collector.requeue_sample_on_dispatcher_error)
190-
191172

192173
class EventEndpoint(CollectorEndpoint):
193174
method = 'record_events'
194175
ep_type = 'event'
195-
196-
def __init__(self, dispatcher_manager):
197-
super(EventEndpoint, self).__init__(
198-
dispatcher_manager,
199-
cfg.CONF.collector.requeue_event_on_dispatcher_error)

ceilometer/tests/functional/test_collector.py

-35
Original file line numberDiff line numberDiff line change
@@ -236,46 +236,11 @@ def _test_collector_requeue(self, listener, batch_listener=False):
236236
mock.Mock())
237237
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
238238
def test_collector_sample_requeue(self):
239-
self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
240-
group='collector')
241239
self._test_collector_requeue('sample_listener')
242240

243241
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
244242
mock.Mock())
245243
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
246244
def test_collector_event_requeue(self):
247-
self.CONF.set_override('requeue_event_on_dispatcher_error', True,
248-
group='collector')
249245
self.CONF.set_override('store_events', True, group='notification')
250246
self._test_collector_requeue('event_listener')
251-
252-
def _test_collector_no_requeue(self, listener):
253-
mock_dispatcher = self._setup_fake_dispatcher()
254-
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
255-
mock_dispatcher.record_metering_data.side_effect = (FakeException
256-
('boom'))
257-
mock_dispatcher.record_events.side_effect = (FakeException
258-
('boom'))
259-
260-
self.srv.start()
261-
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
262-
self.assertRaises(FakeException, endp.sample, [
263-
{'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event',
264-
'payload': {}, 'metadata': {}}])
265-
266-
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
267-
mock.Mock())
268-
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
269-
def test_collector_sample_no_requeue(self):
270-
self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
271-
group='collector')
272-
self._test_collector_no_requeue('sample_listener')
273-
274-
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
275-
mock.Mock())
276-
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
277-
def test_collector_event_no_requeue(self):
278-
self.CONF.set_override('requeue_event_on_dispatcher_error', False,
279-
group='collector')
280-
self.CONF.set_override('store_events', True, group='notification')
281-
self._test_collector_no_requeue('event_listener')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
critical:
3+
- >
4+
The previous configuration options default for
5+
`requeue_sample_on_dispatcher_error' and
6+
`requeue_event_on_dispatcher_error' allowed to lose data very easily: if
7+
the dispatcher failed to send data to the backend (e.g. Gnocchi is down),
8+
then the dispatcher raised and the data were lost forever. This was
9+
completely unacceptable, and nobody should be able to configure Ceilometer
10+
in that way."
11+
12+
upgrade:
13+
- >
14+
The options `requeue_event_on_dispatcher_error' and
15+
`requeue_sample_on_dispatcher_error' have been enabled and removed.

0 commit comments

Comments
 (0)