epoll_wait: Waiting to Receive

<h2>Overview</h2>

<p>This section traces epoll_wait through the kernel: how the calling process blocks on the eventpoll wait queue when no events are ready, how the softirq receive path wakes it up via ep_poll_callback, and how ready events are copied back to userspace.</p>

<h2>Analysis</h2>

<pre><code class="language-c">// file: fs/eventpoll.c

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	int error;
	struct fd f;
	struct eventpoll *ep;

	/* The maximum number of event must be greater than zero */
	if (maxevents &lt;= 0 || maxevents &gt; EP_MAX_EVENTS)
		return -EINVAL;

	/* Verify that the area passed by the user is writeable */
	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
		return -EFAULT;

	/* Get the &quot;struct file *&quot; for the eventpoll file */
	f = fdget(epfd);
	if (!f.file)
		return -EBADF;

	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	error = -EINVAL;
	if (!is_file_epoll(f.file))
		goto error_fput;

	/*
	 * At this point it is safe to assume that the &quot;private_data&quot; contains
	 * our own data structure.
	 */
	ep = f.file-&gt;private_data;	// private data stored on the file

	/* Time to fish for events ... */
	error = ep_poll(ep, events, maxevents, timeout);	// analyzed next

error_fput:
	fdput(f);
	return error;
}

/**
 * ep_poll - Retrieves ready events, and delivers them to the caller supplied
 *           event buffer.
 *
 * @ep: Pointer to the eventpoll context.
 * @events: Pointer to the userspace buffer where the ready events should be
 *          stored.
 * @maxevents: Size (in terms of number of events) of the caller event buffer.
 * @timeout: Maximum timeout for the ready events fetch operation, in
 *           milliseconds. If the @timeout is zero, the function will not block,
 *           while if the @timeout is less than zero, the function will block
 *           until at least one event has been retrieved (or an error
 *           occurred).
 *
 * Returns: Returns the number of ready events which have been fetched, or an
 *          error code, in case of error.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	int res = 0, eavail, timed_out = 0;
	unsigned long flags;
	long slack = 0;
	wait_queue_t wait;
	ktime_t expires, *to = NULL;

	if (timeout &gt; 0) {
		struct timespec end_time = ep_set_mstimeout(timeout);

		slack = select_estimate_accuracy(&amp;end_time);
		to = &amp;expires;
		*to = timespec_to_ktime(end_time);
	} else if (timeout == 0) {
		/*
		 * Avoid the unnecessary trip to the wait queue loop, if the
		 * caller specified a non blocking operation.
		 */
		timed_out = 1;
		spin_lock_irqsave(&amp;ep-&gt;lock, flags);
		goto check_events;
	}

fetch_events:
	spin_lock_irqsave(&amp;ep-&gt;lock, flags);

	if (!ep_events_available(ep)) {	// defined as: !list_empty(&amp;ep-&gt;rdllist) || ep-&gt;ovflist != EP_UNACTIVE_PTR
		/*
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		 */
		init_waitqueue_entry(&amp;wait, current);		// sets wait-&gt;func = default_wake_function
		__add_wait_queue_exclusive(&amp;ep-&gt;wq, &amp;wait);	// add the current process to the epoll-&gt;wq wait queue

		for (;;) {
			/*
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			 */
			set_current_state(TASK_INTERRUPTIBLE);
			if (ep_events_available(ep) || timed_out)
				break;
			if (signal_pending(current)) {	// check for a pending signal on the process
				res = -EINTR;
				break;
			}

			spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);
			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
				timed_out = 1;

			spin_lock_irqsave(&amp;ep-&gt;lock, flags);	// execution resumes here after wakeup
		}
		__remove_wait_queue(&amp;ep-&gt;wq, &amp;wait);	// remove the wait entry

		set_current_state(TASK_RUNNING);
	}
check_events:
	/* Is it worth to try to dig for events ? */
	eavail = ep_events_available(ep);

	spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

	/*
	 * Try to transfer events to user space. In case we get 0 events and
	 * there's still timeout left over, we go trying again in search of
	 * more luck.
	 */
	if (!res &amp;&amp; eavail &amp;&amp;
	    !(res = ep_send_events(ep, events, maxevents)) &amp;&amp; !timed_out)	// deliver ready events to the user process
		goto fetch_events;

	return res;
}

/**
 * ep_events_available - Checks if ready events might be available.
 *
 * @ep: Pointer to the eventpoll context.
 *
 * Returns: Returns a value different than zero if ready events are available,
 *          or zero otherwise.
 */
static inline int ep_events_available(struct eventpoll *ep)
{
	return !list_empty(&amp;ep-&gt;rdllist) || ep-&gt;ovflist != EP_UNACTIVE_PTR;
}

// file: include/linux/wait.h
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
	q-&gt;flags = 0;
	q-&gt;private = p;				// associate the process with the wait entry
	q-&gt;func = default_wake_function;	// the callback on the wait entry
}
</code></pre>

<p>After checking ep-&gt;rdllist and ep-&gt;ovflist, if no events are ready, the process gives up the CPU and sleeps until an incoming packet triggers an event.</p>
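<p>For orientation, here is a minimal userspace sketch of the path analyzed above (an illustrative example, not kernel source; listen_fd is assumed to be an already bound and listening socket): epoll_create1 builds the struct eventpoll, EPOLL_CTL_ADD installs ep_poll_callback on the socket's wait queue, and epoll_wait enters ep_poll.</p>

<pre><code class="language-c">// Minimal userspace sketch of the epoll_wait call path analyzed above.
// Assumes listen_fd is an already bound and listening socket (hypothetical).
#include &lt;stdio.h&gt;
#include &lt;sys/epoll.h&gt;

#define MAX_EVENTS 64

int wait_loop(int listen_fd)
{
	struct epoll_event ev, events[MAX_EVENTS];
	int epfd = epoll_create1(0);		// creates the struct eventpoll
	if (epfd &lt; 0) {
		perror(&quot;epoll_create1&quot;);
		return -1;
	}

	ev.events = EPOLLIN;			// level-triggered by default
	ev.data.fd = listen_fd;
	// ep_insert: allocates the epitem and hooks ep_poll_callback
	// onto the socket's wait queue
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &amp;ev) &lt; 0) {
		perror(&quot;epoll_ctl&quot;);
		return -1;
	}

	for (;;) {
		// enters ep_poll; timeout -1 blocks until ep_poll_callback
		// makes events available and wakes the process
		int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
		if (n &lt; 0) {
			perror(&quot;epoll_wait&quot;);
			return -1;
		}
		for (int i = 0; i &lt; n; i++)
			printf(&quot;fd %d ready, events 0x%x\n&quot;,
			       events[i].data.fd, events[i].events);
	}
}
</code></pre>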
<p>On the other side, after the softirq receives a packet, the flow is much like that of "synchronous receive": the packet is placed on sk-&gt;sk_receive_queue, then sock_def_readable is called, whose main job is to invoke the callback stored on each wait entry. For a socket that has been added to epoll, as analyzed earlier, that callback is ep_poll_callback (set in epoll_ctl(ADD)-&gt;ep_insert-&gt;ep_ptable_queue_proc):</p>

<pre><code class="language-c">// file: fs/eventpoll.c

/*
 * This is the callback that is passed to the wait queue wakeup
 * mechanism. It is called by the stored file descriptors when they
 * have events to report.
 */
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int pwake = 0;
	unsigned long flags;
	struct epitem *epi = ep_item_from_wait(wait);	// container_of(p, struct eppoll_entry, wait)-&gt;base;
							// i.e. first recover the struct eppoll_entry, then take its base field
	struct eventpoll *ep = epi-&gt;ep;

	if ((unsigned long)key &amp; POLLFREE) {
		ep_pwq_from_wait(wait)-&gt;whead = NULL;
		/*
		 * whead = NULL above can race with ep_remove_wait_queue()
		 * which can do another remove_wait_queue() after us, so we
		 * can't use __remove_wait_queue(). whead-&gt;lock is held by
		 * the caller.
		 */
		list_del_init(&amp;wait-&gt;task_list);	// only unlinked here on POLLFREE; otherwise the wait entry
							// stays on the socket's wait queue until the epitem is
							// removed (epoll_ctl(DEL) / close)
	}

	spin_lock_irqsave(&amp;ep-&gt;lock, flags);

	/*
	 * If the event mask does not contain any poll(2) event, we consider the
	 * descriptor to be disabled. This condition is likely the effect of the
	 * EPOLLONESHOT bit that disables the descriptor when an event is received,
	 * until the next EPOLL_CTL_MOD will be issued.
	 */
	if (!(epi-&gt;event.events &amp; ~EP_PRIVATE_BITS))
		goto out_unlock;

	/*
	 * Check the events coming with the callback. At this stage, not
	 * every device reports the events in the &quot;key&quot; parameter of the
	 * callback. We need to be able to handle both cases here, hence the
	 * test for &quot;key&quot; != NULL before the event match test.
	 */
	if (key &amp;&amp; !((unsigned long) key &amp; epi-&gt;event.events))
		goto out_unlock;

	/*
	 * If we are transferring events to userspace, we can hold no locks
	 * (because we're accessing user memory, and because of linux f_op-&gt;poll()
	 * semantics). All the events that happen during that period of time are
	 * chained in ep-&gt;ovflist and requeued later on.
	 */
	if (unlikely(ep-&gt;ovflist != EP_UNACTIVE_PTR)) {
		if (epi-&gt;next == EP_UNACTIVE_PTR) {
			epi-&gt;next = ep-&gt;ovflist;
			ep-&gt;ovflist = epi;
			if (epi-&gt;ws) {
				/*
				 * Activate ep-&gt;ws since epi-&gt;ws may get
				 * deactivated at any time.
				 */
				__pm_stay_awake(ep-&gt;ws);
			}
		}
		goto out_unlock;
	}

	/* If this file is already in the ready list we exit soon */
	if (!ep_is_linked(&amp;epi-&gt;rdllink)) {
		list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist);	// the core operation: this rdllist is exactly what ep_poll checks
		ep_pm_stay_awake_rcu(epi);
	}

	/*
	 * Wake up ( if active ) both the eventpoll wait list and the -&gt;poll()
	 * wait list.
	 */
	if (waitqueue_active(&amp;ep-&gt;wq))		// !list_empty(&amp;q-&gt;task_list);
		wake_up_locked(&amp;ep-&gt;wq);	// epoll's own wait queue: invokes the callback on a wait
						// entry; again only one exclusive waiter is woken
	if (waitqueue_active(&amp;ep-&gt;poll_wait))
		pwake++;

out_unlock:
	spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&amp;ep-&gt;poll_wait);

	return 1;
}
</code></pre>

<p>Note that this process involves two wait entries: one is the entry taken from the socket's wait queue when data arrives; the other is the entry taken from the epoll wait queue while the socket's wait entry callback runs.</p>
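<p>To make the two wait entries concrete, here is a toy userspace model of the chain (all names are illustrative, not the kernel's): "data arrival" fires the socket wait entry's callback, standing in for ep_poll_callback, which queues the item on a ready list and then fires the epoll wait entry's callback, standing in for default_wake_function.</p>

<pre><code class="language-c">// Toy model of the two-level wake-up chain; illustrative names only,
// not kernel code.
#include &lt;stdio.h&gt;

struct wait_entry {
	int (*func)(struct wait_entry *w);	// like wait_queue_t-&gt;func
	void *private;				// like q-&gt;private (task or epitem context)
};

static int ready_count;				// stands in for ep-&gt;rdllist

// Plays the role of default_wake_function: wake the sleeping process.
static int toy_wake_process(struct wait_entry *w)
{
	printf(&quot;wake %s (was blocked in epoll_wait)\n&quot;, (char *)w-&gt;private);
	return 1;
}

// Plays the role of ep_poll_callback: runs when the socket has data;
// queues the item, then wakes the epoll wait queue.
static int toy_ep_poll_callback(struct wait_entry *w)
{
	struct wait_entry *epoll_wq_entry = w-&gt;private;

	ready_count++;					// list_add_tail(..., &amp;ep-&gt;rdllist)
	return epoll_wq_entry-&gt;func(epoll_wq_entry);	// wake_up_locked(&amp;ep-&gt;wq)
}

int main(void)
{
	// wait entry #2: sits on epoll's wq, installed by ep_poll
	struct wait_entry epoll_entry = { toy_wake_process, &quot;task A&quot; };
	// wait entry #1: sits on the socket's wait queue, installed at insert time
	struct wait_entry sock_entry = { toy_ep_poll_callback, &amp;epoll_entry };

	sock_entry.func(&amp;sock_entry);	// &quot;packet arrives&quot;: sock_def_readable fires it
	printf(&quot;ready events: %d\n&quot;, ready_count);
	return 0;
}
</code></pre>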
<p>Continuing the analysis: default_wake_function wakes the blocked process (the one that went to sleep in epoll_wait), and it resumes from where it stopped, going on to call ep_send_events inside ep_poll to return the ready events to the user process:</p>

<pre><code class="language-c">// file: fs/eventpoll.c
static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents)
{
	struct ep_send_events_data esed;

	esed.maxevents = maxevents;
	esed.events = events;

	return ep_scan_ready_list(ep, ep_send_events_proc, &amp;esed, 0);
}

/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *                      the scan code, to call f_op-&gt;poll(). Also allows for
 *                      O(NumReady) performance.
 *
 * @ep: Pointer to the epoll private data structure.
 * @sproc: Pointer to the scan callback.
 * @priv: Private opaque data passed to the @sproc callback.
 * @depth: The current depth of recursive f_op-&gt;poll calls.
 *
 * Returns: The same integer error code returned by the @sproc callback.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv, int depth)
{
	int error, pwake = 0;
	unsigned long flags;
	struct epitem *epi, *nepi;
	LIST_HEAD(txlist);

	/*
	 * We need to lock this because we could be hit by
	 * eventpoll_release_file() and epoll_ctl().
	 */
	mutex_lock_nested(&amp;ep-&gt;mtx, depth);

	/*
	 * Steal the ready list, and re-init the original one to the
	 * empty list. Also, set ep-&gt;ovflist to NULL so that events
	 * happening while looping w/out locks, are not lost. We cannot
	 * have the poll callback to queue directly on ep-&gt;rdllist,
	 * because we want the &quot;sproc&quot; callback to be able to do it
	 * in a lockless way.
	 */
	spin_lock_irqsave(&amp;ep-&gt;lock, flags);
	list_splice_init(&amp;ep-&gt;rdllist, &amp;txlist);	// move the contents of rdllist onto txlist, leaving rdllist empty
	ep-&gt;ovflist = NULL;				// events that become ready from now on go onto ovflist
	spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

	/*
	 * Now call the callback function.
	 */
	error = (*sproc)(ep, &amp;txlist, priv);

	spin_lock_irqsave(&amp;ep-&gt;lock, flags);
	/*
	 * During the time we spent inside the &quot;sproc&quot; callback, some
	 * other events might have been queued by the poll callback.
	 * We re-insert them inside the main ready-list here.
	 */
	for (nepi = ep-&gt;ovflist; (epi = nepi) != NULL;
	     nepi = epi-&gt;next, epi-&gt;next = EP_UNACTIVE_PTR) {
		/*
		 * We need to check if the item is already in the list.
		 * During the &quot;sproc&quot; callback execution time, items are
		 * queued into -&gt;ovflist but the &quot;txlist&quot; might already
		 * contain them, and the list_splice() below takes care of them.
		 */
		if (!ep_is_linked(&amp;epi-&gt;rdllink)) {
			list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist);	// events raised while sproc ran are moved onto rdllist
			ep_pm_stay_awake(epi);
		}
	}
	/*
	 * We need to set back ep-&gt;ovflist to EP_UNACTIVE_PTR, so that after
	 * releasing the lock, events will be queued in the normal way inside
	 * ep-&gt;rdllist.
	 */
	ep-&gt;ovflist = EP_UNACTIVE_PTR;

	/*
	 * Quickly re-inject items left on &quot;txlist&quot;.
	 */
	list_splice(&amp;txlist, &amp;ep-&gt;rdllist);	// items not delivered (e.g. the maxevents limit was hit) go back onto rdllist
	__pm_relax(ep-&gt;ws);

	if (!list_empty(&amp;ep-&gt;rdllist)) {
		/*
		 * Wake up (if active) both the eventpoll wait list and
		 * the -&gt;poll() wait list (delayed after we release the lock).
		 */
		if (waitqueue_active(&amp;ep-&gt;wq))	// if events remain, try to wake another wait entry
			wake_up_locked(&amp;ep-&gt;wq);
		if (waitqueue_active(&amp;ep-&gt;poll_wait))
			pwake++;
	}
	spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

	mutex_unlock(&amp;ep-&gt;mtx);

	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&amp;ep-&gt;poll_wait);

	return error;
}
</code></pre>
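<p>The "steal the ready list, divert new arrivals" trick used above can be illustrated with a small standalone sketch (hypothetical names; a boolean flag stands in for the ep-&gt;ovflist != EP_UNACTIVE_PTR state):</p>

<pre><code class="language-c">// Standalone sketch of ep_scan_ready_list's two-phase handoff; while
// &quot;scanning&quot; is set, new arrivals are diverted to the overflow list and
// re-injected into the ready list afterwards. Illustrative names only.
#include &lt;stdio.h&gt;

#define CAP 16
static int rdllist[CAP], rdl_n;		// the ready list
static int ovflist[CAP], ovf_n;		// the overflow list
static int scanning;			// stands in for ep-&gt;ovflist != EP_UNACTIVE_PTR

// Plays the role of ep_poll_callback's queuing decision.
static void event_arrives(int fd)
{
	if (scanning)
		ovflist[ovf_n++] = fd;	// chained on ep-&gt;ovflist during a scan
	else
		rdllist[rdl_n++] = fd;	// normal path: straight onto ep-&gt;rdllist
}

static void scan_ready_list(void)
{
	int txlist[CAP], tx_n = rdl_n;

	for (int i = 0; i &lt; rdl_n; i++)	// list_splice_init(&amp;ep-&gt;rdllist, &amp;txlist)
		txlist[i] = rdllist[i];
	rdl_n = 0;
	scanning = 1;			// ep-&gt;ovflist = NULL

	for (int i = 0; i &lt; tx_n; i++) {
		printf(&quot;deliver fd %d\n&quot;, txlist[i]);	// (*sproc)(): copy to userspace, lock-free
		if (i == 0)
			event_arrives(99);	// an event arriving mid-scan is diverted
	}

	scanning = 0;			// ep-&gt;ovflist = EP_UNACTIVE_PTR
	for (int i = 0; i &lt; ovf_n; i++)	// re-inject overflow items into rdllist
		rdllist[rdl_n++] = ovflist[i];
	ovf_n = 0;
}

int main(void)
{
	event_arrives(3);
	event_arrives(4);
	scan_ready_list();
	printf(&quot;%d item(s) back on rdllist\n&quot;, rdl_n);
	return 0;
}
</code></pre>

<p>The per-item delivery callback passed in as sproc is ep_send_events_proc, which is also where level-triggered items get re-armed:</p>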
<pre><code class="language-c">static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
	struct ep_send_events_data *esed = priv;
	int eventcnt;
	unsigned int revents;
	struct epitem *epi;
	struct epoll_event __user *uevent;
	struct wakeup_source *ws;
	poll_table pt;

	init_poll_funcptr(&amp;pt, NULL);

	/*
	 * We can loop without lock because we are passed a task private list.
	 * Items cannot vanish during the loop because ep_scan_ready_list() is
	 * holding &quot;mtx&quot; during this call.
	 */
	for (eventcnt = 0, uevent = esed-&gt;events;
	     !list_empty(head) &amp;&amp; eventcnt &lt; esed-&gt;maxevents;) {	// eventcnt counts delivered events; esed carries the user buffer
		epi = list_first_entry(head, struct epitem, rdllink);

		/*
		 * Activate ep-&gt;ws before deactivating epi-&gt;ws to prevent
		 * triggering auto-suspend here (in case we reactive epi-&gt;ws
		 * below).
		 *
		 * This could be rearranged to delay the deactivation of epi-&gt;ws
		 * instead, but then epi-&gt;ws would temporarily be out of sync
		 * with ep_is_linked().
		 */
		ws = ep_wakeup_source(epi);
		if (ws) {
			if (ws-&gt;active)
				__pm_stay_awake(ep-&gt;ws);
			__pm_relax(ws);
		}

		list_del_init(&amp;epi-&gt;rdllink);	// unlink from the (stolen) ready list; may be re-added below

		revents = ep_item_poll(epi, &amp;pt);	// poll the fd behind this epi for its current ready events;
							// also called at insert time. For TCP this ends up in
							// net/ipv4/tcp.c:tcp_poll()

		/*
		 * If the event mask intersect the caller-requested one,
		 * deliver the event to userspace. Again, ep_scan_ready_list()
		 * is holding &quot;mtx&quot;, so no operations coming from userspace
		 * can change the item.
		 */
		if (revents) {	// only items with live events are handled here; an item with no
				// events is not re-added, so in LT mode this is the point where
				// it really leaves the ready list
			if (__put_user(revents, &amp;uevent-&gt;events) ||
			    __put_user(epi-&gt;event.data, &amp;uevent-&gt;data)) {
				list_add(&amp;epi-&gt;rdllink, head);	// copy to userspace failed: put the item back
				ep_pm_stay_awake(epi);
				return eventcnt ? eventcnt : -EFAULT;
			}
			eventcnt++;
			uevent++;
			if (epi-&gt;event.events &amp; EPOLLONESHOT)
				epi-&gt;event.events &amp;= EP_PRIVATE_BITS;
			else if (!(epi-&gt;event.events &amp; EPOLLET)) {	// LT (non-ET) mode: re-add to rdllist; an ET item is not re-added
				/*
				 * If this file has been added with Level
				 * Trigger mode, we need to insert back inside
				 * the ready list, so that the next call to
				 * epoll_wait() will check again the events
				 * availability. At this point, no one can insert
				 * into ep-&gt;rdllist besides us. The epoll_ctl()
				 * callers are locked out by
				 * ep_scan_ready_list() holding &quot;mtx&quot; and the
				 * poll callback will queue them in ep-&gt;ovflist.
				 */
				list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist);	// a consequence of re-adding: ep_scan_ready_list sees a
										// non-empty rdllist afterwards and wakes another wait entry,
										// which can produce a thundering-herd effect in LT mode
				ep_pm_stay_awake(epi);
			}
		}
	}

	return eventcnt;
}
</code></pre>
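<p>That final list_add_tail back onto rdllist for non-EPOLLET items is what produces level-triggered semantics: the next epoll_wait re-polls the fd and reports it again if data is still unread. A quick way to observe the difference from userspace (a sketch with hypothetical fds: epfd is assumed to be an epoll instance already containing conn_fd, a connected socket with unread data):</p>

<pre><code class="language-c">// Sketch: observing the LT vs ET behavior implied by ep_send_events_proc.
// Assumes epfd already contains conn_fd, a connected socket with unread data.
#include &lt;stdio.h&gt;
#include &lt;sys/epoll.h&gt;

void compare_triggers(int epfd, int conn_fd, int use_et)
{
	struct epoll_event ev = {
		.events = EPOLLIN | (use_et ? EPOLLET : 0),
		.data.fd = conn_fd,
	};
	epoll_ctl(epfd, EPOLL_CTL_MOD, conn_fd, &amp;ev);

	struct epoll_event out;
	// LT: while the data stays unread, the epitem is re-added to rdllist
	// after every epoll_wait, so both calls report readiness.
	// ET: the epitem is not re-added, so with no new data the second call
	// just times out and returns 0.
	int n1 = epoll_wait(epfd, &amp;out, 1, 100);
	int n2 = epoll_wait(epfd, &amp;out, 1, 100);
	printf(&quot;%s: first wait=%d, second wait=%d\n&quot;,
	       use_et ? &quot;ET&quot; : &quot;LT&quot;, n1, n2);
}
</code></pre>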
