公开学习文档

公开学习文档


epoll-添加socket

<h2>概述</h2> <p>1、根据加入的 fd,获取对应的 socket 对象,并初始化 epitem 2、在 socket 的等待队列上注册回调函数 <code>ep_poll_callback</code>;注册的同时也可能 poll 到一些已就绪的事件 3、将 epi 插入到 struct eventpoll 对象(即 file 私有数据)的红黑树中 4、如果第 2 步中 poll 到了事件,则将 epi 加入到 ep 的就绪队列,然后唤醒等待的进程</p> <h2>注意点</h2> <ul> <li>对同一个 epoll 实例的 epoll_ctl 调用不能并行,因为它们都要先获取 ep 的 mtx 互斥锁(ADD/DEL 操作还要先获取全局的 epmutex)。</li> </ul> <h2>分析</h2> <p>系统调用:</p> <pre><code class="language-c">// file: fs/eventpoll.c /* * The following function implements the controller interface for * the eventpoll file that enables the insertion/removal/change of * file descriptors inside the interest set. */ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int did_lock_epmutex = 0; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; error = -EFAULT; if (ep_op_has_event(op) &amp;amp;&amp;amp; copy_from_user(&amp;amp;epds, event, sizeof(struct epoll_event))) goto error_return; /* Get the &amp;quot;struct file *&amp;quot; for the eventpoll file */ error = -EBADF; file = fget(epfd); if (!file) goto error_return; /* Get the &amp;quot;struct file *&amp;quot; for the target file */ tfile = fget(fd); if (!tfile) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; if (!tfile-&amp;gt;f_op || !tfile-&amp;gt;f_op-&amp;gt;poll) goto error_tgt_fput; /* Check if EPOLLWAKEUP is allowed */ if ((epds.events &amp;amp; EPOLLWAKEUP) &amp;amp;&amp;amp; !capable(CAP_BLOCK_SUSPEND)) epds.events &amp;amp;= ~EPOLLWAKEUP; /* * We have to check that the file structure underneath the file descriptor * the user passed to us _is_ an eventpoll file. And also we do not permit * adding an epoll file descriptor inside itself. */ error = -EINVAL; if (file == tfile || !is_file_epoll(file)) goto error_tgt_fput; /* * At this point it is safe to assume that the &amp;quot;private_data&amp;quot; contains * our own data structure. 
*/ ep = file-&amp;gt;private_data; // 核心管理结构 struct eventpoll {} /* * When we insert an epoll file descriptor, inside another epoll file * descriptor, there is the change of creating closed loops, which are * better be handled here, than in more critical paths. While we are * checking for loops we also determine the list of files reachable * and hang them on the tfile_check_list, so we can check that we * haven't created too many possible wakeup paths. * * We need to hold the epmutex across both ep_insert and ep_remove * b/c we want to make sure we are looking at a coherent view of * epoll network. */ if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) { mutex_lock(&amp;amp;epmutex); did_lock_epmutex = 1; } if (op == EPOLL_CTL_ADD) { if (is_file_epoll(tfile)) { error = -ELOOP; if (ep_loop_check(ep, tfile) != 0) { clear_tfile_check_list(); goto error_tgt_fput; } } else list_add(&amp;amp;tfile-&amp;gt;f_tfile_llink, &amp;amp;tfile_check_list); } // 此处的互斥锁是为了防止并发调用 epoll_ctl, 即保护内部数据结构 // 不会被并发的添加修改删除破坏 mutex_lock_nested(&amp;amp;ep-&amp;gt;mtx, 0); /* * Try to lookup the file inside our RB tree, Since we grabbed &amp;quot;mtx&amp;quot; * above, we can be sure to be able to use the item looked up by * ep_find() till we release the mutex. */ epi = ep_find(ep, tfile, fd); error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) { // 之前没有添加过 epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &amp;amp;epds, tfile, fd); // 继续 } else error = -EEXIST; clear_tfile_check_list(); break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &amp;amp;epds); } else error = -ENOENT; break; } mutex_unlock(&amp;amp;ep-&amp;gt;mtx); error_tgt_fput: if (did_lock_epmutex) mutex_unlock(&amp;amp;epmutex); fput(tfile); error_fput: fput(file); error_return: return error; } /* * Must be called with &amp;quot;mtx&amp;quot; held. 
*/ static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; user_watches = atomic_long_read(&amp;amp;ep-&amp;gt;user-&amp;gt;epoll_watches); if (unlikely(user_watches &amp;gt;= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) // 申请 epi 内存 return -ENOMEM; /* Item initialization follow here ... */ INIT_LIST_HEAD(&amp;amp;epi-&amp;gt;rdllink); INIT_LIST_HEAD(&amp;amp;epi-&amp;gt;fllink); INIT_LIST_HEAD(&amp;amp;epi-&amp;gt;pwqlist); // 等待队列 epi-&amp;gt;ep = ep; ep_set_ffd(&amp;amp;epi-&amp;gt;ffd, tfile, fd); // 即 epi-&amp;gt;ffd-&amp;gt;file = tfile; epi-&amp;gt;ffd-&amp;gt;fd = fd; epi-&amp;gt;event = *event; epi-&amp;gt;nwait = 0; epi-&amp;gt;next = EP_UNACTIVE_PTR; if (epi-&amp;gt;event.events &amp;amp; EPOLLWAKEUP) { error = ep_create_wakeup_source(epi); if (error) goto error_create_wakeup_source; } else { RCU_INIT_POINTER(epi-&amp;gt;ws, NULL); } /* Initialize the poll table using the queue callback */ epq.epi = epi; // 初始化 ep_pqueue 等待队列项,里面就 2 个字段:epi 和 pt init_poll_funcptr(&amp;amp;epq.pt, ep_ptable_queue_proc); // pt-&amp;gt;_qproc = ep_ptable_queue_proc 这是函数指针 /* * Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. Note that after * this operation completes, the poll callback can start hitting * the new item. */ revents = ep_item_poll(epi, &amp;amp;epq.pt); // 调用 ep_ptable_queue_proc 注意回调函数,为 ep_poll_callback。具体见下文分析 /* * We have to check if something went wrong during the poll wait queue * install process. Namely an allocation for a wait queue failed due * high memory pressure. 
*/ error = -ENOMEM; if (epi-&amp;gt;nwait &amp;lt; 0) goto error_unregister; /* Add the current item to the list of active epoll hook for this file */ spin_lock(&amp;amp;tfile-&amp;gt;f_lock); list_add_tail(&amp;amp;epi-&amp;gt;fllink, &amp;amp;tfile-&amp;gt;f_ep_links); spin_unlock(&amp;amp;tfile-&amp;gt;f_lock); /* * Add the current item to the RB tree. All RB tree operations are * protected by &amp;quot;mtx&amp;quot;, and ep_insert() is called with &amp;quot;mtx&amp;quot; held. */ ep_rbtree_insert(ep, epi); // 将 epi 插入到 struct eventpoll 对象(即 file 私有数据)的红黑树中 /* now check if we've created too many backpaths */ error = -EINVAL; if (reverse_path_check()) goto error_remove_epi; /* We have to drop the new item inside our item list to keep track of it */ spin_lock_irqsave(&amp;amp;ep-&amp;gt;lock, flags); /* If the file is already &amp;quot;ready&amp;quot; we drop it inside the ready list */ if ((revents &amp;amp; event-&amp;gt;events) &amp;amp;&amp;amp; !ep_is_linked(&amp;amp;epi-&amp;gt;rdllink)) { list_add_tail(&amp;amp;epi-&amp;gt;rdllink, &amp;amp;ep-&amp;gt;rdllist); ep_pm_stay_awake(epi); /* Notify waiting tasks that events are available */ if (waitqueue_active(&amp;amp;ep-&amp;gt;wq)) wake_up_locked(&amp;amp;ep-&amp;gt;wq); if (waitqueue_active(&amp;amp;ep-&amp;gt;poll_wait)) pwake++; } spin_unlock_irqrestore(&amp;amp;ep-&amp;gt;lock, flags); atomic_long_inc(&amp;amp;ep-&amp;gt;user-&amp;gt;epoll_watches); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&amp;amp;ep-&amp;gt;poll_wait); return 0; error_remove_epi: spin_lock(&amp;amp;tfile-&amp;gt;f_lock); if (ep_is_linked(&amp;amp;epi-&amp;gt;fllink)) list_del_init(&amp;amp;epi-&amp;gt;fllink); spin_unlock(&amp;amp;tfile-&amp;gt;f_lock); rb_erase(&amp;amp;epi-&amp;gt;rbn, &amp;amp;ep-&amp;gt;rbr); error_unregister: ep_unregister_pollwait(ep, epi); /* * We need to do this because an event could have been arrived on some * allocated wait queue. 
Note that we don't care about the ep-&amp;gt;ovflist * list, since that is used/cleaned only inside a section bound by &amp;quot;mtx&amp;quot;. * And ep_insert() is called with &amp;quot;mtx&amp;quot; held. */ spin_lock_irqsave(&amp;amp;ep-&amp;gt;lock, flags); if (ep_is_linked(&amp;amp;epi-&amp;gt;rdllink)) list_del_init(&amp;amp;epi-&amp;gt;rdllink); spin_unlock_irqrestore(&amp;amp;ep-&amp;gt;lock, flags); wakeup_source_unregister(ep_wakeup_source(epi)); error_create_wakeup_source: kmem_cache_free(epi_cache, epi); return error; } /* * Each file descriptor added to the eventpoll interface will * have an entry of this type linked to the &amp;quot;rbr&amp;quot; RB tree. * Avoid increasing the size of this struct, there can be many thousands * of these on a server and we do not want this to take another cache line. */ struct epitem { /* RB tree node used to link this structure to the eventpoll RB tree */ struct rb_node rbn; /* List header used to link this structure to the eventpoll ready list */ struct list_head rdllink; /* * Works together &amp;quot;struct eventpoll&amp;quot;-&amp;gt;ovflist in keeping the * single linked chain of items. 
*/ struct epitem *next; /* The file descriptor information this item refers to */ struct epoll_filefd ffd; // socket 文件及句柄信息 /* Number of active wait queue attached to poll operations */ int nwait; /* List containing poll wait queues */ struct list_head pwqlist; // 等待队列 /* The &amp;quot;container&amp;quot; of this item */ struct eventpoll *ep; // 管理对象(eventpoll 对象) /* List header used to link this item to the &amp;quot;struct file&amp;quot; items list */ struct list_head fllink; /* wakeup_source used when EPOLLWAKEUP is set */ struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ struct epoll_event event; }; /* Wrapper struct used by poll queueing */ struct ep_pqueue { poll_table pt; struct epitem *epi; }; /* Setup the structure that is used as key for the RB tree */ static inline void ep_set_ffd(struct epoll_filefd *ffd, struct file *file, int fd) { ffd-&amp;gt;file = file; ffd-&amp;gt;fd = fd; } </code></pre> <p>对于 <code>ep_item_poll</code>,代码如下:</p> <pre><code class="language-c">// file: fs/eventpoll.c static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt) { pt-&amp;gt;_key = epi-&amp;gt;event.events; return epi-&amp;gt;ffd.file-&amp;gt;f_op-&amp;gt;poll(epi-&amp;gt;ffd.file, pt) &amp;amp; epi-&amp;gt;event.events; // 调用的是 socket 对应的 struct file* 上的 f_op-&amp;gt;poll 函数 } </code></pre> <p>那么这个 <code>poll</code> 函数是什么呢?实际是 <code>sock_poll</code>,分析过程见(不对,这里的 fd 是通过 accept 获取到的,不是 socket 创建的):<a href="https://www.showdoc.com.cn/1832930169049935/10770316273234896">https://www.showdoc.com.cn/1832930169049935/10770316273234896</a></p> <p>继续看 <code>sock_poll</code>:</p> <pre><code class="language-c">// file: net/socket.c /* No kernel lock held - perfect */ static unsigned int sock_poll(struct file *file, poll_table *wait) { struct socket *sock; /* * We can't return errors to poll, so it's either yes or no. 
*/ sock = file-&amp;gt;private_data; return sock-&amp;gt;ops-&amp;gt;poll(file, sock, wait); // 是在创建 socket 时设置的(详见 socket 系统调用代码)。对于 TCP 是 inet_stream_ops,里面的 .poll = tcp_poll } // file: net/ipv4/tcp.c /* * Wait for a TCP event. * * Note that we don't need to lock the socket, as the upper poll layers * take care of normal races (between the test and the event) and we don't * go look at any of the socket buffers directly. */ unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait) { unsigned int mask; struct sock *sk = sock-&amp;gt;sk; const struct tcp_sock *tp = tcp_sk(sk); sock_poll_wait(file, sk_sleep(sk), wait); // 判断 wait 是否已经挂上去,如果没有,则调用 wait-&amp;gt;_qproc 来进行挂载,即上文的 ep_ptable_queue_proc。这里的 wait 就是上文的 pt,注意和之前阻塞式读取时的 wait 不一样,建议对比看一下 if (sk-&amp;gt;sk_state == TCP_LISTEN) return inet_csk_listen_poll(sk); /* Socket is not locked. We are protected from async events * by poll logic and correct handling of state changes * made by other threads is impossible in any case. */ mask = 0; /* * POLLHUP is certainly not done right. But poll() doesn't * have a notion of HUP in just one direction, and for a * socket the read side is more interesting. * * Some poll() documentation says that POLLHUP is incompatible * with the POLLOUT/POLLWR flags, so somebody should check this * all. But careful, it tends to be safer to return too many * bits than too few, and you can easily break real applications * if you don't tell them that something has hung up! * * Check-me. * * Check number 1. POLLHUP is _UNMASKABLE_ event (see UNIX98 and * our fs/select.c). It means that after we received EOF, * poll always returns immediately, making impossible poll() on write() * in state CLOSE_WAIT. One solution is evident --- to set POLLHUP * if and only if shutdown has been made in both directions. * Actually, it is interesting to look how Solaris and DUX * solve this dilemma. I would prefer, if POLLHUP were maskable, * then we could set it on SND_SHUTDOWN. 
BTW examples given * in Stevens' books assume exactly this behaviour, it explains * why POLLHUP is incompatible with POLLOUT. --ANK * * NOTE. Check for TCP_CLOSE is added. The goal is to prevent * blocking on fresh not-connected or disconnected socket. --ANK */ if (sk-&amp;gt;sk_shutdown == SHUTDOWN_MASK || sk-&amp;gt;sk_state == TCP_CLOSE) mask |= POLLHUP; if (sk-&amp;gt;sk_shutdown &amp;amp; RCV_SHUTDOWN) mask |= POLLIN | POLLRDNORM | POLLRDHUP; /* Connected or passive Fast Open socket? */ if (sk-&amp;gt;sk_state != TCP_SYN_SENT &amp;amp;&amp;amp; (sk-&amp;gt;sk_state != TCP_SYN_RECV || tp-&amp;gt;fastopen_rsk != NULL)) { int target = sock_rcvlowat(sk, 0, INT_MAX); if (tp-&amp;gt;urg_seq == tp-&amp;gt;copied_seq &amp;amp;&amp;amp; !sock_flag(sk, SOCK_URGINLINE) &amp;amp;&amp;amp; tp-&amp;gt;urg_data) target++; /* Potential race condition. If read of tp below will * escape above sk-&amp;gt;sk_state, we can be illegally awaken * in SYN_* states. */ if (tp-&amp;gt;rcv_nxt - tp-&amp;gt;copied_seq &amp;gt;= target) mask |= POLLIN | POLLRDNORM; if (!(sk-&amp;gt;sk_shutdown &amp;amp; SEND_SHUTDOWN)) { if (sk_stream_wspace(sk) &amp;gt;= sk_stream_min_wspace(sk)) { mask |= POLLOUT | POLLWRNORM; } else { /* send SIGIO later */ set_bit(SOCK_ASYNC_NOSPACE, &amp;amp;sk-&amp;gt;sk_socket-&amp;gt;flags); set_bit(SOCK_NOSPACE, &amp;amp;sk-&amp;gt;sk_socket-&amp;gt;flags); /* Race breaker. If space is freed after * wspace test but before the flags are set, * IO signal will be lost. */ if (sk_stream_wspace(sk) &amp;gt;= sk_stream_min_wspace(sk)) mask |= POLLOUT | POLLWRNORM; } } else mask |= POLLOUT | POLLWRNORM; if (tp-&amp;gt;urg_data &amp;amp; TCP_URG_VALID) mask |= POLLPRI; } /* This barrier is coupled with smp_wmb() in tcp_reset() */ smp_rmb(); if (sk-&amp;gt;sk_err) mask |= POLLERR; return mask; } EXPORT_SYMBOL(tcp_poll); // file: include/net/sock.h /** * sock_poll_wait - place memory barrier behind the poll_wait call. 
* @filp: file * @wait_address: socket wait queue * @p: poll_table * * See the comments in the wq_has_sleeper function. */ static inline void sock_poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { if (!poll_does_not_wait(p) &amp;amp;&amp;amp; wait_address) { poll_wait(filp, wait_address, p); // 继续 /* We need to be sure we are in sync with the * socket flags modification. * * This memory barrier is paired in the wq_has_sleeper. */ smp_mb(); } } static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p &amp;amp;&amp;amp; p-&amp;gt;_qproc &amp;amp;&amp;amp; wait_address) p-&amp;gt;_qproc(filp, wait_address, p); // 就是上面的 init_poll_funcptr(&amp;amp;epq.pt, ep_ptable_queue_proc); 里的函数。这里的 filp 是 socket 对应的 file } </code></pre> <p>也就是说,本质上就是调用 <code>ep_ptable_queue_proc</code> 来注册函数,见下:</p> <pre><code class="language-c">// file: fs/eventpoll.c /* * This is the callback that is used to add our wait queue to the * target file wakeup lists. 
*/ static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) // file:socket 对应的文件;whead:socket 上的等待队列(即 sk-&amp;gt;sk_wq-&amp;gt;wait);pt:存放 epi 和 函数+key { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; if (epi-&amp;gt;nwait &amp;gt;= 0 &amp;amp;&amp;amp; (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { // 单独申请一个 pwq,挂在 sk-&amp;gt;sk_wq-&amp;gt;wait 上 init_waitqueue_func_entry(&amp;amp;pwq-&amp;gt;wait, ep_poll_callback); // wait 项里注册的是 ep_poll_callback pwq-&amp;gt;whead = whead; pwq-&amp;gt;base = epi; // 后文在执行 wait 的回调函数 ep_poll_callback 时,会从这个字段获取到关联的 epi 信息 add_wait_queue(whead, &amp;amp;pwq-&amp;gt;wait); // 将 wait 项添加到 socket 的等待队列(即 sk-&amp;gt;sk_wq-&amp;gt;wait)(用 wait-&amp;gt;task_list 链起来),在 socket 有数据的时候,会获取 wait 项执行里面的回调函数。注意不是 epoll 的等待队列。 list_add_tail(&amp;amp;pwq-&amp;gt;llink, &amp;amp;epi-&amp;gt;pwqlist); epi-&amp;gt;nwait++; } else { /* We have to signal that an error occurred */ epi-&amp;gt;nwait = -1; } } // file: include/linux/wait.h static inline void init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func) { q-&amp;gt;flags = 0; q-&amp;gt;private = NULL; q-&amp;gt;func = func; } </code></pre>

页面列表

ITEM_HTML