Skip to content

Commit fa9236d

Browse files
committed
Add tx_recv file
1 parent eb4514b commit fa9236d

File tree

7 files changed

+148
-52
lines changed

7 files changed

+148
-52
lines changed

cpp/examples/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ foreach(example
6161
service_bus
6262
multithreaded_client
6363
multithreaded_client_flow_control
64-
tx_send)
64+
tx_send
65+
tx_recv)
6566
add_executable(${example} ${example}.cpp)
6667
target_link_libraries(${example} Proton::cpp Threads::Threads)
6768
endforeach()

cpp/examples/simple_recv.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class simple_recv : public proton::messaging_handler {
6161
}
6262

6363
if (expected == 0 || received < expected) {
64-
std::cout << msg.body() << std::endl;
64+
std::cout << "MessageID: " << msg.id() << " Body: " << msg.body() << std::endl;
6565
received++;
6666

6767
if (received == expected) {

cpp/examples/tx_recv.cpp

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
#include "options.hpp"
23+
24+
#include <proton/connection.hpp>
25+
#include <proton/container.hpp>
26+
#include <proton/message.hpp>
27+
#include <proton/message_id.hpp>
28+
#include <proton/messaging_handler.hpp>
29+
#include <proton/types.hpp>
30+
#include <proton/transaction.hpp>
31+
32+
#include <iostream>
33+
#include <map>
34+
#include <string>
35+
36+
#include <chrono>
37+
#include <thread>
38+
39+
class tx_recv : public proton::messaging_handler, proton::transaction_handler {
40+
private:
41+
proton::receiver receiver;
42+
std::string url;
43+
int expected;
44+
int batch_size;
45+
int current_batch = 0;
46+
int committed = 0;
47+
48+
proton::container *container;
49+
proton::transaction transaction;
50+
proton::connection connection;
51+
public:
52+
tx_recv(const std::string &s, int c, int b):
53+
url(s), expected(c), batch_size(b) {}
54+
55+
void on_container_start(proton::container &c) override {
56+
container = &c;
57+
receiver = c.open_receiver(url);
58+
connection = receiver.connection();
59+
std::cout << " [on_container_start] declare_txn started..." << std::endl;
60+
c.declare_transaction(connection, *this);
61+
std::cout << " [on_container_start] completed!!" << &transaction
62+
<< std::endl;
63+
}
64+
65+
void on_transaction_declare_failed(proton::transaction) {}
66+
void on_transaction_commit_failed(proton::transaction) {
67+
std::cout << "Transaction Commit Failed" << std::endl;
68+
connection.close();
69+
exit(-1);
70+
}
71+
72+
void on_transaction_declared(proton::transaction t) override {
73+
std::cout << "[on_transaction_declared] txn called " << (&t)
74+
<< std::endl;
75+
// connection.close();
76+
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
77+
<< "\t" << transaction.is_empty() << std::endl;
78+
receiver.add_credit(batch_size);
79+
transaction = t;
80+
}
81+
82+
void on_message(proton::delivery &d, proton::message &msg) override {
83+
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
84+
transaction.accept(d);
85+
current_batch += 1;
86+
if(current_batch == batch_size) {
87+
transaction = proton::transaction(); // null
88+
}
89+
}
90+
91+
void on_transaction_committed(proton::transaction t) override {
92+
committed += current_batch;
93+
current_batch = 0;
94+
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
95+
if(committed == expected) {
96+
std::cout << "All messages committed" << std::endl;
97+
connection.close();
98+
}
99+
else {
100+
container->declare_transaction(connection, *this);
101+
}
102+
}
103+
104+
};
105+
106+
int main(int argc, char **argv) {
107+
std::string address("127.0.0.1:5672/examples");
108+
int message_count = 9;
109+
int batch_size = 3;
110+
example::options opts(argc, argv);
111+
112+
opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
113+
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT");
114+
opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE");
115+
116+
try {
117+
opts.parse();
118+
119+
tx_recv recv(address, message_count, batch_size);
120+
proton::container(recv).run();
121+
122+
return 0;
123+
} catch (const example::bad_option& e) {
124+
std::cout << opts << std::endl << e.what() << std::endl;
125+
} catch (const std::exception& e) {
126+
std::cerr << e.what() << std::endl;
127+
}
128+
129+
return 1;
130+
}

cpp/examples/tx_send.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,14 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
4949
int confirmed = 0;
5050

5151
proton::container *container;
52-
// proton::transaction_handler transaction_handler;
5352
proton::transaction transaction;
5453
proton::connection connection;
5554
public:
5655
tx_send(const std::string &s, int c, int b):
5756
url(s), total(c), batch_size(b), sent(0) {}
5857

5958
void on_container_start(proton::container &c) override {
60-
container = &c; // TODO: Fix error
59+
container = &c;
6160
sender = c.open_sender(url);
6261
connection = sender.connection();
6362
std::cout << " [on_container_start] declare_txn started..." << std::endl;
@@ -76,7 +75,6 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
7675
void on_transaction_declared(proton::transaction t) override {
7776
std::cout << "[on_transaction_declared] txn called " << (&t)
7877
<< std::endl;
79-
// connection.close();
8078
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
8179
<< "\t" << transaction.is_empty() << std::endl;
8280
transaction = t;
@@ -85,23 +83,22 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
8583
}
8684

8785
void on_sendable(proton::sender &s) override {
88-
// send();
8986
std::cout << " [OnSendable] transaction: " << &transaction
9087
<< std::endl;
9188
send(s);
9289
}
9390

9491
void send(proton::sender &s) {
95-
// TODO: Add more condition in while loop
92+
static int unique_id = 10000;
9693
while (!transaction.is_empty() && sender.credit() &&
9794
(committed + current_batch) < total) {
9895
proton::message msg;
9996
std::map<std::string, int> m;
10097
m["sequence"] = committed + current_batch;
10198

102-
msg.id(committed + current_batch + 1);
99+
msg.id(unique_id++);
103100
msg.body(m);
104-
std::cout << " [example] transaction send msg: " << msg
101+
std::cout << "##### [example] transaction send msg: " << msg
105102
<< std::endl;
106103
transaction.send(sender, msg);
107104
current_batch += 1;

cpp/include/proton/transaction.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class transaction_impl {
5555

5656
void discharge(bool failed);
5757
void release_pending();
58-
void accept(tracker &d);
58+
void accept(delivery &d);
5959
void update(tracker &d, uint64_t state);
6060
void set_id(binary _id);
6161

@@ -92,6 +92,7 @@ PN_CPP_CLASS_EXTERN transaction {
9292
PN_CPP_EXTERN void declare();
9393
PN_CPP_EXTERN void handle_outcome(proton::tracker);
9494
PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg);
95+
PN_CPP_EXTERN void accept(delivery &t);
9596

9697
friend class transaction_impl;
9798
friend class container::impl;

cpp/src/messaging_adapter.cpp

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -333,42 +333,8 @@ void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
333333
return;
334334
}
335335
std::cout<<" IN on_link_remote_open(.PN_COORDINATOR) success " << std::endl;
336+
std::cout<<" IN on_link_remote_open(.PN_COORDINATOR) have handler " << &handler << std::endl;
336337

337-
// WHY???
338-
// pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk));
339-
// pn_terminus_copy(pn_link_target(lnk), pn_link_remote_target(lnk));
340-
341-
// We need a new class?
342-
// auto coordinator = pn_link_remote_target(lnk);
343-
344-
345-
// proton::target_options to;
346-
// std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
347-
// to.capabilities(cap);
348-
// to.type(PN_COORDINATOR);
349-
350-
// proton::receiver_options ro;
351-
// ro.name("txn-ctrl");
352-
// ro.target(to);
353-
// ro.handler(handler);
354-
// receiver r(make_wrapper<receiver>(lnk));
355-
356-
// proton::receiver rcv = r.connection().open_receiver("does not matter", ro);
357-
std::cout<<" IN on_link_remote_open(.PN_COORDINATOR) have handler " << &handler << std::endl;
358-
359-
// handler.on_receiver_open(rcv);
360-
// credit_topup(lnk);
361-
362-
// pn_delivery_t *dlv = pn_event_delivery(event);
363-
// tracker t(make_wrapper<tracker>(dlv));
364-
365-
// // sender s(make_wrapper<sender>(lnk));
366-
// handler.on_tracker_settle(t);
367-
// TODO: find what to do...
368-
// HAHA.. treating coordinator like sender...
369-
// sender s(make_wrapper<sender>(lnk));
370-
// handler.on_sender_open(s);
371-
// pn_link_close(lnk);
372338
return;
373339
}
374340
if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link

cpp/src/transaction.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,19 @@ transaction::transaction(transaction_impl *impl)
5050
: _impl(impl) {}
5151
// transaction::transaction( transaction_impl* impl): _impl(impl){}
5252
transaction::~transaction() = default;
53-
void transaction::commit() { _impl->commit(); };
54-
void transaction::abort() { _impl->abort(); };
55-
void transaction::declare() { _impl->declare(); };
56-
bool transaction::is_empty() { return _impl == NULL; };
53+
void transaction::commit() { _impl->commit(); }
54+
void transaction::abort() { _impl->abort(); }
55+
void transaction::declare() { _impl->declare(); }
56+
bool transaction::is_empty() { return _impl == NULL; }
57+
void transaction::accept(delivery &t) { return _impl->accept(t); }
5758
proton::tracker transaction::send(proton::sender s, proton::message msg) {
5859
return _impl->send(s, msg);
59-
};
60+
}
6061
void transaction::handle_outcome(proton::tracker t) {
6162
std::cout << " transaction::handle_outcome = NO OP base class "
6263
<< std::endl;
6364
_impl->handle_outcome(t);
64-
};
65+
}
6566

6667
transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
6768
proton::transaction_handler &_handler,
@@ -142,7 +143,7 @@ proton::tracker transaction_impl::send(proton::sender s, proton::message msg) {
142143
return tracker;
143144
}
144145

145-
void transaction_impl::accept(tracker &t) {
146+
void transaction_impl::accept(delivery &t) {
146147
// TODO: settle-before-discharge
147148
t.settle();
148149
// pending.push_back(d);

0 commit comments

Comments
 (0)