State_Threads实现多协程服务器

打印 上一主题 下一主题

主题 793|帖子 793|积分 2379

  1. /*
  2. * Portions created by SGI are Copyright (C) 2000 Silicon Graphics, Inc.
  3. * All Rights Reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions
  7. * are met:
  8. *
  9. * 1. Redistributions of source code must retain the above copyright
  10. *    notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. *    notice, this list of conditions and the following disclaimer in the
  13. *    documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of Silicon Graphics, Inc. nor the names of its
  15. *    contributors may be used to endorse or promote products derived from
  16. *    this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. * HOLDERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
  24. * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  25. * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  26. * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  27. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  28. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. */
  30. 系统socket编程,listen后在accept阻塞监听:[链接](https://blog.csdn.net/qq_36784975/article/details/88768320?ops_request_misc=%257B%2522request%255Fid%2522%253A%25220BE0E468-2E50-442E-B409-280726303185%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=0BE0E468-2E50-442E-B409-280726303185&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-88768320-null-null.142%5Ev100%5Epc_search_result_base4&utm_term=socket%28PF_INET,%20SOCK_STREAM,%200%29%29&spm=1018.2226.3001.4187)
  31. 源码:[链接](https://download.csdn.net/download/weixin_43466192/89891912)
  32. 罗老师:[https://blog.csdn.net/u012117034/article/details/135124626](https://blog.csdn.net/u012117034/article/details/135124626)
  33. 源码目录介绍:[https://blog.csdn.net/caoshangpa/article/details/79582873](https://blog.csdn.net/caoshangpa/article/details/79582873)
  34. #include <stdio.h>
  35. #include <stdlib.h>
  36. #include <string.h>
  37. #include <unistd.h>
  38. #include <time.h>
  39. #include <sys/types.h>
  40. #include <sys/stat.h>
  41. #include <sys/socket.h>
  42. #include <sys/wait.h>
  43. #include <netinet/in.h>
  44. #include <arpa/inet.h>
  45. #include <netdb.h>
  46. #include <fcntl.h>
  47. #include <signal.h>
  48. #include <pwd.h>
  49. #include "st.h"
  50. /******************************************************************
  51. * Server configuration parameters
  52. */
  53. /* Log files */
  54. #define PID_FILE    "pid"
  55. #define ERRORS_FILE "errors"
  56. #define ACCESS_FILE "access"
  57. /* Default server port */
  58. #define SERV_PORT_DEFAULT 8000
  59. /* Socket listen queue size */
  60. #define LISTENQ_SIZE_DEFAULT 256
  61. /* Max number of listening sockets ("hardware virtual servers") */
  62. #define MAX_BIND_ADDRS 16
  63. /* Max number of "spare" threads per process per socket */
  64. #define MAX_WAIT_THREADS_DEFAULT 8
  65. /* Number of file descriptors needed to handle one client session */
  66. #define FD_PER_THREAD 2
  67. /* Access log buffer flushing interval (in seconds) */
  68. #define ACCLOG_FLUSH_INTERVAL 30
  69. /* Request read timeout (in seconds) */
  70. #define REQUEST_TIMEOUT 30
  71. /******************************************************************
  72. * Global data
  73. */
  74. struct socket_info {
  75.   st_netfd_t nfd;               /* Listening socket                     */
  76.   char *addr;                   /* Bind address                         */
  77.   unsigned int port;            /* Port                                 */
  78.   int wait_threads;             /* Number of threads waiting to accept  */
  79.   int busy_threads;             /* Number of threads processing request */
  80.   int rqst_count;               /* Total number of processed requests   */
  81. } srv_socket[MAX_BIND_ADDRS];   /* Array of listening sockets           */
  82. static int sk_count = 0;        /* Number of listening sockets          */
  83. static int vp_count = 0;        /* Number of server processes (VPs)     */
  84. static pid_t *vp_pids;          /* Array of VP pids                     */
  85. static int my_index = -1;       /* Current process index */
  86. static pid_t my_pid = -1;       /* Current process pid   */
  87. static st_netfd_t sig_pipe[2];  /* Signal pipe           */
  88. /*
  89. * Configuration flags/parameters
  90. */
  91. static int interactive_mode = 0;
  92. static int serialize_accept = 0;
  93. static int log_access       = 0;
  94. static char *logdir     = NULL;
  95. static char *username   = NULL;
  96. static int listenq_size = LISTENQ_SIZE_DEFAULT;
  97. static int errfd        = STDERR_FILENO;
  98. /*
  99. * Thread throttling parameters (all numbers are per listening socket).
  100. * Zero values mean use default.
  101. */
  102. static int max_threads = 0;       /* Max number of threads         */
  103. static int max_wait_threads = 0;  /* Max number of "spare" threads */
  104. static int min_wait_threads = 2;  /* Min number of "spare" threads */
  105. /******************************************************************
  106. * Useful macros
  107. */
  /* Fallback for platforms whose headers do not provide INADDR_NONE */
  #ifndef INADDR_NONE
  #define INADDR_NONE 0xffffffff
  #endif

  /* Convert seconds to microseconds (result has type long long) */
  #define SEC2USEC(s) ((s)*1000000LL)

  /* Per-socket thread accounting; i is an index into srv_socket[] */
  #define WAIT_THREADS(i)  (srv_socket[i].wait_threads)
  #define BUSY_THREADS(i)  (srv_socket[i].busy_threads)
  #define TOTAL_THREADS(i) (WAIT_THREADS(i) + BUSY_THREADS(i))

  /* Per-socket counter of processed requests */
  #define RQST_COUNT(i)    (srv_socket[i].rqst_count)
  116. /******************************************************************
  117. * Forward declarations
  118. */
  119. static void usage(const char *progname);
  120. static void parse_arguments(int argc, char *argv[]);
  121. static void start_daemon(void);
  122. static void set_thread_throttling(void);
  123. static void create_listeners(void);
  124. static void change_user(void);
  125. static void open_log_files(void);
  126. static void start_processes(void);
  127. static void wdog_sighandler(int signo);
  128. static void child_sighandler(int signo);
  129. static void install_sighandlers(void);
  130. static void start_threads(void);
  131. static void *process_signals(void *arg);
  132. static void *flush_acclog_buffer(void *arg);
  133. static void *handle_connections(void *arg);
  134. static void dump_server_info(void);
  135. static void Signal(int sig, void (*handler)(int));
  136. static int cpu_count(void);
  137. extern void handle_session(long srv_socket_index, st_netfd_t cli_nfd);
  138. extern void load_configs(void);
  139. extern void logbuf_open(void);
  140. extern void logbuf_flush(void);
  141. extern void logbuf_close(void);
  142. /* Error reporting functions defined in the error.c file */
  143. extern void err_sys_report(int fd, const char *fmt, ...);
  144. extern void err_sys_quit(int fd, const char *fmt, ...);
  145. extern void err_sys_dump(int fd, const char *fmt, ...);
  146. extern void err_report(int fd, const char *fmt, ...);
  147. extern void err_quit(int fd, const char *fmt, ...);
  148. /*
  149. * General server example: accept a client connection and do something.
  150. * This program just outputs a short HTML page, but can be easily adapted
  151. * to do other things.
  152. *
  153. * This server creates a constant number of processes ("virtual processors"
  154. * or VPs) and replaces them when they die. Each virtual processor manages
  155. * its own independent set of state threads (STs), the number of which varies
  156. * with load against the server. Each state thread listens to exactly one
  157. * listening socket. The initial process becomes the watchdog, waiting for
  158. * children (VPs) to die or for a signal requesting termination or restart.
  159. * Upon receiving a restart signal (SIGHUP), all VPs close and then reopen
  160. * log files and reload configuration. All currently active connections remain
  161. * active. It is assumed that new configuration affects only request
  162. * processing and not the general server parameters such as number of VPs,
  163. * thread limits, bind addresses, etc. Those are specified as command line
  164. * arguments, so the server has to be stopped and then started again in order
  165. * to change them.
  166. *
  167. * Each state thread loops processing connections from a single listening
  168. * socket. Only one ST runs on a VP at a time, and VPs do not share memory,
  169. * so no mutual exclusion locking is necessary on any data, and the entire
  170. * server is free to use all the static variables and non-reentrant library
  171. * functions it wants, greatly simplifying programming and debugging and
  172. * increasing performance (for example, it is safe to ++ and -- all global
  173. * counters or call inet_ntoa(3) without any mutexes). The current thread on
  174. * each VP maintains equilibrium on that VP, starting a new thread or
  175. * terminating itself if the number of spare threads exceeds the lower or
  176. * upper limit.
  177. *
  178. * All I/O operations on sockets must use the State Thread library's I/O
  179. * functions because only those functions prevent blocking of the entire VP
  180. * process and perform state thread scheduling.
  181. */
  182. int main(int argc, char *argv[])
  183. {
  184.   /* Parse command-line options */
  185.   parse_arguments(argc, argv);
  186.   /* Allocate array of server pids */
  187.   if ((vp_pids = calloc(vp_count, sizeof(pid_t))) == NULL)
  188.     err_sys_quit(errfd, "ERROR: calloc failed");
  189.   /* Start the daemon */
  190.   if (!interactive_mode)
  191.     start_daemon();
  192.   /* Initialize the ST library */
  193.   if (st_init() < 0)
  194.     err_sys_quit(errfd, "ERROR: initialization failed: st_init");
  195.   /* Set thread throttling parameters */
  196.   set_thread_throttling();
  197.   /* Create listening sockets */
  198.   create_listeners();
  199.   /* Change the user */
  200.   if (username)
  201.     change_user();
  202.   /* Open log files */
  203.   open_log_files();
  204.   /* Start server processes (VPs) */
  205.   start_processes();
  206.   /* Turn time caching on */
  207.   st_timecache_set(1);
  208.   /* Install signal handlers */
  209.   install_sighandlers();
  210.   /* Load configuration from config files */
  211.   load_configs();
  212.   /* Start all threads */
  213.   start_threads();
  214.   /* Become a signal processing thread */
  215.   process_signals(NULL);
  216.   /* NOTREACHED */
  217.   return 1;
  218. }
  219. /******************************************************************/
  /*
   * Print the command line synopsis and the list of options to stderr,
   * then terminate the process with exit status 1.
   */
  static void usage(const char *progname)
  {
    static const char options_help[] =
            "Possible options:\n\n"
            "\t-b <host>:<port>        Bind to specified address. Multiple"
            " addresses\n"
            "\t                        are permitted.\n"
            "\t-p <num_processes>      Create specified number of processes.\n"
            "\t-t <min_thr>:<max_thr>  Specify thread limits per listening"
            " socket\n"
            "\t                        across all processes.\n"
            "\t-u <user>               Change server's user id to specified"
            " value.\n"
            "\t-q <backlog>            Set max length of pending connections"
            " queue.\n"
            "\t-a                      Enable access logging.\n"
            "\t-i                      Run in interactive mode.\n"
            "\t-S                      Serialize all accept() calls.\n"
            "\t-h                      Print this message.\n";

    fprintf(stderr, "Usage: %s -l <log_directory> [<options>]\n\n", progname);
    fputs(options_help, stderr);
    exit(1);
  }
  242. /******************************************************************/
  243. static void parse_arguments(int argc, char *argv[])
  244. {
  245.   extern char *optarg;
  246.   int opt;
  247.   char *c;
  248.   while ((opt = getopt(argc, argv, "b:p:l:t:u:q:aiSh")) != EOF) {
  249.     switch (opt) {
  250.     case 'b':
  251.       if (sk_count >= MAX_BIND_ADDRS)
  252.         err_quit(errfd, "ERROR: max number of bind addresses (%d) exceeded",
  253.                  MAX_BIND_ADDRS);
  254.       if ((c = strdup(optarg)) == NULL)
  255.         err_sys_quit(errfd, "ERROR: strdup");
  256.       srv_socket[sk_count++].addr = c;
  257.       break;
  258.     case 'p':
  259.       vp_count = atoi(optarg);
  260.       if (vp_count < 1)
  261.         err_quit(errfd, "ERROR: invalid number of processes: %s", optarg);
  262.       break;
  263.     case 'l':
  264.       logdir = optarg;
  265.       break;
  266.     case 't':
  267.       max_wait_threads = (int) strtol(optarg, &c, 10);
  268.       if (*c++ == ':')
  269.         max_threads = atoi(c);
  270.       if (max_wait_threads < 0 || max_threads < 0)
  271.         err_quit(errfd, "ERROR: invalid number of threads: %s", optarg);
  272.       break;
  273.     case 'u':
  274.       username = optarg;
  275.       break;
  276.     case 'q':
  277.       listenq_size = atoi(optarg);
  278.       if (listenq_size < 1)
  279.         err_quit(errfd, "ERROR: invalid listen queue size: %s", optarg);
  280.       break;
  281.     case 'a':
  282.       log_access = 1;
  283.       break;
  284.     case 'i':
  285.       interactive_mode = 1;
  286.       break;
  287.     case 'S':
  288.       /*
  289.        * Serialization decision is tricky on some platforms. For example,
  290.        * Solaris 2.6 and above has kernel sockets implementation, so supposedly
  291.        * there is no need for serialization. The ST library may be compiled
  292.        * on one OS version, but used on another, so the need for serialization
  293.        * should be determined at run time by the application. Since it's just
  294.        * an example, the serialization decision is left up to user.
  295.        * Only on platforms where the serialization is never needed on any OS
  296.        * version st_netfd_serialize_accept() is a no-op.
  297.        */
  298.       serialize_accept = 1;
  299.       break;
  300.     case 'h':
  301.     case '?':
  302.       usage(argv[0]);
  303.     }
  304.   }
  305.   if (logdir == NULL && !interactive_mode) {
  306.     err_report(errfd, "ERROR: logging directory is required\n");
  307.     usage(argv[0]);
  308.   }
  309.   if (getuid() == 0 && username == NULL)
  310.     err_report(errfd, "WARNING: running as super-user!");
  311.   if (vp_count == 0 && (vp_count = cpu_count()) < 1)
  312.     vp_count = 1;
  313.   if (sk_count == 0) {
  314.     sk_count = 1;
  315.     srv_socket[0].addr = "0.0.0.0";
  316.   }
  317. }
  318. /******************************************************************/
  319. static void start_daemon(void)
  320. {
  321.   pid_t pid;
  322.   /* Start forking */
  323.   if ((pid = fork()) < 0)
  324.     err_sys_quit(errfd, "ERROR: fork");
  325.   if (pid > 0)
  326.     exit(0);                  /* parent */
  327.   /* First child process */
  328.   setsid();                   /* become session leader */
  329.   if ((pid = fork()) < 0)
  330.     err_sys_quit(errfd, "ERROR: fork");
  331.   if (pid > 0)                /* first child */
  332.     exit(0);
  333.   umask(022);
  334.   if (chdir(logdir) < 0)
  335.     err_sys_quit(errfd, "ERROR: can't change directory to %s: chdir", logdir);
  336. }
  337. /******************************************************************
  338. * For simplicity, the minimal size of thread pool is considered
  339. * as a maximum number of spare threads (max_wait_threads) that
  340. * will be created upon server startup. The pool size can grow up
  341. * to the max_threads value. Note that this is a per listening
  342. * socket limit. It is also possible to limit the total number of
  343. * threads for all sockets rather than impose a per socket limit.
  344. */
  345. static void set_thread_throttling(void)
  346. {
  347.   /*
  348.    * Calculate total values across all processes.
  349.    * All numbers are per listening socket.
  350.    */
  351.   if (max_wait_threads == 0)
  352.     max_wait_threads = MAX_WAIT_THREADS_DEFAULT * vp_count;
  353.   /* Assuming that each client session needs FD_PER_THREAD file descriptors */
  354.   if (max_threads == 0)
  355.     max_threads = (st_getfdlimit() * vp_count) / FD_PER_THREAD / sk_count;
  356.   if (max_wait_threads > max_threads)
  357.     max_wait_threads = max_threads;
  358.   /*
  359.    * Now calculate per-process values.
  360.    */
  361.   if (max_wait_threads % vp_count)
  362.     max_wait_threads = max_wait_threads / vp_count + 1;
  363.   else
  364.     max_wait_threads = max_wait_threads / vp_count;
  365.   if (max_threads % vp_count)
  366.     max_threads = max_threads / vp_count + 1;
  367.   else
  368.     max_threads = max_threads / vp_count;
  369.   if (min_wait_threads > max_wait_threads)
  370.     min_wait_threads = max_wait_threads;
  371. }
  372. /******************************************************************/
  373. static void create_listeners(void)
  374. {
  375.   int i, n, sock;
  376.   char *c;
  377.   struct sockaddr_in serv_addr;
  378.   struct hostent *hp;
  379.   unsigned short port;
  380.   for (i = 0; i < sk_count; i++) {
  381.     port = 0;
  382.     if ((c = strchr(srv_socket[i].addr, ':')) != NULL) {
  383.       *c++ = '\0';
  384.       port = (unsigned short) atoi(c);
  385.     }
  386.     if (srv_socket[i].addr[0] == '\0')
  387.       srv_socket[i].addr = "0.0.0.0";
  388.     if (port == 0)
  389.       port = SERV_PORT_DEFAULT;
  390.     /* Create server socket */
  391.     if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0)
  392.       err_sys_quit(errfd, "ERROR: can't create socket: socket");
  393.     n = 1;
  394.     if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0)
  395.       err_sys_quit(errfd, "ERROR: can't set SO_REUSEADDR: setsockopt");
  396.     memset(&serv_addr, 0, sizeof(serv_addr));
  397.     serv_addr.sin_family = AF_INET;
  398.     serv_addr.sin_port = htons(port);
  399.     serv_addr.sin_addr.s_addr = inet_addr(srv_socket[i].addr);
  400.     if (serv_addr.sin_addr.s_addr == INADDR_NONE) {
  401.       /* not dotted-decimal */
  402.       if ((hp = gethostbyname(srv_socket[i].addr)) == NULL)
  403.         err_quit(errfd, "ERROR: can't resolve address: %s",
  404.                  srv_socket[i].addr);
  405.       memcpy(&serv_addr.sin_addr, hp->h_addr, hp->h_length);
  406.     }
  407.     srv_socket[i].port = port;
  408.     /* Do bind and listen */
  409.     if (bind(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
  410.       err_sys_quit(errfd, "ERROR: can't bind to address %s, port %hu",
  411.                    srv_socket[i].addr, port);
  412.     if (listen(sock, listenq_size) < 0)
  413.       err_sys_quit(errfd, "ERROR: listen");
  414.     /* Create file descriptor object from OS socket */
  415.     if ((srv_socket[i].nfd = st_netfd_open_socket(sock)) == NULL)
  416.       err_sys_quit(errfd, "ERROR: st_netfd_open_socket");
  417.     /*
  418.      * On some platforms (e.g. IRIX, Linux) accept() serialization is never
  419.      * needed for any OS version.  In that case st_netfd_serialize_accept()
  420.      * is just a no-op. Also see the comment above.
  421.      */
  422.     if (serialize_accept && st_netfd_serialize_accept(srv_socket[i].nfd) < 0)
  423.       err_sys_quit(errfd, "ERROR: st_netfd_serialize_accept");
  424.   }
  425. }
  426. /******************************************************************/
  427. static void change_user(void)
  428. {
  429.   struct passwd *pw;
  430.   if ((pw = getpwnam(username)) == NULL)
  431.     err_quit(errfd, "ERROR: can't find user '%s': getpwnam failed", username);
  432.   if (setgid(pw->pw_gid) < 0)
  433.     err_sys_quit(errfd, "ERROR: can't change group id: setgid");
  434.   if (setuid(pw->pw_uid) < 0)
  435.     err_sys_quit(errfd, "ERROR: can't change user id: setuid");
  436.   err_report(errfd, "INFO: changed process user id to '%s'", username);
  437. }
  438. /******************************************************************/
  439. static void open_log_files(void)
  440. {
  441.   int fd;
  442.   char str[32];
  443.   if (interactive_mode)
  444.     return;
  445.   /* Open access log */
  446.   if (log_access)
  447.     logbuf_open();
  448.   /* Open and write pid to pid file */
  449.   if ((fd = open(PID_FILE, O_CREAT | O_WRONLY | O_TRUNC, 0644)) < 0)
  450.     err_sys_quit(errfd, "ERROR: can't open pid file: open");
  451.   sprintf(str, "%d\n", (int)getpid());
  452.   if (write(fd, str, strlen(str)) != strlen(str))
  453.     err_sys_quit(errfd, "ERROR: can't write to pid file: write");
  454.   close(fd);
  455.   /* Open error log file */
  456.   if ((fd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0)
  457.     err_sys_quit(errfd, "ERROR: can't open error log file: open");
  458.   errfd = fd;
  459.   err_report(errfd, "INFO: starting the server...");
  460. }
  461. /******************************************************************/
  462. static void start_processes(void)
  463. {
  464.   int i, status;
  465.   pid_t pid;
  466.   sigset_t mask, omask;
  467.   if (interactive_mode) {
  468.     my_index = 0;
  469.     my_pid = getpid();
  470.     return;
  471.   }
  472.   for (i = 0; i < vp_count; i++) {
  473.     if ((pid = fork()) < 0) {
  474.       err_sys_report(errfd, "ERROR: can't create process: fork");
  475.       if (i == 0)
  476.         exit(1);
  477.       err_report(errfd, "WARN: started only %d processes out of %d", i,
  478.                  vp_count);
  479.       vp_count = i;
  480.       break;
  481.     }
  482.     if (pid == 0) {
  483.       my_index = i;
  484.       my_pid = getpid();
  485.       /* Child returns to continue in main() */
  486.       return;
  487.     }
  488.     vp_pids[i] = pid;
  489.   }
  490.   /*
  491.    * Parent process becomes a "watchdog" and never returns to main().
  492.    */
  493.   /* Install signal handlers */
  494.   Signal(SIGTERM, wdog_sighandler);  /* terminate */
  495.   Signal(SIGHUP,  wdog_sighandler);  /* restart   */
  496.   Signal(SIGUSR1, wdog_sighandler);  /* dump info */
  497.   /* Now go to sleep waiting for a child termination or a signal */
  498.   for ( ; ; ) {
  499.     if ((pid = wait(&status)) < 0) {
  500.       if (errno == EINTR)
  501.         continue;
  502.       err_sys_quit(errfd, "ERROR: watchdog: wait");
  503.     }
  504.     /* Find index of the exited child */
  505.     for (i = 0; i < vp_count; i++) {
  506.       if (vp_pids[i] == pid)
  507.         break;
  508.     }
  509.     /* Block signals while printing and forking */
  510.     sigemptyset(&mask);
  511.     sigaddset(&mask, SIGTERM);
  512.     sigaddset(&mask, SIGHUP);
  513.     sigaddset(&mask, SIGUSR1);
  514.     sigprocmask(SIG_BLOCK, &mask, &omask);
  515.     if (WIFEXITED(status))
  516.       err_report(errfd, "WARN: watchdog: process %d (pid %d) exited"
  517.                  " with status %d", i, pid, WEXITSTATUS(status));
  518.     else if (WIFSIGNALED(status))
  519.       err_report(errfd, "WARN: watchdog: process %d (pid %d) terminated"
  520.                  " by signal %d", i, pid, WTERMSIG(status));
  521.     else if (WIFSTOPPED(status))
  522.       err_report(errfd, "WARN: watchdog: process %d (pid %d) stopped"
  523.                  " by signal %d", i, pid, WSTOPSIG(status));
  524.     else
  525.       err_report(errfd, "WARN: watchdog: process %d (pid %d) terminated:"
  526.                  " unknown termination reason", i, pid);
  527.     /* Fork another VP */
  528.     if ((pid = fork()) < 0) {
  529.       err_sys_report(errfd, "ERROR: watchdog: can't create process: fork");
  530.     } else if (pid == 0) {
  531.       my_index = i;
  532.       my_pid = getpid();
  533.       /* Child returns to continue in main() */
  534.       return;
  535.     }
  536.     vp_pids[i] = pid;
  537.     /* Restore the signal mask */
  538.     sigprocmask(SIG_SETMASK, &omask, NULL);
  539.   }
  540. }
  541. /******************************************************************/
  542. static void wdog_sighandler(int signo)
  543. {
  544.   int i, err;
  545.   /* Save errno */
  546.   err = errno;
  547.   /* Forward the signal to all children */
  548.   for (i = 0; i < vp_count; i++) {
  549.     if (vp_pids[i] > 0)
  550.       kill(vp_pids[i], signo);
  551.   }
  552.   /*
  553.    * It is safe to do pretty much everything here because process is
  554.    * sleeping in wait() which is async-safe.
  555.    */
  556.   switch (signo) {
  557.   case SIGHUP:
  558.     err_report(errfd, "INFO: watchdog: caught SIGHUP");
  559.     /* Reopen log files - needed for log rotation */
  560.     if (log_access) {
  561.       logbuf_close();
  562.       logbuf_open();
  563.     }
  564.     close(errfd);
  565.     if ((errfd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0)
  566.       err_sys_quit(STDERR_FILENO, "ERROR: watchdog: open");
  567.     break;
  568.   case SIGTERM:
  569.     /* Non-graceful termination */
  570.     err_report(errfd, "INFO: watchdog: caught SIGTERM, terminating");
  571.     unlink(PID_FILE);
  572.     exit(0);
  573.   case SIGUSR1:
  574.     err_report(errfd, "INFO: watchdog: caught SIGUSR1");
  575.     break;
  576.   default:
  577.     err_report(errfd, "INFO: watchdog: caught signal %d", signo);
  578.   }
  579.   /* Restore errno */
  580.   errno = err;
  581. }
  582. /******************************************************************/
  583. static void install_sighandlers(void)
  584. {
  585.   sigset_t mask;
  586.   int p[2];
  587.   /* Create signal pipe */
  588.   if (pipe(p) < 0)
  589.     err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create"
  590.                  " signal pipe: pipe", my_index, my_pid);
  591.   if ((sig_pipe[0] = st_netfd_open(p[0])) == NULL ||
  592.       (sig_pipe[1] = st_netfd_open(p[1])) == NULL)
  593.     err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create"
  594.                  " signal pipe: st_netfd_open", my_index, my_pid);
  595.   /* Install signal handlers */
  596.   Signal(SIGTERM, child_sighandler);  /* terminate */
  597.   Signal(SIGHUP,  child_sighandler);  /* restart   */
  598.   Signal(SIGUSR1, child_sighandler);  /* dump info */
  599.   /* Unblock signals */
  600.   sigemptyset(&mask);
  601.   sigaddset(&mask, SIGTERM);
  602.   sigaddset(&mask, SIGHUP);
  603.   sigaddset(&mask, SIGUSR1);
  604.   sigprocmask(SIG_UNBLOCK, &mask, NULL);
  605. }
  606. /******************************************************************/
  607. static void child_sighandler(int signo)
  608. {
  609.   int err, fd;
  610.   err = errno;
  611.   fd = st_netfd_fileno(sig_pipe[1]);
  612.   /* write() is async-safe */
  613.   if (write(fd, &signo, sizeof(int)) != sizeof(int))
  614.     err_sys_quit(errfd, "ERROR: process %d (pid %d): child's signal"
  615.                  " handler: write", my_index, my_pid);
  616.   errno = err;
  617. }
  618. /******************************************************************
  619. * The "main" function of the signal processing thread.
  620. */
  621. /* ARGSUSED */
  622. static void *process_signals(void *arg)
  623. {
  624.   int signo;
  625.   for ( ; ; ) {
  626.     /* Read the next signal from the signal pipe */
  627.     if (st_read(sig_pipe[0], &signo, sizeof(int),
  628.      ST_UTIME_NO_TIMEOUT) != sizeof(int))
  629.       err_sys_quit(errfd, "ERROR: process %d (pid %d): signal processor:"
  630.                    " st_read", my_index, my_pid);
  631.     switch (signo) {
  632.     case SIGHUP:
  633.       err_report(errfd, "INFO: process %d (pid %d): caught SIGHUP,"
  634.                  " reloading configuration", my_index, my_pid);
  635.       if (interactive_mode) {
  636.         load_configs();
  637.         break;
  638.       }
  639.       /* Reopen log files - needed for log rotation */
  640.       if (log_access) {
  641.         logbuf_flush();
  642.         logbuf_close();
  643.         logbuf_open();
  644.       }
  645.       close(errfd);
  646.       if ((errfd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0)
  647.         err_sys_quit(STDERR_FILENO, "ERROR: process %d (pid %d): signal"
  648.                      " processor: open", my_index, my_pid);
  649.       /* Reload configuration */
  650.       load_configs();
  651.       break;
  652.     case SIGTERM:
  653.       /*
  654.        * Terminate ungracefully since it is generally not known how long
  655.        * it will take to gracefully complete all client sessions.
  656.        */
  657.       err_report(errfd, "INFO: process %d (pid %d): caught SIGTERM,"
  658.                  " terminating", my_index, my_pid);
  659.       if (log_access)
  660.         logbuf_flush();
  661.       exit(0);
  662.     case SIGUSR1:
  663.       err_report(errfd, "INFO: process %d (pid %d): caught SIGUSR1",
  664.                  my_index, my_pid);
  665.       /* Print server info to stderr */
  666.       dump_server_info();
  667.       break;
  668.     default:
  669.       err_report(errfd, "INFO: process %d (pid %d): caught signal %d",
  670.                  my_index, my_pid, signo);
  671.     }
  672.   }
  673.   /* NOTREACHED */
  674.   return NULL;
  675. }
  676. /******************************************************************
  677. * The "main" function of the access log flushing thread.
  678. */
  679. /* ARGSUSED */
  680. static void *flush_acclog_buffer(void *arg)
  681. {
  682.   for ( ; ; ) {
  683.     st_sleep(ACCLOG_FLUSH_INTERVAL);
  684.     logbuf_flush();
  685.   }
  686.   /* NOTREACHED */
  687.   return NULL;
  688. }
  689. /******************************************************************/
  690. static void start_threads(void)
  691. {
  692.   long i, n;
  693.   /* Create access log flushing thread */
  694.   if (log_access && st_thread_create(flush_acclog_buffer, NULL, 0, 0) == NULL)
  695.     err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create"
  696.                  " log flushing thread", my_index, my_pid);
  697.   /* Create connections handling threads */
  698.   for (i = 0; i < sk_count; i++) {
  699.     err_report(errfd, "INFO: process %d (pid %d): starting %d threads"
  700.                " on %s:%u", my_index, my_pid, max_wait_threads,
  701.                srv_socket[i].addr, srv_socket[i].port);
  702.     WAIT_THREADS(i) = 0;
  703.     BUSY_THREADS(i) = 0;
  704.     RQST_COUNT(i) = 0;
  705.     for (n = 0; n < max_wait_threads; n++) {
  706.       if (st_thread_create(handle_connections, (void *)i, 0, 0) != NULL)
  707.         WAIT_THREADS(i)++;
  708.       else
  709.         err_sys_report(errfd, "ERROR: process %d (pid %d): can't create"
  710.                        " thread", my_index, my_pid);
  711.     }
  712.     if (WAIT_THREADS(i) == 0)
  713.       exit(1);
  714.   }
  715. }
  716. /******************************************************************/
  717. static void *handle_connections(void *arg)
  718. {
  719.   st_netfd_t srv_nfd, cli_nfd;
  720.   struct sockaddr_in from;
  721.   int fromlen;
  722.   long i = (long) arg;
  723.   srv_nfd = srv_socket[i].nfd;
  724.   fromlen = sizeof(from);
  725.   while (WAIT_THREADS(i) <= max_wait_threads) {
  726.     cli_nfd = st_accept(srv_nfd, (struct sockaddr *)&from, &fromlen,
  727.      ST_UTIME_NO_TIMEOUT);
  728.     if (cli_nfd == NULL) {
  729.       err_sys_report(errfd, "ERROR: can't accept connection: st_accept");
  730.       continue;
  731.     }
  732.     /* Save peer address, so we can retrieve it later */
  733.     st_netfd_setspecific(cli_nfd, &from.sin_addr, NULL);
  734.     WAIT_THREADS(i)--;
  735.     BUSY_THREADS(i)++;
  736.     if (WAIT_THREADS(i) < min_wait_threads && TOTAL_THREADS(i) < max_threads) {
  737.       /* Create another spare thread */
  738.       if (st_thread_create(handle_connections, (void *)i, 0, 0) != NULL)
  739.         WAIT_THREADS(i)++;
  740.       else
  741.         err_sys_report(errfd, "ERROR: process %d (pid %d): can't create"
  742.                        " thread", my_index, my_pid);
  743.     }
  744.     handle_session(i, cli_nfd);
  745.     st_netfd_close(cli_nfd);
  746.     WAIT_THREADS(i)++;
  747.     BUSY_THREADS(i)--;
  748.   }
  749.   WAIT_THREADS(i)--;
  750.   return NULL;
  751. }
  752. /******************************************************************/
  753. static void dump_server_info(void)
  754. {
  755.   char *buf;
  756.   int i, len;
  757.   if ((buf = malloc(sk_count * 512)) == NULL) {
  758.     err_sys_report(errfd, "ERROR: malloc failed");
  759.     return;
  760.   }
  761.   len = sprintf(buf, "\n\nProcess #%d (pid %d):\n", my_index, (int)my_pid);
  762.   for (i = 0; i < sk_count; i++) {
  763.     len += sprintf(buf + len, "\nListening Socket #%d:\n"
  764.                    "-------------------------\n"
  765.                    "Address                    %s:%u\n"
  766.                    "Thread limits (min/max)    %d/%d\n"
  767.                    "Waiting threads            %d\n"
  768.                    "Busy threads               %d\n"
  769.                    "Requests served            %d\n",
  770.                    i, srv_socket[i].addr, srv_socket[i].port,
  771.                    max_wait_threads, max_threads,
  772.                    WAIT_THREADS(i), BUSY_THREADS(i), RQST_COUNT(i));
  773.   }
  774.   write(STDERR_FILENO, buf, len);
  775.   free(buf);
  776. }
  777. /******************************************************************
  778. * Stubs
  779. */
  780. /*
  781. * Session handling function stub. Just dumps small HTML page.
  782. */
  783. void handle_session(long srv_socket_index, st_netfd_t cli_nfd)
  784. {
  785.   static char resp[] = "HTTP/1.0 200 OK\r\nContent-type: text/html\r\n"
  786.                        "Connection: close\r\n\r\n<H2>It worked!</H2>\n";
  787.   char buf[512];
  788.   int n = sizeof(resp) - 1;
  789.   struct in_addr *from = st_netfd_getspecific(cli_nfd);
  790.   if (st_read(cli_nfd, buf, sizeof(buf), SEC2USEC(REQUEST_TIMEOUT)) < 0) {
  791.     err_sys_report(errfd, "WARN: can't read request from %s: st_read",
  792.                    inet_ntoa(*from));
  793.     return;
  794.   }
  795.   if (st_write(cli_nfd, resp, n, ST_UTIME_NO_TIMEOUT) != n) {
  796.     err_sys_report(errfd, "WARN: can't write response to %s: st_write",
  797.                    inet_ntoa(*from));
  798.     return;
  799.   }
  800.   RQST_COUNT(srv_socket_index)++;
  801. }
  802. /*
  803. * Configuration loading function stub.
  804. */
  805. void load_configs(void)
  806. {
  807.   err_report(errfd, "INFO: process %d (pid %d): configuration loaded",
  808.              my_index, my_pid);
  809. }
  810. /*
  811. * Buffered access logging methods.
  812. * Note that stdio functions (fopen(3), fprintf(3), fflush(3), etc.) cannot
  813. * be used if multiple VPs are created since these functions can flush buffer
  814. * at any point and thus write only partial log record to disk.
  815. * Also, it is completely safe for all threads of the same VP to write to
  816. * the same log buffer without any mutex protection (one buffer per VP, of
  817. * course).
  818. */
  void logbuf_open(void)
  {
    /* Stub: empty body, performs no work. */
  }
  void logbuf_flush(void)
  {
    /* Stub: empty body, performs no work. */
  }
  void logbuf_close(void)
  {
    /* Stub: empty body, performs no work. */
  }
  828. /******************************************************************
  829. * Small utility functions
  830. */
  /*
   * Install `handler` as the disposition of signal `sig` through
   * sigaction(), with an empty blocked-signal mask and no flags.
   * The result of sigaction() is not checked.
   */
  static void Signal(int sig, void (*handler)(int))
  {
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = handler;

    sigaction(sig, &act, NULL);
  }
  /*
   * Return the processor count reported by the platform: sysconf() with
   * _SC_NPROCESSORS_ONLN or _SC_NPROC_ONLN where one of them is defined,
   * mpctl() on HPUX. On any other platform errno is set to ENOSYS and
   * -1 is returned.
   */
  static int cpu_count(void)
  {
  #if defined (_SC_NPROCESSORS_ONLN)
    return (int) sysconf(_SC_NPROCESSORS_ONLN);
  #elif defined (_SC_NPROC_ONLN)
    return (int) sysconf(_SC_NPROC_ONLN);
  #elif defined (HPUX)
  #include <sys/mpctl.h>
    return mpctl(MPC_GETNUMSPUS, 0, 0);
  #else
    /* None of the known query mechanisms is available */
    errno = ENOSYS;
    return -1;
  #endif
  }
  855. /******************************************************************/
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

涛声依旧在

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表