状态机之错误处理和连接关闭

最后更新于:2022-04-01 15:57:21

这里所说的错误有两种: 1.http协议规定的错误,如404错误; 2.服务器运行过程中的错误,如write错误。 对于http协议规定的错误,这里的“错误”是针对客户端的。lighttpd返回相应的错误提示文件之后,相当于顺利的完成了一次请求,只是结果和客户端想要的不一样而已。 对于服务器运行中的错误,状态机进入CON_STATE_ERROR状态。常见的错误原因:客户端提前断开连接。比如你不停的刷新页面,在你刷新的时候,前一次的连接没有完成,但被浏览器强行断开。对于服务器而言,刷新前后的两个连接是不相干的,服务器在接收后一个连接的时候仍然会继续处理前一次的连接。而前一次的连接已断开,这就产生了连接错误。 进入CON_STATE_ERROR状态后,如果前面的请求处理已经得到了结果。也就是http_status不为空。那么调用plugins_call_handle_request_done告诉插件请求处理结束: ~~~ /* even if the connection was drop we still have to write it to the access log */ if (con->http_status) { plugins_call_handle_request_done(srv, con); } ~~~ 如果使用了ssl,关闭ssl连接: ~~~ #ifdef USE_OPENSSL if (srv_sock->is_ssl) { /* 关闭ssl连接 */ } ERR_clear_error(); #endif ~~~ 接着: ~~~ switch(con->mode) { case DIRECT: #if 0 log_error_write(srv, __FILE__, __LINE__, "sd", "emergency exit: direct", con->fd); #endif break; default: switch(r = plugins_call_handle_connection_close(srv, con)) { case HANDLER_GO_ON: case HANDLER_FINISHED: break; default: log_error_write(srv, __FILE__, __LINE__, ""); break; } break; } connection_reset(srv, con); ~~~ 如果连接模式不是DIRECT,调用plugins_call_handle_connection_close告诉插件连接已经关闭。 如果设置了keep_alive,此时可能是服务器首先关闭连接的。调用shutdown关闭连接的读和写。如果关闭没有出错,状态机进入CON_STATE_CLOSE状态。如果没有设置keep_alive或者shutdown调用失败,那么直接关闭连接,结束状态机的运行。 ~~~ /* close the connection */ if ((con->keep_alive == 1) && (0 == shutdown(con->fd, SHUT_WR))) { con->close_timeout_ts = srv->cur_ts; connection_set_state(srv, con, CON_STATE_CLOSE); if (srv->srvconf.log_state_handling) { log_error_write(srv, __FILE__, __LINE__, "sd", "shutdown for fd", con->fd); } } else { connection_close(srv, con); } con->keep_alive = 0; srv->con_closed++; ~~~ 注意到,这里服务器主动关闭连接的时候用的是shutdown而不是close: 1.close使用引用计数,在计数为0时才关闭套接字;shutdown不管引用计数,直接激发TCP的正常连接终止序列; 2.close终止读和写两个方向的数据传送;shutdown可以指定只关闭连接的读,或只关闭连接的写,或两者均关闭。 以上lighttpd是关闭了连接的写这一半,对于TCP套接字来说,这叫做半关闭:当前留在套接字发送缓冲区的数据仍然可以发送,但是进程不能再对其调用写函数(由于读端没有关闭,所以服务器仍然可以读数据),当数据发送完毕之后,TCP连接终止。 另外,注意一下:con->close_timeout_ts = srv->cur_ts;将close_timeout_ts的值设置为当前时间,在下面会用到。 在CON_STATE_CLOSE阶段: ~~~ case CON_STATE_CLOSE: if (srv->srvconf.log_state_handling) { log_error_write(srv, __FILE__, __LINE__, "sds", "state for fd", con->fd, connection_get_state(con->state)); } if (con->keep_alive) { if (ioctl(con->fd, FIONREAD, &b)) { log_error_write(srv, __FILE__, __LINE__, "ss", "ioctl() failed", strerror(errno)); } if (b > 0) { char buf[1024]; log_error_write(srv, __FILE__, __LINE__, "sdd", "CLOSE-read()", con->fd, b); /* */ read(con->fd, buf, sizeof(buf)); } else { /* nothing to read */ con->close_timeout_ts = 0; } } else { con->close_timeout_ts = 0; } if (srv->cur_ts - con->close_timeout_ts > 1) { connection_close(srv, con); if (srv->srvconf.log_state_handling) { log_error_write(srv, __FILE__, __LINE__, "sd", "connection closed for fd", con->fd); } } break; ~~~ 如果缓冲区中还有数据,服务器会把数据读出来(然后丢弃),以腾出内存空间。 如果没有数据可读,那么设置close_timeout_ts=0,关闭连接。 如果有数据可读,读取数据之后,连接依然处在CON_STATE_CLOSE状态中(在出了CON_STATE_ERROR后,进入CON_STATE_CLOSE,这段时间cur_ts是没有改变的。如果有数据可读,此时const_time_ts是等于cur_ts的,因此连接并未被关闭),连接对应的fd被加入到fdevent系统中监听读事件。如果缓冲区中还有数据,那么在connection_handle_fdevent 函数中,也有上面这段代码,再次运行之,直到数据读完。随着close_timeout_ts被设置为0,在下次joblist的调度中,状态机将会关闭连接,清理所有资源。 至此,连接正式关闭。 关于状态机的简单解析就到此为止~
';

状态机之response

最后更新于:2022-04-01 15:57:19

在CON_STATE_RESPONSE_START状态中,服务器开始准备给客户端的response: ~~~ case CON_STATE_RESPONSE_START: /* * the decision is done * - create the HTTP-Response-Header * */ if (srv->srvconf.log_state_handling) { log_error_write(srv, __FILE__, __LINE__, "sds", "state for fd", con->fd, connection_get_state(con->state)); } if (-1 == connection_handle_write_prepare(srv, con)) { connection_set_state(srv, con, CON_STATE_ERROR); break; } connection_set_state(srv, con, CON_STATE_WRITE); break; ~~~ 可以看到,该状态主要调用了connection_handle_write_prepare函数,它根据客户端请求的method来设置response的headers(其实就是设置“Content-Length”的值)。 connection_handle_write_prepare函数的执行内容: 1.该函数首先判断连接的模式(mode)是否为DIRECT,如果是,说明连接没有经过插件处理,是由服务器自身处理的。 2.接着判断连接的请求method,如果是OPTION,则设置Allow的值,同时清空write_queue,因为没有数据需要返回。设置con->file_finished为1,表示不需要给客户端发送文件。 3.比较http_status的值,如果为204,205,304,说明服务器不需要给客户端返回文件,仅返回 response中headers及其之前的部分,设置con->file_finished为1。 4.判断file_finished的值。如果值为1,说明不需要给客户端返回文件数据。对于1xx,204和 304状态,将Content-Length设置为空值。如果method是HEAD,那么服务器可能需要返回一些数据,这时候要设置对应的 Content-Length。如果file_finished的值为0,那么要设置一下keep_alive的值。 5.最后,调用http_response_write_header将 headers写入write_queue,等待返回给客户端。如果一切顺利,状态机进入CON_STATE_WRITE状态。 下面是CON_STATE_WRITE分支的操作: ~~~ case CON_STATE_WRITE: if (srv->srvconf.log_state_handling) { log_error_write(srv, __FILE__, __LINE__, "sds", "state for fd", con->fd, connection_get_state(con->state)); } /* only try to write if we have something in the queue */ if (!chunkqueue_is_empty(con->write_queue)) { #if 0 log_error_write(srv, __FILE__, __LINE__, "dsd", con->fd, "packets to write:", con->write_queue->used); #endif } if (!chunkqueue_is_empty(con->write_queue) && con->is_writable) { if (-1 == connection_handle_write(srv, con)) { log_error_write(srv, __FILE__, __LINE__, "ds", con->fd, "handle write failed."); connection_set_state(srv, con, CON_STATE_ERROR); } else if (con->state == CON_STATE_WRITE) { con->write_request_ts = srv->cur_ts; } } break; ~~~ 由于数据可能不会一次写完,所以在CON_STATE_WRITE状态中,首先判断write_queue是否为空,即有没有数据需要发送。同时判断连接是否可写。如果有数据且可写,那么调用connection_handle_write发送数据。如果没有数据可写或者连接不可写,那么跳出switch(con->state)这个语句。 由于状态机状态没有发生改变,switch后面的if语句使得服务器退出了大while循环,进入循环后面的小switch(con->state)语句(如果连接的状态没有改变,说明连接读写数据还没有结束,但是需要等待IO事件)。在这里,进入 CON_STATE_WRITE分支,如果有数据可写且连接可写且没有达到流量限制,那么在fdevent中注册这个连接,等IO结束后继续写数据,否则,删除这个连接。 当有数据可写且连接可写时,进入connection_handle_write函数,下面看看该函数的作用: 1.首先调用network_write_chunkqueue函数,将write_queue中的数据写回给客户端。函数network_write_chunkqueue首先判断当前连接的流量是否超过了限制,如果是,则不发送任何数据,直接将连接加到作业列表(joblist)中,让其他连接发送数据。如果没有超限,那么首先设置TCP_CORK选项。这个选项可以将多个write调用的数据一起发送, 提高发送效率。 2.接下来调用srv->network_backend_wirte()函数真正的写数据。这个函数的定义有多个,在network_*.c文件中。服务器在network.c的network_init函数中会根据当前的运行环境设置不同的值。传统的IO方法先read再write,需要4次数据拷贝(从磁盘到内核缓冲区,从内核缓冲区到用户缓冲区,从用户缓冲区到网络接口的内核缓冲区,最后,从网络接口的内核缓冲区到网络设备缓冲区),为提高服务器的效率,不同的OS会提供一些特定的方法来减少拷贝的次数(直接IO),提高发送文件的速度,lighttpd根据不同的OS去调用特定的接口来实现network_backend_wirte()函数。这些实现大同小异,以network_write.c中的实现为例: 函数的主体是个大循环,遍历所有的chunk。 如果chunk的类型是 MEM_CHUNK,那么这个chunk中的数据是在内存中的,直接调用write或者windows下的send函数发送数据。 如果是 FILE_CHUNK类型,说明这个chunk表示的是一个文件,那么如果运行环境有mmap函数,就使用mmap映射文件并发送,否则就read再write。 如果这个chunk发送完了,则继续发送下一个chunk。 如果没有发送完(chunk_finished=0),退出循环,接着退出了这个函数。 3.服务器返回到network_write_chunkqueue中,做一些统计工作,再一次检查该连接的流量是否超限。 4.最后服务器返回到connection_handle_write中。 如果network_write_chunkqueue返回-1,表示服务器出错。状态机进 入CON_STATE_ERROR。 如果返回-2,则客户端关闭连接,状态机也进入CON_STATE_ERROR。 返回0表示发送完毕,进入下一个状态。 返回1说明数据没有发送完,标记is_wirtable为0。 5.从connection_handler_write函数返回后,如果数据没有发送完毕,那么状态机还在CON_STATE_WRITE状态,接着连接被加到fdevent系统中,等待下一次数据发送。重复上述过程直到发送完毕或出错。 6.如果数据发送完毕,状态机进入CON_STATE_RESPONSE_END状态。 在状态CON_STATE_RESPONSE_END中: 1.服务器首先调用 plugins_call_handle_request_done通知所有插件连接处理完毕。 2.判断是否保持连接,如果保持,将状态机设置为 CON_STATE_REQUEST_START。如果不保持,先调用plugins_call_handle_connection_close通知所有插件连接关闭,然后关闭连接。 3.重置con,清除前一次请求的数据。    至此,请求处理结束。
';

状态机之请求处理

最后更新于:2022-04-01 15:57:17

lighttpd请求处理的过程: 1.服务器与客户端建立连接后,连接进入CON_STATE_REQUEST_START状态,服务器做一些标记,如连接开始的时间等。 2.连接进入CON_STATE_READ状态,服务器从连接读取HTTP头并存放在con->requeset.request中。若一次调用没能读取全部数据,连接的状态将继续为READ,继续等待剩下的数据可读。 在对joblist进行处理的时候,依然会调用connecion_handle_read_state函数进行处理,函数中通过con->is_readable来判断是否有数据可读,如果没有,则只是处理一下以前已经读取的数据。 3.数据读取完之后,连接进入CON_STATE_REQUEST_END状态。 4.在REQUEST_END阶段,调用http_request_parse函数解析request请求。 函数首先解析Request line,解析出来的结果存放在con->request.http_method, con->request.http_version和con->request.uri中(前两个变量都是枚举类型,后一个是个buffer)。 解析完request line后,开始分析header lines。找到一个header field name后,就和所有已经定义的field name比较,看看是哪个。确定之后,就将field name和value保存到con->request.headers中。request.headers是一个array类型变量,存放的是data_string类型数据。其中,data_string的key是filed name,value就是field的成员。 5.解析完之后判断此次连接是否有POST数据,有则读取POST数据,否则进入HANDLE_REQUEST状态。 6.如果有POST数据要读,连接进入READ_POST状态。 READ_POST状态的处理和READ状态类似。 在connection_state_mechine函数中,这两个switch分支一样。在connection_handle_read_state函数中,前半部分读取数据是一样的,后面处理数据时才分开。 对于POST数据,由于数据可能很大,这时候可能会用到临时文件来存储。在程序中,作者对于小于64k的数据,直接存储在buffer中,大于64k则存储在临时文件中。在向临时文件写数据时,每个临时文件只写1M的数据。数据大于1M就再打开一个临时文件。 POST数据保存在con->requeset_content_queue,这是一个chunkqueue类型的成员变量,它是chunk结构体的链表: ~~~ typedef struct { chunk *first; chunk *last; /** * 这个unused的chunk是一个栈的形式。 * 这个指针指向栈顶,而chunk中的next指针则将栈连 接起来。 */ chunk *unused; size_t unused_chunks; array *tempdirs; off_t bytes_in, bytes_out; } chunkqueue; ~~~ unused成员是栈形式的链表,unused指向栈顶。它用来存储不用的chunk结构体,如果需要chunk,则先从这个栈中查找有无空闲的。如果chunk不使用了,可以加到栈顶。这样可以减少内存分配的时间,提高程序的效率。unused_chunks标记栈中有多少数据。 chunk的定义: ~~~ typedef struct chunk { enum { UNUSED_CHUNK, MEM_CHUNK, FILE_CHUNK } type; /* 内存中的存储块或预读缓存 */ buffer *mem; /* either the storage of the mem-chunk or the read-ahead buffer */ struct { /* * filechunk 文件块 */ buffer *name;/* name of the file */ off_t start;/* starting offset in the file */ off_t length;/* octets to send from the starting offset */ int fd; struct { char *start;/* the start pointer of the mmap'ed area */ size_t length;/* size of the mmap'ed area */ off_t offset; /* start is <n> octet away from the start of the file */ } mmap; int is_temp;/* file is temporary and will be deleted if on cleanup */ } file; off_t offset;/* octets sent from this chunk the size of the * chunk is either -mem-chunk: mem->used - 1 file-chunk: file.length */ struct chunk *next; } chunk; ~~~ chunk用来表述一块存储空间。这个存储空间可能在内存中,也可能在文件中。 type成员标记这个块是内存的还是文件的。 mem成员指向内存中的存储空间(实际上是一个buffer)。 file结构体则表示在文件中的存储空间(程序首先使用mmap函数将文件映射到内存中,mmap结构体的start成员保存映射到内存中的地址,于是对于文件的操作就可以像内存一样)。 7.读取完数据之后,进入HANDLE_REQUEST状态,此时请求已经解析完毕,本状态需要决定如何处理请求。 该状态调用http_response_prepare函数,然后根据函数的返回值进行相应的处理(http_response_prepare函数定义在response.c文件中,函数中调用了很多plugins_call_handle_xxxx函数,插件系统的接口函数主要是在这个函数中调用,这个函数也是服务器和插件系统交互的地方)。 在http_response_prepare函数中,通过对url的解析,逐步的调用插件来处理。对url解析的结果存放在con->uri中: ~~~ typedef struct { buffer *scheme; //http , https and so on buffer *authority; //user:password buffer *path; //www.xxx.com/xxx/xxxx.html buffer *path_raw; //www.xxx.com buffer *query; //key1=data1&key2=data2 } request_uri; ~~~ uri的定义为:(scheme)://(authority)(path)?(query)#fragment。 如: [http://user:passwd@www.google.com/pages/index.html?key1=data1&key2=data2#frag](http://user:passwd@www.google.com/pages/index.html?key1=data1&key2=data2#frag) 解析之后: ~~~ scheme = http authority = user:passwd path = www.google.com/pages/index.html path_raw = 未进行解码的path query = key1=data1&key2=date2 ~~~ 注意,在浏览器向服务器发送url请求的时候,会对其中的保留字符和不安全字符进行编码(参见RFC2396),比如汉字。编码的形式是% HEX HEX,即一个%加两个十六进制数。服务器在接到请求之后,要对这些编码过的字符进行解码。path_raw中保存的是还没解码的url,path保存的是已解码的url。 对fragment服务器直接抛弃,因为fragment是浏览器使用的。 当解析出url中的path之后,服务器调用插件的plugins_call_handle_uri_raw函数,插件根据未解码的url path进行处理。 如果没有插件进行处理,服务器调用插件的plugins_call_handle_uri_clean函数,它根据解码过的url path进行处理。 在这里,服务器根据解析出来的url地址直接将请求转发给插件,而不需要自己对请求进行处理。 当请求仍然没有被处理时,说明这个请求必须要在这被处理。服务器调用插件的plugins_call_handle_docroot函数对处理请求时的根目录进行设置。对于不同种类的资源,可以设置不同的根目录,提供一个虚拟服务器。接着,服务器根据根目录和请求的url地址,拼接出资源在本机上对应的物理地址。比如,doc root = /abc/root, url path = /doc/index.html,得到的物理地址就是/abc/root/doc/index.html。然后服务器调用插件的plugins_call_handle_physical函数,根据得到的物理地址进行相应的处理。最后,服务器调用插件的plugins_call_handle_subrequest_start函数和plugins_call_handle_subrequest函数进行最后的处理。 8.连接进入CON_STATE_RESPONSE_START状态,服务器准备给客户端的response,包括准备response头和写数据。 参考: [http://www.cnblogs.com/kernel_hcy/archive/2010/04/07/1706587.html](http://www.cnblogs.com/kernel_hcy/archive/2010/04/07/1706587.html)
';

状态机与插件

最后更新于:2022-04-01 15:57:14

Lighttpd启动时完成了一系列初始化操作后,就进入了一个包含11个状态的有限状态机中。 每个连接都是一个connection实例(con),状态的切换取决于con->state。 lighttpd经过初步处理后将con的基本信息初始化,而插件对事件的处理就是针对con进行的,它拿到con后按照业务需要进行相应处理,然后再交还给lighttpd,lighttpd根据con中的信息完成响应。 状态定义如下: ~~~ typedef enum { CON_STATE_CONNECT, //connect 连接开始 CON_STATE_REQUEST_START, //reqstart 开始读取请求 CON_STATE_READ, //read 读取并解析请求 CON_STATE_REQUEST_END, //reqend 读取请求结束 CON_STATE_READ_POST, //readpost 读取post数据 CON_STATE_HANDLE_REQUEST, //handelreq 处理请求 CON_STATE_RESPONSE_START, //respstart 开始回复 CON_STATE_WRITE, //write 回复写数据 CON_STATE_RESPONSE_END, //respend 回复结束 CON_STATE_ERROR, //error 出错 CON_STATE_CLOSE //close 连接关闭 } connection_state_t; ~~~ 下面就是lighttpd的状态机: ![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-25_56ceaefa9d729.jpg "") 在每个连接中都会保存这样一个状态机,用以表示当前连接的状态。 在连接建立以后,在connections.c/connection_accpet()函数中,lighttpd调用connection_set_state()函数,将新建立的连接的状态设置为CON_STATE_REQUEST_START。在这个状态中,lighttpd记录连接建立的时间等信息。 整个状态机的核心函数是connections.c/ connection_state_machine()函数。 函数的主体部分删减之后如下: ~~~ int connection_state_machine(server * srv, connection * con) { int done = 0, r; while (done == 0) { size_t ostate = con -> state; int b; //根据当前状态机的状态进行相应的处理和状态转换。 switch (con->state) { case CON_STATE_REQUEST_START: /* transient */ //do something case CON_STATE_REQUEST_END: /* transient */ //do something case CON_STATE_HANDLE_REQUEST: //do something case CON_STATE_RESPONSE_START: //do something case CON_STATE_RESPONSE_END: /* transient */ //do something case CON_STATE_CONNECT: //do something case CON_STATE_CLOSE: //do something case CON_STATE_READ_POST: //do something case CON_STATE_READ: //do something case CON_STATE_WRITE: //do something case CON_STATE_ERROR: /* transient */ //do something default: //do something break; }//end of switch(con -> state) ... if (done == -1) { done = 0; } else if (ostate == con->state) { done = 1; } } /* something else */ /* 将fd加入到fdevent系统中,等待IO事件。 * 当有数据可读的时候,在main函数中,lighttpd调用这个fd对应的handle函数, * 这里就是connection_handle_fdevent()函数。 * 这个函数一开始将连接加入到了joblist(作业队列)中。 */ switch (con->state) { case CON_STATE_READ_POST: case CON_STATE_READ: case CON_STATE_CLOSE: fdevent_event_add(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_IN); break; case CON_STATE_WRITE: /* request write-fdevent only if we really need it * - if we have data to write * - if the socket is not writable yet */ if (!chunkqueue_is_empty(con->write_queue) && (con->is_writable == 0)&& (con->traffic_limit_reached == 0)) { fdevent_event_add(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_OUT); } else { fdevent_event_del(srv->ev, &(con->fde_ndx), con->fd); } break; default: fdevent_event_del(srv->ev, &(con->fde_ndx), con->fd); break; } return 0; } ~~~ 这个函数首先根据当前的状态进入对应的switch分支执行相应的动作,然后根据情况进入下一个状态。 跳出switch语句之后,如果连接的状态没有改变,说明连接读写数据还没有结束,但是需要等待IO事件,这时跳出循环,等待IO事件。 如果在处理的过程中不需要等待IO事件,那么在while循环中,连接将被处理完毕并关闭。 在我们的main函数中,之前讨论过,在一个while循环中,处理超时,处理IO时间,之后有下面这段代码: ~~~ for (ndx = 0; ndx < srv->joblist->used; ndx++) { connection *con = srv->joblist->ptr[ndx]; handler_t r; connection_state_machine(srv, con); switch(r = plugins_call_handle_joblist(srv, con)) { case HANDLER_FINISHED: case HANDLER_GO_ON: break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } con->in_joblist = 0; } ~~~ 这段代码对joblist中的所有连接依次调用connection_state_machine()函数进行处理。 下面说明下各状态的主要内容: ~~~ CON_STATE_CONNECT 清除待读取队列中的数据-chunkqueue_reset(con->read_queue); 置con->request_count = 0。(本次连接还未处理过请求) CON_STATE_REQUEST_START /*transient */ 记录事件起始时间; con->request_count++(一次长连接最多可以处理的请求数量是有限制的); 转移到CON_STATE_READ状态。 CON_STATE_READ和CON_STATE_READ_POST connection_handle_read_state(srv,con); CON_STATE_REQUEST_END /*transient */ http_request_parse(srv, con); 解析请求,若是POST请求则转移到CON_STATE_READ_POST状态, 否则转移到CON_STATE_HANDLE_REQUEST状态。 CON_STATE_HANDLE_REQUEST http_response_prepare(srv, con); 函数中调用 handle_uri_raw; handle_uri_clean; handle_docroot; handle_physical; handle_subrequest_start; handle_subrequest。 如果函数返回了HANDLER_FINISHED,且con->mode!=DIRECT(事件已经被我们的业务插件接管), 则直接进入CON_STATE_RESPONSE_START。 否则lighttpd会做一些处理后再进入CON_STATE_RESPONSE_START状态。 如果函数返回了HANDLER_WAIT_FOR_FD或 HANDLER_WAIT_FOR_EVENT, 状态依旧会停留在CON_STATE_HANDLE_REQUEST,等待事件或数据。 如果函数返回了HANDLER_ERROR,进入到CON_STATE_ERROR状态。 CON_STATE_RESPONSE_START connection_handle_write_prepare(srv,con); CON_STATE_WRITE connection_handle_write(srv,con); CON_STATE_RESPONSE_END 调用插件的handle_request_done接口。 如果是长连接,重新回到CON_STATE_REQUEST_START;否则调用插件的handle_connection_close接口。 执行connection_close(srv, con);和connection_reset(srv, con);将连接关闭。 CON_STATE_ERROR /* transient */ 调用插件handle_request_done; 调用插件handle_connection_close; 执行connection_close将连接关闭。 CON_STATE_CLOSE connection_close(srv, con);将连接关闭。 ~~~ 以上是状态机的概况。 总览了状态机,我们知道状态机会针对相应的阶段对事件进行处理,那么状态机是如何处理这些事件的? 事实上,对于事件的处理,一部分是由lighttpd完成的,而一部分是由插件完成的。插件中那些负责事件处理的接口分布在某几个状态中。我们只需在插件的各个阶段完成指定工作并返回相应的返回值,就可以促使状态机完成状态切换,完成事件的整套处理流程,并最终由lighttpd完成事件的响应。 在插件中,我们可以编写代码来注册lighttpd提供的回调接口,lighttpd在初始化阶段、状态机执行阶段、退出阶段会分别调用这些回调函数,完成插件的实例化,初始化,连接重置,事件处理,插件释放等功能。 要了解lighttpd对插件的调用方式,需要明白一个概念:事件接管。 对于每个事件,都有一个mode字段(con->mode)。该字段的定义: ~~~ typedef enum { DIRECT, EXTERNAL } connection_type; ~~~ 连接对象有一个字段mode用来标识该连接是最初由服务器accept产生的客户端连接还是插件产生的其他辅助连接,当mode=DIRECT时表示对应连接由lighttpd服务器accept产生,mode!=DIRECT时表示对应连接是由插件产生的。 事件(con)初始化时mode是DIRECT;connection_reset(srv,con); lighttpd在大部分流程中会在入口检查到mode != DIRECT时直接返回GO_ON。即:此事件由用户插件接管,lighttpd不参与。 用户编写的插件应通过将mode置为插件自身的ID达到接管的作用。插件ID是在插件加载时由插件的加载顺序确定的,是插件的唯一标识。 用户编写插件在每个接口的一开始应该判断mode是否等于自身的ID,若相等才能继续执行,否则直接退出,返回GO_ON。 了解了以上概念之后,我们就可以理解lighttpd对插件的调用方式了: 在lighttpd需要调用插件某一个阶段的接口函数时,会对所有插件注册在该处的接口顺序调用,顺序与插件加载顺序相同。例如:调用uri_raw接口,会先调用A插件的mod_A_uri_raw,然后调用B插件的mod_B_uri_raw,直到将所有已加载插件这个位置的接口全部调用完成。但实际处理这次事件通常只有一个插件,即插件ID与mode相同的那个插件。 因此,假设在CON_STATE_HANDLE_REQUEST状态,lighttpd调用了插件的handle_uri_raw接口,但是我们有多个插件,每个插件都注册了handle_uri_raw这个接口,lighttpd也能辨别出要使用哪个插件。 如果插件在处理事件的过程中,想让lighttpd接管,还需要把mode置为DIRECT才行。 以上是lighttpd状态机和插件的总览概况。
';

fdevent系统-连接socket及超时处理

最后更新于:2022-04-01 15:57:12

上一篇我们探讨了lighttpd对监听socket的处理,这次我们看看连接socket的处理,以及相关超时的处理。 lighttpd和客户端建立连接的过程: 1.lighttpd检测监听socket的IO事件,如果有可读事件发生,那么表示有新的连接请求,于是调用network.c/network_server_handle_fdevent()来处理连接请求。 2.network_server_handle_fdevent()函数调用connections.c/connection_accept() 接受客户端的请求,建立连接,得到连接socket的fd,也就是accept函数的返回值。 3.建立连接后,这个连接对应的状态机状态被设置为CON_STATE_REQUEST_START,即开始读取客户端发过来的request。 4.从connection_accept函数返回到network_server_handle_fdevent()函数的for循环中后,程序调用connection_state_machine()函数,这个函数是根据当前连接的状态机状态来设置状态机的下一个状态,CON_STATE_REQUEST_START的下一个状态是CON_STATE_READ,这个状态表示连接正在读取客户端发送的数据。 5.当连接的状态机被设置成CON_STATE_READ后,在connection_state_machine()函数中有这样一个switch语句: ~~~ switch (con->state) { case CON_STATE_READ_POST: case CON_STATE_READ: case CON_STATE_CLOSE: fdevent_event_add(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_IN); break; case CON_STATE_WRITE: /* * request write-fdevent only if we really need it * - if we have data to write * - if the socket is not writable yet */ if (!chunkqueue_is_empty(con->write_queue) && (con->is_writable == 0) &&(con->traffic_limit_reached == 0)) { fdevent_event_add(srv->ev, &(con->fde_ndx), con->fd, FDEVENT_OUT); } else { fdevent_event_del(srv->ev, &(con->fde_ndx), con->fd); } break; default: fdevent_event_del(srv->ev, &(con->fde_ndx), con->fd); break; } ~~~ 它将状态处在CON_STATE_READ_POST,CON_STATE_READ和CON_STATE_CLOSE的连接对应的连接socket fd加入到fdevent系统中,并监听【可读】事件。将处CON_STATE_WRITE状态且有数据要写的连接对应的socket fd加入到fdevent系统中,并监听【可写】事件。其他状态的连接则把对应的fd从fdevent系统中删除,因为这些连接不会有IO事件发生。 这样,连接socket fd就被加入到了fdevent系统中,之后等待IO事件的发生,这一部分在上一篇已经说明过了: ~~~ //启动事件轮询。底层使用的是IO多路转接。 if ((n = fdevent_poll(srv->ev, 1000)) > 0) { /* n是事件的数量 */ int revents; int fd_ndx = -1; /* 逐个处理已经准备好的请求,直到所有的请求处理结束 */ do { fdevent_handler handler; void *context; handler_t r; fd_ndx = fdevent_event_next_fdndx(srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引 revents = fdevent_event_get_revent(srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型 fd = fdevent_event_get_fd(srv->ev, fd_ndx); //获得该文件描述符 handler = fdevent_get_handler(srv->ev, fd); //获得 I/O 事件处理的回调函数 context = fdevent_get_context(srv->ev, fd); //获得 I/O 事件处理的上下文环境 /* * connection_handle_fdevent needs a joblist_append */ /** * 调用回调函数进行I/O事件处理,并传入相关参数 */ switch (r = (*handler) (srv, context, revents)) { case HANDLER_FINISHED: case HANDLER_GO_ON: case HANDLER_WAIT_FOR_EVENT: case HANDLER_WAIT_FOR_FD: break; case HANDLER_ERROR: SEGFAULT(); break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } }while (--n > 0); } else if (n < 0 && errno != EINTR) { log_error_write(srv, __FILE__, __LINE__, "ss","fdevent_poll failed:", strerror(errno)); } ~~~ 连接fd对应的处理函数是connections.c/connection_handle_fdevent()函数: ~~~ handler_t connection_handle_fdevent(void *s, void *context,int revents) { server *srv = (server *) s; connection *con = context; //把这个连接加到作业队列中。 joblist_append(srv, con); if (revents & FDEVENT_IN) { con->is_readable = 1; } if (revents & FDEVENT_OUT) { con->is_writable = 1; /* * we don't need the event twice */ } if (revents & ~(FDEVENT_IN | FDEVENT_OUT)) { /* * looks like an error 即可读又可写,可能是一个错误。 */ /* * FIXME: revents = 0x19 still means that we should read from the queue */ if (revents & FDEVENT_HUP) { if (con->state == CON_STATE_CLOSE) { con->close_timeout_ts = 0; } else { /* * sigio reports the wrong event here there was no HUP at all */ connection_set_state(srv, con, CON_STATE_ERROR); } } else if (revents & FDEVENT_ERR) { connection_set_state(srv, con, CON_STATE_ERROR); } else { log_error_write(srv, __FILE__, __LINE__, "sd","connection closed: poll() -> ???", revents); } } if (con->state == CON_STATE_READ|| con->state == CON_STATE_READ_POST) { connection_handle_read_state(srv, con); //继续读取数据,直到数据读取完毕 } // 数据的写回并没有放给状态机去处理。 if (con->state == CON_STATE_WRITE&& !chunkqueue_is_empty(con->write_queue) && con->is_writable) { if (-1 == connection_handle_write(srv, con)) { connection_set_state(srv, con, CON_STATE_ERROR); log_error_write(srv, __FILE__, __LINE__, "ds", con->fd,"handle write failed."); } else if (con->state == CON_STATE_WRITE) { //写数据出错,记录当前时间,用来判断连接超时。 con->write_request_ts = srv->cur_ts; } } if (con->state == CON_STATE_CLOSE) { /* * flush the read buffers 清空缓冲区中的数据。 */ int b; //获取缓冲区中数据的字节数 if (ioctl(con->fd, FIONREAD, &b)) { log_error_write(srv, __FILE__, __LINE__, "ss","ioctl() failed", strerror(errno)); } if (b > 0) { char buf[1024]; log_error_write(srv, __FILE__, __LINE__, "sdd","CLOSE-read()", con->fd, b); //将缓冲区中的数据读取后并丢弃,此时连接已经关闭,数据是无用数据。 read(con->fd, buf, sizeof(buf)); } else { /* * nothing to read 缓冲区中没有数据。复位连接关闭超时计时。 */ con->close_timeout_ts = 0; } } return HANDLER_FINISHED; } ~~~ connection_handle_fdevent()函数根据当前连接fd所发生的IO事件,对connection结构体中的标记变量赋值,如is_writable,is_readable等,并做一些时间的记录。这些事件所对应的【真正的IO处理则交给状态机处理】。状态机根据这些标记变量进行相应的动作处理。 下面的图简要的描述了fdevent系统对连接fd和监听fd的处理: ![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-25_56ceaefa7b523.jpg "") 接下来简单地看下连接超时的处理。 连接超时有三种:读数据超时,写数据超时和关闭超时。 处理超时的代码在server.c中的main函数woker进程开始部分: ~~~ /* main-loop */ while (!srv_shutdown) { //只要srv_shutdown不为1,工作进程持续执行 int n; size_t ndx; time_t min_ts; /* 处理HUP信号,代码省略 */ /* 处理ALARM信号 */ if (handle_sig_alarm) { /* a new second */ #ifdef USE_ALARM /* reset notification */ handle_sig_alarm = 0; #endif /* get current time */ min_ts = time(NULL); /* 比较服务器记录的时间和当前时间 * 如果值不一样,说明已经过了1s */ if (min_ts != srv->cur_ts) { int cs = 0; connections *conns = srv->conns; handler_t r; switch(r = plugins_call_handle_trigger(srv)) { //调用plugins_call_handle_trigger来处理各个模块的ALARM信号处理函数 case HANDLER_GO_ON: break; case HANDLER_ERROR: log_error_write(srv, __FILE__, __LINE__, "s", "one of the triggers failed"); break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } /* trigger waitpid */ srv->cur_ts = min_ts; //更新服务器记录时间 /* cleanup stat-cache */ stat_cache_trigger_cleanup(srv); //清除缓存,删除一些比较旧的节点 /** * check all connections for timeouts * */ for (ndx = 0; ndx < conns->used; ndx++) { //处理超时连接 int changed = 0; connection *con; int t_diff; con = conns->ptr[ndx]; if (con->state == CON_STATE_READ || con->state == CON_STATE_READ_POST) { //连接状态是读 if (con->request_count == 1) { //处理一个请求 if (srv->cur_ts - con->read_idle_ts > con->conf.max_read_idle) { /* time - out */ #if 0 log_error_write(srv, __FILE__, __LINE__, "sd", "connection closed - read-timeout:", con->fd); #endif connection_set_state(srv, con, CON_STATE_ERROR); //调用connection_set_state进行状态机的状态转换 changed = 1; } } else { //连接同时处理多个请求 if (srv->cur_ts - con->read_idle_ts > con->conf.max_keep_alive_idle) { /* time - out */ #if 0 log_error_write(srv, __FILE__, __LINE__, "sd", "connection closed - read-timeout:", con->fd); #endif connection_set_state(srv, con, CON_STATE_ERROR); changed = 1; } } } if ((con->state == CON_STATE_WRITE) && (con->write_request_ts != 0)) { //连接状态是写 #if 0 if (srv->cur_ts - con->write_request_ts > 60) { log_error_write(srv, __FILE__, __LINE__, "sdd", "connection closed - pre-write-request-timeout:", con->fd, srv->cur_ts - con->write_request_ts); } #endif if (srv->cur_ts - con->write_request_ts > con->conf.max_write_idle) { /* time - out */ #if 1 log_error_write(srv, __FILE__, __LINE__, "sbsosds", "NOTE: a request for", con->request.uri, "timed out after writing", con->bytes_written, "bytes. We waited", (int)con->conf.max_write_idle, "seconds. If this a problem increase server.max-write-idle"); #endif connection_set_state(srv, con, CON_STATE_ERROR); changed = 1; } } /* we don't like div by zero */ if (0 == (t_diff = srv->cur_ts - con->connection_start)) t_diff = 1; /* 处理传输速度限制 * 如果某一时刻平均传输速度达到了用户设置的最大值,则停止发送数据(con->traffic_limit_reached将被设为1, * 进入下面if中处理)。只要检测到平均传输速度小于用户设置的最大值就继续发送数据, * 则满足if的条件,con->traffic_limit_reached设为 0,同时调用状态机切换函数。 * 代码省略 * / } } /* 根据当前的资源利用情况禁用或启用 server sockets 服务,代码省略 */ /* something else,代码省略*/ /* 轮询 I/O 事件的发生, * 其中等待 I/O 事件发生的超时值为1秒。 * 代码省略 */ /* 进行其他处理,之后while一次循环完成 */ } ~~~ 为清晰地看超时处理部分的代码,我把一些无关的代码略去了。 总结如下: ~~~ while (未收到终止信号) { if (收到HUP) 处理HUP信号 if (handle_sig_alarm标识为1) { 获取当前时间; if (当前时间 != 服务器时间) { 调用各模块的超时处理函数; 更新服务器记录时间; for (每个连接) 处理超时; } } 根据当前的资源利用情况禁用或启用 server sockets 服务; 轮询 I/O 事件的发生; 对每一个发生的 I/O 事件进行处理; } ~~~ 可以看到,作者通过当前时间和服务器记录的当前时间来判断时间是否过了一秒。如果两个时间不一样,那么时间就过了一秒,子进程每循环一次都要比较服务器记录的时间和当前时间,直到两个时间不一样为止。 也就是说,作者好像并没有使用SIGALRM信号来判断超时,从代码中我们可以看到,关于SIGALRM的使用,只有在定义了USE_ALARM之后才生效: ~~~ #ifdef USE_ALARM /* reset notification */ handle_sig_alarm = 0; #endif ~~~ 然而我们查找USE_ALARM,发现事实上该标识没有被定义(而是被注释掉了): ~~~ #ifndef __sgi /* IRIX doesn't like the alarm based time() optimization */ /* #define USE_ALARM */ #endif ~~~ 所以handle_sig_alarm一直为1,即作者这里并不使用SIGALRM信号,这样可以减少很多信号处理,降低程序的复杂度。但是每次循环程序都要轮询一次,可能会影响效率(实际上效果如何,需要大家试一下哈~) 在处理程序中,lighttpd通过比较read_idle_ts,write_request_ts和当前时间的差值来判断连接是否读超时或写超时。如果这两个差值分别大于max_read_idle和max_write_idle则表示超时。如果一个连接正在处理多个请求时,读超时是和max_keep_alive_idle比较。这些上限值在配置中设置。 对于read_idle_ts,在连接进入CON_STATE_REQUEST_START状态时,记录了当前时间。如果连接长时间没有去读取request请求,则也表示连接超时。当连接开始读数据时,read_idle_ts记录开始读数据的时间。 对于write_request_ts,在处理CON_STATE_WRITE状态时,有对其赋值的语句。在connection_handle_fdevent函数中也有。事实上,只有在调用connection_handle_write函数出错并且连接处在CON_STATE_WRITE状态时,记录当前时间。 可见,lighttpd对读和写的超时处理是不一样的。对于读,设定了最长时间,不管读多少数据,一旦时间超了就算超时。而对于写,只有在写出错的时候才开始计算超时。如果没有出错,那么写数据花再多的时间也不算超时。(可能出现上传到一半就超时的问题,但是在绝大多数情况下,上传数据都是很小的,而下载的数据往往很多,因此,这样处理可以提高效率,如果需要上传大量数据,可以修改配置中的超时限制)。 fdevent就分析到这里!~ 学习内容参考自: [http://www.cnblogs.com/kernel_hcy/archive/2010/03/22/1691951.html](http://www.cnblogs.com/kernel_hcy/archive/2010/03/22/1691951.html)
';

fdevent系统-事件处理

最后更新于:2022-04-01 15:57:10

我们仍然从server.c的main函数开始。 当完成fdevent的初始化之后,第一个需要fdevent处理的事情就是将在初始化网络的过程中得到的监听fd(socket函数的返回值)注册到fdevent系统中。 该动作调用的是network_register_fdevents()函数,定义在network.c文件中: ~~~ /** * 在fd events系统中注册监听socket。 * 这个函数在子进程中被调用。 */ int network_register_fdevents(server * srv) { size_t i; if (-1 == fdevent_reset(srv->ev)){return -1;} /* * register fdevents after reset */ /* 遍历所有的监听fd并将其注册到fdevent系统中 */ for (i = 0; i < srv->srv_sockets.used; i++) { server_socket *srv_socket = srv->srv_sockets.ptr[i]; fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket); fdevent_event_add(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN); } return 0; } ~~~ 在初始化网络的过程中,调用socket函数之后,将其返回值(监听fd)保存在server结构体的srv_sockets成员中,这个成员是一个server_socket_array结构体,而server_socket_array结构体是server_socket结构体的指针数组。 server_socket结构体定义如下: ~~~ typedef struct { sock_addr addr; //socket fd对应的的地址。 int fd; //socket()函数返回的监听fd int fde_ndx; //和fd相同。 buffer *ssl_pemfile; buffer *ssl_ca_file; buffer *ssl_cipher_list; unsigned short ssl_use_sslv2; unsigned short use_ipv6; //标记是否使用ipv6 unsigned short is_ssl; buffer *srv_token; #ifdef USE_OPENSSL SSL_CTX *ssl_ctx; #endif unsigned short is_proxy_ssl; } server_socket; ~~~ fdevent_register()函数: ~~~ int fdevent_register(fdevents * ev, int fd, fdevent_handler handler, void *ctx) { /* 创建一个fdnode的实例,然后对其成员赋值 */ fdnode *fdn; fdn = fdnode_init(); fdn->handler = handler; fdn->fd = fd; fdn->ctx = ctx; /* 以fd为下标将实例存入fdevents结构体中的fdarray数组中。 * fd作为下标可以将查询时间变为 O(1) */ ev->fdarray[fd] = fdn; return 0; } ~~~ 第三个参数是一个函数指针,其定义为 ~~~ typedef handler_t(*fdevent_handler) (void *srv, void *ctx, int revents) ~~~ 这个函数指针对应XXX_handle_fdevent()类型的函数。比如network.c/ network_server_handle_fdevent() ,connections.c/ connection_handle_fdevent()。 这些函数的作用是在fdevent系统检测到fd有IO事件发生时,处理这些IO事件。 比如,network_server_handle_fdevent()处理监听fd(socket函数的返回值)发生的IO事件; connection_handle_fdevent()处理连接fd(accept函数的返回值)发生的IO事件。 除了上面的两个函数,还有 ~~~ stat_cacahe.c/stat_cache_handle_fdevent(), mod_cgi.c/cgi_handle_fdevent(), mod_fastcgi.c/fcgi_handle_fdevent(), mod_proxy.c/ proxy_handle_fdevent()和 mod_scgi.c/scgi_handle_fdevent()等。 ~~~ fdevent_event_add函数: ~~~ int fdevent_event_add(fdevents * ev, int *fde_ndx, int fd, int events) { int fde = fde_ndx ? *fde_ndx : -1; if (ev->event_add) fde = ev->event_add(ev, fde, fd, events) if (fde_ndx) *fde_ndx = fde; return 0; } ~~~ 函数中调用了fdevents结构体中event_add函数指针对应的函数。 我们看看fdevent_linux_sysepoll.c中的fdevent_linux_sysepoll_event_add()函数,这个函数的地址在初始化的时候被赋给fdevents中的event_add指针: ~~~ static int fdevent_linux_sysepoll_event_add(fdevents * ev, int fde_ndx, int fd, int events) { struct epoll_event ep; int add = 0; if (fde_ndx == -1) //描述符不在epoll的检测中,增加之。 add = 1; memset(&ep, 0, sizeof(ep)); ep.events = 0; /** * 在ep中设置需要监听的IO事件。 * EPOLLIN : 描述符可读。 * EPOLLOUT :描述符可写。 * 其他的事件还有:EPOLLRDHUP , EPOLLPRI, EPOLLERR, EPOLLHUP, EPOLLET, EPOLLONESHOT等。 */ if (events & FDEVENT_IN) ep.events |= EPOLLIN; if (events & FDEVENT_OUT) ep.events |= EPOLLOUT; /* * EPOLLERR :描述符发生错误。 * EPOLLHUP :描述符被挂断。通常是连接断开。 */ ep.events |= EPOLLERR | EPOLLHUP /* | EPOLLET */ ; ep.data.ptr = NULL; ep.data.fd = fd; /* * EPOLL_CTL_ADD : 增加描述符fd到ev->epoll_fd中,并关联ep中的事件到fd上。 * EPOLL_CTL_MOD : 修改fd所关联的事件。 */ if (0 != epoll_ctl(ev->epoll_fd, add ?EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ep)) { fprintf(stderr, "%s.%d: epoll_ctl failed: %s, dying\n",__FILE__,__LINE__, strerror(errno)); SEGFAULT(); return 0; } return fd; } ~~~ 函数的第四个参数events是一个整型,每一位对应一种IO事件。 在network_register_fdevents函数中,传给fdevent_event_add()函数的第四个参数是FDEVENT_IN,这是一个宏: ~~~ /* * 用于标记文件描述符的状态 */ #define FDEVENT_IN BV(0) //文件描述符是否可写 #define FDEVENT_PRI BV(1) //不阻塞的可读高优先级的数据 poll #define FDEVENT_OUT BV(2) //文件描述符是否可读 #define FDEVENT_ERR BV(3) //文件描述符是否出错 #define FDEVENT_HUP BV(4) //已挂断 poll #define FDEVENT_NVAL BV(5) //描述符不引用一打开文件 poll ~~~ 其中BV也是一个宏,定义在settings.c文件中: ~~~ #define BV(x) (1 << x) ~~~ 其作用就是将一个整数变量第x位置1,其余置0。 通过这些宏,就可以在一个整数中用不同的位表示不同的事件; 这些宏和epoll.h中的枚举EPOLL_EVENTS对应。 由于当有连接请求时,监听fd的表现是有数据可读,因此,只监听其FDEVENT_IN事件。注册之后,监听fd就开始等待连接请求。 ~~~ //启动事件轮询。底层使用的是IO多路转接。 if ((n = fdevent_poll(srv->ev, 1000)) > 0) { /* n是事件的数量 */ int revents; int fd_ndx = -1; /* 逐个处理已经准备好的请求,直到所有的请求处理结束 */ do { fdevent_handler handler; void *context; handler_t r; fd_ndx = fdevent_event_next_fdndx(srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引 revents = fdevent_event_get_revent(srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型 fd = fdevent_event_get_fd(srv->ev, fd_ndx); //获得该文件描述符 handler = fdevent_get_handler(srv->ev, fd); //获得 I/O 事件处理的回调函数 context = fdevent_get_context(srv->ev, fd); //获得 I/O 事件处理的上下文环境 /* * connection_handle_fdevent needs a joblist_append */ /** * 调用回调函数进行I/O事件处理,并传入相关参数 */ switch (r = (*handler) (srv, context, revents)) { case HANDLER_FINISHED: case HANDLER_GO_ON: case HANDLER_WAIT_FOR_EVENT: case HANDLER_WAIT_FOR_FD: break; case HANDLER_ERROR: SEGFAULT(); break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } }while (--n > 0); } else if (n < 0 && errno != EINTR) { log_error_write(srv, __FILE__, __LINE__, "ss","fdevent_poll failed:", strerror(errno)); } ~~~ 首先调用fdevent_poll()函数等待IO事件发生,如果没有IO事件,程序会阻塞在这个函数中。 如果有fd发生了IO事件,则从fdevent_poll函数中返回,返回值是发生了IO事件的fd的数量。 fdevent_poll()函数调用fdevents结构体中的poll,最终调用的是epoll_wait()函数。epoll_wait()函数将发生了IO事件的fd对应的epoll_evet结构体实例存储在fdevents结构体的epoll_events数组成员中。 fdevent_event_next_fdndx函数返回epoll_events数组中下一个元素的下标,fdevent_event_get_revent函数调用ev->event_get_revent()获得fd发生的IO事件,最终调用的是: ~~~ static int fdevent_linux_sysepoll_event_get_revent(fdevents * ev, size_t ndx) { int events = 0, e; e = ev->epoll_events[ndx].events; if (e & EPOLLIN) events |= FDEVENT_IN; if (e & EPOLLOUT) events |= FDEVENT_OUT; if (e & EPOLLERR) events |= FDEVENT_ERR; if (e & EPOLLHUP) events |= FDEVENT_HUP; if (e & EPOLLPRI) //有紧急数据到达(带外数据) events |= FDEVENT_PRI; return e; } ~~~ 这个函数就做了一个转换。 最后,在switch语句中调用fd对应的handler函数处理事件。对于监听fd,调用的函数为: ~~~ /** * 这个是监听socket的IO事件处理函数。 * 主要工作是建立和客户端的socket连接。只处理读事件。在处理过程中, * 每次调用这个函数都试图一次建立100个连接,这样可以提高效率。 */ handler_t network_server_handle_fdevent(void *s, void *context, int revents) { server *srv = (server *) s; server_socket *srv_socket = (server_socket *) context; connection *con; int loops = 0; UNUSED(context); /* * 只有fd事件是FDEVENT_IN时,才进行事件处理。 */ if (revents != FDEVENT_IN) { log_error_write(srv, __FILE__, __LINE__, "sdd", "strange event for server socket", srv_socket->fd, revents); return HANDLER_ERROR; } /* * accept()s at most 100 connections directly we jump out after 100 to give the waiting connections a chance *一次监听fd的IO事件,表示有客户端请求连接,对其的处理就是建立连接。建立连接后并不急着退出函数, * 而是继续尝试建立新连接,直到已经建立了100次连接。这样可以提高效率。 */ for (loops = 0; loops < 100 && NULL != (con =connection_accept(srv, srv_socket)); loops++) { handler_t r; //根据当前状态,改变con的状态机,并做出相应的动作。 connection_state_machine(srv, con); switch (r = plugins_call_handle_joblist(srv, con)) { case HANDLER_FINISHED: case HANDLER_GO_ON: break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } } return HANDLER_GO_ON; } ~~~ 监听fd有IO事件,表示有客户端请求连接,对其的处理就是建立连接。在这个函数中,建立连接后并不急着退出,而是继续【尝试】建立新连接,直到已经建立了100次连接。这样可以提高效率。 connection_accept()函数接受连接请求并返回一个connection结构体指针。接着对这个连接启动状态机。然后把连接加到作业队列中。 注意,在将监听fd注册到fdevent系统时,它被设置成了非阻塞的,因此,如果在调用accept()函数时没有连接请求,那么accept()函数会直接出错返回,这样connection_accept就返回一个NULL,退出了for循环。因此,这里所说的建立100次连接只是”尝试“而已,并不会在没有连接的时候阻塞。 至此,fdevent系统对于监听fd的处理就完成了一个循环。 处理完IO事件以后fd接着在epoll中等待下一次事件。
';

fdevent系统-初始化

最后更新于:2022-04-01 15:57:08

C程序在进行真正的编译之前都要进行预编译。 我们看看fdevent系统中的一些宏: ~~~ #if defined(HAVE_EPOLL_CTL) && defined(HAVE_SYS_EPOLL_H) # if defined HAVE_STDINT_H # include <stdint.h> # endif # define USE_LINUX_EPOLL # include <sys/epoll.h> #endif #if defined HAVE_POLL && (defined(HAVE_SYS_POLL_H) || defined(HAVE_POLL_H)) # define USE_POLL # ifdef HAVE_POLL_H # include <poll.h> # else # include <sys/poll.h> # endif # if defined HAVE_SIGTIMEDWAIT && defined(__linux__) # define USE_LINUX_SIGIO # include <signal.h> # endif #endif //…… ~~~ 上面的宏判断系统中是否有对应的多路IO系统,如果有,就定义对应的USE_XXX宏。 预编译完这些宏以后,对于当前系统中有的多路IO系统,就会有对应的USE_XXX符号被定义。预编译器接着运行,将那些不需要的代码都忽略。 fdevent.h中对所有可能的多路IO系统都定义了初始化函数: ~~~ int fdevent_select_init(fdevents * ev); int fdevent_poll_init(fdevents * ev); int fdevent_linux_rtsig_init(fdevents * ev); int fdevent_linux_sysepoll_init(fdevents * ev); int fdevent_solaris_devpoll_init(fdevents * ev); int fdevent_freebsd_kqueue_init(fdevents * ev); ~~~ 因此,对于系统中没有的多路IO系统对应的初始化函数,预编译结束后,这些初始化函数被定义为报错函数。如epoll对应的为: ~~~ #ifdef USE_LINUX_EPOLL /* 当定义了epoll时,epoll的函数实现代码 */ #else /* 当未定义epoll时,epoll只实现init函数(这是必须的,因为该函数在fdevent.h中定义了),并将它实现为报错函数 */ int fdevent_linux_sysepoll_init(fdevents *ev) { UNUSED(ev); fprintf(stderr, "%s.%d: linux-sysepoll not supported, try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__); return -1; } #endif ~~~ 预编译后,开始真正的编译。我们假设系统中只有epoll。 首先,我们看一看配置中有关fdevent的设置。进入configfile.c文件中的config_set_defaults()函数。函数的一开始就有这么一个定义: ~~~ struct ev_map { fdevent_handler_t et; const char *name; } event_handlers[] = { /* * - poll is most reliable - select works everywhere - * linux-* are experimental */ #ifdef USE_POLL {FDEVENT_HANDLER_POLL, "poll"}, #endif #ifdef USE_SELECT {FDEVENT_HANDLER_SELECT, "select"}, #endif #ifdef USE_LINUX_EPOLL {FDEVENT_HANDLER_LINUX_SYSEPOLL, "linux-sysepoll"}, #endif #ifdef USE_LINUX_SIGIO {FDEVENT_HANDLER_LINUX_RTSIG, "linux-rtsig"}, #endif #ifdef USE_SOLARIS_DEVPOLL {FDEVENT_HANDLER_SOLARIS_DEVPOLL, "solaris-devpoll"}, #endif #ifdef USE_FREEBSD_KQUEUE {FDEVENT_HANDLER_FREEBSD_KQUEUE, "freebsd-kqueue"}, {FDEVENT_HANDLER_FREEBSD_KQUEUE, "kqueue"}, #endif {FDEVENT_HANDLER_UNSET, NULL} }; ~~~ 上面定义了一个struct ev_map类型的数组。数组的内容是当前系统中存在的多路IO系统的类型和名称。这里排序很有意思,从注释中可以看出,poll排在最前因为最可靠,select其次因为支持最广泛,epoll第三因为是最好的。 在这里,如果配置文件有配置,那么按配置文件来,如果没配置,则取上面数组中的第一个。以下是配置文件的格式: ~~~ ## set the event-handler (read the performance ##section in the manual) server.event-handler = "freebsd-kqueue" # needed on OS X ~~~ 接下来我们看server.c中的main函数。 前面有一些是设置fd数量的,其中select比较特殊需要特别处理,fd数量一个是系统的限制,一个是用户配置的限制。 当程序产生子进程后,在子进程中执行的第一条语句就是初始化fdevent系统: ~~~ if (NULL == (srv->ev = fdevent_init(srv->max_fds + 1, srv->event_handler))) { log_error_write(srv, __FILE__, __LINE__, "s", "fdevent_init failed"); return -1; } ~~~ 进入fdevent_init()函数: ~~~ /** * 初始化文件描述符事件数组fdevent */ fdevents *fdevent_init(size_t maxfds, fdevent_handler_t type) { fdevents *ev; //内存被初始化为0 ev = calloc(1, sizeof(*ev)); //分配数组 ev->fdarray = calloc(maxfds, sizeof(*ev->fdarray)); ev->maxfds = maxfds; //根据设定的多路IO的类型进行初始化。 switch (type) { case FDEVENT_HANDLER_POLL: if (0 != fdevent_poll_init(ev)) { fprintf(stderr, "%s.%d: event-handler poll failed\n", __FILE__, __LINE__); return NULL; } break; case FDEVENT_HANDLER_SELECT: if (0 != fdevent_select_init(ev)) { fprintf(stderr, "%s.%d: event-handler select failed\n", __FILE__, __LINE__); return NULL; } break; case FDEVENT_HANDLER_LINUX_RTSIG: if (0 != fdevent_linux_rtsig_init(ev)) { fprintf(stderr, "%s.%d: event-handler linux-rtsig failed, try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__); return NULL; } break; case FDEVENT_HANDLER_LINUX_SYSEPOLL: if (0 != fdevent_linux_sysepoll_init(ev)) { fprintf(stderr, "%s.%d: event-handler linux-sysepoll failed, try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__); return NULL; } break; case FDEVENT_HANDLER_SOLARIS_DEVPOLL: if (0 != fdevent_solaris_devpoll_init(ev)) { fprintf(stderr, "%s.%d: event-handler solaris-devpoll failed, try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__); return NULL; } break; case FDEVENT_HANDLER_FREEBSD_KQUEUE: if (0 != fdevent_freebsd_kqueue_init(ev)) { fprintf(stderr, "%s.%d: event-handler freebsd-kqueue failed, try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__); return NULL; } break; default: fprintf(stderr, "%s.%d: event-handler is unknown, try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__); return NULL; } return ev; } ~~~ fdevent_init()函数根据fdevent_handler_t的值调用相应的初始化函数。我们进入fdevent_linux_sysepoll_init()函数: ~~~ int fdevent_linux_sysepoll_init(fdevents * ev) { ev->type = FDEVENT_HANDLER_LINUX_SYSEPOLL; #define SET(x) \ ev->x = fdevent_linux_sysepoll_##x; /* 通过SET宏对fdevents结构体中的函数指针赋值。然后创建epoll,最后做一些设置。*/ SET(free); SET(poll); SET(event_del); SET(event_add); SET(event_next_fdndx); SET(event_get_fd); SET(event_get_revent); //创建epoll if (-1 == (ev->epoll_fd = epoll_create(ev->maxfds))) { fprintf(stderr, "%s.%d: epoll_create failed (%s), try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__, strerror(errno)); return -1; } //设置epoll_fd为运行exec()函数时关闭。 if (-1 == fcntl(ev->epoll_fd, F_SETFD, FD_CLOEXEC)) { fprintf(stderr, "%s.%d: epoll_create failed (%s), try to set server.event-handler = \"poll\" or \"select\"\n", __FILE__, __LINE__, strerror(errno)); close(ev->epoll_fd); return -1; } //创建fd事件数组。在epoll_wait函数中使用。 //存储发生了IO事件的fd和对应的IO事件。 ev->epoll_events = malloc(ev->maxfds * sizeof(*ev->epoll_events)); return 0; } ~~~ 至此fdevent的初始化工作全部完成。
';

fdevent系统-结构体及对外接口

最后更新于:2022-04-01 15:57:05

fdevent系统主要是处理各种IO事件,在web服务器中,主要就是向socket写数据和从socket读数据。 web服务器是IO密集型程序,因此,大部分的web服务器都采用非阻塞IO进行数据的读写。 lighttpd通过fdevent系统,采用类似OO中面向对象的方式将对IO事件的处理进行封装,对于不同的IO系统,提供一个统一的接口。 lighttpd采用了所谓的Reactor模式,也就是非阻塞IO加多路复用(non-blocking IO + IO multiplexing)。在多路复用上,lighttpd通过fdevent将各种不同的实现进行封装。 fdevent.h中fdevents结构体相当于一个虚基类,其中的函数指针是纯虚函数。对于每种实现,则相当于继承了这个基类并实现了其中的纯虚函数,也就是给函数指针赋一个函数地址值: ~~~ typedef struct fdevents { fdevent_handler_t type; //多路IO类型 fdnode **fdarray; //文件描述符数组 size_t maxfds; //最大的文件描述符数 #ifdef USE_LINUX_SIGIO int in_sigio; int signum; sigset_t sigset; siginfo_t siginfo; bitset *sigbset; #endif #ifdef USE_LINUX_EPOLL int epoll_fd; struct epoll_event *epoll_events; #endif #ifdef USE_POLL struct pollfd *pollfds; //描述符及其状态的结构体数组 size_t size; //数组中数据的个数 size_t used; //数组的大小 //用于存储pollfds中为使用的位置。 //由于可能的删除操作,会是pollfds中存在空档,将这些空档 //的索引存在unused中,便于下次插入操作时直接使用这些空档 //减少空间的浪费。 buffer_int unused; #endif #ifdef USE_SELECT //三个文件描述符集合 fd_set select_read; //可读,对应FDEVENT_IN fd_set select_write; //可写,对应FDEVENT_OUT fd_set select_error; //处于异常条件,对应FDEVENT_ERR //由于select函数会修改上面的三个集合, //因此,在这里保存一个初始的副本。 fd_set select_set_read; fd_set select_set_write; fd_set select_set_error; int select_max_fd; //最大的文件描述符数。 #endif #ifdef USE_SOLARIS_DEVPOLL int devpoll_fd; struct pollfd *devpollfds; #endif #ifdef USE_FREEBSD_KQUEUE int kq_fd; struct kevent *kq_results; bitset *kq_bevents; #endif #ifdef USE_SOLARIS_PORT int port_fd; #endif //统一的操作接口,与后面的函数声明对应。 int (*reset) (struct fdevents * ev); void (*free) (struct fdevents * ev); int (*event_add) (struct fdevents * ev, int fde_ndx, int fd, int events); int (*event_del) (struct fdevents * ev, int fde_ndx, int fd); int (*event_get_revent) (struct fdevents * ev, size_t ndx); int (*event_get_fd) (struct fdevents * ev, size_t ndx); int (*event_next_fdndx) (struct fdevents * ev, int ndx); int (*poll) (struct fdevents * ev, int timeout_ms); int (*fcntl_set) (struct fdevents * ev, int fd); } fdevents; ~~~ 结构体的第一个成员是一个枚举类型fdevent_handler_t: ~~~ typedef enum { FDEVENT_HANDLER_UNSET, //未定义 FDEVENT_HANDLER_SELECT, //select FDEVENT_HANDLER_POLL, //poll FDEVENT_HANDLER_LINUX_RTSIG, //rtsig FDEVENT_HANDLER_LINUX_SYSEPOLL, //sysepoll FDEVENT_HANDLER_SOLARIS_DEVPOLL, //devpoll FDEVENT_HANDLER_FREEBSD_KQUEUE, //kqueue FDEVENT_HANDLER_SOLARIS_PORT //port } fdevent_handler_t; ~~~ 结构体中的第二个成员fdnode **fdarray是一个fdnode类型变量的数组。fdnode的定义如下: ~~~ typedef struct _fdnode { fdevent_handler handler; //处理函数指针 void *ctx; //文件描述符的context int fd; //文件描述符 struct _fdnode *prev, *next; //指针 } fdnode; ~~~ fdevent_handler handler是一个函数指针,用来存储这个描述符处理函数的地址,其定义为 ~~~ typedef handler_t(*fdevent_handler) (void *srv, void *ctx, int revents); ~~~ 从最后两个变量可以看出,这应该是一个链表的节点,但是,这个结构体是以数组的形式存储的,也就是fdevents中的fdarray变量,这样可以提高查询的效率。 函数指针: ~~~ /* * 重置和释放fdevent系统。 */ int fdevent_reset(fdevents * ev); void fdevent_free(fdevents * ev); /* * 将fd增加到fd event系统中。events是要对fd要监听的事件。 * fde_ndx是fd对应的fdnode在ev->fdarray中的下标值的指针。 * 如果fde_ndx==NULL,则表示在fd event系统中增加fd。如果不为NULL,则表示这个 * fd已经在系统中存在,这个函数的功能就变为将对fd监听的事件变为events。 */ int fdevent_event_add(fdevents * ev, int *fde_ndx, int fd, int events); /* * 从fd event系统中删除fd。 fde_ndx的内容和上面的一致。 */ int fdevent_event_del(fdevents * ev, int *fde_ndx, int fd); /* * 返回ndx对应的fd所发生的事件。 * 这里的ndx和上面的fde_ndx不一样,这个ndx是ev->epoll_events中epoll_event结构体的下标。 * 第一次调用的时候,通常ndx为-1。 * 这个ndx和其对应的fd没有关系。而fde_ndx等于其对应的fd。 */ int fdevent_event_get_revent(fdevents * ev, size_t ndx); /* * 返回ndx对应的fd。 */ int fdevent_event_get_fd(fdevents * ev, size_t ndx); /* * 返回下一个发生IO事件的fd。 */ int fdevent_event_next_fdndx(fdevents * ev, int ndx); /* * 开始等待IO事件。timeout_ms是超时限制。 */ int fdevent_poll(fdevents * ev, int timeout_ms); /** * 设置fd的状态,通常是设置为运行exec在子进程中关闭和非阻塞。 */ int fdevent_fcntl_set(fdevents * ev, int fd); ~~~ 在fdevent.c文件中,这些函数的实现基本上都是简单的调用fdevents结构体中对应的函数指针。对于lighttpd,通过调用上面的这些函数完成IO事件的处理,对于具体到底是谁处理了这些事件,lighttpd并不知道,也不关心。 其他的函数声明: ~~~ /* * 返回fd对应的事件处理函数地址。也就是fdnode中handler的值。 */ fdevent_handler fdevent_get_handler(fdevents * ev, int fd); /* * 返回fd对应的环境。也就是fdnode中ctx的值。 */ void *fdevent_get_context(fdevents * ev, int fd); /* * 注册和取消注册fd。 * 就是生成一个fdnode,然后保存在ev->fdarray中。或者删除之。 */ int fdevent_register(fdevents * ev, int fd, fdevent_handler handler, void *ctx); int fdevent_unregister(fdevents * ev, int fd); /** * 初始化各种多路IO。 */ int fdevent_select_init(fdevents * ev); int fdevent_poll_init(fdevents * ev); int fdevent_linux_rtsig_init(fdevents * ev); int fdevent_linux_sysepoll_init(fdevents * ev); int fdevent_solaris_devpoll_init(fdevents * ev); int fdevent_freebsd_kqueue_init(fdevents * ev); ~~~ 文件fdevent.h中声明的一系列函数就是fdevent系统对外的接口,这相当于类的公有函数。lighttpd通过调用这些函数来实现IO事件的处理。在这些函数的具体实现仅仅是简单的调用了fdevents结构体中的函数指针。而这些函数指针所对应的函数分别定义在以fdevent_开头的.c文件中。从这些文件的名字可以看出其所对应的IO系统。在这些文件中,函数大多是static,这就相当于类的私有函数,起到隐藏具体实现的效果。 举个例子: 对外接口fdevent_event_add (相当于公有函数)调用结构体中的函数 (*event_add) , 而函数指针 event_add (纯虚函数)指向以fdevent_开头的.c文件中的相应函数(如fdevent_linux_sysepoll_event_add,私有函数)。 如此一来,具体使用了哪一个IO系统是完全对外隐藏的。
';

通用数组array.c(h)

最后更新于:2022-04-01 15:57:03

首先回顾以下,在array.h中,UNSET类型是一个宏: ~~~ #define DATA_UNSET \ data_type_t type; \ buffer *key; \ int is_index_key; /* 1 if key is a array index (autogenerated keys) */ \ struct data_unset *(*copy)(const struct data_unset *src); \ void (* free)(struct data_unset *p); \ void (* reset)(struct data_unset *p); \ int (*insert_dup)(struct data_unset *dst, struct data_unset *src); \ void (*print)(const struct data_unset *p, int depth) typedef struct data_unset { DATA_UNSET; } data_unset; ~~~ 使用宏DATA_UNSET,这样可以方便其他类型在定义中直接引用DATA_UNSET宏来模拟继承。在宏DATA_UNSET中,定义了下面五个函数指针: ~~~ struct data_unset *(*copy)(const struct data_unset *src); void (* free)(struct data_unset *p); void (* reset)(struct data_unset *p); int (*insert_dup)(struct data_unset *dst, struct data_unset *src); void (*print)(const struct data_unset *p, int depth) ~~~ 这些函数指针相当于UNSET的成员函数,其他类型可以通过对这五个指针赋值来实现成员函数的重写(Overwrite)。每种类型都配有自己特有的初始化函数,在这些初始化函数中,对上面这五个函数指针进行赋值。 作者很巧妙地用面向对象的思想来组织C代码。 我们可以实例性地看下STRING类型的初始化函数(data_string.c): ~~~ data_string *data_string_init(void) { data_string *ds; ds = calloc(1, sizeof(*ds)); //分配的空间会自动清零 assert(ds); /* 初始化数据成员,buffer_init用来分配内存空间 */ ds->key = buffer_init(); ds->value = buffer_init(); /* 成员函数,对函数指针赋值 */ ds->copy = data_string_copy; ds->free = data_string_free; ds->reset = data_string_reset; ds->insert_dup = data_string_insert_dup; ds->print = data_string_print; ds->type = TYPE_STRING; return ds; } ~~~ array.h中,各个数据类型的标志的定义: ~~~ typedef enum { TYPE_UNSET, /* 数据的类型未设置, 这几种数据类型使用了面向对象的设计思想, 这个类型相当于父类型 */ TYPE_STRING, /* 字符串类型 */ TYPE_COUNT, /* COUNT类型 */ TYPE_ARRAY, /* 数组类型 */ TYPE_INTEGER, /* 整数类型 */ TYPE_FASTCGI, /* FASTCGI类型 */ TYPE_CONFIG /* CONFIG类型 */ } data_type_t; ~~~ 除了UNSET类型,其他类型的操作函数的实现都在文件data_XXX.c中。这七个类型构成了通用数组所要处理的类型。 为何叫做通用数组呢? 因为在数组的定义和实现中只使用UNSET类型,基于上面的定义,通用数组可以不用关心数组中存储的到底是哪种具体的类型,只需将其按照UNSET类型来处理就可以了,所以说是通用的。 下面这个定义是通用数组的核心定义: ~~~ typedef struct { /* UNSET类型的指针型数组,存放数组中的元素 */ data_unset **data; /* 按 data 数据的排序顺序保存 data 的索引 */ size_t *sorted; size_t used; /* data中已经使用了的长度,也就是数组中元素个数 */ /* data的大小。data的大小会根据数据的多少变化,会为以后的数据预先分配空间 */ size_t size; /* 用于保存唯一索引,初始为 0,之后递增 */ size_t unique_ndx; /* 比used大的最小的2的倍数。也就是离used最近的且比used大的2的倍数 ,用于在数组中利用二分法查找元素*/ size_t next_power_of_2; /* data is weakref, don't bother the data */ /* data就是一个指针,不用关系其所指向的内容 */ int is_weakref; } array; ~~~ sorted(图中仅展示data_unset中的key): ![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-25_56ceaefa62429.jpg "") 还有一个定义: ~~~ typedef struct { DATA_UNSET; array *value; } data_array; ~~~ 它定义了一个array类型的数据,也就是说,通用数组中存放的数据可以是通用数组,这样可以形成多维的通用数组。 在array.h中定义了如下的通用数组操作函数: 1、array *array_init(void); 初始化数组,分配空间。 2、array *array_init_array(array * a); 用数组a来初始化一个数组。也就是得到一个a的深拷贝。 3、void array_free(array * a); 释放数组。释放所有空间。 4、void array_reset(array * a); 重置data中的所有数据(调用UNSET类型数据中的reset函数),并将used设为0。相当于清空数组。 5、int array_insert_unique(array * a, data_unset * str); 将str插入到数组中,如果数组中存在key与str相同的数据,则把str的内容拷贝到这个数据中。 6、data_unset *array_pop(array * a); 弹出data中的最后一个元素,返回其指针,data中的最后一个位置设为NULL。 7、int array_print(array * a, int depth); 打印数组中的内容。depth参数用于在打印多维数组时,实现缩进。 8、a_unset *array_get_unused_element(array * a, data_type_t t); 返回第一个未使用的数据,也就是used位置的数据,这个数据不在数组中,返回这个数据指针后,将data[unsed]设为NULL。可能返回NULL。 9、data_unset *array_get_element(array * a, const char *key); 根据key值,返回数组中key值与之相同的数据 10、data_unset *array_replace(array * a, data_unset * du); 如果数组中有与du的key值相同的数据,则用du替换那个数据,并返回那个数据的指针。如果不存在,则把du插入到数组中。(调用data_insert_unique函数) 11、 int array_strcasecmp(const char *a, size_t a_len, const char *b, size_t b_len); 这个函数并没实现,仅仅给出了上面的定义。 12、void array_print_indent(int depth); 根据depth打印空白,实现缩进。 13、size_t array_get_max_key_length(array * a); 返回数组中最长的key的长度。 下面看看array_get_index函数: ~~~ /* * sorted数组是个下标数组,存放的是排好序的输入元素的下标(见前面的图), * 相当于一个排好序的数组。 * 利用sorted数组进行二分查找。 * 若找到,返回元素在data数组中的位置,并通过rndx返回 * 其在sorted数组中的位置。 * 若没有找到,通过rndx返回此元素在sorted中的位置,并返回-1 */ static int array_get_index(array *a, const char *key, size_t keylen, int *rndx) { int ndx = -1; int i, pos = 0; /* pos中存放的是元素在数组data中的位置 */ if (key == NULL) return -1; /* * 当data的空间不够时,通用数组每次为data增加16个空间,第一次初始化时, * data的长度为16。因此,size始终是16的倍数。 * 而next_power_of_2是大于used最小的2的倍数,如used=5,那么 * next_power_of_2就等于8。 * 这样,used始终大于等于next_power_of_2的1/2。 * * next_power_of_2类似于一个标杆,利用这个标杆进行二分搜索 * 可以减少很多出错的几率,也使程序更加易懂。 */ /* try to find the string */ for (i = pos = a->next_power_of_2 / 2; ; i >>= 1) { int cmp; if (pos < 0) { pos += i; } else if (pos >= (int)a->used) { pos -= i; } else { /* 比较两个元素的key值 */ cmp = buffer_caseless_compare(key, keylen, a->data[a->sorted[pos]]->key->ptr, a->data[a->sorted[pos]]->key->used); if (cmp == 0) { /* found */ ndx = a->sorted[pos]; break; } else if (cmp < 0) { pos -= i; /* 所找数据在前半部分 */ } else { pos += i; /* 所找数据在后半部分 */ } } if (i == 0) break; } if (rndx) *rndx = pos; return ndx; } ~~~ 本数据结构的实现中,二分查找是一个特色,然后用sorted数组只对data中的数据的下标排序,也是一个很有用的技巧。
';

数据结构的组织

最后更新于:2022-04-01 15:57:01

我们看源文件,可以看到一些前缀了data_的文件: data_array.c、data_config.c、data_count.c、data_fastcgi.c、data_integer.c、data_string.c, 这些文件实现的是 lighttpd 的数据结构部分。 可以看到它们内部都包含了array.h。 ### 结构体定义 在 array.h 头文件内有几个结构体定义: data_unset、data_array、 data_config、data_count、data_fastcgi、data_integer、data_string。 data_unset 结构体: ~~~ typedef struct data_unset { DATA_UNSET; } data_unset; ~~~ DATA_UNSET 是一个宏,将其展开后就是其结构体元素: ~~~ typedef enum { TYPE_UNSET, TYPE_STRING, TYPE_COUNT, TYPE_ARRAY, TYPE_INTEGER, TYPE_FASTCGI, TYPE_CONFIG } data_type_t; #define DATA_UNSET \ data_type_t type; \ buffer *key; \ int is_index_key; /* 1 if key is a array index (autogenerated keys) */ \ struct data_unset *(*copy)(const struct data_unset *src); \ void (* free)(struct data_unset *p); \ void (* reset)(struct data_unset *p); \ int (*insert_dup)(struct data_unset *dst, struct data_unset *src); \ void (*print)(const struct data_unset *p, int depth) ~~~ 再看其它的几个结构体,不难发现每一个结构体的第一个元素都是 DATA_UNSET 宏。 另外,各个结构体也包含有自己特有的一些属性元素。 如果用面向对象的思想来看待这些结构体,很容易想到把data_unset 看作基类,而其它 data_array 等看作子类! ### 结构体实现 每个结构体的实现对应一个源文件,比如 data_array 的实现在 data_array.c 内、 data_config的实现在 data_config.c 内等。 每一个源文件都包含一个 data_xxxxx_init 的初始化函数和若干 个接口函数,这些接口函数将在初始化函数内被赋值给对应的函数指针,除此之外,初始化函数还要对其数据元素进行初始化,比如分配存储空间,赋初值,设置数据类型等。 接口函数功能: init:创建初始化,包括数据元素存储分配,函数指针赋值等; copy:拷贝创建; free:释放清理结构体; reset:将结构体数据元素重置为初始值; insert_dup:复制插入,一般仅是清理传入的第二个参数结构体; print:打印结构体数据元素。 现只做整体的了解,具体的数据结构源码后面再继续学习~
';

网络服务主模型

最后更新于:2022-04-01 15:56:59

### 一.概述 Lighttpd采用多进程网络服务模型。 进程分两种:监控进程watcher 和 工作进程 workers。 监控进程:fork工作进程并监视工作进程的数目,一旦有工作进程退出,监控进程立即fork新的工作进程。 工作进程:接收客户端请求并做出服务响应。 一般情况下,存在一个监控进程和多个工作进程。 max-worker值默认为0时,没有监控进程,只有一个工作进程。 关于初始化:Lighttpd很多地方内存申请都是采用calloc,malloc()和calloc()的主要区别是前者不能初始化所分配的内存空间,而后者能。 主流程入口文件:server.c ### 二.Lighttpd进程守护化 main函数中的函数daemonize调用使得Lighttpd进程转换为守护进程,从而脱离控制终端,在后台提供服务,避免了执行过程中的信息在终端上显示,也避免了服务被终端信息打断。 (启动时假如选项D可不守护化) 下面我们来分析下: ~~~ #ifdef HAVE_FORK static void daemonize(void) { #ifdef SIGTTOU /* 下面用于屏蔽一些有关控制终端操作的信号,防止守护进程没有正常运作之前控制终端受到干扰退出或挂起 */ signal(SIGTTOU, SIG_IGN); //忽略后台进程写控制终端信号 #endif #ifdef SIGTTIN signal(SIGTTIN, SIG_IGN); //忽略后台进程读控制终端信号 #endif #ifdef SIGTSTP signal(SIGTSTP, SIG_IGN); //忽略终端挂起 #endif /* 下面开始从普通进程转换为守护进程 * 目标1:后台运行。 * 做法:脱离控制终端->调用fork之后终止父进程,子进程被init收养,此步达到后台运行的目标。 */ if (0 != fork()) exit(0); /*,目标2:脱离控制终端,登陆会话和进程组。 * 做法:使用setsid创建新会话,成为新会话的首进程,则与原来的 * 登陆会话和进程组自动脱离,从而脱离控制终端。 * (上一步的fork保证了子进程不可能是一个会话的首进程,这是调用setsid的必要条件) */ if (-1 == setsid()) exit(0); /* 上面已经完成了大部分工作,但是有的系统上,当会话首进程打开 * 一个尚未与任何会话相关联的终端设备时,该设备自动作为控制 * 终端分配给该会话。 * 为避免该情况,我们再次fork进程,于是新进程不再是会话首进程。 * 会话首进程退出时可能会给所有会话内的进程发送SIGHUP,而该 * 信号默认是结束进程,故需要忽略该信号来防止孙子进程意外结束。 */ signal(SIGHUP, SIG_IGN); if (0 != fork()) exit(0); /* 最后目标:改变工作目录到根目录。 * 原因:进程活动时,其工作目录所在的文件系统不能卸下。 */ if (0 != chdir("/")) exit(0); } #endif ~~~ 进程属于一个进程组(一个或多个进程的集合),登陆会话是包含一个或多个进程组的集合,这些进程组共享一个控制终端。 ### 三.多进程网络服务模型 Lighttpd一开始是单进程的,在完成一组公共操作后开始转换为多进程。 代码框架如下: ![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-25_56ceaefa43133.jpg "") 具体代码分析注释: ~~~ #ifdef USE_ALARM signal(SIGALRM, signal_handler); /* setup periodic timer (1 second) */ if (setitimer(ITIMER_REAL, &interval, NULL)) { //每隔一秒产生一个ALARM log_error_write(srv, __FILE__, __LINE__, "s", "setting timer failed"); return -1; } getitimer(ITIMER_REAL, &interval); #endif #ifdef HAVE_FORK /* start watcher and workers */ num_childs = srv->srvconf.max_worker; //存放最大的子进程的数目 if (num_childs > 0) { int child = 0; //child变量用于标记是否为子进程,0代表父进程,1代表子进程 while (!child && !srv_shutdown && !graceful_shutdown) { //子进程不可进入,srv_shutdown=1 或 graceful_shutdown=1时父进程跳出 if (num_childs > 0) { switch (fork()) { //创建子进程 case -1: return -1; case 0: child = 1; //子进程标记 break; default: num_childs--; //父进程 break; } } else { //子进程产生完毕 int status; //保存子进程退出状态 if (-1 != wait(&status)) { //阻塞等待子进程退出,收尸 /** * one of our workers went away */ num_childs++; //表示可以再产生新的子进程 } else { switch (errno) { //发生中断 case EINTR: /** * if we receive a SIGHUP we have to close our logs ourself as we don't * have the mainloop who can help us here */ if (handle_sig_hup) { handle_sig_hup = 0; log_error_cycle(srv); //重新打开日志文件 /** * forward to all procs in the process-group * * we also send it ourself */ if (!forwarded_sig_hup) { //通知组内所有进程 forwarded_sig_hup = 1; //只通知一次,使得后面的kill只调用一次 kill(0, SIGHUP); } } break; default: break; } } } } /** * for the parent this is the exit-point */ if (!child) { //父进程的退出点,关闭所以工作进程,做一些清理工作(关闭日志,连接的网络资源,插件,内存等)。 /** * kill all children too */ if (graceful_shutdown) { kill(0, SIGINT); } else if (srv_shutdown) { kill(0, SIGTERM); } log_error_close(srv); network_close(srv); connections_free(srv); plugins_free(srv); server_free(srv); return 0; } } #endif ~~~ 下面的是子进程部分的关键代码分析: 一开始子进程进行各种初始化工作,包括fd时间处理器的初始化(fdevent_init(srv->max_fds + 1, srv->event_handler)),stat cache初始化(stat_cache_init())等。 子进程工作在一个大while循环中。 while的工作流程如下: 1、判断连接是否断开。如果断开,则调用处理程序进行处理并重新开始新一轮的日志记录。 2、判断是否接到了alarm函数发出的信号。接受到信号后,判断服务器记录的时间是否和当前时间相同。如果相同,说明时间还没有过一秒,继续处理连接请求。如果不相同,则时间已经过了一秒。那么,服务器则触发插件,清理超时连接,清理stat-cache缓存。这理里面最重要的是处理超时连接。程序中通过一个for循环查询所有的连接,比较其idle的时间和允许的最大idle时间来判断连接是否超时。如果连接超时,则让连接进入出错的状态(connection_set_state(srv, con, CON_STATE_ERROR);)。 3、判断服务器socket连接是否失效。如果失效了,则在不是服务器过载的情况下将所有连接重新加入到fdevent中。 4、如果socket没有失效,判断服务器是否过载。如果过载了,则关闭所有连接,清理服务器并退出服务器。 5、分配文件描述符。 6、启动事件轮询。等待各种IO时间的发生。包括文件读写,socket请求等。 7、一旦有事件发生,调用相应的处理函数进行处理。 8、最后,检查joblist中是否有未处理的job并处理之。 至此,一次循环结束了。然后,继续循环直到服务器关闭。 ~~~ /* main-loop */ while (!srv_shutdown) { //只要srv_shutdown不为1,工作进程持续执行 int n; size_t ndx; time_t min_ts; if (handle_sig_hup) { //如果收到HUP信号 handler_t r; /* reset notification */ handle_sig_hup = 0; //重置标识 /* cycle logfiles */ switch(r = plugins_call_handle_sighup(srv)) { //通过plugins_call_handle_sighup来调用各个模块的HUP处理函数 case HANDLER_GO_ON: break; default: log_error_write(srv, __FILE__, __LINE__, "sd", "sighup-handler return with an error", r); break; } if (-1 == log_error_cycle(srv)) { //重新打开日志文件,并写入收到HUP信号到日志。此处并没有重新读取配置文件 log_error_write(srv, __FILE__, __LINE__, "s", "cycling errorlog failed, dying"); return -1; } else { #ifdef HAVE_SIGACTION log_error_write(srv, __FILE__, __LINE__, "sdsd", "logfiles cycled UID =", last_sighup_info.si_uid, "PID =", last_sighup_info.si_pid); #else log_error_write(srv, __FILE__, __LINE__, "s", "logfiles cycled"); #endif } } if (handle_sig_alarm) { //收到ALARM信号 /* a new second */ #ifdef USE_ALARM /* reset notification */ handle_sig_alarm = 0; #endif /* get current time */ min_ts = time(NULL); if (min_ts != srv->cur_ts) { int cs = 0; connections *conns = srv->conns; handler_t r; switch(r = plugins_call_handle_trigger(srv)) { //调用plugins_call_handle_trigger来处理各个模块的ALARM信号处理函数 case HANDLER_GO_ON: break; case HANDLER_ERROR: log_error_write(srv, __FILE__, __LINE__, "s", "one of the triggers failed"); break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } /* trigger waitpid */ srv->cur_ts = min_ts; //更新服务器记录时间 /* cleanup stat-cache */ stat_cache_trigger_cleanup(srv); //清除缓存,删除一些比较旧的节点 /** * check all connections for timeouts * */ for (ndx = 0; ndx < conns->used; ndx++) { //处理超时连接 int changed = 0; connection *con; int t_diff; con = conns->ptr[ndx]; if (con->state == CON_STATE_READ || con->state == CON_STATE_READ_POST) { if (con->request_count == 1) { if (srv->cur_ts - con->read_idle_ts > con->conf.max_read_idle) { /* time - out */ #if 0 log_error_write(srv, __FILE__, __LINE__, "sd", "connection closed - read-timeout:", con->fd); #endif /* lighttpd 中采用了状态机(state-engine)去处理每一个连接, * 状态机中的每一种节点表示连接当时所处的状态,包括 connect 、 * reqstart 、 read 、 reqend 、 readpost 、handlereq、 * respstart、write、respend、error、close 这 11 个状态 */ connection_set_state(srv, con, CON_STATE_ERROR); //调用connection_set_state进行状态机的状态转换 changed = 1; } } else { if (srv->cur_ts - con->read_idle_ts > con->conf.max_keep_alive_idle) { /* time - out */ #if 0 log_error_write(srv, __FILE__, __LINE__, "sd", "connection closed - read-timeout:", con->fd); #endif connection_set_state(srv, con, CON_STATE_ERROR); changed = 1; } } } ……………… /* we don't like div by zero */ if (0 == (t_diff = srv->cur_ts - con->connection_start)) t_diff = 1; /* 处理传输速度限制 * 如果某一时刻平均传输速度达到了用户设置的最大值,则停止发送数据(con->traffic_limit_reached将被设为1, * 进入下面if中处理)。只要检测到平均传输速度小于用户设置的最大值就继续发送数据, * 则满足if的条件,con->traffic_limit_reached设为 0,同时调用状态机切换函数。 */ if (con->traffic_limit_reached && (con->conf.kbytes_per_second == 0 || ((con->bytes_written / t_diff) < con->conf.kbytes_per_second * 1024))) { /* enable connection again */ con->traffic_limit_reached = 0; changed = 1; } if (changed) { connection_state_machine(srv, con); } con->bytes_written_cur_second = 0; *(con->conf.global_bytes_per_second_cnt_ptr) = 0; #if 0 if (cs == 0) { fprintf(stderr, "connection-state: "); cs = 1; } fprintf(stderr, "c[%d,%d]: %s ", con->fd, con->fcgi.fd, connection_get_state(con->state)); #endif } if (cs == 1) fprintf(stderr, "\n"); } } /* 根据当前的资源利用情况禁用或启用 server sockets 服务 * * 禁用:当文件描述符(当前文件描述符和等待文件描述符之和)大于 0.9 倍服务器最大 * (系统允许或用户设置)文件描述符数目或当前连接大于最大(系统允许或用户设置)连接 * 数目或收到终止服务器指令时。 * * 启用:当文件描述符(当前文件描述符和等待文件描述符之和)小于 0.8 倍服务器最大 * (系统允许或用户设置)文件描述符数目并且当前连接小于 0.9 倍最大(系统允许或用户设 * 置)连接数目并且终止服务器标志为 0 时。 */ if (srv->sockets_disabled) { /* our server sockets are disabled, why ? */ if ((srv->cur_fds + srv->want_fds < srv->max_fds * 0.8) && /* we have enough unused fds */ (srv->conns->used < srv->max_conns * 0.9) && (0 == graceful_shutdown)) { for (i = 0; i < srv->srv_sockets.used; i++) { server_socket *srv_socket = srv->srv_sockets.ptr[i]; fdevent_event_add(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN); } log_error_write(srv, __FILE__, __LINE__, "s", "[note] sockets enabled again"); srv->sockets_disabled = 0; } } else { if ((srv->cur_fds + srv->want_fds > srv->max_fds * 0.9) || /* out of fds */ (srv->conns->used > srv->max_conns) || /* out of connections */ (graceful_shutdown)) { /* graceful_shutdown */ /* disable server-fds */ for (i = 0; i < srv->srv_sockets.used; i++) { //逐个删除该工作进程上的所有在 socket 描述符上的事件监听器(通过 fdevent_event_del 函数)。 server_socket *srv_socket = srv->srv_sockets.ptr[i]; fdevent_event_del(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd); if (graceful_shutdown) { //如果是关闭服务器则注销事件监听器结构(主要是释放内存空间,防止内存泄露)并且关闭文件描述符、删除附属文件。 /* we don't want this socket anymore, * * closing it right away will make it possible for * the next lighttpd to take over (graceful restart) * */ fdevent_unregister(srv->ev, srv_socket->fd); close(srv_socket->fd); srv_socket->fd = -1; /* network_close() will cleanup after us */ if (srv->srvconf.pid_file->used && srv->srvconf.changeroot->used == 0) { if (0 != unlink(srv->srvconf.pid_file->ptr)) { if (errno != EACCES && errno != EPERM) { log_error_write(srv, __FILE__, __LINE__, "sbds", "unlink failed for:", srv->srvconf.pid_file, errno, strerror(errno)); } } } } } if (graceful_shutdown) { log_error_write(srv, __FILE__, __LINE__, "s", "[note] graceful shutdown started"); } else if (srv->conns->used > srv->max_conns) { log_error_write(srv, __FILE__, __LINE__, "s", "[note] sockets disabled, connection limit reached"); } else { log_error_write(srv, __FILE__, __LINE__, "s", "[note] sockets disabled, out-of-fds"); } srv->sockets_disabled = 1; } } if (graceful_shutdown && srv->conns->used == 0) { /* we are in graceful shutdown phase and all connections are closed * we are ready to terminate without harming anyone */ srv_shutdown = 1; } /* we still have some fds to share */ if (srv->want_fds) { //如果有待处理的文件描述符,则通过状态机切换函数进行处理,为了合理利用资源,程序会保证至少有 16 个空闲文件描述符 /* check the fdwaitqueue for waiting fds */ int free_fds = srv->max_fds - srv->cur_fds - 16; connection *con; for (; free_fds > 0 && NULL != (con = fdwaitqueue_unshift(srv, srv->fdwaitqueue)); free_fds--) { connection_state_machine(srv, con); srv->want_fds--; } } /* 通过 fdevent_poll -> epoll_wait(以 USE_LINUX_EPOLL 为例)来轮询 I/O 事件的发生, * 其中等待 I/O 事件发生的超时值 timeout_ms=1000milliseconds,即 1 秒。 * 如果在等待的这 1 秒内有 I/O 事件发生,则返回的 n 值记录事件数目,随后用一个 do-while * 循环对每一个发生的 I/O 事件进行处理。 */ if ((n = fdevent_poll(srv->ev, 1000)) > 0) { /* n is the number of events */ int revents; int fd_ndx; #if 0 if (n > 0) { log_error_write(srv, __FILE__, __LINE__, "sd", "polls:", n); } #endif fd_ndx = -1; do { fdevent_handler handler; void *context; handler_t r; fd_ndx = fdevent_event_next_fdndx (srv->ev, fd_ndx); //获得发生了 I/O 事件的文件描述符在 fdarray 中的索引 revents = fdevent_event_get_revent (srv->ev, fd_ndx); //获得该文件描述符上发生的 I/O 事件类型 fd = fdevent_event_get_fd (srv->ev, fd_ndx); //获得该文件描述符 handler = fdevent_get_handler(srv->ev, fd); //获得 I/O 事件处理的回调函数 context = fdevent_get_context(srv->ev, fd); //获得 I/O 事件处理的上下文环境 /* connection_handle_fdevent needs a joblist_append */ #if 0 log_error_write(srv, __FILE__, __LINE__, "sdd", "event for", fd, revents); #endif switch (r = (*handler)(srv, context, revents)) { //调用回调函数进行 I/O 事件处理,并传入相关参数。 case HANDLER_FINISHED: case HANDLER_GO_ON: case HANDLER_WAIT_FOR_EVENT: case HANDLER_WAIT_FOR_FD: break; case HANDLER_ERROR: /* should never happen */ SEGFAULT(); break; default: log_error_write(srv, __FILE__, __LINE__, "d", r); break; } } while (--n > 0); } else if (n < 0 && errno != EINTR) { log_error_write(srv, __FILE__, __LINE__, "ss", "fdevent_poll failed:", strerror(errno)); } ~~~ 参考资料:《Lighttpd源码分析》 高群凯 编著
';

前言

最后更新于:2022-04-01 15:56:56

> 原文出处:[Lighttpd1.4.20源码分析](http://blog.csdn.net/column/details/lighttpd1-4-20.html) 作者:[jiange_zh](http://blog.csdn.net/jiange_zh) **本系列文章经作者授权在看云整理发布,未经作者允许,请勿转载!** # Lighttpd1.4.20源码分析 > 对Lighttpd服务器的源码框架进行分析,了解高性能web服务器的运作流程以及设计技术。
';