工作队列
<h2>基础</h2>
<p>可延迟函数运行在中断上下文,而工作队列运行在进程上下文。
中断上下文中不能发生进程切换(不能睡眠,也不能调用 schedule()),因此可延迟函数不能阻塞;工作队列中的函数由内核线程执行,可以阻塞。</p>
<h2>结构体和定义</h2>
<pre><code class="language-c">// file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
struct cpu_workqueue_struct cpu_wq[NR_CPUS]; /* per-CPU queues, indexed by CPU number (see queue_work()) */
const char *name; /* name pointer handed to __create_workqueue(); stored, not copied */
struct list_head list; /* Empty if single thread; otherwise linked into the global workqueues list */
};
/*
* The per-CPU workqueue (if single thread, we always use cpu 0's).
*
* The sequence counters are for flush_scheduled_work(). It wants to wait
* until all currently-scheduled works are completed, but it doesn't
* want to be livelocked by new, incoming ones. So it waits until
* remove_sequence is >= the insert_sequence which pertained when
* flush_scheduled_work() was called.
*/
struct cpu_workqueue_struct {
spinlock_t lock; /* taken (irqsave) around worklist, sequence counter and run_depth updates */
long remove_sequence; /* Least-recently added (next to run) */
long insert_sequence; /* Next to add */
struct list_head worklist; /* queued work_struct items, linked through work_struct.entry */
wait_queue_head_t more_work; /* worker_thread() waits here; woken by __queue_work() */
wait_queue_head_t work_done; /* woken by run_workqueue() after each work item has run */
struct workqueue_struct *wq; /* presumably the workqueue this per-CPU queue belongs to (not set in the code shown) */
task_t *thread; /* presumably the worker thread serving this queue (not set in the code shown) */
int run_depth; /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;
//file: E:\ProgramWork\linux\linux-2.6.11\include\linux\workqueue.h
struct work_struct {
unsigned long pending; /* bit 0 set while the work sits on a worklist (set in queue_work(), cleared in run_workqueue()) */
struct list_head entry; /* links the work into cpu_workqueue_struct.worklist */
void (*func)(void *); /* handler invoked by run_workqueue() */
void *data; /* argument passed to func */
void *wq_data; /* cpu_workqueue_struct the work was queued on (set in __queue_work()) */
struct timer_list timer; /* presumably used by the delayed-work path (queue_delayed_work(), not shown) */
};</code></pre>
<p>worklist 字段是一个双向链表的表头,该 CPU 上所有待执行的 work_struct 都通过各自的 entry 字段挂接在这个链表上。</p>
<h2>流程</h2>
<pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\include\linux\workqueue.h
/* Create a workqueue with singlethread == 0, i.e. __create_workqueue() starts one worker thread per online CPU. */
#define create_workqueue(name) __create_workqueue((name), 0)
//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c
struct workqueue_struct *__create_workqueue(const char *name,
int singlethread)
{
int cpu, destroy = 0;
struct workqueue_struct *wq;
struct task_struct *p;
BUG_ON(strlen(name) > 10);
wq = kmalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
memset(wq, 0, sizeof(*wq));
wq->name = name;
/* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug();
if (singlethread) {
INIT_LIST_HEAD(&wq->list);
p = create_workqueue_thread(wq, 0);
if (!p)
destroy = 1;
else
wake_up_process(p);
} else {
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
for_each_online_cpu(cpu) {
p = create_workqueue_thread(wq, cpu);
if (p) {
kthread_bind(p, cpu);
wake_up_process(p);
} else
destroy = 1;
}
}
unlock_cpu_hotplug();
/*
* Was there any error during startup? If yes then clean up:
*/
if (destroy) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}</code></pre>
<p>这是创建工作队列的函数:singlethread 非 0 时只创建一个内核线程(使用 0 号 CPU 的 cpu_wq);否则为每个在线 CPU 各创建一个内核线程,并通过 kthread_bind() 绑定到对应的 CPU。</p>
<pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c
/*
* Queue work on a workqueue. Return non-zero if it was successfully
* added.
*
* We queue the work to the CPU it was submitted, but there is no
* guarantee that it will be processed by that CPU.
*/
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int this_cpu = get_cpu();
	int queued = 0;

	/* Bit 0 of ->pending was already set: the work is queued, nothing to do. */
	if (test_and_set_bit(0, &work->pending))
		goto out;

	/* A single-threaded workqueue only ever uses the CPU 0 slot. */
	if (unlikely(is_single_threaded(wq)))
		this_cpu = 0;

	BUG_ON(!list_empty(&work->entry));
	__queue_work(&wq->cpu_wq[this_cpu], work);
	queued = 1;
out:
	/* Pairs with get_cpu() above on every path. */
	put_cpu();
	return queued;
}
/* Preempt must be disabled. */
/*
 * Append @work to the per-CPU queue @cwq and wake that queue's worker.
 * The whole update runs under cwq->lock taken with spin_lock_irqsave().
 */
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
work->wq_data = cwq; /* run_workqueue() later BUG_ON()s if this does not match */
list_add_tail(&work->entry, &cwq->worklist); /* new work goes to the tail of the list */
cwq->insert_sequence++;
wake_up(&cwq->more_work); /* wake the worker thread waiting in worker_thread() */
spin_unlock_irqrestore(&cwq->lock, flags);
}</code></pre>
<p>queue_work() 负责将 work_struct 加入到工作队列当中。</p>
<pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c
/*
 * Thread function of a workqueue worker. @__cwq is the
 * cpu_workqueue_struct this thread serves.
 *
 * After blocking all signals the thread loops until kthread_should_stop():
 * it waits on cwq->more_work while the worklist is empty and calls
 * run_workqueue() whenever the list is non-empty. Always returns 0.
 */
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
sigset_t blocked;
current->flags |= PF_NOFREEZE;
set_user_nice(current, -5); /* run the worker at nice level -5 */
/* Block and flush all signals */
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
/* SIG_IGN makes children autoreap: see do_notify_parent(). */
sa.sa.sa_handler = SIG_IGN;
sa.sa.sa_flags = 0;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
/*
 * NOTE(review): the task state is set to TASK_INTERRUPTIBLE before the
 * list_empty() check below, presumably so that a wake_up() arriving
 * between the check and schedule() is not lost -- confirm.
 */
set_current_state(TASK_INTERRUPTIBLE);
while (!kthread_should_stop()) {
add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))
schedule(); /* nothing queued: give up the CPU */
else
__set_current_state(TASK_RUNNING);
remove_wait_queue(&cwq->more_work, &wait);
if (!list_empty(&cwq->worklist))
run_workqueue(cwq); /* run the work items on the worklist */
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
}
/*
 * Drain @cwq->worklist: take the work items off the list one at a time
 * and call each item's func with its data. cwq->lock is held while the
 * list and counters are touched and released while the handler runs.
 */
static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{
unsigned long flags;
/*
* Keep taking off work from the queue until
* done.
*/
spin_lock_irqsave(&cwq->lock, flags);
cwq->run_depth++;
/* Nesting deeper than 3 is only reported (message + stack dump); processing continues. */
if (cwq->run_depth > 3) {
/* morton gets to eat his hat */
printk("%s: recursion depth exceeded: %d\n",
__FUNCTION__, cwq->run_depth);
dump_stack();
}
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
/* func and data are copied to locals while the lock is still held */
void (*f) (void *) = work->func;
void *data = work->data;
list_del_init(cwq->worklist.next); /* unlink the head item and re-initialise its entry */
spin_unlock_irqrestore(&cwq->lock, flags);
BUG_ON(work->wq_data != cwq);
/* pending is cleared before the handler runs, so queue_work() can queue this work again from here on */
clear_bit(0, &work->pending);
f(data);
spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done); /* wake anyone waiting on work_done */
}
cwq->run_depth--;
spin_unlock_irqrestore(&cwq->lock, flags);
}</code></pre>
<p>以上是线程函数,遍历 worklist 并执行对应的函数。</p>
<h2>events 预定义工作队列</h2>
<pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c
static struct workqueue_struct *keventd_wq; // file-scope pointer to the predefined "events" workqueue, set in init_workqueues()
/*
 * Register the CPU-hotplug callback and create the predefined "events"
 * workqueue. BUG()s if the workqueue cannot be created.
 */
void init_workqueues(void)
{
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events"); // create the system-wide "events" workqueue
BUG_ON(!keventd_wq);
}
/* Queue @work on the predefined keventd_wq workqueue; returns queue_work()'s result (non-zero if the work was added). */
int fastcall schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
/*
 * Hand @work and @delay to queue_delayed_work() for the predefined
 * keventd_wq workqueue and return its result.
 * NOTE(review): the unit of @delay is not visible here -- presumably jiffies; confirm.
 */
int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
return queue_delayed_work(keventd_wq, work, delay);
}
/* Flush the predefined keventd_wq workqueue by calling flush_workqueue() on it. */
void flush_scheduled_work(void)
{
flush_workqueue(keventd_wq);
}</code></pre>
<p>内核自身已经定义了 events 工作队列,可供用户直接使用。不应该在工作队列中使用长时间阻塞的函数,因为同一个工作队列(同一 CPU)上的函数是串行执行的,长时间阻塞可能会导致其它用户的函数无法及时执行。</p>
<h2>kblockd</h2>
<p>块设备使用的工作队列</p>