| 
 | 1 | +/*  | 
 | 2 | + * Licensed to the Apache Software Foundation (ASF) under one  | 
 | 3 | + * or more contributor license agreements.  See the NOTICE file  | 
 | 4 | + * distributed with this work for additional information  | 
 | 5 | + * regarding copyright ownership.  The ASF licenses this file  | 
 | 6 | + * to you under the Apache License, Version 2.0 (the  | 
 | 7 | + * "License"); you may not use this file except in compliance  | 
 | 8 | + * with the License.  You may obtain a copy of the License at  | 
 | 9 | + *  | 
 | 10 | + *   http://www.apache.org/licenses/LICENSE-2.0  | 
 | 11 | + *  | 
 | 12 | + * Unless required by applicable law or agreed to in writing,  | 
 | 13 | + * software distributed under the License is distributed on an  | 
 | 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  | 
 | 15 | + * KIND, either express or implied.  See the License for the  | 
 | 16 | + * specific language governing permissions and limitations  | 
 | 17 | + * under the License.  | 
 | 18 | + */  | 
 | 19 | + | 
 | 20 | +//  | 
 | 21 | +// C++11 only  | 
 | 22 | +//  | 
 | 23 | +// A multi-threaded client that calls proton::container::run() in one thread, sends  | 
 | 24 | +// messages in another and receives messages in a third.  | 
 | 25 | +//  | 
 | 26 | +// Note this client does not deal with flow-control. If the sender is faster  | 
 | 27 | +// than the receiver, messages will build up in memory on the sending side.  | 
 | 28 | +// See @ref multithreaded_client_flow_control.cpp for a more complex example with  | 
 | 29 | +// flow control.  | 
 | 30 | +//  | 
 | 31 | +// NOTE: no proper error handling  | 
 | 32 | + | 
 | 33 | +#include <proton/connection.hpp>  | 
 | 34 | +#include <proton/connection_options.hpp>  | 
 | 35 | +#include <proton/container.hpp>  | 
 | 36 | +#include <proton/message.hpp>  | 
 | 37 | +#include <proton/messaging_handler.hpp>  | 
 | 38 | +#include <proton/receiver.hpp>  | 
 | 39 | +#include <proton/sender.hpp>  | 
 | 40 | +#include <proton/work_queue.hpp>  | 
 | 41 | + | 
 | 42 | +#include <condition_variable>  | 
 | 43 | +#include <iostream>  | 
 | 44 | +#include <mutex>  | 
 | 45 | +#include <queue>  | 
 | 46 | +#include <sstream>  | 
 | 47 | +#include <string>  | 
 | 48 | +#include <thread>  | 
 | 49 | + | 
 | 50 | +// Handler for a single thread-safe sending and receiving connection.  | 
 | 51 | +class client : public proton::messaging_handler {  | 
 | 52 | +    // Invariant  | 
 | 53 | +    const std::string url_;  | 
 | 54 | +    const std::string address_;  | 
 | 55 | + | 
 | 56 | +    // Only used in proton handler thread  | 
 | 57 | +    proton::sender sender_;  | 
 | 58 | + | 
 | 59 | +    // Shared by proton and user threads, protected by lock_  | 
 | 60 | +    std::mutex lock_;  | 
 | 61 | +    proton::work_queue *work_queue_;  | 
 | 62 | +    std::condition_variable sender_ready_;  | 
 | 63 | +    std::queue<proton::message> messages_;  | 
 | 64 | +    std::condition_variable messages_ready_;  | 
 | 65 | + | 
 | 66 | +  public:  | 
 | 67 | +    client(const std::string& url, const std::string& address) : url_(url), address_(address) {}  | 
 | 68 | + | 
 | 69 | +    // Thread safe  | 
 | 70 | +    void send(const proton::message& msg) {  | 
 | 71 | +        // Use [=] to copy the message, we cannot pass it by reference since it  | 
 | 72 | +        // will be used in another thread.  | 
 | 73 | +        work_queue()->add([=]() { sender_.send(msg); });  | 
 | 74 | +    }  | 
 | 75 | + | 
 | 76 | +    // Thread safe  | 
 | 77 | +    proton::message receive() {  | 
 | 78 | +        std::unique_lock<std::mutex> l(lock_);  | 
 | 79 | +        while (messages_.empty()) messages_ready_.wait(l);  | 
 | 80 | +        auto msg = std::move(messages_.front());  | 
 | 81 | +        messages_.pop();  | 
 | 82 | +        return msg;  | 
 | 83 | +    }  | 
 | 84 | + | 
 | 85 | +    // Thread safe  | 
 | 86 | +    void close() {  | 
 | 87 | +        work_queue()->add([=]() { sender_.connection().close(); });  | 
 | 88 | +    }  | 
 | 89 | + | 
 | 90 | +  private:  | 
 | 91 | + | 
 | 92 | +    proton::work_queue* work_queue() {  | 
 | 93 | +        // Wait till work_queue_ and sender_ are initialized.  | 
 | 94 | +        std::unique_lock<std::mutex> l(lock_);  | 
 | 95 | +        while (!work_queue_) sender_ready_.wait(l);  | 
 | 96 | +        return work_queue_;  | 
 | 97 | +    }  | 
 | 98 | + | 
 | 99 | +    // == messaging_handler overrides, only called in proton hander thread  | 
 | 100 | + | 
 | 101 | +    // Note: this example creates a connection when the container starts.  | 
 | 102 | +    // To create connections after the container has started, use  | 
 | 103 | +    // container::connect().  | 
 | 104 | +    // See @ref multithreaded_client_flow_control.cpp for an example.  | 
 | 105 | +    void on_container_start(proton::container& cont) override {  | 
 | 106 | +        cont.connect(url_);  | 
 | 107 | +    }  | 
 | 108 | + | 
 | 109 | +    void on_connection_open(proton::connection& conn) override {  | 
 | 110 | +        conn.open_sender(address_);  | 
 | 111 | +        conn.open_receiver(address_);  | 
 | 112 | +    }  | 
 | 113 | + | 
 | 114 | +    void on_sender_open(proton::sender& s) override {  | 
 | 115 | +        {  | 
 | 116 | +            // sender_ and work_queue_ must be set atomically  | 
 | 117 | +            std::lock_guard<std::mutex> l(lock_);  | 
 | 118 | +            sender_ = s;  | 
 | 119 | +            work_queue_ = &s.work_queue();  | 
 | 120 | +        }  | 
 | 121 | +        sender_ready_.notify_all();  | 
 | 122 | +    }  | 
 | 123 | + | 
 | 124 | +    void on_message(proton::delivery& dlv, proton::message& msg) override {  | 
 | 125 | +        {  | 
 | 126 | +            std::lock_guard<std::mutex> l(lock_);  | 
 | 127 | +            messages_.push(msg);  | 
 | 128 | +        }  | 
 | 129 | +        messages_ready_.notify_all();  | 
 | 130 | +    }  | 
 | 131 | + | 
 | 132 | +    void on_error(const proton::error_condition& e) override {  | 
 | 133 | +        std::cerr << "unexpected error: " << e << std::endl;  | 
 | 134 | +        exit(1);  | 
 | 135 | +    }  | 
 | 136 | +};  | 
 | 137 | + | 
 | 138 | +int main(int argc, const char** argv) {  | 
 | 139 | +    try {  | 
 | 140 | +        if (argc != 4) {  | 
 | 141 | +            std ::cerr <<  | 
 | 142 | +                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"  | 
 | 143 | +                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"  | 
 | 144 | +                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"  | 
 | 145 | +                "MESSAGE-COUNT: number of messages to send\n";  | 
 | 146 | +            return 1;  | 
 | 147 | +        }  | 
 | 148 | +        const char *url = argv[1];  | 
 | 149 | +        const char *address = argv[2];  | 
 | 150 | +        int n_messages = atoi(argv[3]);  | 
 | 151 | + | 
 | 152 | +        client cl(url, address);  | 
 | 153 | +        proton::container container(cl);  | 
 | 154 | +        std::thread container_thread([&]() { container.run(); });  | 
 | 155 | + | 
 | 156 | +        std::thread sender([&]() {  | 
 | 157 | +                for (int i = 0; i < n_messages; ++i) {  | 
 | 158 | +                    proton::message msg(std::to_string(i + 1));  | 
 | 159 | +                    cl.send(msg);  | 
 | 160 | +                    std::cout << "sent: " << msg.body() << std::endl;  | 
 | 161 | +                }  | 
 | 162 | +            });  | 
 | 163 | + | 
 | 164 | +        int received = 0;  | 
 | 165 | +        std::thread receiver([&]() {  | 
 | 166 | +                for (int i = 0; i < n_messages; ++i) {  | 
 | 167 | +                    auto msg = cl.receive();  | 
 | 168 | +                    std::cout << "received: " << msg.body() << std::endl;  | 
 | 169 | +                    ++received;  | 
 | 170 | +                }  | 
 | 171 | +            });  | 
 | 172 | + | 
 | 173 | +        sender.join();  | 
 | 174 | +        receiver.join();  | 
 | 175 | +        cl.close();  | 
 | 176 | +        container_thread.join();  | 
 | 177 | +        std::cout << "received " << received << " messages" << std::endl;  | 
 | 178 | + | 
 | 179 | +        return 0;  | 
 | 180 | +    } catch (const std::exception& e) {  | 
 | 181 | +        std::cerr << e.what() << std::endl;  | 
 | 182 | +    }  | 
 | 183 | + | 
 | 184 | +    return 1;  | 
 | 185 | +}  | 
0 commit comments