epoll_wait: Waiting for Received Data
<h2>Overview</h2>
<p>This section traces the kernel side of epoll_wait(2): the syscall checks the eventpoll ready list and, if nothing is ready, puts the calling process to sleep on the eventpoll wait queue. When a packet later arrives, the softirq path invokes <code>ep_poll_callback</code>, which moves the ready epitem onto the ready list and wakes the sleeper, so that <code>ep_send_events</code> can copy the ready events back to user space.</p>
<h2>Analysis</h2>
<pre><code class="language-c">// file: fs/eventpoll.c
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents &lt;= 0 || maxevents &gt; EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;
/* Get the &quot;struct file *&quot; for the eventpoll file */
f = fdget(epfd);
if (!f.file)
return -EBADF;
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
error = -EINVAL;
if (!is_file_epoll(f.file))
goto error_fput;
/*
* At this point it is safe to assume that the &quot;private_data&quot; contains
* our own data structure.
*/
ep = f.file-&gt;private_data; // file 上的私有数据
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout); // 继续
error_fput:
fdput(f);
return error;
}
/**
 * ep_poll - Retrieves ready events, and delivers them to the caller supplied
 *           event buffer.
 *
 * @ep: Pointer to the eventpoll context.
 * @events: Pointer to the userspace buffer where the ready events should be
 *          stored.
 * @maxevents: Size (in terms of number of events) of the caller event buffer.
 * @timeout: Maximum timeout for the ready events fetch operation, in
 *           milliseconds. If the @timeout is zero, the function will not block,
 *           while if the @timeout is less than zero, the function will block
 *           until at least one event has been retrieved (or an error
 *           occurred).
 *
 * Returns: Returns the number of ready events which have been fetched, or an
 *          error code, in case of error.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;
    unsigned long flags;
    long slack = 0;
    wait_queue_t wait;
    ktime_t expires, *to = NULL;

    if (timeout &gt; 0) {
        struct timespec end_time = ep_set_mstimeout(timeout);

        slack = select_estimate_accuracy(&amp;end_time);
        to = &amp;expires;
        *to = timespec_to_ktime(end_time);
    } else if (timeout == 0) {
        /*
         * Avoid the unnecessary trip to the wait queue loop, if the
         * caller specified a non blocking operation.
         */
        timed_out = 1;
        spin_lock_irqsave(&amp;ep-&gt;lock, flags);
        goto check_events;
    }

fetch_events:
    spin_lock_irqsave(&amp;ep-&gt;lock, flags);

    if (!ep_events_available(ep)) { // defined as: !list_empty(&amp;ep-&gt;rdllist) || ep-&gt;ovflist != EP_UNACTIVE_PTR
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&amp;wait, current); // sets wait.func = default_wake_function
        __add_wait_queue_exclusive(&amp;ep-&gt;wq, &amp;wait); // add the current process to the ep-&gt;wq wait queue

        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            set_current_state(TASK_INTERRUPTIBLE);
            if (ep_events_available(ep) || timed_out)
                break;
            if (signal_pending(current)) { // does the process have a pending, unblocked signal?
                res = -EINTR;
                break;
            }

            spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;

            spin_lock_irqsave(&amp;ep-&gt;lock, flags); // execution continues here after wakeup
        }
        __remove_wait_queue(&amp;ep-&gt;wq, &amp;wait); // remove the wait entry

        set_current_state(TASK_RUNNING);
    }
check_events:
    /* Is it worth to try to dig for events ? */
    eavail = ep_events_available(ep);

    spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res &amp;&amp; eavail &amp;&amp;
        !(res = ep_send_events(ep, events, maxevents)) &amp;&amp; !timed_out) // deliver ready events to the user process
        goto fetch_events;

    return res;
}
/**
 * ep_events_available - Checks if ready events might be available.
 *
 * @ep: Pointer to the eventpoll context.
 *
 * Returns: Returns a value different than zero if ready events are available,
 *          or zero otherwise.
 */
static inline int ep_events_available(struct eventpoll *ep)
{
    return !list_empty(&amp;ep-&gt;rdllist) || ep-&gt;ovflist != EP_UNACTIVE_PTR;
}

// file: include/linux/wait.h
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
    q-&gt;flags = 0;
    q-&gt;private = p; // associate the process with the wait entry
    q-&gt;func = default_wake_function; // callback invoked when this wait entry is woken
}
</code></pre>
<p>After checking ep->rdllist and ep->ovflist, if no events are available the process gives up the CPU and sleeps until an incoming packet triggers an event.</p>
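<p>From user space, these are exactly the timeout modes a typical event loop relies on. The following minimal sketch (a hypothetical <code>sockfd</code> obtained elsewhere, error handling trimmed) shows the behaviors that ep_poll implements: 0 returns immediately, a positive value sleeps for at most that many milliseconds, and -1 would sleep until ep_poll_callback delivers a wakeup:</p>
<pre><code class="language-c">// Minimal user-space sketch of the epoll_wait timeout modes analyzed above.
// &quot;sockfd&quot; is assumed to be a connected socket created elsewhere.
#include &lt;sys/epoll.h&gt;
#include &lt;stdio.h&gt;
#include &lt;unistd.h&gt;

int wait_for_readable(int sockfd)
{
    struct epoll_event ev, ready[16];
    int epfd = epoll_create1(0);
    if (epfd &lt; 0)
        return -1;

    ev.events = EPOLLIN;   // level-triggered by default
    ev.data.fd = sockfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &amp;ev) &lt; 0) {
        close(epfd);
        return -1;
    }

    // timeout = 0: ep_poll sets timed_out = 1 and never sleeps.
    int n = epoll_wait(epfd, ready, 16, 0);
    printf(&quot;non-blocking poll: %d event(s)\n&quot;, n);

    // timeout = 1000: sleeps via schedule_hrtimeout_range() for up to 1s.
    // timeout = -1:   would sleep until ep_poll_callback() wakes us.
    n = epoll_wait(epfd, ready, 16, 1000);
    printf(&quot;1s poll: %d event(s)\n&quot;, n);

    close(epfd);
    return n;
}
</code></pre>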
<p>On the other side, when the softirq receives a packet the flow is much like the synchronous-receive case: the packet is placed on sk->sk_receive_queue and <code>sock_def_readable</code> is called, whose main job is to invoke the callback stored on each wait entry. For a socket that has been added to an epoll instance, as analyzed earlier, that callback is <code>ep_poll_callback</code> (installed in epoll_ctl(ADD) -> ep_insert -> ep_ptable_queue_proc):</p>
<pre><code class="language-c">// file: fs/eventpoll.c
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait); // container_of(p, struct eppoll_entry, wait)-&gt;base; // 即首先获取 struct eppoll_entry 对象,再获取其 base 字段
struct eventpoll *ep = epi-&gt;ep;
if ((unsigned long)key &amp; POLLFREE) {
ep_pwq_from_wait(wait)-&gt;whead = NULL;
/*
* whead = NULL above can race with ep_remove_wait_queue()
* which can do another remove_wait_queue() after us, so we
* can't use __remove_wait_queue(). whead-&gt;lock is held by
* the caller.
*/
list_del_init(&amp;wait-&gt;task_list); // 如果是 POLLFREE 才从等待队列中删除,否则保留?也就是说:这个 wait 项会一直放在 socket 的等待队列上?
}
spin_lock_irqsave(&amp;ep-&gt;lock, flags);
/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi-&gt;event.events &amp; ~EP_PRIVATE_BITS))
goto out_unlock;
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the &quot;key&quot; parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for &quot;key&quot; != NULL before the event match test.
*/
if (key &amp;&amp; !((unsigned long) key &amp; epi-&gt;event.events))
goto out_unlock;
/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op-&gt;poll()
* semantics). All the events that happen during that period of time are
* chained in ep-&gt;ovflist and requeued later on.
*/
if (unlikely(ep-&gt;ovflist != EP_UNACTIVE_PTR)) {
if (epi-&gt;next == EP_UNACTIVE_PTR) {
epi-&gt;next = ep-&gt;ovflist;
ep-&gt;ovflist = epi;
if (epi-&gt;ws) {
/*
* Activate ep-&gt;ws since epi-&gt;ws may get
* deactivated at any time.
*/
__pm_stay_awake(ep-&gt;ws);
}
}
goto out_unlock;
}
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&amp;epi-&gt;rdllink)) {
list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist); // 核心操作:上文判断的就是这个 rdllist
ep_pm_stay_awake_rcu(epi);
}
/*
* Wake up ( if active ) both the eventpoll wait list and the -&gt;poll()
* wait list.
*/
if (waitqueue_active(&amp;ep-&gt;wq)) // !list_empty(&amp;q-&gt;task_list);
wake_up_locked(&amp;ep-&gt;wq); // epoll 上的等待队列,调用 wait 项上的回调函数,也是只唤醒 1 个等待项
if (waitqueue_active(&amp;ep-&gt;poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&amp;ep-&gt;poll_wait);
return 1;
}
</code></pre>
<p>Note that this process involves two wait entries: one is taken from the socket's wait queue when data arrives; the other is taken from the epoll wait queue (while the socket's wait entry callback is running).</p>
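<p>Putting the two hops together, the wakeup chain looks roughly like this (a sketch assembled from the functions analyzed above; the tcp_v4_rcv entry point stands in for the usual TCP receive path, which this section does not trace in detail):</p>
<pre><code class="language-c">/*
 * Two-hop wakeup chain (sketch):
 *
 * softirq: tcp_v4_rcv() puts the packet on sk-&gt;sk_receive_queue, then
 *   -&gt; sock_def_readable(sk)
 *        -&gt; walks the socket's wait queue and calls each entry's func:
 *             ep_poll_callback()                 // wait entry #1, installed by
 *                                                // ep_insert/ep_ptable_queue_proc
 *               -&gt; list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist)
 *               -&gt; wake_up_locked(&amp;ep-&gt;wq)
 *                    -&gt; default_wake_function()  // wait entry #2, installed by
 *                                                // ep_poll/init_waitqueue_entry
 *                         -&gt; wakes the process sleeping in epoll_wait()
 */
</code></pre>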
<p>Continuing the analysis:
<code>default_wake_function</code> wakes the blocked process (the one that went to sleep in epoll_wait), which resumes from where it stopped. Back in <code>ep_poll</code> it proceeds to call <code>ep_send_events</code>, which returns the ready events to the user process:</p>
<pre><code class="language-c">// file: fs/eventpoll.c
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &amp;esed, 0);
}
/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *                      the scan code, to call f_op-&gt;poll(). Also allows for
 *                      O(NumReady) performance.
 *
 * @ep: Pointer to the epoll private data structure.
 * @sproc: Pointer to the scan callback.
 * @priv: Private opaque data passed to the @sproc callback.
 * @depth: The current depth of recursive f_op-&gt;poll calls.
 *
 * Returns: The same integer error code returned by the @sproc callback.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
                  int (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
                  void *priv,
                  int depth)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);

    /*
     * We need to lock this because we could be hit by
     * eventpoll_release_file() and epoll_ctl().
     */
    mutex_lock_nested(&amp;ep-&gt;mtx, depth);

    /*
     * Steal the ready list, and re-init the original one to the
     * empty list. Also, set ep-&gt;ovflist to NULL so that events
     * happening while looping w/out locks, are not lost. We cannot
     * have the poll callback to queue directly on ep-&gt;rdllist,
     * because we want the &quot;sproc&quot; callback to be able to do it
     * in a lockless way.
     */
    spin_lock_irqsave(&amp;ep-&gt;lock, flags);
    list_splice_init(&amp;ep-&gt;rdllist, &amp;txlist); // move the contents of rdllist onto txlist and empty rdllist
    ep-&gt;ovflist = NULL; // events that become ready from now on go onto ovflist
    spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

    /*
     * Now call the callback function.
     */
    error = (*sproc)(ep, &amp;txlist, priv);

    spin_lock_irqsave(&amp;ep-&gt;lock, flags);
    /*
     * During the time we spent inside the &quot;sproc&quot; callback, some
     * other events might have been queued by the poll callback.
     * We re-insert them inside the main ready-list here.
     */
    for (nepi = ep-&gt;ovflist; (epi = nepi) != NULL;
         nepi = epi-&gt;next, epi-&gt;next = EP_UNACTIVE_PTR) {
        /*
         * We need to check if the item is already in the list.
         * During the &quot;sproc&quot; callback execution time, items are
         * queued into -&gt;ovflist but the &quot;txlist&quot; might already
         * contain them, and the list_splice() below takes care of them.
         */
        if (!ep_is_linked(&amp;epi-&gt;rdllink)) {
            list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist); // events produced while sproc ran are put back on rdllist
            ep_pm_stay_awake(epi);
        }
    }
    /*
     * We need to set back ep-&gt;ovflist to EP_UNACTIVE_PTR, so that after
     * releasing the lock, events will be queued in the normal way inside
     * ep-&gt;rdllist.
     */
    ep-&gt;ovflist = EP_UNACTIVE_PTR;

    /*
     * Quickly re-inject items left on &quot;txlist&quot;.
     */
    list_splice(&amp;txlist, &amp;ep-&gt;rdllist); // items that were not fully delivered (e.g. the maxevents cap was hit) go back onto rdllist
    __pm_relax(ep-&gt;ws);

    if (!list_empty(&amp;ep-&gt;rdllist)) {
        /*
         * Wake up (if active) both the eventpoll wait list and
         * the -&gt;poll() wait list (delayed after we release the lock).
         */
        if (waitqueue_active(&amp;ep-&gt;wq)) // if events remain, try to wake another wait entry
            wake_up_locked(&amp;ep-&gt;wq);
        if (waitqueue_active(&amp;ep-&gt;poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

    mutex_unlock(&amp;ep-&gt;mtx);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&amp;ep-&gt;poll_wait);

    return error;
}
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                   void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;
    struct wakeup_source *ws;
    poll_table pt;

    init_poll_funcptr(&amp;pt, NULL);

    /*
     * We can loop without lock because we are passed a task private list.
     * Items cannot vanish during the loop because ep_scan_ready_list() is
     * holding &quot;mtx&quot; during this call.
     */
    for (eventcnt = 0, uevent = esed-&gt;events;
         !list_empty(head) &amp;&amp; eventcnt &lt; esed-&gt;maxevents;) { // eventcnt counts delivered events; esed carries the userspace buffer
        epi = list_first_entry(head, struct epitem, rdllink);

        /*
         * Activate ep-&gt;ws before deactivating epi-&gt;ws to prevent
         * triggering auto-suspend here (in case we reactive epi-&gt;ws
         * below).
         *
         * This could be rearranged to delay the deactivation of epi-&gt;ws
         * instead, but then epi-&gt;ws would temporarily be out of sync
         * with ep_is_linked().
         */
        ws = ep_wakeup_source(epi);
        if (ws) {
            if (ws-&gt;active)
                __pm_stay_awake(ep-&gt;ws);
            __pm_relax(ws);
        }

        list_del_init(&amp;epi-&gt;rdllink); // unlink the item from the (stolen) ready list

        revents = ep_item_poll(epi, &amp;pt); // query the ready events on epi's fd; also called at insert time; for TCP this ends up in net/ipv4/tcp.c:tcp_poll()

        /*
         * If the event mask intersect the caller-requested one,
         * deliver the event to userspace. Again, ep_scan_ready_list()
         * is holding &quot;mtx&quot;, so no operations coming from userspace
         * can change the item.
         */
        if (revents) { // handled only when events exist; with no events the item is not re-added, so in LT mode this is when it actually leaves the ready list
            if (__put_user(revents, &amp;uevent-&gt;events) ||
                __put_user(epi-&gt;event.data, &amp;uevent-&gt;data)) {
                list_add(&amp;epi-&gt;rdllink, head); // copy to userspace failed: put the item back on the ready list
                ep_pm_stay_awake(epi);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi-&gt;event.events &amp; EPOLLONESHOT)
                epi-&gt;event.events &amp;= EP_PRIVATE_BITS;
            else if (!(epi-&gt;event.events &amp; EPOLLET)) { // LT (non-ET) mode re-adds below; in ET mode the item is NOT put back on rdllist
                /*
                 * If this file has been added with Level
                 * Trigger mode, we need to insert back inside
                 * the ready list, so that the next call to
                 * epoll_wait() will check again the events
                 * availability. At this point, no one can insert
                 * into ep-&gt;rdllist besides us. The epoll_ctl()
                 * callers are locked out by
                 * ep_scan_ready_list() holding &quot;mtx&quot; and the
                 * poll callback will queue them in ep-&gt;ovflist.
                 */
                list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist); // side effect of re-adding: the ready list is checked again afterwards and another wait entry woken, which can cause a thundering herd in LT mode (see the LT/ET example after this listing)
                ep_pm_stay_awake(epi);
            }
        }
    }

    return eventcnt;
}
</code></pre>
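<p>The LT re-add is what produces the observable difference between level-triggered and edge-triggered mode in user space. Below is a minimal sketch (hypothetical fd names, error handling omitted) of how the two registrations differ: with EPOLLIN alone, an fd whose data has not been fully read reappears in every epoll_wait call, while with EPOLLIN | EPOLLET it is reported only once per new arrival, so the reader must drain the socket until read() returns EAGAIN:</p>
<pre><code class="language-c">#include &lt;sys/epoll.h&gt;

// Register &quot;sockfd&quot; on &quot;epfd&quot; in either level- or edge-triggered mode.
// In LT mode, ep_send_events_proc re-adds the epitem to rdllist, so an
// unread socket is reported again on the next epoll_wait(). In ET mode
// it is not re-added; only a new ep_poll_callback() makes it ready again.
static int register_fd(int epfd, int sockfd, int edge_triggered)
{
    struct epoll_event ev;

    ev.events = EPOLLIN | (edge_triggered ? EPOLLET : 0);
    ev.data.fd = sockfd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &amp;ev);
}
</code></pre>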