Skip to content

Commit

Permalink
WIP: Fix up to transactions work:
Browse files Browse the repository at this point in the history
* Make it compile
* Make it fit the existing software structure better
  • Loading branch information
astitcher committed Oct 15, 2024
1 parent 197572f commit e18ac3d
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 94 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ set(qpid-proton-cpp-source
src/terminus.cpp
src/timestamp.cpp
src/tracker.cpp
src/transaction.cpp
src/transfer.cpp
src/transport.cpp
src/type_id.cpp
Expand Down
6 changes: 3 additions & 3 deletions cpp/examples/tx_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
int confirmed = 0;
proton::container *container;
proton::transaction_handler transaction_handler;
proton::Transaction *transaction;
proton::transaction *transaction;
proton::connection connection;
public:
tx_send(const std::string &s, int c, int b):
Expand All @@ -59,7 +59,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
c.declare_transaction(connection, transaction_handler);
}

void on_transaction_declared(proton::Transaction &t) override {
void on_transaction_declared(proton::transaction &t) override {
transaction = &t;
send();
}
Expand Down Expand Up @@ -92,7 +92,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
confirmed += 1;
}

void on_transaction_committed(proton::Transaction &t) override {
void on_transaction_committed(proton::transaction &t) override {
committed += current_batch;
if(committed == total) {
std::cout << "All messages committed";
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/proton/container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class PN_CPP_CLASS_EXTERN container {
/// Cancel task for the given work_handle.
PN_CPP_EXTERN void cancel(work_handle);

PN_CPP_EXTERN Transaction declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge = false);
PN_CPP_EXTERN transaction declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge = false);
private:
/// Declare both v03 and v11 if compiling with c++11 as the library contains both.
/// A C++11 user should never call the v03 overload so it is private in this case
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/proton/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class source_options;
class ssl;
class target_options;
class tracker;
class transaction;
class transaction_handler;
class transport;
class url;
class void_function0;
Expand Down
7 changes: 0 additions & 7 deletions cpp/include/proton/tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class tracker : public transfer {
tracker(pn_delivery_t* d);
/// @endcond

Transaction *transaction;
public:
/// Create an empty tracker.
tracker() = default;
Expand All @@ -56,12 +55,6 @@ class tracker : public transfer {
/// Get the tag for this tracker.
PN_CPP_EXTERN binary tag() const;

// set_transaction here is a problem. As evry time we call it will change
// the pointer in current object and update won' be reflected in any copies of this tracker.
PN_CPP_EXTERN void set_transaction(Transaction *t);

PN_CPP_EXTERN Transaction* get_transaction() const;

/// @cond INTERNAL
friend class internal::factory<tracker>;
/// @endcond
Expand Down
30 changes: 9 additions & 21 deletions cpp/include/proton/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,50 +34,38 @@

namespace proton {

class transaction_handler;

class
PN_CPP_CLASS_EXTERN Transaction {
proton::sender txn_ctrl;
proton::transaction_handler handler;
// TODO int
int id = 0;
proton::tracker _declare;
proton::tracker _discharge;
bool failed = false;
std::vector<proton::tracker> pending;
PN_CPP_CLASS_EXTERN transaction {
public:
// TODO:
PN_CPP_EXTERN Transaction(proton::sender _txn_ctrl, proton::transaction_handler _handler, bool _settle_before_discharge = false);
PN_CPP_EXTERN virtual ~Transaction();
PN_CPP_EXTERN virtual ~transaction();
PN_CPP_EXTERN virtual void commit();
PN_CPP_EXTERN virtual void abort();
PN_CPP_EXTERN virtual void declare();
PN_CPP_EXTERN virtual void discharge(bool failed);
PN_CPP_EXTERN virtual proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
PN_CPP_EXTERN virtual proton::tracker send(proton::sender s, proton::message msg);
PN_CPP_EXTERN virtual void handle_outcome(proton::tracker t);
};

class
PN_CPP_CLASS_EXTERN transaction_handler {

public:
PN_CPP_EXTERN transaction_handler();
PN_CPP_EXTERN virtual ~transaction_handler();

/// Called when a local transaction is declared.
PN_CPP_EXTERN virtual void on_transaction_declared(Transaction&);
PN_CPP_EXTERN virtual void on_transaction_declared(transaction&);

/// Called when a local transaction is discharged successfully.
PN_CPP_EXTERN virtual void on_transaction_committed(Transaction&);
PN_CPP_EXTERN virtual void on_transaction_committed(transaction&);

/// Called when a local transaction is discharged unsuccessfully (aborted).
PN_CPP_EXTERN virtual void on_transaction_aborted(Transaction&);
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction&);

/// Called when a local transaction declare fails.
PN_CPP_EXTERN virtual void on_transaction_declare_failed(Transaction&);
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction&);

/// Called when the commit of a local transaction fails.
PN_CPP_EXTERN virtual void on_transaction_commit_failed(Transaction&);
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction&);
};

} // namespace proton
Expand Down
5 changes: 5 additions & 0 deletions cpp/include/proton/transfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class transfer : public internal::object<pn_delivery_t> {
/// Return true if the transfer has been settled.
PN_CPP_EXTERN bool settled() const;

// Set transaction
PN_CPP_EXTERN void transaction(transaction& t);

PN_CPP_EXTERN class transaction* transaction() const;

/// Set user data on this transfer.
PN_CPP_EXTERN void user_data(void* user_data) const;

Expand Down
41 changes: 2 additions & 39 deletions cpp/src/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
#include "proton/listen_handler.hpp"
#include "proton/listener.hpp"
#include "proton/uuid.hpp"
#include "proton/target_options.hpp"
#include "proton/sender_options.hpp"
#include "proton/transaction.hpp"

#include "proactor_container_impl.hpp"
#include <vector>
Expand All @@ -49,42 +46,8 @@ returned<connection> container::connect(const std::string &url) {
return connect(url, connection_options());
}

Transaction container::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {

proton::target_options t;
class InternalTransactionHandler : public proton::messaging_handler {
// TODO: auto_settle
void on_tracker_settle(proton::tracker &t) override {
if(t.get_transaction()) {
t.get_transaction()->handle_outcome(t);
}
}

// TODO: Add on_unhandled function
};

// TODO: Sender should be created only once. (May be use Singleton Class)
// proton::target_options t;

std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
t.capabilities(cap);
// Type PN_COORDINATOR value is 3. It is a special target identifying a transaction coordinator.
// TODO: Change the type from int to enum.
t.type(3);

proton::sender_options so;
so.name("txn-ctrl");
// Todo: Check the value, Or by deafult null?
//so.source() ?
so.target(t);
InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
so.handler(internal_handler);
proton::sender s = conn.open_sender("does not matter", so);

settle_before_discharge = false;

return Transaction(s, handler, settle_before_discharge);

transaction container::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {
return impl_->declare_transaction(conn, handler, settle_before_discharge);
}

returned<sender> container::open_sender(const std::string &url) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/contexts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace proton {

class proton_handler;
class connector;
class transaction;

namespace io {class link_namer;}

Expand Down Expand Up @@ -161,6 +162,7 @@ class transfer_context : public context {
transfer_context() : user_data_(nullptr) {}
static transfer_context& get(pn_delivery_t* s);

transaction* transaction_;
void* user_data_;
};

Expand Down
41 changes: 41 additions & 0 deletions cpp/src/proactor_container_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "proton/listener.hpp"
#include "proton/reconnect_options.hpp"
#include "proton/ssl.hpp"
#include "proton/target_options.hpp"
#include "proton/transport.hpp"
#include "proton/url.hpp"

Expand Down Expand Up @@ -860,4 +861,44 @@ void container::impl::stop(const proton::error_condition& err) {
pn_condition_free(error_condition);
}

// TODO: declare this in separate internal header file
extern transaction mk_transaction_impl(sender&, transaction_handler&, bool);

transaction container::impl::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {
class InternalTransactionHandler : public proton::messaging_handler {
// TODO: auto_settle
void on_tracker_settle(proton::tracker &t) override {
if(t.transaction()) {
//t.transaction()->handle_outcome(t);
}
}

// TODO: Add on_unhandled function
};

// TODO: Sender should be created only once. (May be use Singleton Class)
// proton::target_options t;

proton::target_options t;
std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
t.capabilities(cap);
// Type PN_COORDINATOR value is 3. It is a special target identifying a transaction coordinator.
// TODO: Change the type from int to enum.
t.type(3);

proton::sender_options so;
so.name("txn-ctrl");
// Todo: Check the value, Or by deafult null?
//so.source() ?
so.target(t);
InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
so.handler(internal_handler);
proton::sender s = conn.open_sender("does not matter", so);

settle_before_discharge = false;

return mk_transaction_impl(s, handler, settle_before_discharge);

}

}
1 change: 1 addition & 0 deletions cpp/src/proactor_container_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class container::impl {
template <class T> static messaging_handler* get_handler(T s);
messaging_handler* get_handler(pn_event_t *event);
static work_queue::impl* make_work_queue(container&);
transaction declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge = false);

private:
class common_work_queue;
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,4 @@ namespace proton {
tracker::tracker(pn_delivery_t *d): transfer(make_wrapper(d)) {}
sender tracker::sender() const { return make_wrapper<class sender>(pn_delivery_link(pn_object())); }
binary tracker::tag() const { return bin(pn_delivery_tag(pn_object())); }
void tracker::set_transaction(Transaction *t) { this->transaction=t; }
Transaction* tracker::get_transaction() const { return this->transaction; }
}
75 changes: 54 additions & 21 deletions cpp/src/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,59 +25,92 @@

namespace proton {

void transaction_handler::on_transaction_declared(Transaction &) {}
void transaction_handler::on_transaction_committed(Transaction &) {}
void transaction_handler::on_transaction_aborted(Transaction &) {}
void transaction_handler::on_transaction_declare_failed(Transaction &) {}
void transaction_handler::on_transaction_commit_failed(Transaction &) {}

Transaction::Transaction(proton::sender _txn_ctrl, proton::transaction_handler _handler, bool _settle_before_discharge) {
txn_ctrl = _txn_ctrl;
handler = _handler;
bool settle_before_discharge = _settle_before_discharge;
void transaction_handler::on_transaction_declared(transaction &) {}
void transaction_handler::on_transaction_committed(transaction &) {}
void transaction_handler::on_transaction_aborted(transaction &) {}
void transaction_handler::on_transaction_declare_failed(transaction &) {}
void transaction_handler::on_transaction_commit_failed(transaction &) {}

transaction::~transaction() = default;
void transaction::commit() {};
void transaction::abort() {};
void transaction::declare() {};
proton::tracker transaction::send(proton::sender s, proton::message msg) { return {}; };

class transaction_impl : public transaction {
public:
proton::sender* txn_ctrl = nullptr;
proton::transaction_handler* handler = nullptr;
// TODO int
int id = 0;
proton::tracker _declare;
proton::tracker _discharge;
bool failed = false;
std::vector<proton::tracker> pending;

transaction_impl(proton::sender& _txn_ctrl, proton::transaction_handler& _handler, bool _settle_before_discharge);
void commit() override;
void abort() override;
void declare() override;
proton::tracker send(proton::sender s, proton::message msg) override;

void discharge(bool failed);
proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
void handle_outcome(proton::tracker t);
};

transaction_impl::transaction_impl(proton::sender& _txn_ctrl, proton::transaction_handler& _handler, bool _settle_before_discharge):
txn_ctrl(&_txn_ctrl),
handler(&_handler)
{
// bool settle_before_discharge = _settle_before_discharge;
declare();
}

void Transaction::commit() {
void transaction_impl::commit() {
discharge(false);
}

void Transaction::abort() {
void transaction_impl::abort() {
discharge(true);
}

void Transaction::declare() {
void transaction_impl::declare() {
proton::symbol descriptor("amqp:declare:list");
// proton::value _value = vd;
// TODO: How to make list;
std::vector<uint16_t> vd({NULL});
std::vector<uint16_t> vd;
proton::value _value;
// proton::get()
_declare = send_ctrl(descriptor, _value );
}

void Transaction::discharge(bool failed) {
this->failed = failed;
void transaction_impl::discharge(bool failed) {
failed = failed;
proton::symbol descriptor("amqp:declare:list");;
proton::value _value;
proton::tracker discharge = send_ctrl(descriptor, _value);
}

proton::tracker Transaction::send_ctrl(proton::symbol descriptor, proton::value _value) {
proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) {
proton::message msg = _value; // TODO
proton::tracker delivery = txn_ctrl.send(msg);
delivery.set_transaction(this);
proton::tracker delivery = txn_ctrl->send(msg);
delivery.transaction(*this);
return delivery;
}

proton::tracker Transaction::send(proton::sender s, proton::message msg) {
proton::tracker transaction_impl::send(proton::sender s, proton::message msg) {
proton::tracker tracker = s.send(msg);
return tracker;
}

void Transaction::handle_outcome(proton::tracker t) {
void transaction_impl::handle_outcome(proton::tracker t) {
// this->handler.on_transaction_declared();

}

transaction mk_transaction_impl(sender& s, transaction_handler& h, bool f) {
return transaction_impl{s, h, f};
}

}
Loading

0 comments on commit e18ac3d

Please sign in to comment.