公开学习文档

公开学习文档


工作队列

<h2>基础</h2> <p>可延迟函数运行在中断上下文;而工作队列运行在进程上下文。 中断上下文中不允许睡眠或主动调度,不会发生进程切换,因此可延迟函数不能阻塞;工作队列中的函数由内核线程在进程上下文中执行,可以阻塞。</p> <h2>结构体和定义</h2> <pre><code class="language-c">// file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c /* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */ struct workqueue_struct { struct cpu_workqueue_struct cpu_wq[NR_CPUS]; const char *name; struct list_head list; /* Empty if single thread */ }; /* * The per-CPU workqueue (if single thread, we always use cpu 0's). * * The sequence counters are for flush_scheduled_work(). It wants to wait * until until all currently-scheduled works are completed, but it doesn't * want to be livelocked by new, incoming ones. So it waits until * remove_sequence is &gt;= the insert_sequence which pertained when * flush_scheduled_work() was called. */ struct cpu_workqueue_struct { spinlock_t lock; long remove_sequence; /* Least-recently added (next to run) */ long insert_sequence; /* Next to add */ struct list_head worklist; // 挂接各个 work_struct 任务 wait_queue_head_t more_work; wait_queue_head_t work_done; struct workqueue_struct *wq; task_t *thread; int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned; //file: E:\ProgramWork\linux\linux-2.6.11\include\linux\workqueue.h struct work_struct { unsigned long pending; // 是否已经在工作队列的 list 中 struct list_head entry; // 用于挂接 list void (*func)(void *); void *data; void *wq_data; struct timer_list timer; };</code></pre> <p>worklist 字段是双向 list,集中所有挂起的 work_struct 任务。</p> <h2>流程</h2> <pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\include\linux\workqueue.h #define create_workqueue(name) __create_workqueue((name), 0) //file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c struct workqueue_struct *__create_workqueue(const char *name, int singlethread) { int cpu, destroy = 0; struct workqueue_struct *wq; struct task_struct *p; BUG_ON(strlen(name) &gt; 10); wq = kmalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; memset(wq, 0, 
sizeof(*wq)); wq-&gt;name = name; /* We don't need the distraction of CPUs appearing and vanishing. */ lock_cpu_hotplug(); if (singlethread) { INIT_LIST_HEAD(&amp;wq-&gt;list); p = create_workqueue_thread(wq, 0); if (!p) destroy = 1; else wake_up_process(p); } else { spin_lock(&amp;workqueue_lock); list_add(&amp;wq-&gt;list, &amp;workqueues); spin_unlock(&amp;workqueue_lock); for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu); if (p) { kthread_bind(p, cpu); wake_up_process(p); } else destroy = 1; } } unlock_cpu_hotplug(); /* * Was there any error during startup? If yes then clean up: */ if (destroy) { destroy_workqueue(wq); wq = NULL; } return wq; }</code></pre> <p>创建工作队列函数,其中会创建 n 个内核线程。</p> <pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c /* * Queue work on a workqueue. Return non-zero if it was successfully * added. * * We queue the work to the CPU it was submitted, but there is no * guarantee that it will be processed by that CPU. */ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) { int ret = 0, cpu = get_cpu(); if (!test_and_set_bit(0, &amp;work-&gt;pending)) { // 如果未设置,则会置位,并进入条件 if (unlikely(is_single_threaded(wq))) cpu = 0; BUG_ON(!list_empty(&amp;work-&gt;entry)); __queue_work(wq-&gt;cpu_wq + cpu, work); ret = 1; } put_cpu(); return ret; } /* Preempt must be disabled. 
*/ static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) { unsigned long flags; spin_lock_irqsave(&amp;cwq-&gt;lock, flags); work-&gt;wq_data = cwq; list_add_tail(&amp;work-&gt;entry, &amp;cwq-&gt;worklist); cwq-&gt;insert_sequence++; wake_up(&amp;cwq-&gt;more_work); // 唤醒线程 spin_unlock_irqrestore(&amp;cwq-&gt;lock, flags); }</code></pre> <p>queue_work() 负责将 work_struct 加入到工作队列当中。</p> <pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c static int worker_thread(void *__cwq) { struct cpu_workqueue_struct *cwq = __cwq; DECLARE_WAITQUEUE(wait, current); struct k_sigaction sa; sigset_t blocked; current-&gt;flags |= PF_NOFREEZE; set_user_nice(current, -5); /* Block and flush all signals */ sigfillset(&amp;blocked); sigprocmask(SIG_BLOCK, &amp;blocked, NULL); flush_signals(current); /* SIG_IGN makes children autoreap: see do_notify_parent(). */ sa.sa.sa_handler = SIG_IGN; sa.sa.sa_flags = 0; siginitset(&amp;sa.sa.sa_mask, sigmask(SIGCHLD)); do_sigaction(SIGCHLD, &amp;sa, (struct k_sigaction *)0); set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) { add_wait_queue(&amp;cwq-&gt;more_work, &amp;wait); if (list_empty(&amp;cwq-&gt;worklist)) schedule(); else __set_current_state(TASK_RUNNING); remove_wait_queue(&amp;cwq-&gt;more_work, &amp;wait); if (!list_empty(&amp;cwq-&gt;worklist)) run_workqueue(cwq); // 执行 worklist 上的任务 set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0; } static inline void run_workqueue(struct cpu_workqueue_struct *cwq) { unsigned long flags; /* * Keep taking off work from the queue until * done. 
*/ spin_lock_irqsave(&amp;cwq-&gt;lock, flags); cwq-&gt;run_depth++; if (cwq-&gt;run_depth &gt; 3) { /* morton gets to eat his hat */ printk("%s: recursion depth exceeded: %d\n", __FUNCTION__, cwq-&gt;run_depth); dump_stack(); } while (!list_empty(&amp;cwq-&gt;worklist)) { struct work_struct *work = list_entry(cwq-&gt;worklist.next, struct work_struct, entry); void (*f) (void *) = work-&gt;func; void *data = work-&gt;data; list_del_init(cwq-&gt;worklist.next); spin_unlock_irqrestore(&amp;cwq-&gt;lock, flags); BUG_ON(work-&gt;wq_data != cwq); clear_bit(0, &amp;work-&gt;pending); f(data); spin_lock_irqsave(&amp;cwq-&gt;lock, flags); cwq-&gt;remove_sequence++; wake_up(&amp;cwq-&gt;work_done); } cwq-&gt;run_depth--; spin_unlock_irqrestore(&amp;cwq-&gt;lock, flags); }</code></pre> <p>以上是线程函数,遍历 worklist 并执行对应的函数。</p> <h2>events 预定义工作队列</h2> <pre><code class="language-c">//file: E:\ProgramWork\linux\linux-2.6.11\kernel\workqueue.c static struct workqueue_struct *keventd_wq; // 全局变量 void init_workqueues(void) { hotcpu_notifier(workqueue_cpu_callback, 0); keventd_wq = create_workqueue("events"); // 系统创建 events 工作队列 BUG_ON(!keventd_wq); } int fastcall schedule_work(struct work_struct *work) { return queue_work(keventd_wq, work); } int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay) { return queue_delayed_work(keventd_wq, work, delay); } void flush_scheduled_work(void) { flush_workqueue(keventd_wq); }</code></pre> <p>内核自身已经定义了 events 队列,可供用户使用。不应该在工作队列中使用长时间阻塞的函数,因为工作队列中的函数是串行执行的,可能会导致其它用户的函数无法及时执行。</p> <h2>kblockd</h2> <p>块设备使用的工作队列</p>

页面列表

ITEM_HTML