Skip to content

Commit

Permalink
wqueue: expose wqueue API for customization
Browse files Browse the repository at this point in the history
Signed-off-by: ligd <[email protected]>
  • Loading branch information
GUIDINGLI authored and xiaoxiang781216 committed Aug 29, 2024
1 parent f5095d7 commit ce2ad51
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 142 deletions.
78 changes: 68 additions & 10 deletions include/nuttx/wqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@

#ifndef __ASSEMBLY__

/* Work queue forward declaration */

struct kwork_wqueue_s;

/* Defines the work callback */

typedef CODE void (*worker_t)(FAR void *arg);
Expand All @@ -247,13 +251,14 @@ struct work_s
{
struct
{
struct dq_entry_s dq; /* Implements a double linked list */
clock_t qtime; /* Time work queued */
struct dq_entry_s dq; /* Implements a double linked list */
clock_t qtime; /* Time work queued */
} s;
struct wdog_s timer; /* Delay expiry timer */
struct wdog_s timer; /* Delay expiry timer */
} u;
worker_t worker; /* Work callback */
FAR void *arg; /* Callback argument */
worker_t worker; /* Work callback */
FAR void *arg; /* Callback argument */
FAR struct kwork_wqueue_s *wq; /* Work queue */
};

/* This is an enumeration of the various events that may be
Expand Down Expand Up @@ -330,7 +335,50 @@ int work_usrstart(void);
#endif

/****************************************************************************
* Name: work_queue
* Name: work_queue_create
*
* Description:
* Create a new work queue. The work queue is identified by its work
* queue ID, which is used to queue works to the work queue and to
* perform other operations on the work queue.
* This function will create a work thread pool with nthreads threads.
* The work queue ID is returned on success.
*
* Input Parameters:
* name - Name of the new task
* priority - Priority of the new task
* stack_size - size (in bytes) of the stack needed
* nthreads - Number of work thread should be created
*
* Returned Value:
* The work queue handle returned on success. Otherwise, NULL
*
****************************************************************************/

FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name,
int priority,
int stack_size, int nthreads);

/****************************************************************************
* Name: work_queue_free
*
* Description:
* Destroy a work queue. The work queue is identified by its work queue ID.
* All worker threads will be destroyed and the work queue will be freed.
* The work queue ID is invalid after this function returns.
*
* Input Parameters:
* wqueue - The work queue handle
*
* Returned Value:
* Zero on success, a negated errno value on failure.
*
****************************************************************************/

int work_queue_free(FAR struct kwork_wqueue_s *wqueue);

/****************************************************************************
* Name: work_queue/work_queue_wq
*
* Description:
* Queue work to be performed at a later time. All queued work will be
Expand All @@ -344,7 +392,8 @@ int work_usrstart(void);
* pending work will be canceled and lost.
*
* Input Parameters:
* qid - The work queue ID
* qid - The work queue ID (must be HPWORK or LPWORK)
* wqueue - The work queue handle
* work - The work structure to queue
* worker - The worker callback to be invoked. The callback will be
* invoked on the worker thread of execution.
Expand All @@ -360,17 +409,21 @@ int work_usrstart(void);

int work_queue(int qid, FAR struct work_s *work, worker_t worker,
FAR void *arg, clock_t delay);
int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
FAR struct work_s *work, worker_t worker,
FAR void *arg, clock_t delay);

/****************************************************************************
* Name: work_cancel
* Name: work_cancel/work_cancel_wq
*
* Description:
* Cancel previously queued work. This removes work from the work queue.
* After work has been cancelled, it may be requeued by calling
* work_queue() again.
*
* Input Parameters:
* qid - The work queue ID
* qid - The work queue ID (must be HPWORK or LPWORK)
* wqueue - The work queue handle
* work - The previously queued work structure to cancel
*
* Returned Value:
Expand All @@ -382,9 +435,11 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
****************************************************************************/

int work_cancel(int qid, FAR struct work_s *work);
int work_cancel_wq(FAR struct kwork_wqueue_s *wqueue,
FAR struct work_s *work);

/****************************************************************************
* Name: work_cancel_sync
* Name: work_cancel_sync/work_cancel_sync_wq
*
* Description:
* Blocked cancel previously queued user-mode work. This removes work
Expand All @@ -393,6 +448,7 @@ int work_cancel(int qid, FAR struct work_s *work);
*
* Input Parameters:
* qid - The work queue ID (must be HPWORK or LPWORK)
* wqueue - The work queue handle
* work - The previously queued work structure to cancel
*
* Returned Value:
Expand All @@ -405,6 +461,8 @@ int work_cancel(int qid, FAR struct work_s *work);
****************************************************************************/

int work_cancel_sync(int qid, FAR struct work_s *work);
int work_cancel_sync_wq(FAR struct kwork_wqueue_s *wqueue,
FAR struct work_s *work);

/****************************************************************************
* Name: work_available
Expand Down
89 changes: 30 additions & 59 deletions sched/wqueue/kwork_cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@
* work_queue() again.
*
* Input Parameters:
* wqueue - The work queue to use. Must be HPWORK or LPWORK
* nthread - The number of threads in the work queue
* > 0 unsynchronous cancel
* < 0 synchronous cancel
* wqueue - The work queue to use
* sync - true: synchronous cancel
* false: asynchronous cancel
* work - The previously queued work structure to cancel
*
* Returned Value:
Expand All @@ -64,13 +63,16 @@
*
****************************************************************************/

static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread,
static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
FAR struct work_s *work)
{
irqstate_t flags;
int ret = -ENOENT;

DEBUGASSERT(work != NULL);
if (wqueue == NULL || work == NULL)
{
return -EINVAL;
}

/* Cancelling the work is simply a matter of removing the work structure
* from the work queue. This must be done with interrupts disabled because
Expand All @@ -96,11 +98,11 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread,
work->worker = NULL;
ret = OK;
}
else if (!up_interrupt_context() && !sched_idletask() && nthread > 0)
else if (!up_interrupt_context() && !sched_idletask() && sync)
{
int wndx;

for (wndx = 0; wndx < nthread; wndx++)
for (wndx = 0; wndx < wqueue->nthreads; wndx++)
{
if (wqueue->worker[wndx].work == work &&
wqueue->worker[wndx].pid != nxsched_gettid())
Expand All @@ -121,20 +123,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread,
****************************************************************************/

/****************************************************************************
* Name: work_cancel
* Name: work_cancel/work_cancel_wq
*
* Description:
* Cancel previously queued user-mode work. This removes work from the
* user mode work queue. After work has been cancelled, it may be
* requeued by calling work_queue() again.
* Cancel previously queued work. This removes work from the work queue.
* After work has been cancelled, it may be requeued by calling
* work_queue() again.
*
* Input Parameters:
* qid - The work queue ID (must be HPWORK or LPWORK)
* wqueue - The work queue handle
* work - The previously queued work structure to cancel
*
* Returned Value:
* Zero (OK) on success, a negated errno on failure. This error may be
* reported:
* Zero on success, a negated errno on failure
*
* -ENOENT - There is no such work queued.
* -EINVAL - An invalid work queue was specified
Expand All @@ -143,33 +145,17 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread,

int work_cancel(int qid, FAR struct work_s *work)
{
#ifdef CONFIG_SCHED_HPWORK
if (qid == HPWORK)
{
/* Cancel high priority work */

return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork,
-1, work);
}
else
#endif
#ifdef CONFIG_SCHED_LPWORK
if (qid == LPWORK)
{
/* Cancel low priority work */
return work_qcancel(work_qid2wq(qid), false, work);
}

return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork,
-1, work);
}
else
#endif
{
return -EINVAL;
}
int work_cancel_wq(FAR struct kwork_wqueue_s *wqueue,
FAR struct work_s *work)
{
return work_qcancel(wqueue, false, work);
}

/****************************************************************************
* Name: work_cancel_sync
* Name: work_cancel_sync/work_cancel_sync_wq
*
* Description:
* Blocked cancel previously queued user-mode work. This removes work
Expand All @@ -178,6 +164,7 @@ int work_cancel(int qid, FAR struct work_s *work)
*
* Input Parameters:
* qid - The work queue ID (must be HPWORK or LPWORK)
* wqueue - The work queue handle
* work - The previously queued work structure to cancel
*
* Returned Value:
Expand All @@ -191,29 +178,13 @@ int work_cancel(int qid, FAR struct work_s *work)

int work_cancel_sync(int qid, FAR struct work_s *work)
{
#ifdef CONFIG_SCHED_HPWORK
if (qid == HPWORK)
{
/* Cancel high priority work */

return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork,
CONFIG_SCHED_HPNTHREADS, work);
}
else
#endif
#ifdef CONFIG_SCHED_LPWORK
if (qid == LPWORK)
{
/* Cancel low priority work */
return work_qcancel(work_qid2wq(qid), true, work);
}

return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork,
CONFIG_SCHED_LPNTHREADS, work);
}
else
#endif
{
return -EINVAL;
}
int work_cancel_sync_wq(FAR struct kwork_wqueue_s *wqueue,
FAR struct work_s *work)
{
return work_qcancel(wqueue, true, work);
}

#endif /* CONFIG_SCHED_WORKQUEUE */
Loading

0 comments on commit ce2ad51

Please sign in to comment.