Skip to content

Commit

Permalink
mqueue: refactor of mqueue
Browse files Browse the repository at this point in the history
1. mainly for long time within critical_section
2. move out memcpy buffer from enter_critical_section()
3. let mq_send() return fail when mq buffer full in ISR

Signed-off-by: ligd <[email protected]>
  • Loading branch information
GUIDINGLI committed Sep 25, 2024
1 parent 960f515 commit f42296e
Show file tree
Hide file tree
Showing 8 changed files with 1,086 additions and 423 deletions.
2 changes: 0 additions & 2 deletions sched/mqueue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ if(NOT CONFIG_DISABLE_MQUEUE)
APPEND
SRCS
mq_send.c
mq_timedsend.c
mq_sndinternal.c
mq_receive.c
mq_timedreceive.c
mq_rcvinternal.c
mq_msgfree.c
mq_msgqalloc.c
Expand Down
4 changes: 2 additions & 2 deletions sched/mqueue/Make.defs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ endif

ifneq ($(CONFIG_DISABLE_MQUEUE),y)

CSRCS += mq_send.c mq_timedsend.c mq_sndinternal.c mq_receive.c
CSRCS += mq_timedreceive.c mq_rcvinternal.c mq_getattr.c
CSRCS += mq_send.c mq_sndinternal.c mq_receive.c
CSRCS += mq_rcvinternal.c mq_getattr.c
CSRCS += mq_msgfree.c mq_msgqalloc.c mq_msgqfree.c
CSRCS += mq_setattr.c mq_notify.c

Expand Down
7 changes: 7 additions & 0 deletions sched/mqueue/mq_msgfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <nuttx/irq.h>
#include <nuttx/arch.h>
#include <nuttx/kmalloc.h>
#include <nuttx/spinlock.h>

#include "mqueue/mqueue.h"

Expand All @@ -56,6 +57,8 @@

void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
{
irqstate_t flags;

/* If this is a generally available pre-allocated message,
* then just put it back in the free list.
*/
Expand All @@ -66,7 +69,9 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
* list from interrupt handlers.
*/

flags = spin_lock_irqsave(NULL);
list_add_tail(&g_msgfree, &mqmsg->node);
spin_unlock_irqrestore(NULL, flags);
}

/* If this is a message pre-allocated for interrupts,
Expand All @@ -79,7 +84,9 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
* list from interrupt handlers.
*/

flags = spin_lock_irqsave(NULL);
list_add_tail(&g_msgfreeirq, &mqmsg->node);
spin_unlock_irqrestore(NULL, flags);
}

/* Otherwise, deallocate it. Note: interrupt handlers
Expand Down
206 changes: 72 additions & 134 deletions sched/mqueue/mq_rcvinternal.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,66 +44,56 @@
#include "mqueue/mqueue.h"

/****************************************************************************
* Public Functions
* Private Functions
****************************************************************************/

/****************************************************************************
* Name: nxmq_verify_receive
* Name: nxmq_rcvtimeout
*
* Description:
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function verifies the input parameters that
* are common to both functions.
* This function is called if the timeout elapses before the message queue
* becomes non-empty.
*
* Input Parameters:
* msgq - Message queue descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* arg - the argument provided when the timeout was configured.
*
* Returned Value:
* On success, zero (OK) is returned. A negated errno value is returned
* on any failure:
* None
*
* EBADF Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the message
* queue.
* EINVAL Invalid 'msg' or 'msgq'
* Assumptions:
*
****************************************************************************/

#ifdef CONFIG_DEBUG_FEATURES
int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen)
static void nxmq_rcvtimeout(wdparm_t arg)
{
FAR struct inode *inode = mq->f_inode;
FAR struct mqueue_inode_s *msgq;
FAR struct tcb_s *wtcb = (FAR struct tcb_s *)(uintptr_t)arg;
irqstate_t flags;

if (inode == NULL)
{
return -EBADF;
}
/* Disable interrupts. This is necessary because an interrupt handler may
* attempt to send a message while we are doing this.
*/

msgq = inode->i_private;
flags = enter_critical_section();

/* Verify the input parameters */
/* It is also possible that an interrupt/context switch beat us to the
* punch and already changed the task's state.
*/

if (!msg || !msgq)
if (wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY)
{
return -EINVAL;
}
/* Restart with task with a timeout error */

if ((mq->f_oflags & O_RDOK) == 0)
{
return -EBADF;
nxmq_wait_irq(wtcb, ETIMEDOUT);
}

if (msglen < (size_t)msgq->maxmsgsize)
{
return -EMSGSIZE;
}
/* Interrupts may now be re-enabled. */

return OK;
leave_critical_section(flags);
}
#endif

/****************************************************************************
* Public Functions
****************************************************************************/

/****************************************************************************
* Name: nxmq_wait_receive
Expand All @@ -116,9 +106,10 @@ int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen)
*
* Input Parameters:
* msgq - Message queue descriptor
* oflags - flags from user set
* rcvmsg - The caller-provided location in which to return the newly
* received message.
* abstime - If non-NULL, this is the absolute time to wait until a
* message is received.
*
* Returned Value:
* On success, zero (OK) is returned. A negated errno value is returned
Expand All @@ -134,12 +125,12 @@ int nxmq_verify_receive(FAR struct file *mq, FAR char *msg, size_t msglen)
****************************************************************************/

int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
int oflags, FAR struct mqueue_msg_s **rcvmsg)
FAR struct mqueue_msg_s **rcvmsg,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct mqueue_msg_s *newmsg;
FAR struct tcb_s *rtcb;

DEBUGASSERT(rcvmsg != NULL);
FAR struct tcb_s *rtcb = this_task();

#ifdef CONFIG_CANCELLATION_POINTS
/* nxmq_wait_receive() is not a cancellation point, but it may be called
Expand All @@ -156,139 +147,90 @@ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
}
#endif

if (abstime)
{
wd_start_realtime(&rtcb->waitdog, abstime,
nxmq_rcvtimeout, (wdparm_t)rtcb);
}
else if (ticks >= 0)
{
wd_start(&rtcb->waitdog, ticks,
nxmq_rcvtimeout, (wdparm_t)rtcb);
}

/* Get the message from the head of the queue */

while ((newmsg = (FAR struct mqueue_msg_s *)
list_remove_head(&msgq->msglist)) == NULL)
{
/* The queue is empty! Should we block until there the above condition
* has been satisfied?
*/

if ((oflags & O_NONBLOCK) == 0)
{
/* Yes.. Block and try again */

rtcb = this_task();
rtcb->waitobj = msgq;
msgq->cmn.nwaitnotempty++;

/* Initialize the 'errcode" used to communication wake-up error
* conditions.
*/

rtcb->errcode = OK;
msgq->cmn.nwaitnotempty++;

/* Make sure this is not the idle task, descheduling that
* isn't going to end well.
*/
/* Initialize the 'errcode" used to communication wake-up error
* conditions.
*/

DEBUGASSERT(!is_idle_task(rtcb));
rtcb->waitobj = msgq;
rtcb->errcode = OK;

/* Remove the tcb task from the running list. */
/* Remove the tcb task from the running list. */

nxsched_remove_self(rtcb);
nxsched_remove_self(rtcb);

/* Add the task to the specified blocked task list */
/* Add the task to the specified blocked task list */

rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY;
nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn));
rtcb->task_state = TSTATE_WAIT_MQNOTEMPTY;
nxsched_add_prioritized(rtcb, MQ_WNELIST(msgq->cmn));

/* Now, perform the context switch */
/* Now, perform the context switch */

up_switch_context(this_task(), rtcb);
up_switch_context(this_task(), rtcb);

/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be either EINTR or ETIMEDOUT).
*/
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be either EINTR or ETIMEDOUT).
*/

if (rtcb->errcode != OK)
{
return -rtcb->errcode;
}
}
else
if (rtcb->errcode != OK)
{
/* The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description.
*/

return -EAGAIN;
break;
}
}

/* If we got message, then decrement the number of messages in
* the queue while we are still in the critical section
*/

if (newmsg)
if (abstime || ticks >= 0)
{
if (msgq->nmsgs-- == msgq->maxmsgs)
{
nxmq_pollnotify(msgq, POLLOUT);
}
wd_cancel(&rtcb->waitdog);
}

*rcvmsg = newmsg;
return OK;
return -rtcb->errcode;
}

/****************************************************************************
* Name: nxmq_do_receive
* Name: nxmq_notify_receive
*
* Description:
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function accepts the message obtained by
* mq_waitmsg, provides the message content to the user, notifies any
* threads that were waiting for the message queue to become non-full,
* and disposes of the message structure
* [nx]mq_timedreceive.
* This function notifies any tasks that are waiting for the message queue
* to become non-empty. This function is called after a message is
* received from the message queue.
*
* Input Parameters:
* msgq - Message queue descriptor
* mqmsg - The message obtained by mq_waitmsg()
* ubuffer - The address of the user provided buffer to receive the message
* prio - The user-provided location to return the message priority.
*
* Returned Value:
* Returns the length of the received message. This function does not
* fail.
*
* Assumptions:
* - The caller has provided all validity checking of the input parameters
* using nxmq_verify_receive.
* - The user buffer, ubuffer, is known to be large enough to accept the
* largest message that an be sent on this message queue
* - Pre-emption should be disabled throughout this call.
*
****************************************************************************/

ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
FAR struct mqueue_msg_s *mqmsg,
FAR char *ubuffer, FAR unsigned int *prio)
void nxmq_notify_receive(FAR struct mqueue_inode_s *msgq)
{
FAR struct tcb_s *btcb;
ssize_t rcvmsglen;

/* Get the length of the message (also the return value) */

rcvmsglen = mqmsg->msglen;

/* Copy the message into the caller's buffer */

memcpy(ubuffer, (FAR const void *)mqmsg->mail, rcvmsglen);

/* Copy the message priority as well (if a buffer is provided) */

if (prio)
{
*prio = mqmsg->priority;
}

/* We are done with the message. Deallocate it now. */

nxmq_free_msg(mqmsg);

/* Check if any tasks are waiting for the MQ not full event. */

Expand Down Expand Up @@ -331,8 +273,4 @@ ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
up_switch_context(btcb, rtcb);
}
}

/* Return the length of the message transferred to the user buffer */

return rcvmsglen;
}
Loading

0 comments on commit f42296e

Please sign in to comment.