Skip to content

Commit

Permalink
ISSUE-1619: Implement session flow control using new Proton window API
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Nov 1, 2024
1 parent b769cd3 commit 5907d68
Show file tree
Hide file tree
Showing 17 changed files with 339 additions and 125 deletions.
8 changes: 8 additions & 0 deletions include/qpid/dispatch/amqp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ typedef struct pn_link_t pn_link_t;
// These types are private to the AMQP adaptor:
typedef struct qd_connection_t qd_connection_t;
typedef struct qd_link_t qd_link_t;
typedef struct qd_session_t qd_session_t;

// Session windowing limits
extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections
extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections
extern const size_t qd_session_max_outgoing_bytes; // limit to outgoing buffered data
extern const size_t qd_session_low_outgoing_bytes; // low water mark to resume buffering outgoing data

// For use by message.c

Expand All @@ -46,6 +52,8 @@ pn_link_t *qd_link_pn(const qd_link_t *link);
bool qd_connection_strip_annotations_in(const qd_connection_t *c);
uint64_t qd_connection_max_message_size(const qd_connection_t *c);
void qd_connection_log_policy_denial(const qd_link_t *link, const char *text);
qd_session_t *qd_link_get_session(const qd_link_t *link);
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn);

// Used by the log module
void qd_amqp_connection_set_tracing(bool enabled);
Expand Down
9 changes: 0 additions & 9 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ typedef struct qd_link_t qd_link_t;
#define QD_QLIMIT_Q2_LOWER 32 // Re-enable link receive
#define QD_QLIMIT_Q2_UPPER (QD_QLIMIT_Q2_LOWER * 2) // Disable link receive

// Q3 limits the number of bytes allowed to be buffered in an AMQP session's outgoing buffer. Once the Q3 upper limit
// is hit (read via pn_session_outgoing_bytes), pn_link_send will no longer be called for ALL outgoing links sharing the
// session. When enough outgoing bytes have been drained below the lower limit pn_link_sends will resume. Note that Q3
// only applies to AMQP links. Non-AMQP (adaptor) link output is limited by the capacity of the raw connection buffer
// pool.

#define QD_QLIMIT_Q3_LOWER (QD_QLIMIT_Q2_UPPER * 2) // in qd_buffer_t's
#define QD_QLIMIT_Q3_UPPER (QD_QLIMIT_Q3_LOWER * 2)

// Callback for status change (confirmed persistent, loaded-in-memory, etc.)

typedef struct qd_message_t qd_message_t;
Expand Down
4 changes: 2 additions & 2 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@
},
"maxSessionFrames": {
"type": "integer",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 0 (unlimited window).",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This value sets a limit to the number of incoming frames the router will buffer before flow-control is enforced. Thus the maximum amount of memory required for holding incoming data is limited to (maxFrameSize * maxSessionFrames) bytes per session. If not explicitly set a default session window size that is optimized for the connection role is selected. Policy settings will not overwrite this value. maxSessionFrames has a minimum value of 2.",
"required": false,
"create": true
},
Expand Down Expand Up @@ -983,7 +983,7 @@
},
"maxSessionFrames": {
"type": "integer",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 0 (unlimited window).",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This value sets a limit to the number of incoming frames the router will buffer before flow-control is enforced. Thus the maximum amount of memory required for holding incoming data is limited to (maxFrameSize * maxSessionFrames) bytes per session. If not explicitly set a default session window size that is optimized for the connection role is selected. Policy settings will not overwrite this value. maxSessionFrames has a minimum value of 2.",
"required": false,
"create": true
},
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
qd_session_t *qd_ssn = qd_link_get_session(link);
if (qd_session_is_q3_blocked(qd_ssn)) {
// Q3 blocked - have we drained enough outgoing bytes?
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_threshold(qd_ssn)) {
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_low_outgoing_bytes) {
// yes. We must now unblock all links that have been blocked by Q3

qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
Expand Down
82 changes: 60 additions & 22 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,34 @@ struct qd_session_t {
sys_atomic_t ref_count;
pn_session_t *pn_session;
qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty
uint32_t remote_max_frame;
};

// For outgoing session flow control. Never buffer more than out_window_limit bytes of data on the session before
// returning control to the proactor. This prevents memory bloat and allows proactor to send buffered data in a
// timely manner. The low watermark is used to unblock the session - do not resume writing to the session until the
// amount of available capacity has grown to at least the low watermark.
// Session window limits
//
// A session incoming window determines how many incoming frames the session will accept across all incoming links. This
// places a limit on the number of incoming data bytes that have to be buffered on the session (connection max-frame *
// max incoming window frames). The local session incoming window configuration is determined by the maxFrameSize and
// maxSessionFrames configuration attributes of an AMQP listener/connector.
//
// The remote peers session window must be honored when writing output to a sending link. In addition we limit the
// amount of outgoing data that can be buffered on a session before control is returned to Proton. This is necessary to
// improve latency and allow capacity sharing among all links on the session.
//
const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session
const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes

size_t out_window_limit;
size_t out_window_low_watermark;
};
const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB
const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB


// Can we leverage the new Proton Session Window API?
//
#if (PN_VERSION_MAJOR > 0) || (PN_VERSION_MINOR > 39)
#define USE_PN_SESSION_WINDOWS 1
#else
#define USE_PN_SESSION_WINDOWS 0
#endif

// Bug workaround to free Proton links when we hope they are no longer used!
// Fingers crossed! :|
Expand Down Expand Up @@ -439,7 +458,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
qd_conn->n_sessions++;
}
DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn);
qd_policy_apply_session_settings(qd_ssn->pn_session, qd_conn);
uint32_t in_window;
qd_policy_get_session_settings(qd_conn, &in_window);
qd_session_set_max_in_window(qd_ssn, in_window);
pn_session_open(qd_ssn->pn_session);
}
}
Expand Down Expand Up @@ -700,7 +721,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn);
conn->qd_sessions[ssn_class] = qd_ssn;
qd_session_incref(qd_ssn);
pn_session_set_incoming_capacity(qd_ssn->pn_session, cf->incoming_capacity);
qd_session_set_max_in_window(qd_ssn, cf->session_max_in_window);
pn_session_open(qd_ssn->pn_session);
}

Expand Down Expand Up @@ -897,6 +918,9 @@ void qd_link_set_link_id(qd_link_t *link, uint64_t link_id)
qd_session_t *qd_session(pn_session_t *pn_ssn)
{
assert(pn_ssn && qd_session_from_pn(pn_ssn) == 0);
pn_connection_t *pn_conn = pn_session_connection(pn_ssn);
pn_transport_t *pn_tport = pn_connection_transport(pn_conn);

qd_session_t *qd_ssn = new_qd_session_t();
if (qd_ssn) {
ZERO(qd_ssn);
Expand All @@ -905,10 +929,8 @@ qd_session_t *qd_session(pn_session_t *pn_ssn)
qd_ssn->pn_session = pn_ssn;
DEQ_INIT(qd_ssn->q3_blocked_links);
pn_session_set_context(pn_ssn, qd_ssn);

// @TODO(kgiusti) make these dependent on connection role
qd_ssn->out_window_limit = QD_QLIMIT_Q3_UPPER * QD_BUFFER_SIZE;
qd_ssn->out_window_low_watermark = QD_QLIMIT_Q3_LOWER * QD_BUFFER_SIZE;
qd_ssn->remote_max_frame = pn_transport_get_remote_max_frame(pn_tport);
assert(qd_ssn->remote_max_frame != 0);
}
return qd_ssn;
}
Expand Down Expand Up @@ -970,22 +992,38 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn)
{
assert(qd_ssn && qd_ssn->pn_session);

// discount any data already written but not yet sent
size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session);
if (buffered < qd_ssn->out_window_limit) {
return qd_ssn->out_window_limit - buffered;
}
return 0;
if (buffered >= qd_session_max_outgoing_bytes)
return 0; // exceeded maximum buffered limit
size_t avail = qd_session_max_outgoing_bytes - buffered;

#if USE_PN_SESSION_WINDOWS
// never exceed the remaining in window of the peer
size_t limit = pn_session_remote_incoming_window(qd_ssn->pn_session);
limit *= qd_ssn->remote_max_frame;
return MIN(avail, limit);
#else
return avail;
#endif
}


/** Return the session outgoing window low water mark
/** Configure the sessions incoming window limit
*
* Blocked session can resume output once the available outgoing capacity reaches at least this value
* @param qd_ssn Session to configure
* @param in_window maximum incoming window in frames
*/
size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn)
void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window)
{
assert(qd_ssn);
return qd_ssn->out_window_low_watermark;
// older proton session windowing would stall so do not enable it
#if USE_PN_SESSION_WINDOWS
// Use new window configuration API to set the maximum in window and low water mark
assert(in_window >= 2);
int rc = pn_session_set_incoming_window_and_lwm(qd_ssn->pn_session, in_window, in_window / 2);
(void) rc;
assert(rc == 0);
#endif
}


Expand Down
4 changes: 1 addition & 3 deletions src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,12 @@ uint64_t qd_link_link_id(const qd_link_t *link);
void qd_link_set_link_id(qd_link_t *link, uint64_t link_id);
struct qd_message_t;
void qd_link_set_incoming_msg(qd_link_t *link, struct qd_message_t *msg);
qd_session_t *qd_link_get_session(const qd_link_t *link);

void qd_session_incref(qd_session_t *qd_ssn);
void qd_session_decref(qd_session_t *qd_ssn);
bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn);
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn);
size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn);
void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window);

void qd_connection_release_sessions(qd_connection_t *qd_conn);

Expand Down
6 changes: 5 additions & 1 deletion src/adaptors/amqp/qd_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,12 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
// Common transport configuration.
//
pn_transport_set_max_frame(tport, config->max_frame_size);
pn_transport_set_channel_max(tport, config->max_sessions - 1);
pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
// pn_transport_set_channel_max sets the maximum session *identifier*, not the total number of sessions. Thus Proton
// will allow sessions with identifiers [0..max_sessions], which is one greater than the value we pass to
// pn_transport_set_channel_max. So to limit the maximum number of simultaineous sessions to config->max_sessions we
// have to decrement it by one for Proton.
pn_transport_set_channel_max(tport, config->max_sessions - 1);
}

void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_connection_t *pn_conn, qd_connection_t *qd_conn);
Expand Down
83 changes: 41 additions & 42 deletions src/adaptors/amqp/server_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "dispatch_private.h"
#include "entity.h"

#include <qpid/dispatch/amqp_adaptor.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/tls_common.h>

Expand Down Expand Up @@ -134,9 +135,6 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config,
config->http = qd_entity_opt_bool(entity, "http", false); CHECK();
config->http_root_dir = qd_entity_opt_string(entity, "httpRootDir", 0); CHECK();
config->http = config->http || config->http_root_dir; /* httpRootDir implies http */
config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK();
config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK();
uint64_t ssn_frames = qd_entity_opt_long(entity, "maxSessionFrames", 0); CHECK();
config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK();
if (is_listener) {
config->initial_handshake_timeout_seconds = qd_entity_get_long(entity, "initialHandshakeTimeoutSeconds"); CHECK();
Expand Down Expand Up @@ -191,48 +189,49 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config,
if (config->link_capacity == 0)
config->link_capacity = 250;

if (config->max_sessions == 0 || config->max_sessions > 32768)
// Proton disallows > 32768
config->max_sessions = 32768;

if (config->max_frame_size < QD_AMQP_MIN_MAX_FRAME_SIZE)
// Silently promote the minimum max-frame-size
// Proton will do this but the number is needed for the
// incoming capacity calculation.
config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE;

//
// Given session frame count and max frame size, compute session incoming_capacity
// On 64-bit systems the capacity has no practical limit.
// On 32-bit systems the largest default capacity is half the process address space.
//
bool is_64bit = sizeof(size_t) == 8;
#define MAX_32BIT_CAPACITY ((size_t)(2147483647))
if (ssn_frames == 0) {
config->incoming_capacity = is_64bit ? MAX_32BIT_CAPACITY * (size_t)config->max_frame_size : MAX_32BIT_CAPACITY;
} else {
// Limited incoming frames.
if (is_64bit) {
// Specify this to proton by setting capacity to be
// the product (max_frame_size * ssn_frames).
config->incoming_capacity = (size_t)config->max_frame_size * (size_t)ssn_frames;
// Proton does not support maxSessions > 32768
int64_t value = (int64_t) qd_entity_get_long(entity, "maxSessions"); CHECK();
if (value == 0) {
value = 32768; // default
} else if (value < 0 || value > 32768) {
(void) qd_error(QD_ERROR_CONFIG,
"Invalid maxSessions specified (%"PRId64"). Minimum value is 1 and maximum value is %i",
value, 32768);
goto error;
}
config->max_sessions = (uint32_t) value;

// Ensure maxFrameSize is at least the minimum value required by the standard,
// and it does not exceed the proton APIs max of INT32_MAX
value = (int64_t) qd_entity_get_long(entity, "maxFrameSize"); CHECK();
if (value == 0) {
value = 16384; // default
} else if (value < QD_AMQP_MIN_MAX_FRAME_SIZE || value > INT32_MAX) {
(void) qd_error(QD_ERROR_CONFIG,
"Invalid maxFrameSize specified (%"PRId64"). Minimum value is %d and maximum value is %"PRIi32,
value, QD_AMQP_MIN_MAX_FRAME_SIZE, INT32_MAX);
goto error;
}
config->max_frame_size = (uint32_t) value;

// Ensure that maxSessionFrames does not exceed the proton APIs max of INT32_MAX
value = (int64_t) qd_entity_opt_long(entity, "maxSessionFrames", 0); CHECK();
if (value == 0) {
// Use a sane default. Allow router to router links more capacity than AMQP application links
if (strcmp(config->role, "normal") == 0) {
value = qd_session_max_in_win_user / config->max_frame_size;
} else {
// 32-bit systems have an upper bound to the capacity
uint64_t max_32bit_capacity = (uint64_t)MAX_32BIT_CAPACITY;
uint64_t capacity = (uint64_t)config->max_frame_size * (uint64_t)ssn_frames;
if (capacity <= max_32bit_capacity) {
config->incoming_capacity = (size_t)capacity;
} else {
config->incoming_capacity = MAX_32BIT_CAPACITY;
uint64_t actual_frames = max_32bit_capacity / (uint64_t)config->max_frame_size;

qd_log(LOG_CONN_MGR, QD_LOG_WARNING,
"Server configuration for I/O adapter entity name:'%s', host:'%s', port:'%s', "
"requested maxSessionFrames truncated from %" PRId64 " to %" PRId64,
config->name, config->host, config->port, ssn_frames, actual_frames);
}
value = qd_session_max_in_win_trunk / config->max_frame_size;
}
// Ensure the window is at least 2 frames to allow a non-zero low water mark
value = MAX(value, 2);
} else if (value < 2 || value > INT32_MAX) {
(void) qd_error(QD_ERROR_CONFIG,
"Invalid maxSessionFrames specified (%"PRId64"). Minimum value is 2 and maximum value is %"PRIi32,
value, INT32_MAX);
goto error;
}
config->session_max_in_window = (uint32_t) value;

//
// For now we are hardwiring this attribute to true. If there's an outcry from the
Expand Down
14 changes: 8 additions & 6 deletions src/adaptors/amqp/server_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ typedef struct qd_server_config_t {
int inter_router_cost;

/**
* The maximum size of an AMQP frame in octets.
* The maximum size of an AMQP frame in octets. Frome maxFrameSize configuration attribute.
*/
uint32_t max_frame_size;

Expand All @@ -209,12 +209,14 @@ typedef struct qd_server_config_t {
uint32_t max_sessions;

/**
* The incoming capacity value is calculated to be (sessionMaxFrames * maxFrameSize).
* In a round about way the calculation forces the AMQP Begin/incoming-capacity value
* to equal the specified sessionMaxFrames value measured in units of transfer frames.
* This calculation is done to satisfy proton pn_session_set_incoming_capacity().
* The session incoming window limit in frames. From maxSessionFrames configuration attribute
*
* The window indicates the maximum number of incoming *frames* that the session will buffer. This places a limit on
* the amount of memory the router needs to reserve to accomodate data arriving from the peer session endpoint.
*
* The maximum amount of memory is computed as (max_frame_size * session_max_in_window)
*/
size_t incoming_capacity;
uint32_t session_max_in_window;

/**
* The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the
Expand Down
Loading

0 comments on commit 5907d68

Please sign in to comment.