最后的项目:多线程 Web 服务器
构建多线程 Web 服务器
- 在 socket 上监听 TCP 连接
- 解析少量的 HTTP 请求
- 创建一个合适的 HTTP 响应
- 使用线程池改进服务器的吞吐量
- 优雅地停机和清理
- 注意:本文的实现方式并不是最佳实践,仅用于学习
创建项目
~/rust
➜ cargo new hello
Created binary (application) `hello` package
~/rust
➜
main.rs 文件:监听 TCP 连接
use std::net::TcpListener;
/// Binds a TCP listener to 127.0.0.1:7878 and reports every accepted connection.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    listener.incoming().for_each(|connection| {
        // The stream is only held until the end of this closure call, then dropped.
        let _stream = connection.unwrap();
        println!("Connection established!");
    });
}
修改一:读取请求并打印
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
/// Accepts connections on 127.0.0.1:7878 and handles them one after another.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Reads the first chunk of the request (at most 512 bytes) and prints it.
///
/// # Panics
///
/// Panics if reading from the stream fails.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    // `read` reports how many bytes it filled in; the rest of the buffer is still zeroed.
    let bytes_read = stream.read(&mut buffer).unwrap();

    // Print only what was received instead of the whole buffer with its zero padding.
    println!("Request: {}", String::from_utf8_lossy(&buffer[..bytes_read]));
}
修改二:返回一个空的 200 响应
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
/// Accepts connections on 127.0.0.1:7878 and handles them one after another.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Reads the start of the request, answers with an empty `200 OK`, then logs the request.
///
/// Message layouts, for reference:
///
/// ```text
/// Request:
///   Method Request-URI HTTP-Version CRLF
///   headers CRLF
///   message-body
///
/// Response:
///   HTTP-Version Status-Code Reason-Phrase CRLF
///   headers CRLF
///   message-body
/// ```
///
/// # Panics
///
/// Panics if reading from or writing to the stream fails.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    let bytes_read = stream.read(&mut buffer).unwrap();

    let response = "HTTP/1.1 200 OK\r\n\r\n";
    // `write` may send only part of the buffer; `write_all` continues until all of it is sent.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Log only the bytes that were received, not the zero padding behind them.
    println!("Request: {}", String::from_utf8_lossy(&buffer[..bytes_read]));
}
修改三:返回 hello.html 页面
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
/// Accepts connections on 127.0.0.1:7878 and handles them one after another.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Answers every request with `200 OK` followed by the contents of `hello.html`.
///
/// Message layouts, for reference:
///
/// ```text
/// Request:
///   Method Request-URI HTTP-Version CRLF
///   headers CRLF
///   message-body
///
/// Response:
///   HTTP-Version Status-Code Reason-Phrase CRLF
///   headers CRLF
///   message-body
/// ```
///
/// # Panics
///
/// Panics if the stream cannot be read or written, or if `hello.html` cannot be read.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    // The request is read off the socket but not inspected: every request gets the same page.
    stream.read(&mut buffer).unwrap();

    let contents = fs::read_to_string("hello.html").unwrap();
    let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

    // `write` may send only part of the response; `write_all` continues until all of it is sent.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
修改四:验证请求,对其他请求返回 404
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
/// Accepts connections on 127.0.0.1:7878 and handles them one after another.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Serves `hello.html` for `GET / HTTP/1.1` and `404.html` with a 404 status for anything else.
///
/// # Panics
///
/// Panics if the stream cannot be read or written, or if the page file cannot be read.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    // Only the exact request line for the site root counts as a hit.
    let get = b"GET / HTTP/1.1\r\n";

    if buffer.starts_with(get) {
        let contents = fs::read_to_string("hello.html").unwrap();
        let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

        // `write` may send only part of the response; `write_all` sends all of it.
        stream.write_all(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    } else {
        let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
        let contents = fs::read_to_string("404.html").unwrap();
        let response = format!("{}{}", status_line, contents);

        stream.write_all(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }
}
修改五:重构,消除重复代码
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
/// Accepts connections on 127.0.0.1:7878 and handles them one after another.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Serves `hello.html` for `GET / HTTP/1.1` and `404.html` with a 404 status for anything else.
///
/// # Panics
///
/// Panics if the stream cannot be read or written, or if the page file cannot be read.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    // Pick the status line and the page together so the response is built in one place.
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` may send only part of the response; `write_all` sends all of it.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
hello.html 文件
<!DOCTYPE html>
<!-- Page the server returns together with the "200 OK" status line. -->
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello</title>
  </head>
  <body>
    <h1>Hello</h1>
    <p>Hi from Rust</p>
  </body>
</html>
404.html 文件
<!DOCTYPE html>
<!-- Page the server returns together with the "404 NOT FOUND" status line. -->
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>
单线程 Web 服务器:模拟慢请求
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;
/// Accepts connections on 127.0.0.1:7878 and handles them one after another.
///
/// Because requests are handled sequentially, a request to `/sleep` delays every request
/// that arrives behind it.
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

/// Serves `hello.html` for `/` and, after a 5 second pause, for `/sleep`; otherwise `404.html`.
///
/// # Panics
///
/// Panics if the stream cannot be read or written, or if the page file cannot be read.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        // Simulates a slow request.
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` may send only part of the response; `write_all` sends all of it.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
开启线程:为每个连接创建一个线程
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;
/// Accepts connections on localhost:7878 and handles each one on its own thread.
fn main() {
    let listener = TcpListener::bind("localhost:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        // A new thread is spawned per connection; nothing here limits how many are created.
        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

/// Serves `hello.html` for `/` and, after a 5 second pause, for `/sleep`; otherwise `404.html`.
///
/// # Panics
///
/// Panics if the stream cannot be read or written, or if the page file cannot be read.
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        // Simulates a slow request.
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` may send only part of the response; `write_all` sends all of it.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
lib.rs 文件:ThreadPool 骨架
use std::thread;
/// A fixed-size pool of threads. Skeleton only: no threads are spawned yet.
pub struct ThreadPool {
    /// Join handles of the pool's threads; left empty by this skeleton.
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // Room for `size` handles is reserved; creating and storing the threads is still to do.
        let threads = Vec::with_capacity(size);

        ThreadPool { threads }
    }

    /// Accepts a job for the pool.
    ///
    /// Placeholder: the closure is dropped without being run.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Nothing can run the job yet, so discard it explicitly rather than leave it unused.
        drop(f);
    }
}
lib.rs 修改一:引入 Worker
use std::thread;
/// A fixed-size pool of worker threads. Jobs are accepted but not executed yet.
pub struct ThreadPool {
    /// One worker per pool thread, in creation order.
    workers: Vec<Worker>,
}

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // Worker ids are simply 0..size.
        let workers = (0..size).map(Worker::new).collect();

        ThreadPool { workers }
    }

    /// Accepts a job for the pool.
    ///
    /// Placeholder: the closure is dropped without being run.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // No channel to the workers exists yet, so discard the job explicitly.
        drop(f);
    }
}

/// A pool thread together with the id it was created with.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that does nothing and wraps its handle with `id`.
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
lib.rs 修改二:引入通道(channel)
use std::thread;
use std::sync::mpsc;
/// A fixed-size pool of worker threads with a channel for handing them jobs.
pub struct ThreadPool {
    /// One worker per pool thread, in creation order.
    workers: Vec<Worker>,
    /// Sending half of the job channel.
    sender: mpsc::Sender<Job>,
}

/// Placeholder for the unit of work sent over the channel.
struct Job;

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        // A `Receiver` has exactly one owner and cannot be cloned, so passing `receiver` itself
        // to every `Worker::new` call is a use-after-move compile error. Sharing it behind
        // `Arc<Mutex<_>>` gives every worker a handle to the same receiving end.
        let receiver = std::sync::Arc::new(std::sync::Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, std::sync::Arc::clone(&receiver)))
            .collect();

        ThreadPool { workers, sender }
    }

    /// Accepts a job for the pool.
    ///
    /// Placeholder: the closure is dropped without being run.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Nothing is sent to the workers yet, so discard the job explicitly.
        drop(f);
    }
}

/// A pool thread together with the id it was created with.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that takes ownership of its `receiver` handle and then finishes.
    fn new(
        id: usize,
        receiver: std::sync::Arc<std::sync::Mutex<mpsc::Receiver<Job>>>,
    ) -> Worker {
        let thread = thread::spawn(move || {
            // The handle is not used for receiving yet; it is only moved into the thread.
            drop(receiver);
        });

        Worker { id, thread }
    }
}
lib.rs 修改三:用 Arc 和 Mutex 在多个 Worker 之间共享接收端
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
/// A fixed-size pool of worker threads with a channel for handing them jobs.
pub struct ThreadPool {
    /// One worker per pool thread, in creation order.
    workers: Vec<Worker>,
    /// Sending half of the job channel.
    sender: mpsc::Sender<Job>,
}

/// Placeholder for the unit of work sent over the channel.
struct Job;

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        // The single receiving end is shared: `Arc` for shared ownership, `Mutex` for
        // exclusive access while receiving.
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        ThreadPool { workers, sender }
    }

    /// Accepts a job for the pool.
    ///
    /// Placeholder: the closure is dropped without being run.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Nothing is sent to the workers yet, so discard the job explicitly.
        drop(f);
    }
}

/// A pool thread together with the id it was created with.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that takes ownership of its `receiver` handle and then finishes.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            // The handle is not used for receiving yet; it is only moved into the thread.
            drop(receiver);
        });

        Worker { id, thread }
    }
}
lib.rs 修改四:实现 execute,Worker 循环接收并执行任务
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
/// A fixed-size pool of worker threads that run submitted closures.
pub struct ThreadPool {
    /// One worker per pool thread, in creation order.
    workers: Vec<Worker>,
    /// Sending half of the job channel; the workers share the receiving half.
    sender: mpsc::Sender<Job>,
}

/// A boxed closure that one worker runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        // The single receiving end is shared: `Arc` for shared ownership, `Mutex` for
        // exclusive access while receiving.
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        ThreadPool { workers, sender }
    }

    /// Queues `f` to be run by one of the pool's workers.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent because the receiving end of the channel is gone.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

/// A pool thread together with the id it was created with.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that keeps taking jobs from `receiver` and running them.
    ///
    /// The thread ends once the channel is closed, i.e. when every sender has been dropped.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The mutex guard is a temporary of this statement, so the lock is released
            // before the job runs and other workers can receive in the meantime.
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);

                    // `Box<dyn FnOnce()>` is callable directly; `(*job)()` would have to move
                    // an unsized value out of the box, which does not compile.
                    job();
                }
                // No sender is left, so no further job can arrive.
                Err(_) => break,
            }
        });

        Worker { id, thread }
    }
}
lib.rs 修改五:优雅停机(FnBox、Drop 与 Terminate 消息)
[code]use std::thread;use std::sync::mpsc;use std::sync::Arc;use std::sync::Mutex;pub struct ThreadPool { workers: Vec, sender: mpsc::Sender,}// struct Job;// type Job = Box;impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); }}struct Worker { id: usize, thread: thread::JoinHandle,}trait FnBox { fn call_box(self: Box);}impl FnBox for F { fn call_box(self: Box) { (*self)() }}type Job = Box;impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); }}impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap() } }}struct Worker { id: usize, thread: thread::JoinHandle,}trait FnBox { fn call_box(self: Box);}impl FnBox for F { fn call_box(self: Box) { (*self)() }}type Job = Box;impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. 
/// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); }}impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } }}struct Worker { id: usize, thread: Option,}trait FnBox { fn call_box(self: Box);}impl FnBox for F { fn call_box(self: Box) { (*self)() }}type Job = Box;impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. 
pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); }}impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all workers."); for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!("Shutting down all workers."); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } }}struct Worker { id: usize, thread: Option,}trait FnBox { fn call_box(self: Box);}impl FnBox for F { fn call_box(self: Box) { (*self)() }}type Job = Box |