《The Rust Programming language》代码练习(part 3 简单web server)
我与Rust的缘分起始于当我在编程论坛上闲逛时,无意间发现了这么一门现代型的系统安全的函数式系统编程语言,但是当时只是大致了解,并无深入学习,所以此次便将它细致性地学习了一遍。
学习内容为书籍《The Rust Programming language》的全部内容(已完成)、《Rust编程之道》的全部内容(未完成)和《The Rustonomicon》的部分内容(未完成)。
一. 内容概述
我将 《Rust 编程语言》 的学习内容分为基础学习(1至9章)与进阶学习(10至19章),这两个部分是对我学习内容的一个大概缩略。而后是一个根据书上最后一章(20章)进行的简单的 web server 程序构建,最后是对比 Rust 社区已有的actix web 框架的一个简单 example。
本文为《The Rust Programming language》最后一章的《 Web 实战项目》练习,此部分学习练习代码已经发在了开源平台 Gitee 和 GitHub 平台上.
查看上一部分请转至
4.Rust简单 Web server 构建
4.1Rust 构建单线程web服务器:
构建的单线程web服务器仅进行get方法处理和响应,以及出现错误之后的404错误返回,进行了三次简单的重构之后,Rust代码如下:
src/main.rs:
use multithreaded_server::ThreadPool;
use std::time::Duration;
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4); //创建含有线程数为4的线程池
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512]; //栈区创建一个 512 字节的缓冲区
stream.read(&mut buffer).unwrap(); //从TcpStream中读取字节并放入缓冲区
println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "test.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "test.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
println!("Response: ");
println!("{}", response);
}
当然,基础的TCP socket只能使用由rust官方标准库的实现。
上述简单单线程web server实现中创建了TCP socket,并进行了对7878端口的绑定和监听,对于每一个客户端和服务端之间打开的TCP socket流连接和请求/响应过程进行了处理,并获取了连接组成的迭代器。对每个连接进行错误处理之后,调用自定义连接处理函数handle_connection进行请求放置缓冲区处理后,响应web浏览器对应资源。
代码中为方便调试,将请求与响应的内容进行了控制台输出。
以下是测试用的资源代码:
test.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>test web server</title>
</head>
<body>
<h1>test</h1>
<p>this is test for little web server</p>
</body>
</html>
404.html
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta charset="UTF-8" http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>404-对不起!您访问的页面不存在</title>
<style type="text/css">
.head404 {
width: 580px;
height: 234px;
margin: 50px auto 0 auto;
background: url(https://www.daixiaorui.com/Public/images/head404.png) no-repeat;
}
.txtbg404 {
width: 499px;
height: 169px;
margin: 10px auto 0 auto;
background: url(https://www.daixiaorui.com/Public/images/txtbg404.png) no-repeat;
}
.txtbg404 .txtbox {
width: 390px;
position: relative;
top: 30px;
left: 60px;
color: #eee;
font-size: 13px;
}
.txtbg404 .txtbox p {
margin: 5px 0;
line-height: 18px;
}
.txtbg404 .txtbox .paddingbox {
padding-top: 15px;
}
.txtbg404 .txtbox p a {
color: #eee;
text-decoration: none;
}
.txtbg404 .txtbox p a:hover {
color: #FC9D1D;
text-decoration: underline;
}
</style>
</head>
<body bgcolor="#494949">
<div class="head404"></div>
<div class="txtbg404">
<div class="txtbox">
<p>对不起,您请求的页面不存在、或已被删除、或暂时不可用</p>
<p class="paddingbox">请点击以下链接继续浏览网页</p>
<p>》<a style="cursor:pointer" onclick="history.back()">返回上一页面</a></p>
<p>》<a href="https://www.daixiaorui.com">返回网站首页</a></p>
</div>
</div>
</body>
</html>
</html>
运行该简单web server后,浏览器请求127.0.0.1:7878/ 或http://127.0.0.1:7878/sleep:
console也输出了相应request/respons的内容:
如果访问未定义的路由,例如:127.0.0.1:7878/test,则会返回404.html如下:
由此便完成了简单的单线程web server。
虽然此单线程web server也能进行资源处理与响应,但是Rust的高级特性并未在其中体现,譬如多线程无畏并发和高级trait。而且因为server运行于单线程中,一次只能处理一个请求,意味着未完成第一个处理之前不会处理第二个连接,我在代码中增加了sleep模拟了慢请求,如果请求/sleep五秒之内(即当前sleep请求正在处理)请求/,则会发现只能等sleep休眠五秒结束之后才会出现。
即如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。于是我打算利用Rust的并发来进行server的改进。
4.2Rust 构建多线程web服务器
构建多线程web server常用的办法是为其实现线程池,同样此处也将为此实现一个简单的线程池,其中含有固定数量的线程,将新请求发送到线程池做处理之后,线程池将维护接收到的请求队列,每个线程从队列中取出一个请求执行处理,然后向队列索取另一个请求,即设计的该线程池的并发数N为线程池线程数,如果每个线程都在做慢请求响应,则依然会造成阻塞队列,不过性能相比上面的单线程web server 增加了许多,所能处理的慢请求也变多了。
构建线程池的代码如下:
src/lib.rs:
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
//丢弃线程池
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
这里创建了ThreadPool的关联函数new,其用以创建线程池,new通过通道建立发送者和接收者,接收者实际上就是等待线程,所以在new中被数次克隆进等待队列works里,然后等待线程队列works的每个线程通过线程安全智能指针来实现共享任务队列,即同一个发送者sender。
Worker的关联函数new中用循环实现了对通道的接收者请求任务,如果没有得到任务,则会阻塞线程本身,并在得到时候执行,并且采用互斥器用以实现安全共享。当线程池被丢弃的时候会调用其drop函数进行线程队列的清理,清理线程池中的所有线程。
由此实现了一个简单的异步执行连接的线程池,其对应的main函数如下:
use multithreaded_server::ThreadPool;
use std::time::Duration;
use std::fs;
use std::thread;
use std::io::prelude::*;
use std::net::{TcpListener,TcpStream};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "test.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "test.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
运行之后在浏览器进行慢请求测试与并发测试之后发现,当前建立的多线程server较单线程版本所能处理的并发连接增加,并且一定范围内不受慢请求的影响,4个测试用并发数目获取资源的情况在进行6次/与/sleep请求之后,console输出情况如下:
即并发数为4,当新请求建立之后,线程池中的线程依然是队列形式进行处理,吞吐量增加,符合预期。
如果将web server的请求数量设置在4的话,进行4次请求之后,该web server程序停止运行,console输出如下情况:
可以看见线程队列进行了消息处理后被清理,符合预期。
同样,运行该多线程 webserver后,浏览器能正确显示与获取资源。
4.3Rust acitx web框架
实际上Rust社区已经有许多不同的web框架的实现了,其中最出名的有actix和rocket,这里以actix web框架为例,其实现了相应的Http server、请求处理程序以及类型安全的信息提取器、错误处理、URL调度等,并且也实现了http2协议。
下面是一个简单的actix web应用:
对应项目构成如下:
main.rs:
use actix_files as fs;
use actix_session::{CookieSession, Session};
use actix_utils::mpsc;
use actix_web::http::{header, Method, StatusCode};
use actix_web::{
error, get, guard, middleware, web, App, Error, HttpRequest, HttpResponse,
HttpServer, Result,
};
use std::{env, io};
/// favicon handler
#[get("/favicon")]
async fn favicon() -> Result<fs::NamedFile> {
Ok(fs::NamedFile::open("static/favicon.ico")?)
}
/// simple index handler
#[get("/welcome")]
async fn welcome(session: Session, req: HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req);
// session
let mut counter = 1;
if let Some(count) = session.get::<i32>("counter")? {
println!("SESSION value: {}", count);
counter = count + 1;
}
// set counter to session
session.set("counter", counter)?;
// response
Ok(HttpResponse::build(StatusCode::OK)
.content_type("text/html; charset=utf-8")
.body(include_str!("../static/welcome.html")))
}
/// 404 handler
async fn p404() -> Result<fs::NamedFile> {
Ok(fs::NamedFile::open("static/404.html")?.set_status_code(StatusCode::NOT_FOUND))
}
/// response body
async fn response_body(path: web::Path<String>) -> HttpResponse {
let text = format!("Hello {}!", *path);
let (tx, rx_body) = mpsc::channel();
let _ = tx.send(Ok::<_, Error>(web::Bytes::from(text)));
HttpResponse::Ok().streaming(rx_body)
}
/// handler with path parameters like `/user/{name}/`
async fn with_param(
req: HttpRequest,
web::Path((name,)): web::Path<(String,)>,
) -> HttpResponse {
println!("{:?}", req);
HttpResponse::Ok()
.content_type("text/plain")
.body(format!("Hello {}!", name))
}
#[actix_web::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "actix_web=debug,actix_server=info");
env_logger::init();
HttpServer::new(|| {
App::new()
// cookie session middleware
.wrap(CookieSession::signed(&[0; 32]).secure(false))
// enable logger - always register actix-web Logger middleware last
.wrap(middleware::Logger::default())
// register favicon
.service(favicon)
// register simple route, handle all methods
.service(welcome)
// with path parameters
.service(web::resource("/user/{name}").route(web::get().to(with_param)))
// async response body
.service(
web::resource("/async-body/{name}").route(web::get().to(response_body)),
)
.service(
web::resource("/test").to(|req: HttpRequest| match *req.method() {
Method::GET => HttpResponse::Ok(),
Method::POST => HttpResponse::MethodNotAllowed(),
_ => HttpResponse::NotFound(),
}),
)
.service(web::resource("/error").to(|| async {
error::InternalError::new(
io::Error::new(io::ErrorKind::Other, "test"),
StatusCode::INTERNAL_SERVER_ERROR,
)
}))
// static files
.service(fs::Files::new("/static", "static").show_files_listing())
// redirect
.service(web::resource("/").route(web::get().to(|req: HttpRequest| {
println!("{:?}", req);
HttpResponse::Found()
.header(header::LOCATION, "static/welcome.html")
.finish()
})))
// default
.default_service(
// 404 for GET request
web::resource("")
.route(web::get().to(p404))
// all requests that are not `GET`
.route(
web::route()
.guard(guard::Not(guard::Get()))
.to(HttpResponse::MethodNotAllowed),
),
)
})
.bind("127.0.0.1:8080")?
.run()
.await
}
运行后示例如下:
当然,这只是一个简单的应用example。
总结
对比自己编写的多线程异步web server,可以发现actix有其完善的异步处理、路由控制和资源控制,可见actix是一个简易且快速的web框架。
本作品采用《CC 协议》,转载必须注明作者和本文链接