Skip to content

Commit

Permalink
kevent() should wait indefinitely when timeout < 0; also introduced c…
Browse files Browse the repository at this point in the history
…onn_to_ctx helper func
  • Loading branch information
manjuraj committed Sep 20, 2013
1 parent 8108a11 commit a31b722
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 40 deletions.
Binary file added notes/kqueue.pdf
Binary file not shown.
5 changes: 2 additions & 3 deletions src/event/nc_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ event_wait(struct event_base *evb, int timeout)
int ep = evb->ep;
struct epoll_event *event = evb->event;
int nevent = evb->nevent;
event_cb_t cb = evb->cb;

ASSERT(ep > 0);
ASSERT(event != NULL);
Expand All @@ -231,8 +230,8 @@ event_wait(struct event_base *evb, int timeout)
events |= EVENT_WRITE;
}

if (cb != NULL) {
cb(ev->data.ptr, events);
if (evb->cb != NULL) {
evb->cb(ev->data.ptr, events);
}
}
return nsd;
Expand Down
54 changes: 40 additions & 14 deletions src/event/nc_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,33 +209,59 @@ int
event_wait(struct event_base *evb, int timeout)
{
int kq = evb->kq;
struct timespec ts = nc_msec_to_timespec(timeout);
event_cb_t cb = evb->cb;
struct timespec ts, *tsp;

ASSERT(kq > 0);

/* kevent should block indefinitely if timeout < 0 */
if (timeout < 0) {
tsp = NULL;
} else {
tsp = &ts;
tsp->tv_sec = timeout / 1000LL;
tsp->tv_nsec = (timeout % 1000LL) * 1000000LL;
}

for (;;) {
/*
* kevent() is used both to register new events with kqueue, and to
* retrieve any pending events. Changes that should be applied to the
* kqueue are given in the change[] and any returned events are placed
* in event[], up to the maximum sized allowed by nevent. The number
* of entries actually placed in event[] is returned by the kevent()
* call and saved in nreturned.
*
* Events are registered with the system by the application via a
* struct kevent, and an event is uniquely identified with the system
* by a (kq, ident, filter) tuple. This means that there can be only
* one (ident, filter) pair for a given kqueue
*/
evb->nreturned = kevent(kq, evb->change, evb->nchange, evb->event,
evb->nevent, &ts);
evb->nevent, tsp);
evb->nchange = 0;
if (evb->nreturned > 0) {
for (evb->nprocessed = 0; evb->nprocessed < evb->nreturned;
evb->nprocessed++) {
struct kevent *ev = &evb->event[evb->nprocessed];
uint32_t events = 0;

/*
* If an error occurs while processing an element of the
* change[] and there is enough room in the event[], then the
* event event will be placed in the eventlist with EV_ERROR
* set in flags and the system error(errno) in data.
*/
if (ev->flags & EV_ERROR) {
/*
* Error messages that can happen, when a delete fails.
* EBADF happens when the file descriptor has been
* closed,
* ENOENT when the file descriptor was closed and
* then reopened.
* EBADF happens when the file descriptor has been closed
* ENOENT when the file descriptor was closed and then
* reopened.
* EINVAL for some reasons not understood; EINVAL
* should not be returned ever; but FreeBSD does :-\
* An error is also indicated when a callback deletes
* an event we are still processing. In that case
* the data field is set to ENOENT.
* An error is also indicated when a callback deletes an
* event we are still processing. In that case the data
* field is set to ENOENT.
*/
if (ev->data == EBADF || ev->data == EINVAL ||
ev->data == ENOENT) {
Expand All @@ -252,16 +278,16 @@ event_wait(struct event_base *evb, int timeout)
events |= EVENT_WRITE;
}

if (cb != NULL && events != 0) {
cb(ev->udata, events);
if (evb->cb != NULL && events != 0) {
evb->cb(ev->udata, events);
}
}
return evb->nreturned;
}

if (evb->nreturned == 0) {
if (timeout == -1) {
log_error("kqueue on kq %d with %d events and %d timeout "
log_error("kevent on kq %d with %d events and %d timeout "
"returned no events", kq, evb->nevent, timeout);
return -1;
}
Expand All @@ -275,9 +301,9 @@ event_wait(struct event_base *evb, int timeout)

log_error("kevent on kq %d with %d events failed: %s", kq, evb->nevent,
strerror(errno));

return -1;
}

NOT_REACHED();
}

Expand Down
19 changes: 18 additions & 1 deletion src/nc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,29 @@
* out_q ensures that we always send back the response of request at the head
* of the queue, before sending out responses of other completed requests in
* the queue.
*
*/

static uint32_t nfree_connq; /* # free conn q */
static struct conn_tqh free_connq; /* free conn q */

/*
* Return the context associated with this connection.
*/
struct context *
conn_to_ctx(struct conn *conn)
{
struct server_pool *pool;

if (conn->proxy || conn->client) {
pool = conn->owner;
} else {
struct server *server = conn->owner;
pool = server->owner;
}

return pool->ctx;
}

static struct conn *
_conn_get(void)
{
Expand Down
1 change: 1 addition & 0 deletions src/nc_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct conn {

TAILQ_HEAD(conn_tqh, conn);

struct context *conn_to_ctx(struct conn *conn);
struct conn *conn_get(void *owner, bool client, bool redis);
struct conn *conn_get_proxy(void *owner);
void conn_put(struct conn *conn);
Expand Down
13 changes: 3 additions & 10 deletions src/nc_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
static uint32_t ctx_id; /* context generation */

/* function prototype for use in core_ctx_create() */
static void
core_core(void *arg, uint32_t events);
static void core_core(void *arg, uint32_t events);

static struct context *
core_ctx_create(struct instance *nci)
Expand Down Expand Up @@ -278,14 +277,8 @@ static void
core_core(void *arg, uint32_t events)
{
rstatus_t status;
struct conn *conn = (struct conn *) arg;
struct context *ctx;

if ((conn->proxy) || (conn->client)) {
ctx = ((struct server_pool *) (conn -> owner)) -> ctx;
} else {
ctx = ((struct server_pool *) (((struct server *) (conn -> owner)) -> owner )) -> ctx;
}
struct conn *conn = arg;
struct context *ctx = conn_to_ctx(conn);

log_debug(LOG_VVERB, "event %04"PRIX32" on %c %d", events,
conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd);
Expand Down
1 change: 1 addition & 0 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ struct instance;
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <netinet/in.h>

#include <nc_array.h>
Expand Down
11 changes: 0 additions & 11 deletions src/nc_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -622,14 +622,3 @@ nc_unresolve_desc(int sd)

return nc_unresolve_addr(addr, addrlen);
}

struct timespec
nc_msec_to_timespec(int msec)
{
struct timeval tv = {msec / 1000LL, (msec % 1000LL) * 1000LL};
struct timespec ts;

TIMEVAL_TO_TIMESPEC(&tv, &ts);

return ts;
}
1 change: 0 additions & 1 deletion src/nc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ int _scnprintf(char *buf, size_t size, const char *fmt, ...);
int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args);
int64_t nc_usec_now(void);
int64_t nc_msec_now(void);
struct timespec nc_msec_to_timespec(int msec);

/*
* Address resolution for internet (ipv4 and ipv6) and unix domain
Expand Down

0 comments on commit a31b722

Please sign in to comment.