Rust编程语言入门之最后的项目:多线程 Web 服务器

打印 上一主题 下一主题

主题 790|帖子 790|积分 2370

最后的项目:多线程 Web 服务器

构建多线程 Web 服务器


  • 在 socket 上监听 TCP 连接
  • 解析少量的 HTTP 请求
  • 创建一个合适的 HTTP 响应
  • 使用线程池改进服务器的吞吐量
  • 优雅地停机和清理
  • 注意:本项目采用的实现方式并不是最佳实践
创建项目
  1. ~/rust
  2. ➜ cargo new hello
  3.      Created binary (application) `hello` package
  4. ~/rust
复制代码
main.rs 文件
  1. use std::net::TcpListener;
  2. fn main() {
  3.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  4.     for stream in listener.incoming() {
  5.         let stream = stream.unwrap();
  6.         println!("Connection established!");
  7.     }
  8. }
复制代码
修改一:
  1. use std::io::prelude::*;
  2. use std::net::TcpListener;
  3. use std::net::TcpStream;
  4. fn main() {
  5.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  6.     for stream in listener.incoming() {
  7.         let stream = stream.unwrap();
  8.         handle_connection(stream);
  9.     }
  10. }
  11. fn handle_connection(mut stream: TcpStream) {
  12.     let mut buffer = [0; 512];
  13.     stream.read(&mut buffer).unwrap();
  14.     println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
  15. }
复制代码
修改二:
  1. use std::io::prelude::*;
  2. use std::net::TcpListener;
  3. use std::net::TcpStream;
  4. fn main() {
  5.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  6.     for stream in listener.incoming() {
  7.         let stream = stream.unwrap();
  8.         handle_connection(stream);
  9.     }
  10. }
  11. fn handle_connection(mut stream: TcpStream) {
  12.     let mut buffer = [0; 512];
  13.     stream.read(&mut buffer).unwrap();
  14.     // 请求
  15.     // Method Request-URI HTTP-Version CRLF
  16.     // headers CRLF
  17.     // message-body
  18.     // 响应
  19.     // HTTP-Version Status-Code Reason-Phrase CRLF
  20.     // headers CRLF
  21.     // message-body
  22.     let response = "HTTP/1.1 200 OK\r\n\r\n";
  23.     stream.write(response.as_bytes()).unwrap();
  24.     stream.flush().unwrap();
  25.     println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
  26. }
复制代码
修改三:
  1. use std::fs;
  2. use std::io::prelude::*;
  3. use std::net::TcpListener;
  4. use std::net::TcpStream;
  5. fn main() {
  6.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  7.     for stream in listener.incoming() {
  8.         let stream = stream.unwrap();
  9.         handle_connection(stream);
  10.     }
  11. }
  12. fn handle_connection(mut stream: TcpStream) {
  13.     let mut buffer = [0; 512];
  14.     stream.read(&mut buffer).unwrap();
  15.     // 请求
  16.     // Method Request-URI HTTP-Version CRLF
  17.     // headers CRLF
  18.     // message-body
  19.     // 响应
  20.     // HTTP-Version Status-Code Reason-Phrase CRLF
  21.     // headers CRLF
  22.     // message-body
  23.     let contents = fs::read_to_string("hello.html").unwrap();
  24.     let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);
  25.     stream.write(response.as_bytes()).unwrap();
  26.     stream.flush().unwrap();
  27. }
复制代码
修改四:
  1. use std::fs;
  2. use std::io::prelude::*;
  3. use std::net::TcpListener;
  4. use std::net::TcpStream;
  5. fn main() {
  6.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  7.     for stream in listener.incoming() {
  8.         let stream = stream.unwrap();
  9.         handle_connection(stream);
  10.     }
  11. }
  12. fn handle_connection(mut stream: TcpStream) {
  13.     let mut buffer = [0; 512];
  14.     stream.read(&mut buffer).unwrap();
  15.     let get = b"GET / HTTP/1.1\r\n";
  16.     if buffer.starts_with(get) {
  17.         let contents = fs::read_to_string("hello.html").unwrap();
  18.         let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);
  19.         stream.write(response.as_bytes()).unwrap();
  20.         stream.flush().unwrap();
  21.     } else {
  22.         let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
  23.         let contents = fs::read_to_string("404.html").unwrap();
  24.         let response = format!("{}{}", status_line, contents);
  25.         stream.write(response.as_bytes()).unwrap();
  26.         stream.flush().unwrap();
  27.     }
  28. }
复制代码
修改五:
  1. use std::fs;
  2. use std::io::prelude::*;
  3. use std::net::TcpListener;
  4. use std::net::TcpStream;
  5. fn main() {
  6.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  7.     for stream in listener.incoming() {
  8.         let stream = stream.unwrap();
  9.         handle_connection(stream);
  10.     }
  11. }
  12. fn handle_connection(mut stream: TcpStream) {
  13.     let mut buffer = [0; 512];
  14.     stream.read(&mut buffer).unwrap();
  15.     let get = b"GET / HTTP/1.1\r\n";
  16.     let (status_line, filename) = if buffer.starts_with(get) {
  17.         ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  18.     } else {
  19.         ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  20.     };
  21.     let contents = fs::read_to_string(filename).unwrap();
  22.     let response = format!("{}{}", status_line, contents);
  23.     stream.write(response.as_bytes()).unwrap();
  24.     stream.flush().unwrap();
  25. }
复制代码
hello.html 文件
  1. <!DOCTYPE html>
  2. <html lang="en">
  3.     <head>
  4.         <meta charset="utf-8">
  5.         <title>Hello</title>
  6.     </head>
  7.     <body>
  8.         <h1>Hello</h1>
  9.         <p>Hi from Rust</p>
  10.     </body>
  11. </html>
复制代码
404.html 文件
  1. <!DOCTYPE html>
  2. <html lang="en">
  3.     <head>
  4.         <meta charset="utf-8">
  5.         <title>Hello!</title>
  6.     </head>
  7.     <body>
  8.         <h1>Oops!</h1>
  9.         <p>Sorry, I don't know what you're asking for.</p>
  10.     </body>
  11. </html>
复制代码
单线程 Web 服务器
  1. use std::fs;
  2. use std::thread;
  3. use std::io::prelude::*;
  4. use std::net::TcpListener;
  5. use std::net::TcpStream;
  6. use std::time::Duration;
  7. fn main() {
  8.     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  9.     for stream in listener.incoming() {
  10.         let stream = stream.unwrap();
  11.         handle_connection(stream);
  12.     }
  13. }
  14. fn handle_connection(mut stream: TcpStream) {
  15.     let mut buffer = [0; 512];
  16.     stream.read(&mut buffer).unwrap();
  17.     let get = b"GET / HTTP/1.1\r\n";
  18.     let sleep = b"GET /sleep HTTP/1.1\r\n";
  19.     let (status_line, filename) = if buffer.starts_with(get) {
  20.         ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  21.     } else if buffer.starts_with(sleep) {
  22.         thread::sleep(Duration::from_secs(5));
  23.         ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  24.     } else {
  25.         ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  26.     };
  27.     let contents = fs::read_to_string(filename).unwrap();
  28.     let response = format!("{}{}", status_line, contents);
  29.     stream.write(response.as_bytes()).unwrap();
  30.     stream.flush().unwrap();
  31. }
复制代码
开启线程
  1. use std::fs;
  2. use std::thread;
  3. use std::io::prelude::*;
  4. use std::net::TcpListener;
  5. use std::net::TcpStream;
  6. use std::time::Duration;
  7. fn main() {
  8.     let listener = TcpListener::bind("localhost:7878").unwrap();
  9.     for stream in listener.incoming() {
  10.         let stream = stream.unwrap();
  11.         thread::spawn(|| {
  12.             handle_connection(stream);
  13.         });
  14.     }
  15. }
  16. fn handle_connection(mut stream: TcpStream) {
  17.     let mut buffer = [0; 512];
  18.     stream.read(&mut buffer).unwrap();
  19.     let get = b"GET / HTTP/1.1\r\n";
  20.     let sleep = b"GET /sleep HTTP/1.1\r\n";
  21.     let (status_line, filename) = if buffer.starts_with(get) {
  22.         ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  23.     } else if buffer.starts_with(sleep) {
  24.         thread::sleep(Duration::from_secs(5));
  25.         ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  26.     } else {
  27.         ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  28.     };
  29.     let contents = fs::read_to_string(filename).unwrap();
  30.     let response = format!("{}{}", status_line, contents);
  31.     stream.write(response.as_bytes()).unwrap();
  32.     stream.flush().unwrap();
  33. }
复制代码
lib.rs 文件
  1. use std::thread;
  2. pub struct ThreadPool {
  3.     threads: Vec<thread::JoinHandle<()>>,
  4. }
  5. impl ThreadPool {
  6.     /// Creates a new ThreadPool.
  7.     ///
  8.     /// The size is the number of threads in the pool.
  9.     ///
  10.     /// # Panics
  11.     ///
  12.     /// The `new` function will panic if the size is zero.
  13.     pub fn new(size: usize) -> ThreadPool {
  14.         assert!(size > 0);
  15.         let mut threads = Vec::with_capacity(size);
  16.         for _ in 0..size {
  17.             // create some threads and store them in the vector
  18.         }
  19.         ThreadPool { threads }
  20.     }
  21.     pub fn execute<F>(&self, f: F)
  22.     where
  23.         F: FnOnce() + Send + 'static,
  24.     {
  25.     }
  26. }
复制代码
lib.rs 修改一
  1. use std::thread;
  2. pub struct ThreadPool {
  3.     workers: Vec<Worker>,
  4. }
  5. impl ThreadPool {
  6.     /// Creates a new ThreadPool.
  7.     ///
  8.     /// The size is the number of threads in the pool.
  9.     ///
  10.     /// # Panics
  11.     ///
  12.     /// The `new` function will panic if the size is zero.
  13.     pub fn new(size: usize) -> ThreadPool {
  14.         assert!(size > 0);
  15.         let mut workers = Vec::with_capacity(size);
  16.         for id in 0..size {
  17.             workers.push(Worker::new(id));
  18.         }
  19.         ThreadPool { workers }
  20.     }
  21.     pub fn execute<F>(&self, f: F)
  22.     where
  23.         F: FnOnce() + Send + 'static,
  24.     {
  25.     }
  26. }
  27. struct Worker {
  28.     id: usize,
  29.     thread: thread::JoinHandle<()>,
  30. }
  31. impl Worker {
  32.     fn new(id: usize) -> Worker {
  33.         let thread = thread::spawn(|| {});
  34.         Worker { id, thread }
  35.     }
  36. }
复制代码
lib.rs 修改二
  1. use std::thread;
  2. use std::sync::mpsc;
  3. pub struct ThreadPool {
  4.     workers: Vec<Worker>,
  5.     sender: mpsc::Sender<Job>,
  6. }
  7. struct Job;
  8. impl ThreadPool {
  9.     /// Creates a new ThreadPool.
  10.     ///
  11.     /// The size is the number of threads in the pool.
  12.     ///
  13.     /// # Panics
  14.     ///
  15.     /// The `new` function will panic if the size is zero.
  16.     pub fn new(size: usize) -> ThreadPool {
  17.         assert!(size > 0);
  18.         let (sender, receiver) = mpsc::channel();
  19.         let mut workers = Vec::with_capacity(size);
  20.         for id in 0..size {
  21.             workers.push(Worker::new(id, receiver));
  22.         }
  23.         ThreadPool { workers, sender }
  24.     }
  25.     pub fn execute<F>(&self, f: F)
  26.     where
  27.         F: FnOnce() + Send + 'static,
  28.     {
  29.     }
  30. }
  31. struct Worker {
  32.     id: usize,
  33.     thread: thread::JoinHandle<()>,
  34. }
  35. impl Worker {
  36.     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
  37.         let thread = thread::spawn(|| {
  38.             receiver;
  39.         });
  40.         Worker { id, thread }
  41.     }
  42. }
复制代码
lib.rs 修改三
  1. use std::thread;
  2. use std::sync::mpsc;
  3. use std::sync::Arc;
  4. use std::sync::Mutex;
  5. pub struct ThreadPool {
  6.     workers: Vec<Worker>,
  7.     sender: mpsc::Sender<Job>,
  8. }
  9. struct Job;
  10. impl ThreadPool {
  11.     /// Creates a new ThreadPool.
  12.     ///
  13.     /// The size is the number of threads in the pool.
  14.     ///
  15.     /// # Panics
  16.     ///
  17.     /// The `new` function will panic if the size is zero.
  18.     pub fn new(size: usize) -> ThreadPool {
  19.         assert!(size > 0);
  20.         let (sender, receiver) = mpsc::channel();
  21.         let receiver = Arc::new(Mutex::new(receiver));
  22.         let mut workers = Vec::with_capacity(size);
  23.         for id in 0..size {
  24.             workers.push(Worker::new(id, Arc::clone(&receiver)));
  25.         }
  26.         ThreadPool { workers, sender }
  27.     }
  28.     pub fn execute<F>(&self, f: F)
  29.     where
  30.         F: FnOnce() + Send + 'static,
  31.     {
  32.     }
  33. }
  34. struct Worker {
  35.     id: usize,
  36.     thread: thread::JoinHandle<()>,
  37. }
  38. impl Worker {
  39.     fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
  40.         let thread = thread::spawn(|| {
  41.             receiver;
  42.         });
  43.         Worker { id, thread }
  44.     }
  45. }
复制代码
lib.rs 修改四
  1. use std::thread;
  2. use std::sync::mpsc;
  3. use std::sync::Arc;
  4. use std::sync::Mutex;
  5. pub struct ThreadPool {
  6.     workers: Vec<Worker>,
  7.     sender: mpsc::Sender<Job>,
  8. }
  9. // struct Job;
  10. type Job = Box<FnOnce() + Send + 'static>;
  11. impl ThreadPool {
  12.     /// Creates a new ThreadPool.
  13.     ///
  14.     /// The size is the number of threads in the pool.
  15.     ///
  16.     /// # Panics
  17.     ///
  18.     /// The `new` function will panic if the size is zero.
  19.     pub fn new(size: usize) -> ThreadPool {
  20.         assert!(size > 0);
  21.         let (sender, receiver) = mpsc::channel();
  22.         let receiver = Arc::new(Mutex::new(receiver));
  23.         let mut workers = Vec::with_capacity(size);
  24.         for id in 0..size {
  25.             workers.push(Worker::new(id, Arc::clone(&receiver)));
  26.         }
  27.         ThreadPool { workers, sender }
  28.     }
  29.     pub fn execute<F>(&self, f: F)
  30.     where
  31.         F: FnOnce() + Send + 'static,
  32.     {
  33.         let job = Box::new(f);
  34.         self.sender.send(job).unwrap();
  35.     }
  36. }
  37. struct Worker {
  38.     id: usize,
  39.     thread: thread::JoinHandle<()>,
  40. }
  41. impl Worker {
  42.     fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
  43.         let thread = thread::spawn(move || loop {
  44.             let job = receiver.lock().unwrap().recv().unwrap();
  45.             println!("Worker {} got a job; executing.", id);
  46.             (*job)();
  47.         });
  48.         Worker { id, thread }
  49.     }
  50. }
复制代码
lib.rs 修改五
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
// struct Job;
// type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Job = Box<dyn FnBox + Send + 'static>;
// (此处内容缺失:impl Worker 的实现,以及下一版本开头的 use 声明与 ThreadPool 结构体定义)

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            worker.thread.join().unwrap()
        }
    }
}
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Job = Box<dyn FnBox + Send + 'static>;
// (此处内容缺失:impl Worker 的实现,以及下一版本开头的 use 声明与 ThreadPool 结构体定义)

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Job = Box<dyn FnBox + Send + 'static>;
// (此处内容缺失:impl Worker 的实现,以及下一版本开头的 use 声明与 ThreadPool 结构体定义)

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}
impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");
        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Job = Box<dyn FnBox + Send + 'static>;
// (正文在此处截断:enum Message 的定义与 impl Worker 的实现未包含在内)
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

科技颠覆者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表