Rust の Stream の基本的な取り回しを理解する

こんにちは、エンジニアの渡辺(@mochi_neko_7)です。

今回は Rust における Stream の基本的な取り回しを紹介します。

Anthropic の Claude API の Rust Client を書いていた際に Streaming API の対応をしていたところ、自身が Rust の Stream をちゃんと理解していなかったことに気づき勉強し直していました。

ところが日本語で解説している記事をあまり見かけなかったため、本記事で自分の理解している範囲で基礎的な内容を整理して紹介したいと思います。

もし間違った記述等ありましたらご指摘いただけますと幸いです。

環境

  • Rust: v1.76.0
  • futures_core: v0.3.30
  • async_stream: v0.3.5
  • futures_util: v0.3.30
  • tokio_stream: v0.1.15

Rust の Stream

そもそも Rust の Stream はどんなものか、という基礎の解説は英語ですが下記にあります。

www.qovery.com

こちらを参考にしていただきつつ、ここでは次の二点を押さえておきましょう。

  1. 非同期版 Iterator としての Stream
  2. Poll の扱い

1. 非同期版 Iterator としての Stream

Rust の Iterator は下記のような trait で定義されています。

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

Iterator in std::iter - Rust

Option<Self::Item> の戻り値は値を返せる場合には Some(Item) を、 Iterator の終端に達して値を返せない場合には None を返すものです。

他言語だと boolean と nullable の組み合わせでこれらを表現したりしますが、Rust では Option でシンプルに表現できます。

一方、Stream の trait の定義を見てみると、非同期システムの都合で見かけが変わっている点はあるものの基本的には同じ形式をしていることがわかります。

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context
    ) -> Poll<Option<Self::Item>>;
}

Stream in futures_core::stream - Rust

本題にあまり関係ない size_hint は省略しています。

実際、futures_util の StreamExt を利用すると

use futures_util::StreamExt;

while let Some(item) = stream.next().await {
    // item の処理
}

のように Iterator に .await を付けたような格好で呼び出すことができます。

2. Poll の扱い

Poll は下記で定義されています。

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Poll in std::task - Rust

状態として二種類あります。

  • Poll::Ready(T) : 値 T を返す準備ができている状態
  • Poll::Pending : 値を返す準備ができていない状態

Stream の場合は TOption<Self::Item> を指定するので、状態は次の三種類を取ります。

  • Poll::Ready(Some(T)) : 値 T を返す準備ができている状態
  • Poll::Ready(None) : Stream の終端に達した状態
  • Poll::Pending : 値を返す準備ができていない状態

Stream を読むだけの場合には StreamExtnext().await を呼ぶかと思いますので、Stream trait を自分で実装する場合以外は Poll の細かい取り扱いは気にしなくて済みます。

Stream の周辺 crate

Stream は Rust のコアには最低限の trait しか用意されておらず、さまざまな便利機能は現在では別 crate で提供されています。

1. futures_core

Stream の定義は futures_core crate から提供されています。

docs.rs

docs.rs

他 crate もこの Stream を利用していますし Stream を Re-export してくれている場合も多いので、以下の拡張 crate を利用する際には不要な場合もあります。

2. async_stream

async_stream は標準では提供されていない yield 構文による Stream の生成を、stream! マクロによってサポートしています。

docs.rs

use async_stream::stream;
use futures_core::stream::Stream;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

ちなみに Iterator から Stream を生成したい場合には以下で紹介する crate の futures_util::stream::iter()tokio_stream::iter() などを利用できます。

3. futures_util

futures_util は Stream を含む Rust の非同期 API の拡張機能を提供します。

docs.rs

将来的に安定した機能は futures_core に統合されるものらしいです。

特に重要なのが StreamExt で、非同期 API .await で呼び出せる StreamExt::next()StreamExt::map()StreamExt::filter() などの Iterator と同様の Adapter 系機能などを提供します。

StreamExt in futures_util::stream - Rust

先にも使用例を書きましたが、futures_util::StreamExt の利用イメージは下記になります。

use futures_util::StreamExt;

async fn some_function() {
    let mut stream = futures_util::stream::iter(vec![0, 1, 2]);

    while let Some(value) = stream.next().await {
        println!("Got {}", value);
    }
}

ただし、futures_util::StreamExt::next() は非同期ではありますがシングルスレッドで動作するため、マルチスレッドを利用したい場合には tokio-stream などマルチスレッドに対応している非同期バックエンドによる実装を利用した方が良い場合もありますので、ユースケースに合わせて選択してください。

4. tokio-stream

tokio の非同期バックエンドと連携した運用をする際には、tokio-stream で提供されている拡張機能を利用することもできます。

docs.rs

futures_util::StreamExt の代わりに tokio_stream::StreamExt を使用するイメージです。

use tokio_stream::StreamExt;

async fn some_function() {
    let mut stream = tokio_stream::stream::iter(vec![0, 1, 2]);

    while let Some(value) = stream.next().await {
        println!("Got {}", value);
    }
}

ただし、現行バージョンでは poll_next_unpin など futures_util::StreamExt ではサポートしていても tokio_stream::StreamExt ではサポートされていない機能などもありますので注意が必要です。

使い分け

futures_utiltokio_stream はどちらも StreamExt を提供していますが、非同期バックエンドが異なるためユースケースに応じて使い分けが必要です。

  • futures_util : 標準の非同期 API (futures_core) バックエンド、シングルスレッド
  • tokio_stream : tokio バックエンド、マルチスレッド対応

他の非同期バックエンドを利用する際には対応するものを探してみてください。

Stream の変換

Stream の変換は StreamExt の Adapter で済む場合もありますが、バッファリングをして最適化した場合など自分で Stream の変換処理を書いた方がいいケースもあります。

Stream の取り回しの確認も兼ねて、例として reqwest、つまり HTTP のレスポンスを Byte の Stream として受け取り、行ごとのテキストに分割して Stream として提供することを考えてみましょう。

この場合、Byte 配列をバッファリングしつつ改行コードを見つけたら Chunk を返す、という Stream の実装をします。

use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Buf, BytesMut};
use futures_core::Stream;
use pin_project::pin_project;

// Stream の処理のエラーの定義 (thiserrorを利用)
#[derive(Debug, thiserror::Error)]
enum LineStreamError {
    #[error("Failed to read reqwest stream: {0:?}")]
    ReqwestError(#[from] reqwest::Error),
    #[error("Failed to decode chunk to UTF-8 string: {0:?}")]
    StringDecodingError(#[from] std::string::FromUtf8Error),
}

// Stream の Item の定義
type LineStreamResult = Result<String, LineStreamError>;

// 自作の行ごとのテキストの Stream の定義
#[pin_project]
struct LineStream<S>
where
    S: Stream<Item = ReqwestStreamItem> + Unpin,
{
    #[pin]
    stream: S, // 元の Stream
    buffer: BytesMut, // bytes の可変長バッファ
}

// reqwest の Response Stream の Item
type ReqwestStreamItem = Result<bytes::Bytes, reqwest::Error>;

// 初期化メソッドの定義
impl<S> LineStream<S>
where
    S: Stream<Item = ReqwestStreamItem> + Unpin,
{
    fn new(stream: S) -> Self {
        LineStream {
            stream,
            buffer: BytesMut::new(),
        }
    }
}

// 目的の Stream trait の実装
impl<S> Stream for LineStream<S>
where
    S: Stream<Item = ReqwestStreamItem> + Unpin,
{
    type Item = LineStreamResult; // 最終的に欲しい Item の型をここで指定する

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<LineStreamResult>> {
        let mut this = selft.project(); // Unpin

        loop {
            // バッファに改行を含む場合
            if let Some(position) = this
                .buffer
                .iter()
                .position(|b| *b == b'\n')
            {
                // position より手前の bytesを取り出す
                let line = this.buffer.split_to(position);
                // position にある改行はスキップする
                this.buffer.advance(1);
                // bytes を UTF-8 String にデコードする
                let line = String::from_utf8(line.to_vec())
                    .map_err(StreamLineError::StringDecodingError)?;
                // デコードした文字列を返す
                return Poll::Ready(Some(Ok(line)));
            }

            // 内部の Stream の読み込み
            match this
                .as_mut()
                .stream
                .poll_next(cx)
            {
                // bytes を受け取った場合
                | Poll::Ready(Some(Ok(chunk))) => {
                    // バッファに bytes を格納する
                    this.buffer.extend(&chunk);
                    // 手前の改行を取り出す処理に入るまで loop を継続する
                },
                // Error が返ってきた場合
                | Poll::Ready(Some(Err(error))) => {
                    return Poll::Ready(Some(Err(
                        StreamLineError::ReqwestError(error),
                    )));
                },
                // Stream の終端に達した場合
                | Poll::Ready(None) => {
                    // バッファが空の場合は何もしない
                    return if this.buffer.is_empty() {
                        Poll::Ready(None)
                    // バッファにデータが残っている場合は処理を試みる
                    } else {
                        let line = this.buffer.split_off(0);
                        let line = String::from_utf8(line.to_vec())
                            .map_err(StreamLineError::StringDecodingError)?;
                        Poll::Ready(Some(Ok(line)))
                    };
                },
                // 待機中の場合はそのまま
                | Poll::Pending => return Poll::Pending,
            }
        }
    }
}

利用している crate は下記になります。

  • futures_core
  • reqwest
  • bytes
  • pin_project
  • thiserror

まず、#[pin_project]#[pin]let mut this = selft.project(); などは元の Stream を Unpin するためのものです。

futures_util::StreamExt には StreamExt::poll_next_unpin という便利メソッドがあるのでそちらを使うと楽ですが、使わない場合には pin_project crate を使って Unpin を実装できます。

StreamExt in futures_util::stream - Rust

今回の変換処理の主題はテキストの bytes を読み込んでバッファに溜めつつ、改行を見つけた場合に行分のテキストを取り出したいものでした。

元の Stream を読み込んでバッファに溜めていく処理は

// 内部の Stream の読み込み
match this
    .as_mut()
    .stream
    .poll_next(cx)
{
    ...
}

のブロックで実装しています。

そして、

// バッファに改行を含む場合
if let Some(position) = this
    .buffer
    .iter()
    .position(|b| *b == b'\n')
{
    ...
}

の処理で改行コード \n を探索し、発見した場合にはそれをテキストに変換して結果を返します。

実際に使用するイメージは下記になります。

use tokio_stream::StreamExt;
use xxx::LineStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = reqwest::get("http://httpbin.org/ip")
        .await?
        .bytes_stream();

    let mut line_stream = LineStream::new(stream);

    if let Some(line) = line_stream.next().await {
        println!("Line: {:?}", line);
    }

    Ok(())
}

ライブラリで Stream を利用する際に気をつけたいこと

アプリケーションとして Stream を扱う場合にはアプリケーション全体で利用する非同期バックエンドに準拠して実装をすれば良いですが、ライブラリとして Stream を扱う場合にはライブラリ利用者がどの非同期バックエンドを使うのか想定することはできません。

ライブラリ内部で特定の非同期バックエンドに依存した処理を他でしていない場合には、使用する crate を適切に選択肢、なるべく依存関係を少なく(つまりバイナリのサイズを小さく)したいでしょう。

その場合、futures_core crate のみで実装し、futuers_utiltokio_stream の具体の拡張を利用しないのも選択肢の一つです。

具体例として自分が開発している Claude API の Rust Client

github.com

では Streaming モードの API もサポートしていますが、API としては

pub async fn create_a_message_stream(
    &self,
    request_body: MessagesRequestBody
) -> MessagesResult<impl futures_core::stream::Stream<Item = ChunkStreamResult>>

Client in clust - Rust

のように futures_core のみ使用し、README や examples では futures_utiltokio_stream の両方を使ったサンプルを提供しています。

とはいえ futures_util に関しては将来的にはコアに含められる可能性もあるため、どこまで利用すべきかはプロジェクト次第かと思います。

まとめ

以上で紹介した内容を端的にまとめると下記になります。

  • Stream は非同期版の Iterator
  • next()map()filter() などの拡張機能は、futures_utiltokio_streamStreamExt などで提供されている
  • ユースケースに応じてコア crate(futures_core)、拡張 crate (async_streamfutures_utiltokio_stream など)を使い分ける
  • Stream の変換処理は Adapter だと難しい(あるいは効率が悪い)場合でも、 Stream trait を自分で実装すれば可能
  • Stream を利用したライブラリを作成する場合には futures_core のみで実装するのも選択肢の一つ

おわりに

自分で実際に Stream を利用したライブラリの開発を通じて理解した内容を基に、Stream の基本的な取り回しを紹介しました。

もっと深掘りをすれば TryStream や cx: &mut Context<'_> って何?といった話もありそうですが、今回はもっともシンプルに Stream を扱う際に必要になる話に限定しました。

Rust の Stream は高度に抽象化されているのもあり、ちゃんと理解した後だとかなり扱いやすいなと思いました。

Rust の入門ドキュメントには Stream の解説がなかった気がしますが、似たような仕組みの Channel より扱いやすいので使えて損はないでしょう。

Claude の API の Rust Client の Streaming API の実装も参考にしていたければと思います。

Rust で何かの crate の API で提供される Stream を利用したい、その Stream を扱いたい形式に変換したい、あるいは Stream を使った API を含むライブラリを開発したい方に参考になれば幸いです。