农民 发表于 2024-11-17 20:19:06

rust构建web服务器

单线程服务器

src/main.rs
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // let mut buffer = ;

    // stream.read(&mut buffer).unwrap();

    // // String::from_utf8_lossy 将数组切片转成字符串
    // println!("Request: {}", String::from_utf8_lossy(&buffer[..]));

    let mut buffer = ;
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n"; //获取字符串的字节表示

    if buffer.starts_with(get) {
      let contents = fs::read_to_string("hello.html").unwrap();

      let response = format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
            contents.len(),
            contents
      );

      stream.write(response.as_bytes()).unwrap();
      stream.flush().unwrap();
    } else {
      // 其他请求
      let status_line = "HTTP/1.1 404 NOT FOUND";
      let contents = fs::read_to_string("404.html").unwrap();

      let response = format!(
            "{}\r\nContent-Length: {}\r\n\r\n{}",
            status_line,
            contents.len(),
            contents
      );

      stream.write(response.as_bytes()).unwrap();
      stream.flush().unwrap();
    }
}

// HTTP一个请求有如下格式:CRLF 序列也可以写成 \r\n
// Method Request-URI HTTP-Version CRLF
// headers CRLF
// message-body

// 响应:HTTP/1.1 200 OK\r\n\r\n

hello.html
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
</body>

</html>
404.html
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
</body>

</html>
重构
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
      let stream = stream.unwrap();

      handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;
    stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
      ("HTTP/1.1 200 OK", "hello.html")
    } else {
      ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
      "{}HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
      status_line,
      contents.len(),
      contents
    );

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

// HTTP一个请求有如下格式:CRLF 序列也可以写成 \r\n
// Method Request-URI HTTP-Version CRLF
// headers CRLF
// message-body

// 响应:HTTP/1.1 200 OK\r\n\r\n

多线程服务器

https://i-blog.csdnimg.cn/direct/554f3b96235f41599db2682dcf254039.png
lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
// 持有发送到channal的闭包
// Job是某个特征对象的类型别名
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// 创建线程池。
    ///
    /// 线程池中线程的数量。
    ///
    /// # Panics
    ///
    /// `new` 函数在 size 为 0 时会 panic。
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);

      let (sender, receiver) = mpsc::channel();
      //Arc 多线程安全的智能指针
      let receiver = Arc::new(Mutex::new(receiver));

      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
      }

      ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
      let job = Box::new(f);

      self.sender.send(job).unwrap();
    }
}
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
      let thread = thread::spawn(move || loop {
            // receiver.lock() 获取一个互斥锁,recv()从channel获取一个job
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {} got a job; executing.", id);

            job();
      });

      Worker { id, thread }
    }
}

main.rs
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

use std::thread;
use std::time::Duration;

use server::ThreadPool;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
      let stream = stream.unwrap();

      pool.execute(|| {
            handle_connection(stream);
      });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = ;
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
      ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
      thread::sleep(Duration::from_secs(5));
      ("HTTP/1.1 200 OK", "hello.html")
    } else {
      ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
      "{}HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
      status_line,
      contents.len(),
      contents
    );

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

优雅停机与清算

lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}
// 持有发送到channal的闭包
// Job是某个特征对象的类型别名
type Job = Box<dyn FnOnce() + Send + 'static>;

// 向线程发送信号使其停止接收任务
enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    /// 创建线程池。
    ///
    /// 线程池中线程的数量。
    ///
    /// # Panics
    ///
    /// `new` 函数在 size 为 0 时会 panic。
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);

      let (sender, receiver) = mpsc::channel();
      //Arc 多线程安全的智能指针
      let receiver = Arc::new(Mutex::new(receiver));

      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
      }

      ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
      let job = Box::new(f);

      self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// 为 ThreadPool 实现 Drop Trait
impl Drop for ThreadPool {
    fn drop(&mut self) {
      println!("Sending terminate message to all workers.");

      for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
      }

      for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
      }
    }
}
// 如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值
//从 Some 成员中移动出来
//而对 None 成员不做处理。换句话说,正在运行的 Worker 的 thread 将是 Some 成员值,
//而当需要清理 worker 时,将 Some 替换为 None,这样 worker 就没有可以运行的线程了
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
      let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                  println!("Worker {} got a job; executing.", id);

                  job();
                }
                Message::Terminate => {
                  println!("Worker {} was told to terminate.", id);

                  break;
                }
            }
      });

      Worker {
            id,
            thread: Some(thread),
      }
    }
}

src/bin/main.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}
// 持有发送到channal的闭包
// Job是某个特征对象的类型别名
type Job = Box<dyn FnOnce() + Send + 'static>;

// 向线程发送信号使其停止接收任务
enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    /// 创建线程池。
    ///
    /// 线程池中线程的数量。
    ///
    /// # Panics
    ///
    /// `new` 函数在 size 为 0 时会 panic。
    pub fn new(size: usize) -> ThreadPool {
      assert!(size > 0);

      let (sender, receiver) = mpsc::channel();
      //Arc 多线程安全的智能指针
      let receiver = Arc::new(Mutex::new(receiver));

      let mut workers = Vec::with_capacity(size);

      for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
      }

      ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
      F: FnOnce() + Send + 'static,
    {
      let job = Box::new(f);

      self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// 为 ThreadPool 实现 Drop Trait
impl Drop for ThreadPool {
    fn drop(&mut self) {
      println!("Sending terminate message to all workers.");

      for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
      }

      for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
      }
    }
}
// 如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值
//从 Some 成员中移动出来
//而对 None 成员不做处理。换句话说,正在运行的 Worker 的 thread 将是 Some 成员值,
//而当需要清理 worker 时,将 Some 替换为 None,这样 worker 就没有可以运行的线程了
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
      let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                  println!("Worker {} got a job; executing.", id);

                  job();
                }
                Message::Terminate => {
                  println!("Worker {} was told to terminate.", id);

                  break;
                }
            }
      });

      Worker {
            id,
            thread: Some(thread),
      }
    }
}

参考



[*]第20章~构建单线程服务器
[*]构建单线程 Web 服务器

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: rust构建web服务器