diff --git a/c/include/proton/disposition.h b/c/include/proton/disposition.h index db0d1cea6..4965b2fd6 100644 --- a/c/include/proton/disposition.h +++ b/c/include/proton/disposition.h @@ -90,6 +90,12 @@ typedef struct pn_disposition_t pn_disposition_t; */ #define PN_MODIFIED (0x0000000000000027) +/** + * The PN_TRANSACTIONAL_STATE delivery state is a non terminal state + * indicating the transactional state of a delivery. + */ +#define PN_TRANSACTIONAL_STATE (0x0000000000000034) + /** * Get the type of a disposition. * @@ -234,6 +240,7 @@ PN_EXTERN pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition); typedef struct pn_received_disposition_t pn_received_disposition_t; typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; typedef struct pn_modified_disposition_t pn_modified_disposition_t; +typedef struct pn_transactional_disposition_t pn_transactional_disposition_t; typedef struct pn_custom_disposition_t pn_custom_disposition_t; /** @@ -254,6 +261,7 @@ PN_EXTERN pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *dispo PN_EXTERN pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition); PN_EXTERN pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition); PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +PN_EXTERN pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition); /** * Access the disposition as a raw pn_data_t. @@ -383,6 +391,17 @@ PN_EXTERN void pn_modified_disposition_set_undeliverable(pn_modified_disposition */ PN_EXTERN pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); + +PN_EXTERN pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition); +PN_EXTERN void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id); +PN_EXTERN uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition); + +/** + * Set the provisional outcome of the message if the transaction is committed successfully. + * Only terminal disposition states are allowed (PN_ACCEPTED, PN_REJECTED, PN_RELEASED, PN_MODIFIED) + */ +PN_EXTERN void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome); + /** * @} */ diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h index 2cd59a64c..f20bdff10 100644 --- a/c/src/core/emitters.h +++ b/c/src/core/emitters.h @@ -647,6 +647,19 @@ static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compoun } } +static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition); + +static inline void emit_transactional_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_transactional_disposition_t *disposition){ + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_binary_bytes(emitter, &compound, disposition->id); + emit_raw(emitter, &compound, disposition->outcome_raw); + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } +} + static inline void emit_custom_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_custom_disposition_t *disposition){ emit_descriptor(emitter, compound0, disposition->type); if ((disposition->data && pn_data_size(disposition->data) == 0) || @@ -687,6 +700,10 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED); emit_modified_disposition(emitter, compound0, &disposition->u.s_modified); return; + case PN_DISP_TRANSACTIONAL: + emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE); + emit_transactional_disposition(emitter, compound0, &disposition->u.s_transactional); + return; case PN_DISP_CUSTOM: emit_custom_disposition(emitter, compound0, &disposition->u.s_custom); return; diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index da563813a..f10e6a8f0 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t { PN_DISP_REJECTED = PN_REJECTED, PN_DISP_RELEASED = PN_RELEASED, PN_DISP_MODIFIED = PN_MODIFIED, + PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE, } pn_disposition_type_t; struct pn_received_disposition_t { @@ -359,6 +360,11 @@ struct pn_modified_disposition_t { bool undeliverable; }; +struct pn_transactional_disposition_t { + pn_bytes_t id; + pn_bytes_t outcome_raw; +}; + struct pn_custom_disposition_t { pn_data_t *data; pn_bytes_t data_raw; @@ -370,6 +376,7 @@ struct pn_disposition_t { struct pn_received_disposition_t s_received; struct pn_rejected_disposition_t s_rejected; struct pn_modified_disposition_t s_modified; + struct pn_transactional_disposition_t s_transactional; struct pn_custom_disposition_t s_custom; } u; uint16_t type; diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 7ccb308b9..597151642 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -27,6 +27,7 @@ #include "consumers.h" #include "core/frame_consumers.h" #include "emitters.h" +#include "core/frame_generators.h" #include "fixed_string.h" #include "framing.h" #include "memory.h" @@ -1597,6 +1598,10 @@ static void pn_disposition_finalize(pn_disposition_t *ds) pn_data_free(ds->u.s_custom.data); pn_bytes_free(ds->u.s_custom.data_raw); break; + case PN_DISP_TRANSACTIONAL: + pn_bytes_free(ds->u.s_transactional.id); + pn_bytes_free(ds->u.s_transactional.outcome_raw); + break; } } @@ -1868,6 +1873,9 @@ void pni_disposition_to_raw(pn_disposition_t *disposition) { case PN_DISP_MODIFIED: emit_modified_disposition(&emitter, &compound, &disposition->u.s_modified); break; + case PN_DISP_TRANSACTIONAL: + emit_transactional_disposition(&emitter, &compound, &disposition->u.s_transactional); + break; } if (type != PN_DISP_EMPTY) { @@ -2005,6 +2013,13 @@ pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition return &disposition->u.s_modified; } +pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition) +{ + if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_TRANSACTIONAL; + else if (disposition->type!=PN_DISP_TRANSACTIONAL) return NULL; + return &disposition->u.s_transactional; +} + pn_data_t *pn_custom_disposition_data(pn_custom_disposition_t *disposition) { assert(disposition); @@ -2084,6 +2099,44 @@ pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *dispos return disposition->annotations; } +pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition) +{ + assert(disposition); + return disposition->id; +} + +void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id) +{ + assert(disposition); + pn_bytes_free(disposition->id); + disposition->id = pn_bytes_dup(id); +} + +uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition) +{ + assert(disposition); + if (disposition->outcome_raw.size) { + bool qtype = false; + uint64_t type; + pn_amqp_decode_DQLq(disposition->outcome_raw, &qtype, &type); + if (qtype) { + return type; + } + } + return PN_DISP_EMPTY; +} + +void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t type) +{ + assert(disposition); + // Generate a described LiST0 directly - this needs a max of 11 bytes + char outcome_scratch[11]; + pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), .start=outcome_scratch}; + pn_bytes_t outcome_raw = pn_amqp_encode_DLEe(&scratch, type); + pn_bytes_free(disposition->outcome_raw); + disposition->outcome_raw = pn_bytes_dup(outcome_raw); +} + pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) { if (delivery) { @@ -2804,6 +2857,7 @@ const char *pn_disposition_type_name(uint64_t d) { case PN_REJECTED: return "rejected"; case PN_RELEASED: return "released"; case PN_MODIFIED: return "modified"; + case PN_TRANSACTIONAL_STATE: return "transactional_state"; default: return "unknown"; } } diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 892e430c1..98d015e63 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1602,6 +1602,20 @@ static void pni_amqp_decode_disposition (uint64_t type, pn_bytes_t disp_data, pn } break; } + case AMQP_DESC_TRANSACTIONAL_STATE: { + pn_bytes_t id; + bool qoutcome; + pn_bytes_t outcome_raw; + pn_amqp_decode_DqEzQRe(disp_data, &id, &qoutcome, &outcome_raw); + disp->type = PN_DISP_TRANSACTIONAL; + pn_bytes_free(disp->u.s_transactional.id); + disp->u.s_transactional.id = pn_bytes_dup(id); + disp->u.s_transactional.outcome_raw = (pn_bytes_t){0, NULL}; + if (qoutcome) { + disp->u.s_transactional.outcome_raw = pn_bytes_dup(outcome_raw); + } + break; + } default: { pn_bytes_t data_raw = (pn_bytes_t){0, NULL}; pn_amqp_decode_DqR(disp_data, &data_raw); diff --git a/c/tools/codec-generator/specs.json b/c/tools/codec-generator/specs.json index dddf6d6f8..82b4a013f 100644 --- a/c/tools/codec-generator/specs.json +++ b/c/tools/codec-generator/specs.json @@ -2,6 +2,7 @@ "fill_specs": [ "R", "DLR", + "DL[]", "DL[c]", "DL[?HIIII]", "DL[?IIII?I?I?In?o]", @@ -44,6 +45,7 @@ "D.[s]", "D.[z]", "D.[Bz]", + "D.[z?R]", "D.[R]", "D?L.", "D?L?." diff --git a/sasl_conf/proton-server.conf b/sasl_conf/proton-server.conf new file mode 100644 index 000000000..15f683a44 --- /dev/null +++ b/sasl_conf/proton-server.conf @@ -0,0 +1,4 @@ + +sasldb_path: /home/andrew/Work/proton/src-jj/sasl_conf/proton.sasldb +mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS + \ No newline at end of file diff --git a/sasl_conf/proton.sasldb b/sasl_conf/proton.sasldb new file mode 100644 index 000000000..d6a270e35 Binary files /dev/null and b/sasl_conf/proton.sasldb differ