-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISSUE-1619: Implement session flow control using new Proton window API #1647
base: main
Are you sure you want to change the base?
Conversation
src/adaptors/amqp/container.c
Outdated
// amount of outgoing data that can be buffered on a session before control is returned to Proton. This is necessary to | ||
// improve latency and allow capacity sharing among all links on the session. | ||
// | ||
const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session | |
const size_t qd_session_max_outgoing_bytes = 1048576; // 1 MB is the maximum bytes that can be written to an outgoing session before backing off. |
src/adaptors/amqp/container.c
Outdated
// improve latency and allow capacity sharing among all links on the session. | ||
// | ||
const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session | ||
const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes | |
const size_t qd_session_low_outgoing_bytes = 524288; // Start writing outgoing bytes to a session only if the session outgoing capacity is greater than 0.5 MB. low watermark for max buffered bytes |
src/adaptors/amqp/container.c
Outdated
size_t out_window_limit; | ||
size_t out_window_low_watermark; | ||
}; | ||
const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AMQP application
It is not clear to me what AMQP application means
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ganeshmurthy "user connection"? Basically any connection via a listener with the role "normal". These would include connections from the control plane to do management stuff. All non router-2-router connections.
src/adaptors/amqp/container.c
Outdated
size_t out_window_low_watermark; | ||
}; | ||
const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB | ||
const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB | |
const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // per inter-router connection window max byte limit 128MB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ganeshmurthy but the maximum is not per connection, it is per session, right? Hence the variable prefix "qd_session_..."
// Assume defaults will be used | ||
*in_window = cf->session_max_in_window; | ||
|
||
if (qd_conn->policy_settings) { | ||
const qd_policy_spec_t *spec = &qd_conn->policy_settings->spec; | ||
if (!spec->outgoingConnection && spec->maxSessionWindow) { | ||
// Policy configures the window *in bytes* but Proton uses *frames*. Convert to frames | ||
uint32_t max_frame = spec->maxFrameSize ? spec->maxFrameSize : cf->max_frame_size; | ||
*in_window = spec->maxSessionWindow / max_frame; | ||
if (*in_window < 2) | ||
*in_window = 2; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Assume defaults will be used | |
*in_window = cf->session_max_in_window; | |
if (qd_conn->policy_settings) { | |
const qd_policy_spec_t *spec = &qd_conn->policy_settings->spec; | |
if (!spec->outgoingConnection && spec->maxSessionWindow) { | |
// Policy configures the window *in bytes* but Proton uses *frames*. Convert to frames | |
uint32_t max_frame = spec->maxFrameSize ? spec->maxFrameSize : cf->max_frame_size; | |
*in_window = spec->maxSessionWindow / max_frame; | |
if (*in_window < 2) | |
*in_window = 2; | |
} | |
} | |
if (qd_conn->policy_settings) { | |
const qd_policy_spec_t *spec = &qd_conn->policy_settings->spec; | |
if (!spec->outgoingConnection && spec->maxSessionWindow) { | |
// Policy configures the window *in bytes* but Proton uses *frames*. Convert to frames | |
uint32_t max_frame = spec->maxFrameSize ? spec->maxFrameSize : cf->max_frame_size; | |
*in_window = spec->maxSessionWindow / max_frame; | |
if (*in_window < 2) | |
*in_window = 2; | |
} else { | |
*in_window = 2; | |
} | |
} else { | |
// Assume defaults will be used | |
*in_window = cf->session_max_in_window; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ganeshmurthy I must be missing something but this proposal seems wrong - why would we unconditionally set the in_window to 2 if the connection is incoming or no session max was set? In these cases we definitely want to use the cf->session_max_in_window
include/qpid/dispatch/amqp_adaptor.h
Outdated
|
||
// Session windowing limits | ||
extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections | |
extern const size_t qd_session_max_incoming_win_user; // incoming window byte limit for user connections |
include/qpid/dispatch/amqp_adaptor.h
Outdated
|
||
// Session windowing limits | ||
extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections | ||
extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections | |
extern const size_t qd_session_max_incoming_conn_win; // incoming window byte limit for inter-router connections |
@@ -69,15 +69,34 @@ struct qd_session_t { | |||
sys_atomic_t ref_count; | |||
pn_session_t *pn_session; | |||
qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty | |||
uint32_t remote_max_frame; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uint32_t remote_max_frame; | |
uint32_t remote_max_frame_size; |
// Proton does not support maxSessions > 32768 | ||
int64_t value = (int64_t) qd_entity_get_long(entity, "maxSessions"); CHECK(); | ||
if (value == 0) { | ||
value = 32768; // default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value = 32768; // default | |
value = QD_PN_MAX_SESSIONS; // default |
define QD_PN_MAX_SESSIONS in amqp.h ?
// and it does not exceed the proton APIs max of INT32_MAX | ||
value = (int64_t) qd_entity_get_long(entity, "maxFrameSize"); CHECK(); | ||
if (value == 0) { | ||
value = 16384; // default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value = 16384; // default | |
value = QD_DEFAULT_MAX_FRAME_SIZE; // default |
*/ | ||
size_t incoming_capacity; | ||
uint32_t session_max_in_window; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uint32_t session_max_in_window; | |
uint32_t session_max_incoming_window; |
5907d68
to
bb9546e
Compare
|
||
// if the remotes max window is smaller than the default outgoing bytes limit then adjust the limits down | ||
// otherwise we may never resume sending on blocked links (stall) since the low limit will never be exceeded. | ||
size_t window_bytes = in_window * qd_ssn->remote_max_frame; |
Check failure
Code scanning / CodeQL
Multiplication result converted to larger type High
No description provided.