Skip to content

Commit

Permalink
PROTON-28xx: Reorganize dispositions representation
Browse files Browse the repository at this point in the history
The natural representation for dispositions is a union because different
types of disposition can only have quite different pieces of data.
  • Loading branch information
astitcher committed Aug 8, 2024
1 parent 38213cb commit cc084a1
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 156 deletions.
39 changes: 23 additions & 16 deletions c/src/core/emitters.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,50 +621,57 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context
return;
}

emit_descriptor(emitter, compound0, disposition->type);
switch (disposition->type) {
case PN_RECEIVED:
case PN_DISP_RECEIVED:
emit_descriptor(emitter, compound0, AMQP_DESC_RECEIVED);
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_uint(emitter, &compound, disposition->section_number);
emit_ulong(emitter, &compound, disposition->section_offset);
emit_uint(emitter, &compound, disposition->u.s_received.section_number);
emit_ulong(emitter, &compound, disposition->u.s_received.section_offset);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
return;
case PN_ACCEPTED:
case PN_RELEASED:
case PN_DISP_ACCEPTED:
emit_descriptor(emitter, compound0, AMQP_DESC_ACCEPTED);
emit_list0(emitter, compound0);
return;
case PN_REJECTED:
case PN_DISP_RELEASED:
emit_descriptor(emitter, compound0, AMQP_DESC_RELEASED);
emit_list0(emitter, compound0);
return;
case PN_DISP_REJECTED:
emit_descriptor(emitter, compound0, AMQP_DESC_REJECTED);
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_condition(emitter, &compound, &disposition->condition);
emit_condition(emitter, &compound, &disposition->u.s_rejected.condition);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
return;
case PN_MODIFIED:
case PN_DISP_MODIFIED:
emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED);
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_bool(emitter, &compound, disposition->failed);
emit_bool(emitter, &compound, disposition->undeliverable);
emit_copy_or_raw(emitter, &compound, disposition->annotations, disposition->annotations_raw);
emit_bool(emitter, &compound, disposition->u.s_modified.failed);
emit_bool(emitter, &compound, disposition->u.s_modified.undeliverable);
emit_copy_or_raw(emitter, &compound, disposition->u.s_modified.annotations, disposition->u.s_modified.annotations_raw);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
return;
default:
if ((disposition->data && pn_data_size(disposition->data) == 0) ||
(!disposition->data && disposition->data_raw.size == 0)) {
case PN_DISP_CUSTOM:
emit_descriptor(emitter, compound0, disposition->u.s_custom.type);
if ((disposition->u.s_custom.data && pn_data_size(disposition->u.s_custom.data) == 0) ||
(!disposition->u.s_custom.data && disposition->u.s_custom.data_raw.size == 0)) {
emit_list0(emitter, compound0);
return;
}
pni_compound_context c = make_compound();
emit_copy_or_raw(emitter, &c, disposition->data, disposition->data_raw);
emit_copy_or_raw(emitter, &c, disposition->u.s_custom.data, disposition->u.s_custom.data_raw);
compound0->count++;
return;
}
Expand Down
41 changes: 31 additions & 10 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,38 @@ struct pn_link_t {
bool more_pending;
};

typedef enum pn_disposition_type_t {
PN_DISP_EMPTY = 0,
PN_DISP_CUSTOM = 1,
PN_DISP_RECEIVED = PN_RECEIVED,
PN_DISP_ACCEPTED = PN_ACCEPTED,
PN_DISP_REJECTED = PN_REJECTED,
PN_DISP_RELEASED = PN_RELEASED,
PN_DISP_MODIFIED = PN_MODIFIED,
} pn_disposition_type_t;

struct pn_disposition_t {
pn_condition_t condition;
uint64_t type;
pn_data_t *data;
pn_bytes_t data_raw;
pn_data_t *annotations;
pn_bytes_t annotations_raw;
uint64_t section_offset;
uint32_t section_number;
bool failed;
bool undeliverable;
union {
struct {
uint64_t section_offset;
uint32_t section_number;
} s_received;
struct {
pn_condition_t condition;
} s_rejected;
struct {
pn_data_t *annotations;
pn_bytes_t annotations_raw;
bool failed;
bool undeliverable;
} s_modified;
struct {
pn_data_t *data;
uint64_t type;
pn_bytes_t data_raw;
} s_custom;
} u;
uint16_t type;
bool settled;
};

Expand Down
160 changes: 119 additions & 41 deletions c/src/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -1510,11 +1510,22 @@ pn_session_t *pn_link_session(pn_link_t *link)

static void pn_disposition_finalize(pn_disposition_t *ds)
{
pn_free(ds->data);
pn_bytes_free(ds->data_raw);
pn_free(ds->annotations);
pn_bytes_free(ds->annotations_raw);
pn_condition_tini(&ds->condition);
switch (ds->type) {
case PN_MODIFIED:
pn_free(ds->u.s_modified.annotations);
pn_bytes_free(ds->u.s_modified.annotations_raw);
break;
case PN_REJECTED:
pn_condition_tini(&ds->u.s_rejected.condition);
case PN_ACCEPTED:
case PN_RECEIVED:
case PN_RELEASED:
break;
default:
pn_free(ds->u.s_custom.data);
pn_bytes_free(ds->u.s_custom.data_raw);
break;
}
}

static void pn_delivery_incref(void *object)
Expand Down Expand Up @@ -1589,28 +1600,30 @@ static void pn_delivery_finalize(void *object)

static void pn_disposition_init(pn_disposition_t *ds)
{
ds->data = NULL;
ds->data_raw = (pn_bytes_t){0, NULL};
ds->annotations = NULL;
ds->annotations_raw = (pn_bytes_t){0, NULL};
pn_condition_init(&ds->condition);
memset(ds, 0, sizeof(*ds));
}

static void pn_disposition_clear(pn_disposition_t *ds)
{
ds->type = 0;
ds->section_number = 0;
ds->section_offset = 0;
ds->failed = false;
ds->undeliverable = false;
ds->settled = false;
pn_data_clear(ds->data);
pn_bytes_free(ds->data_raw);
ds->data_raw = (pn_bytes_t){0, NULL};
pn_data_clear(ds->annotations);
pn_bytes_free(ds->annotations_raw);
ds->annotations_raw = (pn_bytes_t){0, NULL};
pn_condition_clear(&ds->condition);
switch (ds->type) {
case PN_RECEIVED:
break;
case PN_MODIFIED:
pn_data_free(ds->u.s_modified.annotations);
pn_bytes_free(ds->u.s_modified.annotations_raw);
break;
case PN_REJECTED:
pn_condition_tini(&ds->u.s_rejected.condition);
case PN_ACCEPTED:
case PN_RELEASED:
break;
break;
default:
pn_data_free(ds->u.s_custom.data);
pn_bytes_free(ds->u.s_custom.data_raw);
break;
}
memset(ds, 0, sizeof(*ds));
}

void pn_delivery_inspect(void *obj, pn_fixed_string_t *dst) {
Expand Down Expand Up @@ -1737,7 +1750,7 @@ void pn_delivery_dump(pn_delivery_t *d)
printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%d, "
"remote.settled=%d, updated=%d, current=%d, writable=%d, readable=%d, "
"work=%d}",
tag, d->local.type, d->remote.type, d->local.settled,
tag, pn_disposition_type(&d->local), pn_disposition_type(&d->remote), d->local.settled,
d->remote.settled, d->updated, pn_delivery_current(d),
pn_delivery_writable(d), pn_delivery_readable(d), d->work);
}
Expand All @@ -1763,75 +1776,113 @@ pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery)
uint64_t pn_disposition_type(pn_disposition_t *disposition)
{
assert(disposition);
return disposition->type;
switch (disposition->type) {
case PN_DISP_CUSTOM:
return disposition->u.s_custom.type;
default:
// This relies on the disposition types having the protocol values
return (uint64_t)disposition->type;
}
}

pn_data_t *pn_disposition_data(pn_disposition_t *disposition)
{
assert(disposition);
pni_switch_to_data(&disposition->data_raw, &disposition->data);
return disposition->data;
if (disposition->type != PN_DISP_CUSTOM) {
pn_disposition_clear(disposition);
disposition->type = PN_DISP_CUSTOM;
}
pni_switch_to_data(&disposition->u.s_custom.data_raw, &disposition->u.s_custom.data);
return disposition->u.s_custom.data;
}

uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition)
{
assert(disposition);
return disposition->section_number;
if (disposition->type == PN_RECEIVED) return disposition->u.s_received.section_number;
else return 0;
}

void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number)
{
assert(disposition);
disposition->section_number = section_number;
if (disposition->type != PN_RECEIVED) {
pn_disposition_clear(disposition);
disposition->type = PN_RECEIVED;
}
disposition->u.s_received.section_number = section_number;
}

uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition)
{
assert(disposition);
return disposition->section_offset;
if (disposition->type == PN_RECEIVED) return disposition->u.s_received.section_offset;
else return 0;
}

void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset)
{
assert(disposition);
disposition->section_offset = section_offset;
if (disposition->type != PN_RECEIVED) {
pn_disposition_clear(disposition);
disposition->type = PN_RECEIVED;
}
disposition->u.s_received.section_offset = section_offset;
}

bool pn_disposition_is_failed(pn_disposition_t *disposition)
{
assert(disposition);
return disposition->failed;
if (disposition->type == PN_MODIFIED) return disposition->u.s_modified.failed;
else return false;
}

void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed)
{
assert(disposition);
disposition->failed = failed;
if (disposition->type != PN_MODIFIED) {
pn_disposition_clear(disposition);
disposition->type = PN_MODIFIED;
}
disposition->u.s_modified.failed = failed;
}

bool pn_disposition_is_undeliverable(pn_disposition_t *disposition)
{
assert(disposition);
return disposition->undeliverable;
if (disposition->type == PN_MODIFIED) return disposition->u.s_modified.undeliverable;
else return false;
}

void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable)
{
assert(disposition);
disposition->undeliverable = undeliverable;
if (disposition->type != PN_MODIFIED) {
pn_disposition_clear(disposition);
disposition->type = PN_MODIFIED;
}
disposition->u.s_modified.undeliverable = undeliverable;
}

pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition)
{
assert(disposition);
pni_switch_to_data(&disposition->annotations_raw, &disposition->annotations);
return disposition->annotations;
if (disposition->type != PN_MODIFIED) {
pn_disposition_clear(disposition);
disposition->type = PN_MODIFIED;
}
pni_switch_to_data(&disposition->u.s_modified.annotations_raw, &disposition->u.s_modified.annotations);
return disposition->u.s_modified.annotations;
}

pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition)
{
assert(disposition);
return &disposition->condition;
if (disposition->type != PN_REJECTED) {
pn_disposition_clear(disposition);
disposition->type = PN_REJECTED;
}
return &disposition->u.s_rejected.condition;
}

pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
Expand Down Expand Up @@ -2122,7 +2173,7 @@ pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)
uint64_t pn_delivery_local_state(pn_delivery_t *delivery)
{
assert(delivery);
return delivery->local.type;
return pn_disposition_type(&delivery->local);
}

pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
Expand All @@ -2134,7 +2185,7 @@ pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
{
assert(delivery);
return delivery->remote.type;
return pn_disposition_type(&delivery->remote);
}

bool pn_delivery_settled(pn_delivery_t *delivery)
Expand All @@ -2156,7 +2207,34 @@ void pn_delivery_clear(pn_delivery_t *delivery)
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
{
if (!delivery) return;
delivery->local.type = state;
if (delivery->local.type == PN_DISP_CUSTOM) {
switch (state) {
case PN_ACCEPTED:
case PN_REJECTED:
case PN_RECEIVED:
case PN_MODIFIED:
case PN_RELEASED:
break;
default:
delivery->local.u.s_custom.type = state;
pni_add_tpwork(delivery);
return;
}
}
if (delivery->local.type != state) pn_disposition_clear(&delivery->local);
switch (state) {
case PN_ACCEPTED:
case PN_REJECTED:
case PN_RECEIVED:
case PN_MODIFIED:
case PN_RELEASED:
delivery->local.type = state;
break;
default:
delivery->local.type = PN_DISP_CUSTOM;
delivery->local.u.s_custom.type = state;
break;
}
pni_add_tpwork(delivery);
}

Expand Down
Loading

0 comments on commit cc084a1

Please sign in to comment.