fdevent系统-事件处理
最后更新于:2022-04-01 15:57:10
我们仍然从server.c的main函数开始。
当完成fdevent的初始化之后,第一个需要fdevent处理的事情就是将在初始化网络的过程中得到的监听fd(socket函数的返回值)注册到fdevent系统中。
该动作调用的是network_register_fdevents()函数,定义在network.c文件中:
~~~
/**
* 在fd events系统中注册监听socket。
* 这个函数在子进程中被调用。
*/
int network_register_fdevents(server * srv)
{
size_t i;
if (-1 == fdevent_reset(srv->ev)){return -1;}
/*
* register fdevents after reset
*/
/* 遍历所有的监听fd并将其注册到fdevent系统中 */
for (i = 0; i < srv->srv_sockets.used; i++)
{
server_socket *srv_socket = srv->srv_sockets.ptr[i];
fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket);
fdevent_event_add(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN);
}
return 0;
}
~~~
在初始化网络的过程中,调用socket函数之后,将其返回值(监听fd)保存在server结构体的srv_sockets成员中,这个成员是一个server_socket_array结构体,而server_socket_array结构体是server_socket结构体的指针数组。
server_socket结构体定义如下:
~~~
typedef struct
{
sock_addr addr; //socket fd对应的的地址。
int fd; //socket()函数返回的监听fd
int fde_ndx; //和fd相同。
buffer *ssl_pemfile;
buffer *ssl_ca_file;
buffer *ssl_cipher_list;
unsigned short ssl_use_sslv2;
unsigned short use_ipv6; //标记是否使用ipv6
unsigned short is_ssl;
buffer *srv_token;
#ifdef USE_OPENSSL
SSL_CTX *ssl_ctx;
#endif
unsigned short is_proxy_ssl;
} server_socket;
~~~
fdevent_register()函数:
~~~
int fdevent_register(fdevents * ev, int fd, fdevent_handler handler, void *ctx)
{
/* 创建一个fdnode的实例,然后对其成员赋值 */
fdnode *fdn;
fdn = fdnode_init();
fdn->handler = handler;
fdn->fd = fd;
fdn->ctx = ctx;
/* 以fd为下标将实例存入fdevents结构体中的fdarray数组中。
* fd作为下标可以将查询时间变为 O(1)
*/
ev->fdarray[fd] = fdn;
return 0;
}
~~~
第三个参数是一个函数指针,其定义为
~~~
typedef handler_t(*fdevent_handler) (void *srv, void *ctx, int revents)
~~~
这个函数指针对应XXX_handle_fdevent()类型的函数。比如network.c/ network_server_handle_fdevent() ,connections.c/ connection_handle_fdevent()。
这些函数的作用是在fdevent系统检测到fd有IO事件发生时,处理这些IO事件。
比如,network_server_handle_fdevent()处理监听fd(socket函数的返回值)发生的IO事件;
connection_handle_fdevent()处理连接fd(accept函数的返回值)发生的IO事件。
除了上面的两个函数,还有
~~~
stat_cacahe.c/stat_cache_handle_fdevent(),
mod_cgi.c/cgi_handle_fdevent(),
mod_fastcgi.c/fcgi_handle_fdevent(),
mod_proxy.c/ proxy_handle_fdevent()和
mod_scgi.c/scgi_handle_fdevent()等。
~~~
fdevent_event_add函数:
~~~
int fdevent_event_add(fdevents * ev, int *fde_ndx, int fd, int events)
{
int fde = fde_ndx ? *fde_ndx : -1;
if (ev->event_add)
fde = ev->event_add(ev, fde, fd, events)
if (fde_ndx)
*fde_ndx = fde;
return 0;
}
~~~
函数中调用了fdevents结构体中event_add函数指针对应的函数。
我们看看fdevent_linux_sysepoll.c中的fdevent_linux_sysepoll_event_add()函数,这个函数的地址在初始化的时候被赋给fdevents中的event_add指针:
~~~
static int fdevent_linux_sysepoll_event_add(fdevents * ev, int fde_ndx, int fd, int events)
{
struct epoll_event ep;
int add = 0;
if (fde_ndx == -1) //描述符不在epoll的检测中,增加之。
add = 1;
memset(&ep, 0, sizeof(ep));
ep.events = 0;
/**
* 在ep中设置需要监听的IO事件。
* EPOLLIN : 描述符可读。
* EPOLLOUT :描述符可写。
* 其他的事件还有:EPOLLRDHUP , EPOLLPRI, EPOLLERR, EPOLLHUP, EPOLLET, EPOLLONESHOT等。
*/
if (events & FDEVENT_IN)
ep.events |= EPOLLIN;
if (events & FDEVENT_OUT)
ep.events |= EPOLLOUT;
/*
* EPOLLERR :描述符发生错误。
* EPOLLHUP :描述符被挂断。通常是连接断开。
*/
ep.events |= EPOLLERR | EPOLLHUP /* | EPOLLET */ ;
ep.data.ptr = NULL;
ep.data.fd = fd;
/*
* EPOLL_CTL_ADD : 增加描述符fd到ev->epoll_fd中,并关联ep中的事件到fd上。
* EPOLL_CTL_MOD : 修改fd所关联的事件。
*/
if (0 != epoll_ctl(ev->epoll_fd, add ?EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ep))
{
fprintf(stderr, "%s.%d: epoll_ctl failed: %s, dying\n",__FILE__,__LINE__, strerror(errno));
SEGFAULT();
return 0;
}
return fd;
}
~~~
函数的第四个参数events是一个整型,每一位对应一种IO事件。
在network_register_fdevents函数中,传给fdevent_event_add()函数的第四个参数是FDEVENT_IN,这是一个宏:
~~~
/*
* 用于标记文件描述符的状态
*/
#define FDEVENT_IN BV(0) //文件描述符是否可写
#define FDEVENT_PRI BV(1) //不阻塞的可读高优先级的数据 poll
#define FDEVENT_OUT BV(2) //文件描述符是否可读
#define FDEVENT_ERR BV(3) //文件描述符是否出错
#define FDEVENT_HUP BV(4) //已挂断 poll
#define FDEVENT_NVAL BV(5) //描述符不引用一打开文件 poll
~~~
其中BV也是一个宏,定义在settings.c文件中:
~~~
#define BV(x) (1 << x)
~~~
其作用就是将一个整数变量第x位置1,其余置0。
通过这些宏,就可以在一个整数中用不同的位表示不同的事件;
这些宏和epoll.h中的枚举EPOLL_EVENTS对应。
由于当有连接请求时,监听fd的表现是有数据可读,因此,只监听其FDEVENT_IN事件。注册之后,监听fd就开始等待连接请求。
~~~
//启动事件轮询。底层使用的是IO多路转接。
if ((n = fdevent_poll(srv->ev, 1000)) > 0)
{
/* n是事件的数量 */
int revents;
int fd_ndx = -1;
/* 逐个处理已经准备好的请求,直到所有的请求处理结束 */
do
{
fdevent_handler handler;
void *context;
handler_t r;
fd_ndx = fdevent_event_next_fdndx(srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引
revents = fdevent_event_get_revent(srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型
fd = fdevent_event_get_fd(srv->ev, fd_ndx); //获得该文件描述符
handler = fdevent_get_handler(srv->ev, fd); //获得 I/O 事件处理的回调函数
context = fdevent_get_context(srv->ev, fd); //获得 I/O 事件处理的上下文环境
/*
* connection_handle_fdevent needs a joblist_append
*/
/**
* 调用回调函数进行I/O事件处理,并传入相关参数
*/
switch (r = (*handler) (srv, context, revents))
{
case HANDLER_FINISHED:
case HANDLER_GO_ON:
case HANDLER_WAIT_FOR_EVENT:
case HANDLER_WAIT_FOR_FD:
break;
case HANDLER_ERROR:
SEGFAULT();
break;
default:
log_error_write(srv, __FILE__, __LINE__, "d", r);
break;
}
}while (--n > 0);
}
else if (n < 0 && errno != EINTR)
{
log_error_write(srv, __FILE__, __LINE__, "ss","fdevent_poll failed:", strerror(errno));
}
~~~
首先调用fdevent_poll()函数等待IO事件发生,如果没有IO事件,程序会阻塞在这个函数中。
如果有fd发生了IO事件,则从fdevent_poll函数中返回,返回值是发生了IO事件的fd的数量。
fdevent_poll()函数调用fdevents结构体中的poll,最终调用的是epoll_wait()函数。epoll_wait()函数将发生了IO事件的fd对应的epoll_evet结构体实例存储在fdevents结构体的epoll_events数组成员中。
fdevent_event_next_fdndx函数返回epoll_events数组中下一个元素的下标,fdevent_event_get_revent函数调用ev->event_get_revent()获得fd发生的IO事件,最终调用的是:
~~~
static int fdevent_linux_sysepoll_event_get_revent(fdevents * ev, size_t ndx)
{
int events = 0, e;
e = ev->epoll_events[ndx].events;
if (e & EPOLLIN)
events |= FDEVENT_IN;
if (e & EPOLLOUT)
events |= FDEVENT_OUT;
if (e & EPOLLERR)
events |= FDEVENT_ERR;
if (e & EPOLLHUP)
events |= FDEVENT_HUP;
if (e & EPOLLPRI) //有紧急数据到达(带外数据)
events |= FDEVENT_PRI;
return e;
}
~~~
这个函数就做了一个转换。
最后,在switch语句中调用fd对应的handler函数处理事件。对于监听fd,调用的函数为:
~~~
/**
* 这个是监听socket的IO事件处理函数。
* 主要工作是建立和客户端的socket连接。只处理读事件。在处理过程中,
* 每次调用这个函数都试图一次建立100个连接,这样可以提高效率。
*/
handler_t network_server_handle_fdevent(void *s, void *context, int revents)
{
server *srv = (server *) s;
server_socket *srv_socket = (server_socket *) context;
connection *con;
int loops = 0;
UNUSED(context);
/*
* 只有fd事件是FDEVENT_IN时,才进行事件处理。
*/
if (revents != FDEVENT_IN)
{
log_error_write(srv, __FILE__, __LINE__, "sdd", "strange event for server socket", srv_socket->fd, revents);
return HANDLER_ERROR;
}
/*
* accept()s at most 100 connections directly we jump out after 100 to give the waiting connections a chance
*一次监听fd的IO事件,表示有客户端请求连接,对其的处理就是建立连接。建立连接后并不急着退出函数,
* 而是继续尝试建立新连接,直到已经建立了100次连接。这样可以提高效率。
*/
for (loops = 0; loops < 100 && NULL != (con =connection_accept(srv, srv_socket)); loops++)
{
handler_t r;
//根据当前状态,改变con的状态机,并做出相应的动作。
connection_state_machine(srv, con);
switch (r = plugins_call_handle_joblist(srv, con))
{
case HANDLER_FINISHED:
case HANDLER_GO_ON:
break;
default:
log_error_write(srv, __FILE__, __LINE__, "d", r);
break;
}
}
return HANDLER_GO_ON;
}
~~~
监听fd有IO事件,表示有客户端请求连接,对其的处理就是建立连接。在这个函数中,建立连接后并不急着退出,而是继续【尝试】建立新连接,直到已经建立了100次连接。这样可以提高效率。
connection_accept()函数接受连接请求并返回一个connection结构体指针。接着对这个连接启动状态机。然后把连接加到作业队列中。
注意,在将监听fd注册到fdevent系统时,它被设置成了非阻塞的,因此,如果在调用accept()函数时没有连接请求,那么accept()函数会直接出错返回,这样connection_accept就返回一个NULL,退出了for循环。因此,这里所说的建立100次连接只是”尝试“而已,并不会在没有连接的时候阻塞。
至此,fdevent系统对于监听fd的处理就完成了一个循环。
处理完IO事件以后fd接着在epoll中等待下一次事件。