Rust 编程小项目:WebServer 05
视频地址
头条地址:https://www.ixigua.com/i680718929763709798...
B站地址:https://www.bilibili.com/video/BV177411m78...
github地址
https://github.com/anonymousGiga/web-serve...
单线程web server存在的问题
请求只能串行处理,也就是说,在第一个连接处理完之前,不会处理第二个连接。考虑如下例子:
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::fs;
use std::{thread, time};
/// Serves a single HTTP connection and then stalls to simulate slow work.
///
/// Reads up to 512 bytes from `stream`. If the data starts with
/// `GET / HTTP/1.1\r\n`, the contents of `main.html` are sent back after a
/// `200 OK` status line; anything else gets `404.html` after a
/// `404 NOT FOUND` status line. Once the reply has been written, the calling
/// thread sleeps for ten seconds so that the cost of handling requests one at
/// a time is easy to observe.
///
/// # Panics
///
/// Panics if reading from or writing to `stream` fails, or if the selected
/// HTML file cannot be read.
fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "main.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` is allowed to stop after a partial write; `write_all` keeps
    // going until the whole response has been handed to the socket.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Sleep for a while to simulate a request that takes a long time.
    let simulated_work = time::Duration::from_millis(10000);
    thread::sleep(simulated_work);
}
/// Entry point of the single-threaded server.
///
/// Binds to `127.0.0.1:8080` and serves every incoming connection to
/// completion before taking the next one from the listener.
///
/// # Errors
///
/// Returns the I/O error if binding the address or accepting a connection
/// fails.
fn main() -> std::io::Result<()> {
    let server = TcpListener::bind("127.0.0.1:8080")?;

    for connection in server.incoming() {
        let client = connection?;
        // Runs on the main thread, so the loop is blocked until this returns.
        handle_client(client);
    }

    Ok(())
}
在浏览器中打开两个窗口,分别输入127.0.0.1:8080,会发现在第一个请求处理完之前,第二个请求不会得到响应。
使用多线程来解决问题
解决方式
修改main函数代码:
/// Entry point of the thread-per-connection server.
///
/// Binds to `127.0.0.1:8080` and spawns a new thread for every incoming
/// connection, so a slow request no longer blocks the ones behind it. The
/// join handles are collected and joined once the accept loop ends.
///
/// # Errors
///
/// Returns the I/O error if binding the address or accepting a connection
/// fails.
///
/// # Panics
///
/// Panics while joining if one of the connection threads panicked.
fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

    for stream in listener.incoming() {
        // This function already returns `io::Result`, so a failed accept is
        // propagated with `?` instead of panicking.
        let stream = stream?;

        // `move` makes it explicit that the new thread owns the connection.
        let handle = thread::spawn(move || {
            handle_client(stream);
        });
        thread_vec.push(handle);
    }

    for handle in thread_vec {
        handle.join().unwrap();
    }

    Ok(())
}
从浏览器打开两个标签,进行测试,可以发现第一个没有处理完之前,第二个请求已经开始处理。
存在问题
当存在海量请求时,系统也会跟着创建海量的线程,最终造成系统崩溃。
使用线程池来解决问题
线程池
知识点
多线程、通道(channel)。
主线程将任务发送到通道,工作线程在通道的接收端等待,收到任务后进行处理。
实现
初步设计
定义ThreadPool结构
use std::thread;

/// First draft of the pool: it simply owns the join handles of its threads.
pub struct ThreadPool {
    // Named `threads` so that it matches the `ThreadPool { threads }`
    // initializer used by the draft of `new`.
    threads: Vec<thread::JoinHandle<()>>,
}
定义ThreadPool的方法
impl ThreadPool { pub fn new(size: usize) -> ThreadPool { //--snip-- } pub fn execute() //pub fn execute<F>(&self, f: F) // where // F: FnOnce() + Send + 'static { //--snip-- } }
下面我们考虑new函数,可能的实现是这样
pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { //创建线程: //问题来了,创建线程的时候需要传入闭包,也就是具体做的动作, //可是这个时候我们还没有具体的任务,怎么办? } ThreadPool { threads } }
execute函数
//设计execute的函数,可以参考thread::spawn pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { }
初步设计的问题总结:
主要是在创建线程池的new函数中,需要传入具体的任务,可是此时还没有具体的任务,如何解决?
解决线程创建的问题
重新定义ThreadPool结构体
/// The pool now owns `Worker`s instead of raw join handles.
pub struct ThreadPool {
    workers: Vec<Worker>,
}
ThreadPool的new方法
pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } }
在worker中创建线程
/// A pool member: an identifier plus the handle of the thread it owns.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread with an empty body and stores its handle next to `id`.
    fn new(id: usize) -> Worker {
        Worker {
            id,
            thread: thread::spawn(|| {}),
        }
    }
}
发送任务
进一步将ThreadPool结构设计为
use std::sync::mpsc;

/// The pool keeps the sending half of a channel on which jobs are submitted.
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

/// Placeholder for the unit of work travelling through the channel.
struct Job;
完善new方法
impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel();//add let mut workers = Vec::with_capacity(size); for id in 0..size { //workers.push(Worker::new(id)); workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender,//add } } // --snip-- } //--snip-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread, } } }
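此段代码无法通过编译:receiver 在第一次循环时就被移动(move)进了第一个 Worker,后续循环无法再使用它;而 mpsc::Receiver 不能克隆,也不能直接被多个线程共享。因此需要用 Arc<Mutex<T>> 将其包装起来:Arc 让多个线程共同持有同一个接收端,Mutex 保证同一时刻只有一个线程从中取任务。重新撰写new方法如下: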
use std::sync::Arc; use std::sync::Mutex; // --snip-- impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver));//add let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender, } } // --snip-- } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); job(); } }); Worker { id, thread, } } }
实现execute方法
type Job = Box<dyn FnOnce() + Send + 'static>;//修改Job为trait对象的类别名称 impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); } }
完整的代码
src/main.rs
use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::{thread, time};
use mylib::ThreadPool;
/// Serves a single HTTP connection and then stalls to simulate slow work.
///
/// Reads up to 512 bytes from `stream`. If the data starts with
/// `GET / HTTP/1.1\r\n`, the contents of `main.html` are sent back after a
/// `200 OK` status line; anything else gets `404.html` after a
/// `404 NOT FOUND` status line. Once the reply has been written, the calling
/// thread sleeps for ten seconds to imitate a long-running request.
///
/// # Panics
///
/// Panics if reading from or writing to `stream` fails, or if the selected
/// HTML file cannot be read.
fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "main.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` is allowed to stop after a partial write; `write_all` keeps
    // going until the whole response has been handed to the socket.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Sleep for a while to simulate a request that takes a long time.
    let simulated_work = time::Duration::from_millis(10000);
    thread::sleep(simulated_work);
}
/// Entry point of the thread-pool server.
///
/// Binds to `127.0.0.1:8080`, creates a pool of four workers and hands every
/// incoming connection to the pool. Unlike the thread-per-connection version,
/// no thread is spawned per request.
///
/// # Errors
///
/// Returns the I/O error if binding the address or accepting a connection
/// fails.
fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        // This function already returns `io::Result`, so a failed accept is
        // propagated with `?` instead of panicking.
        let stream = stream?;

        // `move` makes it explicit that the job owns the connection.
        pool.execute(move || {
            handle_client(stream);
        });
    }

    Ok(())
}
mylib/src/lib.rs
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
/// One thread of the pool together with the id it was created with.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that keeps pulling jobs from `receiver` and running
    /// them until the sending side of the channel is gone.
    ///
    /// The receiver is shared by all workers through `Arc<Mutex<..>>`, so
    /// only one worker at a time waits on the channel.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The mutex guard is a temporary of this `let` statement, so the
            // lock is released before the job runs and the other workers can
            // receive in the meantime.
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                // `recv` fails only after the sender has been dropped: no more
                // jobs can arrive, so leave the loop instead of panicking
                // (a panic here would also poison the shared mutex).
                Err(_) => break,
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

/// A fixed-size pool of worker threads fed through a channel.
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

/// A boxed closure that runs once and can be sent to a worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Creates a pool with `size` workers that all share one job receiver.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    /// Boxes `f` and sends it down the channel for a worker to pick up.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent, which happens only when no worker
    /// holds the receiver any more.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
main的Cargo.toml
[dependencies]
mylib = {path = "./mylib"}
存在的问题
线程池中的线程怎么结束?
想知道如何解决这个问题,请关注令狐一冲,下回为您分解。
本作品采用《CC 协议》,转载必须注明作者和本文链接