fdevent系统-事件处理

最后更新于:2022-04-01 15:57:10

我们仍然从server.c的main函数开始。 当完成fdevent的初始化之后,第一个需要fdevent处理的事情就是将在初始化网络的过程中得到的监听fd(socket函数的返回值)注册到fdevent系统中。 该动作调用的是network_register_fdevents()函数,定义在network.c文件中: ~~~ /** * 在fd events系统中注册监听socket。 * 这个函数在子进程中被调用。 */ int network_register_fdevents(server * srv) { size_t i; if (-1 == fdevent_reset(srv->ev)){return -1;} /* * register fdevents after reset */ /* 遍历所有的监听fd并将其注册到fdevent系统中 */ for (i = 0; i < srv->srv_sockets.used; i++) { server_socket *srv_socket = srv->srv_sockets.ptr[i]; fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket); fdevent_event_add(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN); } return 0; } ~~~ 在初始化网络的过程中,调用socket函数之后,将其返回值(监听fd)保存在server结构体的srv_sockets成员中,这个成员是一个server_socket_array结构体,而server_socket_array结构体是server_socket结构体的指针数组。 server_socket结构体定义如下: ~~~ typedef struct { sock_addr addr; //socket fd对应的的地址。 int fd; //socket()函数返回的监听fd int fde_ndx; //和fd相同。 buffer *ssl_pemfile; buffer *ssl_ca_file; buffer *ssl_cipher_list; unsigned short ssl_use_sslv2; unsigned short use_ipv6; //标记是否使用ipv6 unsigned short is_ssl; buffer *srv_token; #ifdef USE_OPENSSL SSL_CTX *ssl_ctx; #endif unsigned short is_proxy_ssl; } server_socket; ~~~ fdevent_register()函数: ~~~ int fdevent_register(fdevents * ev, int fd, fdevent_handler handler, void *ctx) { /* 创建一个fdnode的实例,然后对其成员赋值 */ fdnode *fdn; fdn = fdnode_init(); fdn->handler = handler; fdn->fd = fd; fdn->ctx = ctx; /* 以fd为下标将实例存入fdevents结构体中的fdarray数组中。 * fd作为下标可以将查询时间变为 O(1) */ ev->fdarray[fd] = fdn; return 0; } ~~~ 第三个参数是一个函数指针,其定义为 ~~~ typedef handler_t(*fdevent_handler) (void *srv, void *ctx, int revents) ~~~ 这个函数指针对应XXX_handle_fdevent()类型的函数。比如network.c/ network_server_handle_fdevent() ,connections.c/ connection_handle_fdevent()。 这些函数的作用是在fdevent系统检测到fd有IO事件发生时,处理这些IO事件。 比如,network_server_handle_fdevent()处理监听fd(socket函数的返回值)发生的IO事件; connection_handle_fdevent()处理连接fd(accept函数的返回值)发生的IO事件。 除了上面的两个函数,还有 ~~~ stat_cacahe.c/stat_cache_handle_fdevent(), mod_cgi.c/cgi_handle_fdevent(), mod_fastcgi.c/fcgi_handle_fdevent(), mod_proxy.c/ proxy_handle_fdevent()和 mod_scgi.c/scgi_handle_fdevent()等。 ~~~ fdevent_event_add函数: ~~~ int fdevent_event_add(fdevents * ev, int *fde_ndx, int fd, int events) { int fde = fde_ndx ? *fde_ndx : -1; if (ev->event_add) fde = ev->event_add(ev, fde, fd, events) if (fde_ndx) *fde_ndx = fde; return 0; } ~~~ 函数中调用了fdevents结构体中event_add函数指针对应的函数。 我们看看fdevent_linux_sysepoll.c中的fdevent_linux_sysepoll_event_add()函数,这个函数的地址在初始化的时候被赋给fdevents中的event_add指针: ~~~ static int fdevent_linux_sysepoll_event_add(fdevents * ev, int fde_ndx, int fd, int events) { struct epoll_event ep; int add = 0; if (fde_ndx == -1) //描述符不在epoll的检测中,增加之。 add = 1; memset(&ep, 0, sizeof(ep)); ep.events = 0; /** * 在ep中设置需要监听的IO事件。 * EPOLLIN : 描述符可读。 * EPOLLOUT :描述符可写。 * 其他的事件还有:EPOLLRDHUP , EPOLLPRI, EPOLLERR, EPOLLHUP, EPOLLET, EPOLLONESHOT等。 */ if (events & FDEVENT_IN) ep.events |= EPOLLIN; if (events & FDEVENT_OUT) ep.events |= EPOLLOUT; /* * EPOLLERR :描述符发生错误。 * EPOLLHUP :描述符被挂断。通常是连接断开。 */ ep.events |= EPOLLERR | EPOLLHUP /* | EPOLLET */ ; ep.data.ptr = NULL; ep.data.fd = fd; /* * EPOLL_CTL_ADD : 增加描述符fd到ev->epoll_fd中,并关联ep中的事件到fd上。 * EPOLL_CTL_MOD : 修改fd所关联的事件。 */ if (0 != epoll_ctl(ev->epoll_fd, add ?EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ep)) { fprintf(stderr, "%s.%d: epoll_ctl failed: %s, dying\n",__FILE__,__LINE__, strerror(errno)); SEGFAULT(); return 0; } return fd; } ~~~ 函数的第四个参数events是一个整型,每一位对应一种IO事件。 在network_register_fdevents函数中,传给fdevent_event_add()函数的第四个参数是FDEVENT_IN,这是一个宏: ~~~ /* * 用于标记文件描述符的状态 */ #define FDEVENT_IN BV(0) //文件描述符是否可写 #define FDEVENT_PRI BV(1) //不阻塞的可读高优先级的数据 poll #define FDEVENT_OUT BV(2) //文件描述符是否可读 #define FDEVENT_ERR BV(3) //文件描述符是否出错 #define FDEVENT_HUP BV(4) //已挂断 poll #define FDEVENT_NVAL BV(5) //描述符不引用一打开文件 poll ~~~ 其中BV也是一个宏,定义在settings.c文件中: ~~~ #define BV(x) (1 << x) ~~~ 其作用就是将一个整数变量第x位置1,其余置0。 通过这些宏,就可以在一个整数中用不同的位表示不同的事件; 这些宏和epoll.h中的枚举EPOLL_EVENTS对应。 由于当有连接请求时,监听fd的表现是有数据可读,因此,只监听其FDEVENT_IN事件。注册之后,监听fd就开始等待连接请求。 ~~~ //启动事件轮询。底层使用的是IO多路转接。 if ((n = fdevent_poll(srv->ev, 1000)) > 0) { /* n是事件的数量 */ int revents; int fd_ndx = -1; /* 逐个处理已经准备好的请求,直到所有的请求处理结束 */ do { fdevent_handler handler; void *context; handler_t r; fd_ndx = fdevent_event_next_fdndx(srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引 revents = fdevent_event_get_revent(srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型 fd = fdevent_event_get_fd(srv->ev, fd_ndx); //获得该文件描述符 handler = fdevent_get_handler(srv->ev, fd); //获得 I/O 事件处理的回调函数 context = fdevent_get_context(srv->ev, fd); //获得 I/O 事件处理的上下文环境 /* * connection_handle_fdevent needs a joblist_append */ /** * 调用回调函数进行I/O事件处理,并传入相关参数 */ switch (r = (*handler) (srv, context, revents)) { case HANDLER_FINISHED: case HANDLER_GO_ON: case HANDLER_WAIT_FOR_EVENT: case HANDLER_WAIT_FOR_FD: break; case HANDLER_ERROR: SEGFAULT(); break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } }while (--n > 0); } else if (n < 0 && errno != EINTR) { log_error_write(srv, __FILE__, __LINE__, "ss","fdevent_poll failed:", strerror(errno)); } ~~~ 首先调用fdevent_poll()函数等待IO事件发生,如果没有IO事件,程序会阻塞在这个函数中。 如果有fd发生了IO事件,则从fdevent_poll函数中返回,返回值是发生了IO事件的fd的数量。 fdevent_poll()函数调用fdevents结构体中的poll,最终调用的是epoll_wait()函数。epoll_wait()函数将发生了IO事件的fd对应的epoll_evet结构体实例存储在fdevents结构体的epoll_events数组成员中。 fdevent_event_next_fdndx函数返回epoll_events数组中下一个元素的下标,fdevent_event_get_revent函数调用ev->event_get_revent()获得fd发生的IO事件,最终调用的是: ~~~ static int fdevent_linux_sysepoll_event_get_revent(fdevents * ev, size_t ndx) { int events = 0, e; e = ev->epoll_events[ndx].events; if (e & EPOLLIN) events |= FDEVENT_IN; if (e & EPOLLOUT) events |= FDEVENT_OUT; if (e & EPOLLERR) events |= FDEVENT_ERR; if (e & EPOLLHUP) events |= FDEVENT_HUP; if (e & EPOLLPRI) //有紧急数据到达(带外数据) events |= FDEVENT_PRI; return e; } ~~~ 这个函数就做了一个转换。 最后,在switch语句中调用fd对应的handler函数处理事件。对于监听fd,调用的函数为: ~~~ /** * 这个是监听socket的IO事件处理函数。 * 主要工作是建立和客户端的socket连接。只处理读事件。在处理过程中, * 每次调用这个函数都试图一次建立100个连接,这样可以提高效率。 */ handler_t network_server_handle_fdevent(void *s, void *context, int revents) { server *srv = (server *) s; server_socket *srv_socket = (server_socket *) context; connection *con; int loops = 0; UNUSED(context); /* * 只有fd事件是FDEVENT_IN时,才进行事件处理。 */ if (revents != FDEVENT_IN) { log_error_write(srv, __FILE__, __LINE__, "sdd", "strange event for server socket", srv_socket->fd, revents); return HANDLER_ERROR; } /* * accept()s at most 100 connections directly we jump out after 100 to give the waiting connections a chance *一次监听fd的IO事件,表示有客户端请求连接,对其的处理就是建立连接。建立连接后并不急着退出函数, * 而是继续尝试建立新连接,直到已经建立了100次连接。这样可以提高效率。 */ for (loops = 0; loops < 100 && NULL != (con =connection_accept(srv, srv_socket)); loops++) { handler_t r; //根据当前状态,改变con的状态机,并做出相应的动作。 connection_state_machine(srv, con); switch (r = plugins_call_handle_joblist(srv, con)) { case HANDLER_FINISHED: case HANDLER_GO_ON: break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } } return HANDLER_GO_ON; } ~~~ 监听fd有IO事件,表示有客户端请求连接,对其的处理就是建立连接。在这个函数中,建立连接后并不急着退出,而是继续【尝试】建立新连接,直到已经建立了100次连接。这样可以提高效率。 connection_accept()函数接受连接请求并返回一个connection结构体指针。接着对这个连接启动状态机。然后把连接加到作业队列中。 注意,在将监听fd注册到fdevent系统时,它被设置成了非阻塞的,因此,如果在调用accept()函数时没有连接请求,那么accept()函数会直接出错返回,这样connection_accept就返回一个NULL,退出了for循环。因此,这里所说的建立100次连接只是”尝试“而已,并不会在没有连接的时候阻塞。 至此,fdevent系统对于监听fd的处理就完成了一个循环。 处理完IO事件以后fd接着在epoll中等待下一次事件。
';