This page looks best with JavaScript enabled

[译] Tokio 0.2 - Rust Crash Course lesson 9

 ·  ☕ 9 min read

原文地址
https://www.snoyman.com/blog/2019/12/rust-crash-course-09-tokio-0-2

Tokio 是一个事件驱动的非阻塞 I/O 平台,用于使用 Rust 编写异步应用程序。

如果你想用 Rust 编写高效的并发网络服务,你需要使用 Tokio 之类的工具。这并不是说这是 Tokio 的唯一用例。
你可以在网络服务之外使用事件马支的调度程序来做很棒的事情。这也不是说 Tokio 是唯一的解决方法。
async-std 库也提供了相似的功能。

但是,对于非阻塞 I/O 系统,网络服务可能是最常用的领域。并且 Tokio 是当今最流行和最成熟的系统。所以我们要从这个组合开始。

练习的解决方案将包含在博客文章的末尾。

这篇文章是 teaching Rust at FP Complete 系列文章的一部分。
如果你是在博客之外阅读这篇文章,你可以在介绍文章的顶部找到到系列文章中所有文章的 链接。
您还可以订阅 RSS feed。

Hello Tokio!

我们开始吧。继续创建一个新的Rust项目进行试验:

$ cargo new –bin usetokio

如果要确保使用与我相同的编译器版本,请正确设置rust-toolchain:

$ echo 1.39.0 > rust-toolchain

然后将 Tokio 设置为依赖项。为了简单,我们将安装所有的 features 。Cargo.toml 文件中:

1
2
[dependencies]
tokio = { version = "0.2", features = ["full"] }

提示 你可以运行 cargo build 来下载和构建 crates 。

现在,我们将编写一个异步的 Hello world 程序。在你的 src/main.rs 中输入以下内容:

1
2
3
4
5
6
7
8
9
use tokio::io;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut stdout = io::stdout();
    let mut hello: &[u8] = b"Hello, world!\n";
    io::copy(&mut hello, &mut stdout).await?;
    Ok(())
}

在上一课中,其中许多内容应该看起来很熟悉。回顾一下:

  • 因为我们将等待并生成 Future,因此我们的main 函数是异步的
  • 因为 main 是异步的,所以我们需要一个运行器来运行它,这就是为什么我们使用 #[tokio::main] 属性。
  • 出于执行 I/O 可能失败,因此我们返回一个 Result
.await?

我上次提到过它,现在我们真正见到了它,这个是将两个技术结合在一起: .await 用来链接 Futures , ? 来用处理错误。这些能够很好的协同工作真是太棒了。我可能会再提及几次,因为我真的非常喜欢。

接下来要注意的是,我们要使用 tokio::io::stdout() 来与标准输出交互的值。如果你熟悉它,那么它看起来非常类似 std::io::stdout() 。那是设计使然: tokio 中的很大一部分 API 只是将 std 异步化。

最后,我们可以看一下 tokio::io::copy ,你可能已经猜到,正如 API docs 所述:

这是 std::io::copy 的异步版本。

但是,这不是使用 Read 和 Write trait ,而是使用它们的异步同类: AsyncRead 和 AsyncWrite 。
字节切片(&[u8]) 是一个有效的 AsyncRead,因此我们可以将输入存储在那里,可能你已经猜到了,Stdout 是 AsyncWrite。

EXERCISE 1 修改代码,将标准输入的全部内容复制到标准输出,而还是输出 Hello, world!。

NOTE 在 useing tokio::io::AsyncWriteExt 后,你可以使用 stdout.write_all 来简化代码,但是我们会坚持使用 tokio::io::copy,因为我们将在整个过程中使用它。但是如果你好奇:

1
2
3
4
5
6
7
8
use tokio::io::{self, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut stdout = io::stdout();
    stdout.write_all(b"Hello, world!\n").await?;
    Ok(())
}

Spawning processes

Tokio 提供了一个和 std::process 相似的模块: tokio::process 。我们可以使用它再次实现 Hello Wrold

1
2
3
4
5
6
7
use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    Command::new("echo").arg("Hello, world!").spawn()?.await?;
    Ok(())
}

注意 ? 和 .await 是如何按照需要的顺序排列的。

  • 创建一个 Command 运行 echo
  • 给命令一个参数 "Hello, world!"
  • spwan ,它可能会失败
  • 第一个 ? : 如何失败了,返回 error 。否则,返回 Future
  • .await : 等到 Future 完成,捕获它的 Result
  • 第二个 ? :如果 Result 是 Err 返回 error

与之前使用回调的方式相比,async/.await 的一大优势是它在 loop 中可以轻松使用。

EXERCISE 2 扩展这个例子,打印 10 次 Hello, world!

Take a break

到目前为上,我们只是完成了单个的 .awaiting 。 但是在多个事件中 .await 也是很容易的。让我们使用 delay_for 来暂停一下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
use tokio::time;
use tokio::process::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    Command::new("date").spawn()?.await?;
    time::delay_for(Duration::from_secs(1)).await;
    Command::new("date").spawn()?.await?;
    time::delay_for(Duration::from_secs(1)).await;
    Command::new("date").spawn()?.await?;
    Ok(())
}

我们还可以使用tokio::time::interval 函数为每次经过一段时间创建一个 “tick” 流。例如,程序每秒调用一次 date 直到被终止:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13

use tokio::time;
use tokio::process::Command;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut interval = time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        Command::new("date").spawn()?.await?;
    }
}

EXERCISE 3 loop 后面为什么没有 Ok(())

Time to spawn

一切都很好,但我们并没有真正利用异步编程。让我们解决这个问题!我们已经看到了两个不同的有趣程序:

  1. 每隔 1s 调用一次 date
  2. 将 stdin 中的所有输入拷贝到 stdout

是时候介绍 sapwn 了,以便我们可以将这二者合并为一个程序。首先,我们来演示一下琐碎的用法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
use std::time::Duration;
use tokio::process::Command;
use tokio::task;
use tokio::time;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    task::spawn(dating()).await??;
    Ok(())
}

async fn dating() -> Result<(), std::io::Error> {
    let mut interval = time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        Command::new("date").spawn()?.await?;
    }
}

你可能想知道:?? 是做什么的?是一些特殊的超级错误处理?不,这只是普通的错误处理 ? 调用了再次。让我们看一些类型签名来帮助我们理解这些:

1
2
3
4
5
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>;

impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;
}

spawn 返回一个 JoinHandle<T::Output> 。就我们而言,我们提供的 Future 是 dating(), 而 dating() 的输出类型是 Result<(), std::io::Error> 。
这意味着 task::spawn(dating()) 的类型是 JoinHandle<Result<(), std::io::Error>> 。

JoinHandle 实现了 Future 。因此我们当我们将 .await 应用在上面时,我们最终得到的类型是 Output = Result<T, JoinError> 。
我们知道 T 是 Result<(), std::io::Error>, 所以最终的类型是 Result<Result<(), std::io::Error>, JoinError>。

第一个 ? 处理的是外围的 Result,Err 时,以 JoinError 退出,Ok 时,给我们一个 Result<(), std::io::Error> 。
第二个 ? 处理 std::io::Error ,Ok 时,返回 () 。

EXERCISE 4 现在我们已经看到了spawn,您应该修改程序,使其在循环中调用两个 date ,并将 stdin 复制到 stdout

Synchronous code

你可能没有奢侈地仅与异步友好代码进行交互。也许你有一些非常不错的库想要利用,但是它在内部执行阻塞调用。
幸运的是,Tokio 给出了 spawn_blocking 函数。

task::spawn_blocking 函数和 task::spawn 类似,但却不是在 Tokio 运行时上生成非阻塞的 Future,而是在专用线程池上生成用于阻塞任务的阻塞函数。

EXERCISE 5 重写 dating() 函数,使用 spawn_blocking 和 std:🧵:sleep 以便大约每 1s 调用一次 date 。

Let’s network!

我将把 TcpListener docs
中的示例略微扩展到 (1)可编译, (2)实现一个 echo 服务。
该程序有一个很大的缺陷,我建议深度找到它。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
use tokio::io;
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (socket, _) = listener.accept().await?;
        echo(socket).await?;
    }
}

async fn echo(socket: TcpStream) -> io::Result<()> {
    let (mut recv, mut send) = io::split(socket);
    io::copy(&mut recv, &mut send).await?;
    Ok(())
}

我们使用 TcpListener 绑定 scoket 。绑定过程是同步的,所以我们使用 .await 来等待监听的 socket 可用,使用 ? 处理监听过程中出现的任何错误。

下一步,在 loop 中我们接收新的连接,和之前一样使用 .await?。捕获 socket (忽略元组第二部分的地址)。然后调用 echo 并 .await 它。

在 echo 中, 我们使用 tokio::io::split 将 TcpStream 拆成读写两部分。然后像之前一样把它们传递给 tokio::io::copy 。

错误在哪里?让我问一个问题:如果在第一个连接仍处于活动状态时进入第二个连接,该怎么办?理想情况下,将对其进行处理。但是,我们的程序只有一项任务。然后,该任务将在每次调用回显时唤醒。因此,直到第一个连接关闭,我们的第二个连接才会得到服务。

EXERCISE 6 修改上面的程序,使其能够正确处理并发连接。

TCP client and ownership

让我们写一个穷人版的 HTTP 客户端。
它将建立与硬编码服务器的连接,将所有 stdin 复制到服务器,然后将所有从服务器来的数据复制到 stdout 。
要使用此功能,您将手动输入 HTTP 请求,然后按 Ctrl-D 作为文件结尾。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
use tokio::io;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut recv, mut send) = io::split(stream);
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();

    io::copy(&mut stdin, &mut send).await?;
    io::copy(&mut recv, &mut stdout).await?;

    Ok(())
}

很好,但还是有限制。它只处理HTTP等半双工协议,实际上不以任何方式支持保持活动状态。
我们希望使用spawn在不同的任务中运行两个副本,似乎很简单:

1
2
3
4
5
let send = spawn(io::copy(&mut stdin, &mut send));
let recv = spawn(io::copy(&mut recv, &mut stdout));

send.await??;
recv.await??;

不幸的是,无法编译。我们收到四个几乎完全相同的错误消息。让我们看一下第一个

error[E0597]: `stdin` does not live long enough
  --> src/main.rs:12:31
   |
12 |     let send = spawn(io::copy(&mut stdin, &mut send));
   |                      ---------^^^^^^^^^^------------
   |                      |        |
   |                      |        borrowed value does not live long enough
   |                      argument requires that `stdin` is borrowed for `'static`
...
19 | }
   | - `stdin` dropped here while still borrowed

copy Future 没有拥有 stdin (send 也一样) 。相反它有一个 stdin 可变引用。值仍然保存在 main 函数的 Future 中,忽略错误情况,我们知道 main 函数将等待 send 完成。因此生命周期看起来是正确的。但是 Rust 无法识别此生命周期信息。(另外,我还没有完全考虑到这一点,我可以肯定的是,在出现紧急情况时, send 可能会比使用 Future 发送时更早地释放掉)。

为了解决此问题,我们需要说服编译器生成拥有 stdin 的 Future 。最简单的方法是使用 async move 块。

Exercise 7 使用两个 async move 使用上面的代码编译通过。

Playing with lines

本节将对程序进行一系列修改。我建议您在查看解决方案之前先解决每个挑战。
但是,与其他练习不同,我将在线展示解决方案,因为它们是相互依赖的。

让我们构建一个异步程序计算标准输入的行数。你需要使用 lines 方法。
阅读文档,尝试找出使类型对齐所需的用途和包装器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let stdin = io::stdin();
    let stdin = io::BufReader::new(stdin);
    let mut count = 0u32;
    let mut lines = stdin.lines();
    while let Some(_) = lines.next_line().await? {
        count += 1;
    }
    println!("Lines on stdin: {}", count);
    Ok(())
}

再将其提高一级。让我们以文件名列表作为命令行参数,而不是标准输入,并计算所有文件中的总行数。

最初,可以一次读取一个文件。换句话说:不要打扰 spawn 。试一试,然后回到这里:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut count = 0u32;

    for filename in args {
        let file = tokio::fs::File::open(filename).await?;
        let file = io::BufReader::new(file);
        let mut lines = file.lines();
        while let Some(_) = lines.next_line().await? {
            count += 1;
        }
    }

    println!("Total lines: {}", count);
    Ok(())
}

但是现在是时候使它适当地异步了,并在单独的衍生任务中处理文件。
为了完成这项工作,我们需要产生所有任务,然后 .await 每个任务。

我使用 Vec<Future<Output=Result<u32, std::io::Error»>

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];

    for filename in args {
        tasks.push(tokio::spawn(async {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut count = 0u32;
            while let Some(_) = lines.next_line().await? {
                count += 1;
            }
            Ok(count) as Result<u32, std::io::Error>
        }));
    }

    let mut count = 0;
    for task in tasks {
        count += task.await??;
    }

    println!("Total lines: {}", count);
    Ok(())
}

最后,在这个过程中:让我们改变处理 count 的方式。
让我们让每个任务更新一个共享的可变变量,而不是等待第二个 for 循环中的计数。
为此,应使用 Arc<Mutex<u32>> 。不过,你仍然需要保留 Vec ,以确保等待所有文件被读取。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;
use std::sync::Arc;

// avoid thread blocking by using Tokio's mutex
use tokio::sync::Mutex;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];
    let count = Arc::new(Mutex::new(0u32));

    for filename in args {
        let count = count.clone();
        tasks.push(tokio::spawn(async move {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut local_count = 0u32;
            while let Some(_) = lines.next_line().await? {
                local_count += 1;
            }

            let mut count = count.lock().await;
            *count += local_count;
            Ok(()) as Result<(), std::io::Error>
        }));
    }

    for task in tasks {
        task.await??;
    }

    let count = count.lock().await;
    println!("Total lines: {}", *count);
    Ok(())
}

LocalSet and !Send

我认为,在 main 函数中 .awaiting count 并累加的方法是优越的。但是,我想教你其他东西。

如果用 Rc<RefCell<u32>> 替换 Arc<Mutex<u32>> 会发生什么呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;
use std::rc::Rc;
use std::cell::RefCell;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];
    let count = Rc::new(RefCell::new(0u32));

    for filename in args {
        let count = count.clone();
        tasks.push(tokio::spawn(async {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut local_count = 0u32;
            while let Some(_) = lines.next_line().await? {
                local_count += 1;
            }

            *count.borrow_mut() += local_count;
            Ok(()) as Result<(), std::io::Error>
        }));
    }

    for task in tasks {
        task.await??;
    }

    println!("Total lines: {}", count.borrow());
    Ok(())
}

有错误:

error[E0277]: `std::rc::Rc<std::cell::RefCell<u32>>` cannot be shared between threads safely
  --> src/main.rs:15:20
   |
15 |         tasks.push(tokio::spawn(async {
   |                    ^^^^^^^^^^^^ `std::rc::Rc<std::cell::RefCell<u32>>` cannot be shared between threads safely
   |
  ::: /Users/michael/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.2/src/task/spawn.rs:49:17
   |
49 |     T: Future + Send + 'static,
   |                 ---- required by this bound in `tokio::task::spawn::spawn`

任务会被安排到不同的线程。因此,Future 必须是 Send 。
而 Rc<RefCell<u32>> 被定义为 !Send 。
但是,在我们的示例中,使用多个 OS 线程不太可能加速我们的程序。
我们将做很多阻塞 I/O 。如果我们在一个 OS 线程 spawning 所有任务,避免使用 Send 。
可以肯定,Tokio 提供了这样的功能:tokio::task::spawn_local 。
使用它(并以 async move 而不是 async 添加),我们的程序可以编译,但是会在运行时中断:

thread 'main' panicked at '`spawn_local` called from outside of a local::LocalSet!', src/libcore/option.rs:1190:5

现在,我个人并不喜欢这种“运行时检测”功能,但是这个概念很简单:如果您希望在当前线程中生成,则需要设置运行时来支持该功能
我们这样做的方法是使用 LocalSet 。为了使用此功能,您需要放弃 #[tokio::main] 属性。

EXERCISE 8 请遵循 LocalSet 的文档以使上述程序与 Rc<RefCell<u32>> 一起使用。

Conclusion

异步编程中显然可以涵盖更多内容,但是希望这可以为使用 async/.await 语法和 Tokio 库本身奠定基础,这是您需要了解的最大基础。

Solutions

Solution 1

1
2
3
4
5
6
7
8
9
use tokio::io;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();
    io::copy(&mut stdin, &mut stdout).await?;
    Ok(())
}

Solution 2

1
2
3
4
5
6
7
8
9
use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    for _ in 1..=10 {
        Command::new("echo").arg("Hello, world!").spawn()?.await?;
    }
    Ok(())
}

Solution 3

因为 loop 将会永远运行或者因为错误而停止。loop 后面的代码并不会被实际调用。
因此,放置在此处的代码将生成警告。

Solution 4

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use std::time::Duration;
use tokio::process::Command;
use tokio::{io, task, time};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let dating = task::spawn(dating());
    let copying = task::spawn(copying());

    dating.await??;
    copying.await??;

    Ok(())
}

async fn dating() -> Result<(), std::io::Error> {
    let mut interval = time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        Command::new("date").spawn()?.await?;
    }
}

async fn copying() -> Result<(), std::io::Error> {
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();
    io::copy(&mut stdin, &mut stdout).await?;
    Ok(())
}

Solution 5

1
2
3
4
5
6
async fn dating() -> Result<(), std::io::Error> {
    loop {
        task::spawn_blocking(|| { std:🧵:sleep(Duration::from_secs(1)) }).await?;
        Command::new("date").spawn()?.await?;
    }
}

Solution 6

1
2
3
4
loop {
    let (socket, _) = listener.accept().await?;
    tokio::spawn(echo(socket));
}

但是,有一个缺点值得注意:我们忽略了产生的任务所产生的错误。在这种情况下,最好的行为可能是处理产生的任务中的错误:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;

    let mut counter = 1u32;
    loop {
        let (socket, _) = listener.accept().await?;
        println!("Accepted connection #{}", counter);
        tokio::spawn(async move {
            match echo(socket).await {
                Ok(()) => println!("Connection #{} completed successfully", counter),
                Err(e) => println!("Connection #{} errored: {:?}", counter, e),
            }
        });
        counter += 1;
    }
}

Exericse 7

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use tokio::io;
use tokio::spawn;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut recv, mut send) = io::split(stream);
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();

    let send = spawn(async move {
        io::copy(&mut stdin, &mut send).await
    });
    let recv = spawn(async move {
        io::copy(&mut recv, &mut stdout).await
    });

    send.await??;
    recv.await??;

    Ok(())
}

Solution 8

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use tokio::prelude::*;
use tokio::io::AsyncBufReadExt;
use std::rc::Rc;
use std::cell::RefCell;

fn main() -> Result<(), std::io::Error> {
    let mut rt = tokio::runtime::Runtime::new()?;
    let local = tokio::task::LocalSet::new();
    local.block_on(&mut rt, main_inner())
}

async fn main_inner() -> Result<(), std::io::Error> {
    let mut args = std::env::args();
    let _me = args.next(); // ignore command name
    let mut tasks = vec![];
    let count = Rc::new(RefCell::new(0u32));

    for filename in args {
        let count = count.clone();
        tasks.push(tokio::task::spawn_local(async move {
            let file = tokio::fs::File::open(filename).await?;
            let file = io::BufReader::new(file);
            let mut lines = file.lines();
            let mut local_count = 0u32;
            while let Some(_) = lines.next_line().await? {
                local_count += 1;
            }

            *count.borrow_mut() += local_count;
            Ok(()) as Result<(), std::io::Error>
        }));
    }

    for task in tasks {
        task.await??;
    }

    println!("Total lines: {}", count.borrow());
    Ok(())
}
Share on

Serendipity
WRITTEN BY
Serendipity
iOS/Golang/Rust

What's on this Page