科技颠覆者 发表于 2023-4-25 23:33:25

Rust编程语言入门之最后的项目:多线程 Web 服务器

最后的项目:多线程 Web 服务器

构建多线程 Web 服务器


[*]在 socket 上监听 TCP 连接
[*]解析少量的 HTTP 请求
[*]创建一个合适的 HTTP 响应
[*]使用线程池改进服务器的吞吐量
[*]优雅的停机和清理
[*]注意:并不是最佳实践
创建项目
~/rust
➜ cargo new hello
   Created binary (application) `hello` package

~/rust
➜main.rs 文件
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      println!("Connection established!");
    }
}修改一:
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}修改二:
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    // 请求
    // Method Request-URI HTTP-Version CRLF
    // headers CRLF
    // message-body

    // 响应
    // HTTP-Version Status-Code Reason-Phrase CRLF
    // headers CRLF
    // message-body

    let response = "HTTP/1.1 200 OK\r\n\r\n";

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}修改三:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    // 请求
    // Method Request-URI HTTP-Version CRLF
    // headers CRLF
    // message-body

    // 响应
    // HTTP-Version Status-Code Reason-Phrase CRLF
    // headers CRLF
    // message-body

    let contents = fs::read_to_string("hello.html").unwrap();
    let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}修改四:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    if buffer.starts_with(get) {
      let contents = fs::read_to_string("hello.html").unwrap();
      let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

      stream.write(response.as_bytes()).unwrap();
      stream.flush().unwrap();
    } else {
      let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
      let contents = fs::read_to_string("404.html").unwrap();

      let response = format!("{}{}", status_line, contents);

      stream.write(response.as_bytes()).unwrap();
      stream.flush().unwrap();
    }
}修改五:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
      ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
      ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}hello.html 文件
<!DOCTYPE html>
<html lang="en">
    <head>
      <meta charset="utf-8">
      <title>Hello</title>
    </head>

    <body>
      <h1>Hello</h1>
      <p>Hi from Rust</p>
    </body>
</html>404.html 文件
<!DOCTYPE html>
<html lang="en">
    <head>
      <meta charset="utf-8">
      <title>Hello!</title>
    </head>
    <body>
      <h1>Oops!</h1>
      <p>Sorry, I don't know what you're asking for.</p>
    </body>
</html>单线程Web服务器
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
      ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
      thread::sleep(Duration::from_secs(5));
      ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
      ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}开启线程
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("localhost:7878").unwrap();
    for stream in listener.incoming() {
      let stream = stream.unwrap();

      thread::spawn(|| {
            handle_connection(stream);
      });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;

    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
      ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
      thread::sleep(Duration::from_secs(5));
      ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
      ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}lib.rs 文件
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);
      let mut threads = Vec::with_capacity(size);

      for _ in 0..size {
            // create some threads and store them in the vector
      }
      ThreadPool { threads }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
    }
}lib.rs 修改一
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);
      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id));
      }
      ThreadPool { workers }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
      let thread = thread::spawn(|| {});

      Worker { id, thread }
    }
}lib.rs 修改二
use std::thread;
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;
impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);
      let (sender, receiver) = mpsc::channel();
      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id, receiver));
      }
      ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
      let thread = thread::spawn(|| {
            receiver;
      });

      Worker { id, thread }
    }
}lib.rs 修改三
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;
impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);
      let (sender, receiver) = mpsc::channel();
      let receiver = Arc::new(Mutex::new(receiver));
      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
      }
      ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
      let thread = thread::spawn(|| {
            receiver;
      });

      Worker { id, thread }
    }
}lib.rs 修改四
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// struct Job;
type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {
    /// Creates a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);
      let (sender, receiver) = mpsc::channel();
      let receiver = Arc::new(Mutex::new(receiver));
      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
      }
      ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
      let job = Box::new(f);
      self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
      let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {} got a job; executing.", id);

            (*job)();
      });

      Worker { id, thread }
    }
}lib.rs 修改五
use std::thread;use std::sync::mpsc;use std::sync::Arc;use std::sync::Mutex;pub struct ThreadPool {    workers: Vec,    sender: mpsc::Sender,}// struct Job;// type Job = Box;impl ThreadPool {    /// Creates a new ThreadPool.    ///    /// The size is the number of threads in the pool.    ///    /// # Panics    ///    /// The `new` function will panic if the size is zero.    pub fn new(size: usize) -> ThreadPool {      assert!(size > 0);      let (sender, receiver) = mpsc::channel();      let receiver = Arc::new(Mutex::new(receiver));      let mut workers = Vec::with_capacity(size);      for id in 0..size {            workers.push(Worker::new(id, Arc::clone(&receiver)));      }      ThreadPool { workers, sender }    }    pub fn execute(&self, f: F)    where      F: FnOnce() + Send + 'static,    {      let job = Box::new(f);      self.sender.send(job).unwrap();    }}struct Worker {    id: usize,    thread: thread::JoinHandle,}trait FnBox {    fn call_box(self: Box);}impl FnBox for F {    fn call_box(self: Box) {      (*self)()    }}type Job = Box;impl ThreadPool {    /// Creates a new ThreadPool.    ///    /// The size is the number of threads in the pool.    ///    /// # Panics    ///    /// The `new` function will panic if the size is zero.    pub fn new(size: usize) -> ThreadPool {      assert!(size > 0);      let (sender, receiver) = mpsc::channel();      let receiver = Arc::new(Mutex::new(receiver));      let mut workers = Vec::with_capacity(size);      for id in 0..size {            workers.push(Worker::new(id, Arc::clone(&receiver)));      }      ThreadPool { workers, sender }    }    pub fn execute(&self, f: F)    where      F: FnOnce() + Send + 'static,    {      let job = Box::new(f);      self.sender.send(job).unwrap();    }}impl Drop for ThreadPool {    fn drop(&mut self) {      for worker in &mut self.workers {            println!("Shutting down worker {}", worker.id);            worker.thread.join().unwrap()      }    }}struct Worker {    id: usize,    thread: thread::JoinHandle,}trait FnBox {    fn call_box(self: Box);}impl FnBox for F {    fn call_box(self: Box) {      (*self)()    }}type Job = Box;impl ThreadPool {    /// Creates a new ThreadPool.    ///    /// The size is the number of threads in the pool.    ///    /// # Panics    ///    /// The `new` function will panic if the size is zero.    pub fn new(size: usize) -> ThreadPool {      assert!(size > 0);      let (sender, receiver) = mpsc::channel();      let receiver = Arc::new(Mutex::new(receiver));      let mut workers = Vec::with_capacity(size);      for id in 0..size {            workers.push(Worker::new(id, Arc::clone(&receiver)));      }      ThreadPool { workers, sender }    }    pub fn execute(&self, f: F)    where      F: FnOnce() + Send + 'static,    {      let job = Box::new(f);      self.sender.send(job).unwrap();    }}impl Drop for ThreadPool {    fn drop(&mut self) {      for worker in &mut self.workers {            println!("Shutting down worker {}", worker.id);            if let Some(thread) = worker.thread.take() {                thread.join().unwrap();            }      }    }}struct Worker {    id: usize,    thread: Option,}trait FnBox {    fn call_box(self: Box);}impl FnBox for F {    fn call_box(self: Box) {      (*self)()    }}type Job = Box;impl ThreadPool {    /// Creates a new ThreadPool.    ///    /// The size is the number of threads in the pool.    ///    /// # Panics    ///    /// The `new` function will panic if the size is zero.    pub fn new(size: usize) -> ThreadPool {      assert!(size > 0);      let (sender, receiver) = mpsc::channel();      let receiver = Arc::new(Mutex::new(receiver));      let mut workers = Vec::with_capacity(size);      for id in 0..size {            workers.push(Worker::new(id, Arc::clone(&receiver)));      }      ThreadPool { workers, sender }    }    pub fn execute(&self, f: F)    where      F: FnOnce() + Send + 'static,    {      let job = Box::new(f);      self.sender.send(Message::NewJob(job)).unwrap();    }}impl Drop for ThreadPool {    fn drop(&mut self) {      println!("Sending terminate message to all workers.");      for _ in &mut self.workers {            self.sender.send(Message::Terminate).unwrap();      }      println!("Shutting down all workers.");                for worker in &mut self.workers {            println!("Shutting down worker {}", worker.id);            if let Some(thread) = worker.thread.take() {                thread.join().unwrap();            }      }    }}struct Worker {    id: usize,    thread: Option,}trait FnBox {    fn call_box(self: Box);}impl FnBox for F {    fn call_box(self: Box) {      (*self)()    }}type Job = Box
页: [1]
查看完整版本: Rust编程语言入门之最后的项目:多线程 Web 服务器