Skip to content

Commit

Permalink
wqueue: wqueue remove csection
Browse files Browse the repository at this point in the history
reason:
We decouple semcount from business logic
by using an independent counting variable,
which allows us to remove critical sections in many cases.

Signed-off-by: hujun5 <[email protected]>
  • Loading branch information
hujun260 committed Nov 4, 2024
1 parent 38fbb4e commit d7d8355
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 49 deletions.
18 changes: 8 additions & 10 deletions sched/wqueue/kwork_cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
* new work is typically added to the work queue from interrupt handlers.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);
if (work->worker != NULL)
{
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/

if (WDOG_ISACTIVE(&work->u.timer))
{
wd_cancel(&work->u.timer);
}
else
work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}

work->worker = NULL;
ret = OK;
}
else if (!up_interrupt_context() && !sched_idletask() && sync)
Expand All @@ -86,14 +83,15 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
if (wqueue->worker[wndx].work == work &&
wqueue->worker[wndx].pid != nxsched_gettid())
{
wqueue->worker[wndx].wait_count--;
spin_unlock_irqrestore(&g_wqueue_lock, flags);
nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait);
ret = 1;
break;
return 1;
}
}
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);
return ret;
}

Expand Down
45 changes: 25 additions & 20 deletions sched/wqueue/kwork_notifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ static dq_queue_t g_notifier_free;

static dq_queue_t g_notifier_pending;

static spinlock_t g_work_notifier_lock;

/****************************************************************************
* Private Functions
****************************************************************************/
Expand Down Expand Up @@ -166,17 +168,21 @@ static void work_notifier_worker(FAR void *arg)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Remove the notification from the pending list */

dq_rem(&notifier->entry, &g_notifier_pending);
notifier = work_notifier_find(notifier->key);
if (notifier != NULL)
{
dq_rem(&notifier->entry, &g_notifier_pending);

/* Put the notification to the free list */
/* Put the notification to the free list */

dq_addlast(&notifier->entry, &g_notifier_free);
dq_addlast(&notifier->entry, &g_notifier_free);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

/****************************************************************************
Expand Down Expand Up @@ -213,14 +219,14 @@ int work_notifier_setup(FAR struct work_notifier_s *info)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Try to get the entry from the free list */

notifier = (FAR struct work_notifier_entry_s *)
dq_remfirst(&g_notifier_free);

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);

if (notifier == NULL)
{
Expand All @@ -245,7 +251,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Generate a unique key for this notification */

Expand All @@ -262,7 +268,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)
dq_addlast(&notifier->entry, &g_notifier_pending);
ret = notifier->key;

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

return ret;
Expand Down Expand Up @@ -293,7 +299,7 @@ void work_notifier_teardown(int key)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Find the entry matching this PID in the g_notifier_pending list. We
* assume that there is only one.
Expand All @@ -304,19 +310,18 @@ void work_notifier_teardown(int key)
{
/* Cancel the work, this may be waiting */

if (work_cancel_sync(notifier->info.qid, &notifier->work) != 1)
{
/* Remove the notification from the pending list */
work_cancel(notifier->info.qid, &notifier->work);

dq_rem(&notifier->entry, &g_notifier_pending);
/* Remove the notification from the pending list */

/* Put the notification to the free list */
dq_rem(&notifier->entry, &g_notifier_pending);

dq_addlast(&notifier->entry, &g_notifier_free);
}
/* Put the notification to the free list */

dq_addlast(&notifier->entry, &g_notifier_free);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

/****************************************************************************
Expand Down Expand Up @@ -352,7 +357,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
* the notifications and been sent.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);
sched_lock();

/* Process the notification at the head of the pending list until the
Expand Down Expand Up @@ -397,7 +402,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
}

sched_unlock();
leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

#endif /* CONFIG_WQUEUE_NOTIFIER */
36 changes: 24 additions & 12 deletions sched/wqueue/kwork_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@
#define queue_work(wqueue, work) \
do \
{ \
int sem_count; \
dq_addlast((FAR dq_entry_t *)(work), &(wqueue)->q); \
nxsem_get_value(&(wqueue)->sem, &sem_count); \
if (sem_count < 0) /* There are threads waiting for sem. */ \
if ((wqueue)->wait_count < 0) /* There are threads waiting for sem. */ \
{ \
(wqueue)->wait_count++; \
nxsem_post(&(wqueue)->sem); \
} \
} \
Expand All @@ -68,24 +67,28 @@
static void work_timer_expiry(wdparm_t arg)
{
FAR struct work_s *work = (FAR struct work_s *)arg;
irqstate_t flags = enter_critical_section();
irqstate_t flags = spin_lock_irqsave(&g_wqueue_lock);

queue_work(work->wq, work);
leave_critical_section(flags);
/* We have being canceled */

if (work->worker != NULL)
{
queue_work(work->wq, work);
}

spin_unlock_irqrestore(&g_wqueue_lock, flags);
}

static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads,
FAR struct work_s *work)
{
int semcount;
int wndx;

for (wndx = 0; wndx < nthreads; wndx++)
{
if (kworkers[wndx].work == work)
{
nxsem_get_value(&kworkers[wndx].wait, &semcount);
if (semcount < 0)
if (kworkers[wndx].wait_count < 0)
{
return true;
}
Expand Down Expand Up @@ -145,13 +148,22 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
* task logic or from interrupt handling logic.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);

/* Remove the entry from the timer and work queue. */

if (work->worker != NULL)
{
work_cancel_wq(wqueue, work);
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/

work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}
}

if (work_is_canceling(wqueue->worker, wqueue->nthreads, work))
Expand All @@ -177,7 +189,7 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
}

out:
leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);
return ret;
}

Expand Down
20 changes: 13 additions & 7 deletions sched/wqueue/kwork_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ struct lp_wqueue_s g_lpwork =

#endif /* CONFIG_SCHED_LPWORK */

spinlock_t g_wqueue_lock = SP_UNLOCKED;

/****************************************************************************
* Private Functions
****************************************************************************/
Expand Down Expand Up @@ -138,7 +140,6 @@ static int work_thread(int argc, FAR char *argv[])
worker_t worker;
irqstate_t flags;
FAR void *arg;
int semcount;

/* Get the handle from argv */

Expand All @@ -147,7 +148,7 @@ static int work_thread(int argc, FAR char *argv[])
kworker = (FAR struct kworker_s *)
((uintptr_t)strtoul(argv[2], NULL, 16));

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);

/* Loop forever */

Expand Down Expand Up @@ -189,19 +190,19 @@ static int work_thread(int argc, FAR char *argv[])
* performed... we don't have any idea how long this will take!
*/

leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);
CALL_WORKER(worker, arg);
flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);

/* Mark the thread un-busy */

kworker->work = NULL;

/* Check if someone is waiting, if so, wakeup it */

nxsem_get_value(&kworker->wait, &semcount);
while (semcount++ < 0)
while (kworker->wait_count < 0)
{
kworker->wait_count++;
nxsem_post(&kworker->wait);
}
}
Expand All @@ -211,10 +212,13 @@ static int work_thread(int argc, FAR char *argv[])
* posted.
*/

wqueue->wait_count--;
spin_unlock_irqrestore(&g_wqueue_lock, flags);
nxsem_wait_uninterruptible(&wqueue->sem);
flags = spin_lock_irqsave(&g_wqueue_lock);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);

nxsem_post(&wqueue->exsem);
return OK;
Expand Down Expand Up @@ -276,6 +280,7 @@ static int work_thread_create(FAR const char *name, int priority,
}

wqueue->worker[wndx].pid = pid;
wqueue->worker[wndx].wait_count = 0;
}

sched_unlock();
Expand Down Expand Up @@ -334,6 +339,7 @@ FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name,
nxsem_init(&wqueue->sem, 0, 0);
nxsem_init(&wqueue->exsem, 0, 0);
wqueue->nthreads = nthreads;
wqueue->wait_count = 0;

/* Create the work queue thread pool */

Expand Down
5 changes: 5 additions & 0 deletions sched/wqueue/wqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <nuttx/clock.h>
#include <nuttx/queue.h>
#include <nuttx/wqueue.h>
#include <nuttx/spinlock.h>

#ifdef CONFIG_SCHED_WORKQUEUE

Expand All @@ -58,6 +59,7 @@ struct kworker_s
pid_t pid; /* The task ID of the worker thread */
FAR struct work_s *work; /* The work structure */
sem_t wait; /* Sync waiting for worker done */
volatile int16_t wait_count;
};

/* This structure defines the state of one kernel-mode work queue */
Expand All @@ -69,6 +71,7 @@ struct kwork_wqueue_s
sem_t exsem; /* Sync waiting for thread exit */
uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */
volatile int16_t wait_count;
struct kworker_s worker[0]; /* Describes a worker thread */
};

Expand Down Expand Up @@ -126,6 +129,8 @@ extern struct hp_wqueue_s g_hpwork;
extern struct lp_wqueue_s g_lpwork;
#endif

extern spinlock_t g_wqueue_lock;

/****************************************************************************
* Public Function Prototypes
****************************************************************************/
Expand Down

0 comments on commit d7d8355

Please sign in to comment.