Rust 编程小项目:WebServer 05

视频地址

头条地址:https://www.ixigua.com/i680718929763709798...
B站地址:https://www.bilibili.com/video/BV177411m78...

github地址

https://github.com/anonymousGiga/web-serve...

单线程web server存在的问题

请求只能串行处理,也就是说,在第一个连接处理完之前,不会处理第二个连接。考虑如下例子:

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::fs;
use std::{thread, time};

/// Serves a single HTTP connection.
///
/// Answers a `GET /` request with the contents of `main.html` and any other
/// request with the contents of `404.html`, then sleeps to simulate a request
/// that takes a long time to process.
///
/// # Panics
///
/// Panics if reading the request, reading the HTML file, or writing the
/// response fails.
fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    // `read` may fill only part of the buffer; inspect just the received bytes.
    let bytes_read = stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let (status_line, filename) = if buffer[..bytes_read].starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "main.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` is allowed to send only a prefix of the data; `write_all` keeps
    // writing until the whole response has been handed to the socket.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Sleep for a while to simulate a request that takes a long time to handle.
    let simulated_work = time::Duration::from_millis(10000);
    thread::sleep(simulated_work);
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")? ;

    for stream in listener.incoming() {
        handle_client(stream?);
    }
    Ok(())
}

在浏览器中打开两个窗口,分别输入127.0.0.1:8080,会发现在第一个处理完之前,第二个不会响应。

使用多线程来解决问题

解决方式

修改main函数代码:

/// Multi-threaded server: spawns one new thread per accepted connection.
///
/// # Errors
///
/// Returns the I/O error if binding the listener or accepting a connection
/// fails.
///
/// # Panics
///
/// Panics if a joined handler thread itself panicked.
fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

    for stream in listener.incoming() {
        // Propagate accept errors instead of panicking: `main` already
        // returns `io::Result`, and `?` is usable here because we are still
        // outside the spawned closure.
        let stream = stream?;
        let handle = thread::spawn(move || handle_client(stream));
        thread_vec.push(handle);
    }

    // NOTE(review): the standard library documents `incoming()` as never
    // yielding `None`, so this join loop is presumably never reached while
    // the listener stays alive.
    for handle in thread_vec {
        handle.join().unwrap();
    }

    Ok(())
}

从浏览器打开两个标签,进行测试,可以发现第一个没有处理完之前,第二个请求已经开始处理。

存在问题

当存在海量请求时,系统也会跟着创建海量的线程,最终造成系统崩溃。

使用线程池来解决问题

线程池

知识点

多线程、管道。

从主线程将任务发送到管道,工作线程等待在管道的接收端,当收到任务时,进行处理。

实现

初步设计

  • 定义ThreadPool结构

    use std::thread;
    /// First sketch of the pool: it simply owns the handles of its threads.
    pub struct ThreadPool {
        // Named `threads` (plural) so that the `ThreadPool { threads }`
        // initialiser used by the `new` sketch matches the field.
        threads: Vec<thread::JoinHandle<()>>,
    }
  • 定义ThreadPool的方法

    // Outline of the ThreadPool API; both bodies are elided in this sketch.
    impl ThreadPool {
        /// Creates a pool; `size` is the requested size of the pool.
        pub fn new(size: usize) -> ThreadPool {
            //--snip--
        }
    
        /// Submits work to the pool. The intended generic signature is kept
        /// in the comment between the header and the body.
        pub fn execute()
        //pub fn execute<F>(&self, f: F)
        //    where
        //        F: FnOnce() + Send + 'static
        {
            //--snip--
        }
    }
  • 下面我们考虑new函数,可能的实现是这样

    /// First attempt at the constructor. It is deliberately left unfinished:
    /// the loop body is empty, so no thread is ever created.
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let mut threads = Vec::with_capacity(size);
        for _ in 0..size {
            // Create a thread here.
            // Problem: spawning a thread requires a closure, i.e. the work to do,
            // but at this point there is no concrete task yet. What should we do?
        }
    
        ThreadPool {
            threads
        }
    }
  • execute函数

    // The design of `execute` can follow that of `thread::spawn`:
    // `f` is a closure that is called once (`FnOnce`), can be sent to another
    // thread (`Send`) and borrows nothing short-lived (`'static`).
    // The body is intentionally empty at this stage.
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
    
    }

初步设计的问题总结:

主要问题是:在线程池的new函数中创建线程时,需要传入具体的任务(闭包),可是此时还没有具体的任务,如何解决?

解决线程创建的问题

  • 重新定义ThreadPool结构体

    /// Redesigned pool: instead of raw thread handles it owns `Worker`s,
    /// each of which wraps its own thread.
    pub struct ThreadPool {
        workers: Vec<Worker>,
    }
  • ThreadPool的new方法

    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
    
        let mut workers = Vec::with_capacity(size);
    
        for id in 0..size {
            workers.push(Worker::new(id));
        }
    
        ThreadPool {
            workers
        }
    }
  • 在worker中创建线程

    /// A pool member: an id plus the handle of the thread it owns.
    struct Worker {
        id: usize,
        thread: thread::JoinHandle<()>,
    }

    impl Worker {
        /// Creates worker `id`. For now its thread runs an empty closure,
        /// because no task is available when the pool is built.
        fn new(id: usize) -> Worker {
            Worker {
                id,
                thread: thread::spawn(|| {}),
            }
        }
    }

发送任务

  • 进一步将ThreadPool结构设计为

    use std::sync::mpsc;
    
    /// The pool now also keeps the sending half of a channel, through which
    /// jobs are passed on to the workers.
    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Job>,
    }
    
    // Placeholder for the type sent over the channel.
    struct Job;
  • 完善new方法

    impl ThreadPool {
        // --snip--
        /// Creates the channel and gives the receiving end to every worker.
        ///
        /// NOTE: this intermediate version is intentionally wrong. `receiver`
        /// is passed by value to `Worker::new` on every loop iteration; the
        /// surrounding text explains why it is rejected.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);
    
            let (sender, receiver) = mpsc::channel();// added: the job channel
            let mut workers = Vec::with_capacity(size);
    
            for id in 0..size {
                // previous version: workers.push(Worker::new(id));
                workers.push(Worker::new(id, receiver));
            }
    
            ThreadPool {
                workers,
                sender,// added: kept so jobs can be sent later
            }
        }
        // --snip--
    }
    
    //--snip--
    
    impl Worker {
        /// Creates worker `id` and hands it the receiving end of the channel.
        ///
        /// At this stage the closure merely mentions `receiver`; it does not
        /// receive or run any job yet.
        fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
            let thread = thread::spawn(|| {
                receiver;
            });
    
            Worker {
                id,
                thread,
            }
        }
    }

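    此段代码无法通过编译:mpsc::Receiver 没有实现 Clone(通道是“多生产者、单消费者”),循环第一次迭代时 receiver 的所有权就已经移动给了第一个 Worker,之后的迭代无法再使用它。要让多个工作线程共享同一个接收端,应该使用 Arc<Mutex<T>>:Arc 提供共享所有权,Mutex 保证同一时刻只有一个线程从通道中取任务。重新撰写new方法如下: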

    use std::sync::Arc;
    use std::sync::Mutex;
    // --snip--
    
    impl ThreadPool {
        // --snip--
        /// Creates a pool of `size` workers that all share one job channel.
        ///
        /// The receiving end is wrapped in `Arc<Mutex<..>>` so that every
        /// worker can hold a handle to the same receiver.
        ///
        /// # Panics
        ///
        /// Panics if `size` is zero.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let (sender, rx) = mpsc::channel();
            let shared_rx = Arc::new(Mutex::new(rx));

            // Workers get ids 0..size and a clone of the shared receiver handle.
            let workers = (0..size)
                .map(|id| Worker::new(id, Arc::clone(&shared_rx)))
                .collect();

            ThreadPool { workers, sender }
        }

        // --snip--
    }
    
    impl Worker {
        /// Spawns the thread of worker `id`, which repeatedly takes a job
        /// from the shared `receiver` and runs it.
        ///
        /// The thread ends cleanly once the channel is closed, i.e. when
        /// every sender has been dropped, instead of panicking on the failed
        /// `recv`.
        ///
        /// # Panics
        ///
        /// The worker thread panics if the mutex is poisoned or if a job
        /// itself panics.
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
            let thread = thread::spawn(move || loop {
                // The mutex guard is a temporary that is dropped at the end
                // of this statement, so the lock is not held while the job
                // runs and other workers can pick up jobs concurrently.
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job();
                    }
                    // All senders are gone: no further job can ever arrive.
                    Err(_) => break,
                }
            });

            Worker { id, thread }
        }
    }
  • 实现execute方法

    // `Job` is now a type alias for a boxed trait object: any closure that can
    // be called once and sent to another thread.
    type Job = Box<dyn FnOnce() + Send + 'static>;

    impl ThreadPool {
        // --snip--

        /// Queues `f` for execution by boxing it and sending it down the
        /// pool's channel.
        ///
        /// # Panics
        ///
        /// Panics if the job cannot be sent.
        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            self.sender.send(Box::new(f)).unwrap();
        }
    }

完整的代码

src/main.rs

use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::{thread, time};
use mylib::ThreadPool;

/// Serves a single HTTP connection.
///
/// Answers a `GET /` request with the contents of `main.html` and any other
/// request with the contents of `404.html`, then sleeps to simulate a request
/// that takes a long time to process.
///
/// # Panics
///
/// Panics if reading the request, reading the HTML file, or writing the
/// response fails.
fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    // `read` may fill only part of the buffer; inspect just the received bytes.
    let bytes_read = stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let (status_line, filename) = if buffer[..bytes_read].starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "main.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` is allowed to send only a prefix of the data; `write_all` keeps
    // writing until the whole response has been handed to the socket.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Simulated processing time so that concurrent handling is observable.
    let simulated_work = time::Duration::from_millis(10000);
    thread::sleep(simulated_work);
}

/// Thread-pool server: accepts connections on 127.0.0.1:8080 and hands each
/// one to a pool of four workers.
///
/// # Errors
///
/// Returns the I/O error if binding the listener or accepting a connection
/// fails.
fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        // Propagate accept errors instead of panicking: `main` already
        // returns `io::Result`.
        let stream = stream?;

        // The closure takes ownership of the stream and runs on a pool worker.
        pool.execute(move || handle_client(stream));
    }

    Ok(())
}

mylib/src/lib.rs

use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

/// A pool member: an id plus the handle of the thread that runs its jobs.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns the thread of worker `id`, which repeatedly takes a job from
    /// the shared `receiver` and runs it.
    ///
    /// The thread ends cleanly once the channel is closed, i.e. when every
    /// sender has been dropped, instead of panicking on the failed `recv`.
    ///
    /// # Panics
    ///
    /// The worker thread panics if the mutex is poisoned or if a job itself
    /// panics.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The mutex guard is a temporary that is dropped at the end of
            // this statement, so the lock is not held while the job runs and
            // other workers can pick up jobs concurrently.
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                // All senders are gone: no further job can ever arrive.
                Err(_) => break,
            }
        });

        Worker { id, thread }
    }
}

/// A fixed-size pool of worker threads fed through a channel.
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// `Job` is a type alias for a boxed trait object: any closure that can be
// called once and sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Creates a pool of `size` workers that all share one job channel.
    ///
    /// The receiving end is wrapped in `Arc<Mutex<..>>` so that every worker
    /// can hold a handle to the same receiver.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, rx) = mpsc::channel();
        let shared_rx = Arc::new(Mutex::new(rx));

        // Workers get ids 0..size and a clone of the shared receiver handle.
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&shared_rx)))
            .collect();

        ThreadPool { workers, sender }
    }

    /// Queues `f` for execution by boxing it and sending it down the pool's
    /// channel.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender.send(Box::new(f)).unwrap();
    }
}

main的Cargo.toml

[dependencies]
mylib = {path = "./mylib"}

存在的问题

线程池中的线程怎么结束?

想知道如何解决这个问题,请关注令狐一冲,下回为您分解。

本作品采用《CC 协议》,转载必须注明作者和本文链接
令狐一冲
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
文章
255
粉丝
119
喜欢
308
收藏
128
排名:335
访问:2.8 万
私信
所有博文
社区赞助商