-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PROTON-1442: [Cpp] Support for local transactions #437
base: main
Are you sure you want to change the base?
Conversation
74818fc
to
eb4514b
Compare
3e96834
to
fa9236d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've taken a fairly thorough look. If you have any questions about these review items we should discuss further.
PN_CPP_EXTERN target_options& type(const int); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be needed. Introduce a new coordinator class that is peer to sender and receiver
cpp/include/proton/transaction.hpp
Outdated
class | ||
PN_CPP_CLASS_EXTERN transaction_handler { | ||
public: | ||
PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
||
/// Called when a local transaction is declared. | ||
PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
||
/// Called when a local transaction is discharged successfully. | ||
PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
||
/// Called when a local transaction is discharged unsuccessfully (aborted). | ||
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
||
/// Called when a local transaction declare fails. | ||
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
||
/// Called when the commit of a local transaction fails. | ||
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
}; | ||
|
||
} // namespace proton | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be in a different header file like message and messaging_handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API makes transactions hidden in a session then these callbacks either go away or instead become session callbacks: on_session_transaction_committed(session&), etc. I think transaction_declared() goes away entirely and it's purpose is now another use for on_session_open(session&). on_.._declare_failed should be handled by the on_session_error(session&).
fa9236d
to
347cf6e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've not reviewed this comprehensively, but I think I'm now very much leaning towards not making the transaction class visible to the API user at all and having the transaction methods on the session. This is essentially the API in JMS and CMS.
cpp/include/proton/transaction.hpp
Outdated
class | ||
PN_CPP_CLASS_EXTERN transaction_handler { | ||
public: | ||
PN_CPP_EXTERN virtual ~transaction_handler(); | ||
|
||
/// Called when a local transaction is declared. | ||
PN_CPP_EXTERN virtual void on_transaction_declared(transaction); | ||
|
||
/// Called when a local transaction is discharged successfully. | ||
PN_CPP_EXTERN virtual void on_transaction_committed(transaction); | ||
|
||
/// Called when a local transaction is discharged unsuccessfully (aborted). | ||
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); | ||
|
||
/// Called when a local transaction declare fails. | ||
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); | ||
|
||
/// Called when the commit of a local transaction fails. | ||
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); | ||
}; | ||
|
||
} // namespace proton | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API makes transactions hidden in a session then these callbacks either go away or instead become session callbacks: on_session_transaction_committed(session&), etc. I think transaction_declared() goes away entirely and it's purpose is now another use for on_session_open(session&). on_.._declare_failed should be handled by the on_session_error(session&).
cpp/include/proton/transfer.hpp
Outdated
PN_CPP_EXTERN void transaction(transaction t); | ||
|
||
PN_CPP_EXTERN class transaction transaction() const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these are not use visible. They are only used internally in the implementation - especially if the transaction is hidden inside the session.
send(s); | ||
} | ||
|
||
void send(proton::sender &s) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sender is defined as tx_recv class attribute, so I believe we don't need to pass the sender to send method.
cpp/examples/tx_recv.cpp
Outdated
transaction.accept(d); | ||
current_batch += 1; | ||
if(current_batch == batch_size) { | ||
transaction = proton::transaction(); // null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should do rather commit here, that way it works as expected and the receiver is closed after expected number of messages received (with the current implementation it's not).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should do rather commit here, that way it works as expected and the receiver is closed after expected number of messages received (with the current implementation it's not).
above mentioned change:
- transaction = proton::transaction(); // null
+ transaction.commit();
works as expected (against Artemis broker) when using multicast. However, when a pre-defined anycast queue is used, the commit() call ends with client segmentation fault.
|
||
int main(int argc, char **argv) { | ||
std::string address("127.0.0.1:5672/examples"); | ||
int message_count = 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect the defaults to be the same for tx_send and tx_recv (currently tx_send defaults to 6 messages while tx_recv to 9)
if(current_batch == batch_size) | ||
{ | ||
std::cout << " >> Txn attempt commit" << std::endl; | ||
if (batch_index % 2 == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that every second batch is aborted. I'm not sure if that's intended, though it works as expected (python example is not doing any aborts).
} | ||
} | ||
|
||
void on_tracker_accept(proton::tracker &t) override { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe on_tracker_accept method is extra and is never fired. If so, counting confirmed doesn't make much sense (confirmed variable is not used to anything btw).
cpp/examples/tx_recv.cpp
Outdated
|
||
void on_message(proton::delivery &d, proton::message &msg) override { | ||
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; | ||
transaction.accept(d); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw. I'm thinking if transaction.accept() does actually have any effect? I mean, when I comment it out, the program behaves the same. btw. is this transaction accept any different from delivery accept? looks both settles the messages related to the delivery.
Does explicit delivery settling play role in transaction mode? or are they mutually exclusive and only commits and aborts applies? Can ie. single message in transaction be rejected while other messages accepted?
I tried to accept / reject the delivery and it seems to have no effect in transaction mode (while in python, doing so makes the messages to be threat outside of the transaction, just like a normal messages, https://issues.redhat.com/browse/ENTMQCL-513).
* Added an extra handler to the python binding so that we can handle transactioned dispositions
* Modified the Python example broker so that it understands transaction requests, prints some useful output about what is happening, but doesn't honor the transaction semantics. It will queue up transactioned messages immediately and also doesn't correctly handle outgoing message releases (but it doesn't for non-transactioned messages either
* Make it compile * Make it fit the existing software structure better
4c795ef
to
eb8afbd
Compare
@@ -274,11 +301,14 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { | |||
|
|||
void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { | |||
auto lnk = pn_event_link(event); | |||
// Currently don't implement (transaction) coordinator | |||
int type = pn_terminus_get_type(pn_link_remote_target(lnk)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused variable ‘type’
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lopts of detailed comments here, but the main change I'd like to see is either make coordinator a subclass of target or fold it into a simple boolean option of target.
int current_batch = 0; | ||
int committed = 0; | ||
|
||
proton::session session; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we need to store the session anywhere - I think it can always be derived inside any callback
} | ||
|
||
void on_session_open(proton::session &s) override { | ||
session = s; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not necessary
|
||
void on_message(proton::delivery &d, proton::message &msg) override { | ||
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; | ||
session.txn_accept(d); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
session == d.session()
s.connection().close(); | ||
} | ||
else { | ||
session.declare_transaction(*this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
session == s
int committed = 0; | ||
int confirmed = 0; | ||
|
||
proton::session session; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As tx_recv: I don't think you need to store away the session.
template <> struct wrapper<pn_disposition_t> { | ||
typedef disposition type; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent formatting
@@ -66,6 +66,7 @@ class sender_options::impl { | |||
option<bool> auto_settle; | |||
option<source_options> source; | |||
option<target_options> target; | |||
option<coordinator_options> coordinator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed if coordinator is a kind of target
if (coordinator.set) { | ||
proton::coordinator local_t(make_wrapper<proton::coordinator>(pn_link_target(unwrap(s)))); | ||
coordinator.value.apply(local_t); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
@@ -118,6 +123,7 @@ sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->d | |||
sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; } | |||
sender_options& sender_options::source(const source_options &s) {impl_->source = s; return *this; } | |||
sender_options& sender_options::target(const target_options &s) {impl_->target = s; return *this; } | |||
sender_options& sender_options::coordinator(const coordinator_options &s) {impl_->coordinator = s; return *this; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
@@ -0,0 +1,44 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call this file transaction_handler.cpp
PROTON-1442
AMQP Transaction Sequence:
Client establishes link to Broker (Transaction resource) to target with transaction coordinator type (usual types are sender/receiver) (ATTACH frame)
Client (Transaction Controller) sends a special message to that link to create transaction (TRANSFER frame)
Broker returns a disposition with the transaction id (DISPOSITION frame)