Skip to content

Commit 38af0f6

Browse files
committed
collector: remove deprecated RPC code
Change-Id: I995398ee239754a4d333460112700caeec516eb5
1 parent 512b7f0 commit 38af0f6

File tree

8 files changed

+4
-203
lines changed

8 files changed

+4
-203
lines changed

ceilometer/collector.py

-19
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,9 @@
4545
default=False,
4646
help='Requeue the event on the collector event queue '
4747
'when the collector fails to dispatch it.'),
48-
cfg.BoolOpt('enable_rpc',
49-
default=False,
50-
help='Enable the RPC functionality of collector. This '
51-
'functionality is now deprecated in favour of notifier '
52-
'publisher and queues.')
5348
]
5449

5550
cfg.CONF.register_opts(OPTS, group="collector")
56-
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
57-
group='publisher_rpc')
5851
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
5952
group='publisher_notifier')
6053
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
@@ -73,7 +66,6 @@ def start(self):
7366
# ensure dispatcher is configured before starting other services
7467
dispatcher_managers = dispatcher.load_dispatcher_manager()
7568
(self.meter_manager, self.event_manager) = dispatcher_managers
76-
self.rpc_server = None
7769
self.sample_listener = None
7870
self.event_listener = None
7971
super(CollectorService, self).start()
@@ -83,12 +75,6 @@ def start(self):
8375

8476
transport = messaging.get_transport(optional=True)
8577
if transport:
86-
if cfg.CONF.collector.enable_rpc:
87-
LOG.warning('RPC collector is deprecated in favour of queues. '
88-
'Please switch to notifier publisher.')
89-
self.rpc_server = messaging.get_rpc_server(
90-
transport, cfg.CONF.publisher_rpc.metering_topic, self)
91-
9278
if list(self.meter_manager):
9379
sample_target = oslo_messaging.Target(
9480
topic=cfg.CONF.publisher_notifier.metering_topic)
@@ -109,9 +95,6 @@ def start(self):
10995
requeue_event_on_dispatcher_error))
11096
self.event_listener.start()
11197

112-
if cfg.CONF.collector.enable_rpc:
113-
self.rpc_server.start()
114-
11598
if not cfg.CONF.collector.udp_address:
11699
# Add a dummy thread to have wait() working
117100
self.tg.add_timer(604800, lambda: None)
@@ -144,8 +127,6 @@ def start_udp(self):
144127

145128
def stop(self):
146129
self.udp_run = False
147-
if cfg.CONF.collector.enable_rpc and self.rpc_server:
148-
self.rpc_server.stop()
149130
if self.sample_listener:
150131
utils.kill_listeners([self.sample_listener])
151132
if self.event_listener:

ceilometer/messaging.py

-17
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,6 @@ def deserialize_context(self, context):
8181
oslo_serializer.JsonPayloadSerializer())
8282

8383

84-
def get_rpc_server(transport, topic, endpoint):
85-
"""Return a configured oslo_messaging rpc server."""
86-
cfg.CONF.import_opt('host', 'ceilometer.service')
87-
target = oslo_messaging.Target(server=cfg.CONF.host, topic=topic)
88-
return oslo_messaging.get_rpc_server(transport, target,
89-
[endpoint], executor='threading',
90-
serializer=_SERIALIZER)
91-
92-
93-
def get_rpc_client(transport, retry=None, **kwargs):
94-
"""Return a configured oslo_messaging RPCClient."""
95-
target = oslo_messaging.Target(**kwargs)
96-
return oslo_messaging.RPCClient(transport, target,
97-
serializer=_SERIALIZER,
98-
retry=retry)
99-
100-
10184
def get_notification_listener(transport, targets, endpoints,
10285
allow_requeue=False):
10386
"""Return a configured oslo_messaging notification listener."""

ceilometer/opts.py

-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ def list_opts():
104104
('polling', ceilometer.agent.manager.OPTS),
105105
('publisher', ceilometer.publisher.utils.OPTS),
106106
('publisher_notifier', ceilometer.publisher.messaging.NOTIFIER_OPTS),
107-
('publisher_rpc', ceilometer.publisher.messaging.RPC_OPTS),
108107
('rgw_admin_credentials', ceilometer.objectstore.rgw.CREDENTIAL_OPTS),
109108
# NOTE(sileht): the configuration file contains only the options
110109
# for the password plugin that handles keystone v2 and v3 API

ceilometer/publisher/messaging.py

+1-32
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,6 @@
3535

3636
LOG = log.getLogger(__name__)
3737

38-
RPC_OPTS = [
39-
cfg.StrOpt('metering_topic',
40-
default='metering',
41-
deprecated_for_removal=True,
42-
help='The topic that ceilometer uses for metering messages.',
43-
deprecated_group="DEFAULT",
44-
),
45-
]
46-
4738
NOTIFIER_OPTS = [
4839
cfg.StrOpt('metering_topic',
4940
default='metering',
@@ -63,8 +54,6 @@
6354
)
6455
]
6556

66-
cfg.CONF.register_opts(RPC_OPTS,
67-
group="publisher_rpc")
6857
cfg.CONF.register_opts(NOTIFIER_OPTS,
6958
group="publisher_notifier")
7059
cfg.CONF.import_opt('host', 'ceilometer.service')
@@ -122,7 +111,7 @@ def publish_samples(self, context, samples):
122111
sample, cfg.CONF.publisher.telemetry_secret)
123112
for sample in samples
124113
]
125-
topic = cfg.CONF.publisher_rpc.metering_topic
114+
topic = cfg.CONF.publisher_notifier.metering_topic
126115
self.local_queue.append((context, topic, meters))
127116

128117
if self.per_meter_topic:
@@ -201,26 +190,6 @@ def _send(self, context, topic, meters):
201190
"""Send the meters to the messaging topic."""
202191

203192

204-
class RPCPublisher(MessagingPublisher):
205-
def __init__(self, parsed_url):
206-
super(RPCPublisher, self).__init__(parsed_url)
207-
208-
options = urlparse.parse_qs(parsed_url.query)
209-
self.target = options.get('target', ['record_metering_data'])[0]
210-
211-
self.rpc_client = messaging.get_rpc_client(
212-
messaging.get_transport(),
213-
retry=self.retry, version='1.0'
214-
)
215-
216-
def _send(self, context, topic, meters):
217-
try:
218-
self.rpc_client.prepare(topic=topic).cast(context, self.target,
219-
data=meters)
220-
except oslo_messaging.MessageDeliveryFailure as e:
221-
raise_delivery_failure(e)
222-
223-
224193
class NotifierPublisher(MessagingPublisher):
225194
def __init__(self, parsed_url, default_topic):
226195
super(NotifierPublisher, self).__init__(parsed_url)

ceilometer/tests/functional/test_collector.py

-29
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717
import mock
1818
import msgpack
1919
from oslo_config import fixture as fixture_config
20-
from oslo_context import context
2120
import oslo_messaging
2221
from oslo_utils import timeutils
2322
from oslotest import mockpatch
2423
from stevedore import extension
2524

2625
from ceilometer import collector
2726
from ceilometer import dispatcher
28-
from ceilometer import messaging
2927
from ceilometer.publisher import utils
3028
from ceilometer import sample
3129
from ceilometer.tests import base as tests_base
@@ -208,18 +206,6 @@ def test_only_udp(self, udp_start, rpc_start):
208206
self.assertEqual(0, rpc_start.call_count)
209207
self.assertEqual(1, udp_start.call_count)
210208

211-
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
212-
@mock.patch.object(collector.CollectorService, 'start_udp')
213-
def test_only_rpc(self, udp_start, rpc_start):
214-
"""Check that only RPC is started if udp_address is empty."""
215-
self.CONF.set_override('enable_rpc', True, group='collector')
216-
self.CONF.set_override('udp_address', '', group='collector')
217-
self._setup_fake_dispatcher()
218-
self.srv.start()
219-
# two calls because two servers (notification and rpc)
220-
self.assertEqual(2, rpc_start.call_count)
221-
self.assertEqual(0, udp_start.call_count)
222-
223209
def test_udp_receive_valid_encoding(self):
224210
self._setup_messaging(False)
225211
mock_dispatcher = self._setup_fake_dispatcher()
@@ -231,21 +217,6 @@ def test_udp_receive_valid_encoding(self):
231217
mock_dispatcher.method_calls[0][1][0],
232218
"not-so-secret"))
233219

234-
@mock.patch('ceilometer.storage.impl_log.LOG')
235-
def test_collector_no_mock(self, mylog):
236-
self.CONF.set_override('enable_rpc', True, group='collector')
237-
self.CONF.set_override('udp_address', '', group='collector')
238-
mylog.info.side_effect = lambda *args: self.srv.stop()
239-
self.srv.start()
240-
241-
client = messaging.get_rpc_client(self.transport, version='1.0')
242-
cclient = client.prepare(topic='metering')
243-
cclient.cast(context.RequestContext(),
244-
'record_metering_data', data=[self.utf8_msg])
245-
self.srv.rpc_server.wait()
246-
mylog.info.assert_called_once_with(
247-
'metering data test for test_run_tasks: 1')
248-
249220
def _test_collector_requeue(self, listener):
250221

251222
mock_dispatcher = self._setup_fake_dispatcher()

ceilometer/tests/unit/publisher/test_messaging_publisher.py

+1-77
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919

2020
import mock
2121
from oslo_config import fixture as fixture_config
22-
from oslo_context import context
2322
from oslo_utils import netutils
2423
import testscenarios.testcase
2524

2625
from ceilometer.event.storage import models as event
27-
from ceilometer import messaging
2826
from ceilometer.publisher import messaging as msg_publisher
2927
from ceilometer import sample
3028
from ceilometer.tests import base as tests_base
@@ -103,76 +101,6 @@ def setUp(self):
103101
self.setup_messaging(self.CONF)
104102

105103

106-
class RpcOnlyPublisherTest(BasePublisherTestCase):
107-
def test_published_no_mock(self):
108-
publisher = msg_publisher.RPCPublisher(
109-
netutils.urlsplit('rpc://'))
110-
111-
endpoint = mock.MagicMock(['record_metering_data'])
112-
collector = messaging.get_rpc_server(
113-
self.transport, self.CONF.publisher_rpc.metering_topic, endpoint)
114-
endpoint.record_metering_data.side_effect = (lambda *args, **kwds:
115-
collector.stop())
116-
117-
collector.start()
118-
publisher.publish_samples(context.RequestContext(),
119-
self.test_sample_data)
120-
collector.wait()
121-
122-
class Matcher(object):
123-
@staticmethod
124-
def __eq__(data):
125-
for i, sample_item in enumerate(data):
126-
if (sample_item['counter_name'] !=
127-
self.test_sample_data[i].name):
128-
return False
129-
return True
130-
131-
endpoint.record_metering_data.assert_called_once_with(
132-
mock.ANY, data=Matcher())
133-
134-
def test_publish_target(self):
135-
publisher = msg_publisher.RPCPublisher(
136-
netutils.urlsplit('rpc://?target=custom_procedure_call'))
137-
cast_context = mock.MagicMock()
138-
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
139-
prepare.return_value = cast_context
140-
publisher.publish_samples(mock.MagicMock(),
141-
self.test_sample_data)
142-
143-
prepare.assert_called_once_with(
144-
topic=self.CONF.publisher_rpc.metering_topic)
145-
cast_context.cast.assert_called_once_with(
146-
mock.ANY, 'custom_procedure_call', data=mock.ANY)
147-
148-
def test_published_with_per_meter_topic(self):
149-
publisher = msg_publisher.RPCPublisher(
150-
netutils.urlsplit('rpc://?per_meter_topic=1'))
151-
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
152-
publisher.publish_samples(mock.MagicMock(),
153-
self.test_sample_data)
154-
155-
class MeterGroupMatcher(object):
156-
def __eq__(self, meters):
157-
return len(set(meter['counter_name']
158-
for meter in meters)) == 1
159-
160-
topic = self.CONF.publisher_rpc.metering_topic
161-
expected = [mock.call(topic=topic),
162-
mock.call().cast(mock.ANY, 'record_metering_data',
163-
data=mock.ANY),
164-
mock.call(topic=topic + '.test'),
165-
mock.call().cast(mock.ANY, 'record_metering_data',
166-
data=MeterGroupMatcher()),
167-
mock.call(topic=topic + '.test2'),
168-
mock.call().cast(mock.ANY, 'record_metering_data',
169-
data=MeterGroupMatcher()),
170-
mock.call(topic=topic + '.test3'),
171-
mock.call().cast(mock.ANY, 'record_metering_data',
172-
data=MeterGroupMatcher())]
173-
self.assertEqual(expected, prepare.mock_calls)
174-
175-
176104
class NotifierOnlyPublisherTest(BasePublisherTestCase):
177105

178106
@mock.patch('oslo_messaging.Notifier')
@@ -203,17 +131,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
203131
publisher_cls=msg_publisher.EventNotifierPublisher,
204132
test_data=BasePublisherTestCase.test_event_data,
205133
pub_func='publish_events', attr='event_type')),
206-
('rpc', dict(protocol="rpc",
207-
publisher_cls=msg_publisher.RPCPublisher,
208-
test_data=BasePublisherTestCase.test_sample_data,
209-
pub_func='publish_samples', attr='source')),
210134
]
211135

212136
def setUp(self):
213137
super(TestPublisher, self).setUp()
214138
self.topic = (self.CONF.publisher_notifier.event_topic
215139
if self.pub_func == 'publish_events' else
216-
self.CONF.publisher_rpc.metering_topic)
140+
self.CONF.publisher_notifier.metering_topic)
217141

218142

219143
class TestPublisherPolicy(TestPublisher):

setup.cfg

-3
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,6 @@ ceilometer.transformer =
218218

219219
ceilometer.publisher =
220220
test = ceilometer.publisher.test:TestPublisher
221-
meter_publisher = ceilometer.publisher.messaging:RPCPublisher
222-
meter = ceilometer.publisher.messaging:RPCPublisher
223-
rpc = ceilometer.publisher.messaging:RPCPublisher
224221
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
225222
udp = ceilometer.publisher.udp:UDPPublisher
226223
file = ceilometer.publisher.file:FilePublisher

tools/send_test_data.py

+2-25
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import make_test_data
3232
from oslo_config import cfg
33-
from oslo_context import context
3433
import oslo_messaging
3534
from six import moves
3635

@@ -39,11 +38,6 @@
3938
from ceilometer import service
4039

4140

42-
def send_batch_rpc(rpc_client, topic, batch):
43-
rpc_client.prepare(topic=topic).cast(context.RequestContext(),
44-
'record_metering_data', data=batch)
45-
46-
4741
def send_batch_notifier(notifier, topic, batch):
4842
notifier.sample({}, event_type=topic, payload=batch)
4943

@@ -58,13 +52,6 @@ def get_notifier(config_file):
5852
)
5953

6054

61-
def get_rpc_client(config_file):
62-
service.prepare_service(argv=['/', '--config-file', config_file])
63-
transport = messaging.get_transport()
64-
rpc_client = messaging.get_rpc_client(transport, version='1.0')
65-
return rpc_client
66-
67-
6855
def generate_data(send_batch, make_data_args, samples_count,
6956
batch_size, resources_count, topic):
7057
make_data_args.interval = 1
@@ -104,12 +91,6 @@ def generate_data(send_batch, make_data_args, samples_count,
10491

10592
def get_parser():
10693
parser = argparse.ArgumentParser()
107-
parser.add_argument(
108-
'--notify',
109-
dest='notify',
110-
type=bool,
111-
default=True
112-
)
11394

11495
parser.add_argument(
11596
'--batch-size',
@@ -148,12 +129,8 @@ def get_parser():
148129
def main():
149130
args = get_parser().parse_known_args()[0]
150131
make_data_args = make_test_data.get_parser().parse_known_args()[0]
151-
if args.notify:
152-
notifier = get_notifier(args.config_file)
153-
send_batch = functools.partial(send_batch_notifier, notifier)
154-
else:
155-
rpc_client = get_rpc_client(args.config_file)
156-
send_batch = functools.partial(send_batch_rpc, rpc_client)
132+
notifier = get_notifier(args.config_file)
133+
send_batch = functools.partial(send_batch_notifier, notifier)
157134
result_dir = args.result_dir
158135
del args.notify
159136
del args.config_file

0 commit comments

Comments
 (0)