《The Rust Programming language》代码练习(part 3 简单web server)

我与Rust的缘分起始于当我在编程论坛上闲逛时,无意间发现了这么一门现代型的系统安全的函数式系统编程语言,但是当时只是大致了解,并无深入学习,所以此次便将它细致性地学习了一遍。
学习内容为书籍《The Rust Programming language》的全部内容(已完成)、《Rust编程之道》的全部内容(未完成)和《The Rustonomicon》的部分内容(未完成)。

一. 内容概述

我将 《Rust 编程语言》 的学习内容分为基础学习(1至9章)与进阶学习(10至19章),这两个部分是对我学习内容的一个大概缩略。而后是一个根据书上最后一章(20章)进行的简单的 web server 程序构建,最后是对比 Rust 社区已有的actix web 框架的一个简单 example。
本文为《The Rust Programming language》最后一章的《 Web 实战项目》练习,此部分学习练习代码已经发在了开源平台 GiteeGitHub 平台上.

查看上一部分请转至

4.Rust简单 Web server 构建

4.1Rust 构建单线程web服务器:

构建的单线程web服务器仅进行get方法处理和响应,以及出现错误之后的404错误返回,进行了三次简单的重构之后,Rust代码如下:

src/main.rs:

use multithreaded_server::ThreadPool;
use std::time::Duration;
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4); //创建含有线程数为4的线程池

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512]; //栈区创建一个 512 字节的缓冲区

    stream.read(&mut buffer).unwrap(); //从TcpStream中读取字节并放入缓冲区
    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "test.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "test.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    println!("Response: ");
    println!("{}", response);
}

​ 当然,基础的TCP socket只能使用由rust官方标准库的实现。

​ 上述简单单线程web server实现中创建了TCP socket,并进行了对7878端口的绑定和监听,对于每一个客户端和服务端之间打开的TCP socket流连接和请求/响应过程进行了处理,并获取了连接组成的迭代器。对每个连接进行错误处理之后,调用自定义连接处理函数handle_connection进行请求放置缓冲区处理后,响应web浏览器对应资源。

​ 代码中为方便调试,将请求与响应的内容进行了控制台输出。

以下是测试用的资源代码:

test.html:

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>test web server</title>
</head>

<body>
    <h1>test</h1>
    <p>this is test for little web server</p>
</body>

</html>

404.html

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">

<head>

    <meta charset="UTF-8" http-equiv="Content-Type" content="text/html; charset=utf-8" />

    <title>404-对不起!您访问的页面不存在</title>

    <style type="text/css">
        .head404 {
            width: 580px;
            height: 234px;
            margin: 50px auto 0 auto;
            background: url(https://www.daixiaorui.com/Public/images/head404.png) no-repeat;
        }

        .txtbg404 {
            width: 499px;
            height: 169px;
            margin: 10px auto 0 auto;
            background: url(https://www.daixiaorui.com/Public/images/txtbg404.png) no-repeat;
        }

        .txtbg404 .txtbox {
            width: 390px;
            position: relative;
            top: 30px;
            left: 60px;
            color: #eee;
            font-size: 13px;
        }

        .txtbg404 .txtbox p {
            margin: 5px 0;
            line-height: 18px;
        }

        .txtbg404 .txtbox .paddingbox {
            padding-top: 15px;
        }

        .txtbg404 .txtbox p a {
            color: #eee;
            text-decoration: none;
        }

        .txtbg404 .txtbox p a:hover {
            color: #FC9D1D;
            text-decoration: underline;
        }
    </style>

</head>



<body bgcolor="#494949">

    <div class="head404"></div>

    <div class="txtbg404">

        <div class="txtbox">

            <p>对不起,您请求的页面不存在、或已被删除、或暂时不可用</p>

            <p class="paddingbox">请点击以下链接继续浏览网页</p>

            <p><a style="cursor:pointer" onclick="history.back()">返回上一页面</a></p>

            <p><a href="https://www.daixiaorui.com">返回网站首页</a></p>

        </div>

    </div>

</body>

</html>

</html>

​ 运行该简单web server后,浏览器请求127.0.0.1:7878/http://127.0.0.1:7878/sleep:

server_1

console也输出了相应request/respons的内容:
image-20210112002836825

如果访问未定义的路由,例如:127.0.0.1:7878/test,则会返回404.html如下:
server_3

由此便完成了简单的单线程web server。

​ 虽然此单线程web server也能进行资源处理与响应,但是Rust的高级特性并未在其中体现,譬如多线程无畏并发和高级trait。而且因为server运行于单线程中,一次只能处理一个请求,意味着未完成第一个处理之前不会处理第二个连接,我在代码中增加了sleep模拟了慢请求,如果请求/sleep五秒之内(即当前sleep请求正在处理)请求/,则会发现只能等sleep休眠五秒结束之后才会出现。

​ 即如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。于是我打算利用Rust的并发来进行server的改进。

4.2Rust 构建多线程web服务器

​ 构建多线程web server常用的办法是为其实现线程池,同样此处也将为此实现一个简单的线程池,其中含有固定数量的线程,将新请求发送到线程池做处理之后,线程池将维护接收到的请求队列,每个线程从队列中取出一个请求执行处理,然后向队列索取另一个请求,即设计的该线程池的并发数N为线程池线程数,如果每个线程都在做慢请求响应,则依然会造成阻塞队列,不过性能相比上面的单线程web server 增加了许多,所能处理的慢请求也变多了。

构建线程池的代码如下:

src/lib.rs:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

//丢弃线程池
impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

​ 这里创建了ThreadPool的关联函数new,其用以创建线程池,new通过通道建立发送者和接收者,接收者实际上就是等待线程,所以在new中被数次克隆进等待队列works里,然后等待线程队列works的每个线程通过线程安全智能指针来实现共享任务队列,即同一个发送者sender。

​ Worker的关联函数new中用循环实现了对通道的接收者请求任务,如果没有得到任务,则会阻塞线程本身,并在得到时候执行,并且采用互斥器用以实现安全共享。当线程池被丢弃的时候会调用其drop函数进行线程队列的清理,清理线程池中的所有线程。

​ 由此实现了一个简单的异步执行连接的线程池,其对应的main函数如下:

use multithreaded_server::ThreadPool;
use std::time::Duration;
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "test.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "test.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

运行之后在浏览器进行慢请求测试与并发测试之后发现,当前建立的多线程server较单线程版本所能处理的并发连接增加,并且一定范围内不受慢请求的影响,4个测试用并发数目获取资源的情况在进行6次/与/sleep请求之后,console输出情况如下:
server_4
即并发数为4,当新请求建立之后,线程池中的线程依然是队列形式进行处理,吞吐量增加,符合预期。

如果将web server的请求数量设置在4的话,进行4次请求之后,该web server程序停止运行,console输出如下情况:
server_5

可以看见线程队列进行了消息处理后被清理,符合预期。

同样,运行该多线程 webserver后,浏览器能正确显示与获取资源。

4.3Rust acitx web框架

​ 实际上Rust社区已经有许多不同的web框架的实现了,其中最出名的有actix和rocket,这里以actix web框架为例,其实现了相应的Http server、请求处理程序以及类型安全的信息提取器、错误处理、URL调度等,并且也实现了http2协议。

​ 下面是一个简单的actix web应用:

​ 对应项目构成如下:

server_6

main.rs:

use actix_files as fs;
use actix_session::{CookieSession, Session};
use actix_utils::mpsc;
use actix_web::http::{header, Method, StatusCode};
use actix_web::{
    error, get, guard, middleware, web, App, Error, HttpRequest, HttpResponse,
    HttpServer, Result,
};
use std::{env, io};

/// favicon handler
#[get("/favicon")]
async fn favicon() -> Result<fs::NamedFile> {
    Ok(fs::NamedFile::open("static/favicon.ico")?)
}

/// simple index handler
#[get("/welcome")]
async fn welcome(session: Session, req: HttpRequest) -> Result<HttpResponse> {
    println!("{:?}", req);

    // session
    let mut counter = 1;
    if let Some(count) = session.get::<i32>("counter")? {
        println!("SESSION value: {}", count);
        counter = count + 1;
    }

    // set counter to session
    session.set("counter", counter)?;

    // response
    Ok(HttpResponse::build(StatusCode::OK)
        .content_type("text/html; charset=utf-8")
        .body(include_str!("../static/welcome.html")))
}

/// 404 handler
async fn p404() -> Result<fs::NamedFile> {
    Ok(fs::NamedFile::open("static/404.html")?.set_status_code(StatusCode::NOT_FOUND))
}

/// response body
async fn response_body(path: web::Path<String>) -> HttpResponse {
    let text = format!("Hello {}!", *path);

    let (tx, rx_body) = mpsc::channel();
    let _ = tx.send(Ok::<_, Error>(web::Bytes::from(text)));

    HttpResponse::Ok().streaming(rx_body)
}

/// handler with path parameters like `/user/{name}/`
async fn with_param(
    req: HttpRequest,
    web::Path((name,)): web::Path<(String,)>,
) -> HttpResponse {
    println!("{:?}", req);

    HttpResponse::Ok()
        .content_type("text/plain")
        .body(format!("Hello {}!", name))
}

#[actix_web::main]
async fn main() -> io::Result<()> {
    env::set_var("RUST_LOG", "actix_web=debug,actix_server=info");
    env_logger::init();

    HttpServer::new(|| {
        App::new()
            // cookie session middleware
            .wrap(CookieSession::signed(&[0; 32]).secure(false))
            // enable logger - always register actix-web Logger middleware last
            .wrap(middleware::Logger::default())
            // register favicon
            .service(favicon)
            // register simple route, handle all methods
            .service(welcome)
            // with path parameters
            .service(web::resource("/user/{name}").route(web::get().to(with_param)))
            // async response body
            .service(
                web::resource("/async-body/{name}").route(web::get().to(response_body)),
            )
            .service(
                web::resource("/test").to(|req: HttpRequest| match *req.method() {
                    Method::GET => HttpResponse::Ok(),
                    Method::POST => HttpResponse::MethodNotAllowed(),
                    _ => HttpResponse::NotFound(),
                }),
            )
            .service(web::resource("/error").to(|| async {
                error::InternalError::new(
                    io::Error::new(io::ErrorKind::Other, "test"),
                    StatusCode::INTERNAL_SERVER_ERROR,
                )
            }))
            // static files
            .service(fs::Files::new("/static", "static").show_files_listing())
            // redirect
            .service(web::resource("/").route(web::get().to(|req: HttpRequest| {
                println!("{:?}", req);
                HttpResponse::Found()
                    .header(header::LOCATION, "static/welcome.html")
                    .finish()
            })))
            // default
            .default_service(
                // 404 for GET request
                web::resource("")
                    .route(web::get().to(p404))
                    // all requests that are not `GET`
                    .route(
                        web::route()
                            .guard(guard::Not(guard::Get()))
                            .to(HttpResponse::MethodNotAllowed),
                    ),
            )
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

​ 运行后示例如下:
server_7

​当然,这只是一个简单的应用example。

总结

对比自己编写的多线程异步web server,可以发现actix有其完善的异步处理、路由控制和资源控制,可见actix是一个简易且快速的web框架。

本作品采用《CC 协议》,转载必须注明作者和本文链接
文章作者:chen0adapter
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!