单线程服务器
src/main.rs
- use std::fs;
- use std::io::prelude::*;
- use std::net::TcpListener;
- use std::net::TcpStream;
- fn main() {
- let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
- for stream in listener.incoming() {
- let stream = stream.unwrap();
- handle_connection(stream);
- }
- }
/// Reads one HTTP request from `stream` and writes back a response.
///
/// A request that starts with the line `GET / HTTP/1.1` is answered with
/// `200 OK` and the contents of `hello.html`; any other request is answered
/// with `404 NOT FOUND` and the contents of `404.html`. Both files are read
/// relative to the current working directory.
///
/// I/O failures are reported on stderr and the connection is dropped, so a
/// misbehaving client or a missing file cannot take the server down.
fn handle_connection(mut stream: TcpStream) {
    if let Err(err) = respond(&mut stream) {
        eprintln!("failed to handle connection: {}", err);
    }
}

/// Does the fallible work for `handle_connection` and returns the first I/O error.
fn respond(stream: &mut TcpStream) -> std::io::Result<()> {
    // Only the request line is inspected, so one read of up to 1024 bytes is enough.
    // Debugging tip: `String::from_utf8_lossy(request)` renders the raw bytes as text.
    let mut buffer = [0; 1024];
    let received = stream.read(&mut buffer)?;
    let request = &buffer[..received];

    // Byte-string literal: the request line as raw bytes, comparable with the buffer.
    let get = b"GET / HTTP/1.1\r\n";

    let response = if request.starts_with(get) {
        let contents = fs::read_to_string("hello.html")?;
        format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
            contents.len(),
            contents
        )
    } else {
        // Every other request.
        let status_line = "HTTP/1.1 404 NOT FOUND";
        let contents = fs::read_to_string("404.html")?;
        format!(
            "{}\r\nContent-Length: {}\r\n\r\n{}",
            status_line,
            contents.len(),
            contents
        )
    };

    // `write_all` keeps writing until the whole response is sent; a bare `write`
    // may stop after only part of the buffer.
    stream.write_all(response.as_bytes())?;
    stream.flush()
}
- // HTTP 请求的格式如下(CRLF 序列即 \r\n):
- // Method Request-URI HTTP-Version CRLF
- // headers CRLF
- // message-body
- // 响应的格式与之类似,例如最简单的成功响应:HTTP/1.1 200 OK\r\n\r\n
复制代码 hello.html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>
复制代码 404.html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>
复制代码 重构
- use std::fs;
- use std::io::prelude::*;
- use std::net::TcpListener;
- use std::net::TcpStream;
- fn main() {
- let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
- for stream in listener.incoming() {
- let stream = stream.unwrap();
- handle_connection(stream);
- }
- }
/// Reads one HTTP request from `stream` and writes back a response.
///
/// A request that starts with the line `GET / HTTP/1.1` is answered with
/// `200 OK` and the contents of `hello.html`; any other request is answered
/// with `404 NOT FOUND` and the contents of `404.html`. Both files are read
/// relative to the current working directory.
///
/// I/O failures are reported on stderr and the connection is dropped, so a
/// misbehaving client or a missing file cannot take the server down.
fn handle_connection(mut stream: TcpStream) {
    if let Err(err) = respond(&mut stream) {
        eprintln!("failed to handle connection: {}", err);
    }
}

/// Does the fallible work for `handle_connection` and returns the first I/O error.
fn respond(stream: &mut TcpStream) -> std::io::Result<()> {
    // Only the request line is inspected, so one read of up to 1024 bytes is enough.
    let mut buffer = [0; 1024];
    let received = stream.read(&mut buffer)?;
    let request = &buffer[..received];

    let get = b"GET / HTTP/1.1\r\n";

    // The two outcomes differ only in status line and file, so pick both at once.
    let (status_line, filename) = if request.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename)?;
    // The status line comes solely from `status_line`; the template must not
    // contain a second hard-coded one.
    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    // `write_all` keeps writing until the whole response is sent; a bare `write`
    // may stop after only part of the buffer.
    stream.write_all(response.as_bytes())?;
    stream.flush()
}
- // HTTP 请求的格式如下(CRLF 序列即 \r\n):
- // Method Request-URI HTTP-Version CRLF
- // headers CRLF
- // message-body
- // 响应的格式与之类似,例如最简单的成功响应:HTTP/1.1 200 OK\r\n\r\n
复制代码 多线程服务器

lib.rs
- use std::{
- sync::{mpsc, Arc, Mutex},
- thread,
- };
- pub struct ThreadPool {
- workers: Vec<Worker>,
- sender: mpsc::Sender<Job>,
- }
- // 持有发送到channal的闭包
- // Job是某个特征对象的类型别名
- type Job = Box<dyn FnOnce() + Send + 'static>;
- impl ThreadPool {
- /// 创建线程池。
- ///
- /// 线程池中线程的数量。
- ///
- /// # Panics
- ///
- /// `new` 函数在 size 为 0 时会 panic。
- pub fn new(size: usize) -> ThreadPool {
- assert!(size > 0);
- let (sender, receiver) = mpsc::channel();
- //Arc 多线程安全的智能指针
- let receiver = Arc::new(Mutex::new(receiver));
- let mut workers = Vec::with_capacity(size);
- for id in 0..size {
- workers.push(Worker::new(id, Arc::clone(&receiver)));
- }
- ThreadPool { workers, sender }
- }
- pub fn execute<F>(&self, f: F)
- where
- F: FnOnce() + Send + 'static,
- {
- let job = Box::new(f);
- self.sender.send(job).unwrap();
- }
- }
- struct Worker {
- id: usize,
- thread: thread::JoinHandle<()>,
- }
- impl Worker {
- fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
- let thread = thread::spawn(move || loop {
- // receiver.lock() 获取一个互斥锁,recv()从channel获取一个job
- let job = receiver.lock().unwrap().recv().unwrap();
- println!("Worker {} got a job; executing.", id);
- job();
- });
- Worker { id, thread }
- }
- }
复制代码 main.rs
- use std::fs;
- use std::io::prelude::*;
- use std::net::TcpListener;
- use std::net::TcpStream;
- use std::thread;
- use std::time::Duration;
- use server::ThreadPool;
- fn main() {
- let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
- let pool = ThreadPool::new(4);
- for stream in listener.incoming() {
- let stream = stream.unwrap();
- pool.execute(|| {
- handle_connection(stream);
- });
- }
- }
/// Reads one HTTP request from `stream` and writes back a response.
///
/// * `GET / HTTP/1.1` is answered with `200 OK` and `hello.html`.
/// * `GET /sleep HTTP/1.1` sleeps for five seconds first, then answers like `/`
///   (a stand-in for a slow request).
/// * Anything else is answered with `404 NOT FOUND` and `404.html`.
///
/// Both files are read relative to the current working directory. I/O
/// failures are reported on stderr and the connection is dropped instead of
/// panicking the thread that runs this function.
fn handle_connection(mut stream: TcpStream) {
    if let Err(err) = respond(&mut stream) {
        eprintln!("failed to handle connection: {}", err);
    }
}

/// Does the fallible work for `handle_connection` and returns the first I/O error.
fn respond(stream: &mut TcpStream) -> std::io::Result<()> {
    // Only the request line is inspected, so one read of up to 1024 bytes is enough.
    let mut buffer = [0; 1024];
    let received = stream.read(&mut buffer)?;
    let request = &buffer[..received];

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if request.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if request.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename)?;
    // The status line comes solely from `status_line`; the template must not
    // contain a second hard-coded one.
    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    // `write_all` keeps writing until the whole response is sent; a bare `write`
    // may stop after only part of the buffer.
    stream.write_all(response.as_bytes())?;
    stream.flush()
}
复制代码 优雅停机与清算
lib.rs
- use std::{
- sync::{mpsc, Arc, Mutex},
- thread,
- };
- pub struct ThreadPool {
- workers: Vec<Worker>,
- sender: mpsc::Sender<Message>,
- }
- // 持有发送到channal的闭包
- // Job是某个特征对象的类型别名
- type Job = Box<dyn FnOnce() + Send + 'static>;
- // 向线程发送信号使其停止接收任务
- enum Message {
- NewJob(Job),
- Terminate,
- }
- impl ThreadPool {
- /// 创建线程池。
- ///
- /// 线程池中线程的数量。
- ///
- /// # Panics
- ///
- /// `new` 函数在 size 为 0 时会 panic。
- pub fn new(size: usize) -> ThreadPool {
- assert!(size > 0);
- let (sender, receiver) = mpsc::channel();
- //Arc 多线程安全的智能指针
- let receiver = Arc::new(Mutex::new(receiver));
- let mut workers = Vec::with_capacity(size);
- for id in 0..size {
- workers.push(Worker::new(id, Arc::clone(&receiver)));
- }
- ThreadPool { workers, sender }
- }
- pub fn execute<F>(&self, f: F)
- where
- F: FnOnce() + Send + 'static,
- {
- let job = Box::new(f);
- self.sender.send(Message::NewJob(job)).unwrap();
- }
- }
- // 为 ThreadPool 实现 Drop Trait
- impl Drop for ThreadPool {
- fn drop(&mut self) {
- println!("Sending terminate message to all workers.");
- for _ in &mut self.workers {
- self.sender.send(Message::Terminate).unwrap();
- }
- for worker in &mut self.workers {
- println!("Shutting down worker {}", worker.id);
- if let Some(thread) = worker.thread.take() {
- thread.join().unwrap();
- }
- }
- }
- }
- // 如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值
- //从 Some 成员中移动出来
- //而对 None 成员不做处理。换句话说,正在运行的 Worker 的 thread 将是 Some 成员值,
- //而当需要清理 worker 时,将 Some 替换为 None,这样 worker 就没有可以运行的线程了
- struct Worker {
- id: usize,
- thread: Option<thread::JoinHandle<()>>,
- }
- impl Worker {
- fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
- let thread = thread::spawn(move || loop {
- let message = receiver.lock().unwrap().recv().unwrap();
- match message {
- Message::NewJob(job) => {
- println!("Worker {} got a job; executing.", id);
- job();
- }
- Message::Terminate => {
- println!("Worker {} was told to terminate.", id);
- break;
- }
- }
- });
- Worker {
- id,
- thread: Some(thread),
- }
- }
- }
复制代码 src/bin/main.rs(注:下方代码与上文 lib.rs 相同,疑为原文误贴)
- use std::{
- sync::{mpsc, Arc, Mutex},
- thread,
- };
- pub struct ThreadPool {
- workers: Vec<Worker>,
- sender: mpsc::Sender<Message>,
- }
- // 持有发送到channal的闭包
- // Job是某个特征对象的类型别名
- type Job = Box<dyn FnOnce() + Send + 'static>;
- // 向线程发送信号使其停止接收任务
- enum Message {
- NewJob(Job),
- Terminate,
- }
- impl ThreadPool {
- /// 创建线程池。
- ///
- /// 线程池中线程的数量。
- ///
- /// # Panics
- ///
- /// `new` 函数在 size 为 0 时会 panic。
- pub fn new(size: usize) -> ThreadPool {
- assert!(size > 0);
- let (sender, receiver) = mpsc::channel();
- //Arc 多线程安全的智能指针
- let receiver = Arc::new(Mutex::new(receiver));
- let mut workers = Vec::with_capacity(size);
- for id in 0..size {
- workers.push(Worker::new(id, Arc::clone(&receiver)));
- }
- ThreadPool { workers, sender }
- }
- pub fn execute<F>(&self, f: F)
- where
- F: FnOnce() + Send + 'static,
- {
- let job = Box::new(f);
- self.sender.send(Message::NewJob(job)).unwrap();
- }
- }
- // 为 ThreadPool 实现 Drop Trait
- impl Drop for ThreadPool {
- fn drop(&mut self) {
- println!("Sending terminate message to all workers.");
- for _ in &mut self.workers {
- self.sender.send(Message::Terminate).unwrap();
- }
- for worker in &mut self.workers {
- println!("Shutting down worker {}", worker.id);
- if let Some(thread) = worker.thread.take() {
- thread.join().unwrap();
- }
- }
- }
- }
- // 如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值
- //从 Some 成员中移动出来
- //而对 None 成员不做处理。换句话说,正在运行的 Worker 的 thread 将是 Some 成员值,
- //而当需要清理 worker 时,将 Some 替换为 None,这样 worker 就没有可以运行的线程了
- struct Worker {
- id: usize,
- thread: Option<thread::JoinHandle<()>>,
- }
- impl Worker {
- fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
- let thread = thread::spawn(move || loop {
- let message = receiver.lock().unwrap().recv().unwrap();
- match message {
- Message::NewJob(job) => {
- println!("Worker {} got a job; executing.", id);
- job();
- }
- Message::Terminate => {
- println!("Worker {} was told to terminate.", id);
- break;
- }
- }
- });
- Worker {
- id,
- thread: Some(thread),
- }
- }
- }
复制代码 参考
- 第20章~构建单线程服务器
- 构建单线程 Web 服务器
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |