001 Rust 异步编程,Why Async

前言

我们之前已经学习过Rust编程基础相关的一些知识,下面进入到Rust异步编程的学习,本节主要介绍为什么需要Rust异步编程。

场景说明

假定有一个客户端,需要从2个不同的网站进行下载任务,我们用Rust程序模拟下载过程,并说明为什么需要Rust异步编程。

服务端的实现

Server1

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::thread;
use std::time;

fn handle_client(mut stream: TcpStream, wait_time: u64) -> std::io::Result<()> {
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {
            return Ok(());
        }

        thread::sleep(time::Duration::from_secs(wait_time));
        stream.write(&buf[..bytes_read])?;
        stream.write(&("\n".as_bytes()))?;
    }    
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    for stream in listener.incoming() {
        handle_client(stream?, 20)?;
    }

    Ok(())
}

server2

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::thread;
use std::time;

fn handle_client(mut stream: TcpStream, wait_time: u64) -> std::io::Result<()> {
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {
            return Ok(());
        }

        thread::sleep(time::Duration::from_secs(wait_time));
        stream.write(&buf[..bytes_read])?;
        stream.write(&("\n".as_bytes()))?;
    }    
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8081")?;

    for stream in listener.incoming() {
        handle_client(stream?, 10)?;
    }

    Ok(())
}

client

迭代一

use std::net::TcpStream;
use std::io::{ prelude::*, BufReader, Write };
use std::str;

fn use_server(server: &str, port: u16, content: &str) -> std::io::Result<()> {
    let mut stream = TcpStream::connect((server, port))?;
    let _ = stream.write(content.as_bytes())?;

    let mut reader = BufReader::new(&stream);
    let mut buffer: Vec<u8> = Vec::new();
    reader.read_until(b'\n', &mut buffer)?;

    println!("recv from server: {} ", str::from_utf8(&buffer).unwrap());
    Ok(())
}

//迭代一
fn main() -> std::io::Result<()> {
    use_server("127.0.0.1", 8080, "use server 127.0.0.1:8080")?;
    use_server("127.0.0.1", 8081, "use server 127.0.0.1:8081")?;

    Ok(())
}

程序存在的问题:

必须从第一个网站下载完成后才能从第二个网站进行下载,不符合实际下载情况。

迭代二

针对迭代一中的问题,解决办法如下:

使用多线程实现,每个下载任务都起一个线程。

代码如下:

use std::net::TcpStream;
use std::io::{ prelude::*, BufReader, Write };
use std::str;
use std::thread;

fn use_server(server: &str, port: u16, content: &str) -> std::io::Result<()> {
    let mut stream = TcpStream::connect((server, port))?;
    let _ = stream.write(content.as_bytes())?;

    let mut reader = BufReader::new(&stream);
    let mut buffer: Vec<u8> = Vec::new();
    reader.read_until(b'\n', &mut buffer)?;

    println!("recv from server: {} ", str::from_utf8(&buffer).unwrap());
    Ok(())
}

迭代二
fn main() -> std::io::Result<()> {
    let mut handles: Vec<thread::JoinHandle<()>> = Vec::new(); 
    let handle = thread::spawn(move || {
        use_server("127.0.0.1", 8080, "use server 127.0.0.1:8080").unwrap();
    });
    handles.push(handle);

    let handle = thread::spawn(move || {
        use_server("127.0.0.1", 8081, "use server 127.0.0.1:8081").unwrap();
    });
    handles.push(handle);

    for handle in handles {
        handle.join().unwrap();
    }
    Ok(())
 }

程序存在问题:

1、如果有几千几万个任务,每个任务都起一个线程,怎么办?
2、如果使用线程池解决,但是我们每个下载任务其实都是发起请求后在等待server的响应,效率还是不够高。那么有没有什么办法解决这些问题?

迭代三

针对迭代二中的问题,可以考虑用异步编程来解决,即不需要起多个线程,每个任务就绪后再执行,修改过程如下:

  • 配置文件Cargo.toml

    [dependencies]
    futures = "0.3.4"
  • 源码:

    use std::net::TcpStream;
    use std::io::{ prelude::*, BufReader, Write };
    use std::str;
    use futures::join;
    use futures::executor;
    fn use_server(server: &str, port: u16, content: &str) -> std::io::Result<()> {
      let mut stream = TcpStream::connect((server, port))?;
      let _ = stream.write(content.as_bytes())?;
    
      let mut reader = BufReader::new(&stream);
      let mut buffer: Vec<u8> = Vec::new();
      reader.read_until(b'\n', &mut buffer)?;
    
      println!("recv from server: {} ", str::from_utf8(&buffer).unwrap());
      Ok(())
    }
    //迭代三
    async fn async_use_server(server: &str, port: u16, content: &str) {
      use_server(server, port, content).unwrap();
    }
    async fn use_all_server() {
      let f1 = async_use_server("127.0.0.1", 8080, "use server 127.0.0.1:8080");
      let f2 = async_use_server("127.0.0.1", 8081, "use server 127.0.0.1:8081");
      join!(f1, f2);
    }
    fn main() -> std::io::Result<()> {
      let f = use_all_server();
      executor::block_on(f);
      Ok(())
    }

通过Rust异步编程,我们期望能够达到不使用多线程达到迭代二中多线程一样的效果。

运行程序,发现并没有符合我们的预期,运行结果仍然是先下载第一个任务,再下载第二个任务。为什么会这样呢

这里留个悬念,本节主要说明为什么需要异步编程,至于程序为什么不及预期,我们会在以后的章节中给大家解惑。

本作品采用《CC 协议》,转载必须注明作者和本文链接
令狐一冲
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
文章
255
粉丝
120
喜欢
308
收藏
128
排名:335
访问:2.8 万
私信
所有博文
社区赞助商