轻量级web并发服务器——TinyWebServer的学习相识

打印 上一主题 下一主题

主题 534|帖子 534|积分 1602


前言

本文旨在学习该项目的同时对其代码、原理等内容有更深的明白,学习过程中鉴戒大量网上文章,如明白存在不当之处或有所遗漏短缺,还望各位大佬提点指教
部分图片来自网络

TinyWebServer是什么

WebServer是什么

一个WebServer指的是一个服务器程序大概运行该服务器程序的硬件,其重要功能是通过http协议与客户端(通常是浏览器)进行通信,能够接收、存储、处理惩罚来自客户端的http请求,并对其作出肯定的相应,返回客户端请求的内容或返回一个Error信息
TinyWebServer是什么

TinyWebServer是一个在Linux操纵系统下的轻量级web服务器,能够实现以下几种功能:


  • 使用线程池 + 非阻塞socket + epoll + 变乱处理惩罚的并发模子
  • 使用状态机剖析http请求报文,支持剖析POST和GET请求
  • 访问服务器数据库实现web端用户注册、登录功能,可以请求服务器图片和视频文件
  • 可以实现上万的并发毗连数据互换(Webbench测试)
相干基础知识

必要对Linux编程、网络编程有肯定相识
书籍推荐:《深入明白计算机系统》、《Unix网络编程》、《Linux高性能服务器编程》
项目中一些相干知识会在对应模块内提到

用户如何与服务器进行通信

用户通常使用web浏览器与服务器进行通信,web浏览器则通过将用户输入的域名剖析得到对应的ip地址,通过TCP协议的三次握手创建与目标web服务器的毗连,之后HTTP协议生成http请求报文发送到目标web服务器上,服务器则使用socket监听来自用户的请求。关于socket创建毗连方面的内容可以通过我之前的文章进行肯定的相识socket实现简朴的文件传输
当服务器处理惩罚一个http请求的时候,还必要继续监听其他用户的请求并为其分配另一逻辑单位用来处理惩罚,即并发(后面会提到线程池并发)。在该项目中,服务器使用epoll这种多路I/O复用技术来实现对监听socket和毗连socket的同时监听
注意:I/O复用可以同时监听多个文件描述符,但其本身是阻塞的,而且当有多个文件描述符同时停当的时候,如不采取额外措施则程序顺序处理惩罚此中停当的每个文件描述符
因此,为了进步服从,项目中使用了线程池来实现多线程并发,为每个停当的文件分配一个逻辑单位(线程)来处理惩罚

代码架构

该项目中的代码架构如下

接下来我将分模块进行学习明白

I/O多路复用

I/O模子

Linux提供了五种I/O处理惩罚模子(详见《Unix网络编程》):


  • 同步阻塞I/O

    • 必要阻塞调用线程等候数据到来
    • 必要阻塞等候数据从内核态拷贝到用户态

若服务器端采用单线程,当accept一个请求后,在recv或send调用阻塞时,无法accept其他请求,无法处理惩罚并发
若服务器端采用多线程,当accept一个请求后,开启线程进行recv,可以完成并发处理惩罚,但随请求数增加必要增加系统线程,占用大量内存空间,且线程切换会带来很大的开销


  • 同步非阻塞I/O

    • 调用线程不必要等候数据到来,但必要不停查询数据到来等候线程同步
    • 必要阻塞等候数据从内核态拷贝到用户态

服务器端accept一个请求后,加入fds聚集(一般为数组),每次轮询一遍fds聚集recv数据(非阻塞),没有数据立刻返回错误。轮询操纵会浪费大量不必要的CPU资源


  • 同步I/O多路复用

    • 现在用的最多的I/O模子
    • 同阻塞同步I/O,但等候的是多个文件描述符的数据

服务器端对一组文件描述符进行相干变乱的注册(fd列表),采用单线程通过select/poll/epoll等系统调用获取fd列表,遍历有变乱的fd进行accept/recv/send,使其能支持更多的并发毗连请求


  • 信号驱动I/O

    • 使用信号机制让调用线程不消阻塞等候数据到来
    • 必要阻塞等候数据从内核态拷贝到用户态

在网络编程中,与socket相干的读写变乱太多,无法在信号对应处理惩罚函数中区分产生该信号的变乱。只得当在I/O变乱单一情况下使用,例如监听端口的socket


  • 异步I/O

    • 不必要阻塞等候数据到来,信号通知调用线程获取数据
    • 不必要阻塞等候数据拷贝,内核主动完成拷贝过程

   在Linux下的异步I/O是不美满的,aio系列函数是由POSIX界说的异步操纵接口,不是真正的操纵系统级别支持的,而是在用户空间模拟出来的异步,且仅支持基于当地文件的aio异步操纵,网络编程中的socket是不支持的
在windows里实现了一套完整的支持socket的异步编程接口IOCP,是由操纵系统级别实现的异步I/O
  综合以上几种I/O模子的优缺点,现在Linux的高性能服务器
什么是I/O多路复用

I/O多路复用是一种同步I/O模子,实现一个线程可以监视多个文件句柄;一旦某个文件句柄停当,就能够通知应用程序进行相应的读写操纵;没有文件句柄停当时会阻塞应用程序,交出CPU


  • 多路指的是网络毗连
  • 复用指的是同一个线程复用
简朴来说,I/O多路复用就是一种时分复用,在同一个线程中,通过类似切换开关的方式来宏观上同时传输多个I/O流
I/O多路复用的三种实现方式

select

  1. //select函数接口
  2. #include <sys/select.h>
  3. #include <sys/time.h>
  4. #define FD_SETSIZE 1024
  5. #define NFDBITS (8 * sizeof(unsigned long))
  6. #define __FDSET_LONGS (FD_SETSIZE/NFDBITS)
  7. // 数据结构 (bitmap)
  8. typedef struct {
  9.     unsigned long fds_bits[__FDSET_LONGS];
  10. } fd_set;
  11. // API
  12. int select(
  13.     int max_fd,
  14.     fd_set *readset,
  15.     fd_set *writeset,
  16.     fd_set *exceptset,
  17.     struct timeval *timeout
  18. )                              // 返回值就绪描述符的数目
  19. FD_ZERO(int fd, fd_set* fds)   // 清空集合
  20. FD_SET(int fd, fd_set* fds)    // 将给定的描述符加入集合
  21. FD_ISSET(int fd, fd_set* fds)  // 判断指定描述符是否在集合中
  22. FD_CLR(int fd, fd_set* fds)    // 将给定的描述符从文件中删除  
复制代码
select 实现多路复用的方式是,将已毗连的 Socket 都放到一个文件描述符聚集,然后调用 select 函数将文件描述符聚集拷贝到内核里,让内核来检查是否有网络变乱产生。检查的方式很粗暴,就是通过遍历文件描述符聚集的方式,当检查到有变乱产生后,将此 Socket 标志为可读或可写, 接着再把整个文件描述符聚集拷贝回用户态里,然后用户态还必要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理惩罚
所以,对于 select 这种方式,必要进行 2 次「遍历」文件描述符聚集,一次是在内核态里,一次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符聚集,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中
select将监听的文件描述符分为三组,每一组监听差别的必要进行的IO操纵。readfds是必要进行读操纵的文件描述符,writefds是必要进行写操纵的文件描述符,exceptfds是必要进行异常变乱处理惩罚的文件描述符。这三个参数可以用NULL来表示对应的变乱不必要监听。当select返回时,每组文件描述符会被select过滤,只留下可以进行对应IO操纵的文件描述符
select的调用会阻塞到有文件描述符可以进行IO操纵或被信号打断大概超时才会返回(条件触发)
select的缺点


  • select 使用固定长度的 BitsMap表示文件描述符聚集,单个进程所打开的fd是有限定的,通过FD_SETSIZE设置,默认1024
  • 每次调用select,都要把fd聚集从用户态拷贝到内核态,在多个fd时开销较大
  • 对socket扫描时是线性扫描,采用轮询的方式,服从较低(高并发时)
poll

  1. #include <poll.h>
  2. // 数据结构
  3. struct pollfd {
  4.     int fd;                         // 需要监视的文件描述符
  5.     short events;                   // 需要内核监视的事件
  6.     short revents;                  // 实际发生的事件
  7. };
  8. // API
  9. int poll(struct pollfd fds[], nfds_t nfds, int timeout);
复制代码
和select用三组文件描述符差别的是,poll只有一个pollfd数组,数组中的每个元素都表示一个必要监听IO操纵变乱的文件描述符。events参数是我们必要关心的变乱,revents是全部内核监测到的变乱。poll也不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限定,当然还会受到系统文件描述符限定
但是 poll 和 select 并没有太大的本质区别,都是使用「线性布局」存储进程关注的 Socket 聚集,因此都必要遍历文件描述符聚集来找到可读或可写的 Socket,时间复杂度为 O(n),而且也必要在用户态与内核态之间拷贝文件描述符聚集,这种方式随着并发数上来,性能的损耗会呈指数级增长
poll的缺点


  • 每次调用poll,都必要把fd聚集从用户态拷贝到内核态,多个fd时开销很大
  • 对socket扫描时是线性扫描,采用轮询的方式,服从较低(高并发时)
epoll

  1. #include <sys/epoll.h>
  2. // 数据结构
  3. // 每一个epoll对象都有一个独立的eventpoll结构体
  4. // 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
  5. // epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
  6. struct eventpoll {
  7.     /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
  8.     struct rb_root  rbr;
  9.     /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
  10.     struct list_head rdlist;
  11. };
  12. // API
  13. int epoll_create(int size); // 内核中间加一个 ep 对象,把所有需要监听的 socket 都放到 ep 对象中
  14. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 负责把 socket 增加、删除到内核红黑树
  15. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程
复制代码
以上两种方式没有解决必要多次在用户态和内核态切换造成大量数据开销和轮询扫描socket导致服从低的题目,而epoll通过两种方面很好地解决了以上题目:


  • epoll在内核中使用红黑树来跟踪进程全部待检测的文件描述符,把必要监控的socket通过epoll_ctl()函数加入内核中的红黑树里,通过对红黑树进行操纵,每次只必要传入一个待检测的socket,减少了内核和用户空间大量的数据拷贝和内存分配
  • epoll使用变乱驱动的机制,内核里维护了一个链表“rdlist”来记录变乱,当某个socket有变乱发生时,通过回调函数内核会将其加入到这个停当变乱列表中,当用户调用 epoll_wait() 函数时,只会返回有变乱发生的文件描述符的个数,不必要轮询扫描整个 socket 聚集,大大进步了检测的服从
   

  • 程序可能随时调用epoll_ctl添加监视socket,也可能随时删除。当删除时,若该socket已经存放在停当列表中,它也应该被移除。因此停当列表应是一种能够快速插入和删除的数据布局,epoll选择了双向链表来实现停当队列
  • epoll将“维护监视队列”和“进程阻塞”分离,也意味着必要有个数据布局来保存监视的socket,至少要方便的添加和移除,还要便于搜刮,以避免重复添加,epoll选择了服从较好的红黑树作为索引布局
  • 因为操纵系统要兼顾多种功能,以及有更多必要保存的数据,rdlist并非直接引用socket,而是通过epitem间接引用,红黑树的节点也是epitem对象;同理,文件系统也非直接引用socket
  epoll的缺点


  • epoll在内核态维护文件描述符聚集,每次添加文件描述符必要执行一个系统调用,在有多个短期活跃毗连的情况下,epoll执行服从较低
epoll与LT/ET

epoll的三大函数



  • 创建epoll函数
  1. #include <sys / epoll.h>
  2. int epoll_create(int size)
  3. size:最大监听的fd+1
  4. return:成功返回文件描述符fd;失败返回-1,可根据错误码判断错误类型
复制代码
创建一个epoll的句柄eventpoll,会占用一个fd值,在linux下检察“/proc/进程id/fd/”能够看到该fd,因此在使用完epoll后必要调用close()关闭,否则可能导致fd耗尽
   自从Linux2.6.8版本以后,size值只必要保证大于0,因为内核可以动态的分配大小,不必要size这个提示了
    在linux 2.6.27中加入了epoll_create1(int flag)
flag为0时表示与epoll_create()完全一样;
flag = EPOLL_CLOEXEC,创建的epfd会设置FD_CLOEXEC;
flag = EPOLL_NONBLOCK,创建的epfd会设置为非阻塞
  

  • epoll变乱的注册函数
  1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
  2. epfd:epoll_create()返回的epoll fd
  3. op:操作值
  4. fd:需要监听的fd
  5. event:需要监听的事件
  6. return:成功返回0;失败返回-1,可根据错误码判断错误类型
复制代码


  • epoll_ctl()中操纵数op的三种操纵类型

    • EPOLL_CTL_ADD:注册目标fd到epoll fd中,同时关联event到fd上
    • EPOLL_CTL_MOD:修改已经注册到fd的监听变乱
    • EPOLL_CTL_DEL:从epoll fd中删除/移除已注册的fd

  • epoll_ctl()中变乱event的罗列如下:

    • EPOLLIN:表示关联的fd可以进行读操纵
    • EPOLLOUT:表示关联的fd可以进行写操纵
    • EPOLLRDHUP:表示socket关闭了毗连(Linux2.6.17后上层只需通过EPOLLRDHUP判断对端是否关闭socket,减少一次系统调用)
    • EPOLLPRI:表示关联的fd有紧急优先变乱可以进行读操纵
    • EPOLLERR:表示关联的fd发生了错误,epoll_wait会一直等候这个变乱,一般无需设置该属性
    • EPOLLHUP:表示关联的fd挂起,epoll_wait会一直等候这个变乱,一般无需设置该属性
    • EPOLLET:设置关联的fd为ET的工作方式,epoll默认的工作方式是LT
    • EPOLLONESHOT:设置关联的fd为one-shot的工作方式,表示只监听一次变乱,如果要再次监听,则需再次把该socket放入epoll队列中

当socket接收到数据后,中断程序会给eventpoll的停当列表“rdlist”添加socket引用,而不是直接唤醒进程


  • epoll等候变乱函数
  1. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
  2. epfd:epoll描述符
  3. events:分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)
  4. maxevents:本次可以返回的最大事件数目,通常与预分配的events数组的大小是相等的
  5. timeout:在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待
  6. return:成功返回需要处理的事件数目,返回0表示已超时;失败则返回-1,可以根据错误码判断错误类型
复制代码
当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程
当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等候队列中的进程,进程通过rdlist知道哪些socket发生了变革而无需轮询socket列表

ET(边沿触发)模式和LT(程度触发)模式



  • 边沿触发(edge-triggered)

    • socket的接收缓冲区状态变革时触发读变乱,即空的接收缓冲区刚接收到数据时触发读变乱
    • socket的发送缓冲区状态变革时触发写变乱,即满的缓冲区刚空出空间时触发读变乱
    • epoll_ctl()的events = EPOLLIN | EPOLLET 或 events = EPOLLOUT | EPOLLET 表示ET模式
    • 使用边沿触发模式时,当被监控的 Socket 描述符上有可读变乱发生时,服务器端只会从 epoll_wait 中苏醒一次,即使进程没有调用 read 函数从内核读取数据,也依然只苏醒一次,因此我们程序要保证一次性将内核缓冲区的数据读取完大概碰到EAGAIN错误,必要设置socket描述符为非阻塞套接字

  • 变乱触发(level-triggered)

    • socket接收缓冲区不为空,有数据可读,则读变乱一直触发
    • socket发送缓冲区不满,可以继续写入数据,则写变乱一直触发
    • epoll_ctl()的events = EPOLLIN | EPOLLLT 或 events = EPOLLOUT | EPOLLLT 表示ET模式
    • 使用程度触发模式时,当被监控的 Socket 上有可读变乱发生时,服务器端不停地从 epoll_wait 中苏醒,直到内核缓冲区数据被 read 函数读完才竣事,epoll默认采用LT模式工作(select和poll只有LT模式)

  • ET的处理惩罚过程

    • accept一个一个毗连,添加到epoll中监听EPOLLIN|EPOLLOUT变乱
    • 当EPOLLIN变乱到达时,read fd中的数据并处理惩罚,read必要一直读,直到返回EAGAIN为止
    • 当必要写出数据时,把数据write到fd中,直到数据全部写完,大概write返回EAGAIN
    • 当EPOLLOUT变乱到达时,继续把数据write到fd中,直到数据全部写完,大概write返回EAGAIN

  • LT的处理惩罚过程

    • accept一个毗连,添加到epoll中监听EPOLLIN变乱
    • 当EPOLLIN变乱到达时,read fd中的数据并处理惩罚
    • 当必要写出数据时,把数据write到fd中;如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT变乱
    • 当EPOLLOUT变乱到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT变乱

ET模式的要求是必要一直读写,直到返回EAGAIN,否则就会遗漏变乱;LT的并不要求读写到返回EAGAIN为止,但通常会读写到返回EAGAIN,而且LT比ET多了一个开关EPOLLOUT的步骤
ET模式在某些场景下更加高效,但另一方面轻易遗漏变乱,轻易产生bug
对于nginx这种高性能服务器,ET模式是很好的,而其他的通用网络库,许多是使用LT,避免使用的过程中出现bug
epoll的常用框架

  1. for( ; ; )
  2.     {
  3.         nfds = epoll_wait(epfd,events,20,500);
  4.         for(i=0;i<nfds;++i)
  5.         {
  6.             if(events[i].data.fd==listenfd) //有新的连接
  7.             {
  8.                 connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
  9.                 ev.data.fd=connfd;
  10.                 ev.events=EPOLLIN|EPOLLET;
  11.                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
  12.             }
  13.             else if( events[i].events&EPOLLIN ) //接收到数据,读socket
  14.             {
  15.                 n = read(sockfd, line, MAXLINE)) < 0    //读
  16.                 ev.data.ptr = md;     //md为自定义类型,添加数据
  17.                 ev.events=EPOLLOUT|EPOLLET;
  18.                 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
  19.             }
  20.             else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
  21.             {
  22.                 struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取数据
  23.                 sockfd = md->fd;
  24.                 send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //发送数据
  25.                 ev.data.fd=sockfd;
  26.                 ev.events=EPOLLIN|EPOLLET;
  27.                 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
  28.             }
  29.             else
  30.             {
  31.                 //其他的处理
  32.             }
  33.         }
  34.     }
复制代码

HTTP——HTTP毗连与请求相应

在readme文档中提到,该类通过主从状态机封装了http毗连类,主状态机在内部调用从状态机,从状态机将处理惩罚状态和数据传给主状态机
有限状态机

有限状态机(Finite_state machine,FSM),又称有限状态主动机,简称状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模子,其作用重要是描述对象在它的生命周期内所经历的状态序列,以及如何相应来自外界的各种变乱,在计算机科学中,有限状态机被广泛运用于建模、硬件电路系统设计、软件工程、编译器、网络协议等
有限状态机重要有三个特征:

  • 状态总数是有限的
  • 任意时刻只处在一种状态之中
  • 某种条件下,会从一个状态转变到另一个状态
http模块中的主从状态机剖析

http报文的布局如下图所示

以一个具体的报文为例

此中"OST"为请求方法,“/v3/cloudconf”为URL,“HTTP/1.1”为协议版本,之后到空行前的为请求头,空行后的为请求包体
头文件中分别界说了主状态机的三种状态和从状态机的三种状态

主状态机的状态表明当前正在处理惩罚请求报文的哪一部分

从状态机的状态表明对请求报文当前部分的处理惩罚是否出现题目
请求报文的剖析

当webserver的线程池有空闲线程时,某一线程调用process()来完成请求报文的剖析及相应
  1. void http_conn::process()
  2. {
  3.         HTTP_CODE read_ret = process_read();
  4.         if (read_ret == NO_REQUEST)                                                                        //表示请求不完整,需要继续接收请求数据
  5.         {
  6.                 modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);                //注册并监听读事件
  7.                 return;
  8.         }
  9.         bool write_ret = process_write(read_ret);                                        //调用process_write完成报文响应
  10.         if (!write_ret)
  11.         {
  12.                 close_connect();
  13.         }
  14.         modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);                        //注册并监听写事件
  15. }
复制代码
主状态机的状态转换使用process_read()封装,从状态机则用parse_line()封装
process_read()函数中,主状态机初始化从状态机,然后通过while循环实现主从状态机的状态转换以及循环处理惩罚报文内容。从状态机负责剖析指定报文内容,并根据剖析效果更改从状态机的状态;主状态机根据从状态机的返回值判断是否退出循环(终止处理惩罚/竣事处理惩罚),并根据从状态机的驱动更改自身状态
主状态机与从状态机的状态转换及其关系如下图所示

从状态机

在HTTP报文中,每一行的数据由“\r”、“\n”作为竣事字符,空行则是仅仅是字符“\r”、“\n”。因此,可以通过查找“\r”、“\n”将报文拆解成单独的行进行剖析。从状态机负责读取buffer中的数据,将每行数据末端的“\r”、“\n”符号改为“\0”,并更新从状态机在buffer中读取的位置m_checked_idx,以此来驱动主状态机剖析
  1. //从状态机,用于分析出一行内容
  2. //返回值为行的读取状态,有LINE_OK,LINE_BAD,LINE_OPEN
  3. http_conn::LINE_STATUS http_conn::parse_line()
  4. {
  5.         char temp;
  6.         for (; m_checked_idx < m_read_idx; ++m_checked_idx)
  7.                 //m_read_idx指向缓冲区m_read_buf的数据末尾的下一个字节
  8.                 //m_checked_idx指向从状态机目前正在分析的字节
  9.         {
  10.                 temp = m_read_buf[m_checked_idx];                //temp:将要分析的字节
  11.                 if (temp == '\r')                // \r有可能是完整行
  12.                 {
  13.                         if ((m_checked_idx + 1) == m_read_idx)                //该行仍有内容,并未读完
  14.                                 return LINE_OPEN;
  15.                         else if (m_read_buf[m_checked_idx + 1] == '\n')                //出现换行符,说明该行读完
  16.                         {
  17.                                 m_read_buf[m_checked_idx++] = '\0';                // \r、\n都改为结束符\0
  18.                                 m_read_buf[m_checked_idx++] = '\0';
  19.                                 return LINE_OK;
  20.                         }
  21.                         return LINE_BAD;
  22.                 }
  23.                 else if (temp == '\n')
  24.                 {
  25.                         if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r')                //前一个字符是\r,则接收完整
  26.                         {
  27.                                 m_read_buf[m_checked_idx - 1] = '\0';
  28.                                 m_read_buf[m_checked_idx++] = '\0';
  29.                                 return LINE_OK;
  30.                         }
  31.                         return LINE_BAD;
  32.                 }
  33.         }
  34.         return LINE_OPEN;                //未发现换行符,说明读取的行不完整
  35. }
  36. /*
  37. LINE_OK:完整读取一行
  38. LINE_BAD:报文语法有误
  39. LINE_OPEN:读取的行不完整
  40. */
复制代码
主状态机

主状态机初始状态为CHECK_STATE_REQUESTLINE,通过调用从状态机来驱动主状态机。在主状态机剖析前,从状态机已经将每一行末端的“\r”、“\n”符号改为“\0”,以便主状态机直接取出对应字符串进行处理惩罚
   为了避免用户名和密码直接暴露在url中,项目中改用了POST请求,将用户名和密码添加在报文中作为消息体进行了封装
而在POST请求报文中,消息体的末端没有任何字符,不能使用从状态机的状态作为主状态机的while判断条件,因此在process_read()中额外添加了使用主状态机的状态进行判断的条件
剖析完消息体后,报文的完整剖析就完成了,但主状态机的状态还是CHECK_STATE_CONTENT,符合循环条件会再次进入循环,因此增加了“line_status == LINE_OK”并在完成消息体剖析后将该变量更改为LNE_OPEN,此时可以跳出循环完成报文剖析任务
  1. //通过while循环,封装主状态机,对每一行进行循环处理
  2. //此时,从状态机已经修改完毕,主状态机可以取出完整的行进行解析
  3. http_conn::HTTP_CODE http_conn::process_read()
  4. {
  5.         LINE_STATUS line_status = LINE_OK;                                                //初始化从状态机的状态
  6.         HTTP_CODE ret = NO_REQUEST;
  7.         char* text = 0;
  8.         //判断条件,从状态机驱动主状态机
  9.         while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK))
  10.         {
  11.                 text = get_line();
  12.                 m_start_line = m_checked_idx;                //m_start_line:每一个数据行在m_read_buf中的起始位置
  13.                                                                                         //m_checked_idx:从状态机在m_read_buf中的读取位置
  14.                 LOG_INFO("%s", text);
  15.                 switch (m_check_state)                                                //三种状态转换逻辑
  16.                 {
  17.                 case CHECK_STATE_REQUESTLINE:                                //正在分析请求行
  18.                 {
  19.                         ret = parse_request_line(text);                        //解析请求行
  20.                         if (ret == BAD_REQUEST)
  21.                                 return BAD_REQUEST;
  22.                         break;
  23.                 }
  24.                 case CHECK_STATE_HEADER:                                        //正在分析头部字段
  25.                 {
  26.                         ret = parse_headers(text);                                //解析请求头
  27.                         if (ret == BAD_REQUEST)
  28.                                 return BAD_REQUEST;
  29.                         else if (ret == GET_REQUEST)                        //get请求,需要跳转到报文响应函数
  30.                         {
  31.                                 return do_request();                                //响应客户请求
  32.                         }
  33.                         break;
  34.                 }
  35.                 case CHECK_STATE_CONTENT:                                        //解析消息体
  36.                 {
  37.                         ret = parse_content(text);
  38.                         if (ret == GET_REQUEST)                                        //post请求,跳转到报文响应函数
  39.                                 return do_request();
  40.                         line_status = LINE_OPEN;                                //更新,跳出循环,代表解析完了消息体
  41.                         break;
  42.                 }
  43.                 default:
  44.                         return INTERNAL_ERROR;
  45.                 }
  46.         }
  47.         return NO_REQUEST;
  48. }
复制代码
剖析请求行



  • 主状态机所处状态:CHECK_STATE_REQUESTLINE
  • 剖析函数从m_read_buf中剖析HTTP请求行,得到请求方法、目标以及HTTP版本号
  • 剖析完成后主状态机的状态变为CHECK_STATE_HEADER
  1. //解析http请求行,获得请求方法,目标url及http版本号
  2. http_conn::HTTP_CODE http_conn::parse_request_line(char* text)
  3. {
  4.         m_url = strpbrk(text, " \t");                                                        //请求该行中最先含有空格和\t任一字符的位置并返回       
  5.         if (!m_url)                                                                                                //没有目标字符,则代表报文格式有问题
  6.         {
  7.                 return BAD_REQUEST;
  8.         }
  9.         *m_url++ = '\0';                                                                                //将前面的数据取出,后移找到请求资源的第一个字符
  10.         char* method = text;
  11.         if (strcasecmp(method, "GET") == 0)                                                //确定请求方式
  12.                 m_method = GET;
  13.         else if (strcasecmp(method, "POST") == 0)
  14.         {
  15.                 m_method = POST;
  16.                 cgi = 1;                                                                                       
  17.         }
  18.         else
  19.                 return BAD_REQUEST;
  20.         m_url += strspn(m_url, " \t");                                                //得到url地址
  21.         m_version = strpbrk(m_url, " \t");
  22.         if (!m_version)
  23.                 return BAD_REQUEST;
  24.         *m_version++ = '\0';
  25.         m_version += strspn(m_version, " \t");                                //得到http版本号
  26.         if (strcasecmp(m_version, "HTTP/1.1") != 0)
  27.                 return BAD_REQUEST;                                                                //只接受HTTP/1.1版本
  28.         if (strncasecmp(m_url, "http://", 7) == 0)                       
  29.         {
  30.                 m_url += 7;
  31.                 m_url = strchr(m_url, '/');
  32.         }
  33.         if (strncasecmp(m_url, "https://", 8) == 0)
  34.         {
  35.                 m_url += 8;
  36.                 m_url = strchr(m_url, '/');
  37.         }
  38.         if (!m_url || m_url[0] != '/')                                //不符合规则的报文
  39.                 return BAD_REQUEST;
  40.         //当url为/时,显示判断界面
  41.         if (strlen(m_url) == 1)                                                //url为/,显示欢迎界面
  42.                 strcat(m_url, "judge.html");
  43.         m_check_state = CHECK_STATE_HEADER;                        //主状态机状态转移
  44.         return NO_REQUEST;
  45. }
复制代码
剖析请求头



  • 主状态机所处状态:CHECK_STATE_HEADER
  • 判断是空行还是请求头

    • 是空行,则判断content-length是否为0

      • 不为零,则是POST请求
      • 为零,则是GET请求

    • 是请求头,则重要分析connection、content-length等字段

      • connection字段判断毗连类型是长毗连还是短毗连
      • content-length用于读取POST请求的消息体长度


  1. //解析http请求的一个头部信息
  2. http_connect::HTTP_CODE http_connect::parse_headers(char* text)
  3. {
  4.         if (text[0] == '\0')                                                                                //判断是空头还是请求头
  5.         {
  6.                 if (m_content_length != 0)                                                                //具体判断是get请求还是post请求
  7.                 {
  8.                         m_check_state = CHECK_STATE_CONTENT;                                //post请求需要改变主状态机的状态
  9.                         return NO_REQUEST;
  10.                 }
  11.                 return GET_REQUEST;
  12.         }
  13.         else if (strncasecmp(text, "Connection:", 11) == 0)                        //解析头部连接字段
  14.         {
  15.                 text += 11;
  16.                 text += strspn(text, " \t");
  17.                 if (strcasecmp(text, "keep-alive") == 0)                                //判断是否为长连接
  18.                 {
  19.                         m_linger = true;                                                                        //为长连接,设置延迟关闭连接
  20.                 }
  21.         }
  22.         else if (strncasecmp(text, "Content-length:", 15) == 0)                //解析请求头的内容长度字段
  23.         {
  24.                 text += 15;
  25.                 text += strspn(text, " \t");
  26.                 m_content_length = atol(text);                                                        //atol(const char*str):将str所指的字符串转换为一个long int的长整数
  27.         }
  28.         else if (strncasecmp(text, "Host:", 5) == 0)                                //解析请求头部host字段
  29.         {
  30.                 text += 5;
  31.                 text += strspn(text, " \t");
  32.                 m_host = text;
  33.         }
  34.         else
  35.         {
  36.                 LOG_INFO("oop!unknow header: %s", text);
  37.         }
  38.         return NO_REQUEST;
  39. }
复制代码
剖析消息体



  • 主状态机所处状态:CHECK_STATE_CONTENT
  • 仅用于剖析POST请求
  • 用于保存POST请求消息体,为登录和注册做准备
  1. //判断http请求是否被完整读入
  2. http_connect::HTTP_CODE http_connect::parse_content(char* text)
  3. {
  4.         if (m_read_idx >= (m_content_length + m_checked_idx))
  5.         {
  6.                 text[m_content_length] = '\0';
  7.                 //POST请求中最后为输入的用户名和密码
  8.                 m_string = text;
  9.                 return GET_REQUEST;
  10.         }
  11.         return NO_REQUEST;
  12. }
复制代码
请求报文的相应

在完成请求报文的剖析后,明确用户想要登录/注册,必要跳转到相应的界面、添加用户名、验证用户等等,并将相应的数据写入相应报文返回给浏览器,具体流程图如下(图片取自微信公众号两猿社):

在头文件中根据HTTP请求的处理惩罚效果初始化了几种情况


  • NO_REQUEST

    • 请求不完整,必要继续读取请求报文数据
    • 跳转主线程继续监测读变乱

  • GET_REQUEST

    • 得到了完整的HTTP请求
    • 调用do_request完成请求资源映射

  • BAD_REQUEST

    • HTTP请求报文有语法错误或请求资源为目次
    • 跳转process_write完成相应报文

  • NO_RESOURCE

    • 请求资源不存在
    • 跳转process_write完成相应报文

  • FORBIDDEN_REQUEST

    • 请求资源克制访问,没有读取权限
    • 跳转process_write完成相应报文

  • FILE_REQUEST

    • 请求资源可以正常访问
    • 跳转process_write完成相应报文

  • INTERNAL_ERROR

    • 服务器内部错误,该效果在主状态机逻辑switch的default下,一般不会触发

  • CLOSED_CONNECTION

    • 客户端关闭毗连

do_request函数

在process_read()中完成请求报文的剖析后,状态机调用do_request()函数,该函数负责处理惩罚功能逻辑,具体做法为:将网站根目次与url文件拼接,然后通过stat判断文件属性。浏览器网址栏中的字符,即url,可以将其抽象成ip:port/xxx,xxx通过html文件的action属性进行设置。别的为了进步访问速度,通过mmap进行映射,将平常文件映射到内存逻辑地址。
  1. //对客户请求进行响应
  2. http_connect::HTTP_CODE http_connect::do_request()
  3. {
  4.         strcpy(m_real_file, doc_root);                                        //将初始化的m_real_file赋值为网站根目录
  5.         int len = strlen(doc_root);
  6.         //printf("m_url:%s\n", m_url);
  7.         const char* p = strrchr(m_url, '/');                        //找到m_url中“/”的位置
  8.         //处理cgi
  9.         if (cgi == 1 && (*(p + 1) == '2' || *(p + 1) == '3'))
  10.         {
  11.                 //根据标志判断是登录检测还是注册检测
  12.                 char flag = m_url[1];
  13.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  14.                 strcpy(m_url_real, "/");
  15.                 strcat(m_url_real, m_url + 2);
  16.                 strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1);
  17.                 free(m_url_real);
  18.                 //将用户名和密码提取出来
  19.                 //user=123        password=123
  20.                 char name[100], password[100];
  21.                 int i;
  22.                 for (i = 5; m_string[i] != '&'; ++i)
  23.                         name[i - 5] = m_string[i];
  24.                 name[i - 5] = '\0';
  25.                 int j = 0;
  26.                 for (i = i + 10; m_string[i] != '\0'; ++i, ++j)
  27.                         password[j] = m_string[i];
  28.                 password[j] = '\0';
  29.                 if (*(p + 1) == '3')
  30.                 {
  31.                         //如果是注册,先检测数据库中是否有重名的
  32.                         //没有重名的,进行增加数据
  33.                         char* sql_insert = (char*)malloc(sizeof(char) * 200);
  34.                         strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES(");
  35.                         strcat(sql_insert, "'");
  36.                         strcat(sql_insert, name);
  37.                         strcat(sql_insert, "', '");
  38.                         strcat(sql_insert, password);
  39.                         strcat(sql_insert, "')");
  40.                         if (users.find(name) == users.end())                        //说明库中没有重名
  41.                         {
  42.                                 m_lock.lock();
  43.                                 int res = mysql_query(mysql, sql_insert);
  44.                                 users.insert(pair<string, string>(name, password));
  45.                                 m_lock.unlock();
  46.                                 if (!res)
  47.                                         strcpy(m_url, "/log.html");
  48.                                 else
  49.                                         strcpy(m_url, "/registerError.html");
  50.                         }
  51.                         else
  52.                                 strcpy(m_url, "/registerError.html");
  53.                 }
  54.                 //如果是登录,直接判断
  55.                 //若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0
  56.                 else if (*(p + 1) == '2')
  57.                 {
  58.                         if (users.find(name) != users.end() && users[name] == password)
  59.                                 strcpy(m_url, "/welcome.html");
  60.                         else
  61.                                 strcpy(m_url, "/logError.html");
  62.                 }
  63.         }
  64.         if (*(p + 1) == '0')                                //如果请求资源为/0,表示跳转注册界面
  65.         {
  66.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  67.                 strcpy(m_url_real, "/register.html");
  68.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));        //将网站目录和/register.html进行拼接,更新到m_real_file中
  69.                 free(m_url_real);
  70.         }
  71.         else if (*(p + 1) == '1')                        //如果请求资源为/1,表示跳转登录页面
  72.         {
  73.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  74.                 strcpy(m_url_real, "/log.html");
  75.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));        //将网站目录和/log.html进行拼接,更新到m_real_file中
  76.                 free(m_url_real);
  77.         }
  78.         else if (*(p + 1) == '5')
  79.         {
  80.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  81.                 strcpy(m_url_real, "/picture.html");
  82.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
  83.                 free(m_url_real);
  84.         }
  85.         else if (*(p + 1) == '6')
  86.         {
  87.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  88.                 strcpy(m_url_real, "/video.html");
  89.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
  90.                 free(m_url_real);
  91.         }
  92.         else if (*(p + 1) == '7')
  93.         {
  94.                 char* m_url_real = (char*)malloc(sizeof(char) * 200);
  95.                 strcpy(m_url_real, "/fans.html");
  96.                 strncpy(m_real_file + len, m_url_real, strlen(m_url_real));
  97.                 free(m_url_real);
  98.         }
  99.         else
  100.                 strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1);
  101.         if (stat(m_real_file, &m_file_stat) < 0)                //通过stat获取请求资源文件信息,成功则将信息更新到m_file_stat结构体;失败返回NO_RESOURCE状态,表示资源不存在
  102.                 return NO_RESOURCE;
  103.         if (!(m_file_stat.st_mode & S_IROTH))                        //判断文件类型,客户端是否有访问权限
  104.                 return FORBIDDEN_REQUEST;
  105.         if (S_ISDIR(m_file_stat.st_mode))                                //判断该路径是否为目录
  106.                 return BAD_REQUEST;
  107.        
  108.         //os.open(file,flags[,mode]):打开一个文件
  109.         int fd = open(m_real_file, O_RDONLY);                        //以只读方式获取文件描述符,通过mmap将该文件映射到内存中               
  110.         m_file_address = (char*)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
  111.         close(fd);                                                                                //避免文件描述符的浪费和占用
  112.         return FILE_REQUEST;                                                        //表示请求文件存在且可以访问
  113. }
  114. /*
  115. open(file,flags[,mode])中flags的参数
  116. O_RDONLY: 以只读的方式打开
  117. O_WRONLY: 以只写的方式打开
  118. O_RDWR : 以读写的方式打开
  119. O_NONBLOCK: 打开时不阻塞
  120. O_APPEND: 以追加的方式打开
  121. O_CREAT: 创建并打开一个新文件
  122. O_TRUNC: 打开一个文件并截断它的长度为零(必须有写权限)
  123. O_EXCL: 如果指定的文件存在,返回错误
  124. O_SHLOCK: 自动获取共享锁
  125. O_EXLOCK: 自动获取独立锁
  126. O_DIRECT: 消除或减少缓存效果
  127. O_FSYNC : 同步写入
  128. O_NOFOLLOW: 不追踪软链接
  129. */
复制代码


  • stat函数用于取得指定文件的文件属性,并将文件属性存储在布局体stat里
  1. #include <sys/types.h>
  2. #include <sys/stat.h>
  3. #include <unistd.h>
  4. //获取文件属性,存储在statbuf中
  5. int stat(const char *pathname, struct stat *statbuf);
  6. struct stat
  7. {
  8.         mode_t    st_mode;        /* 文件类型和权限 */
  9.         off_t     st_size;        /* 文件大小,字节数*/
  10. };
复制代码


  • mmap用于将文件等映射到内存,进步访问速度
  1. void* mmap(void* start,size_t length,int prot,int flags,int fd,off_t offset);
  2. int munmap(void* start,size_t length);
  3. /*
  4. start:映射区的开始地址,设置为0时表示由系统决定映射区的起始地址
  5. length:映射区的长度,从被映射文件开头offset个字节算起
  6. prot:期望的内存保护标志,不能与文件的打开模式冲突,可取以下几个值的或:PROT_READ(可读), PROT_WRITE(可写), PROT_EXEC(可执行), PROT_NONE(不可访问)
  7.         PROT_READ表示页内容可以被读取
  8. flags:指定映射对象的类型,映射选项和映射页是否可以共享,可以是以下几个常用值的或:MAP_FIXED(使用指定的映射起始地址), MAP_SHARED(与其它所有映射这个对象的进程共享映射空间), MAP_PRIVATE(建立一个写入时拷贝的私有映射)
  9.         MAP_PEIVATE建立一个写入时拷贝的私有映射,内存区域的写入不会影响到原文件
  10. fd:有效地文件描述符,一般是由open()函数返回
  11. offset:被映射对象内容的起点
  12. */
复制代码


  • iovec界说向量元素,通常该布局用作一个多元素的数组
  1. struct iovec {
  2.     void      *iov_base;      /* starting address of buffer */
  3.     size_t    iov_len;        /* size of buffer */
  4. };
  5. /*
  6. iov_base指向数据的地址
  7. iov_len表示数据的长度
  8. */
复制代码


  • writev用于在一次函数调用中写多个非连续缓冲区,有时将该函数称为聚集写。若成功则返回已写的字节数,通常等于全部缓冲区长度之和;否则返回-1
  1. #include <sys/uio.h>
  2. ssize_t writev(int filedes, const struct iovec *iov, int iovcnt);
  3. /*
  4. filedes表示文件描述符
  5. iov为io向量机制结构体iovec
  6. iovcnt为结构体的个数
  7. */
复制代码
  特别注意: 循环调用writev时,必要重新处理惩罚iovec中的指针和长度,该函数不会对这两个成员做任何处理惩罚。writev的返回值为已写的字节数,但这个返回值“实用性”并不高,因为参数传入的是iovec数组,计量单位是iovcnt,而不是字节数,我们仍旧必要通过遍历iovec来计算新的基址,别的写入数据的“竣事点”可能位于一个iovec的中间某个位置,因此必要调整临界iovec的io_base和io_len
  process_write函数

根据do_request的返回状态,服务器子线程调用process_write向m_write_buf中写入相应报文,在生成相应报文的过程中重要调用add_reponse()函数更新m_write_idx和m_write_buf
以下几个函数为内部调用add_response函数更新m_write_idx指针和缓冲区m_write_buf中的内容
  1. bool http_connect::add_response(const char* format, ...)
  2. {
  3.         if (m_write_idx >= WRITE_BUFFER_SIZE)                                        //如果写入内容超出m_write_buf大小则报错
  4.                 return false;
  5.         va_list arg_list;                                                                                //定义可变参数列表
  6.         va_start(arg_list, format);                                                                //将变量arg_list初始化为传入参数
  7.         int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list);        //将数据format从可变参数列表写入缓冲区写,返回写入数据的长度
  8.         if (len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx))                //如果写入的数据长度超过缓冲区剩余空间,则报错
  9.         {
  10.                 va_end(arg_list);
  11.                 return false;
  12.         }
  13.         m_write_idx += len;                                                                                //更新m_write_idx位置
  14.         va_end(arg_list);                                                                                //清空可变参列表
  15.         LOG_INFO("request:%s", m_write_buf);
  16.         return true;
  17. }
复制代码


  • add_status_line函数:添加状态行(http/1.1 状态码 状态消息)
  1. //添加状态行
  2. bool http_connect::add_status_line(int status, const char* title)
  3. {
  4.         return add_response("%s %d %s\r\n", "HTTP/1.1", status, title);
  5. }
复制代码


  • add_headers函数:添加消息报头,内部调用add_content_length和add_linger函数

    • content-length记录相应报文长度,用于浏览器端判断服务器是否发送完数据
    • connection记录毗连状态,用于告诉浏览器端保持长毗连

  1. bool http_connect::add_headers(int content_len)                                                        //添加消息报头,具体的添加文本长度、连接状态和空行
  2. {
  3.         return add_content_length(content_len) && add_linger() &&
  4.                 add_blank_line();
  5. }
  6. bool http_connect::add_content_length(int content_len)                                        //添加Content-Length,表示响应报文的长度
  7. {
  8.         return add_response("Content-Length:%d\r\n", content_len);
  9. }
  10. bool http_connect::add_linger()                                                                                        //添加连接状态,通知浏览器端是保持连接还是关闭
  11. {
  12.         return add_response("Connection:%s\r\n", (m_linger == true) ? "keep-alive" : "close");
  13. }
  14. bool http_connect::add_content_type()                                                                        //添加文本类型,这里是html
  15. {
  16.         return add_response("Content-Type:%s\r\n", "text/html");
  17. }
复制代码


  • add_blank_line:添加空行
  1. bool http_connect::add_blank_line()                                                                                //添加空行
  2. {
  3.         return add_response("%s", "\r\n");
  4. }
复制代码


  • add_content:添加文本content
  1. bool http_connect::add_content(const char* content)
  2. {
  3.         return add_response("%s", content);
  4. }
复制代码
相应报文分为两种,一种是请求文件的存在,通过io向量机制iovec,声明两个iovec,第一个指向m_write_buf,第二个指向mmap的地址m_file_address ;另一种是请求堕落,这时候只申请一个iovec,指向m_write_buf


  • iovec是一个布局体,内里有两个元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是writev将要发送的数据,成员iov_len表示现实写入的长度(详见do_request函数部分中iovec布局的描述)
   往相应报文里写的是服务器中html的文件数据,浏览器端对其进行剖析、渲染并表现在浏览器页面上
  1. bool http_connect::process_write(HTTP_CODE ret)
  2. {
  3.         switch (ret)
  4.         {
  5.         case INTERNAL_ERROR:                                                                                                                //内部错误,500
  6.         {
  7.                 add_status_line(500, error_500_title);                                                                        //状态行
  8.                 add_headers(strlen(error_500_form));                                                                        //消息报头
  9.                 if (!add_content(error_500_form))
  10.                         return false;
  11.                 break;
  12.         }
  13.         case BAD_REQUEST:                                                                                                                        //报文语法有误,404
  14.         {
  15.                 add_status_line(404, error_404_title);
  16.                 add_headers(strlen(error_404_form));
  17.                 if (!add_content(error_404_form))
  18.                         return false;
  19.                 break;
  20.         }
  21.         case FORBIDDEN_REQUEST:                                                                                                                //资源没有访问权限,403
  22.         {
  23.                 add_status_line(403, error_403_title);
  24.                 add_headers(strlen(error_403_form));
  25.                 if (!add_content(error_403_form))
  26.                         return false;
  27.                 break;
  28.         }
  29.         case FILE_REQUEST:                                                                                                                        //文件存在,200
  30.         {
  31.                 add_status_line(200, ok_200_title);
  32.                 if (m_file_stat.st_size != 0)                                                                                        //请求的资源存在
  33.                 {
  34.                         add_headers(m_file_stat.st_size);
  35.                         m_iv[0].iov_base = m_write_buf;                                                                                //第一个iovec指针指向响应报文缓冲区,长度指向m_write_idx
  36.                         m_iv[0].iov_len = m_write_idx;
  37.                         m_iv[1].iov_base = m_file_address;                                                                        //第二个iovec指针指向mmap返回的文件指针,长度指向文件大小
  38.                         m_iv[1].iov_len = m_file_stat.st_size;
  39.                         m_iv_count = 2;
  40.                         bytes_to_send = m_write_idx + m_file_stat.st_size;                                        //发送的全部数据为响应报文头部信息和文件大小
  41.                         return true;
  42.                 }
  43.                 else
  44.                 {
  45.                         const char* ok_string = "<html><body></body></html>";                                //请求的资源大小为0,则返回空白html文件
  46.                         add_headers(strlen(ok_string));
  47.                         if (!add_content(ok_string))
  48.                                 return false;
  49.                 }
  50.         }
  51.         default:
  52.                 return false;
  53.         }
  54.         m_iv[0].iov_base = m_write_buf;                                                                                                //除FILE_REQUEST状态外,其余状态只申请一个iovec,指向响应报文缓冲区
  55.         m_iv[0].iov_len = m_write_idx;
  56.         m_iv_count = 1;
  57.         bytes_to_send = m_write_idx;
  58.         return true;
  59. }
复制代码
write函数

服务器子线程调用process_write完成相应报文,随后注册epollout变乱。服务器主线程检测写变乱,并调用http_conn::write函数将相应报文发送给浏览器端
该函数具体逻辑如下:
生成相应报文时初始化byte_to_send(包罗头部信息和文件数据),通过writev函数循环发送相应报文数据,根据返回值更新byte_have_send和iovec布局体的指针和长度,并判断相应报文团体是否发送成功


  • 若writev单次发送成功,更新byte_to_send和byte_have_send的大小,若相应报文团体发送成功,则取消mmap映射,并判断是否是长毗连

    • 长毗连重置http类实例,注册读变乱,不关闭毗连
    • 短毗连直接关闭毗连

  • 若writev单次发送不成功,判断是否是写缓冲区满了

    • 若不是因为缓冲区满了而失败,取消mmap映射,关闭毗连
    • 若eagain则满了,更新iovec布局体的指针和长度,并注册写变乱,等候下一次写变乱触发(当写缓冲区从不可写变为可写,触发epollout),因此在此期间无法立刻接收到同一用户的下一请求,但可以保证毗连的完整性

  1. bool http_connect::write()
  2. {
  3.         int temp = 0;
  4.         if (bytes_to_send == 0)                                                                                                                //要发送的数据长度为0,表示响应报文为空,一般不会出现该情况
  5.         {
  6.                 modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
  7.                 init();
  8.                 return true;
  9.         }
  10.         while (1)
  11.         {
  12.                 temp = writev(m_sockfd, m_iv, m_iv_count);                                                                //将响应报文的状态行、消息头、空行和响应正文发送给浏览器端
  13.                 if (temp < 0)       
  14.                 {
  15.                         if (errno == EAGAIN)                                                                                                //判断缓冲区是否已满
  16.                         {
  17.                                 modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);                                //重新注册写事件,等待下一次触发
  18.                                 return true;
  19.                         }
  20.                         unmap();                                                                                                                        //发送失败,但不是缓冲区问题,取消映射
  21.                         return false;
  22.                 }
  23.                 bytes_have_send += temp;
  24.                 bytes_to_send -= temp;                                                                                                        //更新已发送字节数
  25.                 if (bytes_have_send >= m_iv[0].iov_len)                                                                        //第一个iovec头部信息的数据已发送完,发送第二个iovec数据
  26.                 {
  27.                         m_iv[0].iov_len = 0;                                                                                                //不再继续发送头部信息
  28.                         m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
  29.                         m_iv[1].iov_len = bytes_to_send;
  30.                 }
  31.                 else                                                                                                                                        //继续发送第一个iovec头部信息的数据
  32.                 {
  33.                         m_iv[0].iov_base = m_write_buf + bytes_have_send;
  34.                         m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;
  35.                 }
  36.                 if (bytes_to_send <= 0)                                                                                                        //判断条件,数据已全部发送完
  37.                 {
  38.                         unmap();
  39.                         modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);                                        //在epoll树上重置EPOLLONESHOT事件
  40.                         if (m_linger)                                                                                                                //浏览器的请求为长连接
  41.                         {
  42.                                 init();                                                                                                                        //重新初始化HTTP对象
  43.                                 return true;
  44.                         }
  45.                         else
  46.                         {
  47.                                 return false;
  48.                         }
  49.                 }
  50.         }
  51. }
复制代码

ThreadPool——线程池

池式布局

在计算机体系布局中有许多池式布局,例如对象池、数据库毗连池、线程池、内存池等等。使用池式布局的重要原因是必要使用池式布局的对象一般存在创建时间长、资源占用高等特点,在对象过多的情况下或导致运行服从低下,因此使用池式布局复用有限的资源,进步运行服从
线程池

线程池其实就是将多个线程对象放到一个容器中,在该项目中使用了一个数组作为容器。在创建线程时一次性创建多个线程存放进线程池中等候执行任务;必要执行任务时从线程池中取出空闲线程执行,执行完后将线程放回线程池中等候下次执行任务;通过复用线程池中的线程来避免多次创建销毁线程导致的运行服从低下题目。
线程池的使用有效降低了多线程操纵中任务申请和开释产生的性能消耗,在进步线程使用率、进步线程相应速度、同一管理线程对象以及控制最大并发数等题目上起到了很好的效果
线程池重要实用的场景

由于线程池本就是用于减少线程频繁创建销毁导致的运行服从低下题目,仅当线程本身开销与线程执行任务的开销相比不可忽略的情况下作用明显;而在例如FTP服务器或Telnet服务器上,传输文件时间较长、开销较大、线程本身开销可以忽略不计的情况下,线程池能起到的作用就不敷明显,可能不是一种足够理想的方法
线程池通常实用以下几种场所:
1)单位时间内处理惩罚任务频繁且任务处理惩罚时间短
2)对及时性要求较高
线程池的本质及设计要点

本质上,线程池可以简化为一个“生产者-消耗者”模子。线程池执行任务是在“消耗”资源,往线程池中增加任务是“生产”资源。
线程池类的设计重要必要思量以下几点:


  • 界说线程所必要的“任务”

    • 线程所必要的“任务”指的是函数,为了让线程工作,必要向线程传递一个预先声明和界说的函数,而如果要让线程执行差别功能的函数,则必要在传递给线程的函数中调用其他的函数以实现正确的任务

  • 选择符合的容器

    • 对于线程,只必要使用数组就能满足要求;对于工作队列,则可能必要使用队列大概链表等

  • 正确的上锁

    • 生产者-消耗者模子中有包罗互斥锁、信号量等多种上锁模式,本项目中使用了Lock模块中的互斥锁、信号量、条件变量等实现了线程同步

proactor模子和reactor模子

Reactor模子

服务器采用线程池的方式进行资源复用,解决了频繁创建销毁线程带来的性能开销和资源浪费题目,同时也引入了一个新题目:线程如何高效处理惩罚多个毗连的业务?
一个毗连对应一个线程时通常采用“read -> 业务处理惩罚 -> send”的处理惩罚流程,socket默认为阻塞I/O,则当现在毗连无数据可读时线程阻塞在“read”操纵上,该阻塞方式不影响其他线程。但采用了线程池的话,一个线程如果在处理惩罚某个毗连的“read”操纵时阻塞则无法继续处理惩罚其他毗连的业务
为相识决该题目,最简朴的方式是将socket改为非阻塞,线程不停轮询调用“read”操纵来判断有无数据。但该方法线程不知道当前毗连是否有数据可读,因此必要每次通过“read”操纵判断,必要消耗CPU资源且服从低下。为相识决这个题目,可以使用I/O多路复用技术实现只有在毗连上有数据的时候线程才去发出读请求,将I/O多路复用技术封装之后就称为Reactor模子。在Reactor模子中重要有三类处理惩罚变乱:acceptor——负责毗连变乱、handler——负责变乱读写、reactor——负责变乱监听和变乱分发
Reactor模子重要由Reactor和处理惩罚资源池两个核心部分组成,此中Reactor负责监听和分发变乱(包罗毗连变乱、读写变乱),处理惩罚资源池负责处理惩罚变乱。Reactor的数量可以是一个或多个,处理惩罚资源池可以是单个进程/线程,也可以是多个进程/线程,因此Reactor重要分为三种方案:


  • 单Reactor单进程/线程

    • 该模子中reactor、acceptor和handler的功能都是由一个线程来执行的
    • reactor负责变乱的监听和分发
    • 有毗连变乱发生,则reactor分发给accepter创建毗连,然后创建一个handler
    • 有读写变乱发生,则reactor分发给handler进行处理惩罚并最终给客户端返回效果

   单Reactor单进程模子无法充分使用多核CPU的性能,且handler进行业务处理惩罚时进程无法处理惩罚其他毗连,如果业务处理惩罚耗时较长则造成相应耽误,不实用计算秘麋集型的场景,只实用于业务处理惩罚非常快速的场景
  单Reactor单进程模子的经典应用场景:Redis、Netty


  • 单Reactor多进程/线程

    • reactor、acceptor、handler的功能由一个线程来执行
    • 引入线程池,handler只负责读取请求和写回效果,具体的业务处理惩罚由线程池中的worker线程来完成

   单Reactor多线程模子能够充分使用多核CPU的功能,但是因为只有一个Reactor对象承担全部变乱的监听和相应,且只在主线程上运行,在面对刹时高并发的场景时reactor轻易成为性能瓶颈
  单Reactor多线程模子的经典应用场景:Netty


  • 多Reactor多进程/线程

    • 一个主reactor线程、多个子reactor线程和多个worker线程组成的一个线程池
    • 主reactor负责监听客户端变乱,并在同一个线程中让acceptor处理惩罚毗连变乱
    • 创建毗连后,主reactor把毗连分发给子reactor线程由其负责后续变乱处理惩罚
    • 子reactor让同线程的handler读取请求和返回效果,具体业务处理惩罚由worker线程完成

   在多Reactor多线程模子中,主线程和子线程分工明确,主线程只负责接收新毗连,子线程完成后续业务处理惩罚,子线程无需返回数据,直接将处理惩罚效果发送给客户端
  多Reactor多线程模子的经典应用场景:Netty、kafka、Nginx
Proactor模子

Reactor模子是非阻塞同步网络模子,而Proactor模子是异步网络模子
阻塞I/O必要等候“内核数据准备好”和“数据从内核态拷贝到用户态”两个过程;非阻塞I/O必要等候“数据从内核态拷贝到用户态”一个过程;而异步I/O两个过程都不必要等候
Proactor模子将I/O变乱的监听、I/O操纵的执行、I/O效果的返回统统交给内核来执行,在数据准备阶段和数据拷贝阶段全程无阻塞。在发起异步读写请求时,必要传入数据缓冲区的地址等信息,系统内核主动完成读写工作,读写工作不必要应用进程主动发起read/write来读写数据,而是由操纵系统来完成。操纵系统完成读写工作后就会通知应用进程直接处理惩罚数据
   在Linux下的异步I/O是不美满的,aio系列函数是由POSIX界说的异步操纵接口,不是真正的操纵系统级别支持的,而是在用户空间模拟出来的异步,且仅支持基于当地文件的aio异步操纵,网络编程中的socket是不支持的
在windows里实现了一套完整的支持socket的异步编程接口IOCP,是由操纵系统级别实现的异步I/O
  Reactor模子和Proactor模子的区别



  • Reactor模子是非阻塞同步网络模式,感知的是停当可读写变乱,即“来了变乱操纵系统通知应用进程,由应用进程处理惩罚变乱”;Proactor模子是异步网络模式,感知的是已完成的读写变乱,即“来了变乱操纵系统来处理惩罚,处理惩罚完再通知应用进程
  • 在Proactor中,用户程序必要向内核传递用户空间的读缓冲区地址;Reactor则不必要。这导致Proactor中每个并发操纵都要求有独立的缓冲区,在内存上有肯定的开销
  • Proactor的实现逻辑复杂,编码成本相较Reactor高
  • Proactor在处理惩罚高耗时I/O时性能高于Reactor;对于低耗时I/O的执行服从提拔并不明显
线程池的界说

  1. class threadpool
  2. {
  3. public:
  4.     /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
  5.     threadpool(int actor_model, connection_pool* connPool, int thread_number = 8, int max_request = 10000);
  6.     ~threadpool();
  7.     bool append(T* request, int state);
  8.     bool append_p(T* request);
  9. private:
  10.     /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
  11.     static void* worker(void* arg);
  12.     void run();
  13. private:
  14.     int m_thread_number;         //线程池中的线程数
  15.     int m_max_requests;         //请求队列中允许的最大请求数
  16.     pthread_t* m_threads;       //描述线程池的数组,其大小为m_thread_number
  17.     std::list<T*> m_workqueue; //请求队列
  18.     locker m_queuelocker;       //保护请求队列的互斥锁
  19.     sem m_queuestat;            //是否有任务需要处理
  20.     connection_pool* m_connPool;  //数据库
  21.     int m_actor_model;          //模型切换
  22. };
复制代码
如果把函数看成是一个对象,则在线程池类的接口中传递差别的函数也就是传递差别的对象。如上所述,一个线程如果必要执行差别功能的函数,则必要在函数内调用其他函数来完成功能,在此处的对外接口append()引用了C++的模板编程(template),实现将差别类型的函数添加进工作队列中
  1. template <typename T>                                                                                //模板编程
  2. bool threadpool<T>::append(T* request, int state)                  
  3. {
  4.     m_queuelocker.lock();
  5.     if (m_workqueue.size() >= m_max_requests)
  6.     {
  7.         m_queuelocker.unlock();
  8.         return false;
  9.     }
  10.     request->m_state = state;
  11.     m_workqueue.push_back(request);
  12.     m_queuelocker.unlock();
  13.     m_queuestat.post();
  14.     return true;
  15. }
复制代码
线程池的创建

  1. template <typename T>
  2. //线程池构造函数
  3. threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool)
  4. {
  5.     if (thread_number <= 0 || max_requests <= 0)            //线程池异常
  6.         throw std::exception();
  7.     m_threads = new pthread_t[m_thread_number];
  8.     if (!m_threads)                                         
  9.         throw std::exception();
  10.     for (int i = 0; i < thread_number; ++i)                 //循环创建线程,并将工作线程按要求进行运行
  11.     {
  12.         if (pthread_create(m_threads + i, NULL, worker, this) != 0)         
  13.         {
  14.             delete[] m_threads;
  15.             throw std::exception();
  16.         }
  17.         if (pthread_detach(m_threads[i]))               //将线程设定为分离状态,不用单独对线程进行回收,避免内存泄露
  18.         {
  19.             delete[] m_threads;
  20.             throw std::exception();
  21.         }
  22.     }
  23. }
复制代码
此中必要注意的函数有
  1. pthread_create(pthread_t * thread, const pthread_attr_t * attr, void* (*start_routine)(void*), void *arg)
  2. *pthread:传递一个线程的指针变量或该类型变量的地址
  3. *attr:用于手动设置新建线程的属性,一般设置为NULL由系统默认的属性值创建线程
  4. *(start_routine)(void*):以函数指针的方式指明新建线程需要执行的函数
  5. *arg:指定传递给start_routine函数的实参
  6. 返回值:成功创建则返回0;失败则返回非零值,对应不同的宏表明创建失败的原因
  7. pthread_detach(pthread_t *thread)
  8. 将已经运行中的线程设定为分离状态,成功返回0,失败返回错误号指明失败原因
复制代码
linux线程有两种状态:joinable状态和unjoinable状态。如果线程是joinable状态,当线程函数本身退出不会开释线程所占用的堆栈和线程描述符等资源,只有调用了PTHREAD_JOIN()后由主线程阻塞等候子线程竣事,然后回收子线程资源;如果线程是unjoinable状态(分离状态),则在子线程竣事时会主动回收资源
一般情况下,使用PTHREAD_CREATE()创建线程执行任务后,线程处于joinable状态。如果没有主线程调用PTHREAD_JOIN()往返收线程资源的话,该线程会继续占用系统资源成为“僵尸线程”,为了避免该种情况,项目中使用PTHREAD_DETACH()将线程分离避免后续忘记回收线程资源
PTHREAD_DETACH()用于将运行中的线程设定为分离状态,线程主动与主控线程断开关系,线程竣事后,其退出状态不由其他线程获取,而是直接本身主动开释,常应用于网络、多线程服务器
   c++11引入了std::thread来创建线程,支持对线程join大概detach
  1. std::thread t(func);
  2.     if (t.joinable()) {
  3.         t.detach();
  4.     }
复制代码
待办工作加入请求队列

当epoll检测到端口有变乱激活时,即将该变乱放入请求队列中,等候工作线程处理惩罚
  1. bool threadpool<T>::append(T* request, int state)                  
  2. {
  3.     m_queuelocker.lock();
  4.     if (m_workqueue.size() >= m_max_requests)
  5.     {
  6.         m_queuelocker.unlock();
  7.         return false;
  8.     }
  9.     request->m_state = state;
  10.     m_workqueue.push_back(request);
  11.     m_queuelocker.unlock();
  12.     m_queuestat.post();
  13.     return true;
  14. }
  15. bool threadpool<T>::append_p(T* request)                        //proactor模式下的任务请求入队
  16. {
  17.     m_queuelocker.lock();
  18.     if (m_workqueue.size() >= m_max_requests)
  19.     {
  20.         m_queuelocker.unlock();
  21.         return false;
  22.     }
  23.     m_workqueue.push_back(request);
  24.     m_queuelocker.unlock();
  25.     m_queuestat.post();
  26.     return true;
  27. }
复制代码
本项目实现的是一个基于半同步/半反应堆式的并发布局(使用同步I/O模拟Proactor变乱处理惩罚模式),由I/O线程完成读写,I/O线程向工作线程分发的是已接受的数据
以Proactor模式为例的工作流程如下:


  • 主线程充当异步线程,负责监听全部socket上的变乱
  • 有新请求到来,主线程接收以得到新的毗连socket,然后往epoll内核变乱表中注册该socket上的读写变乱
  • 如果毗连socket上有读写变乱发生,主线程从socket上接收数据,并将数据封装成请求对象插入到请求队列中
  • 全部工作线程就寝在请求队列上,当有任务到来时通过竞争得到任务的接受权

线程处理惩罚

创建线程池时,调用pthread_create时指向了worker()静态成员函数,worker()内部调用run()
  1. //线程回调函数/工作函数,arg其实是this
  2. template <typename T>
  3. void* threadpool<T>::worker(void* arg)
  4. {
  5.     threadpool* pool = (threadpool*)arg;        //将参数强行转化为线程池类,获取threadpool对象地址
  6.     pool->run();                                                                //线程池中每个线程创建都会调用run()睡眠在队列中
  7.     return pool;
  8. }
复制代码
run()函数可以看做一个回环变乱,一直等候m_queuestat.post(),即新任务进入请求队列,此时从请求队列中取出一个任务进行处理惩罚
  1. //工作线程通过run函数不断等待任务队列有新任务,然后 加锁->取任务->解锁->执行任务
  2. template <typename T>
  3. void threadpool<T>::run()
  4. {
  5.     while (true)
  6.     {
  7.         m_queuestat.wait();                     //信号量等待
  8.         m_queuelocker.lock();                   //工作线程被唤醒后先加互斥锁
  9.         if (m_workqueue.empty())
  10.         {
  11.             m_queuelocker.unlock();
  12.             continue;
  13.         }
  14.         T* request = m_workqueue.front();           //从请求队列中取出第一个任务
  15.         m_workqueue.pop_front();                    //该任务从请求队列中弹出
  16.         m_queuelocker.unlock();
  17.         if (!request)
  18.             continue;
  19.         if (1 == m_actor_model)                     //proactor模式和reactor模式切换
  20.         {   /*
  21.             reactor模式:只有reactor模式下,标志位improv和timer_flag才会发挥作用
  22.             imporv:在read_once和write成功后会置1,对应request完成后置0,用于判断上一个请求是否已处理完毕
  23.             timer_flag:当http的读写失败后置1,用于判断用户连接是否异常
  24.             */
  25.             if (0 == request->m_state)                                              //请求状态:读/写
  26.             {
  27.                 if (request->read_once())
  28.                 {
  29.                     request->improv = 1;            
  30.                     connectionRAII mysqlcon(&request->mysql, m_connPool);           //从连接池获取连接
  31.                     request->process();                                             //工作线程执行任务
  32.                 }
  33.                 else
  34.                 {
  35.                     request->improv = 1;
  36.                     request->timer_flag = 1;
  37.                 }
  38.             }
  39.             else
  40.             {
  41.                 if (request->write())
  42.                 {
  43.                     request->improv = 1;
  44.                 }
  45.                 else
  46.                 {
  47.                     request->improv = 1;
  48.                     request->timer_flag = 1;
  49.                 }
  50.             }
  51.         }
  52.         else
  53.         {
  54.             connectionRAII mysqlcon(&request->mysql, m_connPool);
  55.             request->process();
  56.         }
  57.     }
  58. }
复制代码

CGIMysql——数据库毗连池

在处理惩罚用户注册、登录请求时,我们必要保存用户的用户名和密码用于注册或登录校验。该项目采用的方式是:每一个HTTP毗连获取一个数据库毗连,获取此中的用户账号密码进行对比,然后再开释该数据库毗连。若系统必要频繁访问数据库,则必要频繁创建和断开数据库毗连,而创建数据库毗连是一个很耗时的操纵,也轻易对数据库造成安全隐患。
在程序初始化的时候,集中创建多个数据库毗连,并把他们集中管理,供程序使用,可以保证较快的数据库读写速度,更加安全可靠。在本项目中,使用单例模式和链表创建数据库毗连池,实现对数据库毗连资源的复用,并将数据库毗连的获取与开释通过RAII机制封装,避免手动开释
项目中的数据库模块分为两部分,其一是数据库毗连池的界说,其二是使用毗连池完成登录和注册的校验功能。具体,工作线程从数据库毗连池取得一个毗连,访问数据库中的数据,访问完毕后将毗连交还毗连池
毗连池的功能重要有:初始化、获取毗连、开释毗连、销毁毗连池
单例模式创建

使用局部静态变量创建毗连池
  1. class connection_pool
  2. {
  3. public:
  4.         MYSQL* GetConnection();                                 //获取数据库连接
  5.         bool ReleaseConnection(MYSQL* conn); //释放连接
  6.         int GetFreeConn();                                         //获取连接
  7.         void DestroyPool();                                         //销毁所有连接
  8.         //单例模式
  9.         static connection_pool* GetInstance();
  10.         void init(string url, string User, string PassWord, string DataBaseName, int Port, int MaxConn, int close_log);
  11.         void CreateConnection(string url, string User, string PassWord, string DataBaseName, int Port, int close_log);
  12. private:
  13.         connection_pool();
  14.         ~connection_pool();
  15.         int m_MaxConn;  //最大连接数
  16.         int m_CurConn;  //当前已使用的连接数
  17.         int m_FreeConn; //当前空闲的连接数
  18.         locker lock;
  19.         list<MYSQL*> connList; //连接池
  20.         sem reserve;
  21. public:
  22.         string m_url;                         //主机地址
  23.         string m_Port;                 //数据库端口号
  24.         string m_User;                 //登陆数据库用户名
  25.         string m_PassWord;         //登陆数据库密码
  26.         string m_DatabaseName; //使用数据库名
  27.         int m_close_log;        //日志开关
  28. };
复制代码
初始化

   销毁毗连池没有直接被外部调用,而是通过RAII机制来完成主动开释;使用信号量实现多线程争夺毗连的同步机制,这里将信号量初始化为数据库的毗连总数
  1. //构造初始化
  2. void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log)
  3. {
  4.         m_url = url;                                                //初始化数据库信息
  5.         m_Port = Port;
  6.         m_User = User;
  7.         m_PassWord = PassWord;
  8.         m_DatabaseName = DBName;
  9.         m_close_log = close_log;
  10.         for (int i = 0; i < MaxConn; i++)                //创建MaxConn条数据库连接
  11.         {
  12.                 MYSQL* con = NULL;
  13.                 con = mysql_init(con);                                //mysql_init(MYSQL* mysql):初始化或分配与mysql_real_connect()相适应的MYSQL对象
  14.                 if (con == NULL)                                        //初始化失败
  15.                 {
  16.                         LOG_ERROR("MySQL Error");               
  17.                         exit(1);
  18.                 }
  19.                 con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);
  20.                 if (con == NULL)                                        //建立连接失败
  21.                 {
  22.                         LOG_ERROR("MySQL Error");
  23.                         exit(1);
  24.                 }
  25.                 connList.push_back(con);                        //更新连接池和空闲连接数量
  26.                 ++m_FreeConn;                                               
  27.         }
  28.         reserve = sem(m_FreeConn);                                //将信号量初始化为最大连接次数
  29.         m_MaxConn = m_FreeConn;
  30. }
复制代码
此中值得注意的有
  1. mysql_real_connect(MYSQL* mysql, const char* host, const char* user, const char* passwd, const char* db, unsigned int port, const char* unix_socket, unsigned long client_flag)
  2. 数据库引擎建立连接函数
  3. mysql:定义的MYSQL变量
  4. host:MYSQL服务器的地址,决定了连接的类型。如果"host"是NULL或字符串"localhost",连接将被视为与本地主机的连接,如果操作系统支持套接字(Unix)或命名管道(Windows),将使用它们而不是TCP/IP连接到服务器
  5. user:登录用户名,如果“user”是NULL或空字符串"",用户将被视为当前用户,在UNIX环境下,它是当前的登录名
  6. passwd:登录密码
  7. db:要连接的数据库,如果db为NULL,连接会将该值设为默认的数据库
  8. port:MYSQL服务器的TCP服务端口,如果"port"不是0,其值将用作TCP/IP连接的端口号
  9. unix_socket:unix连接方式,如果unix_socket不是NULL,该字符串描述了应使用的套接字或命名管道
  10. clientflag:Mysql运行为ODBC数据库的标记,一般取0
  11. 返回值:连接成功,返回连接句柄,即第一个变量mysql;连接失败,返回NULL
复制代码
获取、开释毗连

当线程数量大于数据库毗连数量时,使用信号量进行同步,每次取出毗连,信号量原子减1,开释毗连原子加1,若毗连池内没有毗连了,则阻塞等候
别的,由于多线程操纵毗连池,会造成竞争,这里使用互斥锁完成同步,具体的同步机制均使用lock.h中封装好的类
  1. //当有请求时,从数据库连接池中返回一个可用连接,更新使用和空闲连接数
  2. MYSQL* connection_pool::GetConnection()
  3. {
  4.         MYSQL* con = NULL;
  5.         if (0 == connList.size())
  6.                 return NULL;
  7.         reserve.wait();                                        //取出连接,信号量原子减1,为0则等待
  8.         lock.lock();                                        //lock互斥锁保证同一时间只有一个线程对容器connList进行操作
  9.         con = connList.front();                        //得到第一个连接
  10.         connList.pop_front();                        //从连接池中弹出该连接
  11.         if (con->isClosed())                        //如果连接被关闭,删除后重新建立一个
  12.         {
  13.                 delete con;
  14.                 con = this->CreateConnection(this->m_url,this->m_User,this->m_PassWord,this->m_DatabaseName,this->m_Port,this->m_close_log);                //CreationConnection()实际是将之前init中for循环中的“创建一条数据库连接”操作重复一遍
  15.         }
  16.         if (con == NULL)
  17.         {
  18.                 --m_CurConn;
  19.         }
  20.         --m_FreeConn;
  21.         ++m_CurConn;
  22.         lock.unlock();
  23.         return con;
  24. }
  25. //释放当前使用的连接
  26. bool connection_pool::ReleaseConnection(MYSQL* con)
  27. {
  28.         if (NULL == con)                       
  29.                 return false;
  30.         lock.lock();
  31.         connList.push_back(con);
  32.         ++m_FreeConn;
  33.         --m_CurConn;
  34.         lock.unlock();
  35.         reserve.post();                                //释放连接原子加1
  36.         return true;
  37. }
复制代码
销毁毗连池

通过迭代器遍历毗连池链表,关闭对应数据库毗连,清空链表并重置空闲毗连和现有毗连数量
  1. //销毁数据库连接池
  2. void connection_pool::DestroyPool()
  3. {
  4.         lock.lock();
  5.         if (connList.size() > 0)
  6.         {
  7.                 list<MYSQL*>::iterator it;                                                                                //通过迭代器遍历,关闭数据库连接
  8.                 for (it = connList.begin(); it != connList.end(); ++it)
  9.                 {
  10.                         MYSQL* con = *it;
  11.                         mysql_close(con);
  12.                 }
  13.                 m_CurConn = 0;
  14.                 m_FreeConn = 0;
  15.                 connList.clear();                                                        //清空连接池
  16.         }
  17.         lock.unlock();
  18. }
复制代码
RAII机制开释数据库毗连

RAII(Resource Acquisition Is Initialization)是由c++之父Bjarne Stroustrup提出的,中文翻译为资源获取即初始化,指使用局部对象来管理资源的技术。这里的资源重要是指操纵系统中有限的东西如内存、网络套接字等等,局部对象是指存储在栈的对象,它的生命周期是由操纵系统来管理的,无需人工介入
资源的使用一般经历三个步骤a.获取资源 b.使用资源 c.销毁资源,但是资源的销毁往往是程序员经常忘记的一个环节,而C++引入了智能指针(C++11新特性之智能指针)的概念,使用了引用计数的方法,让程序员不必要关系手动开释内存。RAII则是使用了C++中一个对象出了其作用域会被主动析构的特点,在构造函数中申请空间,在析构函数中开释空间来控制资源的生命周期
  1. //定义
  2. class connectionRAII {
  3. public:
  4.         connectionRAII(MYSQL** con, connection_pool* connPool);                        //双指针对MYSQL *con修改
  5.         ~connectionRAII();
  6. private:
  7.         MYSQL* conRAII;
  8.         connection_pool* poolRAII;
  9. }
  10. //实现
  11. connectionRAII::connectionRAII(MYSQL** SQL, connection_pool* connPool) {
  12.         *SQL = connPool->GetConnection();
  13.        
  14.         conRAII = *SQL;
  15.         poolRAII = connPool;
  16. }
  17. connectionRAII::~connectionRAII() {
  18.         poolRAII->ReleaseConnection(conRAII);                               
  19. }
复制代码
RAII现实上应该是一种编程思想,将资源申请、开释等成对的操纵进行封装,实现在局部域内申请资源并及时销毁,使得程序员不需在局部域中时刻观察资源是否必要开释,避免遗漏

Timer——定时器模块

基础知识



  • 非活跃:指客户端与服务器创建毗连后,长时间不互换数据,一直占用服务器的文件描述符,造成毗连资源的浪费
  • 定时势件:指固定一段时间之后触发某段代码,由该段代码处理惩罚一个变乱(举例:从内核变乱表删除变乱,并关闭文件描述符,开释毗连资源)
  • 定时器:指使用布局体或其他形式,将多种定时势件进行封装起来(本项目中只涉及一种定时势件,即定时检查非活跃毗连,将定时势件与毗连资源封装为一个布局体定时器)
  • 定时器容器:指使用某种容器类数据布局,将上述多个定时器组合起来,便于对定时势件同一管理(本项目中使用升序链表将全部定时器串联组织起来)
  • 同一变乱源:将信号变乱与其他变乱一起处理惩罚。具体地,信号处理惩罚函数使用管道将信号传递给主循环,信号处理惩罚函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读变乱,这样信号变乱与其他文件描述符都可以通过epoll来监测,从而实现同一处理惩罚
模块功能

本项目中,服务器主循环为每一个毗连创建一个定时器,并对每个毗连进行定时。别的,使用升序时间链表容器将全部定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务
Linux提供了三种定时的方法:


  • socket选项SO_RECVTIMEO和SO_SNDTIMEO

    • socket选项SO_SNDTIMEO:用来设置socket发送数据的超时时间,若该时间内未接收到任何数据,则socket返回一个错误码
    • socket选项SO_SNDTIMEO:用来设置socket接收数据的超时时间,若该时间内无法发送完全部数据,则发送操纵失败并返回一个错误
           使用SO_SNDTIMEO选项时,必要先将socket设置为非阻塞模式,否则会忽略超时设置并阻塞发送操纵
        通过函数setsockopt()可以设置SO_SNDTIMEO和SO_RECVTIMEO选项

  1. int setsockopt( int socket, int level, int option_name,const void *option_value, size_t ,ption_len)
  2. socket:socket描述符
  3. level:被设置的选项的级别。如果要在socket级别上设置选项,则必须将level设置为SOL_SOCKET
复制代码


  • SIGALRM信号

    • SIGALRM是一种处理惩罚定时器信号的信号,代表在已界说的时间间隔后发生的告诫信号,可用于定时器、计时器和轮询器等
    • 通常使用系统调用alarm()来设置发生SIGALRM的时间间隔

  • I/O复用系统调用的超时参数

    • I/O复用系统调用自带的超时参数,既能同一处理惩罚信号和I/O变乱,也能同一处理惩罚定时势件
    • I/O复用系统调用可能在超时时间到期之前就返回(有I/O变乱发生),如果要使用它们来定时,就必要不停更新定时参数以反映剩余的时间

三种方法没有一劳永逸的应用场景,没有绝对的优劣。在本项目中重要使用的是SIGALRM信号
具体地,使用alarm函数周期性地触发SIGALRM信号,信号处理惩罚函数使用管道通知主循环,主循环接收到该信号后对升序链表上全部定时器进行处理惩罚,若该段时间内没有互换数据,则将该毗连关闭,开释所占用的资源。该模块重要分为两部分,其一为定时方法与信号通知流程,其二为定时器及其容器设计与定时任务的处理惩罚
基础API



  • SIGALRM、SIGTERM信号

    • SIGALRM:由alarm系统调用产生的timer时钟信号
    • SIGTERM:终端发送的终止信号

  • sigaction布局体、检查或修改与指定信号相干联的处理惩罚动作——sigaction()函数
  1. struct sigaction {
  2.         void (*sa_handler)(int);                                                        //函数指针,指向旧的信号处理函数
  3.         void (*sa_sigaction)(int, siginfo_t *, void *);                //新的信号处理函数
  4.         sigset_t sa_mask;                                                                        //信号阻塞集,指定在信号处理函数执行期间需要被屏蔽的信号
  5.         int sa_flags;                                                                                //信号的处理方式
  6.         void (*sa_restorer)(void);                                                        //已弃用
  7. }
  8. #include <signal.h>
  9. int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact)
  10. signum:要操作的信号
  11. act:要设置的对信号的新处理方式
  12. oldact:原来对信号的处理方式
  13. return:成功返回0,出现错误则返回-1
复制代码
  1. sa_flags指定信号的处理方式,可以是以下几种的“按位或”组合
  2.         SA_RESTART:使被信号打断的系统调用自动重新发起
  3.         SA_NOCLDSTOP:使父进程在它的子进程暂停或继续运行时不会收到 SIGCHLD 信号
  4.         SA_NOCLDWAIT:使父进程在它的子进程退出时不会收到 SIGCHLD 信号,这时子进程如果退出也不会成为僵尸进程
  5.         SA_NODEFER:使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号
  6.         SA_RESETHAND:信号处理之后重新设置为默认的处理方式
  7.         SA_SIGINFO:使用 sa_sigaction 成员而不是 sa_handler 作为信号处理函数
复制代码


  • 初始化自界说信号集——sigfillset()函数

    • 现实使用中,常用sigemptyset()将信号集清空,再使用sigaddset()想信号集中添加所需的特定信号以达到更精致的控制信号的目的

  1. #include <signal.h>
  2. int sigfillset(sigset_t *set)                        //将参数set信号集初始化(信号集中所有标志位置1),然后把所有的信号加入到此信号集中
  3. set:指向信号集合的指针
  4. return:成功返回0,出现错误则返回-1
复制代码


  • 设置信号传送闹钟——alarm()函数

    • 如果未设置信号SIGALRM的处理惩罚函数,则alarm()默认终止进程

  1. #include <unisted.h>
  2. unsigned int alarm(unsigned int seconds)                设置信号SIGALRM在经过参数seconds秒数后发送给目前的进程
  3. seconds:要设定的定时时间,以秒为单位。在alarm调用后开始计时,超过该时间将触发SIGALRM信号
  4. return:返回当前进程之前设置的定时器剩余秒数
复制代码


  • 创建套接字对用于通信——socketpair()函数

    • 用SOCK_STREAM创建的套接字对是管道流,创建的通道是双向的,支持全双工通信

  1. #include <sys/types.h>
  2. #include <sys/socket.h>
  3. int socketpair(int domain, int type, int protocol, int sv[2])                //创建一对无名的、相互连接的套接字
  4. domain:协议族,只能为PF_UNIX或者AF_UNIX
  5. type:协议,可以是SOCK_STREAM(基于TCP)或SOCK_DGRAM(基于UDP)
  6. protocol:类型,只能为0
  7. sv[2]:套接字柄对,该两个句柄作用相同,均能进行读写双向操作
  8. return:创建成功返回0,失败返回-1
复制代码
信号处理惩罚机制

Linux下的信号采用异步处理惩罚机制,信号处理惩罚函数与当前进程是两条差别的执行门路。当进程收到信号时,操纵系统会中断进程当前的正常流程,转而进入信号处理惩罚函数执行操纵,完成后再返回中断的地方继续执行
为避免信号竞态征象发生,信号处理惩罚期间系统不会再次触发它。所以,为确保该信号不被屏蔽太久,信号处理惩罚函数必要尽可能快地执行完毕。一般的信号处理惩罚函数必要处理惩罚该信号对应的逻辑,当该逻辑比较复杂时,信号处理惩罚函数执行时间过长,会导致信号屏蔽太久。为相识决该题目,本项目中信号处理惩罚函数仅发送信号通知程序主循环,将信号对应的处理惩罚逻辑放在程序主循环中,由主循环执行信号对应的逻辑代码。 信号处理惩罚的流程如下图所示(图片来自微信公众号两猿社)

信号的接收

接收信号的任务不是由用户进程来完成的,而是由内核署理的。当内核接收到信号后,会将其放到对应进程的信号队列中,同时向进程发送一个中断,使其陷入内核态。注意,此时信号还只是在队列中,对进程来说临时是不知道有信号到来的
信号的检测

进程陷入内核态后,有两种场景会对信号进行检测:


  • 进程从内核态返回到用户态前进行信号检测
  • 进程在内核态中,从就寝状态被唤醒的时候进行信号检测
当发现有新信号时,便会进入下一步,信号的处理惩罚
信号的处理惩罚

信号处理惩罚函数是在用户态上的

  • 内核态)调用处理惩罚函数前,内核将当前内核栈的内容备份拷贝到用户栈上,而且修改指令寄存器(eip)将其指向信号处理惩罚函数
  • 用户态)进程返回到用户态中,执行信号处理惩罚函数
  • 内核态)信号处理惩罚函数执行完成后,必要返回内核态检查是否有其他信号未处理惩罚
  • 用户态)全部信号都处理惩罚完成,则将内核栈规复(从步骤1中的用户栈备份拷贝回来),同时规复指令寄存器(eip)将其指向中断前的运行位置,最后回到用户态继续执行进程


  • 信号处理惩罚函数
  1. //信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响
  2. void Utils::sig_handler(int sig)
  3. {
  4.     //为保证函数的可重入性,保留原来的errno
  5.     //可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据
  6.     int save_errno = errno;
  7.     int msg = sig;
  8.     send(u_pipefd[1], (char*)&msg, 1, 0);                //将信号值从管道写端写入,传输字符类型,而非整型
  9.     errno = save_errno;                                                        //将原来的errno赋值为当前的errno
  10. }
复制代码
信号处理惩罚函数中仅通过管道发送信号值,不处理惩罚信号对应的逻辑,缩短异步执行时间,减少对主程序的影响
  1. //设置信号函数
  2. void Utils::addsig(int sig, void(handler)(int), bool restart)       //项目中设置信号函数,仅关注SIGTERM和SIGALRM两个信号
  3. {
  4.     struct sigaction sa;                             //创建sigaction结构体变量
  5.     memset(&sa, '\0', sizeof(sa));
  6.     sa.sa_handler = handler;                        //信号处理函数中仅仅发送信号值,不做对应逻辑处理
  7.     if (restart)
  8.         sa.sa_flags |= SA_RESTART;
  9.     sigfillset(&sa.sa_mask);                        //将所有信号添加到信号集中
  10.     assert(sigaction(sig, &sa, NULL) != -1);        //执行sigaction函数
  11. }
  12. 项目中设置信号函数,仅关注SIGTERM和AIGALRM两个信号
复制代码
信号通知逻辑



  • 创建管道,此中管道写端写入信号值,管道读端通过I/O复用系统监测读变乱
  • 设置信号处理惩罚函数SIGALRM(时间到了触发)和SIGTERM(kill会触发)

    • 通过struct sigaction布局体和sigaction函数注册信号捕捉函数
    • 在布局体的handler参数设置信号处理惩罚函数,具体地,从管道写端写入信号的名字

  • 使用I/O复用系统监听管道读端文件描述符的可读变乱
  • 信息值传递给主循环,主循环再根据接收到的信号值执行目标信号对应的逻辑代码
定时器设计

定时器类的界说

项目中将毗连资源、定时势件和超时时间封装为定时类


  • 毗连资源包罗客户端套接字地址、文件描述符和定时器
  • 定时势件为回调函数,将其封装起来由用户自界说,这里是删除非活动socket上的注册变乱,并关闭
  • 定时器超时时间 = 浏览器和服务器毗连时刻 + 固定时间(TIMESLOT),这里定时器使用绝对时间作为超时值
  1. //连接资源结构体成员需要用到定时器类,需要前向声明
  2. class util_timer;
  3. struct client_data                  //开辟用户socket结构 对应于最大处理fd
  4. {
  5.     sockaddr_in address;            //客户端socket地址
  6.     int sockfd;                     //socket文件描述符
  7.     util_timer* timer;              //定时器
  8. };  
  9. class util_timer                    //定时器类
  10. {
  11. public:
  12.     util_timer() : prev(NULL), next(NULL) {}
  13. public:
  14.     time_t expire;                              //超时时间
  15.     void (*cb_func)(client_data*);              //回调函数
  16.     client_data* user_data;                     //连接资源
  17.     util_timer* prev;                           //前向定时器
  18.     util_timer* next;                           //后继定时器
  19. };
复制代码
定时势件,具体地,从内核变乱表删除变乱,关闭文件描述符,开释毗连资源
  1. //定时器回调函数
  2. void cb_func(client_data* user_data)
  3. {
  4.     epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);        //删除非活动连接在socket上的注册事件
  5.     assert(user_data);
  6.     close(user_data->sockfd);                        //关闭文件描述符
  7.     http_conn::m_user_count--;                        //减少连接数
  8. }
复制代码
定时器的创建与销毁

该项目的定时器采用升序双向链表作为容器,具体地,为每个毗连创建一个定时器,将其添加到链表中,并按照超时时间升序排序。在本项目中,定时器的创建与销毁采用了RAII思想,在构造函数与析构函数中完成定时器容器的创建与销毁
  1. sort_timer_lst::sort_timer_lst()
  2. {
  3.     head = NULL;
  4.     tail = NULL;
  5. }
  6. sort_timer_lst::~sort_timer_lst()       //常规销毁链表
  7. {
  8.     util_timer* tmp = head;
  9.     while (tmp)
  10.     {
  11.         head = tmp->next;
  12.         delete tmp;
  13.         tmp = head;
  14.     }
  15. }
复制代码
添加定时任务

添加定时任务即将新毗连的定时器添加到链表中。若当前链表中只有头尾结点(链表为空),则直接使头尾结点指向该定时器


  • 若新的定时器(当前正在添加到链表中的定时器)的超时时间小于当前头部结点,则直接将新的定时器结点作为头部结点
  1. void sort_timer_lst::add_timer(util_timer* timer)       //添加定时器,内部调用私有成员add_timer
  2. {
  3.     if (!timer)
  4.     {
  5.         return;
  6.     }
  7.     if (!head)
  8.     {
  9.         head = tail = timer;
  10.         return;
  11.     }
  12.     if (timer->expire < head->expire)       //如果新的定时器超时时间小于当前头部结点,则直接将当前定时器结点作为头部结点
  13.     {
  14.         timer->next = head;
  15.         head->prev = timer;
  16.         head = timer;
  17.         return;
  18.     }
  19.     add_timer(timer, head);                 //否则调用私有成员,调整内部结点
  20. }
复制代码


  • 若新的定时器结点的超时时间大于等于当前头部结点,则必要插入到链表中,采用双向链表的插入操纵
  1. //私有成员,被公有成员add_timer和adjust_time调用,主要用于调整链表内部结点
  2. void sort_timer_lst::add_timer(util_timer* timer, util_timer* lst_head)     //主要用于调整链表内部结点
  3. {
  4.     util_timer* prev = lst_head;
  5.     util_timer* tmp = prev->next;
  6.     while (tmp)                                             //遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作
  7.     {
  8.         if (timer->expire < tmp->expire)
  9.         {
  10.             prev->next = timer;
  11.             timer->next = tmp;
  12.             tmp->prev = timer;
  13.             timer->prev = prev;
  14.             break;
  15.         }
  16.         prev = tmp;
  17.         tmp = tmp->next;
  18.     }
  19.     if (!tmp)                                           //遍历完发现,目标定时器需要放到尾结点处
  20.     {
  21.         prev->next = timer;
  22.         timer->prev = prev;
  23.         timer->next = NULL;
  24.         tail = timer;
  25.     }
  26. }
复制代码
任务超时时间调整

当定时任务发生变革,调整对应定时器在链表中的位置


  • 客户端在设定时间内有数据收发,则当前时刻对该定时器重新设定时间
  • 被调整的目标定时器在尾部,或定时器新的超时时间仍旧小于下一个定时器的超时,则不消调整;否则先将定时器从链表取出,重新插入链表
  1. void sort_timer_lst::adjust_timer(util_timer* timer)        //调整定时器,任务发生变化时,调整定时器在链表中的位置
  2. {
  3.     if (!timer)
  4.     {
  5.         return;
  6.     }
  7.     util_timer* tmp = timer->next;
  8.     if (!tmp || (timer->expire < tmp->expire))
  9.     {
  10.         return;
  11.     }
  12.     if (timer == head)                                  //被调整定时器是链表头结点,将定时器取出,重新插入               
  13.     {
  14.         head = head->next;
  15.         head->prev = NULL;
  16.         timer->next = NULL;
  17.         add_timer(timer, head);
  18.     }
  19.     else                                                //被调整定时器在内部,将定时器取出,重新插入
  20.     {
  21.         timer->prev->next = timer->next;
  22.         timer->next->prev = timer->prev;
  23.         add_timer(timer, timer->next);
  24.     }
  25. }
复制代码
删除定时任务

定时器超时,则将其从链表中删除
  1. void sort_timer_lst::del_timer(util_timer* timer)           //删除定时器
  2. {
  3.     if (!timer)
  4.     {
  5.         return;
  6.     }
  7.     if ((timer == head) && (timer == tail))                 //链表中只有一个定时器,需要删除该定时器
  8.     {
  9.         delete timer;
  10.         head = NULL;
  11.         tail = NULL;
  12.         return;
  13.     }
  14.     if (timer == head)                                      //被删除的定时器为头结点
  15.     {
  16.         head = head->next;
  17.         head->prev = NULL;
  18.         delete timer;
  19.         return;
  20.     }
  21.     if (timer == tail)                                      //被删除的定时器为尾结点
  22.     {
  23.         tail = tail->prev;
  24.         tail->next = NULL;
  25.         delete timer;
  26.         return;
  27.     }
  28.     timer->prev->next = timer->next;                        //被删除的定时器在链表内部,常规链表结点删除
  29.     timer->next->prev = timer->prev;
  30.     delete timer;
  31. }
复制代码
定时任务处理惩罚函数

使用同一变乱源,SIGALRM信号每次被触发,主循环中调用一次定时任务处理惩罚函数,处理惩罚链表容器中到期的定时器
具体逻辑如下:


  • 遍历定时器升序链表容器,重新结点开始依次处理惩罚每个定时器,直到碰到尚未到期的定时器
  • 若当前时间小于定时器超时时间,跳出循环,即未找到到期的定时器;若当前时间大于定时器超时时间,即找到了到期的定时器,执行回调函数,然后将它从链表中删除,然后继续遍历
  1. void sort_timer_lst::tick()
  2. {
  3.     if (!head)
  4.     {
  5.         return;
  6.     }
  7.     time_t cur = time(NULL);                                //获取当前时间
  8.     util_timer* tmp = head;
  9.     while (tmp)                                             //遍历定时器链表
  10.     {
  11.         if (cur < tmp->expire)                              //链表容器为升序排列,当前时间小于定时器的超时时间,后面的定时器也没有到期
  12.         {
  13.             break;
  14.         }
  15.         tmp->cb_func(tmp->user_data);                       //当前定时器到期,则调用回调函数,执行定时事件
  16.         head = tmp->next;                                   //将处理后的定时器从链表容器中删除,并重置头结点
  17.         if (head)
  18.         {
  19.             head->prev = NULL;
  20.         }
  21.         delete tmp;
  22.         tmp = head;
  23.     }
  24. }
复制代码
定时器的使用(webserver.cpp)


  • 客户端与服务器毗连时,创建该毗连对应的定时器,并将定时器添加到链表上
  1. void WebServer::timer(int connfd, struct sockaddr_in client_address)
  2. {
  3.     users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
  4.     //初始化client_data数据
  5.     //创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中
  6.     users_timer[connfd].address = client_address;
  7.     users_timer[connfd].sockfd = connfd;
  8.     util_timer *timer = new util_timer;                                //创建定时器
  9.     timer->user_data = &users_timer[connfd];                //绑定用户数据
  10.     timer->cb_func = cb_func;                                                //设置回调函数
  11.     time_t cur = time(NULL);
  12.     timer->expire = cur + 3 * TIMESLOT;                                //设置超时时间
  13.     users_timer[connfd].timer = timer;
  14.     utils.m_timer_lst.add_timer(timer);                                //添加定时器到链表中
  15. }
复制代码

  • 处理惩罚异常变乱时,执行定时势件,服务器关闭毗连,并将定时器从链表中移除
  1. void WebServer::eventLoop()
  2. {
  3.     bool timeout = false;                                                           //超时默认为false
  4.     bool stop_server = false;
  5.     while (!stop_server)
  6.     {
  7.         int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);           //监测发生事件的文件描述符
  8.         if (number < 0 && errno != EINTR)
  9.         {
  10.             LOG_ERROR("%s", "epoll failure");
  11.             break;
  12.         }
  13.         for (int i = 0; i < number; i++)                                            //轮询事件描述符
  14.         {
  15.             int sockfd = events[i].data.fd;
  16.             //处理新到的客户连接
  17.             if (sockfd == m_listenfd)
  18.             {
  19.                 bool flag = dealclinetdata();
  20.                 if (false == flag)
  21.                     continue;
  22.             }
  23.             else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))             //处理异常事件
  24.             {
  25.                 //服务器端关闭连接,移除对应的定时器
  26.                 util_timer* timer = users_timer[sockfd].timer;
  27.                 deal_timer(timer, sockfd);
  28.             }
  29.             //处理定时器信号
  30.             else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN))   //管道读端对应文件描述符发生读事件        
  31.             {
  32.                 bool flag = dealwithsignal(timeout, stop_server);
  33.                 if (false == flag)
  34.                     LOG_ERROR("%s", "dealclientdata failure");
  35.             }
  36.             //处理客户连接上接收到的数据
  37.             else if (events[i].events & EPOLLIN)
  38.             {
  39.                 dealwithread(sockfd);
  40.             }
  41.             else if (events[i].events & EPOLLOUT)
  42.             {
  43.                 dealwithwrite(sockfd);
  44.             }
  45.         }
  46.         if (timeout)                //处理定时器为非必须事件,收到信号并不是立马处理,而是完成读写事件后再进行处理
  47.         {
  48.             utils.timer_handler();
  49.             LOG_INFO("%s", "timer tick");
  50.             timeout = false;
  51.         }
  52.     }
  53. }
复制代码
此中,处理惩罚定时器信号部分中的dealwithsignal()内容如下
  1. bool WebServer::dealwithsignal(bool& timeout, bool& stop_server)
  2. {
  3.     int ret = 0;
  4.     int sig;
  5.     char signals[1024];
  6.     ret = recv(m_pipefd[0], signals, sizeof(signals), 0);                //从管道读端读出信号值,成功返回字节数,失败返回-1
  7.     if (ret == -1)
  8.     {
  9.         return false;
  10.     }
  11.     else if (ret == 0)
  12.     {
  13.         return false;
  14.     }
  15.     else
  16.     {
  17.         for (int i = 0; i < ret; ++i)
  18.         {
  19.             switch (signals[i])
  20.             {
  21.             case SIGALRM:                       //接收到SIGALRM信号,timeout设置为true
  22.             {
  23.                 timeout = true;
  24.                 break;
  25.             }
  26.             case SIGTERM:                       //接收到SIGTERM信号,终止进程运行,stop_server设置为true
  27.             {
  28.                 stop_server = true;
  29.                 break;
  30.             }
  31.             }
  32.         }
  33.     }
  34.     return true;
  35. }
复制代码
处理惩罚客户毗连上接收到的数据,其函数内容为
  1. void WebServer::dealwithread(int sockfd)
  2. {
  3.     util_timer* timer = users_timer[sockfd].timer;
  4.     //reactor
  5.     if (1 == m_actormodel)
  6.     {
  7.         if (timer)
  8.         {
  9.             adjust_timer(timer);
  10.         }
  11.         //若监测到读事件,将该事件放入请求队列
  12.         m_pool->append(users + sockfd, 0);
  13.         while (true)
  14.         {
  15.             if (1 == users[sockfd].improv)
  16.             {
  17.                 if (1 == users[sockfd].timer_flag)
  18.                 {
  19.                     deal_timer(timer, sockfd);
  20.                     users[sockfd].timer_flag = 0;
  21.                 }
  22.                 users[sockfd].improv = 0;
  23.                 break;
  24.             }
  25.         }
  26.     }
  27.     else
  28.     {
  29.         //proactor
  30.         if (users[sockfd].read_once())
  31.         {
  32.             LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
  33.             //若监测到读事件,将该事件放入请求队列
  34.             m_pool->append_p(users + sockfd);
  35.             if (timer)
  36.             {
  37.                 adjust_timer(timer);                //有数据传输,则将定时器向后延迟3个单位,并对新的定时器在链表上的位置进行调整
  38.             }
  39.         }
  40.         else
  41.         {
  42.             deal_timer(timer, sockfd);                //服务器端关闭连接,移除对应的定时器
  43.         }
  44.     }
  45. }
  46. void WebServer::dealwithwrite(int sockfd)
  47. {
  48.     util_timer* timer = users_timer[sockfd].timer;
  49.     //reactor
  50.     if (1 == m_actormodel)
  51.     {
  52.         if (timer)
  53.         {
  54.             adjust_timer(timer);
  55.         }
  56.         m_pool->append(users + sockfd, 1);
  57.         while (true)
  58.         {
  59.             if (1 == users[sockfd].improv)
  60.             {
  61.                 if (1 == users[sockfd].timer_flag)
  62.                 {
  63.                     deal_timer(timer, sockfd);
  64.                     users[sockfd].timer_flag = 0;
  65.                 }
  66.                 users[sockfd].improv = 0;
  67.                 break;
  68.             }
  69.         }
  70.     }
  71.     else
  72.     {
  73.         //proactor
  74.         if (users[sockfd].write())
  75.         {
  76.             LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
  77.             if (timer)
  78.             {
  79.                 adjust_timer(timer);                //有数据传输,则将定时器向后延迟3个单位,并对新的定时器在链表上的位置进行调整
  80.             }
  81.         }
  82.         else
  83.         {
  84.             deal_timer(timer, sockfd);                //服务器端关闭连接,移除对应的定时器
  85.         }
  86.     }
  87. }
复制代码

Lock——信号同步机制封装

竟态

竟态指多个进程或线程在对共享资源进行读写时,由于对共享资源的访问顺序不确定或变乱差等因素导致的错误行为。具体来说,当两个或多个进程或线程试图同时访问同一共享资源时,它们之间可能会产生竟态,从而导致程序出现不可猜测的效果
竟态是一种非常难以调试和解决的题目,因为竟态通常只在特定的时间和特定的运行情况下才会出现,因此可能必要多次运行程序才华重现题目。为避免竟态,必要使用各种同步机制,例如互斥锁、条件变量、信号量等,来保护共享资源的访问,确保只有一个进程或线程能够访问该资源
信号量——sem类

信号量是一种特别的变量,它只能取自然数而且只支持两种操纵,P(wait)和V(signal)


  • P 操纵会使得信号量的值减 1,如果此时信号量的值小于 0,则调用进程或线程会被阻塞,等候其他进程或线程对信号量进行 V 操纵,使得信号量的值大于 0,此时阻塞的进程或线程才华继续执行
  • V 操纵会使得信号量的值加 1,如果此时信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个进程或线程
信号量的取值可以是任何自然数,根据初始值的差别可以分为两类:


  • 二进制信号量:指初始值为 1 的信号量,此类信号量只有 1 和 0 两个值,通常用来替代互斥锁实现线程同步
  • 计数信号量:指初始值大于 1 的信号量,当进程中存在多个线程,但某公共资源允许同时访问的线程数量是有限的,这时就可以用计数信号量来限定同时访问资源的线程数量
根据使用场景的差别,信号量也可以分为两类:


  • 无名信号量:也被称作基于内存的信号量,只可以在共享内存的情况下,比如实现进程中各个线程之间的互斥和同步
  • 命名信号量:通常用于不共享内存的情况下,比如进程间通信
在本项目中重要实现的是线程同步,只必要使用无名信号量,重要使用以下几个函数


  • 初始化信号量sem_init()
  1. include <semaphore.h>
  2. int sem_init(sem_t *sem,int pshared,unsignedint value)                //初始化一个信号量
  3. sem:指向要初始化的信号量的指针
  4. pshared:指定信号量的共享方式。如果值为 0,则信号量将在进程内部共享。如果值为非 0,则信号量可以在不同进程之间共享,需要使用共享内存
  5. value:指定信号量的初值
  6. return:成功则返回0,否则返回-1
复制代码


  • 销毁信号量sem_destory()
  1. include <semaphore.h>
  2. int sem_destroy(sem_t *sem)                        //销毁一个信号量
  3. sem:指向要销毁的信号量的指针
  4. return:成功返回0,否则返回-1
复制代码


  • 对信号量进行P操纵sem_wait()
  1. include <semaphore.h>
  2. int sem_wait(sem_t *sem)                        //对信号量进行 P 操作,如果信号量的值小于等于 0,则会阻塞当前线程
  3. sem:指向要操作的信号量的指针
  4. return:成功返回0,否则返回-1
复制代码


  • 对信号进行V操纵sem_post()
  1. include <semaphore.h>
  2. int sem_post(sem_t *sem)                        //对信号量进行 V 操作,如果信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个线程
  3. sem:指向要操作的信号量的指针
  4. return:成功返回0,否则返回-1
复制代码
互斥锁——locker类

互斥锁,也称互斥量,可以保护关键代码段,以确保独占式访问.当进入关键代码段,得到互斥锁将其加锁;离开关键代码段,唤醒等候该互斥锁的线程
互斥量是一种用于保护临界区的同步机制,可以确保同一时刻只有一个线程访问共享资源。当一个线程访问共享资源时,必要先获取互斥量的锁,其他线程必要等候该锁开释才华继续执行
互斥量不是为了消除竞争,现实上,资源还是共享的,线程间也还是竞争的,只不过通过这种“锁”机制就将共享资源的访问变成互斥操纵,也就是说一个线程操纵这个资源时,其它线程无法操纵它,从而消除与时间有关的错误。但是,这种锁机制不是欺凌的,互斥锁实质上是操纵系统提供的一把“发起锁”(又称“协同锁”),发动身序中有多线程访问共享资源的时候使用该机制
因此,即使有了mutex,其它线程如果不按照这种锁机制来访问共享数据的话,依然会造成数据杂乱。所以为了避免这种情况,全部访问该共享资源的线程必须采用类似的锁机制


  • 初始化互斥量pthread_mutex_init()
  1. include <pthread.h>
  2. int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)                //初始化互斥量
  3. mutex:指向要初始化的互斥量的指针
  4. attr:指向互斥量属性对象的指针,通常设置为 NULL
  5. return:成功则返回0,否则返回一个正整数的错误码
复制代码


  • 销毁互斥量pthread_mutex_destroy()
  1. include <pthread.h>
  2. int pthread_mutex_destroy(pthread_mutex_t *mutex)                        //销毁互斥量
  3. mutex:指向要销毁的互斥量的指针
  4. return:成功则返回0,否则返回一个正整数的错误码
复制代码


  • 给互斥锁加锁pthread_mutex_lock()
  1. include <pthread.h>
  2. int pthread_mutex_lock(pthread_mutex_t *mutex)                                //给互斥量加锁,如果互斥量已经被锁住,则阻塞当前线程,直到互斥量被解锁
  3. mutex:指向要加锁的互斥量的指针
  4. return:成功则返回0,否则返回一个正整数的错误码
复制代码


  • 解锁互斥量pthread_mutex_unlock()
  1. include <pthread.h>
  2. int pthread_mutex_unlock(pthread_mutex_t *mutex)                        //解锁互斥量,如果有等待该互斥量的线程,则唤醒其中的一个线程
  3. mutex:指向要加锁的互斥量的指针
  4. return:成功则返回0,否则返回一个正整数的错误码
复制代码
条件变量——cond类

条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等候这个共享数据的线程
条件变量使用线程间共享的全局变量进行同步,重要包罗两个动作:一个线程等候条件变量的条件成立而挂起;另一个线程使条件成立(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起


  • 初始化条件变量pthread_cond_init()
  1. include <pthread.h>
  2. int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)                //初始化条件变量对象,设置相关属性
  3. cond:指向条件变量对象的指针
  4. attr:指向线程条件属性对象的指针。一般为 NULL
  5. return:成功则返回0,失败返回错误号
复制代码


  • 销毁条件变量对象pthread_cond_destory()
  1. include <pthread.h>
  2. int pthread_cond_destroy(pthread_cond_t *cond)                //销毁条件变量对象,释放资源
  3. cond:指向条件变量对象的指针
  4. return:成功则返回0,失败返回错误号
复制代码


  • 唤醒线程pthread_cond_broadcast()

    • 函数以广播的方式唤醒全部等候目标条件变量的线程

  1. include <pthread.h>
  2. int pthread_cond_broadcast(pthread_cond_t *cond)                //唤醒所有在条件变量上等待的线程
  3. cond:指向条件变量对象的指针
  4. return:成功则返回0,失败返回错误号
复制代码


  • 阻塞线程pthread_cond_wait()

    • 函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,互斥锁会再次被锁上. 也就是说函数内部会有一次解锁和加锁操纵

  1. include <pthread.h>
  2. int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)                //让当前线程阻塞在条件变量上等待唤醒
  3. cond:指向条件变量对象的指针
  4. mutex:指向互斥锁对象的指针,用于保护条件变量
  5. return:成功则返回0,失败返回错误号
复制代码


  • 发送信号通知线程pthread_cond_signal()

    • pthread_cond_signal 只会通知一个等候该条件变量的线程,如果有多个线程在等候,则只有一个线程会收到通知,别的线程还会继续等候,直到下一次收到信号
    • 必须在已经得到与条件变量相干的互斥锁之后才华调用该函数
    • 如果没有等候该条件变量的线程,调用该函数也不会产生任何作用

  1. include <pthread.h>
  2. int pthread_cond_signal(pthread_cond_t *cond)
  3. cond:指向条件变量的指针
  4. return:成功则返回0,失败返回错误号
复制代码
锁机制的功能

锁机制用来实现多线程同步,通过锁机制,确保任一时刻只能有一个线程能进入关键代码段
为便于实现同步类的RAII机制,该项目在pthread库的基础上进行了封装,实现了类似于C++11的mutex标准库功能
Log——日志系统

日志系统分为两部分,一部分是单例模式和阻塞队列的界说;另一部分是日志类的界说与使用
基础知识



  • 日志

    • 由服务器主动创建,并记录运行状态,错误信息,访问数据的文件

  • 同步日志与异步日志

    • 同步日志:日志写入函数与工作线程串行执行,由于涉及到I/O操纵,当单条日志比较大的时候,同步模式会阻塞整个处理惩罚流程,服务器所能处理惩罚的并发能力将有所下降,尤其是在峰值的时候,写日志可能成为系统的瓶颈
    • 异步日志:将所写的日志内容先存入阻塞队列中,写线程从阻塞队列中取出内容,写入日志

  • 生产者-消耗者模子

    • 并发编程中的经典模子
    • 以多线程为例,为了实现线程间数据同步,生产者线程与消耗者线程共享一个缓冲区,此中生产者线程往缓冲区中push消息,消耗者线程从缓冲区中pop消息

  • 阻塞队列

    • 将生产者-消耗者模子进行封装,使用循环数组实现队列,作为两者共享的缓冲区

  • 单例模式

    • 最常用的设计模式之一,保证一个类仅有一个实例,并提供一个访问它的全局访问点,该实例被全部程序模块共享

同步日志与异步日志的比较



  • 同步日志

    • 优点

      • 代码简朴 同步日志是在主线程中直接输出日志,代码相对简朴。不必要处理惩罚线程间同步和异步题目
      • 日志输出顺序可控 同步日志是按照输入的顺序直接输出,因此输出的顺序比较可控,可以保证日志的顺序
      • 传统且易于调试 同步日志是较传统的方式,易于排错和调试

    • 缺点

      • 性能题目 由于同步日志是在主线程中直接输出,因此它会阻塞该线程,增加了主线程I/O等候的时间,可能会导致性能题目
      • 安全题目 如果在写日志时程序瓦解或被欺凌关闭,会导致日志数据丢失
      • 时间上的依赖 同步日志的输出时间与输入时间类似,这可能会对程序的性能造成负面影响

    • 实用范围

      • 同步日志实用于必要简朴、稳固且扩展性不是很高的场景,如一些小型程序大概进行调试的场景


  • 异步日志

    • 优点

      • 时间上的独立 异步日志是把日志数据缓存到内存或磁盘中,由专门的线程来将缓存的数据输出到日志文件或其他存储介质中,这样就可以将日志输入的时间与日志输出时间分开,而且不会被I/O等候时间影响,进步了日志输入的服从
      • 数据安全 异步日志通过将日志数据缓存到内存或磁盘中来避免了数据丢失。即使程序突然瓦解或被欺凌关闭,尚未输出的日志数据也已被保存,避免了数据丢失
      • 减少I/O负担 异步日志通过将日志数据缓存到内存或磁盘中来避免了大量的I/O操纵,从而减轻了I/O操纵对系统的负担,进步了程序的性能

    • 缺点

      • 多线程同步 异步日志必要多线程共同完成逻辑,如果线程同步不好处理惩罚,就可能出现竞争条件和死锁等题目
      • 日志顺序题目 由于异步日志是异步输出的,因此无法保证日志输出的顺序与其输入的顺序同等。这可能会在某些情况下引起题目
      • 代码复杂性 实现异步日志通常必要使用多线程编程、线程同步等技术,从而增加了代码的复杂性和难度

    • 实用范围

      • 异步日志实用于必要大量日志输出和较高的日志处理惩罚速度的场景,如高并发的网络服务器应用程序


单例模式



  • 实现思绪

    • 私有化它的构造函数,以防止外界创建单例类的对象
    • 使用类的私有静态指针变量指向类的唯一实例
    • 用一个公有的静态方法获取该实例

  • 两种实现方式
懒汉模式:在第一次被使用时才初始化(耽误初始化)
  1. class Singleton
  2. {
  3. private:
  4.         static Singleton* instance;                        //私有化静态指针指向唯一实例
  5. private:
  6.         Singleton() {};
  7.         ~Singleton() {};
  8. public:
  9.         static Singleton* getInstance();                         //公有静态方法获取实例
  10. };
  11. Singleton* Singleton::getInstance()
  12. {
  13.         if(instance == NULL)
  14.                 instance = new Singleton();
  15.         return instance;
  16. }
  17. // init static member
  18. Singleton* Singleton::instance = NULL;
复制代码
懒汉模式存在内存泄漏的题目,可以通过使用智能指针或使用静态的嵌套类对象来解决。以下是使用静态嵌套类对象的例子
  1. class Singleton
  2. {
  3. private:
  4.         static Singleton* instance;
  5. private:
  6.         Singleton() { };
  7.         ~Singleton() { };
  8. private:
  9.         class Deletor {
  10.         public:
  11.                 ~Deletor() {
  12.                         if(Singleton::instance != NULL)
  13.                                 delete Singleton::instance;
  14.                 }
  15.         };
  16.         static Deletor deletor;
  17.        
  18. public:
  19.         static Singleton* getInstance();
  20. };
  21. Singleton* Singleton::getInstance()
  22. {
  23.         if(instance == NULL)
  24.                 instance = new Singleton();
  25.         return instance;
  26. }
  27. // init static member
  28. Singleton::Deletor Singleton::deletor;
  29. Singleton* Singleton::instance = NULL;
复制代码
懒汉模式另有个题目是在多线程情况下第一次初始化过程中会出现线程安全题目,为相识决该题目可以使用互斥锁,同时为了避免每次初始化过程都加锁而使用双检测锁(Double Check Lock,DCL)
  1. class Singleton
  2. {
  3. private:
  4.         static Singleton* instance;                        //私有化静态指针指向唯一实例
  5.         static pthread_mutex_t lock;                //静态锁,静态函数只能访问静态成员
  6. private:
  7.         Singleton() {};
  8.         ~Singleton() {};
  9. public:
  10.         static Singleton* getInstance()                         //公有静态方法获取实例
  11. };
  12. Singleton* Singleton::instance = NULL;
  13. Singleton* Singleton::getInstance(){
  14.         if(instance == NULL)
  15.         {
  16.                 pthread_mutex_lock(&lock);
  17.                 if(NULL == instance)
  18.                 {
  19.                         instance == new Singleton();
  20.                 }
  21.                 pthread_mutex_unlock(&lock);
  22.         }
  23.         return instance;
复制代码
C++中规定了local static在多线程条件下的初始化行为,要求编译器保证了内部静态变量的线程安全性。在C++11标准下,《Effective C++》提出了一种更优雅的单例模式实现,使用函数内的 local static 对象,这种方法不消加锁和解锁操纵。这样,只有当第一次访问getInstance()方法时才创建实例。这种方法也被称为Meyers’ Singleton
  1. class Singleton
  2. {
  3. private:
  4.         Singleton() { };
  5.         ~Singleton() { };
  6. public:
  7.         static Singleton& getInstance();
  8. };
  9. Singleton::Singleton::getInstance()
  10.         {
  11.                 static Singleton instance;
  12.                 return instance;
  13.         }
复制代码
饿汉模式:在程序运行时立刻初始化
饿汉模式不必要用锁,就可以实现线程安全。原因在于,在程序运行时就界说了对象,并对其初始化。之后,不管哪个线程调用成员函数getinstance(),都只不过是返回一个对象的指针而已。所以是线程安全的,不必要在获取实例的成员函数中加锁
  1. class Singleton{
  2. private:
  3.     static Singleton* instance;
  4.     single(){}
  5.     ~single(){}
  6. public:
  7.     static Singleton* getinstance();
  8. };
  9. Singleton* Singleton::instance = new Singleton();
  10. Singleton* Singleton::getInstance(){
  11.     return instance;
  12. }
  13. //测试方法
  14. int main(){
  15.     Singleton *p1 = Singleton::getInstance();
  16.     Singleton *p2 = Singleton::getInstance();
  17.     if (p1 == p2)
  18.         cout << "same" << endl;
  19.     system("pause");
  20.     return 0;
  21. }
复制代码
但是饿汉模式存在一个潜伏题目:no-local static对象(函数外的static对象)在差别编译单位中的初始化顺序是未界说的,即“ static Singleton instance ”和“ static Singleton* getInstance() ”二者的初始化顺序不确定,如果在初始化完成之前调用 getInstance() 方法会返回一个未界说的实例


  • 单例模式的优点

    • 在内存里只有一个实例,减少了内存的开销,尤其是频繁的创建和销毁实例
    • 避免对资源的多重占用

  • 单例模式的缺点

    • 没有接口,不能继续,与单一职责原则冲突,一个类应该只关心内部逻辑,而不关心表面怎么样来实例化

阻塞队列

条件变量与生产者-消耗者模子

在Lock模块中提到,条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等候该共享数据的线程
此中pthread_cond_wait()的使用方式如下
  1. pthread _mutex_lock(&mutex)                                        //加锁避免因为多线程访问导致的资源竞争
  2. while(线程执行的条件是否成立){                       
  3.     pthread_cond_wait(&cond, &mutex);               
  4. }
  5. pthread_mutex_unlock(&mutex);
复制代码
pthread_cond_wait执行后的内部操纵分为以下几步:

  • 将线程放在条件变量的请求队列后,内部解锁(满足执行条件,线程阻塞自身开释公共资源)
  • 线程等候被pthread_cond_broadcast信号唤醒大概pthread_cond_signal信号唤醒,唤醒后去竞争锁
  • 线程若竞争到互斥锁,内部再次加锁
   一般来说,在多线程资源竞争的时候,在一个使用资源的线程内里(消耗者)判断资源是否可用,不可用,便调用pthread_cond_wait,在另一个线程内里(生产者)如果判断资源可用的话,则调用pthread_cond_signal发送一个资源可用信号
但wait成功后,有可能多个线程都在等候这个资源可用的信号,信号发出后只有一个资源可用。未竞争到资源的线程应当继续等候资源,故使用while而不是if(如只有一个消耗者,则可以使用if)
  阻塞队列(block_queue.h)

阻塞队列类中封装了生产者-消耗者模子,使用了循环数组实现了队列(也可使用STL中的队列)作为共享缓冲区
比较重要的是以下几个函数


  • 往队列添加元素(生产者生产了一个元素)
  1. //往队列添加元素,需要将所有使用队列的线程先唤醒
  2. //当有元素push进队列,相当于生产者生产了一个元素
  3. //若当前没有线程等待条件变量,则唤醒无意义
  4. bool push(const T& item)
  5. {
  6.     m_mutex.lock();
  7.     if (m_size >= m_max_size)
  8.     {
  9.         m_cond.broadcast();
  10.         m_mutex.unlock();
  11.         return false;
  12.     }
  13.     m_back = (m_back + 1) % m_max_size;                将新增数据放在循环数组的对应位置
  14.     m_array[m_back] = item;
  15.     m_size++;
  16.     m_cond.broadcast();
  17.     m_mutex.unlock();
  18.     return true;
  19. }
复制代码


  • 从队列中取出元素(消耗者消耗了一个元素)
  1. //pop时,如果当前队列没有元素,将会等待条件变量
  2. bool pop(T& item)
  3. {
  4.     m_mutex.lock();
  5.     while (m_size <= 0)
  6.     {
  7.         if (!m_cond.wait(m_mutex.get()))
  8.         {
  9.             m_mutex.unlock();
  10.             return false;
  11.         }
  12.     }
  13.     m_front = (m_front + 1) % m_max_size;
  14.     item = m_array[m_front];
  15.     m_size--;
  16.     m_mutex.unlock();
  17.     return true;
  18. }
复制代码


  • 增加的超时处理惩罚

    • 线程等候相干函数

      • sleep()存在缺陷:在等候期间线程无法唤醒
      • pthread_cond_wait()存在缺陷:必须借助别的线程触发信号,否则线程自身无法唤醒
      • pthread_cond_timedwait()的优势:超时或有信号触发唤醒线程,较为灵活


  1. //在pthread_cond_wait基础上增加了等待的时间,只指定时间内能抢到互斥锁即可
  2. //其他逻辑不变
  3. bool pop(T &item, int ms_timeout)
  4. {
  5.     struct timespec t = {0, 0};                //struct timespec提供秒和纳秒单位,最高精度是纳秒
  6.     struct timeval now = {0, 0};        //struct  timeval:提供秒和微秒单位,最高精度是微秒
  7.     gettimeofday(&now, NULL);                //gettimeofday(struct  timeval*tv,struct  timezone *tz),返回当前时间tv以及当前时区信息tz
  8.     pthread_mutex_lock(m_mutex);
  9.     if (m_size <= 0)
  10.     {
  11.         t.tv_sec = now.tv_sec + ms_timeout / 1000;
  12.         t.tv_nsec = (ms_timeout % 1000) * 1000;
  13.         if (0 != pthread_cond_timedwait(m_cond, m_mutex, &t))        //pthread_cond_timedwait()中的&t需要时间的绝对值
  14.         {
  15.             pthread_mutex_unlock(m_mutex);
  16.             return false;
  17.         }
  18.     }
  19.     if (m_size <= 0)                        //唤醒后线程未竞争到资源
  20.     {
  21.         pthread_mutex_unlock(m_mutex);
  22.         return false;
  23.     }
  24.     m_front = (m_front + 1) % m_max_size;
  25.     item = m_array[m_front];
  26.     m_size--;
  27.     pthread_mutex_unlock(m_mutex);
  28.     return true;
  29. }
复制代码
日志类

基础API



  • fputs():把字符串写入到指定的stream中(不包罗空字符)
  1. int fputs(const char *str, FILE *stream)
  2. str:一个数组,包含了要写入的以空字符终止的字符序列
  3. stream:指向 FILE 对象的指针,该 FILE 对象标识了要被写入字符串的流
  4. return:成功返回一个非负值,错误返回EOF
复制代码


  • 可变参数宏_VA__ARGS__
    __VA_ARGS__是一个可变参数的宏,是新的 C99 规范中新增的,现在似乎只有 gcc 支持( VC 从 VC2005 开始支持)。界说时宏界说中参数列表的最后一个参数为省略号 “…”
    例如:
  1. #include <stdio.h>
  2. #define myprintf(...) printf(__VA_ARGS__)
  3. int main()
  4. {
  5.     myprintf("0123456789\n");
  6.     myprintf("www.csdn.net\n");
  7.     return 0;
  8. }
  9. /*
  10. 0123456789
  11. www.csdn.net
  12. */
  13. //搭配va_list的format使用
  14. #define my_print2(format, ...) printf(format, __VA_ARGS__)
  15. #define my_print3(format, ...) printf(format, ##__VA_ARGS__)
复制代码
__VA_ARGS__宏前面加上##的作用在于,当可变参数的个数为0时,上述例子中printf参数列表中的的##会把前面多余的 “,” 去掉,否则会编译堕落,发起使用时加上 “##”,使得程序更加结实


  • fflush()函数:更新缓存区

    • 调用fflush()会将缓冲区中的内容写到stream所指的文件中去
    • 若stream为NULL,则会将全部打开的文件进行数据更新

  1. int fflush(FILE *stream)
  2. stream:指向 FILE 对象的指针
  3. fflush(stdin):刷新缓冲区,将缓冲区内的数据清空并丢弃
  4. fflush(stdout):刷新缓冲区,将缓冲区内的数据强制输出到设备
复制代码
在使用多个输出函数连续进行多次输出到控制台时,有可能上一个数据还没输出完毕,还在输出缓冲区中时,下一个printf就把另一个数据加入输出缓冲区,覆盖了原来的数据,出现输堕落误。为了避免这种错误,可以在prinf()后加上 “fflush(stdout);” 欺凌马上输出到控制台
printf打印到标准输出时,终端是行缓存, 碰到\n就将缓存输出,如果多次printf中只有最后一次打印带 “\n” ,则只有最后一次碰到 “\n” 时才将全部内容一次打印出来。stdout默认是是行缓冲的,碰到 “\n” 就写内容清缓存,而加上 “fflush(stdout);” 与 有 “\n” 作用一样,只是不换行
  1. int print1()                                //先进入sleep后打印 “hello world!”
  2. {
  3.         printf("hello");
  4.     sleep(5);
  5.     printf(" world!\n");
  6.     return 0;
  7. }
  8. int print2()                                //先打印 “hello” 再进入sleep 后打印 “ world!”
  9. {
  10.         printf("hello");
  11.         fflush(stdout);
  12.     sleep(5);
  13.     printf(" world!\n");
  14.     return 0;
  15. }
复制代码
日志类流程




  • 日志文件

    • 局部变量的懒汉模式获取实例
    • 生成日志文件,并判断同步和异步写入方式

  • 同步写入

    • 判断是否分文件
    • 直接格式化输出内容,将信息写入日志文件

  • 异步写入

    • 判断是否分文件
    • 格式化输出内容,将内容写入阻塞队列,创建一个写线程,从阻塞队列取出内容写入日志文件

具体实现



  • 日志类界说(log.h)

    • 通过局部变量的懒汉单例模式创建日志实例,对其进行初始化生成日志文件后,格式化输出内容,并根据差别的写入方式,完成对应逻辑,写入日志文件
    • 日志类中的方法不会被其他程序直接调用,通过四个可变参数宏提供其他程序的调用方法并对日志品级进行分类

  1. #pragma once
  2. #ifndef LOG_H
  3. #define LOG_H
  4. #include <stdio.h>
  5. #include <iostream>
  6. #include <string>
  7. #include <stdarg.h>
  8. #include <pthread.h>
  9. #include "block_queue.h"
  10. using namespace std;
  11. class Log
  12. {
  13. public:
  14.     //C++11以后,使用局部变量懒汉不用加锁
  15.     static Log* get_instance()  //采用C++11提供的Meyers' Singleton,无须加锁
  16.     {
  17.         static Log instance;
  18.         return &instance;
  19.     }
  20.     static void* flush_log_thread(void* args)           //异步写日志公有方法,调用私有方法async_write_log()
  21.     {
  22.         Log::get_instance()->async_write_log();
  23.     }
  24.     //可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列
  25.     bool init(const char* file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
  26.     void write_log(int level, const char* format, ...);     //将输出内容按照标准格式整理
  27.     void flush(void);       //强制刷新缓冲区
  28. private:
  29.     Log();
  30.     virtual ~Log();
  31.     void* async_write_log()
  32.     {
  33.         string single_log;
  34.         //从阻塞队列中取出一个日志string,写入文件
  35.         while (m_log_queue->pop(single_log))
  36.         {
  37.             m_mutex.lock();
  38.             fputs(single_log.c_str(), m_fp);
  39.             m_mutex.unlock();
  40.         }
  41.     }
  42. private:
  43.     char dir_name[128]; //路径名
  44.     char log_name[128]; //log文件名
  45.     int m_split_lines;  //日志最大行数
  46.     int m_log_buf_size; //日志缓冲区大小
  47.     long long m_count;  //日志行数记录
  48.     int m_today;        //因为按天分类,记录当前时间是那一天
  49.     FILE* m_fp;         //打开log的文件指针
  50.     char* m_buf;        //要输出的内容
  51.     block_queue<string>* m_log_queue; //阻塞队列
  52.     bool m_is_async;                  //是否同步标志位
  53.     locker m_mutex;                   //同步类
  54.     int m_close_log; //关闭日志
  55. };
  56. //这四个宏定义在其他文件中使用,主要用于不同类型的日志输出
  57. #define LOG_DEBUG(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(0, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  58. #define LOG_INFO(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(1, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  59. #define LOG_WARN(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(2, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  60. #define LOG_ERROR(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(3, format, ##__VA_ARGS__); Log::get_instance()->flush();}
  61. #endif
复制代码


  • 初始化生成日志文件&判断写入方式

    • 通过单例模式获取唯一的日志类,调用init方法,初始化生成日志文件,服务器启动按当前时刻创建日志,前缀为时间,后缀为自界说log文件名,并记录创建日志的时间day和行数count
    • 写入方式通过初始化时是否设置阻塞队列大小来判断,若队列大小为0,则为同步,否则为异步

  1. //异步需要设置阻塞队列的长度,同步不需要设置
  2. bool Log::init(const char* file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size)
  3. {
  4.     //如果设置了max_queue_size,则设置为异步
  5.     if (max_queue_size >= 1)
  6.     {
  7.         m_is_async = true;
  8.         m_log_queue = new block_queue<string>(max_queue_size);
  9.         pthread_t tid;
  10.         //flush_log_thread为回调函数,这里表示创建线程异步写日志
  11.         pthread_create(&tid, NULL, flush_log_thread, NULL);
  12.     }
  13.     //输出内容的长度
  14.     m_close_log = close_log;
  15.     m_log_buf_size = log_buf_size;
  16.     m_buf = new char[m_log_buf_size];
  17.     memset(m_buf, '\0', m_log_buf_size);
  18.     m_split_lines = split_lines;            //日志的最大行数
  19.     time_t t = time(NULL);
  20.     struct tm* sys_tm = localtime(&t);
  21.     struct tm my_tm = *sys_tm;
  22.     const char* p = strrchr(file_name, '/');        //从后往前找到第一个“/”的位置
  23.     char log_full_name[256] = { 0 };
  24.     //自定义日志名
  25.     if (p == NULL)      //若输入的文件名没有“/”,则以“时间+文件名”作为日志名
  26.     {
  27.         snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name);
  28.     }
  29.     else
  30.     {
  31.         strcpy(log_name, p + 1);
  32.         strncpy(dir_name, file_name, p - file_name + 1);        //p - file_name + 1:文件所在路径文件夹的长度    dir_name相当于“./”
  33.         snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name);
  34.     }
  35.     m_today = my_tm.tm_mday;
  36.     m_fp = fopen(log_full_name, "a");
  37.     if (m_fp == NULL)
  38.     {
  39.         return false;
  40.     }
  41.     return true;
  42. }
复制代码


  • 日志文件分级分文件并写入

    • 日志文件分级

      • Debug:调试代码时的输出,在系统现实运行时一般不使用
      • Warn:这种告诫与调试时终端的warning类似,是调试代码时使用
      • Info:报告系统当前的状态,当前执行的流程或接收的信息等
      • Error和Fatal:输出系统的错误信息

    • 日志分文件

      • 若当前day为创建日志时间,写入日志,否则按当前时间创建新log,更新创建时间和行数
      • 若行数超过最大行限定,在当前日志的末端加count/max_lines为后缀创建新log


  1. void Log::write_log(int level, const char* format, ...)
  2. {
  3.     struct timeval now = { 0, 0 };
  4.     gettimeofday(&now, NULL);
  5.     time_t t = now.tv_sec;
  6.     struct tm* sys_tm = localtime(&t);      //得到当前时间
  7.     struct tm my_tm = *sys_tm;
  8.     char s[16] = { 0 };
  9.     switch (level)              //文件分级
  10.     {
  11.     case 0:
  12.         strcpy(s, "[debug]:");
  13.         break;
  14.     case 1:
  15.         strcpy(s, "[info]:");
  16.         break;
  17.     case 2:
  18.         strcpy(s, "[warn]:");
  19.         break;
  20.     case 3:
  21.         strcpy(s, "[erro]:");
  22.         break;
  23.     default:
  24.         strcpy(s, "[info]:");
  25.         break;
  26.     }
  27.     //写入一个log,对m_count++, m_split_lines最大行数
  28.     m_mutex.lock();
  29.     m_count++;
  30.     if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) //日志不是今天或写入的日志行数是最大行的倍数
  31.     {
  32.         char new_log[256] = { 0 };
  33.         fflush(m_fp);
  34.         fclose(m_fp);
  35.         char tail[16] = { 0 };
  36.         //格式化日志中的时间部分
  37.         snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday);     
  38.         if (m_today != my_tm.tm_mday)           //日期不是今天,创建当天日志,更新m_today和m_count
  39.         {
  40.             snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name);
  41.             m_today = my_tm.tm_mday;
  42.             m_count = 0;
  43.         }
  44.         else                                   //写入的日志超出最大行,在之前的日志名基础上加后缀“m_count/m_split_lines”
  45.         {
  46.             snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines);
  47.         }
  48.         m_fp = fopen(new_log, "a");
  49.     }
  50.     m_mutex.unlock();
  51.     va_list valst;
  52.     va_start(valst, format);    //将传入的format参数赋值给valst,便于格式化输出
  53.     string log_str;
  54.     m_mutex.lock();
  55.     //写入的具体时间内容格式
  56.     int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ",        //时间格式化,snprintf成功返回写字符的总数,其中不包括结尾的null字符
  57.         my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday,
  58.         my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s);
  59.     int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst);        //内容格式化,用于向字符串中打印数据、数据格式用户自定义,返回写入到字符数组str中的字符个数(不包含终止符)
  60.     m_buf[n + m] = '\n';
  61.     m_buf[n + m + 1] = '\0';
  62.     log_str = m_buf;
  63.     m_mutex.unlock();
  64.     if (m_is_async && !m_log_queue->full())     //异步且阻塞队列未满,则将日志信息加入阻塞队列
  65.     {
  66.         m_log_queue->push(log_str);
  67.     }
  68.     else                                        //同步则加锁向文件中写
  69.     {
  70.         m_mutex.lock();
  71.         fputs(log_str.c_str(), m_fp);
  72.         m_mutex.unlock();
  73.     }
  74.     va_end(valst);
  75. }
复制代码
va_start和va_end

  1. #define va_start(list,param1)   ( list = (va_list)&param1+ sizeof(param1) )
  2. #define va_end(list) ( list = (va_list)0 )
复制代码


  • va_start宏,获取可变参数列表的第一个参数的地址(list是类型为va_list的指针,param1是可变参数最左边的参数)
  • va_end宏,清空va_list可变参数列表
   某些版本的va_start宏为了方便对va_list的遍历,就给参数列表动态分配内存。这样一种C实现很可能使用va_end宏来开释此前动态分配的内存;
如果忘记调用宏va_end,最后得到的程序可能在某些机型上没有题目,而在另一些机型上则发生“内存泄漏”
——《C陷阱与缺陷》
  
重要参考文章

项目代码

参考项目代码(github)
项目团体明白

TinyWebServer—从0到服务器开发
小白视角:一文读懂社长的TinyWebServer
C++网络编程入门:轻量级web并发服务器开发
#Web服务器-原始版本(来自微信公众号:两猿社)
I/O多路复用

一文看懂IO多路复用
IO多路复用的三种机制select、poll、epoll
epoll的明白
epoll中的ET和LT模式区别
http模块明白

线程池web服务器http_conn请求类
深入浅出明白有限状态机
什么是状态机,一篇文章就够了
浅析epoll—epoll函数深入讲解
从零开始,编写一个Web服务器—HTTP部分详细讲解以及代码实现
mmap()函数参数详解
一篇让你彻底相识http请求报文和相应报文的布局
GET和POST两种基本请求方法的区别
线程池模块明白

WEB服务器——TinyWebServer代码详细讲解(threadpool模块)
基于Linux的C++轻量级web服务器webserver/httpserver——线程池
如何深刻明白Reactor和Proactor?
数据库毗连池模块明白

基于MysqlConnector/C++的数据库毗连池的实现
zwiley的随机——数据库毗连池
Mysql接口API详细使用说明
定时器模块明白

Web服务器—TinyWebServer代码详细讲解(timer模块)
如何实现一个定时器?看这一篇就够了
定时器设计(一)
信号同步机制明白

TinyWebServer 相干函数使用与样例 [线程同步机制]
良许Linux——互斥量mutex
日志系统明白

C++单例模式
生产者-消耗者模子:理论讲解及实现
va_list、va_start和va_end使用

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表