diff --git a/include/ap_mmn.h b/include/ap_mmn.h
index fb8f4512d47..aac4e1a3401 100644
--- a/include/ap_mmn.h
+++ b/include/ap_mmn.h
@@ -735,14 +735,18 @@
 *                          ap_check_output_pending()
 * 20211221.27 (2.5.1-dev)  Add min_connection_timeout hook and
 *                          ap_get_connection_timeout()
+ * 20211221.28 (2.5.1-dev) Add ap_mpm_poll_suspended() and
+ *                         AP_MPMQ_CAN_POLL_SUSPENDED
+ * 20240701.0 (2.5.1-dev)  Axe ap_mpm_register_poll_callback and
+ *                         ap_mpm_register_poll_callback_timeout
 */

#define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */

#ifndef MODULE_MAGIC_NUMBER_MAJOR
-#define MODULE_MAGIC_NUMBER_MAJOR 20211221
+#define MODULE_MAGIC_NUMBER_MAJOR 20240701
#endif
-#define MODULE_MAGIC_NUMBER_MINOR 27    /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 0     /* 0...n */

/**
 * Determine if the server's current MODULE_MAGIC_NUMBER is at least a
diff --git a/include/ap_mpm.h b/include/ap_mpm.h
index f2fd436d508..9a7ec6eeaa3 100644
--- a/include/ap_mpm.h
+++ b/include/ap_mpm.h
@@ -184,6 +184,8 @@ AP_DECLARE(apr_status_t) ap_os_create_privileged_process(
 #define AP_MPMQ_CAN_POLL             18
 /** MPM supports CONN_STATE_ASYNC_WAITIO */
 #define AP_MPMQ_CAN_WAITIO           19
+/** MPM implements the poll_suspended hook */
+#define AP_MPMQ_CAN_POLL_SUSPENDED   20
 /** @} */
 
 /**
@@ -206,54 +208,13 @@ typedef void (ap_mpm_callback_fn_t)(void *baton);
 /* only added support in the Event MPM....  check for APR_ENOTIMPL */
 AP_DECLARE(apr_status_t) ap_mpm_resume_suspended(conn_rec *c);
 /* only added support in the Event MPM....  check for APR_ENOTIMPL */
+/**
+ * Poll a suspended connection's pollfds from the MPM, resuming the
+ * connection when an event triggers or the timeout expires.
+ * @param c The suspended connection
+ * @param p Pool used by the MPM for its internal allocations, cleared
+ *          or destroyed by the caller once the connection is resumed
+ * @param pfds Array of apr_pollfd_t to poll
+ * @param timeout If >= 0, the delay after which the connection is
+ *                resumed in CONN_STATE_LINGER state (i.e. timed out)
+ * @return APR_SUCCESS if all the pollfds could be added to the MPM's
+ *         pollset, APR_ENOTIMPL if the MPM has no such support, or an
+ *         apr_pollset_add error.
+ */
+AP_DECLARE(apr_status_t) ap_mpm_poll_suspended(conn_rec *c, apr_pool_t *p,
+                                               const apr_array_header_t *pfds,
+                                               apr_interval_time_t timeout);
+/* only added support in the Event MPM....  check for APR_ENOTIMPL */
 AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(
     apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton);
 
-/**
- * Register a callback on the readability or writability on a group of
- * sockets/pipes.
- * @param p Pool used by the MPM for its internal allocations
- * @param pfds Array of apr_pollfd_t
- * @param cbfn The callback function
- * @param baton userdata for the callback function
- * @return APR_SUCCESS if all sockets/pipes could be added to a pollset,
- *         APR_ENOTIMPL if no asynch support, or an apr_pollset_add error.
- * @remark When activity is found on any 1 socket/pipe in the list, all are removed
- *         from the pollset and only 1 callback is issued.
- * @remark For each call, only one of tofn or cbfn will be called, never both.
- * @remark The passed in pool can be cleared by cbfn and tofn when called back,
- *         it retains no MPM persistent data and won't be used until the next call
- *         to ap_mpm_register_poll_callback[_timeout].
- */
-
-AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback_timeout(
-        apr_pool_t *p, const apr_array_header_t *pfds,
-        ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn,
-        void *baton, apr_time_t timeout);
-
-
 typedef enum mpm_child_status {
     MPM_CHILD_STARTED,
     MPM_CHILD_EXITED,
diff --git a/include/httpd.h b/include/httpd.h
index ae08740b227..931f5fff49a 100644
--- a/include/httpd.h
+++ b/include/httpd.h
@@ -1334,7 +1334,7 @@ typedef enum {
     CONN_STATE_PROCESSING,       /* Processed by process_connection hooks */
     CONN_STATE_HANDLER,          /* Processed by the modules handlers */
     CONN_STATE_WRITE_COMPLETION, /* Flushed by the MPM before entering CONN_STATE_KEEPALIVE */
-    CONN_STATE_SUSPENDED,        /* Suspended in the MPM until ap_run_resume_suspended() */
+    CONN_STATE_SUSPENDED,        /* Suspended from the MPM until ap_run_resume_suspended() */
     CONN_STATE_LINGER,           /* MPM flushes then closes the connection with lingering */
     CONN_STATE_LINGER_NORMAL,    /* MPM has started lingering close with normal timeout */
     CONN_STATE_LINGER_SHORT,     /* MPM has started lingering close with short timeout */
diff --git a/include/mpm_common.h b/include/mpm_common.h
index 34c61e2a6c2..43320b2b5c9 100644
--- a/include/mpm_common.h
+++ b/include/mpm_common.h
@@ -422,22 +422,12 @@ AP_DECLARE_HOOK(int, mpm_query, (int query_code, int *result,
                                  apr_status_t *rv))
 AP_DECLARE_HOOK(apr_status_t, mpm_register_timed_callback,
                 (apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton))
-/**
- * register the specified callback
- * @ingroup hooks
- */
-AP_DECLARE_HOOK(apr_status_t, mpm_register_poll_callback,
-                (apr_pool_t *p, const apr_array_header_t *pds,
-                 ap_mpm_callback_fn_t *cbfn, void *baton))
-
-/* register the specified callback, with timeout
+/** Put a suspended connection's pollfds into the MPM's pollset
  * @ingroup hooks
- *
  */
-AP_DECLARE_HOOK(apr_status_t, mpm_register_poll_callback_timeout,
-                (apr_pool_t *p, const apr_array_header_t *pds,
-                 ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn,
-                 void *baton, apr_time_t timeout))
+AP_DECLARE_HOOK(apr_status_t, mpm_poll_suspended,
+                (conn_rec *c, apr_pool_t *p, const apr_array_header_t *pfds,
+                 apr_interval_time_t timeout))
 
 /** Resume the suspended connection
  * @ingroup hooks
diff --git a/modules/http/http_core.c b/modules/http/http_core.c
index 7e9f82f87dd..92a472d3fa7 100644
--- a/modules/http/http_core.c
+++ b/modules/http/http_core.c
@@ -182,20 +182,22 @@ static int ap_process_http_async_connection(conn_rec *c)
             * of nondeterministic failures later.
             */
            r = NULL;
-        }
 
-        if (cs->state != CONN_STATE_WRITE_COMPLETION &&
-            cs->state != CONN_STATE_SUSPENDED &&
-            cs->state != CONN_STATE_LINGER) {
-            /* Something went wrong; close the connection */
-            cs->state = CONN_STATE_LINGER;
+            switch (cs->state) {
+            case CONN_STATE_WRITE_COMPLETION:
+            case CONN_STATE_SUSPENDED:
+            case CONN_STATE_LINGER:
+                return OK;
+            default:
+                /* Unexpected, close */
+                break;
+            }
         }
     }
-    else {   /* ap_read_request failed - client may have closed */
-        cs->state = CONN_STATE_LINGER;
-    }
 
+    /* Something went wrong; close the connection */
+    cs->state = CONN_STATE_LINGER;
     return OK;
 }
 
diff --git a/modules/proxy/mod_proxy_http.c b/modules/proxy/mod_proxy_http.c
index 38da5b0f7f6..66a66af7949 100644
--- a/modules/proxy/mod_proxy_http.c
+++ b/modules/proxy/mod_proxy_http.c
@@ -19,9 +19,12 @@
 #include "mod_proxy.h"
 #include "ap_regex.h"
 #include "ap_mpm.h"
+#include "mpm_common.h"
 
 module AP_MODULE_DECLARE_DATA proxy_http_module;
 
+static int mpm_can_poll_suspended = 0;
+
 static int (*ap_proxy_clear_connection_fn)(request_rec *r,
                                            apr_table_t *headers) = NULL;
 
@@ -275,12 +278,6 @@ static void add_cl(apr_pool_t *p,
 
 #define MAX_MEM_SPOOL 16384
 
-typedef enum {
-    PROXY_HTTP_REQ_HAVE_HEADER = 0,
-
-    PROXY_HTTP_TUNNELING
-} proxy_http_state;
-
 typedef enum {
     RB_INIT = 0,
     RB_STREAM_CL,
@@ -307,7 +304,6 @@ typedef struct {
     char *old_cl_val, *old_te_val;
     apr_off_t cl_val;
 
-    proxy_http_state state;
     rb_methods rb_method;
 
     const char *upgrade;
@@ -316,108 +312,148 @@ typedef struct {
     apr_pool_t *async_pool;
     apr_interval_time_t idle_timeout;
 
-    unsigned int can_go_async :1,
+    unsigned int can_suspend :1,
                  do_100_continue :1,
                  prefetch_nonblocking :1,
-                 force10 :1;
+                 force10 :1,
+                 suspended :1,
+                 upgraded :1;
 } proxy_http_req_t;
 
-static void proxy_http_async_finish(proxy_http_req_t *req)
+static int proxy_http_tunnel_pump(proxy_http_req_t *req)
+{
+    int status = ap_proxy_tunnel_run(req->tunnel);
+    if (status == HTTP_GATEWAY_TIME_OUT) {
+        if (!req->can_suspend) {
+            /* ap_proxy_tunnel_run() didn't log this */
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r, APLOGNO()
+                          "proxy: %s tunneling timed out",
+                          req->proto);
+        }
+        else {
+            status = SUSPENDED;
+        }
+    }
+    return status;
+}
+
+/* The backend and SUSPENDED client connections are done,
+ * release them (the latter in the MPM).
+ */
+static void proxy_http_async_done(proxy_http_req_t *req, int cancelled)
 {
-    conn_rec *c = req->r->connection;
+    request_rec *r = req->r;
+    conn_rec *c = r->connection;
+    proxy_conn_rec *backend = req->backend;
+    proxy_tunnel_rec *tunnel = req->tunnel;
+    int reusable = (!cancelled && !req->upgraded);
+
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r, "proxy %s: %s async",
"cancel" : "finish"); + + if (req->async_pool) { + apr_pool_destroy(req->async_pool); + req->async_pool = NULL; + } - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r, - "proxy %s: finish async", req->proto); + if (!reusable) { + c->keepalive = AP_CONN_CLOSE; + backend->close = 1; + } /* Report bytes exchanged by the backend */ - req->backend->worker->s->read += - ap_proxy_tunnel_conn_bytes_in(req->tunnel->origin); - req->backend->worker->s->transferred += - ap_proxy_tunnel_conn_bytes_out(req->tunnel->origin); + backend->worker->s->read += + ap_proxy_tunnel_conn_bytes_in(tunnel->origin); + backend->worker->s->transferred += + ap_proxy_tunnel_conn_bytes_out(tunnel->origin); - proxy_run_detach_backend(req->r, req->backend); - ap_proxy_release_connection(req->proto, req->backend, req->r->server); + proxy_run_detach_backend(r, backend); + ap_proxy_release_connection(req->proto, backend, r->server); - ap_finalize_request_protocol(req->r); - ap_process_request_after_handler(req->r); - /* don't touch req or req->r from here */ + ap_finalize_request_protocol(r); + ap_process_request_after_handler(r); + /* don't dereference req or r from here! */ - c->cs->state = CONN_STATE_LINGER; + /* Return the client connection to the MPM */ + if (reusable) { + c->cs->state = CONN_STATE_WRITE_COMPLETION; + } + else { + c->cs->state = CONN_STATE_LINGER; + } ap_mpm_resume_suspended(c); } -/* If neither socket becomes readable in the specified timeout, - * this callback will kill the request. - * We do not have to worry about having a cancel and a IO both queued. - */ -static void proxy_http_async_cancel_cb(void *baton) -{ - proxy_http_req_t *req = (proxy_http_req_t *)baton; - - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r, - "proxy %s: cancel async", req->proto); - - req->r->connection->keepalive = AP_CONN_CLOSE; - req->backend->close = 1; - proxy_http_async_finish(req); -} +/* Tell the MPM to poll the connections and resume when ready */ +static void proxy_http_async_poll(proxy_http_req_t *req) +{ + conn_rec *c = req->r->connection; + proxy_tunnel_rec *tunnel = req->tunnel; -/* Invoked by the event loop when data is ready on either end. - * We don't need the invoke_mtx, since we never put multiple callback events - * in the queue. - */ -static void proxy_http_async_cb(void *baton) -{ - proxy_http_req_t *req = (proxy_http_req_t *)baton; - int status; + ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, req->r, + "proxy %s: going async", req->proto); + /* Create/clear the subpool used by the MPM to allocate + * the temporary data needed for this polling. 
+     */
     if (req->async_pool) {
-        /* Clear MPM's temporary data */
         apr_pool_clear(req->async_pool);
     }
+    else {
+        apr_pool_create(&req->async_pool, req->p);
+    }
 
-    switch (req->state) {
-    case PROXY_HTTP_TUNNELING:
-        /* Pump both ends until they'd block and then start over again */
-        status = ap_proxy_tunnel_run(req->tunnel);
-        if (status == HTTP_GATEWAY_TIME_OUT) {
-            status = SUSPENDED;
-        }
-        break;
+    ap_mpm_poll_suspended(c, req->async_pool, tunnel->pfds, req->idle_timeout);
+}
 
-    default:
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
-                      "proxy %s: unexpected async state (%i)",
-                      req->proto, (int)req->state);
-        status = HTTP_INTERNAL_SERVER_ERROR;
-        break;
-    }
+/* The resume_connection hook called by the MPM when async polling completes (or times out) */
+static void proxy_http_resume_connection(conn_rec *c, request_rec *r)
+{
+    proxy_http_req_t *req = NULL;
+    int status;
 
-    if (status == SUSPENDED) {
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
-                      "proxy %s: suspended, going async",
-                      req->proto);
-
-        if (!req->async_pool) {
-            /* Create the subpool used by the MPM to alloc its own
-             * temporary data, which we want to clear on the next
-             * round (above) to avoid leaks.
-             */
-            apr_pool_create(&req->async_pool, req->p);
-        }
+    if (r) {
+        req = ap_get_module_config(r->request_config, &proxy_http_module);
+    }
+    if (!req || !req->suspended) {
+        return;
+    }
+    ap_assert(req->r == r);
 
-        ap_mpm_register_poll_callback_timeout(req->async_pool,
-                                              req->tunnel->pfds,
-                                              proxy_http_async_cb,
-                                              proxy_http_async_cancel_cb,
-                                              req, req->idle_timeout);
+    if (c->cs->state == CONN_STATE_SUSPENDED) {
+        status = proxy_http_tunnel_pump(req);
+    }
+    else {
+        AP_DEBUG_ASSERT(c->cs->state == CONN_STATE_LINGER);
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO()
+                      "proxy: %s async tunneling timed out (state %i)",
+                      req->proto, c->cs->state);
+        status = DONE;
     }
-    else if (ap_is_HTTP_ERROR(status)) {
-        proxy_http_async_cancel_cb(req);
+    if (status == SUSPENDED) {
+        /* Keep polling in the MPM */
+        proxy_http_async_poll(req);
     }
     else {
-        proxy_http_async_finish(req);
+        /* Done with tunneling */
+        proxy_http_async_done(req, status != OK);
+    }
+}
+
+/* The suspend_connection hook called once the MPM gets the SUSPENDED connection */
+static void proxy_http_suspend_connection(conn_rec *c, request_rec *r)
+{
+    proxy_http_req_t *req = NULL;
+
+    if (r) {
+        req = ap_get_module_config(r->request_config, &proxy_http_module);
     }
+    if (!req || !req->suspended) {
+        return;
+    }
+    ap_assert(req->r == r);
+
+    proxy_http_async_poll(req);
 }
 
 static int stream_reqbody(proxy_http_req_t *req)
@@ -1553,22 +1589,40 @@ int ap_proxy_http_process_response(proxy_http_req_t *req)
                           "can't create tunnel for %s", upgrade);
             return HTTP_INTERNAL_SERVER_ERROR;
         }
+        if (req->can_suspend) {
+            /* If the MPM allows async polling, this thread will tunnel
+             * all it can now, so long as it does not time out on the
+             * (short) async delay, returning to the MPM otherwise to get
+             * scheduled again when the connections are ready.
+             */
+            req->tunnel->timeout = dconf->async_delay;
+        }
+        else {
+            /* If the MPM doesn't allow async polling, the full tunneling
+             * happens now in this thread and a timeout is a showstopper.
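+             * (proxy_http_tunnel_pump() then logs the timeout as an
+             * error instead of returning SUSPENDED)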
+             */
+            req->tunnel->timeout = req->idle_timeout;
+        }
 
         r->status = HTTP_SWITCHING_PROTOCOLS;
         req->proto = upgrade;
-
-        if (req->can_go_async) {
-            /* Let the MPM schedule the work when idle */
-            req->state = PROXY_HTTP_TUNNELING;
-            req->tunnel->timeout = dconf->async_delay;
-            proxy_http_async_cb(req);
+        req->upgraded = 1;
+
+        status = proxy_http_tunnel_pump(req);
+        if (status == SUSPENDED) {
+            /* Let the MPM call proxy_http_suspend_connection() when
+             * the connection is returned to it (i.e. not handled anywhere
+             * else anymore). This prevents the connection from being seen
+             * or handled by multiple threads at the same time, which could
+             * happen if we called ap_mpm_poll_suspended() directly from
+             * here: a new IO might get the connection rescheduled quickly,
+             * before it actually reaches the MPM.
+             */
+            req->suspended = 1;
             return SUSPENDED;
         }
 
-        /* Let proxy tunnel forward everything within this thread */
-        req->tunnel->timeout = req->idle_timeout;
-        status = ap_proxy_tunnel_run(req->tunnel);
-
         /* Report bytes exchanged by the backend */
         backend->worker->s->read +=
             ap_proxy_tunnel_conn_bytes_in(req->tunnel->origin);
@@ -1932,7 +1986,6 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker,
     proxy_http_req_t *req = NULL;
     proxy_conn_rec *backend = NULL;
     apr_bucket_brigade *input_brigade = NULL;
-    int mpm_can_poll = 0;
     int is_ssl = 0;
     conn_rec *c = r->connection;
     proxy_dir_conf *dconf;
@@ -1972,7 +2025,6 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker,
     backend->is_ssl = is_ssl;
 
     dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
-    ap_mpm_query(AP_MPMQ_CAN_POLL, &mpm_can_poll);
 
     req = apr_pcalloc(p, sizeof(*req));
     req->p = p;
@@ -1983,12 +2035,13 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker,
     req->backend = backend;
     req->proto = scheme;
     req->bucket_alloc = c->bucket_alloc;
-    req->can_go_async = (mpm_can_poll &&
-                         dconf->async_delay_set &&
-                         dconf->async_delay >= 0);
-    req->state = PROXY_HTTP_REQ_HAVE_HEADER;
+    req->can_suspend = (mpm_can_poll_suspended &&
+                        dconf->async_delay_set &&
+                        dconf->async_delay >= 0);
     req->rb_method = RB_INIT;
 
+    ap_set_module_config(r->request_config, &proxy_http_module, req);
+
     if (apr_table_get(r->subprocess_env, "force-proxy-request-1.0")) {
         req->force10 = 1;
     }
@@ -2004,9 +2057,9 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker,
         }
     }
 
-    if (req->can_go_async || req->upgrade) {
+    if (req->can_suspend || req->upgrade) {
         /* If ProxyAsyncIdleTimeout is not set, use backend timeout */
-        if (req->can_go_async && dconf->async_idle_timeout_set) {
+        if (req->can_suspend && dconf->async_idle_timeout_set) {
             req->idle_timeout = dconf->async_idle_timeout;
         }
         else if (worker->s->timeout_set) {
@@ -2045,7 +2098,7 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker,
      * data to the backend ASAP?
      */
    if (input_brigade
-        || req->can_go_async
+        || req->can_suspend
        || req->do_100_continue
        || apr_table_get(r->subprocess_env, "proxy-prefetch-nonblocking")) {
@@ -2190,13 +2243,18 @@ static int proxy_http_post_config(apr_pool_t *pconf, apr_pool_t *plog,
                                   apr_pool_t *ptemp, server_rec *s)
 {
-
     /* proxy_http_post_config() will be called twice during startup.  So, don't
      * set up the static data the 1st time through.
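     * This later pass is also when the MPM is queried for
     * AP_MPMQ_CAN_POLL_SUSPENDED support (below).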
     */
    if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG) {
        return OK;
    }
 
+#ifdef AP_MPMQ_CAN_POLL_SUSPENDED
+    if (ap_mpm_query(AP_MPMQ_CAN_POLL_SUSPENDED, &mpm_can_poll_suspended)) {
+        mpm_can_poll_suspended = 0;
+    }
+#endif
+
     ap_proxy_clear_connection_fn =
             APR_RETRIEVE_OPTIONAL_FN(ap_proxy_clear_connection);
     if (!ap_proxy_clear_connection_fn) {
@@ -2214,6 +2272,10 @@ static void ap_proxy_http_register_hook(apr_pool_t *p)
     proxy_hook_scheme_handler(proxy_http_handler, NULL, NULL, APR_HOOK_FIRST);
     proxy_hook_canon_handler(proxy_http_canon, NULL, NULL, APR_HOOK_FIRST);
     warn_rx = ap_pregcomp(p, "[0-9]{3}[ \t]+[^ \t]+[ \t]+\"[^\"]*\"([ \t]+\"([^\"]+)\")?", 0);
+
+    /* For when the tunnel connections are suspended to and resumed from the MPM */
+    ap_hook_suspend_connection(proxy_http_suspend_connection, NULL, NULL, APR_HOOK_FIRST);
+    ap_hook_resume_connection(proxy_http_resume_connection, NULL, NULL, APR_HOOK_FIRST);
 }
 
 AP_DECLARE_MODULE(proxy_http) = {
diff --git a/modules/proxy/mod_proxy_wstunnel.c b/modules/proxy/mod_proxy_wstunnel.c
index 0e5e6cb8128..3439b08b18d 100644
--- a/modules/proxy/mod_proxy_wstunnel.c
+++ b/modules/proxy/mod_proxy_wstunnel.c
@@ -17,13 +17,15 @@
 #include "mod_proxy.h"
 #include "http_config.h"
 #include "ap_mpm.h"
+#include "mpm_common.h"
 
 module AP_MODULE_DECLARE_DATA proxy_wstunnel_module;
 
+static int mpm_can_poll_suspended = 0;
+
 typedef struct {
     unsigned int fallback_to_proxy_http :1,
                  fallback_to_proxy_http_set :1;
-    int mpm_can_poll;
     apr_time_t idle_timeout;
     apr_time_t async_delay;
 } proxyws_dir_conf;
@@ -32,83 +34,130 @@ typedef struct ws_baton_t {
     request_rec *r;
     proxy_conn_rec *backend;
     proxy_tunnel_rec *tunnel;
+    apr_time_t idle_timeout;
     apr_pool_t *async_pool;
     const char *scheme;
+    int suspended;
 } ws_baton_t;
 
 static int can_fallback_to_proxy_http;
 
-static void proxy_wstunnel_callback(void *b);
-
-static int proxy_wstunnel_pump(ws_baton_t *baton, int async)
+static int proxy_wstunnel_pump(ws_baton_t *baton)
 {
     int status = ap_proxy_tunnel_run(baton->tunnel);
     if (status == HTTP_GATEWAY_TIME_OUT) {
-        if (!async) {
+        if (!mpm_can_poll_suspended) {
             /* ap_proxy_tunnel_run() didn't log this */
             ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, APLOGNO(10225)
-                          "Tunnel timed out");
+                          "proxy: %s tunneling timed out",
+                          baton->scheme);
         }
         else {
-            ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, APLOGNO(02542)
-                          "Attempting to go async");
             status = SUSPENDED;
         }
     }
     return status;
}
 
-static void proxy_wstunnel_finish(ws_baton_t *baton)
-{
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, "proxy_wstunnel_finish");
-
-    ap_proxy_release_connection(baton->scheme, baton->backend, baton->r->server);
-    ap_finalize_request_protocol(baton->r);
-    ap_lingering_close(baton->r->connection);
-    ap_mpm_resume_suspended(baton->r->connection);
-    ap_process_request_after_handler(baton->r); /* don't touch baton or r after here */
+/* The backend and SUSPENDED client connections are done,
+ * release them (the latter in the MPM).
+ */
+static void proxy_wstunnel_done(ws_baton_t *baton, int cancelled)
+{
+    request_rec *r = baton->r;
+    conn_rec *c = r->connection;
+    proxy_conn_rec *backend = baton->backend;
+
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r, "proxy %s: %s async",
"cancel" : "finish"); + + /* Upgraded connections not reusable */ + c->keepalive = AP_CONN_CLOSE; + backend->close = 1; + + ap_proxy_release_connection(baton->scheme, backend, r->server); + + ap_finalize_request_protocol(r); + ap_process_request_after_handler(r); + /* don't dereference baton or r from here! */ + + /* Return the client connection to the MPM */ + c->cs->state = CONN_STATE_LINGER; + ap_mpm_resume_suspended(c); } -/* If neither socket becomes readable in the specified timeout, - * this callback will kill the request. We do not have to worry about - * having a cancel and a IO both queued. - */ -static void proxy_wstunnel_cancel_callback(void *b) -{ - ws_baton_t *baton = (ws_baton_t*)b; - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, - "proxy_wstunnel_cancel_callback, IO timed out"); - proxy_wstunnel_finish(baton); +/* Tell the MPM to poll the connections and resume when ready */ +static void proxy_wstunnel_poll(ws_baton_t *baton) +{ + request_rec *r = baton->r; + conn_rec *c = r->connection; + + ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r, + "proxy %s: going async", baton->scheme); + + /* Create/clear the subpool used by the MPM to allocate + * the temporary data needed for this polling. + */ + if (baton->async_pool) { + apr_pool_clear(baton->async_pool); + } + else { + apr_pool_create(&baton->async_pool, r->pool); + } + + c->cs->state = CONN_STATE_SUSPENDED; + ap_mpm_poll_suspended(c, baton->async_pool, baton->tunnel->pfds, + baton->idle_timeout); } -/* Invoked by the event loop when data is ready on either end. - * Pump both ends until they'd block and then start over again - * We don't need the invoke_mtx, since we never put multiple callback events - * in the queue. - */ -static void proxy_wstunnel_callback(void *b) -{ - ws_baton_t *baton = (ws_baton_t*)b; +/* The resume_connection hook called by the MPM when polling completes (or times out) */ +static void proxy_wstunnel_resume_connection(conn_rec *c, request_rec *r) +{ + ws_baton_t *baton = NULL; + int status; - /* Clear MPM's temporary data */ - AP_DEBUG_ASSERT(baton->async_pool != NULL); - apr_pool_clear(baton->async_pool); + if (r) { + baton = ap_get_module_config(r->request_config, &proxy_wstunnel_module); + } + if (!baton || !baton->suspended) { + return; + } + ap_assert(baton->r == r); - if (proxy_wstunnel_pump(baton, 1) == SUSPENDED) { - proxyws_dir_conf *dconf = ap_get_module_config(baton->r->per_dir_config, - &proxy_wstunnel_module); + if (c->cs->state == CONN_STATE_SUSPENDED) { + status = proxy_wstunnel_pump(baton); + } + else { + AP_DEBUG_ASSERT(c->cs->state == CONN_STATE_LINGER); + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO() + "proxy: %s async tunneling timed out (state %i)", + baton->scheme, c->cs->state); + status = DONE; + } + if (status == SUSPENDED) { + /* Keep polling in the MPM */ + proxy_wstunnel_poll(baton); + } + else { + /* Done with tunneling */ + proxy_wstunnel_done(baton, status != OK); + } +} - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, - "proxy_wstunnel_callback suspend"); +/* The suspend_connection hook called once the MPM gets the SUSPENDED connection */ +static void proxy_wstunnel_suspend_connection(conn_rec *c, request_rec *r) +{ + ws_baton_t *baton = NULL; - ap_mpm_register_poll_callback_timeout(baton->async_pool, - baton->tunnel->pfds, - proxy_wstunnel_callback, - proxy_wstunnel_cancel_callback, - baton, dconf->idle_timeout); + if (r) { + baton = ap_get_module_config(r->request_config, &proxy_wstunnel_module); } - else { - proxy_wstunnel_finish(baton); + if 
+        return;
     }
+    ap_assert(baton->r == r);
+
+    proxy_wstunnel_poll(baton);
 }
 
 static int proxy_wstunnel_check_trans(request_rec *r, const char *url)
@@ -296,51 +345,35 @@ static int proxy_wstunnel_request(apr_pool_t *p, request_rec *r,
                       "error creating websocket tunnel");
         return HTTP_INTERNAL_SERVER_ERROR;
     }
+    if (mpm_can_poll_suspended) {
+        tunnel->timeout = dconf->async_delay;
+    }
+    else {
+        tunnel->timeout = dconf->idle_timeout;
+    }
 
     baton = apr_pcalloc(r->pool, sizeof(*baton));
     baton->r = r;
     baton->backend = conn;
     baton->tunnel = tunnel;
     baton->scheme = scheme;
-
-    if (!dconf->mpm_can_poll) {
-        tunnel->timeout = dconf->idle_timeout;
-        status = proxy_wstunnel_pump(baton, 0);
-    }
-    else {
-        tunnel->timeout = dconf->async_delay;
-        status = proxy_wstunnel_pump(baton, 1);
-        if (status == SUSPENDED) {
-            /* Create the subpool used by the MPM to alloc its own
-             * temporary data, which we want to clear on the next
-             * round (above) to avoid leaks.
-             */
-            apr_pool_create(&baton->async_pool, baton->r->pool);
-
-            rv = ap_mpm_register_poll_callback_timeout(
-                    baton->async_pool,
-                    baton->tunnel->pfds,
-                    proxy_wstunnel_callback,
-                    proxy_wstunnel_cancel_callback,
-                    baton,
-                    dconf->idle_timeout);
-            if (rv == APR_SUCCESS) {
-                return SUSPENDED;
-            }
-
-            if (APR_STATUS_IS_ENOTIMPL(rv)) {
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(02544) "No async support");
-                tunnel->timeout = dconf->idle_timeout;
-                status = proxy_wstunnel_pump(baton, 0); /* force no async */
-            }
-            else {
-                ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10211)
-                              "error registering websocket tunnel");
-                status = HTTP_INTERNAL_SERVER_ERROR;
-            }
-        }
+    baton->idle_timeout = dconf->idle_timeout;
+    ap_set_module_config(r->request_config, &proxy_wstunnel_module, baton);
+
+    status = proxy_wstunnel_pump(baton);
+    if (status == SUSPENDED) {
+        /* Let the MPM call proxy_wstunnel_suspend_connection() when
+         * the connection is returned to it (i.e. not handled anywhere
+         * else anymore). This prevents the connection from being seen
+         * or handled by multiple threads at the same time, which could
+         * happen if we called ap_mpm_poll_suspended() directly from
+         * here: a new IO might get the connection rescheduled quickly,
+         * before it actually reaches the MPM.
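+         * (the suspend_connection hook above then calls
+         * proxy_wstunnel_poll() once the MPM owns the connection)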
+         */
+        baton->suspended = 1;
+        return SUSPENDED;
     }
-
     if (ap_is_HTTP_ERROR(status)) {
         /* Don't send an error page down an upgraded connection */
         if (!tunnel->replied) {
@@ -462,8 +495,6 @@ static void *create_proxyws_dir_config(apr_pool_t *p, char *dummy)
     new->fallback_to_proxy_http = 1;
     new->idle_timeout = -1; /* no timeout */
 
-    ap_mpm_query(AP_MPMQ_CAN_POLL, &new->mpm_can_poll);
-
     return (void *) new;
 }
 
@@ -477,7 +508,6 @@ static void *merge_proxyws_dir_config(apr_pool_t *p, void *vbase, void *vadd)
                                      : base->fallback_to_proxy_http;
     new->fallback_to_proxy_http_set = (add->fallback_to_proxy_http_set
                                        || base->fallback_to_proxy_http_set);
-    new->mpm_can_poll = add->mpm_can_poll;
     new->idle_timeout = add->idle_timeout;
     new->async_delay = add->async_delay;
 
@@ -514,6 +544,12 @@ static int proxy_wstunnel_post_config(apr_pool_t *pconf, apr_pool_t *plog,
     can_fallback_to_proxy_http
         = (ap_find_linked_module("mod_proxy_http.c") != NULL);
 
+#ifdef AP_MPMQ_CAN_POLL_SUSPENDED
+    if (ap_mpm_query(AP_MPMQ_CAN_POLL_SUSPENDED, &mpm_can_poll_suspended)) {
+        mpm_can_poll_suspended = 0;
+    }
+#endif
+
     return OK;
 }
 
@@ -542,6 +578,10 @@ static void ws_proxy_hooks(apr_pool_t *p)
     proxy_hook_scheme_handler(proxy_wstunnel_handler, NULL, aszSucc, APR_HOOK_FIRST);
     proxy_hook_check_trans(proxy_wstunnel_check_trans, NULL, aszSucc, APR_HOOK_MIDDLE);
     proxy_hook_canon_handler(proxy_wstunnel_canon, NULL, aszSucc, APR_HOOK_FIRST);
+
+    /* For when the tunnel connections are suspended to and resumed from the MPM */
+    ap_hook_suspend_connection(proxy_wstunnel_suspend_connection, NULL, NULL, APR_HOOK_FIRST);
+    ap_hook_resume_connection(proxy_wstunnel_resume_connection, NULL, NULL, APR_HOOK_FIRST);
 }
 
 AP_DECLARE_MODULE(proxy_wstunnel) = {
diff --git a/modules/proxy/proxy_util.c b/modules/proxy/proxy_util.c
index 88d174220d8..52595a03ec5 100644
--- a/modules/proxy/proxy_util.c
+++ b/modules/proxy/proxy_util.c
@@ -5898,7 +5898,7 @@ PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
                               "proxy: %s: %s flushing failed (%i)",
                               scheme, out->name, rc);
-                status = rc;
+                status = HTTP_BAD_GATEWAY;
                 goto done;
             }
 
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index 678fe71299f..2f6e2fab2f2 100644
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -242,6 +242,8 @@ typedef struct event_srv_cfg_s event_srv_cfg;
 struct timeout_queue;
 static apr_thread_mutex_t *timeout_mutex;
 
+struct user_poll_baton;
+
 /*
  * The pollset for sockets that are in any of the timeout queues. Currently
 * we use the timeout_mutex to make sure that connections are added/removed
@@ -293,6 +295,8 @@ struct event_conn_state_t {
     struct timeout_queue *q;
     /** the timer event for this entry */
     timer_event_t *te;
+    /** user pollfds (for suspended connection) */
+    struct user_poll_baton *user_baton;
 
     /*
      * when queued to workers
@@ -313,6 +317,8 @@ struct event_conn_state_t {
                      * hooks) */
                     suspended :1,
+                    /** Did the connection time out? */
+                    timed_out :1,
                     /** Is lingering close from defer_lingering_close()? */
                     deferred_linger :1,
                     /** Has ap_start_lingering_close() been called? */
@@ -493,6 +499,15 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *cs)
     apr_time_t elem_expiry;
     apr_time_t next_expiry;
 
+    /* It greatly simplifies the logic to use a single timeout value per q
+     * because the new element can just be added to the end of the list and
+     * it will stay sorted in expiration time sequence.  If brand new
+     * sockets are sent to the event thread for a readability check, this
+     * will be a slight behavior change - they use the non-keepalive
+     * timeout today.  With a normal client, the socket will be readable in
+     * a few milliseconds anyway.
+     */
+
     ap_assert(q && !cs->q);
     cs->q = q;
 
@@ -615,14 +630,14 @@ typedef struct
     void *baton;
 } listener_poll_type;
 
-typedef struct socket_callback_baton
-{
-    ap_mpm_callback_fn_t *cbfunc;
-    void *user_baton;
+struct user_poll_baton {
+    apr_pool_t *pool;
+    event_conn_state_t *cs;
     apr_array_header_t *pfds;
+    apr_thread_mutex_t *mutex; /* pfds added/removed atomically */
     timer_event_t *cancel_event; /* If a timeout was requested, a pointer to the timer event */
-    struct socket_callback_baton *next;
-} socket_callback_baton_t;
+    struct user_poll_baton *next; /* chaining */
+};
 
 typedef struct event_child_bucket {
     ap_pod_t *pod;
@@ -1118,6 +1133,9 @@ static int event_query(int query_code, int *result, apr_status_t *rv)
         case AP_MPMQ_CAN_WAITIO:
             *result = 1;
             break;
+        case AP_MPMQ_CAN_POLL_SUSPENDED:
+            *result = 1;
+            break;
         default:
             *rv = APR_ENOTIMPL;
             break;
@@ -1221,14 +1239,6 @@ static apr_status_t decrement_connection_count(void *cs_)
                   "connection %" CS_FMT_TO " cleaned up", CS_ARG_TO(cs));
 
-    switch (cs->pub.state) {
-    case CONN_STATE_SUSPENDED:
-        apr_atomic_dec32(&suspended_count);
-        break;
-    default:
-        break;
-    }
-
     /* Unblock the listener if it's waiting for connection_count = 0,
      * or if the listening sockets were disabled due to limits and can
      * now accept new connections.
@@ -1249,15 +1259,24 @@ static apr_status_t decrement_connection_count(void *cs_)
 
 static void notify_suspend(event_conn_state_t *cs)
 {
-    ap_run_suspend_connection(cs->c, cs->r);
-    cs->c->sbh = NULL;
+    AP_DEBUG_ASSERT(!cs->suspended);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "Suspend connection %" CS_FMT, CS_ARG(cs));
+
+    apr_atomic_inc32(&suspended_count);
     cs->suspended = 1;
+
+    cs->c->sbh = NULL;
+    cs->c->suspended_baton = cs;
+    ap_run_suspend_connection(cs->c, cs->r);
 }
 
-static void notify_resume(event_conn_state_t *cs, int cleanup)
+static void notify_resume(event_conn_state_t *cs)
 {
-    cs->suspended = 0;
-    cs->c->sbh = cleanup ? NULL : cs->sbh;
+    AP_DEBUG_ASSERT(cs->suspended);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "Resume connection %" CS_FMT, CS_ARG(cs));
+
+    cs->c->sbh = cs->sbh;
     ap_run_resume_connection(cs->c, cs->r);
 }
 
@@ -1359,12 +1378,13 @@ static void shutdown_connection(event_conn_state_t *cs, apr_time_t now,
 * if the connection is currently suspended as far as modules
 * know, provide notification of resumption.
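+ * (the connection state is switched to CONN_STATE_LINGER first, so
+ * modules see one final, closing resume)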
 */
-static apr_status_t ptrans_pre_cleanup(void *dummy)
+static apr_status_t ptrans_pre_cleanup(void *arg)
 {
-    event_conn_state_t *cs = dummy;
-
+    event_conn_state_t *cs = arg;
     if (cs->suspended) {
-        notify_resume(cs, 1);
+        cs->sbh = NULL;
+        cs->pub.state = CONN_STATE_LINGER;
+        notify_resume(cs);
     }
     return APR_SUCCESS;
 }
@@ -1439,7 +1459,8 @@ static int pollset_add_at(event_conn_state_t *cs, int sense,
                   (int)cs->pfd.reqevents, CS_ARG(cs), at, line);
 
-    ap_assert(cs->q == NULL && cs->te == NULL && ((q != NULL) ^ (te != NULL)));
+    ap_assert((q != NULL) ^ (te != NULL));
+    ap_assert(cs->q == NULL && cs->te == NULL);
 
     set_conn_state_sense(cs, sense);
 
@@ -1496,8 +1517,6 @@ static int pollset_del_at(event_conn_state_t *cs, int locked,
                   (int)cs->pfd.reqevents, CS_ARG(cs), at, line);
 
-    ap_assert((cs->q != NULL) ^ (cs->te != NULL));
-
     if (cs->q) {
         if (!locked) {
             apr_thread_mutex_lock(timeout_mutex);
@@ -1507,7 +1526,7 @@ static int pollset_del_at(event_conn_state_t *cs, int locked,
             apr_thread_mutex_unlock(timeout_mutex);
         }
     }
-    else {
+    else if (cs->te) {
        cs->te->canceled = 1;
        cs->te = NULL;
     }
@@ -1536,8 +1555,7 @@
 /* Forward declare */
 static timer_event_t *get_timer_event(apr_time_t timeout,
                                       ap_mpm_callback_fn_t *cbfn, void *baton,
-                                      int insert,
-                                      apr_array_header_t *pfds);
+                                      int insert);
 static void process_lingering_close(event_conn_state_t *cs);
 
 static event_conn_state_t *make_conn_state(apr_pool_t *p, apr_socket_t *csd)
@@ -1639,22 +1657,28 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
             close_connection(cs);
             return;
         }
-
-        cs->pub.sense = CONN_SENSE_DEFAULT;
     }
     else {
         /* The connection is scheduled back */
         c = cs->c;
         c->current_thread = thd;
         c->id = conn_id; /* thread number is part of ID */
         ap_update_sb_handle(cs->sbh, my_child_num, my_thread_num);
-        notify_resume(cs, 0);
+    }
+
+    /* Suspended connections hooks run here and don't fall through */
+    if (cs->suspended) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                      "resuming connection %" CS_FMT, CS_ARG(cs));
+        notify_resume(cs);
+        return;
     }
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
                   "processing connection %" CS_FMT " (aborted %d, clogging %d)",
                   CS_ARG(cs), c->aborted, c->clogging_input_filters);
 
-    if (cs->pub.state == CONN_STATE_LINGER) {
+    if (cs->pub.state == CONN_STATE_LINGER || c->aborted) {
+        cs->pub.state = CONN_STATE_LINGER;
         goto lingering_close;
     }
 
@@ -1696,16 +1720,15 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
     * worker or prefork MPMs for instance.
     */
    switch (rc) {
-    case DONE:
-        rc = OK; /* same as OK, fall through */
     case OK:
+    case DONE: /* same as OK, fall through */
        if (cs->pub.state == CONN_STATE_PROCESSING) {
            cs->pub.state = CONN_STATE_LINGER;
        }
        else if (cs->pub.state == CONN_STATE_KEEPALIVE) {
            cs->pub.state = CONN_STATE_WRITE_COMPLETION;
        }
-        break;
+        rc = OK;
    }
    if (rc != OK || (cs->pub.state != CONN_STATE_LINGER
                     && cs->pub.state != CONN_STATE_ASYNC_WAITIO
@@ -1734,7 +1757,6 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
         * event thread poll for read/writeability.
         */
        ap_update_child_status(cs->sbh, SERVER_BUSY_READ, NULL);
-        notify_suspend(cs);
 
        /* If the connection timeout is actually different than the waitio_q's,
         * use a timer event to honor it (e.g. mod_reqtimeout may enforce its
@@ -1746,7 +1768,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
            if (timeout < TIMERS_FUDGE_TIMEOUT) {
                timeout = TIMERS_FUDGE_TIMEOUT;
            }
-            te = get_timer_event(timeout, NULL, cs, 1, NULL);
+            te = get_timer_event(timeout, NULL, cs, 1);
        }
        else {
            q = cs->sc->io_q;
@@ -1775,7 +1797,6 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
        }
        if (pending == AGAIN) {
            /* Let the event thread poll for write */
-            notify_suspend(cs);
            cs->pub.sense = CONN_SENSE_DEFAULT;
            if (pollset_add(cs, CONN_SENSE_WANT_WRITE, cs->sc->wc_q, NULL)) {
                return; /* queued */
@@ -1803,16 +1824,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
    if (cs->pub.state == CONN_STATE_KEEPALIVE) {
        ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL);
 
-        /* It greatly simplifies the logic to use a single timeout value per q
-         * because the new element can just be added to the end of the list and
-         * it will stay sorted in expiration time sequence.  If brand new
-         * sockets are sent to the event thread for a readability check, this
-         * will be a slight behavior change - they use the non-keepalive
-         * timeout today.  With a normal client, the socket will be readable in
-         * a few milliseconds anyway.
-         */
-        notify_suspend(cs);
-
+        cs->pub.sense = CONN_SENSE_DEFAULT;
        if (!pollset_add(cs, CONN_SENSE_WANT_READ, cs->ka_sc->ka_q, NULL)) {
            cs->pub.state = CONN_STATE_LINGER;
            goto lingering_close;
@@ -1822,33 +1834,149 @@ static void process_socket(apr_thread_t *thd, apr_pool_t *p,
        }
    }
 
    if (cs->pub.state == CONN_STATE_SUSPENDED) {
-        cs->c->suspended_baton = cs;
-        apr_atomic_inc32(&suspended_count);
        notify_suspend(cs);
-        return; /* done */
+        return; /* suspended */
    }
 
 lingering_close:
    process_lingering_close(cs);
 }
 
+static apr_status_t user_poll_cleanup(void *data)
+{
+    struct user_poll_baton *user_baton = data;
+    apr_array_header_t *pfds = user_baton->pfds;
+    apr_status_t rc, final_rc = APR_SUCCESS;
+    int i;
+
+    /* All the pollfds should be added/removed atomically, so synchronize
+     * with event_poll_suspended().
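+     * (this pre-cleanup may run in one thread while another is still
+     * adding these pfds to the pollset)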
+     */
+    apr_thread_mutex_lock(user_baton->mutex);
+    for (i = 0; i < pfds->nelts; i++) {
+        apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
+        if (pfd->client_data) {
+            rc = apr_pollset_remove(event_pollset, pfd);
+            if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
+                final_rc = rc;
+            }
+            pfd->client_data = NULL;
+        }
+    }
+    apr_thread_mutex_unlock(user_baton->mutex);
+
+    if (final_rc) {
+        AP_DEBUG_ASSERT(0);
+        signal_threads(ST_GRACEFUL);
+    }
+    return final_rc;
+}
+
+/* Put some user pollfds into the listener pollset for a SUSPENDED connection */
+static apr_status_t event_poll_suspended(conn_rec *c, apr_pool_t *p,
+                                         const apr_array_header_t *user_pfds,
+                                         apr_interval_time_t timeout)
+{
+    event_conn_state_t *cs = c->suspended_baton;
+    apr_status_t rc, final_rc = APR_SUCCESS;
+    struct user_poll_baton *user_baton;
+    apr_array_header_t *pfds;
+    listener_poll_type *pt;
+    int i;
+
+    AP_DEBUG_ASSERT(cs != NULL);
+    AP_DEBUG_ASSERT(cs->suspended);
+    AP_DEBUG_ASSERT(user_pfds->nelts > 0);
+    if (cs == NULL) {
+        ap_log_cerror (APLOG_MARK, LOG_WARNING, 0, c, APLOGNO()
+                       "event_poll_suspended: suspended_baton is NULL");
+        return APR_EINVAL;
+    }
+    if (!cs->suspended) {
+        ap_log_cerror (APLOG_MARK, LOG_WARNING, 0, c, APLOGNO()
+                       "event_poll_suspended: thread isn't suspended");
+        return APR_EINVAL;
+    }
+    if (user_pfds->nelts <= 0) {
+        ap_log_cerror (APLOG_MARK, LOG_WARNING, 0, c, APLOGNO()
+                       "event_poll_suspended: no poll FDs");
+        return APR_EINVAL;
+    }
+
+    cs->pub.state = CONN_STATE_SUSPENDED;
+    cs->user_baton = user_baton = apr_pcalloc(p, sizeof(*user_baton));
+    apr_thread_mutex_create(&user_baton->mutex, APR_THREAD_MUTEX_DEFAULT, p);
+    user_baton->pfds = pfds = apr_array_copy(p, user_pfds);
+    user_baton->pool = p;
+    user_baton->cs = cs;
+
+    apr_pool_pre_cleanup_register(p, user_baton, user_poll_cleanup);
+
+    pt = apr_pcalloc(p, sizeof(*pt));
+    pt->baton = user_baton;
+    pt->type = PT_USER;
+
+    if (timeout >= 0) {
+        /* Prevent the timer from firing before the pollset is updated */
+        if (timeout < TIMERS_FUDGE_TIMEOUT) {
+            timeout = TIMERS_FUDGE_TIMEOUT;
+        }
+        user_baton->cancel_event = get_timer_event(timeout, NULL, cs, 1);
+    }
+    cs->te = user_baton->cancel_event;
+
+    /* All the pollfds should be added/removed atomically, so synchronize
+     * with user_poll_cleanup().
+     */
+    apr_thread_mutex_lock(user_baton->mutex);
+    for (i = 0; i < pfds->nelts; i++) {
+        apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
+        if (pfd->reqevents) {
+            if (pfd->reqevents & APR_POLLIN) {
+                pfd->reqevents |= APR_POLLHUP;
+            }
+            pfd->reqevents |= APR_POLLERR;
+            pfd->client_data = pt;
+
+            rc = apr_pollset_add(event_pollset, pfd);
+            if (rc != APR_SUCCESS) {
+                final_rc = rc;
+            }
+        }
+        else {
+            pfd->client_data = NULL;
+        }
+    }
+    apr_thread_mutex_unlock(user_baton->mutex);
+
+    if (final_rc) {
+        AP_DEBUG_ASSERT(0);
+        signal_threads(ST_GRACEFUL);
+    }
+    return final_rc;
+}
+
 /* Put a SUSPENDED connection back into a queue.
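+ * (the write completion queue normally, or lingering close if the
+ * connection is in CONN_STATE_LINGER)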
 */
-static apr_status_t event_resume_suspended (conn_rec *c)
+static apr_status_t event_resume_suspended(conn_rec *c)
 {
-    event_conn_state_t* cs = (event_conn_state_t*) c->suspended_baton;
+    event_conn_state_t *cs = c->suspended_baton;
+
+    AP_DEBUG_ASSERT(cs != NULL);
+    AP_DEBUG_ASSERT(cs->suspended);
     if (cs == NULL) {
         ap_log_cerror (APLOG_MARK, LOG_WARNING, 0, c, APLOGNO(02615)
                        "event_resume_suspended: suspended_baton is NULL");
-        return APR_EGENERAL;
+        return APR_EINVAL;
     }
     if (!cs->suspended) {
         ap_log_cerror (APLOG_MARK, LOG_WARNING, 0, c, APLOGNO(02616)
-                       "event_resume_suspended: Thread isn't suspended");
-        return APR_EGENERAL;
+                       "event_resume_suspended: thread isn't suspended");
+        return APR_EINVAL;
     }
-    apr_atomic_dec32(&suspended_count);
-    c->suspended_baton = NULL;
+
+    cs->c->suspended_baton = NULL;
+    cs->c->sbh = cs->sbh;
+    cs->suspended = 0;
 
     cs->pub.sense = CONN_SENSE_DEFAULT;
     if (cs->pub.state != CONN_STATE_LINGER) {
@@ -1856,7 +1984,6 @@ static apr_status_t event_resume_suspended (conn_rec *c)
         if (pollset_add(cs, CONN_SENSE_WANT_WRITE, cs->sc->wc_q, NULL)) {
             return APR_SUCCESS; /* queued */
         }
-        /* fall through lingering close on error */
         cs->pub.state = CONN_STATE_LINGER;
     }
 
@@ -2149,8 +2276,7 @@ static apr_thread_mutex_t *g_timer_skiplist_mtx;
 static timer_event_t *get_timer_event(apr_time_t timeout,
                                       ap_mpm_callback_fn_t *cbfn,
                                       void *baton,
-                                      int insert,
-                                      apr_array_header_t *pfds)
+                                      int insert)
 {
     timer_event_t *te;
     apr_time_t now = (timeout < 0) ? 0 : event_time_now();
@@ -2178,7 +2304,6 @@ static timer_event_t *get_timer_event(apr_time_t timeout,
     te->baton = baton;
     te->when = now + timeout;
     te->timeout = timeout;
-    te->pfds = pfds;
 
     if (insert) {
         apr_time_t next_expiry;
@@ -2218,122 +2343,15 @@ static void put_timer_event(timer_event_t *te, int locked)
     }
 }
 
-static apr_status_t event_register_timed_callback_ex(apr_time_t timeout,
-                                                     ap_mpm_callback_fn_t *cbfn,
-                                                     void *baton,
-                                                     apr_array_header_t *pfds)
-{
-    if (!cbfn) {
-        return APR_EINVAL;
-    }
-    get_timer_event(timeout, cbfn, baton, 1, pfds);
-    return APR_SUCCESS;
-}
-
 static apr_status_t event_register_timed_callback(apr_time_t timeout,
                                                   ap_mpm_callback_fn_t *cbfn,
                                                   void *baton)
 {
-    event_register_timed_callback_ex(timeout, cbfn, baton, NULL);
-    return APR_SUCCESS;
-}
-
-static apr_status_t event_cleanup_poll_callback(void *data)
-{
-    apr_status_t final_rc = APR_SUCCESS;
-    apr_array_header_t *pfds = data;
-    int i;
-
-    for (i = 0; i < pfds->nelts; i++) {
-        apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
-        if (pfd->client_data) {
-            apr_status_t rc;
-            rc = apr_pollset_remove(event_pollset, pfd);
-            if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
-                final_rc = rc;
-            }
-            pfd->client_data = NULL;
-        }
-    }
-
-    if (final_rc) {
-        AP_DEBUG_ASSERT(0);
-        signal_threads(ST_GRACEFUL);
-    }
-    return final_rc;
-}
-
-static apr_status_t event_register_poll_callback_ex(apr_pool_t *p,
-                                                    const apr_array_header_t *pfds,
-                                                    ap_mpm_callback_fn_t *cbfn,
-                                                    ap_mpm_callback_fn_t *tofn,
-                                                    void *baton,
-                                                    apr_time_t timeout)
-{
-    listener_poll_type *pt;
-    socket_callback_baton_t *scb;
-    apr_status_t rc, final_rc = APR_SUCCESS;
-    int i;
-
-    if (!cbfn || !tofn) {
+    if (!cbfn) {
         return APR_EINVAL;
     }
-
-    scb = apr_pcalloc(p, sizeof(*scb));
-    scb->cbfunc = cbfn;
-    scb->user_baton = baton;
-    scb->pfds = apr_array_copy(p, pfds);
-
-    pt = apr_palloc(p, sizeof(*pt));
-    pt->type = PT_USER;
-    pt->baton = scb;
-
-    apr_pool_pre_cleanup_register(p, scb->pfds, event_cleanup_poll_callback);
-
-    for (i = 0; i < scb->pfds->nelts; i++) {
-        apr_pollfd_t *pfd = (apr_pollfd_t *)scb->pfds->elts + i;
-        if (pfd->reqevents) {
-            if (pfd->reqevents & APR_POLLIN) {
-                pfd->reqevents |= APR_POLLHUP;
-            }
-            pfd->reqevents |= APR_POLLERR;
-            pfd->client_data = pt;
-        }
-        else {
-            pfd->client_data = NULL;
-        }
-    }
-
-    if (timeout > 0) {
-        /* Prevent the timer from firing before the pollset is updated */
-        if (timeout < TIMERS_FUDGE_TIMEOUT) {
-            timeout = TIMERS_FUDGE_TIMEOUT;
-        }
-        scb->cancel_event = get_timer_event(timeout, tofn, baton, 1, scb->pfds);
-    }
-    for (i = 0; i < scb->pfds->nelts; i++) {
-        apr_pollfd_t *pfd = (apr_pollfd_t *)scb->pfds->elts + i;
-        if (pfd->client_data) {
-            rc = apr_pollset_add(event_pollset, pfd);
-            if (rc != APR_SUCCESS) {
-                final_rc = rc;
-            }
-        }
-    }
-    return final_rc;
-}
-
-static apr_status_t event_register_poll_callback(apr_pool_t *p,
-                                                 const apr_array_header_t *pfds,
-                                                 ap_mpm_callback_fn_t *cbfn,
-                                                 void *baton)
-{
-    return event_register_poll_callback_ex(p,
-                                           pfds,
-                                           cbfn,
-                                           NULL, /* no timeout function */
-                                           baton,
-                                           0 /* no timeout */);
+    get_timer_event(timeout, cbfn, baton, 1);
+    return APR_SUCCESS;
 }
 
 /*
@@ -2362,11 +2380,9 @@ static void process_lingering_close(event_conn_state_t *cs)
     conn_rec *c = cs->c;
     int rc = OK;
 
-    cs->pub.state = CONN_STATE_LINGER;
-
     if (!cs->linger_started) {
         cs->linger_started = 1; /* once! */
-        notify_suspend(cs);
+        cs->pub.state = CONN_STATE_LINGER;
 
         /* Shutdown the connection, i.e. pre_connection_close hooks,
          * SSL/TLS close notify, WC bucket, etc..
@@ -2429,8 +2445,7 @@
 * Pre-condition: timeout_mutex must already be locked
 */
 static unsigned int process_timeout_queue_ex(struct timeout_queue *queue,
-                                             apr_time_t now,
-                                             int shrink)
+                                             apr_time_t now, int shrink)
 {
     unsigned int count = 0;
     struct timeout_queue *q;
@@ -2464,6 +2479,7 @@ static unsigned int process_timeout_queue_ex(struct timeout_queue *queue,
                 break;
             }
         }
+        cs->timed_out = 1;
 
         if (cs_in_backlog(cs)) {
             /* Remove the backlog connection from worker_queue (note that
             * the backlog_q), and unreserve/set a worker/idler since
             * none could handle the event.
             */
-            ap_assert(cs_qe(cs)->cb_baton == cs);
             ap_assert(cs->q == cs->sc->bl_q);
+            ap_assert(cs_qe(cs)->cb_baton == cs);
             ap_queue_info_idlers_inc(worker_queue_info);
             ap_queue_kill_event_locked(worker_queue, cs_qe(cs));
             shutdown_connection(cs, now, 1);
@@ -2586,7 +2602,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
     apr_time_t next_expiry = -1;
     apr_interval_time_t timeout = -1;
     int workers_were_busy = 0, force_stats = 0;
-    socket_callback_baton_t *user_chain;
+    struct user_poll_baton *user_chain;
     const apr_pollfd_t *out_pfd;
     apr_time_t now, poll_time;
     event_conn_state_t *cs;
@@ -2651,24 +2667,54 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 continue;
             }
 
+            /* A timer without a callback is a cancel event for a cs in
+             * either:
+             * 1. CONN_STATE_ASYNC_WAITIO: the timer enforces a timeout
+             *    different from the cs->sc->io_q's;
+             * 2. CONN_STATE_SUSPENDED: the timer enforces a timeout for
+             *    some user pollfds bound to the cs.
+             * In both cases te->baton is the (timed out) cs.
+             * For 1. we can shutdown the connection now, but for 2. we
+             * need to resume the suspended connection in a worker thread
+             * so that the responsible module is notified; we do this by
+             * setting CONN_STATE_LINGER, plus cs->timed_out to make sure
+             * that, after the next/last ap_run_resume_connection(), this
+             * state is maintained/restored and the actual close is issued.
+             */
             if (!te->cbfunc) {
                 cs = te->baton;
+                AP_DEBUG_ASSERT(cs != NULL);
+                AP_DEBUG_ASSERT(cs->te == te);
                 put_timer_event(te, 1);
-                ap_assert(cs && cs->te == te);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
-                              "timed out connection %" CS_FMT,
-                              CS_ARG(cs));
-                (void)pollset_del(cs, 0);
-                kill_connection(cs, APR_TIMEUP);
-                continue;
-            }
+                cs->te = te = NULL;
+                cs->timed_out = 1;
+
+                if (!cs->user_baton) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                                  "timed out connection %" CS_FMT,
+                                  CS_ARG(cs));
+                    (void)pollset_del(cs, 0);
+                    shutdown_connection(cs, now, 0);
+                    continue;
+                }
 
-            if (te->pfds) {
-                /* remove all sockets from the pollset */
-                apr_pool_cleanup_run(te->pfds->pool, te->pfds,
-                                     event_cleanup_poll_callback);
+                /* Remove all user pollfds from the pollset */
+                AP_DEBUG_ASSERT(cs->user_baton->pfds != NULL);
+                apr_pool_cleanup_run(cs->user_baton->pool, cs->user_baton,
+                                     user_poll_cleanup);
+#ifdef AP_DEBUG
+                memset(cs->user_baton, 0, sizeof(*cs->user_baton));
+#endif
+                cs->user_baton = NULL;
+
+                AP_DEBUG_ASSERT(cs->suspended);
+                cs->pub.state = CONN_STATE_LINGER;
             }
-            push2worker(NULL, te, now, &workers_were_busy);
+            else {
+                cs = NULL;
+            }
+
+            push2worker(cs, te, now, &workers_were_busy);
         }
         if (te) {
             next_expiry = te->when;
@@ -2776,7 +2822,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
         for (user_chain = NULL; num > 0; --num, ++out_pfd) {
             listener_poll_type *pt = out_pfd->client_data;
-            socket_callback_baton_t *baton;
+            struct user_poll_baton *user_baton;
 
             switch (pt->type) {
             case PT_CSD:
@@ -2892,17 +2938,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             case PT_USER:
                /* Multiple pfds of the same baton might trigger in this pass
                 * so chain once here and run the cleanup only after this loop
-                 * to avoid lifetime issues (i.e. pfds->pool cleared while some
-                 * of its pfd->client_data are still to be dereferenced here).
+                 * to avoid lifetime issues (i.e. user_baton->pool cleared while
+                 * some of its pfd->client_data are still to be dereferenced here).
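+                 * The chained batons are processed right after this loop,
+                 * where their connections get scheduled for resuming.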
                 */
-                baton = pt->baton;
-                if (baton != user_chain && !baton->next) {
-                    if (baton->cancel_event) {
-                        baton->cancel_event->canceled = 1;
-                        baton->cancel_event = NULL;
-                    }
-                    baton->next = user_chain;
-                    user_chain = baton;
+                user_baton = pt->baton;
+                if (user_baton != user_chain && !user_baton->next) {
+                    user_baton->next = user_chain;
+                    user_chain = user_baton;
                 }
                 break;
             }
@@ -2910,21 +2952,32 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
         /* Time to queue user callbacks chained above */
         while (user_chain) {
-            socket_callback_baton_t *baton = user_chain;
-            user_chain = user_chain->next;
-            baton->next = NULL;
+            struct user_poll_baton *user_baton = user_chain;
+            user_chain = user_baton->next;
+            user_baton->next = NULL;
+
+            cs = user_baton->cs;
+            AP_DEBUG_ASSERT(cs != NULL);
+            AP_DEBUG_ASSERT(cs->user_baton == user_baton);
+            AP_DEBUG_ASSERT(cs->te == user_baton->cancel_event);
+            AP_DEBUG_ASSERT(cs->pub.state == CONN_STATE_SUSPENDED);
+            AP_DEBUG_ASSERT(cs->suspended);
+
+            /* Not cancellable anymore */
+            if (cs->te) {
+                cs->te->canceled = 1;
+                cs->te = NULL;
+            }
 
-            /* remove all sockets from the pollset */
-            apr_pool_cleanup_run(baton->pfds->pool, baton->pfds,
-                                 event_cleanup_poll_callback);
+            /* Remove all user pollfds from the pollset */
+            apr_pool_cleanup_run(user_baton->pool, user_baton,
+                                 user_poll_cleanup);
+#ifdef AP_DEBUG
+            memset(user_baton, 0, sizeof(*user_baton));
+#endif
 
-            /* masquerade as a timer event that is firing */
-            te = get_timer_event(-1 /* fake timer */,
-                                 baton->cbfunc,
-                                 baton->user_baton,
-                                 0, /* don't insert it */
-                                 NULL /* no associated socket callback */);
-            push2worker(NULL, te, now, &workers_were_busy);
+            /* Schedule ap_run_resume_connection() */
+            push2worker(cs, NULL, now, &workers_were_busy);
         }
 
         /* We process the timeout queues here only when the global
@@ -2955,6 +3008,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         */
        process_timeout_queue(shutdown_q, now);
 
+        /* No specific requirement/order for those */
        process_timeout_queue(waitio_q, now);
        process_timeout_queue(write_completion_q, now);
        process_timeout_queue(keepalive_q, now);
@@ -4429,7 +4483,6 @@ static void setup_slave_conn(conn_rec *c, void *csd)
     cs = make_conn_state(c->pool, csd);
     cs->c = c;
     cs->sc = mcs->sc;
-    cs->suspended = 0;
     cs->bucket_alloc = c->bucket_alloc;
     cs->pfd = mcs->pfd;
     cs->pub = mcs->pub;
@@ -5076,14 +5129,11 @@ static void event_hooks(apr_pool_t * p)
     ap_hook_mpm_query(event_query, NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_mpm_register_timed_callback(event_register_timed_callback, NULL, NULL,
                                         APR_HOOK_MIDDLE);
-    ap_hook_mpm_register_poll_callback(event_register_poll_callback,
-                                       NULL, NULL, APR_HOOK_MIDDLE);
-    ap_hook_mpm_register_poll_callback_timeout(event_register_poll_callback_ex,
-                                               NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_pre_read_request(event_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_post_read_request(event_post_read_request, NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_mpm_get_name(event_get_name, NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_mpm_resume_suspended(event_resume_suspended, NULL, NULL, APR_HOOK_MIDDLE);
+    ap_hook_mpm_poll_suspended(event_poll_suspended, NULL, NULL, APR_HOOK_MIDDLE);
 
     ap_hook_pre_connection(event_pre_connection, NULL, NULL, APR_HOOK_REALLY_FIRST);
     ap_hook_protocol_switch(event_protocol_switch, NULL, NULL, APR_HOOK_REALLY_FIRST);
diff --git a/server/mpm_common.c b/server/mpm_common.c
index 2973bc9f4f2..d055fa2fd99 100644
--- a/server/mpm_common.c
+++ b/server/mpm_common.c
@@ -68,10 +68,9 @@
    APR_HOOK_LINK(mpm) \
    APR_HOOK_LINK(mpm_query) \
    APR_HOOK_LINK(mpm_register_timed_callback) \
-    APR_HOOK_LINK(mpm_register_poll_callback) \
-    APR_HOOK_LINK(mpm_register_poll_callback_timeout) \
    APR_HOOK_LINK(mpm_get_name) \
    APR_HOOK_LINK(mpm_resume_suspended) \
+    APR_HOOK_LINK(mpm_poll_suspended) \
    APR_HOOK_LINK(end_generation) \
    APR_HOOK_LINK(child_status) \
    APR_HOOK_LINK(output_pending) \
@@ -111,16 +110,11 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_timed_callback,
 AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_resume_suspended,
                             (conn_rec *c), (c), APR_ENOTIMPL)
-AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_poll_callback,
-                            (apr_pool_t *p, const apr_array_header_t *pds,
-                             ap_mpm_callback_fn_t *cbfn, void *baton),
-                            (p, pds, cbfn, baton), APR_ENOTIMPL)
-AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_poll_callback_timeout,
-                            (apr_pool_t *p, const apr_array_header_t *pds,
-                             ap_mpm_callback_fn_t *cbfn,
-                             ap_mpm_callback_fn_t *tofn,
-                             void *baton, apr_time_t timeout),
-                            (p, pds, cbfn, tofn, baton, timeout), APR_ENOTIMPL)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_poll_suspended,
+                            (conn_rec *c, apr_pool_t *p,
+                             const apr_array_header_t *pfds,
+                             apr_interval_time_t timeout),
+                            (c, p, pfds, timeout), APR_ENOTIMPL)
 AP_IMPLEMENT_HOOK_RUN_FIRST(int, output_pending,
                             (conn_rec *c), (c), DECLINED)
 AP_IMPLEMENT_HOOK_RUN_FIRST(int, input_pending,
@@ -573,26 +567,17 @@ AP_DECLARE(apr_status_t) ap_mpm_resume_suspended(conn_rec *c)
     return ap_run_mpm_resume_suspended(c);
 }
 
-AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(apr_time_t t,
-        ap_mpm_callback_fn_t *cbfn, void *baton)
+AP_DECLARE(apr_status_t) ap_mpm_poll_suspended(conn_rec *c, apr_pool_t *p,
+                                               const apr_array_header_t *pfds,
+                                               apr_interval_time_t timeout)
 {
-    return ap_run_mpm_register_timed_callback(t, cbfn, baton);
+    return ap_run_mpm_poll_suspended(c, p, pfds, timeout);
 }
 
-AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback(
-        apr_pool_t *p, const apr_array_header_t *pfds,
+AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(apr_time_t t,
         ap_mpm_callback_fn_t *cbfn, void *baton)
 {
-    return ap_run_mpm_register_poll_callback(p, pfds, cbfn, baton);
-}
-
-AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback_timeout(
-        apr_pool_t *p, const apr_array_header_t *pfds,
-        ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn,
-        void *baton, apr_time_t timeout)
-{
-    return ap_run_mpm_register_poll_callback_timeout(p, pfds, cbfn, tofn,
-                                                     baton, timeout);
+    return ap_run_mpm_register_timed_callback(t, cbfn, baton);
 }
 
 AP_DECLARE(const char *)ap_show_mpm(void)
diff --git a/server/mpm_fdqueue.h b/server/mpm_fdqueue.h
index 29297fd60d5..4bb17c82955 100644
--- a/server/mpm_fdqueue.h
+++ b/server/mpm_fdqueue.h
@@ -89,7 +89,6 @@ struct timer_event_t
     ap_mpm_callback_fn_t *cbfunc;
     void *baton;
     int canceled;
-    apr_array_header_t *pfds;
     apr_interval_time_t timeout;
 };
 typedef struct timer_event_t timer_event_t;