Skip to content

Commit

Permalink
fixup: adjust limits based on peer window, review input
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Nov 12, 2024
1 parent 1ed963e commit bb9546e
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 26 deletions.
5 changes: 0 additions & 5 deletions include/qpid/dispatch/amqp_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ typedef struct qd_connection_t qd_connection_t;
typedef struct qd_link_t qd_link_t;
typedef struct qd_session_t qd_session_t;

// Session windowing limits
extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections
extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections
extern const size_t qd_session_max_outgoing_bytes; // limit to outgoing buffered data
extern const size_t qd_session_low_outgoing_bytes; // low water mark to resume buffering outgoing data

// For use by message.c

Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
qd_session_t *qd_ssn = qd_link_get_session(link);
if (qd_session_is_q3_blocked(qd_ssn)) {
// Q3 blocked - have we drained enough outgoing bytes?
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_low_outgoing_bytes) {
if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_capacity_low_threshold(qd_ssn)) {
// yes. We must now unblock all links that have been blocked by Q3

qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn);
Expand Down
105 changes: 89 additions & 16 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,36 @@ struct qd_session_t {
sys_atomic_t ref_count;
pn_session_t *pn_session;
qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty

// remotes maximum incoming frame size in bytes (see AMQP 1.0 Open Performative)
uint32_t remote_max_frame;

// remotes incoming window size in frames (see AMQP 1.0 Begin Performative)
uint32_t remote_max_incoming_window;

// Session outgoing flow control: Stop writing outgoing data (calling pn_link_send()) to the session when the total
// number of buffered bytes has exceeded the high threshold (see Proton pn_session_outgoing_bytes()). Resume writing
// data when the session has sent enough data to reduce the number of buffered output bytes to below the low
// threshold. This prevents the router from buffering too much output data before allowing Proton to write it out.
// See qd_session_get_outgoing_capacity() for details.
size_t outgoing_bytes_high_threshold;
size_t outgoing_bytes_low_threshold;
};

// Session window limits

// Session window limits (See Section 2.5.6 Session Flow Control in AMQP V1.0 Specification)
//
// A session incoming window determines how many incoming frames the session will accept across all incoming links. This
// places a limit on the number of incoming data bytes that have to be buffered on the session (connection max-frame *
// max incoming window frames). The local session incoming window configuration is determined by the maxFrameSize and
// maxSessionFrames configuration attributes of an AMQP listener/connector.
//
// The remote peers session window must be honored when writing output to a sending link. In addition we limit the
// amount of outgoing data that can be buffered on a session before control is returned to Proton. This is necessary to
// improve latency and allow capacity sharing among all links on the session.
// The remote peers session window must be honored when writing output to a sending link: we must not send more data
// than the window allows
//
const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session
const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes

const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB
const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB
// Default window settings (in bytes not frames). Give inter-router connections a larger window for better performance.
const size_t qd_session_incoming_window_normal = (size_t) 8388608; // window for role=normal connections (8MB)
const size_t qd_session_incoming_window_router = (size_t) 134217728; // window for inter-router connections (128MB)


// Can we leverage the new Proton Session Window API?
Expand Down Expand Up @@ -125,6 +136,9 @@ struct qd_container_t {
};

qd_session_t *qd_session(pn_session_t *pn_ssn);
static void qd_session_configure_incoming_window(qd_session_t *qd_ssn, uint32_t in_window);
static void qd_session_set_remote_incoming_window(qd_session_t *qd_ssn, uint32_t in_window);


static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn)
{
Expand Down Expand Up @@ -460,9 +474,16 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn);
uint32_t in_window;
qd_policy_get_session_settings(qd_conn, &in_window);
qd_session_set_max_in_window(qd_ssn, in_window);
qd_session_configure_incoming_window(qd_ssn, in_window);
pn_session_open(qd_ssn->pn_session);
}
#if USE_PN_SESSION_WINDOWS
// Remote has opened, now the remote incoming window is available
qd_session_t *qd_ssn = qd_session_from_pn(ssn);
assert(qd_ssn);
qd_session_set_remote_incoming_window(qd_ssn,
pn_session_remote_incoming_window(ssn));
#endif
}
break;

Expand Down Expand Up @@ -721,7 +742,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn);
conn->qd_sessions[ssn_class] = qd_ssn;
qd_session_incref(qd_ssn);
qd_session_set_max_in_window(qd_ssn, cf->session_max_in_window);
qd_session_configure_incoming_window(qd_ssn, cf->session_max_in_window);
pn_session_open(qd_ssn->pn_session);
}

Expand Down Expand Up @@ -931,6 +952,10 @@ qd_session_t *qd_session(pn_session_t *pn_ssn)
pn_session_set_context(pn_ssn, qd_ssn);
qd_ssn->remote_max_frame = pn_transport_get_remote_max_frame(pn_tport);
assert(qd_ssn->remote_max_frame != 0);

// These thresholds come from the old Q3 session byte limits
qd_ssn->outgoing_bytes_high_threshold = 1048576;
qd_ssn->outgoing_bytes_low_threshold = 524288;
}
return qd_ssn;
}
Expand Down Expand Up @@ -987,19 +1012,28 @@ bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn)
*
* Returns the available outgoing data capacity for the session. This capacity must be shared by all sending links on
* this session.
*
* The capacity is determined by the remotes current incoming window minus any outgoing bytes already written to the
* session. In other words:
*
* capacity = pn_remote_incoming_window(session) - pn_session_outgoing_bytes(session)
*
* However we must also prevent the router from buffering too much outgoing data at once. This is especially a problem when
* the remote uses an unlimited incoming window (default proton behavior). To prevent this we set an additional limit to
* the maximum amount of outgoing data that can be buffered in the session.
*/
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn)
{
assert(qd_ssn && qd_ssn->pn_session);

// discount any data already written but not yet sent
size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session);
if (buffered >= qd_session_max_outgoing_bytes)
if (buffered >= qd_ssn->outgoing_bytes_high_threshold)
return 0; // exceeded maximum buffered limit
size_t avail = qd_session_max_outgoing_bytes - buffered;
size_t avail = qd_ssn->outgoing_bytes_high_threshold - buffered;

#if USE_PN_SESSION_WINDOWS
// never exceed the remaining in window of the peer
// never exceed the remaining incoming window capacity of the peer
size_t limit = pn_session_remote_incoming_window(qd_ssn->pn_session);
limit *= qd_ssn->remote_max_frame;
return MIN(avail, limit);
Expand All @@ -1009,12 +1043,26 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn)
}


/** Configure the sessions incoming window limit
/** Get the sessions current outgoing capacity low threshold
*
* Returns the lower threshold for the sessions outgoing capacity. This threshold is used for resuming blocked output on
* the session. Output can resume once the available outgoing capacity increases beyond this threshold.
*/
size_t qd_session_get_outgoing_capacity_low_threshold(const qd_session_t *qd_ssn)
{
return qd_ssn->outgoing_bytes_low_threshold;
}


/** Configure the sessions local incoming window limit.
*
* This sets the value of the incoming window for the session. This value is sent to the remote peer in the Begin
* Performative.
*
* @param qd_ssn Session to configure
* @param in_window maximum incoming window in frames
*/
void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window)
static void qd_session_configure_incoming_window(qd_session_t *qd_ssn, uint32_t in_window)
{
// older proton session windowing would stall so do not enable it
#if USE_PN_SESSION_WINDOWS
Expand All @@ -1026,6 +1074,31 @@ void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window)
#endif
}

/** Set the session incoming window that was advertised by the remote
*
* This is the value for the remotes incoming session window. It arrives in the Begin Performative and may be updated by
* arriving Flow Performatives.
*
* @param qd_ssn Session to update
* @param in_window the incoming window as given by the remote.
*/
static void qd_session_set_remote_incoming_window(qd_session_t *qd_ssn, uint32_t in_window)
{
assert(in_window != 0);

// Record the largest window advertised by the remote.
if (in_window > qd_ssn->remote_max_incoming_window) {
qd_ssn->remote_max_incoming_window = in_window;
// if the remotes max window is smaller than the default outgoing bytes limit then adjust the limits down
// otherwise we may never resume sending on blocked links (stall) since the low limit will never be exceeded.
size_t window_bytes = in_window * qd_ssn->remote_max_frame;

Check failure

Code scanning / CodeQL

Multiplication result converted to larger type High

Multiplication result may overflow 'unsigned int' before it is converted to 'size_t'.
if (window_bytes < qd_ssn->outgoing_bytes_high_threshold) {
qd_ssn->outgoing_bytes_high_threshold = window_bytes;
qd_ssn->outgoing_bytes_low_threshold = window_bytes / 2;
}
}
}


/** Release all of the connections sessions
*
Expand Down
6 changes: 5 additions & 1 deletion src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ void qd_session_incref(qd_session_t *qd_ssn);
void qd_session_decref(qd_session_t *qd_ssn);
bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn);
qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn);
void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window);
size_t qd_session_get_outgoing_capacity_low_threshold(const qd_session_t *qd_ssn);

void qd_connection_release_sessions(qd_connection_t *qd_conn);

// Defaults for session incoming window size
extern const size_t qd_session_incoming_window_normal;
extern const size_t qd_session_incoming_window_router;

///@}
#endif
6 changes: 3 additions & 3 deletions src/adaptors/amqp/server_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/*
* Configuration record for listener and connector entities
*/

#include "container.h"
#include "server_config.h"
#include "dispatch_private.h"
#include "entity.h"
Expand Down Expand Up @@ -219,9 +219,9 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config,
if (value == 0) {
// Use a sane default. Allow router to router links more capacity than AMQP application links
if (strcmp(config->role, "normal") == 0) {
value = qd_session_max_in_win_user / config->max_frame_size;
value = qd_session_incoming_window_normal / config->max_frame_size;
} else {
value = qd_session_max_in_win_trunk / config->max_frame_size;
value = qd_session_incoming_window_router / config->max_frame_size;
}
// Ensure the window is at least 2 frames to allow a non-zero low water mark
value = MAX(value, 2);
Expand Down

0 comments on commit bb9546e

Please sign in to comment.