ToB企服应用市场:ToB评测及商务社交产业平台

标题: 轻量级web并发服务器——TinyWebServer的学习相识 [打印本页]

作者: 悠扬随风    时间: 2024-9-19 05:31
标题: 轻量级web并发服务器——TinyWebServer的学习相识

前言

本文旨在学习该项目的同时对其代码、原理等内容有更深的明白,学习过程中鉴戒大量网上文章,如明白存在不当之处或有所遗漏短缺,还望各位大佬提点指教
部分图片来自网络

TinyWebServer是什么

WebServer是什么

一个WebServer指的是一个服务器程序大概运行该服务器程序的硬件,其重要功能是通过http协议与客户端(通常是浏览器)进行通信,能够接收、存储、处理惩罚来自客户端的http请求,并对其作出肯定的相应,返回客户端请求的内容或返回一个Error信息
TinyWebServer是什么

TinyWebServer是一个在Linux操纵系统下的轻量级web服务器,能够实现以下几种功能:

相干基础知识

必要对Linux编程、网络编程有肯定相识
书籍推荐:《深入明白计算机系统》、《Unix网络编程》、《Linux高性能服务器编程》
项目中一些相干知识会在对应模块内提到

用户如何与服务器进行通信

用户通常使用web浏览器与服务器进行通信,web浏览器则通过将用户输入的域名剖析得到对应的ip地址,通过TCP协议的三次握手创建与目标web服务器的毗连,之后HTTP协议生成http请求报文发送到目标web服务器上,服务器则使用socket监听来自用户的请求。关于socket创建毗连方面的内容可以通过我之前的文章进行肯定的相识socket实现简朴的文件传输
当服务器处理惩罚一个http请求的时候,还必要继续监听其他用户的请求并为其分配另一逻辑单位用来处理惩罚,即并发(后面会提到线程池并发)。在该项目中,服务器使用epoll这种多路I/O复用技术来实现对监听socket和毗连socket的同时监听
注意:I/O复用可以同时监听多个文件描述符,但其本身是阻塞的,而且当有多个文件描述符同时停当的时候,如不采取额外措施则程序顺序处理惩罚此中停当的每个文件描述符
因此,为了进步服从,项目中使用了线程池来实现多线程并发,为每个停当的文件分配一个逻辑单位(线程)来处理惩罚

代码架构

该项目中的代码架构如下

接下来我将分模块进行学习明白

I/O多路复用

I/O模子

Linux提供了五种I/O处理惩罚模子(详见《Unix网络编程》):

若服务器端采用单线程,当accept一个请求后,在recv或send调用阻塞时,无法accept其他请求,无法处理惩罚并发
若服务器端采用多线程,当accept一个请求后,开启线程进行recv,可以完成并发处理惩罚,但随请求数增加必要增加系统线程,占用大量内存空间,且线程切换会带来很大的开销

服务器端accept一个请求后,加入fds聚集(一般为数组),每次轮询一遍fds聚集recv数据(非阻塞),没有数据立刻返回错误。轮询操纵会浪费大量不必要的CPU资源

服务器端对一组文件描述符进行相干变乱的注册(fd列表),采用单线程通过select/poll/epoll等系统调用获取fd列表,遍历有变乱的fd进行accept/recv/send,使其能支持更多的并发毗连请求

在网络编程中,与socket相干的读写变乱太多,无法在信号对应处理惩罚函数中区分产生该信号的变乱。只得当在I/O变乱单一情况下使用,例如监听端口的socket

   在Linux下的异步I/O是不美满的,aio系列函数是由POSIX界说的异步操纵接口,不是真正的操纵系统级别支持的,而是在用户空间模拟出来的异步,且仅支持基于当地文件的aio异步操纵,网络编程中的socket是不支持的
在windows里实现了一套完整的支持socket的异步编程接口IOCP,是由操纵系统级别实现的异步I/O
  综合以上几种I/O模子的优缺点,现在Linux的高性能服务器
什么是I/O多路复用

I/O多路复用是一种同步I/O模子,实现一个线程可以监视多个文件句柄;一旦某个文件句柄停当,就能够通知应用程序进行相应的读写操纵;没有文件句柄停当时会阻塞应用程序,交出CPU

简朴来说,I/O多路复用就是一种时分复用,在同一个线程中,通过类似切换开关的方式来宏观上同时传输多个I/O流
I/O多路复用的三种实现方式

select

  1. //select函数接口
  2. #include <sys/select.h>
  3. #include <sys/time.h>
  4. #define FD_SETSIZE 1024
  5. #define NFDBITS (8 * sizeof(unsigned long))
  6. #define __FDSET_LONGS (FD_SETSIZE/NFDBITS)
  7. // 数据结构 (bitmap)
  8. typedef struct {
  9.     unsigned long fds_bits[__FDSET_LONGS];
  10. } fd_set;
  11. // API
  12. int select(
  13.     int max_fd,
  14.     fd_set *readset,
  15.     fd_set *writeset,
  16.     fd_set *exceptset,
  17.     struct timeval *timeout
  18. )                              // 返回值就绪描述符的数目
  19. FD_ZERO(int fd, fd_set* fds)   // 清空集合
  20. FD_SET(int fd, fd_set* fds)    // 将给定的描述符加入集合
  21. FD_ISSET(int fd, fd_set* fds)  // 判断指定描述符是否在集合中
  22. FD_CLR(int fd, fd_set* fds)    // 将给定的描述符从文件中删除  
复制代码
select 实现多路复用的方式是,将已毗连的 Socket 都放到一个文件描述符聚集,然后调用 select 函数将文件描述符聚集拷贝到内核里,让内核来检查是否有网络变乱产生。检查的方式很粗暴,就是通过遍历文件描述符聚集的方式,当检查到有变乱产生后,将此 Socket 标志为可读或可写, 接着再把整个文件描述符聚集拷贝回用户态里,然后用户态还必要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理惩罚
所以,对于 select 这种方式,必要进行 2 次「遍历」文件描述符聚集,一次是在内核态里,一次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符聚集,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中
select将监听的文件描述符分为三组,每一组监听差别的必要进行的IO操纵。readfds是必要进行读操纵的文件描述符,writefds是必要进行写操纵的文件描述符,exceptfds是必要进行异常变乱处理惩罚的文件描述符。这三个参数可以用NULL来表示对应的变乱不必要监听。当select返回时,每组文件描述符会被select过滤,只留下可以进行对应IO操纵的文件描述符
select的调用会阻塞到有文件描述符可以进行IO操纵或被信号打断大概超时才会返回(条件触发)
select的缺点

poll

  1. #include <poll.h>
  2. // 数据结构
  3. struct pollfd {
  4.     int fd;                         // 需要监视的文件描述符
  5.     short events;                   // 需要内核监视的事件
  6.     short revents;                  // 实际发生的事件
  7. };
  8. // API
  9. int poll(struct pollfd fds[], nfds_t nfds, int timeout);
复制代码
和select用三组文件描述符差别的是,poll只有一个pollfd数组,数组中的每个元素都表示一个必要监听IO操纵变乱的文件描述符。events参数是我们必要关心的变乱,revents是全部内核监测到的变乱。poll也不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限定,当然还会受到系统文件描述符限定
但是 poll 和 select 并没有太大的本质区别,都是使用「线性布局」存储进程关注的 Socket 聚集,因此都必要遍历文件描述符聚集来找到可读或可写的 Socket,时间复杂度为 O(n),而且也必要在用户态与内核态之间拷贝文件描述符聚集,这种方式随着并发数上来,性能的损耗会呈指数级增长
poll的缺点

epoll

  1. #include <sys/epoll.h>
  2. // 数据结构
  3. // 每一个epoll对象都有一个独立的eventpoll结构体
  4. // 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
  5. // epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
  6. struct eventpoll {
  7.     /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
  8.     struct rb_root  rbr;
  9.     /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
  10.     struct list_head rdlist;
  11. };
  12. // API
  13. int epoll_create(int size); // 内核中间加一个 ep 对象,把所有需要监听的 socket 都放到 ep 对象中
  14. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 负责把 socket 增加、删除到内核红黑树
  15. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程
复制代码
以上两种方式没有解决必要多次在用户态和内核态切换造成大量数据开销和轮询扫描socket导致服从低的题目,而epoll通过两种方面很好地解决了以上题目:

     epoll的缺点

epoll与LT/ET

epoll的三大函数


  1. #include <sys / epoll.h>
  2. int epoll_create(int size)
  3. size:最大监听的fd+1
  4. return:成功返回文件描述符fd;失败返回-1,可根据错误码判断错误类型
复制代码
创建一个epoll的句柄eventpoll,会占用一个fd值,在linux下检察“/proc/进程id/fd/”能够看到该fd,因此在使用完epoll后必要调用close()关闭,否则可能导致fd耗尽
   自从Linux2.6.8版本以后,size值只必要保证大于0,因为内核可以动态的分配大小,不必要size这个提示了
    在linux 2.6.27中加入了epoll_create1(int flag)
flag为0时表示与epoll_create()完全一样;
flag = EPOLL_CLOEXEC,创建的epfd会设置FD_CLOEXEC;
flag = EPOLL_NONBLOCK,创建的epfd会设置为非阻塞
  
  1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
  2. epfd:epoll_create()返回的epoll fd
  3. op:操作值
  4. fd:需要监听的fd
  5. event:需要监听的事件
  6. return:成功返回0;失败返回-1,可根据错误码判断错误类型
复制代码

当socket接收到数据后,中断程序会给eventpoll的停当列表“rdlist”添加socket引用,而不是直接唤醒进程

  1. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
  2. epfd:epoll描述符
  3. events:分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)
  4. maxevents:本次可以返回的最大事件数目,通常与预分配的events数组的大小是相等的
  5. timeout:在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待
  6. return:成功返回需要处理的事件数目,返回0表示已超时;失败则返回-1,可以根据错误码判断错误类型
复制代码
当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程
当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等候队列中的进程,进程通过rdlist知道哪些socket发生了变革而无需轮询socket列表

ET(边沿触发)模式和LT(程度触发)模式


ET模式的要求是必要一直读写,直到返回EAGAIN,否则就会遗漏变乱;LT的并不要求读写到返回EAGAIN为止,但通常会读写到返回EAGAIN,而且LT比ET多了一个开关EPOLLOUT的步骤
ET模式在某些场景下更加高效,但另一方面轻易遗漏变乱,轻易产生bug
对于nginx这种高性能服务器,ET模式是很好的,而其他的通用网络库,许多是使用LT,避免使用的过程中出现bug
epoll的常用框架

  1. for( ; ; )
  2.     {
  3.         nfds = epoll_wait(epfd,events,20,500);
  4.         for(i=0;i<nfds;++i)
  5.         {
  6.             if(events[i].data.fd==listenfd) //有新的连接
  7.             {
  8.                 connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
  9.                 ev.data.fd=connfd;
  10.                 ev.events=EPOLLIN|EPOLLET;
  11.                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
  12.             }
  13.             else if( events[i].events&EPOLLIN ) //接收到数据,读socket
  14.             {
  15.                 n = read(sockfd, line, MAXLINE)) < 0    //读
  16.                 ev.data.ptr = md;     //md为自定义类型,添加数据
  17.                 ev.events=EPOLLOUT|EPOLLET;
  18.                 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
  19.             }
  20.             else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
  21.             {
  22.                 struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取数据
  23.                 sockfd = md->fd;
  24.                 send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //发送数据
  25.                 ev.data.fd=sockfd;
  26.                 ev.events=EPOLLIN|EPOLLET;
  27.                 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
  28.             }
  29.             else
  30.             {
  31.                 //其他的处理
  32.             }
  33.         }
  34.     }
复制代码

HTTP——HTTP毗连与请求相应

在readme文档中提到,该类通过主从状态机封装了http毗连类,主状态机在内部调用从状态机,从状态机将处理惩罚状态和数据传给主状态机
有限状态机

有限状态机(Finite_state machine,FSM),又称有限状态主动机,简称状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模子,其作用重要是描述对象在它的生命周期内所经历的状态序列,以及如何相应来自外界的各种变乱,在计算机科学中,有限状态机被广泛运用于建模、硬件电路系统设计、软件工程、编译器、网络协议等
有限状态机重要有三个特征:
http模块中的主从状态机剖析

http报文的布局如下图所示

以一个具体的报文为例

此中"OST"为请求方法,“/v3/cloudconf”为URL,“HTTP/1.1”为协议版本,之后到空行前的为请求头,空行后的为请求包体
头文件中分别界说了主状态机的三种状态和从状态机的三种状态

主状态机的状态表明当前正在处理惩罚请求报文的哪一部分

从状态机的状态表明对请求报文当前部分的处理惩罚是否出现题目
请求报文的剖析

当webserver的线程池有空闲线程时,某一线程调用process()来完成请求报文的剖析及相应
  1. void http_conn::process()
  2. {
  3.         HTTP_CODE read_ret = process_read();
  4.         if (read_ret == NO_REQUEST)                                                                        //表示请求不完整,需要继续接收请求数据
  5.         {
  6.                 modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);                //注册并监听读事件
  7.                 return;
  8.         }
  9.         bool write_ret = process_write(read_ret);                                        //调用process_write完成报文响应
  10.         if (!write_ret)
  11.         {
  12.                 close_connect();
  13.         }
  14.         modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);                        //注册并监听写事件
  15. }
复制代码
主状态机的状态转换使用process_read()封装,从状态机则用parse_line()封装
process_read()函数中,主状态机初始化从状态机,然后通过while循环实现主从状态机的状态转换以及循环处理惩罚报文内容。从状态机负责剖析指定报文内容,并根据剖析效果更改从状态机的状态;主状态机根据从状态机的返回值判断是否退出循环(终止处理惩罚/竣事处理惩罚),并根据从状态机的驱动更改自身状态
主状态机与从状态机的状态转换及其关系如下图所示

从状态机

在HTTP报文中,每一行的数据由“\r”、“\n”作为竣事字符,空行则是仅仅是字符“\r”、“\n”。因此,可以通过查找“\r”、“\n”将报文拆解成单独的行进行剖析。从状态机负责读取buffer中的数据,将每行数据末端的“\r”、“\n”符号改为“\0”,并更新从状态机在buffer中读取的位置m_checked_idx,以此来驱动主状态机剖析
  1. //从状态机,用于分析出一行内容
  2. //返回值为行的读取状态,有LINE_OK,LINE_BAD,LINE_OPEN
  3. http_conn::LINE_STATUS http_conn::parse_line()
  4. {
  5.         char temp;
  6.         for (; m_checked_idx < m_read_idx; ++m_checked_idx)
  7.                 //m_read_idx指向缓冲区m_read_buf的数据末尾的下一个字节
  8.                 //m_checked_idx指向从状态机目前正在分析的字节
  9.         {
  10.                 temp = m_read_buf[m_checked_idx];                //temp:将要分析的字节
  11.                 if (temp == '\r')                // \r有可能是完整行
  12.                 {
  13.                         if ((m_checked_idx + 1) == m_read_idx)                //该行仍有内容,并未读完
  14.                                 return LINE_OPEN;
  15.                         else if (m_read_buf[m_checked_idx + 1] == '\n')                //出现换行符,说明该行读完
  16.                         {
  17.                                 m_read_buf[m_checked_idx++] = '\0';                // \r、\n都改为结束符\0
  18.                                 m_read_buf[m_checked_idx++] = '\0';
  19.                                 return LINE_OK;
  20.                         }
  21.                         return LINE_BAD;
  22.                 }
  23.                 else if (temp == '\n')
  24.                 {
  25.                         if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r')                //前一个字符是\r,则接收完整
  26.                         {
  27.                                 m_read_buf[m_checked_idx - 1] = '\0';
  28.                                 m_read_buf[m_checked_idx++] = '\0';
  29.                                 return LINE_OK;
  30.                         }
  31.                         return LINE_BAD;
  32.                 }
  33.         }
  34.         return LINE_OPEN;                //未发现换行符,说明读取的行不完整
  35. }
  36. /*
  37. LINE_OK:完整读取一行
  38. LINE_BAD:报文语法有误
  39. LINE_OPEN:读取的行不完整
  40. */
复制代码
主状态机

主状态机初始状态为CHECK_STATE_REQUESTLINE,通过调用从状态机来驱动主状态机。在主状态机剖析前,从状态机已经将每一行末端的“\r”、“\n”符号改为“\0”,以便主状态机直接取出对应字符串进行处理惩罚
   为了避免用户名和密码直接暴露在url中,项目中改用了POST请求,将用户名和密码添加在报文中作为消息体进行了封装
而在POST请求报文中,消息体的末端没有任何字符,不能使用从状态机的状态作为主状态机的while判断条件,因此在process_read()中额外添加了使用主状态机的状态进行判断的条件
剖析完消息体后,报文的完整剖析就完成了,但主状态机的状态还是CHECK_STATE_CONTENT,符合循环条件会再次进入循环,因此增加了“line_status == LINE_OK”并在完成消息体剖析后将该变量更改为LNE_OPEN,此时可以跳出循环完成报文剖析任务
  1. //通过while循环,封装主状态机,对每一行进行循环处理
  2. //此时,从状态机已经修改完毕,主状态机可以取出完整的行进行解析
  3. http_conn::HTTP_CODE http_conn::process_read()
  4. {
  5.         LINE_STATUS line_status = LINE_OK;                                                //初始化从状态机的状态
  6.         HTTP_CODE ret = NO_REQUEST;
  7.         char* text = 0;
  8.         //判断条件,从状态机驱动主状态机
  9.         while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK))
  10.         {
  11.                 text = get_line();
  12.                 m_start_line = m_checked_idx;                //m_start_line:每一个数据行在m_read_buf中的起始位置
  13.                                                                                         //m_checked_idx:从状态机在m_read_buf中的读取位置
  14.                 LOG_INFO("%s", text);
  15.                 switch (m_check_state)                                                //三种状态转换逻辑
  16.                 {
  17.                 case CHECK_STATE_REQUESTLINE:                                //正在分析请求行
  18.                 {
  19.                         ret = parse_request_line(text);                        //解析请求行
  20.                         if (ret == BAD_REQUEST)
  21.                                 return BAD_REQUEST;
  22.                         break;
  23.                 }
  24.                 case CHECK_STATE_HEADER:                                        //正在分析头部字段
  25.                 {
  26.                         ret = parse_headers(text);                                //解析请求头
  27.                         if (ret == BAD_REQUEST)
  28.                                 return BAD_REQUEST;
  29.                         else if (ret == GET_REQUEST)                        //get请求,需要跳转到报文响应函数
  30.                         {
  31.                                 return do_request();                                //响应客户请求
  32.                         }
  33.                         break;
  34.                 }
  35.                 case CHECK_STATE_CONTENT:                                        //解析消息体
  36.                 {
  37.                         ret = parse_content(text);
  38.                         if (ret == GET_REQUEST)                                        //post请求,跳转到报文响应函数
  39.                                 return do_request();
  40.                         line_status = LINE_OPEN;                                //更新,跳出循环,代表解析完了消息体
  41.                         break;
  42.                 }
  43.                 default:
  44.                         return INTERNAL_ERROR;
  45.                 }
  46.         }
  47.         return NO_REQUEST;
  48. }
复制代码
剖析请求行


  1. //解析http请求行,获得请求方法,目标url及http版本号
  2. http_conn::HTTP_CODE http_conn::parse_request_line(char* text)
  3. {
  4.         m_url = strpbrk(text, " \t");                                                        //请求该行中最先含有空格和\t任一字符的位置并返回       
  5.         if (!m_url)                                                                                                //没有目标字符,则代表报文格式有问题
  6.         {
  7.                 return BAD_REQUEST;
  8.         }
  9.         *m_url++ = '\0';                                                                                //将前面的数据取出,后移找到请求资源的第一个字符
  10.         char* method = text;
  11.         if (strcasecmp(method, "GET") == 0)                                                //确定请求方式
  12.                 m_method = GET;
  13.         else if (strcasecmp(method, "POST") == 0)
  14.         {
  15.                 m_method = POST;
  16.                 cgi = 1;                                                                                       
  17.         }
  18.         else
  19.                 return BAD_REQUEST;
  20.         m_url += strspn(m_url, " \t");                                                //得到url地址
  21.         m_version = strpbrk(m_url, " \t");
  22.         if (!m_version)
  23.                 return BAD_REQUEST;
  24.         *m_version++ = '\0';
  25.         m_version += strspn(m_version, " \t");                                //得到http版本号
  26.         if (strcasecmp(m_version, "HTTP/1.1") != 0)
  27.                 return BAD_REQUEST;                                                                //只接受HTTP/1.1版本
  28.         if (strncasecmp(m_url, "http://", 7) == 0)                       
  29.         {
  30.                 m_url += 7;
  31.                 m_url = strchr(m_url, '/');
  32.         }
  33.         if (strncasecmp(m_url, "https://", 8) == 0)
  34.         {
  35.                 m_url += 8;
  36.                 m_url = strchr(m_url, '/');
  37.         }
  38.         if (!m_url || m_url[0] != '/')                                //不符合规则的报文
  39.                 return BAD_REQUEST;
  40.         //当url为/时,显示判断界面
  41.         if (strlen(m_url) == 1)                                                //url为/,显示欢迎界面
  42.                 strcat(m_url, "judge.html");
  43.         m_check_state = CHECK_STATE_HEADER;                        //主状态机状态转移
  44.         return NO_REQUEST;
  45. }
复制代码
剖析请求头


  1. //解析http请求的一个头部信息
  2. http_connect::HTTP_CODE http_connect::parse_headers(char* text)
  3. {
  4.         if (text[0] == '\0')                                                                                //判断是空头还是请求头
  5.         {
  6.                 if (m_content_length != 0)                                                                //具体判断是get请求还是post请求
  7.                 {
  8.                         m_check_state = CHECK_STATE_CONTENT;                                //post请求需要改变主状态机的状态
  9.                         return NO_REQUEST;
  10.                 }
  11.                 return GET_REQUEST;
  12.         }
  13.         else if (strncasecmp(text, "Connection:", 11) == 0)                        //解析头部连接字段
  14.         {
  15.                 text += 11;
  16.                 text += strspn(text, " \t");
  17.                 if (strcasecmp(text, "keep-alive") == 0)                                //判断是否为长连接
  18.                 {
  19.                         m_linger = true;                                                                        //为长连接,设置延迟关闭连接
  20.                 }
  21.         }
  22.         else if (strncasecmp(text, "Content-length:", 15) == 0)                //解析请求头的内容长度字段
  23.         {
  24.                 text += 15;
  25.                 text += strspn(text, " \t");
  26.                 m_content_length = atol(text);                                                        //atol(const char*str):将str所指的字符串转换为一个long int的长整数
  27.         }
  28.         else if (strncasecmp(text, "Host:", 5) == 0)                                //解析请求头部host字段
  29.         {
  30.                 text += 5;
  31.                 text += strspn(text, " \t");
  32.                 m_host = text;
  33.         }
  34.         else
  35.         {
  36.                 LOG_INFO("oop!unknow header: %s", text);
  37.         }
  38.         return NO_REQUEST;
  39. }
复制代码
剖析消息体


  1. //判断http请求是否被完整读入
  2. http_connect::HTTP_CODE http_connect::parse_content(char* text)
  3. {
  4.         if (m_read_idx >= (m_content_length + m_checked_idx))
  5.         {
  6.                 text[m_content_length] = '\0';
  7.                 //POST请求中最后为输入的用户名和密码
  8.                 m_string = text;
  9.                 return GET_REQUEST;
  10.         }
  11.         return NO_REQUEST;
  12. }
复制代码
请求报文的相应

在完成请求报文的剖析后,明确用户想要登录/注册,必要跳转到相应的界面、添加用户名、验证用户等等,并将相应的数据写入相应报文返回给浏览器,具体流程图如下(图片取自微信公众号两猿社):

在头文件中根据HTTP请求的处理惩罚效果初始化了几种情况

do_request函数

在process_read()中完成请求报文的剖析后,状态机调用do_request()函数,该函数负责处理惩罚功能逻辑,具体做法为:将网站根目次与url文件拼接,然后通过stat判断文件属性。浏览器网址栏中的字符,即url,可以将其抽象成ip:port/xxx,xxx通过html文件的action属性进行设置。别的为了进步访问速度,通过mmap进行映射,将平常文件映射到内存逻辑地址。
  1. //对客户请求进行响应
  2. http_connect::HTTP_CODE http_connect::do_request()
  3. {
  4.         strcpy(m_real_file, doc_root);                                        //将初始化的m_real_file赋值为网站根目录
  5.         int len = strlen(doc_root);
  6.         //printf("m_url:%s\n", m_url);
  7.         const char* p = strrchr(m_url, '/');                        //找到m_url中“/”的位置
  8.         //处理cgi
  9.         if (cgi == 1 && (*(p + 1) == '2' || *(p + 1) == '3'))
  10.         {
  11.                 //根据标志判断是登录检测还是注册检测
  12.                 char flag = m_url[1];
  13.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  14.                 strcpy(m_url_real, "/");
  15.                 strcat(m_url_real, m_url + 2);
  16.                 strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1);
  17.                 free(m_url_real);
  18.                 //将用户名和密码提取出来
  19.                 //user=123        password=123
  20.                 char name[100], password[100];
  21.                 int i;
  22.                 for (i = 5; m_string[i] != '&'; ++i)
  23.                         name[i - 5] = m_string[i];
  24.                 name[i - 5] = '\0';
  25.                 int j = 0;
  26.                 for (i = i + 10; m_string[i] != '\0'; ++i, ++j)
  27.                         password[j] = m_string[i];
  28.                 password[j] = '\0';
  29.                 if (*(p + 1) == '3')
  30.                 {
  31.                         //如果是注册,先检测数据库中是否有重名的
  32.                         //没有重名的,进行增加数据
  33.                         char* sql_insert = (char*)malloc(sizeof(char) * 200);
  34.                         strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES(");
  35.                         strcat(sql_insert, "'");
  36.                         strcat(sql_insert, name);
  37.                         strcat(sql_insert, "', '");
  38.                         strcat(sql_insert, password);
  39.                         strcat(sql_insert, "')");
  40.                         if (users.find(name) == users.end())                        //说明库中没有重名
  41.                         {
  42.                                 m_lock.lock();
  43.                                 int res = mysql_query(mysql, sql_insert);
  44.                                 users.insert(pair<string, string>(name, password));
  45.                                 m_lock.unlock();
  46.                                 if (!res)
  47.                                         strcpy(m_url, "/log.html");
  48.                                 else
  49.                                         strcpy(m_url, "/registerError.html");
  50.                         }
  51.                         else
  52.                                 strcpy(m_url, "/registerError.html");
  53.                 }
  54.                 //如果是登录,直接判断
  55.                 //若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0
  56.                 else if (*(p + 1) == '2')
  57.                 {
  58.                         if (users.find(name) != users.end() && users[name] == password)
  59.                                 strcpy(m_url, "/welcome.html");
  60.                         else
  61.                                 strcpy(m_url, "/logError.html");
  62.                 }
  63.         }
  64.         if (*(p + 1) == '0')                                //如果请求资源为/0,表示跳转注册界面
  65.         {
  66.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  67.                 strcpy(m_url_real, "/register.html");
  68.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));        //将网站目录和/register.html进行拼接,更新到m_real_file中
  69.                 free(m_url_real);
  70.         }
  71.         else if (*(p + 1) == '1')                        //如果请求资源为/1,表示跳转登录页面
  72.         {
  73.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  74.                 strcpy(m_url_real, "/log.html");
  75.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));        //将网站目录和/log.html进行拼接,更新到m_real_file中
  76.                 free(m_url_real);
  77.         }
  78.         else if (*(p + 1) == '5')
  79.         {
  80.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  81.                 strcpy(m_url_real, "/picture.html");
  82.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
  83.                 free(m_url_real);
  84.         }
  85.         else if (*(p + 1) == '6')
  86.         {
  87.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  88.                 strcpy(m_url_real, "/video.html");
  89.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
  90.                 free(m_url_real);
  91.         }
  92.         else if (*(p + 1) == '7')
  93.         {
  94.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  95.                 strcpy(m_url_real, "/fans.html");
  96.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
  97.                 free(m_url_real);
  98.         }
  99.         else
  100.                 strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1);
  101.         if (stat(m_real_file, &m_file_stat) < 0)                //通过stat获取请求资源文件信息,成功则将信息更新到m_file_stat结构体;失败返回NO_RESOURCE状态,表示资源不存在
  102.                 return NO_RESOURCE;
  103.         if (!(m_file_stat.st_mode & S_IROTH))                        //判断文件类型,客户端是否有访问权限
  104.                 return FORBIDDEN_REQUEST;
  105.         if (S_ISDIR(m_file_stat.st_mode))                                //判断该路径是否为目录
  106.                 return BAD_REQUEST;
  107.        
  108.         //os.open(file,flags[,mode]):打开一个文件
  109.         int fd = open(m_real_file, O_RDONLY);                        //以只读方式获取文件描述符,通过mmap将该文件映射到内存中               
  110.         m_file_address = (char*)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
  111.         close(fd);                                                                                //避免文件描述符的浪费和占用
  112.         return FILE_REQUEST;                                                        //表示请求文件存在且可以访问
  113. }
  114. /*
  115. open(file,flags[,mode])中flags的参数
  116. O_RDONLY: 以只读的方式打开
  117. O_WRONLY: 以只写的方式打开
  118. O_RDWR : 以读写的方式打开
  119. O_NONBLOCK: 打开时不阻塞
  120. O_APPEND: 以追加的方式打开
  121. O_CREAT: 创建并打开一个新文件
  122. O_TRUNC: 打开一个文件并截断它的长度为零(必须有写权限)
  123. O_EXCL: 如果指定的文件存在,返回错误
  124. O_SHLOCK: 自动获取共享锁
  125. O_EXLOCK: 自动获取独立锁
  126. O_DIRECT: 消除或减少缓存效果
  127. O_FSYNC : 同步写入
  128. O_NOFOLLOW: 不追踪软链接
  129. */
复制代码

  1. #include <sys/types.h>
  2. #include <sys/stat.h>
  3. #include <unistd.h>
  4. //获取文件属性,存储在statbuf中
  5. int stat(const char *pathname, struct stat *statbuf);
  6. struct stat
  7. {
  8.         mode_t    st_mode;        /* 文件类型和权限 */
  9.         off_t     st_size;        /* 文件大小,字节数*/
  10. };
复制代码

  1. void* mmap(void* start,size_t length,int prot,int flags,int fd,off_t offset);
  2. int munmap(void* start,size_t length);
  3. /*
  4. start:映射区的开始地址,设置为0时表示由系统决定映射区的起始地址
  5. length:映射区的长度,从被映射文件开头offset个字节算起
  6. prot:期望的内存保护标志,不能与文件的打开模式冲突,可取以下几个值的或:PROT_READ(可读), PROT_WRITE(可写), PROT_EXEC(可执行), PROT_NONE(不可访问)
  7.         PROT_READ表示页内容可以被读取
  8. flags:指定映射对象的类型,映射选项和映射页是否可以共享,可以是以下几个常用值的或:MAP_FIXED(使用指定的映射起始地址), MAP_SHARED(与其它所有映射这个对象的进程共享映射空间), MAP_PRIVATE(建立一个写入时拷贝的私有映射)
  9.         MAP_PEIVATE建立一个写入时拷贝的私有映射,内存区域的写入不会影响到原文件
  10. fd:有效地文件描述符,一般是由open()函数返回
  11. offset:被映射对象内容的起点
  12. */
复制代码

  1. struct iovec {
  2.     void      *iov_base;      /* starting address of buffer */
  3.     size_t    iov_len;        /* size of buffer */
  4. };
  5. /*
  6. iov_base指向数据的地址
  7. iov_len表示数据的长度
  8. */
复制代码

  1. #include <sys/uio.h>
  2. ssize_t writev(int filedes, const struct iovec *iov, int iovcnt);
  3. /*
  4. filedes表示文件描述符
  5. iov为io向量机制结构体iovec
  6. iovcnt为结构体的个数
  7. */
复制代码
  特别注意: 循环调用writev时,必要重新处理惩罚iovec中的指针和长度,该函数不会对这两个成员做任何处理惩罚。writev的返回值为已写的字节数,但这个返回值“实用性”并不高,因为参数传入的是iovec数组,计量单位是iovcnt,而不是字节数,我们仍旧必要通过遍历iovec来计算新的基址,别的写入数据的“竣事点”可能位于一个iovec的中间某个位置,因此必要调整临界iovec的io_base和io_len
  process_write函数

根据do_request的返回状态,服务器子线程调用process_write向m_write_buf中写入相应报文,在生成相应报文的过程中重要调用add_reponse()函数更新m_write_idx和m_write_buf
以下几个函数为内部调用add_response函数更新m_write_idx指针和缓冲区m_write_buf中的内容
  1. bool http_connect::add_response(const char* format, ...)
  2. {
  3.         if (m_write_idx >= WRITE_BUFFER_SIZE)                                        //如果写入内容超出m_write_buf大小则报错
  4.                 return false;
  5.         va_list arg_list;                                                                                //定义可变参数列表
  6.         va_start(arg_list, format);                                                                //将变量arg_list初始化为传入参数
  7.         int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list);        //将数据format从可变参数列表写入缓冲区写,返回写入数据的长度
  8.         if (len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx))                //如果写入的数据长度超过缓冲区剩余空间,则报错
  9.         {
  10.                 va_end(arg_list);
  11.                 return false;
  12.         }
  13.         m_write_idx += len;                                                                                //更新m_write_idx位置
  14.         va_end(arg_list);                                                                                //清空可变参列表
  15.         LOG_INFO("request:%s", m_write_buf);
  16.         return true;
  17. }
复制代码

  1. //添加状态行
  2. bool http_connect::add_status_line(int status, const char* title)
  3. {
  4.         return add_response("%s %d %s\r\n", "HTTP/1.1", status, title);
  5. }
复制代码

  1. bool http_connect::add_headers(int content_len)                                                        //添加消息报头,具体的添加文本长度、连接状态和空行
  2. {
  3.         return add_content_length(content_len) && add_linger() &&
  4.                 add_blank_line();
  5. }
  6. bool http_connect::add_content_length(int content_len)                                        //添加Content-Length,表示响应报文的长度
  7. {
  8.         return add_response("Content-Length:%d\r\n", content_len);
  9. }
  10. bool http_connect::add_linger()                                                                                        //添加连接状态,通知浏览器端是保持连接还是关闭
  11. {
  12.         return add_response("Connection:%s\r\n", (m_linger == true) ? "keep-alive" : "close");
  13. }
  14. bool http_connect::add_content_type()                                                                        //添加文本类型,这里是html
  15. {
  16.         return add_response("Content-Type:%s\r\n", "text/html");
  17. }
复制代码

  1. bool http_connect::add_blank_line()                                                                                //添加空行
  2. {
  3.         return add_response("%s", "\r\n");
  4. }
复制代码

  1. bool http_connect::add_content(const char* content)
  2. {
  3.         return add_response("%s", content);
  4. }
复制代码
相应报文分为两种,一种是请求文件的存在,通过io向量机制iovec,声明两个iovec,第一个指向m_write_buf,第二个指向mmap的地址m_file_address ;另一种是请求堕落,这时候只申请一个iovec,指向m_write_buf

   往相应报文里写的是服务器中html的文件数据,浏览器端对其进行剖析、渲染并表现在浏览器页面上
  1. bool http_connect::process_write(HTTP_CODE ret)
  2. {
  3.         switch (ret)
  4.         {
  5.         case INTERNAL_ERROR:                                                                                                                //内部错误,500
  6.         {
  7.                 add_status_line(500, error_500_title);                                                                        //状态行
  8.                 add_headers(strlen(error_500_form));                                                                        //消息报头
  9.                 if (!add_content(error_500_form))
  10.                         return false;
  11.                 break;
  12.         }
  13.         case BAD_REQUEST:                                                                                                                        //报文语法有误,404
  14.         {
  15.                 add_status_line(404, error_404_title);
  16.                 add_headers(strlen(error_404_form));
  17.                 if (!add_content(error_404_form))
  18.                         return false;
  19.                 break;
  20.         }
  21.         case FORBIDDEN_REQUEST:                                                                                                                //资源没有访问权限,403
  22.         {
  23.                 add_status_line(403, error_403_title);
  24.                 add_headers(strlen(error_403_form));
  25.                 if (!add_content(error_403_form))
  26.                         return false;
  27.                 break;
  28.         }
  29.         case FILE_REQUEST:                                                                                                                        //文件存在,200
  30.         {
  31.                 add_status_line(200, ok_200_title);
  32.                 if (m_file_stat.st_size != 0)                                                                                        //请求的资源存在
  33.                 {
  34.                         add_headers(m_file_stat.st_size);
  35.                         m_iv[0].iov_base = m_write_buf;                                                                                //第一个iovec指针指向响应报文缓冲区,长度指向m_write_idx
  36.                         m_iv[0].iov_len = m_write_idx;
  37.                         m_iv[1].iov_base = m_file_address;                                                                        //第二个iovec指针指向mmap返回的文件指针,长度指向文件大小
  38.                         m_iv[1].iov_len = m_file_stat.st_size;
  39.                         m_iv_count = 2;
  40.                         bytes_to_send = m_write_idx + m_file_stat.st_size;                                        //发送的全部数据为响应报文头部信息和文件大小
  41.                         return true;
  42.                 }
  43.                 else
  44.                 {
  45.                         const char* ok_string = "<html><body></body></html>";                                //请求的资源大小为0,则返回空白html文件
  46.                         add_headers(strlen(ok_string));
  47.                         if (!add_content(ok_string))
  48.                                 return false;
  49.                 }
  50.         }
  51.         default:
  52.                 return false;
  53.         }
  54.         m_iv[0].iov_base = m_write_buf;                                                                                                //除FILE_REQUEST状态外,其余状态只申请一个iovec,指向响应报文缓冲区
  55.         m_iv[0].iov_len = m_write_idx;
  56.         m_iv_count = 1;
  57.         bytes_to_send = m_write_idx;
  58.         return true;
  59. }
复制代码
write函数

服务器子线程调用process_write完成相应报文,随后注册epollout变乱。服务器主线程检测写变乱,并调用http_conn::write函数将相应报文发送给浏览器端
该函数具体逻辑如下:
生成相应报文时初始化byte_to_send(包罗头部信息和文件数据),通过writev函数循环发送相应报文数据,根据返回值更新byte_have_send和iovec布局体的指针和长度,并判断相应报文团体是否发送成功

  1. bool http_connect::write()
  2. {
  3.         int temp = 0;
  4.         if (bytes_to_send == 0)                                                                                                                //要发送的数据长度为0,表示响应报文为空,一般不会出现该情况
  5.         {
  6.                 modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
  7.                 init();
  8.                 return true;
  9.         }
  10.         while (1)
  11.         {
  12.                 temp = writev(m_sockfd, m_iv, m_iv_count);                                                                //将响应报文的状态行、消息头、空行和响应正文发送给浏览器端
  13.                 if (temp < 0)       
  14.                 {
  15.                         if (errno == EAGAIN)                                                                                                //判断缓冲区是否已满
  16.                         {
  17.                                 modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);                                //重新注册写事件,等待下一次触发
  18.                                 return true;
  19.                         }
  20.                         unmap();                                                                                                                        //发送失败,但不是缓冲区问题,取消映射
  21.                         return false;
  22.                 }
  23.                 bytes_have_send += temp;
  24.                 bytes_to_send -= temp;                                                                                                        //更新已发送字节数
  25.                 if (bytes_have_send >= m_iv[0].iov_len)                                                                        //第一个iovec头部信息的数据已发送完,发送第二个iovec数据
  26.                 {
  27.                         m_iv[0].iov_len = 0;                                                                                                //不再继续发送头部信息
  28.                         m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
  29.                         m_iv[1].iov_len = bytes_to_send;
  30.                 }
  31.                 else                                                                                                                                        //继续发送第一个iovec头部信息的数据
  32.                 {
  33.                         m_iv[0].iov_base = m_write_buf + bytes_have_send;
  34.                         m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;
  35.                 }
  36.                 if (bytes_to_send <= 0)                                                                                                        //判断条件,数据已全部发送完
  37.                 {
  38.                         unmap();
  39.                         modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);                                        //在epoll树上重置EPOLLONESHOT事件
  40.                         if (m_linger)                                                                                                                //浏览器的请求为长连接
  41.                         {
  42.                                 init();                                                                                                                        //重新初始化HTTP对象
  43.                                 return true;
  44.                         }
  45.                         else
  46.                         {
  47.                                 return false;
  48.                         }
  49.                 }
  50.         }
  51. }
复制代码

ThreadPool——线程池

池式布局

在计算机体系布局中有许多池式布局,例如对象池、数据库毗连池、线程池、内存池等等。使用池式布局的重要原因是必要使用池式布局的对象一般存在创建时间长、资源占用高等特点,在对象过多的情况下或导致运行服从低下,因此使用池式布局复用有限的资源,进步运行服从
线程池

线程池其实就是将多个线程对象放到一个容器中,在该项目中使用了一个数组作为容器。在创建线程时一次性创建多个线程存放进线程池中等候执行任务;必要执行任务时从线程池中取出空闲线程执行,执行完后将线程放回线程池中等候下次执行任务;通过复用线程池中的线程来避免多次创建销毁线程导致的运行服从低下题目。
线程池的使用有效降低了多线程操纵中任务申请和开释产生的性能消耗,在进步线程使用率、进步线程相应速度、同一管理线程对象以及控制最大并发数等题目上起到了很好的效果
线程池重要实用的场景

由于线程池本就是用于减少线程频繁创建销毁导致的运行服从低下题目,仅当线程本身开销与线程执行任务的开销相比不可忽略的情况下作用明显;而在例如FTP服务器或Telnet服务器上,传输文件时间较长、开销较大、线程本身开销可以忽略不计的情况下,线程池能起到的作用就不敷明显,可能不是一种足够理想的方法
线程池通常实用以下几种场所:
1)单位时间内处理惩罚任务频繁且任务处理惩罚时间短
2)对及时性要求较高
线程池的本质及设计要点

本质上,线程池可以简化为一个“生产者-消耗者”模子。线程池执行任务是在“消耗”资源,往线程池中增加任务是“生产”资源。
线程池类的设计重要必要思量以下几点:

proactor模子和reactor模子

Reactor模子

服务器采用线程池的方式进行资源复用,解决了频繁创建销毁线程带来的性能开销和资源浪费题目,同时也引入了一个新题目:线程如何高效处理惩罚多个毗连的业务?
一个毗连对应一个线程时通常采用“read -> 业务处理惩罚 -> send”的处理惩罚流程,socket默认为阻塞I/O,则当现在毗连无数据可读时线程阻塞在“read”操纵上,该阻塞方式不影响其他线程。但采用了线程池的话,一个线程如果在处理惩罚某个毗连的“read”操纵时阻塞则无法继续处理惩罚其他毗连的业务
为相识决该题目,最简朴的方式是将socket改为非阻塞,线程不停轮询调用“read”操纵来判断有无数据。但该方法线程不知道当前毗连是否有数据可读,因此必要每次通过“read”操纵判断,必要消耗CPU资源且服从低下。为相识决这个题目,可以使用I/O多路复用技术实现只有在毗连上有数据的时候线程才去发出读请求,将I/O多路复用技术封装之后就称为Reactor模子。在Reactor模子中重要有三类处理惩罚变乱:acceptor——负责毗连变乱、handler——负责变乱读写、reactor——负责变乱监听和变乱分发
Reactor模子重要由Reactor和处理惩罚资源池两个核心部分组成,此中Reactor负责监听和分发变乱(包罗毗连变乱、读写变乱),处理惩罚资源池负责处理惩罚变乱。Reactor的数量可以是一个或多个,处理惩罚资源池可以是单个进程/线程,也可以是多个进程/线程,因此Reactor重要分为三种方案:

   单Reactor单进程模子无法充分使用多核CPU的性能,且handler进行业务处理惩罚时进程无法处理惩罚其他毗连,如果业务处理惩罚耗时较长则造成相应耽误,不实用计算秘麋集型的场景,只实用于业务处理惩罚非常快速的场景
  单Reactor单进程模子的经典应用场景:Redis、Netty

   单Reactor多线程模子能够充分使用多核CPU的功能,但是因为只有一个Reactor对象承担全部变乱的监听和相应,且只在主线程上运行,在面对刹时高并发的场景时reactor轻易成为性能瓶颈
  单Reactor多线程模子的经典应用场景:Netty

   在多Reactor多线程模子中,主线程和子线程分工明确,主线程只负责接收新毗连,子线程完成后续业务处理惩罚,子线程无需返回数据,直接将处理惩罚效果发送给客户端
  多Reactor多线程模子的经典应用场景:Netty、kafka、Nginx
Proactor模子

Reactor模子是非阻塞同步网络模子,而Proactor模子是异步网络模子
阻塞I/O必要等候“内核数据准备好”和“数据从内核态拷贝到用户态”两个过程;非阻塞I/O必要等候“数据从内核态拷贝到用户态”一个过程;而异步I/O两个过程都不必要等候
Proactor模子将I/O变乱的监听、I/O操纵的执行、I/O效果的返回统统交给内核来执行,在数据准备阶段和数据拷贝阶段全程无阻塞。在发起异步读写请求时,必要传入数据缓冲区的地址等信息,系统内核主动完成读写工作,读写工作不必要应用进程主动发起read/write来读写数据,而是由操纵系统来完成。操纵系统完成读写工作后就会通知应用进程直接处理惩罚数据
   在Linux下的异步I/O是不美满的,aio系列函数是由POSIX界说的异步操纵接口,不是真正的操纵系统级别支持的,而是在用户空间模拟出来的异步,且仅支持基于当地文件的aio异步操纵,网络编程中的socket是不支持的
在windows里实现了一套完整的支持socket的异步编程接口IOCP,是由操纵系统级别实现的异步I/O
  Reactor模子和Proactor模子的区别


线程池的界说

  1. class threadpool
  2. {
  3. public:
  4.     /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
  5.     threadpool(int actor_model, connection_pool* connPool, int thread_number = 8, int max_request = 10000);
  6.     ~threadpool();
  7.     bool append(T* request, int state);
  8.     bool append_p(T* request);
  9. private:
  10.     /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
  11.     static void* worker(void* arg);
  12.     void run();
  13. private:
  14.     int m_thread_number;         //线程池中的线程数
  15.     int m_max_requests;         //请求队列中允许的最大请求数
  16.     pthread_t* m_threads;       //描述线程池的数组,其大小为m_thread_number
  17.     std::list<T*> m_workqueue; //请求队列
  18.     locker m_queuelocker;       //保护请求队列的互斥锁
  19.     sem m_queuestat;            //是否有任务需要处理
  20.     connection_pool* m_connPool;  //数据库
  21.     int m_actor_model;          //模型切换
  22. };
复制代码
如果把函数看成是一个对象,则在线程池类的接口中传递差别的函数也就是传递差别的对象。如上所述,一个线程如果必要执行差别功能的函数,则必要在函数内调用其他函数来完成功能,在此处的对外接口append()引用了C++的模板编程(template),实现将差别类型的函数添加进工作队列中
  1. template <typename T>                                                                                //模板编程
  2. bool threadpool<T>::append(T* request, int state)                  
  3. {
  4.     m_queuelocker.lock();
  5.     if (m_workqueue.size() >= m_max_requests)
  6.     {
  7.         m_queuelocker.unlock();
  8.         return false;
  9.     }
  10.     request->m_state = state;
  11.     m_workqueue.push_back(request);
  12.     m_queuelocker.unlock();
  13.     m_queuestat.post();
  14.     return true;
  15. }
复制代码
线程池的创建

  1. template <typename T>
  2. //线程池构造函数
  3. threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool)
  4. {
  5.     if (thread_number <= 0 || max_requests <= 0)            //线程池异常
  6.         throw std::exception();
  7.     m_threads = new pthread_t[m_thread_number];
  8.     if (!m_threads)                                         
  9.         throw std::exception();
  10.     for (int i = 0; i < thread_number; ++i)                 //循环创建线程,并将工作线程按要求进行运行
  11.     {
  12.         if (pthread_create(m_threads + i, NULL, worker, this) != 0)         
  13.         {
  14.             delete[] m_threads;
  15.             throw std::exception();
  16.         }
  17.         if (pthread_detach(m_threads[i]))               //将线程设定为分离状态,不用单独对线程进行回收,避免内存泄露
  18.         {
  19.             delete[] m_threads;
  20.             throw std::exception();
  21.         }
  22.     }
  23. }
复制代码
此中必要注意的函数有
  1. pthread_create(pthread_t * thread, const pthread_attr_t * attr, void* (*start_routine)(void*), void *arg)
  2. *pthread:传递一个线程的指针变量或该类型变量的地址
  3. *attr:用于手动设置新建线程的属性,一般设置为NULL由系统默认的属性值创建线程
  4. *(start_routine)(void*):以函数指针的方式指明新建线程需要执行的函数
  5. *arg:指定传递给start_routine函数的实参
  6. 返回值:成功创建则返回0;失败则返回非零值,对应不同的宏表明创建失败的原因
  7. pthread_detach(pthread_t *thread)
  8. 将已经运行中的线程设定为分离状态,成功返回0,失败返回错误号指明失败原因
复制代码
linux线程有两种状态:joinable状态和unjoinable状态。如果线程是joinable状态,当线程函数本身退出不会开释线程所占用的堆栈和线程描述符等资源,只有调用了PTHREAD_JOIN()后由主线程阻塞等候子线程竣事,然后回收子线程资源;如果线程是unjoinable状态(分离状态),则在子线程竣事时会主动回收资源
一般情况下,使用PTHREAD_CREATE()创建线程执行任务后,线程处于joinable状态。如果没有主线程调用PTHREAD_JOIN()往返收线程资源的话,该线程会继续占用系统资源成为“僵尸线程”,为了避免该种情况,项目中使用PTHREAD_DETACH()将线程分离避免后续忘记回收线程资源
PTHREAD_DETACH()用于将运行中的线程设定为分离状态,线程主动与主控线程断开关系,线程竣事后,其退出状态不由其他线程获取,而是直接本身主动开释,常应用于网络、多线程服务器
   c++11引入了std::thread来创建线程,支持对线程join大概detach
  1. std::thread t(func);
  2.     if (t.joinable()) {
  3.         t.detach();
  4.     }
复制代码
待办工作加入请求队列

当epoll检测到端口有变乱激活时,即将该变乱放入请求队列中,等候工作线程处理惩罚
  1. bool threadpool<T>::append(T* request, int state)                  
  2. {
  3.     m_queuelocker.lock();
  4.     if (m_workqueue.size() >= m_max_requests)
  5.     {
  6.         m_queuelocker.unlock();
  7.         return false;
  8.     }
  9.     request->m_state = state;
  10.     m_workqueue.push_back(request);
  11.     m_queuelocker.unlock();
  12.     m_queuestat.post();
  13.     return true;
  14. }
  15. bool threadpool<T>::append_p(T* request)                        //proactor模式下的任务请求入队
  16. {
  17.     m_queuelocker.lock();
  18.     if (m_workqueue.size() >= m_max_requests)
  19.     {
  20.         m_queuelocker.unlock();
  21.         return false;
  22.     }
  23.     m_workqueue.push_back(request);
  24.     m_queuelocker.unlock();
  25.     m_queuestat.post();
  26.     return true;
  27. }
复制代码
本项目实现的是一个基于半同步/半反应堆式的并发布局(使用同步I/O模拟Proactor变乱处理惩罚模式),由I/O线程完成读写,I/O线程向工作线程分发的是已接受的数据
以Proactor模式为例的工作流程如下:


线程处理惩罚

创建线程池时,调用pthread_create时指向了worker()静态成员函数,worker()内部调用run()
  1. //线程回调函数/工作函数,arg其实是this
  2. template <typename T>
  3. void* threadpool<T>::worker(void* arg)
  4. {
  5.     threadpool* pool = (threadpool*)arg;        //将参数强行转化为线程池类,获取threadpool对象地址
  6.     pool->run();                                                                //线程池中每个线程创建都会调用run()睡眠在队列中
  7.     return pool;
  8. }
复制代码
run()函数可以看做一个回环变乱,一直等候m_queuestat.post(),即新任务进入请求队列,此时从请求队列中取出一个任务进行处理惩罚
  1. //工作线程通过run函数不断等待任务队列有新任务,然后 加锁->取任务->解锁->执行任务
  2. template <typename T>
  3. void threadpool<T>::run()
  4. {
  5.     while (true)
  6.     {
  7.         m_queuestat.wait();                     //信号量等待
  8.         m_queuelocker.lock();                   //工作线程被唤醒后先加互斥锁
  9.         if (m_workqueue.empty())
  10.         {
  11.             m_queuelocker.unlock();
  12.             continue;
  13.         }
  14.         T* request = m_workqueue.front();           //从请求队列中取出第一个任务
  15.         m_workqueue.pop_front();                    //该任务从请求队列中弹出
  16.         m_queuelocker.unlock();
  17.         if (!request)
  18.             continue;
  19.         if (1 == m_actor_model)                     //proactor模式和reactor模式切换
  20.         {   /*
  21.             reactor模式:只有reactor模式下,标志位improv和timer_flag才会发挥作用
  22.             imporv:在read_once和write成功后会置1,对应request完成后置0,用于判断上一个请求是否已处理完毕
  23.             timer_flag:当http的读写失败后置1,用于判断用户连接是否异常
  24.             */
  25.             if (0 == request->m_state)                                              //请求状态:读/写
  26.             {
  27.                 if (request->read_once())
  28.                 {
  29.                     request->improv = 1;            
  30.                     connectionRAII mysqlcon(&request->mysql, m_connPool);           //从连接池获取连接
  31.                     request->process();                                             //工作线程执行任务
  32.                 }
  33.                 else
  34.                 {
  35.                     request->improv = 1;
  36.                     request->timer_flag = 1;
  37.                 }
  38.             }
  39.             else
  40.             {
  41.                 if (request->write())
  42.                 {
  43.                     request->improv = 1;
  44.                 }
  45.                 else
  46.                 {
  47.                     request->improv = 1;
  48.                     request->timer_flag = 1;
  49.                 }
  50.             }
  51.         }
  52.         else
  53.         {
  54.             connectionRAII mysqlcon(&request->mysql, m_connPool);
  55.             request->process();
  56.         }
  57.     }
  58. }
复制代码

CGIMysql——数据库毗连池

在处理惩罚用户注册、登录请求时,我们必要保存用户的用户名和密码用于注册或登录校验。该项目采用的方式是:每一个HTTP毗连获取一个数据库毗连,获取此中的用户账号密码进行对比,然后再开释该数据库毗连。若系统必要频繁访问数据库,则必要频繁创建和断开数据库毗连,而创建数据库毗连是一个很耗时的操纵,也轻易对数据库造成安全隐患。
在程序初始化的时候,集中创建多个数据库毗连,并把他们集中管理,供程序使用,可以保证较快的数据库读写速度,更加安全可靠。在本项目中,使用单例模式和链表创建数据库毗连池,实现对数据库毗连资源的复用,并将数据库毗连的获取与开释通过RAII机制封装,避免手动开释
项目中的数据库模块分为两部分,其一是数据库毗连池的界说,其二是使用毗连池完成登录和注册的校验功能。具体,工作线程从数据库毗连池取得一个毗连,访问数据库中的数据,访问完毕后将毗连交还毗连池
毗连池的功能重要有:初始化、获取毗连、开释毗连、销毁毗连池
单例模式创建

使用局部静态变量创建毗连池
  1. class connection_pool
  2. {
  3. public:
  4.         MYSQL* GetConnection();                                 //获取数据库连接
  5.         bool ReleaseConnection(MYSQL* conn); //释放连接
  6.         int GetFreeConn();                                         //获取连接
  7.         void DestroyPool();                                         //销毁所有连接
  8.         //单例模式
  9.         static connection_pool* GetInstance();
  10.         void init(string url, string User, string PassWord, string DataBaseName, int Port, int MaxConn, int close_log);
  11.         void CreateConnection(string url, string User, string PassWord, string DataBaseName, int Port, int close_log);
  12. private:
  13.         connection_pool();
  14.         ~connection_pool();
  15.         int m_MaxConn;  //最大连接数
  16.         int m_CurConn;  //当前已使用的连接数
  17.         int m_FreeConn; //当前空闲的连接数
  18.         locker lock;
  19.         list<MYSQL*> connList; //连接池
  20.         sem reserve;
  21. public:
  22.         string m_url;                         //主机地址
  23.         string m_Port;                 //数据库端口号
  24.         string m_User;                 //登陆数据库用户名
  25.         string m_PassWord;         //登陆数据库密码
  26.         string m_DatabaseName; //使用数据库名
  27.         int m_close_log;        //日志开关
  28. };
复制代码
初始化

   销毁毗连池没有直接被外部调用,而是通过RAII机制来完成主动开释;使用信号量实现多线程争夺毗连的同步机制,这里将信号量初始化为数据库的毗连总数
  1. //构造初始化
  2. void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log)
  3. {
  4.         m_url = url;                                                //初始化数据库信息
  5.         m_Port = Port;
  6.         m_User = User;
  7.         m_PassWord = PassWord;
  8.         m_DatabaseName = DBName;
  9.         m_close_log = close_log;
  10.         for (int i = 0; i < MaxConn; i++)                //创建MaxConn条数据库连接
  11.         {
  12.                 MYSQL* con = NULL;
  13.                 con = mysql_init(con);                                //mysql_init(MYSQL* mysql):初始化或分配与mysql_real_connect()相适应的MYSQL对象
  14.                 if (con == NULL)                                        //初始化失败
  15.                 {
  16.                         LOG_ERROR("MySQL Error");               
  17.                         exit(1);
  18.                 }
  19.                 con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);
  20.                 if (con == NULL)                                        //建立连接失败
  21.                 {
  22.                         LOG_ERROR("MySQL Error");
  23.                         exit(1);
  24.                 }
  25.                 connList.push_back(con);                        //更新连接池和空闲连接数量
  26.                 ++m_FreeConn;                                               
  27.         }
  28.         reserve = sem(m_FreeConn);                                //将信号量初始化为最大连接次数
  29.         m_MaxConn = m_FreeConn;
  30. }
复制代码
此中值得注意的有
  1. mysql_real_connect(MYSQL* mysql, const char* host, const char* user, const char* passwd, const char* db, unsigned int port, const char* unix_socket, unsigned long client_flag)
  2. 数据库引擎建立连接函数
  3. mysql:定义的MYSQL变量
  4. host:MYSQL服务器的地址,决定了连接的类型。如果"host"是NULL或字符串"localhost",连接将被视为与本地主机的连接,如果操作系统支持套接字(Unix)或命名管道(Windows),将使用它们而不是TCP/IP连接到服务器
  5. user:登录用户名,如果“user”是NULL或空字符串"",用户将被视为当前用户,在UNIX环境下,它是当前的登录名
  6. passwd:登录密码
  7. db:要连接的数据库,如果db为NULL,连接会将该值设为默认的数据库
  8. port:MYSQL服务器的TCP服务端口,如果"port"不是0,其值将用作TCP/IP连接的端口号
  9. unix_socket:unix连接方式,如果unix_socket不是NULL,该字符串描述了应使用的套接字或命名管道
  10. clientflag:Mysql运行为ODBC数据库的标记,一般取0
  11. 返回值:连接成功,返回连接句柄,即第一个变量mysql;连接失败,返回NULL
复制代码
获取、开释毗连

当线程数量大于数据库毗连数量时,使用信号量进行同步,每次取出毗连,信号量原子减1,开释毗连原子加1,若毗连池内没有毗连了,则阻塞等候
别的,由于多线程操纵毗连池,会造成竞争,这里使用互斥锁完成同步,具体的同步机制均使用lock.h中封装好的类
  1. //当有请求时,从数据库连接池中返回一个可用连接,更新使用和空闲连接数
  2. MYSQL* connection_pool::GetConnection()
  3. {
  4.         MYSQL* con = NULL;
  5.         if (0 == connList.size())
  6.                 return NULL;
  7.         reserve.wait();                                        //取出连接,信号量原子减1,为0则等待
  8.         lock.lock();                                        //lock互斥锁保证同一时间只有一个线程对容器connList进行操作
  9.         con = connList.front();                        //得到第一个连接
  10.         connList.pop_front();                        //从连接池中弹出该连接
  11.         if (con->isClosed())                        //如果连接被关闭,删除后重新建立一个
  12.         {
  13.                 delete con;
  14.                 con = this->CreateConnection(this->m_url,this->m_User,this->m_PassWord,this->m_DatabaseName,this->m_Port,this->m_close_log);                //CreationConnection()实际是将之前init中for循环中的“创建一条数据库连接”操作重复一遍
  15.         }
  16.         if (con == NULL)
  17.         {
  18.                 --m_CurConn;
  19.         }
  20.         --m_FreeConn;
  21.         ++m_CurConn;
  22.         lock.unlock();
  23.         return con;
  24. }
  25. //释放当前使用的连接
  26. bool connection_pool::ReleaseConnection(MYSQL* con)
  27. {
  28.         if (NULL == con)                       
  29.                 return false;
  30.         lock.lock();
  31.         connList.push_back(con);
  32.         ++m_FreeConn;
  33.         --m_CurConn;
  34.         lock.unlock();
  35.         reserve.post();                                //释放连接原子加1
  36.         return true;
  37. }
复制代码
销毁毗连池

通过迭代器遍历毗连池链表,关闭对应数据库毗连,清空链表并重置空闲毗连和现有毗连数量
  1. //销毁数据库连接池
  2. void connection_pool::DestroyPool()
  3. {
  4.         lock.lock();
  5.         if (connList.size() > 0)
  6.         {
  7.                 list<MYSQL*>::iterator it;                                                                                //通过迭代器遍历,关闭数据库连接
  8.                 for (it = connList.begin(); it != connList.end(); ++it)
  9.                 {
  10.                         MYSQL* con = *it;
  11.                         mysql_close(con);
  12.                 }
  13.                 m_CurConn = 0;
  14.                 m_FreeConn = 0;
  15.                 connList.clear();                                                        //清空连接池
  16.         }
  17.         lock.unlock();
  18. }
复制代码
RAII机制开释数据库毗连

RAII(Resource Acquisition Is Initialization)是由c++之父Bjarne Stroustrup提出的,中文翻译为资源获取即初始化,指使用局部对象来管理资源的技术。这里的资源重要是指操纵系统中有限的东西如内存、网络套接字等等,局部对象是指存储在栈的对象,它的生命周期是由操纵系统来管理的,无需人工介入
资源的使用一般经历三个步骤a.获取资源 b.使用资源 c.销毁资源,但是资源的销毁往往是程序员经常忘记的一个环节,而C++引入了智能指针(C++11新特性之智能指针)的概念,使用了引用计数的方法,让程序员不必要关系手动开释内存。RAII则是使用了C++中一个对象出了其作用域会被主动析构的特点,在构造函数中申请空间,在析构函数中开释空间来控制资源的生命周期
  1. //定义
  2. class connectionRAII {
  3. public:
  4.         connectionRAII(MYSQL** con, connection_pool* connPool);                        //双指针对MYSQL *con修改
  5.         ~connectionRAII();
  6. private:
  7.         MYSQL* conRAII;
  8.         connection_pool* poolRAII;
  9. }
  10. //实现
  11. connectionRAII::connectionRAII(MYSQL** SQL, connection_pool* connPool) {
  12.         *SQL = connPool->GetConnection();
  13.        
  14.         conRAII = *SQL;
  15.         poolRAII = connPool;
  16. }
  17. connectionRAII::~connectionRAII() {
  18.         poolRAII->ReleaseConnection(conRAII);                               
  19. }
复制代码
RAII现实上应该是一种编程思想,将资源申请、开释等成对的操纵进行封装,实现在局部域内申请资源并及时销毁,使得程序员不需在局部域中时刻观察资源是否必要开释,避免遗漏

Timer——定时器模块

基础知识


模块功能

本项目中,服务器主循环为每一个毗连创建一个定时器,并对每个毗连进行定时。别的,使用升序时间链表容器将全部定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务
Linux提供了三种定时的方法:

  1. int setsockopt( int socket, int level, int option_name,const void *option_value, size_t ,ption_len)
  2. socket:socket描述符
  3. level:被设置的选项的级别。如果要在socket级别上设置选项,则必须将level设置为SOL_SOCKET
复制代码

三种方法没有一劳永逸的应用场景,没有绝对的优劣。在本项目中重要使用的是SIGALRM信号
具体地,使用alarm函数周期性地触发SIGALRM信号,信号处理惩罚函数使用管道通知主循环,主循环接收到该信号后对升序链表上全部定时器进行处理惩罚,若该段时间内没有互换数据,则将该毗连关闭,开释所占用的资源。该模块重要分为两部分,其一为定时方法与信号通知流程,其二为定时器及其容器设计与定时任务的处理惩罚
基础API


  1. struct sigaction {
  2.         void (*sa_handler)(int);                                                        //函数指针,指向旧的信号处理函数
  3.         void (*sa_sigaction)(int, siginfo_t *, void *);                //新的信号处理函数
  4.         sigset_t sa_mask;                                                                        //信号阻塞集,指定在信号处理函数执行期间需要被屏蔽的信号
  5.         int sa_flags;                                                                                //信号的处理方式
  6.         void (*sa_restorer)(void);                                                        //已弃用
  7. }
  8. #include <signal.h>
  9. int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact)
  10. signum:要操作的信号
  11. act:要设置的对信号的新处理方式
  12. oldact:原来对信号的处理方式
  13. return:成功返回0,出现错误则返回-1
复制代码
  1. sa_flags指定信号的处理方式,可以是以下几种的“按位或”组合
  2.         SA_RESTART:使被信号打断的系统调用自动重新发起
  3.         SA_NOCLDSTOP:使父进程在它的子进程暂停或继续运行时不会收到 SIGCHLD 信号
  4.         SA_NOCLDWAIT:使父进程在它的子进程退出时不会收到 SIGCHLD 信号,这时子进程如果退出也不会成为僵尸进程
  5.         SA_NODEFER:使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号
  6.         SA_RESETHAND:信号处理之后重新设置为默认的处理方式
  7.         SA_SIGINFO:使用 sa_sigaction 成员而不是 sa_handler 作为信号处理函数
复制代码

  1. #include <signal.h>
  2. int sigfillset(sigset_t *set)                        //将参数set信号集初始化(信号集中所有标志位置1),然后把所有的信号加入到此信号集中
  3. set:指向信号集合的指针
  4. return:成功返回0,出现错误则返回-1
复制代码

  1. #include <unisted.h>
  2. unsigned int alarm(unsigned int seconds)                设置信号SIGALRM在经过参数seconds秒数后发送给目前的进程
  3. seconds:要设定的定时时间,以秒为单位。在alarm调用后开始计时,超过该时间将触发SIGALRM信号
  4. return:返回当前进程之前设置的定时器剩余秒数
复制代码

  1. #include <sys/types.h>
  2. #include <sys/socket.h>
  3. int socketpair(int domain, int type, int protocol, int sv[2])                //创建一对无名的、相互连接的套接字
  4. domain:协议族,只能为PF_UNIX或者AF_UNIX
  5. type:协议,可以是SOCK_STREAM(基于TCP)或SOCK_DGRAM(基于UDP)
  6. protocol:类型,只能为0
  7. sv[2]:套接字柄对,该两个句柄作用相同,均能进行读写双向操作
  8. return:创建成功返回0,失败返回-1
复制代码
信号处理惩罚机制

Linux下的信号采用异步处理惩罚机制,信号处理惩罚函数与当前进程是两条差别的执行门路。当进程收到信号时,操纵系统会中断进程当前的正常流程,转而进入信号处理惩罚函数执行操纵,完成后再返回中断的地方继续执行
为避免信号竞态征象发生,信号处理惩罚期间系统不会再次触发它。所以,为确保该信号不被屏蔽太久,信号处理惩罚函数必要尽可能快地执行完毕。一般的信号处理惩罚函数必要处理惩罚该信号对应的逻辑,当该逻辑比较复杂时,信号处理惩罚函数执行时间过长,会导致信号屏蔽太久。为相识决该题目,本项目中信号处理惩罚函数仅发送信号通知程序主循环,将信号对应的处理惩罚逻辑放在程序主循环中,由主循环执行信号对应的逻辑代码。 信号处理惩罚的流程如下图所示(图片来自微信公众号两猿社)

信号的接收

接收信号的任务不是由用户进程来完成的,而是由内核署理的。当内核接收到信号后,会将其放到对应进程的信号队列中,同时向进程发送一个中断,使其陷入内核态。注意,此时信号还只是在队列中,对进程来说临时是不知道有信号到来的
信号的检测

进程陷入内核态后,有两种场景会对信号进行检测:

当发现有新信号时,便会进入下一步,信号的处理惩罚
信号的处理惩罚

信号处理惩罚函数是在用户态上的

  1. //信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响
  2. void Utils::sig_handler(int sig)
  3. {
  4.     //为保证函数的可重入性,保留原来的errno
  5.     //可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据
  6.     int save_errno = errno;
  7.     int msg = sig;
  8.     send(u_pipefd[1], (char*)&msg, 1, 0);                //将信号值从管道写端写入,传输字符类型,而非整型
  9.     errno = save_errno;                                                        //将原来的errno赋值为当前的errno
  10. }
复制代码
信号处理惩罚函数中仅通过管道发送信号值,不处理惩罚信号对应的逻辑,缩短异步执行时间,减少对主程序的影响
  1. //设置信号函数
  2. void Utils::addsig(int sig, void(handler)(int), bool restart)       //项目中设置信号函数,仅关注SIGTERM和SIGALRM两个信号
  3. {
  4.     struct sigaction sa;                             //创建sigaction结构体变量
  5.     memset(&sa, '\0', sizeof(sa));
  6.     sa.sa_handler = handler;                        //信号处理函数中仅仅发送信号值,不做对应逻辑处理
  7.     if (restart)
  8.         sa.sa_flags |= SA_RESTART;
  9.     sigfillset(&sa.sa_mask);                        //将所有信号添加到信号集中
  10.     assert(sigaction(sig, &sa, NULL) != -1);        //执行sigaction函数
  11. }
  12. 项目中设置信号函数,仅关注SIGTERM和AIGALRM两个信号
复制代码
信号通知逻辑


定时器设计

定时器类的界说

项目中将毗连资源、定时势件和超时时间封装为定时类

  1. //连接资源结构体成员需要用到定时器类,需要前向声明
  2. class util_timer;
  3. struct client_data                  //开辟用户socket结构 对应于最大处理fd
  4. {
  5.     sockaddr_in address;            //客户端socket地址
  6.     int sockfd;                     //socket文件描述符
  7.     util_timer* timer;              //定时器
  8. };  
  9. class util_timer                    //定时器类
  10. {
  11. public:
  12.     util_timer() : prev(NULL), next(NULL) {}
  13. public:
  14.     time_t expire;                              //超时时间
  15.     void (*cb_func)(client_data*);              //回调函数
  16.     client_data* user_data;                     //连接资源
  17.     util_timer* prev;                           //前向定时器
  18.     util_timer* next;                           //后继定时器
  19. };
复制代码
定时势件,具体地,从内核变乱表删除变乱,关闭文件描述符,开释毗连资源
  1. //定时器回调函数
  2. void cb_func(client_data* user_data)
  3. {
  4.     epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);        //删除非活动连接在socket上的注册事件
  5.     assert(user_data);
  6.     close(user_data->sockfd);                        //关闭文件描述符
  7.     http_conn::m_user_count--;                        //减少连接数
  8. }
复制代码
定时器的创建与销毁

该项目的定时器采用升序双向链表作为容器,具体地,为每个毗连创建一个定时器,将其添加到链表中,并按照超时时间升序排序。在本项目中,定时器的创建与销毁采用了RAII思想,在构造函数与析构函数中完成定时器容器的创建与销毁
  1. sort_timer_lst::sort_timer_lst()
  2. {
  3.     head = NULL;
  4.     tail = NULL;
  5. }
  6. sort_timer_lst::~sort_timer_lst()       //常规销毁链表
  7. {
  8.     util_timer* tmp = head;
  9.     while (tmp)
  10.     {
  11.         head = tmp->next;
  12.         delete tmp;
  13.         tmp = head;
  14.     }
  15. }
复制代码
添加定时任务

添加定时任务即将新毗连的定时器添加到链表中。若当前链表中只有头尾结点(链表为空),则直接使头尾结点指向该定时器

  1. void sort_timer_lst::add_timer(util_timer* timer)       //添加定时器,内部调用私有成员add_timer
  2. {
  3.     if (!timer)
  4.     {
  5.         return;
  6.     }
  7.     if (!head)
  8.     {
  9.         head = tail = timer;
  10.         return;
  11.     }
  12.     if (timer->expire < head->expire)       //如果新的定时器超时时间小于当前头部结点,则直接将当前定时器结点作为头部结点
  13.     {
  14.         timer->next = head;
  15.         head->prev = timer;
  16.         head = timer;
  17.         return;
  18.     }
  19.     add_timer(timer, head);                 //否则调用私有成员,调整内部结点
  20. }
复制代码

  1. //私有成员,被公有成员add_timer和adjust_time调用,主要用于调整链表内部结点
  2. void sort_timer_lst::add_timer(util_timer* timer, util_timer* lst_head)     //主要用于调整链表内部结点
  3. {
  4.     util_timer* prev = lst_head;
  5.     util_timer* tmp = prev->next;
  6.     while (tmp)                                             //遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作
  7.     {
  8.         if (timer->expire < tmp->expire)
  9.         {
  10.             prev->next = timer;
  11.             timer->next = tmp;
  12.             tmp->prev = timer;
  13.             timer->prev = prev;
  14.             break;
  15.         }
  16.         prev = tmp;
  17.         tmp = tmp->next;
  18.     }
  19.     if (!tmp)                                           //遍历完发现,目标定时器需要放到尾结点处
  20.     {
  21.         prev->next = timer;
  22.         timer->prev = prev;
  23.         timer->next = NULL;
  24.         tail = timer;
  25.     }
  26. }
复制代码
任务超时时间调整

当定时任务发生变革,调整对应定时器在链表中的位置

  1. void sort_timer_lst::adjust_timer(util_timer* timer)        //调整定时器,任务发生变化时,调整定时器在链表中的位置
  2. {
  3.     if (!timer)
  4.     {
  5.         return;
  6.     }
  7.     util_timer* tmp = timer->next;
  8.     if (!tmp || (timer->expire < tmp->expire))
  9.     {
  10.         return;
  11.     }
  12.     if (timer == head)                                  //被调整定时器是链表头结点,将定时器取出,重新插入               
  13.     {
  14.         head = head->next;
  15.         head->prev = NULL;
  16.         timer->next = NULL;
  17.         add_timer(timer, head);
  18.     }
  19.     else                                                //被调整定时器在内部,将定时器取出,重新插入
  20.     {
  21.         timer->prev->next = timer->next;
  22.         timer->next->prev = timer->prev;
  23.         add_timer(timer, timer->next);
  24.     }
  25. }
复制代码
删除定时任务

定时器超时,则将其从链表中删除
  1. void sort_timer_lst::del_timer(util_timer* timer)           //删除定时器
  2. {
  3.     if (!timer)
  4.     {
  5.         return;
  6.     }
  7.     if ((timer == head) && (timer == tail))                 //链表中只有一个定时器,需要删除该定时器
  8.     {
  9.         delete timer;
  10.         head = NULL;
  11.         tail = NULL;
  12.         return;
  13.     }
  14.     if (timer == head)                                      //被删除的定时器为头结点
  15.     {
  16.         head = head->next;
  17.         head->prev = NULL;
  18.         delete timer;
  19.         return;
  20.     }
  21.     if (timer == tail)                                      //被删除的定时器为尾结点
  22.     {
  23.         tail = tail->prev;
  24.         tail->next = NULL;
  25.         delete timer;
  26.         return;
  27.     }
  28.     timer->prev->next = timer->next;                        //被删除的定时器在链表内部,常规链表结点删除
  29.     timer->next->prev = timer->prev;
  30.     delete timer;
  31. }
复制代码
定时任务处理惩罚函数

使用同一变乱源,SIGALRM信号每次被触发,主循环中调用一次定时任务处理惩罚函数,处理惩罚链表容器中到期的定时器
具体逻辑如下:

  1. void sort_timer_lst::tick()
  2. {
  3.     if (!head)
  4.     {
  5.         return;
  6.     }
  7.     time_t cur = time(NULL);                                //获取当前时间
  8.     util_timer* tmp = head;
  9.     while (tmp)                                             //遍历定时器链表
  10.     {
  11.         if (cur < tmp->expire)                              //链表容器为升序排列,当前时间小于定时器的超时时间,后面的定时器也没有到期
  12.         {
  13.             break;
  14.         }
  15.         tmp->cb_func(tmp->user_data);                       //当前定时器到期,则调用回调函数,执行定时事件
  16.         head = tmp->next;                                   //将处理后的定时器从链表容器中删除,并重置头结点
  17.         if (head)
  18.         {
  19.             head->prev = NULL;
  20.         }
  21.         delete tmp;
  22.         tmp = head;
  23.     }
  24. }
复制代码
定时器的使用(webserver.cpp)

  1. void WebServer::timer(int connfd, struct sockaddr_in client_address)
  2. {
  3.     users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
  4.     //初始化client_data数据
  5.     //创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中
  6.     users_timer[connfd].address = client_address;
  7.     users_timer[connfd].sockfd = connfd;
  8.     util_timer *timer = new util_timer;                                //创建定时器
  9.     timer->user_data = &users_timer[connfd];                //绑定用户数据
  10.     timer->cb_func = cb_func;                                                //设置回调函数
  11.     time_t cur = time(NULL);
  12.     timer->expire = cur + 3 * TIMESLOT;                                //设置超时时间
  13.     users_timer[connfd].timer = timer;
  14.     utils.m_timer_lst.add_timer(timer);                                //添加定时器到链表中
  15. }
复制代码
  1. void WebServer::eventLoop()
  2. {
  3.     bool timeout = false;                                                           //超时默认为false
  4.     bool stop_server = false;
  5.     while (!stop_server)
  6.     {
  7.         int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);           //监测发生事件的文件描述符
  8.         if (number < 0 && errno != EINTR)
  9.         {
  10.             LOG_ERROR("%s", "epoll failure");
  11.             break;
  12.         }
  13.         for (int i = 0; i < number; i++)                                            //轮询事件描述符
  14.         {
  15.             int sockfd = events[i].data.fd;
  16.             //处理新到的客户连接
  17.             if (sockfd == m_listenfd)
  18.             {
  19.                 bool flag = dealclinetdata();
  20.                 if (false == flag)
  21.                     continue;
  22.             }
  23.             else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))             //处理异常事件
  24.             {
  25.                 //服务器端关闭连接,移除对应的定时器
  26.                 util_timer* timer = users_timer[sockfd].timer;
  27.                 deal_timer(timer, sockfd);
  28.             }
  29.             //处理定时器信号
  30.             else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN))   //管道读端对应文件描述符发生读事件        
  31.             {
  32.                 bool flag = dealwithsignal(timeout, stop_server);
  33.                 if (false == flag)
  34.                     LOG_ERROR("%s", "dealclientdata failure");
  35.             }
  36.             //处理客户连接上接收到的数据
  37.             else if (events[i].events & EPOLLIN)
  38.             {
  39.                 dealwithread(sockfd);
  40.             }
  41.             else if (events[i].events & EPOLLOUT)
  42.             {
  43.                 dealwithwrite(sockfd);
  44.             }
  45.         }
  46.         if (timeout)                //处理定时器为非必须事件,收到信号并不是立马处理,而是完成读写事件后再进行处理
  47.         {
  48.             utils.timer_handler();
  49.             LOG_INFO("%s", "timer tick");
  50.             timeout = false;
  51.         }
  52.     }
  53. }
复制代码
此中,处理惩罚定时器信号部分中的dealwithsignal()内容如下
  1. bool WebServer::dealwithsignal(bool& timeout, bool& stop_server)
  2. {
  3.     int ret = 0;
  4.     int sig;
  5.     char signals[1024];
  6.     ret = recv(m_pipefd[0], signals, sizeof(signals), 0);                //从管道读端读出信号值,成功返回字节数,失败返回-1
  7.     if (ret == -1)
  8.     {
  9.         return false;
  10.     }
  11.     else if (ret == 0)
  12.     {
  13.         return false;
  14.     }
  15.     else
  16.     {
  17.         for (int i = 0; i < ret; ++i)
  18.         {
  19.             switch (signals[i])
  20.             {
  21.             case SIGALRM:                       //接收到SIGALRM信号,timeout设置为true
  22.             {
  23.                 timeout = true;
  24.                 break;
  25.             }
  26.             case SIGTERM:                       //接收到SIGTERM信号,终止进程运行,stop_server设置为true
  27.             {
  28.                 stop_server = true;
  29.                 break;
  30.             }
  31.             }
  32.         }
  33.     }
  34.     return true;
  35. }
复制代码
处理惩罚客户毗连上接收到的数据,其函数内容为
  1. void WebServer::dealwithread(int sockfd)
  2. {
  3.     util_timer* timer = users_timer[sockfd].timer;
  4.     //reactor
  5.     if (1 == m_actormodel)
  6.     {
  7.         if (timer)
  8.         {
  9.             adjust_timer(timer);
  10.         }
  11.         //若监测到读事件,将该事件放入请求队列
  12.         m_pool->append(users + sockfd, 0);
  13.         while (true)
  14.         {
  15.             if (1 == users[sockfd].improv)
  16.             {
  17.                 if (1 == users[sockfd].timer_flag)
  18.                 {
  19.                     deal_timer(timer, sockfd);
  20.                     users[sockfd].timer_flag = 0;
  21.                 }
  22.                 users[sockfd].improv = 0;
  23.                 break;
  24.             }
  25.         }
  26.     }
  27.     else
  28.     {
  29.         //proactor
  30.         if (users[sockfd].read_once())
  31.         {
  32.             LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
  33.             //若监测到读事件,将该事件放入请求队列
  34.             m_pool->append_p(users + sockfd);
  35.             if (timer)
  36.             {
  37.                 adjust_timer(timer);                //有数据传输,则将定时器向后延迟3个单位,并对新的定时器在链表上的位置进行调整
  38.             }
  39.         }
  40.         else
  41.         {
  42.             deal_timer(timer, sockfd);                //服务器端关闭连接,移除对应的定时器
  43.         }
  44.     }
  45. }
  46. void WebServer::dealwithwrite(int sockfd)
  47. {
  48.     util_timer* timer = users_timer[sockfd].timer;
  49.     //reactor
  50.     if (1 == m_actormodel)
  51.     {
  52.         if (timer)
  53.         {
  54.             adjust_timer(timer);
  55.         }
  56.         m_pool->append(users + sockfd, 1);
  57.         while (true)
  58.         {
  59.             if (1 == users[sockfd].improv)
  60.             {
  61.                 if (1 == users[sockfd].timer_flag)
  62.                 {
  63.                     deal_timer(timer, sockfd);
  64.                     users[sockfd].timer_flag = 0;
  65.                 }
  66.                 users[sockfd].improv = 0;
  67.                 break;
  68.             }
  69.         }
  70.     }
  71.     else
  72.     {
  73.         //proactor
  74.         if (users[sockfd].write())
  75.         {
  76.             LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
  77.             if (timer)
  78.             {
  79.                 adjust_timer(timer);                //有数据传输,则将定时器向后延迟3个单位,并对新的定时器在链表上的位置进行调整
  80.             }
  81.         }
  82.         else
  83.         {
  84.             deal_timer(timer, sockfd);                //服务器端关闭连接,移除对应的定时器
  85.         }
  86.     }
  87. }
复制代码

Lock——信号同步机制封装

竟态

竟态指多个进程或线程在对共享资源进行读写时,由于对共享资源的访问顺序不确定或变乱差等因素导致的错误行为。具体来说,当两个或多个进程或线程试图同时访问同一共享资源时,它们之间可能会产生竟态,从而导致程序出现不可猜测的效果
竟态是一种非常难以调试和解决的题目,因为竟态通常只在特定的时间和特定的运行情况下才会出现,因此可能必要多次运行程序才华重现题目。为避免竟态,必要使用各种同步机制,例如互斥锁、条件变量、信号量等,来保护共享资源的访问,确保只有一个进程或线程能够访问该资源
信号量——sem类

信号量是一种特别的变量,它只能取自然数而且只支持两种操纵,P(wait)和V(signal)

信号量的取值可以是任何自然数,根据初始值的差别可以分为两类:

根据使用场景的差别,信号量也可以分为两类:

在本项目中重要实现的是线程同步,只必要使用无名信号量,重要使用以下几个函数

  1. include <semaphore.h>
  2. int sem_init(sem_t *sem,int pshared,unsignedint value)                //初始化一个信号量
  3. sem:指向要初始化的信号量的指针
  4. pshared:指定信号量的共享方式。如果值为 0,则信号量将在进程内部共享。如果值为非 0,则信号量可以在不同进程之间共享,需要使用共享内存
  5. value:指定信号量的初值
  6. return:成功则返回0,否则返回-1
复制代码

  1. include <semaphore.h>
  2. int sem_destroy(sem_t *sem)                        //销毁一个信号量
  3. sem:指向要销毁的信号量的指针
  4. return:成功返回0,否则返回-1
复制代码

  1. include <semaphore.h>
  2. int sem_wait(sem_t *sem)                        //对信号量进行 P 操作,如果信号量的值小于等于 0,则会阻塞当前线程
  3. sem:指向要操作的信号量的指针
  4. return:成功返回0,否则返回-1
复制代码

  1. include <semaphore.h>
  2. int sem_post(sem_t *sem)                        //对信号量进行 V 操作,如果信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个线程
  3. sem:指向要操作的信号量的指针
  4. return:成功返回0,否则返回-1
复制代码
互斥锁——locker类

互斥锁,也称互斥量,可以保护关键代码段,以确保独占式访问.当进入关键代码段,得到互斥锁将其加锁;离开关键代码段,唤醒等候该互斥锁的线程
互斥量是一种用于保护临界区的同步机制,可以确保同一时刻只有一个线程访问共享资源。当一个线程访问共享资源时,必要先获取互斥量的锁,其他线程必要等候该锁开释才华继续执行
互斥量不是为了消除竞争,现实上,资源还是共享的,线程间也还是竞争的,只不过通过这种“锁”机制就将共享资源的访问变成互斥操纵,也就是说一个线程操纵这个资源时,其它线程无法操纵它,从而消除与时间有关的错误。但是,这种锁机制不是欺凌的,互斥锁实质上是操纵系统提供的一把“发起锁”(又称“协同锁”),发动身序中有多线程访问共享资源的时候使用该机制
因此,即使有了mutex,其它线程如果不按照这种锁机制来访问共享数据的话,依然会造成数据杂乱。所以为了避免这种情况,全部访问该共享资源的线程必须采用类似的锁机制

  1. include <pthread.h>
  2. int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)                //初始化互斥量
  3. mutex:指向要初始化的互斥量的指针
  4. attr:指向互斥量属性对象的指针,通常设置为 NULL
  5. return:成功则返回0,否则返回一个正整数的错误码
复制代码

  1. include <pthread.h>
  2. int pthread_mutex_destroy(pthread_mutex_t *mutex)                        //销毁互斥量
  3. mutex:指向要销毁的互斥量的指针
  4. return:成功则返回0,否则返回一个正整数的错误码
复制代码

  1. include <pthread.h>
  2. int pthread_mutex_lock(pthread_mutex_t *mutex)                                //给互斥量加锁,如果互斥量已经被锁住,则阻塞当前线程,直到互斥量被解锁
  3. mutex:指向要加锁的互斥量的指针
  4. return:成功则返回0,否则返回一个正整数的错误码
复制代码

  1. include <pthread.h>
  2. int pthread_mutex_unlock(pthread_mutex_t *mutex)                        //解锁互斥量,如果有等待该互斥量的线程,则唤醒其中的一个线程
  3. mutex:指向要加锁的互斥量的指针
  4. return:成功则返回0,否则返回一个正整数的错误码
复制代码
条件变量——cond类

条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等候这个共享数据的线程
条件变量使用线程间共享的全局变量进行同步,重要包罗两个动作:一个线程等候条件变量的条件成立而挂起;另一个线程使条件成立(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起

  1. include <pthread.h>
  2. int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)                //初始化条件变量对象,设置相关属性
  3. cond:指向条件变量对象的指针
  4. attr:指向线程条件属性对象的指针。一般为 NULL
  5. return:成功则返回0,失败返回错误号
复制代码

  1. include <pthread.h>
  2. int pthread_cond_destroy(pthread_cond_t *cond)                //销毁条件变量对象,释放资源
  3. cond:指向条件变量对象的指针
  4. return:成功则返回0,失败返回错误号
复制代码

  1. include <pthread.h>
  2. int pthread_cond_broadcast(pthread_cond_t *cond)                //唤醒所有在条件变量上等待的线程
  3. cond:指向条件变量对象的指针
  4. return:成功则返回0,失败返回错误号
复制代码

  1. include <pthread.h>
  2. int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)                //让当前线程阻塞在条件变量上等待唤醒
  3. cond:指向条件变量对象的指针
  4. mutex:指向互斥锁对象的指针,用于保护条件变量
  5. return:成功则返回0,失败返回错误号
复制代码

  1. include <pthread.h>
  2. int pthread_cond_signal(pthread_cond_t *cond)
  3. cond:指向条件变量的指针
  4. return:成功则返回0,失败返回错误号
复制代码
锁机制的功能

锁机制用来实现多线程同步,通过锁机制,确保任一时刻只能有一个线程能进入关键代码段
为便于实现同步类的RAII机制,该项目在pthread库的基础上进行了封装,实现了类似于C++11的mutex标准库功能
Log——日志系统

日志系统分为两部分,一部分是单例模式和阻塞队列的界说;另一部分是日志类的界说与使用
基础知识


同步日志与异步日志的比较


单例模式


懒汉模式:在第一次被使用时才初始化(耽误初始化)
  1. class Singleton
  2. {
  3. private:
  4.         static Singleton* instance;                        //私有化静态指针指向唯一实例
  5. private:
  6.         Singleton() {};
  7.         ~Singleton() {};
  8. public:
  9.         static Singleton* getInstance();                         //公有静态方法获取实例
  10. };
  11. Singleton* Singleton::getInstance()
  12. {
  13.         if(instance == NULL)
  14.                 instance = new Singleton();
  15.         return instance;
  16. }
  17. // init static member
  18. Singleton* Singleton::instance = NULL;
复制代码
懒汉模式存在内存泄漏的题目,可以通过使用智能指针或使用静态的嵌套类对象来解决。以下是使用静态嵌套类对象的例子
  1. class Singleton
  2. {
  3. private:
  4.         static Singleton* instance;
  5. private:
  6.         Singleton() { };
  7.         ~Singleton() { };
  8. private:
  9.         class Deletor {
  10.         public:
  11.                 ~Deletor() {
  12.                         if(Singleton::instance != NULL)
  13.                                 delete Singleton::instance;
  14.                 }
  15.         };
  16.         static Deletor deletor;
  17.        
  18. public:
  19.         static Singleton* getInstance();
  20. };
  21. Singleton* Singleton::getInstance()
  22. {
  23.         if(instance == NULL)
  24.                 instance = new Singleton();
  25.         return instance;
  26. }
  27. // init static member
  28. Singleton::Deletor Singleton::deletor;
  29. Singleton* Singleton::instance = NULL;
复制代码
懒汉模式另有个题目是在多线程情况下第一次初始化过程中会出现线程安全题目,为相识决该题目可以使用互斥锁,同时为了避免每次初始化过程都加锁而使用双检测锁(Double Check Lock,DCL)
  1. class Singleton
  2. {
  3. private:
  4.         static Singleton* instance;                        //私有化静态指针指向唯一实例
  5.         static pthread_mutex_t lock;                //静态锁,静态函数只能访问静态成员
  6. private:
  7.         Singleton() {};
  8.         ~Singleton() {};
  9. public:
  10.         static Singleton* getInstance()                         //公有静态方法获取实例
  11. };
  12. Singleton* Singleton::instance = NULL;
  13. Singleton* Singleton::getInstance(){
  14.         if(instance == NULL)
  15.         {
  16.                 pthread_mutex_lock(&lock);
  17.                 if(NULL == instance)
  18.                 {
  19.                         instance == new Singleton();
  20.                 }
  21.                 pthread_mutex_unlock(&lock);
  22.         }
  23.         return instance;
复制代码
C++中规定了local static在多线程条件下的初始化行为,要求编译器保证了内部静态变量的线程安全性。在C++11标准下,《Effective C++》提出了一种更优雅的单例模式实现,使用函数内的 local static 对象,这种方法不消加锁和解锁操纵。这样,只有当第一次访问getInstance()方法时才创建实例。这种方法也被称为Meyers’ Singleton
  1. class Singleton
  2. {
  3. private:
  4.         Singleton() { };
  5.         ~Singleton() { };
  6. public:
  7.         static Singleton& getInstance();
  8. };
  9. Singleton::Singleton::getInstance()
  10.         {
  11.                 static Singleton instance;
  12.                 return instance;
  13.         }
复制代码
饿汉模式:在程序运行时立刻初始化
饿汉模式不必要用锁,就可以实现线程安全。原因在于,在程序运行时就界说了对象,并对其初始化。之后,不管哪个线程调用成员函数getinstance(),都只不过是返回一个对象的指针而已。所以是线程安全的,不必要在获取实例的成员函数中加锁
  1. class Singleton{
  2. private:
  3.     static Singleton* instance;
  4.     single(){}
  5.     ~single(){}
  6. public:
  7.     static Singleton* getinstance();
  8. };
  9. Singleton* Singleton::instance = new Singleton();
  10. Singleton* Singleton::getInstance(){
  11.     return instance;
  12. }
  13. //测试方法
  14. int main(){
  15.     Singleton *p1 = Singleton::getInstance();
  16.     Singleton *p2 = Singleton::getInstance();
  17.     if (p1 == p2)
  18.         cout << "same" << endl;
  19.     system("pause");
  20.     return 0;
  21. }
复制代码
但是饿汉模式存在一个潜伏题目:no-local static对象(函数外的static对象)在差别编译单位中的初始化顺序是未界说的,即“ static Singleton instance ”和“ static Singleton* getInstance() ”二者的初始化顺序不确定,如果在初始化完成之前调用 getInstance() 方法会返回一个未界说的实例

阻塞队列

条件变量与生产者-消耗者模子

在Lock模块中提到,条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等候该共享数据的线程
此中pthread_cond_wait()的使用方式如下
  1. pthread _mutex_lock(&mutex)                                        //加锁避免因为多线程访问导致的资源竞争
  2. while(线程执行的条件是否成立){                       
  3.     pthread_cond_wait(&cond, &mutex);               
  4. }
  5. pthread_mutex_unlock(&mutex);
复制代码
pthread_cond_wait执行后的内部操纵分为以下几步:
   一般来说,在多线程资源竞争的时候,在一个使用资源的线程内里(消耗者)判断资源是否可用,不可用,便调用pthread_cond_wait,在另一个线程内里(生产者)如果判断资源可用的话,则调用pthread_cond_signal发送一个资源可用信号
但wait成功后,有可能多个线程都在等候这个资源可用的信号,信号发出后只有一个资源可用。未竞争到资源的线程应当继续等候资源,故使用while而不是if(如只有一个消耗者,则可以使用if)
  阻塞队列(block_queue.h)

阻塞队列类中封装了生产者-消耗者模子,使用了循环数组实现了队列(也可使用STL中的队列)作为共享缓冲区
比较重要的是以下几个函数

  1. //往队列添加元素,需要将所有使用队列的线程先唤醒
  2. //当有元素push进队列,相当于生产者生产了一个元素
  3. //若当前没有线程等待条件变量,则唤醒无意义
  4. bool push(const T& item)
  5. {
  6.     m_mutex.lock();
  7.     if (m_size >= m_max_size)
  8.     {
  9.         m_cond.broadcast();
  10.         m_mutex.unlock();
  11.         return false;
  12.     }
  13.     m_back = (m_back + 1) % m_max_size;                将新增数据放在循环数组的对应位置
  14.     m_array[m_back] = item;
  15.     m_size++;
  16.     m_cond.broadcast();
  17.     m_mutex.unlock();
  18.     return true;
  19. }
复制代码

  1. //pop时,如果当前队列没有元素,将会等待条件变量
  2. bool pop(T& item)
  3. {
  4.     m_mutex.lock();
  5.     while (m_size <= 0)
  6.     {
  7.         if (!m_cond.wait(m_mutex.get()))
  8.         {
  9.             m_mutex.unlock();
  10.             return false;
  11.         }
  12.     }
  13.     m_front = (m_front + 1) % m_max_size;
  14.     item = m_array[m_front];
  15.     m_size--;
  16.     m_mutex.unlock();
  17.     return true;
  18. }
复制代码

  1. //在pthread_cond_wait基础上增加了等待的时间,只指定时间内能抢到互斥锁即可
  2. //其他逻辑不变
  3. bool pop(T &item, int ms_timeout)
  4. {
  5.     struct timespec t = {0, 0};                //struct timespec提供秒和纳秒单位,最高精度是纳秒
  6.     struct timeval now = {0, 0};        //struct  timeval:提供秒和微秒单位,最高精度是微秒
  7.     gettimeofday(&now, NULL);                //gettimeofday(struct  timeval*tv,struct  timezone *tz),返回当前时间tv以及当前时区信息tz
  8.     pthread_mutex_lock(m_mutex);
  9.     if (m_size <= 0)
  10.     {
  11.         t.tv_sec = now.tv_sec + ms_timeout / 1000;
  12.         t.tv_nsec = (ms_timeout % 1000) * 1000;
  13.         if (0 != pthread_cond_timedwait(m_cond, m_mutex, &t))        //pthread_cond_timedwait()中的&t需要时间的绝对值
  14.         {
  15.             pthread_mutex_unlock(m_mutex);
  16.             return false;
  17.         }
  18.     }
  19.     if (m_size <= 0)                        //唤醒后线程未竞争到资源
  20.     {
  21.         pthread_mutex_unlock(m_mutex);
  22.         return false;
  23.     }
  24.     m_front = (m_front + 1) % m_max_size;
  25.     item = m_array[m_front];
  26.     m_size--;
  27.     pthread_mutex_unlock(m_mutex);
  28.     return true;
  29. }
复制代码
日志类

基础API


  1. int fputs(const char *str, FILE *stream)
  2. str:一个数组,包含了要写入的以空字符终止的字符序列
  3. stream:指向 FILE 对象的指针,该 FILE 对象标识了要被写入字符串的流
  4. return:成功返回一个非负值,错误返回EOF
复制代码

  1. #include <stdio.h>
  2. #define myprintf(...) printf(__VA_ARGS__)
  3. int main()
  4. {
  5.     myprintf("0123456789\n");
  6.     myprintf("www.csdn.net\n");
  7.     return 0;
  8. }
  9. /*
  10. 0123456789
  11. www.csdn.net
  12. */
  13. //搭配va_list的format使用
  14. #define my_print2(format, ...) printf(format, __VA_ARGS__)
  15. #define my_print3(format, ...) printf(format, ##__VA_ARGS__)
复制代码
__VA_ARGS__宏前面加上##的作用在于,当可变参数的个数为0时,上述例子中printf参数列表中的的##会把前面多余的 “,” 去掉,否则会编译堕落,发起使用时加上 “##”,使得程序更加结实

  1. int fflush(FILE *stream)
  2. stream:指向 FILE 对象的指针
  3. fflush(stdin):刷新缓冲区,将缓冲区内的数据清空并丢弃
  4. fflush(stdout):刷新缓冲区,将缓冲区内的数据强制输出到设备
复制代码
在使用多个输出函数连续进行多次输出到控制台时,有可能上一个数据还没输出完毕,还在输出缓冲区中时,下一个printf就把另一个数据加入输出缓冲区,覆盖了原来的数据,出现输堕落误。为了避免这种错误,可以在prinf()后加上 “fflush(stdout);” 欺凌马上输出到控制台
printf打印到标准输出时,终端是行缓存, 碰到\n就将缓存输出,如果多次printf中只有最后一次打印带 “\n” ,则只有最后一次碰到 “\n” 时才将全部内容一次打印出来。stdout默认是是行缓冲的,碰到 “\n” 就写内容清缓存,而加上 “fflush(stdout);” 与 有 “\n” 作用一样,只是不换行
  1. int print1()                                //先进入sleep后打印 “hello world!”
  2. {
  3.         printf("hello");
  4.     sleep(5);
  5.     printf(" world!\n");
  6.     return 0;
  7. }
  8. int print2()                                //先打印 “hello” 再进入sleep 后打印 “ world!”
  9. {
  10.         printf("hello");
  11.         fflush(stdout);
  12.     sleep(5);
  13.     printf(" world!\n");
  14.     return 0;
  15. }
复制代码
日志类流程



具体实现


  1. #pragma once
  2. #ifndef LOG_H
  3. #define LOG_H
  4. #include <stdio.h>
  5. #include <iostream>
  6. #include <string>
  7. #include <stdarg.h>
  8. #include <pthread.h>
  9. #include "block_queue.h"
  10. using namespace std;
  11. class Log
  12. {
  13. public:
  14.     //C++11以后,使用局部变量懒汉不用加锁
  15.     static Log* get_instance()  //采用C++11提供的Meyers' Singleton,无须加锁
  16.     {
  17.         static Log instance;
  18.         return &instance;
  19.     }
  20.     static void* flush_log_thread(void* args)           //异步写日志公有方法,调用私有方法async_write_log()
  21.     {
  22.         Log::get_instance()->async_write_log();
  23.     }
  24.     //可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列
  25.     bool init(const char* file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
  26.     void write_log(int level, const char* format, ...);     //将输出内容按照标准格式整理
  27.     void flush(void);       //强制刷新缓冲区
  28. private:
  29.     Log();
  30.     virtual ~Log();
  31.     void* async_write_log()
  32.     {
  33.         string single_log;
  34.         //从阻塞队列中取出一个日志string,写入文件
  35.         while (m_log_queue->pop(single_log))
  36.         {
  37.             m_mutex.lock();
  38.             fputs(single_log.c_str(), m_fp);
  39.             m_mutex.unlock();
  40.         }
  41.     }
  42. private:
  43.     char dir_name[128]; //路径名
  44.     char log_name[128]; //log文件名
  45.     int m_split_lines;  //日志最大行数
  46.     int m_log_buf_size; //日志缓冲区大小
  47.     long long m_count;  //日志行数记录
  48.     int m_today;        //因为按天分类,记录当前时间是那一天
  49.     FILE* m_fp;         //打开log的文件指针
  50.     char* m_buf;        //要输出的内容
  51.     block_queue<string>* m_log_queue; //阻塞队列
  52.     bool m_is_async;                  //是否同步标志位
  53.     locker m_mutex;                   //同步类
  54.     int m_close_log; //关闭日志
  55. };
  56. //这四个宏定义在其他文件中使用,主要用于不同类型的日志输出
  57. #define LOG_DEBUG(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(0, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  58. #define LOG_INFO(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(1, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  59. #define LOG_WARN(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(2, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  60. #define LOG_ERROR(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(3, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  61. #endif
复制代码

  1. //异步需要设置阻塞队列的长度,同步不需要设置
  2. bool Log::init(const char* file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size)
  3. {
  4.     //如果设置了max_queue_size,则设置为异步
  5.     if (max_queue_size >= 1)
  6.     {
  7.         m_is_async = true;
  8.         m_log_queue = new block_queue<string>(max_queue_size);
  9.         pthread_t tid;
  10.         //flush_log_thread为回调函数,这里表示创建线程异步写日志
  11.         pthread_create(&tid, NULL, flush_log_thread, NULL);
  12.     }
  13.     //输出内容的长度
  14.     m_close_log = close_log;
  15.     m_log_buf_size = log_buf_size;
  16.     m_buf = new char[m_log_buf_size];
  17.     memset(m_buf, '\0', m_log_buf_size);
  18.     m_split_lines = split_lines;            //日志的最大行数
  19.     time_t t = time(NULL);
  20.     struct tm* sys_tm = localtime(&t);
  21.     struct tm my_tm = *sys_tm;
  22.     const char* p = strrchr(file_name, '/');        //从后往前找到第一个“/”的位置
  23.     char log_full_name[256] = { 0 };
  24.     //自定义日志名
  25.     if (p == NULL)      //若输入的文件名没有“/”,则以“时间+文件名”作为日志名
  26.     {
  27.         snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name);
  28.     }
  29.     else
  30.     {
  31.         strcpy(log_name, p + 1);
  32.         strncpy(dir_name, file_name, p - file_name + 1);        //p - file_name + 1:文件所在路径文件夹的长度    dir_name相当于“./”
  33.         snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name);
  34.     }
  35.     m_today = my_tm.tm_mday;
  36.     m_fp = fopen(log_full_name, "a");
  37.     if (m_fp == NULL)
  38.     {
  39.         return false;
  40.     }
  41.     return true;
  42. }
复制代码

  1. void Log::write_log(int level, const char* format, ...)
  2. {
  3.     struct timeval now = { 0, 0 };
  4.     gettimeofday(&now, NULL);
  5.     time_t t = now.tv_sec;
  6.     struct tm* sys_tm = localtime(&t);      //得到当前时间
  7.     struct tm my_tm = *sys_tm;
  8.     char s[16] = { 0 };
  9.     switch (level)              //文件分级
  10.     {
  11.     case 0:
  12.         strcpy(s, "[debug]:");
  13.         break;
  14.     case 1:
  15.         strcpy(s, "[info]:");
  16.         break;
  17.     case 2:
  18.         strcpy(s, "[warn]:");
  19.         break;
  20.     case 3:
  21.         strcpy(s, "[erro]:");
  22.         break;
  23.     default:
  24.         strcpy(s, "[info]:");
  25.         break;
  26.     }
  27.     //写入一个log,对m_count++, m_split_lines最大行数
  28.     m_mutex.lock();
  29.     m_count++;
  30.     if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) //日志不是今天或写入的日志行数是最大行的倍数
  31.     {
  32.         char new_log[256] = { 0 };
  33.         fflush(m_fp);
  34.         fclose(m_fp);
  35.         char tail[16] = { 0 };
  36.         //格式化日志中的时间部分
  37.         snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday);     
  38.         if (m_today != my_tm.tm_mday)           //日期不是今天,创建当天日志,更新m_today和m_count
  39.         {
  40.             snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name);
  41.             m_today = my_tm.tm_mday;
  42.             m_count = 0;
  43.         }
  44.         else                                   //写入的日志超出最大行,在之前的日志名基础上加后缀“m_count/m_split_lines”
  45.         {
  46.             snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines);
  47.         }
  48.         m_fp = fopen(new_log, "a");
  49.     }
  50.     m_mutex.unlock();
  51.     va_list valst;
  52.     va_start(valst, format);    //将传入的format参数赋值给valst,便于格式化输出
  53.     string log_str;
  54.     m_mutex.lock();
  55.     //写入的具体时间内容格式
  56.     int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ",        //时间格式化,snprintf成功返回写字符的总数,其中不包括结尾的null字符
  57.         my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday,
  58.         my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s);
  59.     int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst);        //内容格式化,用于向字符串中打印数据、数据格式用户自定义,返回写入到字符数组str中的字符个数(不包含终止符)
  60.     m_buf[n + m] = '\n';
  61.     m_buf[n + m + 1] = '\0';
  62.     log_str = m_buf;
  63.     m_mutex.unlock();
  64.     if (m_is_async && !m_log_queue->full())     //异步且阻塞队列未满,则将日志信息加入阻塞队列
  65.     {
  66.         m_log_queue->push(log_str);
  67.     }
  68.     else                                        //同步则加锁向文件中写
  69.     {
  70.         m_mutex.lock();
  71.         fputs(log_str.c_str(), m_fp);
  72.         m_mutex.unlock();
  73.     }
  74.     va_end(valst);
  75. }
复制代码
va_start和va_end

  1. #define va_start(list,param1)   ( list = (va_list)&param1+ sizeof(param1) )
  2. #define va_end(list) ( list = (va_list)0 )
复制代码

   某些版本的va_start宏为了方便对va_list的遍历,就给参数列表动态分配内存。这样一种C实现很可能使用va_end宏来开释此前动态分配的内存;
如果忘记调用宏va_end,最后得到的程序可能在某些机型上没有题目,而在另一些机型上则发生“内存泄漏”
——《C陷阱与缺陷》
  
重要参考文章

项目代码

参考项目代码(github)
项目团体明白

TinyWebServer—从0到服务器开发
小白视角:一文读懂社长的TinyWebServer
C++网络编程入门:轻量级web并发服务器开发
#Web服务器-原始版本(来自微信公众号:两猿社)
I/O多路复用

一文看懂IO多路复用
IO多路复用的三种机制select、poll、epoll
epoll的明白
epoll中的ET和LT模式区别
http模块明白

线程池web服务器http_conn请求类
深入浅出明白有限状态机
什么是状态机,一篇文章就够了
浅析epoll—epoll函数深入讲解
从零开始,编写一个Web服务器—HTTP部分详细讲解以及代码实现
mmap()函数参数详解
一篇让你彻底相识http请求报文和相应报文的布局
GET和POST两种基本请求方法的区别
线程池模块明白

WEB服务器——TinyWebServer代码详细讲解(threadpool模块)
基于Linux的C++轻量级web服务器webserver/httpserver——线程池
如何深刻明白Reactor和Proactor?
数据库毗连池模块明白

基于MysqlConnector/C++的数据库毗连池的实现
zwiley的随机——数据库毗连池
Mysql接口API详细使用说明
定时器模块明白

Web服务器—TinyWebServer代码详细讲解(timer模块)
如何实现一个定时器?看这一篇就够了
定时器设计(一)
信号同步机制明白

TinyWebServer 相干函数使用与样例 [线程同步机制]
良许Linux——互斥量mutex
日志系统明白

C++单例模式
生产者-消耗者模子:理论讲解及实现
va_list、va_start和va_end使用

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4