Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-1619: Implement session flow control using new Proton window API #1647

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/qpid/dispatch/amqp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef struct pn_link_t pn_link_t;
// These types are private to the AMQP adaptor:
typedef struct qd_connection_t qd_connection_t;
typedef struct qd_link_t qd_link_t;
typedef struct qd_session_t qd_session_t;


// For use by message.c
Expand All @@ -46,6 +47,8 @@ pn_link_t *qd_link_pn(const qd_link_t *link);
bool qd_connection_strip_annotations_in(const qd_connection_t *c);
uint64_t qd_connection_max_message_size(const qd_connection_t *c);
void qd_connection_log_policy_denial(const qd_link_t *link, const char *text);
qd_session_t *qd_link_get_session(const qd_link_t *link);
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn);

// Used by the log module
void qd_amqp_connection_set_tracing(bool enabled);
Expand Down
9 changes: 0 additions & 9 deletions include/qpid/dispatch/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ typedef struct qd_link_t qd_link_t;
#define QD_QLIMIT_Q2_LOWER 32 // Re-enable link receive
#define QD_QLIMIT_Q2_UPPER (QD_QLIMIT_Q2_LOWER * 2) // Disable link receive

// Q3 limits the number of bytes allowed to be buffered in an AMQP session's outgoing buffer. Once the Q3 upper limit
// is hit (read via pn_session_outgoing_bytes), pn_link_send will no longer be called for ALL outgoing links sharing the
// session. When enough outgoing bytes have been drained below the lower limit pn_link_sends will resume. Note that Q3
// only applies to AMQP links. Non-AMQP (adaptor) link output is limited by the capacity of the raw connection buffer
// pool.

#define QD_QLIMIT_Q3_LOWER (QD_QLIMIT_Q2_UPPER * 2) // in qd_buffer_t's
#define QD_QLIMIT_Q3_UPPER (QD_QLIMIT_Q3_LOWER * 2)

// Callback for status change (confirmed persistent, loaded-in-memory, etc.)

typedef struct qd_message_t qd_message_t;
Expand Down
4 changes: 2 additions & 2 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@
},
"maxSessionFrames": {
"type": "integer",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 0 (unlimited window).",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This value sets a limit to the number of incoming frames the router will buffer before flow-control is enforced. Thus the maximum amount of memory required for holding incoming data is limited to (maxFrameSize * maxSessionFrames) bytes per session. If not explicitly set a default session window size that is optimized for the connection role is selected. Policy settings will not overwrite this value. maxSessionFrames has a minimum value of 2.",
"required": false,
"create": true
},
Expand Down Expand Up @@ -983,7 +983,7 @@
},
"maxSessionFrames": {
"type": "integer",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 0 (unlimited window).",
"description": "Session incoming window measured in transfer frames for sessions created on this connection. This value sets a limit to the number of incoming frames the router will buffer before flow-control is enforced. Thus the maximum amount of memory required for holding incoming data is limited to (maxFrameSize * maxSessionFrames) bytes per session. If not explicitly set a default session window size that is optimized for the connection role is selected. Policy settings will not overwrite this value. maxSessionFrames has a minimum value of 2.",
"required": false,
"create": true
},
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
qd_session_t *qd_ssn = qd_link_get_session(link);
if (qd_session_is_q3_blocked(qd_ssn)) {
// Q3 blocked - have we drained enough outgoing bytes?
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_threshold(qd_ssn)) {
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_capacity_low_threshold(qd_ssn)) {
// yes. We must now unblock all links that have been blocked by Q3

qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
Expand Down
159 changes: 138 additions & 21 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,45 @@
pn_session_t *pn_session;
qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty

// For outgoing session flow control. Never buffer more than out_window_limit bytes of data on the session before
// returning control to the proactor. This prevents memory bloat and allows proactor to send buffered data in a
// timely manner. The low watermark is used to unblock the session - do not resume writing to the session until the
// amount of available capacity has grown to at least the low watermark.

size_t out_window_limit;
size_t out_window_low_watermark;
// remotes maximum incoming frame size in bytes (see AMQP 1.0 Open Performative)
uint32_t remote_max_frame;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
uint32_t remote_max_frame;
uint32_t remote_max_frame_size;


// remotes incoming window size in frames (see AMQP 1.0 Begin Performative)
uint32_t remote_max_incoming_window;

// Session outgoing flow control: Stop writing outgoing data (calling pn_link_send()) to the session when the total
// number of buffered bytes has exceeded the high threshold (see Proton pn_session_outgoing_bytes()). Resume writing
// data when the session has sent enough data to reduce the number of buffered output bytes to below the low
// threshold. This prevents the router from buffering too much output data before allowing Proton to write it out.
// See qd_session_get_outgoing_capacity() for details.
size_t outgoing_bytes_high_threshold;
size_t outgoing_bytes_low_threshold;
};


// Session window limits (See Section 2.5.6 Session Flow Control in AMQP V1.0 Specification)
//
// A session incoming window determines how many incoming frames the session will accept across all incoming links. This
// places a limit on the number of incoming data bytes that have to be buffered on the session (connection max-frame *
// max incoming window frames). The local session incoming window configuration is determined by the maxFrameSize and
// maxSessionFrames configuration attributes of an AMQP listener/connector.
//
// The remote peers session window must be honored when writing output to a sending link: we must not send more data
// than the window allows
//
// Default window settings (in bytes not frames). Give inter-router connections a larger window for better performance.
const size_t qd_session_incoming_window_normal = (size_t) 8388608; // window for role=normal connections (8MB)
const size_t qd_session_incoming_window_router = (size_t) 134217728; // window for inter-router connections (128MB)


// Can we leverage the new Proton Session Window API?
//
#if (PN_VERSION_MAJOR > 0) || (PN_VERSION_MINOR > 39)
#define USE_PN_SESSION_WINDOWS 1
#else
#define USE_PN_SESSION_WINDOWS 0
#endif

// Bug workaround to free Proton links when we hope they are no longer used!
// Fingers crossed! :|
//
Expand Down Expand Up @@ -106,6 +136,12 @@
};

qd_session_t *qd_session(pn_session_t *pn_ssn);
static void qd_session_configure_incoming_window(qd_session_t *qd_ssn, uint32_t in_window);

#if USE_PN_SESSION_WINDOWS
// Access to the remote incoming window was added to Proton post-0.39.0
static void qd_session_set_remote_incoming_window(qd_session_t *qd_ssn, uint32_t in_window);
#endif

static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn)
{
Expand Down Expand Up @@ -439,9 +475,18 @@
qd_conn->n_sessions++;
}
DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn);
qd_policy_apply_session_settings(qd_ssn->pn_session, qd_conn);
uint32_t in_window;
qd_policy_get_session_settings(qd_conn, &in_window);
qd_session_configure_incoming_window(qd_ssn, in_window);
pn_session_open(qd_ssn->pn_session);
}
#if USE_PN_SESSION_WINDOWS
// Remote has opened, now the remote incoming window is available
qd_session_t *qd_ssn = qd_session_from_pn(ssn);
assert(qd_ssn);
qd_session_set_remote_incoming_window(qd_ssn,
pn_session_remote_incoming_window(ssn));
#endif
}
break;

Expand Down Expand Up @@ -700,7 +745,7 @@
DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn);
conn->qd_sessions[ssn_class] = qd_ssn;
qd_session_incref(qd_ssn);
pn_session_set_incoming_capacity(qd_ssn->pn_session, cf->incoming_capacity);
qd_session_configure_incoming_window(qd_ssn, cf->session_max_in_window);
pn_session_open(qd_ssn->pn_session);
}

Expand Down Expand Up @@ -897,6 +942,9 @@
qd_session_t *qd_session(pn_session_t *pn_ssn)
{
assert(pn_ssn && qd_session_from_pn(pn_ssn) == 0);
pn_connection_t *pn_conn = pn_session_connection(pn_ssn);
pn_transport_t *pn_tport = pn_connection_transport(pn_conn);

qd_session_t *qd_ssn = new_qd_session_t();
if (qd_ssn) {
ZERO(qd_ssn);
Expand All @@ -905,10 +953,12 @@
qd_ssn->pn_session = pn_ssn;
DEQ_INIT(qd_ssn->q3_blocked_links);
pn_session_set_context(pn_ssn, qd_ssn);
qd_ssn->remote_max_frame = pn_transport_get_remote_max_frame(pn_tport);
assert(qd_ssn->remote_max_frame != 0);

// @TODO(kgiusti) make these dependent on connection role
qd_ssn->out_window_limit = QD_QLIMIT_Q3_UPPER * QD_BUFFER_SIZE;
qd_ssn->out_window_low_watermark = QD_QLIMIT_Q3_LOWER * QD_BUFFER_SIZE;
// These thresholds come from the old Q3 session byte limits
qd_ssn->outgoing_bytes_high_threshold = 1048576;
qd_ssn->outgoing_bytes_low_threshold = 524288;
}
return qd_ssn;
}
Expand Down Expand Up @@ -965,28 +1015,95 @@
*
* Returns the available outgoing data capacity for the session. This capacity must be shared by all sending links on
* this session.
*
* The capacity is determined by the remotes current incoming window minus any outgoing bytes already written to the
* session. In other words:
*
* capacity = pn_remote_incoming_window(session) - pn_session_outgoing_bytes(session)
*
* However we must also prevent the router from buffering too much outgoing data at once. This is especially a problem when
* the remote uses an unlimited incoming window (default proton behavior). To prevent this we set an additional limit to
* the maximum amount of outgoing data that can be buffered in the session.
*/
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn)
{
assert(qd_ssn && qd_ssn->pn_session);

// discount any data already written but not yet sent
size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session);
if (buffered < qd_ssn->out_window_limit) {
return qd_ssn->out_window_limit - buffered;
}
return 0;
if (buffered >= qd_ssn->outgoing_bytes_high_threshold)
return 0; // exceeded maximum buffered limit
size_t avail = qd_ssn->outgoing_bytes_high_threshold - buffered;

#if USE_PN_SESSION_WINDOWS
// never exceed the remaining incoming window capacity of the peer
size_t limit = pn_session_remote_incoming_window(qd_ssn->pn_session);
limit *= qd_ssn->remote_max_frame;
return MIN(avail, limit);
#else
return avail;
#endif
}


/** Return the session outgoing window low water mark
/** Get the sessions current outgoing capacity low threshold
*
* Blocked session can resume output once the available outgoing capacity reaches at least this value
* Returns the lower threshold for the sessions outgoing capacity. This threshold is used for resuming blocked output on
* the session. Output can resume once the available outgoing capacity increases beyond this threshold.
*/
size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn)
size_t qd_session_get_outgoing_capacity_low_threshold(const qd_session_t *qd_ssn)
{
assert(qd_ssn);
return qd_ssn->out_window_low_watermark;
return qd_ssn->outgoing_bytes_low_threshold;
}


/** Configure the sessions local incoming window limit.
*
* This sets the value of the incoming window for the session. This value is sent to the remote peer in the Begin
* Performative.
*
* @param qd_ssn Session to configure
* @param in_window maximum incoming window in frames
*/
static void qd_session_configure_incoming_window(qd_session_t *qd_ssn, uint32_t in_window)
{
// older proton session windowing would stall so do not enable it
#if USE_PN_SESSION_WINDOWS
// Use new window configuration API to set the maximum in window and low water mark
assert(in_window >= 2);
int rc = pn_session_set_incoming_window_and_lwm(qd_ssn->pn_session, in_window, in_window / 2);
(void) rc;
assert(rc == 0);
#endif
}


/** Set the session incoming window that was advertised by the remote
*
* This is the value for the remotes incoming session window. It arrives in the Begin Performative.
*
* @param qd_ssn Session to update
* @param in_window the incoming window as given by the remote.
*/
#if USE_PN_SESSION_WINDOWS
static void qd_session_set_remote_incoming_window(qd_session_t *qd_ssn, uint32_t in_window)
{
// The true window size is given in the Begin Performative. Once frames are transferred the value of the remote
// incoming window read from Proton can be less than the full size due to the window being in use. The assert is an
// attempt to prevent accidentally calling this function after frame transfer starts:
assert(in_window != 0 && in_window >= qd_ssn->remote_max_incoming_window);

qd_ssn->remote_max_incoming_window = in_window;

// if the remotes max window is smaller than the default outgoing bytes limit then adjust the limits down
// otherwise we may never resume sending on blocked links (stall) since the low limit will never be exceeded.
size_t window_bytes = in_window * qd_ssn->remote_max_frame;

Check failure

Code scanning / CodeQL

Multiplication result converted to larger type High

Multiplication result may overflow 'unsigned int' before it is converted to 'size_t'.
if (window_bytes < qd_ssn->outgoing_bytes_high_threshold) {
qd_ssn->outgoing_bytes_high_threshold = window_bytes;
qd_ssn->outgoing_bytes_low_threshold = window_bytes / 2;
}
}
#endif


/** Release all of the connections sessions
Expand Down
8 changes: 5 additions & 3 deletions src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,18 @@ uint64_t qd_link_link_id(const qd_link_t *link);
void qd_link_set_link_id(qd_link_t *link, uint64_t link_id);
struct qd_message_t;
void qd_link_set_incoming_msg(qd_link_t *link, struct qd_message_t *msg);
qd_session_t *qd_link_get_session(const qd_link_t *link);

void qd_session_incref(qd_session_t *qd_ssn);
void qd_session_decref(qd_session_t *qd_ssn);
bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn);
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn);
size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn);
size_t qd_session_get_outgoing_capacity_low_threshold(const qd_session_t *qd_ssn);

void qd_connection_release_sessions(qd_connection_t *qd_conn);

// Defaults for session incoming window size
extern const size_t qd_session_incoming_window_normal;
extern const size_t qd_session_incoming_window_router;

///@}
#endif
6 changes: 5 additions & 1 deletion src/adaptors/amqp/qd_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,12 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
// Common transport configuration.
//
pn_transport_set_max_frame(tport, config->max_frame_size);
pn_transport_set_channel_max(tport, config->max_sessions - 1);
pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
// pn_transport_set_channel_max sets the maximum session *identifier*, not the total number of sessions. Thus Proton
// will allow sessions with identifiers [0..max_sessions], which is one greater than the value we pass to
// pn_transport_set_channel_max. So to limit the maximum number of simultaineous sessions to config->max_sessions we
// have to decrement it by one for Proton.
pn_transport_set_channel_max(tport, config->max_sessions - 1);
}

void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_connection_t *pn_conn, qd_connection_t *qd_conn);
Expand Down
Loading
Loading