epoll: adding a socket
<h2>Overview</h2>
<p>1. From the fd being added, look up the corresponding socket object and initialize an epitem.
2. Register <code>ep_poll_callback</code> as the wait function on the socket; this poll step may also pick up events that are already pending.
3. Insert the epi into the red-black tree of the <code>struct eventpoll</code> object (the epoll file's private data).
4. If step 2 picked up ready events, link the epi onto the eventpoll's ready list and wake up the waiting process. A minimal user-space sketch of driving this path follows below.</p>
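<p>To make these steps concrete, here is a minimal user-space sketch (the fd and event mask are illustrative, error handling trimmed) of the call that drives the kernel path analyzed below:</p>
<pre><code class="language-c">#include &lt;stdio.h&gt;
#include &lt;stdlib.h&gt;
#include &lt;sys/epoll.h&gt;
#include &lt;sys/socket.h&gt;

int main(void)
{
    /* An illustrative socket fd; in practice it usually comes from accept(). */
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    int epfd = epoll_create1(0);
    if (sockfd &lt; 0 || epfd &lt; 0) {
        perror(&quot;setup&quot;);
        return EXIT_FAILURE;
    }

    struct epoll_event ev = {
        .events = EPOLLIN,          /* interested in readability */
        .data   = { .fd = sockfd }, /* echoed back by epoll_wait() */
    };

    /* This call enters SYSCALL_DEFINE4(epoll_ctl, ...) below and takes the
     * EPOLL_CTL_ADD branch into ep_insert(). */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &amp;ev) &lt; 0) {
        perror(&quot;epoll_ctl&quot;);
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
</code></pre>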
<h2>Notes</h2>
<ul>
<li>epoll_ctl calls on the same epoll instance cannot run in parallel: each one takes the <code>ep-&gt;mtx</code> mutex, so concurrent callers are serialized rather than unsafe (see the sketch below).</li>
</ul>
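<p>To illustrate the note: the calls in the sketch below are safe to issue from several threads at once, but inside the kernel each one takes <code>ep-&gt;mtx</code>, so their bodies never run concurrently (the thread count and the use of eventfd are made up for the example):</p>
<pre><code class="language-c">#include &lt;pthread.h&gt;
#include &lt;stdio.h&gt;
#include &lt;sys/epoll.h&gt;
#include &lt;sys/eventfd.h&gt;

static int epfd;

/* Each thread adds its own fd. The calls are thread-safe, but the kernel
 * serializes them on ep-&gt;mtx, so they execute one at a time. */
static void *add_one(void *arg)
{
    (void)arg;
    int fd = eventfd(0, 0); /* any pollable fd works for the demo */
    struct epoll_event ev = { .events = EPOLLIN, .data = { .fd = fd } };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &amp;ev) &lt; 0)
        perror(&quot;epoll_ctl&quot;);
    return NULL;
}

int main(void)
{
    pthread_t t[4];
    epfd = epoll_create1(0);
    for (int i = 0; i &lt; 4; i++)
        pthread_create(&amp;t[i], NULL, add_one, NULL);
    for (int i = 0; i &lt; 4; i++)
        pthread_join(t[i], NULL);
    return 0;
}
</code></pre>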
<h2>Analysis</h2>
<p>The system call:</p>
<pre><code class="language-c">// file: fs/eventpoll.c
/*
 * The following function implements the controller interface for
 * the eventpoll file that enables the insertion/removal/change of
 * file descriptors inside the interest set.
 */
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    int error;
    int did_lock_epmutex = 0;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;

    error = -EFAULT;
    if (ep_op_has_event(op) &amp;&amp;
        copy_from_user(&amp;epds, event, sizeof(struct epoll_event)))
        goto error_return;

    /* Get the &quot;struct file *&quot; for the eventpoll file */
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto error_return;

    /* Get the &quot;struct file *&quot; for the target file */
    tfile = fget(fd);
    if (!tfile)
        goto error_fput;

    /* The target file descriptor must support poll */
    error = -EPERM;
    if (!tfile-&gt;f_op || !tfile-&gt;f_op-&gt;poll)
        goto error_tgt_fput;

    /* Check if EPOLLWAKEUP is allowed */
    if ((epds.events &amp; EPOLLWAKEUP) &amp;&amp; !capable(CAP_BLOCK_SUSPEND))
        epds.events &amp;= ~EPOLLWAKEUP;

    /*
     * We have to check that the file structure underneath the file descriptor
     * the user passed to us _is_ an eventpoll file. And also we do not permit
     * adding an epoll file descriptor inside itself.
     */
    error = -EINVAL;
    if (file == tfile || !is_file_epoll(file))
        goto error_tgt_fput;

    /*
     * At this point it is safe to assume that the &quot;private_data&quot; contains
     * our own data structure.
     */
    ep = file-&gt;private_data; // the core management structure, struct eventpoll {}

    /*
     * When we insert an epoll file descriptor, inside another epoll file
     * descriptor, there is the change of creating closed loops, which are
     * better be handled here, than in more critical paths. While we are
     * checking for loops we also determine the list of files reachable
     * and hang them on the tfile_check_list, so we can check that we
     * haven't created too many possible wakeup paths.
     *
     * We need to hold the epmutex across both ep_insert and ep_remove
     * b/c we want to make sure we are looking at a coherent view of
     * epoll network.
     */
    if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
        mutex_lock(&amp;epmutex);
        did_lock_epmutex = 1;
    }
    if (op == EPOLL_CTL_ADD) {
        if (is_file_epoll(tfile)) {
            error = -ELOOP;
            if (ep_loop_check(ep, tfile) != 0) {
                clear_tfile_check_list();
                goto error_tgt_fput;
            }
        } else
            list_add(&amp;tfile-&gt;f_tfile_llink, &amp;tfile_check_list);
    }

    // This mutex guards against concurrent epoll_ctl calls, i.e. it protects
    // the internal data structures from being corrupted by concurrent
    // add/modify/delete operations
    mutex_lock_nested(&amp;ep-&gt;mtx, 0);

    /*
     * Try to lookup the file inside our RB tree, Since we grabbed &quot;mtx&quot;
     * above, we can be sure to be able to use the item looked up by
     * ep_find() till we release the mutex.
     */
    epi = ep_find(ep, tfile, fd);

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) { // not registered before
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &amp;epds, tfile, fd); // analyzed below
        } else
            error = -EEXIST;
        clear_tfile_check_list();
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &amp;epds);
        } else
            error = -ENOENT;
        break;
    }
    mutex_unlock(&amp;ep-&gt;mtx);

error_tgt_fput:
    if (did_lock_epmutex)
        mutex_unlock(&amp;epmutex);

    fput(tfile);
error_fput:
    fput(file);
error_return:
    return error;
}
/*
 * Must be called with &quot;mtx&quot; held.
 */
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
             struct file *tfile, int fd)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    long user_watches;
    struct epitem *epi;
    struct ep_pqueue epq;

    user_watches = atomic_long_read(&amp;ep-&gt;user-&gt;epoll_watches);
    if (unlikely(user_watches &gt;= max_user_watches))
        return -ENOSPC;
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) // allocate the epi
        return -ENOMEM;

    /* Item initialization follow here ... */
    INIT_LIST_HEAD(&amp;epi-&gt;rdllink);
    INIT_LIST_HEAD(&amp;epi-&gt;fllink);
    INIT_LIST_HEAD(&amp;epi-&gt;pwqlist); // list of wait queues this item hooks into
    epi-&gt;ep = ep;
    ep_set_ffd(&amp;epi-&gt;ffd, tfile, fd); // i.e. epi-&gt;ffd.file = tfile; epi-&gt;ffd.fd = fd;
    epi-&gt;event = *event;
    epi-&gt;nwait = 0;
    epi-&gt;next = EP_UNACTIVE_PTR;
    if (epi-&gt;event.events &amp; EPOLLWAKEUP) {
        error = ep_create_wakeup_source(epi);
        if (error)
            goto error_create_wakeup_source;
    } else {
        RCU_INIT_POINTER(epi-&gt;ws, NULL);
    }

    /* Initialize the poll table using the queue callback */
    epq.epi = epi; // initialize the ep_pqueue entry; it has just two fields, epi and pt
    init_poll_funcptr(&amp;epq.pt, ep_ptable_queue_proc); // sets the function pointer: pt-&gt;_qproc = ep_ptable_queue_proc

    /*
     * Attach the item to the poll hooks and get current event bits.
     * We can safely use the file* here because its usage count has
     * been increased by the caller of this function. Note that after
     * this operation completes, the poll callback can start hitting
     * the new item.
     */
    revents = ep_item_poll(epi, &amp;epq.pt); // ends up in ep_ptable_queue_proc, which registers ep_poll_callback as the wait callback; analyzed below

    /*
     * We have to check if something went wrong during the poll wait queue
     * install process. Namely an allocation for a wait queue failed due
     * high memory pressure.
     */
    error = -ENOMEM;
    if (epi-&gt;nwait &lt; 0)
        goto error_unregister;

    /* Add the current item to the list of active epoll hook for this file */
    spin_lock(&amp;tfile-&gt;f_lock);
    list_add_tail(&amp;epi-&gt;fllink, &amp;tfile-&gt;f_ep_links);
    spin_unlock(&amp;tfile-&gt;f_lock);

    /*
     * Add the current item to the RB tree. All RB tree operations are
     * protected by &quot;mtx&quot;, and ep_insert() is called with &quot;mtx&quot; held.
     */
    ep_rbtree_insert(ep, epi); // insert the epi into the red-black tree of the struct eventpoll object (the epoll file's private data)

    /* now check if we've created too many backpaths */
    error = -EINVAL;
    if (reverse_path_check())
        goto error_remove_epi;

    /* We have to drop the new item inside our item list to keep track of it */
    spin_lock_irqsave(&amp;ep-&gt;lock, flags);

    /* If the file is already &quot;ready&quot; we drop it inside the ready list */
    if ((revents &amp; event-&gt;events) &amp;&amp; !ep_is_linked(&amp;epi-&gt;rdllink)) {
        list_add_tail(&amp;epi-&gt;rdllink, &amp;ep-&gt;rdllist);
        ep_pm_stay_awake(epi);

        /* Notify waiting tasks that events are available */
        if (waitqueue_active(&amp;ep-&gt;wq))
            wake_up_locked(&amp;ep-&gt;wq);
        if (waitqueue_active(&amp;ep-&gt;poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

    atomic_long_inc(&amp;ep-&gt;user-&gt;epoll_watches);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&amp;ep-&gt;poll_wait);

    return 0;

error_remove_epi:
    spin_lock(&amp;tfile-&gt;f_lock);
    if (ep_is_linked(&amp;epi-&gt;fllink))
        list_del_init(&amp;epi-&gt;fllink);
    spin_unlock(&amp;tfile-&gt;f_lock);

    rb_erase(&amp;epi-&gt;rbn, &amp;ep-&gt;rbr);

error_unregister:
    ep_unregister_pollwait(ep, epi);

    /*
     * We need to do this because an event could have been arrived on some
     * allocated wait queue. Note that we don't care about the ep-&gt;ovflist
     * list, since that is used/cleaned only inside a section bound by &quot;mtx&quot;.
     * And ep_insert() is called with &quot;mtx&quot; held.
     */
    spin_lock_irqsave(&amp;ep-&gt;lock, flags);
    if (ep_is_linked(&amp;epi-&gt;rdllink))
        list_del_init(&amp;epi-&gt;rdllink);
    spin_unlock_irqrestore(&amp;ep-&gt;lock, flags);

    wakeup_source_unregister(ep_wakeup_source(epi));

error_create_wakeup_source:
    kmem_cache_free(epi_cache, epi);

    return error;
}
/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the &quot;rbr&quot; RB tree.
 * Avoid increasing the size of this struct, there can be many thousands
 * of these on a server and we do not want this to take another cache line.
 */
struct epitem {
    /* RB tree node used to link this structure to the eventpoll RB tree */
    struct rb_node rbn;

    /* List header used to link this structure to the eventpoll ready list */
    struct list_head rdllink;

    /*
     * Works together &quot;struct eventpoll&quot;-&gt;ovflist in keeping the
     * single linked chain of items.
     */
    struct epitem *next;

    /* The file descriptor information this item refers to */
    struct epoll_filefd ffd; // the socket's file and descriptor

    /* Number of active wait queue attached to poll operations */
    int nwait;

    /* List containing poll wait queues */
    struct list_head pwqlist; // wait queues

    /* The &quot;container&quot; of this item */
    struct eventpoll *ep; // the owning eventpoll object

    /* List header used to link this item to the &quot;struct file&quot; items list */
    struct list_head fllink;

    /* wakeup_source used when EPOLLWAKEUP is set */
    struct wakeup_source __rcu *ws;

    /* The structure that describe the interested events and the source fd */
    struct epoll_event event;
};

/* Wrapper struct used by poll queueing */
struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};

/* Setup the structure that is used as key for the RB tree */
static inline void ep_set_ffd(struct epoll_filefd *ffd,
                  struct file *file, int fd)
{
    ffd-&gt;file = file;
    ffd-&gt;fd = fd;
}
</code></pre>
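<p>The switch in epoll_ctl above maps directly to errno values an application can observe. A small hedged sketch (an eventfd is used only as a convenient pollable fd) to confirm the EEXIST and ENOENT branches:</p>
<pre><code class="language-c">#include &lt;errno.h&gt;
#include &lt;stdio.h&gt;
#include &lt;sys/epoll.h&gt;
#include &lt;sys/eventfd.h&gt;

int main(void)
{
    int epfd = epoll_create1(0);
    int fd = eventfd(0, 0);
    struct epoll_event ev = { .events = EPOLLIN, .data = { .fd = fd } };

    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &amp;ev);  /* epi == NULL -&gt; ep_insert() */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &amp;ev) &lt; 0)
        printf(&quot;second ADD: errno=%d (EEXIST=%d)\n&quot;, errno, EEXIST);   /* epi found -&gt; -EEXIST */

    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); /* epi found -&gt; ep_remove() */
    if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &amp;ev) &lt; 0)
        printf(&quot;MOD after DEL: errno=%d (ENOENT=%d)\n&quot;, errno, ENOENT); /* epi == NULL -&gt; -ENOENT */
    return 0;
}
</code></pre>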
<p>The code for <code>ep_item_poll</code> is as follows:</p>
<pre><code class="language-c">// file: fs/eventpoll.c
static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
    pt-&gt;_key = epi-&gt;event.events;
    return epi-&gt;ffd.file-&gt;f_op-&gt;poll(epi-&gt;ffd.file, pt) &amp; epi-&gt;event.events; // this calls f_op-&gt;poll on the socket's struct file
}
</code></pre>
<p>So what is this <code>poll</code> function? It is <code>sock_poll</code>. The analysis at the link below traces an fd created by <code>socket()</code>; the fd here is typically one returned by <code>accept()</code>, but both are backed by the same <code>socket_file_ops</code>, so <code>f_op-&gt;poll</code> resolves to <code>sock_poll</code> either way: <a href="https://www.showdoc.com.cn/1832930169049935/10770316273234896">https://www.showdoc.com.cn/1832930169049935/10770316273234896</a></p>
<p>Continuing with <code>sock_poll</code>:</p>
<pre><code class="language-c">// file: net/socket.c
/* No kernel lock held - perfect */
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
    struct socket *sock;

    /*
     * We can't return errors to poll, so it's either yes or no.
     */
    sock = file-&gt;private_data;
    return sock-&gt;ops-&gt;poll(file, sock, wait); // set when the socket was created (see the socket() syscall code); for TCP this is inet_stream_ops, whose .poll = tcp_poll
}

// file: net/ipv4/tcp.c
/*
 * Wait for a TCP event.
 *
 * Note that we don't need to lock the socket, as the upper poll layers
 * take care of normal races (between the test and the event) and we don't
 * go look at any of the socket buffers directly.
 */
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    unsigned int mask;
    struct sock *sk = sock-&gt;sk;
    const struct tcp_sock *tp = tcp_sk(sk);

    sock_poll_wait(file, sk_sleep(sk), wait); // checks whether a wait entry still needs hooking up; if so, calls wait-&gt;_qproc, i.e. ep_ptable_queue_proc above. This 'wait' is the pt from above; note it is not the wait entry used for blocking reads, and comparing the two is instructive
    if (sk-&gt;sk_state == TCP_LISTEN)
        return inet_csk_listen_poll(sk);

    /* Socket is not locked. We are protected from async events
     * by poll logic and correct handling of state changes
     * made by other threads is impossible in any case.
     */
    mask = 0;

    /*
     * POLLHUP is certainly not done right. But poll() doesn't
     * have a notion of HUP in just one direction, and for a
     * socket the read side is more interesting.
     *
     * Some poll() documentation says that POLLHUP is incompatible
     * with the POLLOUT/POLLWR flags, so somebody should check this
     * all. But careful, it tends to be safer to return too many
     * bits than too few, and you can easily break real applications
     * if you don't tell them that something has hung up!
     *
     * Check-me.
     *
     * Check number 1. POLLHUP is _UNMASKABLE_ event (see UNIX98 and
     * our fs/select.c). It means that after we received EOF,
     * poll always returns immediately, making impossible poll() on write()
     * in state CLOSE_WAIT. One solution is evident --- to set POLLHUP
     * if and only if shutdown has been made in both directions.
     * Actually, it is interesting to look how Solaris and DUX
     * solve this dilemma. I would prefer, if POLLHUP were maskable,
     * then we could set it on SND_SHUTDOWN. BTW examples given
     * in Stevens' books assume exactly this behaviour, it explains
     * why POLLHUP is incompatible with POLLOUT. --ANK
     *
     * NOTE. Check for TCP_CLOSE is added. The goal is to prevent
     * blocking on fresh not-connected or disconnected socket. --ANK
     */
    if (sk-&gt;sk_shutdown == SHUTDOWN_MASK || sk-&gt;sk_state == TCP_CLOSE)
        mask |= POLLHUP;
    if (sk-&gt;sk_shutdown &amp; RCV_SHUTDOWN)
        mask |= POLLIN | POLLRDNORM | POLLRDHUP;

    /* Connected or passive Fast Open socket? */
    if (sk-&gt;sk_state != TCP_SYN_SENT &amp;&amp;
        (sk-&gt;sk_state != TCP_SYN_RECV || tp-&gt;fastopen_rsk != NULL)) {
        int target = sock_rcvlowat(sk, 0, INT_MAX);

        if (tp-&gt;urg_seq == tp-&gt;copied_seq &amp;&amp;
            !sock_flag(sk, SOCK_URGINLINE) &amp;&amp;
            tp-&gt;urg_data)
            target++;

        /* Potential race condition. If read of tp below will
         * escape above sk-&gt;sk_state, we can be illegally awaken
         * in SYN_* states. */
        if (tp-&gt;rcv_nxt - tp-&gt;copied_seq &gt;= target)
            mask |= POLLIN | POLLRDNORM;

        if (!(sk-&gt;sk_shutdown &amp; SEND_SHUTDOWN)) {
            if (sk_stream_wspace(sk) &gt;= sk_stream_min_wspace(sk)) {
                mask |= POLLOUT | POLLWRNORM;
            } else { /* send SIGIO later */
                set_bit(SOCK_ASYNC_NOSPACE,
                    &amp;sk-&gt;sk_socket-&gt;flags);
                set_bit(SOCK_NOSPACE, &amp;sk-&gt;sk_socket-&gt;flags);

                /* Race breaker. If space is freed after
                 * wspace test but before the flags are set,
                 * IO signal will be lost.
                 */
                if (sk_stream_wspace(sk) &gt;= sk_stream_min_wspace(sk))
                    mask |= POLLOUT | POLLWRNORM;
            }
        } else
            mask |= POLLOUT | POLLWRNORM;

        if (tp-&gt;urg_data &amp; TCP_URG_VALID)
            mask |= POLLPRI;
    }
    /* This barrier is coupled with smp_wmb() in tcp_reset() */
    smp_rmb();
    if (sk-&gt;sk_err)
        mask |= POLLERR;

    return mask;
}
EXPORT_SYMBOL(tcp_poll);

// file: include/net/sock.h
/**
 * sock_poll_wait - place memory barrier behind the poll_wait call.
 * @filp:          file
 * @wait_address:  socket wait queue
 * @p:             poll_table
 *
 * See the comments in the wq_has_sleeper function.
 */
static inline void sock_poll_wait(struct file *filp,
        wait_queue_head_t *wait_address, poll_table *p)
{
    if (!poll_does_not_wait(p) &amp;&amp; wait_address) {
        poll_wait(filp, wait_address, p); // continue below

        /* We need to be sure we are in sync with the
         * socket flags modification.
         *
         * This memory barrier is paired in the wq_has_sleeper.
         */
        smp_mb();
    }
}

static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
    if (p &amp;&amp; p-&gt;_qproc &amp;&amp; wait_address)
        p-&gt;_qproc(filp, wait_address, p); // the function set by init_poll_funcptr(&amp;epq.pt, ep_ptable_queue_proc) above; filp here is the socket's file
}
</code></pre>
<p>In other words, this ultimately calls <code>ep_ptable_queue_proc</code> to register the callback, shown below:</p>
<pre><code class="language-c">// file: fs/eventpoll.c
/*
 * This is the callback that is used to add our wait queue to the
 * target file wakeup lists.
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt) // file: the socket's file; whead: the socket's wait queue (i.e. sk-&gt;sk_wq-&gt;wait); pt: carries the epi plus the _qproc function and key
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi-&gt;nwait &gt;= 0 &amp;&amp; (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { // allocate a dedicated pwq to hang on sk-&gt;sk_wq-&gt;wait
        init_waitqueue_func_entry(&amp;pwq-&gt;wait, ep_poll_callback); // the wait entry registers ep_poll_callback
        pwq-&gt;whead = whead;
        pwq-&gt;base = epi; // later, when the wait callback ep_poll_callback runs, it recovers the associated epi through this field
        add_wait_queue(whead, &amp;pwq-&gt;wait); // add the wait entry to the socket's wait queue, i.e. sk-&gt;sk_wq-&gt;wait (linked via wait-&gt;task_list); when the socket has data, the queue entries are fetched and their callbacks invoked. Note this is the socket's wait queue, not epoll's.
        list_add_tail(&amp;pwq-&gt;llink, &amp;epi-&gt;pwqlist);
        epi-&gt;nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi-&gt;nwait = -1;
    }
}

// file: include/linux/wait.h
static inline void init_waitqueue_func_entry(wait_queue_t *q,
                         wait_queue_func_t func)
{
    q-&gt;flags = 0;
    q-&gt;private = NULL;
    q-&gt;func = func;
}
</code></pre>
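<p>To see the mechanism in isolation, here is a hedged user-space model (not kernel code; every name in it is invented) of the pattern ep_ptable_queue_proc sets up: entries carrying a function pointer hang on a queue head, and a wake pass walks the queue and invokes each callback, which is how a data arrival later reaches <code>ep_poll_callback</code>:</p>
<pre><code class="language-c">#include &lt;stdio.h&gt;

/* Invented user-space model of wait_queue_t: a callback plus a link. */
struct wait_entry {
    int (*func)(struct wait_entry *we, void *key); /* models q-&gt;func */
    void *private_data;                            /* models pwq-&gt;base, i.e. the epi */
    struct wait_entry *next;
};

/* Models the socket's wait queue head (sk-&gt;sk_wq-&gt;wait). */
struct wait_head { struct wait_entry *first; };

static void add_wait(struct wait_head *wh, struct wait_entry *we)
{
    we-&gt;next = wh-&gt;first;  /* models add_wait_queue() */
    wh-&gt;first = we;
}

/* Models the wake-up pass: walk the queue, call each entry's func. */
static void wake_up_all(struct wait_head *wh, void *key)
{
    for (struct wait_entry *we = wh-&gt;first; we; we = we-&gt;next)
        we-&gt;func(we, key);
}

/* Models ep_poll_callback: recovers the item via private_data. */
static int my_poll_callback(struct wait_entry *we, void *key)
{
    (void)key;
    printf(&quot;event on item %s\n&quot;, (const char *)we-&gt;private_data);
    return 0;
}

int main(void)
{
    struct wait_head sock_wq = { 0 };
    struct wait_entry pwq = { .func = my_poll_callback,
                              .private_data = &quot;epi-for-fd-42&quot; };
    add_wait(&amp;sock_wq, &amp;pwq);    /* what ep_ptable_queue_proc does */
    wake_up_all(&amp;sock_wq, NULL); /* what a data arrival triggers */
    return 0;
}
</code></pre>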