这是一篇博文翻译,略有删减,整理代码方便统一阅读,Github链接:https://github.com/lesterli/rust-practice/tree/master/head-first/async-primer。
原文在2月11号的【Rust日报】中给大家推荐过, 原文链接: https://omarabid.com/async-rust
本文并不全面介绍Rust异步主题。如果对新的async/await关键字Futures感到疑惑,并且对Tokio的用途很感兴趣,那么到最后应该会不再毫无头绪。
Rust异步技术是Rust领域的新热点,它被誉为Rust的重要里程碑,特别适合开发高性能网络应用程序的人们。
让我们从头开始。
什么是异步?
关于Async,我给一个简短的版本:如果有一个处理器,想同时执行(类似)两项任务,将如何做?解决方案是先运行第一个任务,然后切换并运行第二个任务,然后再切换回去,依此类推,直到完成两个任务。
如果想给人以计算机同时运行两个任务的感觉(即多任务处理),则此功能很有用。另一个用例是IO操作。当程序等待网络响应时,CPU处于空闲状态。这是切换到另一个任务的理想时间。
那么我们如何编写异步代码?
首先,让我们从一些同步代码开始。
同步代码
让我们做一个简单的程序,该程序读取两个文件:file1.txt
和file2.txt
。我们从file1.txt
开始,然后移至file2.txt
。
我们将程序分为两个文件:main.rs
和file.rs
。file.rs
有一个函数:read_file
,在main.rs
中,用每个文件的路径为参数调用此函数。参见下面代码:
// sync-example/src/file.rs
use std::fs::File;
use std::io::{self, Read};
pub fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path)?;
let mut buffer = String::new();
file.read_to_string(&mut buffer)?;
Ok(buffer)
}
// sync-example/src/main.rs
use std::io;
mod file;
fn main() -> io::Result<()> {
println!("program started");
let file1 = file::read_file("src/file1.txt")?;
println!("processed file 1");
let file2 = file::read_file("src/file2.txt")?;
println!("processed file 2");
dbg!(&file1);
dbg!(&file2);
Ok(())
}
使用cargo run
编译并运行程序。该程序应该毫无意外地运行,但是请确保已在src
文件夹中放置了两个文件(file1.txt
和file2.txt
)。
program started
processed file 1
processed file 2
[src/main.rs:14] &file1 = "file1"
[src/main.rs:15] &file2 = "file2"
到目前为止,一切都很好。如果需要在处理file2.txt
之前先处理file1.txt
,那么这是唯一的方法。但是有时不必关心每个文件的处理顺序。理想情况下,希望尽快处理文件。
在这种情况下,我们可以利用多线程。
多线程方法
为此,我们为每个函数调用运行一个单独的线程。由于我们使用的是多线程代码,并且如果要访问线程外部的文件内容,则必须使用Rust提供的同步原语之一。
这将如何影响代码:file.rs
将保持不变,因此这已经是一件好事了。在main.rs
中,我们需要初始化两个RwLock
;这些将稍后在线程中用于存储文件内容。
然后,我们运行一个无限循环,尝试读取这两个变量的内容。如果这些变量不为空,则我们知道文件处理(或读取)已完成。 (这意味着文件不应为空;否则,我们的程序将错误地保持等待状态。另一种方法是使用Option<String>
并检查Option
是否为None
)。
此代码需要crate lazy_static
。
// multi-example/src/main.rs
use std::io;
use std::sync::RwLock;
use std::thread;
use lazy_static::lazy_static;
mod file;
// A sync primitive that allows to read/write from variables between threads.
// we declare the variables here, this requires the lazy_static crate
lazy_static! {
static ref FILE1: RwLock<String> = RwLock::new(String::from(""));
static ref FILE2: RwLock<String> = RwLock::new(String::from(""));
}
fn main() -> io::Result<()> {
println!("program started");
let thread_1 = thread::spawn(|| {
let mut w1 = FILE1.write().unwrap();
*w1 = file::read_file("src/file1.txt").unwrap();
println!("read file 1");
});
println!("Launched Thread 1");
let thread_2 = thread::spawn(|| {
let mut w2 = FILE2.write().unwrap();
*w2 = file::read_file("src/file2.txt").unwrap();
println!("read file 2");
});
println!("Launched Thread 2");
let mut rf1: bool = false;
let mut rf2: bool = false;
loop {
// read()
let r1 = FILE1.read().unwrap();
let r2 = FILE2.read().unwrap();
if *r1 != String::from("") && rf1 == false {
println!("completed file 1");
rf1 = true;
}
if *r2 != String::from("") && rf2 == false {
println!("completed file 2");
rf2 = true;
}
}
Ok(())
}
有趣的是,如果我们有一个非常大的file1.txt
,我们将得到一个奇怪的输出。首先处理第二个文件(读取文件2);但在我们的循环内部,该程序似乎阻塞并等待第一个文件。
program started
Launched Thread 1
Launched Thread 2
read file 2
read file 1
completed file 1
completed file 2
多线程可能有点棘手,因为我们必须考虑可能阻塞的原子操作。我们使用read
函数来解锁我们的变量,并且文档对这种行为发出警告。
使用共享的读取访问权限锁定此
rwlock
,阻塞当前线程,直到可以获取它为止。
幸运的是,有一个try_read
函数,如果无法获取锁,则返回Err
。
尝试使用共享的读取访问权限获取此
rwlock
。如果此时不能授予访问权限,则返回
Err
。 否则,将返回RAII保护,当该保护被删除时,该保护将释放共享访问。
在第二次尝试中,我们使用try_read
并忽略返回的Errs
,因为它们应该表示我们的锁正忙。这有助于将程序移至下一个变量,并处理先准备好的变量。
// multi-example/src/main.rs
...
loop {
// try_read()
let r1 = FILE1.try_read();
let r2 = FILE2.try_read();
match r1 {
Ok(v) => {
if *v != String::from("") && rf1 == false {
println!("completed file 1");
rf1 = true;
}
}
// If rwlock can't be acquired, ignore the error
Err(_) => {}
}
match r2 {
Ok(v) => {
if *v != String::from("") && rf2 == false {
println!("completed file 2");
rf2 = true;
}
}
// If rwlock can't be acquired, ignore the error
Err(_) => {}
}
}
...
现在执行方式有所不同。如果file1.txt
比file2.txt
大得多,则应首先处理第二个文件。
program started
Launched Thread 1
Launched Thread 2
read file 2
completed file 2
read file 1
completed file 1
多线程的局限性
如果我们已经有多线程,为什么我们需要异步?有两个主要优点:性能和简单性。产生线程很昂贵;从以上内容可以得出结论,编写多线程代码可能会变得非常复杂。
异步,关键字
Rust的重点是使编写Async代码尽可能简单。只需要在函数声明之前添加async/await
关键字即可使代码异步:函数声明前async
,解析异步函数await
。
这听起来很不错。试一试吧。
use std::fs::File;
use std::io::{self, Read};
pub async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path)?;
let mut buffer = String::new();
file.read_to_string(&mut buffer)?;
Ok(buffer)
}
use std::io;
mod file;
fn main() -> io::Result<()> {
let r1 = file::read_file("src/file1.txt");
let r2 = file::read_file("src/file2.txt");
let f1 = r1.await;
let f2 = r2.await;
dbg!(f1);
dbg!(f2);
Ok(())
}
但是这不能通过编译,await
仅在异步块或函数中可用。如果我们尝试运行此代码,则编译器将引发此错误。
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/main.rs:9:14
|
5 | fn main() -> io::Result<()> {
| ---- this is not `async`
...
9 | let f1 = r1.await;
| ^^^^^^^^ only allowed inside `async` functions and blocks
我们可以使main
函数异步吗?不幸的是,事情并非如此简单。我们得到另一个错误。
error[E0277]: `main` has invalid return type `impl std::future::Future`
--> src/main.rs:5:20
|
5 | async fn main() -> io::Result<()> {
| ^^^^^^^^^^^^^^ `main` can only return types that implement `std::process::Termination`
|
= help: consider using `()`, or a `Result`
但是,错误消息有点令人着迷。似乎async
关键字使我们的函数返回Future
而不是声明的类型。
异步函数的返回类型是Future
(确切地说是实现Future
特性的闭包)。
那await
呢?await
在整个Future
中循环直至完成。但是,还有另外一个谜团:Rust无法自解析Future
。我们需要一个执行器来运行此异步代码。
什么是执行器?
如果回顾一下我们的多线程示例,会注意到我们使用循环来检测何时处理文件。这很简单:无限循环直到变量中包含某些内容,然后执行某些操作。如果读取两个文件,我们可以通过跳出循环来改善这一点。
一个异步执行器是循环。默认情况下,Rust没有任何内置的执行程序。有许多异步运行时;async-std
和Tokio
是最受欢迎的。运行时的工作是轮询异步函数(Future
),直到它们最终返回一个值。
一个简单的执行器
crate futures
有一个非常基本的执行器,并且具有将两个Future
连接的函数。让我们试一试。
以下代码使用crate futures
版本0.3.4。
// async-example/src/main.rs
use futures::executor::block_on;
use futures::join;
use std::io;
mod file;
fn main() -> io::Result<()> {
println!("Program started");
// Block on the final future
block_on(load_files());
Ok(())
}
async fn load_files() {
// Join the two futures together
join!(load_file_1(), load_file_2());
}
async fn load_file_1() {
let r1 = file::read_file("src/file1.txt").await;
println!("file 1 size: {}", r1.unwrap().len());
}
async fn load_file_2() {
let r2 = file::read_file("src/file2.txt").await;
println!("file 2 size: {}", r2.unwrap().len());
}
为了验证异步性,将一堆数据转储到file1.txt
中。
Program started
file 1 size: 5399
file 2 size: 5
不幸的是,这看起来(确实)第一个文件函数再次阻塞了。
那么异步到底是什么?
与多线程类似,异步编程中也有一些陷阱和问题。事实是,async
关键字不会神奇地使代码异步;它只是使函数返回Future
。仍然必须繁重地安排代码执行时间。
这意味着函数必须迅速返回尚未准备就绪的状态,而不是被困在进行计算的过程中。在我们的情况下,阻塞是特定在File::Open
和file.read_to_string
处发生的。这两个函数不是异步的,因此会阻止执行。
我们需要创建这两个函数的异步版本。幸运的是,一些使用async-std
的人做了工作,将Rust中的std
库重写为异步版本。
使用async-std
的文件IO
我们唯一要做的更改是将我们的std
导入替换为async_std
。
对于以下示例,我们使用crate async-std
版本1.5.0。
// async-example/src/file.rs
// We use async_std instead of std, it's that simple.
use async_std::io;
use async_std::fs::File;
use async_std::prelude::*;
pub async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path).await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
Ok(buffer)
}
main.rs
中的代码保持不变;该程序仍使用crate futures
中的block_on
执行程序。
编译并运行程序。(确保有一个大的file1.txt
)
Program started
file 2 size: 5
file 1 size: 5399
最后!程序首先快速处理file2.txt
,然后移至file1.txt
。
让我们回顾一下到目前为止所学到的东西:
-
async
使我们的函数返回Future
。 - 运行我们的
Future
需要一个运行时。 - 运行时检查
Future
是否准备就绪;并在就绪时返回其值。
总结
在这篇文章中,我们介绍了同步代码,多线程代码,Rust中的一些异步术语,async-std
库和简单的Future
实现。实际上,这是一个"轻量级"的介绍,为简洁起见,省略了许多细节。