Skip to content

Commit a5f88a1

Browse files
committed
PROTON-1557: c++ improve multi-threaded clients
2 clients: - multithreaded_client.cpp: simple send thread, receive thread, run thread - multithreaded_client_flow_control: multi-connection, block for flow control Changes: - reduced needless diff between examples - use separate work_queue* to clarify separate thread safety rules from sender - took work_queue->add() out of lock to emphasize it is thread safe - use fixed argument list, same arg order
1 parent 68c8cf4 commit a5f88a1

4 files changed

+272
-170
lines changed

examples/cpp/CMakeLists.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ if(HAS_CPP11)
6363
# Examples that require C++11
6464
foreach(example
6565
scheduled_send
66-
send_recv_mt
66+
multithreaded_client
67+
multithreaded_client_flow_control
6768
)
6869
add_executable(${example} ${example}.cpp)
6970
endforeach()

examples/cpp/mt_queue.hpp

-102
This file was deleted.

examples/cpp/multithreaded_client.cpp

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
//
21+
// C++11 only
22+
//
23+
// A multi-threaded client that calls proton::container::run() in one thread, sends
24+
// messages in another and receives messages in a third.
25+
//
26+
// Note this client does not deal with flow-control. If the sender is faster
27+
// than the receiver, messages will build up in memory on the sending side.
28+
// See @ref multithreaded_client_flow_control.cpp for a more complex example with
29+
// flow control.
30+
//
31+
// NOTE: no proper error handling
32+
33+
#include <proton/connection.hpp>
34+
#include <proton/connection_options.hpp>
35+
#include <proton/container.hpp>
36+
#include <proton/message.hpp>
37+
#include <proton/messaging_handler.hpp>
38+
#include <proton/receiver.hpp>
39+
#include <proton/sender.hpp>
40+
#include <proton/work_queue.hpp>
41+
42+
#include <condition_variable>
43+
#include <iostream>
44+
#include <mutex>
45+
#include <queue>
46+
#include <sstream>
47+
#include <string>
48+
#include <thread>
49+
50+
// Handler for a single thread-safe sending and receiving connection.
51+
class client : public proton::messaging_handler {
52+
// Invariant
53+
const std::string url_;
54+
const std::string address_;
55+
56+
// Only used in proton handler thread
57+
proton::sender sender_;
58+
59+
// Shared by proton and user threads, protected by lock_
60+
std::mutex lock_;
61+
proton::work_queue *work_queue_;
62+
std::condition_variable sender_ready_;
63+
std::queue<proton::message> messages_;
64+
std::condition_variable messages_ready_;
65+
66+
public:
67+
client(const std::string& url, const std::string& address) : url_(url), address_(address) {}
68+
69+
// Thread safe
70+
void send(const proton::message& msg) {
71+
// Use [=] to copy the message, we cannot pass it by reference since it
72+
// will be used in another thread.
73+
work_queue()->add([=]() { sender_.send(msg); });
74+
}
75+
76+
// Thread safe
77+
proton::message receive() {
78+
std::unique_lock<std::mutex> l(lock_);
79+
while (messages_.empty()) messages_ready_.wait(l);
80+
auto msg = std::move(messages_.front());
81+
messages_.pop();
82+
return msg;
83+
}
84+
85+
// Thread safe
86+
void close() {
87+
work_queue()->add([=]() { sender_.connection().close(); });
88+
}
89+
90+
private:
91+
92+
proton::work_queue* work_queue() {
93+
// Wait till work_queue_ and sender_ are initialized.
94+
std::unique_lock<std::mutex> l(lock_);
95+
while (!work_queue_) sender_ready_.wait(l);
96+
return work_queue_;
97+
}
98+
99+
// == messaging_handler overrides, only called in proton hander thread
100+
101+
// Note: this example creates a connection when the container starts.
102+
// To create connections after the container has started, use
103+
// container::connect().
104+
// See @ref multithreaded_client_flow_control.cpp for an example.
105+
void on_container_start(proton::container& cont) override {
106+
cont.connect(url_);
107+
}
108+
109+
void on_connection_open(proton::connection& conn) override {
110+
conn.open_sender(address_);
111+
conn.open_receiver(address_);
112+
}
113+
114+
void on_sender_open(proton::sender& s) override {
115+
{
116+
// sender_ and work_queue_ must be set atomically
117+
std::lock_guard<std::mutex> l(lock_);
118+
sender_ = s;
119+
work_queue_ = &s.work_queue();
120+
}
121+
sender_ready_.notify_all();
122+
}
123+
124+
void on_message(proton::delivery& dlv, proton::message& msg) override {
125+
{
126+
std::lock_guard<std::mutex> l(lock_);
127+
messages_.push(msg);
128+
}
129+
messages_ready_.notify_all();
130+
}
131+
132+
void on_error(const proton::error_condition& e) override {
133+
std::cerr << "unexpected error: " << e << std::endl;
134+
exit(1);
135+
}
136+
};
137+
138+
int main(int argc, const char** argv) {
139+
try {
140+
if (argc != 4) {
141+
std ::cerr <<
142+
"Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
143+
"CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
144+
"AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
145+
"MESSAGE-COUNT: number of messages to send\n";
146+
return 1;
147+
}
148+
const char *url = argv[1];
149+
const char *address = argv[2];
150+
int n_messages = atoi(argv[3]);
151+
152+
client cl(url, address);
153+
proton::container container(cl);
154+
std::thread container_thread([&]() { container.run(); });
155+
156+
std::thread sender([&]() {
157+
for (int i = 0; i < n_messages; ++i) {
158+
proton::message msg(std::to_string(i + 1));
159+
cl.send(msg);
160+
std::cout << "sent: " << msg.body() << std::endl;
161+
}
162+
});
163+
164+
int received = 0;
165+
std::thread receiver([&]() {
166+
for (int i = 0; i < n_messages; ++i) {
167+
auto msg = cl.receive();
168+
std::cout << "received: " << msg.body() << std::endl;
169+
++received;
170+
}
171+
});
172+
173+
sender.join();
174+
receiver.join();
175+
cl.close();
176+
container_thread.join();
177+
std::cout << "received " << received << " messages" << std::endl;
178+
179+
return 0;
180+
} catch (const std::exception& e) {
181+
std::cerr << e.what() << std::endl;
182+
}
183+
184+
return 1;
185+
}

0 commit comments

Comments
 (0)